1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 20 nxt_inline void 21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) 22 { 23 int c; 24 25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 26 27 if (i < 0 && c == -i) { 28 if (mmap_handler->hdr != NULL) { 29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 30 mmap_handler->hdr = NULL; 31 } 32 33 nxt_free(mmap_handler); 34 } 35 } 36 37 38 static nxt_port_mmap_t * 39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 40 { 41 uint32_t cap; 42 43 cap = port_mmaps->cap; 44 45 if (cap == 0) { 46 cap = i + 1; 47 } 48 49 while (i + 1 > cap) { 50 51 if (cap < 16) { 52 cap = cap * 2; 53 54 } else { 55 cap = cap + cap / 2; 56 } 57 } 58 59 if (cap != port_mmaps->cap) { 60 61 port_mmaps->elts = nxt_realloc(port_mmaps->elts, 62 cap * sizeof(nxt_port_mmap_t)); 63 if (nxt_slow_path(port_mmaps->elts == NULL)) { 64 return NULL; 65 } 66 67 nxt_memzero(port_mmaps->elts + port_mmaps->cap, 68 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); 69 70 port_mmaps->cap = cap; 71 } 72 73 if (i + 1 > port_mmaps->size) { 74 port_mmaps->size = i + 1; 75 } 76 77 return port_mmaps->elts + i; 78 } 79 80 81 void 82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) 83 { 84 uint32_t i; 85 nxt_port_mmap_t *port_mmap; 86 87 if (port_mmaps == NULL) { 88 return; 89 } 90 91 port_mmap = port_mmaps->elts; 92 93 for (i = 0; i < port_mmaps->size; i++) { 94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); 95 } 96 97 port_mmaps->size = 0; 98 99 if (free_elts != 0) { 100 nxt_free(port_mmaps->elts); 101 } 102 } 103 104 105 #define nxt_port_mmap_free_junk(p, size) \ 106 memset((p), 0xA5, size) 107 108 109 static void 110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 111 { 112 u_char *p; 113 nxt_mp_t *mp; 114 nxt_buf_t *b; 115 nxt_chunk_id_t c; 116 nxt_port_mmap_header_t *hdr; 117 nxt_port_mmap_handler_t *mmap_handler; 118 119 if (nxt_buf_ts_handle(task, obj, data)) { 120 return; 121 } 122 123 b = obj; 124 125 mp = b->data; 126 127 nxt_assert(data == b->parent); 128 129 mmap_handler = data; 130 hdr = mmap_handler->hdr; 131 132 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 133 nxt_debug(task, "mmap buf completion: mmap for other process pair " 134 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 135 136 goto release_buf; 137 } 138 139 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 140 /* 141 * Chunks until b->mem.pos has been sent to other side, 142 * let's release rest (if any). 143 */ 144 p = b->mem.pos - 1; 145 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 146 p = nxt_port_mmap_chunk_start(hdr, c); 147 148 } else { 149 p = b->mem.start; 150 c = nxt_port_mmap_chunk_id(hdr, p); 151 } 152 153 nxt_port_mmap_free_junk(p, b->mem.end - p); 154 155 nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), " 156 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, 157 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); 158 159 while (p < b->mem.end) { 160 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 161 162 p += PORT_MMAP_CHUNK_SIZE; 163 c++; 164 } 165 166 release_buf: 167 168 nxt_port_mmap_handler_use(mmap_handler, -1); 169 170 nxt_mp_free(mp, b); 171 nxt_mp_release(mp); 172 } 173 174 175 nxt_port_mmap_handler_t * 176 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 177 nxt_fd_t fd) 178 { 179 void *mem; 180 struct stat mmap_stat; 181 nxt_port_mmap_t *port_mmap; 182 nxt_port_mmap_header_t *hdr; 183 nxt_port_mmap_handler_t *mmap_handler; 184 185 nxt_debug(task, "got new mmap fd #%FD from process %PI", 186 fd, process->pid); 187 188 port_mmap = NULL; 189 190 if (fstat(fd, &mmap_stat) == -1) { 191 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 192 193 return NULL; 194 } 195 196 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 197 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 198 199 if (nxt_slow_path(mem == MAP_FAILED)) { 200 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 201 202 return NULL; 203 } 204 205 hdr = mem; 206 207 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 208 if (nxt_slow_path(mmap_handler == NULL)) { 209 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 210 211 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 212 213 return NULL; 214 } 215 216 mmap_handler->hdr = hdr; 217 218 if (nxt_slow_path(hdr->src_pid != process->pid 219 || hdr->dst_pid != nxt_pid)) 220 { 221 nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " 222 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, 223 hdr->dst_pid, nxt_pid); 224 225 return NULL; 226 } 227 228 nxt_thread_mutex_lock(&process->incoming.mutex); 229 230 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 231 if (nxt_slow_path(port_mmap == NULL)) { 232 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 233 234 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 235 hdr = NULL; 236 237 nxt_free(mmap_handler); 238 mmap_handler = NULL; 239 240 goto fail; 241 } 242 243 port_mmap->mmap_handler = mmap_handler; 244 nxt_port_mmap_handler_use(mmap_handler, 1); 245 246 hdr->sent_over = 0xFFFFu; 247 248 fail: 249 250 nxt_thread_mutex_unlock(&process->incoming.mutex); 251 252 return mmap_handler; 253 } 254 255 256 static nxt_port_mmap_handler_t * 257 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 258 nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) 259 { 260 void *mem; 261 u_char *p, name[64]; 262 nxt_fd_t fd; 263 nxt_int_t i; 264 nxt_free_map_t *free_map; 265 nxt_port_mmap_t *port_mmap; 266 nxt_port_mmap_header_t *hdr; 267 nxt_port_mmap_handler_t *mmap_handler; 268 269 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 270 if (nxt_slow_path(mmap_handler == NULL)) { 271 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 272 273 return NULL; 274 } 275 276 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 277 if (nxt_slow_path(port_mmap == NULL)) { 278 nxt_log(task, NXT_LOG_WARN, 279 "failed to add port mmap to outgoing array"); 280 281 nxt_free(mmap_handler); 282 return NULL; 283 } 284 285 p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", 286 nxt_pid, nxt_random(&task->thread->random)); 287 *p = '\0'; 288 289 #if (NXT_HAVE_MEMFD_CREATE) 290 291 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 292 293 if (nxt_slow_path(fd == -1)) { 294 nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); 295 296 goto remove_fail; 297 } 298 299 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 300 301 #elif (NXT_HAVE_SHM_OPEN_ANON) 302 303 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 304 305 nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); 306 307 if (nxt_slow_path(fd == -1)) { 308 nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); 309 310 goto remove_fail; 311 } 312 313 #elif (NXT_HAVE_SHM_OPEN) 314 315 /* Just in case. */ 316 shm_unlink((char *) name); 317 318 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 319 320 nxt_debug(task, "shm_open(%s): %FD", name, fd); 321 322 if (nxt_slow_path(fd == -1)) { 323 nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); 324 325 goto remove_fail; 326 } 327 328 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 329 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 330 nxt_errno); 331 } 332 333 #else 334 335 #error No working shared memory implementation. 336 337 #endif 338 339 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 340 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 341 342 goto remove_fail; 343 } 344 345 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 346 MAP_SHARED, fd, 0); 347 348 if (nxt_slow_path(mem == MAP_FAILED)) { 349 goto remove_fail; 350 } 351 352 mmap_handler->hdr = mem; 353 port_mmap->mmap_handler = mmap_handler; 354 nxt_port_mmap_handler_use(mmap_handler, 1); 355 356 /* Init segment header. */ 357 hdr = mmap_handler->hdr; 358 359 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 360 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 361 362 hdr->id = process->outgoing.size - 1; 363 hdr->src_pid = nxt_pid; 364 hdr->dst_pid = process->pid; 365 hdr->sent_over = port->id; 366 367 /* Mark first chunk as busy */ 368 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 369 370 for (i = 0; i < n; i++) { 371 nxt_port_mmap_set_chunk_busy(free_map, i); 372 } 373 374 /* Mark as busy chunk followed the last available chunk. */ 375 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 376 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 377 378 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 379 380 /* TODO handle error */ 381 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 382 383 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 384 hdr->id, nxt_pid, process->pid); 385 386 return mmap_handler; 387 388 remove_fail: 389 390 nxt_free(mmap_handler); 391 392 process->outgoing.size--; 393 394 return NULL; 395 } 396 397 398 static nxt_port_mmap_handler_t * 399 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 400 nxt_int_t n, nxt_bool_t tracking) 401 { 402 nxt_int_t i, res, nchunks; 403 nxt_process_t *process; 404 nxt_free_map_t *free_map; 405 nxt_port_mmap_t *port_mmap; 406 nxt_port_mmap_t *end_port_mmap; 407 nxt_port_mmap_header_t *hdr; 408 nxt_port_mmap_handler_t *mmap_handler; 409 410 process = port->process; 411 if (nxt_slow_path(process == NULL)) { 412 return NULL; 413 } 414 415 nxt_thread_mutex_lock(&process->outgoing.mutex); 416 417 end_port_mmap = process->outgoing.elts + process->outgoing.size; 418 419 for (port_mmap = process->outgoing.elts; 420 port_mmap < end_port_mmap; 421 port_mmap++) 422 { 423 mmap_handler = port_mmap->mmap_handler; 424 hdr = mmap_handler->hdr; 425 426 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { 427 continue; 428 } 429 430 *c = 0; 431 432 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 433 434 while (nxt_port_mmap_get_free_chunk(free_map, c)) { 435 nchunks = 1; 436 437 while (nchunks < n) { 438 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks); 439 440 if (res == 0) { 441 for (i = 0; i < nchunks; i++) { 442 nxt_port_mmap_set_chunk_free(free_map, *c + i); 443 } 444 445 *c += nchunks + 1; 446 nchunks = 0; 447 break; 448 } 449 450 nchunks++; 451 } 452 453 if (nchunks == n) { 454 goto unlock_return; 455 } 456 } 457 } 458 459 /* TODO introduce port_mmap limit and release wait. */ 460 461 *c = 0; 462 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); 463 464 unlock_return: 465 466 nxt_thread_mutex_unlock(&process->outgoing.mutex); 467 468 return mmap_handler; 469 } 470 471 472 static nxt_port_mmap_handler_t * 473 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 474 { 475 nxt_process_t *process; 476 nxt_port_mmap_handler_t *mmap_handler; 477 478 process = nxt_runtime_process_find(task->thread->runtime, spid); 479 if (nxt_slow_path(process == NULL)) { 480 return NULL; 481 } 482 483 nxt_thread_mutex_lock(&process->incoming.mutex); 484 485 if (nxt_fast_path(process->incoming.size > id)) { 486 mmap_handler = process->incoming.elts[id].mmap_handler; 487 488 } else { 489 mmap_handler = NULL; 490 491 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); 492 } 493 494 nxt_thread_mutex_unlock(&process->incoming.mutex); 495 496 return mmap_handler; 497 } 498 499 500 nxt_int_t 501 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, 502 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 503 { 504 nxt_chunk_id_t c; 505 nxt_port_mmap_header_t *hdr; 506 nxt_port_mmap_handler_t *mmap_handler; 507 508 nxt_debug(task, "request tracking for stream #%uD", stream); 509 510 mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); 511 if (nxt_slow_path(mmap_handler == NULL)) { 512 return NXT_ERROR; 513 } 514 515 nxt_port_mmap_handler_use(mmap_handler, 1); 516 517 hdr = mmap_handler->hdr; 518 519 tracking->mmap_handler = mmap_handler; 520 tracking->tracking = hdr->tracking + c; 521 522 *tracking->tracking = stream; 523 524 nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", 525 hdr->src_pid, hdr->dst_pid, hdr->id, c); 526 527 return NXT_OK; 528 } 529 530 531 nxt_bool_t 532 nxt_port_mmap_tracking_cancel(nxt_task_t *task, 533 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 534 { 535 nxt_bool_t res; 536 nxt_chunk_id_t c; 537 nxt_port_mmap_header_t *hdr; 538 nxt_port_mmap_handler_t *mmap_handler; 539 540 mmap_handler = tracking->mmap_handler; 541 542 if (nxt_slow_path(mmap_handler == NULL)) { 543 return 0; 544 } 545 546 hdr = mmap_handler->hdr; 547 548 res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); 549 550 nxt_debug(task, "%s tracking for stream #%uD", 551 (res ? "cancelled" : "failed to cancel"), stream); 552 553 if (!res) { 554 c = tracking->tracking - hdr->tracking; 555 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 556 } 557 558 nxt_port_mmap_handler_use(mmap_handler, -1); 559 560 return res; 561 } 562 563 564 nxt_int_t 565 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) 566 { 567 nxt_port_mmap_handler_t *mmap_handler; 568 569 mmap_handler = t->mmap_handler; 570 571 #if (NXT_DEBUG) 572 { 573 nxt_atomic_t *tracking; 574 575 tracking = mmap_handler->hdr->tracking; 576 577 nxt_assert(t->tracking >= tracking); 578 nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); 579 } 580 #endif 581 582 buf[0] = mmap_handler->hdr->id; 583 buf[1] = t->tracking - mmap_handler->hdr->tracking; 584 585 return NXT_OK; 586 } 587 588 nxt_bool_t 589 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 590 { 591 nxt_buf_t *b; 592 nxt_bool_t res; 593 nxt_chunk_id_t c; 594 nxt_port_mmap_header_t *hdr; 595 nxt_port_mmap_handler_t *mmap_handler; 596 nxt_port_mmap_tracking_msg_t *tracking_msg; 597 598 b = msg->buf; 599 600 if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 601 nxt_debug(task, "too small message %O", nxt_buf_used_size(b)); 602 return 0; 603 } 604 605 tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; 606 607 b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); 608 mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, 609 tracking_msg->mmap_id); 610 611 if (nxt_slow_path(mmap_handler == NULL)) { 612 return 0; 613 } 614 615 hdr = mmap_handler->hdr; 616 617 c = tracking_msg->tracking_id; 618 res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); 619 620 nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, 621 (res ? "received" : "already cancelled")); 622 623 if (!res) { 624 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 625 } 626 627 return res; 628 } 629 630 631 nxt_buf_t * 632 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 633 { 634 nxt_mp_t *mp; 635 nxt_buf_t *b; 636 nxt_int_t nchunks; 637 nxt_chunk_id_t c; 638 nxt_port_mmap_header_t *hdr; 639 nxt_port_mmap_handler_t *mmap_handler; 640 641 nxt_debug(task, "request %z bytes shm buffer", size); 642 643 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 644 645 if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) { 646 nxt_alert(task, "requested buffer (%z) too big", size); 647 648 return NULL; 649 } 650 651 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 652 if (nxt_slow_path(b == NULL)) { 653 return NULL; 654 } 655 656 b->completion_handler = nxt_port_mmap_buf_completion; 657 nxt_buf_set_port_mmap(b); 658 659 mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); 660 if (nxt_slow_path(mmap_handler == NULL)) { 661 mp = task->thread->engine->mem_pool; 662 nxt_mp_free(mp, b); 663 nxt_mp_release(mp); 664 return NULL; 665 } 666 667 b->parent = mmap_handler; 668 669 nxt_port_mmap_handler_use(mmap_handler, 1); 670 671 hdr = mmap_handler->hdr; 672 673 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 674 b->mem.pos = b->mem.start; 675 b->mem.free = b->mem.start; 676 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 677 678 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 679 b, b->mem.start, b->mem.end - b->mem.start, 680 hdr->src_pid, hdr->dst_pid, hdr->id, c); 681 682 return b; 683 } 684 685 686 nxt_int_t 687 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 688 size_t min_size) 689 { 690 size_t nchunks, free_size; 691 nxt_chunk_id_t c, start; 692 nxt_port_mmap_header_t *hdr; 693 nxt_port_mmap_handler_t *mmap_handler; 694 695 nxt_debug(task, "request increase %z bytes shm buffer", size); 696 697 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 698 nxt_log(task, NXT_LOG_WARN, 699 "failed to increase, not a mmap buffer"); 700 return NXT_ERROR; 701 } 702 703 free_size = nxt_buf_mem_free_size(&b->mem); 704 705 if (nxt_slow_path(size <= free_size)) { 706 return NXT_OK; 707 } 708 709 mmap_handler = b->parent; 710 hdr = mmap_handler->hdr; 711 712 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 713 714 size -= free_size; 715 716 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 717 718 c = start; 719 720 /* Try to acquire as much chunks as required. */ 721 while (nchunks > 0) { 722 723 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 724 break; 725 } 726 727 c++; 728 nchunks--; 729 } 730 731 if (nchunks != 0 732 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) 733 { 734 c--; 735 while (c >= start) { 736 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 737 c--; 738 } 739 740 nxt_debug(task, "failed to increase, %uz chunks busy", nchunks); 741 742 return NXT_ERROR; 743 744 } else { 745 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 746 747 return NXT_OK; 748 } 749 } 750 751 752 static nxt_buf_t * 753 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 754 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 755 { 756 size_t nchunks; 757 nxt_buf_t *b; 758 nxt_port_mmap_header_t *hdr; 759 nxt_port_mmap_handler_t *mmap_handler; 760 761 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 762 mmap_msg->mmap_id); 763 if (nxt_slow_path(mmap_handler == NULL)) { 764 return NULL; 765 } 766 767 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 768 if (nxt_slow_path(b == NULL)) { 769 return NULL; 770 } 771 772 b->completion_handler = nxt_port_mmap_buf_completion; 773 774 nxt_buf_set_port_mmap(b); 775 776 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 777 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 778 nchunks++; 779 } 780 781 hdr = mmap_handler->hdr; 782 783 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 784 b->mem.pos = b->mem.start; 785 b->mem.free = b->mem.start + mmap_msg->size; 786 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 787 788 b->parent = mmap_handler; 789 nxt_port_mmap_handler_use(mmap_handler, 1); 790 791 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 792 b, b->mem.start, b->mem.end - b->mem.start, 793 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 794 795 return b; 796 } 797 798 799 void 800 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 801 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 802 { 803 size_t bsize; 804 nxt_buf_t *bmem; 805 nxt_uint_t i; 806 nxt_port_mmap_msg_t *mmap_msg; 807 nxt_port_mmap_header_t *hdr; 808 nxt_port_mmap_handler_t *mmap_handler; 809 810 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 811 "via shared memory", sb->size, port->pid); 812 813 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 814 mmap_msg = port->mmsg_buf; 815 816 bmem = msg->buf; 817 818 for (i = 0; i < sb->niov; i++, mmap_msg++) { 819 820 /* Lookup buffer which starts current iov_base. */ 821 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 822 bmem = bmem->next; 823 } 824 825 if (nxt_slow_path(bmem == NULL)) { 826 nxt_log_error(NXT_LOG_ERR, task->log, 827 "failed to find buf for iobuf[%d]", i); 828 return; 829 /* TODO clear b and exit */ 830 } 831 832 mmap_handler = bmem->parent; 833 hdr = mmap_handler->hdr; 834 835 mmap_msg->mmap_id = hdr->id; 836 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 837 mmap_msg->size = sb->iobuf[i].iov_len; 838 839 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 840 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 841 port->pid); 842 } 843 844 sb->iobuf[0].iov_base = port->mmsg_buf; 845 sb->iobuf[0].iov_len = bsize; 846 sb->niov = 1; 847 sb->size = bsize; 848 849 msg->port_msg.mmap = 1; 850 } 851 852 853 void 854 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 855 { 856 nxt_buf_t *b, **pb; 857 nxt_port_mmap_msg_t *end, *mmap_msg; 858 859 pb = &msg->buf; 860 msg->size = 0; 861 862 for (b = msg->buf; b != NULL; b = b->next) { 863 864 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 865 end = (nxt_port_mmap_msg_t *) b->mem.free; 866 867 while (mmap_msg < end) { 868 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 869 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 870 msg->port_msg.pid); 871 872 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, 873 msg->port_msg.pid, mmap_msg); 874 if (nxt_slow_path(*pb == NULL)) { 875 nxt_log_error(NXT_LOG_ERR, task->log, 876 "failed to get mmap buffer"); 877 878 break; 879 } 880 881 msg->size += mmap_msg->size; 882 pb = &(*pb)->next; 883 mmap_msg++; 884 885 /* Mark original buf as complete. */ 886 b->mem.pos += sizeof(nxt_port_mmap_msg_t); 887 } 888 } 889 } 890 891 892 nxt_port_method_t 893 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 894 { 895 nxt_port_method_t m; 896 nxt_port_mmap_header_t *hdr; 897 nxt_port_mmap_handler_t *mmap_handler; 898 899 m = NXT_PORT_METHOD_ANY; 900 901 for (/* void */; b != NULL; b = b->next) { 902 if (nxt_buf_used_size(b) == 0) { 903 /* empty buffers does not affect method */ 904 continue; 905 } 906 907 if (nxt_buf_is_port_mmap(b)) { 908 mmap_handler = b->parent; 909 hdr = mmap_handler->hdr; 910 911 if (m == NXT_PORT_METHOD_PLAIN) { 912 nxt_log_error(NXT_LOG_ERR, task->log, 913 "mixing plain and mmap buffers, " 914 "using plain mode"); 915 916 break; 917 } 918 919 if (port->pid != hdr->dst_pid) { 920 nxt_log_error(NXT_LOG_ERR, task->log, 921 "send mmap buffer for %PI to %PI, " 922 "using plain mode", hdr->dst_pid, port->pid); 923 924 m = NXT_PORT_METHOD_PLAIN; 925 926 break; 927 } 928 929 if (m == NXT_PORT_METHOD_ANY) { 930 nxt_debug(task, "using mmap mode"); 931 932 m = NXT_PORT_METHOD_MMAP; 933 } 934 } else { 935 if (m == NXT_PORT_METHOD_MMAP) { 936 nxt_log_error(NXT_LOG_ERR, task->log, 937 "mixing mmap and plain buffers, " 938 "switching to plain mode"); 939 940 m = NXT_PORT_METHOD_PLAIN; 941 942 break; 943 } 944 945 if (m == NXT_PORT_METHOD_ANY) { 946 nxt_debug(task, "using plain mode"); 947 948 m = NXT_PORT_METHOD_PLAIN; 949 } 950 } 951 } 952 953 return m; 954 } 955