/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

/*
 * Port transport between Unit processes.
 *
 * A port is a unidirectional message channel built on a socketpair:
 * pair[1] is the sending end, pair[0] the receiving end.  Each message
 * is a fixed nxt_port_msg_t header optionally followed by payload.
 * Small payloads travel inline; large ones are passed through shared
 * memory (mmap) with only a descriptor in the message.  When the port
 * also has a shared-memory queue (port->queue), small messages bypass
 * the socket entirely and the socket is used for wake-up notifications
 * and oversized traffic.
 */

#include <nxt_main.h>
#include <nxt_port_queue.h>
#include <nxt_port_memory_int.h>


/* Largest payload that still fits into one queue slot after the header. */
#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
    (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))


static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
    void *qbuf, nxt_buf_t *b);
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
    nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
    nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);


/*
 * Create the port's socketpair and tune kernel buffer sizes so that a
 * message of up to max_size bytes can be sent without blocking.
 * port->max_size ends up as the largest single-write payload and
 * port->max_share limits how many bytes one stream may send before
 * yielding to other queued messages.  Returns NXT_OK or NXT_ERROR
 * (on error both descriptors are closed / never opened).
 */
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
    nxt_int_t     sndbuf, rcvbuf, size;
    nxt_socket_t  snd, rcv;

    port->socket.task = task;

    port->pair[0] = -1;
    port->pair[1] = -1;

    if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
        goto socketpair_fail;
    }

    snd = port->pair[1];

    sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
    if (nxt_slow_path(sndbuf < 0)) {
        goto getsockopt_fail;
    }

    rcv = port->pair[0];

    rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
    if (nxt_slow_path(rcvbuf < 0)) {
        goto getsockopt_fail;
    }

    if (max_size == 0) {
        max_size = 16 * 1024;
    }

    if ((size_t) sndbuf < max_size) {
        /*
         * On Unix domain sockets
         * Linux uses 224K on both send and receive directions;
         * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
         * on send direction and 4K buffer size on receive direction;
         * Solaris uses 16K on send direction and 5K on receive direction.
         */
        (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
                                     max_size);

        sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
        if (nxt_slow_path(sndbuf < 0)) {
            goto getsockopt_fail;
        }

        /* Receive side is sized to absorb several in-flight messages. */
        size = sndbuf * 4;

        if (rcvbuf < size) {
            (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
                                         size);

            rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
            if (nxt_slow_path(rcvbuf < 0)) {
                goto getsockopt_fail;
            }
        }
    }

    port->max_size = nxt_min(max_size, (size_t) sndbuf);
    port->max_share = (64 * 1024);

    return NXT_OK;

getsockopt_fail:

    nxt_socket_close(task, port->pair[0]);
    nxt_socket_close(task, port->pair[1]);

socketpair_fail:

    return NXT_ERROR;
}


/* Release the port's socket and its memory pool. */
void
nxt_port_destroy(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->socket.fd);
    nxt_mp_destroy(port->mem_pool);
}


/*
 * Bind the sending end (pair[1]) to the current engine and install the
 * write/error handlers.  Write readiness is assumed until a send blocks.
 */
void
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[1];
    port->socket.log = &nxt_main_log;
    port->socket.write_ready = 1;

    port->engine = task->thread->engine;

    port->socket.write_work_queue = &port->engine->fast_work_queue;
    port->socket.write_handler = nxt_port_write_handler;
    port->socket.error_handler = nxt_port_error_handler;
}


/* Close the sending end of the socketpair. */
void
nxt_port_write_close(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->pair[1]);
    port->pair[1] = -1;
}


/* Free a send message only if it was heap-allocated by nxt_port_msg_alloc(). */
static void
nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
{
    if (msg->allocated) {
        nxt_free(msg);
    }
}


/*
 * Send a message (header fields built from type/stream/reply_port, up to
 * two fds, and buffer chain b) to the port's peer.
 *
 * If the port has a shared-memory queue and the message is small enough
 * (and carries no fd), it is enqueued directly and b is completed
 * immediately; the socket is only used to wake the peer when the queue
 * notify flag demands it.  Otherwise the message goes through the socket
 * path: tried inline via nxt_port_write_handler() when the socket is
 * write-ready and no messages are already queued, else appended to
 * port->messages.  Returns NXT_OK, NXT_AGAIN, or NXT_ERROR.
 */
nxt_int_t
nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
    nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
    nxt_buf_t *b)
{
    int                  notify;
    uint8_t              qmsg_size;
    nxt_int_t            res;
    nxt_port_send_msg_t  msg;
    struct {
        nxt_port_msg_t   pm;
        uint8_t          buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
    } qmsg;

    msg.link.next = NULL;
    msg.link.prev = NULL;

    msg.buf = b;
    msg.share = 0;
    msg.fd[0] = fd;
    msg.fd[1] = fd2;
    msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
    msg.allocated = 0;

    msg.port_msg.stream = stream;
    msg.port_msg.pid = nxt_pid;
    msg.port_msg.reply_port = reply_port;
    msg.port_msg.type = type & NXT_PORT_MSG_MASK;
    msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
    msg.port_msg.mmap = 0;
    msg.port_msg.nf = 0;
    msg.port_msg.mf = 0;

    if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {

        /* Queue fast path: fds cannot travel through shared memory. */
        if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
            qmsg.pm = msg.port_msg;

            qmsg_size = sizeof(qmsg.pm);

            if (b != NULL) {
                qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
            }

            res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) qmsg_size, notify, res);

            if (b != NULL && nxt_fast_path(res == NXT_OK)) {
                if (qmsg.pm.mmap) {
                    /* Peer now owns the shared memory chunk. */
                    b->is_port_mmap_sent = 1;
                }

                b->mem.pos = b->mem.free;

                nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                                   b->completion_handler, task, b, b->parent);
            }

            if (notify == 0) {
                return res;
            }

            /* Peer must be woken through the socket: send READ_QUEUE. */
            msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
            msg.buf = NULL;

        } else {
            /*
             * Message will go through the socket; tell the queue reader
             * to switch over so ordering with queued messages is kept.
             */
            qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;

            res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      notify, res);

            if (nxt_slow_path(res == NXT_AGAIN)) {
                return NXT_AGAIN;
            }
        }
    }

    res = nxt_port_msg_chk_insert(task, port, &msg);
    if (nxt_fast_path(res == NXT_DECLINED)) {
        /* Socket is idle: try to write right now from this stack frame. */
        nxt_port_write_handler(task, &port->socket, &msg);
        res = NXT_OK;
    }

    return res;
}


/*
 * A buffer chain is queue-eligible when it is a single buffer that either
 * fits inline into a queue slot or already lives in port shared memory.
 */
static nxt_bool_t
nxt_port_can_enqueue_buf(nxt_buf_t *b)
{
    if (b == NULL) {
        return 1;
    }

    if (b->next != NULL) {
        return 0;
    }

    return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
            || nxt_buf_is_port_mmap(b));
}


/*
 * Serialize b's payload into the queue slot qbuf: either the raw bytes
 * (small buffer) or an nxt_port_mmap_msg_t descriptor referencing the
 * shared memory chunk (pm->mmap is then set).  Returns the number of
 * bytes written into qbuf.
 */
static uint8_t
nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
    nxt_buf_t *b)
{
    ssize_t                  size;
    nxt_port_mmap_msg_t      *mm;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    size = nxt_buf_mem_used_size(&b->mem);

    if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
        nxt_memcpy(qbuf, b->mem.pos, size);

        return size;
    }

    /* b->parent of a port-mmap buffer is its nxt_port_mmap_handler_t. */
    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;
    mm = qbuf;

    mm->mmap_id = hdr->id;
    mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
    mm->size = nxt_buf_mem_used_size(&b->mem);

    pm->mmap = 1;

    nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
              mm->size);

    return sizeof(nxt_port_mmap_msg_t);
}


/*
 * Decide, under write_mutex, whether msg can be written immediately
 * (socket ready and no queued messages): returns NXT_DECLINED in that
 * case.  Otherwise a heap copy of msg is appended to port->messages and
 * NXT_OK is returned (NXT_ERROR on allocation failure).
 */
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg)
{
    nxt_int_t  res;

    nxt_thread_mutex_lock(&port->write_mutex);

    if (nxt_fast_path(port->socket.write_ready
                      && nxt_queue_is_empty(&port->messages)))
    {
        res = NXT_DECLINED;

    } else {
        msg = nxt_port_msg_alloc(msg);

        if (nxt_fast_path(msg != NULL)) {
            nxt_queue_insert_tail(&port->messages, &msg->link);
            /* The queued message pins the port. */
            nxt_port_use(task, port, 1);
            res = NXT_OK;

        } else {
            res = NXT_ERROR;
        }
    }

    nxt_thread_mutex_unlock(&port->write_mutex);

    return res;
}


/* Heap-copy a stack-allocated send message so it can outlive the caller. */
static nxt_port_send_msg_t *
nxt_port_msg_alloc(nxt_port_send_msg_t *m)
{
    nxt_port_send_msg_t  *msg;

    msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
    if (nxt_slow_path(msg == NULL)) {
        return NULL;
    }

    *msg = *m;

    msg->allocated = 1;

    return msg;
}


/* Posted to the port's engine: stop polling the socket for writability. */
static void
nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_block_write(task->thread->engine, &port->socket);
}


/* Posted to the port's engine: resume polling the socket for writability. */
static void
nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_enable_write(task->thread->engine, &port->socket);
}


/*
 * Drain pending messages to the socket while it stays write-ready.
 *
 * 'data', when non-NULL, is a caller-stack message being written inline
 * from nxt_port_socket_write2(); otherwise messages are taken from
 * port->messages.  Large payloads are converted to mmap descriptors.
 * A stream that has sent max_share bytes is rotated to the queue tail
 * so other streams are not starved.  On completion/error the handler
 * adjusts the port use count by the number of queued messages it added
 * or removed.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            iov[NXT_IOBUF_MAX * 10];
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;

    wq = &task->thread->engine->fast_work_queue;

    do {
        if (data) {
            msg = data;

        } else {
            msg = nxt_port_msg_first(port);

            if (msg == NULL) {
                /* Nothing left to send; stop write polling. */
                block_write = 1;
                goto cleanup;
            }
        }

    next_fragment:

        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;

        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            /* mmap descriptors are tiny; lift the plain-size limit. */
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        sb.limit -= iov[0].iov_len;

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        msg->port_msg.last |= sb.last;
        /* 'mf' (more fragments) set when the chain did not fully fit. */
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_alert(task, "port %d: short write: %z instead of %uz",
                          port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            if (msg->close_fd) {
                if (msg->fd[0] != -1) {
                    nxt_fd_close(msg->fd[0]);

                    msg->fd[0] = -1;
                }

                if (msg->fd[1] != -1) {
                    nxt_fd_close(msg->fd[1]);

                    msg->fd[1] = -1;
                }
            }

            msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
                                               m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                msg->share += n;
                msg->port_msg.nf = 1;

                if (msg->share >= port->max_share) {
                    /* Stream used up its slice; requeue at the tail. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        nxt_thread_mutex_lock(&port->write_mutex);

                        nxt_queue_remove(&msg->link);
                        nxt_queue_insert_tail(&port->messages, &msg->link);

                        nxt_thread_mutex_unlock(&port->write_mutex);

                    } else {
                        msg = nxt_port_msg_insert_tail(port, msg);
                        if (nxt_slow_path(msg == NULL)) {
                            goto fail;
                        }

                        use_delta++;
                    }

                } else {
                    goto next_fragment;
                }

            } else {
                /* Message fully sent: unlink (if queued) and release. */
                if (msg->link.next != NULL) {
                    nxt_thread_mutex_lock(&port->write_mutex);

                    nxt_queue_remove(&msg->link);
                    msg->link.next = NULL;

                    nxt_thread_mutex_unlock(&port->write_mutex);

                    use_delta--;
                }

                nxt_port_release_send_msg(msg);
            }

            if (data != NULL) {
                /* Inline single-message call: do not drain the queue. */
                goto cleanup;
            }

        } else {
            if (nxt_slow_path(n == NXT_ERROR)) {
                if (msg->link.next == NULL) {
                    if (msg->close_fd) {
                        if (msg->fd[0] != -1) {
                            nxt_fd_close(msg->fd[0]);

                            msg->fd[0] = -1;
                        }

                        if (msg->fd[1] != -1) {
                            nxt_fd_close(msg->fd[1]);

                            msg->fd[1] = -1;
                        }
                    }

                    nxt_port_release_send_msg(msg);
                }

                goto fail;
            }

            /* NXT_AGAIN: keep the message queued for the next write event. */
            if (msg->link.next == NULL) {
                msg = nxt_port_msg_insert_tail(port, msg);
                if (nxt_slow_path(msg == NULL)) {
                    goto fail;
                }

                use_delta++;
            }
        }

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto cleanup;

fail:

    /* Error handler will drop one reference (obj == data convention). */
    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

cleanup:

    /* Event changes are posted to the port's owning engine thread. */
    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}


/* Return the head of port->messages (or NULL), under write_mutex. */
static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_port_t *port)
{
    nxt_queue_link_t     *lnk;
    nxt_port_send_msg_t  *msg;

    nxt_thread_mutex_lock(&port->write_mutex);

    lnk = nxt_queue_first(&port->messages);

    if (lnk == nxt_queue_tail(&port->messages)) {
        msg = NULL;

    } else {
        msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
    }

    nxt_thread_mutex_unlock(&port->write_mutex);

    return msg;
}


/*
 * Consume 'sent' bytes from buffer chain b: fully sent buffers are
 * scheduled for completion on wq, a partially sent buffer has its
 * position advanced and is returned as the new chain head.  With
 * mmap_mode set, a sent port-mmap buffer is marked so the receiver
 * owns its shared memory release.  Returns the unsent remainder
 * (NULL when everything was consumed).
 */
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
    size_t sent, nxt_bool_t mmap_mode)
{
    size_t     size;
    nxt_buf_t  *next;

    while (b != NULL) {

        nxt_prefetch(b->next);

        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                if (sent == 0) {
                    break;
                }

                if (nxt_buf_is_port_mmap(b) && mmap_mode) {
                    /*
                     * buffer has been sent to other side which is now
                     * responsible for shared memory bucket release
                     */
                    b->is_port_mmap_sent = 1;
                }

                if (sent < size) {

                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);

        next = b->next;
        b->next = NULL;
        b = next;
    }

    return b;
}


/*
 * Append msg to port->messages, heap-copying it first if it still lives
 * on a caller's stack.  Returns the (possibly new) message, or NULL on
 * allocation failure.
 */
static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
{
    if (msg->allocated == 0) {
        msg = nxt_port_msg_alloc(msg);

        if (nxt_slow_path(msg == NULL)) {
            return NULL;
        }
    }

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_insert_tail(&port->messages, &msg->link);

    nxt_thread_mutex_unlock(&port->write_mutex);

    return msg;
}


/*
 * Bind the receiving end (pair[0]) to the current engine and start
 * reading.  Ports with a shared-memory queue use the queue-aware
 * handler, plain ports read from the socket only.
 */
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->socket.fd = port->pair[0];
    port->socket.log = &nxt_main_log;

    port->engine = task->thread->engine;

    port->socket.read_work_queue = &port->engine->fast_work_queue;
    port->socket.read_handler = port->queue != NULL
                                ? nxt_port_queue_read_handler
                                : nxt_port_read_handler;
    port->socket.error_handler = nxt_port_error_handler;

    nxt_fd_event_enable_read(port->engine, &port->socket);
}


/* Close the receiving end and mark the socket as no longer readable. */
void
nxt_port_read_close(nxt_port_t *port)
{
    port->socket.read_ready = 0;
    port->socket.read = NXT_EVENT_INACTIVE;
    nxt_socket_close(port->socket.task, port->pair[0]);
    port->pair[0] = -1;
}


/*
 * Socket-only read handler: receive header + payload into a recycled
 * buffer and hand each complete message to nxt_port_read_msg_process().
 */
static void
nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t              n;
    nxt_buf_t            *b;
    nxt_port_t           *port;
    struct iovec         iov[2];
    nxt_port_recv_msg_t  msg;

    port = msg.port = nxt_container_of(obj, nxt_port_t, socket);

    nxt_assert(port->engine == task->thread->engine);

    for ( ;; ) {

        b = nxt_port_buf_alloc(port);

        if (nxt_slow_path(b == NULL)) {
            /* TODO: disable event for some time */
        }

        /*
         * NOTE(review): if b is NULL the dereferences below crash;
         * the TODO above acknowledges the missing fallback — confirm
         * whether allocation failure is considered fatal here.
         */
        iov[0].iov_base = &msg.port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        iov[1].iov_base = b->mem.pos;
        iov[1].iov_len = port->max_size;

        n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);

        if (n > 0) {

            msg.buf = b;
            msg.size = n;

            nxt_port_read_msg_process(task, port, &msg);

            /*
             * To disable instant completion or buffer re-usage,
             * handler should reset 'msg.buf'.
             */
            if (msg.buf == b) {
                nxt_port_buf_free(port, b);
            }

            if (port->socket.read_ready) {
                continue;
            }

            return;
        }

        if (n == NXT_AGAIN) {
            nxt_port_buf_free(port, b);

            nxt_fd_event_enable_read(task->thread->engine, &port->socket);
            return;
        }

        /* n == 0 || n == NXT_ERROR */

        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_port_error_handler, task, &port->socket, NULL);
        return;
    }
}


/*
 * Queue-aware read handler: drains the shared-memory queue and falls
 * back to the socket when a READ_SOCKET marker arrives.  A message read
 * from the socket while queue items are still pending is suspended in
 * port->socket_msg to preserve ordering.  queue->nitems tracks readers
 * so the writer knows when a notification is needed.
 */
static void
nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t              n;
    nxt_buf_t            *b;
    nxt_port_t           *port;
    struct iovec         iov[2];
    nxt_port_queue_t     *queue;
    nxt_port_recv_msg_t  msg, *smsg;
    uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];

    port = nxt_container_of(obj, nxt_port_t, socket);
    msg.port = port;

    nxt_assert(port->engine == task->thread->engine);

    queue = port->queue;
    nxt_atomic_fetch_add(&queue->nitems, 1);

    for ( ;; ) {

        if (port->from_socket == 0) {
            n = nxt_port_queue_recv(queue, qmsg);

            if (n < 0 && !port->socket.read_ready) {
                /*
                 * Drop reader registration, then re-check: a message may
                 * have been enqueued between recv and the decrement.
                 */
                nxt_atomic_fetch_add(&queue->nitems, -1);

                n = nxt_port_queue_recv(queue, qmsg);
                if (n < 0) {
                    return;
                }

                nxt_atomic_fetch_add(&queue->nitems, 1);
            }

            if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
                /* Next message(s) must be taken from the socket. */
                port->from_socket++;

                nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          port->from_socket);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: dequeue %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

        } else {
            if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
                /* Replay a previously suspended socket message. */
                msg.port_msg = smsg->port_msg;
                b = smsg->buf;
                n = smsg->size;
                msg.fd[0] = smsg->fd[0];
                msg.fd[1] = smsg->fd[1];

                smsg->size = 0;

                port->from_socket--;

                nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                goto process;
            }

            n = -1;
        }

        if (n < 0 && !port->socket.read_ready) {
            nxt_atomic_fetch_add(&queue->nitems, -1);
            return;
        }

        b = nxt_port_buf_alloc(port);

        if (nxt_slow_path(b == NULL)) {
            /* TODO: disable event for some time */
        }

        /*
         * NOTE(review): as in nxt_port_read_handler(), b == NULL is
         * dereferenced below — confirm allocation failure handling.
         */
        if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
            /* Full message came from the queue slot. */
            nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));

            if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
                nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
                           n - sizeof(nxt_port_msg_t));
            }

        } else {
            iov[0].iov_base = &msg.port_msg;
            iov[0].iov_len = sizeof(nxt_port_msg_t);

            iov[1].iov_base = b->mem.pos;
            iov[1].iov_len = port->max_size;

            n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);

            if (n == (ssize_t) sizeof(nxt_port_msg_t)
                && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
            {
                /* Pure wake-up notification; nothing to process. */
                nxt_port_buf_free(port, b);

                nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

            if (n > 0) {
                if (port->from_socket == 0) {
                    /*
                     * Socket message arrived ahead of its READ_SOCKET
                     * marker: park it until the marker is dequeued.
                     */
                    nxt_debug(task, "port{%d,%d} %d: suspend message %d",
                              (int) port->pid, (int) port->id, port->socket.fd,
                              (int) n);

                    smsg = port->socket_msg;

                    if (nxt_slow_path(smsg == NULL)) {
                        smsg = nxt_mp_alloc(port->mem_pool,
                                            sizeof(nxt_port_recv_msg_t));

                        if (nxt_slow_path(smsg == NULL)) {
                            nxt_alert(task, "port{%d,%d} %d: suspend message "
                                      "failed",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }

                        port->socket_msg = smsg;

                    } else {
                        /* Only one message may be suspended at a time. */
                        if (nxt_slow_path(smsg->size != 0)) {
                            nxt_alert(task, "port{%d,%d} %d: too many suspend "
                                      "messages",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }
                    }

                    smsg->port_msg = msg.port_msg;
                    smsg->buf = b;
                    smsg->size = n;
                    smsg->fd[0] = msg.fd[0];
                    smsg->fd[1] = msg.fd[1];

                    continue;
                }

                port->from_socket--;
            }
        }

    process:

        if (n > 0) {
            msg.buf = b;
            msg.size = n;

            nxt_port_read_msg_process(task, port, &msg);

            /*
             * To disable instant completion or buffer re-usage,
             * handler should reset 'msg.buf'.
             */
            if (msg.buf == b) {
                nxt_port_buf_free(port, b);
            }

            continue;
        }

        if (n == NXT_AGAIN) {
            nxt_port_buf_free(port, b);

            nxt_fd_event_enable_read(task->thread->engine, &port->socket);

            continue;
        }

        /* n == 0 || n == NXT_ERROR */

        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_port_error_handler, task, &port->socket, NULL);
        return;
    }
}


/* Hash key identifying a fragmented stream: (stream id, sender pid). */
typedef struct {
    uint32_t  stream;
    uint32_t  pid;
} nxt_port_frag_key_t;


/* lvlhsh callback: match a stored fragment message against the lookup key. */
static nxt_int_t
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_port_recv_msg_t  *fmsg;
    nxt_port_frag_key_t  *frag_key;

    fmsg = data;
    frag_key = (nxt_port_frag_key_t *) lhq->key.start;

    if (lhq->key.length == sizeof(nxt_port_frag_key_t)
        && frag_key->stream == fmsg->port_msg.stream
        && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
    {
        return NXT_OK;
    }

    return NXT_DECLINED;
}


/* lvlhsh callback: allocate hash nodes from the port's memory pool. */
static void *
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
{
    return nxt_mp_align(ctx, size, size);
}


/* lvlhsh callback: release hash nodes back to the port's memory pool. */
static void
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
{
    nxt_mp_free(ctx, p);
}


static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_port_lvlhsh_frag_test,
    nxt_port_lvlhsh_frag_alloc,
    nxt_port_lvlhsh_frag_free,
};


/*
 * Register the first fragment of a multi-part stream: copy msg into the
 * pool and insert it into port->frags keyed by (stream, pid).  Returns
 * the stored copy, or NULL on duplicate or failure (logged).
 */
static nxt_port_recv_msg_t *
nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_int_t            res;
    nxt_lvlhsh_query_t   lhq;
    nxt_port_recv_msg_t  *fmsg;
    nxt_port_frag_key_t  frag_key;

    nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);

    fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));

    if (nxt_slow_path(fmsg == NULL)) {
        return NULL;
    }

    *fmsg = *msg;

    frag_key.stream = fmsg->port_msg.stream;
    frag_key.pid = fmsg->port_msg.pid;

    lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
    lhq.key.length = sizeof(nxt_port_frag_key_t);
    lhq.key.start = (u_char *) &frag_key;
    lhq.proto = &lvlhsh_frag_proto;
    lhq.replace = 0;
    lhq.value = fmsg;
    lhq.pool = port->mem_pool;

    res = nxt_lvlhsh_insert(&port->frags, &lhq);

    switch (res) {

    case NXT_OK:
        return fmsg;

    case NXT_DECLINED:
        nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
                fmsg->port_msg.stream);
        nxt_mp_free(port->mem_pool, fmsg);

        return NULL;

    default:
        nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
                fmsg->port_msg.stream);

        nxt_mp_free(port->mem_pool, fmsg);

        return NULL;

    }
}


/*
 * Look up the accumulated fragment state for msg's stream; on the last
 * fragment (mf == 0) the entry is removed from the hash as well.
 * Returns the stored message or NULL if not found (logged).
 */
static nxt_port_recv_msg_t *
nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
{
    nxt_int_t            res;
    nxt_bool_t           last;
    nxt_lvlhsh_query_t   lhq;
    nxt_port_frag_key_t  frag_key;

    last = msg->port_msg.mf == 0;

    nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
              msg->port_msg.stream);

    frag_key.stream = msg->port_msg.stream;
    frag_key.pid = msg->port_msg.pid;

    lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
    lhq.key.length = sizeof(nxt_port_frag_key_t);
    lhq.key.start = (u_char *) &frag_key;
    lhq.proto = &lvlhsh_frag_proto;
    lhq.pool = port->mem_pool;

    res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
          nxt_lvlhsh_find(&port->frags, &lhq);

    switch (res) {

    case NXT_OK:
        return lhq.value;

    default:
        nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
                frag_key.stream);

        return NULL;
    }
}


/*
 * Dispatch one received message: validate the header, expand mmap
 * payloads, reassemble fragmented streams ('nf' = not first, 'mf' =
 * more fragments), and invoke port->handler on complete messages.
 * The handler may take ownership of the buffer by resetting msg->buf.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b, *next;
    nxt_port_recv_msg_t  *fmsg;

    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_alert(task, "port %d: too small message:%uz",
                  port->socket.fd, msg->size);

        if (msg->fd[0] != -1) {
            nxt_fd_close(msg->fd[0]);
        }

        if (msg->fd[1] != -1) {
            nxt_fd_close(msg->fd[1]);
        }

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    msg->cancelled = 0;

    if (nxt_slow_path(msg->port_msg.nf != 0)) {

        /* Continuation fragment: append to the stream's pending chain. */
        fmsg = nxt_port_frag_find(task, port, msg);

        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            if (nxt_fast_path(msg->port_msg.mf == 0)) {

                /* Last fragment: deliver the reassembled message. */
                b = fmsg->buf;

                port->handler(task, fmsg);

                msg->buf = fmsg->buf;
                msg->fd[0] = fmsg->fd[0];
                msg->fd[1] = fmsg->fd[1];

                /*
                 * To disable instant completion or buffer re-usage,
                 * handler should reset 'msg.buf'.
                 */
                if (!msg->port_msg.mmap && msg->buf == b) {
                    nxt_port_buf_free(port, b);
                }
            }
        }

        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            nxt_mp_free(port->mem_pool, fmsg);
        }
    } else {
        if (nxt_slow_path(msg->port_msg.mf != 0)) {

            /* First fragment of a multi-part stream. */
            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);

            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* Ownership moved into fmsg; caller must not reuse b. */
                msg->buf = NULL;
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                b = NULL;

            } else {
                if (msg->fd[0] != -1) {
                    nxt_fd_close(msg->fd[0]);
                }

                if (msg->fd[1] != -1) {
                    nxt_fd_close(msg->fd[1]);
                }
            }
        } else {
            /* Ordinary single-part message. */
            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            while (b != NULL) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                                   b->completion_handler, task, b, b->parent);

                next = b->next;
                b->next = NULL;
                b = next;
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}


/*
 * Get a receive buffer of port->max_size bytes: reuse one from the
 * free list when available, otherwise allocate from the pool.
 * Returns NULL on allocation failure.
 */
static nxt_buf_t *
nxt_port_buf_alloc(nxt_port_t *port)
{
    nxt_buf_t  *b;

    if (port->free_bufs != NULL) {
        b = port->free_bufs;
        port->free_bufs = b->next;

        b->mem.pos = b->mem.start;
        b->mem.free = b->mem.start;
        b->next = NULL;
    } else {
        b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
        if (nxt_slow_path(b == NULL)) {
            return NULL;
        }
    }

    return b;
}


/* Return a receive buffer (chain) to the port's free list for reuse. */
static void
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
{
    nxt_buf_chain_add(&b, port->free_bufs);
    port->free_bufs = b;
}


/*
 * Socket error handler: drop every queued send message — closing owned
 * fds, completing (non-sync) buffers so their producers are unblocked —
 * and release the corresponding port references.  When obj == data the
 * call originates from the write path's fail label and carries one
 * extra reference to drop.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b, *next;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        if (msg->close_fd) {
            if (msg->fd[0] != -1) {
                nxt_fd_close(msg->fd[0]);

                msg->fd[0] = -1;
            }

            if (msg->fd[1] != -1) {
                nxt_fd_close(msg->fd[1]);

                msg->fd[1] = -1;
            }
        }

        for (b = msg->buf; b != NULL; b = next) {
            next = b->next;
            b->next = NULL;

            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        nxt_queue_remove(&msg->link);
        use_delta--;

        nxt_port_release_send_msg(msg);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}