/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>

/*
 * Available work items are crucial for overall engine operation, so
 * the items are preallocated in two chunks: a cache chunk and a spare
 * chunk.  By default each chunk preallocates 409 work items on two or
 * four CPU pages, depending on the platform.  If all items in the
 * cache chunk are exhausted, the spare chunk becomes the cache chunk
 * and a new spare chunk is allocated.  This two-step allocation
 * mitigates the impact of low-memory conditions on work queue
 * operation.  However, if both chunks are exhausted, a thread sleeps
 * waiting for another thread to free some memory.  Since this may
 * lead to a deadlock, the process should probably be aborted instead;
 * such an abort should be considered equivalent to aborting on
 * program stack exhaustion.
 *
 * The cache and spare chunks are initially allocated in the same two
 * steps: a spare chunk is allocated first, then it becomes the cache
 * chunk and a new spare chunk is allocated again.
 */
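
/*
 * Illustrative only: a minimal sketch of setting up and tearing down
 * such a cache with the functions below; passing 0 as chunk_size
 * selects the default of nxt_work_queue_bucket_items items.
 *
 *     nxt_work_queue_cache_t  cache;
 *
 *     nxt_work_queue_cache_create(&cache, 0);
 *
 *     // ... nxt_work_queue_add()/nxt_work_queue_pop() draw items
 *     // from and return items to cache.next ...
 *
 *     nxt_work_queue_cache_destroy(&cache);
 */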

static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);


/* It should be adjusted with the "work_queue_bucket_items" directive. */
static nxt_uint_t  nxt_work_queue_bucket_items = 409;
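

/*
 * Where 409 comes from, as a back-of-the-envelope check (assuming
 * 4 KiB CPU pages and an nxt_work_t of five pointer-sized fields:
 * next, handler, task, obj and data, as used below):
 *
 *     64-bit: sizeof(nxt_work_t) = 40, four pages = 16384 bytes,
 *             16384 / 40 = 409 whole items;
 *     32-bit: sizeof(nxt_work_t) = 20, two pages  =  8192 bytes,
 *              8192 / 20 = 409 whole items.
 */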


#if (NXT_DEBUG)

nxt_inline void
nxt_work_queue_thread_assert(nxt_work_queue_t *wq)
{
    nxt_tid_t     tid;
    nxt_thread_t  *thread;

    thread = nxt_thread();
    tid = nxt_thread_tid(thread);

    if (nxt_fast_path(wq->tid == tid)) {
        return;
    }

    /* The queue was owned by a thread of another (parent) process. */
    if (nxt_slow_path(nxt_pid != wq->pid)) {
        wq->pid = nxt_pid;
        wq->tid = tid;

        return;
    }

    nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid);
    nxt_abort();
}

void
nxt_work_queue_thread_adopt(nxt_work_queue_t *wq)
{
    nxt_thread_t  *thread;

    thread = nxt_thread();

    wq->pid = nxt_pid;
    wq->tid = nxt_thread_tid(thread);
}


void
nxt_work_queue_name(nxt_work_queue_t *wq, const char *name)
{
    nxt_work_queue_thread_assert(wq);

    wq->name = name;
}

#else

#define nxt_work_queue_thread_assert(wq)

#endif


void
nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{
    nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));

    if (chunk_size == 0) {
        chunk_size = nxt_work_queue_bucket_items;
    }

    /* nxt_work_queue_chunk_t already has one work item. */
    cache->chunk_size = chunk_size - 1;

    /*
     * Normally two iterations: the first allocation creates the spare
     * chunk, the second turns it into the cache chunk (see the comment
     * at the top of the file).
     */
    while (cache->next == NULL) {
        nxt_work_queue_allocate(cache);
    }
}


void
nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
{
    nxt_work_queue_chunk_t  *chunk, *next;

    for (chunk = cache->chunk; chunk; chunk = next) {
        next = chunk->next;
        nxt_free(chunk);
    }
}


static void
nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
{
    size_t                  size;
    nxt_uint_t              i, n;
    nxt_work_t              *work;
    nxt_work_queue_chunk_t  *chunk;

    n = cache->chunk_size;
    size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);

    chunk = nxt_malloc(size);

    if (nxt_fast_path(chunk != NULL)) {

        chunk->next = cache->chunk;
        cache->chunk = chunk;
        work = &chunk->work;

        /* Link the chunk's work items into a NULL-terminated list. */
        for (i = 0; i < n; i++) {
            work[i].next = &work[i + 1];
        }

        work[i].next = NULL;
        work++;

    } else if (cache->spare != NULL) {

        /* The allocation failed: fall back to the spare chunk alone. */
        work = NULL;

    } else {
        return;
    }

    /*
     * The spare items become the cache list; the new items (or NULL
     * on allocation failure) become the spare.
     */
    cache->next = cache->spare;
    cache->spare = work;
}

/* Add a work item to the tail of a work queue. */

void
nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
    nxt_task_t *task, void *obj, void *data)
{
    nxt_work_t  *work;

    nxt_work_queue_thread_assert(wq);

    /*
     * Take a preallocated item from the cache; if the cache is empty,
     * allocate more and retry (see the comment at the top of the file
     * for the behaviour when memory is exhausted).
     */
    for ( ;; ) {
        work = wq->cache->next;

        if (nxt_fast_path(work != NULL)) {
            wq->cache->next = work->next;
            work->next = NULL;

            work->handler = handler;
            work->task = task;
            work->obj = obj;
            work->data = data;

            if (wq->tail != NULL) {
                wq->tail->next = work;

            } else {
                wq->head = work;
            }

            wq->tail = work;

            return;
        }

        nxt_work_queue_allocate(wq->cache);
    }
}
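

/*
 * Illustrative only: posting a handler to a queue owned by the
 * current thread.  The handler signature is assumed from the fields
 * stored above; my_handler and its arguments are hypothetical.
 *
 *     static void
 *     my_handler(nxt_task_t *task, void *obj, void *data)
 *     {
 *         // runs later on the engine thread
 *     }
 *
 *     nxt_work_queue_add(wq, my_handler, task, obj, data);
 */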


nxt_work_handler_t
nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
    void **data)
{
    nxt_work_t  *work;

    nxt_work_queue_thread_assert(wq);

    /* The caller must ensure that the queue is not empty. */
    work = wq->head;

    wq->head = work->next;

    if (work->next == NULL) {
        wq->tail = NULL;
    }

    *task = work->task;

    *obj = work->obj;
    nxt_prefetch(*obj);

    *data = work->data;
    nxt_prefetch(*data);

    /* Return the item to the cache free list. */
    work->next = wq->cache->next;
    wq->cache->next = work;

    return work->handler;
}
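

/*
 * Illustrative only: the pop/dispatch loop an engine would run over
 * this queue, as a sketch.  nxt_work_queue_pop() does not check for
 * emptiness itself, so the caller tests wq->head first.
 *
 *     nxt_task_t          *task;
 *     void                *obj, *data;
 *     nxt_work_handler_t  handler;
 *
 *     while (wq->head != NULL) {
 *         handler = nxt_work_queue_pop(wq, &task, &obj, &data);
 *         handler(task, obj, data);
 *     }
 */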


/* Add a work item to the tail of a locked work queue. */

void
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
{
    /* The caller must pass a work item whose next field is NULL. */
    nxt_thread_spin_lock(&lwq->lock);

    if (lwq->tail != NULL) {
        lwq->tail->next = work;

    } else {
        lwq->head = work;
    }

    lwq->tail = work;

    nxt_thread_spin_unlock(&lwq->lock);
}


/* Pop a work item from the head of a locked work queue. */

nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
    void **obj, void **data)
{
    nxt_work_t          *work;
    nxt_work_handler_t  handler;

    handler = NULL;

    nxt_thread_spin_lock(&lwq->lock);

    work = lwq->head;

    if (work != NULL) {
        *task = work->task;

        *obj = work->obj;
        nxt_prefetch(*obj);

        *data = work->data;
        nxt_prefetch(*data);

        lwq->head = work->next;

        if (work->next == NULL) {
            lwq->tail = NULL;
        }

        handler = work->handler;
    }

    nxt_thread_spin_unlock(&lwq->lock);

    return handler;
}


/* Move all work items from a locked work queue to a regular work queue. */

void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
    nxt_work_queue_t *wq)
{
    nxt_work_t  *work;

    /* Grab the whole list under the lock... */
    nxt_thread_spin_lock(&lwq->lock);

    work = lwq->head;

    lwq->head = NULL;
    lwq->tail = NULL;

    nxt_thread_spin_unlock(&lwq->lock);

    /* ...and requeue the items outside of it. */
    while (work != NULL) {
        work->task->thread = thr;

        nxt_work_queue_add(wq, work->handler, work->task,
                           work->obj, work->data);

        work = work->next;
    }
}
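

/*
 * Illustrative only: the intended cross-thread pattern, as a sketch.
 * A producer thread fills in a caller-owned nxt_work_t (field layout
 * assumed from the code above; my_handler is hypothetical) and posts
 * it to the locked queue; the consumer thread later moves everything
 * to its own unlocked queue, so the spin lock is held only briefly.
 *
 *     // producer thread:
 *     work->handler = my_handler;
 *     work->task = task;
 *     work->obj = obj;
 *     work->data = data;
 *     work->next = NULL;
 *
 *     nxt_locked_work_queue_add(lwq, work);
 *
 *     // consumer (engine) thread:
 *     nxt_locked_work_queue_move(thr, lwq, wq);
 */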