xref: /unit/src/nxt_port_socket.c (revision 65)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
811Sigor@sysoev.ru 
911Sigor@sysoev.ru 
1011Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
1111Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
1211Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
1342Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg, size_t size);
1411Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
1511Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
1611Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
1711Sigor@sysoev.ru 
1811Sigor@sysoev.ru 
1914Sigor@sysoev.ru nxt_int_t
2014Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
2111Sigor@sysoev.ru {
22*65Sigor@sysoev.ru     nxt_mp_t      *mp;
23*65Sigor@sysoev.ru     nxt_int_t     sndbuf, rcvbuf, size;
24*65Sigor@sysoev.ru     nxt_socket_t  snd, rcv;
2511Sigor@sysoev.ru 
2614Sigor@sysoev.ru     port->socket.task = task;
2714Sigor@sysoev.ru 
2814Sigor@sysoev.ru     port->pair[0] = -1;
2914Sigor@sysoev.ru     port->pair[1] = -1;
3014Sigor@sysoev.ru 
3114Sigor@sysoev.ru     nxt_queue_init(&port->messages);
3214Sigor@sysoev.ru 
33*65Sigor@sysoev.ru     mp = nxt_mp_create(1024, 128, 256, 32);
3414Sigor@sysoev.ru     if (nxt_slow_path(mp == NULL)) {
3514Sigor@sysoev.ru         return NXT_ERROR;
3611Sigor@sysoev.ru     }
3711Sigor@sysoev.ru 
3814Sigor@sysoev.ru     port->mem_pool = mp;
3911Sigor@sysoev.ru 
4013Sigor@sysoev.ru     if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
4111Sigor@sysoev.ru         goto socketpair_fail;
4211Sigor@sysoev.ru     }
4311Sigor@sysoev.ru 
4411Sigor@sysoev.ru     snd = port->pair[1];
4511Sigor@sysoev.ru 
4613Sigor@sysoev.ru     sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
4711Sigor@sysoev.ru     if (nxt_slow_path(sndbuf < 0)) {
4811Sigor@sysoev.ru         goto getsockopt_fail;
4911Sigor@sysoev.ru     }
5011Sigor@sysoev.ru 
5111Sigor@sysoev.ru     rcv = port->pair[0];
5211Sigor@sysoev.ru 
5313Sigor@sysoev.ru     rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
5411Sigor@sysoev.ru     if (nxt_slow_path(rcvbuf < 0)) {
5511Sigor@sysoev.ru         goto getsockopt_fail;
5611Sigor@sysoev.ru     }
5711Sigor@sysoev.ru 
5811Sigor@sysoev.ru     if (max_size == 0) {
5911Sigor@sysoev.ru         max_size = 16 * 1024;
6011Sigor@sysoev.ru     }
6111Sigor@sysoev.ru 
6211Sigor@sysoev.ru     if ((size_t) sndbuf < max_size) {
6311Sigor@sysoev.ru         /*
6411Sigor@sysoev.ru          * On Unix domain sockets
6511Sigor@sysoev.ru          *   Linux uses 224K on both send and receive directions;
6611Sigor@sysoev.ru          *   FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
6711Sigor@sysoev.ru          *   on send direction and 4K buffer size on receive direction;
6811Sigor@sysoev.ru          *   Solaris uses 16K on send direction and 5K on receive direction.
6911Sigor@sysoev.ru          */
7013Sigor@sysoev.ru         (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
7113Sigor@sysoev.ru                                      max_size);
7211Sigor@sysoev.ru 
7313Sigor@sysoev.ru         sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
7411Sigor@sysoev.ru         if (nxt_slow_path(sndbuf < 0)) {
7511Sigor@sysoev.ru             goto getsockopt_fail;
7611Sigor@sysoev.ru         }
7711Sigor@sysoev.ru 
7811Sigor@sysoev.ru         size = sndbuf * 4;
7911Sigor@sysoev.ru 
8011Sigor@sysoev.ru         if (rcvbuf < size) {
8113Sigor@sysoev.ru             (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
8213Sigor@sysoev.ru                                          size);
8311Sigor@sysoev.ru 
8413Sigor@sysoev.ru             rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
8511Sigor@sysoev.ru             if (nxt_slow_path(rcvbuf < 0)) {
8611Sigor@sysoev.ru                 goto getsockopt_fail;
8711Sigor@sysoev.ru             }
8811Sigor@sysoev.ru         }
8911Sigor@sysoev.ru     }
9011Sigor@sysoev.ru 
9111Sigor@sysoev.ru     port->max_size = nxt_min(max_size, (size_t) sndbuf);
9211Sigor@sysoev.ru     port->max_share = (64 * 1024);
9311Sigor@sysoev.ru 
9414Sigor@sysoev.ru     return NXT_OK;
9511Sigor@sysoev.ru 
9611Sigor@sysoev.ru getsockopt_fail:
9711Sigor@sysoev.ru 
9813Sigor@sysoev.ru     nxt_socket_close(task, port->pair[0]);
9913Sigor@sysoev.ru     nxt_socket_close(task, port->pair[1]);
10011Sigor@sysoev.ru 
10111Sigor@sysoev.ru socketpair_fail:
10211Sigor@sysoev.ru 
103*65Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
10411Sigor@sysoev.ru 
10514Sigor@sysoev.ru     return NXT_ERROR;
10611Sigor@sysoev.ru }
10711Sigor@sysoev.ru 
10811Sigor@sysoev.ru 
10911Sigor@sysoev.ru void
11011Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
11111Sigor@sysoev.ru {
11213Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->socket.fd);
113*65Sigor@sysoev.ru     nxt_mp_destroy(port->mem_pool);
11411Sigor@sysoev.ru }
11511Sigor@sysoev.ru 
11611Sigor@sysoev.ru 
11711Sigor@sysoev.ru void
11811Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
11911Sigor@sysoev.ru {
12011Sigor@sysoev.ru     port->socket.fd = port->pair[1];
12111Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
12211Sigor@sysoev.ru     port->socket.write_ready = 1;
12311Sigor@sysoev.ru 
12411Sigor@sysoev.ru     port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
12511Sigor@sysoev.ru     port->socket.write_handler = nxt_port_write_handler;
12611Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
12711Sigor@sysoev.ru }
12811Sigor@sysoev.ru 
12911Sigor@sysoev.ru 
13011Sigor@sysoev.ru void
13111Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
13211Sigor@sysoev.ru {
13313Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[1]);
13411Sigor@sysoev.ru     port->pair[1] = -1;
13511Sigor@sysoev.ru }
13611Sigor@sysoev.ru 
13711Sigor@sysoev.ru 
13811Sigor@sysoev.ru nxt_int_t
13914Sigor@sysoev.ru nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
14042Smax.romanov@nginx.com     nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
14111Sigor@sysoev.ru {
14211Sigor@sysoev.ru     nxt_queue_link_t     *link;
14311Sigor@sysoev.ru     nxt_port_send_msg_t  *msg;
14411Sigor@sysoev.ru 
14511Sigor@sysoev.ru     for (link = nxt_queue_first(&port->messages);
14611Sigor@sysoev.ru          link != nxt_queue_tail(&port->messages);
14711Sigor@sysoev.ru          link = nxt_queue_next(link))
14811Sigor@sysoev.ru     {
14911Sigor@sysoev.ru         msg = (nxt_port_send_msg_t *) link;
15011Sigor@sysoev.ru 
15111Sigor@sysoev.ru         if (msg->port_msg.stream == stream) {
15211Sigor@sysoev.ru             /*
15311Sigor@sysoev.ru              * An fd is ignored since a file descriptor
15411Sigor@sysoev.ru              * must be sent only in the first message of a stream.
15511Sigor@sysoev.ru              */
15611Sigor@sysoev.ru             nxt_buf_chain_add(&msg->buf, b);
15711Sigor@sysoev.ru 
15811Sigor@sysoev.ru             return NXT_OK;
15911Sigor@sysoev.ru         }
16011Sigor@sysoev.ru     }
16111Sigor@sysoev.ru 
162*65Sigor@sysoev.ru     msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t));
16311Sigor@sysoev.ru     if (nxt_slow_path(msg == NULL)) {
16411Sigor@sysoev.ru         return NXT_ERROR;
16511Sigor@sysoev.ru     }
16611Sigor@sysoev.ru 
16711Sigor@sysoev.ru     msg->buf = b;
16811Sigor@sysoev.ru     msg->fd = fd;
16911Sigor@sysoev.ru     msg->share = 0;
17011Sigor@sysoev.ru 
17111Sigor@sysoev.ru     msg->port_msg.stream = stream;
17242Smax.romanov@nginx.com     msg->port_msg.pid = nxt_pid;
17342Smax.romanov@nginx.com     msg->port_msg.reply_port = reply_port;
17411Sigor@sysoev.ru     msg->port_msg.type = type;
17511Sigor@sysoev.ru     msg->port_msg.last = 0;
17642Smax.romanov@nginx.com     msg->port_msg.mmap = 0;
17711Sigor@sysoev.ru 
17811Sigor@sysoev.ru     nxt_queue_insert_tail(&port->messages, &msg->link);
17911Sigor@sysoev.ru 
18011Sigor@sysoev.ru     if (port->socket.write_ready) {
18111Sigor@sysoev.ru         nxt_port_write_handler(task, port, NULL);
18211Sigor@sysoev.ru     }
18311Sigor@sysoev.ru 
18411Sigor@sysoev.ru     return NXT_OK;
18511Sigor@sysoev.ru }
18611Sigor@sysoev.ru 
18711Sigor@sysoev.ru 
18811Sigor@sysoev.ru static void
18911Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
19011Sigor@sysoev.ru {
19111Sigor@sysoev.ru     ssize_t                 n;
19211Sigor@sysoev.ru     nxt_port_t              *port;
19314Sigor@sysoev.ru     struct iovec            iov[NXT_IOBUF_MAX];
19411Sigor@sysoev.ru     nxt_queue_link_t        *link;
19511Sigor@sysoev.ru     nxt_port_send_msg_t     *msg;
19611Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
19742Smax.romanov@nginx.com     nxt_port_method_t       m;
19842Smax.romanov@nginx.com 
19942Smax.romanov@nginx.com     size_t                  plain_size;
20042Smax.romanov@nginx.com     nxt_buf_t               *plain_buf;
20111Sigor@sysoev.ru 
20211Sigor@sysoev.ru     port = obj;
20311Sigor@sysoev.ru 
20411Sigor@sysoev.ru     do {
20511Sigor@sysoev.ru         link = nxt_queue_first(&port->messages);
20611Sigor@sysoev.ru 
20711Sigor@sysoev.ru         if (link == nxt_queue_tail(&port->messages)) {
20812Sigor@sysoev.ru             nxt_fd_event_block_write(task->thread->engine, &port->socket);
20911Sigor@sysoev.ru             return;
21011Sigor@sysoev.ru         }
21111Sigor@sysoev.ru 
21211Sigor@sysoev.ru         msg = (nxt_port_send_msg_t *) link;
21311Sigor@sysoev.ru 
21414Sigor@sysoev.ru         iov[0].iov_base = &msg->port_msg;
21514Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
21611Sigor@sysoev.ru 
21711Sigor@sysoev.ru         sb.buf = msg->buf;
21814Sigor@sysoev.ru         sb.iobuf = &iov[1];
21911Sigor@sysoev.ru         sb.nmax = NXT_IOBUF_MAX - 1;
22011Sigor@sysoev.ru         sb.sync = 0;
22111Sigor@sysoev.ru         sb.last = 0;
22242Smax.romanov@nginx.com         sb.size = 0;
22311Sigor@sysoev.ru         sb.limit = port->max_size;
22411Sigor@sysoev.ru 
22542Smax.romanov@nginx.com         m = nxt_port_mmap_get_method(task, port, msg->buf);
22642Smax.romanov@nginx.com 
22742Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP) {
22842Smax.romanov@nginx.com             sb.limit = (1ULL << 31) - 1;
22942Smax.romanov@nginx.com         }
23042Smax.romanov@nginx.com 
23142Smax.romanov@nginx.com         nxt_sendbuf_mem_coalesce(task, &sb);
23242Smax.romanov@nginx.com 
23342Smax.romanov@nginx.com         plain_size = sb.size;
23442Smax.romanov@nginx.com         plain_buf = msg->buf;
23542Smax.romanov@nginx.com 
23642Smax.romanov@nginx.com         /*
23742Smax.romanov@nginx.com          * Send through mmap enabled only when payload
23842Smax.romanov@nginx.com          * is bigger than PORT_MMAP_MIN_SIZE.
23942Smax.romanov@nginx.com          */
24042Smax.romanov@nginx.com         if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
24142Smax.romanov@nginx.com             nxt_port_mmap_write(task, port, msg, &sb);
24242Smax.romanov@nginx.com 
24342Smax.romanov@nginx.com         } else {
24442Smax.romanov@nginx.com             m = NXT_PORT_METHOD_PLAIN;
24542Smax.romanov@nginx.com         }
24611Sigor@sysoev.ru 
24711Sigor@sysoev.ru         msg->port_msg.last = sb.last;
24811Sigor@sysoev.ru 
24942Smax.romanov@nginx.com         n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
25011Sigor@sysoev.ru 
25111Sigor@sysoev.ru         if (n > 0) {
25242Smax.romanov@nginx.com             if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
25311Sigor@sysoev.ru                 nxt_log(task, NXT_LOG_CRIT,
25411Sigor@sysoev.ru                         "port %d: short write: %z instead of %uz",
25542Smax.romanov@nginx.com                         port->socket.fd, n, sb.size + iov[0].iov_len);
25611Sigor@sysoev.ru                 goto fail;
25711Sigor@sysoev.ru             }
25811Sigor@sysoev.ru 
25942Smax.romanov@nginx.com             if (msg->buf != plain_buf) {
26042Smax.romanov@nginx.com                 /*
26142Smax.romanov@nginx.com                  * Complete crafted mmap_msgs buf and restore msg->buf
26242Smax.romanov@nginx.com                  * for regular completion call.
26342Smax.romanov@nginx.com                  */
26442Smax.romanov@nginx.com                 nxt_port_mmap_completion(task,
26542Smax.romanov@nginx.com                                          port->socket.write_work_queue,
26642Smax.romanov@nginx.com                                          msg->buf);
26742Smax.romanov@nginx.com 
26842Smax.romanov@nginx.com                 msg->buf = plain_buf;
26942Smax.romanov@nginx.com             }
27042Smax.romanov@nginx.com 
27111Sigor@sysoev.ru             msg->buf = nxt_sendbuf_completion(task,
27211Sigor@sysoev.ru                                               port->socket.write_work_queue,
27311Sigor@sysoev.ru                                               msg->buf,
27442Smax.romanov@nginx.com                                               plain_size);
27511Sigor@sysoev.ru 
27611Sigor@sysoev.ru             if (msg->buf != NULL) {
27711Sigor@sysoev.ru                 /*
27811Sigor@sysoev.ru                  * A file descriptor is sent only
27911Sigor@sysoev.ru                  * in the first message of a stream.
28011Sigor@sysoev.ru                  */
28111Sigor@sysoev.ru                 msg->fd = -1;
28211Sigor@sysoev.ru                 msg->share += n;
28311Sigor@sysoev.ru 
28411Sigor@sysoev.ru                 if (msg->share >= port->max_share) {
28511Sigor@sysoev.ru                     msg->share = 0;
28611Sigor@sysoev.ru                     nxt_queue_remove(link);
28711Sigor@sysoev.ru                     nxt_queue_insert_tail(&port->messages, link);
28811Sigor@sysoev.ru                 }
28911Sigor@sysoev.ru 
29011Sigor@sysoev.ru             } else {
29111Sigor@sysoev.ru                 nxt_queue_remove(link);
292*65Sigor@sysoev.ru                 nxt_mp_free(port->mem_pool, msg);
29311Sigor@sysoev.ru             }
29411Sigor@sysoev.ru 
29511Sigor@sysoev.ru         } else if (nxt_slow_path(n == NXT_ERROR)) {
29611Sigor@sysoev.ru             goto fail;
29711Sigor@sysoev.ru         }
29811Sigor@sysoev.ru 
29911Sigor@sysoev.ru         /* n == NXT_AGAIN */
30011Sigor@sysoev.ru 
30111Sigor@sysoev.ru     } while (port->socket.write_ready);
30211Sigor@sysoev.ru 
30312Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(port->socket.write)) {
30412Sigor@sysoev.ru         nxt_fd_event_enable_write(task->thread->engine, &port->socket);
30511Sigor@sysoev.ru     }
30611Sigor@sysoev.ru 
30711Sigor@sysoev.ru     return;
30811Sigor@sysoev.ru 
30911Sigor@sysoev.ru fail:
31011Sigor@sysoev.ru 
31111Sigor@sysoev.ru     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
31211Sigor@sysoev.ru                        nxt_port_error_handler, task, &port->socket, NULL);
31311Sigor@sysoev.ru }
31411Sigor@sysoev.ru 
31511Sigor@sysoev.ru 
31611Sigor@sysoev.ru void
31711Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
31811Sigor@sysoev.ru {
31911Sigor@sysoev.ru     port->socket.fd = port->pair[0];
32011Sigor@sysoev.ru     port->socket.log = &nxt_main_log;
32111Sigor@sysoev.ru 
32211Sigor@sysoev.ru     port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
32311Sigor@sysoev.ru     port->socket.read_handler = nxt_port_read_handler;
32411Sigor@sysoev.ru     port->socket.error_handler = nxt_port_error_handler;
32511Sigor@sysoev.ru 
32612Sigor@sysoev.ru     nxt_fd_event_enable_read(task->thread->engine, &port->socket);
32711Sigor@sysoev.ru }
32811Sigor@sysoev.ru 
32911Sigor@sysoev.ru 
33011Sigor@sysoev.ru void
33111Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
33211Sigor@sysoev.ru {
33313Sigor@sysoev.ru     nxt_socket_close(port->socket.task, port->pair[0]);
33411Sigor@sysoev.ru     port->pair[0] = -1;
33511Sigor@sysoev.ru }
33611Sigor@sysoev.ru 
33711Sigor@sysoev.ru 
33811Sigor@sysoev.ru static void
33911Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
34011Sigor@sysoev.ru {
34142Smax.romanov@nginx.com     ssize_t              n;
34242Smax.romanov@nginx.com     nxt_buf_t            *b;
34342Smax.romanov@nginx.com     nxt_port_t           *port;
34442Smax.romanov@nginx.com     struct iovec         iov[2];
34542Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
34611Sigor@sysoev.ru 
34742Smax.romanov@nginx.com     port = msg.port = obj;
34811Sigor@sysoev.ru 
34911Sigor@sysoev.ru     for ( ;; ) {
35011Sigor@sysoev.ru 
35111Sigor@sysoev.ru         b = nxt_port_buf_alloc(port);
35211Sigor@sysoev.ru 
35311Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
35411Sigor@sysoev.ru             /* TODO: disable event for some time */
35511Sigor@sysoev.ru         }
35611Sigor@sysoev.ru 
35742Smax.romanov@nginx.com         iov[0].iov_base = &msg.port_msg;
35814Sigor@sysoev.ru         iov[0].iov_len = sizeof(nxt_port_msg_t);
35911Sigor@sysoev.ru 
36014Sigor@sysoev.ru         iov[1].iov_base = b->mem.pos;
36114Sigor@sysoev.ru         iov[1].iov_len = port->max_size;
36214Sigor@sysoev.ru 
36342Smax.romanov@nginx.com         n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
36411Sigor@sysoev.ru 
36511Sigor@sysoev.ru         if (n > 0) {
36642Smax.romanov@nginx.com 
36742Smax.romanov@nginx.com             msg.buf = b;
36842Smax.romanov@nginx.com 
36942Smax.romanov@nginx.com             nxt_port_read_msg_process(task, port, &msg, n);
37011Sigor@sysoev.ru 
37111Sigor@sysoev.ru             if (b->mem.pos == b->mem.free) {
37211Sigor@sysoev.ru                 nxt_port_buf_free(port, b);
37311Sigor@sysoev.ru             }
37411Sigor@sysoev.ru 
37511Sigor@sysoev.ru             if (port->socket.read_ready) {
37611Sigor@sysoev.ru                 continue;
37711Sigor@sysoev.ru             }
37811Sigor@sysoev.ru 
37911Sigor@sysoev.ru             return;
38011Sigor@sysoev.ru         }
38111Sigor@sysoev.ru 
38211Sigor@sysoev.ru         if (n == NXT_AGAIN) {
38311Sigor@sysoev.ru             nxt_port_buf_free(port, b);
38411Sigor@sysoev.ru 
38512Sigor@sysoev.ru             nxt_fd_event_enable_read(task->thread->engine, &port->socket);
38611Sigor@sysoev.ru             return;
38711Sigor@sysoev.ru         }
38811Sigor@sysoev.ru 
38911Sigor@sysoev.ru         /* n == 0 || n == NXT_ERROR */
39011Sigor@sysoev.ru 
39111Sigor@sysoev.ru         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
39211Sigor@sysoev.ru                            nxt_port_error_handler, task, &port->socket, NULL);
39311Sigor@sysoev.ru         return;
39411Sigor@sysoev.ru     }
39511Sigor@sysoev.ru }
39611Sigor@sysoev.ru 
39711Sigor@sysoev.ru 
39811Sigor@sysoev.ru static void
39911Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
40042Smax.romanov@nginx.com     nxt_port_recv_msg_t *msg, size_t size)
40111Sigor@sysoev.ru {
40242Smax.romanov@nginx.com     nxt_buf_t  *b;
40342Smax.romanov@nginx.com     nxt_buf_t  *orig_b;
40442Smax.romanov@nginx.com     nxt_buf_t  **last_next;
40511Sigor@sysoev.ru 
40611Sigor@sysoev.ru     if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
40711Sigor@sysoev.ru         nxt_log(port->socket.task, NXT_LOG_CRIT,
40811Sigor@sysoev.ru                 "port %d: too small message:%uz", port->socket.fd, size);
40911Sigor@sysoev.ru         goto fail;
41011Sigor@sysoev.ru     }
41111Sigor@sysoev.ru 
41242Smax.romanov@nginx.com     /* adjust size to actual buffer used size */
41342Smax.romanov@nginx.com     size -= sizeof(nxt_port_msg_t);
41442Smax.romanov@nginx.com 
41542Smax.romanov@nginx.com     b = orig_b = msg->buf;
41642Smax.romanov@nginx.com     b->mem.free += size;
41742Smax.romanov@nginx.com 
41842Smax.romanov@nginx.com     if (msg->port_msg.mmap) {
41942Smax.romanov@nginx.com         nxt_port_mmap_read(task, port, msg, size);
42042Smax.romanov@nginx.com         b = msg->buf;
42142Smax.romanov@nginx.com     }
42211Sigor@sysoev.ru 
42342Smax.romanov@nginx.com     last_next = &b->next;
42411Sigor@sysoev.ru 
42542Smax.romanov@nginx.com     if (msg->port_msg.last) {
42642Smax.romanov@nginx.com         /* find reference to last next, the NULL one */
42742Smax.romanov@nginx.com         while (*last_next) {
42842Smax.romanov@nginx.com             last_next = &(*last_next)->next;
42942Smax.romanov@nginx.com         }
43042Smax.romanov@nginx.com 
43142Smax.romanov@nginx.com         *last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
43242Smax.romanov@nginx.com         if (nxt_slow_path(*last_next == NULL)) {
43311Sigor@sysoev.ru             goto fail;
43411Sigor@sysoev.ru         }
43542Smax.romanov@nginx.com     }
43611Sigor@sysoev.ru 
43742Smax.romanov@nginx.com     port->handler(task, msg);
43842Smax.romanov@nginx.com 
43942Smax.romanov@nginx.com     if (*last_next != NULL) {
44042Smax.romanov@nginx.com         /* A sync buffer */
44142Smax.romanov@nginx.com         nxt_buf_free(port->mem_pool, *last_next);
44242Smax.romanov@nginx.com         *last_next = NULL;
44311Sigor@sysoev.ru     }
44411Sigor@sysoev.ru 
44542Smax.romanov@nginx.com     if (orig_b != b) {
44642Smax.romanov@nginx.com         /* complete mmap buffers */
44742Smax.romanov@nginx.com         for (; b && nxt_buf_used_size(b) == 0;
44842Smax.romanov@nginx.com             b = b->next) {
44942Smax.romanov@nginx.com             nxt_debug(task, "complete buffer %p", b);
45042Smax.romanov@nginx.com 
45142Smax.romanov@nginx.com             nxt_work_queue_add(port->socket.read_work_queue,
45242Smax.romanov@nginx.com                 b->completion_handler, task, b, b->parent);
45342Smax.romanov@nginx.com         }
45442Smax.romanov@nginx.com     }
45511Sigor@sysoev.ru 
45611Sigor@sysoev.ru     return;
45711Sigor@sysoev.ru 
45811Sigor@sysoev.ru fail:
45911Sigor@sysoev.ru 
46042Smax.romanov@nginx.com     if (msg->fd != -1) {
46142Smax.romanov@nginx.com         nxt_fd_close(msg->fd);
46211Sigor@sysoev.ru     }
46311Sigor@sysoev.ru }
46411Sigor@sysoev.ru 
46511Sigor@sysoev.ru 
46611Sigor@sysoev.ru static nxt_buf_t *
46711Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
46811Sigor@sysoev.ru {
46911Sigor@sysoev.ru     nxt_buf_t  *b;
47011Sigor@sysoev.ru 
47111Sigor@sysoev.ru     if (port->free_bufs != NULL) {
47211Sigor@sysoev.ru         b = port->free_bufs;
47311Sigor@sysoev.ru         port->free_bufs = b->next;
47411Sigor@sysoev.ru 
47511Sigor@sysoev.ru         b->mem.pos = b->mem.start;
47611Sigor@sysoev.ru         b->mem.free = b->mem.start;
47742Smax.romanov@nginx.com         b->next = NULL;
47811Sigor@sysoev.ru 
47911Sigor@sysoev.ru     } else {
48011Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
48111Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
48211Sigor@sysoev.ru             return NULL;
48311Sigor@sysoev.ru         }
48411Sigor@sysoev.ru     }
48511Sigor@sysoev.ru 
48611Sigor@sysoev.ru     return b;
48711Sigor@sysoev.ru }
48811Sigor@sysoev.ru 
48911Sigor@sysoev.ru 
49011Sigor@sysoev.ru static void
49111Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
49211Sigor@sysoev.ru {
49311Sigor@sysoev.ru     b->next = port->free_bufs;
49411Sigor@sysoev.ru     port->free_bufs = b;
49511Sigor@sysoev.ru }
49611Sigor@sysoev.ru 
49711Sigor@sysoev.ru 
49811Sigor@sysoev.ru static void
49911Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
50011Sigor@sysoev.ru {
50111Sigor@sysoev.ru     /* TODO */
50211Sigor@sysoev.ru }
503