1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10#if (NXT_THREADS)
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10#if (NXT_THREADS)
|
11static void nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data); 12static void nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj,
| 11static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data); 12static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
|
13 void *data); 14#endif 15 16 17void * 18nxt_job_create(nxt_mem_pool_t *mp, size_t size) 19{ 20 size_t cache_size; 21 nxt_job_t *job; 22 23 if (mp == NULL) { 24 mp = nxt_mem_pool_create(256); 25 26 if (nxt_slow_path(mp == NULL)) { 27 return NULL; 28 } 29 30 job = nxt_mem_zalloc(mp, size); 31 cache_size = 0; 32 33 } else { 34 job = nxt_mem_cache_zalloc0(mp, size); 35 cache_size = size; 36 } 37 38 if (nxt_fast_path(job != NULL)) { 39 job->cache_size = (uint16_t) cache_size; 40 job->mem_pool = mp; 41 nxt_job_set_name(job, "job"); 42 } 43 44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ 45 nxt_queue_self(&job->link); 46
| 13 void *data); 14#endif 15 16 17void * 18nxt_job_create(nxt_mem_pool_t *mp, size_t size) 19{ 20 size_t cache_size; 21 nxt_job_t *job; 22 23 if (mp == NULL) { 24 mp = nxt_mem_pool_create(256); 25 26 if (nxt_slow_path(mp == NULL)) { 27 return NULL; 28 } 29 30 job = nxt_mem_zalloc(mp, size); 31 cache_size = 0; 32 33 } else { 34 job = nxt_mem_cache_zalloc0(mp, size); 35 cache_size = size; 36 } 37 38 if (nxt_fast_path(job != NULL)) { 39 job->cache_size = (uint16_t) cache_size; 40 job->mem_pool = mp; 41 nxt_job_set_name(job, "job"); 42 } 43 44 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */ 45 nxt_queue_self(&job->link); 46
|
| 47 job->task.ident = nxt_task_next_ident(); 48
|
47 return job; 48} 49 50 51void 52nxt_job_init(nxt_job_t *job, size_t size) 53{ 54 nxt_memzero(job, size); 55 56 nxt_job_set_name(job, "job"); 57 58 nxt_queue_self(&job->link);
| 49 return job; 50} 51 52 53void 54nxt_job_init(nxt_job_t *job, size_t size) 55{ 56 nxt_memzero(job, size); 57 58 nxt_job_set_name(job, "job"); 59 60 nxt_queue_self(&job->link);
|
| 61 62 job->task.ident = nxt_task_next_ident();
|
59} 60 61 62void 63nxt_job_destroy(void *data) 64{ 65 nxt_job_t *job; 66 67 job = data; 68 69 nxt_queue_remove(&job->link); 70 71 if (job->cache_size == 0) { 72 73 if (job->mem_pool != NULL) { 74 nxt_mem_pool_destroy(job->mem_pool); 75 } 76 77 } else { 78 nxt_mem_cache_free0(job->mem_pool, job, job->cache_size); 79 } 80} 81 82 83nxt_int_t 84nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job) 85{ 86 nxt_mem_pool_cleanup_t *mpcl; 87 88 mpcl = nxt_mem_pool_cleanup(mp, 0); 89 90 if (nxt_fast_path(mpcl != NULL)) { 91 mpcl->handler = nxt_job_destroy; 92 mpcl->data = job; 93 return NXT_OK; 94 } 95 96 return NXT_ERROR; 97} 98 99 100/* 101 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post() 102 * calls and to the "nxt_work_handler_t" are required by Sun C. 103 */ 104 105void
| 63} 64 65 66void 67nxt_job_destroy(void *data) 68{ 69 nxt_job_t *job; 70 71 job = data; 72 73 nxt_queue_remove(&job->link); 74 75 if (job->cache_size == 0) { 76 77 if (job->mem_pool != NULL) { 78 nxt_mem_pool_destroy(job->mem_pool); 79 } 80 81 } else { 82 nxt_mem_cache_free0(job->mem_pool, job, job->cache_size); 83 } 84} 85 86 87nxt_int_t 88nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job) 89{ 90 nxt_mem_pool_cleanup_t *mpcl; 91 92 mpcl = nxt_mem_pool_cleanup(mp, 0); 93 94 if (nxt_fast_path(mpcl != NULL)) { 95 mpcl->handler = nxt_job_destroy; 96 mpcl->data = job; 97 return NXT_OK; 98 } 99 100 return NXT_ERROR; 101} 102 103 104/* 105 * The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post() 106 * calls and to the "nxt_work_handler_t" are required by Sun C. 107 */ 108 109void
|
106nxt_job_start(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
| 110nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
|
107{
| 111{
|
108 nxt_log_debug(thr->log, "%s start", job->name);
| 112 nxt_debug(task, "%s start", job->name);
|
109 110#if (NXT_THREADS) 111 112 if (job->thread_pool != NULL) { 113 nxt_int_t ret; 114
| 113 114#if (NXT_THREADS) 115 116 if (job->thread_pool != NULL) { 117 nxt_int_t ret; 118
|
115 job->engine = thr->engine;
| 119 job->engine = task->thread->engine; 120
|
116 ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
| 121 ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
|
117 job, (void *) handler, job->log);
| 122 &job->task, job, (void *) handler);
|
118 if (ret == NXT_OK) { 119 return; 120 } 121 122 handler = job->abort_handler; 123 } 124 125#endif 126
| 123 if (ret == NXT_OK) { 124 return; 125 } 126 127 handler = job->abort_handler; 128 } 129 130#endif 131
|
127 handler(thr, job, job->data);
| 132 handler(&job->task, job, job->data);
|
128} 129 130 131#if (NXT_THREADS) 132 133/* A trampoline function is called by a thread pool thread. */ 134 135static void
| 133} 134 135 136#if (NXT_THREADS) 137 138/* A trampoline function is called by a thread pool thread. */ 139 140static void
|
136nxt_job_thread_trampoline(nxt_thread_t *thr, void *obj, void *data)
| 141nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
|
137{ 138 nxt_job_t *job; 139 nxt_work_handler_t handler; 140 141 job = obj; 142 handler = (nxt_work_handler_t) data; 143
| 142{ 143 nxt_job_t *job; 144 nxt_work_handler_t handler; 145 146 job = obj; 147 handler = (nxt_work_handler_t) data; 148
|
144 nxt_log_debug(thr->log, "%s thread", job->name);
| 149 job->task.log = job->log;
|
145
| 150
|
| 151 nxt_debug(task, "%s thread", job->name); 152
|
146 if (nxt_slow_path(job->cancel)) {
| 153 if (nxt_slow_path(job->cancel)) {
|
147 nxt_job_return(thr, job, job->abort_handler);
| 154 nxt_job_return(task, job, job->abort_handler);
|
148 149 } else {
| 155 156 } else {
|
150 handler(thr, job, job->data);
| 157 handler(&job->task, job, job->data);
|
151 } 152} 153 154#endif 155 156 157void
| 158 } 159} 160 161#endif 162 163 164void
|
158nxt_job_return(nxt_thread_t *thr, nxt_job_t *job, nxt_work_handler_t handler)
| 165nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
|
159{
| 166{
|
160 nxt_log_debug(thr->log, "%s return", job->name);
| 167 nxt_debug(task, "%s return", job->name);
|
161 162#if (NXT_THREADS) 163 164 if (job->engine != NULL) { 165 /* A return function is called in thread pool thread context. */ 166 nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
| 168 169#if (NXT_THREADS) 170 171 if (job->engine != NULL) { 172 /* A return function is called in thread pool thread context. */ 173 nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
|
167 job, (void *) handler, job->log);
| 174 &job->task, job, (void *) handler, job->log);
|
168 return; 169 } 170 171#endif 172 173 if (nxt_slow_path(job->cancel)) {
| 175 return; 176 } 177 178#endif 179 180 if (nxt_slow_path(job->cancel)) {
|
174 nxt_log_debug(thr->log, "%s cancellation", job->name);
| 181 nxt_debug(task, "%s cancellation", job->name);
|
175 handler = job->abort_handler; 176 } 177
| 182 handler = job->abort_handler; 183 } 184
|
178 nxt_thread_work_queue_push(thr, &thr->work_queue.main, 179 handler, job, job->data, thr->log);
| 185 nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main, 186 handler, &job->task, job, job->data);
|
180} 181 182 183#if (NXT_THREADS) 184 185static void
| 187} 188 189 190#if (NXT_THREADS) 191 192static void
|
186nxt_job_thread_return_handler(nxt_thread_t *thr, void *obj, void *data)
| 193nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
|
187{ 188 nxt_job_t *job; 189 nxt_work_handler_t handler; 190 191 job = obj; 192 handler = (nxt_work_handler_t) data; 193
| 194{ 195 nxt_job_t *job; 196 nxt_work_handler_t handler; 197 198 job = obj; 199 handler = (nxt_work_handler_t) data; 200
|
| 201 job->task.thread = task->thread; 202
|
194 if (nxt_slow_path(job->cancel)) {
| 203 if (nxt_slow_path(job->cancel)) {
|
195 nxt_log_debug(thr->log, "%s cancellation", job->name);
| 204 nxt_debug(task, "%s cancellation", job->name);
|
196 handler = job->abort_handler; 197 } 198
| 205 handler = job->abort_handler; 206 } 207
|
199 handler(thr, job, job->data);
| 208 handler(&job->task, job, job->data);
|
200} 201 202#endif
| 209} 210 211#endif
|