Deleted
Added
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 9 10static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); |
11static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); |
12static void nxt_thread_pool_start(void *ctx); 13static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); 14 15 16nxt_thread_pool_t * 17nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, 18 nxt_thread_pool_init_t init, nxt_event_engine_t *engine, 19 nxt_work_handler_t exit) 20{ 21 nxt_thread_pool_t *tp; 22 23 tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); 24 if (tp == NULL) { 25 return NULL; 26 } 27 28 tp->max_threads = max_threads; 29 tp->timeout = timeout; 30 tp->engine = engine; |
31 tp->task.thread = engine->task.thread; 32 tp->task.log = engine->task.log; |
33 tp->init = init; 34 tp->exit = exit; 35 36 return tp; 37} 38 39 40nxt_int_t 41nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, |
42 nxt_task_t *task, void *obj, void *data) |
43{ 44 nxt_thread_log_debug("thread pool post"); 45 46 if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { 47 return NXT_ERROR; 48 } 49 |
50 nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data); |
51 52 (void) nxt_sem_post(&tp->sem); 53 54 return NXT_OK; 55} 56 57 58static nxt_int_t --- 55 unchanged lines hidden (view full) --- 114 return ret; 115} 116 117 118static void 119nxt_thread_pool_start(void *ctx) 120{ 121 void *obj, *data; |
122 nxt_task_t *task; |
123 nxt_thread_t *thr; 124 nxt_thread_pool_t *tp; 125 nxt_work_handler_t handler; 126 127 tp = ctx; 128 thr = nxt_thread(); 129 130 if (thr->link != NULL) { 131 /* Only the first thread has a link. */ 132 tp->main = thr->handle; 133 nxt_free(thr->link); 134 thr->link = NULL; |
135 136 tp->task.thread = thr; |
137 } 138 139 thr->thread_pool = tp; 140 141 if (tp->init != NULL) { 142 tp->init(); 143 } 144 145 nxt_thread_work_queue_create(thr, 8); 146 147 for ( ;; ) { 148 nxt_thread_pool_wait(tp); 149 |
150 handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj, 151 &data); |
152 153 if (nxt_fast_path(handler != NULL)) { |
154 task->thread = thr; |
155 nxt_log_debug(thr->log, "locked work queue"); |
156 handler(task, obj, data); |
157 } 158 159 for ( ;; ) { 160 thr->log = &nxt_main_log; 161 |
162 handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data); |
163 164 if (handler == NULL) { 165 break; 166 } 167 |
168 handler(task, obj, data); |
169 } 170 171 thr->log = &nxt_main_log; 172 } 173} 174 175 176static void --- 60 unchanged lines hidden (view full) --- 237} 238 239 240void 241nxt_thread_pool_destroy(nxt_thread_pool_t *tp) 242{ 243 nxt_thread_t *thr; 244 |
245 thr = nxt_thread(); |
246 |
247 if (!tp->ready) { |
248 nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, |
249 &tp->task, tp, NULL); |
250 return; 251 } 252 253 if (tp->max_threads != 0) { 254 /* Disable new threads creation and mark a pool as being destroyed. */ 255 tp->max_threads = 0; 256 |
257 nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL); |
258 } 259} 260 261 262/* 263 * A thread handle (pthread_t) is either pointer or integer, so it can be 264 * passed as work handler pointer "data" argument. To convert void pointer 265 * to pthread_t and vice versa the source argument should be cast first to 266 * uintptr_t type and then to the destination type. 267 * 268 * If the handle would be a struct it should be stored in thread pool and 269 * the thread pool must be freed in the thread pool exit procedure after 270 * the last thread of pool will exit. 271 */ 272 273static void |
274nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) |
275{ |
276 nxt_thread_t *thread; |
277 nxt_thread_pool_t *tp; 278 nxt_atomic_uint_t threads; 279 nxt_thread_handle_t handle; 280 281 tp = obj; |
282 thread = task->thread; |
283 |
284 nxt_debug(task, "thread pool exit"); |
285 286 if (data != NULL) { 287 handle = (nxt_thread_handle_t) (uintptr_t) data; 288 nxt_thread_wait(handle); 289 } 290 291 threads = nxt_atomic_fetch_add(&tp->threads, -1); 292 |
293 nxt_debug(task, "thread pool threads: %A", threads); |
294 295 if (threads > 1) { |
296 nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, 297 (void *) (uintptr_t) thread->handle); |
298 299 } else { |
300 nxt_debug(task, "thread pool destroy"); |
301 |
302 nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp, 303 (void *) (uintptr_t) thread->handle, 304 &nxt_main_log); |
305 306 nxt_sem_destroy(&tp->sem); 307 308 nxt_locked_work_queue_destroy(&tp->work_queue); 309 310 nxt_free(tp); 311 } 312 |
313 nxt_thread_work_queue_destroy(thread); |
314 |
315 nxt_thread_exit(thread); 316 |
317 nxt_unreachable(); 318} |