1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 /* 11 * Available work items are crucial for overall engine operation, so 12 * the items are preallocated in two chunks: cache and spare chunks. 13 * By default each chunk preallocates 409 work items on two or four 14 * CPU pages depending on platform. If all items in a cache chunk are 15 * exhausted then a spare chunk becomes a cache chunk, and a new spare 16 * chunk is allocated. This two-step allocation mitigates low memory 17 * condition impact on work queue operation. However, if both chunks 18 * are exhausted then a thread will sleep in reliance on another thread 19 * frees some memory. However, this may lead to deadlock and probably 20 * a process should be aborted. This behaviour should be considered as 21 * abort on program stack exhaustion. 22 * 23 * The cache and spare chunks initially are also allocated in two steps: 24 * a spare chunk is allocated first, then it becomes the cache chunk and 25 * a new spare chunk is allocated again. 26 */ 27 28 static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, 29 nxt_thread_spinlock_t *lock); 30 static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock); 31 static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr); 32 static nxt_work_handler_t nxt_locked_work_queue_pop_work( 33 nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log); 34 35 36 /* It should be adjusted with the "work_queue_bucket_items" directive. */ 37 static nxt_uint_t nxt_work_queue_bucket_items = 409; 38 39 40 void 41 nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size) 42 { 43 nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t)); 44 45 nxt_work_queue_name(&thr->work_queue.main, "main"); 46 nxt_work_queue_name(&thr->work_queue.last, "last"); 47 48 if (chunk_size == 0) { 49 chunk_size = nxt_work_queue_bucket_items; 50 } 51 52 /* nxt_work_queue_chunk_t already has one work item. */ 53 thr->work_queue.cache.chunk_size = chunk_size - 1; 54 55 while (thr->work_queue.cache.next == NULL) { 56 nxt_work_queue_allocate(&thr->work_queue.cache, NULL); 57 } 58 } 59 60 61 void 62 nxt_thread_work_queue_destroy(nxt_thread_t *thr) 63 { 64 nxt_work_queue_chunk_t *chunk, *next; 65 66 for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) { 67 next = chunk->next; 68 nxt_free(chunk); 69 } 70 } 71 72 73 static void 74 nxt_work_queue_allocate(nxt_work_queue_cache_t *cache, 75 nxt_thread_spinlock_t *lock) 76 { 77 size_t size; 78 nxt_uint_t i, n; 79 nxt_work_t *work; 80 nxt_work_queue_chunk_t *chunk; 81 82 n = cache->chunk_size; 83 size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t); 84 85 chunk = nxt_malloc(size); 86 87 if (nxt_fast_path(chunk != NULL)) { 88 89 chunk->next = cache->chunk; 90 cache->chunk = chunk; 91 work = &chunk->work; 92 93 for (i = 0; i < n; i++) { 94 work[i].next = &work[i + 1]; 95 } 96 97 work[i].next = NULL; 98 work++; 99 100 } else if (cache->spare != NULL) { 101 102 work = NULL; 103 104 } else { 105 nxt_work_queue_sleep(lock); 106 return; 107 } 108 109 cache->next = cache->spare; 110 cache->spare = work; 111 } 112 113 114 static void 115 nxt_work_queue_sleep(nxt_thread_spinlock_t *lock) 116 { 117 if (lock != NULL) { 118 nxt_thread_spin_unlock(lock); 119 } 120 121 nxt_nanosleep(100 * 1000000); /* 100ms */ 122 123 if (lock != NULL) { 124 nxt_thread_spin_lock(lock); 125 } 126 } 127 128 129 /* Add a work to a work queue tail. */ 130 131 void 132 nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq, 133 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) 134 { 135 nxt_work_t *work; 136 137 nxt_work_queue_attach(thr, wq); 138 139 for ( ;; ) { 140 work = thr->work_queue.cache.next; 141 142 if (nxt_fast_path(work != NULL)) { 143 thr->work_queue.cache.next = work->next; 144 work->next = NULL; 145 146 work->handler = handler; 147 work->obj = obj; 148 work->data = data; 149 work->log = log; 150 151 if (wq->tail != NULL) { 152 wq->tail->next = work; 153 154 } else { 155 wq->head = work; 156 } 157 158 wq->tail = work; 159 160 return; 161 } 162 163 nxt_work_queue_allocate(&thr->work_queue.cache, NULL); 164 } 165 } 166 167 168 /* Push a work to a work queue head. */ 169 170 void 171 nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq, 172 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) 173 { 174 nxt_work_t *work; 175 176 nxt_work_queue_attach(thr, wq); 177 178 for ( ;; ) { 179 work = thr->work_queue.cache.next; 180 181 if (nxt_fast_path(work != NULL)) { 182 thr->work_queue.cache.next = work->next; 183 work->next = wq->head; 184 185 work->handler = handler; 186 work->obj = obj; 187 work->data = data; 188 work->log = log; 189 190 wq->head = work; 191 192 if (wq->tail == NULL) { 193 wq->tail = work; 194 } 195 196 return; 197 } 198 199 nxt_work_queue_allocate(&thr->work_queue.cache, NULL); 200 } 201 } 202 203 204 /* Attach a work queue to a thread work queue. */ 205 206 void 207 nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq) 208 { 209 if (wq->next == NULL && wq != thr->work_queue.tail) { 210 211 if (thr->work_queue.tail != NULL) { 212 thr->work_queue.tail->next = wq; 213 214 } else { 215 thr->work_queue.head = wq; 216 } 217 218 thr->work_queue.tail = wq; 219 } 220 } 221 222 223 /* Pop a work from a thread work queue head. */ 224 225 nxt_work_handler_t 226 nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, 227 nxt_log_t **log) 228 { 229 nxt_work_t *work; 230 nxt_work_queue_t *wq; 231 232 wq = nxt_thread_current_work_queue(thr); 233 234 if (wq != NULL) { 235 236 work = wq->head; 237 238 if (work != NULL) { 239 wq->head = work->next; 240 241 if (work->next == NULL) { 242 wq->tail = NULL; 243 } 244 245 *obj = work->obj; 246 nxt_prefetch(*obj); 247 *data = work->data; 248 nxt_prefetch(*data); 249 250 work->next = thr->work_queue.cache.next; 251 thr->work_queue.cache.next = work; 252 253 *log = work->log; 254 255 #if (NXT_DEBUG) 256 257 if (work->handler == NULL) { 258 nxt_log_alert(thr->log, "null work handler"); 259 nxt_abort(); 260 } 261 262 #endif 263 264 return work->handler; 265 } 266 } 267 268 return NULL; 269 } 270 271 272 static nxt_work_queue_t * 273 nxt_thread_current_work_queue(nxt_thread_t *thr) 274 { 275 nxt_work_queue_t *wq, *next; 276 277 for (wq = thr->work_queue.head; wq != NULL; wq = next) { 278 279 if (wq->head != NULL) { 280 nxt_log_debug(thr->log, "work queue: %s", wq->name); 281 return wq; 282 } 283 284 /* Detach empty work queue. */ 285 next = wq->next; 286 wq->next = NULL; 287 thr->work_queue.head = next; 288 } 289 290 thr->work_queue.tail = NULL; 291 292 return NULL; 293 } 294 295 296 /* Drop a work with specified data from a thread work queue. */ 297 298 void 299 nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data) 300 { 301 nxt_work_t *work, *prev, *next, **link; 302 nxt_work_queue_t *wq; 303 304 for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) { 305 306 prev = NULL; 307 link = &wq->head; 308 309 for (work = wq->head; work != NULL; work = next) { 310 311 next = work->next; 312 313 if (data != work->obj) { 314 prev = work; 315 link = &work->next; 316 317 } else { 318 if (next == NULL) { 319 wq->tail = prev; 320 } 321 322 nxt_log_debug(thr->log, "work queue drop"); 323 324 *link = next; 325 326 work->next = thr->work_queue.cache.next; 327 thr->work_queue.cache.next = work; 328 } 329 } 330 } 331 } 332 333 334 /* Add a work to the thread last work queue's tail. */ 335 336 void 337 nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler, 338 void *obj, void *data, nxt_log_t *log) 339 { 340 nxt_work_t *work; 341 342 for ( ;; ) { 343 work = thr->work_queue.cache.next; 344 345 if (nxt_fast_path(work != NULL)) { 346 thr->work_queue.cache.next = work->next; 347 work->next = NULL; 348 349 work->handler = handler; 350 work->obj = obj; 351 work->data = data; 352 work->log = log; 353 354 if (thr->work_queue.last.tail != NULL) { 355 thr->work_queue.last.tail->next = work; 356 357 } else { 358 thr->work_queue.last.head = work; 359 } 360 361 thr->work_queue.last.tail = work; 362 363 return; 364 } 365 366 nxt_work_queue_allocate(&thr->work_queue.cache, NULL); 367 } 368 } 369 370 371 /* Pop a work from the thread last work queue's head. */ 372 373 nxt_work_handler_t 374 nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data, 375 nxt_log_t **log) 376 { 377 nxt_work_t *work; 378 379 work = thr->work_queue.last.head; 380 381 if (work != NULL) { 382 nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name); 383 384 thr->work_queue.last.head = work->next; 385 386 if (work->next == NULL) { 387 thr->work_queue.last.tail = NULL; 388 } 389 390 *obj = work->obj; 391 nxt_prefetch(*obj); 392 *data = work->data; 393 nxt_prefetch(*data); 394 395 work->next = thr->work_queue.cache.next; 396 thr->work_queue.cache.next = work; 397 398 *log = work->log; 399 400 #if (NXT_DEBUG) 401 402 if (work->handler == NULL) { 403 nxt_log_alert(thr->log, "null work handler"); 404 nxt_abort(); 405 } 406 407 #endif 408 409 return work->handler; 410 } 411 412 return NULL; 413 } 414 415 416 void 417 nxt_work_queue_destroy(nxt_work_queue_t *wq) 418 { 419 nxt_thread_t *thr; 420 nxt_work_queue_t *q; 421 422 thr = nxt_thread(); 423 424 /* Detach from a thread work queue. */ 425 426 if (thr->work_queue.head == wq) { 427 thr->work_queue.head = wq->next; 428 q = NULL; 429 goto found; 430 } 431 432 for (q = thr->work_queue.head; q != NULL; q = q->next) { 433 if (q->next == wq) { 434 q->next = wq->next; 435 goto found; 436 } 437 } 438 439 return; 440 441 found: 442 443 if (thr->work_queue.tail == wq) { 444 thr->work_queue.tail = q; 445 } 446 447 /* Move all queue's works to a thread work queue cache. */ 448 449 if (wq->tail != NULL) { 450 wq->tail->next = thr->work_queue.cache.next; 451 } 452 453 if (wq->head != NULL) { 454 thr->work_queue.cache.next = wq->head; 455 } 456 } 457 458 459 /* Locked work queue operations. */ 460 461 void 462 nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size) 463 { 464 nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t)); 465 466 if (chunk_size == 0) { 467 chunk_size = nxt_work_queue_bucket_items; 468 } 469 470 lwq->cache.chunk_size = chunk_size; 471 472 while (lwq->cache.next == NULL) { 473 nxt_work_queue_allocate(&lwq->cache, NULL); 474 } 475 } 476 477 478 void 479 nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq) 480 { 481 nxt_work_queue_chunk_t *chunk, *next; 482 483 for (chunk = lwq->cache.chunk; chunk; chunk = next) { 484 next = chunk->next; 485 nxt_free(chunk); 486 } 487 } 488 489 490 /* Add a work to a locked work queue tail. */ 491 492 void 493 nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, 494 nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log) 495 { 496 nxt_work_t *work; 497 498 nxt_thread_spin_lock(&lwq->lock); 499 500 for ( ;; ) { 501 work = lwq->cache.next; 502 503 if (nxt_fast_path(work != NULL)) { 504 lwq->cache.next = work->next; 505 506 work->next = NULL; 507 work->handler = handler; 508 work->obj = obj; 509 work->data = data; 510 work->log = log; 511 512 if (lwq->tail != NULL) { 513 lwq->tail->next = work; 514 515 } else { 516 lwq->head = work; 517 } 518 519 lwq->tail = work; 520 521 break; 522 } 523 524 nxt_work_queue_allocate(&lwq->cache, &lwq->lock); 525 } 526 527 nxt_thread_spin_unlock(&lwq->lock); 528 } 529 530 531 /* Pop a work from a locked work queue head. */ 532 533 nxt_work_handler_t 534 nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj, 535 void **data, nxt_log_t **log) 536 { 537 nxt_work_handler_t handler; 538 539 nxt_thread_spin_lock(&lwq->lock); 540 541 handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log); 542 543 nxt_thread_spin_unlock(&lwq->lock); 544 545 return handler; 546 } 547 548 549 static nxt_work_handler_t 550 nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj, 551 void **data, nxt_log_t **log) 552 { 553 nxt_work_t *work; 554 555 work = lwq->head; 556 557 if (work == NULL) { 558 return NULL; 559 } 560 561 *obj = work->obj; 562 nxt_prefetch(*obj); 563 *data = work->data; 564 nxt_prefetch(*data); 565 566 lwq->head = work->next; 567 568 if (work->next == NULL) { 569 lwq->tail = NULL; 570 } 571 572 work->next = lwq->cache.next; 573 lwq->cache.next = work; 574 575 *log = work->log; 576 577 return work->handler; 578 } 579 580 581 /* Move all works from a locked work queue to a usual work queue. */ 582 583 void 584 nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq, 585 nxt_work_queue_t *wq) 586 { 587 void *obj, *data; 588 nxt_log_t *log; 589 nxt_work_handler_t handler; 590 591 /* Locked work queue head can be tested without a lock. */ 592 593 if (nxt_fast_path(lwq->head == NULL)) { 594 return; 595 } 596 597 nxt_thread_spin_lock(&lwq->lock); 598 599 for ( ;; ) { 600 handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log); 601 602 if (handler == NULL) { 603 break; 604 } 605 606 nxt_thread_work_queue_add(thr, wq, handler, obj, data, log); 607 } 608 609 nxt_thread_spin_unlock(&lwq->lock); 610 } 611