xref: /unit/src/nxt_port_socket.c (revision 2139:99d792169ffb)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
81996St.nateldemoura@f5.com #include <nxt_socket_msg.h>
91555Smax.romanov@nginx.com #include <nxt_port_queue.h>
101832Smax.romanov@nginx.com #include <nxt_port_memory_int.h>
1111Sigor@sysoev.ru 
1211Sigor@sysoev.ru 
131832Smax.romanov@nginx.com #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
141832Smax.romanov@nginx.com           (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
151832Smax.romanov@nginx.com 
161832Smax.romanov@nginx.com 
171832Smax.romanov@nginx.com static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
181832Smax.romanov@nginx.com static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
191832Smax.romanov@nginx.com     void *qbuf, nxt_buf_t *b);
201125Smax.romanov@nginx.com static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
211125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg);
22*2139Sandrew@digital-domain.net static nxt_port_send_msg_t *nxt_port_msg_alloc(const nxt_port_send_msg_t *m);
2311Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
241125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
251908Smax.romanov@nginx.com nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg);
261996St.nateldemoura@f5.com nxt_inline void nxt_port_close_fds(nxt_fd_t *fd);
27592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
28592Sigor@sysoev.ru     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
291125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
301125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg);
3111Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
321555Smax.romanov@nginx.com static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
331555Smax.romanov@nginx.com     void *data);
3411Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
3582Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg);
3611Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
3711Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
3811Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
3911Sigor@sysoev.ru 
4011Sigor@sysoev.ru 
4114Sigor@sysoev.ru nxt_int_t
nxt_port_socket_init(nxt_task_t * task,nxt_port_t * port,size_t max_size)4214Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
4311Sigor@sysoev.ru {
4465Sigor@sysoev.ru     nxt_int_t     sndbuf, rcvbuf, size;
4565Sigor@sysoev.ru     nxt_socket_t  snd, rcv;
4611Sigor@sysoev.ru 
4714Sigor@sysoev.ru     port->socket.task = task;
4814Sigor@sysoev.ru 
4914Sigor@sysoev.ru     port->pair[0] = -1;
5014Sigor@sysoev.ru     port->pair[1] = -1;
5114Sigor@sysoev.ru 
5213Sigor@sysoev.ru     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
5311Sigor@sysoev.ru         goto socketpair_fail;
5411Sigor@sysoev.ru     }
5511Sigor@sysoev.ru 
5611Sigor@sysoev.ru     snd = port->pair[1];
5711Sigor@sysoev.ru 
5813Sigor@sysoev.ru     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
5911Sigor@sysoev.ru     if (nxt_slow_path(sndbuf < 0)) {
6011Sigor@sysoev.ru         goto getsockopt_fail;
6111Sigor@sysoev.ru     }
6211Sigor@sysoev.ru 
6311Sigor@sysoev.ru     rcv = port->pair[0];
6411Sigor@sysoev.ru 
6513Sigor@sysoev.ru     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
6611Sigor@sysoev.ru     if (nxt_slow_path(rcvbuf < 0)) {
6711Sigor@sysoev.ru         goto getsockopt_fail;
6811Sigor@sysoev.ru     }
6911Sigor@sysoev.ru 
7011Sigor@sysoev.ru     if (max_size == 0) {
7111Sigor@sysoev.ru         max_size = 16 * 1024;
7211Sigor@sysoev.ru     }
7311Sigor@sysoev.ru 
7411Sigor@sysoev.ru     if ((size_t) sndbuf < max_size) {
7511Sigor@sysoev.ru         /*
7611Sigor@sysoev.ru          * On Unix domain sockets
7711Sigor@sysoev.ru          *   Linux uses 224K on both send and receive directions;
7811Sigor@sysoev.ru          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
7911Sigor@sysoev.ru          *   on send direction and 4K buffer size on receive direction;
8011Sigor@sysoev.ru          *   Solaris uses 16K on send direction and 5K on receive direction.
8111Sigor@sysoev.ru          */
8213Sigor@sysoev.ru         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
8313Sigor@sysoev.ru                                      max_size);
8411Sigor@sysoev.ru 
8513Sigor@sysoev.ru         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
8611Sigor@sysoev.ru         if (nxt_slow_path(sndbuf < 0)) {
8711Sigor@sysoev.ru             goto getsockopt_fail;
8811Sigor@sysoev.ru         }
8911Sigor@sysoev.ru 
9011Sigor@sysoev.ru         size = sndbuf * 4;
9111Sigor@sysoev.ru 
9211Sigor@sysoev.ru         if (rcvbuf < size) {
9313Sigor@sysoev.ru             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
9413Sigor@sysoev.ru                                          size);
9511Sigor@sysoev.ru 
9613Sigor@sysoev.ru             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
9711Sigor@sysoev.ru             if (nxt_slow_path(rcvbuf < 0)) {
9811Sigor@sysoev.ru                 goto getsockopt_fail;
9911Sigor@sysoev.ru             }
10011Sigor@sysoev.ru         }
10111Sigor@sysoev.ru     }
10211Sigor@sysoev.ru 
10311Sigor@sysoev.ru     port->max_size = nxt_min(max_size, (size_t) sndbuf);
10411Sigor@sysoev.ru     port->max_share = (64 * 1024);
10511Sigor@sysoev.ru 
10614Sigor@sysoev.ru     return NXT_OK;
10711Sigor@sysoev.ru 
10811Sigor@sysoev.ru getsockopt_fail:
10911Sigor@sysoev.ru 
11013Sigor@sysoev.ru     nxt_socket_close(task, port->pair[0]);
11113Sigor@sysoev.ru     nxt_socket_close(task, port->pair[1]);
11211Sigor@sysoev.ru 
11311Sigor@sysoev.ru socketpair_fail:
11411Sigor@sysoev.ru 
11514Sigor@sysoev.ru     return NXT_ERROR;
11611Sigor@sysoev.ru }
11711Sigor@sysoev.ru 
11811Sigor@sysoev.ru 
11911Sigor@sysoev.ru void
nxt_port_destroy(nxt_port_t * port)12011Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
12111Sigor@sysoev.ru {
12213Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->socket.fd);
12365Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
12411Sigor@sysoev.ru }
12511Sigor@sysoev.ru 
12611Sigor@sysoev.ru 
12711Sigor@sysoev.ru void
nxt_port_write_enable(nxt_task_t * task,nxt_port_t * port)12811Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
12911Sigor@sysoev.ru {
13011Sigor@sysoev.ru     port->socket.fd = port->pair[1];
13111Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
13211Sigor@sysoev.ru     port->socket.write_ready = 1;
13311Sigor@sysoev.ru 
134141Smax.romanov@nginx.com     port->engine = task->thread->engine;
135141Smax.romanov@nginx.com 
136141Smax.romanov@nginx.com     port->socket.write_work_queue = &port->engine->fast_work_queue;
13711Sigor@sysoev.ru     port->socket.write_handler = nxt_port_write_handler;
13811Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
13911Sigor@sysoev.ru }
14011Sigor@sysoev.ru 
14111Sigor@sysoev.ru 
14211Sigor@sysoev.ru void
nxt_port_write_close(nxt_port_t * port)14311Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
14411Sigor@sysoev.ru {
14513Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[1]);
14611Sigor@sysoev.ru     port->pair[1] = -1;
14711Sigor@sysoev.ru }
14811Sigor@sysoev.ru 
14911Sigor@sysoev.ru 
150122Smax.romanov@nginx.com static void
nxt_port_release_send_msg(nxt_port_send_msg_t * msg)1511125Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
152122Smax.romanov@nginx.com {
1531125Smax.romanov@nginx.com     if (msg->allocated) {
1541125Smax.romanov@nginx.com         nxt_free(msg);
155344Smax.romanov@nginx.com     }
156122Smax.romanov@nginx.com }
157122Smax.romanov@nginx.com 
158122Smax.romanov@nginx.com 
15911Sigor@sysoev.ru nxt_int_t
nxt_port_socket_write2(nxt_task_t * task,nxt_port_t * port,nxt_uint_t type,nxt_fd_t fd,nxt_fd_t fd2,uint32_t stream,nxt_port_id_t reply_port,nxt_buf_t * b)1601555Smax.romanov@nginx.com nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
1611555Smax.romanov@nginx.com     nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
1621555Smax.romanov@nginx.com     nxt_buf_t *b)
16311Sigor@sysoev.ru {
1641555Smax.romanov@nginx.com     int                  notify;
1651832Smax.romanov@nginx.com     uint8_t              qmsg_size;
1661125Smax.romanov@nginx.com     nxt_int_t            res;
1671125Smax.romanov@nginx.com     nxt_port_send_msg_t  msg;
1681832Smax.romanov@nginx.com     struct {
1691832Smax.romanov@nginx.com         nxt_port_msg_t   pm;
1701832Smax.romanov@nginx.com         uint8_t          buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
1711832Smax.romanov@nginx.com     } qmsg;
17211Sigor@sysoev.ru 
173344Smax.romanov@nginx.com     msg.link.next = NULL;
174344Smax.romanov@nginx.com     msg.link.prev = NULL;
175122Smax.romanov@nginx.com 
176344Smax.romanov@nginx.com     msg.buf = b;
1771125Smax.romanov@nginx.com     msg.share = 0;
1781558Smax.romanov@nginx.com     msg.fd[0] = fd;
1791558Smax.romanov@nginx.com     msg.fd[1] = fd2;
180344Smax.romanov@nginx.com     msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
1811125Smax.romanov@nginx.com     msg.allocated = 0;
18211Sigor@sysoev.ru 
183344Smax.romanov@nginx.com     msg.port_msg.stream = stream;
184344Smax.romanov@nginx.com     msg.port_msg.pid = nxt_pid;
185344Smax.romanov@nginx.com     msg.port_msg.reply_port = reply_port;
186344Smax.romanov@nginx.com     msg.port_msg.type = type & NXT_PORT_MSG_MASK;
187344Smax.romanov@nginx.com     msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
188344Smax.romanov@nginx.com     msg.port_msg.mmap = 0;
189352Smax.romanov@nginx.com     msg.port_msg.nf = 0;
190352Smax.romanov@nginx.com     msg.port_msg.mf = 0;
1911555Smax.romanov@nginx.com 
1921555Smax.romanov@nginx.com     if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
1931555Smax.romanov@nginx.com 
1941832Smax.romanov@nginx.com         if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
1951832Smax.romanov@nginx.com             qmsg.pm = msg.port_msg;
1961832Smax.romanov@nginx.com 
1971832Smax.romanov@nginx.com             qmsg_size = sizeof(qmsg.pm);
1981832Smax.romanov@nginx.com 
1991555Smax.romanov@nginx.com             if (b != NULL) {
2001832Smax.romanov@nginx.com                 qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
2011555Smax.romanov@nginx.com             }
2021555Smax.romanov@nginx.com 
2031832Smax.romanov@nginx.com             res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
2041555Smax.romanov@nginx.com 
2051555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
2061555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
2071832Smax.romanov@nginx.com                       (int) qmsg_size, notify, res);
2081832Smax.romanov@nginx.com 
2091832Smax.romanov@nginx.com             if (b != NULL && nxt_fast_path(res == NXT_OK)) {
2101832Smax.romanov@nginx.com                 if (qmsg.pm.mmap) {
2111832Smax.romanov@nginx.com                     b->is_port_mmap_sent = 1;
2121832Smax.romanov@nginx.com                 }
2131832Smax.romanov@nginx.com 
2141832Smax.romanov@nginx.com                 b->mem.pos = b->mem.free;
2151832Smax.romanov@nginx.com 
2161832Smax.romanov@nginx.com                 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2171832Smax.romanov@nginx.com                                    b->completion_handler, task, b, b->parent);
2181832Smax.romanov@nginx.com             }
2191555Smax.romanov@nginx.com 
2201555Smax.romanov@nginx.com             if (notify == 0) {
2211555Smax.romanov@nginx.com                 return res;
2221555Smax.romanov@nginx.com             }
2231555Smax.romanov@nginx.com 
2241555Smax.romanov@nginx.com             msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
2251555Smax.romanov@nginx.com             msg.buf = NULL;
2261555Smax.romanov@nginx.com 
2271555Smax.romanov@nginx.com         } else {
2281832Smax.romanov@nginx.com             qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
2291555Smax.romanov@nginx.com 
2301832Smax.romanov@nginx.com             res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
2311555Smax.romanov@nginx.com 
2321555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
2331555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
2341555Smax.romanov@nginx.com                       notify, res);
2351560Smax.romanov@nginx.com 
2361560Smax.romanov@nginx.com             if (nxt_slow_path(res == NXT_AGAIN)) {
2371560Smax.romanov@nginx.com                 return NXT_AGAIN;
2381560Smax.romanov@nginx.com             }
2391555Smax.romanov@nginx.com         }
2401555Smax.romanov@nginx.com     }
24111Sigor@sysoev.ru 
2421125Smax.romanov@nginx.com     res = nxt_port_msg_chk_insert(task, port, &msg);
2431125Smax.romanov@nginx.com     if (nxt_fast_path(res == NXT_DECLINED)) {
244344Smax.romanov@nginx.com         nxt_port_write_handler(task, &port->socket, &msg);
2451125Smax.romanov@nginx.com         res = NXT_OK;
24611Sigor@sysoev.ru     }
24711Sigor@sysoev.ru 
2481125Smax.romanov@nginx.com     return res;
2491125Smax.romanov@nginx.com }
2501125Smax.romanov@nginx.com 
2511125Smax.romanov@nginx.com 
2521832Smax.romanov@nginx.com static nxt_bool_t
nxt_port_can_enqueue_buf(nxt_buf_t * b)2531832Smax.romanov@nginx.com nxt_port_can_enqueue_buf(nxt_buf_t *b)
2541832Smax.romanov@nginx.com {
2551832Smax.romanov@nginx.com     if (b == NULL) {
2561832Smax.romanov@nginx.com         return 1;
2571832Smax.romanov@nginx.com     }
2581832Smax.romanov@nginx.com 
2591832Smax.romanov@nginx.com     if (b->next != NULL) {
2601832Smax.romanov@nginx.com         return 0;
2611832Smax.romanov@nginx.com     }
2621832Smax.romanov@nginx.com 
2631832Smax.romanov@nginx.com     return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
2641832Smax.romanov@nginx.com             || nxt_buf_is_port_mmap(b));
2651832Smax.romanov@nginx.com }
2661832Smax.romanov@nginx.com 
2671832Smax.romanov@nginx.com 
2681832Smax.romanov@nginx.com static uint8_t
nxt_port_enqueue_buf(nxt_task_t * task,nxt_port_msg_t * pm,void * qbuf,nxt_buf_t * b)2691832Smax.romanov@nginx.com nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
2701832Smax.romanov@nginx.com     nxt_buf_t *b)
2711832Smax.romanov@nginx.com {
2721832Smax.romanov@nginx.com     ssize_t                  size;
2731832Smax.romanov@nginx.com     nxt_port_mmap_msg_t      *mm;
2741832Smax.romanov@nginx.com     nxt_port_mmap_header_t   *hdr;
2751832Smax.romanov@nginx.com     nxt_port_mmap_handler_t  *mmap_handler;
2761832Smax.romanov@nginx.com 
2771832Smax.romanov@nginx.com     size = nxt_buf_mem_used_size(&b->mem);
2781832Smax.romanov@nginx.com 
2791832Smax.romanov@nginx.com     if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
2801832Smax.romanov@nginx.com         nxt_memcpy(qbuf, b->mem.pos, size);
2811832Smax.romanov@nginx.com 
2821832Smax.romanov@nginx.com         return size;
2831832Smax.romanov@nginx.com     }
2841832Smax.romanov@nginx.com 
2851832Smax.romanov@nginx.com     mmap_handler = b->parent;
2861832Smax.romanov@nginx.com     hdr = mmap_handler->hdr;
2871832Smax.romanov@nginx.com     mm = qbuf;
2881832Smax.romanov@nginx.com 
2891832Smax.romanov@nginx.com     mm->mmap_id = hdr->id;
2901832Smax.romanov@nginx.com     mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
2911832Smax.romanov@nginx.com     mm->size = nxt_buf_mem_used_size(&b->mem);
2921832Smax.romanov@nginx.com 
2931832Smax.romanov@nginx.com     pm->mmap = 1;
2941832Smax.romanov@nginx.com 
2951832Smax.romanov@nginx.com     nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
2961832Smax.romanov@nginx.com               mm->size);
2971832Smax.romanov@nginx.com 
2981832Smax.romanov@nginx.com     return sizeof(nxt_port_mmap_msg_t);
2991832Smax.romanov@nginx.com }
3001832Smax.romanov@nginx.com 
3011832Smax.romanov@nginx.com 
3021125Smax.romanov@nginx.com static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t * task,nxt_port_t * port,nxt_port_send_msg_t * msg)3031125Smax.romanov@nginx.com nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
3041125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg)
3051125Smax.romanov@nginx.com {
3061125Smax.romanov@nginx.com     nxt_int_t  res;
3071125Smax.romanov@nginx.com 
3081125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
3091125Smax.romanov@nginx.com 
3101125Smax.romanov@nginx.com     if (nxt_fast_path(port->socket.write_ready
3111125Smax.romanov@nginx.com                       && nxt_queue_is_empty(&port->messages)))
3121125Smax.romanov@nginx.com     {
3131125Smax.romanov@nginx.com         res = NXT_DECLINED;
3141125Smax.romanov@nginx.com 
3151125Smax.romanov@nginx.com     } else {
3161125Smax.romanov@nginx.com         msg = nxt_port_msg_alloc(msg);
3171125Smax.romanov@nginx.com 
3181125Smax.romanov@nginx.com         if (nxt_fast_path(msg != NULL)) {
3191125Smax.romanov@nginx.com             nxt_queue_insert_tail(&port->messages, &msg->link);
3201125Smax.romanov@nginx.com             nxt_port_use(task, port, 1);
3211125Smax.romanov@nginx.com             res = NXT_OK;
3221125Smax.romanov@nginx.com 
3231125Smax.romanov@nginx.com         } else {
3241125Smax.romanov@nginx.com             res = NXT_ERROR;
3251125Smax.romanov@nginx.com         }
3261125Smax.romanov@nginx.com     }
3271125Smax.romanov@nginx.com 
3281125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
3291125Smax.romanov@nginx.com 
3301125Smax.romanov@nginx.com     return res;
3311125Smax.romanov@nginx.com }
3321125Smax.romanov@nginx.com 
3331125Smax.romanov@nginx.com 
3341125Smax.romanov@nginx.com static nxt_port_send_msg_t *
nxt_port_msg_alloc(const nxt_port_send_msg_t * m)335*2139Sandrew@digital-domain.net nxt_port_msg_alloc(const nxt_port_send_msg_t *m)
3361125Smax.romanov@nginx.com {
3371125Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
3381125Smax.romanov@nginx.com 
3391125Smax.romanov@nginx.com     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
3401125Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
3411125Smax.romanov@nginx.com         return NULL;
3421125Smax.romanov@nginx.com     }
3431125Smax.romanov@nginx.com 
3441125Smax.romanov@nginx.com     *msg = *m;
3451125Smax.romanov@nginx.com 
3461125Smax.romanov@nginx.com     msg->allocated = 1;
3471125Smax.romanov@nginx.com 
3481125Smax.romanov@nginx.com     return msg;
34911Sigor@sysoev.ru }
35011Sigor@sysoev.ru 
35111Sigor@sysoev.ru 
35211Sigor@sysoev.ru static void
nxt_port_fd_block_write(nxt_task_t * task,nxt_port_t * port,void * data)353343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
354343Smax.romanov@nginx.com {
355343Smax.romanov@nginx.com     nxt_fd_event_block_write(task->thread->engine, &port->socket);
356343Smax.romanov@nginx.com }
357343Smax.romanov@nginx.com 
358343Smax.romanov@nginx.com 
359343Smax.romanov@nginx.com static void
nxt_port_fd_enable_write(nxt_task_t * task,nxt_port_t * port,void * data)360343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
361343Smax.romanov@nginx.com {
362343Smax.romanov@nginx.com     nxt_fd_event_enable_write(task->thread->engine, &port->socket);
363343Smax.romanov@nginx.com }
364343Smax.romanov@nginx.com 
365343Smax.romanov@nginx.com 
366343Smax.romanov@nginx.com static void
nxt_port_write_handler(nxt_task_t * task,void * obj,void * data)36711Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
36811Sigor@sysoev.ru {
369343Smax.romanov@nginx.com     int                     use_delta;
370197Smax.romanov@nginx.com     size_t                  plain_size;
37111Sigor@sysoev.ru     ssize_t                 n;
3721125Smax.romanov@nginx.com     uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
373343Smax.romanov@nginx.com     nxt_bool_t              block_write, enable_write;
37411Sigor@sysoev.ru     nxt_port_t              *port;
3751125Smax.romanov@nginx.com     struct iovec            iov[NXT_IOBUF_MAX * 10];
376127Smax.romanov@nginx.com     nxt_work_queue_t        *wq;
377125Smax.romanov@nginx.com     nxt_port_method_t       m;
37811Sigor@sysoev.ru     nxt_port_send_msg_t     *msg;
37911Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
38042Smax.romanov@nginx.com 
381197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
38211Sigor@sysoev.ru 
383343Smax.romanov@nginx.com     block_write = 0;
384343Smax.romanov@nginx.com     enable_write = 0;
385343Smax.romanov@nginx.com     use_delta = 0;
386343Smax.romanov@nginx.com 
387344Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
388344Smax.romanov@nginx.com 
38911Sigor@sysoev.ru     do {
3901125Smax.romanov@nginx.com         if (data) {
3911125Smax.romanov@nginx.com             msg = data;
3921125Smax.romanov@nginx.com 
3931125Smax.romanov@nginx.com         } else {
3941125Smax.romanov@nginx.com             msg = nxt_port_msg_first(port);
39511Sigor@sysoev.ru 
3961125Smax.romanov@nginx.com             if (msg == NULL) {
3971125Smax.romanov@nginx.com                 block_write = 1;
3981125Smax.romanov@nginx.com                 goto cleanup;
3991125Smax.romanov@nginx.com             }
40011Sigor@sysoev.ru         }
40111Sigor@sysoev.ru 
4021125Smax.romanov@nginx.com next_fragment:
4031125Smax.romanov@nginx.com 
40414Sigor@sysoev.ru         iov[0].iov_base = &msg->port_msg;
40514Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
40611Sigor@sysoev.ru 
40711Sigor@sysoev.ru         sb.buf = msg->buf;
40814Sigor@sysoev.ru         sb.iobuf = &iov[1];
40911Sigor@sysoev.ru         sb.nmax = NXT_IOBUF_MAX - 1;
41011Sigor@sysoev.ru         sb.sync = 0;
41111Sigor@sysoev.ru         sb.last = 0;
41242Smax.romanov@nginx.com         sb.size = 0;
41311Sigor@sysoev.ru         sb.limit = port->max_size;
41411Sigor@sysoev.ru 
415352Smax.romanov@nginx.com         sb.limit_reached = 0;
416352Smax.romanov@nginx.com         sb.nmax_reached = 0;
417352Smax.romanov@nginx.com 
41842Smax.romanov@nginx.com         m = nxt_port_mmap_get_method(task, port, msg->buf);
41942Smax.romanov@nginx.com 
42042Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP) {
42142Smax.romanov@nginx.com             sb.limit = (1ULL << 31) - 1;
422352Smax.romanov@nginx.com             sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
423352Smax.romanov@nginx.com                               port->max_size / PORT_MMAP_MIN_SIZE);
42442Smax.romanov@nginx.com         }
42542Smax.romanov@nginx.com 
4261002Smax.romanov@nginx.com         sb.limit -= iov[0].iov_len;
4271002Smax.romanov@nginx.com 
42842Smax.romanov@nginx.com         nxt_sendbuf_mem_coalesce(task, &sb);
42942Smax.romanov@nginx.com 
43042Smax.romanov@nginx.com         plain_size = sb.size;
43142Smax.romanov@nginx.com 
43242Smax.romanov@nginx.com         /*
43342Smax.romanov@nginx.com          * Send through mmap enabled only when payload
43442Smax.romanov@nginx.com          * is bigger than PORT_MMAP_MIN_SIZE.
43542Smax.romanov@nginx.com          */
43642Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
4371125Smax.romanov@nginx.com             nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
43842Smax.romanov@nginx.com 
43942Smax.romanov@nginx.com         } else {
44042Smax.romanov@nginx.com             m = NXT_PORT_METHOD_PLAIN;
44142Smax.romanov@nginx.com         }
44211Sigor@sysoev.ru 
443189Smax.romanov@nginx.com         msg->port_msg.last |= sb.last;
444352Smax.romanov@nginx.com         msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
44511Sigor@sysoev.ru 
4461558Smax.romanov@nginx.com         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
44711Sigor@sysoev.ru 
44811Sigor@sysoev.ru         if (n > 0) {
44942Smax.romanov@nginx.com             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
450564Svbart@nginx.com                 nxt_alert(task, "port %d: short write: %z instead of %uz",
451564Svbart@nginx.com                           port->socket.fd, n, sb.size + iov[0].iov_len);
45211Sigor@sysoev.ru                 goto fail;
45311Sigor@sysoev.ru             }
45411Sigor@sysoev.ru 
4551908Smax.romanov@nginx.com             nxt_port_msg_close_fd(msg);
4561553Smax.romanov@nginx.com 
457592Sigor@sysoev.ru             msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
4581125Smax.romanov@nginx.com                                                m == NXT_PORT_METHOD_MMAP);
45911Sigor@sysoev.ru 
46011Sigor@sysoev.ru             if (msg->buf != NULL) {
461352Smax.romanov@nginx.com                 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
462352Smax.romanov@nginx.com                           msg->port_msg.stream);
463352Smax.romanov@nginx.com 
46411Sigor@sysoev.ru                 /*
46511Sigor@sysoev.ru                  * A file descriptor is sent only
46611Sigor@sysoev.ru                  * in the first message of a stream.
46711Sigor@sysoev.ru                  */
4681558Smax.romanov@nginx.com                 msg->fd[0] = -1;
4691558Smax.romanov@nginx.com                 msg->fd[1] = -1;
47011Sigor@sysoev.ru                 msg->share += n;
471352Smax.romanov@nginx.com                 msg->port_msg.nf = 1;
47211Sigor@sysoev.ru 
47311Sigor@sysoev.ru                 if (msg->share >= port->max_share) {
47411Sigor@sysoev.ru                     msg->share = 0;
475344Smax.romanov@nginx.com 
476344Smax.romanov@nginx.com                     if (msg->link.next != NULL) {
4771125Smax.romanov@nginx.com                         nxt_thread_mutex_lock(&port->write_mutex);
4781125Smax.romanov@nginx.com 
479344Smax.romanov@nginx.com                         nxt_queue_remove(&msg->link);
4801125Smax.romanov@nginx.com                         nxt_queue_insert_tail(&port->messages, &msg->link);
4811125Smax.romanov@nginx.com 
4821125Smax.romanov@nginx.com                         nxt_thread_mutex_unlock(&port->write_mutex);
483344Smax.romanov@nginx.com 
4841125Smax.romanov@nginx.com                     } else {
4851125Smax.romanov@nginx.com                         msg = nxt_port_msg_insert_tail(port, msg);
4861125Smax.romanov@nginx.com                         if (nxt_slow_path(msg == NULL)) {
4871125Smax.romanov@nginx.com                             goto fail;
4881125Smax.romanov@nginx.com                         }
4891125Smax.romanov@nginx.com 
490344Smax.romanov@nginx.com                         use_delta++;
491344Smax.romanov@nginx.com                     }
4921125Smax.romanov@nginx.com 
4931125Smax.romanov@nginx.com                 } else {
4941125Smax.romanov@nginx.com                     goto next_fragment;
49511Sigor@sysoev.ru                 }
49611Sigor@sysoev.ru 
49711Sigor@sysoev.ru             } else {
498344Smax.romanov@nginx.com                 if (msg->link.next != NULL) {
4991125Smax.romanov@nginx.com                     nxt_thread_mutex_lock(&port->write_mutex);
5001125Smax.romanov@nginx.com 
501344Smax.romanov@nginx.com                     nxt_queue_remove(&msg->link);
5021125Smax.romanov@nginx.com                     msg->link.next = NULL;
5031125Smax.romanov@nginx.com 
5041125Smax.romanov@nginx.com                     nxt_thread_mutex_unlock(&port->write_mutex);
5051125Smax.romanov@nginx.com 
506344Smax.romanov@nginx.com                     use_delta--;
507344Smax.romanov@nginx.com                 }
5081125Smax.romanov@nginx.com 
5091125Smax.romanov@nginx.com                 nxt_port_release_send_msg(msg);
5101125Smax.romanov@nginx.com             }
5111125Smax.romanov@nginx.com 
5121125Smax.romanov@nginx.com             if (data != NULL) {
5131125Smax.romanov@nginx.com                 goto cleanup;
51411Sigor@sysoev.ru             }
51511Sigor@sysoev.ru 
5161004Smax.romanov@nginx.com         } else {
5171125Smax.romanov@nginx.com             if (nxt_slow_path(n == NXT_ERROR)) {
5181907Smax.romanov@nginx.com                 if (msg->link.next == NULL) {
5191908Smax.romanov@nginx.com                     nxt_port_msg_close_fd(msg);
5201907Smax.romanov@nginx.com 
5211907Smax.romanov@nginx.com                     nxt_port_release_send_msg(msg);
5221907Smax.romanov@nginx.com                 }
5231907Smax.romanov@nginx.com 
5241125Smax.romanov@nginx.com                 goto fail;
525344Smax.romanov@nginx.com             }
5261004Smax.romanov@nginx.com 
5271125Smax.romanov@nginx.com             if (msg->link.next == NULL) {
5281125Smax.romanov@nginx.com                 msg = nxt_port_msg_insert_tail(port, msg);
5291125Smax.romanov@nginx.com                 if (nxt_slow_path(msg == NULL)) {
5301125Smax.romanov@nginx.com                     goto fail;
5311125Smax.romanov@nginx.com                 }
5321125Smax.romanov@nginx.com 
5331125Smax.romanov@nginx.com                 use_delta++;
5341004Smax.romanov@nginx.com             }
53511Sigor@sysoev.ru         }
53611Sigor@sysoev.ru 
53711Sigor@sysoev.ru     } while (port->socket.write_ready);
53811Sigor@sysoev.ru 
53912Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(port->socket.write)) {
540343Smax.romanov@nginx.com         enable_write = 1;
54111Sigor@sysoev.ru     }
54211Sigor@sysoev.ru 
5431125Smax.romanov@nginx.com     goto cleanup;
54411Sigor@sysoev.ru 
54511Sigor@sysoev.ru fail:
54611Sigor@sysoev.ru 
547343Smax.romanov@nginx.com     use_delta++;
548343Smax.romanov@nginx.com 
549344Smax.romanov@nginx.com     nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
550343Smax.romanov@nginx.com                        &port->socket);
551343Smax.romanov@nginx.com 
5521125Smax.romanov@nginx.com cleanup:
553343Smax.romanov@nginx.com 
554343Smax.romanov@nginx.com     if (block_write && nxt_fd_event_is_active(port->socket.write)) {
555343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
556343Smax.romanov@nginx.com     }
557343Smax.romanov@nginx.com 
558343Smax.romanov@nginx.com     if (enable_write) {
559343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
560343Smax.romanov@nginx.com     }
561343Smax.romanov@nginx.com 
562343Smax.romanov@nginx.com     if (use_delta != 0) {
563343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
564343Smax.romanov@nginx.com     }
56511Sigor@sysoev.ru }
56611Sigor@sysoev.ru 
56711Sigor@sysoev.ru 
5681125Smax.romanov@nginx.com static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_port_t * port)5691125Smax.romanov@nginx.com nxt_port_msg_first(nxt_port_t *port)
5701125Smax.romanov@nginx.com {
5711125Smax.romanov@nginx.com     nxt_queue_link_t     *lnk;
5721125Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
5731125Smax.romanov@nginx.com 
5741125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
5751125Smax.romanov@nginx.com 
5761125Smax.romanov@nginx.com     lnk = nxt_queue_first(&port->messages);
5771125Smax.romanov@nginx.com 
5781125Smax.romanov@nginx.com     if (lnk == nxt_queue_tail(&port->messages)) {
5791125Smax.romanov@nginx.com         msg = NULL;
5801125Smax.romanov@nginx.com 
5811125Smax.romanov@nginx.com     } else {
5821125Smax.romanov@nginx.com         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
5831125Smax.romanov@nginx.com     }
5841125Smax.romanov@nginx.com 
5851125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
5861125Smax.romanov@nginx.com 
5871125Smax.romanov@nginx.com     return msg;
5881125Smax.romanov@nginx.com }
5891125Smax.romanov@nginx.com 
5901125Smax.romanov@nginx.com 
5911908Smax.romanov@nginx.com nxt_inline void
nxt_port_msg_close_fd(nxt_port_send_msg_t * msg)5921908Smax.romanov@nginx.com nxt_port_msg_close_fd(nxt_port_send_msg_t *msg)
5931908Smax.romanov@nginx.com {
5941908Smax.romanov@nginx.com     if (!msg->close_fd) {
5951908Smax.romanov@nginx.com         return;
5961908Smax.romanov@nginx.com     }
5971908Smax.romanov@nginx.com 
5981996St.nateldemoura@f5.com     nxt_port_close_fds(msg->fd);
5991996St.nateldemoura@f5.com }
6001996St.nateldemoura@f5.com 
6011908Smax.romanov@nginx.com 
6021996St.nateldemoura@f5.com nxt_inline void
nxt_port_close_fds(nxt_fd_t * fd)6031996St.nateldemoura@f5.com nxt_port_close_fds(nxt_fd_t *fd)
6041996St.nateldemoura@f5.com {
6051996St.nateldemoura@f5.com     if (fd[0] != -1) {
6061996St.nateldemoura@f5.com         nxt_fd_close(fd[0]);
6071996St.nateldemoura@f5.com         fd[0] = -1;
6081908Smax.romanov@nginx.com     }
6091908Smax.romanov@nginx.com 
6101996St.nateldemoura@f5.com     if (fd[1] != -1) {
6111996St.nateldemoura@f5.com         nxt_fd_close(fd[1]);
6121996St.nateldemoura@f5.com         fd[1] = -1;
6131908Smax.romanov@nginx.com     }
6141908Smax.romanov@nginx.com }
6151908Smax.romanov@nginx.com 
6161908Smax.romanov@nginx.com 
617592Sigor@sysoev.ru static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * b,size_t sent,nxt_bool_t mmap_mode)618592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
619592Sigor@sysoev.ru     size_t sent, nxt_bool_t mmap_mode)
620592Sigor@sysoev.ru {
6211269Sigor@sysoev.ru     size_t     size;
6221269Sigor@sysoev.ru     nxt_buf_t  *next;
623592Sigor@sysoev.ru 
624592Sigor@sysoev.ru     while (b != NULL) {
625592Sigor@sysoev.ru 
626592Sigor@sysoev.ru         nxt_prefetch(b->next);
627592Sigor@sysoev.ru 
628592Sigor@sysoev.ru         if (!nxt_buf_is_sync(b)) {
629592Sigor@sysoev.ru 
630592Sigor@sysoev.ru             size = nxt_buf_used_size(b);
631592Sigor@sysoev.ru 
632592Sigor@sysoev.ru             if (size != 0) {
633592Sigor@sysoev.ru 
634592Sigor@sysoev.ru                 if (sent == 0) {
635592Sigor@sysoev.ru                     break;
636592Sigor@sysoev.ru                 }
637592Sigor@sysoev.ru 
638592Sigor@sysoev.ru                 if (nxt_buf_is_port_mmap(b) && mmap_mode) {
639592Sigor@sysoev.ru                     /*
640592Sigor@sysoev.ru                      * buffer has been sent to other side which is now
641592Sigor@sysoev.ru                      * responsible for shared memory bucket release
642592Sigor@sysoev.ru                      */
643592Sigor@sysoev.ru                     b->is_port_mmap_sent = 1;
644592Sigor@sysoev.ru                 }
645592Sigor@sysoev.ru 
646592Sigor@sysoev.ru                 if (sent < size) {
647592Sigor@sysoev.ru 
648592Sigor@sysoev.ru                     if (nxt_buf_is_mem(b)) {
649592Sigor@sysoev.ru                         b->mem.pos += sent;
650592Sigor@sysoev.ru                     }
651592Sigor@sysoev.ru 
652592Sigor@sysoev.ru                     if (nxt_buf_is_file(b)) {
653592Sigor@sysoev.ru                         b->file_pos += sent;
654592Sigor@sysoev.ru                     }
655592Sigor@sysoev.ru 
656592Sigor@sysoev.ru                     break;
657592Sigor@sysoev.ru                 }
658592Sigor@sysoev.ru 
659592Sigor@sysoev.ru                 /* b->mem.free is NULL in file-only buffer. */
660592Sigor@sysoev.ru                 b->mem.pos = b->mem.free;
661592Sigor@sysoev.ru 
662592Sigor@sysoev.ru                 if (nxt_buf_is_file(b)) {
663592Sigor@sysoev.ru                     b->file_pos = b->file_end;
664592Sigor@sysoev.ru                 }
665592Sigor@sysoev.ru 
666592Sigor@sysoev.ru                 sent -= size;
667592Sigor@sysoev.ru             }
668592Sigor@sysoev.ru         }
669592Sigor@sysoev.ru 
670592Sigor@sysoev.ru         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
671592Sigor@sysoev.ru 
6721269Sigor@sysoev.ru         next = b->next;
6731269Sigor@sysoev.ru         b->next = NULL;
6741269Sigor@sysoev.ru         b = next;
675592Sigor@sysoev.ru     }
676592Sigor@sysoev.ru 
677592Sigor@sysoev.ru     return b;
678592Sigor@sysoev.ru }
679592Sigor@sysoev.ru 
680592Sigor@sysoev.ru 
6811125Smax.romanov@nginx.com static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t * port,nxt_port_send_msg_t * msg)6821125Smax.romanov@nginx.com nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
6831125Smax.romanov@nginx.com {
6841125Smax.romanov@nginx.com     if (msg->allocated == 0) {
6851125Smax.romanov@nginx.com         msg = nxt_port_msg_alloc(msg);
6861125Smax.romanov@nginx.com 
6871125Smax.romanov@nginx.com         if (nxt_slow_path(msg == NULL)) {
6881125Smax.romanov@nginx.com             return NULL;
6891125Smax.romanov@nginx.com         }
6901125Smax.romanov@nginx.com     }
6911125Smax.romanov@nginx.com 
6921125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
6931125Smax.romanov@nginx.com 
6941125Smax.romanov@nginx.com     nxt_queue_insert_tail(&port->messages, &msg->link);
6951125Smax.romanov@nginx.com 
6961125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
6971125Smax.romanov@nginx.com 
6981125Smax.romanov@nginx.com     return msg;
6991125Smax.romanov@nginx.com }
7001125Smax.romanov@nginx.com 
7011125Smax.romanov@nginx.com 
70211Sigor@sysoev.ru void
nxt_port_read_enable(nxt_task_t * task,nxt_port_t * port)70311Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
70411Sigor@sysoev.ru {
70511Sigor@sysoev.ru     port->socket.fd = port->pair[0];
70611Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
70711Sigor@sysoev.ru 
708141Smax.romanov@nginx.com     port->engine = task->thread->engine;
709141Smax.romanov@nginx.com 
710141Smax.romanov@nginx.com     port->socket.read_work_queue = &port->engine->fast_work_queue;
7111555Smax.romanov@nginx.com     port->socket.read_handler = port->queue != NULL
7121555Smax.romanov@nginx.com                                 ? nxt_port_queue_read_handler
7131555Smax.romanov@nginx.com                                 : nxt_port_read_handler;
71411Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
71511Sigor@sysoev.ru 
716141Smax.romanov@nginx.com     nxt_fd_event_enable_read(port->engine, &port->socket);
71711Sigor@sysoev.ru }
71811Sigor@sysoev.ru 
71911Sigor@sysoev.ru 
72011Sigor@sysoev.ru void
nxt_port_read_close(nxt_port_t * port)72111Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
72211Sigor@sysoev.ru {
723350Smax.romanov@nginx.com     port->socket.read_ready = 0;
7241015Smax.romanov@nginx.com     port->socket.read = NXT_EVENT_INACTIVE;
72513Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[0]);
72611Sigor@sysoev.ru     port->pair[0] = -1;
72711Sigor@sysoev.ru }
72811Sigor@sysoev.ru 
72911Sigor@sysoev.ru 
73011Sigor@sysoev.ru static void
nxt_port_read_handler(nxt_task_t * task,void * obj,void * data)73111Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
73211Sigor@sysoev.ru {
73342Smax.romanov@nginx.com     ssize_t              n;
73442Smax.romanov@nginx.com     nxt_buf_t            *b;
7351996St.nateldemoura@f5.com     nxt_int_t            ret;
73642Smax.romanov@nginx.com     nxt_port_t           *port;
7371996St.nateldemoura@f5.com     nxt_recv_oob_t       oob;
7381996St.nateldemoura@f5.com     nxt_port_recv_msg_t  msg;
73942Smax.romanov@nginx.com     struct iovec         iov[2];
74011Sigor@sysoev.ru 
741125Smax.romanov@nginx.com     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
74211Sigor@sysoev.ru 
743141Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
744141Smax.romanov@nginx.com 
74511Sigor@sysoev.ru     for ( ;; ) {
74611Sigor@sysoev.ru         b = nxt_port_buf_alloc(port);
74711Sigor@sysoev.ru 
74811Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
74911Sigor@sysoev.ru             /* TODO: disable event for some time */
75011Sigor@sysoev.ru         }
75111Sigor@sysoev.ru 
75242Smax.romanov@nginx.com         iov[0].iov_base = &msg.port_msg;
75314Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
75411Sigor@sysoev.ru 
75514Sigor@sysoev.ru         iov[1].iov_base = b->mem.pos;
75614Sigor@sysoev.ru         iov[1].iov_len = port->max_size;
75714Sigor@sysoev.ru 
7581996St.nateldemoura@f5.com         n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
75911Sigor@sysoev.ru 
76011Sigor@sysoev.ru         if (n > 0) {
7611996St.nateldemoura@f5.com             msg.fd[0] = -1;
7621996St.nateldemoura@f5.com             msg.fd[1] = -1;
7631996St.nateldemoura@f5.com 
7641996St.nateldemoura@f5.com             ret = nxt_socket_msg_oob_get(&oob, msg.fd,
7651996St.nateldemoura@f5.com                                          nxt_recv_msg_cmsg_pid_ref(&msg));
7661996St.nateldemoura@f5.com             if (nxt_slow_path(ret != NXT_OK)) {
7671996St.nateldemoura@f5.com                 nxt_alert(task, "failed to get oob data from %d",
7681996St.nateldemoura@f5.com                           port->socket.fd);
7691996St.nateldemoura@f5.com 
7701996St.nateldemoura@f5.com                 nxt_port_close_fds(msg.fd);
7711996St.nateldemoura@f5.com 
7721996St.nateldemoura@f5.com                 goto fail;
7731996St.nateldemoura@f5.com             }
77442Smax.romanov@nginx.com 
77542Smax.romanov@nginx.com             msg.buf = b;
77682Smax.romanov@nginx.com             msg.size = n;
77742Smax.romanov@nginx.com 
77882Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
77911Sigor@sysoev.ru 
780194Smax.romanov@nginx.com             /*
781194Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
782194Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
783194Smax.romanov@nginx.com              */
784194Smax.romanov@nginx.com             if (msg.buf == b) {
78511Sigor@sysoev.ru                 nxt_port_buf_free(port, b);
78611Sigor@sysoev.ru             }
78711Sigor@sysoev.ru 
78811Sigor@sysoev.ru             if (port->socket.read_ready) {
78911Sigor@sysoev.ru                 continue;
79011Sigor@sysoev.ru             }
79111Sigor@sysoev.ru 
79211Sigor@sysoev.ru             return;
79311Sigor@sysoev.ru         }
79411Sigor@sysoev.ru 
79511Sigor@sysoev.ru         if (n == NXT_AGAIN) {
79611Sigor@sysoev.ru             nxt_port_buf_free(port, b);
79711Sigor@sysoev.ru 
79812Sigor@sysoev.ru             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
79911Sigor@sysoev.ru             return;
80011Sigor@sysoev.ru         }
80111Sigor@sysoev.ru 
8021996St.nateldemoura@f5.com fail:
8031996St.nateldemoura@f5.com         /* n == 0 || error  */
80411Sigor@sysoev.ru         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
80511Sigor@sysoev.ru                            nxt_port_error_handler, task, &port->socket, NULL);
80611Sigor@sysoev.ru         return;
80711Sigor@sysoev.ru     }
80811Sigor@sysoev.ru }
80911Sigor@sysoev.ru 
81011Sigor@sysoev.ru 
8111555Smax.romanov@nginx.com static void
nxt_port_queue_read_handler(nxt_task_t * task,void * obj,void * data)8121555Smax.romanov@nginx.com nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
8131555Smax.romanov@nginx.com {
8141555Smax.romanov@nginx.com     ssize_t              n;
8151555Smax.romanov@nginx.com     nxt_buf_t            *b;
8161996St.nateldemoura@f5.com     nxt_int_t            ret;
8171555Smax.romanov@nginx.com     nxt_port_t           *port;
8181555Smax.romanov@nginx.com     struct iovec         iov[2];
8191996St.nateldemoura@f5.com     nxt_recv_oob_t       oob;
8201555Smax.romanov@nginx.com     nxt_port_queue_t     *queue;
8211555Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg, *smsg;
8221555Smax.romanov@nginx.com     uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];
8231555Smax.romanov@nginx.com 
8241555Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
8251555Smax.romanov@nginx.com     msg.port = port;
8261555Smax.romanov@nginx.com 
8271555Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
8281555Smax.romanov@nginx.com 
8291555Smax.romanov@nginx.com     queue = port->queue;
8301555Smax.romanov@nginx.com     nxt_atomic_fetch_add(&queue->nitems, 1);
8311555Smax.romanov@nginx.com 
8321555Smax.romanov@nginx.com     for ( ;; ) {
8331555Smax.romanov@nginx.com 
8341555Smax.romanov@nginx.com         if (port->from_socket == 0) {
8351555Smax.romanov@nginx.com             n = nxt_port_queue_recv(queue, qmsg);
8361555Smax.romanov@nginx.com 
8371555Smax.romanov@nginx.com             if (n < 0 && !port->socket.read_ready) {
8381555Smax.romanov@nginx.com                 nxt_atomic_fetch_add(&queue->nitems, -1);
8391555Smax.romanov@nginx.com 
8401555Smax.romanov@nginx.com                 n = nxt_port_queue_recv(queue, qmsg);
8411555Smax.romanov@nginx.com                 if (n < 0) {
8421555Smax.romanov@nginx.com                     return;
8431555Smax.romanov@nginx.com                 }
8441555Smax.romanov@nginx.com 
8451555Smax.romanov@nginx.com                 nxt_atomic_fetch_add(&queue->nitems, 1);
8461555Smax.romanov@nginx.com             }
8471555Smax.romanov@nginx.com 
8481555Smax.romanov@nginx.com             if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
8491555Smax.romanov@nginx.com                 port->from_socket++;
8501555Smax.romanov@nginx.com 
8511555Smax.romanov@nginx.com                 nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
8521555Smax.romanov@nginx.com                           (int) port->pid, (int) port->id, port->socket.fd,
8531555Smax.romanov@nginx.com                           port->from_socket);
8541555Smax.romanov@nginx.com 
8551555Smax.romanov@nginx.com                 continue;
8561555Smax.romanov@nginx.com             }
8571555Smax.romanov@nginx.com 
8581555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: dequeue %d",
8591555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
8601555Smax.romanov@nginx.com                       (int) n);
8611555Smax.romanov@nginx.com 
8621555Smax.romanov@nginx.com         } else {
8631555Smax.romanov@nginx.com             if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
8641555Smax.romanov@nginx.com                 msg.port_msg = smsg->port_msg;
8651555Smax.romanov@nginx.com                 b = smsg->buf;
8661555Smax.romanov@nginx.com                 n = smsg->size;
8671558Smax.romanov@nginx.com                 msg.fd[0] = smsg->fd[0];
8681558Smax.romanov@nginx.com                 msg.fd[1] = smsg->fd[1];
8691555Smax.romanov@nginx.com 
8701555Smax.romanov@nginx.com                 smsg->size = 0;
8711555Smax.romanov@nginx.com 
8721555Smax.romanov@nginx.com                 port->from_socket--;
8731555Smax.romanov@nginx.com 
8741555Smax.romanov@nginx.com                 nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
8751555Smax.romanov@nginx.com                           (int) port->pid, (int) port->id, port->socket.fd,
8761555Smax.romanov@nginx.com                           (int) n);
8771555Smax.romanov@nginx.com 
8781555Smax.romanov@nginx.com                 goto process;
8791555Smax.romanov@nginx.com             }
8801555Smax.romanov@nginx.com 
8811555Smax.romanov@nginx.com             n = -1;
8821555Smax.romanov@nginx.com         }
8831555Smax.romanov@nginx.com 
8841555Smax.romanov@nginx.com         if (n < 0 && !port->socket.read_ready) {
8851555Smax.romanov@nginx.com             nxt_atomic_fetch_add(&queue->nitems, -1);
8861555Smax.romanov@nginx.com             return;
8871555Smax.romanov@nginx.com         }
8881555Smax.romanov@nginx.com 
8891555Smax.romanov@nginx.com         b = nxt_port_buf_alloc(port);
8901555Smax.romanov@nginx.com 
8911555Smax.romanov@nginx.com         if (nxt_slow_path(b == NULL)) {
8921555Smax.romanov@nginx.com             /* TODO: disable event for some time */
8931555Smax.romanov@nginx.com         }
8941555Smax.romanov@nginx.com 
8951555Smax.romanov@nginx.com         if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
8961555Smax.romanov@nginx.com             nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
8971555Smax.romanov@nginx.com 
8981555Smax.romanov@nginx.com             if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
8991555Smax.romanov@nginx.com                 nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
9001555Smax.romanov@nginx.com                            n - sizeof(nxt_port_msg_t));
9011555Smax.romanov@nginx.com             }
9021555Smax.romanov@nginx.com 
9031555Smax.romanov@nginx.com         } else {
9041555Smax.romanov@nginx.com             iov[0].iov_base = &msg.port_msg;
9051555Smax.romanov@nginx.com             iov[0].iov_len = sizeof(nxt_port_msg_t);
9061555Smax.romanov@nginx.com 
9071555Smax.romanov@nginx.com             iov[1].iov_base = b->mem.pos;
9081555Smax.romanov@nginx.com             iov[1].iov_len = port->max_size;
9091555Smax.romanov@nginx.com 
9101996St.nateldemoura@f5.com             n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
9111996St.nateldemoura@f5.com 
9121996St.nateldemoura@f5.com             if (n > 0) {
9131996St.nateldemoura@f5.com                 msg.fd[0] = -1;
9141996St.nateldemoura@f5.com                 msg.fd[1] = -1;
9151996St.nateldemoura@f5.com 
9161996St.nateldemoura@f5.com                 ret = nxt_socket_msg_oob_get(&oob, msg.fd,
9171996St.nateldemoura@f5.com                                              nxt_recv_msg_cmsg_pid_ref(&msg));
9181996St.nateldemoura@f5.com                 if (nxt_slow_path(ret != NXT_OK)) {
9191996St.nateldemoura@f5.com                     nxt_alert(task, "failed to get oob data from %d",
9201996St.nateldemoura@f5.com                               port->socket.fd);
9211996St.nateldemoura@f5.com 
9221996St.nateldemoura@f5.com                     nxt_port_close_fds(msg.fd);
9231996St.nateldemoura@f5.com 
9241996St.nateldemoura@f5.com                     return;
9251996St.nateldemoura@f5.com                 }
9261996St.nateldemoura@f5.com             }
9271555Smax.romanov@nginx.com 
9281555Smax.romanov@nginx.com             if (n == (ssize_t) sizeof(nxt_port_msg_t)
9291555Smax.romanov@nginx.com                 && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
9301555Smax.romanov@nginx.com             {
9311555Smax.romanov@nginx.com                 nxt_port_buf_free(port, b);
9321555Smax.romanov@nginx.com 
9331555Smax.romanov@nginx.com                 nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
9341555Smax.romanov@nginx.com                           (int) port->pid, (int) port->id, port->socket.fd,
9351555Smax.romanov@nginx.com                           (int) n);
9361555Smax.romanov@nginx.com 
9371555Smax.romanov@nginx.com                 continue;
9381555Smax.romanov@nginx.com             }
9391555Smax.romanov@nginx.com 
9401555Smax.romanov@nginx.com             nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
9411555Smax.romanov@nginx.com                       (int) port->pid, (int) port->id, port->socket.fd,
9421555Smax.romanov@nginx.com                       (int) n);
9431555Smax.romanov@nginx.com 
9441555Smax.romanov@nginx.com             if (n > 0) {
9451555Smax.romanov@nginx.com                 if (port->from_socket == 0) {
9461555Smax.romanov@nginx.com                     nxt_debug(task, "port{%d,%d} %d: suspend message %d",
9471555Smax.romanov@nginx.com                               (int) port->pid, (int) port->id, port->socket.fd,
9481555Smax.romanov@nginx.com                               (int) n);
9491555Smax.romanov@nginx.com 
9501555Smax.romanov@nginx.com                     smsg = port->socket_msg;
9511555Smax.romanov@nginx.com 
9521555Smax.romanov@nginx.com                     if (nxt_slow_path(smsg == NULL)) {
9531555Smax.romanov@nginx.com                         smsg = nxt_mp_alloc(port->mem_pool,
9541555Smax.romanov@nginx.com                                             sizeof(nxt_port_recv_msg_t));
9551555Smax.romanov@nginx.com 
9561555Smax.romanov@nginx.com                         if (nxt_slow_path(smsg == NULL)) {
9571555Smax.romanov@nginx.com                             nxt_alert(task, "port{%d,%d} %d: suspend message "
9581555Smax.romanov@nginx.com                                             "failed",
9591555Smax.romanov@nginx.com                                       (int) port->pid, (int) port->id,
9601555Smax.romanov@nginx.com                                       port->socket.fd);
9611555Smax.romanov@nginx.com 
9621555Smax.romanov@nginx.com                             return;
9631555Smax.romanov@nginx.com                         }
9641555Smax.romanov@nginx.com 
9651555Smax.romanov@nginx.com                         port->socket_msg = smsg;
9661555Smax.romanov@nginx.com 
9671555Smax.romanov@nginx.com                     } else {
9681555Smax.romanov@nginx.com                         if (nxt_slow_path(smsg->size != 0)) {
9691555Smax.romanov@nginx.com                             nxt_alert(task, "port{%d,%d} %d: too many suspend "
9701555Smax.romanov@nginx.com                                             "messages",
9711555Smax.romanov@nginx.com                                       (int) port->pid, (int) port->id,
9721555Smax.romanov@nginx.com                                       port->socket.fd);
9731555Smax.romanov@nginx.com 
9741555Smax.romanov@nginx.com                             return;
9751555Smax.romanov@nginx.com                         }
9761555Smax.romanov@nginx.com                     }
9771555Smax.romanov@nginx.com 
9781555Smax.romanov@nginx.com                     smsg->port_msg = msg.port_msg;
9791555Smax.romanov@nginx.com                     smsg->buf = b;
9801555Smax.romanov@nginx.com                     smsg->size = n;
9811558Smax.romanov@nginx.com                     smsg->fd[0] = msg.fd[0];
9821558Smax.romanov@nginx.com                     smsg->fd[1] = msg.fd[1];
9831555Smax.romanov@nginx.com 
9841555Smax.romanov@nginx.com                     continue;
9851555Smax.romanov@nginx.com                 }
9861555Smax.romanov@nginx.com 
9871555Smax.romanov@nginx.com                 port->from_socket--;
9881555Smax.romanov@nginx.com             }
9891555Smax.romanov@nginx.com         }
9901555Smax.romanov@nginx.com 
9911555Smax.romanov@nginx.com     process:
9921555Smax.romanov@nginx.com 
9931555Smax.romanov@nginx.com         if (n > 0) {
9941555Smax.romanov@nginx.com             msg.buf = b;
9951555Smax.romanov@nginx.com             msg.size = n;
9961555Smax.romanov@nginx.com 
9971555Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
9981555Smax.romanov@nginx.com 
9991555Smax.romanov@nginx.com             /*
10001555Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
10011555Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
10021555Smax.romanov@nginx.com              */
10031555Smax.romanov@nginx.com             if (msg.buf == b) {
10041555Smax.romanov@nginx.com                 nxt_port_buf_free(port, b);
10051555Smax.romanov@nginx.com             }
10061555Smax.romanov@nginx.com 
10071555Smax.romanov@nginx.com             continue;
10081555Smax.romanov@nginx.com         }
10091555Smax.romanov@nginx.com 
10101555Smax.romanov@nginx.com         if (n == NXT_AGAIN) {
10111555Smax.romanov@nginx.com             nxt_port_buf_free(port, b);
10121555Smax.romanov@nginx.com 
10131555Smax.romanov@nginx.com             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
10141555Smax.romanov@nginx.com 
10151555Smax.romanov@nginx.com             continue;
10161555Smax.romanov@nginx.com         }
10171555Smax.romanov@nginx.com 
10181555Smax.romanov@nginx.com         /* n == 0 || n == NXT_ERROR */
10191555Smax.romanov@nginx.com 
10201555Smax.romanov@nginx.com         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
10211555Smax.romanov@nginx.com                            nxt_port_error_handler, task, &port->socket, NULL);
10221555Smax.romanov@nginx.com         return;
10231555Smax.romanov@nginx.com     }
10241555Smax.romanov@nginx.com }
10251555Smax.romanov@nginx.com 
10261555Smax.romanov@nginx.com 
10271005Smax.romanov@nginx.com typedef struct {
10281005Smax.romanov@nginx.com     uint32_t  stream;
10291005Smax.romanov@nginx.com     uint32_t  pid;
10301005Smax.romanov@nginx.com } nxt_port_frag_key_t;
10311005Smax.romanov@nginx.com 
10321005Smax.romanov@nginx.com 
1033352Smax.romanov@nginx.com static nxt_int_t
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t * lhq,void * data)1034352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
1035352Smax.romanov@nginx.com {
1036352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
10371005Smax.romanov@nginx.com     nxt_port_frag_key_t  *frag_key;
1038352Smax.romanov@nginx.com 
1039352Smax.romanov@nginx.com     fmsg = data;
10401005Smax.romanov@nginx.com     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
1041352Smax.romanov@nginx.com 
10421005Smax.romanov@nginx.com     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
10431005Smax.romanov@nginx.com         && frag_key->stream == fmsg->port_msg.stream
10441005Smax.romanov@nginx.com         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
1045352Smax.romanov@nginx.com     {
1046352Smax.romanov@nginx.com         return NXT_OK;
1047352Smax.romanov@nginx.com     }
1048352Smax.romanov@nginx.com 
1049352Smax.romanov@nginx.com     return NXT_DECLINED;
1050352Smax.romanov@nginx.com }
1051352Smax.romanov@nginx.com 
1052352Smax.romanov@nginx.com 
1053352Smax.romanov@nginx.com static void *
nxt_port_lvlhsh_frag_alloc(void * ctx,size_t size)1054352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
1055352Smax.romanov@nginx.com {
10561084Smax.romanov@nginx.com     return nxt_mp_align(ctx, size, size);
1057352Smax.romanov@nginx.com }
1058352Smax.romanov@nginx.com 
1059352Smax.romanov@nginx.com 
1060352Smax.romanov@nginx.com static void
nxt_port_lvlhsh_frag_free(void * ctx,void * p)1061352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p)
1062352Smax.romanov@nginx.com {
1063389Smax.romanov@nginx.com     nxt_mp_free(ctx, p);
1064352Smax.romanov@nginx.com }
1065352Smax.romanov@nginx.com 
1066352Smax.romanov@nginx.com 
1067352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
1068352Smax.romanov@nginx.com     NXT_LVLHSH_DEFAULT,
1069352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_test,
1070352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_alloc,
1071352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_free,
1072352Smax.romanov@nginx.com };
1073352Smax.romanov@nginx.com 
1074352Smax.romanov@nginx.com 
1075352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
nxt_port_frag_start(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)1076352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
1077352Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
1078352Smax.romanov@nginx.com {
1079352Smax.romanov@nginx.com     nxt_int_t            res;
1080352Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
1081352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
10821005Smax.romanov@nginx.com     nxt_port_frag_key_t  frag_key;
1083352Smax.romanov@nginx.com 
1084352Smax.romanov@nginx.com     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
1085352Smax.romanov@nginx.com 
1086352Smax.romanov@nginx.com     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
1087352Smax.romanov@nginx.com 
1088352Smax.romanov@nginx.com     if (nxt_slow_path(fmsg == NULL)) {
1089352Smax.romanov@nginx.com         return NULL;
1090352Smax.romanov@nginx.com     }
1091352Smax.romanov@nginx.com 
1092352Smax.romanov@nginx.com     *fmsg = *msg;
1093352Smax.romanov@nginx.com 
10941005Smax.romanov@nginx.com     frag_key.stream = fmsg->port_msg.stream;
10951005Smax.romanov@nginx.com     frag_key.pid = fmsg->port_msg.pid;
10961005Smax.romanov@nginx.com 
10971005Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
10981005Smax.romanov@nginx.com     lhq.key.length = sizeof(nxt_port_frag_key_t);
10991005Smax.romanov@nginx.com     lhq.key.start = (u_char *) &frag_key;
1100352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
1101352Smax.romanov@nginx.com     lhq.replace = 0;
1102352Smax.romanov@nginx.com     lhq.value = fmsg;
1103352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
1104352Smax.romanov@nginx.com 
1105352Smax.romanov@nginx.com     res = nxt_lvlhsh_insert(&port->frags, &lhq);
1106352Smax.romanov@nginx.com 
1107352Smax.romanov@nginx.com     switch (res) {
1108352Smax.romanov@nginx.com 
1109352Smax.romanov@nginx.com     case NXT_OK:
1110352Smax.romanov@nginx.com         return fmsg;
1111352Smax.romanov@nginx.com 
1112352Smax.romanov@nginx.com     case NXT_DECLINED:
1113352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
1114352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
1115352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
1116352Smax.romanov@nginx.com 
1117352Smax.romanov@nginx.com         return NULL;
1118352Smax.romanov@nginx.com 
1119352Smax.romanov@nginx.com     default:
1120352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
1121352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
1122352Smax.romanov@nginx.com 
1123352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
1124352Smax.romanov@nginx.com 
1125352Smax.romanov@nginx.com         return NULL;
1126352Smax.romanov@nginx.com 
1127352Smax.romanov@nginx.com     }
1128352Smax.romanov@nginx.com }
1129352Smax.romanov@nginx.com 
1130352Smax.romanov@nginx.com 
1131352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
nxt_port_frag_find(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)11321005Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
1133352Smax.romanov@nginx.com {
11341005Smax.romanov@nginx.com     nxt_int_t            res;
11351005Smax.romanov@nginx.com     nxt_bool_t           last;
11361005Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
11371005Smax.romanov@nginx.com     nxt_port_frag_key_t  frag_key;
11381005Smax.romanov@nginx.com 
11391005Smax.romanov@nginx.com     last = msg->port_msg.mf == 0;
1140352Smax.romanov@nginx.com 
11411005Smax.romanov@nginx.com     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
11421005Smax.romanov@nginx.com               msg->port_msg.stream);
1143352Smax.romanov@nginx.com 
11441005Smax.romanov@nginx.com     frag_key.stream = msg->port_msg.stream;
11451005Smax.romanov@nginx.com     frag_key.pid = msg->port_msg.pid;
11461005Smax.romanov@nginx.com 
11471005Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
11481005Smax.romanov@nginx.com     lhq.key.length = sizeof(nxt_port_frag_key_t);
11491005Smax.romanov@nginx.com     lhq.key.start = (u_char *) &frag_key;
1150352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
1151352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
1152352Smax.romanov@nginx.com 
1153352Smax.romanov@nginx.com     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
1154352Smax.romanov@nginx.com           nxt_lvlhsh_find(&port->frags, &lhq);
1155352Smax.romanov@nginx.com 
1156352Smax.romanov@nginx.com     switch (res) {
1157352Smax.romanov@nginx.com 
1158352Smax.romanov@nginx.com     case NXT_OK:
1159352Smax.romanov@nginx.com         return lhq.value;
1160352Smax.romanov@nginx.com 
1161352Smax.romanov@nginx.com     default:
11621005Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
11631005Smax.romanov@nginx.com                 frag_key.stream);
1164352Smax.romanov@nginx.com 
1165352Smax.romanov@nginx.com         return NULL;
1166352Smax.romanov@nginx.com     }
1167352Smax.romanov@nginx.com }
1168352Smax.romanov@nginx.com 
1169352Smax.romanov@nginx.com 
117011Sigor@sysoev.ru static void
nxt_port_read_msg_process(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)117111Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
117282Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
117311Sigor@sysoev.ru {
11741269Sigor@sysoev.ru     nxt_buf_t            *b, *orig_b, *next;
1175352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
117611Sigor@sysoev.ru 
117782Smax.romanov@nginx.com     if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
1178564Svbart@nginx.com         nxt_alert(task, "port %d: too small message:%uz",
1179564Svbart@nginx.com                   port->socket.fd, msg->size);
1180423Smax.romanov@nginx.com 
11811996St.nateldemoura@f5.com         nxt_port_close_fds(msg->fd);
11821553Smax.romanov@nginx.com 
1183423Smax.romanov@nginx.com         return;
118411Sigor@sysoev.ru     }
118511Sigor@sysoev.ru 
118642Smax.romanov@nginx.com     /* adjust size to actual buffer used size */
118782Smax.romanov@nginx.com     msg->size -= sizeof(nxt_port_msg_t);
118842Smax.romanov@nginx.com 
118942Smax.romanov@nginx.com     b = orig_b = msg->buf;
119082Smax.romanov@nginx.com     b->mem.free += msg->size;
119142Smax.romanov@nginx.com 
11921555Smax.romanov@nginx.com     msg->cancelled = 0;
119311Sigor@sysoev.ru 
1194352Smax.romanov@nginx.com     if (nxt_slow_path(msg->port_msg.nf != 0)) {
1195423Smax.romanov@nginx.com 
11961005Smax.romanov@nginx.com         fmsg = nxt_port_frag_find(task, port, msg);
1197352Smax.romanov@nginx.com 
1198551Smax.romanov@nginx.com         if (nxt_slow_path(fmsg == NULL)) {
1199551Smax.romanov@nginx.com             goto fmsg_failed;
1200551Smax.romanov@nginx.com         }
1201423Smax.romanov@nginx.com 
1202423Smax.romanov@nginx.com         if (nxt_fast_path(fmsg->cancelled == 0)) {
1203423Smax.romanov@nginx.com 
1204423Smax.romanov@nginx.com             if (msg->port_msg.mmap) {
1205423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
1206423Smax.romanov@nginx.com             }
1207423Smax.romanov@nginx.com 
1208423Smax.romanov@nginx.com             nxt_buf_chain_add(&fmsg->buf, msg->buf);
1209423Smax.romanov@nginx.com 
1210423Smax.romanov@nginx.com             fmsg->size += msg->size;
1211423Smax.romanov@nginx.com             msg->buf = NULL;
1212423Smax.romanov@nginx.com             b = NULL;
1213423Smax.romanov@nginx.com 
1214423Smax.romanov@nginx.com             if (nxt_fast_path(msg->port_msg.mf == 0)) {
1215423Smax.romanov@nginx.com 
1216423Smax.romanov@nginx.com                 b = fmsg->buf;
1217423Smax.romanov@nginx.com 
1218423Smax.romanov@nginx.com                 port->handler(task, fmsg);
1219423Smax.romanov@nginx.com 
1220423Smax.romanov@nginx.com                 msg->buf = fmsg->buf;
12211558Smax.romanov@nginx.com                 msg->fd[0] = fmsg->fd[0];
12221558Smax.romanov@nginx.com                 msg->fd[1] = fmsg->fd[1];
1223974Smax.romanov@nginx.com 
1224974Smax.romanov@nginx.com                 /*
1225974Smax.romanov@nginx.com                  * To disable instant completion or buffer re-usage,
1226974Smax.romanov@nginx.com                  * handler should reset 'msg.buf'.
1227974Smax.romanov@nginx.com                  */
1228974Smax.romanov@nginx.com                 if (!msg->port_msg.mmap && msg->buf == b) {
1229974Smax.romanov@nginx.com                     nxt_port_buf_free(port, b);
1230974Smax.romanov@nginx.com                 }
1231423Smax.romanov@nginx.com             }
1232352Smax.romanov@nginx.com         }
1233352Smax.romanov@nginx.com 
1234352Smax.romanov@nginx.com         if (nxt_fast_path(msg->port_msg.mf == 0)) {
1235352Smax.romanov@nginx.com             nxt_mp_free(port->mem_pool, fmsg);
1236352Smax.romanov@nginx.com         }
1237352Smax.romanov@nginx.com     } else {
1238352Smax.romanov@nginx.com         if (nxt_slow_path(msg->port_msg.mf != 0)) {
1239423Smax.romanov@nginx.com 
1240423Smax.romanov@nginx.com             if (msg->port_msg.mmap && msg->cancelled == 0) {
1241423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
1242423Smax.romanov@nginx.com                 b = msg->buf;
1243423Smax.romanov@nginx.com             }
1244423Smax.romanov@nginx.com 
1245352Smax.romanov@nginx.com             fmsg = nxt_port_frag_start(task, port, msg);
1246352Smax.romanov@nginx.com 
1247551Smax.romanov@nginx.com             if (nxt_slow_path(fmsg == NULL)) {
1248551Smax.romanov@nginx.com                 goto fmsg_failed;
1249551Smax.romanov@nginx.com             }
1250352Smax.romanov@nginx.com 
1251352Smax.romanov@nginx.com             fmsg->port_msg.nf = 0;
1252352Smax.romanov@nginx.com             fmsg->port_msg.mf = 0;
1253352Smax.romanov@nginx.com 
1254423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
1255423Smax.romanov@nginx.com                 msg->buf = NULL;
12561558Smax.romanov@nginx.com                 msg->fd[0] = -1;
12571558Smax.romanov@nginx.com                 msg->fd[1] = -1;
1258423Smax.romanov@nginx.com                 b = NULL;
1259423Smax.romanov@nginx.com 
1260423Smax.romanov@nginx.com             } else {
12611996St.nateldemoura@f5.com                 nxt_port_close_fds(msg->fd);
1262423Smax.romanov@nginx.com             }
1263352Smax.romanov@nginx.com         } else {
1264423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
1265423Smax.romanov@nginx.com 
1266423Smax.romanov@nginx.com                 if (msg->port_msg.mmap) {
1267423Smax.romanov@nginx.com                     nxt_port_mmap_read(task, msg);
1268423Smax.romanov@nginx.com                     b = msg->buf;
1269423Smax.romanov@nginx.com                 }
1270423Smax.romanov@nginx.com 
1271423Smax.romanov@nginx.com                 port->handler(task, msg);
1272423Smax.romanov@nginx.com             }
1273352Smax.romanov@nginx.com         }
1274352Smax.romanov@nginx.com     }
127542Smax.romanov@nginx.com 
1276551Smax.romanov@nginx.com fmsg_failed:
1277551Smax.romanov@nginx.com 
127882Smax.romanov@nginx.com     if (msg->port_msg.mmap && orig_b != b) {
127942Smax.romanov@nginx.com 
1280194Smax.romanov@nginx.com         /*
1281194Smax.romanov@nginx.com          * To disable instant buffer completion,
1282194Smax.romanov@nginx.com          * handler should reset 'msg->buf'.
1283194Smax.romanov@nginx.com          */
1284194Smax.romanov@nginx.com         if (msg->buf == b) {
1285194Smax.romanov@nginx.com             /* complete mmap buffers */
12861269Sigor@sysoev.ru             while (b != NULL) {
1287194Smax.romanov@nginx.com                 nxt_debug(task, "complete buffer %p", b);
1288194Smax.romanov@nginx.com 
1289194Smax.romanov@nginx.com                 nxt_work_queue_add(port->socket.read_work_queue,
1290194Smax.romanov@nginx.com                     b->completion_handler, task, b, b->parent);
12911269Sigor@sysoev.ru 
12921269Sigor@sysoev.ru                 next = b->next;
12931269Sigor@sysoev.ru                 b->next = NULL;
12941269Sigor@sysoev.ru                 b = next;
1295194Smax.romanov@nginx.com             }
129642Smax.romanov@nginx.com         }
1297194Smax.romanov@nginx.com 
1298194Smax.romanov@nginx.com         /* restore original buf */
1299194Smax.romanov@nginx.com         msg->buf = orig_b;
130042Smax.romanov@nginx.com     }
130111Sigor@sysoev.ru }
130211Sigor@sysoev.ru 
130311Sigor@sysoev.ru 
130411Sigor@sysoev.ru static nxt_buf_t *
nxt_port_buf_alloc(nxt_port_t * port)130511Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
130611Sigor@sysoev.ru {
130711Sigor@sysoev.ru     nxt_buf_t  *b;
130811Sigor@sysoev.ru 
130911Sigor@sysoev.ru     if (port->free_bufs != NULL) {
131011Sigor@sysoev.ru         b = port->free_bufs;
131111Sigor@sysoev.ru         port->free_bufs = b->next;
131211Sigor@sysoev.ru 
131311Sigor@sysoev.ru         b->mem.pos = b->mem.start;
131411Sigor@sysoev.ru         b->mem.free = b->mem.start;
131542Smax.romanov@nginx.com         b->next = NULL;
131611Sigor@sysoev.ru     } else {
131711Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
131811Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
131911Sigor@sysoev.ru             return NULL;
132011Sigor@sysoev.ru         }
132111Sigor@sysoev.ru     }
132211Sigor@sysoev.ru 
132311Sigor@sysoev.ru     return b;
132411Sigor@sysoev.ru }
132511Sigor@sysoev.ru 
132611Sigor@sysoev.ru 
132711Sigor@sysoev.ru static void
nxt_port_buf_free(nxt_port_t * port,nxt_buf_t * b)132811Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
132911Sigor@sysoev.ru {
1330974Smax.romanov@nginx.com     nxt_buf_chain_add(&b, port->free_bufs);
133111Sigor@sysoev.ru     port->free_bufs = b;
133211Sigor@sysoev.ru }
133311Sigor@sysoev.ru 
133411Sigor@sysoev.ru 
133511Sigor@sysoev.ru static void
nxt_port_error_handler(nxt_task_t * task,void * obj,void * data)133611Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
133711Sigor@sysoev.ru {
1338343Smax.romanov@nginx.com     int                  use_delta;
13391269Sigor@sysoev.ru     nxt_buf_t            *b, *next;
1340197Smax.romanov@nginx.com     nxt_port_t           *port;
1341197Smax.romanov@nginx.com     nxt_work_queue_t     *wq;
1342197Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
1343197Smax.romanov@nginx.com 
1344125Smax.romanov@nginx.com     nxt_debug(task, "port error handler %p", obj);
134511Sigor@sysoev.ru     /* TODO */
1346197Smax.romanov@nginx.com 
1347197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
1348197Smax.romanov@nginx.com 
1349343Smax.romanov@nginx.com     use_delta = 0;
1350343Smax.romanov@nginx.com 
1351343Smax.romanov@nginx.com     if (obj == data) {
1352343Smax.romanov@nginx.com         use_delta--;
1353343Smax.romanov@nginx.com     }
1354197Smax.romanov@nginx.com 
1355343Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
1356343Smax.romanov@nginx.com 
1357343Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
1358343Smax.romanov@nginx.com 
1359343Smax.romanov@nginx.com     nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
1360197Smax.romanov@nginx.com 
13611908Smax.romanov@nginx.com         nxt_port_msg_close_fd(msg);
13621553Smax.romanov@nginx.com 
13631269Sigor@sysoev.ru         for (b = msg->buf; b != NULL; b = next) {
13641269Sigor@sysoev.ru             next = b->next;
13651269Sigor@sysoev.ru             b->next = NULL;
13661269Sigor@sysoev.ru 
1367197Smax.romanov@nginx.com             if (nxt_buf_is_sync(b)) {
1368197Smax.romanov@nginx.com                 continue;
1369197Smax.romanov@nginx.com             }
1370197Smax.romanov@nginx.com 
1371197Smax.romanov@nginx.com             nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1372197Smax.romanov@nginx.com         }
1373197Smax.romanov@nginx.com 
1374197Smax.romanov@nginx.com         nxt_queue_remove(&msg->link);
1375343Smax.romanov@nginx.com         use_delta--;
13761125Smax.romanov@nginx.com 
13771125Smax.romanov@nginx.com         nxt_port_release_send_msg(msg);
1378197Smax.romanov@nginx.com 
1379197Smax.romanov@nginx.com     } nxt_queue_loop;
1380343Smax.romanov@nginx.com 
1381343Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
1382343Smax.romanov@nginx.com 
1383343Smax.romanov@nginx.com     if (use_delta != 0) {
1384343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
1385343Smax.romanov@nginx.com     }
138611Sigor@sysoev.ru }
1387