1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 11 nxt_port_send_msg_t *msg); 12 static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); 13 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 14 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); 15 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, 16 nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); 17 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, 18 nxt_port_send_msg_t *msg); 19 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 20 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 21 nxt_port_recv_msg_t *msg); 22 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 23 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 24 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 25 26 27 nxt_int_t 28 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 29 { 30 nxt_int_t sndbuf, rcvbuf, size; 31 nxt_socket_t snd, rcv; 32 33 port->socket.task = task; 34 35 port->pair[0] = -1; 36 port->pair[1] = -1; 37 38 if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 39 goto socketpair_fail; 40 } 41 42 snd = port->pair[1]; 43 44 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 45 if (nxt_slow_path(sndbuf < 0)) { 46 goto getsockopt_fail; 47 } 48 49 rcv = port->pair[0]; 50 51 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 52 if (nxt_slow_path(rcvbuf < 0)) { 53 goto getsockopt_fail; 54 } 55 56 if (max_size == 0) { 57 max_size = 16 * 1024; 58 } 59 60 if ((size_t) sndbuf < max_size) { 61 /* 62 * On Unix domain sockets 63 * Linux uses 224K on both send and receive directions; 64 * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 65 * on send direction and 4K buffer size on receive direction; 66 * Solaris uses 16K on send direction and 5K on receive direction. 67 */ 68 (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 69 max_size); 70 71 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 72 if (nxt_slow_path(sndbuf < 0)) { 73 goto getsockopt_fail; 74 } 75 76 size = sndbuf * 4; 77 78 if (rcvbuf < size) { 79 (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 80 size); 81 82 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 83 if (nxt_slow_path(rcvbuf < 0)) { 84 goto getsockopt_fail; 85 } 86 } 87 } 88 89 port->max_size = nxt_min(max_size, (size_t) sndbuf); 90 port->max_share = (64 * 1024); 91 92 return NXT_OK; 93 94 getsockopt_fail: 95 96 nxt_socket_close(task, port->pair[0]); 97 nxt_socket_close(task, port->pair[1]); 98 99 socketpair_fail: 100 101 return NXT_ERROR; 102 } 103 104 105 void 106 nxt_port_destroy(nxt_port_t *port) 107 { 108 nxt_socket_close(port->socket.task, port->socket.fd); 109 nxt_mp_destroy(port->mem_pool); 110 } 111 112 113 void 114 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 115 { 116 port->socket.fd = port->pair[1]; 117 port->socket.log = &nxt_main_log; 118 port->socket.write_ready = 1; 119 120 port->engine = task->thread->engine; 121 122 port->socket.write_work_queue = &port->engine->fast_work_queue; 123 port->socket.write_handler = nxt_port_write_handler; 124 port->socket.error_handler = nxt_port_error_handler; 125 } 126 127 128 void 129 nxt_port_write_close(nxt_port_t *port) 130 { 131 nxt_socket_close(port->socket.task, port->pair[1]); 132 port->pair[1] = -1; 133 } 134 135 136 static void 137 nxt_port_release_send_msg(nxt_port_send_msg_t *msg) 138 { 139 if (msg->allocated) { 140 nxt_free(msg); 141 } 142 } 143 144 145 nxt_int_t 146 nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 147 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, 148 void *tracking) 149 { 150 nxt_int_t res; 151 nxt_port_send_msg_t msg; 152 153 msg.link.next = NULL; 154 msg.link.prev = NULL; 155 156 msg.buf = b; 157 msg.share = 0; 158 msg.fd = fd; 159 msg.fd2 = -1; 160 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 161 msg.allocated = 0; 162 163 if (tracking != NULL) { 164 nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); 165 } 166 167 msg.port_msg.stream = stream; 168 msg.port_msg.pid = nxt_pid; 169 msg.port_msg.reply_port = reply_port; 170 msg.port_msg.type = type & NXT_PORT_MSG_MASK; 171 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 172 msg.port_msg.mmap = 0; 173 msg.port_msg.nf = 0; 174 msg.port_msg.mf = 0; 175 msg.port_msg.tracking = tracking != NULL; 176 177 res = nxt_port_msg_chk_insert(task, port, &msg); 178 if (nxt_fast_path(res == NXT_DECLINED)) { 179 nxt_port_write_handler(task, &port->socket, &msg); 180 res = NXT_OK; 181 } 182 183 return res; 184 } 185 186 187 static nxt_int_t 188 nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 189 nxt_port_send_msg_t *msg) 190 { 191 nxt_int_t res; 192 193 nxt_thread_mutex_lock(&port->write_mutex); 194 195 if (nxt_fast_path(port->socket.write_ready 196 && nxt_queue_is_empty(&port->messages))) 197 { 198 res = NXT_DECLINED; 199 200 } else { 201 msg = nxt_port_msg_alloc(msg); 202 203 if (nxt_fast_path(msg != NULL)) { 204 nxt_queue_insert_tail(&port->messages, &msg->link); 205 nxt_port_use(task, port, 1); 206 res = NXT_OK; 207 208 } else { 209 res = NXT_ERROR; 210 } 211 } 212 213 nxt_thread_mutex_unlock(&port->write_mutex); 214 215 return res; 216 } 217 218 219 static nxt_port_send_msg_t * 220 nxt_port_msg_alloc(nxt_port_send_msg_t *m) 221 { 222 nxt_port_send_msg_t *msg; 223 224 msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); 225 if (nxt_slow_path(msg == NULL)) { 226 return NULL; 227 } 228 229 *msg = *m; 230 231 msg->allocated = 1; 232 233 return msg; 234 } 235 236 237 static void 238 nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 239 { 240 nxt_fd_event_block_write(task->thread->engine, &port->socket); 241 } 242 243 244 static void 245 nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 246 { 247 nxt_fd_event_enable_write(task->thread->engine, &port->socket); 248 } 249 250 251 static void 252 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 253 { 254 int use_delta; 255 size_t plain_size; 256 ssize_t n; 257 uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; 258 nxt_bool_t block_write, enable_write; 259 nxt_port_t *port; 260 struct iovec iov[NXT_IOBUF_MAX * 10]; 261 nxt_work_queue_t *wq; 262 nxt_port_method_t m; 263 nxt_port_send_msg_t *msg; 264 nxt_sendbuf_coalesce_t sb; 265 266 port = nxt_container_of(obj, nxt_port_t, socket); 267 268 block_write = 0; 269 enable_write = 0; 270 use_delta = 0; 271 272 wq = &task->thread->engine->fast_work_queue; 273 274 do { 275 if (data) { 276 msg = data; 277 278 } else { 279 msg = nxt_port_msg_first(port); 280 281 if (msg == NULL) { 282 block_write = 1; 283 goto cleanup; 284 } 285 } 286 287 next_fragment: 288 289 iov[0].iov_base = &msg->port_msg; 290 iov[0].iov_len = sizeof(nxt_port_msg_t); 291 292 sb.buf = msg->buf; 293 sb.iobuf = &iov[1]; 294 sb.nmax = NXT_IOBUF_MAX - 1; 295 sb.sync = 0; 296 sb.last = 0; 297 sb.size = 0; 298 sb.limit = port->max_size; 299 300 sb.limit_reached = 0; 301 sb.nmax_reached = 0; 302 303 m = nxt_port_mmap_get_method(task, port, msg->buf); 304 305 if (m == NXT_PORT_METHOD_MMAP) { 306 sb.limit = (1ULL << 31) - 1; 307 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 308 port->max_size / PORT_MMAP_MIN_SIZE); 309 } 310 311 if (msg->port_msg.tracking) { 312 iov[0].iov_len += sizeof(msg->tracking_msg); 313 } 314 315 sb.limit -= iov[0].iov_len; 316 317 nxt_sendbuf_mem_coalesce(task, &sb); 318 319 plain_size = sb.size; 320 321 /* 322 * Send through mmap enabled only when payload 323 * is bigger than PORT_MMAP_MIN_SIZE. 324 */ 325 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 326 nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); 327 328 } else { 329 m = NXT_PORT_METHOD_PLAIN; 330 } 331 332 msg->port_msg.last |= sb.last; 333 msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 334 335 n = nxt_socketpair_send(&port->socket, &msg->fd, iov, sb.niov + 1); 336 337 if (n > 0) { 338 if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 339 nxt_alert(task, "port %d: short write: %z instead of %uz", 340 port->socket.fd, n, sb.size + iov[0].iov_len); 341 goto fail; 342 } 343 344 if (msg->fd != -1 && msg->close_fd != 0) { 345 nxt_fd_close(msg->fd); 346 347 msg->fd = -1; 348 } 349 350 if (msg->fd2 != -1 && msg->close_fd != 0) { 351 nxt_fd_close(msg->fd2); 352 353 msg->fd2 = -1; 354 } 355 356 msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, 357 m == NXT_PORT_METHOD_MMAP); 358 359 if (msg->buf != NULL) { 360 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 361 msg->port_msg.stream); 362 363 /* 364 * A file descriptor is sent only 365 * in the first message of a stream. 366 */ 367 msg->fd = -1; 368 msg->fd2 = -1; 369 msg->share += n; 370 msg->port_msg.nf = 1; 371 msg->port_msg.tracking = 0; 372 373 if (msg->share >= port->max_share) { 374 msg->share = 0; 375 376 if (msg->link.next != NULL) { 377 nxt_thread_mutex_lock(&port->write_mutex); 378 379 nxt_queue_remove(&msg->link); 380 nxt_queue_insert_tail(&port->messages, &msg->link); 381 382 nxt_thread_mutex_unlock(&port->write_mutex); 383 384 } else { 385 msg = nxt_port_msg_insert_tail(port, msg); 386 if (nxt_slow_path(msg == NULL)) { 387 goto fail; 388 } 389 390 use_delta++; 391 } 392 393 } else { 394 goto next_fragment; 395 } 396 397 } else { 398 if (msg->link.next != NULL) { 399 nxt_thread_mutex_lock(&port->write_mutex); 400 401 nxt_queue_remove(&msg->link); 402 msg->link.next = NULL; 403 404 nxt_thread_mutex_unlock(&port->write_mutex); 405 406 use_delta--; 407 } 408 409 nxt_port_release_send_msg(msg); 410 } 411 412 if (data != NULL) { 413 goto cleanup; 414 } 415 416 } else { 417 if (nxt_slow_path(n == NXT_ERROR)) { 418 goto fail; 419 } 420 421 if (msg->link.next == NULL) { 422 msg = nxt_port_msg_insert_tail(port, msg); 423 if (nxt_slow_path(msg == NULL)) { 424 goto fail; 425 } 426 427 use_delta++; 428 } 429 } 430 431 } while (port->socket.write_ready); 432 433 if (nxt_fd_event_is_disabled(port->socket.write)) { 434 enable_write = 1; 435 } 436 437 goto cleanup; 438 439 fail: 440 441 use_delta++; 442 443 nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 444 &port->socket); 445 446 cleanup: 447 448 if (block_write && nxt_fd_event_is_active(port->socket.write)) { 449 nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 450 } 451 452 if (enable_write) { 453 nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 454 } 455 456 if (use_delta != 0) { 457 nxt_port_use(task, port, use_delta); 458 } 459 } 460 461 462 static nxt_port_send_msg_t * 463 nxt_port_msg_first(nxt_port_t *port) 464 { 465 nxt_queue_link_t *lnk; 466 nxt_port_send_msg_t *msg; 467 468 nxt_thread_mutex_lock(&port->write_mutex); 469 470 lnk = nxt_queue_first(&port->messages); 471 472 if (lnk == nxt_queue_tail(&port->messages)) { 473 msg = NULL; 474 475 } else { 476 msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 477 } 478 479 nxt_thread_mutex_unlock(&port->write_mutex); 480 481 return msg; 482 } 483 484 485 static nxt_buf_t * 486 nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 487 size_t sent, nxt_bool_t mmap_mode) 488 { 489 size_t size; 490 nxt_buf_t *next; 491 492 while (b != NULL) { 493 494 nxt_prefetch(b->next); 495 496 if (!nxt_buf_is_sync(b)) { 497 498 size = nxt_buf_used_size(b); 499 500 if (size != 0) { 501 502 if (sent == 0) { 503 break; 504 } 505 506 if (nxt_buf_is_port_mmap(b) && mmap_mode) { 507 /* 508 * buffer has been sent to other side which is now 509 * responsible for shared memory bucket release 510 */ 511 b->is_port_mmap_sent = 1; 512 } 513 514 if (sent < size) { 515 516 if (nxt_buf_is_mem(b)) { 517 b->mem.pos += sent; 518 } 519 520 if (nxt_buf_is_file(b)) { 521 b->file_pos += sent; 522 } 523 524 break; 525 } 526 527 /* b->mem.free is NULL in file-only buffer. */ 528 b->mem.pos = b->mem.free; 529 530 if (nxt_buf_is_file(b)) { 531 b->file_pos = b->file_end; 532 } 533 534 sent -= size; 535 } 536 } 537 538 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 539 540 next = b->next; 541 b->next = NULL; 542 b = next; 543 } 544 545 return b; 546 } 547 548 549 static nxt_port_send_msg_t * 550 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) 551 { 552 if (msg->allocated == 0) { 553 msg = nxt_port_msg_alloc(msg); 554 555 if (nxt_slow_path(msg == NULL)) { 556 return NULL; 557 } 558 } 559 560 nxt_thread_mutex_lock(&port->write_mutex); 561 562 nxt_queue_insert_tail(&port->messages, &msg->link); 563 564 nxt_thread_mutex_unlock(&port->write_mutex); 565 566 return msg; 567 } 568 569 570 void 571 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 572 { 573 port->socket.fd = port->pair[0]; 574 port->socket.log = &nxt_main_log; 575 576 port->engine = task->thread->engine; 577 578 port->socket.read_work_queue = &port->engine->fast_work_queue; 579 port->socket.read_handler = nxt_port_read_handler; 580 port->socket.error_handler = nxt_port_error_handler; 581 582 nxt_fd_event_enable_read(port->engine, &port->socket); 583 } 584 585 586 void 587 nxt_port_read_close(nxt_port_t *port) 588 { 589 port->socket.read_ready = 0; 590 port->socket.read = NXT_EVENT_INACTIVE; 591 nxt_socket_close(port->socket.task, port->pair[0]); 592 port->pair[0] = -1; 593 } 594 595 596 static void 597 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 598 { 599 ssize_t n; 600 nxt_buf_t *b; 601 nxt_port_t *port; 602 struct iovec iov[2]; 603 nxt_port_recv_msg_t msg; 604 605 port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 606 607 nxt_assert(port->engine == task->thread->engine); 608 609 for ( ;; ) { 610 611 b = nxt_port_buf_alloc(port); 612 613 if (nxt_slow_path(b == NULL)) { 614 /* TODO: disable event for some time */ 615 } 616 617 iov[0].iov_base = &msg.port_msg; 618 iov[0].iov_len = sizeof(nxt_port_msg_t); 619 620 iov[1].iov_base = b->mem.pos; 621 iov[1].iov_len = port->max_size; 622 623 n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 624 625 if (n > 0) { 626 627 msg.buf = b; 628 msg.size = n; 629 630 nxt_port_read_msg_process(task, port, &msg); 631 632 /* 633 * To disable instant completion or buffer re-usage, 634 * handler should reset 'msg.buf'. 635 */ 636 if (msg.buf == b) { 637 nxt_port_buf_free(port, b); 638 } 639 640 if (port->socket.read_ready) { 641 continue; 642 } 643 644 return; 645 } 646 647 if (n == NXT_AGAIN) { 648 nxt_port_buf_free(port, b); 649 650 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 651 return; 652 } 653 654 /* n == 0 || n == NXT_ERROR */ 655 656 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 657 nxt_port_error_handler, task, &port->socket, NULL); 658 return; 659 } 660 } 661 662 663 typedef struct { 664 uint32_t stream; 665 uint32_t pid; 666 } nxt_port_frag_key_t; 667 668 669 static nxt_int_t 670 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 671 { 672 nxt_port_recv_msg_t *fmsg; 673 nxt_port_frag_key_t *frag_key; 674 675 fmsg = data; 676 frag_key = (nxt_port_frag_key_t *) lhq->key.start; 677 678 if (lhq->key.length == sizeof(nxt_port_frag_key_t) 679 && frag_key->stream == fmsg->port_msg.stream 680 && frag_key->pid == (uint32_t) fmsg->port_msg.pid) 681 { 682 return NXT_OK; 683 } 684 685 return NXT_DECLINED; 686 } 687 688 689 static void * 690 nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 691 { 692 return nxt_mp_align(ctx, size, size); 693 } 694 695 696 static void 697 nxt_port_lvlhsh_frag_free(void *ctx, void *p) 698 { 699 nxt_mp_free(ctx, p); 700 } 701 702 703 static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 704 NXT_LVLHSH_DEFAULT, 705 nxt_port_lvlhsh_frag_test, 706 nxt_port_lvlhsh_frag_alloc, 707 nxt_port_lvlhsh_frag_free, 708 }; 709 710 711 static nxt_port_recv_msg_t * 712 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 713 nxt_port_recv_msg_t *msg) 714 { 715 nxt_int_t res; 716 nxt_lvlhsh_query_t lhq; 717 nxt_port_recv_msg_t *fmsg; 718 nxt_port_frag_key_t frag_key; 719 720 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 721 722 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 723 724 if (nxt_slow_path(fmsg == NULL)) { 725 return NULL; 726 } 727 728 *fmsg = *msg; 729 730 frag_key.stream = fmsg->port_msg.stream; 731 frag_key.pid = fmsg->port_msg.pid; 732 733 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 734 lhq.key.length = sizeof(nxt_port_frag_key_t); 735 lhq.key.start = (u_char *) &frag_key; 736 lhq.proto = &lvlhsh_frag_proto; 737 lhq.replace = 0; 738 lhq.value = fmsg; 739 lhq.pool = port->mem_pool; 740 741 res = nxt_lvlhsh_insert(&port->frags, &lhq); 742 743 switch (res) { 744 745 case NXT_OK: 746 return fmsg; 747 748 case NXT_DECLINED: 749 nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 750 fmsg->port_msg.stream); 751 nxt_mp_free(port->mem_pool, fmsg); 752 753 return NULL; 754 755 default: 756 nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 757 fmsg->port_msg.stream); 758 759 nxt_mp_free(port->mem_pool, fmsg); 760 761 return NULL; 762 763 } 764 } 765 766 767 static nxt_port_recv_msg_t * 768 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) 769 { 770 nxt_int_t res; 771 nxt_bool_t last; 772 nxt_lvlhsh_query_t lhq; 773 nxt_port_frag_key_t frag_key; 774 775 last = msg->port_msg.mf == 0; 776 777 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", 778 msg->port_msg.stream); 779 780 frag_key.stream = msg->port_msg.stream; 781 frag_key.pid = msg->port_msg.pid; 782 783 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 784 lhq.key.length = sizeof(nxt_port_frag_key_t); 785 lhq.key.start = (u_char *) &frag_key; 786 lhq.proto = &lvlhsh_frag_proto; 787 lhq.pool = port->mem_pool; 788 789 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 790 nxt_lvlhsh_find(&port->frags, &lhq); 791 792 switch (res) { 793 794 case NXT_OK: 795 return lhq.value; 796 797 default: 798 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", 799 frag_key.stream); 800 801 return NULL; 802 } 803 } 804 805 806 static void 807 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 808 nxt_port_recv_msg_t *msg) 809 { 810 nxt_buf_t *b, *orig_b, *next; 811 nxt_port_recv_msg_t *fmsg; 812 813 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 814 nxt_alert(task, "port %d: too small message:%uz", 815 port->socket.fd, msg->size); 816 817 if (msg->fd != -1) { 818 nxt_fd_close(msg->fd); 819 } 820 821 if (msg->fd2 != -1) { 822 nxt_fd_close(msg->fd2); 823 } 824 825 return; 826 } 827 828 /* adjust size to actual buffer used size */ 829 msg->size -= sizeof(nxt_port_msg_t); 830 831 b = orig_b = msg->buf; 832 b->mem.free += msg->size; 833 834 if (msg->port_msg.tracking) { 835 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; 836 837 } else { 838 msg->cancelled = 0; 839 } 840 841 if (nxt_slow_path(msg->port_msg.nf != 0)) { 842 843 fmsg = nxt_port_frag_find(task, port, msg); 844 845 if (nxt_slow_path(fmsg == NULL)) { 846 goto fmsg_failed; 847 } 848 849 if (nxt_fast_path(fmsg->cancelled == 0)) { 850 851 if (msg->port_msg.mmap) { 852 nxt_port_mmap_read(task, msg); 853 } 854 855 nxt_buf_chain_add(&fmsg->buf, msg->buf); 856 857 fmsg->size += msg->size; 858 msg->buf = NULL; 859 b = NULL; 860 861 if (nxt_fast_path(msg->port_msg.mf == 0)) { 862 863 b = fmsg->buf; 864 865 port->handler(task, fmsg); 866 867 msg->buf = fmsg->buf; 868 msg->fd = fmsg->fd; 869 msg->fd2 = fmsg->fd2; 870 871 /* 872 * To disable instant completion or buffer re-usage, 873 * handler should reset 'msg.buf'. 874 */ 875 if (!msg->port_msg.mmap && msg->buf == b) { 876 nxt_port_buf_free(port, b); 877 } 878 } 879 } 880 881 if (nxt_fast_path(msg->port_msg.mf == 0)) { 882 nxt_mp_free(port->mem_pool, fmsg); 883 } 884 } else { 885 if (nxt_slow_path(msg->port_msg.mf != 0)) { 886 887 if (msg->port_msg.mmap && msg->cancelled == 0) { 888 nxt_port_mmap_read(task, msg); 889 b = msg->buf; 890 } 891 892 fmsg = nxt_port_frag_start(task, port, msg); 893 894 if (nxt_slow_path(fmsg == NULL)) { 895 goto fmsg_failed; 896 } 897 898 fmsg->port_msg.nf = 0; 899 fmsg->port_msg.mf = 0; 900 901 if (nxt_fast_path(msg->cancelled == 0)) { 902 msg->buf = NULL; 903 msg->fd = -1; 904 msg->fd2 = -1; 905 b = NULL; 906 907 } else { 908 if (msg->fd != -1) { 909 nxt_fd_close(msg->fd); 910 } 911 912 if (msg->fd2 != -1) { 913 nxt_fd_close(msg->fd2); 914 } 915 } 916 } else { 917 if (nxt_fast_path(msg->cancelled == 0)) { 918 919 if (msg->port_msg.mmap) { 920 nxt_port_mmap_read(task, msg); 921 b = msg->buf; 922 } 923 924 port->handler(task, msg); 925 } 926 } 927 } 928 929 fmsg_failed: 930 931 if (msg->port_msg.mmap && orig_b != b) { 932 933 /* 934 * To disable instant buffer completion, 935 * handler should reset 'msg->buf'. 936 */ 937 if (msg->buf == b) { 938 /* complete mmap buffers */ 939 while (b != NULL) { 940 nxt_debug(task, "complete buffer %p", b); 941 942 nxt_work_queue_add(port->socket.read_work_queue, 943 b->completion_handler, task, b, b->parent); 944 945 next = b->next; 946 b->next = NULL; 947 b = next; 948 } 949 } 950 951 /* restore original buf */ 952 msg->buf = orig_b; 953 } 954 } 955 956 957 static nxt_buf_t * 958 nxt_port_buf_alloc(nxt_port_t *port) 959 { 960 nxt_buf_t *b; 961 962 if (port->free_bufs != NULL) { 963 b = port->free_bufs; 964 port->free_bufs = b->next; 965 966 b->mem.pos = b->mem.start; 967 b->mem.free = b->mem.start; 968 b->next = NULL; 969 } else { 970 b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 971 if (nxt_slow_path(b == NULL)) { 972 return NULL; 973 } 974 } 975 976 return b; 977 } 978 979 980 static void 981 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 982 { 983 nxt_buf_chain_add(&b, port->free_bufs); 984 port->free_bufs = b; 985 } 986 987 988 static void 989 nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 990 { 991 int use_delta; 992 nxt_buf_t *b, *next; 993 nxt_port_t *port; 994 nxt_work_queue_t *wq; 995 nxt_port_send_msg_t *msg; 996 997 nxt_debug(task, "port error handler %p", obj); 998 /* TODO */ 999 1000 port = nxt_container_of(obj, nxt_port_t, socket); 1001 1002 use_delta = 0; 1003 1004 if (obj == data) { 1005 use_delta--; 1006 } 1007 1008 wq = &task->thread->engine->fast_work_queue; 1009 1010 nxt_thread_mutex_lock(&port->write_mutex); 1011 1012 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 1013 1014 if (msg->fd != -1 && msg->close_fd != 0) { 1015 nxt_fd_close(msg->fd); 1016 1017 msg->fd = -1; 1018 } 1019 1020 if (msg->fd2 != -1 && msg->close_fd != 0) { 1021 nxt_fd_close(msg->fd2); 1022 1023 msg->fd2 = -1; 1024 } 1025 1026 for (b = msg->buf; b != NULL; b = next) { 1027 next = b->next; 1028 b->next = NULL; 1029 1030 if (nxt_buf_is_sync(b)) { 1031 continue; 1032 } 1033 1034 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1035 } 1036 1037 nxt_queue_remove(&msg->link); 1038 use_delta--; 1039 1040 nxt_port_release_send_msg(msg); 1041 1042 } nxt_queue_loop; 1043 1044 nxt_thread_mutex_unlock(&port->write_mutex); 1045 1046 if (use_delta != 0) { 1047 nxt_port_use(task, port, use_delta); 1048 } 1049 } 1050