1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 20 static void nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, 21 void *data); 22 23 24 nxt_inline void 25 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) 26 { 27 int c; 28 29 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 30 31 if (i < 0 && c == -i) { 32 if (mmap_handler->hdr != NULL) { 33 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 34 mmap_handler->hdr = NULL; 35 } 36 37 if (mmap_handler->fd != -1) { 38 nxt_fd_close(mmap_handler->fd); 39 } 40 41 nxt_free(mmap_handler); 42 } 43 } 44 45 46 static nxt_port_mmap_t * 47 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 48 { 49 uint32_t cap; 50 51 cap = port_mmaps->cap; 52 53 if (cap == 0) { 54 cap = i + 1; 55 } 56 57 while (i + 1 > cap) { 58 59 if (cap < 16) { 60 cap = cap * 2; 61 62 } else { 63 cap = cap + cap / 2; 64 } 65 } 66 67 if (cap != port_mmaps->cap) { 68 69 port_mmaps->elts = nxt_realloc(port_mmaps->elts, 70 cap * sizeof(nxt_port_mmap_t)); 71 if (nxt_slow_path(port_mmaps->elts == NULL)) { 72 return NULL; 73 } 74 75 nxt_memzero(port_mmaps->elts + port_mmaps->cap, 76 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); 77 78 port_mmaps->cap = cap; 79 } 80 81 if (i + 1 > port_mmaps->size) { 82 port_mmaps->size = i + 1; 83 } 84 85 return port_mmaps->elts + i; 86 } 87 88 89 void 90 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) 91 { 92 uint32_t i; 93 nxt_port_mmap_t *port_mmap; 94 95 if (port_mmaps == NULL) { 96 return; 97 } 98 99 port_mmap = port_mmaps->elts; 100 101 for (i = 0; i < port_mmaps->size; i++) { 102 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); 103 } 104 105 port_mmaps->size = 0; 106 107 if (free_elts != 0) { 108 nxt_free(port_mmaps->elts); 109 } 110 } 111 112 113 #define nxt_port_mmap_free_junk(p, size) \ 114 memset((p), 0xA5, size) 115 116 117 static void 118 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 119 { 120 u_char *p; 121 nxt_mp_t *mp; 122 nxt_buf_t *b, *next; 123 nxt_process_t *process; 124 nxt_chunk_id_t c; 125 nxt_port_mmap_header_t *hdr; 126 nxt_port_mmap_handler_t *mmap_handler; 127 128 if (nxt_buf_ts_handle(task, obj, data)) { 129 return; 130 } 131 132 b = obj; 133 134 nxt_assert(data == b->parent); 135 136 mmap_handler = data; 137 138 complete_buf: 139 140 hdr = mmap_handler->hdr; 141 142 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 143 nxt_debug(task, "mmap buf completion: mmap for other process pair " 144 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 145 146 goto release_buf; 147 } 148 149 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 150 /* 151 * Chunks until b->mem.pos has been sent to other side, 152 * let's release rest (if any). 153 */ 154 p = b->mem.pos - 1; 155 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 156 p = nxt_port_mmap_chunk_start(hdr, c); 157 158 } else { 159 p = b->mem.start; 160 c = nxt_port_mmap_chunk_id(hdr, p); 161 } 162 163 nxt_port_mmap_free_junk(p, b->mem.end - p); 164 165 nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), " 166 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, 167 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); 168 169 while (p < b->mem.end) { 170 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 171 172 p += PORT_MMAP_CHUNK_SIZE; 173 c++; 174 } 175 176 if (hdr->dst_pid == nxt_pid 177 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 178 { 179 process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); 180 181 nxt_process_broadcast_shm_ack(task, process); 182 } 183 184 release_buf: 185 186 nxt_port_mmap_handler_use(mmap_handler, -1); 187 188 next = b->next; 189 mp = b->data; 190 191 nxt_mp_free(mp, b); 192 nxt_mp_release(mp); 193 194 if (next != NULL) { 195 b = next; 196 mmap_handler = b->parent; 197 198 goto complete_buf; 199 } 200 } 201 202 203 nxt_port_mmap_handler_t * 204 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 205 nxt_fd_t fd) 206 { 207 void *mem; 208 struct stat mmap_stat; 209 nxt_port_mmap_t *port_mmap; 210 nxt_port_mmap_header_t *hdr; 211 nxt_port_mmap_handler_t *mmap_handler; 212 213 nxt_debug(task, "got new mmap fd #%FD from process %PI", 214 fd, process->pid); 215 216 port_mmap = NULL; 217 218 if (fstat(fd, &mmap_stat) == -1) { 219 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 220 221 return NULL; 222 } 223 224 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 225 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 226 227 if (nxt_slow_path(mem == MAP_FAILED)) { 228 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 229 230 return NULL; 231 } 232 233 hdr = mem; 234 235 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 236 if (nxt_slow_path(mmap_handler == NULL)) { 237 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 238 239 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 240 241 return NULL; 242 } 243 244 mmap_handler->hdr = hdr; 245 mmap_handler->fd = -1; 246 247 if (nxt_slow_path(hdr->src_pid != process->pid 248 || hdr->dst_pid != nxt_pid)) 249 { 250 nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " 251 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, 252 hdr->dst_pid, nxt_pid); 253 254 return NULL; 255 } 256 257 nxt_thread_mutex_lock(&process->incoming.mutex); 258 259 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 260 if (nxt_slow_path(port_mmap == NULL)) { 261 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 262 263 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 264 hdr = NULL; 265 266 nxt_free(mmap_handler); 267 mmap_handler = NULL; 268 269 goto fail; 270 } 271 272 port_mmap->mmap_handler = mmap_handler; 273 nxt_port_mmap_handler_use(mmap_handler, 1); 274 275 hdr->sent_over = 0xFFFFu; 276 277 fail: 278 279 nxt_thread_mutex_unlock(&process->incoming.mutex); 280 281 return mmap_handler; 282 } 283 284 285 static nxt_port_mmap_handler_t * 286 nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps, 287 nxt_bool_t tracking, nxt_int_t n) 288 { 289 void *mem; 290 nxt_fd_t fd; 291 nxt_int_t i; 292 nxt_free_map_t *free_map; 293 nxt_port_mmap_t *port_mmap; 294 nxt_port_mmap_header_t *hdr; 295 nxt_port_mmap_handler_t *mmap_handler; 296 297 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 298 if (nxt_slow_path(mmap_handler == NULL)) { 299 nxt_alert(task, "failed to allocate mmap_handler"); 300 301 return NULL; 302 } 303 304 port_mmap = nxt_port_mmap_at(mmaps, mmaps->size); 305 if (nxt_slow_path(port_mmap == NULL)) { 306 nxt_alert(task, "failed to add port mmap to mmaps array"); 307 308 nxt_free(mmap_handler); 309 return NULL; 310 } 311 312 fd = nxt_shm_open(task, PORT_MMAP_SIZE); 313 if (nxt_slow_path(fd == -1)) { 314 goto remove_fail; 315 } 316 317 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 318 MAP_SHARED, fd, 0); 319 320 if (nxt_slow_path(mem == MAP_FAILED)) { 321 nxt_fd_close(fd); 322 goto remove_fail; 323 } 324 325 mmap_handler->hdr = mem; 326 mmap_handler->fd = fd; 327 port_mmap->mmap_handler = mmap_handler; 328 nxt_port_mmap_handler_use(mmap_handler, 1); 329 330 /* Init segment header. */ 331 hdr = mmap_handler->hdr; 332 333 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 334 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 335 336 hdr->id = mmaps->size - 1; 337 hdr->src_pid = nxt_pid; 338 hdr->sent_over = 0xFFFFu; 339 340 /* Mark first chunk as busy */ 341 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 342 343 for (i = 0; i < n; i++) { 344 nxt_port_mmap_set_chunk_busy(free_map, i); 345 } 346 347 /* Mark as busy chunk followed the last available chunk. */ 348 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 349 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 350 351 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...", 352 hdr->id, nxt_pid); 353 354 return mmap_handler; 355 356 remove_fail: 357 358 nxt_free(mmap_handler); 359 360 mmaps->size--; 361 362 return NULL; 363 } 364 365 366 nxt_int_t 367 nxt_shm_open(nxt_task_t *task, size_t size) 368 { 369 nxt_fd_t fd; 370 371 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) 372 373 u_char *p, name[64]; 374 375 p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", 376 nxt_pid, nxt_random(&task->thread->random)); 377 *p = '\0'; 378 379 #endif 380 381 #if (NXT_HAVE_MEMFD_CREATE) 382 383 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 384 385 if (nxt_slow_path(fd == -1)) { 386 nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); 387 388 return -1; 389 } 390 391 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 392 393 #elif (NXT_HAVE_SHM_OPEN_ANON) 394 395 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 396 397 if (nxt_slow_path(fd == -1)) { 398 nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); 399 400 return -1; 401 } 402 403 nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); 404 405 #elif (NXT_HAVE_SHM_OPEN) 406 407 /* Just in case. */ 408 shm_unlink((char *) name); 409 410 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 411 412 if (nxt_slow_path(fd == -1)) { 413 nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); 414 415 return -1; 416 } 417 418 nxt_debug(task, "shm_open(%s): %FD", name, fd); 419 420 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 421 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 422 nxt_errno); 423 } 424 425 #else 426 427 #error No working shared memory implementation. 428 429 #endif 430 431 if (nxt_slow_path(ftruncate(fd, size) == -1)) { 432 nxt_alert(task, "ftruncate() failed %E", nxt_errno); 433 434 nxt_fd_close(fd); 435 436 return -1; 437 } 438 439 return fd; 440 } 441 442 443 static nxt_port_mmap_handler_t * 444 nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c, 445 nxt_int_t n, nxt_bool_t tracking) 446 { 447 nxt_int_t i, res, nchunks; 448 nxt_free_map_t *free_map; 449 nxt_port_mmap_t *port_mmap; 450 nxt_port_mmap_t *end_port_mmap; 451 nxt_port_mmap_header_t *hdr; 452 nxt_port_mmap_handler_t *mmap_handler; 453 454 nxt_thread_mutex_lock(&mmaps->mutex); 455 456 end_port_mmap = mmaps->elts + mmaps->size; 457 458 for (port_mmap = mmaps->elts; 459 port_mmap < end_port_mmap; 460 port_mmap++) 461 { 462 mmap_handler = port_mmap->mmap_handler; 463 hdr = mmap_handler->hdr; 464 465 if (hdr->sent_over != 0xFFFFu) { 466 continue; 467 } 468 469 *c = 0; 470 471 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 472 473 while (nxt_port_mmap_get_free_chunk(free_map, c)) { 474 nchunks = 1; 475 476 while (nchunks < n) { 477 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks); 478 479 if (res == 0) { 480 for (i = 0; i < nchunks; i++) { 481 nxt_port_mmap_set_chunk_free(free_map, *c + i); 482 } 483 484 *c += nchunks + 1; 485 nchunks = 0; 486 break; 487 } 488 489 nchunks++; 490 } 491 492 if (nchunks == n) { 493 goto unlock_return; 494 } 495 } 496 497 hdr->oosm = 1; 498 } 499 500 /* TODO introduce port_mmap limit and release wait. */ 501 502 *c = 0; 503 mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n); 504 505 unlock_return: 506 507 nxt_thread_mutex_unlock(&mmaps->mutex); 508 509 return mmap_handler; 510 } 511 512 513 static nxt_port_mmap_handler_t * 514 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 515 { 516 nxt_process_t *process; 517 nxt_port_mmap_handler_t *mmap_handler; 518 519 process = nxt_runtime_process_find(task->thread->runtime, spid); 520 if (nxt_slow_path(process == NULL)) { 521 return NULL; 522 } 523 524 nxt_thread_mutex_lock(&process->incoming.mutex); 525 526 if (nxt_fast_path(process->incoming.size > id)) { 527 mmap_handler = process->incoming.elts[id].mmap_handler; 528 529 } else { 530 mmap_handler = NULL; 531 532 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); 533 } 534 535 nxt_thread_mutex_unlock(&process->incoming.mutex); 536 537 return mmap_handler; 538 } 539 540 541 nxt_int_t 542 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps, 543 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 544 { 545 nxt_chunk_id_t c; 546 nxt_port_mmap_header_t *hdr; 547 nxt_port_mmap_handler_t *mmap_handler; 548 549 nxt_debug(task, "request tracking for stream #%uD", stream); 550 551 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1); 552 if (nxt_slow_path(mmap_handler == NULL)) { 553 return NXT_ERROR; 554 } 555 556 nxt_port_mmap_handler_use(mmap_handler, 1); 557 558 hdr = mmap_handler->hdr; 559 560 tracking->mmap_handler = mmap_handler; 561 tracking->tracking = hdr->tracking + c; 562 563 *tracking->tracking = stream; 564 565 nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", 566 hdr->src_pid, hdr->dst_pid, hdr->id, c); 567 568 return NXT_OK; 569 } 570 571 572 nxt_bool_t 573 nxt_port_mmap_tracking_cancel(nxt_task_t *task, 574 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 575 { 576 nxt_bool_t res; 577 nxt_chunk_id_t c; 578 nxt_port_mmap_header_t *hdr; 579 nxt_port_mmap_handler_t *mmap_handler; 580 581 mmap_handler = tracking->mmap_handler; 582 583 if (nxt_slow_path(mmap_handler == NULL)) { 584 return 0; 585 } 586 587 hdr = mmap_handler->hdr; 588 589 res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); 590 591 nxt_debug(task, "%s tracking for stream #%uD", 592 (res ? "cancelled" : "failed to cancel"), stream); 593 594 if (!res) { 595 c = tracking->tracking - hdr->tracking; 596 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 597 } 598 599 nxt_port_mmap_handler_use(mmap_handler, -1); 600 601 return res; 602 } 603 604 605 nxt_int_t 606 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) 607 { 608 nxt_port_mmap_handler_t *mmap_handler; 609 610 mmap_handler = t->mmap_handler; 611 612 #if (NXT_DEBUG) 613 { 614 nxt_atomic_t *tracking; 615 616 tracking = mmap_handler->hdr->tracking; 617 618 nxt_assert(t->tracking >= tracking); 619 nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); 620 } 621 #endif 622 623 buf[0] = mmap_handler->hdr->id; 624 buf[1] = t->tracking - mmap_handler->hdr->tracking; 625 626 return NXT_OK; 627 } 628 629 nxt_bool_t 630 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 631 { 632 nxt_buf_t *b; 633 nxt_bool_t res; 634 nxt_chunk_id_t c; 635 nxt_port_mmap_header_t *hdr; 636 nxt_port_mmap_handler_t *mmap_handler; 637 nxt_port_mmap_tracking_msg_t *tracking_msg; 638 639 b = msg->buf; 640 641 if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 642 nxt_debug(task, "too small message %O", nxt_buf_used_size(b)); 643 return 0; 644 } 645 646 tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; 647 648 b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); 649 mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, 650 tracking_msg->mmap_id); 651 652 if (nxt_slow_path(mmap_handler == NULL)) { 653 return 0; 654 } 655 656 hdr = mmap_handler->hdr; 657 658 c = tracking_msg->tracking_id; 659 res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); 660 661 nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, 662 (res ? "received" : "already cancelled")); 663 664 if (!res) { 665 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 666 } 667 668 return res; 669 } 670 671 672 nxt_buf_t * 673 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size) 674 { 675 nxt_mp_t *mp; 676 nxt_buf_t *b; 677 nxt_int_t nchunks; 678 nxt_chunk_id_t c; 679 nxt_port_mmap_header_t *hdr; 680 nxt_port_mmap_handler_t *mmap_handler; 681 682 nxt_debug(task, "request %z bytes shm buffer", size); 683 684 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 685 686 if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) { 687 nxt_alert(task, "requested buffer (%z) too big", size); 688 689 return NULL; 690 } 691 692 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 693 if (nxt_slow_path(b == NULL)) { 694 return NULL; 695 } 696 697 b->completion_handler = nxt_port_mmap_buf_completion; 698 nxt_buf_set_port_mmap(b); 699 700 mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0); 701 if (nxt_slow_path(mmap_handler == NULL)) { 702 mp = task->thread->engine->mem_pool; 703 nxt_mp_free(mp, b); 704 nxt_mp_release(mp); 705 return NULL; 706 } 707 708 b->parent = mmap_handler; 709 710 nxt_port_mmap_handler_use(mmap_handler, 1); 711 712 hdr = mmap_handler->hdr; 713 714 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 715 b->mem.pos = b->mem.start; 716 b->mem.free = b->mem.start; 717 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 718 719 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 720 b, b->mem.start, b->mem.end - b->mem.start, 721 hdr->src_pid, hdr->dst_pid, hdr->id, c); 722 723 return b; 724 } 725 726 727 nxt_int_t 728 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 729 size_t min_size) 730 { 731 size_t nchunks, free_size; 732 nxt_chunk_id_t c, start; 733 nxt_port_mmap_header_t *hdr; 734 nxt_port_mmap_handler_t *mmap_handler; 735 736 nxt_debug(task, "request increase %z bytes shm buffer", size); 737 738 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 739 nxt_log(task, NXT_LOG_WARN, 740 "failed to increase, not a mmap buffer"); 741 return NXT_ERROR; 742 } 743 744 free_size = nxt_buf_mem_free_size(&b->mem); 745 746 if (nxt_slow_path(size <= free_size)) { 747 return NXT_OK; 748 } 749 750 mmap_handler = b->parent; 751 hdr = mmap_handler->hdr; 752 753 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 754 755 size -= free_size; 756 757 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 758 759 c = start; 760 761 /* Try to acquire as much chunks as required. */ 762 while (nchunks > 0) { 763 764 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 765 break; 766 } 767 768 c++; 769 nchunks--; 770 } 771 772 if (nchunks != 0 773 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) 774 { 775 c--; 776 while (c >= start) { 777 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 778 c--; 779 } 780 781 nxt_debug(task, "failed to increase, %uz chunks busy", nchunks); 782 783 return NXT_ERROR; 784 785 } else { 786 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 787 788 return NXT_OK; 789 } 790 } 791 792 793 static nxt_buf_t * 794 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 795 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 796 { 797 size_t nchunks; 798 nxt_buf_t *b; 799 nxt_port_mmap_header_t *hdr; 800 nxt_port_mmap_handler_t *mmap_handler; 801 802 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 803 mmap_msg->mmap_id); 804 if (nxt_slow_path(mmap_handler == NULL)) { 805 return NULL; 806 } 807 808 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 809 if (nxt_slow_path(b == NULL)) { 810 return NULL; 811 } 812 813 b->completion_handler = nxt_port_mmap_buf_completion; 814 815 nxt_buf_set_port_mmap(b); 816 817 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 818 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 819 nchunks++; 820 } 821 822 hdr = mmap_handler->hdr; 823 824 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 825 b->mem.pos = b->mem.start; 826 b->mem.free = b->mem.start + mmap_msg->size; 827 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 828 829 b->parent = mmap_handler; 830 nxt_port_mmap_handler_use(mmap_handler, 1); 831 832 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 833 b, b->mem.start, b->mem.end - b->mem.start, 834 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 835 836 return b; 837 } 838 839 840 void 841 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 842 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf) 843 { 844 size_t bsize; 845 nxt_buf_t *bmem; 846 nxt_uint_t i; 847 nxt_port_mmap_msg_t *mmap_msg; 848 nxt_port_mmap_header_t *hdr; 849 nxt_port_mmap_handler_t *mmap_handler; 850 851 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 852 "via shared memory", sb->size, port->pid); 853 854 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 855 mmap_msg = mmsg_buf; 856 857 bmem = msg->buf; 858 859 for (i = 0; i < sb->niov; i++, mmap_msg++) { 860 861 /* Lookup buffer which starts current iov_base. */ 862 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 863 bmem = bmem->next; 864 } 865 866 if (nxt_slow_path(bmem == NULL)) { 867 nxt_log_error(NXT_LOG_ERR, task->log, 868 "failed to find buf for iobuf[%d]", i); 869 return; 870 /* TODO clear b and exit */ 871 } 872 873 mmap_handler = bmem->parent; 874 hdr = mmap_handler->hdr; 875 876 mmap_msg->mmap_id = hdr->id; 877 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 878 mmap_msg->size = sb->iobuf[i].iov_len; 879 880 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 881 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 882 port->pid); 883 } 884 885 sb->iobuf[0].iov_base = mmsg_buf; 886 sb->iobuf[0].iov_len = bsize; 887 sb->niov = 1; 888 sb->size = bsize; 889 890 msg->port_msg.mmap = 1; 891 } 892 893 894 void 895 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 896 { 897 nxt_buf_t *b, **pb; 898 nxt_port_mmap_msg_t *end, *mmap_msg; 899 900 pb = &msg->buf; 901 msg->size = 0; 902 903 for (b = msg->buf; b != NULL; b = b->next) { 904 905 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 906 end = (nxt_port_mmap_msg_t *) b->mem.free; 907 908 while (mmap_msg < end) { 909 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 910 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 911 msg->port_msg.pid); 912 913 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, 914 msg->port_msg.pid, mmap_msg); 915 if (nxt_slow_path(*pb == NULL)) { 916 nxt_log_error(NXT_LOG_ERR, task->log, 917 "failed to get mmap buffer"); 918 919 break; 920 } 921 922 msg->size += mmap_msg->size; 923 pb = &(*pb)->next; 924 mmap_msg++; 925 926 /* Mark original buf as complete. */ 927 b->mem.pos += sizeof(nxt_port_mmap_msg_t); 928 } 929 } 930 } 931 932 933 nxt_port_method_t 934 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 935 { 936 nxt_port_method_t m; 937 938 m = NXT_PORT_METHOD_ANY; 939 940 for (/* void */; b != NULL; b = b->next) { 941 if (nxt_buf_used_size(b) == 0) { 942 /* empty buffers does not affect method */ 943 continue; 944 } 945 946 if (nxt_buf_is_port_mmap(b)) { 947 if (m == NXT_PORT_METHOD_PLAIN) { 948 nxt_log_error(NXT_LOG_ERR, task->log, 949 "mixing plain and mmap buffers, " 950 "using plain mode"); 951 952 break; 953 } 954 955 if (m == NXT_PORT_METHOD_ANY) { 956 nxt_debug(task, "using mmap mode"); 957 958 m = NXT_PORT_METHOD_MMAP; 959 } 960 } else { 961 if (m == NXT_PORT_METHOD_MMAP) { 962 nxt_log_error(NXT_LOG_ERR, task->log, 963 "mixing mmap and plain buffers, " 964 "switching to plain mode"); 965 966 m = NXT_PORT_METHOD_PLAIN; 967 968 break; 969 } 970 971 if (m == NXT_PORT_METHOD_ANY) { 972 nxt_debug(task, "using plain mode"); 973 974 m = NXT_PORT_METHOD_PLAIN; 975 } 976 } 977 } 978 979 return m; 980 } 981 982 983 void 984 nxt_process_broadcast_shm_ack(nxt_task_t *task, nxt_process_t *process) 985 { 986 nxt_port_t *port; 987 988 if (nxt_slow_path(process == NULL || nxt_queue_is_empty(&process->ports))) 989 { 990 return; 991 } 992 993 port = nxt_process_port_first(process); 994 995 if (port->type == NXT_PROCESS_APP) { 996 nxt_port_post(task, port, nxt_port_broadcast_shm_ack, process); 997 } 998 } 999 1000 1001 static void 1002 nxt_port_broadcast_shm_ack(nxt_task_t *task, nxt_port_t *port, void *data) 1003 { 1004 nxt_process_t *process; 1005 1006 process = data; 1007 1008 nxt_queue_each(port, &process->ports, nxt_port_t, link) { 1009 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, 1010 -1, 0, 0, NULL); 1011 } nxt_queue_loop; 1012 } 1013