
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * Available work items are crucial for overall engine operation, so
 * the items are preallocated in two chunks: a cache chunk and a spare
 * chunk.  By default each chunk preallocates 409 work items, which
 * occupy two or four CPU pages depending on the platform.  If all
 * items in the cache chunk are exhausted, then the spare chunk
 * becomes the cache chunk and a new spare chunk is allocated.  This
 * two-step allocation mitigates the impact of low memory conditions
 * on work queue operation.  However, if both chunks are exhausted,
 * then a thread sleeps, relying on another thread to free some
 * memory.  This may lead to deadlock, though, and in that case the
 * process should probably be aborted.  This behaviour should be
 * considered equivalent to an abort on program stack exhaustion.
 *
 * The cache and spare chunks are initially allocated in the same two
 * steps: a spare chunk is allocated first, then it becomes the cache
 * chunk, and a new spare chunk is allocated.
 */
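

/*
 * A minimal illustrative sketch of the cache/spare swap described
 * above (not part of this file; the "sketch_" names are invented and
 * chunk allocation is elided):
 *
 *     typedef struct {
 *         nxt_work_t  *next;
 *         nxt_work_t  *spare;
 *     } sketch_cache_t;
 *
 *     static nxt_work_t *
 *     sketch_get_work(sketch_cache_t *cache)
 *     {
 *         nxt_work_t  *work;
 *
 *         work = cache->next;
 *
 *         if (work == NULL) {
 *             cache->next = cache->spare;
 *             cache->spare = sketch_allocate_chunk();
 *             work = cache->next;
 *         }
 *
 *         if (work != NULL) {
 *             cache->next = work->next;
 *         }
 *
 *         return work;
 *     }
 */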

static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
    nxt_thread_spinlock_t *lock);
static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
static nxt_work_handler_t nxt_locked_work_queue_pop_work(
    nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log);


/* It should be adjusted with the "work_queue_bucket_items" directive. */
static nxt_uint_t  nxt_work_queue_bucket_items = 409;


void
nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)
{
    nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t));

    nxt_work_queue_name(&thr->work_queue.main, "main");
    nxt_work_queue_name(&thr->work_queue.last, "last");

    if (chunk_size == 0) {
        chunk_size = nxt_work_queue_bucket_items;
    }

    /* nxt_work_queue_chunk_t already has one work item. */
    thr->work_queue.cache.chunk_size = chunk_size - 1;

    while (thr->work_queue.cache.next == NULL) {
        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}
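

/*
 * A hypothetical initialization sketch (not part of this file): a
 * thread creates its work queue during startup and destroys it on
 * shutdown.  Passing a chunk_size of 0 selects the default of
 * nxt_work_queue_bucket_items (409) items per chunk:
 *
 *     nxt_thread_work_queue_create(thr, 0);
 *
 *     ...
 *
 *     nxt_thread_work_queue_destroy(thr);
 */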


void
nxt_thread_work_queue_destroy(nxt_thread_t *thr)
{
    nxt_work_queue_chunk_t  *chunk, *next;

    for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) {
        next = chunk->next;
        nxt_free(chunk);
    }
}


static void
nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
    nxt_thread_spinlock_t *lock)
{
    size_t                  size;
    nxt_uint_t              i, n;
    nxt_work_t              *work;
    nxt_work_queue_chunk_t  *chunk;

    n = cache->chunk_size;
    size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);

    chunk = nxt_malloc(size);

    if (nxt_fast_path(chunk != NULL)) {

        chunk->next = cache->chunk;
        cache->chunk = chunk;
        work = &chunk->work;

        for (i = 0; i < n; i++) {
            work[i].next = &work[i + 1];
        }

        work[i].next = NULL;
        work++;

    } else if (cache->spare != NULL) {

        work = NULL;

    } else {
        nxt_work_queue_sleep(lock);
        return;
    }

    cache->next = cache->spare;
    cache->spare = work;
}
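

/*
 * For reference: the chunk header is assumed to embed its first work
 * item, as the "already has one work item" comment in
 * nxt_thread_work_queue_create() implies, so the n items allocated
 * after the header complete the chunk.  A sketch of the assumed
 * definition:
 *
 *     struct nxt_work_queue_chunk_s {
 *         nxt_work_queue_chunk_t  *next;
 *         nxt_work_t              work;
 *     };
 */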


static void
nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
{
    if (lock != NULL) {
        nxt_thread_spin_unlock(lock);
    }

    nxt_nanosleep(100 * 1000000);  /* 100ms */

    if (lock != NULL) {
        nxt_thread_spin_lock(lock);
    }
}


/* Add a work to a work queue tail. */

void
nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    nxt_work_queue_attach(thr, wq);

    for ( ;; ) {
        work = thr->work_queue.cache.next;

        if (nxt_fast_path(work != NULL)) {
            thr->work_queue.cache.next = work->next;
            work->next = NULL;

            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            if (wq->tail != NULL) {
                wq->tail->next = work;

            } else {
                wq->head = work;
            }

            wq->tail = work;

            return;
        }

        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}
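

/*
 * A hypothetical posting sketch ("my_read_handler" and "conn" are
 * illustrative names, and the handler signature is assumed from
 * nxt_work_handler_t): a handler is queued to run later on the
 * current thread's main work queue:
 *
 *     static void
 *     my_read_handler(nxt_thread_t *thr, void *obj, void *data)
 *     {
 *         ...
 *     }
 *
 *     nxt_thread_work_queue_add(thr, &thr->work_queue.main,
 *                               my_read_handler, conn, NULL, thr->log);
 */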


/* Push a work to a work queue head. */

void
nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    nxt_work_queue_attach(thr, wq);

    for ( ;; ) {
        work = thr->work_queue.cache.next;

        if (nxt_fast_path(work != NULL)) {
            thr->work_queue.cache.next = work->next;
            work->next = wq->head;

            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            wq->head = work;

            if (wq->tail == NULL) {
                wq->tail = work;
            }

            return;
        }

        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}


/* Attach a work queue to a thread work queue. */

void
nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
{
    if (wq->next == NULL && wq != thr->work_queue.tail) {

        if (thr->work_queue.tail != NULL) {
            thr->work_queue.tail->next = wq;

        } else {
            thr->work_queue.head = wq;
        }

        thr->work_queue.tail = wq;
    }
}


/* Pop a work from a thread work queue head. */

nxt_work_handler_t
nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
    nxt_log_t **log)
{
    nxt_work_t        *work;
    nxt_work_queue_t  *wq;

    wq = nxt_thread_current_work_queue(thr);

    if (wq != NULL) {

        work = wq->head;

        if (work != NULL) {
            wq->head = work->next;

            if (work->next == NULL) {
                wq->tail = NULL;
            }

            *obj = work->obj;
            nxt_prefetch(*obj);
            *data = work->data;
            nxt_prefetch(*data);

            work->next = thr->work_queue.cache.next;
            thr->work_queue.cache.next = work;

            *log = work->log;

#if (NXT_DEBUG)

            if (work->handler == NULL) {
                nxt_log_alert(thr->log, "null work handler");
                nxt_abort();
            }

#endif

            return work->handler;
        }
    }

    return NULL;
}
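

/*
 * A hypothetical dispatch sketch, assuming the nxt_work_handler_t
 * signature takes (thr, obj, data): an event loop iteration would pop
 * and run handlers until the thread's work queues are drained:
 *
 *     for ( ;; ) {
 *         handler = nxt_thread_work_queue_pop(thr, &obj, &data, &log);
 *
 *         if (handler == NULL) {
 *             break;
 *         }
 *
 *         handler(thr, obj, data);
 *     }
 */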


static nxt_work_queue_t *
nxt_thread_current_work_queue(nxt_thread_t *thr)
{
    nxt_work_queue_t  *wq, *next;

    for (wq = thr->work_queue.head; wq != NULL; wq = next) {

        if (wq->head != NULL) {
            nxt_log_debug(thr->log, "work queue: %s", wq->name);
            return wq;
        }

        /* Detach empty work queue. */
        next = wq->next;
        wq->next = NULL;
        thr->work_queue.head = next;
    }

    thr->work_queue.tail = NULL;

    return NULL;
}


/* Drop all works with the specified object from the thread's work queues. */

void
nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
{
    nxt_work_t        *work, *prev, *next, **link;
    nxt_work_queue_t  *wq;

    for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) {

        prev = NULL;
        link = &wq->head;

        for (work = wq->head; work != NULL; work = next) {

            next = work->next;

            if (data != work->obj) {
                prev = work;
                link = &work->next;

            } else {
                if (next == NULL) {
                    wq->tail = prev;
                }

                nxt_log_debug(thr->log, "work queue drop");

                *link = next;

                work->next = thr->work_queue.cache.next;
                thr->work_queue.cache.next = work;
            }
        }
    }
}
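

/*
 * A hypothetical cancellation sketch ("conn" is an illustrative
 * name): before an object is freed, its still pending works are
 * dropped so that their handlers never run on freed memory:
 *
 *     nxt_thread_work_queue_drop(thr, conn);
 *
 *     nxt_free(conn);
 */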


/* Add a work to the thread last work queue's tail. */

void
nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
    void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    for ( ;; ) {
        work = thr->work_queue.cache.next;

        if (nxt_fast_path(work != NULL)) {
            thr->work_queue.cache.next = work->next;
            work->next = NULL;

            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            if (thr->work_queue.last.tail != NULL) {
                thr->work_queue.last.tail->next = work;

            } else {
                thr->work_queue.last.head = work;
            }

            thr->work_queue.last.tail = work;

            return;
        }

        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}


/* Pop a work from the thread last work queue's head. */

nxt_work_handler_t
nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
    nxt_log_t **log)
{
    nxt_work_t  *work;

    work = thr->work_queue.last.head;

    if (work != NULL) {
        nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);

        thr->work_queue.last.head = work->next;

        if (work->next == NULL) {
            thr->work_queue.last.tail = NULL;
        }

        *obj = work->obj;
        nxt_prefetch(*obj);
        *data = work->data;
        nxt_prefetch(*data);

        work->next = thr->work_queue.cache.next;
        thr->work_queue.cache.next = work;

        *log = work->log;

#if (NXT_DEBUG)

        if (work->handler == NULL) {
            nxt_log_alert(thr->log, "null work handler");
            nxt_abort();
        }

#endif

        return work->handler;
    }

    return NULL;
}


void
nxt_work_queue_destroy(nxt_work_queue_t *wq)
{
    nxt_thread_t      *thr;
    nxt_work_queue_t  *q;

    thr = nxt_thread();

    /* Detach from a thread work queue. */

    if (thr->work_queue.head == wq) {
        thr->work_queue.head = wq->next;
        q = NULL;
        goto found;
    }

    for (q = thr->work_queue.head; q != NULL; q = q->next) {
        if (q->next == wq) {
            q->next = wq->next;
            goto found;
        }
    }

    return;

found:

    if (thr->work_queue.tail == wq) {
        thr->work_queue.tail = q;
    }

    /* Move all queue's works to a thread work queue cache. */

    if (wq->tail != NULL) {
        wq->tail->next = thr->work_queue.cache.next;
    }

    if (wq->head != NULL) {
        thr->work_queue.cache.next = wq->head;
    }
}


/* Locked work queue operations. */

void
nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size)
{
    nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t));

    if (chunk_size == 0) {
        chunk_size = nxt_work_queue_bucket_items;
    }

    lwq->cache.chunk_size = chunk_size;

    while (lwq->cache.next == NULL) {
        nxt_work_queue_allocate(&lwq->cache, NULL);
    }
}


void
nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
{
    nxt_work_queue_chunk_t  *chunk, *next;

    for (chunk = lwq->cache.chunk; chunk; chunk = next) {
        next = chunk->next;
        nxt_free(chunk);
    }
}


/* Add a work to a locked work queue tail. */

void
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    nxt_thread_spin_lock(&lwq->lock);

    for ( ;; ) {
        work = lwq->cache.next;

        if (nxt_fast_path(work != NULL)) {
            lwq->cache.next = work->next;

            work->next = NULL;
            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            if (lwq->tail != NULL) {
                lwq->tail->next = work;

            } else {
                lwq->head = work;
            }

            lwq->tail = work;

            break;
        }

        nxt_work_queue_allocate(&lwq->cache, &lwq->lock);
    }

    nxt_thread_spin_unlock(&lwq->lock);
}
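

/*
 * A hypothetical cross-thread sketch ("engine_lwq" and "my_handler"
 * are illustrative names): any thread may post work through a locked
 * work queue; the spinlock serializes access to both the list and the
 * queue's own item cache:
 *
 *     nxt_locked_work_queue_add(&engine_lwq, my_handler, obj, NULL, log);
 *
 * The owning thread later transfers posted works to one of its plain
 * queues with nxt_locked_work_queue_move().
 */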


/* Pop a work from a locked work queue head. */

nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj,
    void **data, nxt_log_t **log)
{
    nxt_work_handler_t  handler;

    nxt_thread_spin_lock(&lwq->lock);

    handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log);

    nxt_thread_spin_unlock(&lwq->lock);

    return handler;
}


static nxt_work_handler_t
nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
    void **data, nxt_log_t **log)
{
    nxt_work_t  *work;

    work = lwq->head;

    if (work == NULL) {
        return NULL;
    }

    *obj = work->obj;
    nxt_prefetch(*obj);
    *data = work->data;
    nxt_prefetch(*data);

    lwq->head = work->next;

    if (work->next == NULL) {
        lwq->tail = NULL;
    }

    work->next = lwq->cache.next;
    lwq->cache.next = work;

    *log = work->log;

    return work->handler;
}


/* Move all works from a locked work queue to a plain work queue. */

void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
    nxt_work_queue_t *wq)
{
    void                *obj, *data;
    nxt_log_t           *log;
    nxt_work_handler_t  handler;

    /* Locked work queue head can be tested without a lock. */

    if (nxt_fast_path(lwq->head == NULL)) {
        return;
    }

    nxt_thread_spin_lock(&lwq->lock);

    for ( ;; ) {
        handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log);

        if (handler == NULL) {
            break;
        }

        nxt_thread_work_queue_add(thr, wq, handler, obj, data, log);
    }

    nxt_thread_spin_unlock(&lwq->lock);
}
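

/*
 * A hypothetical integration sketch ("engine_lwq" is an illustrative
 * name): at the start of each loop iteration the owning thread
 * transfers externally posted works to its main queue and then
 * dispatches them with nxt_thread_work_queue_pop():
 *
 *     nxt_locked_work_queue_move(thr, &engine_lwq, &thr->work_queue.main);
 */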