xref: /unit/src/nxt_port_socket.c (revision 1907)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
81555Smax.romanov@nginx.com #include <nxt_port_queue.h>
91832Smax.romanov@nginx.com #include <nxt_port_memory_int.h>
1011Sigor@sysoev.ru 
1111Sigor@sysoev.ru 
121832Smax.romanov@nginx.com #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
131832Smax.romanov@nginx.com           (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
141832Smax.romanov@nginx.com 
151832Smax.romanov@nginx.com 
161832Smax.romanov@nginx.com static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
171832Smax.romanov@nginx.com static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
181832Smax.romanov@nginx.com     void *qbuf, nxt_buf_t *b);
191125Smax.romanov@nginx.com static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
201125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg);
211125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
2211Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
231125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
24592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
25592Sigor@sysoev.ru     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
261125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
271125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg);
2811Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
291555Smax.romanov@nginx.com static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
301555Smax.romanov@nginx.com     void *data);
3111Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
3282Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg);
3311Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
3411Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
3511Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
3611Sigor@sysoev.ru 
3711Sigor@sysoev.ru 
3814Sigor@sysoev.ru nxt_int_t
3914Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
4011Sigor@sysoev.ru {
4165Sigor@sysoev.ru     nxt_int_t     sndbuf, rcvbuf, size;
4265Sigor@sysoev.ru     nxt_socket_t  snd, rcv;
4311Sigor@sysoev.ru 
4414Sigor@sysoev.ru     port->socket.task = task;
4514Sigor@sysoev.ru 
4614Sigor@sysoev.ru     port->pair[0] = -1;
4714Sigor@sysoev.ru     port->pair[1] = -1;
4814Sigor@sysoev.ru 
4913Sigor@sysoev.ru     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
5011Sigor@sysoev.ru         goto socketpair_fail;
5111Sigor@sysoev.ru     }
5211Sigor@sysoev.ru 
5311Sigor@sysoev.ru     snd = port->pair[1];
5411Sigor@sysoev.ru 
5513Sigor@sysoev.ru     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
5611Sigor@sysoev.ru     if (nxt_slow_path(sndbuf < 0)) {
5711Sigor@sysoev.ru         goto getsockopt_fail;
5811Sigor@sysoev.ru     }
5911Sigor@sysoev.ru 
6011Sigor@sysoev.ru     rcv = port->pair[0];
6111Sigor@sysoev.ru 
6213Sigor@sysoev.ru     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
6311Sigor@sysoev.ru     if (nxt_slow_path(rcvbuf < 0)) {
6411Sigor@sysoev.ru         goto getsockopt_fail;
6511Sigor@sysoev.ru     }
6611Sigor@sysoev.ru 
6711Sigor@sysoev.ru     if (max_size == 0) {
6811Sigor@sysoev.ru         max_size = 16 * 1024;
6911Sigor@sysoev.ru     }
7011Sigor@sysoev.ru 
7111Sigor@sysoev.ru     if ((size_t) sndbuf < max_size) {
7211Sigor@sysoev.ru         /*
7311Sigor@sysoev.ru          * On Unix domain sockets
7411Sigor@sysoev.ru          *   Linux uses 224K on both send and receive directions;
7511Sigor@sysoev.ru          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
7611Sigor@sysoev.ru          *   on send direction and 4K buffer size on receive direction;
7711Sigor@sysoev.ru          *   Solaris uses 16K on send direction and 5K on receive direction.
7811Sigor@sysoev.ru          */
7913Sigor@sysoev.ru         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
8013Sigor@sysoev.ru                                      max_size);
8111Sigor@sysoev.ru 
8213Sigor@sysoev.ru         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
8311Sigor@sysoev.ru         if (nxt_slow_path(sndbuf < 0)) {
8411Sigor@sysoev.ru             goto getsockopt_fail;
8511Sigor@sysoev.ru         }
8611Sigor@sysoev.ru 
8711Sigor@sysoev.ru         size = sndbuf * 4;
8811Sigor@sysoev.ru 
8911Sigor@sysoev.ru         if (rcvbuf < size) {
9013Sigor@sysoev.ru             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
9113Sigor@sysoev.ru                                          size);
9211Sigor@sysoev.ru 
9313Sigor@sysoev.ru             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
9411Sigor@sysoev.ru             if (nxt_slow_path(rcvbuf < 0)) {
9511Sigor@sysoev.ru                 goto getsockopt_fail;
9611Sigor@sysoev.ru             }
9711Sigor@sysoev.ru         }
9811Sigor@sysoev.ru     }
9911Sigor@sysoev.ru 
10011Sigor@sysoev.ru     port->max_size = nxt_min(max_size, (size_t) sndbuf);
10111Sigor@sysoev.ru     port->max_share = (64 * 1024);
10211Sigor@sysoev.ru 
10314Sigor@sysoev.ru     return NXT_OK;
10411Sigor@sysoev.ru 
10511Sigor@sysoev.ru getsockopt_fail:
10611Sigor@sysoev.ru 
10713Sigor@sysoev.ru     nxt_socket_close(task, port->pair[0]);
10813Sigor@sysoev.ru     nxt_socket_close(task, port->pair[1]);
10911Sigor@sysoev.ru 
11011Sigor@sysoev.ru socketpair_fail:
11111Sigor@sysoev.ru 
11214Sigor@sysoev.ru     return NXT_ERROR;
11311Sigor@sysoev.ru }
11411Sigor@sysoev.ru 
11511Sigor@sysoev.ru 
11611Sigor@sysoev.ru void
11711Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
11811Sigor@sysoev.ru {
11913Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->socket.fd);
12065Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
12111Sigor@sysoev.ru }
12211Sigor@sysoev.ru 
12311Sigor@sysoev.ru 
12411Sigor@sysoev.ru void
12511Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
12611Sigor@sysoev.ru {
12711Sigor@sysoev.ru     port->socket.fd = port->pair[1];
12811Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
12911Sigor@sysoev.ru     port->socket.write_ready = 1;
13011Sigor@sysoev.ru 
131141Smax.romanov@nginx.com     port->engine = task->thread->engine;
132141Smax.romanov@nginx.com 
133141Smax.romanov@nginx.com     port->socket.write_work_queue = &port->engine->fast_work_queue;
13411Sigor@sysoev.ru     port->socket.write_handler = nxt_port_write_handler;
13511Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
13611Sigor@sysoev.ru }
13711Sigor@sysoev.ru 
13811Sigor@sysoev.ru 
13911Sigor@sysoev.ru void
14011Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
14111Sigor@sysoev.ru {
14213Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[1]);
14311Sigor@sysoev.ru     port->pair[1] = -1;
14411Sigor@sysoev.ru }
14511Sigor@sysoev.ru 
14611Sigor@sysoev.ru 
147122Smax.romanov@nginx.com static void
1481125Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
149122Smax.romanov@nginx.com {
1501125Smax.romanov@nginx.com     if (msg->allocated) {
1511125Smax.romanov@nginx.com         nxt_free(msg);
152344Smax.romanov@nginx.com     }
153122Smax.romanov@nginx.com }
154122Smax.romanov@nginx.com 
155122Smax.romanov@nginx.com 
15611Sigor@sysoev.ru nxt_int_t
1571555Smax.romanov@nginx.com nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
1581555Smax.romanov@nginx.com     nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
1591555Smax.romanov@nginx.com     nxt_buf_t *b)
16011Sigor@sysoev.ru {
1611555Smax.romanov@nginx.com     int                  notify;
1621832Smax.romanov@nginx.com     uint8_t              qmsg_size;
1631125Smax.romanov@nginx.com     nxt_int_t            res;
1641125Smax.romanov@nginx.com     nxt_port_send_msg_t  msg;
1651832Smax.romanov@nginx.com     struct {
1661832Smax.romanov@nginx.com         nxt_port_msg_t   pm;
1671832Smax.romanov@nginx.com         uint8_t          buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
1681832Smax.romanov@nginx.com     } qmsg;
16911Sigor@sysoev.ru 
170344Smax.romanov@nginx.com     msg.link.next = NULL;
171344Smax.romanov@nginx.com     msg.link.prev = NULL;
172122Smax.romanov@nginx.com 
173344Smax.romanov@nginx.com     msg.buf = b;
1741125Smax.romanov@nginx.com     msg.share = 0;
1751558Smax.romanov@nginx.com     msg.fd[0] = fd;
1761558Smax.romanov@nginx.com     msg.fd[1] = fd2;
177344Smax.romanov@nginx.com     msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
1781125Smax.romanov@nginx.com     msg.allocated = 0;
17911Sigor@sysoev.ru 
180344Smax.romanov@nginx.com     msg.port_msg.stream = stream;
181344Smax.romanov@nginx.com     msg.port_msg.pid = nxt_pid;
182344Smax.romanov@nginx.com     msg.port_msg.reply_port = reply_port;
183344Smax.romanov@nginx.com     msg.port_msg.type = type & NXT_PORT_MSG_MASK;
184344Smax.romanov@nginx.com     msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
185344Smax.romanov@nginx.com     msg.port_msg.mmap = 0;
186352Smax.romanov@nginx.com     msg.port_msg.nf = 0;
187352Smax.romanov@nginx.com     msg.port_msg.mf = 0;
1881555Smax.romanov@nginx.com 
1891555Smax.romanov@nginx.com     if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
1901555Smax.romanov@nginx.com 
1911832Smax.romanov@nginx.com         if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
1921832Smax.romanov@nginx.com             qmsg.pm = msg.port_msg;
1931832Smax.romanov@nginx.com 
1941832Smax.romanov@nginx.com             qmsg_size = sizeof(qmsg.pm);
1951832Smax.romanov@nginx.com 
1961555Smax.romanov@nginx.com             if (b != NULL) {
1971832Smax.romanov@nginx.com                 qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
1981555Smax.romanov@nginx.com             }
1991555Smax.romanov@nginx.com 
2001832Smax.romanov@nginx.com             res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
2011555Smax.romanov@nginx.com 
2021555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
2031555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
2041832Smax.romanov@nginx.com                       (int) qmsg_size, notify, res);
2051832Smax.romanov@nginx.com 
2061832Smax.romanov@nginx.com             if (b != NULL && nxt_fast_path(res == NXT_OK)) {
2071832Smax.romanov@nginx.com                 if (qmsg.pm.mmap) {
2081832Smax.romanov@nginx.com                     b->is_port_mmap_sent = 1;
2091832Smax.romanov@nginx.com                 }
2101832Smax.romanov@nginx.com 
2111832Smax.romanov@nginx.com                 b->mem.pos = b->mem.free;
2121832Smax.romanov@nginx.com 
2131832Smax.romanov@nginx.com                 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2141832Smax.romanov@nginx.com                                    b->completion_handler, task, b, b->parent);
2151832Smax.romanov@nginx.com             }
2161555Smax.romanov@nginx.com 
2171555Smax.romanov@nginx.com             if (notify == 0) {
2181555Smax.romanov@nginx.com                 return res;
2191555Smax.romanov@nginx.com             }
2201555Smax.romanov@nginx.com 
2211555Smax.romanov@nginx.com             msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
2221555Smax.romanov@nginx.com             msg.buf = NULL;
2231555Smax.romanov@nginx.com 
2241555Smax.romanov@nginx.com         } else {
2251832Smax.romanov@nginx.com             qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
2261555Smax.romanov@nginx.com 
2271832Smax.romanov@nginx.com             res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
2281555Smax.romanov@nginx.com 
2291555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
2301555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
2311555Smax.romanov@nginx.com                       notify, res);
2321560Smax.romanov@nginx.com 
2331560Smax.romanov@nginx.com             if (nxt_slow_path(res == NXT_AGAIN)) {
2341560Smax.romanov@nginx.com                 return NXT_AGAIN;
2351560Smax.romanov@nginx.com             }
2361555Smax.romanov@nginx.com         }
2371555Smax.romanov@nginx.com     }
23811Sigor@sysoev.ru 
2391125Smax.romanov@nginx.com     res = nxt_port_msg_chk_insert(task, port, &msg);
2401125Smax.romanov@nginx.com     if (nxt_fast_path(res == NXT_DECLINED)) {
241344Smax.romanov@nginx.com         nxt_port_write_handler(task, &port->socket, &msg);
2421125Smax.romanov@nginx.com         res = NXT_OK;
24311Sigor@sysoev.ru     }
24411Sigor@sysoev.ru 
2451125Smax.romanov@nginx.com     return res;
2461125Smax.romanov@nginx.com }
2471125Smax.romanov@nginx.com 
2481125Smax.romanov@nginx.com 
2491832Smax.romanov@nginx.com static nxt_bool_t
2501832Smax.romanov@nginx.com nxt_port_can_enqueue_buf(nxt_buf_t *b)
2511832Smax.romanov@nginx.com {
2521832Smax.romanov@nginx.com     if (b == NULL) {
2531832Smax.romanov@nginx.com         return 1;
2541832Smax.romanov@nginx.com     }
2551832Smax.romanov@nginx.com 
2561832Smax.romanov@nginx.com     if (b->next != NULL) {
2571832Smax.romanov@nginx.com         return 0;
2581832Smax.romanov@nginx.com     }
2591832Smax.romanov@nginx.com 
2601832Smax.romanov@nginx.com     return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
2611832Smax.romanov@nginx.com             || nxt_buf_is_port_mmap(b));
2621832Smax.romanov@nginx.com }
2631832Smax.romanov@nginx.com 
2641832Smax.romanov@nginx.com 
2651832Smax.romanov@nginx.com static uint8_t
2661832Smax.romanov@nginx.com nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
2671832Smax.romanov@nginx.com     nxt_buf_t *b)
2681832Smax.romanov@nginx.com {
2691832Smax.romanov@nginx.com     ssize_t                  size;
2701832Smax.romanov@nginx.com     nxt_port_mmap_msg_t      *mm;
2711832Smax.romanov@nginx.com     nxt_port_mmap_header_t   *hdr;
2721832Smax.romanov@nginx.com     nxt_port_mmap_handler_t  *mmap_handler;
2731832Smax.romanov@nginx.com 
2741832Smax.romanov@nginx.com     size = nxt_buf_mem_used_size(&b->mem);
2751832Smax.romanov@nginx.com 
2761832Smax.romanov@nginx.com     if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
2771832Smax.romanov@nginx.com         nxt_memcpy(qbuf, b->mem.pos, size);
2781832Smax.romanov@nginx.com 
2791832Smax.romanov@nginx.com         return size;
2801832Smax.romanov@nginx.com     }
2811832Smax.romanov@nginx.com 
2821832Smax.romanov@nginx.com     mmap_handler = b->parent;
2831832Smax.romanov@nginx.com     hdr = mmap_handler->hdr;
2841832Smax.romanov@nginx.com     mm = qbuf;
2851832Smax.romanov@nginx.com 
2861832Smax.romanov@nginx.com     mm->mmap_id = hdr->id;
2871832Smax.romanov@nginx.com     mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
2881832Smax.romanov@nginx.com     mm->size = nxt_buf_mem_used_size(&b->mem);
2891832Smax.romanov@nginx.com 
2901832Smax.romanov@nginx.com     pm->mmap = 1;
2911832Smax.romanov@nginx.com 
2921832Smax.romanov@nginx.com     nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
2931832Smax.romanov@nginx.com               mm->size);
2941832Smax.romanov@nginx.com 
2951832Smax.romanov@nginx.com     return sizeof(nxt_port_mmap_msg_t);
2961832Smax.romanov@nginx.com }
2971832Smax.romanov@nginx.com 
2981832Smax.romanov@nginx.com 
2991125Smax.romanov@nginx.com static nxt_int_t
3001125Smax.romanov@nginx.com nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
3011125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg)
3021125Smax.romanov@nginx.com {
3031125Smax.romanov@nginx.com     nxt_int_t  res;
3041125Smax.romanov@nginx.com 
3051125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
3061125Smax.romanov@nginx.com 
3071125Smax.romanov@nginx.com     if (nxt_fast_path(port->socket.write_ready
3081125Smax.romanov@nginx.com                       && nxt_queue_is_empty(&port->messages)))
3091125Smax.romanov@nginx.com     {
3101125Smax.romanov@nginx.com         res = NXT_DECLINED;
3111125Smax.romanov@nginx.com 
3121125Smax.romanov@nginx.com     } else {
3131125Smax.romanov@nginx.com         msg = nxt_port_msg_alloc(msg);
3141125Smax.romanov@nginx.com 
3151125Smax.romanov@nginx.com         if (nxt_fast_path(msg != NULL)) {
3161125Smax.romanov@nginx.com             nxt_queue_insert_tail(&port->messages, &msg->link);
3171125Smax.romanov@nginx.com             nxt_port_use(task, port, 1);
3181125Smax.romanov@nginx.com             res = NXT_OK;
3191125Smax.romanov@nginx.com 
3201125Smax.romanov@nginx.com         } else {
3211125Smax.romanov@nginx.com             res = NXT_ERROR;
3221125Smax.romanov@nginx.com         }
3231125Smax.romanov@nginx.com     }
3241125Smax.romanov@nginx.com 
3251125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
3261125Smax.romanov@nginx.com 
3271125Smax.romanov@nginx.com     return res;
3281125Smax.romanov@nginx.com }
3291125Smax.romanov@nginx.com 
3301125Smax.romanov@nginx.com 
3311125Smax.romanov@nginx.com static nxt_port_send_msg_t *
3321125Smax.romanov@nginx.com nxt_port_msg_alloc(nxt_port_send_msg_t *m)
3331125Smax.romanov@nginx.com {
3341125Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
3351125Smax.romanov@nginx.com 
3361125Smax.romanov@nginx.com     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
3371125Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
3381125Smax.romanov@nginx.com         return NULL;
3391125Smax.romanov@nginx.com     }
3401125Smax.romanov@nginx.com 
3411125Smax.romanov@nginx.com     *msg = *m;
3421125Smax.romanov@nginx.com 
3431125Smax.romanov@nginx.com     msg->allocated = 1;
3441125Smax.romanov@nginx.com 
3451125Smax.romanov@nginx.com     return msg;
34611Sigor@sysoev.ru }
34711Sigor@sysoev.ru 
34811Sigor@sysoev.ru 
34911Sigor@sysoev.ru static void
350343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
351343Smax.romanov@nginx.com {
352343Smax.romanov@nginx.com     nxt_fd_event_block_write(task->thread->engine, &port->socket);
353343Smax.romanov@nginx.com }
354343Smax.romanov@nginx.com 
355343Smax.romanov@nginx.com 
356343Smax.romanov@nginx.com static void
357343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
358343Smax.romanov@nginx.com {
359343Smax.romanov@nginx.com     nxt_fd_event_enable_write(task->thread->engine, &port->socket);
360343Smax.romanov@nginx.com }
361343Smax.romanov@nginx.com 
362343Smax.romanov@nginx.com 
363343Smax.romanov@nginx.com static void
36411Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
36511Sigor@sysoev.ru {
366343Smax.romanov@nginx.com     int                     use_delta;
367197Smax.romanov@nginx.com     size_t                  plain_size;
36811Sigor@sysoev.ru     ssize_t                 n;
3691125Smax.romanov@nginx.com     uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
370343Smax.romanov@nginx.com     nxt_bool_t              block_write, enable_write;
37111Sigor@sysoev.ru     nxt_port_t              *port;
3721125Smax.romanov@nginx.com     struct iovec            iov[NXT_IOBUF_MAX * 10];
373127Smax.romanov@nginx.com     nxt_work_queue_t        *wq;
374125Smax.romanov@nginx.com     nxt_port_method_t       m;
37511Sigor@sysoev.ru     nxt_port_send_msg_t     *msg;
37611Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
37742Smax.romanov@nginx.com 
378197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
37911Sigor@sysoev.ru 
380343Smax.romanov@nginx.com     block_write = 0;
381343Smax.romanov@nginx.com     enable_write = 0;
382343Smax.romanov@nginx.com     use_delta = 0;
383343Smax.romanov@nginx.com 
384344Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
385344Smax.romanov@nginx.com 
38611Sigor@sysoev.ru     do {
3871125Smax.romanov@nginx.com         if (data) {
3881125Smax.romanov@nginx.com             msg = data;
3891125Smax.romanov@nginx.com 
3901125Smax.romanov@nginx.com         } else {
3911125Smax.romanov@nginx.com             msg = nxt_port_msg_first(port);
39211Sigor@sysoev.ru 
3931125Smax.romanov@nginx.com             if (msg == NULL) {
3941125Smax.romanov@nginx.com                 block_write = 1;
3951125Smax.romanov@nginx.com                 goto cleanup;
3961125Smax.romanov@nginx.com             }
39711Sigor@sysoev.ru         }
39811Sigor@sysoev.ru 
3991125Smax.romanov@nginx.com next_fragment:
4001125Smax.romanov@nginx.com 
40114Sigor@sysoev.ru         iov[0].iov_base = &msg->port_msg;
40214Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
40311Sigor@sysoev.ru 
40411Sigor@sysoev.ru         sb.buf = msg->buf;
40514Sigor@sysoev.ru         sb.iobuf = &iov[1];
40611Sigor@sysoev.ru         sb.nmax = NXT_IOBUF_MAX - 1;
40711Sigor@sysoev.ru         sb.sync = 0;
40811Sigor@sysoev.ru         sb.last = 0;
40942Smax.romanov@nginx.com         sb.size = 0;
41011Sigor@sysoev.ru         sb.limit = port->max_size;
41111Sigor@sysoev.ru 
412352Smax.romanov@nginx.com         sb.limit_reached = 0;
413352Smax.romanov@nginx.com         sb.nmax_reached = 0;
414352Smax.romanov@nginx.com 
41542Smax.romanov@nginx.com         m = nxt_port_mmap_get_method(task, port, msg->buf);
41642Smax.romanov@nginx.com 
41742Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP) {
41842Smax.romanov@nginx.com             sb.limit = (1ULL << 31) - 1;
419352Smax.romanov@nginx.com             sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
420352Smax.romanov@nginx.com                               port->max_size / PORT_MMAP_MIN_SIZE);
42142Smax.romanov@nginx.com         }
42242Smax.romanov@nginx.com 
4231002Smax.romanov@nginx.com         sb.limit -= iov[0].iov_len;
4241002Smax.romanov@nginx.com 
42542Smax.romanov@nginx.com         nxt_sendbuf_mem_coalesce(task, &sb);
42642Smax.romanov@nginx.com 
42742Smax.romanov@nginx.com         plain_size = sb.size;
42842Smax.romanov@nginx.com 
42942Smax.romanov@nginx.com         /*
43042Smax.romanov@nginx.com          * Send through mmap enabled only when payload
43142Smax.romanov@nginx.com          * is bigger than PORT_MMAP_MIN_SIZE.
43242Smax.romanov@nginx.com          */
43342Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
4341125Smax.romanov@nginx.com             nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
43542Smax.romanov@nginx.com 
43642Smax.romanov@nginx.com         } else {
43742Smax.romanov@nginx.com             m = NXT_PORT_METHOD_PLAIN;
43842Smax.romanov@nginx.com         }
43911Sigor@sysoev.ru 
440189Smax.romanov@nginx.com         msg->port_msg.last |= sb.last;
441352Smax.romanov@nginx.com         msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
44211Sigor@sysoev.ru 
4431558Smax.romanov@nginx.com         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
44411Sigor@sysoev.ru 
44511Sigor@sysoev.ru         if (n > 0) {
44642Smax.romanov@nginx.com             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
447564Svbart@nginx.com                 nxt_alert(task, "port %d: short write: %z instead of %uz",
448564Svbart@nginx.com                           port->socket.fd, n, sb.size + iov[0].iov_len);
44911Sigor@sysoev.ru                 goto fail;
45011Sigor@sysoev.ru             }
45111Sigor@sysoev.ru 
4521558Smax.romanov@nginx.com             if (msg->close_fd) {
4531558Smax.romanov@nginx.com                 if (msg->fd[0] != -1) {
4541558Smax.romanov@nginx.com                     nxt_fd_close(msg->fd[0]);
455189Smax.romanov@nginx.com 
4561558Smax.romanov@nginx.com                     msg->fd[0] = -1;
4571558Smax.romanov@nginx.com                 }
458189Smax.romanov@nginx.com 
4591558Smax.romanov@nginx.com                 if (msg->fd[1] != -1) {
4601558Smax.romanov@nginx.com                     nxt_fd_close(msg->fd[1]);
4611553Smax.romanov@nginx.com 
4621558Smax.romanov@nginx.com                     msg->fd[1] = -1;
4631558Smax.romanov@nginx.com                 }
4641553Smax.romanov@nginx.com             }
4651553Smax.romanov@nginx.com 
466592Sigor@sysoev.ru             msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
4671125Smax.romanov@nginx.com                                                m == NXT_PORT_METHOD_MMAP);
46811Sigor@sysoev.ru 
46911Sigor@sysoev.ru             if (msg->buf != NULL) {
470352Smax.romanov@nginx.com                 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
471352Smax.romanov@nginx.com                           msg->port_msg.stream);
472352Smax.romanov@nginx.com 
47311Sigor@sysoev.ru                 /*
47411Sigor@sysoev.ru                  * A file descriptor is sent only
47511Sigor@sysoev.ru                  * in the first message of a stream.
47611Sigor@sysoev.ru                  */
4771558Smax.romanov@nginx.com                 msg->fd[0] = -1;
4781558Smax.romanov@nginx.com                 msg->fd[1] = -1;
47911Sigor@sysoev.ru                 msg->share += n;
480352Smax.romanov@nginx.com                 msg->port_msg.nf = 1;
48111Sigor@sysoev.ru 
48211Sigor@sysoev.ru                 if (msg->share >= port->max_share) {
48311Sigor@sysoev.ru                     msg->share = 0;
484344Smax.romanov@nginx.com 
485344Smax.romanov@nginx.com                     if (msg->link.next != NULL) {
4861125Smax.romanov@nginx.com                         nxt_thread_mutex_lock(&port->write_mutex);
4871125Smax.romanov@nginx.com 
488344Smax.romanov@nginx.com                         nxt_queue_remove(&msg->link);
4891125Smax.romanov@nginx.com                         nxt_queue_insert_tail(&port->messages, &msg->link);
4901125Smax.romanov@nginx.com 
4911125Smax.romanov@nginx.com                         nxt_thread_mutex_unlock(&port->write_mutex);
492344Smax.romanov@nginx.com 
4931125Smax.romanov@nginx.com                     } else {
4941125Smax.romanov@nginx.com                         msg = nxt_port_msg_insert_tail(port, msg);
4951125Smax.romanov@nginx.com                         if (nxt_slow_path(msg == NULL)) {
4961125Smax.romanov@nginx.com                             goto fail;
4971125Smax.romanov@nginx.com                         }
4981125Smax.romanov@nginx.com 
499344Smax.romanov@nginx.com                         use_delta++;
500344Smax.romanov@nginx.com                     }
5011125Smax.romanov@nginx.com 
5021125Smax.romanov@nginx.com                 } else {
5031125Smax.romanov@nginx.com                     goto next_fragment;
50411Sigor@sysoev.ru                 }
50511Sigor@sysoev.ru 
50611Sigor@sysoev.ru             } else {
507344Smax.romanov@nginx.com                 if (msg->link.next != NULL) {
5081125Smax.romanov@nginx.com                     nxt_thread_mutex_lock(&port->write_mutex);
5091125Smax.romanov@nginx.com 
510344Smax.romanov@nginx.com                     nxt_queue_remove(&msg->link);
5111125Smax.romanov@nginx.com                     msg->link.next = NULL;
5121125Smax.romanov@nginx.com 
5131125Smax.romanov@nginx.com                     nxt_thread_mutex_unlock(&port->write_mutex);
5141125Smax.romanov@nginx.com 
515344Smax.romanov@nginx.com                     use_delta--;
516344Smax.romanov@nginx.com                 }
5171125Smax.romanov@nginx.com 
5181125Smax.romanov@nginx.com                 nxt_port_release_send_msg(msg);
5191125Smax.romanov@nginx.com             }
5201125Smax.romanov@nginx.com 
5211125Smax.romanov@nginx.com             if (data != NULL) {
5221125Smax.romanov@nginx.com                 goto cleanup;
52311Sigor@sysoev.ru             }
52411Sigor@sysoev.ru 
5251004Smax.romanov@nginx.com         } else {
5261125Smax.romanov@nginx.com             if (nxt_slow_path(n == NXT_ERROR)) {
527*1907Smax.romanov@nginx.com                 if (msg->link.next == NULL) {
528*1907Smax.romanov@nginx.com                     if (msg->close_fd) {
529*1907Smax.romanov@nginx.com                         if (msg->fd[0] != -1) {
530*1907Smax.romanov@nginx.com                             nxt_fd_close(msg->fd[0]);
531*1907Smax.romanov@nginx.com 
532*1907Smax.romanov@nginx.com                             msg->fd[0] = -1;
533*1907Smax.romanov@nginx.com                         }
534*1907Smax.romanov@nginx.com 
535*1907Smax.romanov@nginx.com                         if (msg->fd[1] != -1) {
536*1907Smax.romanov@nginx.com                             nxt_fd_close(msg->fd[1]);
537*1907Smax.romanov@nginx.com 
538*1907Smax.romanov@nginx.com                             msg->fd[1] = -1;
539*1907Smax.romanov@nginx.com                         }
540*1907Smax.romanov@nginx.com                     }
541*1907Smax.romanov@nginx.com 
542*1907Smax.romanov@nginx.com                     nxt_port_release_send_msg(msg);
543*1907Smax.romanov@nginx.com                 }
544*1907Smax.romanov@nginx.com 
5451125Smax.romanov@nginx.com                 goto fail;
546344Smax.romanov@nginx.com             }
5471004Smax.romanov@nginx.com 
5481125Smax.romanov@nginx.com             if (msg->link.next == NULL) {
5491125Smax.romanov@nginx.com                 msg = nxt_port_msg_insert_tail(port, msg);
5501125Smax.romanov@nginx.com                 if (nxt_slow_path(msg == NULL)) {
5511125Smax.romanov@nginx.com                     goto fail;
5521125Smax.romanov@nginx.com                 }
5531125Smax.romanov@nginx.com 
5541125Smax.romanov@nginx.com                 use_delta++;
5551004Smax.romanov@nginx.com             }
55611Sigor@sysoev.ru         }
55711Sigor@sysoev.ru 
55811Sigor@sysoev.ru     } while (port->socket.write_ready);
55911Sigor@sysoev.ru 
56012Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(port->socket.write)) {
561343Smax.romanov@nginx.com         enable_write = 1;
56211Sigor@sysoev.ru     }
56311Sigor@sysoev.ru 
5641125Smax.romanov@nginx.com     goto cleanup;
56511Sigor@sysoev.ru 
56611Sigor@sysoev.ru fail:
56711Sigor@sysoev.ru 
568343Smax.romanov@nginx.com     use_delta++;
569343Smax.romanov@nginx.com 
570344Smax.romanov@nginx.com     nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
571343Smax.romanov@nginx.com                        &port->socket);
572343Smax.romanov@nginx.com 
5731125Smax.romanov@nginx.com cleanup:
574343Smax.romanov@nginx.com 
575343Smax.romanov@nginx.com     if (block_write && nxt_fd_event_is_active(port->socket.write)) {
576343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
577343Smax.romanov@nginx.com     }
578343Smax.romanov@nginx.com 
579343Smax.romanov@nginx.com     if (enable_write) {
580343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
581343Smax.romanov@nginx.com     }
582343Smax.romanov@nginx.com 
583343Smax.romanov@nginx.com     if (use_delta != 0) {
584343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
585343Smax.romanov@nginx.com     }
58611Sigor@sysoev.ru }
58711Sigor@sysoev.ru 
58811Sigor@sysoev.ru 
5891125Smax.romanov@nginx.com static nxt_port_send_msg_t *
5901125Smax.romanov@nginx.com nxt_port_msg_first(nxt_port_t *port)
5911125Smax.romanov@nginx.com {
5921125Smax.romanov@nginx.com     nxt_queue_link_t     *lnk;
5931125Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
5941125Smax.romanov@nginx.com 
5951125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
5961125Smax.romanov@nginx.com 
5971125Smax.romanov@nginx.com     lnk = nxt_queue_first(&port->messages);
5981125Smax.romanov@nginx.com 
5991125Smax.romanov@nginx.com     if (lnk == nxt_queue_tail(&port->messages)) {
6001125Smax.romanov@nginx.com         msg = NULL;
6011125Smax.romanov@nginx.com 
6021125Smax.romanov@nginx.com     } else {
6031125Smax.romanov@nginx.com         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
6041125Smax.romanov@nginx.com     }
6051125Smax.romanov@nginx.com 
6061125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
6071125Smax.romanov@nginx.com 
6081125Smax.romanov@nginx.com     return msg;
6091125Smax.romanov@nginx.com }
6101125Smax.romanov@nginx.com 
6111125Smax.romanov@nginx.com 
612592Sigor@sysoev.ru static nxt_buf_t *
613592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
614592Sigor@sysoev.ru     size_t sent, nxt_bool_t mmap_mode)
615592Sigor@sysoev.ru {
6161269Sigor@sysoev.ru     size_t     size;
6171269Sigor@sysoev.ru     nxt_buf_t  *next;
618592Sigor@sysoev.ru 
619592Sigor@sysoev.ru     while (b != NULL) {
620592Sigor@sysoev.ru 
621592Sigor@sysoev.ru         nxt_prefetch(b->next);
622592Sigor@sysoev.ru 
623592Sigor@sysoev.ru         if (!nxt_buf_is_sync(b)) {
624592Sigor@sysoev.ru 
625592Sigor@sysoev.ru             size = nxt_buf_used_size(b);
626592Sigor@sysoev.ru 
627592Sigor@sysoev.ru             if (size != 0) {
628592Sigor@sysoev.ru 
629592Sigor@sysoev.ru                 if (sent == 0) {
630592Sigor@sysoev.ru                     break;
631592Sigor@sysoev.ru                 }
632592Sigor@sysoev.ru 
633592Sigor@sysoev.ru                 if (nxt_buf_is_port_mmap(b) && mmap_mode) {
634592Sigor@sysoev.ru                     /*
635592Sigor@sysoev.ru                      * buffer has been sent to other side which is now
636592Sigor@sysoev.ru                      * responsible for shared memory bucket release
637592Sigor@sysoev.ru                      */
638592Sigor@sysoev.ru                     b->is_port_mmap_sent = 1;
639592Sigor@sysoev.ru                 }
640592Sigor@sysoev.ru 
641592Sigor@sysoev.ru                 if (sent < size) {
642592Sigor@sysoev.ru 
643592Sigor@sysoev.ru                     if (nxt_buf_is_mem(b)) {
644592Sigor@sysoev.ru                         b->mem.pos += sent;
645592Sigor@sysoev.ru                     }
646592Sigor@sysoev.ru 
647592Sigor@sysoev.ru                     if (nxt_buf_is_file(b)) {
648592Sigor@sysoev.ru                         b->file_pos += sent;
649592Sigor@sysoev.ru                     }
650592Sigor@sysoev.ru 
651592Sigor@sysoev.ru                     break;
652592Sigor@sysoev.ru                 }
653592Sigor@sysoev.ru 
654592Sigor@sysoev.ru                 /* b->mem.free is NULL in file-only buffer. */
655592Sigor@sysoev.ru                 b->mem.pos = b->mem.free;
656592Sigor@sysoev.ru 
657592Sigor@sysoev.ru                 if (nxt_buf_is_file(b)) {
658592Sigor@sysoev.ru                     b->file_pos = b->file_end;
659592Sigor@sysoev.ru                 }
660592Sigor@sysoev.ru 
661592Sigor@sysoev.ru                 sent -= size;
662592Sigor@sysoev.ru             }
663592Sigor@sysoev.ru         }
664592Sigor@sysoev.ru 
665592Sigor@sysoev.ru         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
666592Sigor@sysoev.ru 
6671269Sigor@sysoev.ru         next = b->next;
6681269Sigor@sysoev.ru         b->next = NULL;
6691269Sigor@sysoev.ru         b = next;
670592Sigor@sysoev.ru     }
671592Sigor@sysoev.ru 
672592Sigor@sysoev.ru     return b;
673592Sigor@sysoev.ru }
674592Sigor@sysoev.ru 
675592Sigor@sysoev.ru 
6761125Smax.romanov@nginx.com static nxt_port_send_msg_t *
6771125Smax.romanov@nginx.com nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
6781125Smax.romanov@nginx.com {
6791125Smax.romanov@nginx.com     if (msg->allocated == 0) {
6801125Smax.romanov@nginx.com         msg = nxt_port_msg_alloc(msg);
6811125Smax.romanov@nginx.com 
6821125Smax.romanov@nginx.com         if (nxt_slow_path(msg == NULL)) {
6831125Smax.romanov@nginx.com             return NULL;
6841125Smax.romanov@nginx.com         }
6851125Smax.romanov@nginx.com     }
6861125Smax.romanov@nginx.com 
6871125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
6881125Smax.romanov@nginx.com 
6891125Smax.romanov@nginx.com     nxt_queue_insert_tail(&port->messages, &msg->link);
6901125Smax.romanov@nginx.com 
6911125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
6921125Smax.romanov@nginx.com 
6931125Smax.romanov@nginx.com     return msg;
6941125Smax.romanov@nginx.com }
6951125Smax.romanov@nginx.com 
6961125Smax.romanov@nginx.com 
69711Sigor@sysoev.ru void
69811Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
69911Sigor@sysoev.ru {
70011Sigor@sysoev.ru     port->socket.fd = port->pair[0];
70111Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
70211Sigor@sysoev.ru 
703141Smax.romanov@nginx.com     port->engine = task->thread->engine;
704141Smax.romanov@nginx.com 
705141Smax.romanov@nginx.com     port->socket.read_work_queue = &port->engine->fast_work_queue;
7061555Smax.romanov@nginx.com     port->socket.read_handler = port->queue != NULL
7071555Smax.romanov@nginx.com                                 ? nxt_port_queue_read_handler
7081555Smax.romanov@nginx.com                                 : nxt_port_read_handler;
70911Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
71011Sigor@sysoev.ru 
711141Smax.romanov@nginx.com     nxt_fd_event_enable_read(port->engine, &port->socket);
71211Sigor@sysoev.ru }
71311Sigor@sysoev.ru 
71411Sigor@sysoev.ru 
71511Sigor@sysoev.ru void
71611Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
71711Sigor@sysoev.ru {
718350Smax.romanov@nginx.com     port->socket.read_ready = 0;
7191015Smax.romanov@nginx.com     port->socket.read = NXT_EVENT_INACTIVE;
72013Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[0]);
72111Sigor@sysoev.ru     port->pair[0] = -1;
72211Sigor@sysoev.ru }
72311Sigor@sysoev.ru 
72411Sigor@sysoev.ru 
72511Sigor@sysoev.ru static void
72611Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
72711Sigor@sysoev.ru {
72842Smax.romanov@nginx.com     ssize_t              n;
72942Smax.romanov@nginx.com     nxt_buf_t            *b;
73042Smax.romanov@nginx.com     nxt_port_t           *port;
73142Smax.romanov@nginx.com     struct iovec         iov[2];
73242Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
73311Sigor@sysoev.ru 
734125Smax.romanov@nginx.com     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
73511Sigor@sysoev.ru 
736141Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
737141Smax.romanov@nginx.com 
73811Sigor@sysoev.ru     for ( ;; ) {
73911Sigor@sysoev.ru 
74011Sigor@sysoev.ru         b = nxt_port_buf_alloc(port);
74111Sigor@sysoev.ru 
74211Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
74311Sigor@sysoev.ru             /* TODO: disable event for some time */
74411Sigor@sysoev.ru         }
74511Sigor@sysoev.ru 
74642Smax.romanov@nginx.com         iov[0].iov_base = &msg.port_msg;
74714Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
74811Sigor@sysoev.ru 
74914Sigor@sysoev.ru         iov[1].iov_base = b->mem.pos;
75014Sigor@sysoev.ru         iov[1].iov_len = port->max_size;
75114Sigor@sysoev.ru 
7521558Smax.romanov@nginx.com         n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
75311Sigor@sysoev.ru 
75411Sigor@sysoev.ru         if (n > 0) {
75542Smax.romanov@nginx.com 
75642Smax.romanov@nginx.com             msg.buf = b;
75782Smax.romanov@nginx.com             msg.size = n;
75842Smax.romanov@nginx.com 
75982Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
76011Sigor@sysoev.ru 
761194Smax.romanov@nginx.com             /*
762194Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
763194Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
764194Smax.romanov@nginx.com              */
765194Smax.romanov@nginx.com             if (msg.buf == b) {
76611Sigor@sysoev.ru                 nxt_port_buf_free(port, b);
76711Sigor@sysoev.ru             }
76811Sigor@sysoev.ru 
76911Sigor@sysoev.ru             if (port->socket.read_ready) {
77011Sigor@sysoev.ru                 continue;
77111Sigor@sysoev.ru             }
77211Sigor@sysoev.ru 
77311Sigor@sysoev.ru             return;
77411Sigor@sysoev.ru         }
77511Sigor@sysoev.ru 
77611Sigor@sysoev.ru         if (n == NXT_AGAIN) {
77711Sigor@sysoev.ru             nxt_port_buf_free(port, b);
77811Sigor@sysoev.ru 
77912Sigor@sysoev.ru             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
78011Sigor@sysoev.ru             return;
78111Sigor@sysoev.ru         }
78211Sigor@sysoev.ru 
78311Sigor@sysoev.ru         /* n == 0 || n == NXT_ERROR */
78411Sigor@sysoev.ru 
78511Sigor@sysoev.ru         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
78611Sigor@sysoev.ru                            nxt_port_error_handler, task, &port->socket, NULL);
78711Sigor@sysoev.ru         return;
78811Sigor@sysoev.ru     }
78911Sigor@sysoev.ru }
79011Sigor@sysoev.ru 
79111Sigor@sysoev.ru 
7921555Smax.romanov@nginx.com static void
7931555Smax.romanov@nginx.com nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
7941555Smax.romanov@nginx.com {
7951555Smax.romanov@nginx.com     ssize_t              n;
7961555Smax.romanov@nginx.com     nxt_buf_t            *b;
7971555Smax.romanov@nginx.com     nxt_port_t           *port;
7981555Smax.romanov@nginx.com     struct iovec         iov[2];
7991555Smax.romanov@nginx.com     nxt_port_queue_t     *queue;
8001555Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg, *smsg;
8011555Smax.romanov@nginx.com     uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];
8021555Smax.romanov@nginx.com 
8031555Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
8041555Smax.romanov@nginx.com     msg.port = port;
8051555Smax.romanov@nginx.com 
8061555Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
8071555Smax.romanov@nginx.com 
8081555Smax.romanov@nginx.com     queue = port->queue;
8091555Smax.romanov@nginx.com     nxt_atomic_fetch_add(&queue->nitems, 1);
8101555Smax.romanov@nginx.com 
8111555Smax.romanov@nginx.com     for ( ;; ) {
8121555Smax.romanov@nginx.com 
8131555Smax.romanov@nginx.com         if (port->from_socket == 0) {
8141555Smax.romanov@nginx.com             n = nxt_port_queue_recv(queue, qmsg);
8151555Smax.romanov@nginx.com 
8161555Smax.romanov@nginx.com             if (n < 0 && !port->socket.read_ready) {
8171555Smax.romanov@nginx.com                 nxt_atomic_fetch_add(&queue->nitems, -1);
8181555Smax.romanov@nginx.com 
8191555Smax.romanov@nginx.com                 n = nxt_port_queue_recv(queue, qmsg);
8201555Smax.romanov@nginx.com                 if (n < 0) {
8211555Smax.romanov@nginx.com                     return;
8221555Smax.romanov@nginx.com                 }
8231555Smax.romanov@nginx.com 
8241555Smax.romanov@nginx.com                 nxt_atomic_fetch_add(&queue->nitems, 1);
8251555Smax.romanov@nginx.com             }
8261555Smax.romanov@nginx.com 
8271555Smax.romanov@nginx.com             if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
8281555Smax.romanov@nginx.com                 port->from_socket++;
8291555Smax.romanov@nginx.com 
8301555Smax.romanov@nginx.com                 nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
8311555Smax.romanov@nginx.com                           (int) port->pid, (int) port->id, port->socket.fd,
8321555Smax.romanov@nginx.com                           port->from_socket);
8331555Smax.romanov@nginx.com 
8341555Smax.romanov@nginx.com                 continue;
8351555Smax.romanov@nginx.com             }
8361555Smax.romanov@nginx.com 
8371555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: dequeue %d",
8381555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
8391555Smax.romanov@nginx.com                       (int) n);
8401555Smax.romanov@nginx.com 
8411555Smax.romanov@nginx.com         } else {
8421555Smax.romanov@nginx.com             if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
8431555Smax.romanov@nginx.com                 msg.port_msg = smsg->port_msg;
8441555Smax.romanov@nginx.com                 b = smsg->buf;
8451555Smax.romanov@nginx.com                 n = smsg->size;
8461558Smax.romanov@nginx.com                 msg.fd[0] = smsg->fd[0];
8471558Smax.romanov@nginx.com                 msg.fd[1] = smsg->fd[1];
8481555Smax.romanov@nginx.com 
8491555Smax.romanov@nginx.com                 smsg->size = 0;
8501555Smax.romanov@nginx.com 
8511555Smax.romanov@nginx.com                 port->from_socket--;
8521555Smax.romanov@nginx.com 
8531555Smax.romanov@nginx.com                 nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
8541555Smax.romanov@nginx.com                           (int) port->pid, (int) port->id, port->socket.fd,
8551555Smax.romanov@nginx.com                           (int) n);
8561555Smax.romanov@nginx.com 
8571555Smax.romanov@nginx.com                 goto process;
8581555Smax.romanov@nginx.com             }
8591555Smax.romanov@nginx.com 
8601555Smax.romanov@nginx.com             n = -1;
8611555Smax.romanov@nginx.com         }
8621555Smax.romanov@nginx.com 
8631555Smax.romanov@nginx.com         if (n < 0 && !port->socket.read_ready) {
8641555Smax.romanov@nginx.com             nxt_atomic_fetch_add(&queue->nitems, -1);
8651555Smax.romanov@nginx.com             return;
8661555Smax.romanov@nginx.com         }
8671555Smax.romanov@nginx.com 
8681555Smax.romanov@nginx.com         b = nxt_port_buf_alloc(port);
8691555Smax.romanov@nginx.com 
8701555Smax.romanov@nginx.com         if (nxt_slow_path(b == NULL)) {
8711555Smax.romanov@nginx.com             /* TODO: disable event for some time */
8721555Smax.romanov@nginx.com         }
8731555Smax.romanov@nginx.com 
8741555Smax.romanov@nginx.com         if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
8751555Smax.romanov@nginx.com             nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
8761555Smax.romanov@nginx.com 
8771555Smax.romanov@nginx.com             if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
8781555Smax.romanov@nginx.com                 nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
8791555Smax.romanov@nginx.com                            n - sizeof(nxt_port_msg_t));
8801555Smax.romanov@nginx.com             }
8811555Smax.romanov@nginx.com 
8821555Smax.romanov@nginx.com         } else {
8831555Smax.romanov@nginx.com             iov[0].iov_base = &msg.port_msg;
8841555Smax.romanov@nginx.com             iov[0].iov_len = sizeof(nxt_port_msg_t);
8851555Smax.romanov@nginx.com 
8861555Smax.romanov@nginx.com             iov[1].iov_base = b->mem.pos;
8871555Smax.romanov@nginx.com             iov[1].iov_len = port->max_size;
8881555Smax.romanov@nginx.com 
8891558Smax.romanov@nginx.com             n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
8901555Smax.romanov@nginx.com 
8911555Smax.romanov@nginx.com             if (n == (ssize_t) sizeof(nxt_port_msg_t)
8921555Smax.romanov@nginx.com                 && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
8931555Smax.romanov@nginx.com             {
8941555Smax.romanov@nginx.com                 nxt_port_buf_free(port, b);
8951555Smax.romanov@nginx.com 
8961555Smax.romanov@nginx.com                 nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
8971555Smax.romanov@nginx.com                           (int) port->pid, (int) port->id, port->socket.fd,
8981555Smax.romanov@nginx.com                           (int) n);
8991555Smax.romanov@nginx.com 
9001555Smax.romanov@nginx.com                 continue;
9011555Smax.romanov@nginx.com             }
9021555Smax.romanov@nginx.com 
9031555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
9041555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
9051555Smax.romanov@nginx.com                       (int) n);
9061555Smax.romanov@nginx.com 
9071555Smax.romanov@nginx.com             if (n > 0) {
9081555Smax.romanov@nginx.com                 if (port->from_socket == 0) {
9091555Smax.romanov@nginx.com                     nxt_debug(task, "port{%d,%d} %d: suspend message %d",
9101555Smax.romanov@nginx.com                               (int) port->pid, (int) port->id, port->socket.fd,
9111555Smax.romanov@nginx.com                               (int) n);
9121555Smax.romanov@nginx.com 
9131555Smax.romanov@nginx.com                     smsg = port->socket_msg;
9141555Smax.romanov@nginx.com 
9151555Smax.romanov@nginx.com                     if (nxt_slow_path(smsg == NULL)) {
9161555Smax.romanov@nginx.com                         smsg = nxt_mp_alloc(port->mem_pool,
9171555Smax.romanov@nginx.com                                             sizeof(nxt_port_recv_msg_t));
9181555Smax.romanov@nginx.com 
9191555Smax.romanov@nginx.com                         if (nxt_slow_path(smsg == NULL)) {
9201555Smax.romanov@nginx.com                             nxt_alert(task, "port{%d,%d} %d: suspend message "
9211555Smax.romanov@nginx.com                                             "failed",
9221555Smax.romanov@nginx.com                                       (int) port->pid, (int) port->id,
9231555Smax.romanov@nginx.com                                       port->socket.fd);
9241555Smax.romanov@nginx.com 
9251555Smax.romanov@nginx.com                             return;
9261555Smax.romanov@nginx.com                         }
9271555Smax.romanov@nginx.com 
9281555Smax.romanov@nginx.com                         port->socket_msg = smsg;
9291555Smax.romanov@nginx.com 
9301555Smax.romanov@nginx.com                     } else {
9311555Smax.romanov@nginx.com                         if (nxt_slow_path(smsg->size != 0)) {
9321555Smax.romanov@nginx.com                             nxt_alert(task, "port{%d,%d} %d: too many suspend "
9331555Smax.romanov@nginx.com                                             "messages",
9341555Smax.romanov@nginx.com                                       (int) port->pid, (int) port->id,
9351555Smax.romanov@nginx.com                                       port->socket.fd);
9361555Smax.romanov@nginx.com 
9371555Smax.romanov@nginx.com                             return;
9381555Smax.romanov@nginx.com                         }
9391555Smax.romanov@nginx.com                     }
9401555Smax.romanov@nginx.com 
9411555Smax.romanov@nginx.com                     smsg->port_msg = msg.port_msg;
9421555Smax.romanov@nginx.com                     smsg->buf = b;
9431555Smax.romanov@nginx.com                     smsg->size = n;
9441558Smax.romanov@nginx.com                     smsg->fd[0] = msg.fd[0];
9451558Smax.romanov@nginx.com                     smsg->fd[1] = msg.fd[1];
9461555Smax.romanov@nginx.com 
9471555Smax.romanov@nginx.com                     continue;
9481555Smax.romanov@nginx.com                 }
9491555Smax.romanov@nginx.com 
9501555Smax.romanov@nginx.com                 port->from_socket--;
9511555Smax.romanov@nginx.com             }
9521555Smax.romanov@nginx.com         }
9531555Smax.romanov@nginx.com 
9541555Smax.romanov@nginx.com     process:
9551555Smax.romanov@nginx.com 
9561555Smax.romanov@nginx.com         if (n > 0) {
9571555Smax.romanov@nginx.com             msg.buf = b;
9581555Smax.romanov@nginx.com             msg.size = n;
9591555Smax.romanov@nginx.com 
9601555Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
9611555Smax.romanov@nginx.com 
9621555Smax.romanov@nginx.com             /*
9631555Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
9641555Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
9651555Smax.romanov@nginx.com              */
9661555Smax.romanov@nginx.com             if (msg.buf == b) {
9671555Smax.romanov@nginx.com                 nxt_port_buf_free(port, b);
9681555Smax.romanov@nginx.com             }
9691555Smax.romanov@nginx.com 
9701555Smax.romanov@nginx.com             continue;
9711555Smax.romanov@nginx.com         }
9721555Smax.romanov@nginx.com 
9731555Smax.romanov@nginx.com         if (n == NXT_AGAIN) {
9741555Smax.romanov@nginx.com             nxt_port_buf_free(port, b);
9751555Smax.romanov@nginx.com 
9761555Smax.romanov@nginx.com             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
9771555Smax.romanov@nginx.com 
9781555Smax.romanov@nginx.com             continue;
9791555Smax.romanov@nginx.com         }
9801555Smax.romanov@nginx.com 
9811555Smax.romanov@nginx.com         /* n == 0 || n == NXT_ERROR */
9821555Smax.romanov@nginx.com 
9831555Smax.romanov@nginx.com         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
9841555Smax.romanov@nginx.com                            nxt_port_error_handler, task, &port->socket, NULL);
9851555Smax.romanov@nginx.com         return;
9861555Smax.romanov@nginx.com     }
9871555Smax.romanov@nginx.com }
9881555Smax.romanov@nginx.com 
9891555Smax.romanov@nginx.com 
9901005Smax.romanov@nginx.com typedef struct {
9911005Smax.romanov@nginx.com     uint32_t  stream;
9921005Smax.romanov@nginx.com     uint32_t  pid;
9931005Smax.romanov@nginx.com } nxt_port_frag_key_t;
9941005Smax.romanov@nginx.com 
9951005Smax.romanov@nginx.com 
996352Smax.romanov@nginx.com static nxt_int_t
997352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
998352Smax.romanov@nginx.com {
999352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
10001005Smax.romanov@nginx.com     nxt_port_frag_key_t  *frag_key;
1001352Smax.romanov@nginx.com 
1002352Smax.romanov@nginx.com     fmsg = data;
10031005Smax.romanov@nginx.com     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
1004352Smax.romanov@nginx.com 
10051005Smax.romanov@nginx.com     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
10061005Smax.romanov@nginx.com         && frag_key->stream == fmsg->port_msg.stream
10071005Smax.romanov@nginx.com         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
1008352Smax.romanov@nginx.com     {
1009352Smax.romanov@nginx.com         return NXT_OK;
1010352Smax.romanov@nginx.com     }
1011352Smax.romanov@nginx.com 
1012352Smax.romanov@nginx.com     return NXT_DECLINED;
1013352Smax.romanov@nginx.com }
1014352Smax.romanov@nginx.com 
1015352Smax.romanov@nginx.com 
1016352Smax.romanov@nginx.com static void *
1017352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
1018352Smax.romanov@nginx.com {
10191084Smax.romanov@nginx.com     return nxt_mp_align(ctx, size, size);
1020352Smax.romanov@nginx.com }
1021352Smax.romanov@nginx.com 
1022352Smax.romanov@nginx.com 
1023352Smax.romanov@nginx.com static void
1024352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p)
1025352Smax.romanov@nginx.com {
1026389Smax.romanov@nginx.com     nxt_mp_free(ctx, p);
1027352Smax.romanov@nginx.com }
1028352Smax.romanov@nginx.com 
1029352Smax.romanov@nginx.com 
1030352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
1031352Smax.romanov@nginx.com     NXT_LVLHSH_DEFAULT,
1032352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_test,
1033352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_alloc,
1034352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_free,
1035352Smax.romanov@nginx.com };
1036352Smax.romanov@nginx.com 
1037352Smax.romanov@nginx.com 
1038352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
1039352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
1040352Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
1041352Smax.romanov@nginx.com {
1042352Smax.romanov@nginx.com     nxt_int_t            res;
1043352Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
1044352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
10451005Smax.romanov@nginx.com     nxt_port_frag_key_t  frag_key;
1046352Smax.romanov@nginx.com 
1047352Smax.romanov@nginx.com     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
1048352Smax.romanov@nginx.com 
1049352Smax.romanov@nginx.com     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
1050352Smax.romanov@nginx.com 
1051352Smax.romanov@nginx.com     if (nxt_slow_path(fmsg == NULL)) {
1052352Smax.romanov@nginx.com         return NULL;
1053352Smax.romanov@nginx.com     }
1054352Smax.romanov@nginx.com 
1055352Smax.romanov@nginx.com     *fmsg = *msg;
1056352Smax.romanov@nginx.com 
10571005Smax.romanov@nginx.com     frag_key.stream = fmsg->port_msg.stream;
10581005Smax.romanov@nginx.com     frag_key.pid = fmsg->port_msg.pid;
10591005Smax.romanov@nginx.com 
10601005Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
10611005Smax.romanov@nginx.com     lhq.key.length = sizeof(nxt_port_frag_key_t);
10621005Smax.romanov@nginx.com     lhq.key.start = (u_char *) &frag_key;
1063352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
1064352Smax.romanov@nginx.com     lhq.replace = 0;
1065352Smax.romanov@nginx.com     lhq.value = fmsg;
1066352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
1067352Smax.romanov@nginx.com 
1068352Smax.romanov@nginx.com     res = nxt_lvlhsh_insert(&port->frags, &lhq);
1069352Smax.romanov@nginx.com 
1070352Smax.romanov@nginx.com     switch (res) {
1071352Smax.romanov@nginx.com 
1072352Smax.romanov@nginx.com     case NXT_OK:
1073352Smax.romanov@nginx.com         return fmsg;
1074352Smax.romanov@nginx.com 
1075352Smax.romanov@nginx.com     case NXT_DECLINED:
1076352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
1077352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
1078352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
1079352Smax.romanov@nginx.com 
1080352Smax.romanov@nginx.com         return NULL;
1081352Smax.romanov@nginx.com 
1082352Smax.romanov@nginx.com     default:
1083352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
1084352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
1085352Smax.romanov@nginx.com 
1086352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
1087352Smax.romanov@nginx.com 
1088352Smax.romanov@nginx.com         return NULL;
1089352Smax.romanov@nginx.com 
1090352Smax.romanov@nginx.com     }
1091352Smax.romanov@nginx.com }
1092352Smax.romanov@nginx.com 
1093352Smax.romanov@nginx.com 
1094352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
10951005Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
1096352Smax.romanov@nginx.com {
10971005Smax.romanov@nginx.com     nxt_int_t            res;
10981005Smax.romanov@nginx.com     nxt_bool_t           last;
10991005Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
11001005Smax.romanov@nginx.com     nxt_port_frag_key_t  frag_key;
11011005Smax.romanov@nginx.com 
11021005Smax.romanov@nginx.com     last = msg->port_msg.mf == 0;
1103352Smax.romanov@nginx.com 
11041005Smax.romanov@nginx.com     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
11051005Smax.romanov@nginx.com               msg->port_msg.stream);
1106352Smax.romanov@nginx.com 
11071005Smax.romanov@nginx.com     frag_key.stream = msg->port_msg.stream;
11081005Smax.romanov@nginx.com     frag_key.pid = msg->port_msg.pid;
11091005Smax.romanov@nginx.com 
11101005Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
11111005Smax.romanov@nginx.com     lhq.key.length = sizeof(nxt_port_frag_key_t);
11121005Smax.romanov@nginx.com     lhq.key.start = (u_char *) &frag_key;
1113352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
1114352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
1115352Smax.romanov@nginx.com 
1116352Smax.romanov@nginx.com     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
1117352Smax.romanov@nginx.com           nxt_lvlhsh_find(&port->frags, &lhq);
1118352Smax.romanov@nginx.com 
1119352Smax.romanov@nginx.com     switch (res) {
1120352Smax.romanov@nginx.com 
1121352Smax.romanov@nginx.com     case NXT_OK:
1122352Smax.romanov@nginx.com         return lhq.value;
1123352Smax.romanov@nginx.com 
1124352Smax.romanov@nginx.com     default:
11251005Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
11261005Smax.romanov@nginx.com                 frag_key.stream);
1127352Smax.romanov@nginx.com 
1128352Smax.romanov@nginx.com         return NULL;
1129352Smax.romanov@nginx.com     }
1130352Smax.romanov@nginx.com }
1131352Smax.romanov@nginx.com 
1132352Smax.romanov@nginx.com 
113311Sigor@sysoev.ru static void
113411Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
113582Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
113611Sigor@sysoev.ru {
11371269Sigor@sysoev.ru     nxt_buf_t            *b, *orig_b, *next;
1138352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
113911Sigor@sysoev.ru 
114082Smax.romanov@nginx.com     if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
1141564Svbart@nginx.com         nxt_alert(task, "port %d: too small message:%uz",
1142564Svbart@nginx.com                   port->socket.fd, msg->size);
1143423Smax.romanov@nginx.com 
11441558Smax.romanov@nginx.com         if (msg->fd[0] != -1) {
11451558Smax.romanov@nginx.com             nxt_fd_close(msg->fd[0]);
1146423Smax.romanov@nginx.com         }
1147423Smax.romanov@nginx.com 
11481558Smax.romanov@nginx.com         if (msg->fd[1] != -1) {
11491558Smax.romanov@nginx.com             nxt_fd_close(msg->fd[1]);
11501553Smax.romanov@nginx.com         }
11511553Smax.romanov@nginx.com 
1152423Smax.romanov@nginx.com         return;
115311Sigor@sysoev.ru     }
115411Sigor@sysoev.ru 
115542Smax.romanov@nginx.com     /* adjust size to actual buffer used size */
115682Smax.romanov@nginx.com     msg->size -= sizeof(nxt_port_msg_t);
115742Smax.romanov@nginx.com 
115842Smax.romanov@nginx.com     b = orig_b = msg->buf;
115982Smax.romanov@nginx.com     b->mem.free += msg->size;
116042Smax.romanov@nginx.com 
11611555Smax.romanov@nginx.com     msg->cancelled = 0;
116211Sigor@sysoev.ru 
1163352Smax.romanov@nginx.com     if (nxt_slow_path(msg->port_msg.nf != 0)) {
1164423Smax.romanov@nginx.com 
11651005Smax.romanov@nginx.com         fmsg = nxt_port_frag_find(task, port, msg);
1166352Smax.romanov@nginx.com 
1167551Smax.romanov@nginx.com         if (nxt_slow_path(fmsg == NULL)) {
1168551Smax.romanov@nginx.com             goto fmsg_failed;
1169551Smax.romanov@nginx.com         }
1170423Smax.romanov@nginx.com 
1171423Smax.romanov@nginx.com         if (nxt_fast_path(fmsg->cancelled == 0)) {
1172423Smax.romanov@nginx.com 
1173423Smax.romanov@nginx.com             if (msg->port_msg.mmap) {
1174423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
1175423Smax.romanov@nginx.com             }
1176423Smax.romanov@nginx.com 
1177423Smax.romanov@nginx.com             nxt_buf_chain_add(&fmsg->buf, msg->buf);
1178423Smax.romanov@nginx.com 
1179423Smax.romanov@nginx.com             fmsg->size += msg->size;
1180423Smax.romanov@nginx.com             msg->buf = NULL;
1181423Smax.romanov@nginx.com             b = NULL;
1182423Smax.romanov@nginx.com 
1183423Smax.romanov@nginx.com             if (nxt_fast_path(msg->port_msg.mf == 0)) {
1184423Smax.romanov@nginx.com 
1185423Smax.romanov@nginx.com                 b = fmsg->buf;
1186423Smax.romanov@nginx.com 
1187423Smax.romanov@nginx.com                 port->handler(task, fmsg);
1188423Smax.romanov@nginx.com 
1189423Smax.romanov@nginx.com                 msg->buf = fmsg->buf;
11901558Smax.romanov@nginx.com                 msg->fd[0] = fmsg->fd[0];
11911558Smax.romanov@nginx.com                 msg->fd[1] = fmsg->fd[1];
1192974Smax.romanov@nginx.com 
1193974Smax.romanov@nginx.com                 /*
1194974Smax.romanov@nginx.com                  * To disable instant completion or buffer re-usage,
1195974Smax.romanov@nginx.com                  * handler should reset 'msg.buf'.
1196974Smax.romanov@nginx.com                  */
1197974Smax.romanov@nginx.com                 if (!msg->port_msg.mmap && msg->buf == b) {
1198974Smax.romanov@nginx.com                     nxt_port_buf_free(port, b);
1199974Smax.romanov@nginx.com                 }
1200423Smax.romanov@nginx.com             }
1201352Smax.romanov@nginx.com         }
1202352Smax.romanov@nginx.com 
1203352Smax.romanov@nginx.com         if (nxt_fast_path(msg->port_msg.mf == 0)) {
1204352Smax.romanov@nginx.com             nxt_mp_free(port->mem_pool, fmsg);
1205352Smax.romanov@nginx.com         }
1206352Smax.romanov@nginx.com     } else {
1207352Smax.romanov@nginx.com         if (nxt_slow_path(msg->port_msg.mf != 0)) {
1208423Smax.romanov@nginx.com 
1209423Smax.romanov@nginx.com             if (msg->port_msg.mmap && msg->cancelled == 0) {
1210423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
1211423Smax.romanov@nginx.com                 b = msg->buf;
1212423Smax.romanov@nginx.com             }
1213423Smax.romanov@nginx.com 
1214352Smax.romanov@nginx.com             fmsg = nxt_port_frag_start(task, port, msg);
1215352Smax.romanov@nginx.com 
1216551Smax.romanov@nginx.com             if (nxt_slow_path(fmsg == NULL)) {
1217551Smax.romanov@nginx.com                 goto fmsg_failed;
1218551Smax.romanov@nginx.com             }
1219352Smax.romanov@nginx.com 
1220352Smax.romanov@nginx.com             fmsg->port_msg.nf = 0;
1221352Smax.romanov@nginx.com             fmsg->port_msg.mf = 0;
1222352Smax.romanov@nginx.com 
1223423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
1224423Smax.romanov@nginx.com                 msg->buf = NULL;
12251558Smax.romanov@nginx.com                 msg->fd[0] = -1;
12261558Smax.romanov@nginx.com                 msg->fd[1] = -1;
1227423Smax.romanov@nginx.com                 b = NULL;
1228423Smax.romanov@nginx.com 
1229423Smax.romanov@nginx.com             } else {
12301558Smax.romanov@nginx.com                 if (msg->fd[0] != -1) {
12311558Smax.romanov@nginx.com                     nxt_fd_close(msg->fd[0]);
1232423Smax.romanov@nginx.com                 }
12331553Smax.romanov@nginx.com 
12341558Smax.romanov@nginx.com                 if (msg->fd[1] != -1) {
12351558Smax.romanov@nginx.com                     nxt_fd_close(msg->fd[1]);
12361553Smax.romanov@nginx.com                 }
1237423Smax.romanov@nginx.com             }
1238352Smax.romanov@nginx.com         } else {
1239423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
1240423Smax.romanov@nginx.com 
1241423Smax.romanov@nginx.com                 if (msg->port_msg.mmap) {
1242423Smax.romanov@nginx.com                     nxt_port_mmap_read(task, msg);
1243423Smax.romanov@nginx.com                     b = msg->buf;
1244423Smax.romanov@nginx.com                 }
1245423Smax.romanov@nginx.com 
1246423Smax.romanov@nginx.com                 port->handler(task, msg);
1247423Smax.romanov@nginx.com             }
1248352Smax.romanov@nginx.com         }
1249352Smax.romanov@nginx.com     }
125042Smax.romanov@nginx.com 
1251551Smax.romanov@nginx.com fmsg_failed:
1252551Smax.romanov@nginx.com 
125382Smax.romanov@nginx.com     if (msg->port_msg.mmap && orig_b != b) {
125442Smax.romanov@nginx.com 
1255194Smax.romanov@nginx.com         /*
1256194Smax.romanov@nginx.com          * To disable instant buffer completion,
1257194Smax.romanov@nginx.com          * handler should reset 'msg->buf'.
1258194Smax.romanov@nginx.com          */
1259194Smax.romanov@nginx.com         if (msg->buf == b) {
1260194Smax.romanov@nginx.com             /* complete mmap buffers */
12611269Sigor@sysoev.ru             while (b != NULL) {
1262194Smax.romanov@nginx.com                 nxt_debug(task, "complete buffer %p", b);
1263194Smax.romanov@nginx.com 
1264194Smax.romanov@nginx.com                 nxt_work_queue_add(port->socket.read_work_queue,
1265194Smax.romanov@nginx.com                     b->completion_handler, task, b, b->parent);
12661269Sigor@sysoev.ru 
12671269Sigor@sysoev.ru                 next = b->next;
12681269Sigor@sysoev.ru                 b->next = NULL;
12691269Sigor@sysoev.ru                 b = next;
1270194Smax.romanov@nginx.com             }
127142Smax.romanov@nginx.com         }
1272194Smax.romanov@nginx.com 
1273194Smax.romanov@nginx.com         /* restore original buf */
1274194Smax.romanov@nginx.com         msg->buf = orig_b;
127542Smax.romanov@nginx.com     }
127611Sigor@sysoev.ru }
127711Sigor@sysoev.ru 
127811Sigor@sysoev.ru 
127911Sigor@sysoev.ru static nxt_buf_t *
128011Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
128111Sigor@sysoev.ru {
128211Sigor@sysoev.ru     nxt_buf_t  *b;
128311Sigor@sysoev.ru 
128411Sigor@sysoev.ru     if (port->free_bufs != NULL) {
128511Sigor@sysoev.ru         b = port->free_bufs;
128611Sigor@sysoev.ru         port->free_bufs = b->next;
128711Sigor@sysoev.ru 
128811Sigor@sysoev.ru         b->mem.pos = b->mem.start;
128911Sigor@sysoev.ru         b->mem.free = b->mem.start;
129042Smax.romanov@nginx.com         b->next = NULL;
129111Sigor@sysoev.ru     } else {
129211Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
129311Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
129411Sigor@sysoev.ru             return NULL;
129511Sigor@sysoev.ru         }
129611Sigor@sysoev.ru     }
129711Sigor@sysoev.ru 
129811Sigor@sysoev.ru     return b;
129911Sigor@sysoev.ru }
130011Sigor@sysoev.ru 
130111Sigor@sysoev.ru 
130211Sigor@sysoev.ru static void
130311Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
130411Sigor@sysoev.ru {
1305974Smax.romanov@nginx.com     nxt_buf_chain_add(&b, port->free_bufs);
130611Sigor@sysoev.ru     port->free_bufs = b;
130711Sigor@sysoev.ru }
130811Sigor@sysoev.ru 
130911Sigor@sysoev.ru 
131011Sigor@sysoev.ru static void
131111Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
131211Sigor@sysoev.ru {
1313343Smax.romanov@nginx.com     int                  use_delta;
13141269Sigor@sysoev.ru     nxt_buf_t            *b, *next;
1315197Smax.romanov@nginx.com     nxt_port_t           *port;
1316197Smax.romanov@nginx.com     nxt_work_queue_t     *wq;
1317197Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
1318197Smax.romanov@nginx.com 
1319125Smax.romanov@nginx.com     nxt_debug(task, "port error handler %p", obj);
132011Sigor@sysoev.ru     /* TODO */
1321197Smax.romanov@nginx.com 
1322197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
1323197Smax.romanov@nginx.com 
1324343Smax.romanov@nginx.com     use_delta = 0;
1325343Smax.romanov@nginx.com 
1326343Smax.romanov@nginx.com     if (obj == data) {
1327343Smax.romanov@nginx.com         use_delta--;
1328343Smax.romanov@nginx.com     }
1329197Smax.romanov@nginx.com 
1330343Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
1331343Smax.romanov@nginx.com 
1332343Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
1333343Smax.romanov@nginx.com 
1334343Smax.romanov@nginx.com     nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
1335197Smax.romanov@nginx.com 
13361558Smax.romanov@nginx.com         if (msg->close_fd) {
13371558Smax.romanov@nginx.com             if (msg->fd[0] != -1) {
13381558Smax.romanov@nginx.com                 nxt_fd_close(msg->fd[0]);
13391485Smax.romanov@nginx.com 
13401558Smax.romanov@nginx.com                 msg->fd[0] = -1;
13411558Smax.romanov@nginx.com             }
13421485Smax.romanov@nginx.com 
13431558Smax.romanov@nginx.com             if (msg->fd[1] != -1) {
13441558Smax.romanov@nginx.com                 nxt_fd_close(msg->fd[1]);
13451553Smax.romanov@nginx.com 
13461558Smax.romanov@nginx.com                 msg->fd[1] = -1;
13471558Smax.romanov@nginx.com             }
13481553Smax.romanov@nginx.com         }
13491553Smax.romanov@nginx.com 
13501269Sigor@sysoev.ru         for (b = msg->buf; b != NULL; b = next) {
13511269Sigor@sysoev.ru             next = b->next;
13521269Sigor@sysoev.ru             b->next = NULL;
13531269Sigor@sysoev.ru 
1354197Smax.romanov@nginx.com             if (nxt_buf_is_sync(b)) {
1355197Smax.romanov@nginx.com                 continue;
1356197Smax.romanov@nginx.com             }
1357197Smax.romanov@nginx.com 
1358197Smax.romanov@nginx.com             nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1359197Smax.romanov@nginx.com         }
1360197Smax.romanov@nginx.com 
1361197Smax.romanov@nginx.com         nxt_queue_remove(&msg->link);
1362343Smax.romanov@nginx.com         use_delta--;
13631125Smax.romanov@nginx.com 
13641125Smax.romanov@nginx.com         nxt_port_release_send_msg(msg);
1365197Smax.romanov@nginx.com 
1366197Smax.romanov@nginx.com     } nxt_queue_loop;
1367343Smax.romanov@nginx.com 
1368343Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
1369343Smax.romanov@nginx.com 
1370343Smax.romanov@nginx.com     if (use_delta != 0) {
1371343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
1372343Smax.romanov@nginx.com     }
137311Sigor@sysoev.ru }
1374