1 2 /* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 9 10 static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 11 nxt_port_send_msg_t *msg); 12 static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); 13 static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 14 static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); 15 static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, 16 nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); 17 static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, 18 nxt_port_send_msg_t *msg); 19 static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 20 static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 21 nxt_port_recv_msg_t *msg); 22 static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 23 static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 24 static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 25 26 27 nxt_int_t 28 nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 29 { 30 nxt_int_t sndbuf, rcvbuf, size; 31 nxt_socket_t snd, rcv; 32 33 port->socket.task = task; 34 35 port->pair[0] = -1; 36 port->pair[1] = -1; 37 38 if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 39 goto socketpair_fail; 40 } 41 42 snd = port->pair[1]; 43 44 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 45 if (nxt_slow_path(sndbuf < 0)) { 46 goto getsockopt_fail; 47 } 48 49 rcv = port->pair[0]; 50 51 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 52 if (nxt_slow_path(rcvbuf < 0)) { 53 goto getsockopt_fail; 54 } 55 56 if (max_size == 0) { 57 max_size = 16 * 1024; 58 } 59 60 if ((size_t) sndbuf < max_size) { 61 /* 62 * On Unix domain sockets 63 * Linux uses 224K on both send and receive directions; 64 * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 65 * on send direction and 4K buffer size on receive direction; 66 * Solaris uses 16K on send direction and 5K on receive direction. 67 */ 68 (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 69 max_size); 70 71 sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 72 if (nxt_slow_path(sndbuf < 0)) { 73 goto getsockopt_fail; 74 } 75 76 size = sndbuf * 4; 77 78 if (rcvbuf < size) { 79 (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 80 size); 81 82 rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 83 if (nxt_slow_path(rcvbuf < 0)) { 84 goto getsockopt_fail; 85 } 86 } 87 } 88 89 port->max_size = nxt_min(max_size, (size_t) sndbuf); 90 port->max_share = (64 * 1024); 91 92 return NXT_OK; 93 94 getsockopt_fail: 95 96 nxt_socket_close(task, port->pair[0]); 97 nxt_socket_close(task, port->pair[1]); 98 99 socketpair_fail: 100 101 return NXT_ERROR; 102 } 103 104 105 void 106 nxt_port_destroy(nxt_port_t *port) 107 { 108 nxt_socket_close(port->socket.task, port->socket.fd); 109 nxt_mp_destroy(port->mem_pool); 110 } 111 112 113 void 114 nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 115 { 116 port->socket.fd = port->pair[1]; 117 port->socket.log = &nxt_main_log; 118 port->socket.write_ready = 1; 119 120 port->engine = task->thread->engine; 121 122 port->socket.write_work_queue = &port->engine->fast_work_queue; 123 port->socket.write_handler = nxt_port_write_handler; 124 port->socket.error_handler = nxt_port_error_handler; 125 } 126 127 128 void 129 nxt_port_write_close(nxt_port_t *port) 130 { 131 nxt_socket_close(port->socket.task, port->pair[1]); 132 port->pair[1] = -1; 133 } 134 135 136 static void 137 nxt_port_release_send_msg(nxt_port_send_msg_t *msg) 138 { 139 if (msg->allocated) { 140 nxt_free(msg); 141 } 142 } 143 144 145 nxt_int_t 146 nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 147 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, 148 void *tracking) 149 { 150 nxt_int_t res; 151 nxt_port_send_msg_t msg; 152 153 msg.link.next = NULL; 154 msg.link.prev = NULL; 155 156 msg.buf = b; 157 msg.share = 0; 158 msg.fd = fd; 159 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 160 msg.allocated = 0; 161 162 if (tracking != NULL) { 163 nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); 164 } 165 166 msg.port_msg.stream = stream; 167 msg.port_msg.pid = nxt_pid; 168 msg.port_msg.reply_port = reply_port; 169 msg.port_msg.type = type & NXT_PORT_MSG_MASK; 170 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 171 msg.port_msg.mmap = 0; 172 msg.port_msg.nf = 0; 173 msg.port_msg.mf = 0; 174 msg.port_msg.tracking = tracking != NULL; 175 176 res = nxt_port_msg_chk_insert(task, port, &msg); 177 if (nxt_fast_path(res == NXT_DECLINED)) { 178 nxt_port_write_handler(task, &port->socket, &msg); 179 res = NXT_OK; 180 } 181 182 return res; 183 } 184 185 186 static nxt_int_t 187 nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 188 nxt_port_send_msg_t *msg) 189 { 190 nxt_int_t res; 191 192 nxt_thread_mutex_lock(&port->write_mutex); 193 194 if (nxt_fast_path(port->socket.write_ready 195 && nxt_queue_is_empty(&port->messages))) 196 { 197 res = NXT_DECLINED; 198 199 } else { 200 msg = nxt_port_msg_alloc(msg); 201 202 if (nxt_fast_path(msg != NULL)) { 203 nxt_queue_insert_tail(&port->messages, &msg->link); 204 nxt_port_use(task, port, 1); 205 res = NXT_OK; 206 207 } else { 208 res = NXT_ERROR; 209 } 210 } 211 212 nxt_thread_mutex_unlock(&port->write_mutex); 213 214 return res; 215 } 216 217 218 static nxt_port_send_msg_t * 219 nxt_port_msg_alloc(nxt_port_send_msg_t *m) 220 { 221 nxt_port_send_msg_t *msg; 222 223 msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); 224 if (nxt_slow_path(msg == NULL)) { 225 return NULL; 226 } 227 228 *msg = *m; 229 230 msg->allocated = 1; 231 232 return msg; 233 } 234 235 236 static void 237 nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 238 { 239 nxt_fd_event_block_write(task->thread->engine, &port->socket); 240 } 241 242 243 static void 244 nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 245 { 246 nxt_fd_event_enable_write(task->thread->engine, &port->socket); 247 } 248 249 250 static void 251 nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 252 { 253 int use_delta; 254 size_t plain_size; 255 ssize_t n; 256 uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; 257 nxt_bool_t block_write, enable_write; 258 nxt_port_t *port; 259 struct iovec iov[NXT_IOBUF_MAX * 10]; 260 nxt_work_queue_t *wq; 261 nxt_port_method_t m; 262 nxt_port_send_msg_t *msg; 263 nxt_sendbuf_coalesce_t sb; 264 265 port = nxt_container_of(obj, nxt_port_t, socket); 266 267 block_write = 0; 268 enable_write = 0; 269 use_delta = 0; 270 271 wq = &task->thread->engine->fast_work_queue; 272 273 do { 274 if (data) { 275 msg = data; 276 277 } else { 278 msg = nxt_port_msg_first(port); 279 280 if (msg == NULL) { 281 block_write = 1; 282 goto cleanup; 283 } 284 } 285 286 next_fragment: 287 288 iov[0].iov_base = &msg->port_msg; 289 iov[0].iov_len = sizeof(nxt_port_msg_t); 290 291 sb.buf = msg->buf; 292 sb.iobuf = &iov[1]; 293 sb.nmax = NXT_IOBUF_MAX - 1; 294 sb.sync = 0; 295 sb.last = 0; 296 sb.size = 0; 297 sb.limit = port->max_size; 298 299 sb.limit_reached = 0; 300 sb.nmax_reached = 0; 301 302 m = nxt_port_mmap_get_method(task, port, msg->buf); 303 304 if (m == NXT_PORT_METHOD_MMAP) { 305 sb.limit = (1ULL << 31) - 1; 306 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 307 port->max_size / PORT_MMAP_MIN_SIZE); 308 } 309 310 if (msg->port_msg.tracking) { 311 iov[0].iov_len += sizeof(msg->tracking_msg); 312 } 313 314 sb.limit -= iov[0].iov_len; 315 316 nxt_sendbuf_mem_coalesce(task, &sb); 317 318 plain_size = sb.size; 319 320 /* 321 * Send through mmap enabled only when payload 322 * is bigger than PORT_MMAP_MIN_SIZE. 323 */ 324 if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 325 nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); 326 327 } else { 328 m = NXT_PORT_METHOD_PLAIN; 329 } 330 331 msg->port_msg.last |= sb.last; 332 msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 333 334 n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 335 336 if (n > 0) { 337 if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 338 nxt_alert(task, "port %d: short write: %z instead of %uz", 339 port->socket.fd, n, sb.size + iov[0].iov_len); 340 goto fail; 341 } 342 343 if (msg->fd != -1 && msg->close_fd != 0) { 344 nxt_fd_close(msg->fd); 345 346 msg->fd = -1; 347 } 348 349 msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, 350 m == NXT_PORT_METHOD_MMAP); 351 352 if (msg->buf != NULL) { 353 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 354 msg->port_msg.stream); 355 356 /* 357 * A file descriptor is sent only 358 * in the first message of a stream. 359 */ 360 msg->fd = -1; 361 msg->share += n; 362 msg->port_msg.nf = 1; 363 msg->port_msg.tracking = 0; 364 365 if (msg->share >= port->max_share) { 366 msg->share = 0; 367 368 if (msg->link.next != NULL) { 369 nxt_thread_mutex_lock(&port->write_mutex); 370 371 nxt_queue_remove(&msg->link); 372 nxt_queue_insert_tail(&port->messages, &msg->link); 373 374 nxt_thread_mutex_unlock(&port->write_mutex); 375 376 } else { 377 msg = nxt_port_msg_insert_tail(port, msg); 378 if (nxt_slow_path(msg == NULL)) { 379 goto fail; 380 } 381 382 use_delta++; 383 } 384 385 } else { 386 goto next_fragment; 387 } 388 389 } else { 390 if (msg->link.next != NULL) { 391 nxt_thread_mutex_lock(&port->write_mutex); 392 393 nxt_queue_remove(&msg->link); 394 msg->link.next = NULL; 395 396 nxt_thread_mutex_unlock(&port->write_mutex); 397 398 use_delta--; 399 } 400 401 nxt_port_release_send_msg(msg); 402 } 403 404 if (data != NULL) { 405 goto cleanup; 406 } 407 408 } else { 409 if (nxt_slow_path(n == NXT_ERROR)) { 410 goto fail; 411 } 412 413 if (msg->link.next == NULL) { 414 msg = nxt_port_msg_insert_tail(port, msg); 415 if (nxt_slow_path(msg == NULL)) { 416 goto fail; 417 } 418 419 use_delta++; 420 } 421 } 422 423 } while (port->socket.write_ready); 424 425 if (nxt_fd_event_is_disabled(port->socket.write)) { 426 enable_write = 1; 427 } 428 429 goto cleanup; 430 431 fail: 432 433 use_delta++; 434 435 nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 436 &port->socket); 437 438 cleanup: 439 440 if (block_write && nxt_fd_event_is_active(port->socket.write)) { 441 nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 442 } 443 444 if (enable_write) { 445 nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 446 } 447 448 if (use_delta != 0) { 449 nxt_port_use(task, port, use_delta); 450 } 451 } 452 453 454 static nxt_port_send_msg_t * 455 nxt_port_msg_first(nxt_port_t *port) 456 { 457 nxt_queue_link_t *lnk; 458 nxt_port_send_msg_t *msg; 459 460 nxt_thread_mutex_lock(&port->write_mutex); 461 462 lnk = nxt_queue_first(&port->messages); 463 464 if (lnk == nxt_queue_tail(&port->messages)) { 465 msg = NULL; 466 467 } else { 468 msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 469 } 470 471 nxt_thread_mutex_unlock(&port->write_mutex); 472 473 return msg; 474 } 475 476 477 static nxt_buf_t * 478 nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 479 size_t sent, nxt_bool_t mmap_mode) 480 { 481 size_t size; 482 nxt_buf_t *next; 483 484 while (b != NULL) { 485 486 nxt_prefetch(b->next); 487 488 if (!nxt_buf_is_sync(b)) { 489 490 size = nxt_buf_used_size(b); 491 492 if (size != 0) { 493 494 if (sent == 0) { 495 break; 496 } 497 498 if (nxt_buf_is_port_mmap(b) && mmap_mode) { 499 /* 500 * buffer has been sent to other side which is now 501 * responsible for shared memory bucket release 502 */ 503 b->is_port_mmap_sent = 1; 504 } 505 506 if (sent < size) { 507 508 if (nxt_buf_is_mem(b)) { 509 b->mem.pos += sent; 510 } 511 512 if (nxt_buf_is_file(b)) { 513 b->file_pos += sent; 514 } 515 516 break; 517 } 518 519 /* b->mem.free is NULL in file-only buffer. */ 520 b->mem.pos = b->mem.free; 521 522 if (nxt_buf_is_file(b)) { 523 b->file_pos = b->file_end; 524 } 525 526 sent -= size; 527 } 528 } 529 530 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 531 532 next = b->next; 533 b->next = NULL; 534 b = next; 535 } 536 537 return b; 538 } 539 540 541 static nxt_port_send_msg_t * 542 nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) 543 { 544 if (msg->allocated == 0) { 545 msg = nxt_port_msg_alloc(msg); 546 547 if (nxt_slow_path(msg == NULL)) { 548 return NULL; 549 } 550 } 551 552 nxt_thread_mutex_lock(&port->write_mutex); 553 554 nxt_queue_insert_tail(&port->messages, &msg->link); 555 556 nxt_thread_mutex_unlock(&port->write_mutex); 557 558 return msg; 559 } 560 561 562 void 563 nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 564 { 565 port->socket.fd = port->pair[0]; 566 port->socket.log = &nxt_main_log; 567 568 port->engine = task->thread->engine; 569 570 port->socket.read_work_queue = &port->engine->fast_work_queue; 571 port->socket.read_handler = nxt_port_read_handler; 572 port->socket.error_handler = nxt_port_error_handler; 573 574 nxt_fd_event_enable_read(port->engine, &port->socket); 575 } 576 577 578 void 579 nxt_port_read_close(nxt_port_t *port) 580 { 581 port->socket.read_ready = 0; 582 port->socket.read = NXT_EVENT_INACTIVE; 583 nxt_socket_close(port->socket.task, port->pair[0]); 584 port->pair[0] = -1; 585 } 586 587 588 static void 589 nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 590 { 591 ssize_t n; 592 nxt_buf_t *b; 593 nxt_port_t *port; 594 struct iovec iov[2]; 595 nxt_port_recv_msg_t msg; 596 597 port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 598 599 nxt_assert(port->engine == task->thread->engine); 600 601 for ( ;; ) { 602 603 b = nxt_port_buf_alloc(port); 604 605 if (nxt_slow_path(b == NULL)) { 606 /* TODO: disable event for some time */ 607 } 608 609 iov[0].iov_base = &msg.port_msg; 610 iov[0].iov_len = sizeof(nxt_port_msg_t); 611 612 iov[1].iov_base = b->mem.pos; 613 iov[1].iov_len = port->max_size; 614 615 n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 616 617 if (n > 0) { 618 619 msg.buf = b; 620 msg.size = n; 621 622 nxt_port_read_msg_process(task, port, &msg); 623 624 /* 625 * To disable instant completion or buffer re-usage, 626 * handler should reset 'msg.buf'. 627 */ 628 if (msg.buf == b) { 629 nxt_port_buf_free(port, b); 630 } 631 632 if (port->socket.read_ready) { 633 continue; 634 } 635 636 return; 637 } 638 639 if (n == NXT_AGAIN) { 640 nxt_port_buf_free(port, b); 641 642 nxt_fd_event_enable_read(task->thread->engine, &port->socket); 643 return; 644 } 645 646 /* n == 0 || n == NXT_ERROR */ 647 648 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 649 nxt_port_error_handler, task, &port->socket, NULL); 650 return; 651 } 652 } 653 654 655 typedef struct { 656 uint32_t stream; 657 uint32_t pid; 658 } nxt_port_frag_key_t; 659 660 661 static nxt_int_t 662 nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 663 { 664 nxt_port_recv_msg_t *fmsg; 665 nxt_port_frag_key_t *frag_key; 666 667 fmsg = data; 668 frag_key = (nxt_port_frag_key_t *) lhq->key.start; 669 670 if (lhq->key.length == sizeof(nxt_port_frag_key_t) 671 && frag_key->stream == fmsg->port_msg.stream 672 && frag_key->pid == (uint32_t) fmsg->port_msg.pid) 673 { 674 return NXT_OK; 675 } 676 677 return NXT_DECLINED; 678 } 679 680 681 static void * 682 nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 683 { 684 return nxt_mp_align(ctx, size, size); 685 } 686 687 688 static void 689 nxt_port_lvlhsh_frag_free(void *ctx, void *p) 690 { 691 nxt_mp_free(ctx, p); 692 } 693 694 695 static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 696 NXT_LVLHSH_DEFAULT, 697 nxt_port_lvlhsh_frag_test, 698 nxt_port_lvlhsh_frag_alloc, 699 nxt_port_lvlhsh_frag_free, 700 }; 701 702 703 static nxt_port_recv_msg_t * 704 nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 705 nxt_port_recv_msg_t *msg) 706 { 707 nxt_int_t res; 708 nxt_lvlhsh_query_t lhq; 709 nxt_port_recv_msg_t *fmsg; 710 nxt_port_frag_key_t frag_key; 711 712 nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 713 714 fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 715 716 if (nxt_slow_path(fmsg == NULL)) { 717 return NULL; 718 } 719 720 *fmsg = *msg; 721 722 frag_key.stream = fmsg->port_msg.stream; 723 frag_key.pid = fmsg->port_msg.pid; 724 725 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 726 lhq.key.length = sizeof(nxt_port_frag_key_t); 727 lhq.key.start = (u_char *) &frag_key; 728 lhq.proto = &lvlhsh_frag_proto; 729 lhq.replace = 0; 730 lhq.value = fmsg; 731 lhq.pool = port->mem_pool; 732 733 res = nxt_lvlhsh_insert(&port->frags, &lhq); 734 735 switch (res) { 736 737 case NXT_OK: 738 return fmsg; 739 740 case NXT_DECLINED: 741 nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 742 fmsg->port_msg.stream); 743 nxt_mp_free(port->mem_pool, fmsg); 744 745 return NULL; 746 747 default: 748 nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 749 fmsg->port_msg.stream); 750 751 nxt_mp_free(port->mem_pool, fmsg); 752 753 return NULL; 754 755 } 756 } 757 758 759 static nxt_port_recv_msg_t * 760 nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) 761 { 762 nxt_int_t res; 763 nxt_bool_t last; 764 nxt_lvlhsh_query_t lhq; 765 nxt_port_frag_key_t frag_key; 766 767 last = msg->port_msg.mf == 0; 768 769 nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", 770 msg->port_msg.stream); 771 772 frag_key.stream = msg->port_msg.stream; 773 frag_key.pid = msg->port_msg.pid; 774 775 lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 776 lhq.key.length = sizeof(nxt_port_frag_key_t); 777 lhq.key.start = (u_char *) &frag_key; 778 lhq.proto = &lvlhsh_frag_proto; 779 lhq.pool = port->mem_pool; 780 781 res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 782 nxt_lvlhsh_find(&port->frags, &lhq); 783 784 switch (res) { 785 786 case NXT_OK: 787 return lhq.value; 788 789 default: 790 nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", 791 frag_key.stream); 792 793 return NULL; 794 } 795 } 796 797 798 static void 799 nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 800 nxt_port_recv_msg_t *msg) 801 { 802 nxt_buf_t *b, *orig_b, *next; 803 nxt_port_recv_msg_t *fmsg; 804 805 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 806 nxt_alert(task, "port %d: too small message:%uz", 807 port->socket.fd, msg->size); 808 809 if (msg->fd != -1) { 810 nxt_fd_close(msg->fd); 811 } 812 813 return; 814 } 815 816 /* adjust size to actual buffer used size */ 817 msg->size -= sizeof(nxt_port_msg_t); 818 819 b = orig_b = msg->buf; 820 b->mem.free += msg->size; 821 822 if (msg->port_msg.tracking) { 823 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; 824 825 } else { 826 msg->cancelled = 0; 827 } 828 829 if (nxt_slow_path(msg->port_msg.nf != 0)) { 830 831 fmsg = nxt_port_frag_find(task, port, msg); 832 833 if (nxt_slow_path(fmsg == NULL)) { 834 goto fmsg_failed; 835 } 836 837 if (nxt_fast_path(fmsg->cancelled == 0)) { 838 839 if (msg->port_msg.mmap) { 840 nxt_port_mmap_read(task, msg); 841 } 842 843 nxt_buf_chain_add(&fmsg->buf, msg->buf); 844 845 fmsg->size += msg->size; 846 msg->buf = NULL; 847 b = NULL; 848 849 if (nxt_fast_path(msg->port_msg.mf == 0)) { 850 851 b = fmsg->buf; 852 853 port->handler(task, fmsg); 854 855 msg->buf = fmsg->buf; 856 msg->fd = fmsg->fd; 857 858 /* 859 * To disable instant completion or buffer re-usage, 860 * handler should reset 'msg.buf'. 861 */ 862 if (!msg->port_msg.mmap && msg->buf == b) { 863 nxt_port_buf_free(port, b); 864 } 865 } 866 } 867 868 if (nxt_fast_path(msg->port_msg.mf == 0)) { 869 nxt_mp_free(port->mem_pool, fmsg); 870 } 871 } else { 872 if (nxt_slow_path(msg->port_msg.mf != 0)) { 873 874 if (msg->port_msg.mmap && msg->cancelled == 0) { 875 nxt_port_mmap_read(task, msg); 876 b = msg->buf; 877 } 878 879 fmsg = nxt_port_frag_start(task, port, msg); 880 881 if (nxt_slow_path(fmsg == NULL)) { 882 goto fmsg_failed; 883 } 884 885 fmsg->port_msg.nf = 0; 886 fmsg->port_msg.mf = 0; 887 888 if (nxt_fast_path(msg->cancelled == 0)) { 889 msg->buf = NULL; 890 msg->fd = -1; 891 b = NULL; 892 893 } else { 894 if (msg->fd != -1) { 895 nxt_fd_close(msg->fd); 896 } 897 } 898 } else { 899 if (nxt_fast_path(msg->cancelled == 0)) { 900 901 if (msg->port_msg.mmap) { 902 nxt_port_mmap_read(task, msg); 903 b = msg->buf; 904 } 905 906 port->handler(task, msg); 907 } 908 } 909 } 910 911 fmsg_failed: 912 913 if (msg->port_msg.mmap && orig_b != b) { 914 915 /* 916 * To disable instant buffer completion, 917 * handler should reset 'msg->buf'. 918 */ 919 if (msg->buf == b) { 920 /* complete mmap buffers */ 921 while (b != NULL) { 922 nxt_debug(task, "complete buffer %p", b); 923 924 nxt_work_queue_add(port->socket.read_work_queue, 925 b->completion_handler, task, b, b->parent); 926 927 next = b->next; 928 b->next = NULL; 929 b = next; 930 } 931 } 932 933 /* restore original buf */ 934 msg->buf = orig_b; 935 } 936 } 937 938 939 static nxt_buf_t * 940 nxt_port_buf_alloc(nxt_port_t *port) 941 { 942 nxt_buf_t *b; 943 944 if (port->free_bufs != NULL) { 945 b = port->free_bufs; 946 port->free_bufs = b->next; 947 948 b->mem.pos = b->mem.start; 949 b->mem.free = b->mem.start; 950 b->next = NULL; 951 } else { 952 b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 953 if (nxt_slow_path(b == NULL)) { 954 return NULL; 955 } 956 } 957 958 return b; 959 } 960 961 962 static void 963 nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 964 { 965 nxt_buf_chain_add(&b, port->free_bufs); 966 port->free_bufs = b; 967 } 968 969 970 static void 971 nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 972 { 973 int use_delta; 974 nxt_buf_t *b, *next; 975 nxt_port_t *port; 976 nxt_work_queue_t *wq; 977 nxt_port_send_msg_t *msg; 978 979 nxt_debug(task, "port error handler %p", obj); 980 /* TODO */ 981 982 port = nxt_container_of(obj, nxt_port_t, socket); 983 984 use_delta = 0; 985 986 if (obj == data) { 987 use_delta--; 988 } 989 990 wq = &task->thread->engine->fast_work_queue; 991 992 nxt_thread_mutex_lock(&port->write_mutex); 993 994 nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 995 996 for (b = msg->buf; b != NULL; b = next) { 997 next = b->next; 998 b->next = NULL; 999 1000 if (nxt_buf_is_sync(b)) { 1001 continue; 1002 } 1003 1004 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1005 } 1006 1007 nxt_queue_remove(&msg->link); 1008 use_delta--; 1009 1010 nxt_port_release_send_msg(msg); 1011 1012 } nxt_queue_loop; 1013 1014 nxt_thread_mutex_unlock(&port->write_mutex); 1015 1016 if (use_delta != 0) { 1017 nxt_port_use(task, port, use_delta); 1018 } 1019 } 1020