1*0Sigor@sysoev.ru 2*0Sigor@sysoev.ru /* 3*0Sigor@sysoev.ru * Copyright (C) Igor Sysoev 4*0Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 5*0Sigor@sysoev.ru */ 6*0Sigor@sysoev.ru 7*0Sigor@sysoev.ru #include <nxt_main.h> 8*0Sigor@sysoev.ru 9*0Sigor@sysoev.ru 10*0Sigor@sysoev.ru static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); 11*0Sigor@sysoev.ru static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data); 12*0Sigor@sysoev.ru static void nxt_thread_pool_start(void *ctx); 13*0Sigor@sysoev.ru static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); 14*0Sigor@sysoev.ru 15*0Sigor@sysoev.ru 16*0Sigor@sysoev.ru nxt_thread_pool_t * 17*0Sigor@sysoev.ru nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, 18*0Sigor@sysoev.ru nxt_thread_pool_init_t init, nxt_event_engine_t *engine, 19*0Sigor@sysoev.ru nxt_work_handler_t exit) 20*0Sigor@sysoev.ru { 21*0Sigor@sysoev.ru nxt_thread_pool_t *tp; 22*0Sigor@sysoev.ru 23*0Sigor@sysoev.ru tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); 24*0Sigor@sysoev.ru if (tp == NULL) { 25*0Sigor@sysoev.ru return NULL; 26*0Sigor@sysoev.ru } 27*0Sigor@sysoev.ru 28*0Sigor@sysoev.ru tp->max_threads = max_threads; 29*0Sigor@sysoev.ru tp->timeout = timeout; 30*0Sigor@sysoev.ru tp->engine = engine; 31*0Sigor@sysoev.ru tp->init = init; 32*0Sigor@sysoev.ru tp->exit = exit; 33*0Sigor@sysoev.ru 34*0Sigor@sysoev.ru return tp; 35*0Sigor@sysoev.ru } 36*0Sigor@sysoev.ru 37*0Sigor@sysoev.ru 38*0Sigor@sysoev.ru nxt_int_t 39*0Sigor@sysoev.ru nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler, 40*0Sigor@sysoev.ru void *obj, void *data, nxt_log_t *log) 41*0Sigor@sysoev.ru { 42*0Sigor@sysoev.ru nxt_thread_log_debug("thread pool post"); 43*0Sigor@sysoev.ru 44*0Sigor@sysoev.ru if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { 45*0Sigor@sysoev.ru return NXT_ERROR; 46*0Sigor@sysoev.ru } 47*0Sigor@sysoev.ru 48*0Sigor@sysoev.ru nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log); 49*0Sigor@sysoev.ru 50*0Sigor@sysoev.ru (void) nxt_sem_post(&tp->sem); 51*0Sigor@sysoev.ru 52*0Sigor@sysoev.ru return NXT_OK; 53*0Sigor@sysoev.ru } 54*0Sigor@sysoev.ru 55*0Sigor@sysoev.ru 56*0Sigor@sysoev.ru static nxt_int_t 57*0Sigor@sysoev.ru nxt_thread_pool_init(nxt_thread_pool_t *tp) 58*0Sigor@sysoev.ru { 59*0Sigor@sysoev.ru nxt_int_t ret; 60*0Sigor@sysoev.ru nxt_thread_link_t *link; 61*0Sigor@sysoev.ru nxt_thread_handle_t handle; 62*0Sigor@sysoev.ru 63*0Sigor@sysoev.ru if (nxt_fast_path(tp->ready)) { 64*0Sigor@sysoev.ru return NXT_OK; 65*0Sigor@sysoev.ru } 66*0Sigor@sysoev.ru 67*0Sigor@sysoev.ru nxt_thread_spin_lock(&tp->work_queue.lock); 68*0Sigor@sysoev.ru 69*0Sigor@sysoev.ru ret = NXT_OK; 70*0Sigor@sysoev.ru 71*0Sigor@sysoev.ru if (!tp->ready) { 72*0Sigor@sysoev.ru 73*0Sigor@sysoev.ru nxt_thread_log_debug("thread pool init"); 74*0Sigor@sysoev.ru 75*0Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, 1); 76*0Sigor@sysoev.ru 77*0Sigor@sysoev.ru if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { 78*0Sigor@sysoev.ru 79*0Sigor@sysoev.ru nxt_locked_work_queue_create(&tp->work_queue, 0); 80*0Sigor@sysoev.ru 81*0Sigor@sysoev.ru link = nxt_malloc(sizeof(nxt_thread_link_t)); 82*0Sigor@sysoev.ru 83*0Sigor@sysoev.ru if (nxt_fast_path(link != NULL)) { 84*0Sigor@sysoev.ru link->start = nxt_thread_pool_start; 85*0Sigor@sysoev.ru link->data = tp; 86*0Sigor@sysoev.ru link->engine = tp->engine; 87*0Sigor@sysoev.ru /* 88*0Sigor@sysoev.ru * link->exit is not used. link->engine is used just to 89*0Sigor@sysoev.ru * set thr->link by nxt_thread_trampoline() and the link 90*0Sigor@sysoev.ru * is a mark of the first thread of pool. 91*0Sigor@sysoev.ru */ 92*0Sigor@sysoev.ru if (nxt_thread_create(&handle, link) == NXT_OK) { 93*0Sigor@sysoev.ru tp->ready = 1; 94*0Sigor@sysoev.ru goto done; 95*0Sigor@sysoev.ru } 96*0Sigor@sysoev.ru } 97*0Sigor@sysoev.ru 98*0Sigor@sysoev.ru nxt_sem_destroy(&tp->sem); 99*0Sigor@sysoev.ru } 100*0Sigor@sysoev.ru 101*0Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1); 102*0Sigor@sysoev.ru 103*0Sigor@sysoev.ru nxt_locked_work_queue_destroy(&tp->work_queue); 104*0Sigor@sysoev.ru 105*0Sigor@sysoev.ru ret = NXT_ERROR; 106*0Sigor@sysoev.ru } 107*0Sigor@sysoev.ru 108*0Sigor@sysoev.ru done: 109*0Sigor@sysoev.ru 110*0Sigor@sysoev.ru nxt_thread_spin_unlock(&tp->work_queue.lock); 111*0Sigor@sysoev.ru 112*0Sigor@sysoev.ru return ret; 113*0Sigor@sysoev.ru } 114*0Sigor@sysoev.ru 115*0Sigor@sysoev.ru 116*0Sigor@sysoev.ru static void 117*0Sigor@sysoev.ru nxt_thread_pool_start(void *ctx) 118*0Sigor@sysoev.ru { 119*0Sigor@sysoev.ru void *obj, *data; 120*0Sigor@sysoev.ru nxt_thread_t *thr; 121*0Sigor@sysoev.ru nxt_thread_pool_t *tp; 122*0Sigor@sysoev.ru nxt_work_handler_t handler; 123*0Sigor@sysoev.ru 124*0Sigor@sysoev.ru tp = ctx; 125*0Sigor@sysoev.ru thr = nxt_thread(); 126*0Sigor@sysoev.ru 127*0Sigor@sysoev.ru if (thr->link != NULL) { 128*0Sigor@sysoev.ru /* Only the first thread has a link. */ 129*0Sigor@sysoev.ru tp->main = thr->handle; 130*0Sigor@sysoev.ru nxt_free(thr->link); 131*0Sigor@sysoev.ru thr->link = NULL; 132*0Sigor@sysoev.ru } 133*0Sigor@sysoev.ru 134*0Sigor@sysoev.ru thr->thread_pool = tp; 135*0Sigor@sysoev.ru 136*0Sigor@sysoev.ru if (tp->init != NULL) { 137*0Sigor@sysoev.ru tp->init(); 138*0Sigor@sysoev.ru } 139*0Sigor@sysoev.ru 140*0Sigor@sysoev.ru nxt_thread_work_queue_create(thr, 8); 141*0Sigor@sysoev.ru 142*0Sigor@sysoev.ru for ( ;; ) { 143*0Sigor@sysoev.ru nxt_thread_pool_wait(tp); 144*0Sigor@sysoev.ru 145*0Sigor@sysoev.ru handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj, 146*0Sigor@sysoev.ru &data, &thr->log); 147*0Sigor@sysoev.ru 148*0Sigor@sysoev.ru if (nxt_fast_path(handler != NULL)) { 149*0Sigor@sysoev.ru nxt_log_debug(thr->log, "locked work queue"); 150*0Sigor@sysoev.ru handler(thr, obj, data); 151*0Sigor@sysoev.ru } 152*0Sigor@sysoev.ru 153*0Sigor@sysoev.ru for ( ;; ) { 154*0Sigor@sysoev.ru thr->log = &nxt_main_log; 155*0Sigor@sysoev.ru 156*0Sigor@sysoev.ru handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log); 157*0Sigor@sysoev.ru 158*0Sigor@sysoev.ru if (handler == NULL) { 159*0Sigor@sysoev.ru break; 160*0Sigor@sysoev.ru } 161*0Sigor@sysoev.ru 162*0Sigor@sysoev.ru handler(thr, obj, data); 163*0Sigor@sysoev.ru } 164*0Sigor@sysoev.ru 165*0Sigor@sysoev.ru thr->log = &nxt_main_log; 166*0Sigor@sysoev.ru } 167*0Sigor@sysoev.ru } 168*0Sigor@sysoev.ru 169*0Sigor@sysoev.ru 170*0Sigor@sysoev.ru static void 171*0Sigor@sysoev.ru nxt_thread_pool_wait(nxt_thread_pool_t *tp) 172*0Sigor@sysoev.ru { 173*0Sigor@sysoev.ru nxt_err_t err; 174*0Sigor@sysoev.ru nxt_thread_t *thr; 175*0Sigor@sysoev.ru nxt_atomic_uint_t waiting, threads; 176*0Sigor@sysoev.ru nxt_thread_link_t *link; 177*0Sigor@sysoev.ru nxt_thread_handle_t handle; 178*0Sigor@sysoev.ru 179*0Sigor@sysoev.ru thr = nxt_thread(); 180*0Sigor@sysoev.ru 181*0Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool wait"); 182*0Sigor@sysoev.ru 183*0Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->waiting, 1); 184*0Sigor@sysoev.ru 185*0Sigor@sysoev.ru for ( ;; ) { 186*0Sigor@sysoev.ru err = nxt_sem_wait(&tp->sem, tp->timeout); 187*0Sigor@sysoev.ru 188*0Sigor@sysoev.ru if (err == 0) { 189*0Sigor@sysoev.ru waiting = nxt_atomic_fetch_add(&tp->waiting, -1); 190*0Sigor@sysoev.ru break; 191*0Sigor@sysoev.ru } 192*0Sigor@sysoev.ru 193*0Sigor@sysoev.ru if (err == NXT_ETIMEDOUT) { 194*0Sigor@sysoev.ru if (nxt_thread_handle_equal(thr->handle, tp->main)) { 195*0Sigor@sysoev.ru continue; 196*0Sigor@sysoev.ru } 197*0Sigor@sysoev.ru } 198*0Sigor@sysoev.ru 199*0Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->waiting, -1); 200*0Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1); 201*0Sigor@sysoev.ru 202*0Sigor@sysoev.ru nxt_thread_exit(thr); 203*0Sigor@sysoev.ru nxt_unreachable(); 204*0Sigor@sysoev.ru } 205*0Sigor@sysoev.ru 206*0Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting); 207*0Sigor@sysoev.ru 208*0Sigor@sysoev.ru if (waiting > 1) { 209*0Sigor@sysoev.ru return; 210*0Sigor@sysoev.ru } 211*0Sigor@sysoev.ru 212*0Sigor@sysoev.ru do { 213*0Sigor@sysoev.ru threads = tp->threads; 214*0Sigor@sysoev.ru 215*0Sigor@sysoev.ru if (threads >= tp->max_threads) { 216*0Sigor@sysoev.ru return; 217*0Sigor@sysoev.ru } 218*0Sigor@sysoev.ru 219*0Sigor@sysoev.ru } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1)); 220*0Sigor@sysoev.ru 221*0Sigor@sysoev.ru link = nxt_zalloc(sizeof(nxt_thread_link_t)); 222*0Sigor@sysoev.ru 223*0Sigor@sysoev.ru if (nxt_fast_path(link != NULL)) { 224*0Sigor@sysoev.ru link->start = nxt_thread_pool_start; 225*0Sigor@sysoev.ru link->data = tp; 226*0Sigor@sysoev.ru 227*0Sigor@sysoev.ru if (nxt_thread_create(&handle, link) != NXT_OK) { 228*0Sigor@sysoev.ru (void) nxt_atomic_fetch_add(&tp->threads, -1); 229*0Sigor@sysoev.ru } 230*0Sigor@sysoev.ru } 231*0Sigor@sysoev.ru } 232*0Sigor@sysoev.ru 233*0Sigor@sysoev.ru 234*0Sigor@sysoev.ru void 235*0Sigor@sysoev.ru nxt_thread_pool_destroy(nxt_thread_pool_t *tp) 236*0Sigor@sysoev.ru { 237*0Sigor@sysoev.ru nxt_thread_t *thr; 238*0Sigor@sysoev.ru 239*0Sigor@sysoev.ru if (!tp->ready) { 240*0Sigor@sysoev.ru thr = nxt_thread(); 241*0Sigor@sysoev.ru 242*0Sigor@sysoev.ru nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit, 243*0Sigor@sysoev.ru tp, NULL, &nxt_main_log); 244*0Sigor@sysoev.ru return; 245*0Sigor@sysoev.ru } 246*0Sigor@sysoev.ru 247*0Sigor@sysoev.ru if (tp->max_threads != 0) { 248*0Sigor@sysoev.ru /* Disable new threads creation and mark a pool as being destroyed. */ 249*0Sigor@sysoev.ru tp->max_threads = 0; 250*0Sigor@sysoev.ru 251*0Sigor@sysoev.ru nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log); 252*0Sigor@sysoev.ru } 253*0Sigor@sysoev.ru } 254*0Sigor@sysoev.ru 255*0Sigor@sysoev.ru 256*0Sigor@sysoev.ru /* 257*0Sigor@sysoev.ru * A thread handle (pthread_t) is either pointer or integer, so it can be 258*0Sigor@sysoev.ru * passed as work handler pointer "data" argument. To convert void pointer 259*0Sigor@sysoev.ru * to pthread_t and vice versa the source argument should be cast first to 260*0Sigor@sysoev.ru * uintptr_t type and then to the destination type. 261*0Sigor@sysoev.ru * 262*0Sigor@sysoev.ru * If the handle would be a struct it should be stored in thread pool and 263*0Sigor@sysoev.ru * the thread pool must be freed in the thread pool exit procedure after 264*0Sigor@sysoev.ru * the last thread of pool will exit. 265*0Sigor@sysoev.ru */ 266*0Sigor@sysoev.ru 267*0Sigor@sysoev.ru static void 268*0Sigor@sysoev.ru nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data) 269*0Sigor@sysoev.ru { 270*0Sigor@sysoev.ru nxt_thread_pool_t *tp; 271*0Sigor@sysoev.ru nxt_atomic_uint_t threads; 272*0Sigor@sysoev.ru nxt_thread_handle_t handle; 273*0Sigor@sysoev.ru 274*0Sigor@sysoev.ru tp = obj; 275*0Sigor@sysoev.ru 276*0Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool exit"); 277*0Sigor@sysoev.ru 278*0Sigor@sysoev.ru if (data != NULL) { 279*0Sigor@sysoev.ru handle = (nxt_thread_handle_t) (uintptr_t) data; 280*0Sigor@sysoev.ru nxt_thread_wait(handle); 281*0Sigor@sysoev.ru } 282*0Sigor@sysoev.ru 283*0Sigor@sysoev.ru threads = nxt_atomic_fetch_add(&tp->threads, -1); 284*0Sigor@sysoev.ru 285*0Sigor@sysoev.ru nxt_log_debug(thr->log, "thread pool threads: %A", threads); 286*0Sigor@sysoev.ru 287*0Sigor@sysoev.ru if (threads > 1) { 288*0Sigor@sysoev.ru nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, 289*0Sigor@sysoev.ru (void *) (uintptr_t) thr->handle, &nxt_main_log); 290*0Sigor@sysoev.ru 291*0Sigor@sysoev.ru } else { 292*0Sigor@sysoev.ru nxt_main_log_debug("thread pool destroy"); 293*0Sigor@sysoev.ru 294*0Sigor@sysoev.ru nxt_event_engine_post(tp->engine, tp->exit, tp, 295*0Sigor@sysoev.ru (void *) (uintptr_t) thr->handle, &nxt_main_log); 296*0Sigor@sysoev.ru 297*0Sigor@sysoev.ru nxt_sem_destroy(&tp->sem); 298*0Sigor@sysoev.ru 299*0Sigor@sysoev.ru nxt_locked_work_queue_destroy(&tp->work_queue); 300*0Sigor@sysoev.ru 301*0Sigor@sysoev.ru nxt_free(tp); 302*0Sigor@sysoev.ru } 303*0Sigor@sysoev.ru 304*0Sigor@sysoev.ru nxt_thread_work_queue_destroy(thr); 305*0Sigor@sysoev.ru 306*0Sigor@sysoev.ru nxt_thread_exit(thr); 307*0Sigor@sysoev.ru nxt_unreachable(); 308*0Sigor@sysoev.ru } 309