xref: /unit/src/nxt_port_socket.c (revision 1002)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
811Sigor@sysoev.ru 
911Sigor@sysoev.ru 
1011Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
11592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
12592Sigor@sysoev.ru     nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
1311Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
1411Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
1582Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg);
1611Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
1711Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
1811Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
1911Sigor@sysoev.ru 
2011Sigor@sysoev.ru 
2114Sigor@sysoev.ru nxt_int_t
2214Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
2311Sigor@sysoev.ru {
2465Sigor@sysoev.ru     nxt_int_t     sndbuf, rcvbuf, size;
2565Sigor@sysoev.ru     nxt_socket_t  snd, rcv;
2611Sigor@sysoev.ru 
2714Sigor@sysoev.ru     port->socket.task = task;
2814Sigor@sysoev.ru 
2914Sigor@sysoev.ru     port->pair[0] = -1;
3014Sigor@sysoev.ru     port->pair[1] = -1;
3114Sigor@sysoev.ru 
3213Sigor@sysoev.ru     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
3311Sigor@sysoev.ru         goto socketpair_fail;
3411Sigor@sysoev.ru     }
3511Sigor@sysoev.ru 
3611Sigor@sysoev.ru     snd = port->pair[1];
3711Sigor@sysoev.ru 
3813Sigor@sysoev.ru     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
3911Sigor@sysoev.ru     if (nxt_slow_path(sndbuf < 0)) {
4011Sigor@sysoev.ru         goto getsockopt_fail;
4111Sigor@sysoev.ru     }
4211Sigor@sysoev.ru 
4311Sigor@sysoev.ru     rcv = port->pair[0];
4411Sigor@sysoev.ru 
4513Sigor@sysoev.ru     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
4611Sigor@sysoev.ru     if (nxt_slow_path(rcvbuf < 0)) {
4711Sigor@sysoev.ru         goto getsockopt_fail;
4811Sigor@sysoev.ru     }
4911Sigor@sysoev.ru 
5011Sigor@sysoev.ru     if (max_size == 0) {
5111Sigor@sysoev.ru         max_size = 16 * 1024;
5211Sigor@sysoev.ru     }
5311Sigor@sysoev.ru 
5411Sigor@sysoev.ru     if ((size_t) sndbuf < max_size) {
5511Sigor@sysoev.ru         /*
5611Sigor@sysoev.ru          * On Unix domain sockets
5711Sigor@sysoev.ru          *   Linux uses 224K on both send and receive directions;
5811Sigor@sysoev.ru          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
5911Sigor@sysoev.ru          *   on send direction and 4K buffer size on receive direction;
6011Sigor@sysoev.ru          *   Solaris uses 16K on send direction and 5K on receive direction.
6111Sigor@sysoev.ru          */
6213Sigor@sysoev.ru         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
6313Sigor@sysoev.ru                                      max_size);
6411Sigor@sysoev.ru 
6513Sigor@sysoev.ru         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
6611Sigor@sysoev.ru         if (nxt_slow_path(sndbuf < 0)) {
6711Sigor@sysoev.ru             goto getsockopt_fail;
6811Sigor@sysoev.ru         }
6911Sigor@sysoev.ru 
7011Sigor@sysoev.ru         size = sndbuf * 4;
7111Sigor@sysoev.ru 
7211Sigor@sysoev.ru         if (rcvbuf < size) {
7313Sigor@sysoev.ru             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
7413Sigor@sysoev.ru                                          size);
7511Sigor@sysoev.ru 
7613Sigor@sysoev.ru             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
7711Sigor@sysoev.ru             if (nxt_slow_path(rcvbuf < 0)) {
7811Sigor@sysoev.ru                 goto getsockopt_fail;
7911Sigor@sysoev.ru             }
8011Sigor@sysoev.ru         }
8111Sigor@sysoev.ru     }
8211Sigor@sysoev.ru 
8311Sigor@sysoev.ru     port->max_size = nxt_min(max_size, (size_t) sndbuf);
8411Sigor@sysoev.ru     port->max_share = (64 * 1024);
8511Sigor@sysoev.ru 
8614Sigor@sysoev.ru     return NXT_OK;
8711Sigor@sysoev.ru 
8811Sigor@sysoev.ru getsockopt_fail:
8911Sigor@sysoev.ru 
9013Sigor@sysoev.ru     nxt_socket_close(task, port->pair[0]);
9113Sigor@sysoev.ru     nxt_socket_close(task, port->pair[1]);
9211Sigor@sysoev.ru 
9311Sigor@sysoev.ru socketpair_fail:
9411Sigor@sysoev.ru 
9514Sigor@sysoev.ru     return NXT_ERROR;
9611Sigor@sysoev.ru }
9711Sigor@sysoev.ru 
9811Sigor@sysoev.ru 
9911Sigor@sysoev.ru void
10011Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
10111Sigor@sysoev.ru {
10213Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->socket.fd);
10365Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
10411Sigor@sysoev.ru }
10511Sigor@sysoev.ru 
10611Sigor@sysoev.ru 
10711Sigor@sysoev.ru void
10811Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
10911Sigor@sysoev.ru {
11011Sigor@sysoev.ru     port->socket.fd = port->pair[1];
11111Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
11211Sigor@sysoev.ru     port->socket.write_ready = 1;
11311Sigor@sysoev.ru 
114141Smax.romanov@nginx.com     port->engine = task->thread->engine;
115141Smax.romanov@nginx.com 
116141Smax.romanov@nginx.com     port->socket.write_work_queue = &port->engine->fast_work_queue;
11711Sigor@sysoev.ru     port->socket.write_handler = nxt_port_write_handler;
11811Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
119197Smax.romanov@nginx.com 
120197Smax.romanov@nginx.com     if (port->iov == NULL) {
121613Svbart@nginx.com         port->iov = nxt_mp_get(port->mem_pool,
122613Svbart@nginx.com                                sizeof(struct iovec) * NXT_IOBUF_MAX * 10);
123613Svbart@nginx.com         port->mmsg_buf = nxt_mp_get(port->mem_pool,
124613Svbart@nginx.com                                     sizeof(uint32_t) * 3 * NXT_IOBUF_MAX * 10);
125197Smax.romanov@nginx.com     }
12611Sigor@sysoev.ru }
12711Sigor@sysoev.ru 
12811Sigor@sysoev.ru 
12911Sigor@sysoev.ru void
13011Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
13111Sigor@sysoev.ru {
13213Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[1]);
13311Sigor@sysoev.ru     port->pair[1] = -1;
13411Sigor@sysoev.ru }
13511Sigor@sysoev.ru 
13611Sigor@sysoev.ru 
137122Smax.romanov@nginx.com static void
138122Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
139122Smax.romanov@nginx.com {
140122Smax.romanov@nginx.com     nxt_event_engine_t   *engine;
141122Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
142122Smax.romanov@nginx.com 
143122Smax.romanov@nginx.com     msg = obj;
144122Smax.romanov@nginx.com     engine = data;
145122Smax.romanov@nginx.com 
146564Svbart@nginx.com     nxt_assert(data == msg->work.data);
147122Smax.romanov@nginx.com 
148122Smax.romanov@nginx.com     if (engine != task->thread->engine) {
149122Smax.romanov@nginx.com 
150122Smax.romanov@nginx.com         nxt_debug(task, "current thread is %PT, expected %PT",
151122Smax.romanov@nginx.com                   task->thread->tid, engine->task.thread->tid);
152122Smax.romanov@nginx.com 
153122Smax.romanov@nginx.com         nxt_event_engine_post(engine, &msg->work);
154122Smax.romanov@nginx.com 
155122Smax.romanov@nginx.com         return;
156122Smax.romanov@nginx.com     }
157122Smax.romanov@nginx.com 
158430Sigor@sysoev.ru     nxt_mp_free(engine->mem_pool, obj);
159430Sigor@sysoev.ru     nxt_mp_release(engine->mem_pool);
160344Smax.romanov@nginx.com }
161344Smax.romanov@nginx.com 
162344Smax.romanov@nginx.com 
163344Smax.romanov@nginx.com static nxt_port_send_msg_t *
164344Smax.romanov@nginx.com nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
165344Smax.romanov@nginx.com {
166430Sigor@sysoev.ru     nxt_mp_t             *mp;
167344Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
168344Smax.romanov@nginx.com 
169430Sigor@sysoev.ru     mp = task->thread->engine->mem_pool;
170430Sigor@sysoev.ru 
171430Sigor@sysoev.ru     msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t));
172344Smax.romanov@nginx.com     if (nxt_slow_path(msg == NULL)) {
173344Smax.romanov@nginx.com         return NULL;
174344Smax.romanov@nginx.com     }
175344Smax.romanov@nginx.com 
176430Sigor@sysoev.ru     nxt_mp_retain(mp);
177430Sigor@sysoev.ru 
178344Smax.romanov@nginx.com     msg->link.next = NULL;
179344Smax.romanov@nginx.com     msg->link.prev = NULL;
180344Smax.romanov@nginx.com 
181344Smax.romanov@nginx.com     msg->buf = m->buf;
182978Smax.romanov@nginx.com     msg->share = m->share;
183344Smax.romanov@nginx.com     msg->fd = m->fd;
184344Smax.romanov@nginx.com     msg->close_fd = m->close_fd;
185344Smax.romanov@nginx.com     msg->port_msg = m->port_msg;
186344Smax.romanov@nginx.com 
187344Smax.romanov@nginx.com     msg->work.next = NULL;
188344Smax.romanov@nginx.com     msg->work.handler = nxt_port_release_send_msg;
189344Smax.romanov@nginx.com     msg->work.task = task;
190344Smax.romanov@nginx.com     msg->work.obj = msg;
191344Smax.romanov@nginx.com     msg->work.data = task->thread->engine;
192344Smax.romanov@nginx.com 
193344Smax.romanov@nginx.com     return msg;
194344Smax.romanov@nginx.com }
195344Smax.romanov@nginx.com 
196344Smax.romanov@nginx.com 
197344Smax.romanov@nginx.com static nxt_port_send_msg_t *
198344Smax.romanov@nginx.com nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
199344Smax.romanov@nginx.com {
200344Smax.romanov@nginx.com     if (msg->work.data == NULL) {
201344Smax.romanov@nginx.com         msg = nxt_port_msg_create(task, msg);
202344Smax.romanov@nginx.com     }
203344Smax.romanov@nginx.com 
204344Smax.romanov@nginx.com     if (msg != NULL) {
205344Smax.romanov@nginx.com         nxt_queue_insert_tail(&port->messages, &msg->link);
206344Smax.romanov@nginx.com     }
207344Smax.romanov@nginx.com 
208344Smax.romanov@nginx.com     return msg;
209344Smax.romanov@nginx.com }
210344Smax.romanov@nginx.com 
211344Smax.romanov@nginx.com 
212344Smax.romanov@nginx.com static nxt_port_send_msg_t *
213344Smax.romanov@nginx.com nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
214344Smax.romanov@nginx.com {
215344Smax.romanov@nginx.com     nxt_queue_link_t  *lnk;
216344Smax.romanov@nginx.com 
217344Smax.romanov@nginx.com     lnk = nxt_queue_first(&port->messages);
218344Smax.romanov@nginx.com 
219344Smax.romanov@nginx.com     if (lnk == nxt_queue_tail(&port->messages)) {
220344Smax.romanov@nginx.com         return msg;
221344Smax.romanov@nginx.com     }
222344Smax.romanov@nginx.com 
223344Smax.romanov@nginx.com     return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
224122Smax.romanov@nginx.com }
225122Smax.romanov@nginx.com 
226122Smax.romanov@nginx.com 
22711Sigor@sysoev.ru nxt_int_t
228423Smax.romanov@nginx.com nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
229423Smax.romanov@nginx.com     nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
230423Smax.romanov@nginx.com     void *tracking)
23111Sigor@sysoev.ru {
232344Smax.romanov@nginx.com     nxt_port_send_msg_t  msg, *res;
23311Sigor@sysoev.ru 
234344Smax.romanov@nginx.com     msg.link.next = NULL;
235344Smax.romanov@nginx.com     msg.link.prev = NULL;
236122Smax.romanov@nginx.com 
237344Smax.romanov@nginx.com     msg.buf = b;
238344Smax.romanov@nginx.com     msg.fd = fd;
239344Smax.romanov@nginx.com     msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
240344Smax.romanov@nginx.com     msg.share = 0;
24111Sigor@sysoev.ru 
242423Smax.romanov@nginx.com     if (tracking != NULL) {
243423Smax.romanov@nginx.com         nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
244423Smax.romanov@nginx.com     }
245423Smax.romanov@nginx.com 
246344Smax.romanov@nginx.com     msg.port_msg.stream = stream;
247344Smax.romanov@nginx.com     msg.port_msg.pid = nxt_pid;
248344Smax.romanov@nginx.com     msg.port_msg.reply_port = reply_port;
249344Smax.romanov@nginx.com     msg.port_msg.type = type & NXT_PORT_MSG_MASK;
250344Smax.romanov@nginx.com     msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
251344Smax.romanov@nginx.com     msg.port_msg.mmap = 0;
252352Smax.romanov@nginx.com     msg.port_msg.nf = 0;
253352Smax.romanov@nginx.com     msg.port_msg.mf = 0;
254423Smax.romanov@nginx.com     msg.port_msg.tracking = tracking != NULL;
25511Sigor@sysoev.ru 
256344Smax.romanov@nginx.com     msg.work.data = NULL;
257343Smax.romanov@nginx.com 
25811Sigor@sysoev.ru     if (port->socket.write_ready) {
259344Smax.romanov@nginx.com         nxt_port_write_handler(task, &port->socket, &msg);
260344Smax.romanov@nginx.com     } else {
261344Smax.romanov@nginx.com         nxt_thread_mutex_lock(&port->write_mutex);
262344Smax.romanov@nginx.com 
263344Smax.romanov@nginx.com         res = nxt_port_msg_push(task, port, &msg);
264344Smax.romanov@nginx.com 
265344Smax.romanov@nginx.com         nxt_thread_mutex_unlock(&port->write_mutex);
266344Smax.romanov@nginx.com 
267344Smax.romanov@nginx.com         if (res == NULL) {
268344Smax.romanov@nginx.com             return NXT_ERROR;
269344Smax.romanov@nginx.com         }
270344Smax.romanov@nginx.com 
271344Smax.romanov@nginx.com         nxt_port_use(task, port, 1);
27211Sigor@sysoev.ru     }
27311Sigor@sysoev.ru 
27411Sigor@sysoev.ru     return NXT_OK;
27511Sigor@sysoev.ru }
27611Sigor@sysoev.ru 
27711Sigor@sysoev.ru 
27811Sigor@sysoev.ru static void
279343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
280343Smax.romanov@nginx.com {
281343Smax.romanov@nginx.com     nxt_fd_event_block_write(task->thread->engine, &port->socket);
282343Smax.romanov@nginx.com }
283343Smax.romanov@nginx.com 
284343Smax.romanov@nginx.com 
285343Smax.romanov@nginx.com static void
286343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
287343Smax.romanov@nginx.com {
288343Smax.romanov@nginx.com     nxt_fd_event_enable_write(task->thread->engine, &port->socket);
289343Smax.romanov@nginx.com }
290343Smax.romanov@nginx.com 
291343Smax.romanov@nginx.com 
292343Smax.romanov@nginx.com static void
29311Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
29411Sigor@sysoev.ru {
295343Smax.romanov@nginx.com     int                     use_delta;
296197Smax.romanov@nginx.com     size_t                  plain_size;
29711Sigor@sysoev.ru     ssize_t                 n;
298343Smax.romanov@nginx.com     nxt_bool_t              block_write, enable_write;
29911Sigor@sysoev.ru     nxt_port_t              *port;
300197Smax.romanov@nginx.com     struct iovec            *iov;
301127Smax.romanov@nginx.com     nxt_work_queue_t        *wq;
302125Smax.romanov@nginx.com     nxt_port_method_t       m;
30311Sigor@sysoev.ru     nxt_port_send_msg_t     *msg;
30411Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
30542Smax.romanov@nginx.com 
306197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
30711Sigor@sysoev.ru 
308343Smax.romanov@nginx.com     block_write = 0;
309343Smax.romanov@nginx.com     enable_write = 0;
310343Smax.romanov@nginx.com     use_delta = 0;
311343Smax.romanov@nginx.com 
312343Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
313343Smax.romanov@nginx.com 
314197Smax.romanov@nginx.com     iov = port->iov;
31511Sigor@sysoev.ru 
316344Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
317344Smax.romanov@nginx.com 
31811Sigor@sysoev.ru     do {
319344Smax.romanov@nginx.com         msg = nxt_port_msg_first(task, port, data);
32011Sigor@sysoev.ru 
321344Smax.romanov@nginx.com         if (msg == NULL) {
322343Smax.romanov@nginx.com             block_write = 1;
323343Smax.romanov@nginx.com             goto unlock_mutex;
32411Sigor@sysoev.ru         }
32511Sigor@sysoev.ru 
32614Sigor@sysoev.ru         iov[0].iov_base = &msg->port_msg;
32714Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
32811Sigor@sysoev.ru 
32911Sigor@sysoev.ru         sb.buf = msg->buf;
33014Sigor@sysoev.ru         sb.iobuf = &iov[1];
33111Sigor@sysoev.ru         sb.nmax = NXT_IOBUF_MAX - 1;
33211Sigor@sysoev.ru         sb.sync = 0;
33311Sigor@sysoev.ru         sb.last = 0;
33442Smax.romanov@nginx.com         sb.size = 0;
33511Sigor@sysoev.ru         sb.limit = port->max_size;
33611Sigor@sysoev.ru 
337352Smax.romanov@nginx.com         sb.limit_reached = 0;
338352Smax.romanov@nginx.com         sb.nmax_reached = 0;
339352Smax.romanov@nginx.com 
34042Smax.romanov@nginx.com         m = nxt_port_mmap_get_method(task, port, msg->buf);
34142Smax.romanov@nginx.com 
34242Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP) {
34342Smax.romanov@nginx.com             sb.limit = (1ULL << 31) - 1;
344352Smax.romanov@nginx.com             sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
345352Smax.romanov@nginx.com                               port->max_size / PORT_MMAP_MIN_SIZE);
34642Smax.romanov@nginx.com         }
34742Smax.romanov@nginx.com 
348423Smax.romanov@nginx.com         if (msg->port_msg.tracking) {
349423Smax.romanov@nginx.com             iov[0].iov_len += sizeof(msg->tracking_msg);
350423Smax.romanov@nginx.com         }
351423Smax.romanov@nginx.com 
352*1002Smax.romanov@nginx.com         sb.limit -= iov[0].iov_len;
353*1002Smax.romanov@nginx.com 
35442Smax.romanov@nginx.com         nxt_sendbuf_mem_coalesce(task, &sb);
35542Smax.romanov@nginx.com 
35642Smax.romanov@nginx.com         plain_size = sb.size;
35742Smax.romanov@nginx.com 
35842Smax.romanov@nginx.com         /*
35942Smax.romanov@nginx.com          * Send through mmap enabled only when payload
36042Smax.romanov@nginx.com          * is bigger than PORT_MMAP_MIN_SIZE.
36142Smax.romanov@nginx.com          */
36242Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
36342Smax.romanov@nginx.com             nxt_port_mmap_write(task, port, msg, &sb);
36442Smax.romanov@nginx.com 
36542Smax.romanov@nginx.com         } else {
36642Smax.romanov@nginx.com             m = NXT_PORT_METHOD_PLAIN;
36742Smax.romanov@nginx.com         }
36811Sigor@sysoev.ru 
369189Smax.romanov@nginx.com         msg->port_msg.last |= sb.last;
370352Smax.romanov@nginx.com         msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
37111Sigor@sysoev.ru 
37242Smax.romanov@nginx.com         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
37311Sigor@sysoev.ru 
37411Sigor@sysoev.ru         if (n > 0) {
37542Smax.romanov@nginx.com             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
376564Svbart@nginx.com                 nxt_alert(task, "port %d: short write: %z instead of %uz",
377564Svbart@nginx.com                           port->socket.fd, n, sb.size + iov[0].iov_len);
37811Sigor@sysoev.ru                 goto fail;
37911Sigor@sysoev.ru             }
38011Sigor@sysoev.ru 
381189Smax.romanov@nginx.com             if (msg->fd != -1 && msg->close_fd != 0) {
382189Smax.romanov@nginx.com                 nxt_fd_close(msg->fd);
383189Smax.romanov@nginx.com 
384189Smax.romanov@nginx.com                 msg->fd = -1;
385189Smax.romanov@nginx.com             }
386189Smax.romanov@nginx.com 
387592Sigor@sysoev.ru             msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
388203Smax.romanov@nginx.com                                               m == NXT_PORT_METHOD_MMAP);
38911Sigor@sysoev.ru 
39011Sigor@sysoev.ru             if (msg->buf != NULL) {
391352Smax.romanov@nginx.com                 nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
392352Smax.romanov@nginx.com                           msg->port_msg.stream);
393352Smax.romanov@nginx.com 
39411Sigor@sysoev.ru                 /*
39511Sigor@sysoev.ru                  * A file descriptor is sent only
39611Sigor@sysoev.ru                  * in the first message of a stream.
39711Sigor@sysoev.ru                  */
39811Sigor@sysoev.ru                 msg->fd = -1;
39911Sigor@sysoev.ru                 msg->share += n;
400352Smax.romanov@nginx.com                 msg->port_msg.nf = 1;
401423Smax.romanov@nginx.com                 msg->port_msg.tracking = 0;
40211Sigor@sysoev.ru 
40311Sigor@sysoev.ru                 if (msg->share >= port->max_share) {
40411Sigor@sysoev.ru                     msg->share = 0;
405344Smax.romanov@nginx.com 
406344Smax.romanov@nginx.com                     if (msg->link.next != NULL) {
407344Smax.romanov@nginx.com                         nxt_queue_remove(&msg->link);
408344Smax.romanov@nginx.com                         use_delta--;
409344Smax.romanov@nginx.com                     }
410344Smax.romanov@nginx.com                     data = NULL;
411344Smax.romanov@nginx.com 
412344Smax.romanov@nginx.com                     if (nxt_port_msg_push(task, port, msg) != NULL) {
413344Smax.romanov@nginx.com                         use_delta++;
414344Smax.romanov@nginx.com                     }
41511Sigor@sysoev.ru                 }
41611Sigor@sysoev.ru 
41711Sigor@sysoev.ru             } else {
418344Smax.romanov@nginx.com                 if (msg->link.next != NULL) {
419344Smax.romanov@nginx.com                     nxt_queue_remove(&msg->link);
420344Smax.romanov@nginx.com                     use_delta--;
421344Smax.romanov@nginx.com                     nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
422344Smax.romanov@nginx.com                                        msg->work.data);
423344Smax.romanov@nginx.com                 }
424344Smax.romanov@nginx.com                 data = NULL;
42511Sigor@sysoev.ru             }
42611Sigor@sysoev.ru 
42711Sigor@sysoev.ru         } else if (nxt_slow_path(n == NXT_ERROR)) {
428344Smax.romanov@nginx.com             if (msg->link.next == NULL) {
429344Smax.romanov@nginx.com                 if (nxt_port_msg_push(task, port, msg) != NULL) {
430344Smax.romanov@nginx.com                     use_delta++;
431344Smax.romanov@nginx.com                 }
432344Smax.romanov@nginx.com             }
43311Sigor@sysoev.ru             goto fail;
43411Sigor@sysoev.ru         }
43511Sigor@sysoev.ru 
43611Sigor@sysoev.ru         /* n == NXT_AGAIN */
43711Sigor@sysoev.ru 
43811Sigor@sysoev.ru     } while (port->socket.write_ready);
43911Sigor@sysoev.ru 
44012Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(port->socket.write)) {
441343Smax.romanov@nginx.com         enable_write = 1;
44211Sigor@sysoev.ru     }
44311Sigor@sysoev.ru 
444343Smax.romanov@nginx.com     goto unlock_mutex;
44511Sigor@sysoev.ru 
44611Sigor@sysoev.ru fail:
44711Sigor@sysoev.ru 
448343Smax.romanov@nginx.com     use_delta++;
449343Smax.romanov@nginx.com 
450344Smax.romanov@nginx.com     nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
451343Smax.romanov@nginx.com                        &port->socket);
452343Smax.romanov@nginx.com 
453343Smax.romanov@nginx.com unlock_mutex:
454343Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
455343Smax.romanov@nginx.com 
456343Smax.romanov@nginx.com     if (block_write && nxt_fd_event_is_active(port->socket.write)) {
457343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
458343Smax.romanov@nginx.com     }
459343Smax.romanov@nginx.com 
460343Smax.romanov@nginx.com     if (enable_write) {
461343Smax.romanov@nginx.com         nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
462343Smax.romanov@nginx.com     }
463343Smax.romanov@nginx.com 
464343Smax.romanov@nginx.com     if (use_delta != 0) {
465343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
466343Smax.romanov@nginx.com     }
46711Sigor@sysoev.ru }
46811Sigor@sysoev.ru 
46911Sigor@sysoev.ru 
470592Sigor@sysoev.ru static nxt_buf_t *
471592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
472592Sigor@sysoev.ru     size_t sent, nxt_bool_t mmap_mode)
473592Sigor@sysoev.ru {
474592Sigor@sysoev.ru     size_t  size;
475592Sigor@sysoev.ru 
476592Sigor@sysoev.ru     while (b != NULL) {
477592Sigor@sysoev.ru 
478592Sigor@sysoev.ru         nxt_prefetch(b->next);
479592Sigor@sysoev.ru 
480592Sigor@sysoev.ru         if (!nxt_buf_is_sync(b)) {
481592Sigor@sysoev.ru 
482592Sigor@sysoev.ru             size = nxt_buf_used_size(b);
483592Sigor@sysoev.ru 
484592Sigor@sysoev.ru             if (size != 0) {
485592Sigor@sysoev.ru 
486592Sigor@sysoev.ru                 if (sent == 0) {
487592Sigor@sysoev.ru                     break;
488592Sigor@sysoev.ru                 }
489592Sigor@sysoev.ru 
490592Sigor@sysoev.ru                 if (nxt_buf_is_port_mmap(b) && mmap_mode) {
491592Sigor@sysoev.ru                     /*
492592Sigor@sysoev.ru                      * buffer has been sent to other side which is now
493592Sigor@sysoev.ru                      * responsible for shared memory bucket release
494592Sigor@sysoev.ru                      */
495592Sigor@sysoev.ru                     b->is_port_mmap_sent = 1;
496592Sigor@sysoev.ru                 }
497592Sigor@sysoev.ru 
498592Sigor@sysoev.ru                 if (sent < size) {
499592Sigor@sysoev.ru 
500592Sigor@sysoev.ru                     if (nxt_buf_is_mem(b)) {
501592Sigor@sysoev.ru                         b->mem.pos += sent;
502592Sigor@sysoev.ru                     }
503592Sigor@sysoev.ru 
504592Sigor@sysoev.ru                     if (nxt_buf_is_file(b)) {
505592Sigor@sysoev.ru                         b->file_pos += sent;
506592Sigor@sysoev.ru                     }
507592Sigor@sysoev.ru 
508592Sigor@sysoev.ru                     break;
509592Sigor@sysoev.ru                 }
510592Sigor@sysoev.ru 
511592Sigor@sysoev.ru                 /* b->mem.free is NULL in file-only buffer. */
512592Sigor@sysoev.ru                 b->mem.pos = b->mem.free;
513592Sigor@sysoev.ru 
514592Sigor@sysoev.ru                 if (nxt_buf_is_file(b)) {
515592Sigor@sysoev.ru                     b->file_pos = b->file_end;
516592Sigor@sysoev.ru                 }
517592Sigor@sysoev.ru 
518592Sigor@sysoev.ru                 sent -= size;
519592Sigor@sysoev.ru             }
520592Sigor@sysoev.ru         }
521592Sigor@sysoev.ru 
522592Sigor@sysoev.ru         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
523592Sigor@sysoev.ru 
524592Sigor@sysoev.ru         b = b->next;
525592Sigor@sysoev.ru     }
526592Sigor@sysoev.ru 
527592Sigor@sysoev.ru     return b;
528592Sigor@sysoev.ru }
529592Sigor@sysoev.ru 
530592Sigor@sysoev.ru 
53111Sigor@sysoev.ru void
53211Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
53311Sigor@sysoev.ru {
53411Sigor@sysoev.ru     port->socket.fd = port->pair[0];
53511Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
53611Sigor@sysoev.ru 
537141Smax.romanov@nginx.com     port->engine = task->thread->engine;
538141Smax.romanov@nginx.com 
539141Smax.romanov@nginx.com     port->socket.read_work_queue = &port->engine->fast_work_queue;
54011Sigor@sysoev.ru     port->socket.read_handler = nxt_port_read_handler;
54111Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
54211Sigor@sysoev.ru 
543141Smax.romanov@nginx.com     nxt_fd_event_enable_read(port->engine, &port->socket);
54411Sigor@sysoev.ru }
54511Sigor@sysoev.ru 
54611Sigor@sysoev.ru 
54711Sigor@sysoev.ru void
54811Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
54911Sigor@sysoev.ru {
550350Smax.romanov@nginx.com     port->socket.read_ready = 0;
55113Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[0]);
55211Sigor@sysoev.ru     port->pair[0] = -1;
55311Sigor@sysoev.ru }
55411Sigor@sysoev.ru 
55511Sigor@sysoev.ru 
55611Sigor@sysoev.ru static void
55711Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
55811Sigor@sysoev.ru {
55942Smax.romanov@nginx.com     ssize_t              n;
56042Smax.romanov@nginx.com     nxt_buf_t            *b;
56142Smax.romanov@nginx.com     nxt_port_t           *port;
56242Smax.romanov@nginx.com     struct iovec         iov[2];
56342Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
56411Sigor@sysoev.ru 
565125Smax.romanov@nginx.com     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
56611Sigor@sysoev.ru 
567141Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
568141Smax.romanov@nginx.com 
56911Sigor@sysoev.ru     for ( ;; ) {
57011Sigor@sysoev.ru 
57111Sigor@sysoev.ru         b = nxt_port_buf_alloc(port);
57211Sigor@sysoev.ru 
57311Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
57411Sigor@sysoev.ru             /* TODO: disable event for some time */
57511Sigor@sysoev.ru         }
57611Sigor@sysoev.ru 
57742Smax.romanov@nginx.com         iov[0].iov_base = &msg.port_msg;
57814Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
57911Sigor@sysoev.ru 
58014Sigor@sysoev.ru         iov[1].iov_base = b->mem.pos;
58114Sigor@sysoev.ru         iov[1].iov_len = port->max_size;
58214Sigor@sysoev.ru 
58342Smax.romanov@nginx.com         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
58411Sigor@sysoev.ru 
58511Sigor@sysoev.ru         if (n > 0) {
58642Smax.romanov@nginx.com 
58742Smax.romanov@nginx.com             msg.buf = b;
58882Smax.romanov@nginx.com             msg.size = n;
58942Smax.romanov@nginx.com 
59082Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
59111Sigor@sysoev.ru 
592194Smax.romanov@nginx.com             /*
593194Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
594194Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
595194Smax.romanov@nginx.com              */
596194Smax.romanov@nginx.com             if (msg.buf == b) {
59711Sigor@sysoev.ru                 nxt_port_buf_free(port, b);
59811Sigor@sysoev.ru             }
59911Sigor@sysoev.ru 
60011Sigor@sysoev.ru             if (port->socket.read_ready) {
60111Sigor@sysoev.ru                 continue;
60211Sigor@sysoev.ru             }
60311Sigor@sysoev.ru 
60411Sigor@sysoev.ru             return;
60511Sigor@sysoev.ru         }
60611Sigor@sysoev.ru 
60711Sigor@sysoev.ru         if (n == NXT_AGAIN) {
60811Sigor@sysoev.ru             nxt_port_buf_free(port, b);
60911Sigor@sysoev.ru 
61012Sigor@sysoev.ru             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
61111Sigor@sysoev.ru             return;
61211Sigor@sysoev.ru         }
61311Sigor@sysoev.ru 
61411Sigor@sysoev.ru         /* n == 0 || n == NXT_ERROR */
61511Sigor@sysoev.ru 
61611Sigor@sysoev.ru         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
61711Sigor@sysoev.ru                            nxt_port_error_handler, task, &port->socket, NULL);
61811Sigor@sysoev.ru         return;
61911Sigor@sysoev.ru     }
62011Sigor@sysoev.ru }
62111Sigor@sysoev.ru 
62211Sigor@sysoev.ru 
623352Smax.romanov@nginx.com static nxt_int_t
624352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
625352Smax.romanov@nginx.com {
626352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
627352Smax.romanov@nginx.com 
628352Smax.romanov@nginx.com     fmsg = data;
629352Smax.romanov@nginx.com 
630352Smax.romanov@nginx.com     if (lhq->key.length == sizeof(uint32_t)
631352Smax.romanov@nginx.com         && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream)
632352Smax.romanov@nginx.com     {
633352Smax.romanov@nginx.com         return NXT_OK;
634352Smax.romanov@nginx.com     }
635352Smax.romanov@nginx.com 
636352Smax.romanov@nginx.com     return NXT_DECLINED;
637352Smax.romanov@nginx.com }
638352Smax.romanov@nginx.com 
639352Smax.romanov@nginx.com 
640352Smax.romanov@nginx.com static void *
641352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
642352Smax.romanov@nginx.com {
643352Smax.romanov@nginx.com     return nxt_mp_alloc(ctx, size);
644352Smax.romanov@nginx.com }
645352Smax.romanov@nginx.com 
646352Smax.romanov@nginx.com 
647352Smax.romanov@nginx.com static void
648352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p)
649352Smax.romanov@nginx.com {
650389Smax.romanov@nginx.com     nxt_mp_free(ctx, p);
651352Smax.romanov@nginx.com }
652352Smax.romanov@nginx.com 
653352Smax.romanov@nginx.com 
654352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
655352Smax.romanov@nginx.com     NXT_LVLHSH_DEFAULT,
656352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_test,
657352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_alloc,
658352Smax.romanov@nginx.com     nxt_port_lvlhsh_frag_free,
659352Smax.romanov@nginx.com };
660352Smax.romanov@nginx.com 
661352Smax.romanov@nginx.com 
662352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
663352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
664352Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
665352Smax.romanov@nginx.com {
666352Smax.romanov@nginx.com     nxt_int_t            res;
667352Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
668352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
669352Smax.romanov@nginx.com 
670352Smax.romanov@nginx.com     nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
671352Smax.romanov@nginx.com 
672352Smax.romanov@nginx.com     fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
673352Smax.romanov@nginx.com 
674352Smax.romanov@nginx.com     if (nxt_slow_path(fmsg == NULL)) {
675352Smax.romanov@nginx.com         return NULL;
676352Smax.romanov@nginx.com     }
677352Smax.romanov@nginx.com 
678352Smax.romanov@nginx.com     *fmsg = *msg;
679352Smax.romanov@nginx.com 
680352Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t));
681352Smax.romanov@nginx.com     lhq.key.length = sizeof(uint32_t);
682352Smax.romanov@nginx.com     lhq.key.start = (u_char *) &fmsg->port_msg.stream;
683352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
684352Smax.romanov@nginx.com     lhq.replace = 0;
685352Smax.romanov@nginx.com     lhq.value = fmsg;
686352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
687352Smax.romanov@nginx.com 
688352Smax.romanov@nginx.com     res = nxt_lvlhsh_insert(&port->frags, &lhq);
689352Smax.romanov@nginx.com 
690352Smax.romanov@nginx.com     switch (res) {
691352Smax.romanov@nginx.com 
692352Smax.romanov@nginx.com     case NXT_OK:
693352Smax.romanov@nginx.com         return fmsg;
694352Smax.romanov@nginx.com 
695352Smax.romanov@nginx.com     case NXT_DECLINED:
696352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
697352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
698352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
699352Smax.romanov@nginx.com 
700352Smax.romanov@nginx.com         return NULL;
701352Smax.romanov@nginx.com 
702352Smax.romanov@nginx.com     default:
703352Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
704352Smax.romanov@nginx.com                 fmsg->port_msg.stream);
705352Smax.romanov@nginx.com 
706352Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, fmsg);
707352Smax.romanov@nginx.com 
708352Smax.romanov@nginx.com         return NULL;
709352Smax.romanov@nginx.com 
710352Smax.romanov@nginx.com     }
711352Smax.romanov@nginx.com }
712352Smax.romanov@nginx.com 
713352Smax.romanov@nginx.com 
714352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
715352Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream,
716352Smax.romanov@nginx.com     nxt_bool_t last)
717352Smax.romanov@nginx.com {
718352Smax.romanov@nginx.com     nxt_int_t          res;
719352Smax.romanov@nginx.com     nxt_lvlhsh_query_t lhq;
720352Smax.romanov@nginx.com 
721352Smax.romanov@nginx.com     nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream);
722352Smax.romanov@nginx.com 
723352Smax.romanov@nginx.com     lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t));
724352Smax.romanov@nginx.com     lhq.key.length = sizeof(uint32_t);
725352Smax.romanov@nginx.com     lhq.key.start = (u_char *) &stream;
726352Smax.romanov@nginx.com     lhq.proto = &lvlhsh_frag_proto;
727352Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
728352Smax.romanov@nginx.com 
729352Smax.romanov@nginx.com     res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
730352Smax.romanov@nginx.com           nxt_lvlhsh_find(&port->frags, &lhq);
731352Smax.romanov@nginx.com 
732352Smax.romanov@nginx.com     switch (res) {
733352Smax.romanov@nginx.com 
734352Smax.romanov@nginx.com     case NXT_OK:
735352Smax.romanov@nginx.com         return lhq.value;
736352Smax.romanov@nginx.com 
737352Smax.romanov@nginx.com     default:
738551Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream);
739352Smax.romanov@nginx.com 
740352Smax.romanov@nginx.com         return NULL;
741352Smax.romanov@nginx.com     }
742352Smax.romanov@nginx.com }
743352Smax.romanov@nginx.com 
744352Smax.romanov@nginx.com 
74511Sigor@sysoev.ru static void
74611Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
74782Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
74811Sigor@sysoev.ru {
749352Smax.romanov@nginx.com     nxt_buf_t            *b, *orig_b;
750352Smax.romanov@nginx.com     nxt_port_recv_msg_t  *fmsg;
75111Sigor@sysoev.ru 
75282Smax.romanov@nginx.com     if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
753564Svbart@nginx.com         nxt_alert(task, "port %d: too small message:%uz",
754564Svbart@nginx.com                   port->socket.fd, msg->size);
755423Smax.romanov@nginx.com 
756423Smax.romanov@nginx.com         if (msg->fd != -1) {
757423Smax.romanov@nginx.com             nxt_fd_close(msg->fd);
758423Smax.romanov@nginx.com         }
759423Smax.romanov@nginx.com 
760423Smax.romanov@nginx.com         return;
76111Sigor@sysoev.ru     }
76211Sigor@sysoev.ru 
76342Smax.romanov@nginx.com     /* adjust size to actual buffer used size */
76482Smax.romanov@nginx.com     msg->size -= sizeof(nxt_port_msg_t);
76542Smax.romanov@nginx.com 
76642Smax.romanov@nginx.com     b = orig_b = msg->buf;
76782Smax.romanov@nginx.com     b->mem.free += msg->size;
76842Smax.romanov@nginx.com 
769423Smax.romanov@nginx.com     if (msg->port_msg.tracking) {
770423Smax.romanov@nginx.com         msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
771423Smax.romanov@nginx.com 
772423Smax.romanov@nginx.com     } else {
773423Smax.romanov@nginx.com         msg->cancelled = 0;
77442Smax.romanov@nginx.com     }
77511Sigor@sysoev.ru 
776352Smax.romanov@nginx.com     if (nxt_slow_path(msg->port_msg.nf != 0)) {
777423Smax.romanov@nginx.com 
778352Smax.romanov@nginx.com         fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream,
779352Smax.romanov@nginx.com                                   msg->port_msg.mf == 0);
780352Smax.romanov@nginx.com 
781551Smax.romanov@nginx.com         if (nxt_slow_path(fmsg == NULL)) {
782551Smax.romanov@nginx.com             goto fmsg_failed;
783551Smax.romanov@nginx.com         }
784423Smax.romanov@nginx.com 
785423Smax.romanov@nginx.com         if (nxt_fast_path(fmsg->cancelled == 0)) {
786423Smax.romanov@nginx.com 
787423Smax.romanov@nginx.com             if (msg->port_msg.mmap) {
788423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
789423Smax.romanov@nginx.com             }
790423Smax.romanov@nginx.com 
791423Smax.romanov@nginx.com             nxt_buf_chain_add(&fmsg->buf, msg->buf);
792423Smax.romanov@nginx.com 
793423Smax.romanov@nginx.com             fmsg->size += msg->size;
794423Smax.romanov@nginx.com             msg->buf = NULL;
795423Smax.romanov@nginx.com             b = NULL;
796423Smax.romanov@nginx.com 
797423Smax.romanov@nginx.com             if (nxt_fast_path(msg->port_msg.mf == 0)) {
798423Smax.romanov@nginx.com 
799423Smax.romanov@nginx.com                 b = fmsg->buf;
800423Smax.romanov@nginx.com 
801423Smax.romanov@nginx.com                 port->handler(task, fmsg);
802423Smax.romanov@nginx.com 
803423Smax.romanov@nginx.com                 msg->buf = fmsg->buf;
804423Smax.romanov@nginx.com                 msg->fd = fmsg->fd;
805974Smax.romanov@nginx.com 
806974Smax.romanov@nginx.com                 /*
807974Smax.romanov@nginx.com                  * To disable instant completion or buffer re-usage,
808974Smax.romanov@nginx.com                  * handler should reset 'msg.buf'.
809974Smax.romanov@nginx.com                  */
810974Smax.romanov@nginx.com                 if (!msg->port_msg.mmap && msg->buf == b) {
811974Smax.romanov@nginx.com                     nxt_port_buf_free(port, b);
812974Smax.romanov@nginx.com                 }
813423Smax.romanov@nginx.com             }
814352Smax.romanov@nginx.com         }
815352Smax.romanov@nginx.com 
816352Smax.romanov@nginx.com         if (nxt_fast_path(msg->port_msg.mf == 0)) {
817352Smax.romanov@nginx.com             nxt_mp_free(port->mem_pool, fmsg);
818352Smax.romanov@nginx.com         }
819352Smax.romanov@nginx.com     } else {
820352Smax.romanov@nginx.com         if (nxt_slow_path(msg->port_msg.mf != 0)) {
821423Smax.romanov@nginx.com 
822423Smax.romanov@nginx.com             if (msg->port_msg.mmap && msg->cancelled == 0) {
823423Smax.romanov@nginx.com                 nxt_port_mmap_read(task, msg);
824423Smax.romanov@nginx.com                 b = msg->buf;
825423Smax.romanov@nginx.com             }
826423Smax.romanov@nginx.com 
827352Smax.romanov@nginx.com             fmsg = nxt_port_frag_start(task, port, msg);
828352Smax.romanov@nginx.com 
829551Smax.romanov@nginx.com             if (nxt_slow_path(fmsg == NULL)) {
830551Smax.romanov@nginx.com                 goto fmsg_failed;
831551Smax.romanov@nginx.com             }
832352Smax.romanov@nginx.com 
833352Smax.romanov@nginx.com             fmsg->port_msg.nf = 0;
834352Smax.romanov@nginx.com             fmsg->port_msg.mf = 0;
835352Smax.romanov@nginx.com 
836423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
837423Smax.romanov@nginx.com                 msg->buf = NULL;
838423Smax.romanov@nginx.com                 msg->fd = -1;
839423Smax.romanov@nginx.com                 b = NULL;
840423Smax.romanov@nginx.com 
841423Smax.romanov@nginx.com             } else {
842423Smax.romanov@nginx.com                 if (msg->fd != -1) {
843423Smax.romanov@nginx.com                     nxt_fd_close(msg->fd);
844423Smax.romanov@nginx.com                 }
845423Smax.romanov@nginx.com             }
846352Smax.romanov@nginx.com         } else {
847423Smax.romanov@nginx.com             if (nxt_fast_path(msg->cancelled == 0)) {
848423Smax.romanov@nginx.com 
849423Smax.romanov@nginx.com                 if (msg->port_msg.mmap) {
850423Smax.romanov@nginx.com                     nxt_port_mmap_read(task, msg);
851423Smax.romanov@nginx.com                     b = msg->buf;
852423Smax.romanov@nginx.com                 }
853423Smax.romanov@nginx.com 
854423Smax.romanov@nginx.com                 port->handler(task, msg);
855423Smax.romanov@nginx.com             }
856352Smax.romanov@nginx.com         }
857352Smax.romanov@nginx.com     }
85842Smax.romanov@nginx.com 
859551Smax.romanov@nginx.com fmsg_failed:
860551Smax.romanov@nginx.com 
86182Smax.romanov@nginx.com     if (msg->port_msg.mmap && orig_b != b) {
86242Smax.romanov@nginx.com 
863194Smax.romanov@nginx.com         /*
864194Smax.romanov@nginx.com          * To disable instant buffer completion,
865194Smax.romanov@nginx.com          * handler should reset 'msg->buf'.
866194Smax.romanov@nginx.com          */
867194Smax.romanov@nginx.com         if (msg->buf == b) {
868194Smax.romanov@nginx.com             /* complete mmap buffers */
869194Smax.romanov@nginx.com             for (; b != NULL; b = b->next) {
870194Smax.romanov@nginx.com                 nxt_debug(task, "complete buffer %p", b);
871194Smax.romanov@nginx.com 
872194Smax.romanov@nginx.com                 nxt_work_queue_add(port->socket.read_work_queue,
873194Smax.romanov@nginx.com                     b->completion_handler, task, b, b->parent);
874194Smax.romanov@nginx.com             }
87542Smax.romanov@nginx.com         }
876194Smax.romanov@nginx.com 
877194Smax.romanov@nginx.com         /* restore original buf */
878194Smax.romanov@nginx.com         msg->buf = orig_b;
87942Smax.romanov@nginx.com     }
88011Sigor@sysoev.ru }
88111Sigor@sysoev.ru 
88211Sigor@sysoev.ru 
88311Sigor@sysoev.ru static nxt_buf_t *
88411Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
88511Sigor@sysoev.ru {
88611Sigor@sysoev.ru     nxt_buf_t  *b;
88711Sigor@sysoev.ru 
88811Sigor@sysoev.ru     if (port->free_bufs != NULL) {
88911Sigor@sysoev.ru         b = port->free_bufs;
89011Sigor@sysoev.ru         port->free_bufs = b->next;
89111Sigor@sysoev.ru 
89211Sigor@sysoev.ru         b->mem.pos = b->mem.start;
89311Sigor@sysoev.ru         b->mem.free = b->mem.start;
89442Smax.romanov@nginx.com         b->next = NULL;
89511Sigor@sysoev.ru     } else {
89611Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
89711Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
89811Sigor@sysoev.ru             return NULL;
89911Sigor@sysoev.ru         }
90011Sigor@sysoev.ru     }
90111Sigor@sysoev.ru 
90211Sigor@sysoev.ru     return b;
90311Sigor@sysoev.ru }
90411Sigor@sysoev.ru 
90511Sigor@sysoev.ru 
90611Sigor@sysoev.ru static void
90711Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
90811Sigor@sysoev.ru {
909974Smax.romanov@nginx.com     nxt_buf_chain_add(&b, port->free_bufs);
91011Sigor@sysoev.ru     port->free_bufs = b;
91111Sigor@sysoev.ru }
91211Sigor@sysoev.ru 
91311Sigor@sysoev.ru 
91411Sigor@sysoev.ru static void
91511Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
91611Sigor@sysoev.ru {
917343Smax.romanov@nginx.com     int                  use_delta;
918197Smax.romanov@nginx.com     nxt_buf_t            *b;
919197Smax.romanov@nginx.com     nxt_port_t           *port;
920197Smax.romanov@nginx.com     nxt_work_queue_t     *wq;
921197Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
922197Smax.romanov@nginx.com 
923125Smax.romanov@nginx.com     nxt_debug(task, "port error handler %p", obj);
92411Sigor@sysoev.ru     /* TODO */
925197Smax.romanov@nginx.com 
926197Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
927197Smax.romanov@nginx.com 
928343Smax.romanov@nginx.com     use_delta = 0;
929343Smax.romanov@nginx.com 
930343Smax.romanov@nginx.com     if (obj == data) {
931343Smax.romanov@nginx.com         use_delta--;
932343Smax.romanov@nginx.com     }
933197Smax.romanov@nginx.com 
934343Smax.romanov@nginx.com     wq = &task->thread->engine->fast_work_queue;
935343Smax.romanov@nginx.com 
936343Smax.romanov@nginx.com     nxt_thread_mutex_lock(&port->write_mutex);
937343Smax.romanov@nginx.com 
938343Smax.romanov@nginx.com     nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
939197Smax.romanov@nginx.com 
940521Szelenkov@nginx.com         for (b = msg->buf; b != NULL; b = b->next) {
941197Smax.romanov@nginx.com             if (nxt_buf_is_sync(b)) {
942197Smax.romanov@nginx.com                 continue;
943197Smax.romanov@nginx.com             }
944197Smax.romanov@nginx.com 
945197Smax.romanov@nginx.com             nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
946197Smax.romanov@nginx.com         }
947197Smax.romanov@nginx.com 
948197Smax.romanov@nginx.com         nxt_queue_remove(&msg->link);
949343Smax.romanov@nginx.com         use_delta--;
950197Smax.romanov@nginx.com         nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
951344Smax.romanov@nginx.com                            msg->work.data);
952197Smax.romanov@nginx.com 
953197Smax.romanov@nginx.com     } nxt_queue_loop;
954343Smax.romanov@nginx.com 
955343Smax.romanov@nginx.com     nxt_thread_mutex_unlock(&port->write_mutex);
956343Smax.romanov@nginx.com 
957343Smax.romanov@nginx.com     if (use_delta != 0) {
958343Smax.romanov@nginx.com         nxt_port_use(task, port, use_delta);
959343Smax.romanov@nginx.com     }
96011Sigor@sysoev.ru }
961