nxt_work_queue.c (0:a63ceefd6ab0) nxt_work_queue.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 16 unchanged lines hidden (view full) ---

25 * a new spare chunk is allocated again.
26 */
27
28static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
29 nxt_thread_spinlock_t *lock);
30static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
31static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
32static nxt_work_handler_t nxt_locked_work_queue_pop_work(
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 16 unchanged lines hidden (view full) ---

25 * a new spare chunk is allocated again.
26 */
27
28static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
29 nxt_thread_spinlock_t *lock);
30static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
31static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
32static nxt_work_handler_t nxt_locked_work_queue_pop_work(
33 nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log);
33 nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
34
35
36/* It should be adjusted with the "work_queue_bucket_items" directive. */
37static nxt_uint_t nxt_work_queue_bucket_items = 409;
38
39
40void
41nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)

--- 83 unchanged lines hidden (view full) ---

125 }
126}
127
128
129/* Add a work to a work queue tail. */
130
131void
132nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
34
35
36/* It should be adjusted with the "work_queue_bucket_items" directive. */
37static nxt_uint_t nxt_work_queue_bucket_items = 409;
38
39
40void
41nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)

--- 83 unchanged lines hidden (view full) ---

125 }
126}
127
128
129/* Add a work to a work queue tail. */
130
131void
132nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
133 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
133 nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
134{
135 nxt_work_t *work;
136
137 nxt_work_queue_attach(thr, wq);
138
139 for ( ;; ) {
140 work = thr->work_queue.cache.next;
141
142 if (nxt_fast_path(work != NULL)) {
143 thr->work_queue.cache.next = work->next;
144 work->next = NULL;
145
146 work->handler = handler;
134{
135 nxt_work_t *work;
136
137 nxt_work_queue_attach(thr, wq);
138
139 for ( ;; ) {
140 work = thr->work_queue.cache.next;
141
142 if (nxt_fast_path(work != NULL)) {
143 thr->work_queue.cache.next = work->next;
144 work->next = NULL;
145
146 work->handler = handler;
147 work->task = task;
147 work->obj = obj;
148 work->data = data;
148 work->obj = obj;
149 work->data = data;
149 work->log = log;
150
151 if (wq->tail != NULL) {
152 wq->tail->next = work;
153
154 } else {
155 wq->head = work;
156 }
157

--- 6 unchanged lines hidden (view full) ---

164 }
165}
166
167
168/* Push a work to a work queue head. */
169
170void
171nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
150
151 if (wq->tail != NULL) {
152 wq->tail->next = work;
153
154 } else {
155 wq->head = work;
156 }
157

--- 6 unchanged lines hidden (view full) ---

164 }
165}
166
167
168/* Push a work to a work queue head. */
169
170void
171nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
172 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
172 nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
173{
174 nxt_work_t *work;
175
176 nxt_work_queue_attach(thr, wq);
177
178 for ( ;; ) {
179 work = thr->work_queue.cache.next;
180
181 if (nxt_fast_path(work != NULL)) {
182 thr->work_queue.cache.next = work->next;
183 work->next = wq->head;
184
185 work->handler = handler;
186 work->obj = obj;
187 work->data = data;
173{
174 nxt_work_t *work;
175
176 nxt_work_queue_attach(thr, wq);
177
178 for ( ;; ) {
179 work = thr->work_queue.cache.next;
180
181 if (nxt_fast_path(work != NULL)) {
182 thr->work_queue.cache.next = work->next;
183 work->next = wq->head;
184
185 work->handler = handler;
186 work->obj = obj;
187 work->data = data;
188 work->log = log;
189
190 wq->head = work;
191
192 if (wq->tail == NULL) {
193 wq->tail = work;
194 }
195
196 return;

--- 21 unchanged lines hidden (view full) ---

218 thr->work_queue.tail = wq;
219 }
220}
221
222
223/* Pop a work from a thread work queue head. */
224
225nxt_work_handler_t
188
189 wq->head = work;
190
191 if (wq->tail == NULL) {
192 wq->tail = work;
193 }
194
195 return;

--- 21 unchanged lines hidden (view full) ---

217 thr->work_queue.tail = wq;
218 }
219}
220
221
222/* Pop a work from a thread work queue head. */
223
224nxt_work_handler_t
226nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
227 nxt_log_t **log)
225nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
226 void **data)
228{
229 nxt_work_t *work;
230 nxt_work_queue_t *wq;
231
232 wq = nxt_thread_current_work_queue(thr);
233
234 if (wq != NULL) {
235
236 work = wq->head;
237
238 if (work != NULL) {
239 wq->head = work->next;
240
241 if (work->next == NULL) {
242 wq->tail = NULL;
243 }
244
227{
228 nxt_work_t *work;
229 nxt_work_queue_t *wq;
230
231 wq = nxt_thread_current_work_queue(thr);
232
233 if (wq != NULL) {
234
235 work = wq->head;
236
237 if (work != NULL) {
238 wq->head = work->next;
239
240 if (work->next == NULL) {
241 wq->tail = NULL;
242 }
243
244 *task = work->task;
245 *obj = work->obj;
246 nxt_prefetch(*obj);
247 *data = work->data;
248 nxt_prefetch(*data);
249
250 work->next = thr->work_queue.cache.next;
251 thr->work_queue.cache.next = work;
252
245 *obj = work->obj;
246 nxt_prefetch(*obj);
247 *data = work->data;
248 nxt_prefetch(*data);
249
250 work->next = thr->work_queue.cache.next;
251 thr->work_queue.cache.next = work;
252
253 *log = work->log;
254
255#if (NXT_DEBUG)
256
257 if (work->handler == NULL) {
258 nxt_log_alert(thr->log, "null work handler");
259 nxt_abort();
260 }
261
262#endif

--- 67 unchanged lines hidden (view full) ---

330 }
331}
332
333
334/* Add a work to the thread last work queue's tail. */
335
336void
337nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
253#if (NXT_DEBUG)
254
255 if (work->handler == NULL) {
256 nxt_log_alert(thr->log, "null work handler");
257 nxt_abort();
258 }
259
260#endif

--- 67 unchanged lines hidden (view full) ---

328 }
329}
330
331
332/* Add a work to the thread last work queue's tail. */
333
334void
335nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
338 void *obj, void *data, nxt_log_t *log)
336 void *obj, void *data)
339{
340 nxt_work_t *work;
341
342 for ( ;; ) {
343 work = thr->work_queue.cache.next;
344
345 if (nxt_fast_path(work != NULL)) {
346 thr->work_queue.cache.next = work->next;
347 work->next = NULL;
348
349 work->handler = handler;
350 work->obj = obj;
351 work->data = data;
337{
338 nxt_work_t *work;
339
340 for ( ;; ) {
341 work = thr->work_queue.cache.next;
342
343 if (nxt_fast_path(work != NULL)) {
344 thr->work_queue.cache.next = work->next;
345 work->next = NULL;
346
347 work->handler = handler;
348 work->obj = obj;
349 work->data = data;
352 work->log = log;
353
354 if (thr->work_queue.last.tail != NULL) {
355 thr->work_queue.last.tail->next = work;
356
357 } else {
358 thr->work_queue.last.head = work;
359 }
360

--- 5 unchanged lines hidden (view full) ---

366 nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
367 }
368}
369
370
371/* Pop a work from the thread last work queue's head. */
372
373nxt_work_handler_t
350
351 if (thr->work_queue.last.tail != NULL) {
352 thr->work_queue.last.tail->next = work;
353
354 } else {
355 thr->work_queue.last.head = work;
356 }
357

--- 5 unchanged lines hidden (view full) ---

363 nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
364 }
365}
366
367
368/* Pop a work from the thread last work queue's head. */
369
370nxt_work_handler_t
374nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
375 nxt_log_t **log)
371nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
372 void **data)
376{
377 nxt_work_t *work;
378
379 work = thr->work_queue.last.head;
380
381 if (work != NULL) {
382 nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);
383
384 thr->work_queue.last.head = work->next;
385
386 if (work->next == NULL) {
387 thr->work_queue.last.tail = NULL;
388 }
389
373{
374 nxt_work_t *work;
375
376 work = thr->work_queue.last.head;
377
378 if (work != NULL) {
379 nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);
380
381 thr->work_queue.last.head = work->next;
382
383 if (work->next == NULL) {
384 thr->work_queue.last.tail = NULL;
385 }
386
387 *task = work->task;
390 *obj = work->obj;
391 nxt_prefetch(*obj);
392 *data = work->data;
393 nxt_prefetch(*data);
394
395 work->next = thr->work_queue.cache.next;
396 thr->work_queue.cache.next = work;
397
388 *obj = work->obj;
389 nxt_prefetch(*obj);
390 *data = work->data;
391 nxt_prefetch(*data);
392
393 work->next = thr->work_queue.cache.next;
394 thr->work_queue.cache.next = work;
395
398 *log = work->log;
399
400#if (NXT_DEBUG)
401
402 if (work->handler == NULL) {
403 nxt_log_alert(thr->log, "null work handler");
404 nxt_abort();
405 }
406
407#endif

--- 78 unchanged lines hidden (view full) ---

486 }
487}
488
489
490/* Add a work to a locked work queue tail. */
491
492void
493nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
396#if (NXT_DEBUG)
397
398 if (work->handler == NULL) {
399 nxt_log_alert(thr->log, "null work handler");
400 nxt_abort();
401 }
402
403#endif

--- 78 unchanged lines hidden (view full) ---

482 }
483}
484
485
486/* Add a work to a locked work queue tail. */
487
488void
489nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
494 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
490 nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
495{
496 nxt_work_t *work;
497
498 nxt_thread_spin_lock(&lwq->lock);
499
500 for ( ;; ) {
501 work = lwq->cache.next;
502
503 if (nxt_fast_path(work != NULL)) {
504 lwq->cache.next = work->next;
505
506 work->next = NULL;
507 work->handler = handler;
491{
492 nxt_work_t *work;
493
494 nxt_thread_spin_lock(&lwq->lock);
495
496 for ( ;; ) {
497 work = lwq->cache.next;
498
499 if (nxt_fast_path(work != NULL)) {
500 lwq->cache.next = work->next;
501
502 work->next = NULL;
503 work->handler = handler;
504 work->task = task;
508 work->obj = obj;
509 work->data = data;
505 work->obj = obj;
506 work->data = data;
510 work->log = log;
511
512 if (lwq->tail != NULL) {
513 lwq->tail->next = work;
514
515 } else {
516 lwq->head = work;
517 }
518

--- 7 unchanged lines hidden (view full) ---

526
527 nxt_thread_spin_unlock(&lwq->lock);
528}
529
530
531/* Pop a work from a locked work queue head. */
532
533nxt_work_handler_t
507
508 if (lwq->tail != NULL) {
509 lwq->tail->next = work;
510
511 } else {
512 lwq->head = work;
513 }
514

--- 7 unchanged lines hidden (view full) ---

522
523 nxt_thread_spin_unlock(&lwq->lock);
524}
525
526
527/* Pop a work from a locked work queue head. */
528
529nxt_work_handler_t
534nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj,
535 void **data, nxt_log_t **log)
530nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
531 void **obj, void **data)
536{
537 nxt_work_handler_t handler;
538
539 nxt_thread_spin_lock(&lwq->lock);
540
532{
533 nxt_work_handler_t handler;
534
535 nxt_thread_spin_lock(&lwq->lock);
536
541 handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log);
537 handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data);
542
543 nxt_thread_spin_unlock(&lwq->lock);
544
545 return handler;
546}
547
548
549static nxt_work_handler_t
538
539 nxt_thread_spin_unlock(&lwq->lock);
540
541 return handler;
542}
543
544
545static nxt_work_handler_t
550nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
551 void **data, nxt_log_t **log)
546nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
547 void **obj, void **data)
552{
553 nxt_work_t *work;
554
555 work = lwq->head;
556
557 if (work == NULL) {
558 return NULL;
559 }
560
548{
549 nxt_work_t *work;
550
551 work = lwq->head;
552
553 if (work == NULL) {
554 return NULL;
555 }
556
557 *task = work->task;
561 *obj = work->obj;
562 nxt_prefetch(*obj);
563 *data = work->data;
564 nxt_prefetch(*data);
565
566 lwq->head = work->next;
567
568 if (work->next == NULL) {
569 lwq->tail = NULL;
570 }
571
572 work->next = lwq->cache.next;
573 lwq->cache.next = work;
574
558 *obj = work->obj;
559 nxt_prefetch(*obj);
560 *data = work->data;
561 nxt_prefetch(*data);
562
563 lwq->head = work->next;
564
565 if (work->next == NULL) {
566 lwq->tail = NULL;
567 }
568
569 work->next = lwq->cache.next;
570 lwq->cache.next = work;
571
575 *log = work->log;
576
577 return work->handler;
578}
579
580
581/* Move all works from a locked work queue to a usual work queue. */
582
583void
584nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
585 nxt_work_queue_t *wq)
586{
587 void *obj, *data;
572 return work->handler;
573}
574
575
576/* Move all works from a locked work queue to a usual work queue. */
577
578void
579nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
580 nxt_work_queue_t *wq)
581{
582 void *obj, *data;
588 nxt_log_t *log;
583 nxt_task_t *task;
589 nxt_work_handler_t handler;
590
591 /* Locked work queue head can be tested without a lock. */
592
593 if (nxt_fast_path(lwq->head == NULL)) {
594 return;
595 }
596
597 nxt_thread_spin_lock(&lwq->lock);
598
599 for ( ;; ) {
584 nxt_work_handler_t handler;
585
586 /* Locked work queue head can be tested without a lock. */
587
588 if (nxt_fast_path(lwq->head == NULL)) {
589 return;
590 }
591
592 nxt_thread_spin_lock(&lwq->lock);
593
594 for ( ;; ) {
600 handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log);
595 handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data);
601
602 if (handler == NULL) {
603 break;
604 }
605
596
597 if (handler == NULL) {
598 break;
599 }
600
606 nxt_thread_work_queue_add(thr, wq, handler, obj, data, log);
601 task->thread = thr;
602
603 nxt_thread_work_queue_add(thr, wq, handler, task, obj, data);
607 }
608
609 nxt_thread_spin_unlock(&lwq->lock);
610}
604 }
605
606 nxt_thread_spin_unlock(&lwq->lock);
607}