1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 20 nxt_inline void 21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) 22 { 23 int c; 24 25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 26 27 if (i < 0 && c == -i) { 28 if (mmap_handler->hdr != NULL) { 29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 30 mmap_handler->hdr = NULL; 31 } 32 33 nxt_free(mmap_handler); 34 } 35 } 36 37 38 static nxt_port_mmap_t * 39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 40 { 41 uint32_t cap; 42 43 cap = port_mmaps->cap; 44 45 if (cap == 0) { 46 cap = i + 1; 47 } 48 49 while (i + 1 > cap) { 50 51 if (cap < 16) { 52 cap = cap * 2; 53 54 } else { 55 cap = cap + cap / 2; 56 } 57 } 58 59 if (cap != port_mmaps->cap) { 60 61 port_mmaps->elts = nxt_realloc(port_mmaps->elts, 62 cap * sizeof(nxt_port_mmap_t)); 63 if (nxt_slow_path(port_mmaps->elts == NULL)) { 64 return NULL; 65 } 66 67 nxt_memzero(port_mmaps->elts + port_mmaps->cap, 68 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); 69 70 port_mmaps->cap = cap; 71 } 72 73 if (i + 1 > port_mmaps->size) { 74 port_mmaps->size = i + 1; 75 } 76 77 return port_mmaps->elts + i; 78 } 79 80 81 void 82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) 83 { 84 uint32_t i; 85 nxt_port_mmap_t *port_mmap; 86 87 if (port_mmaps == NULL) { 88 return; 89 } 90 91 port_mmap = port_mmaps->elts; 92 93 for (i = 0; i < port_mmaps->size; i++) { 94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); 95 } 96 97 port_mmaps->size = 0; 98 99 if (free_elts != 0) { 100 nxt_free(port_mmaps->elts); 101 } 102 } 103 104 105 #define nxt_port_mmap_free_junk(p, size) \ 106 memset((p), 0xA5, size) 107 108 109 static void 110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 111 { 112 u_char *p; 113 nxt_mp_t *mp; 114 nxt_buf_t *b; 115 nxt_chunk_id_t c; 116 nxt_port_mmap_header_t *hdr; 117 nxt_port_mmap_handler_t *mmap_handler; 118 119 if (nxt_buf_ts_handle(task, obj, data)) { 120 return; 121 } 122 123 b = obj; 124 125 mp = b->data; 126 127 #if (NXT_DEBUG) 128 if (nxt_slow_path(data != b->parent)) { 129 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 130 data, b->parent); 131 nxt_abort(); 132 } 133 #endif 134 135 mmap_handler = data; 136 hdr = mmap_handler->hdr; 137 138 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 139 nxt_debug(task, "mmap buf completion: mmap for other process pair " 140 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 141 142 goto release_buf; 143 } 144 145 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 146 /* 147 * Chunks until b->mem.pos has been sent to other side, 148 * let's release rest (if any). 149 */ 150 p = b->mem.pos - 1; 151 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 152 p = nxt_port_mmap_chunk_start(hdr, c); 153 154 } else { 155 p = b->mem.start; 156 c = nxt_port_mmap_chunk_id(hdr, p); 157 } 158 159 nxt_port_mmap_free_junk(p, b->mem.end - p); 160 161 nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), " 162 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, 163 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); 164 165 while (p < b->mem.end) { 166 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 167 168 p += PORT_MMAP_CHUNK_SIZE; 169 c++; 170 } 171 172 release_buf: 173 174 nxt_port_mmap_handler_use(mmap_handler, -1); 175 176 nxt_mp_free(mp, b); 177 nxt_mp_release(mp); 178 } 179 180 181 nxt_port_mmap_handler_t * 182 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 183 nxt_fd_t fd) 184 { 185 void *mem; 186 struct stat mmap_stat; 187 nxt_port_mmap_t *port_mmap; 188 nxt_port_mmap_header_t *hdr; 189 nxt_port_mmap_handler_t *mmap_handler; 190 191 nxt_debug(task, "got new mmap fd #%FD from process %PI", 192 fd, process->pid); 193 194 port_mmap = NULL; 195 196 if (fstat(fd, &mmap_stat) == -1) { 197 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 198 199 return NULL; 200 } 201 202 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 203 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 204 205 if (nxt_slow_path(mem == MAP_FAILED)) { 206 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 207 208 return NULL; 209 } 210 211 hdr = mem; 212 213 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 214 if (nxt_slow_path(mmap_handler == NULL)) { 215 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 216 217 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 218 219 return NULL; 220 } 221 222 mmap_handler->hdr = hdr; 223 224 if (nxt_slow_path(hdr->src_pid != process->pid 225 || hdr->dst_pid != nxt_pid)) 226 { 227 nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " 228 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, 229 hdr->dst_pid, nxt_pid); 230 231 return NULL; 232 } 233 234 nxt_thread_mutex_lock(&process->incoming.mutex); 235 236 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 237 if (nxt_slow_path(port_mmap == NULL)) { 238 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 239 240 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 241 hdr = NULL; 242 243 nxt_free(mmap_handler); 244 mmap_handler = NULL; 245 246 goto fail; 247 } 248 249 port_mmap->mmap_handler = mmap_handler; 250 nxt_port_mmap_handler_use(mmap_handler, 1); 251 252 hdr->sent_over = 0xFFFFu; 253 254 fail: 255 256 nxt_thread_mutex_unlock(&process->incoming.mutex); 257 258 return mmap_handler; 259 } 260 261 262 static nxt_port_mmap_handler_t * 263 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 264 nxt_port_t *port, nxt_bool_t tracking) 265 { 266 void *mem; 267 u_char *p, name[64]; 268 nxt_fd_t fd; 269 nxt_free_map_t *free_map; 270 nxt_port_mmap_t *port_mmap; 271 nxt_port_mmap_header_t *hdr; 272 nxt_port_mmap_handler_t *mmap_handler; 273 274 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 275 if (nxt_slow_path(mmap_handler == NULL)) { 276 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 277 278 return NULL; 279 } 280 281 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 282 if (nxt_slow_path(port_mmap == NULL)) { 283 nxt_log(task, NXT_LOG_WARN, 284 "failed to add port mmap to outgoing array"); 285 286 nxt_free(mmap_handler); 287 return NULL; 288 } 289 290 p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", 291 nxt_pid, nxt_random(&task->thread->random)); 292 *p = '\0'; 293 294 #if (NXT_HAVE_MEMFD_CREATE) 295 296 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 297 298 if (nxt_slow_path(fd == -1)) { 299 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", 300 name, nxt_errno); 301 302 goto remove_fail; 303 } 304 305 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 306 307 #elif (NXT_HAVE_SHM_OPEN) 308 309 /* Just in case. */ 310 shm_unlink((char *) name); 311 312 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 313 314 nxt_debug(task, "shm_open(%s): %FD", name, fd); 315 316 if (nxt_slow_path(fd == -1)) { 317 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); 318 319 goto remove_fail; 320 } 321 322 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 323 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 324 nxt_errno); 325 } 326 327 #else 328 329 #error No working shared memory implementation. 330 331 #endif 332 333 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 334 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 335 336 goto remove_fail; 337 } 338 339 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 340 MAP_SHARED, fd, 0); 341 342 if (nxt_slow_path(mem == MAP_FAILED)) { 343 goto remove_fail; 344 } 345 346 mmap_handler->hdr = mem; 347 port_mmap->mmap_handler = mmap_handler; 348 nxt_port_mmap_handler_use(mmap_handler, 1); 349 350 /* Init segment header. */ 351 hdr = mmap_handler->hdr; 352 353 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 354 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 355 356 hdr->id = process->outgoing.size - 1; 357 hdr->src_pid = nxt_pid; 358 hdr->dst_pid = process->pid; 359 hdr->sent_over = port->id; 360 361 /* Mark first chunk as busy */ 362 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 363 364 nxt_port_mmap_set_chunk_busy(free_map, 0); 365 366 /* Mark as busy chunk followed the last available chunk. */ 367 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 368 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 369 370 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 371 372 /* TODO handle error */ 373 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 374 375 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 376 hdr->id, nxt_pid, process->pid); 377 378 return mmap_handler; 379 380 remove_fail: 381 382 nxt_free(mmap_handler); 383 384 process->outgoing.size--; 385 386 return NULL; 387 } 388 389 390 static nxt_port_mmap_handler_t * 391 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 392 nxt_bool_t tracking) 393 { 394 nxt_process_t *process; 395 nxt_free_map_t *free_map; 396 nxt_port_mmap_t *port_mmap; 397 nxt_port_mmap_t *end_port_mmap; 398 nxt_port_mmap_header_t *hdr; 399 nxt_port_mmap_handler_t *mmap_handler; 400 401 process = port->process; 402 if (nxt_slow_path(process == NULL)) { 403 return NULL; 404 } 405 406 *c = 0; 407 408 nxt_thread_mutex_lock(&process->outgoing.mutex); 409 410 end_port_mmap = process->outgoing.elts + process->outgoing.size; 411 412 for (port_mmap = process->outgoing.elts; 413 port_mmap < end_port_mmap; 414 port_mmap++) 415 { 416 mmap_handler = port_mmap->mmap_handler; 417 hdr = mmap_handler->hdr; 418 419 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { 420 continue; 421 } 422 423 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 424 425 if (nxt_port_mmap_get_free_chunk(free_map, c)) { 426 goto unlock_return; 427 } 428 } 429 430 /* TODO introduce port_mmap limit and release wait. */ 431 432 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking); 433 434 unlock_return: 435 436 nxt_thread_mutex_unlock(&process->outgoing.mutex); 437 438 return mmap_handler; 439 } 440 441 442 static nxt_port_mmap_handler_t * 443 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 444 { 445 nxt_process_t *process; 446 nxt_port_mmap_handler_t *mmap_handler; 447 448 process = nxt_runtime_process_find(task->thread->runtime, spid); 449 if (nxt_slow_path(process == NULL)) { 450 return NULL; 451 } 452 453 nxt_thread_mutex_lock(&process->incoming.mutex); 454 455 if (nxt_fast_path(process->incoming.size > id)) { 456 mmap_handler = process->incoming.elts[id].mmap_handler; 457 458 } else { 459 mmap_handler = NULL; 460 461 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); 462 } 463 464 nxt_thread_mutex_unlock(&process->incoming.mutex); 465 466 return mmap_handler; 467 } 468 469 470 nxt_int_t 471 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, 472 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 473 { 474 nxt_chunk_id_t c; 475 nxt_port_mmap_header_t *hdr; 476 nxt_port_mmap_handler_t *mmap_handler; 477 478 nxt_debug(task, "request tracking for stream #%uD", stream); 479 480 mmap_handler = nxt_port_mmap_get(task, port, &c, 1); 481 if (nxt_slow_path(mmap_handler == NULL)) { 482 return NXT_ERROR; 483 } 484 485 nxt_port_mmap_handler_use(mmap_handler, 1); 486 487 hdr = mmap_handler->hdr; 488 489 tracking->mmap_handler = mmap_handler; 490 tracking->tracking = hdr->tracking + c; 491 492 *tracking->tracking = stream; 493 494 nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", 495 hdr->src_pid, hdr->dst_pid, hdr->id, c); 496 497 return NXT_OK; 498 } 499 500 501 nxt_bool_t 502 nxt_port_mmap_tracking_cancel(nxt_task_t *task, 503 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 504 { 505 nxt_bool_t res; 506 nxt_chunk_id_t c; 507 nxt_port_mmap_header_t *hdr; 508 nxt_port_mmap_handler_t *mmap_handler; 509 510 mmap_handler = tracking->mmap_handler; 511 512 if (nxt_slow_path(mmap_handler == NULL)) { 513 return 0; 514 } 515 516 hdr = mmap_handler->hdr; 517 518 res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); 519 520 nxt_debug(task, "%s tracking for stream #%uD", 521 (res ? "cancelled" : "failed to cancel"), stream); 522 523 if (!res) { 524 c = tracking->tracking - hdr->tracking; 525 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 526 } 527 528 nxt_port_mmap_handler_use(mmap_handler, -1); 529 530 return res; 531 } 532 533 534 nxt_int_t 535 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) 536 { 537 nxt_port_mmap_handler_t *mmap_handler; 538 539 mmap_handler = t->mmap_handler; 540 541 #if (NXT_DEBUG) 542 { 543 nxt_atomic_t *tracking; 544 545 tracking = mmap_handler->hdr->tracking; 546 547 nxt_assert(t->tracking >= tracking); 548 nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); 549 } 550 #endif 551 552 buf[0] = mmap_handler->hdr->id; 553 buf[1] = t->tracking - mmap_handler->hdr->tracking; 554 555 return NXT_OK; 556 } 557 558 nxt_bool_t 559 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 560 { 561 nxt_buf_t *b; 562 nxt_bool_t res; 563 nxt_chunk_id_t c; 564 nxt_port_mmap_header_t *hdr; 565 nxt_port_mmap_handler_t *mmap_handler; 566 nxt_port_mmap_tracking_msg_t *tracking_msg; 567 568 b = msg->buf; 569 570 if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 571 nxt_debug(task, "too small message %O", nxt_buf_used_size(b)); 572 return 0; 573 } 574 575 tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; 576 577 b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); 578 mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, 579 tracking_msg->mmap_id); 580 581 if (nxt_slow_path(mmap_handler == NULL)) { 582 return 0; 583 } 584 585 hdr = mmap_handler->hdr; 586 587 c = tracking_msg->tracking_id; 588 res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); 589 590 nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, 591 (res ? "received" : "already cancelled")); 592 593 if (!res) { 594 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 595 } 596 597 return res; 598 } 599 600 601 nxt_buf_t * 602 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 603 { 604 size_t nchunks; 605 nxt_mp_t *mp; 606 nxt_buf_t *b; 607 nxt_chunk_id_t c; 608 nxt_port_mmap_header_t *hdr; 609 nxt_port_mmap_handler_t *mmap_handler; 610 611 nxt_debug(task, "request %z bytes shm buffer", size); 612 613 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 614 if (nxt_slow_path(b == NULL)) { 615 return NULL; 616 } 617 618 b->completion_handler = nxt_port_mmap_buf_completion; 619 nxt_buf_set_port_mmap(b); 620 621 mmap_handler = nxt_port_mmap_get(task, port, &c, 0); 622 if (nxt_slow_path(mmap_handler == NULL)) { 623 mp = task->thread->engine->mem_pool; 624 nxt_mp_free(mp, b); 625 nxt_mp_release(mp); 626 return NULL; 627 } 628 629 b->parent = mmap_handler; 630 631 nxt_port_mmap_handler_use(mmap_handler, 1); 632 633 hdr = mmap_handler->hdr; 634 635 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 636 b->mem.pos = b->mem.start; 637 b->mem.free = b->mem.start; 638 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 639 640 nchunks = size / PORT_MMAP_CHUNK_SIZE; 641 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 642 nchunks++; 643 } 644 645 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 646 b, b->mem.start, b->mem.end - b->mem.start, 647 hdr->src_pid, hdr->dst_pid, hdr->id, c); 648 649 c++; 650 nchunks--; 651 652 /* Try to acquire as much chunks as required. */ 653 while (nchunks > 0) { 654 655 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 656 break; 657 } 658 659 b->mem.end += PORT_MMAP_CHUNK_SIZE; 660 c++; 661 nchunks--; 662 } 663 664 return b; 665 } 666 667 668 nxt_int_t 669 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 670 size_t min_size) 671 { 672 size_t nchunks, free_size; 673 nxt_chunk_id_t c, start; 674 nxt_port_mmap_header_t *hdr; 675 nxt_port_mmap_handler_t *mmap_handler; 676 677 nxt_debug(task, "request increase %z bytes shm buffer", size); 678 679 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 680 nxt_log(task, NXT_LOG_WARN, 681 "failed to increase, not a mmap buffer"); 682 return NXT_ERROR; 683 } 684 685 free_size = nxt_buf_mem_free_size(&b->mem); 686 687 if (nxt_slow_path(size <= free_size)) { 688 return NXT_OK; 689 } 690 691 mmap_handler = b->parent; 692 hdr = mmap_handler->hdr; 693 694 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 695 696 size -= free_size; 697 698 nchunks = size / PORT_MMAP_CHUNK_SIZE; 699 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 700 nchunks++; 701 } 702 703 c = start; 704 705 /* Try to acquire as much chunks as required. */ 706 while (nchunks > 0) { 707 708 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 709 break; 710 } 711 712 c++; 713 nchunks--; 714 } 715 716 if (nchunks != 0 717 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) 718 { 719 c--; 720 while (c >= start) { 721 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 722 c--; 723 } 724 725 nxt_debug(task, "failed to increase, %uz chunks busy", nchunks); 726 727 return NXT_ERROR; 728 729 } else { 730 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 731 732 return NXT_OK; 733 } 734 } 735 736 737 static nxt_buf_t * 738 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 739 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 740 { 741 size_t nchunks; 742 nxt_buf_t *b; 743 nxt_port_mmap_header_t *hdr; 744 nxt_port_mmap_handler_t *mmap_handler; 745 746 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 747 mmap_msg->mmap_id); 748 if (nxt_slow_path(mmap_handler == NULL)) { 749 return NULL; 750 } 751 752 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 753 if (nxt_slow_path(b == NULL)) { 754 return NULL; 755 } 756 757 b->completion_handler = nxt_port_mmap_buf_completion; 758 759 nxt_buf_set_port_mmap(b); 760 761 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 762 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 763 nchunks++; 764 } 765 766 hdr = mmap_handler->hdr; 767 768 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 769 b->mem.pos = b->mem.start; 770 b->mem.free = b->mem.start + mmap_msg->size; 771 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 772 773 b->parent = mmap_handler; 774 nxt_port_mmap_handler_use(mmap_handler, 1); 775 776 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 777 b, b->mem.start, b->mem.end - b->mem.start, 778 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 779 780 return b; 781 } 782 783 784 void 785 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 786 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 787 { 788 size_t bsize; 789 nxt_buf_t *bmem; 790 nxt_uint_t i; 791 nxt_port_mmap_msg_t *mmap_msg; 792 nxt_port_mmap_header_t *hdr; 793 nxt_port_mmap_handler_t *mmap_handler; 794 795 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 796 "via shared memory", sb->size, port->pid); 797 798 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 799 mmap_msg = port->mmsg_buf; 800 801 bmem = msg->buf; 802 803 for (i = 0; i < sb->niov; i++, mmap_msg++) { 804 805 /* Lookup buffer which starts current iov_base. */ 806 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 807 bmem = bmem->next; 808 } 809 810 if (nxt_slow_path(bmem == NULL)) { 811 nxt_log_error(NXT_LOG_ERR, task->log, 812 "failed to find buf for iobuf[%d]", i); 813 return; 814 /* TODO clear b and exit */ 815 } 816 817 mmap_handler = bmem->parent; 818 hdr = mmap_handler->hdr; 819 820 mmap_msg->mmap_id = hdr->id; 821 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 822 mmap_msg->size = sb->iobuf[i].iov_len; 823 824 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 825 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 826 port->pid); 827 } 828 829 sb->iobuf[0].iov_base = port->mmsg_buf; 830 sb->iobuf[0].iov_len = bsize; 831 sb->niov = 1; 832 sb->size = bsize; 833 834 msg->port_msg.mmap = 1; 835 } 836 837 838 void 839 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 840 { 841 nxt_buf_t *b, **pb; 842 nxt_port_mmap_msg_t *end, *mmap_msg; 843 844 pb = &msg->buf; 845 msg->size = 0; 846 847 for (b = msg->buf; b != NULL; b = b->next) { 848 849 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 850 end = (nxt_port_mmap_msg_t *) b->mem.free; 851 852 while (mmap_msg < end) { 853 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 854 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 855 msg->port_msg.pid); 856 857 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, 858 msg->port_msg.pid, mmap_msg); 859 if (nxt_slow_path(*pb == NULL)) { 860 nxt_log_error(NXT_LOG_ERR, task->log, 861 "failed to get mmap buffer"); 862 863 break; 864 } 865 866 msg->size += mmap_msg->size; 867 pb = &(*pb)->next; 868 mmap_msg++; 869 870 /* Mark original buf as complete. */ 871 b->mem.pos += sizeof(nxt_port_mmap_msg_t); 872 } 873 } 874 } 875 876 877 nxt_port_method_t 878 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 879 { 880 nxt_port_method_t m; 881 nxt_port_mmap_header_t *hdr; 882 nxt_port_mmap_handler_t *mmap_handler; 883 884 m = NXT_PORT_METHOD_ANY; 885 886 for (; b != NULL; b = b->next) { 887 if (nxt_buf_used_size(b) == 0) { 888 /* empty buffers does not affect method */ 889 continue; 890 } 891 892 if (nxt_buf_is_port_mmap(b)) { 893 mmap_handler = b->parent; 894 hdr = mmap_handler->hdr; 895 896 if (m == NXT_PORT_METHOD_PLAIN) { 897 nxt_log_error(NXT_LOG_ERR, task->log, 898 "mixing plain and mmap buffers, " 899 "using plain mode"); 900 901 break; 902 } 903 904 if (port->pid != hdr->dst_pid) { 905 nxt_log_error(NXT_LOG_ERR, task->log, 906 "send mmap buffer for %PI to %PI, " 907 "using plain mode", hdr->dst_pid, port->pid); 908 909 m = NXT_PORT_METHOD_PLAIN; 910 911 break; 912 } 913 914 if (m == NXT_PORT_METHOD_ANY) { 915 nxt_debug(task, "using mmap mode"); 916 917 m = NXT_PORT_METHOD_MMAP; 918 } 919 } else { 920 if (m == NXT_PORT_METHOD_MMAP) { 921 nxt_log_error(NXT_LOG_ERR, task->log, 922 "mixing mmap and plain buffers, " 923 "switching to plain mode"); 924 925 m = NXT_PORT_METHOD_PLAIN; 926 927 break; 928 } 929 930 if (m == NXT_PORT_METHOD_ANY) { 931 nxt_debug(task, "using plain mode"); 932 933 m = NXT_PORT_METHOD_PLAIN; 934 } 935 } 936 } 937 938 return m; 939 } 940