1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 20 nxt_inline void 21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) 22 { 23 int c; 24 25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 26 27 if (i < 0 && c == -i) { 28 if (mmap_handler->hdr != NULL) { 29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 30 mmap_handler->hdr = NULL; 31 } 32 33 nxt_free(mmap_handler); 34 } 35 } 36 37 38 static nxt_port_mmap_t * 39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 40 { 41 uint32_t cap; 42 43 cap = port_mmaps->cap; 44 45 if (cap == 0) { 46 cap = i + 1; 47 } 48 49 while (i + 1 > cap) { 50 51 if (cap < 16) { 52 cap = cap * 2; 53 54 } else { 55 cap = cap + cap / 2; 56 } 57 } 58 59 if (cap != port_mmaps->cap) { 60 61 port_mmaps->elts = nxt_realloc(port_mmaps->elts, 62 cap * sizeof(nxt_port_mmap_t)); 63 if (nxt_slow_path(port_mmaps->elts == NULL)) { 64 return NULL; 65 } 66 67 nxt_memzero(port_mmaps->elts + port_mmaps->cap, 68 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); 69 70 port_mmaps->cap = cap; 71 } 72 73 if (i + 1 > port_mmaps->size) { 74 port_mmaps->size = i + 1; 75 } 76 77 return port_mmaps->elts + i; 78 } 79 80 81 void 82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) 83 { 84 uint32_t i; 85 nxt_port_mmap_t *port_mmap; 86 87 if (port_mmaps == NULL) { 88 return; 89 } 90 91 port_mmap = port_mmaps->elts; 92 93 for (i = 0; i < port_mmaps->size; i++) { 94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); 95 } 96 97 port_mmaps->size = 0; 98 99 if (free_elts != 0) { 100 nxt_free(port_mmaps->elts); 101 } 102 } 103 104 105 #define nxt_port_mmap_free_junk(p, size) \ 106 memset((p), 0xA5, size) 107 108 109 static void 110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 111 { 112 u_char *p; 113 nxt_mp_t *mp; 114 nxt_buf_t *b, *next; 115 nxt_port_t *port; 116 nxt_process_t *process; 117 nxt_chunk_id_t c; 118 nxt_port_mmap_header_t *hdr; 119 nxt_port_mmap_handler_t *mmap_handler; 120 121 if (nxt_buf_ts_handle(task, obj, data)) { 122 return; 123 } 124 125 b = obj; 126 127 nxt_assert(data == b->parent); 128 129 mmap_handler = data; 130 131 complete_buf: 132 133 hdr = mmap_handler->hdr; 134 135 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 136 nxt_debug(task, "mmap buf completion: mmap for other process pair " 137 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 138 139 goto release_buf; 140 } 141 142 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 143 /* 144 * Chunks until b->mem.pos has been sent to other side, 145 * let's release rest (if any). 146 */ 147 p = b->mem.pos - 1; 148 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 149 p = nxt_port_mmap_chunk_start(hdr, c); 150 151 } else { 152 p = b->mem.start; 153 c = nxt_port_mmap_chunk_id(hdr, p); 154 } 155 156 nxt_port_mmap_free_junk(p, b->mem.end - p); 157 158 nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), " 159 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, 160 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); 161 162 while (p < b->mem.end) { 163 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 164 165 p += PORT_MMAP_CHUNK_SIZE; 166 c++; 167 } 168 169 if (hdr->dst_pid == nxt_pid 170 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 171 { 172 process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); 173 174 if (process != NULL && !nxt_queue_is_empty(&process->ports)) { 175 port = nxt_process_port_first(process); 176 177 if (port->type == NXT_PROCESS_APP) { 178 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, 179 -1, 0, 0, NULL); 180 } 181 } 182 } 183 184 release_buf: 185 186 nxt_port_mmap_handler_use(mmap_handler, -1); 187 188 next = b->next; 189 mp = b->data; 190 191 nxt_mp_free(mp, b); 192 nxt_mp_release(mp); 193 194 if (next != NULL) { 195 b = next; 196 mmap_handler = b->parent; 197 198 goto complete_buf; 199 } 200 } 201 202 203 nxt_port_mmap_handler_t * 204 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 205 nxt_fd_t fd) 206 { 207 void *mem; 208 struct stat mmap_stat; 209 nxt_port_mmap_t *port_mmap; 210 nxt_port_mmap_header_t *hdr; 211 nxt_port_mmap_handler_t *mmap_handler; 212 213 nxt_debug(task, "got new mmap fd #%FD from process %PI", 214 fd, process->pid); 215 216 port_mmap = NULL; 217 218 if (fstat(fd, &mmap_stat) == -1) { 219 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 220 221 return NULL; 222 } 223 224 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 225 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 226 227 if (nxt_slow_path(mem == MAP_FAILED)) { 228 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 229 230 return NULL; 231 } 232 233 hdr = mem; 234 235 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 236 if (nxt_slow_path(mmap_handler == NULL)) { 237 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 238 239 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 240 241 return NULL; 242 } 243 244 mmap_handler->hdr = hdr; 245 246 if (nxt_slow_path(hdr->src_pid != process->pid 247 || hdr->dst_pid != nxt_pid)) 248 { 249 nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " 250 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, 251 hdr->dst_pid, nxt_pid); 252 253 return NULL; 254 } 255 256 nxt_thread_mutex_lock(&process->incoming.mutex); 257 258 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 259 if (nxt_slow_path(port_mmap == NULL)) { 260 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 261 262 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 263 hdr = NULL; 264 265 nxt_free(mmap_handler); 266 mmap_handler = NULL; 267 268 goto fail; 269 } 270 271 port_mmap->mmap_handler = mmap_handler; 272 nxt_port_mmap_handler_use(mmap_handler, 1); 273 274 hdr->sent_over = 0xFFFFu; 275 276 fail: 277 278 nxt_thread_mutex_unlock(&process->incoming.mutex); 279 280 return mmap_handler; 281 } 282 283 284 static nxt_port_mmap_handler_t * 285 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 286 nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) 287 { 288 void *mem; 289 nxt_fd_t fd; 290 nxt_int_t i; 291 nxt_free_map_t *free_map; 292 nxt_port_mmap_t *port_mmap; 293 nxt_port_mmap_header_t *hdr; 294 nxt_port_mmap_handler_t *mmap_handler; 295 296 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 297 if (nxt_slow_path(mmap_handler == NULL)) { 298 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 299 300 return NULL; 301 } 302 303 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 304 if (nxt_slow_path(port_mmap == NULL)) { 305 nxt_log(task, NXT_LOG_WARN, 306 "failed to add port mmap to outgoing array"); 307 308 nxt_free(mmap_handler); 309 return NULL; 310 } 311 312 fd = nxt_shm_open(task, PORT_MMAP_SIZE); 313 if (nxt_slow_path(fd == -1)) { 314 goto remove_fail; 315 } 316 317 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 318 MAP_SHARED, fd, 0); 319 320 if (nxt_slow_path(mem == MAP_FAILED)) { 321 goto remove_fail; 322 } 323 324 mmap_handler->hdr = mem; 325 port_mmap->mmap_handler = mmap_handler; 326 nxt_port_mmap_handler_use(mmap_handler, 1); 327 328 /* Init segment header. */ 329 hdr = mmap_handler->hdr; 330 331 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 332 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 333 334 hdr->id = process->outgoing.size - 1; 335 hdr->src_pid = nxt_pid; 336 hdr->dst_pid = process->pid; 337 hdr->sent_over = port->id; 338 339 /* Mark first chunk as busy */ 340 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 341 342 for (i = 0; i < n; i++) { 343 nxt_port_mmap_set_chunk_busy(free_map, i); 344 } 345 346 /* Mark as busy chunk followed the last available chunk. */ 347 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 348 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 349 350 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 351 352 /* TODO handle error */ 353 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 354 355 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 356 hdr->id, nxt_pid, process->pid); 357 358 return mmap_handler; 359 360 remove_fail: 361 362 nxt_free(mmap_handler); 363 364 process->outgoing.size--; 365 366 return NULL; 367 } 368 369 370 nxt_int_t 371 nxt_shm_open(nxt_task_t *task, size_t size) 372 { 373 nxt_fd_t fd; 374 375 #if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN) 376 377 u_char *p, name[64]; 378 379 p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", 380 nxt_pid, nxt_random(&task->thread->random)); 381 *p = '\0'; 382 383 #endif 384 385 #if (NXT_HAVE_MEMFD_CREATE) 386 387 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 388 389 if (nxt_slow_path(fd == -1)) { 390 nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); 391 392 return -1; 393 } 394 395 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 396 397 #elif (NXT_HAVE_SHM_OPEN_ANON) 398 399 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 400 401 if (nxt_slow_path(fd == -1)) { 402 nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); 403 404 return -1; 405 } 406 407 nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); 408 409 #elif (NXT_HAVE_SHM_OPEN) 410 411 /* Just in case. */ 412 shm_unlink((char *) name); 413 414 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 415 416 if (nxt_slow_path(fd == -1)) { 417 nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); 418 419 return -1; 420 } 421 422 nxt_debug(task, "shm_open(%s): %FD", name, fd); 423 424 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 425 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 426 nxt_errno); 427 } 428 429 #else 430 431 #error No working shared memory implementation. 432 433 #endif 434 435 if (nxt_slow_path(ftruncate(fd, size) == -1)) { 436 nxt_alert(task, "ftruncate() failed %E", nxt_errno); 437 438 nxt_fd_close(fd); 439 440 return -1; 441 } 442 443 return fd; 444 } 445 446 447 static nxt_port_mmap_handler_t * 448 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 449 nxt_int_t n, nxt_bool_t tracking) 450 { 451 nxt_int_t i, res, nchunks; 452 nxt_process_t *process; 453 nxt_free_map_t *free_map; 454 nxt_port_mmap_t *port_mmap; 455 nxt_port_mmap_t *end_port_mmap; 456 nxt_port_mmap_header_t *hdr; 457 nxt_port_mmap_handler_t *mmap_handler; 458 459 process = port->process; 460 if (nxt_slow_path(process == NULL)) { 461 return NULL; 462 } 463 464 nxt_thread_mutex_lock(&process->outgoing.mutex); 465 466 end_port_mmap = process->outgoing.elts + process->outgoing.size; 467 468 for (port_mmap = process->outgoing.elts; 469 port_mmap < end_port_mmap; 470 port_mmap++) 471 { 472 mmap_handler = port_mmap->mmap_handler; 473 hdr = mmap_handler->hdr; 474 475 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { 476 continue; 477 } 478 479 *c = 0; 480 481 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 482 483 while (nxt_port_mmap_get_free_chunk(free_map, c)) { 484 nchunks = 1; 485 486 while (nchunks < n) { 487 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks); 488 489 if (res == 0) { 490 for (i = 0; i < nchunks; i++) { 491 nxt_port_mmap_set_chunk_free(free_map, *c + i); 492 } 493 494 *c += nchunks + 1; 495 nchunks = 0; 496 break; 497 } 498 499 nchunks++; 500 } 501 502 if (nchunks == n) { 503 goto unlock_return; 504 } 505 } 506 507 hdr->oosm = 1; 508 } 509 510 /* TODO introduce port_mmap limit and release wait. */ 511 512 *c = 0; 513 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); 514 515 unlock_return: 516 517 nxt_thread_mutex_unlock(&process->outgoing.mutex); 518 519 return mmap_handler; 520 } 521 522 523 static nxt_port_mmap_handler_t * 524 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 525 { 526 nxt_process_t *process; 527 nxt_port_mmap_handler_t *mmap_handler; 528 529 process = nxt_runtime_process_find(task->thread->runtime, spid); 530 if (nxt_slow_path(process == NULL)) { 531 return NULL; 532 } 533 534 nxt_thread_mutex_lock(&process->incoming.mutex); 535 536 if (nxt_fast_path(process->incoming.size > id)) { 537 mmap_handler = process->incoming.elts[id].mmap_handler; 538 539 } else { 540 mmap_handler = NULL; 541 542 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); 543 } 544 545 nxt_thread_mutex_unlock(&process->incoming.mutex); 546 547 return mmap_handler; 548 } 549 550 551 nxt_int_t 552 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, 553 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 554 { 555 nxt_chunk_id_t c; 556 nxt_port_mmap_header_t *hdr; 557 nxt_port_mmap_handler_t *mmap_handler; 558 559 nxt_debug(task, "request tracking for stream #%uD", stream); 560 561 mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); 562 if (nxt_slow_path(mmap_handler == NULL)) { 563 return NXT_ERROR; 564 } 565 566 nxt_port_mmap_handler_use(mmap_handler, 1); 567 568 hdr = mmap_handler->hdr; 569 570 tracking->mmap_handler = mmap_handler; 571 tracking->tracking = hdr->tracking + c; 572 573 *tracking->tracking = stream; 574 575 nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", 576 hdr->src_pid, hdr->dst_pid, hdr->id, c); 577 578 return NXT_OK; 579 } 580 581 582 nxt_bool_t 583 nxt_port_mmap_tracking_cancel(nxt_task_t *task, 584 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 585 { 586 nxt_bool_t res; 587 nxt_chunk_id_t c; 588 nxt_port_mmap_header_t *hdr; 589 nxt_port_mmap_handler_t *mmap_handler; 590 591 mmap_handler = tracking->mmap_handler; 592 593 if (nxt_slow_path(mmap_handler == NULL)) { 594 return 0; 595 } 596 597 hdr = mmap_handler->hdr; 598 599 res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); 600 601 nxt_debug(task, "%s tracking for stream #%uD", 602 (res ? "cancelled" : "failed to cancel"), stream); 603 604 if (!res) { 605 c = tracking->tracking - hdr->tracking; 606 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 607 } 608 609 nxt_port_mmap_handler_use(mmap_handler, -1); 610 611 return res; 612 } 613 614 615 nxt_int_t 616 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) 617 { 618 nxt_port_mmap_handler_t *mmap_handler; 619 620 mmap_handler = t->mmap_handler; 621 622 #if (NXT_DEBUG) 623 { 624 nxt_atomic_t *tracking; 625 626 tracking = mmap_handler->hdr->tracking; 627 628 nxt_assert(t->tracking >= tracking); 629 nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); 630 } 631 #endif 632 633 buf[0] = mmap_handler->hdr->id; 634 buf[1] = t->tracking - mmap_handler->hdr->tracking; 635 636 return NXT_OK; 637 } 638 639 nxt_bool_t 640 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 641 { 642 nxt_buf_t *b; 643 nxt_bool_t res; 644 nxt_chunk_id_t c; 645 nxt_port_mmap_header_t *hdr; 646 nxt_port_mmap_handler_t *mmap_handler; 647 nxt_port_mmap_tracking_msg_t *tracking_msg; 648 649 b = msg->buf; 650 651 if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 652 nxt_debug(task, "too small message %O", nxt_buf_used_size(b)); 653 return 0; 654 } 655 656 tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; 657 658 b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); 659 mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, 660 tracking_msg->mmap_id); 661 662 if (nxt_slow_path(mmap_handler == NULL)) { 663 return 0; 664 } 665 666 hdr = mmap_handler->hdr; 667 668 c = tracking_msg->tracking_id; 669 res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); 670 671 nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, 672 (res ? "received" : "already cancelled")); 673 674 if (!res) { 675 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 676 } 677 678 return res; 679 } 680 681 682 nxt_buf_t * 683 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 684 { 685 nxt_mp_t *mp; 686 nxt_buf_t *b; 687 nxt_int_t nchunks; 688 nxt_chunk_id_t c; 689 nxt_port_mmap_header_t *hdr; 690 nxt_port_mmap_handler_t *mmap_handler; 691 692 nxt_debug(task, "request %z bytes shm buffer", size); 693 694 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 695 696 if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) { 697 nxt_alert(task, "requested buffer (%z) too big", size); 698 699 return NULL; 700 } 701 702 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 703 if (nxt_slow_path(b == NULL)) { 704 return NULL; 705 } 706 707 b->completion_handler = nxt_port_mmap_buf_completion; 708 nxt_buf_set_port_mmap(b); 709 710 mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); 711 if (nxt_slow_path(mmap_handler == NULL)) { 712 mp = task->thread->engine->mem_pool; 713 nxt_mp_free(mp, b); 714 nxt_mp_release(mp); 715 return NULL; 716 } 717 718 b->parent = mmap_handler; 719 720 nxt_port_mmap_handler_use(mmap_handler, 1); 721 722 hdr = mmap_handler->hdr; 723 724 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 725 b->mem.pos = b->mem.start; 726 b->mem.free = b->mem.start; 727 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 728 729 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 730 b, b->mem.start, b->mem.end - b->mem.start, 731 hdr->src_pid, hdr->dst_pid, hdr->id, c); 732 733 return b; 734 } 735 736 737 nxt_int_t 738 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 739 size_t min_size) 740 { 741 size_t nchunks, free_size; 742 nxt_chunk_id_t c, start; 743 nxt_port_mmap_header_t *hdr; 744 nxt_port_mmap_handler_t *mmap_handler; 745 746 nxt_debug(task, "request increase %z bytes shm buffer", size); 747 748 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 749 nxt_log(task, NXT_LOG_WARN, 750 "failed to increase, not a mmap buffer"); 751 return NXT_ERROR; 752 } 753 754 free_size = nxt_buf_mem_free_size(&b->mem); 755 756 if (nxt_slow_path(size <= free_size)) { 757 return NXT_OK; 758 } 759 760 mmap_handler = b->parent; 761 hdr = mmap_handler->hdr; 762 763 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 764 765 size -= free_size; 766 767 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 768 769 c = start; 770 771 /* Try to acquire as much chunks as required. */ 772 while (nchunks > 0) { 773 774 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 775 break; 776 } 777 778 c++; 779 nchunks--; 780 } 781 782 if (nchunks != 0 783 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) 784 { 785 c--; 786 while (c >= start) { 787 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 788 c--; 789 } 790 791 nxt_debug(task, "failed to increase, %uz chunks busy", nchunks); 792 793 return NXT_ERROR; 794 795 } else { 796 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 797 798 return NXT_OK; 799 } 800 } 801 802 803 static nxt_buf_t * 804 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 805 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 806 { 807 size_t nchunks; 808 nxt_buf_t *b; 809 nxt_port_mmap_header_t *hdr; 810 nxt_port_mmap_handler_t *mmap_handler; 811 812 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 813 mmap_msg->mmap_id); 814 if (nxt_slow_path(mmap_handler == NULL)) { 815 return NULL; 816 } 817 818 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 819 if (nxt_slow_path(b == NULL)) { 820 return NULL; 821 } 822 823 b->completion_handler = nxt_port_mmap_buf_completion; 824 825 nxt_buf_set_port_mmap(b); 826 827 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 828 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 829 nchunks++; 830 } 831 832 hdr = mmap_handler->hdr; 833 834 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 835 b->mem.pos = b->mem.start; 836 b->mem.free = b->mem.start + mmap_msg->size; 837 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 838 839 b->parent = mmap_handler; 840 nxt_port_mmap_handler_use(mmap_handler, 1); 841 842 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 843 b, b->mem.start, b->mem.end - b->mem.start, 844 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 845 846 return b; 847 } 848 849 850 void 851 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 852 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf) 853 { 854 size_t bsize; 855 nxt_buf_t *bmem; 856 nxt_uint_t i; 857 nxt_port_mmap_msg_t *mmap_msg; 858 nxt_port_mmap_header_t *hdr; 859 nxt_port_mmap_handler_t *mmap_handler; 860 861 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 862 "via shared memory", sb->size, port->pid); 863 864 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 865 mmap_msg = mmsg_buf; 866 867 bmem = msg->buf; 868 869 for (i = 0; i < sb->niov; i++, mmap_msg++) { 870 871 /* Lookup buffer which starts current iov_base. */ 872 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 873 bmem = bmem->next; 874 } 875 876 if (nxt_slow_path(bmem == NULL)) { 877 nxt_log_error(NXT_LOG_ERR, task->log, 878 "failed to find buf for iobuf[%d]", i); 879 return; 880 /* TODO clear b and exit */ 881 } 882 883 mmap_handler = bmem->parent; 884 hdr = mmap_handler->hdr; 885 886 mmap_msg->mmap_id = hdr->id; 887 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 888 mmap_msg->size = sb->iobuf[i].iov_len; 889 890 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 891 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 892 port->pid); 893 } 894 895 sb->iobuf[0].iov_base = mmsg_buf; 896 sb->iobuf[0].iov_len = bsize; 897 sb->niov = 1; 898 sb->size = bsize; 899 900 msg->port_msg.mmap = 1; 901 } 902 903 904 void 905 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 906 { 907 nxt_buf_t *b, **pb; 908 nxt_port_mmap_msg_t *end, *mmap_msg; 909 910 pb = &msg->buf; 911 msg->size = 0; 912 913 for (b = msg->buf; b != NULL; b = b->next) { 914 915 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 916 end = (nxt_port_mmap_msg_t *) b->mem.free; 917 918 while (mmap_msg < end) { 919 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 920 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 921 msg->port_msg.pid); 922 923 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, 924 msg->port_msg.pid, mmap_msg); 925 if (nxt_slow_path(*pb == NULL)) { 926 nxt_log_error(NXT_LOG_ERR, task->log, 927 "failed to get mmap buffer"); 928 929 break; 930 } 931 932 msg->size += mmap_msg->size; 933 pb = &(*pb)->next; 934 mmap_msg++; 935 936 /* Mark original buf as complete. */ 937 b->mem.pos += sizeof(nxt_port_mmap_msg_t); 938 } 939 } 940 } 941 942 943 nxt_port_method_t 944 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 945 { 946 nxt_port_method_t m; 947 nxt_port_mmap_header_t *hdr; 948 nxt_port_mmap_handler_t *mmap_handler; 949 950 m = NXT_PORT_METHOD_ANY; 951 952 for (/* void */; b != NULL; b = b->next) { 953 if (nxt_buf_used_size(b) == 0) { 954 /* empty buffers does not affect method */ 955 continue; 956 } 957 958 if (nxt_buf_is_port_mmap(b)) { 959 mmap_handler = b->parent; 960 hdr = mmap_handler->hdr; 961 962 if (m == NXT_PORT_METHOD_PLAIN) { 963 nxt_log_error(NXT_LOG_ERR, task->log, 964 "mixing plain and mmap buffers, " 965 "using plain mode"); 966 967 break; 968 } 969 970 if (port->pid != hdr->dst_pid) { 971 nxt_log_error(NXT_LOG_ERR, task->log, 972 "send mmap buffer for %PI to %PI, " 973 "using plain mode", hdr->dst_pid, port->pid); 974 975 m = NXT_PORT_METHOD_PLAIN; 976 977 break; 978 } 979 980 if (m == NXT_PORT_METHOD_ANY) { 981 nxt_debug(task, "using mmap mode"); 982 983 m = NXT_PORT_METHOD_MMAP; 984 } 985 } else { 986 if (m == NXT_PORT_METHOD_MMAP) { 987 nxt_log_error(NXT_LOG_ERR, task->log, 988 "mixing mmap and plain buffers, " 989 "switching to plain mode"); 990 991 m = NXT_PORT_METHOD_PLAIN; 992 993 break; 994 } 995 996 if (m == NXT_PORT_METHOD_ANY) { 997 nxt_debug(task, "using plain mode"); 998 999 m = NXT_PORT_METHOD_PLAIN; 1000 } 1001 } 1002 } 1003 1004 return m; 1005 } 1006