xref: /unit/src/nxt_work_queue.c (revision 521:93dc4a28dd37)
10Sigor@sysoev.ru 
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru  */
60Sigor@sysoev.ru 
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru 
90Sigor@sysoev.ru 
100Sigor@sysoev.ru /*
110Sigor@sysoev.ru  * Available work items are crucial for overall engine operation, so
120Sigor@sysoev.ru  * the items are preallocated in two chunks: cache and spare chunks.
130Sigor@sysoev.ru  * By default each chunk preallocates 409 work items on two or four
140Sigor@sysoev.ru  * CPU pages depending on platform.  If all items in a cache chunk are
150Sigor@sysoev.ru  * exhausted then a spare chunk becomes a cache chunk, and a new spare
160Sigor@sysoev.ru  * chunk is allocated.  This two-step allocation mitigates low memory
170Sigor@sysoev.ru  * condition impact on work queue operation.  However, if both chunks
180Sigor@sysoev.ru  * are exhausted then a thread will sleep in reliance on another thread
190Sigor@sysoev.ru  * frees some memory.  However, this may lead to deadlock and probably
200Sigor@sysoev.ru  * a process should be aborted.  This behaviour should be considered as
210Sigor@sysoev.ru  * abort on program stack exhaustion.
220Sigor@sysoev.ru  *
230Sigor@sysoev.ru  * The cache and spare chunks initially are also allocated in two steps:
240Sigor@sysoev.ru  * a spare chunk is allocated first, then it becomes the cache chunk and
250Sigor@sysoev.ru  * a new spare chunk is allocated again.
260Sigor@sysoev.ru  */
270Sigor@sysoev.ru 
284Sigor@sysoev.ru static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
290Sigor@sysoev.ru 
300Sigor@sysoev.ru 
310Sigor@sysoev.ru /* It should be adjusted with the "work_queue_bucket_items" directive. */
320Sigor@sysoev.ru static nxt_uint_t  nxt_work_queue_bucket_items = 409;
330Sigor@sysoev.ru 
340Sigor@sysoev.ru 
35127Smax.romanov@nginx.com #if (NXT_DEBUG)
36127Smax.romanov@nginx.com 
37127Smax.romanov@nginx.com nxt_inline void
nxt_work_queue_thread_assert(nxt_work_queue_t * wq)38127Smax.romanov@nginx.com nxt_work_queue_thread_assert(nxt_work_queue_t *wq)
39127Smax.romanov@nginx.com {
40127Smax.romanov@nginx.com     nxt_tid_t     tid;
41127Smax.romanov@nginx.com     nxt_thread_t  *thread;
42127Smax.romanov@nginx.com 
43127Smax.romanov@nginx.com     thread = nxt_thread();
44127Smax.romanov@nginx.com     tid = nxt_thread_tid(thread);
45127Smax.romanov@nginx.com 
46127Smax.romanov@nginx.com     if (nxt_fast_path(wq->tid == tid)) {
47127Smax.romanov@nginx.com         return;
48127Smax.romanov@nginx.com     }
49127Smax.romanov@nginx.com 
50127Smax.romanov@nginx.com     if (nxt_slow_path(nxt_pid != wq->pid)) {
51127Smax.romanov@nginx.com         wq->pid = nxt_pid;
52127Smax.romanov@nginx.com         wq->tid = tid;
53127Smax.romanov@nginx.com 
54127Smax.romanov@nginx.com         return;
55127Smax.romanov@nginx.com     }
56127Smax.romanov@nginx.com 
57127Smax.romanov@nginx.com     nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid);
58127Smax.romanov@nginx.com     nxt_abort();
59127Smax.romanov@nginx.com }
60127Smax.romanov@nginx.com 
61165Smax.romanov@nginx.com 
nxt_work_queue_thread_adopt(nxt_work_queue_t * wq)62165Smax.romanov@nginx.com void nxt_work_queue_thread_adopt(nxt_work_queue_t *wq)
63165Smax.romanov@nginx.com {
64165Smax.romanov@nginx.com     nxt_thread_t  *thread;
65165Smax.romanov@nginx.com 
66165Smax.romanov@nginx.com     thread = nxt_thread();
67165Smax.romanov@nginx.com 
68165Smax.romanov@nginx.com     wq->pid = nxt_pid;
69165Smax.romanov@nginx.com     wq->tid = nxt_thread_tid(thread);
70165Smax.romanov@nginx.com }
71165Smax.romanov@nginx.com 
72165Smax.romanov@nginx.com 
73165Smax.romanov@nginx.com void
nxt_work_queue_name(nxt_work_queue_t * wq,const char * name)74*521Szelenkov@nginx.com nxt_work_queue_name(nxt_work_queue_t *wq, const char *name)
75165Smax.romanov@nginx.com {
76165Smax.romanov@nginx.com     nxt_work_queue_thread_assert(wq);
77165Smax.romanov@nginx.com 
78165Smax.romanov@nginx.com     wq->name = name;
79165Smax.romanov@nginx.com }
80165Smax.romanov@nginx.com 
81127Smax.romanov@nginx.com #else
82127Smax.romanov@nginx.com 
83127Smax.romanov@nginx.com #define nxt_work_queue_thread_assert(wq)
84127Smax.romanov@nginx.com 
85127Smax.romanov@nginx.com #endif
86127Smax.romanov@nginx.com 
87127Smax.romanov@nginx.com 
880Sigor@sysoev.ru void
nxt_work_queue_cache_create(nxt_work_queue_cache_t * cache,size_t chunk_size)894Sigor@sysoev.ru nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
900Sigor@sysoev.ru {
914Sigor@sysoev.ru     nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));
920Sigor@sysoev.ru 
930Sigor@sysoev.ru     if (chunk_size == 0) {
940Sigor@sysoev.ru         chunk_size = nxt_work_queue_bucket_items;
950Sigor@sysoev.ru     }
960Sigor@sysoev.ru 
970Sigor@sysoev.ru     /* nxt_work_queue_chunk_t already has one work item. */
984Sigor@sysoev.ru     cache->chunk_size = chunk_size - 1;
990Sigor@sysoev.ru 
1004Sigor@sysoev.ru     while (cache->next == NULL) {
1014Sigor@sysoev.ru         nxt_work_queue_allocate(cache);
1020Sigor@sysoev.ru     }
1030Sigor@sysoev.ru }
1040Sigor@sysoev.ru 
1050Sigor@sysoev.ru 
1060Sigor@sysoev.ru void
nxt_work_queue_cache_destroy(nxt_work_queue_cache_t * cache)1074Sigor@sysoev.ru nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
1080Sigor@sysoev.ru {
1090Sigor@sysoev.ru     nxt_work_queue_chunk_t  *chunk, *next;
1100Sigor@sysoev.ru 
1114Sigor@sysoev.ru     for (chunk = cache->chunk; chunk; chunk = next) {
1120Sigor@sysoev.ru         next = chunk->next;
1130Sigor@sysoev.ru         nxt_free(chunk);
1140Sigor@sysoev.ru     }
1150Sigor@sysoev.ru }
1160Sigor@sysoev.ru 
1170Sigor@sysoev.ru 
1180Sigor@sysoev.ru static void
nxt_work_queue_allocate(nxt_work_queue_cache_t * cache)1194Sigor@sysoev.ru nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
1200Sigor@sysoev.ru {
1210Sigor@sysoev.ru     size_t                  size;
1220Sigor@sysoev.ru     nxt_uint_t              i, n;
1230Sigor@sysoev.ru     nxt_work_t              *work;
1240Sigor@sysoev.ru     nxt_work_queue_chunk_t  *chunk;
1250Sigor@sysoev.ru 
1260Sigor@sysoev.ru     n = cache->chunk_size;
1270Sigor@sysoev.ru     size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);
1280Sigor@sysoev.ru 
1290Sigor@sysoev.ru     chunk = nxt_malloc(size);
1300Sigor@sysoev.ru 
1310Sigor@sysoev.ru     if (nxt_fast_path(chunk != NULL)) {
1320Sigor@sysoev.ru 
1330Sigor@sysoev.ru         chunk->next = cache->chunk;
1340Sigor@sysoev.ru         cache->chunk = chunk;
1350Sigor@sysoev.ru         work = &chunk->work;
1360Sigor@sysoev.ru 
1370Sigor@sysoev.ru         for (i = 0; i < n; i++) {
1380Sigor@sysoev.ru             work[i].next = &work[i + 1];
1390Sigor@sysoev.ru         }
1400Sigor@sysoev.ru 
1410Sigor@sysoev.ru         work[i].next = NULL;
1420Sigor@sysoev.ru         work++;
1430Sigor@sysoev.ru 
1440Sigor@sysoev.ru     } else if (cache->spare != NULL) {
1450Sigor@sysoev.ru 
1460Sigor@sysoev.ru         work = NULL;
1470Sigor@sysoev.ru 
1480Sigor@sysoev.ru     } else {
1490Sigor@sysoev.ru         return;
1500Sigor@sysoev.ru     }
1510Sigor@sysoev.ru 
1520Sigor@sysoev.ru     cache->next = cache->spare;
1530Sigor@sysoev.ru     cache->spare = work;
1540Sigor@sysoev.ru }
1550Sigor@sysoev.ru 
1560Sigor@sysoev.ru 
1570Sigor@sysoev.ru /* Add a work to a work queue tail. */
1580Sigor@sysoev.ru 
1590Sigor@sysoev.ru void
nxt_work_queue_add(nxt_work_queue_t * wq,nxt_work_handler_t handler,nxt_task_t * task,void * obj,void * data)1604Sigor@sysoev.ru nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
1614Sigor@sysoev.ru     nxt_task_t *task, void *obj, void *data)
1620Sigor@sysoev.ru {
1630Sigor@sysoev.ru     nxt_work_t  *work;
1640Sigor@sysoev.ru 
165127Smax.romanov@nginx.com     nxt_work_queue_thread_assert(wq);
166127Smax.romanov@nginx.com 
1670Sigor@sysoev.ru     for ( ;; ) {
1684Sigor@sysoev.ru         work = wq->cache->next;
1690Sigor@sysoev.ru 
1700Sigor@sysoev.ru         if (nxt_fast_path(work != NULL)) {
1714Sigor@sysoev.ru             wq->cache->next = work->next;
1720Sigor@sysoev.ru             work->next = NULL;
1730Sigor@sysoev.ru 
1740Sigor@sysoev.ru             work->handler = handler;
1751Sigor@sysoev.ru             work->task = task;
1760Sigor@sysoev.ru             work->obj = obj;
1770Sigor@sysoev.ru             work->data = data;
1780Sigor@sysoev.ru 
1790Sigor@sysoev.ru             if (wq->tail != NULL) {
1800Sigor@sysoev.ru                 wq->tail->next = work;
1810Sigor@sysoev.ru 
1820Sigor@sysoev.ru             } else {
1830Sigor@sysoev.ru                 wq->head = work;
1840Sigor@sysoev.ru             }
1850Sigor@sysoev.ru 
1860Sigor@sysoev.ru             wq->tail = work;
1870Sigor@sysoev.ru 
1880Sigor@sysoev.ru             return;
1890Sigor@sysoev.ru         }
1900Sigor@sysoev.ru 
1914Sigor@sysoev.ru         nxt_work_queue_allocate(wq->cache);
1920Sigor@sysoev.ru     }
1930Sigor@sysoev.ru }
1940Sigor@sysoev.ru 
1950Sigor@sysoev.ru 
1960Sigor@sysoev.ru nxt_work_handler_t
nxt_work_queue_pop(nxt_work_queue_t * wq,nxt_task_t ** task,void ** obj,void ** data)1974Sigor@sysoev.ru nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
1981Sigor@sysoev.ru     void **data)
1990Sigor@sysoev.ru {
2000Sigor@sysoev.ru     nxt_work_t  *work;
2010Sigor@sysoev.ru 
202127Smax.romanov@nginx.com     nxt_work_queue_thread_assert(wq);
203127Smax.romanov@nginx.com 
2044Sigor@sysoev.ru     work = wq->head;
2050Sigor@sysoev.ru 
2064Sigor@sysoev.ru     wq->head = work->next;
2070Sigor@sysoev.ru 
2084Sigor@sysoev.ru     if (work->next == NULL) {
2094Sigor@sysoev.ru         wq->tail = NULL;
2100Sigor@sysoev.ru     }
2110Sigor@sysoev.ru 
2124Sigor@sysoev.ru     *task = work->task;
2130Sigor@sysoev.ru 
2144Sigor@sysoev.ru     *obj = work->obj;
2154Sigor@sysoev.ru     nxt_prefetch(*obj);
2160Sigor@sysoev.ru 
2174Sigor@sysoev.ru     *data = work->data;
2184Sigor@sysoev.ru     nxt_prefetch(*data);
2190Sigor@sysoev.ru 
2204Sigor@sysoev.ru     work->next = wq->cache->next;
2214Sigor@sysoev.ru     wq->cache->next = work;
2220Sigor@sysoev.ru 
2234Sigor@sysoev.ru     return work->handler;
2240Sigor@sysoev.ru }
2250Sigor@sysoev.ru 
2260Sigor@sysoev.ru 
2270Sigor@sysoev.ru /* Add a work to a locked work queue tail. */
2280Sigor@sysoev.ru 
2290Sigor@sysoev.ru void
nxt_locked_work_queue_add(nxt_locked_work_queue_t * lwq,nxt_work_t * work)2304Sigor@sysoev.ru nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
2310Sigor@sysoev.ru {
2320Sigor@sysoev.ru     nxt_thread_spin_lock(&lwq->lock);
2330Sigor@sysoev.ru 
2344Sigor@sysoev.ru     if (lwq->tail != NULL) {
2354Sigor@sysoev.ru         lwq->tail->next = work;
2360Sigor@sysoev.ru 
2374Sigor@sysoev.ru     } else {
2384Sigor@sysoev.ru         lwq->head = work;
2394Sigor@sysoev.ru     }
2400Sigor@sysoev.ru 
2414Sigor@sysoev.ru     lwq->tail = work;
2420Sigor@sysoev.ru 
2430Sigor@sysoev.ru     nxt_thread_spin_unlock(&lwq->lock);
2440Sigor@sysoev.ru }
2450Sigor@sysoev.ru 
2460Sigor@sysoev.ru 
2470Sigor@sysoev.ru /* Pop a work from a locked work queue head. */
2480Sigor@sysoev.ru 
2490Sigor@sysoev.ru nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t * lwq,nxt_task_t ** task,void ** obj,void ** data)2501Sigor@sysoev.ru nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
2511Sigor@sysoev.ru     void **obj, void **data)
2520Sigor@sysoev.ru {
2534Sigor@sysoev.ru     nxt_work_t          *work;
2540Sigor@sysoev.ru     nxt_work_handler_t  handler;
2550Sigor@sysoev.ru 
2564Sigor@sysoev.ru     handler = NULL;
2574Sigor@sysoev.ru 
2580Sigor@sysoev.ru     nxt_thread_spin_lock(&lwq->lock);
2590Sigor@sysoev.ru 
2604Sigor@sysoev.ru     work = lwq->head;
2614Sigor@sysoev.ru 
2624Sigor@sysoev.ru     if (work != NULL) {
2634Sigor@sysoev.ru         *task = work->task;
2644Sigor@sysoev.ru 
2654Sigor@sysoev.ru         *obj = work->obj;
2664Sigor@sysoev.ru         nxt_prefetch(*obj);
2674Sigor@sysoev.ru 
2684Sigor@sysoev.ru         *data = work->data;
2694Sigor@sysoev.ru         nxt_prefetch(*data);
2704Sigor@sysoev.ru 
2714Sigor@sysoev.ru         lwq->head = work->next;
2724Sigor@sysoev.ru 
2734Sigor@sysoev.ru         if (work->next == NULL) {
2744Sigor@sysoev.ru             lwq->tail = NULL;
2754Sigor@sysoev.ru         }
2764Sigor@sysoev.ru 
2774Sigor@sysoev.ru         handler = work->handler;
2784Sigor@sysoev.ru     }
2790Sigor@sysoev.ru 
2800Sigor@sysoev.ru     nxt_thread_spin_unlock(&lwq->lock);
2810Sigor@sysoev.ru 
2820Sigor@sysoev.ru     return handler;
2830Sigor@sysoev.ru }
2840Sigor@sysoev.ru 
2850Sigor@sysoev.ru 
2860Sigor@sysoev.ru /* Move all works from a locked work queue to a usual work queue. */
2870Sigor@sysoev.ru 
2880Sigor@sysoev.ru void
nxt_locked_work_queue_move(nxt_thread_t * thr,nxt_locked_work_queue_t * lwq,nxt_work_queue_t * wq)2890Sigor@sysoev.ru nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
2900Sigor@sysoev.ru     nxt_work_queue_t *wq)
2910Sigor@sysoev.ru {
2924Sigor@sysoev.ru     nxt_work_t  *work;
2930Sigor@sysoev.ru 
2940Sigor@sysoev.ru     nxt_thread_spin_lock(&lwq->lock);
2950Sigor@sysoev.ru 
2964Sigor@sysoev.ru     work = lwq->head;
2970Sigor@sysoev.ru 
2984Sigor@sysoev.ru     lwq->head = NULL;
2994Sigor@sysoev.ru     lwq->tail = NULL;
3000Sigor@sysoev.ru 
3010Sigor@sysoev.ru     nxt_thread_spin_unlock(&lwq->lock);
3024Sigor@sysoev.ru 
3034Sigor@sysoev.ru     while (work != NULL) {
3044Sigor@sysoev.ru         work->task->thread = thr;
3054Sigor@sysoev.ru 
3064Sigor@sysoev.ru         nxt_work_queue_add(wq, work->handler, work->task,
3074Sigor@sysoev.ru                            work->obj, work->data);
3084Sigor@sysoev.ru 
3094Sigor@sysoev.ru         work = work->next;
3104Sigor@sysoev.ru     }
3110Sigor@sysoev.ru }
312