Deleted
Added
nxt_port_socket.c (389:3f222d4a7df8) | nxt_port_socket.c (423:449f2a9c5e62) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 209 unchanged lines hidden (view full) --- 218 return msg; 219 } 220 221 return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 222} 223 224 225nxt_int_t | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8 --- 209 unchanged lines hidden (view full) --- 218 return msg; 219 } 220 221 return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 222} 223 224 225nxt_int_t |
226nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 227 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) | 226nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 227 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, 228 void *tracking) |
228{ 229 nxt_port_send_msg_t msg, *res; 230 231 msg.link.next = NULL; 232 msg.link.prev = NULL; 233 234 msg.buf = b; 235 msg.fd = fd; 236 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 237 msg.share = 0; 238 | 229{ 230 nxt_port_send_msg_t msg, *res; 231 232 msg.link.next = NULL; 233 msg.link.prev = NULL; 234 235 msg.buf = b; 236 msg.fd = fd; 237 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 238 msg.share = 0; 239 |
240 if (tracking != NULL) { 241 nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); 242 } 243 |
|
239 msg.port_msg.stream = stream; 240 msg.port_msg.pid = nxt_pid; 241 msg.port_msg.reply_port = reply_port; 242 msg.port_msg.type = type & NXT_PORT_MSG_MASK; 243 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 244 msg.port_msg.mmap = 0; 245 msg.port_msg.nf = 0; 246 msg.port_msg.mf = 0; | 244 msg.port_msg.stream = stream; 245 msg.port_msg.pid = nxt_pid; 246 msg.port_msg.reply_port = reply_port; 247 msg.port_msg.type = type & NXT_PORT_MSG_MASK; 248 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 249 msg.port_msg.mmap = 0; 250 msg.port_msg.nf = 0; 251 msg.port_msg.mf = 0; |
252 msg.port_msg.tracking = tracking != NULL; |
|
247 248 msg.work.data = NULL; 249 250 if (port->socket.write_ready) { 251 nxt_port_write_handler(task, &port->socket, &msg); 252 } else { 253 nxt_thread_mutex_lock(&port->write_mutex); 254 --- 77 unchanged lines hidden (view full) --- 332 m = nxt_port_mmap_get_method(task, port, msg->buf); 333 334 if (m == NXT_PORT_METHOD_MMAP) { 335 sb.limit = (1ULL << 31) - 1; 336 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 337 port->max_size / PORT_MMAP_MIN_SIZE); 338 } 339 | 253 254 msg.work.data = NULL; 255 256 if (port->socket.write_ready) { 257 nxt_port_write_handler(task, &port->socket, &msg); 258 } else { 259 nxt_thread_mutex_lock(&port->write_mutex); 260 --- 77 unchanged lines hidden (view full) --- 338 m = nxt_port_mmap_get_method(task, port, msg->buf); 339 340 if (m == NXT_PORT_METHOD_MMAP) { 341 sb.limit = (1ULL << 31) - 1; 342 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 343 port->max_size / PORT_MMAP_MIN_SIZE); 344 } 345 |
346 if (msg->port_msg.tracking) { 347 iov[0].iov_len += sizeof(msg->tracking_msg); 348 } 349 |
|
340 nxt_sendbuf_mem_coalesce(task, &sb); 341 342 plain_size = sb.size; 343 344 /* 345 * Send through mmap enabled only when payload 346 * is bigger than PORT_MMAP_MIN_SIZE. 347 */ --- 32 unchanged lines hidden (view full) --- 380 381 /* 382 * A file descriptor is sent only 383 * in the first message of a stream. 384 */ 385 msg->fd = -1; 386 msg->share += n; 387 msg->port_msg.nf = 1; | 350 nxt_sendbuf_mem_coalesce(task, &sb); 351 352 plain_size = sb.size; 353 354 /* 355 * Send through mmap enabled only when payload 356 * is bigger than PORT_MMAP_MIN_SIZE. 357 */ --- 32 unchanged lines hidden (view full) --- 390 391 /* 392 * A file descriptor is sent only 393 * in the first message of a stream. 394 */ 395 msg->fd = -1; 396 msg->share += n; 397 msg->port_msg.nf = 1; |
398 msg->port_msg.tracking = 0; |
|
388 389 if (msg->share >= port->max_share) { 390 msg->share = 0; 391 392 if (msg->link.next != NULL) { 393 nxt_queue_remove(&msg->link); 394 use_delta--; 395 } --- 276 unchanged lines hidden (view full) --- 672 nxt_port_recv_msg_t *msg) 673{ 674 nxt_buf_t *b, *orig_b; 675 nxt_port_recv_msg_t *fmsg; 676 677 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 678 nxt_log(task, NXT_LOG_CRIT, 679 "port %d: too small message:%uz", port->socket.fd, msg->size); | 399 400 if (msg->share >= port->max_share) { 401 msg->share = 0; 402 403 if (msg->link.next != NULL) { 404 nxt_queue_remove(&msg->link); 405 use_delta--; 406 } --- 276 unchanged lines hidden (view full) --- 683 nxt_port_recv_msg_t *msg) 684{ 685 nxt_buf_t *b, *orig_b; 686 nxt_port_recv_msg_t *fmsg; 687 688 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 689 nxt_log(task, NXT_LOG_CRIT, 690 "port %d: too small message:%uz", port->socket.fd, msg->size); |
680 goto fail; | 691 692 if (msg->fd != -1) { 693 nxt_fd_close(msg->fd); 694 } 695 696 return; |
681 } 682 683 /* adjust size to actual buffer used size */ 684 msg->size -= sizeof(nxt_port_msg_t); 685 686 b = orig_b = msg->buf; 687 b->mem.free += msg->size; 688 | 697 } 698 699 /* adjust size to actual buffer used size */ 700 msg->size -= sizeof(nxt_port_msg_t); 701 702 b = orig_b = msg->buf; 703 b->mem.free += msg->size; 704 |
689 if (msg->port_msg.mmap) { 690 nxt_port_mmap_read(task, port, msg); 691 b = msg->buf; | 705 if (msg->port_msg.tracking) { 706 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; 707 708 } else { 709 msg->cancelled = 0; |
692 } 693 694 if (nxt_slow_path(msg->port_msg.nf != 0)) { | 710 } 711 712 if (nxt_slow_path(msg->port_msg.nf != 0)) { |
713 |
|
695 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, 696 msg->port_msg.mf == 0); 697 | 714 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, 715 msg->port_msg.mf == 0); 716 |
698 if (nxt_slow_path(fmsg == NULL)) { 699 nxt_assert(fmsg != NULL); 700 } | 717 nxt_assert(fmsg != NULL); |
701 | 718 |
702 nxt_buf_chain_add(&fmsg->buf, msg->buf); | 719 if (nxt_fast_path(fmsg->cancelled == 0)) { |
703 | 720 |
704 fmsg->size += msg->size; | 721 if (msg->port_msg.mmap) { 722 nxt_port_mmap_read(task, msg); 723 b = msg->buf; 724 } |
705 | 725 |
706 msg->buf = NULL; 707 b = NULL; | 726 nxt_buf_chain_add(&fmsg->buf, msg->buf); |
708 | 727 |
709 if (nxt_fast_path(msg->port_msg.mf == 0)) { 710 b = fmsg->buf; | 728 fmsg->size += msg->size; 729 msg->buf = NULL; 730 b = NULL; |
711 | 731 |
712 port->handler(task, fmsg); | 732 if (nxt_fast_path(msg->port_msg.mf == 0)) { |
713 | 733 |
714 msg->buf = fmsg->buf; 715 msg->fd = fmsg->fd; | 734 b = fmsg->buf; |
716 | 735 |
736 port->handler(task, fmsg); 737 738 msg->buf = fmsg->buf; 739 msg->fd = fmsg->fd; 740 } 741 } 742 743 if (nxt_fast_path(msg->port_msg.mf == 0)) { |
|
717 nxt_mp_free(port->mem_pool, fmsg); 718 } 719 } else { 720 if (nxt_slow_path(msg->port_msg.mf != 0)) { | 744 nxt_mp_free(port->mem_pool, fmsg); 745 } 746 } else { 747 if (nxt_slow_path(msg->port_msg.mf != 0)) { |
721 fmsg = nxt_port_frag_start(task, port, msg); | |
722 | 748 |
723 if (nxt_slow_path(fmsg == NULL)) { 724 nxt_assert(fmsg != NULL); | 749 if (msg->port_msg.mmap && msg->cancelled == 0) { 750 nxt_port_mmap_read(task, msg); 751 b = msg->buf; |
725 } 726 | 752 } 753 |
754 fmsg = nxt_port_frag_start(task, port, msg); 755 756 nxt_assert(fmsg != NULL); 757 |
|
727 fmsg->port_msg.nf = 0; 728 fmsg->port_msg.mf = 0; 729 | 758 fmsg->port_msg.nf = 0; 759 fmsg->port_msg.mf = 0; 760 |
730 msg->buf = NULL; 731 msg->fd = -1; 732 b = NULL; | 761 if (nxt_fast_path(msg->cancelled == 0)) { 762 msg->buf = NULL; 763 msg->fd = -1; 764 b = NULL; 765 766 } else { 767 if (msg->fd != -1) { 768 nxt_fd_close(msg->fd); 769 } 770 } |
733 } else { | 771 } else { |
734 port->handler(task, msg); | 772 if (nxt_fast_path(msg->cancelled == 0)) { 773 774 if (msg->port_msg.mmap) { 775 nxt_port_mmap_read(task, msg); 776 b = msg->buf; 777 } 778 779 port->handler(task, msg); 780 } |
735 } 736 } 737 738 if (msg->port_msg.mmap && orig_b != b) { 739 740 /* 741 * To disable instant buffer completion, 742 * handler should reset 'msg->buf'. --- 6 unchanged lines hidden (view full) --- 749 nxt_work_queue_add(port->socket.read_work_queue, 750 b->completion_handler, task, b, b->parent); 751 } 752 } 753 754 /* restore original buf */ 755 msg->buf = orig_b; 756 } | 781 } 782 } 783 784 if (msg->port_msg.mmap && orig_b != b) { 785 786 /* 787 * To disable instant buffer completion, 788 * handler should reset 'msg->buf'. --- 6 unchanged lines hidden (view full) --- 795 nxt_work_queue_add(port->socket.read_work_queue, 796 b->completion_handler, task, b, b->parent); 797 } 798 } 799 800 /* restore original buf */ 801 msg->buf = orig_b; 802 } |
757 758 return; 759 760fail: 761 762 if (msg->fd != -1) { 763 nxt_fd_close(msg->fd); 764 } | |
765} 766 767 768static nxt_buf_t * 769nxt_port_buf_alloc(nxt_port_t *port) 770{ 771 nxt_buf_t *b; 772 --- 73 unchanged lines hidden --- | 803} 804 805 806static nxt_buf_t * 807nxt_port_buf_alloc(nxt_port_t *port) 808{ 809 nxt_buf_t *b; 810 --- 73 unchanged lines hidden --- |