10Sigor@sysoev.ru 20Sigor@sysoev.ru /* 30Sigor@sysoev.ru * Copyright (C) Igor Sysoev 40Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 50Sigor@sysoev.ru */ 60Sigor@sysoev.ru 70Sigor@sysoev.ru #include <nxt_main.h> 80Sigor@sysoev.ru 90Sigor@sysoev.ru 100Sigor@sysoev.ru static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); 111Sigor@sysoev.ru static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); 120Sigor@sysoev.ru static void nxt_thread_pool_start(void *ctx); 13*53Sigor@sysoev.ru static void nxt_thread_pool_loop(void *ctx); 140Sigor@sysoev.ru static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); 150Sigor@sysoev.ru 160Sigor@sysoev.ru 170Sigor@sysoev.ru nxt_thread_pool_t * 180Sigor@sysoev.ru nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, 190Sigor@sysoev.ru nxt_thread_pool_init_t init, nxt_event_engine_t *engine, 200Sigor@sysoev.ru nxt_work_handler_t exit) 210Sigor@sysoev.ru { 220Sigor@sysoev.ru nxt_thread_pool_t *tp; 230Sigor@sysoev.ru 240Sigor@sysoev.ru tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); 250Sigor@sysoev.ru if (tp == NULL) { 260Sigor@sysoev.ru return NULL; 270Sigor@sysoev.ru } 280Sigor@sysoev.ru 290Sigor@sysoev.ru tp->max_threads = max_threads; 300Sigor@sysoev.ru tp->timeout = timeout; 310Sigor@sysoev.ru tp->engine = engine; 321Sigor@sysoev.ru tp->task.thread = engine->task.thread; 331Sigor@sysoev.ru tp->task.log = engine->task.log; 340Sigor@sysoev.ru tp->init = init; 350Sigor@sysoev.ru tp->exit = exit; 360Sigor@sysoev.ru 370Sigor@sysoev.ru return tp; 380Sigor@sysoev.ru } 390Sigor@sysoev.ru 400Sigor@sysoev.ru 410Sigor@sysoev.ru nxt_int_t 424Sigor@sysoev.ru nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work) 430Sigor@sysoev.ru { 440Sigor@sysoev.ru nxt_thread_log_debug("thread pool post"); 450Sigor@sysoev.ru 460Sigor@sysoev.ru if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { 470Sigor@sysoev.ru return NXT_ERROR; 480Sigor@sysoev.ru } 490Sigor@sysoev.ru 504Sigor@sysoev.ru nxt_locked_work_queue_add(&tp->work_queue, work); 510Sigor@sysoev.ru 520Sigor@sysoev.ru (void) nxt_sem_post(&tp->sem); 530Sigor@sysoev.ru 540Sigor@sysoev.ru return NXT_OK; 550Sigor@sysoev.ru } 560Sigor@sysoev.ru 570Sigor@sysoev.ru 580Sigor@sysoev.ru static nxt_int_t 590Sigor@sysoev.ru nxt_thread_pool_init(nxt_thread_pool_t *tp) 600Sigor@sysoev.ru { 610Sigor@sysoev.ru nxt_int_t ret; 620Sigor@sysoev.ru nxt_thread_link_t *link; 630Sigor@sysoev.ru nxt_thread_handle_t handle; 640Sigor@sysoev.ru 650Sigor@sysoev.ru if (nxt_fast_path(tp->ready)) { 660Sigor@sysoev.ru return NXT_OK; 670Sigor@sysoev.ru } 680Sigor@sysoev.ru 694Sigor@sysoev.ru if (tp->max_threads == 0) { 704Sigor@sysoev.ru /* The pool is being destroyed. */ 714Sigor@sysoev.ru return NXT_ERROR; 724Sigor@sysoev.ru } 734Sigor@sysoev.ru 740Sigor@sysoev.ru nxt_thread_spin_lock(&tp->work_queue.lock); 750Sigor@sysoev.ru 760Sigor@sysoev.ru ret = NXT_OK; 770Sigor@sysoev.ru 780Sigor@sysoev.ru if (!tp->ready) { 790Sigor@sysoev.ru 800Sigor@sysoev.ru nxt_thread_log_debug("thread pool init"); 810Sigor@sysoev.ru 820Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, 1); 830Sigor@sysoev.ru 840Sigor@sysoev.ru if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { 850Sigor@sysoev.ru 860Sigor@sysoev.ru link = nxt_malloc(sizeof(nxt_thread_link_t)); 870Sigor@sysoev.ru 880Sigor@sysoev.ru if (nxt_fast_path(link != NULL)) { 890Sigor@sysoev.ru link->start = nxt_thread_pool_start; 90*53Sigor@sysoev.ru link->work.data = tp; 91*53Sigor@sysoev.ru 920Sigor@sysoev.ru if (nxt_thread_create(&handle, link) == NXT_OK) { 930Sigor@sysoev.ru tp->ready = 1; 940Sigor@sysoev.ru goto done; 950Sigor@sysoev.ru } 960Sigor@sysoev.ru } 970Sigor@sysoev.ru 980Sigor@sysoev.ru nxt_sem_destroy(&tp->sem); 990Sigor@sysoev.ru } 1000Sigor@sysoev.ru 1010Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1); 1020Sigor@sysoev.ru 1030Sigor@sysoev.ru ret = NXT_ERROR; 1040Sigor@sysoev.ru } 1050Sigor@sysoev.ru 1060Sigor@sysoev.ru done: 1070Sigor@sysoev.ru 1080Sigor@sysoev.ru nxt_thread_spin_unlock(&tp->work_queue.lock); 1090Sigor@sysoev.ru 1100Sigor@sysoev.ru return ret; 1110Sigor@sysoev.ru } 1120Sigor@sysoev.ru 1130Sigor@sysoev.ru 1140Sigor@sysoev.ru static void 1150Sigor@sysoev.ru nxt_thread_pool_start(void *ctx) 1160Sigor@sysoev.ru { 117*53Sigor@sysoev.ru nxt_thread_t *thr; 118*53Sigor@sysoev.ru nxt_thread_pool_t *tp; 119*53Sigor@sysoev.ru 120*53Sigor@sysoev.ru tp = ctx; 121*53Sigor@sysoev.ru thr = nxt_thread(); 122*53Sigor@sysoev.ru 123*53Sigor@sysoev.ru tp->main = thr->handle; 124*53Sigor@sysoev.ru tp->task.thread = thr; 125*53Sigor@sysoev.ru 126*53Sigor@sysoev.ru nxt_thread_pool_loop(ctx); 127*53Sigor@sysoev.ru } 128*53Sigor@sysoev.ru 129*53Sigor@sysoev.ru 130*53Sigor@sysoev.ru static void 131*53Sigor@sysoev.ru nxt_thread_pool_loop(void *ctx) 132*53Sigor@sysoev.ru { 1330Sigor@sysoev.ru void *obj, *data; 1341Sigor@sysoev.ru nxt_task_t *task; 1350Sigor@sysoev.ru nxt_thread_t *thr; 1360Sigor@sysoev.ru nxt_thread_pool_t *tp; 1370Sigor@sysoev.ru nxt_work_handler_t handler; 1380Sigor@sysoev.ru 1390Sigor@sysoev.ru tp = ctx; 1400Sigor@sysoev.ru thr = nxt_thread(); 1410Sigor@sysoev.ru 1420Sigor@sysoev.ru if (tp->init != NULL) { 1430Sigor@sysoev.ru tp->init(); 1440Sigor@sysoev.ru } 1450Sigor@sysoev.ru 1460Sigor@sysoev.ru for ( ;; ) { 1470Sigor@sysoev.ru nxt_thread_pool_wait(tp); 1480Sigor@sysoev.ru 1491Sigor@sysoev.ru handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj, 1501Sigor@sysoev.ru &data); 1510Sigor@sysoev.ru 1520Sigor@sysoev.ru if (nxt_fast_path(handler != NULL)) { 1531Sigor@sysoev.ru task->thread = thr; 1544Sigor@sysoev.ru 1550Sigor@sysoev.ru nxt_log_debug(thr->log, "locked work queue"); 1560Sigor@sysoev.ru 1571Sigor@sysoev.ru handler(task, obj, data); 1580Sigor@sysoev.ru } 1590Sigor@sysoev.ru 1600Sigor@sysoev.ru thr->log = &nxt_main_log; 1610Sigor@sysoev.ru } 1620Sigor@sysoev.ru } 1630Sigor@sysoev.ru 1640Sigor@sysoev.ru 1650Sigor@sysoev.ru static void 1660Sigor@sysoev.ru nxt_thread_pool_wait(nxt_thread_pool_t *tp) 1670Sigor@sysoev.ru { 1680Sigor@sysoev.ru nxt_err_t err; 1690Sigor@sysoev.ru nxt_thread_t *thr; 1700Sigor@sysoev.ru nxt_atomic_uint_t waiting, threads; 1710Sigor@sysoev.ru nxt_thread_link_t *link; 1720Sigor@sysoev.ru nxt_thread_handle_t handle; 1730Sigor@sysoev.ru 1740Sigor@sysoev.ru thr = nxt_thread(); 1750Sigor@sysoev.ru 1760Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool wait"); 1770Sigor@sysoev.ru 1780Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->waiting, 1); 1790Sigor@sysoev.ru 1800Sigor@sysoev.ru for ( ;; ) { 1810Sigor@sysoev.ru err = nxt_sem_wait(&tp->sem, tp->timeout); 1820Sigor@sysoev.ru 1830Sigor@sysoev.ru if (err == 0) { 1840Sigor@sysoev.ru waiting = nxt_atomic_fetch_add(&tp->waiting, -1); 1850Sigor@sysoev.ru break; 1860Sigor@sysoev.ru } 1870Sigor@sysoev.ru 1880Sigor@sysoev.ru if (err == NXT_ETIMEDOUT) { 1890Sigor@sysoev.ru if (nxt_thread_handle_equal(thr->handle, tp->main)) { 1900Sigor@sysoev.ru continue; 1910Sigor@sysoev.ru } 1920Sigor@sysoev.ru } 1930Sigor@sysoev.ru 1940Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->waiting, -1); 1950Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1); 1960Sigor@sysoev.ru 1970Sigor@sysoev.ru nxt_thread_exit(thr); 1980Sigor@sysoev.ru nxt_unreachable(); 1990Sigor@sysoev.ru } 2000Sigor@sysoev.ru 2010Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting); 2020Sigor@sysoev.ru 2030Sigor@sysoev.ru if (waiting > 1) { 2040Sigor@sysoev.ru return; 2050Sigor@sysoev.ru } 2060Sigor@sysoev.ru 2070Sigor@sysoev.ru do { 2080Sigor@sysoev.ru threads = tp->threads; 2090Sigor@sysoev.ru 2100Sigor@sysoev.ru if (threads >= tp->max_threads) { 2110Sigor@sysoev.ru return; 2120Sigor@sysoev.ru } 2130Sigor@sysoev.ru 2140Sigor@sysoev.ru } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1)); 2150Sigor@sysoev.ru 2160Sigor@sysoev.ru link = nxt_zalloc(sizeof(nxt_thread_link_t)); 2170Sigor@sysoev.ru 2180Sigor@sysoev.ru if (nxt_fast_path(link != NULL)) { 219*53Sigor@sysoev.ru link->start = nxt_thread_pool_loop; 220*53Sigor@sysoev.ru link->work.data = tp; 2210Sigor@sysoev.ru 2220Sigor@sysoev.ru if (nxt_thread_create(&handle, link) != NXT_OK) { 2230Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1); 2240Sigor@sysoev.ru } 2250Sigor@sysoev.ru } 2260Sigor@sysoev.ru } 2270Sigor@sysoev.ru 2280Sigor@sysoev.ru 2290Sigor@sysoev.ru void 2300Sigor@sysoev.ru nxt_thread_pool_destroy(nxt_thread_pool_t *tp) 2310Sigor@sysoev.ru { 2320Sigor@sysoev.ru nxt_thread_t *thr; 2330Sigor@sysoev.ru 2341Sigor@sysoev.ru thr = nxt_thread(); 2350Sigor@sysoev.ru 236*53Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool destroy: %d", tp->ready); 237*53Sigor@sysoev.ru 2381Sigor@sysoev.ru if (!tp->ready) { 2394Sigor@sysoev.ru nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit, 2404Sigor@sysoev.ru &tp->task, tp, NULL); 2410Sigor@sysoev.ru return; 2420Sigor@sysoev.ru } 2430Sigor@sysoev.ru 2440Sigor@sysoev.ru if (tp->max_threads != 0) { 2450Sigor@sysoev.ru /* Disable new threads creation and mark a pool as being destroyed. */ 2460Sigor@sysoev.ru tp->max_threads = 0; 2470Sigor@sysoev.ru 2484Sigor@sysoev.ru nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL); 2494Sigor@sysoev.ru 2504Sigor@sysoev.ru nxt_thread_pool_post(tp, &tp->work); 2510Sigor@sysoev.ru } 2520Sigor@sysoev.ru } 2530Sigor@sysoev.ru 2540Sigor@sysoev.ru 2550Sigor@sysoev.ru /* 2560Sigor@sysoev.ru * A thread handle (pthread_t) is either pointer or integer, so it can be 2570Sigor@sysoev.ru * passed as work handler pointer "data" argument. To convert void pointer 2580Sigor@sysoev.ru * to pthread_t and vice versa the source argument should be cast first to 2590Sigor@sysoev.ru * uintptr_t type and then to the destination type. 2600Sigor@sysoev.ru * 2610Sigor@sysoev.ru * If the handle would be a struct it should be stored in thread pool and 2620Sigor@sysoev.ru * the thread pool must be freed in the thread pool exit procedure after 2630Sigor@sysoev.ru * the last thread of pool will exit. 2640Sigor@sysoev.ru */ 2650Sigor@sysoev.ru 2660Sigor@sysoev.ru static void 2671Sigor@sysoev.ru nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 2680Sigor@sysoev.ru { 2691Sigor@sysoev.ru nxt_thread_t *thread; 2700Sigor@sysoev.ru nxt_thread_pool_t *tp; 2710Sigor@sysoev.ru nxt_atomic_uint_t threads; 2720Sigor@sysoev.ru nxt_thread_handle_t handle; 2730Sigor@sysoev.ru 2740Sigor@sysoev.ru tp = obj; 2751Sigor@sysoev.ru thread = task->thread; 2760Sigor@sysoev.ru 2771Sigor@sysoev.ru nxt_debug(task, "thread pool exit"); 2780Sigor@sysoev.ru 2790Sigor@sysoev.ru if (data != NULL) { 2800Sigor@sysoev.ru handle = (nxt_thread_handle_t) (uintptr_t) data; 2810Sigor@sysoev.ru nxt_thread_wait(handle); 2820Sigor@sysoev.ru } 2830Sigor@sysoev.ru 2840Sigor@sysoev.ru threads = nxt_atomic_fetch_add(&tp->threads, -1); 2850Sigor@sysoev.ru 2861Sigor@sysoev.ru nxt_debug(task, "thread pool threads: %A", threads); 2870Sigor@sysoev.ru 2880Sigor@sysoev.ru if (threads > 1) { 2894Sigor@sysoev.ru nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, 2904Sigor@sysoev.ru (void *) (uintptr_t) thread->handle); 2914Sigor@sysoev.ru 2924Sigor@sysoev.ru nxt_thread_pool_post(tp, &tp->work); 2930Sigor@sysoev.ru 2940Sigor@sysoev.ru } else { 2951Sigor@sysoev.ru nxt_debug(task, "thread pool destroy"); 2960Sigor@sysoev.ru 2970Sigor@sysoev.ru nxt_sem_destroy(&tp->sem); 2980Sigor@sysoev.ru 2994Sigor@sysoev.ru nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, 3004Sigor@sysoev.ru (void *) (uintptr_t) thread->handle); 3010Sigor@sysoev.ru 3024Sigor@sysoev.ru nxt_event_engine_post(tp->engine, &tp->work); 3034Sigor@sysoev.ru 3044Sigor@sysoev.ru /* The "tp" memory should be freed by tp->exit handler. */ 3050Sigor@sysoev.ru } 3060Sigor@sysoev.ru 3071Sigor@sysoev.ru nxt_thread_exit(thread); 3081Sigor@sysoev.ru 3090Sigor@sysoev.ru nxt_unreachable(); 3100Sigor@sysoev.ru } 311