xref: /unit/src/nxt_thread_pool.c (revision 494:7c83ddcc1c42)
10Sigor@sysoev.ru 
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru  */
60Sigor@sysoev.ru 
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru 
90Sigor@sysoev.ru 
100Sigor@sysoev.ru static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
111Sigor@sysoev.ru static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
120Sigor@sysoev.ru static void nxt_thread_pool_start(void *ctx);
1353Sigor@sysoev.ru static void nxt_thread_pool_loop(void *ctx);
140Sigor@sysoev.ru static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
150Sigor@sysoev.ru 
160Sigor@sysoev.ru 
170Sigor@sysoev.ru nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads,nxt_nsec_t timeout,nxt_thread_pool_init_t init,nxt_event_engine_t * engine,nxt_work_handler_t exit)180Sigor@sysoev.ru nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
190Sigor@sysoev.ru     nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
200Sigor@sysoev.ru     nxt_work_handler_t exit)
210Sigor@sysoev.ru {
220Sigor@sysoev.ru     nxt_thread_pool_t  *tp;
230Sigor@sysoev.ru 
240Sigor@sysoev.ru     tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
250Sigor@sysoev.ru     if (tp == NULL) {
260Sigor@sysoev.ru         return NULL;
270Sigor@sysoev.ru     }
280Sigor@sysoev.ru 
290Sigor@sysoev.ru     tp->max_threads = max_threads;
300Sigor@sysoev.ru     tp->timeout = timeout;
310Sigor@sysoev.ru     tp->engine = engine;
321Sigor@sysoev.ru     tp->task.thread = engine->task.thread;
331Sigor@sysoev.ru     tp->task.log = engine->task.log;
340Sigor@sysoev.ru     tp->init = init;
350Sigor@sysoev.ru     tp->exit = exit;
360Sigor@sysoev.ru 
370Sigor@sysoev.ru     return tp;
380Sigor@sysoev.ru }
390Sigor@sysoev.ru 
400Sigor@sysoev.ru 
410Sigor@sysoev.ru nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t * tp,nxt_work_t * work)424Sigor@sysoev.ru nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
430Sigor@sysoev.ru {
440Sigor@sysoev.ru     nxt_thread_log_debug("thread pool post");
450Sigor@sysoev.ru 
460Sigor@sysoev.ru     if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
470Sigor@sysoev.ru         return NXT_ERROR;
480Sigor@sysoev.ru     }
490Sigor@sysoev.ru 
504Sigor@sysoev.ru     nxt_locked_work_queue_add(&tp->work_queue, work);
510Sigor@sysoev.ru 
520Sigor@sysoev.ru     (void) nxt_sem_post(&tp->sem);
530Sigor@sysoev.ru 
540Sigor@sysoev.ru     return NXT_OK;
550Sigor@sysoev.ru }
560Sigor@sysoev.ru 
570Sigor@sysoev.ru 
580Sigor@sysoev.ru static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t * tp)590Sigor@sysoev.ru nxt_thread_pool_init(nxt_thread_pool_t *tp)
600Sigor@sysoev.ru {
610Sigor@sysoev.ru     nxt_int_t            ret;
620Sigor@sysoev.ru     nxt_thread_link_t    *link;
630Sigor@sysoev.ru     nxt_thread_handle_t  handle;
640Sigor@sysoev.ru 
650Sigor@sysoev.ru     if (nxt_fast_path(tp->ready)) {
660Sigor@sysoev.ru         return NXT_OK;
670Sigor@sysoev.ru     }
680Sigor@sysoev.ru 
694Sigor@sysoev.ru     if (tp->max_threads == 0) {
704Sigor@sysoev.ru         /* The pool is being destroyed. */
714Sigor@sysoev.ru         return NXT_ERROR;
724Sigor@sysoev.ru     }
734Sigor@sysoev.ru 
740Sigor@sysoev.ru     nxt_thread_spin_lock(&tp->work_queue.lock);
750Sigor@sysoev.ru 
760Sigor@sysoev.ru     ret = NXT_OK;
770Sigor@sysoev.ru 
780Sigor@sysoev.ru     if (!tp->ready) {
790Sigor@sysoev.ru 
800Sigor@sysoev.ru         nxt_thread_log_debug("thread pool init");
810Sigor@sysoev.ru 
820Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->threads, 1);
830Sigor@sysoev.ru 
840Sigor@sysoev.ru         if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
850Sigor@sysoev.ru 
86157Sigor@sysoev.ru             link = nxt_zalloc(sizeof(nxt_thread_link_t));
870Sigor@sysoev.ru 
880Sigor@sysoev.ru             if (nxt_fast_path(link != NULL)) {
890Sigor@sysoev.ru                 link->start = nxt_thread_pool_start;
9053Sigor@sysoev.ru                 link->work.data = tp;
9153Sigor@sysoev.ru 
920Sigor@sysoev.ru                 if (nxt_thread_create(&handle, link) == NXT_OK) {
930Sigor@sysoev.ru                     tp->ready = 1;
940Sigor@sysoev.ru                     goto done;
950Sigor@sysoev.ru                 }
960Sigor@sysoev.ru             }
970Sigor@sysoev.ru 
980Sigor@sysoev.ru             nxt_sem_destroy(&tp->sem);
990Sigor@sysoev.ru         }
1000Sigor@sysoev.ru 
1010Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->threads, -1);
1020Sigor@sysoev.ru 
1030Sigor@sysoev.ru         ret = NXT_ERROR;
1040Sigor@sysoev.ru     }
1050Sigor@sysoev.ru 
1060Sigor@sysoev.ru done:
1070Sigor@sysoev.ru 
1080Sigor@sysoev.ru     nxt_thread_spin_unlock(&tp->work_queue.lock);
1090Sigor@sysoev.ru 
1100Sigor@sysoev.ru     return ret;
1110Sigor@sysoev.ru }
1120Sigor@sysoev.ru 
1130Sigor@sysoev.ru 
1140Sigor@sysoev.ru static void
nxt_thread_pool_start(void * ctx)1150Sigor@sysoev.ru nxt_thread_pool_start(void *ctx)
1160Sigor@sysoev.ru {
11753Sigor@sysoev.ru     nxt_thread_t       *thr;
11853Sigor@sysoev.ru     nxt_thread_pool_t  *tp;
11953Sigor@sysoev.ru 
12053Sigor@sysoev.ru     tp = ctx;
12153Sigor@sysoev.ru     thr = nxt_thread();
12253Sigor@sysoev.ru 
12353Sigor@sysoev.ru     tp->main = thr->handle;
12453Sigor@sysoev.ru     tp->task.thread = thr;
12553Sigor@sysoev.ru 
12653Sigor@sysoev.ru     nxt_thread_pool_loop(ctx);
12753Sigor@sysoev.ru }
12853Sigor@sysoev.ru 
12953Sigor@sysoev.ru 
13053Sigor@sysoev.ru static void
nxt_thread_pool_loop(void * ctx)13153Sigor@sysoev.ru nxt_thread_pool_loop(void *ctx)
13253Sigor@sysoev.ru {
1330Sigor@sysoev.ru     void                *obj, *data;
1341Sigor@sysoev.ru     nxt_task_t          *task;
1350Sigor@sysoev.ru     nxt_thread_t        *thr;
1360Sigor@sysoev.ru     nxt_thread_pool_t   *tp;
1370Sigor@sysoev.ru     nxt_work_handler_t  handler;
1380Sigor@sysoev.ru 
1390Sigor@sysoev.ru     tp = ctx;
1400Sigor@sysoev.ru     thr = nxt_thread();
1410Sigor@sysoev.ru 
1420Sigor@sysoev.ru     if (tp->init != NULL) {
1430Sigor@sysoev.ru         tp->init();
1440Sigor@sysoev.ru     }
1450Sigor@sysoev.ru 
1460Sigor@sysoev.ru     for ( ;; ) {
1470Sigor@sysoev.ru         nxt_thread_pool_wait(tp);
1480Sigor@sysoev.ru 
1491Sigor@sysoev.ru         handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
1501Sigor@sysoev.ru                                             &data);
1510Sigor@sysoev.ru 
1520Sigor@sysoev.ru         if (nxt_fast_path(handler != NULL)) {
1531Sigor@sysoev.ru             task->thread = thr;
1544Sigor@sysoev.ru 
1550Sigor@sysoev.ru             nxt_log_debug(thr->log, "locked work queue");
1560Sigor@sysoev.ru 
1571Sigor@sysoev.ru             handler(task, obj, data);
1580Sigor@sysoev.ru         }
1590Sigor@sysoev.ru 
1600Sigor@sysoev.ru         thr->log = &nxt_main_log;
1610Sigor@sysoev.ru     }
1620Sigor@sysoev.ru }
1630Sigor@sysoev.ru 
1640Sigor@sysoev.ru 
1650Sigor@sysoev.ru static void
nxt_thread_pool_wait(nxt_thread_pool_t * tp)1660Sigor@sysoev.ru nxt_thread_pool_wait(nxt_thread_pool_t *tp)
1670Sigor@sysoev.ru {
1680Sigor@sysoev.ru     nxt_err_t            err;
1690Sigor@sysoev.ru     nxt_thread_t         *thr;
1700Sigor@sysoev.ru     nxt_atomic_uint_t    waiting, threads;
1710Sigor@sysoev.ru     nxt_thread_link_t    *link;
1720Sigor@sysoev.ru     nxt_thread_handle_t  handle;
1730Sigor@sysoev.ru 
1740Sigor@sysoev.ru     thr = nxt_thread();
1750Sigor@sysoev.ru 
1760Sigor@sysoev.ru     nxt_log_debug(thr->log, "thread pool wait");
1770Sigor@sysoev.ru 
1780Sigor@sysoev.ru     (void) nxt_atomic_fetch_add(&tp->waiting, 1);
1790Sigor@sysoev.ru 
1800Sigor@sysoev.ru     for ( ;; ) {
1810Sigor@sysoev.ru         err = nxt_sem_wait(&tp->sem, tp->timeout);
1820Sigor@sysoev.ru 
1830Sigor@sysoev.ru         if (err == 0) {
1840Sigor@sysoev.ru             waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
1850Sigor@sysoev.ru             break;
1860Sigor@sysoev.ru         }
1870Sigor@sysoev.ru 
1880Sigor@sysoev.ru         if (err == NXT_ETIMEDOUT) {
1890Sigor@sysoev.ru             if (nxt_thread_handle_equal(thr->handle, tp->main)) {
1900Sigor@sysoev.ru                 continue;
1910Sigor@sysoev.ru             }
1920Sigor@sysoev.ru         }
1930Sigor@sysoev.ru 
1940Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->waiting, -1);
1950Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->threads, -1);
1960Sigor@sysoev.ru 
1970Sigor@sysoev.ru         nxt_thread_exit(thr);
1980Sigor@sysoev.ru         nxt_unreachable();
1990Sigor@sysoev.ru     }
2000Sigor@sysoev.ru 
2010Sigor@sysoev.ru     nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
2020Sigor@sysoev.ru 
2030Sigor@sysoev.ru     if (waiting > 1) {
2040Sigor@sysoev.ru         return;
2050Sigor@sysoev.ru     }
2060Sigor@sysoev.ru 
2070Sigor@sysoev.ru     do {
2080Sigor@sysoev.ru         threads = tp->threads;
2090Sigor@sysoev.ru 
2100Sigor@sysoev.ru         if (threads >= tp->max_threads) {
2110Sigor@sysoev.ru             return;
2120Sigor@sysoev.ru         }
2130Sigor@sysoev.ru 
2140Sigor@sysoev.ru     } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
2150Sigor@sysoev.ru 
2160Sigor@sysoev.ru     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2170Sigor@sysoev.ru 
2180Sigor@sysoev.ru     if (nxt_fast_path(link != NULL)) {
21953Sigor@sysoev.ru         link->start = nxt_thread_pool_loop;
22053Sigor@sysoev.ru         link->work.data = tp;
2210Sigor@sysoev.ru 
2220Sigor@sysoev.ru         if (nxt_thread_create(&handle, link) != NXT_OK) {
2230Sigor@sysoev.ru             (void) nxt_atomic_fetch_add(&tp->threads, -1);
2240Sigor@sysoev.ru         }
2250Sigor@sysoev.ru     }
2260Sigor@sysoev.ru }
2270Sigor@sysoev.ru 
2280Sigor@sysoev.ru 
2290Sigor@sysoev.ru void
nxt_thread_pool_destroy(nxt_thread_pool_t * tp)2300Sigor@sysoev.ru nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
2310Sigor@sysoev.ru {
2320Sigor@sysoev.ru     nxt_thread_t  *thr;
2330Sigor@sysoev.ru 
2341Sigor@sysoev.ru     thr = nxt_thread();
2350Sigor@sysoev.ru 
236*494Spluknet@nginx.com     nxt_log_debug(thr->log, "thread pool destroy: %A", tp->ready);
23753Sigor@sysoev.ru 
2381Sigor@sysoev.ru     if (!tp->ready) {
2394Sigor@sysoev.ru         nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
240157Sigor@sysoev.ru                            &tp->engine->task, tp, NULL);
2410Sigor@sysoev.ru         return;
2420Sigor@sysoev.ru     }
2430Sigor@sysoev.ru 
2440Sigor@sysoev.ru     if (tp->max_threads != 0) {
2450Sigor@sysoev.ru         /* Disable new threads creation and mark a pool as being destroyed. */
2460Sigor@sysoev.ru         tp->max_threads = 0;
2470Sigor@sysoev.ru 
2484Sigor@sysoev.ru         nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);
2494Sigor@sysoev.ru 
2504Sigor@sysoev.ru         nxt_thread_pool_post(tp, &tp->work);
2510Sigor@sysoev.ru     }
2520Sigor@sysoev.ru }
2530Sigor@sysoev.ru 
2540Sigor@sysoev.ru 
2550Sigor@sysoev.ru /*
2560Sigor@sysoev.ru  * A thread handle (pthread_t) is either pointer or integer, so it can be
2570Sigor@sysoev.ru  * passed as work handler pointer "data" argument.  To convert void pointer
2580Sigor@sysoev.ru  * to pthread_t and vice versa the source argument should be cast first to
2590Sigor@sysoev.ru  * uintptr_t type and then to the destination type.
2600Sigor@sysoev.ru  *
2610Sigor@sysoev.ru  * If the handle would be a struct it should be stored in thread pool and
2620Sigor@sysoev.ru  * the thread pool must be freed in the thread pool exit procedure after
2630Sigor@sysoev.ru  * the last thread of pool will exit.
2640Sigor@sysoev.ru  */
2650Sigor@sysoev.ru 
2660Sigor@sysoev.ru static void
nxt_thread_pool_exit(nxt_task_t * task,void * obj,void * data)2671Sigor@sysoev.ru nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
2680Sigor@sysoev.ru {
2691Sigor@sysoev.ru     nxt_thread_t         *thread;
2700Sigor@sysoev.ru     nxt_thread_pool_t    *tp;
2710Sigor@sysoev.ru     nxt_atomic_uint_t    threads;
2720Sigor@sysoev.ru     nxt_thread_handle_t  handle;
2730Sigor@sysoev.ru 
2740Sigor@sysoev.ru     tp = obj;
2751Sigor@sysoev.ru     thread = task->thread;
2760Sigor@sysoev.ru 
2771Sigor@sysoev.ru     nxt_debug(task, "thread pool exit");
2780Sigor@sysoev.ru 
2790Sigor@sysoev.ru     if (data != NULL) {
2800Sigor@sysoev.ru         handle = (nxt_thread_handle_t) (uintptr_t) data;
2810Sigor@sysoev.ru         nxt_thread_wait(handle);
2820Sigor@sysoev.ru     }
2830Sigor@sysoev.ru 
2840Sigor@sysoev.ru     threads = nxt_atomic_fetch_add(&tp->threads, -1);
2850Sigor@sysoev.ru 
2861Sigor@sysoev.ru     nxt_debug(task, "thread pool threads: %A", threads);
2870Sigor@sysoev.ru 
2880Sigor@sysoev.ru     if (threads > 1) {
2894Sigor@sysoev.ru         nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
2904Sigor@sysoev.ru                      (void *) (uintptr_t) thread->handle);
2914Sigor@sysoev.ru 
2924Sigor@sysoev.ru         nxt_thread_pool_post(tp, &tp->work);
2930Sigor@sysoev.ru 
2940Sigor@sysoev.ru     } else {
2951Sigor@sysoev.ru         nxt_debug(task, "thread pool destroy");
2960Sigor@sysoev.ru 
2970Sigor@sysoev.ru         nxt_sem_destroy(&tp->sem);
2980Sigor@sysoev.ru 
299157Sigor@sysoev.ru         nxt_work_set(&tp->work, tp->exit, &tp->engine->task, tp,
3004Sigor@sysoev.ru                      (void *) (uintptr_t) thread->handle);
3010Sigor@sysoev.ru 
3024Sigor@sysoev.ru         nxt_event_engine_post(tp->engine, &tp->work);
3034Sigor@sysoev.ru 
3044Sigor@sysoev.ru         /* The "tp" memory should be freed by tp->exit handler. */
3050Sigor@sysoev.ru     }
3060Sigor@sysoev.ru 
3071Sigor@sysoev.ru     nxt_thread_exit(thread);
3081Sigor@sysoev.ru 
3090Sigor@sysoev.ru     nxt_unreachable();
3100Sigor@sysoev.ru }
311