1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 #if (NXT_THREADS) 11 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data); 12 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj, 13 void *data); 14 #endif 15 16 17 void * 18 nxt_job_create(nxt_mp_t *mp, size_t size) 19 { 20 size_t cache_size; 21 nxt_job_t *job; 22 23 if (mp == NULL) { 24 mp = nxt_mp_create(1024, 128, 256, 32); 25 if (nxt_slow_path(mp == NULL)) { 26 return NULL; 27 } 28 29 job = nxt_mp_zget(mp, size); 30 cache_size = 0; 31 32 } else { 33 job = nxt_mp_zalloc(mp, size); 34 cache_size = size; 35 } 36 37 if (nxt_fast_path(job != NULL)) { 38 job->cache_size = (uint16_t) cache_size; 39 job->mem_pool = mp; 40 nxt_job_set_name(job, "job"); 41 } 42 43 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ 44 nxt_queue_self(&job->link); 45 46 return job; 47 } 48 49 50 void 51 nxt_job_init(nxt_job_t *job, size_t size) 52 { 53 nxt_memzero(job, size); 54 55 nxt_job_set_name(job, "job"); 56 57 nxt_queue_self(&job->link); 58 } 59 60 61 void 62 nxt_job_destroy(nxt_task_t *task, void *data) 63 { 64 nxt_job_t *job; 65 66 job = data; 67 68 nxt_queue_remove(&job->link); 69 70 if (job->cache_size == 0) { 71 72 if (job->mem_pool != NULL) { 73 nxt_mp_destroy(job->mem_pool); 74 } 75 76 } else { 77 nxt_mp_free(job->mem_pool, job); 78 } 79 } 80 81 82 #if 0 83 84 nxt_int_t 85 nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job) 86 { 87 nxt_mem_pool_cleanup_t *mpcl; 88 89 mpcl = nxt_mem_pool_cleanup(mp, 0); 90 91 if (nxt_fast_path(mpcl != NULL)) { 92 mpcl->handler = nxt_job_destroy; 93 mpcl->data = job; 94 return NXT_OK; 95 } 96 97 return NXT_ERROR; 98 } 99 100 #endif 101 102 103 /* 104 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post() 105 * calls and to the "nxt_work_handler_t" are required by Sun C. 106 */ 107 108 void 109 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) 110 { 111 nxt_debug(task, "%s start", job->name); 112 113 #if (NXT_THREADS) 114 115 if (job->thread_pool != NULL) { 116 nxt_int_t ret; 117 118 job->engine = task->thread->engine; 119 120 nxt_work_set(&job->work, nxt_job_thread_trampoline, 121 job->task, job, (void *) handler); 122 123 ret = nxt_thread_pool_post(job->thread_pool, &job->work); 124 125 if (ret == NXT_OK) { 126 return; 127 } 128 129 handler = job->abort_handler; 130 } 131 132 #endif 133 134 handler(job->task, job, job->data); 135 } 136 137 138 #if (NXT_THREADS) 139 140 /* A trampoline function is called by a thread pool thread. */ 141 142 static void 143 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data) 144 { 145 nxt_job_t *job; 146 nxt_work_handler_t handler; 147 148 job = obj; 149 handler = (nxt_work_handler_t) data; 150 151 nxt_debug(task, "%s thread", job->name); 152 153 if (nxt_slow_path(job->cancel)) { 154 nxt_job_return(task, job, job->abort_handler); 155 156 } else { 157 handler(job->task, job, job->data); 158 } 159 } 160 161 #endif 162 163 164 void 165 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler) 166 { 167 nxt_debug(task, "%s return", job->name); 168 169 #if (NXT_THREADS) 170 171 if (job->engine != NULL) { 172 /* A return function is called in thread pool thread context. */ 173 174 nxt_work_set(&job->work, nxt_job_thread_return_handler, 175 job->task, job, (void *) handler); 176 177 nxt_event_engine_post(job->engine, &job->work); 178 179 return; 180 } 181 182 #endif 183 184 if (nxt_slow_path(job->cancel)) { 185 nxt_debug(task, "%s cancellation", job->name); 186 handler = job->abort_handler; 187 } 188 189 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 190 handler, job->task, job, job->data); 191 } 192 193 194 #if (NXT_THREADS) 195 196 static void 197 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data) 198 { 199 nxt_job_t *job; 200 nxt_work_handler_t handler; 201 202 job = obj; 203 handler = (nxt_work_handler_t) data; 204 205 job->task->thread = task->thread; 206 207 if (nxt_slow_path(job->cancel)) { 208 nxt_debug(task, "%s cancellation", job->name); 209 handler = job->abort_handler; 210 } 211 212 handler(job->task, job, job->data); 213 } 214 215 #endif 216