xref: /unit/src/nxt_port_socket.c (revision 194)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
811Sigor@sysoev.ru 
911Sigor@sysoev.ru 
1011Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
1111Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
1211Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
1382Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg);
1411Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
1511Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
1611Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
1711Sigor@sysoev.ru 
1811Sigor@sysoev.ru 
1914Sigor@sysoev.ru nxt_int_t
2014Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
2111Sigor@sysoev.ru {
2265Sigor@sysoev.ru     nxt_int_t     sndbuf, rcvbuf, size;
2365Sigor@sysoev.ru     nxt_socket_t  snd, rcv;
2411Sigor@sysoev.ru 
2514Sigor@sysoev.ru     port->socket.task = task;
2614Sigor@sysoev.ru 
2714Sigor@sysoev.ru     port->pair[0] = -1;
2814Sigor@sysoev.ru     port->pair[1] = -1;
2914Sigor@sysoev.ru 
3013Sigor@sysoev.ru     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
3111Sigor@sysoev.ru         goto socketpair_fail;
3211Sigor@sysoev.ru     }
3311Sigor@sysoev.ru 
3411Sigor@sysoev.ru     snd = port->pair[1];
3511Sigor@sysoev.ru 
3613Sigor@sysoev.ru     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
3711Sigor@sysoev.ru     if (nxt_slow_path(sndbuf < 0)) {
3811Sigor@sysoev.ru         goto getsockopt_fail;
3911Sigor@sysoev.ru     }
4011Sigor@sysoev.ru 
4111Sigor@sysoev.ru     rcv = port->pair[0];
4211Sigor@sysoev.ru 
4313Sigor@sysoev.ru     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
4411Sigor@sysoev.ru     if (nxt_slow_path(rcvbuf < 0)) {
4511Sigor@sysoev.ru         goto getsockopt_fail;
4611Sigor@sysoev.ru     }
4711Sigor@sysoev.ru 
4811Sigor@sysoev.ru     if (max_size == 0) {
4911Sigor@sysoev.ru         max_size = 16 * 1024;
5011Sigor@sysoev.ru     }
5111Sigor@sysoev.ru 
5211Sigor@sysoev.ru     if ((size_t) sndbuf < max_size) {
5311Sigor@sysoev.ru         /*
5411Sigor@sysoev.ru          * On Unix domain sockets
5511Sigor@sysoev.ru          *   Linux uses 224K on both send and receive directions;
5611Sigor@sysoev.ru          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
5711Sigor@sysoev.ru          *   on send direction and 4K buffer size on receive direction;
5811Sigor@sysoev.ru          *   Solaris uses 16K on send direction and 5K on receive direction.
5911Sigor@sysoev.ru          */
6013Sigor@sysoev.ru         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
6113Sigor@sysoev.ru                                      max_size);
6211Sigor@sysoev.ru 
6313Sigor@sysoev.ru         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
6411Sigor@sysoev.ru         if (nxt_slow_path(sndbuf < 0)) {
6511Sigor@sysoev.ru             goto getsockopt_fail;
6611Sigor@sysoev.ru         }
6711Sigor@sysoev.ru 
6811Sigor@sysoev.ru         size = sndbuf * 4;
6911Sigor@sysoev.ru 
7011Sigor@sysoev.ru         if (rcvbuf < size) {
7113Sigor@sysoev.ru             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
7213Sigor@sysoev.ru                                          size);
7311Sigor@sysoev.ru 
7413Sigor@sysoev.ru             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
7511Sigor@sysoev.ru             if (nxt_slow_path(rcvbuf < 0)) {
7611Sigor@sysoev.ru                 goto getsockopt_fail;
7711Sigor@sysoev.ru             }
7811Sigor@sysoev.ru         }
7911Sigor@sysoev.ru     }
8011Sigor@sysoev.ru 
8111Sigor@sysoev.ru     port->max_size = nxt_min(max_size, (size_t) sndbuf);
8211Sigor@sysoev.ru     port->max_share = (64 * 1024);
8311Sigor@sysoev.ru 
8414Sigor@sysoev.ru     return NXT_OK;
8511Sigor@sysoev.ru 
8611Sigor@sysoev.ru getsockopt_fail:
8711Sigor@sysoev.ru 
8813Sigor@sysoev.ru     nxt_socket_close(task, port->pair[0]);
8913Sigor@sysoev.ru     nxt_socket_close(task, port->pair[1]);
9011Sigor@sysoev.ru 
9111Sigor@sysoev.ru socketpair_fail:
9211Sigor@sysoev.ru 
9314Sigor@sysoev.ru     return NXT_ERROR;
9411Sigor@sysoev.ru }
9511Sigor@sysoev.ru 
9611Sigor@sysoev.ru 
9711Sigor@sysoev.ru void
9811Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
9911Sigor@sysoev.ru {
10013Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->socket.fd);
10165Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
10211Sigor@sysoev.ru }
10311Sigor@sysoev.ru 
10411Sigor@sysoev.ru 
10511Sigor@sysoev.ru void
10611Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
10711Sigor@sysoev.ru {
10811Sigor@sysoev.ru     port->socket.fd = port->pair[1];
10911Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
11011Sigor@sysoev.ru     port->socket.write_ready = 1;
11111Sigor@sysoev.ru 
112141Smax.romanov@nginx.com     port->engine = task->thread->engine;
113141Smax.romanov@nginx.com 
114141Smax.romanov@nginx.com     port->socket.write_work_queue = &port->engine->fast_work_queue;
11511Sigor@sysoev.ru     port->socket.write_handler = nxt_port_write_handler;
11611Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
11711Sigor@sysoev.ru }
11811Sigor@sysoev.ru 
11911Sigor@sysoev.ru 
12011Sigor@sysoev.ru void
12111Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
12211Sigor@sysoev.ru {
12313Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[1]);
12411Sigor@sysoev.ru     port->pair[1] = -1;
12511Sigor@sysoev.ru }
12611Sigor@sysoev.ru 
12711Sigor@sysoev.ru 
128122Smax.romanov@nginx.com static void
129122Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
130122Smax.romanov@nginx.com {
131122Smax.romanov@nginx.com     nxt_event_engine_t   *engine;
132122Smax.romanov@nginx.com     nxt_port_send_msg_t  *msg;
133122Smax.romanov@nginx.com 
134122Smax.romanov@nginx.com     msg = obj;
135122Smax.romanov@nginx.com     engine = data;
136122Smax.romanov@nginx.com 
137122Smax.romanov@nginx.com #if (NXT_DEBUG)
138122Smax.romanov@nginx.com     if (nxt_slow_path(data != msg->engine)) {
139122Smax.romanov@nginx.com         nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
140122Smax.romanov@nginx.com                       data, msg->engine);
141122Smax.romanov@nginx.com         nxt_abort();
142122Smax.romanov@nginx.com     }
143122Smax.romanov@nginx.com #endif
144122Smax.romanov@nginx.com 
145122Smax.romanov@nginx.com     if (engine != task->thread->engine) {
146122Smax.romanov@nginx.com 
147122Smax.romanov@nginx.com         nxt_debug(task, "current thread is %PT, expected %PT",
148122Smax.romanov@nginx.com                   task->thread->tid, engine->task.thread->tid);
149122Smax.romanov@nginx.com 
150122Smax.romanov@nginx.com         nxt_event_engine_post(engine, &msg->work);
151122Smax.romanov@nginx.com 
152122Smax.romanov@nginx.com         return;
153122Smax.romanov@nginx.com     }
154122Smax.romanov@nginx.com 
155122Smax.romanov@nginx.com     nxt_mp_release(msg->mem_pool, obj);
156122Smax.romanov@nginx.com }
157122Smax.romanov@nginx.com 
158122Smax.romanov@nginx.com 
15911Sigor@sysoev.ru nxt_int_t
16014Sigor@sysoev.ru nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
16142Smax.romanov@nginx.com     nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
16211Sigor@sysoev.ru {
16311Sigor@sysoev.ru     nxt_port_send_msg_t  *msg;
16411Sigor@sysoev.ru 
165163Smax.romanov@nginx.com     nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
16611Sigor@sysoev.ru 
16789Smax.romanov@nginx.com         if  (msg->port_msg.stream == stream &&
16889Smax.romanov@nginx.com              msg->port_msg.reply_port == reply_port) {
169189Smax.romanov@nginx.com 
170189Smax.romanov@nginx.com             nxt_assert(msg->port_msg.last == 0);
171189Smax.romanov@nginx.com 
17211Sigor@sysoev.ru             /*
17311Sigor@sysoev.ru              * An fd is ignored since a file descriptor
17411Sigor@sysoev.ru              * must be sent only in the first message of a stream.
17511Sigor@sysoev.ru              */
17611Sigor@sysoev.ru             nxt_buf_chain_add(&msg->buf, b);
17711Sigor@sysoev.ru 
178189Smax.romanov@nginx.com             msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0;
179189Smax.romanov@nginx.com 
18011Sigor@sysoev.ru             return NXT_OK;
18111Sigor@sysoev.ru         }
182163Smax.romanov@nginx.com 
183163Smax.romanov@nginx.com     } nxt_queue_loop;
18411Sigor@sysoev.ru 
185122Smax.romanov@nginx.com     msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t));
18611Sigor@sysoev.ru     if (nxt_slow_path(msg == NULL)) {
18711Sigor@sysoev.ru         return NXT_ERROR;
18811Sigor@sysoev.ru     }
18911Sigor@sysoev.ru 
190122Smax.romanov@nginx.com     msg->link.next = NULL;
191122Smax.romanov@nginx.com     msg->link.prev = NULL;
192122Smax.romanov@nginx.com 
19311Sigor@sysoev.ru     msg->buf = b;
19411Sigor@sysoev.ru     msg->fd = fd;
195189Smax.romanov@nginx.com     msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
19611Sigor@sysoev.ru     msg->share = 0;
197122Smax.romanov@nginx.com 
198122Smax.romanov@nginx.com     msg->work.next = NULL;
199122Smax.romanov@nginx.com     msg->work.handler = nxt_port_release_send_msg;
200122Smax.romanov@nginx.com     msg->work.task = task;
201122Smax.romanov@nginx.com     msg->work.obj = msg;
202122Smax.romanov@nginx.com     msg->work.data = task->thread->engine;
203122Smax.romanov@nginx.com 
204122Smax.romanov@nginx.com     msg->engine = task->thread->engine;
20583Smax.romanov@nginx.com     msg->mem_pool = port->mem_pool;
20611Sigor@sysoev.ru 
20711Sigor@sysoev.ru     msg->port_msg.stream = stream;
20842Smax.romanov@nginx.com     msg->port_msg.pid = nxt_pid;
20942Smax.romanov@nginx.com     msg->port_msg.reply_port = reply_port;
210189Smax.romanov@nginx.com     msg->port_msg.type = type & NXT_PORT_MSG_MASK;
211189Smax.romanov@nginx.com     msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
21242Smax.romanov@nginx.com     msg->port_msg.mmap = 0;
21311Sigor@sysoev.ru 
21411Sigor@sysoev.ru     nxt_queue_insert_tail(&port->messages, &msg->link);
21511Sigor@sysoev.ru 
21611Sigor@sysoev.ru     if (port->socket.write_ready) {
217125Smax.romanov@nginx.com         nxt_port_write_handler(task, &port->socket, NULL);
21811Sigor@sysoev.ru     }
21911Sigor@sysoev.ru 
22011Sigor@sysoev.ru     return NXT_OK;
22111Sigor@sysoev.ru }
22211Sigor@sysoev.ru 
22311Sigor@sysoev.ru 
22411Sigor@sysoev.ru static void
22511Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
22611Sigor@sysoev.ru {
22711Sigor@sysoev.ru     ssize_t                 n;
22811Sigor@sysoev.ru     nxt_port_t              *port;
22989Smax.romanov@nginx.com     struct iovec            iov[NXT_IOBUF_MAX * 10];
230127Smax.romanov@nginx.com     nxt_work_queue_t        *wq;
23111Sigor@sysoev.ru     nxt_queue_link_t        *link;
232125Smax.romanov@nginx.com     nxt_port_method_t       m;
23311Sigor@sysoev.ru     nxt_port_send_msg_t     *msg;
23411Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
23542Smax.romanov@nginx.com 
23642Smax.romanov@nginx.com     size_t                  plain_size;
23742Smax.romanov@nginx.com     nxt_buf_t               *plain_buf;
23811Sigor@sysoev.ru 
239125Smax.romanov@nginx.com     port = nxt_container_of(obj, nxt_port_t, socket);
24011Sigor@sysoev.ru 
24111Sigor@sysoev.ru     do {
24211Sigor@sysoev.ru         link = nxt_queue_first(&port->messages);
24311Sigor@sysoev.ru 
24411Sigor@sysoev.ru         if (link == nxt_queue_tail(&port->messages)) {
24512Sigor@sysoev.ru             nxt_fd_event_block_write(task->thread->engine, &port->socket);
24611Sigor@sysoev.ru             return;
24711Sigor@sysoev.ru         }
24811Sigor@sysoev.ru 
249163Smax.romanov@nginx.com         msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
25011Sigor@sysoev.ru 
25114Sigor@sysoev.ru         iov[0].iov_base = &msg->port_msg;
25214Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
25311Sigor@sysoev.ru 
25411Sigor@sysoev.ru         sb.buf = msg->buf;
25514Sigor@sysoev.ru         sb.iobuf = &iov[1];
25611Sigor@sysoev.ru         sb.nmax = NXT_IOBUF_MAX - 1;
25711Sigor@sysoev.ru         sb.sync = 0;
25811Sigor@sysoev.ru         sb.last = 0;
25942Smax.romanov@nginx.com         sb.size = 0;
26011Sigor@sysoev.ru         sb.limit = port->max_size;
26111Sigor@sysoev.ru 
26242Smax.romanov@nginx.com         m = nxt_port_mmap_get_method(task, port, msg->buf);
26342Smax.romanov@nginx.com 
26442Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP) {
26542Smax.romanov@nginx.com             sb.limit = (1ULL << 31) - 1;
26689Smax.romanov@nginx.com             sb.nmax = NXT_IOBUF_MAX * 10 - 1;
26742Smax.romanov@nginx.com         }
26842Smax.romanov@nginx.com 
26942Smax.romanov@nginx.com         nxt_sendbuf_mem_coalesce(task, &sb);
27042Smax.romanov@nginx.com 
27142Smax.romanov@nginx.com         plain_size = sb.size;
27242Smax.romanov@nginx.com         plain_buf = msg->buf;
27342Smax.romanov@nginx.com 
27442Smax.romanov@nginx.com         /*
27542Smax.romanov@nginx.com          * Send through mmap enabled only when payload
27642Smax.romanov@nginx.com          * is bigger than PORT_MMAP_MIN_SIZE.
27742Smax.romanov@nginx.com          */
27842Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
27942Smax.romanov@nginx.com             nxt_port_mmap_write(task, port, msg, &sb);
28042Smax.romanov@nginx.com 
28142Smax.romanov@nginx.com         } else {
28242Smax.romanov@nginx.com             m = NXT_PORT_METHOD_PLAIN;
28342Smax.romanov@nginx.com         }
28411Sigor@sysoev.ru 
285189Smax.romanov@nginx.com         msg->port_msg.last |= sb.last;
28611Sigor@sysoev.ru 
28742Smax.romanov@nginx.com         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
28811Sigor@sysoev.ru 
28911Sigor@sysoev.ru         if (n > 0) {
29042Smax.romanov@nginx.com             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
29111Sigor@sysoev.ru                 nxt_log(task, NXT_LOG_CRIT,
29211Sigor@sysoev.ru                         "port %d: short write: %z instead of %uz",
29342Smax.romanov@nginx.com                         port->socket.fd, n, sb.size + iov[0].iov_len);
29411Sigor@sysoev.ru                 goto fail;
29511Sigor@sysoev.ru             }
29611Sigor@sysoev.ru 
297189Smax.romanov@nginx.com             if (msg->fd != -1 && msg->close_fd != 0) {
298189Smax.romanov@nginx.com                 nxt_fd_close(msg->fd);
299189Smax.romanov@nginx.com 
300189Smax.romanov@nginx.com                 msg->fd = -1;
301189Smax.romanov@nginx.com             }
302189Smax.romanov@nginx.com 
303127Smax.romanov@nginx.com             wq = &task->thread->engine->fast_work_queue;
304127Smax.romanov@nginx.com 
30542Smax.romanov@nginx.com             if (msg->buf != plain_buf) {
30642Smax.romanov@nginx.com                 /*
30742Smax.romanov@nginx.com                  * Complete crafted mmap_msgs buf and restore msg->buf
30842Smax.romanov@nginx.com                  * for regular completion call.
30942Smax.romanov@nginx.com                  */
310127Smax.romanov@nginx.com                 nxt_port_mmap_completion(task, wq, msg->buf);
31142Smax.romanov@nginx.com 
31242Smax.romanov@nginx.com                 msg->buf = plain_buf;
31342Smax.romanov@nginx.com             }
31442Smax.romanov@nginx.com 
315127Smax.romanov@nginx.com             msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
31611Sigor@sysoev.ru 
31711Sigor@sysoev.ru             if (msg->buf != NULL) {
31811Sigor@sysoev.ru                 /*
31911Sigor@sysoev.ru                  * A file descriptor is sent only
32011Sigor@sysoev.ru                  * in the first message of a stream.
32111Sigor@sysoev.ru                  */
32211Sigor@sysoev.ru                 msg->fd = -1;
32311Sigor@sysoev.ru                 msg->share += n;
32411Sigor@sysoev.ru 
32511Sigor@sysoev.ru                 if (msg->share >= port->max_share) {
32611Sigor@sysoev.ru                     msg->share = 0;
32711Sigor@sysoev.ru                     nxt_queue_remove(link);
32811Sigor@sysoev.ru                     nxt_queue_insert_tail(&port->messages, link);
32911Sigor@sysoev.ru                 }
33011Sigor@sysoev.ru 
33111Sigor@sysoev.ru             } else {
33211Sigor@sysoev.ru                 nxt_queue_remove(link);
333122Smax.romanov@nginx.com                 nxt_port_release_send_msg(task, msg, msg->engine);
33411Sigor@sysoev.ru             }
33511Sigor@sysoev.ru 
33611Sigor@sysoev.ru         } else if (nxt_slow_path(n == NXT_ERROR)) {
33711Sigor@sysoev.ru             goto fail;
33811Sigor@sysoev.ru         }
33911Sigor@sysoev.ru 
34011Sigor@sysoev.ru         /* n == NXT_AGAIN */
34111Sigor@sysoev.ru 
34211Sigor@sysoev.ru     } while (port->socket.write_ready);
34311Sigor@sysoev.ru 
34412Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(port->socket.write)) {
345141Smax.romanov@nginx.com         /* TODO task->thread->engine or port->engine ? */
34612Sigor@sysoev.ru         nxt_fd_event_enable_write(task->thread->engine, &port->socket);
34711Sigor@sysoev.ru     }
34811Sigor@sysoev.ru 
34911Sigor@sysoev.ru     return;
35011Sigor@sysoev.ru 
35111Sigor@sysoev.ru fail:
35211Sigor@sysoev.ru 
35311Sigor@sysoev.ru     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
35411Sigor@sysoev.ru                        nxt_port_error_handler, task, &port->socket, NULL);
35511Sigor@sysoev.ru }
35611Sigor@sysoev.ru 
35711Sigor@sysoev.ru 
35811Sigor@sysoev.ru void
35911Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
36011Sigor@sysoev.ru {
36111Sigor@sysoev.ru     port->socket.fd = port->pair[0];
36211Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
36311Sigor@sysoev.ru 
364141Smax.romanov@nginx.com     port->engine = task->thread->engine;
365141Smax.romanov@nginx.com 
366141Smax.romanov@nginx.com     port->socket.read_work_queue = &port->engine->fast_work_queue;
36711Sigor@sysoev.ru     port->socket.read_handler = nxt_port_read_handler;
36811Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
36911Sigor@sysoev.ru 
370141Smax.romanov@nginx.com     nxt_fd_event_enable_read(port->engine, &port->socket);
37111Sigor@sysoev.ru }
37211Sigor@sysoev.ru 
37311Sigor@sysoev.ru 
37411Sigor@sysoev.ru void
37511Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
37611Sigor@sysoev.ru {
37713Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[0]);
37811Sigor@sysoev.ru     port->pair[0] = -1;
37911Sigor@sysoev.ru }
38011Sigor@sysoev.ru 
38111Sigor@sysoev.ru 
38211Sigor@sysoev.ru static void
38311Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
38411Sigor@sysoev.ru {
38542Smax.romanov@nginx.com     ssize_t              n;
38642Smax.romanov@nginx.com     nxt_buf_t            *b;
38742Smax.romanov@nginx.com     nxt_port_t           *port;
38842Smax.romanov@nginx.com     struct iovec         iov[2];
38942Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
39011Sigor@sysoev.ru 
391125Smax.romanov@nginx.com     port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
39211Sigor@sysoev.ru 
393141Smax.romanov@nginx.com     nxt_assert(port->engine == task->thread->engine);
394141Smax.romanov@nginx.com 
39511Sigor@sysoev.ru     for ( ;; ) {
39611Sigor@sysoev.ru 
39711Sigor@sysoev.ru         b = nxt_port_buf_alloc(port);
39811Sigor@sysoev.ru 
39911Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
40011Sigor@sysoev.ru             /* TODO: disable event for some time */
40111Sigor@sysoev.ru         }
40211Sigor@sysoev.ru 
40342Smax.romanov@nginx.com         iov[0].iov_base = &msg.port_msg;
40414Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
40511Sigor@sysoev.ru 
40614Sigor@sysoev.ru         iov[1].iov_base = b->mem.pos;
40714Sigor@sysoev.ru         iov[1].iov_len = port->max_size;
40814Sigor@sysoev.ru 
40942Smax.romanov@nginx.com         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
41011Sigor@sysoev.ru 
41111Sigor@sysoev.ru         if (n > 0) {
41242Smax.romanov@nginx.com 
41342Smax.romanov@nginx.com             msg.buf = b;
41482Smax.romanov@nginx.com             msg.size = n;
41542Smax.romanov@nginx.com 
41682Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg);
41711Sigor@sysoev.ru 
418*194Smax.romanov@nginx.com             /*
419*194Smax.romanov@nginx.com              * To disable instant completion or buffer re-usage,
420*194Smax.romanov@nginx.com              * handler should reset 'msg.buf'.
421*194Smax.romanov@nginx.com              */
422*194Smax.romanov@nginx.com             if (msg.buf == b) {
42311Sigor@sysoev.ru                 nxt_port_buf_free(port, b);
42411Sigor@sysoev.ru             }
42511Sigor@sysoev.ru 
42611Sigor@sysoev.ru             if (port->socket.read_ready) {
42711Sigor@sysoev.ru                 continue;
42811Sigor@sysoev.ru             }
42911Sigor@sysoev.ru 
43011Sigor@sysoev.ru             return;
43111Sigor@sysoev.ru         }
43211Sigor@sysoev.ru 
43311Sigor@sysoev.ru         if (n == NXT_AGAIN) {
43411Sigor@sysoev.ru             nxt_port_buf_free(port, b);
43511Sigor@sysoev.ru 
43612Sigor@sysoev.ru             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
43711Sigor@sysoev.ru             return;
43811Sigor@sysoev.ru         }
43911Sigor@sysoev.ru 
44011Sigor@sysoev.ru         /* n == 0 || n == NXT_ERROR */
44111Sigor@sysoev.ru 
44211Sigor@sysoev.ru         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
44311Sigor@sysoev.ru                            nxt_port_error_handler, task, &port->socket, NULL);
44411Sigor@sysoev.ru         return;
44511Sigor@sysoev.ru     }
44611Sigor@sysoev.ru }
44711Sigor@sysoev.ru 
44811Sigor@sysoev.ru 
44911Sigor@sysoev.ru static void
45011Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
45182Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg)
45211Sigor@sysoev.ru {
45342Smax.romanov@nginx.com     nxt_buf_t  *b;
45442Smax.romanov@nginx.com     nxt_buf_t  *orig_b;
45511Sigor@sysoev.ru 
45682Smax.romanov@nginx.com     if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
45782Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_CRIT,
45882Smax.romanov@nginx.com                 "port %d: too small message:%uz", port->socket.fd, msg->size);
45911Sigor@sysoev.ru         goto fail;
46011Sigor@sysoev.ru     }
46111Sigor@sysoev.ru 
46242Smax.romanov@nginx.com     /* adjust size to actual buffer used size */
46382Smax.romanov@nginx.com     msg->size -= sizeof(nxt_port_msg_t);
46442Smax.romanov@nginx.com 
46542Smax.romanov@nginx.com     b = orig_b = msg->buf;
46682Smax.romanov@nginx.com     b->mem.free += msg->size;
46742Smax.romanov@nginx.com 
46842Smax.romanov@nginx.com     if (msg->port_msg.mmap) {
46982Smax.romanov@nginx.com         nxt_port_mmap_read(task, port, msg);
47042Smax.romanov@nginx.com         b = msg->buf;
47142Smax.romanov@nginx.com     }
47211Sigor@sysoev.ru 
47342Smax.romanov@nginx.com     port->handler(task, msg);
47442Smax.romanov@nginx.com 
47582Smax.romanov@nginx.com     if (msg->port_msg.mmap && orig_b != b) {
47642Smax.romanov@nginx.com 
477*194Smax.romanov@nginx.com         /*
478*194Smax.romanov@nginx.com          * To disable instant buffer completion,
479*194Smax.romanov@nginx.com          * handler should reset 'msg->buf'.
480*194Smax.romanov@nginx.com          */
481*194Smax.romanov@nginx.com         if (msg->buf == b) {
482*194Smax.romanov@nginx.com             /* complete mmap buffers */
483*194Smax.romanov@nginx.com             for (; b != NULL; b = b->next) {
484*194Smax.romanov@nginx.com                 nxt_debug(task, "complete buffer %p", b);
485*194Smax.romanov@nginx.com 
486*194Smax.romanov@nginx.com                 nxt_work_queue_add(port->socket.read_work_queue,
487*194Smax.romanov@nginx.com                     b->completion_handler, task, b, b->parent);
488*194Smax.romanov@nginx.com             }
48942Smax.romanov@nginx.com         }
490*194Smax.romanov@nginx.com 
491*194Smax.romanov@nginx.com         /* restore original buf */
492*194Smax.romanov@nginx.com         msg->buf = orig_b;
49342Smax.romanov@nginx.com     }
49411Sigor@sysoev.ru 
49511Sigor@sysoev.ru     return;
49611Sigor@sysoev.ru 
49711Sigor@sysoev.ru fail:
49811Sigor@sysoev.ru 
49942Smax.romanov@nginx.com     if (msg->fd != -1) {
50042Smax.romanov@nginx.com         nxt_fd_close(msg->fd);
50111Sigor@sysoev.ru     }
50211Sigor@sysoev.ru }
50311Sigor@sysoev.ru 
50411Sigor@sysoev.ru 
50511Sigor@sysoev.ru static nxt_buf_t *
50611Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
50711Sigor@sysoev.ru {
50811Sigor@sysoev.ru     nxt_buf_t  *b;
50911Sigor@sysoev.ru 
51011Sigor@sysoev.ru     if (port->free_bufs != NULL) {
51111Sigor@sysoev.ru         b = port->free_bufs;
51211Sigor@sysoev.ru         port->free_bufs = b->next;
51311Sigor@sysoev.ru 
51411Sigor@sysoev.ru         b->mem.pos = b->mem.start;
51511Sigor@sysoev.ru         b->mem.free = b->mem.start;
51642Smax.romanov@nginx.com         b->next = NULL;
51711Sigor@sysoev.ru     } else {
51811Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
51911Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
52011Sigor@sysoev.ru             return NULL;
52111Sigor@sysoev.ru         }
52211Sigor@sysoev.ru     }
52311Sigor@sysoev.ru 
52411Sigor@sysoev.ru     return b;
52511Sigor@sysoev.ru }
52611Sigor@sysoev.ru 
52711Sigor@sysoev.ru 
52811Sigor@sysoev.ru static void
52911Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
53011Sigor@sysoev.ru {
53111Sigor@sysoev.ru     b->next = port->free_bufs;
53211Sigor@sysoev.ru     port->free_bufs = b;
53311Sigor@sysoev.ru }
53411Sigor@sysoev.ru 
53511Sigor@sysoev.ru 
53611Sigor@sysoev.ru static void
53711Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
53811Sigor@sysoev.ru {
539125Smax.romanov@nginx.com     nxt_debug(task, "port error handler %p", obj);
54011Sigor@sysoev.ru     /* TODO */
54111Sigor@sysoev.ru }
542