xref: /unit/src/nxt_port_socket.c (revision 1125)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
811Sigor@sysoev.ru 
911Sigor@sysoev.ru 
10*1125Smax.romanov@nginx.com static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
11*1125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg);
12*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
1311Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
14*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
15592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
16592Sigor@sysoev.ru     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
17*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
18*1125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg);
1911Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
2011Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
2182Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg);
2211Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
2311Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
2411Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
2511Sigor@sysoev.ru 
2611Sigor@sysoev.ru 
2714Sigor@sysoev.ru nxt_int_t
2814Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
2911Sigor@sysoev.ru {
3065Sigor@sysoev.ru     nxt_int_t     sndbuf, rcvbuf, size;
3165Sigor@sysoev.ru     nxt_socket_t  snd, rcv;
3211Sigor@sysoev.ru 
3314Sigor@sysoev.ru     port->socket.task = task;
3414Sigor@sysoev.ru 
3514Sigor@sysoev.ru     port->pair[0] = -1;
3614Sigor@sysoev.ru     port->pair[1] = -1;
3714Sigor@sysoev.ru 
3813Sigor@sysoev.ru     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
3911Sigor@sysoev.ru         goto socketpair_fail;
4011Sigor@sysoev.ru     }
4111Sigor@sysoev.ru 
4211Sigor@sysoev.ru     snd = port->pair[1];
4311Sigor@sysoev.ru 
4413Sigor@sysoev.ru     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
4511Sigor@sysoev.ru     if (nxt_slow_path(sndbuf < 0)) {
4611Sigor@sysoev.ru         goto getsockopt_fail;
4711Sigor@sysoev.ru     }
4811Sigor@sysoev.ru 
4911Sigor@sysoev.ru     rcv = port->pair[0];
5011Sigor@sysoev.ru 
5113Sigor@sysoev.ru     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
5211Sigor@sysoev.ru     if (nxt_slow_path(rcvbuf < 0)) {
5311Sigor@sysoev.ru         goto getsockopt_fail;
5411Sigor@sysoev.ru     }
5511Sigor@sysoev.ru 
5611Sigor@sysoev.ru     if (max_size == 0) {
5711Sigor@sysoev.ru         max_size = 16 * 1024;
5811Sigor@sysoev.ru     }
5911Sigor@sysoev.ru 
6011Sigor@sysoev.ru     if ((size_t) sndbuf < max_size) {
6111Sigor@sysoev.ru         /*
6211Sigor@sysoev.ru          * On Unix domain sockets
6311Sigor@sysoev.ru          *   Linux uses 224K on both send and receive directions;
6411Sigor@sysoev.ru          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
6511Sigor@sysoev.ru          *   on send direction and 4K buffer size on receive direction;
6611Sigor@sysoev.ru          *   Solaris uses 16K on send direction and 5K on receive direction.
6711Sigor@sysoev.ru          */
6813Sigor@sysoev.ru         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
6913Sigor@sysoev.ru                                      max_size);
7011Sigor@sysoev.ru 
7113Sigor@sysoev.ru         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
7211Sigor@sysoev.ru         if (nxt_slow_path(sndbuf < 0)) {
7311Sigor@sysoev.ru             goto getsockopt_fail;
7411Sigor@sysoev.ru         }
7511Sigor@sysoev.ru 
7611Sigor@sysoev.ru         size = sndbuf * 4;
7711Sigor@sysoev.ru 
7811Sigor@sysoev.ru         if (rcvbuf < size) {
7913Sigor@sysoev.ru             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
8013Sigor@sysoev.ru                                          size);
8111Sigor@sysoev.ru 
8213Sigor@sysoev.ru             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
8311Sigor@sysoev.ru             if (nxt_slow_path(rcvbuf < 0)) {
8411Sigor@sysoev.ru                 goto getsockopt_fail;
8511Sigor@sysoev.ru             }
8611Sigor@sysoev.ru         }
8711Sigor@sysoev.ru     }
8811Sigor@sysoev.ru 
8911Sigor@sysoev.ru     port->max_size = nxt_min(max_size, (size_t) sndbuf);
9011Sigor@sysoev.ru     port->max_share = (64 * 1024);
9111Sigor@sysoev.ru 
9214Sigor@sysoev.ru     return NXT_OK;
9311Sigor@sysoev.ru 
9411Sigor@sysoev.ru getsockopt_fail:
9511Sigor@sysoev.ru 
9613Sigor@sysoev.ru     nxt_socket_close(task, port->pair[0]);
9713Sigor@sysoev.ru     nxt_socket_close(task, port->pair[1]);
9811Sigor@sysoev.ru 
9911Sigor@sysoev.ru socketpair_fail:
10011Sigor@sysoev.ru 
10114Sigor@sysoev.ru     return NXT_ERROR;
10211Sigor@sysoev.ru }
10311Sigor@sysoev.ru 
10411Sigor@sysoev.ru 
10511Sigor@sysoev.ru void
10611Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
10711Sigor@sysoev.ru {
10813Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->socket.fd);
10965Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
11011Sigor@sysoev.ru }
11111Sigor@sysoev.ru 
11211Sigor@sysoev.ru 
11311Sigor@sysoev.ru void
11411Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
11511Sigor@sysoev.ru {
11611Sigor@sysoev.ru     port->socket.fd = port->pair[1];
11711Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
11811Sigor@sysoev.ru     port->socket.write_ready = 1;
11911Sigor@sysoev.ru 
120141Smax.romanov@nginx.com     port->engine = task->thread->engine;
121141Smax.romanov@nginx.com 
122141Smax.romanov@nginx.com     port->socket.write_work_queue = &port->engine->fast_work_queue;
12311Sigor@sysoev.ru     port->socket.write_handler = nxt_port_write_handler;
12411Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
12511Sigor@sysoev.ru }
12611Sigor@sysoev.ru 
12711Sigor@sysoev.ru 
12811Sigor@sysoev.ru void
12911Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
13011Sigor@sysoev.ru {
13113Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[1]);
13211Sigor@sysoev.ru     port->pair[1] = -1;
13311Sigor@sysoev.ru }
13411Sigor@sysoev.ru 
13511Sigor@sysoev.ru 
136122Smax.romanov@nginx.com static void
137*1125Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
138122Smax.romanov@nginx.com {
139*1125Smax.romanov@nginx.com     if (msg->allocated) {
140*1125Smax.romanov@nginx.com         nxt_free(msg);
141344Smax.romanov@nginx.com     }
142122Smax.romanov@nginx.com }
143122Smax.romanov@nginx.com 
144122Smax.romanov@nginx.com 
14511Sigor@sysoev.ru nxt_int_t
146423Smax.romanov@nginx.com nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
147423Smax.romanov@nginx.com     nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
148423Smax.romanov@nginx.com     void *tracking)
14911Sigor@sysoev.ru {
150*1125Smax.romanov@nginx.com     nxt_int_t            res;
151*1125Smax.romanov@nginx.com     nxt_port_send_msg_t  msg;
15211Sigor@sysoev.ru 
153344Smax.romanov@nginx.com     msg.link.next = NULL;
154344Smax.romanov@nginx.com     msg.link.prev = NULL;
155122Smax.romanov@nginx.com 
156344Smax.romanov@nginx.com     msg.buf = b;
157*1125Smax.romanov@nginx.com     msg.share = 0;
158344Smax.romanov@nginx.com     msg.fd = fd;
159344Smax.romanov@nginx.com     msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
160*1125Smax.romanov@nginx.com     msg.allocated = 0;
16111Sigor@sysoev.ru 
162423Smax.romanov@nginx.com     if (tracking != NULL) {
163423Smax.romanov@nginx.com         nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
164423Smax.romanov@nginx.com     }
165423Smax.romanov@nginx.com 
166344Smax.romanov@nginx.com     msg.port_msg.stream = stream;
167344Smax.romanov@nginx.com     msg.port_msg.pid = nxt_pid;
168344Smax.romanov@nginx.com     msg.port_msg.reply_port = reply_port;
169344Smax.romanov@nginx.com     msg.port_msg.type = type & NXT_PORT_MSG_MASK;
170344Smax.romanov@nginx.com     msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
171344Smax.romanov@nginx.com     msg.port_msg.mmap = 0;
172352Smax.romanov@nginx.com     msg.port_msg.nf = 0;
173352Smax.romanov@nginx.com     msg.port_msg.mf = 0;
174423Smax.romanov@nginx.com     msg.port_msg.tracking = tracking != NULL;
17511Sigor@sysoev.ru 
176*1125Smax.romanov@nginx.com     res = nxt_port_msg_chk_insert(task, port, &msg);
177*1125Smax.romanov@nginx.com     if (nxt_fast_path(res == NXT_DECLINED)) {
178344Smax.romanov@nginx.com         nxt_port_write_handler(task, &port->socket, &msg);
179*1125Smax.romanov@nginx.com         res = NXT_OK;
18011Sigor@sysoev.ru     }
18111Sigor@sysoev.ru 
182*1125Smax.romanov@nginx.com     return res;
183*1125Smax.romanov@nginx.com }
184*1125Smax.romanov@nginx.com 
185*1125Smax.romanov@nginx.com 
186*1125Smax.romanov@nginx.com static nxt_int_t
187*1125Smax.romanov@nginx.com nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
188*1125Smax.romanov@nginx.com     nxt_port_send_msg_t *msg)
189*1125Smax.romanov@nginx.com {
190*1125Smax.romanov@nginx.com     nxt_int_t  res;
191*1125Smax.romanov@nginx.com 
192*1125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
193*1125Smax.romanov@nginx.com 
194*1125Smax.romanov@nginx.com     if (nxt_fast_path(port->socket.write_ready
195*1125Smax.romanov@nginx.com                       && nxt_queue_is_empty(&port->messages)))
196*1125Smax.romanov@nginx.com     {
197*1125Smax.romanov@nginx.com         res = NXT_DECLINED;
198*1125Smax.romanov@nginx.com 
199*1125Smax.romanov@nginx.com     } else {
200*1125Smax.romanov@nginx.com         msg = nxt_port_msg_alloc(msg);
201*1125Smax.romanov@nginx.com 
202*1125Smax.romanov@nginx.com         if (nxt_fast_path(msg != NULL)) {
203*1125Smax.romanov@nginx.com             nxt_queue_insert_tail(&port->messages, &msg->link);
204*1125Smax.romanov@nginx.com             nxt_port_use(task, port, 1);
205*1125Smax.romanov@nginx.com             res = NXT_OK;
206*1125Smax.romanov@nginx.com 
207*1125Smax.romanov@nginx.com         } else {
208*1125Smax.romanov@nginx.com             res = NXT_ERROR;
209*1125Smax.romanov@nginx.com         }
210*1125Smax.romanov@nginx.com     }
211*1125Smax.romanov@nginx.com 
212*1125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
213*1125Smax.romanov@nginx.com 
214*1125Smax.romanov@nginx.com     return res;
215*1125Smax.romanov@nginx.com }
216*1125Smax.romanov@nginx.com 
217*1125Smax.romanov@nginx.com 
218*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *
219*1125Smax.romanov@nginx.com nxt_port_msg_alloc(nxt_port_send_msg_t *m)
220*1125Smax.romanov@nginx.com {
221*1125Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
222*1125Smax.romanov@nginx.com 
223*1125Smax.romanov@nginx.com     msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
224*1125Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
225*1125Smax.romanov@nginx.com         return NULL;
226*1125Smax.romanov@nginx.com     }
227*1125Smax.romanov@nginx.com 
228*1125Smax.romanov@nginx.com     *msg = *m;
229*1125Smax.romanov@nginx.com 
230*1125Smax.romanov@nginx.com     msg->allocated = 1;
231*1125Smax.romanov@nginx.com 
232*1125Smax.romanov@nginx.com     return msg;
23311Sigor@sysoev.ru }
23411Sigor@sysoev.ru 
23511Sigor@sysoev.ru 
23611Sigor@sysoev.ru static void
237343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
238343Smax.romanov@nginx.com {
239343Smax.romanov@nginx.com     nxt_fd_event_block_write(task->thread->engine, &port->socket);
240343Smax.romanov@nginx.com }
241343Smax.romanov@nginx.com 
242343Smax.romanov@nginx.com 
243343Smax.romanov@nginx.com static void
244343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
245343Smax.romanov@nginx.com {
246343Smax.romanov@nginx.com     nxt_fd_event_enable_write(task->thread->engine, &port->socket);
247343Smax.romanov@nginx.com }
248343Smax.romanov@nginx.com 
249343Smax.romanov@nginx.com 
250343Smax.romanov@nginx.com static void
25111Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
25211Sigor@sysoev.ru {
253343Smax.romanov@nginx.com     int                     use_delta;
254197Smax.romanov@nginx.com     size_t                  plain_size;
25511Sigor@sysoev.ru     ssize_t                 n;
256*1125Smax.romanov@nginx.com     uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
257343Smax.romanov@nginx.com     nxt_bool_t              block_write, enable_write;
25811Sigor@sysoev.ru     nxt_port_t              *port;
259*1125Smax.romanov@nginx.com     struct iovec            iov[NXT_IOBUF_MAX * 10];
260127Smax.romanov@nginx.com     nxt_work_queue_t        *wq;
261125Smax.romanov@nginx.com     nxt_port_method_t       m;
26211Sigor@sysoev.ru     nxt_port_send_msg_t     *msg;
26311Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
26442Smax.romanov@nginx.com 
265197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
26611Sigor@sysoev.ru 
267343Smax.romanov@nginx.com     block_write = 0;
268343Smax.romanov@nginx.com     enable_write = 0;
269343Smax.romanov@nginx.com     use_delta = 0;
270343Smax.romanov@nginx.com 
271344Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
272344Smax.romanov@nginx.com 
27311Sigor@sysoev.ru     do {
274*1125Smax.romanov@nginx.com         if (data) {
275*1125Smax.romanov@nginx.com             msg = data;
276*1125Smax.romanov@nginx.com 
277*1125Smax.romanov@nginx.com         } else {
278*1125Smax.romanov@nginx.com             msg = nxt_port_msg_first(port);
27911Sigor@sysoev.ru 
280*1125Smax.romanov@nginx.com             if (msg == NULL) {
281*1125Smax.romanov@nginx.com                 block_write = 1;
282*1125Smax.romanov@nginx.com                 goto cleanup;
283*1125Smax.romanov@nginx.com             }
28411Sigor@sysoev.ru         }
28511Sigor@sysoev.ru 
286*1125Smax.romanov@nginx.com next_fragment:
287*1125Smax.romanov@nginx.com 
28814Sigor@sysoev.ru         iov[0].iov_base = &msg->port_msg;
28914Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
29011Sigor@sysoev.ru 
29111Sigor@sysoev.ru         sb.buf = msg->buf;
29214Sigor@sysoev.ru         sb.iobuf = &iov[1];
29311Sigor@sysoev.ru         sb.nmax = NXT_IOBUF_MAX - 1;
29411Sigor@sysoev.ru         sb.sync = 0;
29511Sigor@sysoev.ru         sb.last = 0;
29642Smax.romanov@nginx.com         sb.size = 0;
29711Sigor@sysoev.ru         sb.limit = port->max_size;
29811Sigor@sysoev.ru 
299352Smax.romanov@nginx.com         sb.limit_reached = 0;
300352Smax.romanov@nginx.com         sb.nmax_reached = 0;
301352Smax.romanov@nginx.com 
30242Smax.romanov@nginx.com         m = nxt_port_mmap_get_method(task, port, msg->buf);
30342Smax.romanov@nginx.com 
30442Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP) {
30542Smax.romanov@nginx.com             sb.limit = (1ULL << 31) - 1;
306352Smax.romanov@nginx.com             sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
307352Smax.romanov@nginx.com                               port->max_size / PORT_MMAP_MIN_SIZE);
30842Smax.romanov@nginx.com         }
30942Smax.romanov@nginx.com 
310423Smax.romanov@nginx.com         if (msg->port_msg.tracking) {
311423Smax.romanov@nginx.com             iov[0].iov_len += sizeof(msg->tracking_msg);
312423Smax.romanov@nginx.com         }
313423Smax.romanov@nginx.com 
3141002Smax.romanov@nginx.com         sb.limit -= iov[0].iov_len;
3151002Smax.romanov@nginx.com 
31642Smax.romanov@nginx.com         nxt_sendbuf_mem_coalesce(task, &sb);
31742Smax.romanov@nginx.com 
31842Smax.romanov@nginx.com         plain_size = sb.size;
31942Smax.romanov@nginx.com 
32042Smax.romanov@nginx.com         /*
32142Smax.romanov@nginx.com          * Send through mmap enabled only when payload
32242Smax.romanov@nginx.com          * is bigger than PORT_MMAP_MIN_SIZE.
32342Smax.romanov@nginx.com          */
32442Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
325*1125Smax.romanov@nginx.com             nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
32642Smax.romanov@nginx.com 
32742Smax.romanov@nginx.com         } else {
32842Smax.romanov@nginx.com             m = NXT_PORT_METHOD_PLAIN;
32942Smax.romanov@nginx.com         }
33011Sigor@sysoev.ru 
331189Smax.romanov@nginx.com         msg->port_msg.last |= sb.last;
332352Smax.romanov@nginx.com         msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
33311Sigor@sysoev.ru 
33442Smax.romanov@nginx.com         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
33511Sigor@sysoev.ru 
33611Sigor@sysoev.ru         if (n > 0) {
33742Smax.romanov@nginx.com             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
338564Svbart@nginx.com                 nxt_alert(task, "port %d: short write: %z instead of %uz",
339564Svbart@nginx.com                           port->socket.fd, n, sb.size + iov[0].iov_len);
34011Sigor@sysoev.ru                 goto fail;
34111Sigor@sysoev.ru             }
34211Sigor@sysoev.ru 
343189Smax.romanov@nginx.com             if (msg->fd != -1 && msg->close_fd != 0) {
344189Smax.romanov@nginx.com                 nxt_fd_close(msg->fd);
345189Smax.romanov@nginx.com 
346189Smax.romanov@nginx.com                 msg->fd = -1;
347189Smax.romanov@nginx.com             }
348189Smax.romanov@nginx.com 
349592Sigor@sysoev.ru             msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
350*1125Smax.romanov@nginx.com                                                m == NXT_PORT_METHOD_MMAP);
35111Sigor@sysoev.ru 
35211Sigor@sysoev.ru             if (msg->buf != NULL) {
353352Smax.romanov@nginx.com                 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
354352Smax.romanov@nginx.com                           msg->port_msg.stream);
355352Smax.romanov@nginx.com 
35611Sigor@sysoev.ru                 /*
35711Sigor@sysoev.ru                  * A file descriptor is sent only
35811Sigor@sysoev.ru                  * in the first message of a stream.
35911Sigor@sysoev.ru                  */
36011Sigor@sysoev.ru                 msg->fd = -1;
36111Sigor@sysoev.ru                 msg->share += n;
362352Smax.romanov@nginx.com                 msg->port_msg.nf = 1;
363423Smax.romanov@nginx.com                 msg->port_msg.tracking = 0;
36411Sigor@sysoev.ru 
36511Sigor@sysoev.ru                 if (msg->share >= port->max_share) {
36611Sigor@sysoev.ru                     msg->share = 0;
367344Smax.romanov@nginx.com 
368344Smax.romanov@nginx.com                     if (msg->link.next != NULL) {
369*1125Smax.romanov@nginx.com                         nxt_thread_mutex_lock(&port->write_mutex);
370*1125Smax.romanov@nginx.com 
371344Smax.romanov@nginx.com                         nxt_queue_remove(&msg->link);
372*1125Smax.romanov@nginx.com                         nxt_queue_insert_tail(&port->messages, &msg->link);
373*1125Smax.romanov@nginx.com 
374*1125Smax.romanov@nginx.com                         nxt_thread_mutex_unlock(&port->write_mutex);
375344Smax.romanov@nginx.com 
376*1125Smax.romanov@nginx.com                     } else {
377*1125Smax.romanov@nginx.com                         msg = nxt_port_msg_insert_tail(port, msg);
378*1125Smax.romanov@nginx.com                         if (nxt_slow_path(msg == NULL)) {
379*1125Smax.romanov@nginx.com                             goto fail;
380*1125Smax.romanov@nginx.com                         }
381*1125Smax.romanov@nginx.com 
382344Smax.romanov@nginx.com                         use_delta++;
383344Smax.romanov@nginx.com                     }
384*1125Smax.romanov@nginx.com 
385*1125Smax.romanov@nginx.com                 } else {
386*1125Smax.romanov@nginx.com                     goto next_fragment;
38711Sigor@sysoev.ru                 }
38811Sigor@sysoev.ru 
38911Sigor@sysoev.ru             } else {
390344Smax.romanov@nginx.com                 if (msg->link.next != NULL) {
391*1125Smax.romanov@nginx.com                     nxt_thread_mutex_lock(&port->write_mutex);
392*1125Smax.romanov@nginx.com 
393344Smax.romanov@nginx.com                     nxt_queue_remove(&msg->link);
394*1125Smax.romanov@nginx.com                     msg->link.next = NULL;
395*1125Smax.romanov@nginx.com 
396*1125Smax.romanov@nginx.com                     nxt_thread_mutex_unlock(&port->write_mutex);
397*1125Smax.romanov@nginx.com 
398344Smax.romanov@nginx.com                     use_delta--;
399344Smax.romanov@nginx.com                 }
400*1125Smax.romanov@nginx.com 
401*1125Smax.romanov@nginx.com                 nxt_port_release_send_msg(msg);
402*1125Smax.romanov@nginx.com             }
403*1125Smax.romanov@nginx.com 
404*1125Smax.romanov@nginx.com             if (data != NULL) {
405*1125Smax.romanov@nginx.com                 goto cleanup;
40611Sigor@sysoev.ru             }
40711Sigor@sysoev.ru 
4081004Smax.romanov@nginx.com         } else {
409*1125Smax.romanov@nginx.com             if (nxt_slow_path(n == NXT_ERROR)) {
410*1125Smax.romanov@nginx.com                 goto fail;
411344Smax.romanov@nginx.com             }
4121004Smax.romanov@nginx.com 
413*1125Smax.romanov@nginx.com             if (msg->link.next == NULL) {
414*1125Smax.romanov@nginx.com                 msg = nxt_port_msg_insert_tail(port, msg);
415*1125Smax.romanov@nginx.com                 if (nxt_slow_path(msg == NULL)) {
416*1125Smax.romanov@nginx.com                     goto fail;
417*1125Smax.romanov@nginx.com                 }
418*1125Smax.romanov@nginx.com 
419*1125Smax.romanov@nginx.com                 use_delta++;
4201004Smax.romanov@nginx.com             }
42111Sigor@sysoev.ru         }
42211Sigor@sysoev.ru 
42311Sigor@sysoev.ru     } while (port->socket.write_ready);
42411Sigor@sysoev.ru 
42512Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(port->socket.write)) {
426343Smax.romanov@nginx.com         enable_write = 1;
42711Sigor@sysoev.ru     }
42811Sigor@sysoev.ru 
429*1125Smax.romanov@nginx.com     goto cleanup;
43011Sigor@sysoev.ru 
43111Sigor@sysoev.ru fail:
43211Sigor@sysoev.ru 
433343Smax.romanov@nginx.com     use_delta++;
434343Smax.romanov@nginx.com 
435344Smax.romanov@nginx.com     nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
436343Smax.romanov@nginx.com                        &port->socket);
437343Smax.romanov@nginx.com 
438*1125Smax.romanov@nginx.com cleanup:
439343Smax.romanov@nginx.com 
440343Smax.romanov@nginx.com     if (block_write && nxt_fd_event_is_active(port->socket.write)) {
441343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
442343Smax.romanov@nginx.com     }
443343Smax.romanov@nginx.com 
444343Smax.romanov@nginx.com     if (enable_write) {
445343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
446343Smax.romanov@nginx.com     }
447343Smax.romanov@nginx.com 
448343Smax.romanov@nginx.com     if (use_delta != 0) {
449343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
450343Smax.romanov@nginx.com     }
45111Sigor@sysoev.ru }
45211Sigor@sysoev.ru 
45311Sigor@sysoev.ru 
454*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *
455*1125Smax.romanov@nginx.com nxt_port_msg_first(nxt_port_t *port)
456*1125Smax.romanov@nginx.com {
457*1125Smax.romanov@nginx.com     nxt_queue_link_t     *lnk;
458*1125Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
459*1125Smax.romanov@nginx.com 
460*1125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
461*1125Smax.romanov@nginx.com 
462*1125Smax.romanov@nginx.com     lnk = nxt_queue_first(&port->messages);
463*1125Smax.romanov@nginx.com 
464*1125Smax.romanov@nginx.com     if (lnk == nxt_queue_tail(&port->messages)) {
465*1125Smax.romanov@nginx.com         msg = NULL;
466*1125Smax.romanov@nginx.com 
467*1125Smax.romanov@nginx.com     } else {
468*1125Smax.romanov@nginx.com         msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
469*1125Smax.romanov@nginx.com     }
470*1125Smax.romanov@nginx.com 
471*1125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
472*1125Smax.romanov@nginx.com 
473*1125Smax.romanov@nginx.com     return msg;
474*1125Smax.romanov@nginx.com }
475*1125Smax.romanov@nginx.com 
476*1125Smax.romanov@nginx.com 
477592Sigor@sysoev.ru static nxt_buf_t *
478592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
479592Sigor@sysoev.ru     size_t sent, nxt_bool_t mmap_mode)
480592Sigor@sysoev.ru {
481592Sigor@sysoev.ru     size_t  size;
482592Sigor@sysoev.ru 
483592Sigor@sysoev.ru     while (b != NULL) {
484592Sigor@sysoev.ru 
485592Sigor@sysoev.ru         nxt_prefetch(b->next);
486592Sigor@sysoev.ru 
487592Sigor@sysoev.ru         if (!nxt_buf_is_sync(b)) {
488592Sigor@sysoev.ru 
489592Sigor@sysoev.ru             size = nxt_buf_used_size(b);
490592Sigor@sysoev.ru 
491592Sigor@sysoev.ru             if (size != 0) {
492592Sigor@sysoev.ru 
493592Sigor@sysoev.ru                 if (sent == 0) {
494592Sigor@sysoev.ru                     break;
495592Sigor@sysoev.ru                 }
496592Sigor@sysoev.ru 
497592Sigor@sysoev.ru                 if (nxt_buf_is_port_mmap(b) && mmap_mode) {
498592Sigor@sysoev.ru                     /*
499592Sigor@sysoev.ru                      * buffer has been sent to other side which is now
500592Sigor@sysoev.ru                      * responsible for shared memory bucket release
501592Sigor@sysoev.ru                      */
502592Sigor@sysoev.ru                     b->is_port_mmap_sent = 1;
503592Sigor@sysoev.ru                 }
504592Sigor@sysoev.ru 
505592Sigor@sysoev.ru                 if (sent < size) {
506592Sigor@sysoev.ru 
507592Sigor@sysoev.ru                     if (nxt_buf_is_mem(b)) {
508592Sigor@sysoev.ru                         b->mem.pos += sent;
509592Sigor@sysoev.ru                     }
510592Sigor@sysoev.ru 
511592Sigor@sysoev.ru                     if (nxt_buf_is_file(b)) {
512592Sigor@sysoev.ru                         b->file_pos += sent;
513592Sigor@sysoev.ru                     }
514592Sigor@sysoev.ru 
515592Sigor@sysoev.ru                     break;
516592Sigor@sysoev.ru                 }
517592Sigor@sysoev.ru 
518592Sigor@sysoev.ru                 /* b->mem.free is NULL in file-only buffer. */
519592Sigor@sysoev.ru                 b->mem.pos = b->mem.free;
520592Sigor@sysoev.ru 
521592Sigor@sysoev.ru                 if (nxt_buf_is_file(b)) {
522592Sigor@sysoev.ru                     b->file_pos = b->file_end;
523592Sigor@sysoev.ru                 }
524592Sigor@sysoev.ru 
525592Sigor@sysoev.ru                 sent -= size;
526592Sigor@sysoev.ru             }
527592Sigor@sysoev.ru         }
528592Sigor@sysoev.ru 
529592Sigor@sysoev.ru         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
530592Sigor@sysoev.ru 
531592Sigor@sysoev.ru         b = b->next;
532592Sigor@sysoev.ru     }
533592Sigor@sysoev.ru 
534592Sigor@sysoev.ru     return b;
535592Sigor@sysoev.ru }
536592Sigor@sysoev.ru 
537592Sigor@sysoev.ru 
538*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *
539*1125Smax.romanov@nginx.com nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
540*1125Smax.romanov@nginx.com {
541*1125Smax.romanov@nginx.com     if (msg->allocated == 0) {
542*1125Smax.romanov@nginx.com         msg = nxt_port_msg_alloc(msg);
543*1125Smax.romanov@nginx.com 
544*1125Smax.romanov@nginx.com         if (nxt_slow_path(msg == NULL)) {
545*1125Smax.romanov@nginx.com             return NULL;
546*1125Smax.romanov@nginx.com         }
547*1125Smax.romanov@nginx.com     }
548*1125Smax.romanov@nginx.com 
549*1125Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
550*1125Smax.romanov@nginx.com 
551*1125Smax.romanov@nginx.com     nxt_queue_insert_tail(&port->messages, &msg->link);
552*1125Smax.romanov@nginx.com 
553*1125Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
554*1125Smax.romanov@nginx.com 
555*1125Smax.romanov@nginx.com     return msg;
556*1125Smax.romanov@nginx.com }
557*1125Smax.romanov@nginx.com 
558*1125Smax.romanov@nginx.com 
55911Sigor@sysoev.ru void
56011Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
56111Sigor@sysoev.ru {
56211Sigor@sysoev.ru     port->socket.fd = port->pair[0];
56311Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
56411Sigor@sysoev.ru 
565141Smax.romanov@nginx.com     port->engine = task->thread->engine;
566141Smax.romanov@nginx.com 
567141Smax.romanov@nginx.com     port->socket.read_work_queue = &port->engine->fast_work_queue;
56811Sigor@sysoev.ru     port->socket.read_handler = nxt_port_read_handler;
56911Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
57011Sigor@sysoev.ru 
571141Smax.romanov@nginx.com     nxt_fd_event_enable_read(port->engine, &port->socket);
57211Sigor@sysoev.ru }
57311Sigor@sysoev.ru 
57411Sigor@sysoev.ru 
57511Sigor@sysoev.ru void
57611Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
57711Sigor@sysoev.ru {
578350Smax.romanov@nginx.com     port->socket.read_ready = 0;
5791015Smax.romanov@nginx.com     port->socket.read = NXT_EVENT_INACTIVE;
58013Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[0]);
58111Sigor@sysoev.ru     port->pair[0] = -1;
58211Sigor@sysoev.ru }
58311Sigor@sysoev.ru 
58411Sigor@sysoev.ru 
58511Sigor@sysoev.ru static void
58611Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
58711Sigor@sysoev.ru {
58842Smax.romanov@nginx.com     ssize_t              n;
58942Smax.romanov@nginx.com     nxt_buf_t            *b;
59042Smax.romanov@nginx.com     nxt_port_t           *port;
59142Smax.romanov@nginx.com     struct iovec         iov[2];
59242Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
59311Sigor@sysoev.ru 
594125Smax.romanov@nginx.com     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
59511Sigor@sysoev.ru 
596141Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
597141Smax.romanov@nginx.com 
59811Sigor@sysoev.ru     for ( ;; ) {
59911Sigor@sysoev.ru 
60011Sigor@sysoev.ru         b = nxt_port_buf_alloc(port);
60111Sigor@sysoev.ru 
60211Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
60311Sigor@sysoev.ru             /* TODO: disable event for some time */
60411Sigor@sysoev.ru         }
60511Sigor@sysoev.ru 
60642Smax.romanov@nginx.com         iov[0].iov_base = &msg.port_msg;
60714Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
60811Sigor@sysoev.ru 
60914Sigor@sysoev.ru         iov[1].iov_base = b->mem.pos;
61014Sigor@sysoev.ru         iov[1].iov_len = port->max_size;
61114Sigor@sysoev.ru 
61242Smax.romanov@nginx.com         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
61311Sigor@sysoev.ru 
61411Sigor@sysoev.ru         if (n > 0) {
61542Smax.romanov@nginx.com 
61642Smax.romanov@nginx.com             msg.buf = b;
61782Smax.romanov@nginx.com             msg.size = n;
61842Smax.romanov@nginx.com 
61982Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
62011Sigor@sysoev.ru 
621194Smax.romanov@nginx.com             /*
622194Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
623194Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
624194Smax.romanov@nginx.com              */
625194Smax.romanov@nginx.com             if (msg.buf == b) {
62611Sigor@sysoev.ru                 nxt_port_buf_free(port, b);
62711Sigor@sysoev.ru             }
62811Sigor@sysoev.ru 
62911Sigor@sysoev.ru             if (port->socket.read_ready) {
63011Sigor@sysoev.ru                 continue;
63111Sigor@sysoev.ru             }
63211Sigor@sysoev.ru 
63311Sigor@sysoev.ru             return;
63411Sigor@sysoev.ru         }
63511Sigor@sysoev.ru 
63611Sigor@sysoev.ru         if (n == NXT_AGAIN) {
63711Sigor@sysoev.ru             nxt_port_buf_free(port, b);
63811Sigor@sysoev.ru 
63912Sigor@sysoev.ru             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
64011Sigor@sysoev.ru             return;
64111Sigor@sysoev.ru         }
64211Sigor@sysoev.ru 
64311Sigor@sysoev.ru         /* n == 0 || n == NXT_ERROR */
64411Sigor@sysoev.ru 
64511Sigor@sysoev.ru         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
64611Sigor@sysoev.ru                            nxt_port_error_handler, task, &port->socket, NULL);
64711Sigor@sysoev.ru         return;
64811Sigor@sysoev.ru     }
64911Sigor@sysoev.ru }
65011Sigor@sysoev.ru 
65111Sigor@sysoev.ru 
6521005Smax.romanov@nginx.com typedef struct {
6531005Smax.romanov@nginx.com     uint32_t  stream;
6541005Smax.romanov@nginx.com     uint32_t  pid;
6551005Smax.romanov@nginx.com } nxt_port_frag_key_t;
6561005Smax.romanov@nginx.com 
6571005Smax.romanov@nginx.com 
658352Smax.romanov@nginx.com static nxt_int_t
659352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
660352Smax.romanov@nginx.com {
661352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
6621005Smax.romanov@nginx.com     nxt_port_frag_key_t  *frag_key;
663352Smax.romanov@nginx.com 
664352Smax.romanov@nginx.com     fmsg = data;
6651005Smax.romanov@nginx.com     frag_key = (nxt_port_frag_key_t *) lhq->key.start;
666352Smax.romanov@nginx.com 
6671005Smax.romanov@nginx.com     if (lhq->key.length == sizeof(nxt_port_frag_key_t)
6681005Smax.romanov@nginx.com         && frag_key->stream == fmsg->port_msg.stream
6691005Smax.romanov@nginx.com         && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
670352Smax.romanov@nginx.com     {
671352Smax.romanov@nginx.com         return NXT_OK;
672352Smax.romanov@nginx.com     }
673352Smax.romanov@nginx.com 
674352Smax.romanov@nginx.com     return NXT_DECLINED;
675352Smax.romanov@nginx.com }
676352Smax.romanov@nginx.com 
677352Smax.romanov@nginx.com 
678352Smax.romanov@nginx.com static void *
679352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
680352Smax.romanov@nginx.com {
6811084Smax.romanov@nginx.com     return nxt_mp_align(ctx, size, size);
682352Smax.romanov@nginx.com }
683352Smax.romanov@nginx.com 
684352Smax.romanov@nginx.com 
685352Smax.romanov@nginx.com static void
686352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p)
687352Smax.romanov@nginx.com {
688389Smax.romanov@nginx.com     nxt_mp_free(ctx, p);
689352Smax.romanov@nginx.com }
690352Smax.romanov@nginx.com 
691352Smax.romanov@nginx.com 
692352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
693352Smax.romanov@nginx.com     NXT_LVLHSH_DEFAULT,
694352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_test,
695352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_alloc,
696352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_free,
697352Smax.romanov@nginx.com };
698352Smax.romanov@nginx.com 
699352Smax.romanov@nginx.com 
700352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
701352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
702352Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
703352Smax.romanov@nginx.com {
704352Smax.romanov@nginx.com     nxt_int_t            res;
705352Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
706352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
7071005Smax.romanov@nginx.com     nxt_port_frag_key_t  frag_key;
708352Smax.romanov@nginx.com 
709352Smax.romanov@nginx.com     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
710352Smax.romanov@nginx.com 
711352Smax.romanov@nginx.com     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
712352Smax.romanov@nginx.com 
713352Smax.romanov@nginx.com     if (nxt_slow_path(fmsg == NULL)) {
714352Smax.romanov@nginx.com         return NULL;
715352Smax.romanov@nginx.com     }
716352Smax.romanov@nginx.com 
717352Smax.romanov@nginx.com     *fmsg = *msg;
718352Smax.romanov@nginx.com 
7191005Smax.romanov@nginx.com     frag_key.stream = fmsg->port_msg.stream;
7201005Smax.romanov@nginx.com     frag_key.pid = fmsg->port_msg.pid;
7211005Smax.romanov@nginx.com 
7221005Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
7231005Smax.romanov@nginx.com     lhq.key.length = sizeof(nxt_port_frag_key_t);
7241005Smax.romanov@nginx.com     lhq.key.start = (u_char *) &frag_key;
725352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
726352Smax.romanov@nginx.com     lhq.replace = 0;
727352Smax.romanov@nginx.com     lhq.value = fmsg;
728352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
729352Smax.romanov@nginx.com 
730352Smax.romanov@nginx.com     res = nxt_lvlhsh_insert(&port->frags, &lhq);
731352Smax.romanov@nginx.com 
732352Smax.romanov@nginx.com     switch (res) {
733352Smax.romanov@nginx.com 
734352Smax.romanov@nginx.com     case NXT_OK:
735352Smax.romanov@nginx.com         return fmsg;
736352Smax.romanov@nginx.com 
737352Smax.romanov@nginx.com     case NXT_DECLINED:
738352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
739352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
740352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
741352Smax.romanov@nginx.com 
742352Smax.romanov@nginx.com         return NULL;
743352Smax.romanov@nginx.com 
744352Smax.romanov@nginx.com     default:
745352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
746352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
747352Smax.romanov@nginx.com 
748352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
749352Smax.romanov@nginx.com 
750352Smax.romanov@nginx.com         return NULL;
751352Smax.romanov@nginx.com 
752352Smax.romanov@nginx.com     }
753352Smax.romanov@nginx.com }
754352Smax.romanov@nginx.com 
755352Smax.romanov@nginx.com 
756352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
7571005Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
758352Smax.romanov@nginx.com {
7591005Smax.romanov@nginx.com     nxt_int_t            res;
7601005Smax.romanov@nginx.com     nxt_bool_t           last;
7611005Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
7621005Smax.romanov@nginx.com     nxt_port_frag_key_t  frag_key;
7631005Smax.romanov@nginx.com 
7641005Smax.romanov@nginx.com     last = msg->port_msg.mf == 0;
765352Smax.romanov@nginx.com 
7661005Smax.romanov@nginx.com     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
7671005Smax.romanov@nginx.com               msg->port_msg.stream);
768352Smax.romanov@nginx.com 
7691005Smax.romanov@nginx.com     frag_key.stream = msg->port_msg.stream;
7701005Smax.romanov@nginx.com     frag_key.pid = msg->port_msg.pid;
7711005Smax.romanov@nginx.com 
7721005Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
7731005Smax.romanov@nginx.com     lhq.key.length = sizeof(nxt_port_frag_key_t);
7741005Smax.romanov@nginx.com     lhq.key.start = (u_char *) &frag_key;
775352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
776352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
777352Smax.romanov@nginx.com 
778352Smax.romanov@nginx.com     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
779352Smax.romanov@nginx.com           nxt_lvlhsh_find(&port->frags, &lhq);
780352Smax.romanov@nginx.com 
781352Smax.romanov@nginx.com     switch (res) {
782352Smax.romanov@nginx.com 
783352Smax.romanov@nginx.com     case NXT_OK:
784352Smax.romanov@nginx.com         return lhq.value;
785352Smax.romanov@nginx.com 
786352Smax.romanov@nginx.com     default:
7871005Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
7881005Smax.romanov@nginx.com                 frag_key.stream);
789352Smax.romanov@nginx.com 
790352Smax.romanov@nginx.com         return NULL;
791352Smax.romanov@nginx.com     }
792352Smax.romanov@nginx.com }
793352Smax.romanov@nginx.com 
794352Smax.romanov@nginx.com 
79511Sigor@sysoev.ru static void
79611Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
79782Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
79811Sigor@sysoev.ru {
799352Smax.romanov@nginx.com     nxt_buf_t            *b, *orig_b;
800352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
80111Sigor@sysoev.ru 
80282Smax.romanov@nginx.com     if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
803564Svbart@nginx.com         nxt_alert(task, "port %d: too small message:%uz",
804564Svbart@nginx.com                   port->socket.fd, msg->size);
805423Smax.romanov@nginx.com 
806423Smax.romanov@nginx.com         if (msg->fd != -1) {
807423Smax.romanov@nginx.com             nxt_fd_close(msg->fd);
808423Smax.romanov@nginx.com         }
809423Smax.romanov@nginx.com 
810423Smax.romanov@nginx.com         return;
81111Sigor@sysoev.ru     }
81211Sigor@sysoev.ru 
81342Smax.romanov@nginx.com     /* adjust size to actual buffer used size */
81482Smax.romanov@nginx.com     msg->size -= sizeof(nxt_port_msg_t);
81542Smax.romanov@nginx.com 
81642Smax.romanov@nginx.com     b = orig_b = msg->buf;
81782Smax.romanov@nginx.com     b->mem.free += msg->size;
81842Smax.romanov@nginx.com 
819423Smax.romanov@nginx.com     if (msg->port_msg.tracking) {
820423Smax.romanov@nginx.com         msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
821423Smax.romanov@nginx.com 
822423Smax.romanov@nginx.com     } else {
823423Smax.romanov@nginx.com         msg->cancelled = 0;
82442Smax.romanov@nginx.com     }
82511Sigor@sysoev.ru 
826352Smax.romanov@nginx.com     if (nxt_slow_path(msg->port_msg.nf != 0)) {
827423Smax.romanov@nginx.com 
8281005Smax.romanov@nginx.com         fmsg = nxt_port_frag_find(task, port, msg);
829352Smax.romanov@nginx.com 
830551Smax.romanov@nginx.com         if (nxt_slow_path(fmsg == NULL)) {
831551Smax.romanov@nginx.com             goto fmsg_failed;
832551Smax.romanov@nginx.com         }
833423Smax.romanov@nginx.com 
834423Smax.romanov@nginx.com         if (nxt_fast_path(fmsg->cancelled == 0)) {
835423Smax.romanov@nginx.com 
836423Smax.romanov@nginx.com             if (msg->port_msg.mmap) {
837423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
838423Smax.romanov@nginx.com             }
839423Smax.romanov@nginx.com 
840423Smax.romanov@nginx.com             nxt_buf_chain_add(&fmsg->buf, msg->buf);
841423Smax.romanov@nginx.com 
842423Smax.romanov@nginx.com             fmsg->size += msg->size;
843423Smax.romanov@nginx.com             msg->buf = NULL;
844423Smax.romanov@nginx.com             b = NULL;
845423Smax.romanov@nginx.com 
846423Smax.romanov@nginx.com             if (nxt_fast_path(msg->port_msg.mf == 0)) {
847423Smax.romanov@nginx.com 
848423Smax.romanov@nginx.com                 b = fmsg->buf;
849423Smax.romanov@nginx.com 
850423Smax.romanov@nginx.com                 port->handler(task, fmsg);
851423Smax.romanov@nginx.com 
852423Smax.romanov@nginx.com                 msg->buf = fmsg->buf;
853423Smax.romanov@nginx.com                 msg->fd = fmsg->fd;
854974Smax.romanov@nginx.com 
855974Smax.romanov@nginx.com                 /*
856974Smax.romanov@nginx.com                  * To disable instant completion or buffer re-usage,
857974Smax.romanov@nginx.com                  * handler should reset 'msg.buf'.
858974Smax.romanov@nginx.com                  */
859974Smax.romanov@nginx.com                 if (!msg->port_msg.mmap && msg->buf == b) {
860974Smax.romanov@nginx.com                     nxt_port_buf_free(port, b);
861974Smax.romanov@nginx.com                 }
862423Smax.romanov@nginx.com             }
863352Smax.romanov@nginx.com         }
864352Smax.romanov@nginx.com 
865352Smax.romanov@nginx.com         if (nxt_fast_path(msg->port_msg.mf == 0)) {
866352Smax.romanov@nginx.com             nxt_mp_free(port->mem_pool, fmsg);
867352Smax.romanov@nginx.com         }
868352Smax.romanov@nginx.com     } else {
869352Smax.romanov@nginx.com         if (nxt_slow_path(msg->port_msg.mf != 0)) {
870423Smax.romanov@nginx.com 
871423Smax.romanov@nginx.com             if (msg->port_msg.mmap && msg->cancelled == 0) {
872423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
873423Smax.romanov@nginx.com                 b = msg->buf;
874423Smax.romanov@nginx.com             }
875423Smax.romanov@nginx.com 
876352Smax.romanov@nginx.com             fmsg = nxt_port_frag_start(task, port, msg);
877352Smax.romanov@nginx.com 
878551Smax.romanov@nginx.com             if (nxt_slow_path(fmsg == NULL)) {
879551Smax.romanov@nginx.com                 goto fmsg_failed;
880551Smax.romanov@nginx.com             }
881352Smax.romanov@nginx.com 
882352Smax.romanov@nginx.com             fmsg->port_msg.nf = 0;
883352Smax.romanov@nginx.com             fmsg->port_msg.mf = 0;
884352Smax.romanov@nginx.com 
885423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
886423Smax.romanov@nginx.com                 msg->buf = NULL;
887423Smax.romanov@nginx.com                 msg->fd = -1;
888423Smax.romanov@nginx.com                 b = NULL;
889423Smax.romanov@nginx.com 
890423Smax.romanov@nginx.com             } else {
891423Smax.romanov@nginx.com                 if (msg->fd != -1) {
892423Smax.romanov@nginx.com                     nxt_fd_close(msg->fd);
893423Smax.romanov@nginx.com                 }
894423Smax.romanov@nginx.com             }
895352Smax.romanov@nginx.com         } else {
896423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
897423Smax.romanov@nginx.com 
898423Smax.romanov@nginx.com                 if (msg->port_msg.mmap) {
899423Smax.romanov@nginx.com                     nxt_port_mmap_read(task, msg);
900423Smax.romanov@nginx.com                     b = msg->buf;
901423Smax.romanov@nginx.com                 }
902423Smax.romanov@nginx.com 
903423Smax.romanov@nginx.com                 port->handler(task, msg);
904423Smax.romanov@nginx.com             }
905352Smax.romanov@nginx.com         }
906352Smax.romanov@nginx.com     }
90742Smax.romanov@nginx.com 
908551Smax.romanov@nginx.com fmsg_failed:
909551Smax.romanov@nginx.com 
91082Smax.romanov@nginx.com     if (msg->port_msg.mmap && orig_b != b) {
91142Smax.romanov@nginx.com 
912194Smax.romanov@nginx.com         /*
913194Smax.romanov@nginx.com          * To disable instant buffer completion,
914194Smax.romanov@nginx.com          * handler should reset 'msg->buf'.
915194Smax.romanov@nginx.com          */
916194Smax.romanov@nginx.com         if (msg->buf == b) {
917194Smax.romanov@nginx.com             /* complete mmap buffers */
918194Smax.romanov@nginx.com             for (; b != NULL; b = b->next) {
919194Smax.romanov@nginx.com                 nxt_debug(task, "complete buffer %p", b);
920194Smax.romanov@nginx.com 
921194Smax.romanov@nginx.com                 nxt_work_queue_add(port->socket.read_work_queue,
922194Smax.romanov@nginx.com                     b->completion_handler, task, b, b->parent);
923194Smax.romanov@nginx.com             }
92442Smax.romanov@nginx.com         }
925194Smax.romanov@nginx.com 
926194Smax.romanov@nginx.com         /* restore original buf */
927194Smax.romanov@nginx.com         msg->buf = orig_b;
92842Smax.romanov@nginx.com     }
92911Sigor@sysoev.ru }
93011Sigor@sysoev.ru 
93111Sigor@sysoev.ru 
93211Sigor@sysoev.ru static nxt_buf_t *
93311Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
93411Sigor@sysoev.ru {
93511Sigor@sysoev.ru     nxt_buf_t  *b;
93611Sigor@sysoev.ru 
93711Sigor@sysoev.ru     if (port->free_bufs != NULL) {
93811Sigor@sysoev.ru         b = port->free_bufs;
93911Sigor@sysoev.ru         port->free_bufs = b->next;
94011Sigor@sysoev.ru 
94111Sigor@sysoev.ru         b->mem.pos = b->mem.start;
94211Sigor@sysoev.ru         b->mem.free = b->mem.start;
94342Smax.romanov@nginx.com         b->next = NULL;
94411Sigor@sysoev.ru     } else {
94511Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
94611Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
94711Sigor@sysoev.ru             return NULL;
94811Sigor@sysoev.ru         }
94911Sigor@sysoev.ru     }
95011Sigor@sysoev.ru 
95111Sigor@sysoev.ru     return b;
95211Sigor@sysoev.ru }
95311Sigor@sysoev.ru 
95411Sigor@sysoev.ru 
95511Sigor@sysoev.ru static void
95611Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
95711Sigor@sysoev.ru {
958974Smax.romanov@nginx.com     nxt_buf_chain_add(&b, port->free_bufs);
95911Sigor@sysoev.ru     port->free_bufs = b;
96011Sigor@sysoev.ru }
96111Sigor@sysoev.ru 
96211Sigor@sysoev.ru 
96311Sigor@sysoev.ru static void
96411Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
96511Sigor@sysoev.ru {
966343Smax.romanov@nginx.com     int                  use_delta;
967197Smax.romanov@nginx.com     nxt_buf_t            *b;
968197Smax.romanov@nginx.com     nxt_port_t           *port;
969197Smax.romanov@nginx.com     nxt_work_queue_t     *wq;
970197Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
971197Smax.romanov@nginx.com 
972125Smax.romanov@nginx.com     nxt_debug(task, "port error handler %p", obj);
97311Sigor@sysoev.ru     /* TODO */
974197Smax.romanov@nginx.com 
975197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
976197Smax.romanov@nginx.com 
977343Smax.romanov@nginx.com     use_delta = 0;
978343Smax.romanov@nginx.com 
979343Smax.romanov@nginx.com     if (obj == data) {
980343Smax.romanov@nginx.com         use_delta--;
981343Smax.romanov@nginx.com     }
982197Smax.romanov@nginx.com 
983343Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
984343Smax.romanov@nginx.com 
985343Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
986343Smax.romanov@nginx.com 
987343Smax.romanov@nginx.com     nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
988197Smax.romanov@nginx.com 
989521Szelenkov@nginx.com         for (b = msg->buf; b != NULL; b = b->next) {
990197Smax.romanov@nginx.com             if (nxt_buf_is_sync(b)) {
991197Smax.romanov@nginx.com                 continue;
992197Smax.romanov@nginx.com             }
993197Smax.romanov@nginx.com 
994197Smax.romanov@nginx.com             nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
995197Smax.romanov@nginx.com         }
996197Smax.romanov@nginx.com 
997197Smax.romanov@nginx.com         nxt_queue_remove(&msg->link);
998343Smax.romanov@nginx.com         use_delta--;
999*1125Smax.romanov@nginx.com 
1000*1125Smax.romanov@nginx.com         nxt_port_release_send_msg(msg);
1001197Smax.romanov@nginx.com 
1002197Smax.romanov@nginx.com     } nxt_queue_loop;
1003343Smax.romanov@nginx.com 
1004343Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
1005343Smax.romanov@nginx.com 
1006343Smax.romanov@nginx.com     if (use_delta != 0) {
1007343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
1008343Smax.romanov@nginx.com     }
100911Sigor@sysoev.ru }
1010