1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 #if (NXT_HAVE_MEMFD_CREATE) 10 11 #include <linux/memfd.h> 12 #include <unistd.h> 13 #include <sys/syscall.h> 14 15 #endif 16 17 #include <nxt_port_memory_int.h> 18 19 void 20 nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap) 21 { 22 if (port_mmap->hdr != NULL) { 23 nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE); 24 port_mmap->hdr = NULL; 25 } 26 } 27 28 29 static nxt_array_t * 30 nxt_port_mmaps_create() 31 { 32 nxt_mp_t *mp; 33 34 mp = nxt_mp_create(1024, 128, 256, 32); 35 36 if (nxt_slow_path(mp == NULL)) { 37 return NULL; 38 } 39 40 return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t)); 41 } 42 43 44 static nxt_port_mmap_t * 45 nxt_port_mmap_add(nxt_array_t *port_mmaps) 46 { 47 nxt_mp_thread_adopt(port_mmaps->mem_pool); 48 49 return nxt_array_zero_add(port_mmaps); 50 } 51 52 53 void 54 nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool) 55 { 56 uint32_t i; 57 nxt_port_mmap_t *port_mmap; 58 59 if (port_mmaps == NULL) { 60 return; 61 } 62 63 nxt_mp_thread_adopt(port_mmaps->mem_pool); 64 65 port_mmap = port_mmaps->elts; 66 67 for (i = 0; i < port_mmaps->nelts; i++) { 68 nxt_port_mmap_destroy(port_mmap); 69 } 70 71 port_mmaps->nelts = 0; 72 73 if (destroy_pool != 0) { 74 nxt_mp_destroy(port_mmaps->mem_pool); 75 } 76 } 77 78 79 #define nxt_port_mmap_free_junk(p, size) \ 80 memset((p), 0xA5, size) 81 82 83 static void 84 nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) 85 { 86 u_char *p; 87 nxt_mp_t *mp; 88 nxt_buf_t *b; 89 nxt_chunk_id_t c; 90 nxt_port_mmap_header_t *hdr; 91 92 if (nxt_buf_ts_handle(task, obj, data)) { 93 return; 94 } 95 96 b = obj; 97 98 mp = b->data; 99 100 #if (NXT_DEBUG) 101 if (nxt_slow_path(data != b->parent)) { 102 nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)", 103 data, b->parent); 104 nxt_abort(); 105 } 106 #endif 107 108 hdr = data; 109 110 if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) { 111 /* 112 * Chunks until b->mem.pos has been sent to other side, 113 * let's release rest (if any). 114 */ 115 p = b->mem.pos - 1; 116 c = nxt_port_mmap_chunk_id(hdr, p) + 1; 117 p = nxt_port_mmap_chunk_start(hdr, c); 118 119 } else { 120 p = b->mem.start; 121 c = nxt_port_mmap_chunk_id(hdr, p); 122 } 123 124 nxt_port_mmap_free_junk(p, b->mem.end - p); 125 126 nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b, 127 b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent, 128 hdr->pid, hdr->id, c); 129 130 while (p < b->mem.end) { 131 nxt_port_mmap_set_chunk_free(hdr, c); 132 133 p += PORT_MMAP_CHUNK_SIZE; 134 c++; 135 } 136 137 nxt_mp_release(mp, b); 138 } 139 140 141 nxt_port_mmap_header_t * 142 nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, 143 nxt_fd_t fd) 144 { 145 void *mem; 146 struct stat mmap_stat; 147 nxt_port_mmap_t *port_mmap; 148 nxt_port_mmap_header_t *hdr; 149 150 nxt_debug(task, "got new mmap fd #%FD from process %PI", 151 fd, process->pid); 152 153 port_mmap = NULL; 154 hdr = NULL; 155 156 if (fstat(fd, &mmap_stat) == -1) { 157 nxt_log(task, NXT_LOG_WARN, "fstat(%FD) failed %E", fd, nxt_errno); 158 159 return NULL; 160 } 161 162 nxt_thread_mutex_lock(&process->incoming_mutex); 163 164 if (process->incoming == NULL) { 165 process->incoming = nxt_port_mmaps_create(); 166 } 167 168 if (nxt_slow_path(process->incoming == NULL)) { 169 nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array"); 170 171 goto fail; 172 } 173 174 port_mmap = nxt_port_mmap_add(process->incoming); 175 if (nxt_slow_path(port_mmap == NULL)) { 176 nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); 177 178 goto fail; 179 } 180 181 mem = nxt_mem_mmap(NULL, mmap_stat.st_size, 182 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 183 184 if (nxt_slow_path(mem == MAP_FAILED)) { 185 nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno); 186 187 port_mmap = NULL; 188 189 goto fail; 190 } 191 192 port_mmap->hdr = mem; 193 hdr = port_mmap->hdr; 194 195 if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { 196 nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", 197 port_mmap->hdr->id, process->incoming->nelts - 1); 198 nxt_abort(); 199 } 200 201 fail: 202 203 nxt_thread_mutex_unlock(&process->incoming_mutex); 204 205 return hdr; 206 } 207 208 209 static nxt_port_mmap_header_t * 210 nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process, 211 nxt_port_t *port) 212 { 213 void *mem; 214 u_char *p, name[64]; 215 nxt_fd_t fd; 216 nxt_port_mmap_t *port_mmap; 217 nxt_port_mmap_header_t *hdr; 218 219 port_mmap = NULL; 220 221 if (process->outgoing == NULL) { 222 process->outgoing = nxt_port_mmaps_create(); 223 } 224 225 if (nxt_slow_path(process->outgoing == NULL)) { 226 nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array"); 227 228 return NULL; 229 } 230 231 port_mmap = nxt_port_mmap_add(process->outgoing); 232 if (nxt_slow_path(port_mmap == NULL)) { 233 nxt_log(task, NXT_LOG_WARN, 234 "failed to add port mmap to outgoing array"); 235 236 return NULL; 237 } 238 239 p = nxt_sprintf(name, name + sizeof(name), "/unit.%PI.%uxD", 240 nxt_pid, nxt_random(&task->thread->random)); 241 *p = '\0'; 242 243 #if (NXT_HAVE_MEMFD_CREATE) 244 fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC); 245 246 if (nxt_slow_path(fd == -1)) { 247 nxt_log(task, NXT_LOG_CRIT, "memfd_create(%s) failed %E", 248 name, nxt_errno); 249 250 goto remove_fail; 251 } 252 253 nxt_debug(task, "memfd_create(%s): %FD", name, fd); 254 255 #elif (NXT_HAVE_SHM_OPEN) 256 shm_unlink((char *) name); // just in case 257 258 fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR); 259 260 nxt_debug(task, "shm_open(%s): %FD", name, fd); 261 262 if (nxt_slow_path(fd == -1)) { 263 nxt_log(task, NXT_LOG_CRIT, "shm_open(%s) failed %E", name, nxt_errno); 264 265 goto remove_fail; 266 } 267 268 if (nxt_slow_path(shm_unlink((char *) name) == -1)) { 269 nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name, 270 nxt_errno); 271 } 272 #endif 273 274 if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) { 275 nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno); 276 277 goto remove_fail; 278 } 279 280 mem = nxt_mem_mmap(NULL, PORT_MMAP_SIZE, 281 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); 282 283 if (nxt_slow_path(mem == MAP_FAILED)) { 284 goto remove_fail; 285 } 286 287 port_mmap->hdr = mem; 288 289 /* Init segment header. */ 290 hdr = port_mmap->hdr; 291 292 nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map)); 293 294 hdr->id = process->outgoing->nelts - 1; 295 hdr->pid = process->pid; 296 297 /* Mark first chunk as busy */ 298 nxt_port_mmap_set_chunk_busy(hdr, 0); 299 300 /* Mark as busy chunk followed the last available chunk. */ 301 nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT); 302 303 nxt_debug(task, "send mmap fd %FD to process %PI", fd, 304 port->pid); 305 306 /* TODO handle error */ 307 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL); 308 309 nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI", 310 hdr->id, nxt_pid, process->pid); 311 312 return hdr; 313 314 remove_fail: 315 316 nxt_array_remove(process->outgoing, port_mmap); 317 318 return NULL; 319 } 320 321 322 static nxt_port_mmap_header_t * 323 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, 324 size_t size) 325 { 326 nxt_array_t *outgoing; 327 nxt_process_t *process; 328 nxt_port_mmap_t *port_mmap; 329 nxt_port_mmap_t *end_port_mmap; 330 nxt_port_mmap_header_t *hdr; 331 332 process = port->process; 333 if (nxt_slow_path(process == NULL)) { 334 return NULL; 335 } 336 337 *c = 0; 338 port_mmap = NULL; 339 hdr = NULL; 340 341 nxt_thread_mutex_lock(&process->outgoing_mutex); 342 343 if (process->outgoing == NULL) { 344 hdr = nxt_port_new_port_mmap(task, process, port); 345 346 goto unlock_return; 347 } 348 349 outgoing = process->outgoing; 350 port_mmap = outgoing->elts; 351 end_port_mmap = port_mmap + outgoing->nelts; 352 353 while (port_mmap < end_port_mmap) { 354 355 if (nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) { 356 hdr = port_mmap->hdr; 357 358 goto unlock_return; 359 } 360 361 port_mmap++; 362 } 363 364 /* TODO introduce port_mmap limit and release wait. */ 365 366 hdr = nxt_port_new_port_mmap(task, process, port); 367 368 unlock_return: 369 370 nxt_thread_mutex_unlock(&process->outgoing_mutex); 371 372 return hdr; 373 } 374 375 376 static nxt_port_mmap_header_t * 377 nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id) 378 { 379 nxt_array_t *incoming; 380 nxt_process_t *process; 381 nxt_port_mmap_t *port_mmap; 382 nxt_port_mmap_header_t *hdr; 383 384 process = nxt_runtime_process_find(task->thread->runtime, spid); 385 if (nxt_slow_path(process == NULL)) { 386 return NULL; 387 } 388 389 hdr = NULL; 390 391 nxt_thread_mutex_lock(&process->incoming_mutex); 392 393 incoming = process->incoming; 394 395 if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) { 396 port_mmap = incoming->elts; 397 hdr = port_mmap[id].hdr; 398 } else { 399 nxt_log(task, NXT_LOG_WARN, 400 "failed to get incoming mmap #%d for process %PI", id, spid); 401 } 402 403 nxt_thread_mutex_unlock(&process->incoming_mutex); 404 405 return hdr; 406 } 407 408 409 nxt_buf_t * 410 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) 411 { 412 size_t nchunks; 413 nxt_buf_t *b; 414 nxt_chunk_id_t c; 415 nxt_port_mmap_header_t *hdr; 416 417 nxt_debug(task, "request %z bytes shm buffer", size); 418 419 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 420 if (nxt_slow_path(b == NULL)) { 421 return NULL; 422 } 423 424 b->completion_handler = nxt_port_mmap_buf_completion; 425 nxt_buf_set_port_mmap(b); 426 427 hdr = nxt_port_mmap_get(task, port, &c, size); 428 if (nxt_slow_path(hdr == NULL)) { 429 nxt_mp_release(port->mem_pool, b); 430 return NULL; 431 } 432 433 b->parent = hdr; 434 435 b->mem.start = nxt_port_mmap_chunk_start(hdr, c); 436 b->mem.pos = b->mem.start; 437 b->mem.free = b->mem.start; 438 b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE; 439 440 nchunks = size / PORT_MMAP_CHUNK_SIZE; 441 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 442 nchunks++; 443 } 444 445 nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, 446 b->mem.start, b->mem.end - b->mem.start, 447 hdr->pid, hdr->id, c); 448 449 c++; 450 nchunks--; 451 452 /* Try to acquire as much chunks as required. */ 453 while (nchunks > 0) { 454 455 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { 456 break; 457 } 458 459 b->mem.end += PORT_MMAP_CHUNK_SIZE; 460 c++; 461 nchunks--; 462 } 463 464 return b; 465 } 466 467 468 nxt_int_t 469 nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size, 470 size_t min_size) 471 { 472 size_t nchunks, free_size; 473 nxt_chunk_id_t c, start; 474 nxt_port_mmap_header_t *hdr; 475 476 nxt_debug(task, "request increase %z bytes shm buffer", size); 477 478 if (nxt_slow_path(nxt_buf_is_port_mmap(b) == 0)) { 479 nxt_log(task, NXT_LOG_WARN, 480 "failed to increase, not a mmap buffer"); 481 return NXT_ERROR; 482 } 483 484 free_size = nxt_buf_mem_free_size(&b->mem); 485 486 if (nxt_slow_path(size <= free_size)) { 487 return NXT_OK; 488 } 489 490 hdr = b->parent; 491 492 start = nxt_port_mmap_chunk_id(hdr, b->mem.end); 493 494 size -= free_size; 495 496 nchunks = size / PORT_MMAP_CHUNK_SIZE; 497 if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { 498 nchunks++; 499 } 500 501 c = start; 502 503 /* Try to acquire as much chunks as required. */ 504 while (nchunks > 0) { 505 506 if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) { 507 break; 508 } 509 510 c++; 511 nchunks--; 512 } 513 514 if (nchunks != 0 && 515 min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) { 516 517 c--; 518 while (c >= start) { 519 nxt_port_mmap_set_chunk_free(hdr, c); 520 c--; 521 } 522 523 nxt_debug(task, "failed to increase, %d chunks busy", nchunks); 524 525 return NXT_ERROR; 526 } else { 527 b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start); 528 529 return NXT_OK; 530 } 531 } 532 533 534 static nxt_buf_t * 535 nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port, 536 nxt_pid_t spid, nxt_port_mmap_msg_t *mmap_msg) 537 { 538 size_t nchunks; 539 nxt_buf_t *b; 540 nxt_port_mmap_header_t *hdr; 541 542 hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id); 543 if (nxt_slow_path(hdr == NULL)) { 544 return NULL; 545 } 546 547 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); 548 if (nxt_slow_path(b == NULL)) { 549 return NULL; 550 } 551 552 b->completion_handler = nxt_port_mmap_buf_completion; 553 554 nxt_buf_set_port_mmap(b); 555 556 nchunks = mmap_msg->size / PORT_MMAP_CHUNK_SIZE; 557 if ((mmap_msg->size % PORT_MMAP_CHUNK_SIZE) != 0) { 558 nchunks++; 559 } 560 561 b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id); 562 b->mem.pos = b->mem.start; 563 b->mem.free = b->mem.start + mmap_msg->size; 564 b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE; 565 566 b->parent = hdr; 567 568 nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b, 569 b->mem.start, b->mem.end - b->mem.start, 570 hdr->pid, hdr->id, mmap_msg->chunk_id); 571 572 return b; 573 } 574 575 576 void 577 nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port, 578 nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb) 579 { 580 size_t bsize; 581 nxt_buf_t *bmem; 582 nxt_uint_t i; 583 nxt_port_mmap_msg_t *mmap_msg; 584 nxt_port_mmap_header_t *hdr; 585 586 nxt_debug(task, "prepare %z bytes message for transfer to process %PI " 587 "via shared memory", sb->size, port->pid); 588 589 bsize = sb->niov * sizeof(nxt_port_mmap_msg_t); 590 mmap_msg = port->mmsg_buf; 591 592 bmem = msg->buf; 593 594 for (i = 0; i < sb->niov; i++, mmap_msg++) { 595 596 /* Lookup buffer which starts current iov_base. */ 597 while (bmem && sb->iobuf[i].iov_base != bmem->mem.pos) { 598 bmem = bmem->next; 599 } 600 601 if (nxt_slow_path(bmem == NULL)) { 602 nxt_log_error(NXT_LOG_ERR, task->log, "failed to find buf for " 603 "iobuf[%d]", i); 604 return; 605 /* TODO clear b and exit */ 606 } 607 608 hdr = bmem->parent; 609 610 mmap_msg->mmap_id = hdr->id; 611 mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos); 612 mmap_msg->size = sb->iobuf[i].iov_len; 613 614 nxt_debug(task, "mmap_msg={%D, %D, %D} to %PI", 615 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 616 port->pid); 617 } 618 619 sb->iobuf[0].iov_base = port->mmsg_buf; 620 sb->iobuf[0].iov_len = bsize; 621 sb->niov = 1; 622 sb->size = bsize; 623 624 msg->port_msg.mmap = 1; 625 } 626 627 628 void 629 nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port, 630 nxt_port_recv_msg_t *msg) 631 { 632 nxt_buf_t *b, **pb; 633 nxt_port_mmap_msg_t *end, *mmap_msg; 634 635 b = msg->buf; 636 637 mmap_msg = (nxt_port_mmap_msg_t *) b->mem.pos; 638 end = (nxt_port_mmap_msg_t *) b->mem.free; 639 640 pb = &msg->buf; 641 msg->size = 0; 642 643 while (mmap_msg < end) { 644 nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI", 645 mmap_msg->mmap_id, mmap_msg->chunk_id, mmap_msg->size, 646 msg->port_msg.pid); 647 648 *pb = nxt_port_mmap_get_incoming_buf(task, port, msg->port_msg.pid, 649 mmap_msg); 650 if (nxt_slow_path(*pb == NULL)) { 651 nxt_log_error(NXT_LOG_ERR, task->log, "failed to get mmap buffer"); 652 653 break; 654 } 655 656 msg->size += mmap_msg->size; 657 pb = &(*pb)->next; 658 mmap_msg++; 659 } 660 661 /* Mark original buf as complete. */ 662 b->mem.pos += nxt_buf_used_size(b); 663 } 664 665 666 nxt_port_method_t 667 nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b) 668 { 669 nxt_port_method_t m; 670 nxt_port_mmap_header_t *hdr; 671 672 m = NXT_PORT_METHOD_ANY; 673 674 for (; b != NULL; b = b->next) { 675 if (nxt_buf_used_size(b) == 0) { 676 /* empty buffers does not affect method */ 677 continue; 678 } 679 680 if (nxt_buf_is_port_mmap(b)) { 681 hdr = b->parent; 682 683 if (m == NXT_PORT_METHOD_PLAIN) { 684 nxt_log_error(NXT_LOG_ERR, task->log, 685 "mixing plain and mmap buffers, " 686 "using plain mode"); 687 688 break; 689 } 690 691 if (port->pid != hdr->pid) { 692 nxt_log_error(NXT_LOG_ERR, task->log, 693 "send mmap buffer for %PI to %PI, " 694 "using plain mode", hdr->pid, port->pid); 695 696 m = NXT_PORT_METHOD_PLAIN; 697 698 break; 699 } 700 701 if (m == NXT_PORT_METHOD_ANY) { 702 nxt_debug(task, "using mmap mode"); 703 704 m = NXT_PORT_METHOD_MMAP; 705 } 706 } else { 707 if (m == NXT_PORT_METHOD_MMAP) { 708 nxt_log_error(NXT_LOG_ERR, task->log, 709 "mixing mmap and plain buffers, " 710 "switching to plain mode"); 711 712 m = NXT_PORT_METHOD_PLAIN; 713 714 break; 715 } 716 717 if (m == NXT_PORT_METHOD_ANY) { 718 nxt_debug(task, "using plain mode"); 719 720 m = NXT_PORT_METHOD_PLAIN; 721 } 722 } 723 } 724 725 return m; 726 } 727