1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #if (NXT_THREADS) 11 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data); 12 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj, 13 void *data); 14 #endif 15 16 17 void * 18 nxt_job_create(nxt_mem_pool_t *mp, size_t size) 19 { 20 size_t cache_size; 21 nxt_job_t *job; 22 23 if (mp == NULL) { 24 mp = nxt_mem_pool_create(256); 25 26 if (nxt_slow_path(mp == NULL)) { 27 return NULL; 28 } 29 30 job = nxt_mem_zalloc(mp, size); 31 cache_size = 0; 32 33 } else { 34 job = nxt_mem_cache_zalloc0(mp, size); 35 cache_size = size; 36 } 37 38 if (nxt_fast_path(job != NULL)) { 39 job->cache_size = (uint16_t) cache_size; 40 job->mem_pool = mp; 41 nxt_job_set_name(job, "job"); 42 } 43 44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ 45 nxt_queue_self(&job->link); 46 47 job->task.ident = nxt_task_next_ident(); 48 49 return job; 50 } 51 52 53 void 54 nxt_job_init(nxt_job_t *job, size_t size) 55 { 56 nxt_memzero(job, size); 57 58 nxt_job_set_name(job, "job"); 59 60 nxt_queue_self(&job->link); 61 62 job->task.ident = nxt_task_next_ident(); 63 } 64 65 66 void 67 nxt_job_destroy(void *data) 68 { 69 nxt_job_t *job; 70 71 job = data; 72 73 nxt_queue_remove(&job->link); 74 75 if (job->cache_size == 0) { 76 77 if (job->mem_pool != NULL) { 78 nxt_mem_pool_destroy(job->mem_pool); 79 } 80 81 } else { 82 nxt_mem_cache_free0(job->mem_pool, job, job->cache_size); 83 } 84 } 85 86 87 nxt_int_t 88 nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job) 89 { 90 nxt_mem_pool_cleanup_t *mpcl; 91 92 mpcl = nxt_mem_pool_cleanup(mp, 0); 93 94 if (nxt_fast_path(mpcl != NULL)) { 95 mpcl->handler = nxt_job_destroy; 96 mpcl->data = job; 97 return NXT_OK; 98 } 99 100 return NXT_ERROR; 101 } 102 103 104 /* 105 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post() 106 * calls and to the "nxt_work_handler_t" are required by Sun C. 107 */ 108 109 void 110 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) 111 { 112 nxt_debug(task, "%s start", job->name); 113 114 #if (NXT_THREADS) 115 116 if (job->thread_pool != NULL) { 117 nxt_int_t ret; 118 119 job->engine = task->thread->engine; 120 121 ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline, 122 &job->task, job, (void *) handler); 123 if (ret == NXT_OK) { 124 return; 125 } 126 127 handler = job->abort_handler; 128 } 129 130 #endif 131 132 handler(&job->task, job, job->data); 133 } 134 135 136 #if (NXT_THREADS) 137 138 /* A trampoline function is called by a thread pool thread. */ 139 140 static void 141 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data) 142 { 143 nxt_job_t *job; 144 nxt_work_handler_t handler; 145 146 job = obj; 147 handler = (nxt_work_handler_t) data; 148 149 job->task.log = job->log; 150 151 nxt_debug(task, "%s thread", job->name); 152 153 if (nxt_slow_path(job->cancel)) { 154 nxt_job_return(task, job, job->abort_handler); 155 156 } else { 157 handler(&job->task, job, job->data); 158 } 159 } 160 161 #endif 162 163 164 void 165 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) 166 { 167 nxt_debug(task, "%s return", job->name); 168 169 #if (NXT_THREADS) 170 171 if (job->engine != NULL) { 172 /* A return function is called in thread pool thread context. */ 173 nxt_event_engine_post(job->engine, nxt_job_thread_return_handler, 174 &job->task, job, (void *) handler, job->log); 175 return; 176 } 177 178 #endif 179 180 if (nxt_slow_path(job->cancel)) { 181 nxt_debug(task, "%s cancellation", job->name); 182 handler = job->abort_handler; 183 } 184 185 nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main, 186 handler, &job->task, job, job->data); 187 } 188 189 190 #if (NXT_THREADS) 191 192 static void 193 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data) 194 { 195 nxt_job_t *job; 196 nxt_work_handler_t handler; 197 198 job = obj; 199 handler = (nxt_work_handler_t) data; 200 201 job->task.thread = task->thread; 202 203 if (nxt_slow_path(job->cancel)) { 204 nxt_debug(task, "%s cancellation", job->name); 205 handler = job->abort_handler; 206 } 207 208 handler(&job->task, job, job->data); 209 } 210 211 #endif 212