1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp); 11 static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data); 12 static void nxt_thread_pool_start(void *ctx); 13 static void nxt_thread_pool_wait(nxt_thread_pool_t *tp); 14 15 16 nxt_thread_pool_t * 17 nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout, 18 nxt_thread_pool_init_t init, nxt_event_engine_t *engine, 19 nxt_work_handler_t exit) 20 { 21 nxt_thread_pool_t *tp; 22 23 tp = nxt_zalloc(sizeof(nxt_thread_pool_t)); 24 if (tp == NULL) { 25 return NULL; 26 } 27 28 tp->max_threads = max_threads; 29 tp->timeout = timeout; 30 tp->engine = engine; 31 tp->task.thread = engine->task.thread; 32 tp->task.log = engine->task.log; 33 tp->init = init; 34 tp->exit = exit; 35 36 return tp; 37 } 38 39 40 nxt_int_t 41 nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work) 42 { 43 nxt_thread_log_debug("thread pool post"); 44 45 if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) { 46 return NXT_ERROR; 47 } 48 49 nxt_locked_work_queue_add(&tp->work_queue, work); 50 51 (void) nxt_sem_post(&tp->sem); 52 53 return NXT_OK; 54 } 55 56 57 static nxt_int_t 58 nxt_thread_pool_init(nxt_thread_pool_t *tp) 59 { 60 nxt_int_t ret; 61 nxt_thread_link_t *link; 62 nxt_thread_handle_t handle; 63 64 if (nxt_fast_path(tp->ready)) { 65 return NXT_OK; 66 } 67 68 if (tp->max_threads == 0) { 69 /* The pool is being destroyed. */ 70 return NXT_ERROR; 71 } 72 73 nxt_thread_spin_lock(&tp->work_queue.lock); 74 75 ret = NXT_OK; 76 77 if (!tp->ready) { 78 79 nxt_thread_log_debug("thread pool init"); 80 81 (void) nxt_atomic_fetch_add(&tp->threads, 1); 82 83 if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) { 84 85 link = nxt_malloc(sizeof(nxt_thread_link_t)); 86 87 if (nxt_fast_path(link != NULL)) { 88 link->start = nxt_thread_pool_start; 89 link->data = tp; 90 link->engine = tp->engine; 91 /* 92 * link->exit is not used. link->engine is used just to 93 * set thr->link by nxt_thread_trampoline() and the link 94 * is a mark of the first thread of pool. 95 */ 96 if (nxt_thread_create(&handle, link) == NXT_OK) { 97 tp->ready = 1; 98 goto done; 99 } 100 } 101 102 nxt_sem_destroy(&tp->sem); 103 } 104 105 (void) nxt_atomic_fetch_add(&tp->threads, -1); 106 107 ret = NXT_ERROR; 108 } 109 110 done: 111 112 nxt_thread_spin_unlock(&tp->work_queue.lock); 113 114 return ret; 115 } 116 117 118 static void 119 nxt_thread_pool_start(void *ctx) 120 { 121 void *obj, *data; 122 nxt_task_t *task; 123 nxt_thread_t *thr; 124 nxt_thread_pool_t *tp; 125 nxt_work_handler_t handler; 126 127 tp = ctx; 128 thr = nxt_thread(); 129 130 if (thr->link != NULL) { 131 /* Only the first thread has a link. */ 132 tp->main = thr->handle; 133 nxt_free(thr->link); 134 thr->link = NULL; 135 136 tp->task.thread = thr; 137 } 138 139 thr->thread_pool = tp; 140 141 if (tp->init != NULL) { 142 tp->init(); 143 } 144 145 for ( ;; ) { 146 nxt_thread_pool_wait(tp); 147 148 handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj, 149 &data); 150 151 if (nxt_fast_path(handler != NULL)) { 152 task->thread = thr; 153 154 nxt_log_debug(thr->log, "locked work queue"); 155 156 handler(task, obj, data); 157 } 158 159 thr->log = &nxt_main_log; 160 } 161 } 162 163 164 static void 165 nxt_thread_pool_wait(nxt_thread_pool_t *tp) 166 { 167 nxt_err_t err; 168 nxt_thread_t *thr; 169 nxt_atomic_uint_t waiting, threads; 170 nxt_thread_link_t *link; 171 nxt_thread_handle_t handle; 172 173 thr = nxt_thread(); 174 175 nxt_log_debug(thr->log, "thread pool wait"); 176 177 (void) nxt_atomic_fetch_add(&tp->waiting, 1); 178 179 for ( ;; ) { 180 err = nxt_sem_wait(&tp->sem, tp->timeout); 181 182 if (err == 0) { 183 waiting = nxt_atomic_fetch_add(&tp->waiting, -1); 184 break; 185 } 186 187 if (err == NXT_ETIMEDOUT) { 188 if (nxt_thread_handle_equal(thr->handle, tp->main)) { 189 continue; 190 } 191 } 192 193 (void) nxt_atomic_fetch_add(&tp->waiting, -1); 194 (void) nxt_atomic_fetch_add(&tp->threads, -1); 195 196 nxt_thread_exit(thr); 197 nxt_unreachable(); 198 } 199 200 nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting); 201 202 if (waiting > 1) { 203 return; 204 } 205 206 do { 207 threads = tp->threads; 208 209 if (threads >= tp->max_threads) { 210 return; 211 } 212 213 } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1)); 214 215 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 216 217 if (nxt_fast_path(link != NULL)) { 218 link->start = nxt_thread_pool_start; 219 link->data = tp; 220 221 if (nxt_thread_create(&handle, link) != NXT_OK) { 222 (void) nxt_atomic_fetch_add(&tp->threads, -1); 223 } 224 } 225 } 226 227 228 void 229 nxt_thread_pool_destroy(nxt_thread_pool_t *tp) 230 { 231 nxt_thread_t *thr; 232 233 thr = nxt_thread(); 234 235 if (!tp->ready) { 236 nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit, 237 &tp->task, tp, NULL); 238 return; 239 } 240 241 if (tp->max_threads != 0) { 242 /* Disable new threads creation and mark a pool as being destroyed. */ 243 tp->max_threads = 0; 244 245 nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL); 246 247 nxt_thread_pool_post(tp, &tp->work); 248 } 249 } 250 251 252 /* 253 * A thread handle (pthread_t) is either pointer or integer, so it can be 254 * passed as work handler pointer "data" argument. To convert void pointer 255 * to pthread_t and vice versa the source argument should be cast first to 256 * uintptr_t type and then to the destination type. 257 * 258 * If the handle would be a struct it should be stored in thread pool and 259 * the thread pool must be freed in the thread pool exit procedure after 260 * the last thread of pool will exit. 261 */ 262 263 static void 264 nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data) 265 { 266 nxt_thread_t *thread; 267 nxt_thread_pool_t *tp; 268 nxt_atomic_uint_t threads; 269 nxt_thread_handle_t handle; 270 271 tp = obj; 272 thread = task->thread; 273 274 nxt_debug(task, "thread pool exit"); 275 276 if (data != NULL) { 277 handle = (nxt_thread_handle_t) (uintptr_t) data; 278 nxt_thread_wait(handle); 279 } 280 281 threads = nxt_atomic_fetch_add(&tp->threads, -1); 282 283 nxt_debug(task, "thread pool threads: %A", threads); 284 285 if (threads > 1) { 286 nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, 287 (void *) (uintptr_t) thread->handle); 288 289 nxt_thread_pool_post(tp, &tp->work); 290 291 } else { 292 nxt_debug(task, "thread pool destroy"); 293 294 nxt_sem_destroy(&tp->sem); 295 296 nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, 297 (void *) (uintptr_t) thread->handle); 298 299 nxt_event_engine_post(tp->engine, &tp->work); 300 301 /* The "tp" memory should be freed by tp->exit handler. */ 302 } 303 304 nxt_thread_exit(thread); 305 306 nxt_unreachable(); 307 } 308