1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 20 nxt_inline void 21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) 22 { 23 int c; 24 25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 26 27 if (i < 0 && c == -i) { 28 if (mmap_handler->hdr != NULL) { 29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 30 mmap_handler->hdr = NULL; 31 } 32 33 nxt_free(mmap_handler); 34 } 35 } 36 37 38 static nxt_port_mmap_t * 39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 40 { 41 uint32_t cap; 42 43 cap = port_mmaps->cap; 44 45 if (cap == 0) { 46 cap = i + 1; 47 } 48 49 while (i + 1 > cap) { 50 51 if (cap < 16) { 52 cap = cap * 2; 53 54 } else { 55 cap = cap + cap / 2; 56 } 57 } 58 59 if (cap != port_mmaps->cap) { 60 61 port_mmaps->elts = nxt_realloc(port_mmaps->elts, 62 cap * sizeof(nxt_port_mmap_t)); 63 if (nxt_slow_path(port_mmaps->elts == NULL)) { 64 return NULL; 65 } 66 67 nxt_memzero(port_mmaps->elts + port_mmaps->cap, 68 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); 69 70 port_mmaps->cap = cap; 71 } 72 73 if (i + 1 > port_mmaps->size) { 74 port_mmaps->size = i + 1; 75 } 76 77 return port_mmaps->elts + i; 78 } 79 80 81 void 82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) 83 { 84 uint32_t i; 85 nxt_port_mmap_t *port_mmap; 86 87 if (port_mmaps == NULL) { 88 return; 89 } 90 91 port_mmap = port_mmaps->elts; 92 93 for (i = 0; i < port_mmaps->size; i++) { 94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); 95 } 96 97 port_mmaps->size = 0; 98 99 if (free_elts != 0) { 100 nxt_free(port_mmaps->elts); 101 } 102 } 103 104 105 #define nxt_port_mmap_free_junk(p, size) \ 106 memset((p), 0xA5, size) 107 108 109 static void 110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 111 { 112 u_char *p; 113 nxt_mp_t *mp; 114 nxt_buf_t *b; 115 nxt_chunk_id_t c; 116 nxt_port_mmap_header_t *hdr; 117 nxt_port_mmap_handler_t *mmap_handler; 118 119 if (nxt_buf_ts_handle(task, obj, data)) { 120 return; 121 } 122 123 b = obj; 124 125 mp = b->data; 126 127 #if (NXT_DEBUG) 128 if (nxt_slow_path(data != b->parent)) { 129 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 130 data, b->parent); 131 nxt_abort(); 132 } 133 #endif 134 135 mmap_handler = data; 136 hdr = mmap_handler->hdr; 137 138 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 139 nxt_debug(task, "mmap buf completion: mmap for other process pair " 140 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 141 142 goto release_buf; 143 } 144 145 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 146 /* 147 * Chunks until b->mem.pos has been sent to other side, 148 * let's release rest (if any). 149 */ 150 p = b->mem.pos - 1; 151 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 152 p = nxt_port_mmap_chunk_start(hdr, c); 153 154 } else { 155 p = b->mem.start; 156 c = nxt_port_mmap_chunk_id(hdr, p); 157 } 158 159 nxt_port_mmap_free_junk(p, b->mem.end - p); 160 161 nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), " 162 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, 163 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); 164 165 while (p < b->mem.end) { 166 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 167 168 p += PORT_MMAP_CHUNK_SIZE; 169 c++; 170 } 171 172 release_buf: 173 174 nxt_port_mmap_handler_use(mmap_handler, -1); 175 176 nxt_mp_release(mp, b); 177 } 178 179 180 nxt_port_mmap_handler_t * 181 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 182 nxt_fd_t fd) 183 { 184 void *mem; 185 struct stat mmap_stat; 186 nxt_port_mmap_t *port_mmap; 187 nxt_port_mmap_header_t *hdr; 188 nxt_port_mmap_handler_t *mmap_handler; 189 190 nxt_debug(task, "got new mmap fd #%FD from process %PI", 191 fd, process->pid); 192 193 port_mmap = NULL; 194 195 if (fstat(fd, &mmap_stat) == -1) { 196 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 197 198 return NULL; 199 } 200 201 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 202 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 203 204 if (nxt_slow_path(mem == MAP_FAILED)) { 205 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 206 207 return NULL; 208 } 209 210 hdr = mem; 211 212 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 213 if (nxt_slow_path(mmap_handler == NULL)) { 214 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 215 216 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 217 218 return NULL; 219 } 220 221 mmap_handler->hdr = hdr; 222 223 nxt_thread_mutex_lock(&process->incoming.mutex); 224 225 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 226 if (nxt_slow_path(port_mmap == NULL)) { 227 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 228 229 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 230 hdr = NULL; 231 232 nxt_free(mmap_handler); 233 mmap_handler = NULL; 234 235 goto fail; 236 } 237 238 nxt_assert(hdr->src_pid == process->pid); 239 nxt_assert(hdr->dst_pid == nxt_pid); 240 241 port_mmap->mmap_handler = mmap_handler; 242 nxt_port_mmap_handler_use(mmap_handler, 1); 243 244 hdr->sent_over = 0xFFFFu; 245 246 fail: 247 248 nxt_thread_mutex_unlock(&process->incoming.mutex); 249 250 return mmap_handler; 251 } 252 253 254 static nxt_port_mmap_handler_t * 255 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 256 nxt_port_t *port, nxt_bool_t tracking) 257 { 258 void *mem; 259 u_char *p, name[64]; 260 nxt_fd_t fd; 261 nxt_free_map_t *free_map; 262 nxt_port_mmap_t *port_mmap; 263 nxt_port_mmap_header_t *hdr; 264 nxt_port_mmap_handler_t *mmap_handler; 265 266 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 267 if (nxt_slow_path(mmap_handler == NULL)) { 268 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 269 270 return NULL; 271 } 272 273 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 274 if (nxt_slow_path(port_mmap == NULL)) { 275 nxt_log(task, NXT_LOG_WARN, 276 "failed to add port mmap to outgoing array"); 277 278 nxt_free(mmap_handler); 279 return NULL; 280 } 281 282 p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD", 283 nxt_pid, nxt_random(&task->thread->random)); 284 *p = '\0'; 285 286 #if (NXT_HAVE_MEMFD_CREATE) 287 288 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 289 290 if (nxt_slow_path(fd == -1)) { 291 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", 292 name, nxt_errno); 293 294 goto remove_fail; 295 } 296 297 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 298 299 #elif (NXT_HAVE_SHM_OPEN) 300 301 /* Just in case. */ 302 shm_unlink((char *) name); 303 304 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 305 306 nxt_debug(task, "shm_open(%s): %FD", name, fd); 307 308 if (nxt_slow_path(fd == -1)) { 309 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); 310 311 goto remove_fail; 312 } 313 314 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 315 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 316 nxt_errno); 317 } 318 319 #else 320 321 #error No working shared memory implementation. 322 323 #endif 324 325 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 326 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 327 328 goto remove_fail; 329 } 330 331 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 332 MAP_SHARED, fd, 0); 333 334 if (nxt_slow_path(mem == MAP_FAILED)) { 335 goto remove_fail; 336 } 337 338 mmap_handler->hdr = mem; 339 port_mmap->mmap_handler = mmap_handler; 340 nxt_port_mmap_handler_use(mmap_handler, 1); 341 342 /* Init segment header. */ 343 hdr = mmap_handler->hdr; 344 345 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 346 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 347 348 hdr->id = process->outgoing.size - 1; 349 hdr->src_pid = nxt_pid; 350 hdr->dst_pid = process->pid; 351 hdr->sent_over = port->id; 352 353 /* Mark first chunk as busy */ 354 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 355 356 nxt_port_mmap_set_chunk_busy(free_map, 0); 357 358 /* Mark as busy chunk followed the last available chunk. */ 359 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 360 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 361 362 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 363 364 /* TODO handle error */ 365 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 366 367 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 368 hdr->id, nxt_pid, process->pid); 369 370 return mmap_handler; 371 372 remove_fail: 373 374 nxt_free(mmap_handler); 375 376 process->outgoing.size--; 377 378 return NULL; 379 } 380 381 382 static nxt_port_mmap_handler_t * 383 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 384 nxt_bool_t tracking) 385 { 386 nxt_process_t *process; 387 nxt_free_map_t *free_map; 388 nxt_port_mmap_t *port_mmap; 389 nxt_port_mmap_t *end_port_mmap; 390 nxt_port_mmap_header_t *hdr; 391 nxt_port_mmap_handler_t *mmap_handler; 392 393 process = port->process; 394 if (nxt_slow_path(process == NULL)) { 395 return NULL; 396 } 397 398 *c = 0; 399 400 nxt_thread_mutex_lock(&process->outgoing.mutex); 401 402 end_port_mmap = process->outgoing.elts + process->outgoing.size; 403 404 for (port_mmap = process->outgoing.elts; 405 port_mmap < end_port_mmap; 406 port_mmap++) 407 { 408 mmap_handler = port_mmap->mmap_handler; 409 hdr = mmap_handler->hdr; 410 411 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { 412 continue; 413 } 414 415 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 416 417 if (nxt_port_mmap_get_free_chunk(free_map, c)) { 418 goto unlock_return; 419 } 420 } 421 422 /* TODO introduce port_mmap limit and release wait. */ 423 424 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking); 425 426 unlock_return: 427 428 nxt_thread_mutex_unlock(&process->outgoing.mutex); 429 430 return mmap_handler; 431 } 432 433 434 static nxt_port_mmap_handler_t * 435 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 436 { 437 nxt_process_t *process; 438 nxt_port_mmap_handler_t *mmap_handler; 439 440 process = nxt_runtime_process_find(task->thread->runtime, spid); 441 if (nxt_slow_path(process == NULL)) { 442 return NULL; 443 } 444 445 nxt_thread_mutex_lock(&process->incoming.mutex); 446 447 if (nxt_fast_path(process->incoming.size > id)) { 448 mmap_handler = process->incoming.elts[id].mmap_handler; 449 450 } else { 451 mmap_handler = NULL; 452 453 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); 454 } 455 456 nxt_thread_mutex_unlock(&process->incoming.mutex); 457 458 return mmap_handler; 459 } 460 461 462 nxt_int_t 463 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, 464 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 465 { 466 nxt_chunk_id_t c; 467 nxt_port_mmap_header_t *hdr; 468 nxt_port_mmap_handler_t *mmap_handler; 469 470 nxt_debug(task, "request tracking for stream #%uD", stream); 471 472 mmap_handler = nxt_port_mmap_get(task, port, &c, 1); 473 if (nxt_slow_path(mmap_handler == NULL)) { 474 return NXT_ERROR; 475 } 476 477 nxt_port_mmap_handler_use(mmap_handler, 1); 478 479 hdr = mmap_handler->hdr; 480 481 tracking->mmap_handler = mmap_handler; 482 tracking->tracking = hdr->tracking + c; 483 484 *tracking->tracking = stream; 485 486 nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", 487 hdr->src_pid, hdr->dst_pid, hdr->id, c); 488 489 return NXT_OK; 490 } 491 492 493 nxt_bool_t 494 nxt_port_mmap_tracking_cancel(nxt_task_t *task, 495 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 496 { 497 nxt_bool_t res; 498 nxt_chunk_id_t c; 499 nxt_port_mmap_header_t *hdr; 500 nxt_port_mmap_handler_t *mmap_handler; 501 502 mmap_handler = tracking->mmap_handler; 503 504 if (nxt_slow_path(mmap_handler == NULL)) { 505 return 0; 506 } 507 508 hdr = mmap_handler->hdr; 509 510 res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); 511 512 nxt_debug(task, "%s tracking for stream #%uD", 513 (res ? "cancelled" : "failed to cancel"), stream); 514 515 if (!res) { 516 c = tracking->tracking - hdr->tracking; 517 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 518 } 519 520 nxt_port_mmap_handler_use(mmap_handler, -1); 521 522 return res; 523 } 524 525 526 nxt_int_t 527 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) 528 { 529 nxt_atomic_t *tracking; 530 nxt_port_mmap_handler_t *mmap_handler; 531 532 mmap_handler = t->mmap_handler; 533 tracking = mmap_handler->hdr->tracking; 534 535 nxt_assert(t->tracking >= tracking); 536 nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); 537 538 buf[0] = mmap_handler->hdr->id; 539 buf[1] = t->tracking - mmap_handler->hdr->tracking; 540 541 return NXT_OK; 542 } 543 544 nxt_bool_t 545 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 546 { 547 nxt_buf_t *b; 548 nxt_bool_t res; 549 nxt_chunk_id_t c; 550 nxt_port_mmap_header_t *hdr; 551 nxt_port_mmap_handler_t *mmap_handler; 552 nxt_port_mmap_tracking_msg_t *tracking_msg; 553 554 b = msg->buf; 555 556 if (nxt_buf_used_size(b) < (int)sizeof(nxt_port_mmap_tracking_msg_t)) { 557 nxt_debug(task, "too small message %u", nxt_buf_used_size(b)); 558 return 0; 559 } 560 561 tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; 562 563 b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); 564 mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, 565 tracking_msg->mmap_id); 566 567 if (nxt_slow_path(mmap_handler == NULL)) { 568 return 0; 569 } 570 571 hdr = mmap_handler->hdr; 572 573 c = tracking_msg->tracking_id; 574 res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); 575 576 nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, 577 (res ? "received" : "already cancelled")); 578 579 if (!res) { 580 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 581 } 582 583 return res; 584 } 585 586 587 nxt_buf_t * 588 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 589 { 590 size_t nchunks; 591 nxt_buf_t *b; 592 nxt_chunk_id_t c; 593 nxt_port_mmap_header_t *hdr; 594 nxt_port_mmap_handler_t *mmap_handler; 595 596 nxt_debug(task, "request %z bytes shm buffer", size); 597 598 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 599 if (nxt_slow_path(b == NULL)) { 600 return NULL; 601 } 602 603 b->completion_handler = nxt_port_mmap_buf_completion; 604 nxt_buf_set_port_mmap(b); 605 606 mmap_handler = nxt_port_mmap_get(task, port, &c, 0); 607 if (nxt_slow_path(mmap_handler == NULL)) { 608 nxt_mp_release(task->thread->engine->mem_pool, b); 609 return NULL; 610 } 611 612 b->parent = mmap_handler; 613 614 nxt_port_mmap_handler_use(mmap_handler, 1); 615 616 hdr = mmap_handler->hdr; 617 618 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 619 b->mem.pos = b->mem.start; 620 b->mem.free = b->mem.start; 621 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 622 623 nchunks = size / PORT_MMAP_CHUNK_SIZE; 624 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 625 nchunks++; 626 } 627 628 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", 629 b, b->mem.start, b->mem.end - b->mem.start, 630 hdr->src_pid, hdr->dst_pid, hdr->id, c); 631 632 c++; 633 nchunks--; 634 635 /* Try to acquire as much chunks as required. */ 636 while (nchunks > 0) { 637 638 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 639 break; 640 } 641 642 b->mem.end += PORT_MMAP_CHUNK_SIZE; 643 c++; 644 nchunks--; 645 } 646 647 return b; 648 } 649 650 651 nxt_int_t 652 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 653 size_t min_size) 654 { 655 size_t nchunks, free_size; 656 nxt_chunk_id_t c, start; 657 nxt_port_mmap_header_t *hdr; 658 nxt_port_mmap_handler_t *mmap_handler; 659 660 nxt_debug(task, "request increase %z bytes shm buffer", size); 661 662 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 663 nxt_log(task, NXT_LOG_WARN, 664 "failed to increase, not a mmap buffer"); 665 return NXT_ERROR; 666 } 667 668 free_size = nxt_buf_mem_free_size(&b->mem); 669 670 if (nxt_slow_path(size <= free_size)) { 671 return NXT_OK; 672 } 673 674 mmap_handler = b->parent; 675 hdr = mmap_handler->hdr; 676 677 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 678 679 size -= free_size; 680 681 nchunks = size / PORT_MMAP_CHUNK_SIZE; 682 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 683 nchunks++; 684 } 685 686 c = start; 687 688 /* Try to acquire as much chunks as required. */ 689 while (nchunks > 0) { 690 691 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 692 break; 693 } 694 695 c++; 696 nchunks--; 697 } 698 699 if (nchunks != 0 700 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) 701 { 702 c--; 703 while (c >= start) { 704 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 705 c--; 706 } 707 708 nxt_debug(task, "failed to increase, %d chunks busy", nchunks); 709 710 return NXT_ERROR; 711 712 } else { 713 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 714 715 return NXT_OK; 716 } 717 } 718 719 720 static nxt_buf_t * 721 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 722 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 723 { 724 size_t nchunks; 725 nxt_buf_t *b; 726 nxt_port_mmap_header_t *hdr; 727 nxt_port_mmap_handler_t *mmap_handler; 728 729 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 730 mmap_msg->mmap_id); 731 if (nxt_slow_path(mmap_handler == NULL)) { 732 return NULL; 733 } 734 735 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 736 if (nxt_slow_path(b == NULL)) { 737 return NULL; 738 } 739 740 b->completion_handler = nxt_port_mmap_buf_completion; 741 742 nxt_buf_set_port_mmap(b); 743 744 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 745 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 746 nchunks++; 747 } 748 749 hdr = mmap_handler->hdr; 750 751 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 752 b->mem.pos = b->mem.start; 753 b->mem.free = b->mem.start + mmap_msg->size; 754 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 755 756 b->parent = mmap_handler; 757 nxt_port_mmap_handler_use(mmap_handler, 1); 758 759 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d", 760 b, b->mem.start, b->mem.end - b->mem.start, 761 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 762 763 return b; 764 } 765 766 767 void 768 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 769 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 770 { 771 size_t bsize; 772 nxt_buf_t *bmem; 773 nxt_uint_t i; 774 nxt_port_mmap_msg_t *mmap_msg; 775 nxt_port_mmap_header_t *hdr; 776 nxt_port_mmap_handler_t *mmap_handler; 777 778 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 779 "via shared memory", sb->size, port->pid); 780 781 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 782 mmap_msg = port->mmsg_buf; 783 784 bmem = msg->buf; 785 786 for (i = 0; i < sb->niov; i++, mmap_msg++) { 787 788 /* Lookup buffer which starts current iov_base. */ 789 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 790 bmem = bmem->next; 791 } 792 793 if (nxt_slow_path(bmem == NULL)) { 794 nxt_log_error(NXT_LOG_ERR, task->log, 795 "failed to find buf for iobuf[%d]", i); 796 return; 797 /* TODO clear b and exit */ 798 } 799 800 mmap_handler = bmem->parent; 801 hdr = mmap_handler->hdr; 802 803 mmap_msg->mmap_id = hdr->id; 804 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 805 mmap_msg->size = sb->iobuf[i].iov_len; 806 807 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 808 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 809 port->pid); 810 } 811 812 sb->iobuf[0].iov_base = port->mmsg_buf; 813 sb->iobuf[0].iov_len = bsize; 814 sb->niov = 1; 815 sb->size = bsize; 816 817 msg->port_msg.mmap = 1; 818 } 819 820 821 void 822 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 823 { 824 nxt_buf_t *b, **pb; 825 nxt_port_mmap_msg_t *end, *mmap_msg; 826 827 pb = &msg->buf; 828 msg->size = 0; 829 830 for (b = msg->buf; b != NULL; b = b->next) { 831 832 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 833 end = (nxt_port_mmap_msg_t *) b->mem.free; 834 835 while (mmap_msg < end) { 836 nxt_assert(mmap_msg + 1 <= end); 837 838 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 839 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 840 msg->port_msg.pid); 841 842 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, 843 msg->port_msg.pid, mmap_msg); 844 if (nxt_slow_path(*pb == NULL)) { 845 nxt_log_error(NXT_LOG_ERR, task->log, 846 "failed to get mmap buffer"); 847 848 break; 849 } 850 851 msg->size += mmap_msg->size; 852 pb = &(*pb)->next; 853 mmap_msg++; 854 855 /* Mark original buf as complete. */ 856 b->mem.pos += sizeof(nxt_port_mmap_msg_t); 857 } 858 } 859 } 860 861 862 nxt_port_method_t 863 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 864 { 865 nxt_port_method_t m; 866 nxt_port_mmap_header_t *hdr; 867 nxt_port_mmap_handler_t *mmap_handler; 868 869 m = NXT_PORT_METHOD_ANY; 870 871 for (; b != NULL; b = b->next) { 872 if (nxt_buf_used_size(b) == 0) { 873 /* empty buffers does not affect method */ 874 continue; 875 } 876 877 if (nxt_buf_is_port_mmap(b)) { 878 mmap_handler = b->parent; 879 hdr = mmap_handler->hdr; 880 881 if (m == NXT_PORT_METHOD_PLAIN) { 882 nxt_log_error(NXT_LOG_ERR, task->log, 883 "mixing plain and mmap buffers, " 884 "using plain mode"); 885 886 break; 887 } 888 889 if (port->pid != hdr->dst_pid) { 890 nxt_log_error(NXT_LOG_ERR, task->log, 891 "send mmap buffer for %PI to %PI, " 892 "using plain mode", hdr->dst_pid, port->pid); 893 894 m = NXT_PORT_METHOD_PLAIN; 895 896 break; 897 } 898 899 if (m == NXT_PORT_METHOD_ANY) { 900 nxt_debug(task, "using mmap mode"); 901 902 m = NXT_PORT_METHOD_MMAP; 903 } 904 } else { 905 if (m == NXT_PORT_METHOD_MMAP) { 906 nxt_log_error(NXT_LOG_ERR, task->log, 907 "mixing mmap and plain buffers, " 908 "switching to plain mode"); 909 910 m = NXT_PORT_METHOD_PLAIN; 911 912 break; 913 } 914 915 if (m == NXT_PORT_METHOD_ANY) { 916 nxt_debug(task, "using plain mode"); 917 918 m = NXT_PORT_METHOD_PLAIN; 919 } 920 } 921 } 922 923 return m; 924 } 925