nxt_thread_pool.c (0:a63ceefd6ab0) nxt_thread_pool.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
11static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
11static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
12static void nxt_thread_pool_start(void *ctx);
13static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
14
15
16nxt_thread_pool_t *
17nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
18 nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
19 nxt_work_handler_t exit)
20{
21 nxt_thread_pool_t *tp;
22
23 tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
24 if (tp == NULL) {
25 return NULL;
26 }
27
28 tp->max_threads = max_threads;
29 tp->timeout = timeout;
30 tp->engine = engine;
12static void nxt_thread_pool_start(void *ctx);
13static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
14
15
16nxt_thread_pool_t *
17nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
18 nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
19 nxt_work_handler_t exit)
20{
21 nxt_thread_pool_t *tp;
22
23 tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
24 if (tp == NULL) {
25 return NULL;
26 }
27
28 tp->max_threads = max_threads;
29 tp->timeout = timeout;
30 tp->engine = engine;
31 tp->task.thread = engine->task.thread;
32 tp->task.log = engine->task.log;
31 tp->init = init;
32 tp->exit = exit;
33
34 return tp;
35}
36
37
38nxt_int_t
39nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
33 tp->init = init;
34 tp->exit = exit;
35
36 return tp;
37}
38
39
40nxt_int_t
41nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
40 void *obj, void *data, nxt_log_t *log)
42 nxt_task_t *task, void *obj, void *data)
41{
42 nxt_thread_log_debug("thread pool post");
43
44 if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
45 return NXT_ERROR;
46 }
47
43{
44 nxt_thread_log_debug("thread pool post");
45
46 if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
47 return NXT_ERROR;
48 }
49
48 nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);
50 nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data);
49
50 (void) nxt_sem_post(&tp->sem);
51
52 return NXT_OK;
53}
54
55
56static nxt_int_t

--- 55 unchanged lines hidden (view full) ---

112 return ret;
113}
114
115
116static void
117nxt_thread_pool_start(void *ctx)
118{
119 void *obj, *data;
51
52 (void) nxt_sem_post(&tp->sem);
53
54 return NXT_OK;
55}
56
57
58static nxt_int_t

--- 55 unchanged lines hidden (view full) ---

114 return ret;
115}
116
117
118static void
119nxt_thread_pool_start(void *ctx)
120{
121 void *obj, *data;
122 nxt_task_t *task;
120 nxt_thread_t *thr;
121 nxt_thread_pool_t *tp;
122 nxt_work_handler_t handler;
123
124 tp = ctx;
125 thr = nxt_thread();
126
127 if (thr->link != NULL) {
128 /* Only the first thread has a link. */
129 tp->main = thr->handle;
130 nxt_free(thr->link);
131 thr->link = NULL;
123 nxt_thread_t *thr;
124 nxt_thread_pool_t *tp;
125 nxt_work_handler_t handler;
126
127 tp = ctx;
128 thr = nxt_thread();
129
130 if (thr->link != NULL) {
131 /* Only the first thread has a link. */
132 tp->main = thr->handle;
133 nxt_free(thr->link);
134 thr->link = NULL;
135
136 tp->task.thread = thr;
132 }
133
134 thr->thread_pool = tp;
135
136 if (tp->init != NULL) {
137 tp->init();
138 }
139
140 nxt_thread_work_queue_create(thr, 8);
141
142 for ( ;; ) {
143 nxt_thread_pool_wait(tp);
144
137 }
138
139 thr->thread_pool = tp;
140
141 if (tp->init != NULL) {
142 tp->init();
143 }
144
145 nxt_thread_work_queue_create(thr, 8);
146
147 for ( ;; ) {
148 nxt_thread_pool_wait(tp);
149
145 handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
146 &data, &thr->log);
150 handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
151 &data);
147
148 if (nxt_fast_path(handler != NULL)) {
152
153 if (nxt_fast_path(handler != NULL)) {
154 task->thread = thr;
149 nxt_log_debug(thr->log, "locked work queue");
155 nxt_log_debug(thr->log, "locked work queue");
150 handler(thr, obj, data);
156 handler(task, obj, data);
151 }
152
153 for ( ;; ) {
154 thr->log = &nxt_main_log;
155
157 }
158
159 for ( ;; ) {
160 thr->log = &nxt_main_log;
161
156 handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
162 handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
157
158 if (handler == NULL) {
159 break;
160 }
161
163
164 if (handler == NULL) {
165 break;
166 }
167
162 handler(thr, obj, data);
168 handler(task, obj, data);
163 }
164
165 thr->log = &nxt_main_log;
166 }
167}
168
169
170static void

--- 60 unchanged lines hidden (view full) ---

231}
232
233
234void
235nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
236{
237 nxt_thread_t *thr;
238
169 }
170
171 thr->log = &nxt_main_log;
172 }
173}
174
175
176static void

--- 60 unchanged lines hidden (view full) ---

237}
238
239
240void
241nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
242{
243 nxt_thread_t *thr;
244
239 if (!tp->ready) {
240 thr = nxt_thread();
245 thr = nxt_thread();
241
246
247 if (!tp->ready) {
242 nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
248 nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
243 tp, NULL, &nxt_main_log);
249 &tp->task, tp, NULL);
244 return;
245 }
246
247 if (tp->max_threads != 0) {
248 /* Disable new threads creation and mark a pool as being destroyed. */
249 tp->max_threads = 0;
250
250 return;
251 }
252
253 if (tp->max_threads != 0) {
254 /* Disable new threads creation and mark a pool as being destroyed. */
255 tp->max_threads = 0;
256
251 nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
257 nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL);
252 }
253}
254
255
256/*
257 * A thread handle (pthread_t) is either pointer or integer, so it can be
258 * passed as work handler pointer "data" argument. To convert void pointer
259 * to pthread_t and vice versa the source argument should be cast first to
260 * uintptr_t type and then to the destination type.
261 *
262 * If the handle would be a struct it should be stored in thread pool and
263 * the thread pool must be freed in the thread pool exit procedure after
264 * the last thread of pool will exit.
265 */
266
267static void
258 }
259}
260
261
262/*
263 * A thread handle (pthread_t) is either pointer or integer, so it can be
264 * passed as work handler pointer "data" argument. To convert void pointer
265 * to pthread_t and vice versa the source argument should be cast first to
266 * uintptr_t type and then to the destination type.
267 *
268 * If the handle would be a struct it should be stored in thread pool and
269 * the thread pool must be freed in the thread pool exit procedure after
270 * the last thread of pool will exit.
271 */
272
273static void
268nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
274nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
269{
275{
276 nxt_thread_t *thread;
270 nxt_thread_pool_t *tp;
271 nxt_atomic_uint_t threads;
272 nxt_thread_handle_t handle;
273
274 tp = obj;
277 nxt_thread_pool_t *tp;
278 nxt_atomic_uint_t threads;
279 nxt_thread_handle_t handle;
280
281 tp = obj;
282 thread = task->thread;
275
283
276 nxt_log_debug(thr->log, "thread pool exit");
284 nxt_debug(task, "thread pool exit");
277
278 if (data != NULL) {
279 handle = (nxt_thread_handle_t) (uintptr_t) data;
280 nxt_thread_wait(handle);
281 }
282
283 threads = nxt_atomic_fetch_add(&tp->threads, -1);
284
285
286 if (data != NULL) {
287 handle = (nxt_thread_handle_t) (uintptr_t) data;
288 nxt_thread_wait(handle);
289 }
290
291 threads = nxt_atomic_fetch_add(&tp->threads, -1);
292
285 nxt_log_debug(thr->log, "thread pool threads: %A", threads);
293 nxt_debug(task, "thread pool threads: %A", threads);
286
287 if (threads > 1) {
294
295 if (threads > 1) {
288 nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
289 (void *) (uintptr_t) thr->handle, &nxt_main_log);
296 nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp,
297 (void *) (uintptr_t) thread->handle);
290
291 } else {
298
299 } else {
292 nxt_main_log_debug("thread pool destroy");
300 nxt_debug(task, "thread pool destroy");
293
301
294 nxt_event_engine_post(tp->engine, tp->exit, tp,
295 (void *) (uintptr_t) thr->handle, &nxt_main_log);
302 nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp,
303 (void *) (uintptr_t) thread->handle,
304 &nxt_main_log);
296
297 nxt_sem_destroy(&tp->sem);
298
299 nxt_locked_work_queue_destroy(&tp->work_queue);
300
301 nxt_free(tp);
302 }
303
305
306 nxt_sem_destroy(&tp->sem);
307
308 nxt_locked_work_queue_destroy(&tp->work_queue);
309
310 nxt_free(tp);
311 }
312
304 nxt_thread_work_queue_destroy(thr);
313 nxt_thread_work_queue_destroy(thread);
305
314
306 nxt_thread_exit(thr);
315 nxt_thread_exit(thread);
316
307 nxt_unreachable();
308}
317 nxt_unreachable();
318}