nxt_port_socket.c (389:3f222d4a7df8) nxt_port_socket.c (423:449f2a9c5e62)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 209 unchanged lines hidden (view full) ---

218 return msg;
219 }
220
221 return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
222}
223
224
225nxt_int_t
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 209 unchanged lines hidden (view full) ---

218 return msg;
219 }
220
221 return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
222}
223
224
225nxt_int_t
226nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
227 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
226nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
227 nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
228 void *tracking)
228{
229 nxt_port_send_msg_t msg, *res;
230
231 msg.link.next = NULL;
232 msg.link.prev = NULL;
233
234 msg.buf = b;
235 msg.fd = fd;
236 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
237 msg.share = 0;
238
229{
230 nxt_port_send_msg_t msg, *res;
231
232 msg.link.next = NULL;
233 msg.link.prev = NULL;
234
235 msg.buf = b;
236 msg.fd = fd;
237 msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
238 msg.share = 0;
239
240 if (tracking != NULL) {
241 nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
242 }
243
239 msg.port_msg.stream = stream;
240 msg.port_msg.pid = nxt_pid;
241 msg.port_msg.reply_port = reply_port;
242 msg.port_msg.type = type & NXT_PORT_MSG_MASK;
243 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
244 msg.port_msg.mmap = 0;
245 msg.port_msg.nf = 0;
246 msg.port_msg.mf = 0;
244 msg.port_msg.stream = stream;
245 msg.port_msg.pid = nxt_pid;
246 msg.port_msg.reply_port = reply_port;
247 msg.port_msg.type = type & NXT_PORT_MSG_MASK;
248 msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
249 msg.port_msg.mmap = 0;
250 msg.port_msg.nf = 0;
251 msg.port_msg.mf = 0;
252 msg.port_msg.tracking = tracking != NULL;
247
248 msg.work.data = NULL;
249
250 if (port->socket.write_ready) {
251 nxt_port_write_handler(task, &port->socket, &msg);
252 } else {
253 nxt_thread_mutex_lock(&port->write_mutex);
254

--- 77 unchanged lines hidden (view full) ---

332 m = nxt_port_mmap_get_method(task, port, msg->buf);
333
334 if (m == NXT_PORT_METHOD_MMAP) {
335 sb.limit = (1ULL << 31) - 1;
336 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
337 port->max_size / PORT_MMAP_MIN_SIZE);
338 }
339
253
254 msg.work.data = NULL;
255
256 if (port->socket.write_ready) {
257 nxt_port_write_handler(task, &port->socket, &msg);
258 } else {
259 nxt_thread_mutex_lock(&port->write_mutex);
260

--- 77 unchanged lines hidden (view full) ---

338 m = nxt_port_mmap_get_method(task, port, msg->buf);
339
340 if (m == NXT_PORT_METHOD_MMAP) {
341 sb.limit = (1ULL << 31) - 1;
342 sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
343 port->max_size / PORT_MMAP_MIN_SIZE);
344 }
345
346 if (msg->port_msg.tracking) {
347 iov[0].iov_len += sizeof(msg->tracking_msg);
348 }
349
340 nxt_sendbuf_mem_coalesce(task, &sb);
341
342 plain_size = sb.size;
343
344 /*
345 * Send through mmap enabled only when payload
346 * is bigger than PORT_MMAP_MIN_SIZE.
347 */

--- 32 unchanged lines hidden (view full) ---

380
381 /*
382 * A file descriptor is sent only
383 * in the first message of a stream.
384 */
385 msg->fd = -1;
386 msg->share += n;
387 msg->port_msg.nf = 1;
350 nxt_sendbuf_mem_coalesce(task, &sb);
351
352 plain_size = sb.size;
353
354 /*
355 * Send through mmap enabled only when payload
356 * is bigger than PORT_MMAP_MIN_SIZE.
357 */

--- 32 unchanged lines hidden (view full) ---

390
391 /*
392 * A file descriptor is sent only
393 * in the first message of a stream.
394 */
395 msg->fd = -1;
396 msg->share += n;
397 msg->port_msg.nf = 1;
398 msg->port_msg.tracking = 0;
388
389 if (msg->share >= port->max_share) {
390 msg->share = 0;
391
392 if (msg->link.next != NULL) {
393 nxt_queue_remove(&msg->link);
394 use_delta--;
395 }

--- 276 unchanged lines hidden (view full) ---

672 nxt_port_recv_msg_t *msg)
673{
674 nxt_buf_t *b, *orig_b;
675 nxt_port_recv_msg_t *fmsg;
676
677 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
678 nxt_log(task, NXT_LOG_CRIT,
679 "port %d: too small message:%uz", port->socket.fd, msg->size);
399
400 if (msg->share >= port->max_share) {
401 msg->share = 0;
402
403 if (msg->link.next != NULL) {
404 nxt_queue_remove(&msg->link);
405 use_delta--;
406 }

--- 276 unchanged lines hidden (view full) ---

683 nxt_port_recv_msg_t *msg)
684{
685 nxt_buf_t *b, *orig_b;
686 nxt_port_recv_msg_t *fmsg;
687
688 if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
689 nxt_log(task, NXT_LOG_CRIT,
690 "port %d: too small message:%uz", port->socket.fd, msg->size);
680 goto fail;
691
692 if (msg->fd != -1) {
693 nxt_fd_close(msg->fd);
694 }
695
696 return;
681 }
682
683 /* adjust size to actual buffer used size */
684 msg->size -= sizeof(nxt_port_msg_t);
685
686 b = orig_b = msg->buf;
687 b->mem.free += msg->size;
688
697 }
698
699 /* adjust size to actual buffer used size */
700 msg->size -= sizeof(nxt_port_msg_t);
701
702 b = orig_b = msg->buf;
703 b->mem.free += msg->size;
704
689 if (msg->port_msg.mmap) {
690 nxt_port_mmap_read(task, port, msg);
691 b = msg->buf;
705 if (msg->port_msg.tracking) {
706 msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
707
708 } else {
709 msg->cancelled = 0;
692 }
693
694 if (nxt_slow_path(msg->port_msg.nf != 0)) {
710 }
711
712 if (nxt_slow_path(msg->port_msg.nf != 0)) {
713
695 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
696 msg->port_msg.mf == 0);
697
714 fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
715 msg->port_msg.mf == 0);
716
698 if (nxt_slow_path(fmsg == NULL)) {
699 nxt_assert(fmsg != NULL);
700 }
717 nxt_assert(fmsg != NULL);
701
718
702 nxt_buf_chain_add(&fmsg->buf, msg->buf);
719 if (nxt_fast_path(fmsg->cancelled == 0)) {
703
720
704 fmsg->size += msg->size;
721 if (msg->port_msg.mmap) {
722 nxt_port_mmap_read(task, msg);
723 b = msg->buf;
724 }
705
725
706 msg->buf = NULL;
707 b = NULL;
726 nxt_buf_chain_add(&fmsg->buf, msg->buf);
708
727
709 if (nxt_fast_path(msg->port_msg.mf == 0)) {
710 b = fmsg->buf;
728 fmsg->size += msg->size;
729 msg->buf = NULL;
730 b = NULL;
711
731
712 port->handler(task, fmsg);
732 if (nxt_fast_path(msg->port_msg.mf == 0)) {
713
733
714 msg->buf = fmsg->buf;
715 msg->fd = fmsg->fd;
734 b = fmsg->buf;
716
735
736 port->handler(task, fmsg);
737
738 msg->buf = fmsg->buf;
739 msg->fd = fmsg->fd;
740 }
741 }
742
743 if (nxt_fast_path(msg->port_msg.mf == 0)) {
717 nxt_mp_free(port->mem_pool, fmsg);
718 }
719 } else {
720 if (nxt_slow_path(msg->port_msg.mf != 0)) {
744 nxt_mp_free(port->mem_pool, fmsg);
745 }
746 } else {
747 if (nxt_slow_path(msg->port_msg.mf != 0)) {
721 fmsg = nxt_port_frag_start(task, port, msg);
722
748
723 if (nxt_slow_path(fmsg == NULL)) {
724 nxt_assert(fmsg != NULL);
749 if (msg->port_msg.mmap && msg->cancelled == 0) {
750 nxt_port_mmap_read(task, msg);
751 b = msg->buf;
725 }
726
752 }
753
754 fmsg = nxt_port_frag_start(task, port, msg);
755
756 nxt_assert(fmsg != NULL);
757
727 fmsg->port_msg.nf = 0;
728 fmsg->port_msg.mf = 0;
729
758 fmsg->port_msg.nf = 0;
759 fmsg->port_msg.mf = 0;
760
730 msg->buf = NULL;
731 msg->fd = -1;
732 b = NULL;
761 if (nxt_fast_path(msg->cancelled == 0)) {
762 msg->buf = NULL;
763 msg->fd = -1;
764 b = NULL;
765
766 } else {
767 if (msg->fd != -1) {
768 nxt_fd_close(msg->fd);
769 }
770 }
733 } else {
771 } else {
734 port->handler(task, msg);
772 if (nxt_fast_path(msg->cancelled == 0)) {
773
774 if (msg->port_msg.mmap) {
775 nxt_port_mmap_read(task, msg);
776 b = msg->buf;
777 }
778
779 port->handler(task, msg);
780 }
735 }
736 }
737
738 if (msg->port_msg.mmap && orig_b != b) {
739
740 /*
741 * To disable instant buffer completion,
742 * handler should reset 'msg->buf'.

--- 6 unchanged lines hidden (view full) ---

749 nxt_work_queue_add(port->socket.read_work_queue,
750 b->completion_handler, task, b, b->parent);
751 }
752 }
753
754 /* restore original buf */
755 msg->buf = orig_b;
756 }
781 }
782 }
783
784 if (msg->port_msg.mmap && orig_b != b) {
785
786 /*
787 * To disable instant buffer completion,
788 * handler should reset 'msg->buf'.

--- 6 unchanged lines hidden (view full) ---

795 nxt_work_queue_add(port->socket.read_work_queue,
796 b->completion_handler, task, b, b->parent);
797 }
798 }
799
800 /* restore original buf */
801 msg->buf = orig_b;
802 }
757
758 return;
759
760fail:
761
762 if (msg->fd != -1) {
763 nxt_fd_close(msg->fd);
764 }
765}
766
767
768static nxt_buf_t *
769nxt_port_buf_alloc(nxt_port_t *port)
770{
771 nxt_buf_t *b;
772

--- 73 unchanged lines hidden ---
803}
804
805
806static nxt_buf_t *
807nxt_port_buf_alloc(nxt_port_t *port)
808{
809 nxt_buf_t *b;
810

--- 73 unchanged lines hidden ---