xref: /unit/src/nxt_thread_pool.c (revision 0:a63ceefd6ab0)

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
static void nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data);
static void nxt_thread_pool_start(void *ctx);
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);


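/*
 * Usage sketch (not part of the original file; names other than the
 * nxt_* functions are hypothetical).  A caller creates a pool once and
 * then posts work items to it; the first post lazily starts the first
 * thread.  Assuming nxt_nsec_t counts nanoseconds, the timeout below
 * lets an idle extra thread exit after 60 seconds.
 *
 *     static void
 *     app_work(nxt_thread_t *thr, void *obj, void *data)
 *     {
 *         nxt_log_debug(thr->log, "work item");
 *     }
 *
 *     tp = nxt_thread_pool_create(8, 60000000000ULL, NULL, app_engine,
 *                                 app_exit);
 *     if (tp != NULL) {
 *         (void) nxt_thread_pool_post(tp, app_work, obj, NULL,
 *                                     &nxt_main_log);
 *     }
 */
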
nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
    nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
    nxt_work_handler_t exit)
{
    nxt_thread_pool_t  *tp;

    tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
    if (tp == NULL) {
        return NULL;
    }

    tp->max_threads = max_threads;
    tp->timeout = timeout;
    tp->engine = engine;
    tp->init = init;
    tp->exit = exit;

    return tp;
}


nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
    void *obj, void *data, nxt_log_t *log)
{
    nxt_thread_log_debug("thread pool post");

    if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_locked_work_queue_add(&tp->work_queue, handler, obj, data, log);

    (void) nxt_sem_post(&tp->sem);

    return NXT_OK;
}


static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t *tp)
{
    nxt_int_t            ret;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    if (nxt_fast_path(tp->ready)) {
        return NXT_OK;
    }

    nxt_thread_spin_lock(&tp->work_queue.lock);

    ret = NXT_OK;

    if (!tp->ready) {

        nxt_thread_log_debug("thread pool init");

        (void) nxt_atomic_fetch_add(&tp->threads, 1);

        if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {

            nxt_locked_work_queue_create(&tp->work_queue, 0);

            link = nxt_malloc(sizeof(nxt_thread_link_t));

            if (nxt_fast_path(link != NULL)) {
                link->start = nxt_thread_pool_start;
                link->data = tp;
                link->engine = tp->engine;
                /*
                 * link->exit is not used.  link->engine is used only so
                 * that nxt_thread_trampoline() sets thr->link; the link
                 * marks the first thread of the pool.
                 */
                if (nxt_thread_create(&handle, link) == NXT_OK) {
                    tp->ready = 1;
                    goto done;
                }
            }

            nxt_sem_destroy(&tp->sem);
        }

        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_locked_work_queue_destroy(&tp->work_queue);

        ret = NXT_ERROR;
    }

done:

    nxt_thread_spin_unlock(&tp->work_queue.lock);

    return ret;
}


static void
nxt_thread_pool_start(void *ctx)
{
    void                *obj, *data;
    nxt_thread_t        *thr;
    nxt_thread_pool_t   *tp;
    nxt_work_handler_t  handler;

    tp = ctx;
    thr = nxt_thread();

    if (thr->link != NULL) {
        /* Only the first thread has a link. */
        tp->main = thr->handle;
        nxt_free(thr->link);
        thr->link = NULL;
    }

    thr->thread_pool = tp;

    if (tp->init != NULL) {
        tp->init();
    }

    nxt_thread_work_queue_create(thr, 8);

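    /*
     * Each iteration blocks in nxt_thread_pool_wait() until work is
     * posted, runs one item from the shared locked queue, and then
     * drains the thread-local queue that the handler may have filled.
     */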
    for ( ;; ) {
        nxt_thread_pool_wait(tp);

        handler = nxt_locked_work_queue_pop(&tp->work_queue, &obj,
                                            &data, &thr->log);

        if (nxt_fast_path(handler != NULL)) {
            nxt_log_debug(thr->log, "locked work queue");
            handler(thr, obj, data);
        }

        for ( ;; ) {
            thr->log = &nxt_main_log;

            handler = nxt_thread_work_queue_pop(thr, &obj, &data, &thr->log);

            if (handler == NULL) {
                break;
            }

            handler(thr, obj, data);
        }

        thr->log = &nxt_main_log;
    }
}


static void
nxt_thread_pool_wait(nxt_thread_pool_t *tp)
{
    nxt_err_t            err;
    nxt_thread_t         *thr;
    nxt_atomic_uint_t    waiting, threads;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool wait");

    (void) nxt_atomic_fetch_add(&tp->waiting, 1);

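    /*
     * nxt_sem_wait() returns 0 when a work item has been posted.  On
     * timeout the first pool thread keeps waiting, while any other
     * thread exits to shrink an idle pool; a thread also exits on any
     * other semaphore error.
     */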
    for ( ;; ) {
        err = nxt_sem_wait(&tp->sem, tp->timeout);

        if (err == 0) {
            waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
            break;
        }

        if (err == NXT_ETIMEDOUT) {
            if (nxt_thread_handle_equal(thr->handle, tp->main)) {
                continue;
            }
        }

        (void) nxt_atomic_fetch_add(&tp->waiting, -1);
        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_thread_exit(thr);
        nxt_unreachable();
    }

    nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);

    if (waiting > 1) {
        return;
    }

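    /*
     * This was the last waiting thread: try to start one more to handle
     * future load.  The CAS loop atomically increments tp->threads
     * unless the pool has already reached max_threads.
     */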
    do {
        threads = tp->threads;

        if (threads >= tp->max_threads) {
            return;
        }

    } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));

    link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_fast_path(link != NULL)) {
        link->start = nxt_thread_pool_start;
        link->data = tp;

        if (nxt_thread_create(&handle, link) != NXT_OK) {
            (void) nxt_atomic_fetch_add(&tp->threads, -1);
        }
    }
}


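/*
 * Destroying a pool is asynchronous: the exit task below is posted to
 * the pool itself, and nxt_thread_pool_exit() then terminates the pool
 * threads one by one before releasing the pool's resources.
 */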
void
nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
{
    nxt_thread_t  *thr;

    if (!tp->ready) {
        thr = nxt_thread();

        nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
                                  tp, NULL, &nxt_main_log);
        return;
    }

    if (tp->max_threads != 0) {
        /* Disable creation of new threads and mark the pool as being destroyed. */
        tp->max_threads = 0;

        nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp, NULL, &nxt_main_log);
    }
}


/*
 * A thread handle (pthread_t) is either a pointer or an integer, so it
 * can be passed as the work handler's "data" pointer argument.  To
 * convert a void pointer to pthread_t and vice versa, the source
 * argument should be cast first to uintptr_t and then to the
 * destination type.
 *
 * If the handle were a struct, it would have to be stored in the
 * thread pool, and the pool would have to be freed in the thread pool
 * exit procedure after the last thread of the pool exits.
 */
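
/*
 * For example (a sketch mirroring the casts used below):
 *
 *     void                 *data;
 *     nxt_thread_handle_t  handle;
 *
 *     data = (void *) (uintptr_t) handle;                 (store)
 *     handle = (nxt_thread_handle_t) (uintptr_t) data;    (retrieve)
 */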

static void
nxt_thread_pool_exit(nxt_thread_t *thr, void *obj, void *data)
{
    nxt_thread_pool_t    *tp;
    nxt_atomic_uint_t    threads;
    nxt_thread_handle_t  handle;

    tp = obj;

    nxt_log_debug(thr->log, "thread pool exit");

    if (data != NULL) {
        handle = (nxt_thread_handle_t) (uintptr_t) data;
        nxt_thread_wait(handle);
    }

    threads = nxt_atomic_fetch_add(&tp->threads, -1);

    nxt_log_debug(thr->log, "thread pool threads: %A", threads);

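    /*
     * Each exiting thread posts another exit task that carries its own
     * handle, so the next thread can join it.  The last thread notifies
     * the event engine and frees the pool.
     */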
    if (threads > 1) {
        nxt_thread_pool_post(tp, nxt_thread_pool_exit, tp,
                             (void *) (uintptr_t) thr->handle, &nxt_main_log);

    } else {
        nxt_main_log_debug("thread pool destroy");

        nxt_event_engine_post(tp->engine, tp->exit, tp,
                              (void *) (uintptr_t) thr->handle, &nxt_main_log);

        nxt_sem_destroy(&tp->sem);

        nxt_locked_work_queue_destroy(&tp->work_queue);

        nxt_free(tp);
    }

    nxt_thread_work_queue_destroy(thr);

    nxt_thread_exit(thr);
    nxt_unreachable();
}
309