1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #if (NXT_THREADS) 11 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data); 12 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj, 13 void *data); 14 #endif 15 16 17 void * 18 nxt_job_create(nxt_mem_pool_t *mp, size_t size) 19 { 20 size_t cache_size; 21 nxt_job_t *job; 22 23 if (mp == NULL) { 24 mp = nxt_mem_pool_create(256); 25 26 if (nxt_slow_path(mp == NULL)) { 27 return NULL; 28 } 29 30 job = nxt_mem_zalloc(mp, size); 31 cache_size = 0; 32 33 } else { 34 job = nxt_mem_cache_zalloc0(mp, size); 35 cache_size = size; 36 } 37 38 if (nxt_fast_path(job != NULL)) { 39 job->cache_size = (uint16_t) cache_size; 40 job->mem_pool = mp; 41 nxt_job_set_name(job, "job"); 42 } 43 44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ 45 nxt_queue_self(&job->link); 46 47 return job; 48 } 49 50 51 void 52 nxt_job_init(nxt_job_t *job, size_t size) 53 { 54 nxt_memzero(job, size); 55 56 nxt_job_set_name(job, "job"); 57 58 nxt_queue_self(&job->link); 59 } 60 61 62 void 63 nxt_job_destroy(nxt_task_t *task, void *data) 64 { 65 nxt_job_t *job; 66 67 job = data; 68 69 nxt_queue_remove(&job->link); 70 71 if (job->cache_size == 0) { 72 73 if (job->mem_pool != NULL) { 74 nxt_mem_pool_destroy(job->mem_pool); 75 } 76 77 } else { 78 nxt_mem_cache_free0(job->mem_pool, job, job->cache_size); 79 } 80 } 81 82 83 nxt_int_t 84 nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job) 85 { 86 nxt_mem_pool_cleanup_t *mpcl; 87 88 mpcl = nxt_mem_pool_cleanup(mp, 0); 89 90 if (nxt_fast_path(mpcl != NULL)) { 91 mpcl->handler = nxt_job_destroy; 92 mpcl->data = job; 93 return NXT_OK; 94 } 95 96 return NXT_ERROR; 97 } 98 99 100 /* 101 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post() 102 * calls and to the "nxt_work_handler_t" are required by Sun C. 103 */ 104 105 void 106 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) 107 { 108 nxt_debug(task, "%s start", job->name); 109 110 #if (NXT_THREADS) 111 112 if (job->thread_pool != NULL) { 113 nxt_int_t ret; 114 115 job->engine = task->thread->engine; 116 117 nxt_work_set(&job->work, nxt_job_thread_trampoline, 118 job->task, job, (void *) handler); 119 120 ret = nxt_thread_pool_post(job->thread_pool, &job->work); 121 122 if (ret == NXT_OK) { 123 return; 124 } 125 126 handler = job->abort_handler; 127 } 128 129 #endif 130 131 handler(job->task, job, job->data); 132 } 133 134 135 #if (NXT_THREADS) 136 137 /* A trampoline function is called by a thread pool thread. */ 138 139 static void 140 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data) 141 { 142 nxt_job_t *job; 143 nxt_work_handler_t handler; 144 145 job = obj; 146 handler = (nxt_work_handler_t) data; 147 148 nxt_debug(task, "%s thread", job->name); 149 150 if (nxt_slow_path(job->cancel)) { 151 nxt_job_return(task, job, job->abort_handler); 152 153 } else { 154 handler(job->task, job, job->data); 155 } 156 } 157 158 #endif 159 160 161 void 162 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) 163 { 164 nxt_debug(task, "%s return", job->name); 165 166 #if (NXT_THREADS) 167 168 if (job->engine != NULL) { 169 /* A return function is called in thread pool thread context. */ 170 171 nxt_work_set(&job->work, nxt_job_thread_return_handler, 172 job->task, job, (void *) handler); 173 174 nxt_event_engine_post(job->engine, &job->work); 175 176 return; 177 } 178 179 #endif 180 181 if (nxt_slow_path(job->cancel)) { 182 nxt_debug(task, "%s cancellation", job->name); 183 handler = job->abort_handler; 184 } 185 186 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 187 handler, job->task, job, job->data); 188 } 189 190 191 #if (NXT_THREADS) 192 193 static void 194 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data) 195 { 196 nxt_job_t *job; 197 nxt_work_handler_t handler; 198 199 job = obj; 200 handler = (nxt_work_handler_t) data; 201 202 job->task->thread = task->thread; 203 204 if (nxt_slow_path(job->cancel)) { 205 nxt_debug(task, "%s cancellation", job->name); 206 handler = job->abort_handler; 207 } 208 209 handler(job->task, job, job->data); 210 } 211 212 #endif 213