10Sigor@sysoev.ru
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru */
60Sigor@sysoev.ru
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru
90Sigor@sysoev.ru
100Sigor@sysoev.ru static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
111Sigor@sysoev.ru static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
120Sigor@sysoev.ru static void nxt_thread_pool_start(void *ctx);
1353Sigor@sysoev.ru static void nxt_thread_pool_loop(void *ctx);
140Sigor@sysoev.ru static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
150Sigor@sysoev.ru
160Sigor@sysoev.ru
170Sigor@sysoev.ru nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads,nxt_nsec_t timeout,nxt_thread_pool_init_t init,nxt_event_engine_t * engine,nxt_work_handler_t exit)180Sigor@sysoev.ru nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
190Sigor@sysoev.ru nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
200Sigor@sysoev.ru nxt_work_handler_t exit)
210Sigor@sysoev.ru {
220Sigor@sysoev.ru nxt_thread_pool_t *tp;
230Sigor@sysoev.ru
240Sigor@sysoev.ru tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
250Sigor@sysoev.ru if (tp == NULL) {
260Sigor@sysoev.ru return NULL;
270Sigor@sysoev.ru }
280Sigor@sysoev.ru
290Sigor@sysoev.ru tp->max_threads = max_threads;
300Sigor@sysoev.ru tp->timeout = timeout;
310Sigor@sysoev.ru tp->engine = engine;
321Sigor@sysoev.ru tp->task.thread = engine->task.thread;
331Sigor@sysoev.ru tp->task.log = engine->task.log;
340Sigor@sysoev.ru tp->init = init;
350Sigor@sysoev.ru tp->exit = exit;
360Sigor@sysoev.ru
370Sigor@sysoev.ru return tp;
380Sigor@sysoev.ru }
390Sigor@sysoev.ru
400Sigor@sysoev.ru
410Sigor@sysoev.ru nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t * tp,nxt_work_t * work)424Sigor@sysoev.ru nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
430Sigor@sysoev.ru {
440Sigor@sysoev.ru nxt_thread_log_debug("thread pool post");
450Sigor@sysoev.ru
460Sigor@sysoev.ru if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
470Sigor@sysoev.ru return NXT_ERROR;
480Sigor@sysoev.ru }
490Sigor@sysoev.ru
504Sigor@sysoev.ru nxt_locked_work_queue_add(&tp->work_queue, work);
510Sigor@sysoev.ru
520Sigor@sysoev.ru (void) nxt_sem_post(&tp->sem);
530Sigor@sysoev.ru
540Sigor@sysoev.ru return NXT_OK;
550Sigor@sysoev.ru }
560Sigor@sysoev.ru
570Sigor@sysoev.ru
580Sigor@sysoev.ru static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t * tp)590Sigor@sysoev.ru nxt_thread_pool_init(nxt_thread_pool_t *tp)
600Sigor@sysoev.ru {
610Sigor@sysoev.ru nxt_int_t ret;
620Sigor@sysoev.ru nxt_thread_link_t *link;
630Sigor@sysoev.ru nxt_thread_handle_t handle;
640Sigor@sysoev.ru
650Sigor@sysoev.ru if (nxt_fast_path(tp->ready)) {
660Sigor@sysoev.ru return NXT_OK;
670Sigor@sysoev.ru }
680Sigor@sysoev.ru
694Sigor@sysoev.ru if (tp->max_threads == 0) {
704Sigor@sysoev.ru /* The pool is being destroyed. */
714Sigor@sysoev.ru return NXT_ERROR;
724Sigor@sysoev.ru }
734Sigor@sysoev.ru
740Sigor@sysoev.ru nxt_thread_spin_lock(&tp->work_queue.lock);
750Sigor@sysoev.ru
760Sigor@sysoev.ru ret = NXT_OK;
770Sigor@sysoev.ru
780Sigor@sysoev.ru if (!tp->ready) {
790Sigor@sysoev.ru
800Sigor@sysoev.ru nxt_thread_log_debug("thread pool init");
810Sigor@sysoev.ru
820Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, 1);
830Sigor@sysoev.ru
840Sigor@sysoev.ru if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
850Sigor@sysoev.ru
86157Sigor@sysoev.ru link = nxt_zalloc(sizeof(nxt_thread_link_t));
870Sigor@sysoev.ru
880Sigor@sysoev.ru if (nxt_fast_path(link != NULL)) {
890Sigor@sysoev.ru link->start = nxt_thread_pool_start;
9053Sigor@sysoev.ru link->work.data = tp;
9153Sigor@sysoev.ru
920Sigor@sysoev.ru if (nxt_thread_create(&handle, link) == NXT_OK) {
930Sigor@sysoev.ru tp->ready = 1;
940Sigor@sysoev.ru goto done;
950Sigor@sysoev.ru }
960Sigor@sysoev.ru }
970Sigor@sysoev.ru
980Sigor@sysoev.ru nxt_sem_destroy(&tp->sem);
990Sigor@sysoev.ru }
1000Sigor@sysoev.ru
1010Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1);
1020Sigor@sysoev.ru
1030Sigor@sysoev.ru ret = NXT_ERROR;
1040Sigor@sysoev.ru }
1050Sigor@sysoev.ru
1060Sigor@sysoev.ru done:
1070Sigor@sysoev.ru
1080Sigor@sysoev.ru nxt_thread_spin_unlock(&tp->work_queue.lock);
1090Sigor@sysoev.ru
1100Sigor@sysoev.ru return ret;
1110Sigor@sysoev.ru }
1120Sigor@sysoev.ru
1130Sigor@sysoev.ru
1140Sigor@sysoev.ru static void
nxt_thread_pool_start(void * ctx)1150Sigor@sysoev.ru nxt_thread_pool_start(void *ctx)
1160Sigor@sysoev.ru {
11753Sigor@sysoev.ru nxt_thread_t *thr;
11853Sigor@sysoev.ru nxt_thread_pool_t *tp;
11953Sigor@sysoev.ru
12053Sigor@sysoev.ru tp = ctx;
12153Sigor@sysoev.ru thr = nxt_thread();
12253Sigor@sysoev.ru
12353Sigor@sysoev.ru tp->main = thr->handle;
12453Sigor@sysoev.ru tp->task.thread = thr;
12553Sigor@sysoev.ru
12653Sigor@sysoev.ru nxt_thread_pool_loop(ctx);
12753Sigor@sysoev.ru }
12853Sigor@sysoev.ru
12953Sigor@sysoev.ru
13053Sigor@sysoev.ru static void
nxt_thread_pool_loop(void * ctx)13153Sigor@sysoev.ru nxt_thread_pool_loop(void *ctx)
13253Sigor@sysoev.ru {
1330Sigor@sysoev.ru void *obj, *data;
1341Sigor@sysoev.ru nxt_task_t *task;
1350Sigor@sysoev.ru nxt_thread_t *thr;
1360Sigor@sysoev.ru nxt_thread_pool_t *tp;
1370Sigor@sysoev.ru nxt_work_handler_t handler;
1380Sigor@sysoev.ru
1390Sigor@sysoev.ru tp = ctx;
1400Sigor@sysoev.ru thr = nxt_thread();
1410Sigor@sysoev.ru
1420Sigor@sysoev.ru if (tp->init != NULL) {
1430Sigor@sysoev.ru tp->init();
1440Sigor@sysoev.ru }
1450Sigor@sysoev.ru
1460Sigor@sysoev.ru for ( ;; ) {
1470Sigor@sysoev.ru nxt_thread_pool_wait(tp);
1480Sigor@sysoev.ru
1491Sigor@sysoev.ru handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
1501Sigor@sysoev.ru &data);
1510Sigor@sysoev.ru
1520Sigor@sysoev.ru if (nxt_fast_path(handler != NULL)) {
1531Sigor@sysoev.ru task->thread = thr;
1544Sigor@sysoev.ru
1550Sigor@sysoev.ru nxt_log_debug(thr->log, "locked work queue");
1560Sigor@sysoev.ru
1571Sigor@sysoev.ru handler(task, obj, data);
1580Sigor@sysoev.ru }
1590Sigor@sysoev.ru
1600Sigor@sysoev.ru thr->log = &nxt_main_log;
1610Sigor@sysoev.ru }
1620Sigor@sysoev.ru }
1630Sigor@sysoev.ru
1640Sigor@sysoev.ru
1650Sigor@sysoev.ru static void
nxt_thread_pool_wait(nxt_thread_pool_t * tp)1660Sigor@sysoev.ru nxt_thread_pool_wait(nxt_thread_pool_t *tp)
1670Sigor@sysoev.ru {
1680Sigor@sysoev.ru nxt_err_t err;
1690Sigor@sysoev.ru nxt_thread_t *thr;
1700Sigor@sysoev.ru nxt_atomic_uint_t waiting, threads;
1710Sigor@sysoev.ru nxt_thread_link_t *link;
1720Sigor@sysoev.ru nxt_thread_handle_t handle;
1730Sigor@sysoev.ru
1740Sigor@sysoev.ru thr = nxt_thread();
1750Sigor@sysoev.ru
1760Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool wait");
1770Sigor@sysoev.ru
1780Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->waiting, 1);
1790Sigor@sysoev.ru
1800Sigor@sysoev.ru for ( ;; ) {
1810Sigor@sysoev.ru err = nxt_sem_wait(&tp->sem, tp->timeout);
1820Sigor@sysoev.ru
1830Sigor@sysoev.ru if (err == 0) {
1840Sigor@sysoev.ru waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
1850Sigor@sysoev.ru break;
1860Sigor@sysoev.ru }
1870Sigor@sysoev.ru
1880Sigor@sysoev.ru if (err == NXT_ETIMEDOUT) {
1890Sigor@sysoev.ru if (nxt_thread_handle_equal(thr->handle, tp->main)) {
1900Sigor@sysoev.ru continue;
1910Sigor@sysoev.ru }
1920Sigor@sysoev.ru }
1930Sigor@sysoev.ru
1940Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->waiting, -1);
1950Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1);
1960Sigor@sysoev.ru
1970Sigor@sysoev.ru nxt_thread_exit(thr);
1980Sigor@sysoev.ru nxt_unreachable();
1990Sigor@sysoev.ru }
2000Sigor@sysoev.ru
2010Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
2020Sigor@sysoev.ru
2030Sigor@sysoev.ru if (waiting > 1) {
2040Sigor@sysoev.ru return;
2050Sigor@sysoev.ru }
2060Sigor@sysoev.ru
2070Sigor@sysoev.ru do {
2080Sigor@sysoev.ru threads = tp->threads;
2090Sigor@sysoev.ru
2100Sigor@sysoev.ru if (threads >= tp->max_threads) {
2110Sigor@sysoev.ru return;
2120Sigor@sysoev.ru }
2130Sigor@sysoev.ru
2140Sigor@sysoev.ru } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
2150Sigor@sysoev.ru
2160Sigor@sysoev.ru link = nxt_zalloc(sizeof(nxt_thread_link_t));
2170Sigor@sysoev.ru
2180Sigor@sysoev.ru if (nxt_fast_path(link != NULL)) {
21953Sigor@sysoev.ru link->start = nxt_thread_pool_loop;
22053Sigor@sysoev.ru link->work.data = tp;
2210Sigor@sysoev.ru
2220Sigor@sysoev.ru if (nxt_thread_create(&handle, link) != NXT_OK) {
2230Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1);
2240Sigor@sysoev.ru }
2250Sigor@sysoev.ru }
2260Sigor@sysoev.ru }
2270Sigor@sysoev.ru
2280Sigor@sysoev.ru
2290Sigor@sysoev.ru void
nxt_thread_pool_destroy(nxt_thread_pool_t * tp)2300Sigor@sysoev.ru nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
2310Sigor@sysoev.ru {
2320Sigor@sysoev.ru nxt_thread_t *thr;
2330Sigor@sysoev.ru
2341Sigor@sysoev.ru thr = nxt_thread();
2350Sigor@sysoev.ru
236*494Spluknet@nginx.com nxt_log_debug(thr->log, "thread pool destroy: %A", tp->ready);
23753Sigor@sysoev.ru
2381Sigor@sysoev.ru if (!tp->ready) {
2394Sigor@sysoev.ru nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
240157Sigor@sysoev.ru &tp->engine->task, tp, NULL);
2410Sigor@sysoev.ru return;
2420Sigor@sysoev.ru }
2430Sigor@sysoev.ru
2440Sigor@sysoev.ru if (tp->max_threads != 0) {
2450Sigor@sysoev.ru /* Disable new threads creation and mark a pool as being destroyed. */
2460Sigor@sysoev.ru tp->max_threads = 0;
2470Sigor@sysoev.ru
2484Sigor@sysoev.ru nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);
2494Sigor@sysoev.ru
2504Sigor@sysoev.ru nxt_thread_pool_post(tp, &tp->work);
2510Sigor@sysoev.ru }
2520Sigor@sysoev.ru }
2530Sigor@sysoev.ru
2540Sigor@sysoev.ru
2550Sigor@sysoev.ru /*
2560Sigor@sysoev.ru * A thread handle (pthread_t) is either pointer or integer, so it can be
2570Sigor@sysoev.ru * passed as work handler pointer "data" argument. To convert void pointer
2580Sigor@sysoev.ru * to pthread_t and vice versa the source argument should be cast first to
2590Sigor@sysoev.ru * uintptr_t type and then to the destination type.
2600Sigor@sysoev.ru *
2610Sigor@sysoev.ru * If the handle would be a struct it should be stored in thread pool and
2620Sigor@sysoev.ru * the thread pool must be freed in the thread pool exit procedure after
2630Sigor@sysoev.ru * the last thread of pool will exit.
2640Sigor@sysoev.ru */
2650Sigor@sysoev.ru
2660Sigor@sysoev.ru static void
nxt_thread_pool_exit(nxt_task_t * task,void * obj,void * data)2671Sigor@sysoev.ru nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
2680Sigor@sysoev.ru {
2691Sigor@sysoev.ru nxt_thread_t *thread;
2700Sigor@sysoev.ru nxt_thread_pool_t *tp;
2710Sigor@sysoev.ru nxt_atomic_uint_t threads;
2720Sigor@sysoev.ru nxt_thread_handle_t handle;
2730Sigor@sysoev.ru
2740Sigor@sysoev.ru tp = obj;
2751Sigor@sysoev.ru thread = task->thread;
2760Sigor@sysoev.ru
2771Sigor@sysoev.ru nxt_debug(task, "thread pool exit");
2780Sigor@sysoev.ru
2790Sigor@sysoev.ru if (data != NULL) {
2800Sigor@sysoev.ru handle = (nxt_thread_handle_t) (uintptr_t) data;
2810Sigor@sysoev.ru nxt_thread_wait(handle);
2820Sigor@sysoev.ru }
2830Sigor@sysoev.ru
2840Sigor@sysoev.ru threads = nxt_atomic_fetch_add(&tp->threads, -1);
2850Sigor@sysoev.ru
2861Sigor@sysoev.ru nxt_debug(task, "thread pool threads: %A", threads);
2870Sigor@sysoev.ru
2880Sigor@sysoev.ru if (threads > 1) {
2894Sigor@sysoev.ru nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
2904Sigor@sysoev.ru (void *) (uintptr_t) thread->handle);
2914Sigor@sysoev.ru
2924Sigor@sysoev.ru nxt_thread_pool_post(tp, &tp->work);
2930Sigor@sysoev.ru
2940Sigor@sysoev.ru } else {
2951Sigor@sysoev.ru nxt_debug(task, "thread pool destroy");
2960Sigor@sysoev.ru
2970Sigor@sysoev.ru nxt_sem_destroy(&tp->sem);
2980Sigor@sysoev.ru
299157Sigor@sysoev.ru nxt_work_set(&tp->work, tp->exit, &tp->engine->task, tp,
3004Sigor@sysoev.ru (void *) (uintptr_t) thread->handle);
3010Sigor@sysoev.ru
3024Sigor@sysoev.ru nxt_event_engine_post(tp->engine, &tp->work);
3034Sigor@sysoev.ru
3044Sigor@sysoev.ru /* The "tp" memory should be freed by tp->exit handler. */
3050Sigor@sysoev.ru }
3060Sigor@sysoev.ru
3071Sigor@sysoev.ru nxt_thread_exit(thread);
3081Sigor@sysoev.ru
3090Sigor@sysoev.ru nxt_unreachable();
3100Sigor@sysoev.ru }
311