1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 20 nxt_inline void 21 nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i) 22 { 23 int c; 24 25 c = nxt_atomic_fetch_add(&mmap_handler->use_count, i); 26 27 if (i < 0 && c == -i) { 28 if (mmap_handler->hdr != NULL) { 29 nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE); 30 mmap_handler->hdr = NULL; 31 } 32 33 nxt_free(mmap_handler); 34 } 35 } 36 37 38 static nxt_port_mmap_t * 39 nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i) 40 { 41 uint32_t cap; 42 43 cap = port_mmaps->cap; 44 45 if (cap == 0) { 46 cap = i + 1; 47 } 48 49 while (i + 1 > cap) { 50 51 if (cap < 16) { 52 cap = cap * 2; 53 54 } else { 55 cap = cap + cap / 2; 56 } 57 } 58 59 if (cap != port_mmaps->cap) { 60 61 port_mmaps->elts = nxt_realloc(port_mmaps->elts, 62 cap * sizeof(nxt_port_mmap_t)); 63 if (nxt_slow_path(port_mmaps->elts == NULL)) { 64 return NULL; 65 } 66 67 nxt_memzero(port_mmaps->elts + port_mmaps->cap, 68 sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap)); 69 70 port_mmaps->cap = cap; 71 } 72 73 if (i + 1 > port_mmaps->size) { 74 port_mmaps->size = i + 1; 75 } 76 77 return port_mmaps->elts + i; 78 } 79 80 81 void 82 nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts) 83 { 84 uint32_t i; 85 nxt_port_mmap_t *port_mmap; 86 87 if (port_mmaps == NULL) { 88 return; 89 } 90 91 port_mmap = port_mmaps->elts; 92 93 for (i = 0; i < port_mmaps->size; i++) { 94 nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1); 95 } 96 97 port_mmaps->size = 0; 98 99 if (free_elts != 0) { 100 nxt_free(port_mmaps->elts); 101 } 102 } 103 104 105 #define nxt_port_mmap_free_junk(p, size) \ 106 memset((p), 0xA5, size) 107 108 109 static void 110 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 111 { 112 u_char *p; 113 nxt_mp_t *mp; 114 nxt_buf_t *b; 115 nxt_port_t *port; 116 nxt_process_t *process; 117 nxt_chunk_id_t c; 118 nxt_port_mmap_header_t *hdr; 119 nxt_port_mmap_handler_t *mmap_handler; 120 121 if (nxt_buf_ts_handle(task, obj, data)) { 122 return; 123 } 124 125 b = obj; 126 127 mp = b->data; 128 129 nxt_assert(data == b->parent); 130 131 mmap_handler = data; 132 hdr = mmap_handler->hdr; 133 134 if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) { 135 nxt_debug(task, "mmap buf completion: mmap for other process pair " 136 "%PI->%PI", hdr->src_pid, hdr->dst_pid); 137 138 goto release_buf; 139 } 140 141 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 142 /* 143 * Chunks until b->mem.pos has been sent to other side, 144 * let's release rest (if any). 145 */ 146 p = b->mem.pos - 1; 147 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 148 p = nxt_port_mmap_chunk_start(hdr, c); 149 150 } else { 151 p = b->mem.start; 152 c = nxt_port_mmap_chunk_id(hdr, p); 153 } 154 155 nxt_port_mmap_free_junk(p, b->mem.end - p); 156 157 nxt_debug(task, "mmap buf completion: %p [%p,%uz] (sent=%d), " 158 "%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start, 159 b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c); 160 161 while (p < b->mem.end) { 162 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 163 164 p += PORT_MMAP_CHUNK_SIZE; 165 c++; 166 } 167 168 if (hdr->dst_pid == nxt_pid 169 && nxt_atomic_cmp_set(&hdr->oosm, 1, 0)) 170 { 171 process = nxt_runtime_process_find(task->thread->runtime, hdr->src_pid); 172 173 if (process != NULL && !nxt_queue_is_empty(&process->ports)) { 174 port = nxt_process_port_first(process); 175 176 if (port->type == NXT_PROCESS_WORKER) { 177 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, 178 -1, 0, 0, NULL); 179 } 180 } 181 } 182 183 release_buf: 184 185 nxt_port_mmap_handler_use(mmap_handler, -1); 186 187 nxt_mp_free(mp, b); 188 nxt_mp_release(mp); 189 } 190 191 192 nxt_port_mmap_handler_t * 193 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 194 nxt_fd_t fd) 195 { 196 void *mem; 197 struct stat mmap_stat; 198 nxt_port_mmap_t *port_mmap; 199 nxt_port_mmap_header_t *hdr; 200 nxt_port_mmap_handler_t *mmap_handler; 201 202 nxt_debug(task, "got new mmap fd #%FD from process %PI", 203 fd, process->pid); 204 205 port_mmap = NULL; 206 207 if (fstat(fd, &mmap_stat) == -1) { 208 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 209 210 return NULL; 211 } 212 213 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 214 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 215 216 if (nxt_slow_path(mem == MAP_FAILED)) { 217 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 218 219 return NULL; 220 } 221 222 hdr = mem; 223 224 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 225 if (nxt_slow_path(mmap_handler == NULL)) { 226 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 227 228 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 229 230 return NULL; 231 } 232 233 mmap_handler->hdr = hdr; 234 235 if (nxt_slow_path(hdr->src_pid != process->pid 236 || hdr->dst_pid != nxt_pid)) 237 { 238 nxt_log(task, NXT_LOG_WARN, "unexpected pid in mmap header detected: " 239 "%PI != %PI or %PI != %PI", hdr->src_pid, process->pid, 240 hdr->dst_pid, nxt_pid); 241 242 return NULL; 243 } 244 245 nxt_thread_mutex_lock(&process->incoming.mutex); 246 247 port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id); 248 if (nxt_slow_path(port_mmap == NULL)) { 249 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 250 251 nxt_mem_munmap(mem, PORT_MMAP_SIZE); 252 hdr = NULL; 253 254 nxt_free(mmap_handler); 255 mmap_handler = NULL; 256 257 goto fail; 258 } 259 260 port_mmap->mmap_handler = mmap_handler; 261 nxt_port_mmap_handler_use(mmap_handler, 1); 262 263 hdr->sent_over = 0xFFFFu; 264 265 fail: 266 267 nxt_thread_mutex_unlock(&process->incoming.mutex); 268 269 return mmap_handler; 270 } 271 272 273 static nxt_port_mmap_handler_t * 274 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 275 nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) 276 { 277 void *mem; 278 u_char *p, name[64]; 279 nxt_fd_t fd; 280 nxt_int_t i; 281 nxt_free_map_t *free_map; 282 nxt_port_mmap_t *port_mmap; 283 nxt_port_mmap_header_t *hdr; 284 nxt_port_mmap_handler_t *mmap_handler; 285 286 mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t)); 287 if (nxt_slow_path(mmap_handler == NULL)) { 288 nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler"); 289 290 return NULL; 291 } 292 293 port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size); 294 if (nxt_slow_path(port_mmap == NULL)) { 295 nxt_log(task, NXT_LOG_WARN, 296 "failed to add port mmap to outgoing array"); 297 298 nxt_free(mmap_handler); 299 return NULL; 300 } 301 302 p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", 303 nxt_pid, nxt_random(&task->thread->random)); 304 *p = '\0'; 305 306 #if (NXT_HAVE_MEMFD_CREATE) 307 308 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 309 310 if (nxt_slow_path(fd == -1)) { 311 nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno); 312 313 goto remove_fail; 314 } 315 316 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 317 318 #elif (NXT_HAVE_SHM_OPEN_ANON) 319 320 fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR); 321 322 nxt_debug(task, "shm_open(SHM_ANON): %FD", fd); 323 324 if (nxt_slow_path(fd == -1)) { 325 nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno); 326 327 goto remove_fail; 328 } 329 330 #elif (NXT_HAVE_SHM_OPEN) 331 332 /* Just in case. */ 333 shm_unlink((char *) name); 334 335 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 336 337 nxt_debug(task, "shm_open(%s): %FD", name, fd); 338 339 if (nxt_slow_path(fd == -1)) { 340 nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno); 341 342 goto remove_fail; 343 } 344 345 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 346 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 347 nxt_errno); 348 } 349 350 #else 351 352 #error No working shared memory implementation. 353 354 #endif 355 356 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 357 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 358 359 goto remove_fail; 360 } 361 362 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, PROT_READ | PROT_WRITE, 363 MAP_SHARED, fd, 0); 364 365 if (nxt_slow_path(mem == MAP_FAILED)) { 366 goto remove_fail; 367 } 368 369 mmap_handler->hdr = mem; 370 port_mmap->mmap_handler = mmap_handler; 371 nxt_port_mmap_handler_use(mmap_handler, 1); 372 373 /* Init segment header. */ 374 hdr = mmap_handler->hdr; 375 376 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 377 nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map)); 378 379 hdr->id = process->outgoing.size - 1; 380 hdr->src_pid = nxt_pid; 381 hdr->dst_pid = process->pid; 382 hdr->sent_over = port->id; 383 384 /* Mark first chunk as busy */ 385 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 386 387 for (i = 0; i < n; i++) { 388 nxt_port_mmap_set_chunk_busy(free_map, i); 389 } 390 391 /* Mark as busy chunk followed the last available chunk. */ 392 nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT); 393 nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT); 394 395 nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid); 396 397 /* TODO handle error */ 398 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 399 400 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 401 hdr->id, nxt_pid, process->pid); 402 403 return mmap_handler; 404 405 remove_fail: 406 407 nxt_free(mmap_handler); 408 409 process->outgoing.size--; 410 411 return NULL; 412 } 413 414 415 static nxt_port_mmap_handler_t * 416 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 417 nxt_int_t n, nxt_bool_t tracking) 418 { 419 nxt_int_t i, res, nchunks; 420 nxt_process_t *process; 421 nxt_free_map_t *free_map; 422 nxt_port_mmap_t *port_mmap; 423 nxt_port_mmap_t *end_port_mmap; 424 nxt_port_mmap_header_t *hdr; 425 nxt_port_mmap_handler_t *mmap_handler; 426 427 process = port->process; 428 if (nxt_slow_path(process == NULL)) { 429 return NULL; 430 } 431 432 nxt_thread_mutex_lock(&process->outgoing.mutex); 433 434 end_port_mmap = process->outgoing.elts + process->outgoing.size; 435 436 for (port_mmap = process->outgoing.elts; 437 port_mmap < end_port_mmap; 438 port_mmap++) 439 { 440 mmap_handler = port_mmap->mmap_handler; 441 hdr = mmap_handler->hdr; 442 443 if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) { 444 continue; 445 } 446 447 *c = 0; 448 449 free_map = tracking ? hdr->free_tracking_map : hdr->free_map; 450 451 while (nxt_port_mmap_get_free_chunk(free_map, c)) { 452 nchunks = 1; 453 454 while (nchunks < n) { 455 res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks); 456 457 if (res == 0) { 458 for (i = 0; i < nchunks; i++) { 459 nxt_port_mmap_set_chunk_free(free_map, *c + i); 460 } 461 462 *c += nchunks + 1; 463 nchunks = 0; 464 break; 465 } 466 467 nchunks++; 468 } 469 470 if (nchunks == n) { 471 goto unlock_return; 472 } 473 } 474 475 hdr->oosm = 1; 476 } 477 478 /* TODO introduce port_mmap limit and release wait. */ 479 480 *c = 0; 481 mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n); 482 483 unlock_return: 484 485 nxt_thread_mutex_unlock(&process->outgoing.mutex); 486 487 return mmap_handler; 488 } 489 490 491 static nxt_port_mmap_handler_t * 492 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 493 { 494 nxt_process_t *process; 495 nxt_port_mmap_handler_t *mmap_handler; 496 497 process = nxt_runtime_process_find(task->thread->runtime, spid); 498 if (nxt_slow_path(process == NULL)) { 499 return NULL; 500 } 501 502 nxt_thread_mutex_lock(&process->incoming.mutex); 503 504 if (nxt_fast_path(process->incoming.size > id)) { 505 mmap_handler = process->incoming.elts[id].mmap_handler; 506 507 } else { 508 mmap_handler = NULL; 509 510 nxt_debug(task, "invalid incoming mmap id %uD for pid %PI", id, spid); 511 } 512 513 nxt_thread_mutex_unlock(&process->incoming.mutex); 514 515 return mmap_handler; 516 } 517 518 519 nxt_int_t 520 nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port, 521 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 522 { 523 nxt_chunk_id_t c; 524 nxt_port_mmap_header_t *hdr; 525 nxt_port_mmap_handler_t *mmap_handler; 526 527 nxt_debug(task, "request tracking for stream #%uD", stream); 528 529 mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1); 530 if (nxt_slow_path(mmap_handler == NULL)) { 531 return NXT_ERROR; 532 } 533 534 nxt_port_mmap_handler_use(mmap_handler, 1); 535 536 hdr = mmap_handler->hdr; 537 538 tracking->mmap_handler = mmap_handler; 539 tracking->tracking = hdr->tracking + c; 540 541 *tracking->tracking = stream; 542 543 nxt_debug(task, "outgoing tracking allocation: %PI->%PI,%d,%d", 544 hdr->src_pid, hdr->dst_pid, hdr->id, c); 545 546 return NXT_OK; 547 } 548 549 550 nxt_bool_t 551 nxt_port_mmap_tracking_cancel(nxt_task_t *task, 552 nxt_port_mmap_tracking_t *tracking, uint32_t stream) 553 { 554 nxt_bool_t res; 555 nxt_chunk_id_t c; 556 nxt_port_mmap_header_t *hdr; 557 nxt_port_mmap_handler_t *mmap_handler; 558 559 mmap_handler = tracking->mmap_handler; 560 561 if (nxt_slow_path(mmap_handler == NULL)) { 562 return 0; 563 } 564 565 hdr = mmap_handler->hdr; 566 567 res = nxt_atomic_cmp_set(tracking->tracking, stream, 0); 568 569 nxt_debug(task, "%s tracking for stream #%uD", 570 (res ? "cancelled" : "failed to cancel"), stream); 571 572 if (!res) { 573 c = tracking->tracking - hdr->tracking; 574 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 575 } 576 577 nxt_port_mmap_handler_use(mmap_handler, -1); 578 579 return res; 580 } 581 582 583 nxt_int_t 584 nxt_port_mmap_tracking_write(uint32_t *buf, nxt_port_mmap_tracking_t *t) 585 { 586 nxt_port_mmap_handler_t *mmap_handler; 587 588 mmap_handler = t->mmap_handler; 589 590 #if (NXT_DEBUG) 591 { 592 nxt_atomic_t *tracking; 593 594 tracking = mmap_handler->hdr->tracking; 595 596 nxt_assert(t->tracking >= tracking); 597 nxt_assert(t->tracking < tracking + PORT_MMAP_CHUNK_COUNT); 598 } 599 #endif 600 601 buf[0] = mmap_handler->hdr->id; 602 buf[1] = t->tracking - mmap_handler->hdr->tracking; 603 604 return NXT_OK; 605 } 606 607 nxt_bool_t 608 nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 609 { 610 nxt_buf_t *b; 611 nxt_bool_t res; 612 nxt_chunk_id_t c; 613 nxt_port_mmap_header_t *hdr; 614 nxt_port_mmap_handler_t *mmap_handler; 615 nxt_port_mmap_tracking_msg_t *tracking_msg; 616 617 b = msg->buf; 618 619 if (nxt_buf_used_size(b) < (int) sizeof(nxt_port_mmap_tracking_msg_t)) { 620 nxt_debug(task, "too small message %O", nxt_buf_used_size(b)); 621 return 0; 622 } 623 624 tracking_msg = (nxt_port_mmap_tracking_msg_t *) b->mem.pos; 625 626 b->mem.pos += sizeof(nxt_port_mmap_tracking_msg_t); 627 mmap_handler = nxt_port_get_port_incoming_mmap(task, msg->port_msg.pid, 628 tracking_msg->mmap_id); 629 630 if (nxt_slow_path(mmap_handler == NULL)) { 631 return 0; 632 } 633 634 hdr = mmap_handler->hdr; 635 636 c = tracking_msg->tracking_id; 637 res = nxt_atomic_cmp_set(hdr->tracking + c, msg->port_msg.stream, 0); 638 639 nxt_debug(task, "tracking for stream #%uD %s", msg->port_msg.stream, 640 (res ? "received" : "already cancelled")); 641 642 if (!res) { 643 nxt_port_mmap_set_chunk_free(hdr->free_tracking_map, c); 644 } 645 646 return res; 647 } 648 649 650 nxt_buf_t * 651 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 652 { 653 nxt_mp_t *mp; 654 nxt_buf_t *b; 655 nxt_int_t nchunks; 656 nxt_chunk_id_t c; 657 nxt_port_mmap_header_t *hdr; 658 nxt_port_mmap_handler_t *mmap_handler; 659 660 nxt_debug(task, "request %z bytes shm buffer", size); 661 662 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 663 664 if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) { 665 nxt_alert(task, "requested buffer (%z) too big", size); 666 667 return NULL; 668 } 669 670 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); 671 if (nxt_slow_path(b == NULL)) { 672 return NULL; 673 } 674 675 b->completion_handler = nxt_port_mmap_buf_completion; 676 nxt_buf_set_port_mmap(b); 677 678 mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0); 679 if (nxt_slow_path(mmap_handler == NULL)) { 680 mp = task->thread->engine->mem_pool; 681 nxt_mp_free(mp, b); 682 nxt_mp_release(mp); 683 return NULL; 684 } 685 686 b->parent = mmap_handler; 687 688 nxt_port_mmap_handler_use(mmap_handler, 1); 689 690 hdr = mmap_handler->hdr; 691 692 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 693 b->mem.pos = b->mem.start; 694 b->mem.free = b->mem.start; 695 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 696 697 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 698 b, b->mem.start, b->mem.end - b->mem.start, 699 hdr->src_pid, hdr->dst_pid, hdr->id, c); 700 701 return b; 702 } 703 704 705 nxt_int_t 706 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 707 size_t min_size) 708 { 709 size_t nchunks, free_size; 710 nxt_chunk_id_t c, start; 711 nxt_port_mmap_header_t *hdr; 712 nxt_port_mmap_handler_t *mmap_handler; 713 714 nxt_debug(task, "request increase %z bytes shm buffer", size); 715 716 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 717 nxt_log(task, NXT_LOG_WARN, 718 "failed to increase, not a mmap buffer"); 719 return NXT_ERROR; 720 } 721 722 free_size = nxt_buf_mem_free_size(&b->mem); 723 724 if (nxt_slow_path(size <= free_size)) { 725 return NXT_OK; 726 } 727 728 mmap_handler = b->parent; 729 hdr = mmap_handler->hdr; 730 731 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 732 733 size -= free_size; 734 735 nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE; 736 737 c = start; 738 739 /* Try to acquire as much chunks as required. */ 740 while (nchunks > 0) { 741 742 if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) { 743 break; 744 } 745 746 c++; 747 nchunks--; 748 } 749 750 if (nchunks != 0 751 && min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) 752 { 753 c--; 754 while (c >= start) { 755 nxt_port_mmap_set_chunk_free(hdr->free_map, c); 756 c--; 757 } 758 759 nxt_debug(task, "failed to increase, %uz chunks busy", nchunks); 760 761 return NXT_ERROR; 762 763 } else { 764 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 765 766 return NXT_OK; 767 } 768 } 769 770 771 static nxt_buf_t * 772 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 773 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 774 { 775 size_t nchunks; 776 nxt_buf_t *b; 777 nxt_port_mmap_header_t *hdr; 778 nxt_port_mmap_handler_t *mmap_handler; 779 780 mmap_handler = nxt_port_get_port_incoming_mmap(task, spid, 781 mmap_msg->mmap_id); 782 if (nxt_slow_path(mmap_handler == NULL)) { 783 return NULL; 784 } 785 786 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 787 if (nxt_slow_path(b == NULL)) { 788 return NULL; 789 } 790 791 b->completion_handler = nxt_port_mmap_buf_completion; 792 793 nxt_buf_set_port_mmap(b); 794 795 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 796 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 797 nchunks++; 798 } 799 800 hdr = mmap_handler->hdr; 801 802 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 803 b->mem.pos = b->mem.start; 804 b->mem.free = b->mem.start + mmap_msg->size; 805 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 806 807 b->parent = mmap_handler; 808 nxt_port_mmap_handler_use(mmap_handler, 1); 809 810 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d", 811 b, b->mem.start, b->mem.end - b->mem.start, 812 hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id); 813 814 return b; 815 } 816 817 818 void 819 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 820 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf) 821 { 822 size_t bsize; 823 nxt_buf_t *bmem; 824 nxt_uint_t i; 825 nxt_port_mmap_msg_t *mmap_msg; 826 nxt_port_mmap_header_t *hdr; 827 nxt_port_mmap_handler_t *mmap_handler; 828 829 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 830 "via shared memory", sb->size, port->pid); 831 832 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 833 mmap_msg = mmsg_buf; 834 835 bmem = msg->buf; 836 837 for (i = 0; i < sb->niov; i++, mmap_msg++) { 838 839 /* Lookup buffer which starts current iov_base. */ 840 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 841 bmem = bmem->next; 842 } 843 844 if (nxt_slow_path(bmem == NULL)) { 845 nxt_log_error(NXT_LOG_ERR, task->log, 846 "failed to find buf for iobuf[%d]", i); 847 return; 848 /* TODO clear b and exit */ 849 } 850 851 mmap_handler = bmem->parent; 852 hdr = mmap_handler->hdr; 853 854 mmap_msg->mmap_id = hdr->id; 855 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 856 mmap_msg->size = sb->iobuf[i].iov_len; 857 858 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 859 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 860 port->pid); 861 } 862 863 sb->iobuf[0].iov_base = mmsg_buf; 864 sb->iobuf[0].iov_len = bsize; 865 sb->niov = 1; 866 sb->size = bsize; 867 868 msg->port_msg.mmap = 1; 869 } 870 871 872 void 873 nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg) 874 { 875 nxt_buf_t *b, **pb; 876 nxt_port_mmap_msg_t *end, *mmap_msg; 877 878 pb = &msg->buf; 879 msg->size = 0; 880 881 for (b = msg->buf; b != NULL; b = b->next) { 882 883 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 884 end = (nxt_port_mmap_msg_t *) b->mem.free; 885 886 while (mmap_msg < end) { 887 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 888 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 889 msg->port_msg.pid); 890 891 *pb = nxt_port_mmap_get_incoming_buf(task, msg->port, 892 msg->port_msg.pid, mmap_msg); 893 if (nxt_slow_path(*pb == NULL)) { 894 nxt_log_error(NXT_LOG_ERR, task->log, 895 "failed to get mmap buffer"); 896 897 break; 898 } 899 900 msg->size += mmap_msg->size; 901 pb = &(*pb)->next; 902 mmap_msg++; 903 904 /* Mark original buf as complete. */ 905 b->mem.pos += sizeof(nxt_port_mmap_msg_t); 906 } 907 } 908 } 909 910 911 nxt_port_method_t 912 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 913 { 914 nxt_port_method_t m; 915 nxt_port_mmap_header_t *hdr; 916 nxt_port_mmap_handler_t *mmap_handler; 917 918 m = NXT_PORT_METHOD_ANY; 919 920 for (/* void */; b != NULL; b = b->next) { 921 if (nxt_buf_used_size(b) == 0) { 922 /* empty buffers does not affect method */ 923 continue; 924 } 925 926 if (nxt_buf_is_port_mmap(b)) { 927 mmap_handler = b->parent; 928 hdr = mmap_handler->hdr; 929 930 if (m == NXT_PORT_METHOD_PLAIN) { 931 nxt_log_error(NXT_LOG_ERR, task->log, 932 "mixing plain and mmap buffers, " 933 "using plain mode"); 934 935 break; 936 } 937 938 if (port->pid != hdr->dst_pid) { 939 nxt_log_error(NXT_LOG_ERR, task->log, 940 "send mmap buffer for %PI to %PI, " 941 "using plain mode", hdr->dst_pid, port->pid); 942 943 m = NXT_PORT_METHOD_PLAIN; 944 945 break; 946 } 947 948 if (m == NXT_PORT_METHOD_ANY) { 949 nxt_debug(task, "using mmap mode"); 950 951 m = NXT_PORT_METHOD_MMAP; 952 } 953 } else { 954 if (m == NXT_PORT_METHOD_MMAP) { 955 nxt_log_error(NXT_LOG_ERR, task->log, 956 "mixing mmap and plain buffers, " 957 "switching to plain mode"); 958 959 m = NXT_PORT_METHOD_PLAIN; 960 961 break; 962 } 963 964 if (m == NXT_PORT_METHOD_ANY) { 965 nxt_debug(task, "using plain mode"); 966 967 m = NXT_PORT_METHOD_PLAIN; 968 } 969 } 970 } 971 972 return m; 973 } 974