xref: /unit/src/nxt_work_queue.c (revision 521:93dc4a28dd37)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 /*
11  * Available work items are crucial for overall engine operation, so
12  * the items are preallocated in two chunks: cache and spare chunks.
13  * By default each chunk preallocates 409 work items on two or four
14  * CPU pages depending on platform.  If all items in a cache chunk are
15  * exhausted then a spare chunk becomes a cache chunk, and a new spare
16  * chunk is allocated.  This two-step allocation mitigates low memory
17  * condition impact on work queue operation.  However, if both chunks
18  * are exhausted then a thread will sleep in reliance on another thread
19  * frees some memory.  However, this may lead to deadlock and probably
20  * a process should be aborted.  This behaviour should be considered as
21  * abort on program stack exhaustion.
22  *
23  * The cache and spare chunks initially are also allocated in two steps:
24  * a spare chunk is allocated first, then it becomes the cache chunk and
25  * a new spare chunk is allocated again.
26  */
27 
28 static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
29 
30 
31 /* It should be adjusted with the "work_queue_bucket_items" directive. */
32 static nxt_uint_t  nxt_work_queue_bucket_items = 409;
33 
34 
35 #if (NXT_DEBUG)
36 
37 nxt_inline void
nxt_work_queue_thread_assert(nxt_work_queue_t * wq)38 nxt_work_queue_thread_assert(nxt_work_queue_t *wq)
39 {
40     nxt_tid_t     tid;
41     nxt_thread_t  *thread;
42 
43     thread = nxt_thread();
44     tid = nxt_thread_tid(thread);
45 
46     if (nxt_fast_path(wq->tid == tid)) {
47         return;
48     }
49 
50     if (nxt_slow_path(nxt_pid != wq->pid)) {
51         wq->pid = nxt_pid;
52         wq->tid = tid;
53 
54         return;
55     }
56 
57     nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid);
58     nxt_abort();
59 }
60 
61 
nxt_work_queue_thread_adopt(nxt_work_queue_t * wq)62 void nxt_work_queue_thread_adopt(nxt_work_queue_t *wq)
63 {
64     nxt_thread_t  *thread;
65 
66     thread = nxt_thread();
67 
68     wq->pid = nxt_pid;
69     wq->tid = nxt_thread_tid(thread);
70 }
71 
72 
73 void
nxt_work_queue_name(nxt_work_queue_t * wq,const char * name)74 nxt_work_queue_name(nxt_work_queue_t *wq, const char *name)
75 {
76     nxt_work_queue_thread_assert(wq);
77 
78     wq->name = name;
79 }
80 
81 #else
82 
83 #define nxt_work_queue_thread_assert(wq)
84 
85 #endif
86 
87 
88 void
nxt_work_queue_cache_create(nxt_work_queue_cache_t * cache,size_t chunk_size)89 nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
90 {
91     nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));
92 
93     if (chunk_size == 0) {
94         chunk_size = nxt_work_queue_bucket_items;
95     }
96 
97     /* nxt_work_queue_chunk_t already has one work item. */
98     cache->chunk_size = chunk_size - 1;
99 
100     while (cache->next == NULL) {
101         nxt_work_queue_allocate(cache);
102     }
103 }
104 
105 
106 void
nxt_work_queue_cache_destroy(nxt_work_queue_cache_t * cache)107 nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
108 {
109     nxt_work_queue_chunk_t  *chunk, *next;
110 
111     for (chunk = cache->chunk; chunk; chunk = next) {
112         next = chunk->next;
113         nxt_free(chunk);
114     }
115 }
116 
117 
118 static void
nxt_work_queue_allocate(nxt_work_queue_cache_t * cache)119 nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
120 {
121     size_t                  size;
122     nxt_uint_t              i, n;
123     nxt_work_t              *work;
124     nxt_work_queue_chunk_t  *chunk;
125 
126     n = cache->chunk_size;
127     size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);
128 
129     chunk = nxt_malloc(size);
130 
131     if (nxt_fast_path(chunk != NULL)) {
132 
133         chunk->next = cache->chunk;
134         cache->chunk = chunk;
135         work = &chunk->work;
136 
137         for (i = 0; i < n; i++) {
138             work[i].next = &work[i + 1];
139         }
140 
141         work[i].next = NULL;
142         work++;
143 
144     } else if (cache->spare != NULL) {
145 
146         work = NULL;
147 
148     } else {
149         return;
150     }
151 
152     cache->next = cache->spare;
153     cache->spare = work;
154 }
155 
156 
157 /* Add a work to a work queue tail. */
158 
159 void
nxt_work_queue_add(nxt_work_queue_t * wq,nxt_work_handler_t handler,nxt_task_t * task,void * obj,void * data)160 nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
161     nxt_task_t *task, void *obj, void *data)
162 {
163     nxt_work_t  *work;
164 
165     nxt_work_queue_thread_assert(wq);
166 
167     for ( ;; ) {
168         work = wq->cache->next;
169 
170         if (nxt_fast_path(work != NULL)) {
171             wq->cache->next = work->next;
172             work->next = NULL;
173 
174             work->handler = handler;
175             work->task = task;
176             work->obj = obj;
177             work->data = data;
178 
179             if (wq->tail != NULL) {
180                 wq->tail->next = work;
181 
182             } else {
183                 wq->head = work;
184             }
185 
186             wq->tail = work;
187 
188             return;
189         }
190 
191         nxt_work_queue_allocate(wq->cache);
192     }
193 }
194 
195 
196 nxt_work_handler_t
nxt_work_queue_pop(nxt_work_queue_t * wq,nxt_task_t ** task,void ** obj,void ** data)197 nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
198     void **data)
199 {
200     nxt_work_t  *work;
201 
202     nxt_work_queue_thread_assert(wq);
203 
204     work = wq->head;
205 
206     wq->head = work->next;
207 
208     if (work->next == NULL) {
209         wq->tail = NULL;
210     }
211 
212     *task = work->task;
213 
214     *obj = work->obj;
215     nxt_prefetch(*obj);
216 
217     *data = work->data;
218     nxt_prefetch(*data);
219 
220     work->next = wq->cache->next;
221     wq->cache->next = work;
222 
223     return work->handler;
224 }
225 
226 
227 /* Add a work to a locked work queue tail. */
228 
229 void
nxt_locked_work_queue_add(nxt_locked_work_queue_t * lwq,nxt_work_t * work)230 nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
231 {
232     nxt_thread_spin_lock(&lwq->lock);
233 
234     if (lwq->tail != NULL) {
235         lwq->tail->next = work;
236 
237     } else {
238         lwq->head = work;
239     }
240 
241     lwq->tail = work;
242 
243     nxt_thread_spin_unlock(&lwq->lock);
244 }
245 
246 
247 /* Pop a work from a locked work queue head. */
248 
249 nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t * lwq,nxt_task_t ** task,void ** obj,void ** data)250 nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
251     void **obj, void **data)
252 {
253     nxt_work_t          *work;
254     nxt_work_handler_t  handler;
255 
256     handler = NULL;
257 
258     nxt_thread_spin_lock(&lwq->lock);
259 
260     work = lwq->head;
261 
262     if (work != NULL) {
263         *task = work->task;
264 
265         *obj = work->obj;
266         nxt_prefetch(*obj);
267 
268         *data = work->data;
269         nxt_prefetch(*data);
270 
271         lwq->head = work->next;
272 
273         if (work->next == NULL) {
274             lwq->tail = NULL;
275         }
276 
277         handler = work->handler;
278     }
279 
280     nxt_thread_spin_unlock(&lwq->lock);
281 
282     return handler;
283 }
284 
285 
286 /* Move all works from a locked work queue to a usual work queue. */
287 
288 void
nxt_locked_work_queue_move(nxt_thread_t * thr,nxt_locked_work_queue_t * lwq,nxt_work_queue_t * wq)289 nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
290     nxt_work_queue_t *wq)
291 {
292     nxt_work_t  *work;
293 
294     nxt_thread_spin_lock(&lwq->lock);
295 
296     work = lwq->head;
297 
298     lwq->head = NULL;
299     lwq->tail = NULL;
300 
301     nxt_thread_spin_unlock(&lwq->lock);
302 
303     while (work != NULL) {
304         work->task->thread = thr;
305 
306         nxt_work_queue_add(wq, work->handler, work->task,
307                            work->obj, work->data);
308 
309         work = work->next;
310     }
311 }
312