nxt_thread_pool.c (0:a63ceefd6ab0) nxt_thread_pool.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
11static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
11static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
12static void nxt_thread_pool_start(void *ctx);
13static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
14
15
16nxt_thread_pool_t *
17nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
18 nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
19 nxt_work_handler_t exit)
20{
21 nxt_thread_pool_t *tp;
22
23 tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
24 if (tp == NULL) {
25 return NULL;
26 }
27
28 tp->max_threads = max_threads;
29 tp->timeout = timeout;
30 tp->engine = engine;
12static void nxt_thread_pool_start(void *ctx);
13static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
14
15
16nxt_thread_pool_t *
17nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
18 nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
19 nxt_work_handler_t exit)
20{
21 nxt_thread_pool_t *tp;
22
23 tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
24 if (tp == NULL) {
25 return NULL;
26 }
27
28 tp->max_threads = max_threads;
29 tp->timeout = timeout;
30 tp->engine = engine;
31 tp->task.thread = engine->task.thread;
32 tp->task.log = engine->task.log;
31 tp->init = init;
32 tp->exit = exit;
33
34 return tp;
35}
36
37
38nxt_int_t
39nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
33 tp->init = init;
34 tp->exit = exit;
35
36 return tp;
37}
38
39
40nxt_int_t
41nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
40 void *obj, void *data, nxt_log_t *log)
42 nxt_task_t *task, void *obj, void *data)
41{
42 nxt_thread_log_debug("thread pool post");
43
44 if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
45 return NXT_ERROR;
46 }
47
43{
44 nxt_thread_log_debug("thread pool post");
45
46 if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
47 return NXT_ERROR;
48 }
49
48 nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);
50 nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data);
49
50 (void) nxt_sem_post(&tp->sem);
51
52 return NXT_OK;
53}
54
55
56static nxt_int_t
57nxt_thread_pool_init(nxt_thread_pool_t *tp)
58{
59 nxt_int_t ret;
60 nxt_thread_link_t *link;
61 nxt_thread_handle_t handle;
62
63 if (nxt_fast_path(tp->ready)) {
64 return NXT_OK;
65 }
66
67 nxt_thread_spin_lock(&tp->work_queue.lock);
68
69 ret = NXT_OK;
70
71 if (!tp->ready) {
72
73 nxt_thread_log_debug("thread pool init");
74
75 (void) nxt_atomic_fetch_add(&tp->threads, 1);
76
77 if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
78
79 nxt_locked_work_queue_create(&tp->work_queue, 0);
80
81 link = nxt_malloc(sizeof(nxt_thread_link_t));
82
83 if (nxt_fast_path(link != NULL)) {
84 link->start = nxt_thread_pool_start;
85 link->data = tp;
86 link->engine = tp->engine;
87 /*
88 * link->exit is not used. link->engine is used just to
89 * set thr->link by nxt_thread_trampoline() and the link
90 * is a mark of the first thread of pool.
91 */
92 if (nxt_thread_create(&handle, link) == NXT_OK) {
93 tp->ready = 1;
94 goto done;
95 }
96 }
97
98 nxt_sem_destroy(&tp->sem);
99 }
100
101 (void) nxt_atomic_fetch_add(&tp->threads, -1);
102
103 nxt_locked_work_queue_destroy(&tp->work_queue);
104
105 ret = NXT_ERROR;
106 }
107
108done:
109
110 nxt_thread_spin_unlock(&tp->work_queue.lock);
111
112 return ret;
113}
114
115
116static void
117nxt_thread_pool_start(void *ctx)
118{
119 void *obj, *data;
51
52 (void) nxt_sem_post(&tp->sem);
53
54 return NXT_OK;
55}
56
57
58static nxt_int_t
59nxt_thread_pool_init(nxt_thread_pool_t *tp)
60{
61 nxt_int_t ret;
62 nxt_thread_link_t *link;
63 nxt_thread_handle_t handle;
64
65 if (nxt_fast_path(tp->ready)) {
66 return NXT_OK;
67 }
68
69 nxt_thread_spin_lock(&tp->work_queue.lock);
70
71 ret = NXT_OK;
72
73 if (!tp->ready) {
74
75 nxt_thread_log_debug("thread pool init");
76
77 (void) nxt_atomic_fetch_add(&tp->threads, 1);
78
79 if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
80
81 nxt_locked_work_queue_create(&tp->work_queue, 0);
82
83 link = nxt_malloc(sizeof(nxt_thread_link_t));
84
85 if (nxt_fast_path(link != NULL)) {
86 link->start = nxt_thread_pool_start;
87 link->data = tp;
88 link->engine = tp->engine;
89 /*
90 * link->exit is not used. link->engine is used just to
91 * set thr->link by nxt_thread_trampoline() and the link
92 * is a mark of the first thread of pool.
93 */
94 if (nxt_thread_create(&handle, link) == NXT_OK) {
95 tp->ready = 1;
96 goto done;
97 }
98 }
99
100 nxt_sem_destroy(&tp->sem);
101 }
102
103 (void) nxt_atomic_fetch_add(&tp->threads, -1);
104
105 nxt_locked_work_queue_destroy(&tp->work_queue);
106
107 ret = NXT_ERROR;
108 }
109
110done:
111
112 nxt_thread_spin_unlock(&tp->work_queue.lock);
113
114 return ret;
115}
116
117
118static void
119nxt_thread_pool_start(void *ctx)
120{
121 void *obj, *data;
122 nxt_task_t *task;
120 nxt_thread_t *thr;
121 nxt_thread_pool_t *tp;
122 nxt_work_handler_t handler;
123
124 tp = ctx;
125 thr = nxt_thread();
126
127 if (thr->link != NULL) {
128 /* Only the first thread has a link. */
129 tp->main = thr->handle;
130 nxt_free(thr->link);
131 thr->link = NULL;
123 nxt_thread_t *thr;
124 nxt_thread_pool_t *tp;
125 nxt_work_handler_t handler;
126
127 tp = ctx;
128 thr = nxt_thread();
129
130 if (thr->link != NULL) {
131 /* Only the first thread has a link. */
132 tp->main = thr->handle;
133 nxt_free(thr->link);
134 thr->link = NULL;
135
136 tp->task.thread = thr;
132 }
133
134 thr->thread_pool = tp;
135
136 if (tp->init != NULL) {
137 tp->init();
138 }
139
140 nxt_thread_work_queue_create(thr, 8);
141
142 for ( ;; ) {
143 nxt_thread_pool_wait(tp);
144
137 }
138
139 thr->thread_pool = tp;
140
141 if (tp->init != NULL) {
142 tp->init();
143 }
144
145 nxt_thread_work_queue_create(thr, 8);
146
147 for ( ;; ) {
148 nxt_thread_pool_wait(tp);
149
145 handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
146 &data, &thr->log);
150 handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
151 &data);
147
148 if (nxt_fast_path(handler != NULL)) {
152
153 if (nxt_fast_path(handler != NULL)) {
154 task->thread = thr;
149 nxt_log_debug(thr->log, "locked work queue");
155 nxt_log_debug(thr->log, "locked work queue");
150 handler(thr, obj, data);
156 handler(task, obj, data);
151 }
152
153 for ( ;; ) {
154 thr->log = &nxt_main_log;
155
157 }
158
159 for ( ;; ) {
160 thr->log = &nxt_main_log;
161
156 handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
162 handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
157
158 if (handler == NULL) {
159 break;
160 }
161
163
164 if (handler == NULL) {
165 break;
166 }
167
162 handler(thr, obj, data);
168 handler(task, obj, data);
163 }
164
165 thr->log = &nxt_main_log;
166 }
167}
168
169
170static void
171nxt_thread_pool_wait(nxt_thread_pool_t *tp)
172{
173 nxt_err_t err;
174 nxt_thread_t *thr;
175 nxt_atomic_uint_t waiting, threads;
176 nxt_thread_link_t *link;
177 nxt_thread_handle_t handle;
178
179 thr = nxt_thread();
180
181 nxt_log_debug(thr->log, "thread pool wait");
182
183 (void) nxt_atomic_fetch_add(&tp->waiting, 1);
184
185 for ( ;; ) {
186 err = nxt_sem_wait(&tp->sem, tp->timeout);
187
188 if (err == 0) {
189 waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
190 break;
191 }
192
193 if (err == NXT_ETIMEDOUT) {
194 if (nxt_thread_handle_equal(thr->handle, tp->main)) {
195 continue;
196 }
197 }
198
199 (void) nxt_atomic_fetch_add(&tp->waiting, -1);
200 (void) nxt_atomic_fetch_add(&tp->threads, -1);
201
202 nxt_thread_exit(thr);
203 nxt_unreachable();
204 }
205
206 nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
207
208 if (waiting > 1) {
209 return;
210 }
211
212 do {
213 threads = tp->threads;
214
215 if (threads >= tp->max_threads) {
216 return;
217 }
218
219 } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
220
221 link = nxt_zalloc(sizeof(nxt_thread_link_t));
222
223 if (nxt_fast_path(link != NULL)) {
224 link->start = nxt_thread_pool_start;
225 link->data = tp;
226
227 if (nxt_thread_create(&handle, link) != NXT_OK) {
228 (void) nxt_atomic_fetch_add(&tp->threads, -1);
229 }
230 }
231}
232
233
234void
235nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
236{
237 nxt_thread_t *thr;
238
169 }
170
171 thr->log = &nxt_main_log;
172 }
173}
174
175
176static void
177nxt_thread_pool_wait(nxt_thread_pool_t *tp)
178{
179 nxt_err_t err;
180 nxt_thread_t *thr;
181 nxt_atomic_uint_t waiting, threads;
182 nxt_thread_link_t *link;
183 nxt_thread_handle_t handle;
184
185 thr = nxt_thread();
186
187 nxt_log_debug(thr->log, "thread pool wait");
188
189 (void) nxt_atomic_fetch_add(&tp->waiting, 1);
190
191 for ( ;; ) {
192 err = nxt_sem_wait(&tp->sem, tp->timeout);
193
194 if (err == 0) {
195 waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
196 break;
197 }
198
199 if (err == NXT_ETIMEDOUT) {
200 if (nxt_thread_handle_equal(thr->handle, tp->main)) {
201 continue;
202 }
203 }
204
205 (void) nxt_atomic_fetch_add(&tp->waiting, -1);
206 (void) nxt_atomic_fetch_add(&tp->threads, -1);
207
208 nxt_thread_exit(thr);
209 nxt_unreachable();
210 }
211
212 nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
213
214 if (waiting > 1) {
215 return;
216 }
217
218 do {
219 threads = tp->threads;
220
221 if (threads >= tp->max_threads) {
222 return;
223 }
224
225 } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
226
227 link = nxt_zalloc(sizeof(nxt_thread_link_t));
228
229 if (nxt_fast_path(link != NULL)) {
230 link->start = nxt_thread_pool_start;
231 link->data = tp;
232
233 if (nxt_thread_create(&handle, link) != NXT_OK) {
234 (void) nxt_atomic_fetch_add(&tp->threads, -1);
235 }
236 }
237}
238
239
240void
241nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
242{
243 nxt_thread_t *thr;
244
239 if (!tp->ready) {
240 thr = nxt_thread();
245 thr = nxt_thread();
241
246
247 if (!tp->ready) {
242 nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
248 nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
243 tp, NULL, &nxt_main_log);
249 &tp->task, tp, NULL);
244 return;
245 }
246
247 if (tp->max_threads != 0) {
248 /* Disable new threads creation and mark a pool as being destroyed. */
249 tp->max_threads = 0;
250
250 return;
251 }
252
253 if (tp->max_threads != 0) {
254 /* Disable new threads creation and mark a pool as being destroyed. */
255 tp->max_threads = 0;
256
251 nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
257 nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL);
252 }
253}
254
255
256/*
257 * A thread handle (pthread_t) is either pointer or integer, so it can be
258 * passed as work handler pointer "data" argument. To convert void pointer
259 * to pthread_t and vice versa the source argument should be cast first to
260 * uintptr_t type and then to the destination type.
261 *
262 * If the handle would be a struct it should be stored in thread pool and
263 * the thread pool must be freed in the thread pool exit procedure after
264 * the last thread of pool will exit.
265 */
266
267static void
258 }
259}
260
261
262/*
263 * A thread handle (pthread_t) is either pointer or integer, so it can be
264 * passed as work handler pointer "data" argument. To convert void pointer
265 * to pthread_t and vice versa the source argument should be cast first to
266 * uintptr_t type and then to the destination type.
267 *
268 * If the handle would be a struct it should be stored in thread pool and
269 * the thread pool must be freed in the thread pool exit procedure after
270 * the last thread of pool will exit.
271 */
272
273static void
268nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
274nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
269{
275{
276 nxt_thread_t *thread;
270 nxt_thread_pool_t *tp;
271 nxt_atomic_uint_t threads;
272 nxt_thread_handle_t handle;
273
274 tp = obj;
277 nxt_thread_pool_t *tp;
278 nxt_atomic_uint_t threads;
279 nxt_thread_handle_t handle;
280
281 tp = obj;
282 thread = task->thread;
275
283
276 nxt_log_debug(thr->log, "thread pool exit");
284 nxt_debug(task, "thread pool exit");
277
278 if (data != NULL) {
279 handle = (nxt_thread_handle_t) (uintptr_t) data;
280 nxt_thread_wait(handle);
281 }
282
283 threads = nxt_atomic_fetch_add(&tp->threads, -1);
284
285
286 if (data != NULL) {
287 handle = (nxt_thread_handle_t) (uintptr_t) data;
288 nxt_thread_wait(handle);
289 }
290
291 threads = nxt_atomic_fetch_add(&tp->threads, -1);
292
285 nxt_log_debug(thr->log, "thread pool threads: %A", threads);
293 nxt_debug(task, "thread pool threads: %A", threads);
286
287 if (threads > 1) {
294
295 if (threads > 1) {
288 nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
289 (void *) (uintptr_t) thr->handle, &nxt_main_log);
296 nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp,
297 (void *) (uintptr_t) thread->handle);
290
291 } else {
298
299 } else {
292 nxt_main_log_debug("thread pool destroy");
300 nxt_debug(task, "thread pool destroy");
293
301
294 nxt_event_engine_post(tp->engine, tp->exit, tp,
295 (void *) (uintptr_t) thr->handle, &nxt_main_log);
302 nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp,
303 (void *) (uintptr_t) thread->handle,
304 &nxt_main_log);
296
297 nxt_sem_destroy(&tp->sem);
298
299 nxt_locked_work_queue_destroy(&tp->work_queue);
300
301 nxt_free(tp);
302 }
303
305
306 nxt_sem_destroy(&tp->sem);
307
308 nxt_locked_work_queue_destroy(&tp->work_queue);
309
310 nxt_free(tp);
311 }
312
304 nxt_thread_work_queue_destroy(thr);
313 nxt_thread_work_queue_destroy(thread);
305
314
306 nxt_thread_exit(thr);
315 nxt_thread_exit(thread);
316
307 nxt_unreachable();
308}
317 nxt_unreachable();
318}