/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
| static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
static void nxt_thread_pool_start(void *ctx);
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);


nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
    nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
    nxt_work_handler_t exit)
{
    nxt_thread_pool_t  *tp;

    tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
    if (tp == NULL) {
        return NULL;
    }

    tp->max_threads = max_threads;
    tp->timeout = timeout;
    tp->engine = engine;
|     tp->task.thread = engine->task.thread;
|     tp->task.log = engine->task.log;
    tp->init = init;
    tp->exit = exit;

    return tp;
}
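For orientation, a minimal caller sketch against the task-based signatures introduced above. The names my_work, my_exit and my_start_pool, the engine and task arguments, and the 60-second timeout are illustrative assumptions, not part of this change; nxt_nsec_t is taken to be a nanosecond count.

/* Hypothetical usage sketch; nothing below is in the source tree. */

static void
my_work(nxt_task_t *task, void *obj, void *data)
{
    nxt_debug(task, "work item");
}

static void
my_exit(nxt_task_t *task, void *obj, void *data)
{
    nxt_debug(task, "pool gone");
}

static nxt_int_t
my_start_pool(nxt_event_engine_t *engine, nxt_task_t *task, void *obj)
{
    nxt_thread_pool_t  *tp;

    /* At most 4 threads; idle threads time out after 60s (nxt_nsec_t). */
    tp = nxt_thread_pool_create(4, 60000000000ULL, NULL, engine, my_exit);
    if (tp == NULL) {
        return NXT_ERROR;
    }

    /* The first post lazily initializes the pool and spawns its
     * first thread. */
    return nxt_thread_pool_post(tp, my_work, task, obj, NULL);
}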


nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
    void *obj, void *data, nxt_log_t *log)
|     nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_log_debug("thread pool post");

    if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);
|     nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data);

    (void) nxt_sem_post(&tp->sem);

    return NXT_OK;
}


static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t *tp)
{
    nxt_int_t            ret;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    if (nxt_fast_path(tp->ready)) {
        return NXT_OK;
    }

    nxt_thread_spin_lock(&tp->work_queue.lock);

    ret = NXT_OK;

    if (!tp->ready) {

        nxt_thread_log_debug("thread pool init");

        (void) nxt_atomic_fetch_add(&tp->threads, 1);

        if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {

            nxt_locked_work_queue_create(&tp->work_queue, 0);

            link = nxt_malloc(sizeof(nxt_thread_link_t));

            if (nxt_fast_path(link != NULL)) {
                link->start = nxt_thread_pool_start;
                link->data = tp;
                link->engine = tp->engine;
                /*
                 * link->exit is not used.  link->engine is used only to
                 * set thr->link in nxt_thread_trampoline(), and the link
                 * marks the first thread of the pool.
                 */
                if (nxt_thread_create(&handle, link) == NXT_OK) {
                    tp->ready = 1;
                    goto done;
                }
            }

            nxt_sem_destroy(&tp->sem);
        }

        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_locked_work_queue_destroy(&tp->work_queue);

        ret = NXT_ERROR;
    }

done:

    nxt_thread_spin_unlock(&tp->work_queue.lock);

    return ret;
}
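nxt_thread_pool_init() above is a double-checked lazy initializer: an unlocked fast-path test of tp->ready, then a second test under the work-queue spinlock so that only one caller performs the setup. A generic sketch of the same pattern, using C11 atomics and a pthread mutex instead of the nxt_ primitives (lazy_init and do_init are hypothetical names):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool      ready;
static pthread_mutex_t  lock = PTHREAD_MUTEX_INITIALIZER;

static int
lazy_init(int (*do_init)(void))
{
    int  ret = 0;

    if (atomic_load(&ready)) {       /* fast path: already initialized */
        return 0;
    }

    pthread_mutex_lock(&lock);

    if (!atomic_load(&ready)) {      /* re-check under the lock */
        ret = do_init();

        if (ret == 0) {
            atomic_store(&ready, true);
        }
    }

    pthread_mutex_unlock(&lock);

    return ret;
}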


static void
nxt_thread_pool_start(void *ctx)
{
    void                *obj, *data;
|     nxt_task_t          *task;
    nxt_thread_t        *thr;
    nxt_thread_pool_t   *tp;
    nxt_work_handler_t  handler;

    tp = ctx;
    thr = nxt_thread();

    if (thr->link != NULL) {
        /* Only the first thread has a link. */
        tp->main = thr->handle;
        nxt_free(thr->link);
        thr->link = NULL;
|
|         tp->task.thread = thr;
    }

    thr->thread_pool = tp;

    if (tp->init != NULL) {
        tp->init();
    }

    nxt_thread_work_queue_create(thr, 8);

    for ( ;; ) {
        nxt_thread_pool_wait(tp);

        handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
                                            &data, &thr->log);
|         handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
|                                             &data);

        if (nxt_fast_path(handler != NULL)) {
|             task->thread = thr;
            nxt_log_debug(thr->log, "locked work queue");
            handler(thr, obj, data);
|             handler(task, obj, data);
        }

        for ( ;; ) {
            thr->log = &nxt_main_log;

            handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
|             handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);

            if (handler == NULL) {
                break;
            }

            handler(thr, obj, data);
|             handler(task, obj, data);
        }

        thr->log = &nxt_main_log;
    }
}


static void
nxt_thread_pool_wait(nxt_thread_pool_t *tp)
{
    nxt_err_t            err;
    nxt_thread_t         *thr;
    nxt_atomic_uint_t    waiting, threads;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool wait");

    (void) nxt_atomic_fetch_add(&tp->waiting, 1);

    for ( ;; ) {
        err = nxt_sem_wait(&tp->sem, tp->timeout);

        if (err == 0) {
            waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
            break;
        }

        if (err == NXT_ETIMEDOUT) {
            if (nxt_thread_handle_equal(thr->handle, tp->main)) {
                continue;
            }
        }

        (void) nxt_atomic_fetch_add(&tp->waiting, -1);
        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_thread_exit(thr);
        nxt_unreachable();
    }

    nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);

    if (waiting > 1) {
        return;
    }

    do {
        threads = tp->threads;

        if (threads >= tp->max_threads) {
            return;
        }

    } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));

    link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_fast_path(link != NULL)) {
        link->start = nxt_thread_pool_start;
        link->data = tp;

        if (nxt_thread_create(&handle, link) != NXT_OK) {
            (void) nxt_atomic_fetch_add(&tp->threads, -1);
        }
    }
}
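The do/while loop above is a bounded atomic increment: tp->threads is raised only while it is still below max_threads, and the compare-and-swap retries if another thread changed the counter between the read and the update. The same idea expressed with portable C11 atomics (bounded_increment is a hypothetical name):

#include <stdatomic.h>
#include <stdbool.h>

/* Increment *n atomically, but only while it stays below max. */
static bool
bounded_increment(_Atomic unsigned int *n, unsigned int max)
{
    unsigned int  current = atomic_load(n);

    do {
        if (current >= max) {
            return false;    /* at the cap; do not spawn another thread */
        }
    } while (!atomic_compare_exchange_weak(n, &current, current + 1));

    return true;
}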


void
nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
{
    nxt_thread_t  *thr;

    if (!tp->ready) {
        thr = nxt_thread();

|     thr = nxt_thread();
|
|     if (!tp->ready) {
        nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
                                  tp, NULL, &nxt_main_log);
|                                   &tp->task, tp, NULL);
        return;
    }

    if (tp->max_threads != 0) {
        /* Disable creation of new threads and mark the pool as destroyed. */
        tp->max_threads = 0;

        nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
|         nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL);
    }
}


/*
 * A thread handle (pthread_t) is either a pointer or an integer, so it
 * can be passed as the work handler's void pointer "data" argument.  To
 * convert a void pointer to pthread_t and vice versa, the source value
 * should be cast first to uintptr_t and then to the destination type.
 *
 * If the handle were a struct, it would have to be stored in the thread
 * pool itself, and the pool could be freed in the exit procedure only
 * after the last thread of the pool has exited.
 */

static void
nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
| nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
|     nxt_thread_t         *thread;
    nxt_thread_pool_t    *tp;
    nxt_atomic_uint_t    threads;
    nxt_thread_handle_t  handle;

    tp = obj;
|     thread = task->thread;

    nxt_log_debug(thr->log, "thread pool exit");
|     nxt_debug(task, "thread pool exit");

    if (data != NULL) {
        handle = (nxt_thread_handle_t) (uintptr_t) data;
        nxt_thread_wait(handle);
    }

    threads = nxt_atomic_fetch_add(&tp->threads, -1);

    nxt_log_debug(thr->log, "thread pool threads: %A", threads);
|     nxt_debug(task, "thread pool threads: %A", threads);

    if (threads > 1) {
        nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
                             (void *) (uintptr_t) thr->handle, &nxt_main_log);
|         nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp,
|                              (void *) (uintptr_t) thread->handle);

    } else {
        nxt_main_log_debug("thread pool destroy");
|         nxt_debug(task, "thread pool destroy");

        nxt_event_engine_post(tp->engine, tp->exit, tp,
                              (void *) (uintptr_t) thr->handle, &nxt_main_log);
|         nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp,
|                               (void *) (uintptr_t) thread->handle,
|                               &nxt_main_log);

        nxt_sem_destroy(&tp->sem);

        nxt_locked_work_queue_destroy(&tp->work_queue);

        nxt_free(tp);
    }

    nxt_thread_work_queue_destroy(thr);
|     nxt_thread_work_queue_destroy(thread);

    nxt_thread_exit(thr);
|     nxt_thread_exit(thread);
|
    nxt_unreachable();
}
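The handle/pointer round trip described in the comment above can be exercised in isolation. A standalone sketch, assuming a platform where pthread_t is a scalar type (on such platforms the double cast through uintptr_t is lossless; if pthread_t were a struct, this would not compile):

#include <assert.h>
#include <pthread.h>
#include <stdint.h>

int
main(void)
{
    void       *data;
    pthread_t  self, copy;

    self = pthread_self();

    /* Producer side: stash the handle in a work item's "data" pointer. */
    data = (void *) (uintptr_t) self;

    /* Consumer side: recover the handle from the pointer. */
    copy = (pthread_t) (uintptr_t) data;

    assert(pthread_equal(self, copy));

    return 0;
}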