xref: /unit/src/nxt_thread_pool.c (revision 4:76c63e9b6322)

/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


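/*
 * A thread pool is created empty and started lazily: posting the first
 * work item spawns its initial thread.  Idle threads exit after "timeout",
 * except the first ("main") pool thread, which runs until the pool is
 * destroyed.
 */
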
static nxt_int_t nxt_thread_pool_init(nxt_thread_pool_t *tp);
static void nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data);
static void nxt_thread_pool_start(void *ctx);
static void nxt_thread_pool_wait(nxt_thread_pool_t *tp);


nxt_thread_pool_t *
nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
    nxt_thread_pool_init_t init, nxt_event_engine_t *engine,
    nxt_work_handler_t exit)
{
    nxt_thread_pool_t  *tp;

    tp = nxt_zalloc(sizeof(nxt_thread_pool_t));
    if (tp == NULL) {
        return NULL;
    }

    tp->max_threads = max_threads;
    tp->timeout = timeout;
    tp->engine = engine;
    tp->task.thread = engine->task.thread;
    tp->task.log = engine->task.log;
    tp->init = init;
    tp->exit = exit;

    return tp;
}


nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
{
    nxt_thread_log_debug("thread pool post");

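    /* The pool is started lazily: the first post creates its first thread. */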
    if (nxt_slow_path(nxt_thread_pool_init(tp) != NXT_OK)) {
        return NXT_ERROR;
    }

    nxt_locked_work_queue_add(&tp->work_queue, work);

    (void) nxt_sem_post(&tp->sem);

    return NXT_OK;
}


static nxt_int_t
nxt_thread_pool_init(nxt_thread_pool_t *tp)
{
    nxt_int_t            ret;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    if (nxt_fast_path(tp->ready)) {
        return NXT_OK;
    }

    if (tp->max_threads == 0) {
        /* The pool is being destroyed. */
        return NXT_ERROR;
    }

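    /*
     * Double-checked locking: tp->ready is rechecked under the work queue
     * spin lock so that only one posting thread creates the first thread.
     */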
    nxt_thread_spin_lock(&tp->work_queue.lock);

    ret = NXT_OK;

    if (!tp->ready) {

        nxt_thread_log_debug("thread pool init");

        (void) nxt_atomic_fetch_add(&tp->threads, 1);

        if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {

            link = nxt_malloc(sizeof(nxt_thread_link_t));

            if (nxt_fast_path(link != NULL)) {
                link->start = nxt_thread_pool_start;
                link->data = tp;
                link->engine = tp->engine;
                /*
                 * link->exit is not used.  link->engine is used only so
                 * that nxt_thread_trampoline() sets thr->link; the link
                 * marks the first thread of the pool.
                 */
                if (nxt_thread_create(&handle, link) == NXT_OK) {
                    tp->ready = 1;
                    goto done;
                }
            }

            nxt_sem_destroy(&tp->sem);
        }

        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        ret = NXT_ERROR;
    }

done:

    nxt_thread_spin_unlock(&tp->work_queue.lock);

    return ret;
}


static void
nxt_thread_pool_start(void *ctx)
{
    void                *obj, *data;
    nxt_task_t          *task;
    nxt_thread_t        *thr;
    nxt_thread_pool_t   *tp;
    nxt_work_handler_t  handler;

    tp = ctx;
    thr = nxt_thread();

    if (thr->link != NULL) {
        /* Only the first thread has a link. */
        tp->main = thr->handle;
        nxt_free(thr->link);
        thr->link = NULL;

        tp->task.thread = thr;
    }

    thr->thread_pool = tp;

    if (tp->init != NULL) {
        tp->init();
    }

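    /* The worker loop: sleep until work is posted, then run one item. */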
    for ( ;; ) {
        nxt_thread_pool_wait(tp);

        handler = nxt_locked_work_queue_pop(&tp->work_queue, &task, &obj,
                                            &data);

        if (nxt_fast_path(handler != NULL)) {
            task->thread = thr;

            nxt_log_debug(thr->log, "locked work queue");

            handler(task, obj, data);
        }

        thr->log = &nxt_main_log;
    }
}


static void
nxt_thread_pool_wait(nxt_thread_pool_t *tp)
{
    nxt_err_t            err;
    nxt_thread_t         *thr;
    nxt_atomic_uint_t    waiting, threads;
    nxt_thread_link_t    *link;
    nxt_thread_handle_t  handle;

    thr = nxt_thread();

    nxt_log_debug(thr->log, "thread pool wait");

    (void) nxt_atomic_fetch_add(&tp->waiting, 1);

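    /*
     * On a wait timeout the main pool thread keeps waiting, while any
     * other thread exits, shrinking the pool back towards one thread.
     */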
    for ( ;; ) {
        err = nxt_sem_wait(&tp->sem, tp->timeout);

        if (err == 0) {
            waiting = nxt_atomic_fetch_add(&tp->waiting, -1);
            break;
        }

        if (err == NXT_ETIMEDOUT) {
            if (nxt_thread_handle_equal(thr->handle, tp->main)) {
                continue;
            }
        }

        (void) nxt_atomic_fetch_add(&tp->waiting, -1);
        (void) nxt_atomic_fetch_add(&tp->threads, -1);

        nxt_thread_exit(thr);
        nxt_unreachable();
    }

    nxt_log_debug(thr->log, "thread pool awake, waiting: %A", waiting);

    if (waiting > 1) {
        return;
    }

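    /*
     * This was the last waiting thread, so try to start one spare thread
     * in advance, unless the pool has already reached max_threads.
     */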
    do {
        threads = tp->threads;

        if (threads >= tp->max_threads) {
            return;
        }

    } while (!nxt_atomic_cmp_set(&tp->threads, threads, threads + 1));

    link = nxt_zalloc(sizeof(nxt_thread_link_t));

    if (nxt_fast_path(link != NULL)) {
        link->start = nxt_thread_pool_start;
        link->data = tp;

        if (nxt_thread_create(&handle, link) != NXT_OK) {
            (void) nxt_atomic_fetch_add(&tp->threads, -1);
        }
    }
}


void
nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
{
    nxt_thread_t  *thr;

    thr = nxt_thread();

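    /*
     * If no pool thread was ever started, schedule the exit handler
     * directly on the calling thread's engine.
     */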
    if (!tp->ready) {
        nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
                           &tp->task, tp, NULL);
        return;
    }

    if (tp->max_threads != 0) {
        /* Disable new thread creation; mark the pool as being destroyed. */
        tp->max_threads = 0;

        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);

        nxt_thread_pool_post(tp, &tp->work);
    }
}


/*
 * A thread handle (pthread_t) is either a pointer or an integer, so it
 * can be passed as the work handler's "data" pointer argument.  To convert
 * a void pointer to pthread_t and vice versa, the source value should be
 * cast first to uintptr_t and then to the destination type.
 *
 * If the handle were a struct, it would have to be stored in the thread
 * pool, and the pool could be freed in the exit procedure only after the
 * last thread of the pool had exited.
 */

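/*
 * A minimal sketch of the round-trip described above, assuming (as the
 * comment does) that pthread_t fits into uintptr_t on the platform;
 * "data" and "restored" are hypothetical locals for illustration:
 *
 *     void                 *data;
 *     nxt_thread_handle_t  restored;
 *
 *     data = (void *) (uintptr_t) thread->handle;
 *     restored = (nxt_thread_handle_t) (uintptr_t) data;
 */
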
static void
nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
    nxt_thread_t         *thread;
    nxt_thread_pool_t    *tp;
    nxt_atomic_uint_t    threads;
    nxt_thread_handle_t  handle;

    tp = obj;
    thread = task->thread;

    nxt_debug(task, "thread pool exit");

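    /* "data", if set, is the handle of an already exited thread to join. */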
    if (data != NULL) {
        handle = (nxt_thread_handle_t) (uintptr_t) data;
        nxt_thread_wait(handle);
    }

    threads = nxt_atomic_fetch_add(&tp->threads, -1);

    nxt_debug(task, "thread pool threads: %A", threads);

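    /*
     * Shutdown cascade: while other threads remain, the exit work is
     * reposted with this thread's handle so that the next exiting thread
     * joins this one.  The last thread posts its handle to the event
     * engine, whose tp->exit handler is expected to join it and free "tp".
     */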
    if (threads > 1) {
        nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
                     (void *) (uintptr_t) thread->handle);

        nxt_thread_pool_post(tp, &tp->work);

    } else {
        nxt_debug(task, "thread pool destroy");

        nxt_sem_destroy(&tp->sem);

        nxt_work_set(&tp->work, tp->exit, &tp->task, tp,
                     (void *) (uintptr_t) thread->handle);

        nxt_event_engine_post(tp->engine, &tp->work);

        /* The "tp" memory should be freed by the tp->exit handler. */
    }

    nxt_thread_exit(thread);

    nxt_unreachable();
}
308