xref: /unit/src/nxt_thread_pool.c (revision 0)
1*0Sigor@sysoev.ru 
2*0Sigor@sysoev.ru /*
3*0Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*0Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*0Sigor@sysoev.ru  */
6*0Sigor@sysoev.ru 
7*0Sigor@sysoev.ru #include <nxt_main.h>
8*0Sigor@sysoev.ru 
9*0Sigor@sysoev.ru 
10*0Sigor@sysoev.ru static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
11*0Sigor@sysoev.ru static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
12*0Sigor@sysoev.ru static void nxt_thread_pool_start(void *ctx);
13*0Sigor@sysoev.ru static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
14*0Sigor@sysoev.ru 
15*0Sigor@sysoev.ru 
16*0Sigor@sysoev.ru nxt_thread_pool_t *
17*0Sigor@sysoev.ru nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
18*0Sigor@sysoev.ru     nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
19*0Sigor@sysoev.ru     nxt_work_handler_t exit)
20*0Sigor@sysoev.ru {
21*0Sigor@sysoev.ru     nxt_thread_pool_t  *tp;
22*0Sigor@sysoev.ru 
23*0Sigor@sysoev.ru     tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
24*0Sigor@sysoev.ru     if (tp == NULL) {
25*0Sigor@sysoev.ru         return NULL;
26*0Sigor@sysoev.ru     }
27*0Sigor@sysoev.ru 
28*0Sigor@sysoev.ru     tp->max_threads = max_threads;
29*0Sigor@sysoev.ru     tp->timeout = timeout;
30*0Sigor@sysoev.ru     tp->engine = engine;
31*0Sigor@sysoev.ru     tp->init = init;
32*0Sigor@sysoev.ru     tp->exit = exit;
33*0Sigor@sysoev.ru 
34*0Sigor@sysoev.ru     return tp;
35*0Sigor@sysoev.ru }
36*0Sigor@sysoev.ru 
37*0Sigor@sysoev.ru 
38*0Sigor@sysoev.ru nxt_int_t
39*0Sigor@sysoev.ru nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
40*0Sigor@sysoev.ru     void *obj, void *data, nxt_log_t *log)
41*0Sigor@sysoev.ru {
42*0Sigor@sysoev.ru     nxt_thread_log_debug("thread pool post");
43*0Sigor@sysoev.ru 
44*0Sigor@sysoev.ru     if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
45*0Sigor@sysoev.ru         return NXT_ERROR;
46*0Sigor@sysoev.ru     }
47*0Sigor@sysoev.ru 
48*0Sigor@sysoev.ru     nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);
49*0Sigor@sysoev.ru 
50*0Sigor@sysoev.ru     (void) nxt_sem_post(&tp->sem);
51*0Sigor@sysoev.ru 
52*0Sigor@sysoev.ru     return NXT_OK;
53*0Sigor@sysoev.ru }
54*0Sigor@sysoev.ru 
55*0Sigor@sysoev.ru 
56*0Sigor@sysoev.ru static nxt_int_t
57*0Sigor@sysoev.ru nxt_thread_pool_init(nxt_thread_pool_t *tp)
58*0Sigor@sysoev.ru {
59*0Sigor@sysoev.ru     nxt_int_t            ret;
60*0Sigor@sysoev.ru     nxt_thread_link_t    *link;
61*0Sigor@sysoev.ru     nxt_thread_handle_t  handle;
62*0Sigor@sysoev.ru 
63*0Sigor@sysoev.ru     if (nxt_fast_path(tp->ready)) {
64*0Sigor@sysoev.ru         return NXT_OK;
65*0Sigor@sysoev.ru     }
66*0Sigor@sysoev.ru 
67*0Sigor@sysoev.ru     nxt_thread_spin_lock(&tp->work_queue.lock);
68*0Sigor@sysoev.ru 
69*0Sigor@sysoev.ru     ret = NXT_OK;
70*0Sigor@sysoev.ru 
71*0Sigor@sysoev.ru     if (!tp->ready) {
72*0Sigor@sysoev.ru 
73*0Sigor@sysoev.ru         nxt_thread_log_debug("thread pool init");
74*0Sigor@sysoev.ru 
75*0Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->threads, 1);
76*0Sigor@sysoev.ru 
77*0Sigor@sysoev.ru         if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
78*0Sigor@sysoev.ru 
79*0Sigor@sysoev.ru             nxt_locked_work_queue_create(&tp->work_queue, 0);
80*0Sigor@sysoev.ru 
81*0Sigor@sysoev.ru             link = nxt_malloc(sizeof(nxt_thread_link_t));
82*0Sigor@sysoev.ru 
83*0Sigor@sysoev.ru             if (nxt_fast_path(link != NULL)) {
84*0Sigor@sysoev.ru                 link->start = nxt_thread_pool_start;
85*0Sigor@sysoev.ru                 link->data = tp;
86*0Sigor@sysoev.ru                 link->engine = tp->engine;
87*0Sigor@sysoev.ru                 /*
88*0Sigor@sysoev.ru                  * link->exit is not used.  link->engine is used just to
89*0Sigor@sysoev.ru                  * set thr->link by nxt_thread_trampoline() and the link
90*0Sigor@sysoev.ru                  * is a mark of the first thread of pool.
91*0Sigor@sysoev.ru                  */
92*0Sigor@sysoev.ru                 if (nxt_thread_create(&handle, link) == NXT_OK) {
93*0Sigor@sysoev.ru                     tp->ready = 1;
94*0Sigor@sysoev.ru                     goto done;
95*0Sigor@sysoev.ru                 }
96*0Sigor@sysoev.ru             }
97*0Sigor@sysoev.ru 
98*0Sigor@sysoev.ru             nxt_sem_destroy(&tp->sem);
99*0Sigor@sysoev.ru         }
100*0Sigor@sysoev.ru 
101*0Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->threads, -1);
102*0Sigor@sysoev.ru 
103*0Sigor@sysoev.ru         nxt_locked_work_queue_destroy(&tp->work_queue);
104*0Sigor@sysoev.ru 
105*0Sigor@sysoev.ru         ret = NXT_ERROR;
106*0Sigor@sysoev.ru     }
107*0Sigor@sysoev.ru 
108*0Sigor@sysoev.ru done:
109*0Sigor@sysoev.ru 
110*0Sigor@sysoev.ru     nxt_thread_spin_unlock(&tp->work_queue.lock);
111*0Sigor@sysoev.ru 
112*0Sigor@sysoev.ru     return ret;
113*0Sigor@sysoev.ru }
114*0Sigor@sysoev.ru 
115*0Sigor@sysoev.ru 
116*0Sigor@sysoev.ru static void
117*0Sigor@sysoev.ru nxt_thread_pool_start(void *ctx)
118*0Sigor@sysoev.ru {
119*0Sigor@sysoev.ru     void                *obj, *data;
120*0Sigor@sysoev.ru     nxt_thread_t        *thr;
121*0Sigor@sysoev.ru     nxt_thread_pool_t   *tp;
122*0Sigor@sysoev.ru     nxt_work_handler_t  handler;
123*0Sigor@sysoev.ru 
124*0Sigor@sysoev.ru     tp = ctx;
125*0Sigor@sysoev.ru     thr = nxt_thread();
126*0Sigor@sysoev.ru 
127*0Sigor@sysoev.ru     if (thr->link != NULL) {
128*0Sigor@sysoev.ru         /* Only the first thread has a link. */
129*0Sigor@sysoev.ru         tp->main = thr->handle;
130*0Sigor@sysoev.ru         nxt_free(thr->link);
131*0Sigor@sysoev.ru         thr->link = NULL;
132*0Sigor@sysoev.ru     }
133*0Sigor@sysoev.ru 
134*0Sigor@sysoev.ru     thr->thread_pool = tp;
135*0Sigor@sysoev.ru 
136*0Sigor@sysoev.ru     if (tp->init != NULL) {
137*0Sigor@sysoev.ru         tp->init();
138*0Sigor@sysoev.ru     }
139*0Sigor@sysoev.ru 
140*0Sigor@sysoev.ru     nxt_thread_work_queue_create(thr, 8);
141*0Sigor@sysoev.ru 
142*0Sigor@sysoev.ru     for ( ;; ) {
143*0Sigor@sysoev.ru         nxt_thread_pool_wait(tp);
144*0Sigor@sysoev.ru 
145*0Sigor@sysoev.ru         handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
146*0Sigor@sysoev.ru                                             &data, &thr->log);
147*0Sigor@sysoev.ru 
148*0Sigor@sysoev.ru         if (nxt_fast_path(handler != NULL)) {
149*0Sigor@sysoev.ru             nxt_log_debug(thr->log, "locked work queue");
150*0Sigor@sysoev.ru             handler(thr, obj, data);
151*0Sigor@sysoev.ru         }
152*0Sigor@sysoev.ru 
153*0Sigor@sysoev.ru         for ( ;; ) {
154*0Sigor@sysoev.ru             thr->log = &nxt_main_log;
155*0Sigor@sysoev.ru 
156*0Sigor@sysoev.ru             handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);
157*0Sigor@sysoev.ru 
158*0Sigor@sysoev.ru             if (handler == NULL) {
159*0Sigor@sysoev.ru                 break;
160*0Sigor@sysoev.ru             }
161*0Sigor@sysoev.ru 
162*0Sigor@sysoev.ru             handler(thr, obj, data);
163*0Sigor@sysoev.ru         }
164*0Sigor@sysoev.ru 
165*0Sigor@sysoev.ru         thr->log = &nxt_main_log;
166*0Sigor@sysoev.ru     }
167*0Sigor@sysoev.ru }
168*0Sigor@sysoev.ru 
169*0Sigor@sysoev.ru 
170*0Sigor@sysoev.ru static void
171*0Sigor@sysoev.ru nxt_thread_pool_wait(nxt_thread_pool_t *tp)
172*0Sigor@sysoev.ru {
173*0Sigor@sysoev.ru     nxt_err_t            err;
174*0Sigor@sysoev.ru     nxt_thread_t         *thr;
175*0Sigor@sysoev.ru     nxt_atomic_uint_t    waiting, threads;
176*0Sigor@sysoev.ru     nxt_thread_link_t    *link;
177*0Sigor@sysoev.ru     nxt_thread_handle_t  handle;
178*0Sigor@sysoev.ru 
179*0Sigor@sysoev.ru     thr = nxt_thread();
180*0Sigor@sysoev.ru 
181*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "thread pool wait");
182*0Sigor@sysoev.ru 
183*0Sigor@sysoev.ru     (void) nxt_atomic_fetch_add(&tp->waiting, 1);
184*0Sigor@sysoev.ru 
185*0Sigor@sysoev.ru     for ( ;; ) {
186*0Sigor@sysoev.ru         err = nxt_sem_wait(&tp->sem, tp->timeout);
187*0Sigor@sysoev.ru 
188*0Sigor@sysoev.ru         if (err == 0) {
189*0Sigor@sysoev.ru             waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
190*0Sigor@sysoev.ru             break;
191*0Sigor@sysoev.ru         }
192*0Sigor@sysoev.ru 
193*0Sigor@sysoev.ru         if (err == NXT_ETIMEDOUT) {
194*0Sigor@sysoev.ru             if (nxt_thread_handle_equal(thr->handle, tp->main)) {
195*0Sigor@sysoev.ru                 continue;
196*0Sigor@sysoev.ru             }
197*0Sigor@sysoev.ru         }
198*0Sigor@sysoev.ru 
199*0Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->waiting, -1);
200*0Sigor@sysoev.ru         (void) nxt_atomic_fetch_add(&tp->threads, -1);
201*0Sigor@sysoev.ru 
202*0Sigor@sysoev.ru         nxt_thread_exit(thr);
203*0Sigor@sysoev.ru         nxt_unreachable();
204*0Sigor@sysoev.ru     }
205*0Sigor@sysoev.ru 
206*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
207*0Sigor@sysoev.ru 
208*0Sigor@sysoev.ru     if (waiting > 1) {
209*0Sigor@sysoev.ru         return;
210*0Sigor@sysoev.ru     }
211*0Sigor@sysoev.ru 
212*0Sigor@sysoev.ru     do {
213*0Sigor@sysoev.ru         threads = tp->threads;
214*0Sigor@sysoev.ru 
215*0Sigor@sysoev.ru         if (threads >= tp->max_threads) {
216*0Sigor@sysoev.ru             return;
217*0Sigor@sysoev.ru         }
218*0Sigor@sysoev.ru 
219*0Sigor@sysoev.ru     } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
220*0Sigor@sysoev.ru 
221*0Sigor@sysoev.ru     link = nxt_zalloc(sizeof(nxt_thread_link_t));
222*0Sigor@sysoev.ru 
223*0Sigor@sysoev.ru     if (nxt_fast_path(link != NULL)) {
224*0Sigor@sysoev.ru         link->start = nxt_thread_pool_start;
225*0Sigor@sysoev.ru         link->data = tp;
226*0Sigor@sysoev.ru 
227*0Sigor@sysoev.ru         if (nxt_thread_create(&handle, link) != NXT_OK) {
228*0Sigor@sysoev.ru             (void) nxt_atomic_fetch_add(&tp->threads, -1);
229*0Sigor@sysoev.ru         }
230*0Sigor@sysoev.ru     }
231*0Sigor@sysoev.ru }
232*0Sigor@sysoev.ru 
233*0Sigor@sysoev.ru 
234*0Sigor@sysoev.ru void
235*0Sigor@sysoev.ru nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
236*0Sigor@sysoev.ru {
237*0Sigor@sysoev.ru     nxt_thread_t  *thr;
238*0Sigor@sysoev.ru 
239*0Sigor@sysoev.ru     if (!tp->ready) {
240*0Sigor@sysoev.ru         thr = nxt_thread();
241*0Sigor@sysoev.ru 
242*0Sigor@sysoev.ru         nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
243*0Sigor@sysoev.ru                                   tp, NULL, &nxt_main_log);
244*0Sigor@sysoev.ru         return;
245*0Sigor@sysoev.ru     }
246*0Sigor@sysoev.ru 
247*0Sigor@sysoev.ru     if (tp->max_threads != 0) {
248*0Sigor@sysoev.ru         /* Disable new threads creation and mark a pool as being destroyed. */
249*0Sigor@sysoev.ru         tp->max_threads = 0;
250*0Sigor@sysoev.ru 
251*0Sigor@sysoev.ru         nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
252*0Sigor@sysoev.ru     }
253*0Sigor@sysoev.ru }
254*0Sigor@sysoev.ru 
255*0Sigor@sysoev.ru 
256*0Sigor@sysoev.ru /*
257*0Sigor@sysoev.ru  * A thread handle (pthread_t) is either pointer or integer, so it can be
258*0Sigor@sysoev.ru  * passed as work handler pointer "data" argument.  To convert void pointer
259*0Sigor@sysoev.ru  * to pthread_t and vice versa the source argument should be cast first to
260*0Sigor@sysoev.ru  * uintptr_t type and then to the destination type.
261*0Sigor@sysoev.ru  *
262*0Sigor@sysoev.ru  * If the handle would be a struct it should be stored in thread pool and
263*0Sigor@sysoev.ru  * the thread pool must be freed in the thread pool exit procedure after
264*0Sigor@sysoev.ru  * the last thread of pool will exit.
265*0Sigor@sysoev.ru  */
266*0Sigor@sysoev.ru 
267*0Sigor@sysoev.ru static void
268*0Sigor@sysoev.ru nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
269*0Sigor@sysoev.ru {
270*0Sigor@sysoev.ru     nxt_thread_pool_t    *tp;
271*0Sigor@sysoev.ru     nxt_atomic_uint_t    threads;
272*0Sigor@sysoev.ru     nxt_thread_handle_t  handle;
273*0Sigor@sysoev.ru 
274*0Sigor@sysoev.ru     tp = obj;
275*0Sigor@sysoev.ru 
276*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "thread pool exit");
277*0Sigor@sysoev.ru 
278*0Sigor@sysoev.ru     if (data != NULL) {
279*0Sigor@sysoev.ru         handle = (nxt_thread_handle_t) (uintptr_t) data;
280*0Sigor@sysoev.ru         nxt_thread_wait(handle);
281*0Sigor@sysoev.ru     }
282*0Sigor@sysoev.ru 
283*0Sigor@sysoev.ru     threads = nxt_atomic_fetch_add(&tp->threads, -1);
284*0Sigor@sysoev.ru 
285*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "thread pool threads: %A", threads);
286*0Sigor@sysoev.ru 
287*0Sigor@sysoev.ru     if (threads > 1) {
288*0Sigor@sysoev.ru         nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
289*0Sigor@sysoev.ru                              (void *) (uintptr_t) thr->handle, &nxt_main_log);
290*0Sigor@sysoev.ru 
291*0Sigor@sysoev.ru     } else {
292*0Sigor@sysoev.ru         nxt_main_log_debug("thread pool destroy");
293*0Sigor@sysoev.ru 
294*0Sigor@sysoev.ru         nxt_event_engine_post(tp->engine, tp->exit, tp,
295*0Sigor@sysoev.ru                               (void *) (uintptr_t) thr->handle, &nxt_main_log);
296*0Sigor@sysoev.ru 
297*0Sigor@sysoev.ru         nxt_sem_destroy(&tp->sem);
298*0Sigor@sysoev.ru 
299*0Sigor@sysoev.ru         nxt_locked_work_queue_destroy(&tp->work_queue);
300*0Sigor@sysoev.ru 
301*0Sigor@sysoev.ru         nxt_free(tp);
302*0Sigor@sysoev.ru     }
303*0Sigor@sysoev.ru 
304*0Sigor@sysoev.ru     nxt_thread_work_queue_destroy(thr);
305*0Sigor@sysoev.ru 
306*0Sigor@sysoev.ru     nxt_thread_exit(thr);
307*0Sigor@sysoev.ru     nxt_unreachable();
308*0Sigor@sysoev.ru }
309