xref: /unit/src/nxt_thread_pool.c (revision 494:7c83ddcc1c42)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
11 static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
12 static void nxt_thread_pool_start(void *ctx);
13 static void nxt_thread_pool_loop(void *ctx);
14 static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);
15 
16 
17 nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads,nxt_nsec_t timeout,nxt_thread_pool_init_t init,nxt_event_engine_t * engine,nxt_work_handler_t exit)18 nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
19     nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
20     nxt_work_handler_t exit)
21 {
22     nxt_thread_pool_t  *tp;
23 
24     tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
25     if (tp == NULL) {
26         return NULL;
27     }
28 
29     tp->max_threads = max_threads;
30     tp->timeout = timeout;
31     tp->engine = engine;
32     tp->task.thread = engine->task.thread;
33     tp->task.log = engine->task.log;
34     tp->init = init;
35     tp->exit = exit;
36 
37     return tp;
38 }
39 
40 
41 nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t * tp,nxt_work_t * work)42 nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
43 {
44     nxt_thread_log_debug("thread pool post");
45 
46     if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
47         return NXT_ERROR;
48     }
49 
50     nxt_locked_work_queue_add(&tp->work_queue, work);
51 
52     (void) nxt_sem_post(&tp->sem);
53 
54     return NXT_OK;
55 }
56 
57 
58 static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t * tp)59 nxt_thread_pool_init(nxt_thread_pool_t *tp)
60 {
61     nxt_int_t            ret;
62     nxt_thread_link_t    *link;
63     nxt_thread_handle_t  handle;
64 
65     if (nxt_fast_path(tp->ready)) {
66         return NXT_OK;
67     }
68 
69     if (tp->max_threads == 0) {
70         /* The pool is being destroyed. */
71         return NXT_ERROR;
72     }
73 
74     nxt_thread_spin_lock(&tp->work_queue.lock);
75 
76     ret = NXT_OK;
77 
78     if (!tp->ready) {
79 
80         nxt_thread_log_debug("thread pool init");
81 
82         (void) nxt_atomic_fetch_add(&tp->threads, 1);
83 
84         if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
85 
86             link = nxt_zalloc(sizeof(nxt_thread_link_t));
87 
88             if (nxt_fast_path(link != NULL)) {
89                 link->start = nxt_thread_pool_start;
90                 link->work.data = tp;
91 
92                 if (nxt_thread_create(&handle, link) == NXT_OK) {
93                     tp->ready = 1;
94                     goto done;
95                 }
96             }
97 
98             nxt_sem_destroy(&tp->sem);
99         }
100 
101         (void) nxt_atomic_fetch_add(&tp->threads, -1);
102 
103         ret = NXT_ERROR;
104     }
105 
106 done:
107 
108     nxt_thread_spin_unlock(&tp->work_queue.lock);
109 
110     return ret;
111 }
112 
113 
114 static void
nxt_thread_pool_start(void * ctx)115 nxt_thread_pool_start(void *ctx)
116 {
117     nxt_thread_t       *thr;
118     nxt_thread_pool_t  *tp;
119 
120     tp = ctx;
121     thr = nxt_thread();
122 
123     tp->main = thr->handle;
124     tp->task.thread = thr;
125 
126     nxt_thread_pool_loop(ctx);
127 }
128 
129 
130 static void
nxt_thread_pool_loop(void * ctx)131 nxt_thread_pool_loop(void *ctx)
132 {
133     void                *obj, *data;
134     nxt_task_t          *task;
135     nxt_thread_t        *thr;
136     nxt_thread_pool_t   *tp;
137     nxt_work_handler_t  handler;
138 
139     tp = ctx;
140     thr = nxt_thread();
141 
142     if (tp->init != NULL) {
143         tp->init();
144     }
145 
146     for ( ;; ) {
147         nxt_thread_pool_wait(tp);
148 
149         handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
150                                             &data);
151 
152         if (nxt_fast_path(handler != NULL)) {
153             task->thread = thr;
154 
155             nxt_log_debug(thr->log, "locked work queue");
156 
157             handler(task, obj, data);
158         }
159 
160         thr->log = &nxt_main_log;
161     }
162 }
163 
164 
165 static void
nxt_thread_pool_wait(nxt_thread_pool_t * tp)166 nxt_thread_pool_wait(nxt_thread_pool_t *tp)
167 {
168     nxt_err_t            err;
169     nxt_thread_t         *thr;
170     nxt_atomic_uint_t    waiting, threads;
171     nxt_thread_link_t    *link;
172     nxt_thread_handle_t  handle;
173 
174     thr = nxt_thread();
175 
176     nxt_log_debug(thr->log, "thread pool wait");
177 
178     (void) nxt_atomic_fetch_add(&tp->waiting, 1);
179 
180     for ( ;; ) {
181         err = nxt_sem_wait(&tp->sem, tp->timeout);
182 
183         if (err == 0) {
184             waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
185             break;
186         }
187 
188         if (err == NXT_ETIMEDOUT) {
189             if (nxt_thread_handle_equal(thr->handle, tp->main)) {
190                 continue;
191             }
192         }
193 
194         (void) nxt_atomic_fetch_add(&tp->waiting, -1);
195         (void) nxt_atomic_fetch_add(&tp->threads, -1);
196 
197         nxt_thread_exit(thr);
198         nxt_unreachable();
199     }
200 
201     nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);
202 
203     if (waiting > 1) {
204         return;
205     }
206 
207     do {
208         threads = tp->threads;
209 
210         if (threads >= tp->max_threads) {
211             return;
212         }
213 
214     } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));
215 
216     link = nxt_zalloc(sizeof(nxt_thread_link_t));
217 
218     if (nxt_fast_path(link != NULL)) {
219         link->start = nxt_thread_pool_loop;
220         link->work.data = tp;
221 
222         if (nxt_thread_create(&handle, link) != NXT_OK) {
223             (void) nxt_atomic_fetch_add(&tp->threads, -1);
224         }
225     }
226 }
227 
228 
229 void
nxt_thread_pool_destroy(nxt_thread_pool_t * tp)230 nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
231 {
232     nxt_thread_t  *thr;
233 
234     thr = nxt_thread();
235 
236     nxt_log_debug(thr->log, "thread pool destroy: %A", tp->ready);
237 
238     if (!tp->ready) {
239         nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
240                            &tp->engine->task, tp, NULL);
241         return;
242     }
243 
244     if (tp->max_threads != 0) {
245         /* Disable new threads creation and mark a pool as being destroyed. */
246         tp->max_threads = 0;
247 
248         nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);
249 
250         nxt_thread_pool_post(tp, &tp->work);
251     }
252 }
253 
254 
/*
 * A thread handle (pthread_t) is either a pointer or an integer, so it can
 * be passed as the work handler's "data" pointer argument.  To convert a
 * void pointer to pthread_t and vice versa, the source argument should first
 * be cast to uintptr_t and then to the destination type.
 *
 * If the handle were a struct, it would have to be stored in the thread
 * pool, and the thread pool would have to be freed in the thread pool exit
 * procedure after the last thread of the pool exits.
 */
265 
266 static void
nxt_thread_pool_exit(nxt_task_t * task,void * obj,void * data)267 nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
268 {
269     nxt_thread_t         *thread;
270     nxt_thread_pool_t    *tp;
271     nxt_atomic_uint_t    threads;
272     nxt_thread_handle_t  handle;
273 
274     tp = obj;
275     thread = task->thread;
276 
277     nxt_debug(task, "thread pool exit");
278 
279     if (data != NULL) {
280         handle = (nxt_thread_handle_t) (uintptr_t) data;
281         nxt_thread_wait(handle);
282     }
283 
284     threads = nxt_atomic_fetch_add(&tp->threads, -1);
285 
286     nxt_debug(task, "thread pool threads: %A", threads);
287 
288     if (threads > 1) {
289         nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
290                      (void *) (uintptr_t) thread->handle);
291 
292         nxt_thread_pool_post(tp, &tp->work);
293 
294     } else {
295         nxt_debug(task, "thread pool destroy");
296 
297         nxt_sem_destroy(&tp->sem);
298 
299         nxt_work_set(&tp->work, tp->exit, &tp->engine->task, tp,
300                      (void *) (uintptr_t) thread->handle);
301 
302         nxt_event_engine_post(tp->engine, &tp->work);
303 
304         /* The "tp" memory should be freed by tp->exit handler. */
305     }
306 
307     nxt_thread_exit(thread);
308 
309     nxt_unreachable();
310 }
311