10Sigor@sysoev.ru 20Sigor@sysoev.ru /* 30Sigor@sysoev.ru * Copyright (C) Igor Sysoev 40Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 50Sigor@sysoev.ru */ 60Sigor@sysoev.ru 70Sigor@sysoev.ru #include <nxt_main.h> 80Sigor@sysoev.ru 90Sigor@sysoev.ru 100Sigor@sysoev.ru /* 110Sigor@sysoev.ru * Available work items are crucial for overall engine operation, so 120Sigor@sysoev.ru * the items are preallocated in two chunks: cache and spare chunks. 130Sigor@sysoev.ru * By default each chunk preallocates 409 work items on two or four 140Sigor@sysoev.ru * CPU pages depending on platform. If all items in a cache chunk are 150Sigor@sysoev.ru * exhausted then a spare chunk becomes a cache chunk, and a new spare 160Sigor@sysoev.ru * chunk is allocated. This two-step allocation mitigates low memory 170Sigor@sysoev.ru * condition impact on work queue operation. However, if both chunks 180Sigor@sysoev.ru * are exhausted then a thread will sleep in reliance on another thread 190Sigor@sysoev.ru * frees some memory. However, this may lead to deadlock and probably 200Sigor@sysoev.ru * a process should be aborted. This behaviour should be considered as 210Sigor@sysoev.ru * abort on program stack exhaustion. 220Sigor@sysoev.ru * 230Sigor@sysoev.ru * The cache and spare chunks initially are also allocated in two steps: 240Sigor@sysoev.ru * a spare chunk is allocated first, then it becomes the cache chunk and 250Sigor@sysoev.ru * a new spare chunk is allocated again. 260Sigor@sysoev.ru */ 270Sigor@sysoev.ru 284Sigor@sysoev.ru static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache); 290Sigor@sysoev.ru 300Sigor@sysoev.ru 310Sigor@sysoev.ru /* It should be adjusted with the "work_queue_bucket_items" directive. */ 320Sigor@sysoev.ru static nxt_uint_t nxt_work_queue_bucket_items = 409; 330Sigor@sysoev.ru 340Sigor@sysoev.ru 35127Smax.romanov@nginx.com #if (NXT_DEBUG) 36127Smax.romanov@nginx.com 37127Smax.romanov@nginx.com nxt_inline void 38127Smax.romanov@nginx.com nxt_work_queue_thread_assert(nxt_work_queue_t *wq) 39127Smax.romanov@nginx.com { 40127Smax.romanov@nginx.com nxt_tid_t tid; 41127Smax.romanov@nginx.com nxt_thread_t *thread; 42127Smax.romanov@nginx.com 43127Smax.romanov@nginx.com thread = nxt_thread(); 44127Smax.romanov@nginx.com tid = nxt_thread_tid(thread); 45127Smax.romanov@nginx.com 46127Smax.romanov@nginx.com if (nxt_fast_path(wq->tid == tid)) { 47127Smax.romanov@nginx.com return; 48127Smax.romanov@nginx.com } 49127Smax.romanov@nginx.com 50127Smax.romanov@nginx.com if (nxt_slow_path(nxt_pid != wq->pid)) { 51127Smax.romanov@nginx.com wq->pid = nxt_pid; 52127Smax.romanov@nginx.com wq->tid = tid; 53127Smax.romanov@nginx.com 54127Smax.romanov@nginx.com return; 55127Smax.romanov@nginx.com } 56127Smax.romanov@nginx.com 57127Smax.romanov@nginx.com nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid); 58127Smax.romanov@nginx.com nxt_abort(); 59127Smax.romanov@nginx.com } 60127Smax.romanov@nginx.com 61165Smax.romanov@nginx.com 62165Smax.romanov@nginx.com void nxt_work_queue_thread_adopt(nxt_work_queue_t *wq) 63165Smax.romanov@nginx.com { 64165Smax.romanov@nginx.com nxt_thread_t *thread; 65165Smax.romanov@nginx.com 66165Smax.romanov@nginx.com thread = nxt_thread(); 67165Smax.romanov@nginx.com 68165Smax.romanov@nginx.com wq->pid = nxt_pid; 69165Smax.romanov@nginx.com wq->tid = nxt_thread_tid(thread); 70165Smax.romanov@nginx.com } 71165Smax.romanov@nginx.com 72165Smax.romanov@nginx.com 73165Smax.romanov@nginx.com void 74*521Szelenkov@nginx.com nxt_work_queue_name(nxt_work_queue_t *wq, const char *name) 75165Smax.romanov@nginx.com { 76165Smax.romanov@nginx.com nxt_work_queue_thread_assert(wq); 77165Smax.romanov@nginx.com 78165Smax.romanov@nginx.com wq->name = name; 79165Smax.romanov@nginx.com } 80165Smax.romanov@nginx.com 81127Smax.romanov@nginx.com #else 82127Smax.romanov@nginx.com 83127Smax.romanov@nginx.com #define nxt_work_queue_thread_assert(wq) 84127Smax.romanov@nginx.com 85127Smax.romanov@nginx.com #endif 86127Smax.romanov@nginx.com 87127Smax.romanov@nginx.com 880Sigor@sysoev.ru void 894Sigor@sysoev.ru nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size) 900Sigor@sysoev.ru { 914Sigor@sysoev.ru nxt_memzero(cache, sizeof(nxt_work_queue_cache_t)); 920Sigor@sysoev.ru 930Sigor@sysoev.ru if (chunk_size == 0) { 940Sigor@sysoev.ru chunk_size = nxt_work_queue_bucket_items; 950Sigor@sysoev.ru } 960Sigor@sysoev.ru 970Sigor@sysoev.ru /* nxt_work_queue_chunk_t already has one work item. */ 984Sigor@sysoev.ru cache->chunk_size = chunk_size - 1; 990Sigor@sysoev.ru 1004Sigor@sysoev.ru while (cache->next == NULL) { 1014Sigor@sysoev.ru nxt_work_queue_allocate(cache); 1020Sigor@sysoev.ru } 1030Sigor@sysoev.ru } 1040Sigor@sysoev.ru 1050Sigor@sysoev.ru 1060Sigor@sysoev.ru void 1074Sigor@sysoev.ru nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache) 1080Sigor@sysoev.ru { 1090Sigor@sysoev.ru nxt_work_queue_chunk_t *chunk, *next; 1100Sigor@sysoev.ru 1114Sigor@sysoev.ru for (chunk = cache->chunk; chunk; chunk = next) { 1120Sigor@sysoev.ru next = chunk->next; 1130Sigor@sysoev.ru nxt_free(chunk); 1140Sigor@sysoev.ru } 1150Sigor@sysoev.ru } 1160Sigor@sysoev.ru 1170Sigor@sysoev.ru 1180Sigor@sysoev.ru static void 1194Sigor@sysoev.ru nxt_work_queue_allocate(nxt_work_queue_cache_t *cache) 1200Sigor@sysoev.ru { 1210Sigor@sysoev.ru size_t size; 1220Sigor@sysoev.ru nxt_uint_t i, n; 1230Sigor@sysoev.ru nxt_work_t *work; 1240Sigor@sysoev.ru nxt_work_queue_chunk_t *chunk; 1250Sigor@sysoev.ru 1260Sigor@sysoev.ru n = cache->chunk_size; 1270Sigor@sysoev.ru size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t); 1280Sigor@sysoev.ru 1290Sigor@sysoev.ru chunk = nxt_malloc(size); 1300Sigor@sysoev.ru 1310Sigor@sysoev.ru if (nxt_fast_path(chunk != NULL)) { 1320Sigor@sysoev.ru 1330Sigor@sysoev.ru chunk->next = cache->chunk; 1340Sigor@sysoev.ru cache->chunk = chunk; 1350Sigor@sysoev.ru work = &chunk->work; 1360Sigor@sysoev.ru 1370Sigor@sysoev.ru for (i = 0; i < n; i++) { 1380Sigor@sysoev.ru work[i].next = &work[i + 1]; 1390Sigor@sysoev.ru } 1400Sigor@sysoev.ru 1410Sigor@sysoev.ru work[i].next = NULL; 1420Sigor@sysoev.ru work++; 1430Sigor@sysoev.ru 1440Sigor@sysoev.ru } else if (cache->spare != NULL) { 1450Sigor@sysoev.ru 1460Sigor@sysoev.ru work = NULL; 1470Sigor@sysoev.ru 1480Sigor@sysoev.ru } else { 1490Sigor@sysoev.ru return; 1500Sigor@sysoev.ru } 1510Sigor@sysoev.ru 1520Sigor@sysoev.ru cache->next = cache->spare; 1530Sigor@sysoev.ru cache->spare = work; 1540Sigor@sysoev.ru } 1550Sigor@sysoev.ru 1560Sigor@sysoev.ru 1570Sigor@sysoev.ru /* Add a work to a work queue tail. */ 1580Sigor@sysoev.ru 1590Sigor@sysoev.ru void 1604Sigor@sysoev.ru nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler, 1614Sigor@sysoev.ru nxt_task_t *task, void *obj, void *data) 1620Sigor@sysoev.ru { 1630Sigor@sysoev.ru nxt_work_t *work; 1640Sigor@sysoev.ru 165127Smax.romanov@nginx.com nxt_work_queue_thread_assert(wq); 166127Smax.romanov@nginx.com 1670Sigor@sysoev.ru for ( ;; ) { 1684Sigor@sysoev.ru work = wq->cache->next; 1690Sigor@sysoev.ru 1700Sigor@sysoev.ru if (nxt_fast_path(work != NULL)) { 1714Sigor@sysoev.ru wq->cache->next = work->next; 1720Sigor@sysoev.ru work->next = NULL; 1730Sigor@sysoev.ru 1740Sigor@sysoev.ru work->handler = handler; 1751Sigor@sysoev.ru work->task = task; 1760Sigor@sysoev.ru work->obj = obj; 1770Sigor@sysoev.ru work->data = data; 1780Sigor@sysoev.ru 1790Sigor@sysoev.ru if (wq->tail != NULL) { 1800Sigor@sysoev.ru wq->tail->next = work; 1810Sigor@sysoev.ru 1820Sigor@sysoev.ru } else { 1830Sigor@sysoev.ru wq->head = work; 1840Sigor@sysoev.ru } 1850Sigor@sysoev.ru 1860Sigor@sysoev.ru wq->tail = work; 1870Sigor@sysoev.ru 1880Sigor@sysoev.ru return; 1890Sigor@sysoev.ru } 1900Sigor@sysoev.ru 1914Sigor@sysoev.ru nxt_work_queue_allocate(wq->cache); 1920Sigor@sysoev.ru } 1930Sigor@sysoev.ru } 1940Sigor@sysoev.ru 1950Sigor@sysoev.ru 1960Sigor@sysoev.ru nxt_work_handler_t 1974Sigor@sysoev.ru nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj, 1981Sigor@sysoev.ru void **data) 1990Sigor@sysoev.ru { 2000Sigor@sysoev.ru nxt_work_t *work; 2010Sigor@sysoev.ru 202127Smax.romanov@nginx.com nxt_work_queue_thread_assert(wq); 203127Smax.romanov@nginx.com 2044Sigor@sysoev.ru work = wq->head; 2050Sigor@sysoev.ru 2064Sigor@sysoev.ru wq->head = work->next; 2070Sigor@sysoev.ru 2084Sigor@sysoev.ru if (work->next == NULL) { 2094Sigor@sysoev.ru wq->tail = NULL; 2100Sigor@sysoev.ru } 2110Sigor@sysoev.ru 2124Sigor@sysoev.ru *task = work->task; 2130Sigor@sysoev.ru 2144Sigor@sysoev.ru *obj = work->obj; 2154Sigor@sysoev.ru nxt_prefetch(*obj); 2160Sigor@sysoev.ru 2174Sigor@sysoev.ru *data = work->data; 2184Sigor@sysoev.ru nxt_prefetch(*data); 2190Sigor@sysoev.ru 2204Sigor@sysoev.ru work->next = wq->cache->next; 2214Sigor@sysoev.ru wq->cache->next = work; 2220Sigor@sysoev.ru 2234Sigor@sysoev.ru return work->handler; 2240Sigor@sysoev.ru } 2250Sigor@sysoev.ru 2260Sigor@sysoev.ru 2270Sigor@sysoev.ru /* Add a work to a locked work queue tail. */ 2280Sigor@sysoev.ru 2290Sigor@sysoev.ru void 2304Sigor@sysoev.ru nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work) 2310Sigor@sysoev.ru { 2320Sigor@sysoev.ru nxt_thread_spin_lock(&lwq->lock); 2330Sigor@sysoev.ru 2344Sigor@sysoev.ru if (lwq->tail != NULL) { 2354Sigor@sysoev.ru lwq->tail->next = work; 2360Sigor@sysoev.ru 2374Sigor@sysoev.ru } else { 2384Sigor@sysoev.ru lwq->head = work; 2394Sigor@sysoev.ru } 2400Sigor@sysoev.ru 2414Sigor@sysoev.ru lwq->tail = work; 2420Sigor@sysoev.ru 2430Sigor@sysoev.ru nxt_thread_spin_unlock(&lwq->lock); 2440Sigor@sysoev.ru } 2450Sigor@sysoev.ru 2460Sigor@sysoev.ru 2470Sigor@sysoev.ru /* Pop a work from a locked work queue head. */ 2480Sigor@sysoev.ru 2490Sigor@sysoev.ru nxt_work_handler_t 2501Sigor@sysoev.ru nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task, 2511Sigor@sysoev.ru void **obj, void **data) 2520Sigor@sysoev.ru { 2534Sigor@sysoev.ru nxt_work_t *work; 2540Sigor@sysoev.ru nxt_work_handler_t handler; 2550Sigor@sysoev.ru 2564Sigor@sysoev.ru handler = NULL; 2574Sigor@sysoev.ru 2580Sigor@sysoev.ru nxt_thread_spin_lock(&lwq->lock); 2590Sigor@sysoev.ru 2604Sigor@sysoev.ru work = lwq->head; 2614Sigor@sysoev.ru 2624Sigor@sysoev.ru if (work != NULL) { 2634Sigor@sysoev.ru *task = work->task; 2644Sigor@sysoev.ru 2654Sigor@sysoev.ru *obj = work->obj; 2664Sigor@sysoev.ru nxt_prefetch(*obj); 2674Sigor@sysoev.ru 2684Sigor@sysoev.ru *data = work->data; 2694Sigor@sysoev.ru nxt_prefetch(*data); 2704Sigor@sysoev.ru 2714Sigor@sysoev.ru lwq->head = work->next; 2724Sigor@sysoev.ru 2734Sigor@sysoev.ru if (work->next == NULL) { 2744Sigor@sysoev.ru lwq->tail = NULL; 2754Sigor@sysoev.ru } 2764Sigor@sysoev.ru 2774Sigor@sysoev.ru handler = work->handler; 2784Sigor@sysoev.ru } 2790Sigor@sysoev.ru 2800Sigor@sysoev.ru nxt_thread_spin_unlock(&lwq->lock); 2810Sigor@sysoev.ru 2820Sigor@sysoev.ru return handler; 2830Sigor@sysoev.ru } 2840Sigor@sysoev.ru 2850Sigor@sysoev.ru 2860Sigor@sysoev.ru /* Move all works from a locked work queue to a usual work queue. */ 2870Sigor@sysoev.ru 2880Sigor@sysoev.ru void 2890Sigor@sysoev.ru nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, 2900Sigor@sysoev.ru nxt_work_queue_t *wq) 2910Sigor@sysoev.ru { 2924Sigor@sysoev.ru nxt_work_t *work; 2930Sigor@sysoev.ru 2940Sigor@sysoev.ru nxt_thread_spin_lock(&lwq->lock); 2950Sigor@sysoev.ru 2964Sigor@sysoev.ru work = lwq->head; 2970Sigor@sysoev.ru 2984Sigor@sysoev.ru lwq->head = NULL; 2994Sigor@sysoev.ru lwq->tail = NULL; 3000Sigor@sysoev.ru 3010Sigor@sysoev.ru nxt_thread_spin_unlock(&lwq->lock); 3024Sigor@sysoev.ru 3034Sigor@sysoev.ru while (work != NULL) { 3044Sigor@sysoev.ru work->task->thread = thr; 3054Sigor@sysoev.ru 3064Sigor@sysoev.ru nxt_work_queue_add(wq, work->handler, work->task, 3074Sigor@sysoev.ru work->obj, work->data); 3084Sigor@sysoev.ru 3094Sigor@sysoev.ru work = work->next; 3104Sigor@sysoev.ru } 3110Sigor@sysoev.ru } 312