Deleted
Added
nxt_work_queue.c (0:a63ceefd6ab0) | nxt_work_queue.c (1:fdc027c56872) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 16 unchanged lines hidden (view full) --- 25 * a new spare chunk is allocated again. 26 */ 27 28static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, 29 nxt_thread_spinlock_t *lock); 30static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock); 31static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr); 32static nxt_work_handler_t nxt_locked_work_queue_pop_work( | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 16 unchanged lines hidden (view full) --- 25 * a new spare chunk is allocated again. 26 */ 27 28static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, 29 nxt_thread_spinlock_t *lock); 30static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock); 31static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr); 32static nxt_work_handler_t nxt_locked_work_queue_pop_work( |
33 nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log); | 33 nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data); |
34 35 36/* It should be adjusted with the "work_queue_bucket_items" directive. */ 37static nxt_uint_t nxt_work_queue_bucket_items = 409; 38 39 40void 41nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size) --- 83 unchanged lines hidden (view full) --- 125 } 126} 127 128 129/* Add a work to a work queue tail. */ 130 131void 132nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, | 34 35 36/* It should be adjusted with the "work_queue_bucket_items" directive. */ 37static nxt_uint_t nxt_work_queue_bucket_items = 409; 38 39 40void 41nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size) --- 83 unchanged lines hidden (view full) --- 125 } 126} 127 128 129/* Add a work to a work queue tail. */ 130 131void 132nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, |
133 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) | 133 nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) |
134{ 135 nxt_work_t *work; 136 137 nxt_work_queue_attach(thr, wq); 138 139 for ( ;; ) { 140 work = thr->work_queue.cache.next; 141 142 if (nxt_fast_path(work != NULL)) { 143 thr->work_queue.cache.next = work->next; 144 work->next = NULL; 145 146 work->handler = handler; | 134{ 135 nxt_work_t *work; 136 137 nxt_work_queue_attach(thr, wq); 138 139 for ( ;; ) { 140 work = thr->work_queue.cache.next; 141 142 if (nxt_fast_path(work != NULL)) { 143 thr->work_queue.cache.next = work->next; 144 work->next = NULL; 145 146 work->handler = handler; |
147 work->task = task; |
|
147 work->obj = obj; 148 work->data = data; | 148 work->obj = obj; 149 work->data = data; |
149 work->log = log; | |
150 151 if (wq->tail != NULL) { 152 wq->tail->next = work; 153 154 } else { 155 wq->head = work; 156 } 157 --- 6 unchanged lines hidden (view full) --- 164 } 165} 166 167 168/* Push a work to a work queue head. */ 169 170void 171nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq, | 150 151 if (wq->tail != NULL) { 152 wq->tail->next = work; 153 154 } else { 155 wq->head = work; 156 } 157 --- 6 unchanged lines hidden (view full) --- 164 } 165} 166 167 168/* Push a work to a work queue head. */ 169 170void 171nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq, |
172 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) | 172 nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) |
173{ 174 nxt_work_t *work; 175 176 nxt_work_queue_attach(thr, wq); 177 178 for ( ;; ) { 179 work = thr->work_queue.cache.next; 180 181 if (nxt_fast_path(work != NULL)) { 182 thr->work_queue.cache.next = work->next; 183 work->next = wq->head; 184 185 work->handler = handler; 186 work->obj = obj; 187 work->data = data; | 173{ 174 nxt_work_t *work; 175 176 nxt_work_queue_attach(thr, wq); 177 178 for ( ;; ) { 179 work = thr->work_queue.cache.next; 180 181 if (nxt_fast_path(work != NULL)) { 182 thr->work_queue.cache.next = work->next; 183 work->next = wq->head; 184 185 work->handler = handler; 186 work->obj = obj; 187 work->data = data; |
188 work->log = log; | |
189 190 wq->head = work; 191 192 if (wq->tail == NULL) { 193 wq->tail = work; 194 } 195 196 return; --- 21 unchanged lines hidden (view full) --- 218 thr->work_queue.tail = wq; 219 } 220} 221 222 223/* Pop a work from a thread work queue head. */ 224 225nxt_work_handler_t | 188 189 wq->head = work; 190 191 if (wq->tail == NULL) { 192 wq->tail = work; 193 } 194 195 return; --- 21 unchanged lines hidden (view full) --- 217 thr->work_queue.tail = wq; 218 } 219} 220 221 222/* Pop a work from a thread work queue head. */ 223 224nxt_work_handler_t |
226nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, 227 nxt_log_t **log) | 225nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj, 226 void **data) |
228{ 229 nxt_work_t *work; 230 nxt_work_queue_t *wq; 231 232 wq = nxt_thread_current_work_queue(thr); 233 234 if (wq != NULL) { 235 236 work = wq->head; 237 238 if (work != NULL) { 239 wq->head = work->next; 240 241 if (work->next == NULL) { 242 wq->tail = NULL; 243 } 244 | 227{ 228 nxt_work_t *work; 229 nxt_work_queue_t *wq; 230 231 wq = nxt_thread_current_work_queue(thr); 232 233 if (wq != NULL) { 234 235 work = wq->head; 236 237 if (work != NULL) { 238 wq->head = work->next; 239 240 if (work->next == NULL) { 241 wq->tail = NULL; 242 } 243 |
244 *task = work->task; |
|
245 *obj = work->obj; 246 nxt_prefetch(*obj); 247 *data = work->data; 248 nxt_prefetch(*data); 249 250 work->next = thr->work_queue.cache.next; 251 thr->work_queue.cache.next = work; 252 | 245 *obj = work->obj; 246 nxt_prefetch(*obj); 247 *data = work->data; 248 nxt_prefetch(*data); 249 250 work->next = thr->work_queue.cache.next; 251 thr->work_queue.cache.next = work; 252 |
253 *log = work->log; 254 | |
255#if (NXT_DEBUG) 256 257 if (work->handler == NULL) { 258 nxt_log_alert(thr->log, "null work handler"); 259 nxt_abort(); 260 } 261 262#endif --- 67 unchanged lines hidden (view full) --- 330 } 331} 332 333 334/* Add a work to the thread last work queue's tail. */ 335 336void 337nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, | 253#if (NXT_DEBUG) 254 255 if (work->handler == NULL) { 256 nxt_log_alert(thr->log, "null work handler"); 257 nxt_abort(); 258 } 259 260#endif --- 67 unchanged lines hidden (view full) --- 328 } 329} 330 331 332/* Add a work to the thread last work queue's tail. */ 333 334void 335nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, |
338 void *obj, void *data, nxt_log_t *log) | 336 void *obj, void *data) |
339{ 340 nxt_work_t *work; 341 342 for ( ;; ) { 343 work = thr->work_queue.cache.next; 344 345 if (nxt_fast_path(work != NULL)) { 346 thr->work_queue.cache.next = work->next; 347 work->next = NULL; 348 349 work->handler = handler; 350 work->obj = obj; 351 work->data = data; | 337{ 338 nxt_work_t *work; 339 340 for ( ;; ) { 341 work = thr->work_queue.cache.next; 342 343 if (nxt_fast_path(work != NULL)) { 344 thr->work_queue.cache.next = work->next; 345 work->next = NULL; 346 347 work->handler = handler; 348 work->obj = obj; 349 work->data = data; |
352 work->log = log; | |
353 354 if (thr->work_queue.last.tail != NULL) { 355 thr->work_queue.last.tail->next = work; 356 357 } else { 358 thr->work_queue.last.head = work; 359 } 360 --- 5 unchanged lines hidden (view full) --- 366 nxt_work_queue_allocate(&thr->work_queue.cache, NULL); 367 } 368} 369 370 371/* Pop a work from the thread last work queue's head. */ 372 373nxt_work_handler_t | 350 351 if (thr->work_queue.last.tail != NULL) { 352 thr->work_queue.last.tail->next = work; 353 354 } else { 355 thr->work_queue.last.head = work; 356 } 357 --- 5 unchanged lines hidden (view full) --- 363 nxt_work_queue_allocate(&thr->work_queue.cache, NULL); 364 } 365} 366 367 368/* Pop a work from the thread last work queue's head. */ 369 370nxt_work_handler_t |
374nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, 375 nxt_log_t **log) | 371nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj, 372 void **data) |
376{ 377 nxt_work_t *work; 378 379 work = thr->work_queue.last.head; 380 381 if (work != NULL) { 382 nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name); 383 384 thr->work_queue.last.head = work->next; 385 386 if (work->next == NULL) { 387 thr->work_queue.last.tail = NULL; 388 } 389 | 373{ 374 nxt_work_t *work; 375 376 work = thr->work_queue.last.head; 377 378 if (work != NULL) { 379 nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name); 380 381 thr->work_queue.last.head = work->next; 382 383 if (work->next == NULL) { 384 thr->work_queue.last.tail = NULL; 385 } 386 |
387 *task = work->task; |
|
390 *obj = work->obj; 391 nxt_prefetch(*obj); 392 *data = work->data; 393 nxt_prefetch(*data); 394 395 work->next = thr->work_queue.cache.next; 396 thr->work_queue.cache.next = work; 397 | 388 *obj = work->obj; 389 nxt_prefetch(*obj); 390 *data = work->data; 391 nxt_prefetch(*data); 392 393 work->next = thr->work_queue.cache.next; 394 thr->work_queue.cache.next = work; 395 |
398 *log = work->log; 399 | |
400#if (NXT_DEBUG) 401 402 if (work->handler == NULL) { 403 nxt_log_alert(thr->log, "null work handler"); 404 nxt_abort(); 405 } 406 407#endif --- 78 unchanged lines hidden (view full) --- 486 } 487} 488 489 490/* Add a work to a locked work queue tail. */ 491 492void 493nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, | 396#if (NXT_DEBUG) 397 398 if (work->handler == NULL) { 399 nxt_log_alert(thr->log, "null work handler"); 400 nxt_abort(); 401 } 402 403#endif --- 78 unchanged lines hidden (view full) --- 482 } 483} 484 485 486/* Add a work to a locked work queue tail. */ 487 488void 489nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, |
494 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) | 490 nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data) |
495{ 496 nxt_work_t *work; 497 498 nxt_thread_spin_lock(&lwq->lock); 499 500 for ( ;; ) { 501 work = lwq->cache.next; 502 503 if (nxt_fast_path(work != NULL)) { 504 lwq->cache.next = work->next; 505 506 work->next = NULL; 507 work->handler = handler; | 491{ 492 nxt_work_t *work; 493 494 nxt_thread_spin_lock(&lwq->lock); 495 496 for ( ;; ) { 497 work = lwq->cache.next; 498 499 if (nxt_fast_path(work != NULL)) { 500 lwq->cache.next = work->next; 501 502 work->next = NULL; 503 work->handler = handler; |
504 work->task = task; |
|
508 work->obj = obj; 509 work->data = data; | 505 work->obj = obj; 506 work->data = data; |
510 work->log = log; | |
511 512 if (lwq->tail != NULL) { 513 lwq->tail->next = work; 514 515 } else { 516 lwq->head = work; 517 } 518 --- 7 unchanged lines hidden (view full) --- 526 527 nxt_thread_spin_unlock(&lwq->lock); 528} 529 530 531/* Pop a work from a locked work queue head. */ 532 533nxt_work_handler_t | 507 508 if (lwq->tail != NULL) { 509 lwq->tail->next = work; 510 511 } else { 512 lwq->head = work; 513 } 514 --- 7 unchanged lines hidden (view full) --- 522 523 nxt_thread_spin_unlock(&lwq->lock); 524} 525 526 527/* Pop a work from a locked work queue head. */ 528 529nxt_work_handler_t |
534nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj, 535 void **data, nxt_log_t **log) | 530nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task, 531 void **obj, void **data) |
536{ 537 nxt_work_handler_t handler; 538 539 nxt_thread_spin_lock(&lwq->lock); 540 | 532{ 533 nxt_work_handler_t handler; 534 535 nxt_thread_spin_lock(&lwq->lock); 536 |
541 handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log); | 537 handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data); |
542 543 nxt_thread_spin_unlock(&lwq->lock); 544 545 return handler; 546} 547 548 549static nxt_work_handler_t | 538 539 nxt_thread_spin_unlock(&lwq->lock); 540 541 return handler; 542} 543 544 545static nxt_work_handler_t |
550nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj, 551 void **data, nxt_log_t **log) | 546nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task, 547 void **obj, void **data) |
552{ 553 nxt_work_t *work; 554 555 work = lwq->head; 556 557 if (work == NULL) { 558 return NULL; 559 } 560 | 548{ 549 nxt_work_t *work; 550 551 work = lwq->head; 552 553 if (work == NULL) { 554 return NULL; 555 } 556 |
557 *task = work->task; |
|
561 *obj = work->obj; 562 nxt_prefetch(*obj); 563 *data = work->data; 564 nxt_prefetch(*data); 565 566 lwq->head = work->next; 567 568 if (work->next == NULL) { 569 lwq->tail = NULL; 570 } 571 572 work->next = lwq->cache.next; 573 lwq->cache.next = work; 574 | 558 *obj = work->obj; 559 nxt_prefetch(*obj); 560 *data = work->data; 561 nxt_prefetch(*data); 562 563 lwq->head = work->next; 564 565 if (work->next == NULL) { 566 lwq->tail = NULL; 567 } 568 569 work->next = lwq->cache.next; 570 lwq->cache.next = work; 571 |
575 *log = work->log; 576 | |
577 return work->handler; 578} 579 580 581/* Move all works from a locked work queue to a usual work queue. */ 582 583void 584nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, 585 nxt_work_queue_t *wq) 586{ 587 void *obj, *data; | 572 return work->handler; 573} 574 575 576/* Move all works from a locked work queue to a usual work queue. */ 577 578void 579nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, 580 nxt_work_queue_t *wq) 581{ 582 void *obj, *data; |
588 nxt_log_t *log; | 583 nxt_task_t *task; |
589 nxt_work_handler_t handler; 590 591 /* Locked work queue head can be tested without a lock. */ 592 593 if (nxt_fast_path(lwq->head == NULL)) { 594 return; 595 } 596 597 nxt_thread_spin_lock(&lwq->lock); 598 599 for ( ;; ) { | 584 nxt_work_handler_t handler; 585 586 /* Locked work queue head can be tested without a lock. */ 587 588 if (nxt_fast_path(lwq->head == NULL)) { 589 return; 590 } 591 592 nxt_thread_spin_lock(&lwq->lock); 593 594 for ( ;; ) { |
600 handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log); | 595 handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data); |
601 602 if (handler == NULL) { 603 break; 604 } 605 | 596 597 if (handler == NULL) { 598 break; 599 } 600 |
606 nxt_thread_work_queue_add(thr, wq, handler, obj, data, log); | 601 task->thread = thr; 602 603 nxt_thread_work_queue_add(thr, wq, handler, task, obj, data); |
607 } 608 609 nxt_thread_spin_unlock(&lwq->lock); 610} | 604 } 605 606 nxt_thread_spin_unlock(&lwq->lock); 607} |