1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_socket_msg.h> 9 #include <nxt_port_queue.h> 10 #include <nxt_port_memory_int.h> 11 12 13 #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \ 14 (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)) 15 16 17 static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b); 18 static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, 19 void *qbuf, nxt_buf_t *b); 20 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 21 nxt_port_send_msg_t *msg); 22 static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); 23 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 24 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); 25 nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg); 26 nxt_inline void nxt_port_close_fds(nxt_fd_t *fd); 27 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, 28 nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); 29 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, 30 nxt_port_send_msg_t *msg); 31 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 32 static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, 33 void *data); 34 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 35 nxt_port_recv_msg_t *msg); 36 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 37 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 38 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 39 40 41 nxt_int_t 42 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 43 { 44 nxt_int_t sndbuf, rcvbuf, size; 45 nxt_socket_t snd, rcv; 46 47 port->socket.task = task; 48 49 port->pair[0] = -1; 50 port->pair[1] = -1; 51 52 if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 53 goto socketpair_fail; 54 } 55 56 snd = port->pair[1]; 57 58 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 59 if (nxt_slow_path(sndbuf < 0)) { 60 goto getsockopt_fail; 61 } 62 63 rcv = port->pair[0]; 64 65 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 66 if (nxt_slow_path(rcvbuf < 0)) { 67 goto getsockopt_fail; 68 } 69 70 if (max_size == 0) { 71 max_size = 16 * 1024; 72 } 73 74 if ((size_t) sndbuf < max_size) { 75 /* 76 * On Unix domain sockets 77 * Linux uses 224K on both send and receive directions; 78 * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 79 * on send direction and 4K buffer size on receive direction; 80 * Solaris uses 16K on send direction and 5K on receive direction. 81 */ 82 (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 83 max_size); 84 85 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 86 if (nxt_slow_path(sndbuf < 0)) { 87 goto getsockopt_fail; 88 } 89 90 size = sndbuf * 4; 91 92 if (rcvbuf < size) { 93 (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 94 size); 95 96 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 97 if (nxt_slow_path(rcvbuf < 0)) { 98 goto getsockopt_fail; 99 } 100 } 101 } 102 103 port->max_size = nxt_min(max_size, (size_t) sndbuf); 104 port->max_share = (64 * 1024); 105 106 return NXT_OK; 107 108 getsockopt_fail: 109 110 nxt_socket_close(task, port->pair[0]); 111 nxt_socket_close(task, port->pair[1]); 112 113 socketpair_fail: 114 115 return NXT_ERROR; 116 } 117 118 119 void 120 nxt_port_destroy(nxt_port_t *port) 121 { 122 nxt_socket_close(port->socket.task, port->socket.fd); 123 nxt_mp_destroy(port->mem_pool); 124 } 125 126 127 void 128 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 129 { 130 port->socket.fd = port->pair[1]; 131 port->socket.log = &nxt_main_log; 132 port->socket.write_ready = 1; 133 134 port->engine = task->thread->engine; 135 136 port->socket.write_work_queue = &port->engine->fast_work_queue; 137 port->socket.write_handler = nxt_port_write_handler; 138 port->socket.error_handler = nxt_port_error_handler; 139 } 140 141 142 void 143 nxt_port_write_close(nxt_port_t *port) 144 { 145 nxt_socket_close(port->socket.task, port->pair[1]); 146 port->pair[1] = -1; 147 } 148 149 150 static void 151 nxt_port_release_send_msg(nxt_port_send_msg_t *msg) 152 { 153 if (msg->allocated) { 154 nxt_free(msg); 155 } 156 } 157 158 159 nxt_int_t 160 nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 161 nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, 162 nxt_buf_t *b) 163 { 164 int notify; 165 uint8_t qmsg_size; 166 nxt_int_t res; 167 nxt_port_send_msg_t msg; 168 struct { 169 nxt_port_msg_t pm; 170 uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE]; 171 } qmsg; 172 173 msg.link.next = NULL; 174 msg.link.prev = NULL; 175 176 msg.buf = b; 177 msg.share = 0; 178 msg.fd[0] = fd; 179 msg.fd[1] = fd2; 180 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 181 msg.allocated = 0; 182 183 msg.port_msg.stream = stream; 184 msg.port_msg.pid = nxt_pid; 185 msg.port_msg.reply_port = reply_port; 186 msg.port_msg.type = type & NXT_PORT_MSG_MASK; 187 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 188 msg.port_msg.mmap = 0; 189 msg.port_msg.nf = 0; 190 msg.port_msg.mf = 0; 191 192 if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { 193 194 if (fd == -1 && nxt_port_can_enqueue_buf(b)) { 195 qmsg.pm = msg.port_msg; 196 197 qmsg_size = sizeof(qmsg.pm); 198 199 if (b != NULL) { 200 qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b); 201 } 202 203 res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, ¬ify); 204 205 nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", 206 (int) port->pid, (int) port->id, port->socket.fd, 207 (int) qmsg_size, notify, res); 208 209 if (b != NULL && nxt_fast_path(res == NXT_OK)) { 210 if (qmsg.pm.mmap) { 211 b->is_port_mmap_sent = 1; 212 } 213 214 b->mem.pos = b->mem.free; 215 216 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 217 b->completion_handler, task, b, b->parent); 218 } 219 220 if (notify == 0) { 221 return res; 222 } 223 224 msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE; 225 msg.buf = NULL; 226 227 } else { 228 qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET; 229 230 res = nxt_port_queue_send(port->queue, qmsg.buf, 1, ¬ify); 231 232 nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", 233 (int) port->pid, (int) port->id, port->socket.fd, 234 notify, res); 235 236 if (nxt_slow_path(res == NXT_AGAIN)) { 237 return NXT_AGAIN; 238 } 239 } 240 } 241 242 res = nxt_port_msg_chk_insert(task, port, &msg); 243 if (nxt_fast_path(res == NXT_DECLINED)) { 244 nxt_port_write_handler(task, &port->socket, &msg); 245 res = NXT_OK; 246 } 247 248 return res; 249 } 250 251 252 static nxt_bool_t 253 nxt_port_can_enqueue_buf(nxt_buf_t *b) 254 { 255 if (b == NULL) { 256 return 1; 257 } 258 259 if (b->next != NULL) { 260 return 0; 261 } 262 263 return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE 264 || nxt_buf_is_port_mmap(b)); 265 } 266 267 268 static uint8_t 269 nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf, 270 nxt_buf_t *b) 271 { 272 ssize_t size; 273 nxt_port_mmap_msg_t *mm; 274 nxt_port_mmap_header_t *hdr; 275 nxt_port_mmap_handler_t *mmap_handler; 276 277 size = nxt_buf_mem_used_size(&b->mem); 278 279 if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) { 280 nxt_memcpy(qbuf, b->mem.pos, size); 281 282 return size; 283 } 284 285 mmap_handler = b->parent; 286 hdr = mmap_handler->hdr; 287 mm = qbuf; 288 289 mm->mmap_id = hdr->id; 290 mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos); 291 mm->size = nxt_buf_mem_used_size(&b->mem); 292 293 pm->mmap = 1; 294 295 nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id, 296 mm->size); 297 298 return sizeof(nxt_port_mmap_msg_t); 299 } 300 301 302 static nxt_int_t 303 nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 304 nxt_port_send_msg_t *msg) 305 { 306 nxt_int_t res; 307 308 nxt_thread_mutex_lock(&port->write_mutex); 309 310 if (nxt_fast_path(port->socket.write_ready 311 && nxt_queue_is_empty(&port->messages))) 312 { 313 res = NXT_DECLINED; 314 315 } else { 316 msg = nxt_port_msg_alloc(msg); 317 318 if (nxt_fast_path(msg != NULL)) { 319 nxt_queue_insert_tail(&port->messages, &msg->link); 320 nxt_port_use(task, port, 1); 321 res = NXT_OK; 322 323 } else { 324 res = NXT_ERROR; 325 } 326 } 327 328 nxt_thread_mutex_unlock(&port->write_mutex); 329 330 return res; 331 } 332 333 334 static nxt_port_send_msg_t * 335 nxt_port_msg_alloc(nxt_port_send_msg_t *m) 336 { 337 nxt_port_send_msg_t *msg; 338 339 msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); 340 if (nxt_slow_path(msg == NULL)) { 341 return NULL; 342 } 343 344 *msg = *m; 345 346 msg->allocated = 1; 347 348 return msg; 349 } 350 351 352 static void 353 nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 354 { 355 nxt_fd_event_block_write(task->thread->engine, &port->socket); 356 } 357 358 359 static void 360 nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 361 { 362 nxt_fd_event_enable_write(task->thread->engine, &port->socket); 363 } 364 365 366 static void 367 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 368 { 369 int use_delta; 370 size_t plain_size; 371 ssize_t n; 372 uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; 373 nxt_bool_t block_write, enable_write; 374 nxt_port_t *port; 375 struct iovec iov[NXT_IOBUF_MAX * 10]; 376 nxt_work_queue_t *wq; 377 nxt_port_method_t m; 378 nxt_port_send_msg_t *msg; 379 nxt_sendbuf_coalesce_t sb; 380 381 port = nxt_container_of(obj, nxt_port_t, socket); 382 383 block_write = 0; 384 enable_write = 0; 385 use_delta = 0; 386 387 wq = &task->thread->engine->fast_work_queue; 388 389 do { 390 if (data) { 391 msg = data; 392 393 } else { 394 msg = nxt_port_msg_first(port); 395 396 if (msg == NULL) { 397 block_write = 1; 398 goto cleanup; 399 } 400 } 401 402 next_fragment: 403 404 iov[0].iov_base = &msg->port_msg; 405 iov[0].iov_len = sizeof(nxt_port_msg_t); 406 407 sb.buf = msg->buf; 408 sb.iobuf = &iov[1]; 409 sb.nmax = NXT_IOBUF_MAX - 1; 410 sb.sync = 0; 411 sb.last = 0; 412 sb.size = 0; 413 sb.limit = port->max_size; 414 415 sb.limit_reached = 0; 416 sb.nmax_reached = 0; 417 418 m = nxt_port_mmap_get_method(task, port, msg->buf); 419 420 if (m == NXT_PORT_METHOD_MMAP) { 421 sb.limit = (1ULL << 31) - 1; 422 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 423 port->max_size / PORT_MMAP_MIN_SIZE); 424 } 425 426 sb.limit -= iov[0].iov_len; 427 428 nxt_sendbuf_mem_coalesce(task, &sb); 429 430 plain_size = sb.size; 431 432 /* 433 * Send through mmap enabled only when payload 434 * is bigger than PORT_MMAP_MIN_SIZE. 435 */ 436 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 437 nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); 438 439 } else { 440 m = NXT_PORT_METHOD_PLAIN; 441 } 442 443 msg->port_msg.last |= sb.last; 444 msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 445 446 n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 447 448 if (n > 0) { 449 if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 450 nxt_alert(task, "port %d: short write: %z instead of %uz", 451 port->socket.fd, n, sb.size + iov[0].iov_len); 452 goto fail; 453 } 454 455 nxt_port_msg_close_fd(msg); 456 457 msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, 458 m == NXT_PORT_METHOD_MMAP); 459 460 if (msg->buf != NULL) { 461 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 462 msg->port_msg.stream); 463 464 /* 465 * A file descriptor is sent only 466 * in the first message of a stream. 467 */ 468 msg->fd[0] = -1; 469 msg->fd[1] = -1; 470 msg->share += n; 471 msg->port_msg.nf = 1; 472 473 if (msg->share >= port->max_share) { 474 msg->share = 0; 475 476 if (msg->link.next != NULL) { 477 nxt_thread_mutex_lock(&port->write_mutex); 478 479 nxt_queue_remove(&msg->link); 480 nxt_queue_insert_tail(&port->messages, &msg->link); 481 482 nxt_thread_mutex_unlock(&port->write_mutex); 483 484 } else { 485 msg = nxt_port_msg_insert_tail(port, msg); 486 if (nxt_slow_path(msg == NULL)) { 487 goto fail; 488 } 489 490 use_delta++; 491 } 492 493 } else { 494 goto next_fragment; 495 } 496 497 } else { 498 if (msg->link.next != NULL) { 499 nxt_thread_mutex_lock(&port->write_mutex); 500 501 nxt_queue_remove(&msg->link); 502 msg->link.next = NULL; 503 504 nxt_thread_mutex_unlock(&port->write_mutex); 505 506 use_delta--; 507 } 508 509 nxt_port_release_send_msg(msg); 510 } 511 512 if (data != NULL) { 513 goto cleanup; 514 } 515 516 } else { 517 if (nxt_slow_path(n == NXT_ERROR)) { 518 if (msg->link.next == NULL) { 519 nxt_port_msg_close_fd(msg); 520 521 nxt_port_release_send_msg(msg); 522 } 523 524 goto fail; 525 } 526 527 if (msg->link.next == NULL) { 528 msg = nxt_port_msg_insert_tail(port, msg); 529 if (nxt_slow_path(msg == NULL)) { 530 goto fail; 531 } 532 533 use_delta++; 534 } 535 } 536 537 } while (port->socket.write_ready); 538 539 if (nxt_fd_event_is_disabled(port->socket.write)) { 540 enable_write = 1; 541 } 542 543 goto cleanup; 544 545 fail: 546 547 use_delta++; 548 549 nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 550 &port->socket); 551 552 cleanup: 553 554 if (block_write && nxt_fd_event_is_active(port->socket.write)) { 555 nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 556 } 557 558 if (enable_write) { 559 nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 560 } 561 562 if (use_delta != 0) { 563 nxt_port_use(task, port, use_delta); 564 } 565 } 566 567 568 static nxt_port_send_msg_t * 569 nxt_port_msg_first(nxt_port_t *port) 570 { 571 nxt_queue_link_t *lnk; 572 nxt_port_send_msg_t *msg; 573 574 nxt_thread_mutex_lock(&port->write_mutex); 575 576 lnk = nxt_queue_first(&port->messages); 577 578 if (lnk == nxt_queue_tail(&port->messages)) { 579 msg = NULL; 580 581 } else { 582 msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 583 } 584 585 nxt_thread_mutex_unlock(&port->write_mutex); 586 587 return msg; 588 } 589 590 591 nxt_inline void 592 nxt_port_msg_close_fd(nxt_port_send_msg_t *msg) 593 { 594 if (!msg->close_fd) { 595 return; 596 } 597 598 nxt_port_close_fds(msg->fd); 599 } 600 601 602 nxt_inline void 603 nxt_port_close_fds(nxt_fd_t *fd) 604 { 605 if (fd[0] != -1) { 606 nxt_fd_close(fd[0]); 607 fd[0] = -1; 608 } 609 610 if (fd[1] != -1) { 611 nxt_fd_close(fd[1]); 612 fd[1] = -1; 613 } 614 } 615 616 617 static nxt_buf_t * 618 nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 619 size_t sent, nxt_bool_t mmap_mode) 620 { 621 size_t size; 622 nxt_buf_t *next; 623 624 while (b != NULL) { 625 626 nxt_prefetch(b->next); 627 628 if (!nxt_buf_is_sync(b)) { 629 630 size = nxt_buf_used_size(b); 631 632 if (size != 0) { 633 634 if (sent == 0) { 635 break; 636 } 637 638 if (nxt_buf_is_port_mmap(b) && mmap_mode) { 639 /* 640 * buffer has been sent to other side which is now 641 * responsible for shared memory bucket release 642 */ 643 b->is_port_mmap_sent = 1; 644 } 645 646 if (sent < size) { 647 648 if (nxt_buf_is_mem(b)) { 649 b->mem.pos += sent; 650 } 651 652 if (nxt_buf_is_file(b)) { 653 b->file_pos += sent; 654 } 655 656 break; 657 } 658 659 /* b->mem.free is NULL in file-only buffer. */ 660 b->mem.pos = b->mem.free; 661 662 if (nxt_buf_is_file(b)) { 663 b->file_pos = b->file_end; 664 } 665 666 sent -= size; 667 } 668 } 669 670 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 671 672 next = b->next; 673 b->next = NULL; 674 b = next; 675 } 676 677 return b; 678 } 679 680 681 static nxt_port_send_msg_t * 682 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) 683 { 684 if (msg->allocated == 0) { 685 msg = nxt_port_msg_alloc(msg); 686 687 if (nxt_slow_path(msg == NULL)) { 688 return NULL; 689 } 690 } 691 692 nxt_thread_mutex_lock(&port->write_mutex); 693 694 nxt_queue_insert_tail(&port->messages, &msg->link); 695 696 nxt_thread_mutex_unlock(&port->write_mutex); 697 698 return msg; 699 } 700 701 702 void 703 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 704 { 705 port->socket.fd = port->pair[0]; 706 port->socket.log = &nxt_main_log; 707 708 port->engine = task->thread->engine; 709 710 port->socket.read_work_queue = &port->engine->fast_work_queue; 711 port->socket.read_handler = port->queue != NULL 712 ? nxt_port_queue_read_handler 713 : nxt_port_read_handler; 714 port->socket.error_handler = nxt_port_error_handler; 715 716 nxt_fd_event_enable_read(port->engine, &port->socket); 717 } 718 719 720 void 721 nxt_port_read_close(nxt_port_t *port) 722 { 723 port->socket.read_ready = 0; 724 port->socket.read = NXT_EVENT_INACTIVE; 725 nxt_socket_close(port->socket.task, port->pair[0]); 726 port->pair[0] = -1; 727 } 728 729 730 static void 731 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 732 { 733 ssize_t n; 734 nxt_buf_t *b; 735 nxt_int_t ret; 736 nxt_port_t *port; 737 nxt_recv_oob_t oob; 738 nxt_port_recv_msg_t msg; 739 struct iovec iov[2]; 740 741 port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 742 743 nxt_assert(port->engine == task->thread->engine); 744 745 for ( ;; ) { 746 b = nxt_port_buf_alloc(port); 747 748 if (nxt_slow_path(b == NULL)) { 749 /* TODO: disable event for some time */ 750 } 751 752 iov[0].iov_base = &msg.port_msg; 753 iov[0].iov_len = sizeof(nxt_port_msg_t); 754 755 iov[1].iov_base = b->mem.pos; 756 iov[1].iov_len = port->max_size; 757 758 n = nxt_socketpair_recv(&port->socket, iov, 2, &oob); 759 760 if (n > 0) { 761 msg.fd[0] = -1; 762 msg.fd[1] = -1; 763 764 ret = nxt_socket_msg_oob_get(&oob, msg.fd, 765 nxt_recv_msg_cmsg_pid_ref(&msg)); 766 if (nxt_slow_path(ret != NXT_OK)) { 767 nxt_alert(task, "failed to get oob data from %d", 768 port->socket.fd); 769 770 nxt_port_close_fds(msg.fd); 771 772 goto fail; 773 } 774 775 msg.buf = b; 776 msg.size = n; 777 778 nxt_port_read_msg_process(task, port, &msg); 779 780 /* 781 * To disable instant completion or buffer re-usage, 782 * handler should reset 'msg.buf'. 783 */ 784 if (msg.buf == b) { 785 nxt_port_buf_free(port, b); 786 } 787 788 if (port->socket.read_ready) { 789 continue; 790 } 791 792 return; 793 } 794 795 if (n == NXT_AGAIN) { 796 nxt_port_buf_free(port, b); 797 798 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 799 return; 800 } 801 802 fail: 803 /* n == 0 || error */ 804 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 805 nxt_port_error_handler, task, &port->socket, NULL); 806 return; 807 } 808 } 809 810 811 static void 812 nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) 813 { 814 ssize_t n; 815 nxt_buf_t *b; 816 nxt_int_t ret; 817 nxt_port_t *port; 818 struct iovec iov[2]; 819 nxt_recv_oob_t oob; 820 nxt_port_queue_t *queue; 821 nxt_port_recv_msg_t msg, *smsg; 822 uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; 823 824 port = nxt_container_of(obj, nxt_port_t, socket); 825 msg.port = port; 826 827 nxt_assert(port->engine == task->thread->engine); 828 829 queue = port->queue; 830 nxt_atomic_fetch_add(&queue->nitems, 1); 831 832 for ( ;; ) { 833 834 if (port->from_socket == 0) { 835 n = nxt_port_queue_recv(queue, qmsg); 836 837 if (n < 0 && !port->socket.read_ready) { 838 nxt_atomic_fetch_add(&queue->nitems, -1); 839 840 n = nxt_port_queue_recv(queue, qmsg); 841 if (n < 0) { 842 return; 843 } 844 845 nxt_atomic_fetch_add(&queue->nitems, 1); 846 } 847 848 if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) { 849 port->from_socket++; 850 851 nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d", 852 (int) port->pid, (int) port->id, port->socket.fd, 853 port->from_socket); 854 855 continue; 856 } 857 858 nxt_debug(task, "port{%d,%d} %d: dequeue %d", 859 (int) port->pid, (int) port->id, port->socket.fd, 860 (int) n); 861 862 } else { 863 if ((smsg = port->socket_msg) != NULL && smsg->size != 0) { 864 msg.port_msg = smsg->port_msg; 865 b = smsg->buf; 866 n = smsg->size; 867 msg.fd[0] = smsg->fd[0]; 868 msg.fd[1] = smsg->fd[1]; 869 870 smsg->size = 0; 871 872 port->from_socket--; 873 874 nxt_debug(task, "port{%d,%d} %d: use suspended message %d", 875 (int) port->pid, (int) port->id, port->socket.fd, 876 (int) n); 877 878 goto process; 879 } 880 881 n = -1; 882 } 883 884 if (n < 0 && !port->socket.read_ready) { 885 nxt_atomic_fetch_add(&queue->nitems, -1); 886 return; 887 } 888 889 b = nxt_port_buf_alloc(port); 890 891 if (nxt_slow_path(b == NULL)) { 892 /* TODO: disable event for some time */ 893 } 894 895 if (n >= (ssize_t) sizeof(nxt_port_msg_t)) { 896 nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t)); 897 898 if (n > (ssize_t) sizeof(nxt_port_msg_t)) { 899 nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t), 900 n - sizeof(nxt_port_msg_t)); 901 } 902 903 } else { 904 iov[0].iov_base = &msg.port_msg; 905 iov[0].iov_len = sizeof(nxt_port_msg_t); 906 907 iov[1].iov_base = b->mem.pos; 908 iov[1].iov_len = port->max_size; 909 910 n = nxt_socketpair_recv(&port->socket, iov, 2, &oob); 911 912 if (n > 0) { 913 msg.fd[0] = -1; 914 msg.fd[1] = -1; 915 916 ret = nxt_socket_msg_oob_get(&oob, msg.fd, 917 nxt_recv_msg_cmsg_pid_ref(&msg)); 918 if (nxt_slow_path(ret != NXT_OK)) { 919 nxt_alert(task, "failed to get oob data from %d", 920 port->socket.fd); 921 922 nxt_port_close_fds(msg.fd); 923 924 return; 925 } 926 } 927 928 if (n == (ssize_t) sizeof(nxt_port_msg_t) 929 && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) 930 { 931 nxt_port_buf_free(port, b); 932 933 nxt_debug(task, "port{%d,%d} %d: recv %d read_queue", 934 (int) port->pid, (int) port->id, port->socket.fd, 935 (int) n); 936 937 continue; 938 } 939 940 nxt_debug(task, "port{%d,%d} %d: recvmsg %d", 941 (int) port->pid, (int) port->id, port->socket.fd, 942 (int) n); 943 944 if (n > 0) { 945 if (port->from_socket == 0) { 946 nxt_debug(task, "port{%d,%d} %d: suspend message %d", 947 (int) port->pid, (int) port->id, port->socket.fd, 948 (int) n); 949 950 smsg = port->socket_msg; 951 952 if (nxt_slow_path(smsg == NULL)) { 953 smsg = nxt_mp_alloc(port->mem_pool, 954 sizeof(nxt_port_recv_msg_t)); 955 956 if (nxt_slow_path(smsg == NULL)) { 957 nxt_alert(task, "port{%d,%d} %d: suspend message " 958 "failed", 959 (int) port->pid, (int) port->id, 960 port->socket.fd); 961 962 return; 963 } 964 965 port->socket_msg = smsg; 966 967 } else { 968 if (nxt_slow_path(smsg->size != 0)) { 969 nxt_alert(task, "port{%d,%d} %d: too many suspend " 970 "messages", 971 (int) port->pid, (int) port->id, 972 port->socket.fd); 973 974 return; 975 } 976 } 977 978 smsg->port_msg = msg.port_msg; 979 smsg->buf = b; 980 smsg->size = n; 981 smsg->fd[0] = msg.fd[0]; 982 smsg->fd[1] = msg.fd[1]; 983 984 continue; 985 } 986 987 port->from_socket--; 988 } 989 } 990 991 process: 992 993 if (n > 0) { 994 msg.buf = b; 995 msg.size = n; 996 997 nxt_port_read_msg_process(task, port, &msg); 998 999 /* 1000 * To disable instant completion or buffer re-usage, 1001 * handler should reset 'msg.buf'. 1002 */ 1003 if (msg.buf == b) { 1004 nxt_port_buf_free(port, b); 1005 } 1006 1007 continue; 1008 } 1009 1010 if (n == NXT_AGAIN) { 1011 nxt_port_buf_free(port, b); 1012 1013 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 1014 1015 continue; 1016 } 1017 1018 /* n == 0 || n == NXT_ERROR */ 1019 1020 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 1021 nxt_port_error_handler, task, &port->socket, NULL); 1022 return; 1023 } 1024 } 1025 1026 1027 typedef struct { 1028 uint32_t stream; 1029 uint32_t pid; 1030 } nxt_port_frag_key_t; 1031 1032 1033 static nxt_int_t 1034 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 1035 { 1036 nxt_port_recv_msg_t *fmsg; 1037 nxt_port_frag_key_t *frag_key; 1038 1039 fmsg = data; 1040 frag_key = (nxt_port_frag_key_t *) lhq->key.start; 1041 1042 if (lhq->key.length == sizeof(nxt_port_frag_key_t) 1043 && frag_key->stream == fmsg->port_msg.stream 1044 && frag_key->pid == (uint32_t) fmsg->port_msg.pid) 1045 { 1046 return NXT_OK; 1047 } 1048 1049 return NXT_DECLINED; 1050 } 1051 1052 1053 static void * 1054 nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 1055 { 1056 return nxt_mp_align(ctx, size, size); 1057 } 1058 1059 1060 static void 1061 nxt_port_lvlhsh_frag_free(void *ctx, void *p) 1062 { 1063 nxt_mp_free(ctx, p); 1064 } 1065 1066 1067 static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 1068 NXT_LVLHSH_DEFAULT, 1069 nxt_port_lvlhsh_frag_test, 1070 nxt_port_lvlhsh_frag_alloc, 1071 nxt_port_lvlhsh_frag_free, 1072 }; 1073 1074 1075 static nxt_port_recv_msg_t * 1076 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 1077 nxt_port_recv_msg_t *msg) 1078 { 1079 nxt_int_t res; 1080 nxt_lvlhsh_query_t lhq; 1081 nxt_port_recv_msg_t *fmsg; 1082 nxt_port_frag_key_t frag_key; 1083 1084 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 1085 1086 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 1087 1088 if (nxt_slow_path(fmsg == NULL)) { 1089 return NULL; 1090 } 1091 1092 *fmsg = *msg; 1093 1094 frag_key.stream = fmsg->port_msg.stream; 1095 frag_key.pid = fmsg->port_msg.pid; 1096 1097 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 1098 lhq.key.length = sizeof(nxt_port_frag_key_t); 1099 lhq.key.start = (u_char *) &frag_key; 1100 lhq.proto = &lvlhsh_frag_proto; 1101 lhq.replace = 0; 1102 lhq.value = fmsg; 1103 lhq.pool = port->mem_pool; 1104 1105 res = nxt_lvlhsh_insert(&port->frags, &lhq); 1106 1107 switch (res) { 1108 1109 case NXT_OK: 1110 return fmsg; 1111 1112 case NXT_DECLINED: 1113 nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 1114 fmsg->port_msg.stream); 1115 nxt_mp_free(port->mem_pool, fmsg); 1116 1117 return NULL; 1118 1119 default: 1120 nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 1121 fmsg->port_msg.stream); 1122 1123 nxt_mp_free(port->mem_pool, fmsg); 1124 1125 return NULL; 1126 1127 } 1128 } 1129 1130 1131 static nxt_port_recv_msg_t * 1132 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) 1133 { 1134 nxt_int_t res; 1135 nxt_bool_t last; 1136 nxt_lvlhsh_query_t lhq; 1137 nxt_port_frag_key_t frag_key; 1138 1139 last = msg->port_msg.mf == 0; 1140 1141 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", 1142 msg->port_msg.stream); 1143 1144 frag_key.stream = msg->port_msg.stream; 1145 frag_key.pid = msg->port_msg.pid; 1146 1147 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 1148 lhq.key.length = sizeof(nxt_port_frag_key_t); 1149 lhq.key.start = (u_char *) &frag_key; 1150 lhq.proto = &lvlhsh_frag_proto; 1151 lhq.pool = port->mem_pool; 1152 1153 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 1154 nxt_lvlhsh_find(&port->frags, &lhq); 1155 1156 switch (res) { 1157 1158 case NXT_OK: 1159 return lhq.value; 1160 1161 default: 1162 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", 1163 frag_key.stream); 1164 1165 return NULL; 1166 } 1167 } 1168 1169 1170 static void 1171 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 1172 nxt_port_recv_msg_t *msg) 1173 { 1174 nxt_buf_t *b, *orig_b, *next; 1175 nxt_port_recv_msg_t *fmsg; 1176 1177 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 1178 nxt_alert(task, "port %d: too small message:%uz", 1179 port->socket.fd, msg->size); 1180 1181 nxt_port_close_fds(msg->fd); 1182 1183 return; 1184 } 1185 1186 /* adjust size to actual buffer used size */ 1187 msg->size -= sizeof(nxt_port_msg_t); 1188 1189 b = orig_b = msg->buf; 1190 b->mem.free += msg->size; 1191 1192 msg->cancelled = 0; 1193 1194 if (nxt_slow_path(msg->port_msg.nf != 0)) { 1195 1196 fmsg = nxt_port_frag_find(task, port, msg); 1197 1198 if (nxt_slow_path(fmsg == NULL)) { 1199 goto fmsg_failed; 1200 } 1201 1202 if (nxt_fast_path(fmsg->cancelled == 0)) { 1203 1204 if (msg->port_msg.mmap) { 1205 nxt_port_mmap_read(task, msg); 1206 } 1207 1208 nxt_buf_chain_add(&fmsg->buf, msg->buf); 1209 1210 fmsg->size += msg->size; 1211 msg->buf = NULL; 1212 b = NULL; 1213 1214 if (nxt_fast_path(msg->port_msg.mf == 0)) { 1215 1216 b = fmsg->buf; 1217 1218 port->handler(task, fmsg); 1219 1220 msg->buf = fmsg->buf; 1221 msg->fd[0] = fmsg->fd[0]; 1222 msg->fd[1] = fmsg->fd[1]; 1223 1224 /* 1225 * To disable instant completion or buffer re-usage, 1226 * handler should reset 'msg.buf'. 1227 */ 1228 if (!msg->port_msg.mmap && msg->buf == b) { 1229 nxt_port_buf_free(port, b); 1230 } 1231 } 1232 } 1233 1234 if (nxt_fast_path(msg->port_msg.mf == 0)) { 1235 nxt_mp_free(port->mem_pool, fmsg); 1236 } 1237 } else { 1238 if (nxt_slow_path(msg->port_msg.mf != 0)) { 1239 1240 if (msg->port_msg.mmap && msg->cancelled == 0) { 1241 nxt_port_mmap_read(task, msg); 1242 b = msg->buf; 1243 } 1244 1245 fmsg = nxt_port_frag_start(task, port, msg); 1246 1247 if (nxt_slow_path(fmsg == NULL)) { 1248 goto fmsg_failed; 1249 } 1250 1251 fmsg->port_msg.nf = 0; 1252 fmsg->port_msg.mf = 0; 1253 1254 if (nxt_fast_path(msg->cancelled == 0)) { 1255 msg->buf = NULL; 1256 msg->fd[0] = -1; 1257 msg->fd[1] = -1; 1258 b = NULL; 1259 1260 } else { 1261 nxt_port_close_fds(msg->fd); 1262 } 1263 } else { 1264 if (nxt_fast_path(msg->cancelled == 0)) { 1265 1266 if (msg->port_msg.mmap) { 1267 nxt_port_mmap_read(task, msg); 1268 b = msg->buf; 1269 } 1270 1271 port->handler(task, msg); 1272 } 1273 } 1274 } 1275 1276 fmsg_failed: 1277 1278 if (msg->port_msg.mmap && orig_b != b) { 1279 1280 /* 1281 * To disable instant buffer completion, 1282 * handler should reset 'msg->buf'. 1283 */ 1284 if (msg->buf == b) { 1285 /* complete mmap buffers */ 1286 while (b != NULL) { 1287 nxt_debug(task, "complete buffer %p", b); 1288 1289 nxt_work_queue_add(port->socket.read_work_queue, 1290 b->completion_handler, task, b, b->parent); 1291 1292 next = b->next; 1293 b->next = NULL; 1294 b = next; 1295 } 1296 } 1297 1298 /* restore original buf */ 1299 msg->buf = orig_b; 1300 } 1301 } 1302 1303 1304 static nxt_buf_t * 1305 nxt_port_buf_alloc(nxt_port_t *port) 1306 { 1307 nxt_buf_t *b; 1308 1309 if (port->free_bufs != NULL) { 1310 b = port->free_bufs; 1311 port->free_bufs = b->next; 1312 1313 b->mem.pos = b->mem.start; 1314 b->mem.free = b->mem.start; 1315 b->next = NULL; 1316 } else { 1317 b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 1318 if (nxt_slow_path(b == NULL)) { 1319 return NULL; 1320 } 1321 } 1322 1323 return b; 1324 } 1325 1326 1327 static void 1328 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 1329 { 1330 nxt_buf_chain_add(&b, port->free_bufs); 1331 port->free_bufs = b; 1332 } 1333 1334 1335 static void 1336 nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 1337 { 1338 int use_delta; 1339 nxt_buf_t *b, *next; 1340 nxt_port_t *port; 1341 nxt_work_queue_t *wq; 1342 nxt_port_send_msg_t *msg; 1343 1344 nxt_debug(task, "port error handler %p", obj); 1345 /* TODO */ 1346 1347 port = nxt_container_of(obj, nxt_port_t, socket); 1348 1349 use_delta = 0; 1350 1351 if (obj == data) { 1352 use_delta--; 1353 } 1354 1355 wq = &task->thread->engine->fast_work_queue; 1356 1357 nxt_thread_mutex_lock(&port->write_mutex); 1358 1359 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 1360 1361 nxt_port_msg_close_fd(msg); 1362 1363 for (b = msg->buf; b != NULL; b = next) { 1364 next = b->next; 1365 b->next = NULL; 1366 1367 if (nxt_buf_is_sync(b)) { 1368 continue; 1369 } 1370 1371 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1372 } 1373 1374 nxt_queue_remove(&msg->link); 1375 use_delta--; 1376 1377 nxt_port_release_send_msg(msg); 1378 1379 } nxt_queue_loop; 1380 1381 nxt_thread_mutex_unlock(&port->write_mutex); 1382 1383 if (use_delta != 0) { 1384 nxt_port_use(task, port, use_delta); 1385 } 1386 } 1387