111Sigor@sysoev.ru
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru */
611Sigor@sysoev.ru
711Sigor@sysoev.ru #include <nxt_main.h>
81996St.nateldemoura@f5.com #include <nxt_socket_msg.h>
91555Smax.romanov@nginx.com #include <nxt_port_queue.h>
101832Smax.romanov@nginx.com #include <nxt_port_memory_int.h>
1111Sigor@sysoev.ru
1211Sigor@sysoev.ru
131832Smax.romanov@nginx.com #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
141832Smax.romanov@nginx.com (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
151832Smax.romanov@nginx.com
161832Smax.romanov@nginx.com
171832Smax.romanov@nginx.com static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
181832Smax.romanov@nginx.com static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
191832Smax.romanov@nginx.com void *qbuf, nxt_buf_t *b);
201125Smax.romanov@nginx.com static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
211125Smax.romanov@nginx.com nxt_port_send_msg_t *msg);
22*2139Sandrew@digital-domain.net static nxt_port_send_msg_t *nxt_port_msg_alloc(const nxt_port_send_msg_t *m);
2311Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
241125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
251908Smax.romanov@nginx.com nxt_inline void nxt_port_msg_close_fd(nxt_port_send_msg_t *msg);
261996St.nateldemoura@f5.com nxt_inline void nxt_port_close_fds(nxt_fd_t *fd);
27592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
28592Sigor@sysoev.ru nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
291125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
301125Smax.romanov@nginx.com nxt_port_send_msg_t *msg);
3111Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
321555Smax.romanov@nginx.com static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
331555Smax.romanov@nginx.com void *data);
3411Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
3582Smax.romanov@nginx.com nxt_port_recv_msg_t *msg);
3611Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
3711Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
3811Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
3911Sigor@sysoev.ru
4011Sigor@sysoev.ru
4114Sigor@sysoev.ru nxt_int_t
nxt_port_socket_init(nxt_task_t * task,nxt_port_t * port,size_t max_size)4214Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
4311Sigor@sysoev.ru {
4465Sigor@sysoev.ru nxt_int_t sndbuf, rcvbuf, size;
4565Sigor@sysoev.ru nxt_socket_t snd, rcv;
4611Sigor@sysoev.ru
4714Sigor@sysoev.ru port->socket.task = task;
4814Sigor@sysoev.ru
4914Sigor@sysoev.ru port->pair[0] = -1;
5014Sigor@sysoev.ru port->pair[1] = -1;
5114Sigor@sysoev.ru
5213Sigor@sysoev.ru if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
5311Sigor@sysoev.ru goto socketpair_fail;
5411Sigor@sysoev.ru }
5511Sigor@sysoev.ru
5611Sigor@sysoev.ru snd = port->pair[1];
5711Sigor@sysoev.ru
5813Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
5911Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) {
6011Sigor@sysoev.ru goto getsockopt_fail;
6111Sigor@sysoev.ru }
6211Sigor@sysoev.ru
6311Sigor@sysoev.ru rcv = port->pair[0];
6411Sigor@sysoev.ru
6513Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
6611Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) {
6711Sigor@sysoev.ru goto getsockopt_fail;
6811Sigor@sysoev.ru }
6911Sigor@sysoev.ru
7011Sigor@sysoev.ru if (max_size == 0) {
7111Sigor@sysoev.ru max_size = 16 * 1024;
7211Sigor@sysoev.ru }
7311Sigor@sysoev.ru
7411Sigor@sysoev.ru if ((size_t) sndbuf < max_size) {
7511Sigor@sysoev.ru /*
7611Sigor@sysoev.ru * On Unix domain sockets
7711Sigor@sysoev.ru * Linux uses 224K on both send and receive directions;
7811Sigor@sysoev.ru * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
7911Sigor@sysoev.ru * on send direction and 4K buffer size on receive direction;
8011Sigor@sysoev.ru * Solaris uses 16K on send direction and 5K on receive direction.
8111Sigor@sysoev.ru */
8213Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
8313Sigor@sysoev.ru max_size);
8411Sigor@sysoev.ru
8513Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
8611Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) {
8711Sigor@sysoev.ru goto getsockopt_fail;
8811Sigor@sysoev.ru }
8911Sigor@sysoev.ru
9011Sigor@sysoev.ru size = sndbuf * 4;
9111Sigor@sysoev.ru
9211Sigor@sysoev.ru if (rcvbuf < size) {
9313Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
9413Sigor@sysoev.ru size);
9511Sigor@sysoev.ru
9613Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
9711Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) {
9811Sigor@sysoev.ru goto getsockopt_fail;
9911Sigor@sysoev.ru }
10011Sigor@sysoev.ru }
10111Sigor@sysoev.ru }
10211Sigor@sysoev.ru
10311Sigor@sysoev.ru port->max_size = nxt_min(max_size, (size_t) sndbuf);
10411Sigor@sysoev.ru port->max_share = (64 * 1024);
10511Sigor@sysoev.ru
10614Sigor@sysoev.ru return NXT_OK;
10711Sigor@sysoev.ru
10811Sigor@sysoev.ru getsockopt_fail:
10911Sigor@sysoev.ru
11013Sigor@sysoev.ru nxt_socket_close(task, port->pair[0]);
11113Sigor@sysoev.ru nxt_socket_close(task, port->pair[1]);
11211Sigor@sysoev.ru
11311Sigor@sysoev.ru socketpair_fail:
11411Sigor@sysoev.ru
11514Sigor@sysoev.ru return NXT_ERROR;
11611Sigor@sysoev.ru }
11711Sigor@sysoev.ru
11811Sigor@sysoev.ru
11911Sigor@sysoev.ru void
nxt_port_destroy(nxt_port_t * port)12011Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port)
12111Sigor@sysoev.ru {
12213Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->socket.fd);
12365Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool);
12411Sigor@sysoev.ru }
12511Sigor@sysoev.ru
12611Sigor@sysoev.ru
12711Sigor@sysoev.ru void
nxt_port_write_enable(nxt_task_t * task,nxt_port_t * port)12811Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
12911Sigor@sysoev.ru {
13011Sigor@sysoev.ru port->socket.fd = port->pair[1];
13111Sigor@sysoev.ru port->socket.log = &nxt_main_log;
13211Sigor@sysoev.ru port->socket.write_ready = 1;
13311Sigor@sysoev.ru
134141Smax.romanov@nginx.com port->engine = task->thread->engine;
135141Smax.romanov@nginx.com
136141Smax.romanov@nginx.com port->socket.write_work_queue = &port->engine->fast_work_queue;
13711Sigor@sysoev.ru port->socket.write_handler = nxt_port_write_handler;
13811Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler;
13911Sigor@sysoev.ru }
14011Sigor@sysoev.ru
14111Sigor@sysoev.ru
14211Sigor@sysoev.ru void
nxt_port_write_close(nxt_port_t * port)14311Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port)
14411Sigor@sysoev.ru {
14513Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[1]);
14611Sigor@sysoev.ru port->pair[1] = -1;
14711Sigor@sysoev.ru }
14811Sigor@sysoev.ru
14911Sigor@sysoev.ru
150122Smax.romanov@nginx.com static void
nxt_port_release_send_msg(nxt_port_send_msg_t * msg)1511125Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
152122Smax.romanov@nginx.com {
1531125Smax.romanov@nginx.com if (msg->allocated) {
1541125Smax.romanov@nginx.com nxt_free(msg);
155344Smax.romanov@nginx.com }
156122Smax.romanov@nginx.com }
157122Smax.romanov@nginx.com
158122Smax.romanov@nginx.com
15911Sigor@sysoev.ru nxt_int_t
nxt_port_socket_write2(nxt_task_t * task,nxt_port_t * port,nxt_uint_t type,nxt_fd_t fd,nxt_fd_t fd2,uint32_t stream,nxt_port_id_t reply_port,nxt_buf_t * b)1601555Smax.romanov@nginx.com nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
1611555Smax.romanov@nginx.com nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
1621555Smax.romanov@nginx.com nxt_buf_t *b)
16311Sigor@sysoev.ru {
1641555Smax.romanov@nginx.com int notify;
1651832Smax.romanov@nginx.com uint8_t qmsg_size;
1661125Smax.romanov@nginx.com nxt_int_t res;
1671125Smax.romanov@nginx.com nxt_port_send_msg_t msg;
1681832Smax.romanov@nginx.com struct {
1691832Smax.romanov@nginx.com nxt_port_msg_t pm;
1701832Smax.romanov@nginx.com uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
1711832Smax.romanov@nginx.com } qmsg;
17211Sigor@sysoev.ru
173344Smax.romanov@nginx.com msg.link.next = NULL;
174344Smax.romanov@nginx.com msg.link.prev = NULL;
175122Smax.romanov@nginx.com
176344Smax.romanov@nginx.com msg.buf = b;
1771125Smax.romanov@nginx.com msg.share = 0;
1781558Smax.romanov@nginx.com msg.fd[0] = fd;
1791558Smax.romanov@nginx.com msg.fd[1] = fd2;
180344Smax.romanov@nginx.com msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
1811125Smax.romanov@nginx.com msg.allocated = 0;
18211Sigor@sysoev.ru
183344Smax.romanov@nginx.com msg.port_msg.stream = stream;
184344Smax.romanov@nginx.com msg.port_msg.pid = nxt_pid;
185344Smax.romanov@nginx.com msg.port_msg.reply_port = reply_port;
186344Smax.romanov@nginx.com msg.port_msg.type = type & NXT_PORT_MSG_MASK;
187344Smax.romanov@nginx.com msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
188344Smax.romanov@nginx.com msg.port_msg.mmap = 0;
189352Smax.romanov@nginx.com msg.port_msg.nf = 0;
190352Smax.romanov@nginx.com msg.port_msg.mf = 0;
1911555Smax.romanov@nginx.com
1921555Smax.romanov@nginx.com if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
1931555Smax.romanov@nginx.com
1941832Smax.romanov@nginx.com if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
1951832Smax.romanov@nginx.com qmsg.pm = msg.port_msg;
1961832Smax.romanov@nginx.com
1971832Smax.romanov@nginx.com qmsg_size = sizeof(qmsg.pm);
1981832Smax.romanov@nginx.com
1991555Smax.romanov@nginx.com if (b != NULL) {
2001832Smax.romanov@nginx.com qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
2011555Smax.romanov@nginx.com }
2021555Smax.romanov@nginx.com
2031832Smax.romanov@nginx.com res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, ¬ify);
2041555Smax.romanov@nginx.com
2051555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
2061555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
2071832Smax.romanov@nginx.com (int) qmsg_size, notify, res);
2081832Smax.romanov@nginx.com
2091832Smax.romanov@nginx.com if (b != NULL && nxt_fast_path(res == NXT_OK)) {
2101832Smax.romanov@nginx.com if (qmsg.pm.mmap) {
2111832Smax.romanov@nginx.com b->is_port_mmap_sent = 1;
2121832Smax.romanov@nginx.com }
2131832Smax.romanov@nginx.com
2141832Smax.romanov@nginx.com b->mem.pos = b->mem.free;
2151832Smax.romanov@nginx.com
2161832Smax.romanov@nginx.com nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2171832Smax.romanov@nginx.com b->completion_handler, task, b, b->parent);
2181832Smax.romanov@nginx.com }
2191555Smax.romanov@nginx.com
2201555Smax.romanov@nginx.com if (notify == 0) {
2211555Smax.romanov@nginx.com return res;
2221555Smax.romanov@nginx.com }
2231555Smax.romanov@nginx.com
2241555Smax.romanov@nginx.com msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
2251555Smax.romanov@nginx.com msg.buf = NULL;
2261555Smax.romanov@nginx.com
2271555Smax.romanov@nginx.com } else {
2281832Smax.romanov@nginx.com qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
2291555Smax.romanov@nginx.com
2301832Smax.romanov@nginx.com res = nxt_port_queue_send(port->queue, qmsg.buf, 1, ¬ify);
2311555Smax.romanov@nginx.com
2321555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
2331555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
2341555Smax.romanov@nginx.com notify, res);
2351560Smax.romanov@nginx.com
2361560Smax.romanov@nginx.com if (nxt_slow_path(res == NXT_AGAIN)) {
2371560Smax.romanov@nginx.com return NXT_AGAIN;
2381560Smax.romanov@nginx.com }
2391555Smax.romanov@nginx.com }
2401555Smax.romanov@nginx.com }
24111Sigor@sysoev.ru
2421125Smax.romanov@nginx.com res = nxt_port_msg_chk_insert(task, port, &msg);
2431125Smax.romanov@nginx.com if (nxt_fast_path(res == NXT_DECLINED)) {
244344Smax.romanov@nginx.com nxt_port_write_handler(task, &port->socket, &msg);
2451125Smax.romanov@nginx.com res = NXT_OK;
24611Sigor@sysoev.ru }
24711Sigor@sysoev.ru
2481125Smax.romanov@nginx.com return res;
2491125Smax.romanov@nginx.com }
2501125Smax.romanov@nginx.com
2511125Smax.romanov@nginx.com
2521832Smax.romanov@nginx.com static nxt_bool_t
nxt_port_can_enqueue_buf(nxt_buf_t * b)2531832Smax.romanov@nginx.com nxt_port_can_enqueue_buf(nxt_buf_t *b)
2541832Smax.romanov@nginx.com {
2551832Smax.romanov@nginx.com if (b == NULL) {
2561832Smax.romanov@nginx.com return 1;
2571832Smax.romanov@nginx.com }
2581832Smax.romanov@nginx.com
2591832Smax.romanov@nginx.com if (b->next != NULL) {
2601832Smax.romanov@nginx.com return 0;
2611832Smax.romanov@nginx.com }
2621832Smax.romanov@nginx.com
2631832Smax.romanov@nginx.com return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
2641832Smax.romanov@nginx.com || nxt_buf_is_port_mmap(b));
2651832Smax.romanov@nginx.com }
2661832Smax.romanov@nginx.com
2671832Smax.romanov@nginx.com
2681832Smax.romanov@nginx.com static uint8_t
nxt_port_enqueue_buf(nxt_task_t * task,nxt_port_msg_t * pm,void * qbuf,nxt_buf_t * b)2691832Smax.romanov@nginx.com nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
2701832Smax.romanov@nginx.com nxt_buf_t *b)
2711832Smax.romanov@nginx.com {
2721832Smax.romanov@nginx.com ssize_t size;
2731832Smax.romanov@nginx.com nxt_port_mmap_msg_t *mm;
2741832Smax.romanov@nginx.com nxt_port_mmap_header_t *hdr;
2751832Smax.romanov@nginx.com nxt_port_mmap_handler_t *mmap_handler;
2761832Smax.romanov@nginx.com
2771832Smax.romanov@nginx.com size = nxt_buf_mem_used_size(&b->mem);
2781832Smax.romanov@nginx.com
2791832Smax.romanov@nginx.com if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
2801832Smax.romanov@nginx.com nxt_memcpy(qbuf, b->mem.pos, size);
2811832Smax.romanov@nginx.com
2821832Smax.romanov@nginx.com return size;
2831832Smax.romanov@nginx.com }
2841832Smax.romanov@nginx.com
2851832Smax.romanov@nginx.com mmap_handler = b->parent;
2861832Smax.romanov@nginx.com hdr = mmap_handler->hdr;
2871832Smax.romanov@nginx.com mm = qbuf;
2881832Smax.romanov@nginx.com
2891832Smax.romanov@nginx.com mm->mmap_id = hdr->id;
2901832Smax.romanov@nginx.com mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
2911832Smax.romanov@nginx.com mm->size = nxt_buf_mem_used_size(&b->mem);
2921832Smax.romanov@nginx.com
2931832Smax.romanov@nginx.com pm->mmap = 1;
2941832Smax.romanov@nginx.com
2951832Smax.romanov@nginx.com nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
2961832Smax.romanov@nginx.com mm->size);
2971832Smax.romanov@nginx.com
2981832Smax.romanov@nginx.com return sizeof(nxt_port_mmap_msg_t);
2991832Smax.romanov@nginx.com }
3001832Smax.romanov@nginx.com
3011832Smax.romanov@nginx.com
3021125Smax.romanov@nginx.com static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t * task,nxt_port_t * port,nxt_port_send_msg_t * msg)3031125Smax.romanov@nginx.com nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
3041125Smax.romanov@nginx.com nxt_port_send_msg_t *msg)
3051125Smax.romanov@nginx.com {
3061125Smax.romanov@nginx.com nxt_int_t res;
3071125Smax.romanov@nginx.com
3081125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex);
3091125Smax.romanov@nginx.com
3101125Smax.romanov@nginx.com if (nxt_fast_path(port->socket.write_ready
3111125Smax.romanov@nginx.com && nxt_queue_is_empty(&port->messages)))
3121125Smax.romanov@nginx.com {
3131125Smax.romanov@nginx.com res = NXT_DECLINED;
3141125Smax.romanov@nginx.com
3151125Smax.romanov@nginx.com } else {
3161125Smax.romanov@nginx.com msg = nxt_port_msg_alloc(msg);
3171125Smax.romanov@nginx.com
3181125Smax.romanov@nginx.com if (nxt_fast_path(msg != NULL)) {
3191125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link);
3201125Smax.romanov@nginx.com nxt_port_use(task, port, 1);
3211125Smax.romanov@nginx.com res = NXT_OK;
3221125Smax.romanov@nginx.com
3231125Smax.romanov@nginx.com } else {
3241125Smax.romanov@nginx.com res = NXT_ERROR;
3251125Smax.romanov@nginx.com }
3261125Smax.romanov@nginx.com }
3271125Smax.romanov@nginx.com
3281125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex);
3291125Smax.romanov@nginx.com
3301125Smax.romanov@nginx.com return res;
3311125Smax.romanov@nginx.com }
3321125Smax.romanov@nginx.com
3331125Smax.romanov@nginx.com
3341125Smax.romanov@nginx.com static nxt_port_send_msg_t *
nxt_port_msg_alloc(const nxt_port_send_msg_t * m)335*2139Sandrew@digital-domain.net nxt_port_msg_alloc(const nxt_port_send_msg_t *m)
3361125Smax.romanov@nginx.com {
3371125Smax.romanov@nginx.com nxt_port_send_msg_t *msg;
3381125Smax.romanov@nginx.com
3391125Smax.romanov@nginx.com msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
3401125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
3411125Smax.romanov@nginx.com return NULL;
3421125Smax.romanov@nginx.com }
3431125Smax.romanov@nginx.com
3441125Smax.romanov@nginx.com *msg = *m;
3451125Smax.romanov@nginx.com
3461125Smax.romanov@nginx.com msg->allocated = 1;
3471125Smax.romanov@nginx.com
3481125Smax.romanov@nginx.com return msg;
34911Sigor@sysoev.ru }
35011Sigor@sysoev.ru
35111Sigor@sysoev.ru
35211Sigor@sysoev.ru static void
nxt_port_fd_block_write(nxt_task_t * task,nxt_port_t * port,void * data)353343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
354343Smax.romanov@nginx.com {
355343Smax.romanov@nginx.com nxt_fd_event_block_write(task->thread->engine, &port->socket);
356343Smax.romanov@nginx.com }
357343Smax.romanov@nginx.com
358343Smax.romanov@nginx.com
359343Smax.romanov@nginx.com static void
nxt_port_fd_enable_write(nxt_task_t * task,nxt_port_t * port,void * data)360343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
361343Smax.romanov@nginx.com {
362343Smax.romanov@nginx.com nxt_fd_event_enable_write(task->thread->engine, &port->socket);
363343Smax.romanov@nginx.com }
364343Smax.romanov@nginx.com
365343Smax.romanov@nginx.com
366343Smax.romanov@nginx.com static void
nxt_port_write_handler(nxt_task_t * task,void * obj,void * data)36711Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
36811Sigor@sysoev.ru {
369343Smax.romanov@nginx.com int use_delta;
370197Smax.romanov@nginx.com size_t plain_size;
37111Sigor@sysoev.ru ssize_t n;
3721125Smax.romanov@nginx.com uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10];
373343Smax.romanov@nginx.com nxt_bool_t block_write, enable_write;
37411Sigor@sysoev.ru nxt_port_t *port;
3751125Smax.romanov@nginx.com struct iovec iov[NXT_IOBUF_MAX * 10];
376127Smax.romanov@nginx.com nxt_work_queue_t *wq;
377125Smax.romanov@nginx.com nxt_port_method_t m;
37811Sigor@sysoev.ru nxt_port_send_msg_t *msg;
37911Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb;
38042Smax.romanov@nginx.com
381197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket);
38211Sigor@sysoev.ru
383343Smax.romanov@nginx.com block_write = 0;
384343Smax.romanov@nginx.com enable_write = 0;
385343Smax.romanov@nginx.com use_delta = 0;
386343Smax.romanov@nginx.com
387344Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue;
388344Smax.romanov@nginx.com
38911Sigor@sysoev.ru do {
3901125Smax.romanov@nginx.com if (data) {
3911125Smax.romanov@nginx.com msg = data;
3921125Smax.romanov@nginx.com
3931125Smax.romanov@nginx.com } else {
3941125Smax.romanov@nginx.com msg = nxt_port_msg_first(port);
39511Sigor@sysoev.ru
3961125Smax.romanov@nginx.com if (msg == NULL) {
3971125Smax.romanov@nginx.com block_write = 1;
3981125Smax.romanov@nginx.com goto cleanup;
3991125Smax.romanov@nginx.com }
40011Sigor@sysoev.ru }
40111Sigor@sysoev.ru
4021125Smax.romanov@nginx.com next_fragment:
4031125Smax.romanov@nginx.com
40414Sigor@sysoev.ru iov[0].iov_base = &msg->port_msg;
40514Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t);
40611Sigor@sysoev.ru
40711Sigor@sysoev.ru sb.buf = msg->buf;
40814Sigor@sysoev.ru sb.iobuf = &iov[1];
40911Sigor@sysoev.ru sb.nmax = NXT_IOBUF_MAX - 1;
41011Sigor@sysoev.ru sb.sync = 0;
41111Sigor@sysoev.ru sb.last = 0;
41242Smax.romanov@nginx.com sb.size = 0;
41311Sigor@sysoev.ru sb.limit = port->max_size;
41411Sigor@sysoev.ru
415352Smax.romanov@nginx.com sb.limit_reached = 0;
416352Smax.romanov@nginx.com sb.nmax_reached = 0;
417352Smax.romanov@nginx.com
41842Smax.romanov@nginx.com m = nxt_port_mmap_get_method(task, port, msg->buf);
41942Smax.romanov@nginx.com
42042Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP) {
42142Smax.romanov@nginx.com sb.limit = (1ULL << 31) - 1;
422352Smax.romanov@nginx.com sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
423352Smax.romanov@nginx.com port->max_size / PORT_MMAP_MIN_SIZE);
42442Smax.romanov@nginx.com }
42542Smax.romanov@nginx.com
4261002Smax.romanov@nginx.com sb.limit -= iov[0].iov_len;
4271002Smax.romanov@nginx.com
42842Smax.romanov@nginx.com nxt_sendbuf_mem_coalesce(task, &sb);
42942Smax.romanov@nginx.com
43042Smax.romanov@nginx.com plain_size = sb.size;
43142Smax.romanov@nginx.com
43242Smax.romanov@nginx.com /*
43342Smax.romanov@nginx.com * Send through mmap enabled only when payload
43442Smax.romanov@nginx.com * is bigger than PORT_MMAP_MIN_SIZE.
43542Smax.romanov@nginx.com */
43642Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
4371125Smax.romanov@nginx.com nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
43842Smax.romanov@nginx.com
43942Smax.romanov@nginx.com } else {
44042Smax.romanov@nginx.com m = NXT_PORT_METHOD_PLAIN;
44142Smax.romanov@nginx.com }
44211Sigor@sysoev.ru
443189Smax.romanov@nginx.com msg->port_msg.last |= sb.last;
444352Smax.romanov@nginx.com msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
44511Sigor@sysoev.ru
4461558Smax.romanov@nginx.com n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
44711Sigor@sysoev.ru
44811Sigor@sysoev.ru if (n > 0) {
44942Smax.romanov@nginx.com if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
450564Svbart@nginx.com nxt_alert(task, "port %d: short write: %z instead of %uz",
451564Svbart@nginx.com port->socket.fd, n, sb.size + iov[0].iov_len);
45211Sigor@sysoev.ru goto fail;
45311Sigor@sysoev.ru }
45411Sigor@sysoev.ru
4551908Smax.romanov@nginx.com nxt_port_msg_close_fd(msg);
4561553Smax.romanov@nginx.com
457592Sigor@sysoev.ru msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
4581125Smax.romanov@nginx.com m == NXT_PORT_METHOD_MMAP);
45911Sigor@sysoev.ru
46011Sigor@sysoev.ru if (msg->buf != NULL) {
461352Smax.romanov@nginx.com nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
462352Smax.romanov@nginx.com msg->port_msg.stream);
463352Smax.romanov@nginx.com
46411Sigor@sysoev.ru /*
46511Sigor@sysoev.ru * A file descriptor is sent only
46611Sigor@sysoev.ru * in the first message of a stream.
46711Sigor@sysoev.ru */
4681558Smax.romanov@nginx.com msg->fd[0] = -1;
4691558Smax.romanov@nginx.com msg->fd[1] = -1;
47011Sigor@sysoev.ru msg->share += n;
471352Smax.romanov@nginx.com msg->port_msg.nf = 1;
47211Sigor@sysoev.ru
47311Sigor@sysoev.ru if (msg->share >= port->max_share) {
47411Sigor@sysoev.ru msg->share = 0;
475344Smax.romanov@nginx.com
476344Smax.romanov@nginx.com if (msg->link.next != NULL) {
4771125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex);
4781125Smax.romanov@nginx.com
479344Smax.romanov@nginx.com nxt_queue_remove(&msg->link);
4801125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link);
4811125Smax.romanov@nginx.com
4821125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex);
483344Smax.romanov@nginx.com
4841125Smax.romanov@nginx.com } else {
4851125Smax.romanov@nginx.com msg = nxt_port_msg_insert_tail(port, msg);
4861125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
4871125Smax.romanov@nginx.com goto fail;
4881125Smax.romanov@nginx.com }
4891125Smax.romanov@nginx.com
490344Smax.romanov@nginx.com use_delta++;
491344Smax.romanov@nginx.com }
4921125Smax.romanov@nginx.com
4931125Smax.romanov@nginx.com } else {
4941125Smax.romanov@nginx.com goto next_fragment;
49511Sigor@sysoev.ru }
49611Sigor@sysoev.ru
49711Sigor@sysoev.ru } else {
498344Smax.romanov@nginx.com if (msg->link.next != NULL) {
4991125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex);
5001125Smax.romanov@nginx.com
501344Smax.romanov@nginx.com nxt_queue_remove(&msg->link);
5021125Smax.romanov@nginx.com msg->link.next = NULL;
5031125Smax.romanov@nginx.com
5041125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex);
5051125Smax.romanov@nginx.com
506344Smax.romanov@nginx.com use_delta--;
507344Smax.romanov@nginx.com }
5081125Smax.romanov@nginx.com
5091125Smax.romanov@nginx.com nxt_port_release_send_msg(msg);
5101125Smax.romanov@nginx.com }
5111125Smax.romanov@nginx.com
5121125Smax.romanov@nginx.com if (data != NULL) {
5131125Smax.romanov@nginx.com goto cleanup;
51411Sigor@sysoev.ru }
51511Sigor@sysoev.ru
5161004Smax.romanov@nginx.com } else {
5171125Smax.romanov@nginx.com if (nxt_slow_path(n == NXT_ERROR)) {
5181907Smax.romanov@nginx.com if (msg->link.next == NULL) {
5191908Smax.romanov@nginx.com nxt_port_msg_close_fd(msg);
5201907Smax.romanov@nginx.com
5211907Smax.romanov@nginx.com nxt_port_release_send_msg(msg);
5221907Smax.romanov@nginx.com }
5231907Smax.romanov@nginx.com
5241125Smax.romanov@nginx.com goto fail;
525344Smax.romanov@nginx.com }
5261004Smax.romanov@nginx.com
5271125Smax.romanov@nginx.com if (msg->link.next == NULL) {
5281125Smax.romanov@nginx.com msg = nxt_port_msg_insert_tail(port, msg);
5291125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
5301125Smax.romanov@nginx.com goto fail;
5311125Smax.romanov@nginx.com }
5321125Smax.romanov@nginx.com
5331125Smax.romanov@nginx.com use_delta++;
5341004Smax.romanov@nginx.com }
53511Sigor@sysoev.ru }
53611Sigor@sysoev.ru
53711Sigor@sysoev.ru } while (port->socket.write_ready);
53811Sigor@sysoev.ru
53912Sigor@sysoev.ru if (nxt_fd_event_is_disabled(port->socket.write)) {
540343Smax.romanov@nginx.com enable_write = 1;
54111Sigor@sysoev.ru }
54211Sigor@sysoev.ru
5431125Smax.romanov@nginx.com goto cleanup;
54411Sigor@sysoev.ru
54511Sigor@sysoev.ru fail:
54611Sigor@sysoev.ru
547343Smax.romanov@nginx.com use_delta++;
548343Smax.romanov@nginx.com
549344Smax.romanov@nginx.com nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
550343Smax.romanov@nginx.com &port->socket);
551343Smax.romanov@nginx.com
5521125Smax.romanov@nginx.com cleanup:
553343Smax.romanov@nginx.com
554343Smax.romanov@nginx.com if (block_write && nxt_fd_event_is_active(port->socket.write)) {
555343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
556343Smax.romanov@nginx.com }
557343Smax.romanov@nginx.com
558343Smax.romanov@nginx.com if (enable_write) {
559343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
560343Smax.romanov@nginx.com }
561343Smax.romanov@nginx.com
562343Smax.romanov@nginx.com if (use_delta != 0) {
563343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta);
564343Smax.romanov@nginx.com }
56511Sigor@sysoev.ru }
56611Sigor@sysoev.ru
56711Sigor@sysoev.ru
5681125Smax.romanov@nginx.com static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_port_t * port)5691125Smax.romanov@nginx.com nxt_port_msg_first(nxt_port_t *port)
5701125Smax.romanov@nginx.com {
5711125Smax.romanov@nginx.com nxt_queue_link_t *lnk;
5721125Smax.romanov@nginx.com nxt_port_send_msg_t *msg;
5731125Smax.romanov@nginx.com
5741125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex);
5751125Smax.romanov@nginx.com
5761125Smax.romanov@nginx.com lnk = nxt_queue_first(&port->messages);
5771125Smax.romanov@nginx.com
5781125Smax.romanov@nginx.com if (lnk == nxt_queue_tail(&port->messages)) {
5791125Smax.romanov@nginx.com msg = NULL;
5801125Smax.romanov@nginx.com
5811125Smax.romanov@nginx.com } else {
5821125Smax.romanov@nginx.com msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
5831125Smax.romanov@nginx.com }
5841125Smax.romanov@nginx.com
5851125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex);
5861125Smax.romanov@nginx.com
5871125Smax.romanov@nginx.com return msg;
5881125Smax.romanov@nginx.com }
5891125Smax.romanov@nginx.com
5901125Smax.romanov@nginx.com
5911908Smax.romanov@nginx.com nxt_inline void
nxt_port_msg_close_fd(nxt_port_send_msg_t * msg)5921908Smax.romanov@nginx.com nxt_port_msg_close_fd(nxt_port_send_msg_t *msg)
5931908Smax.romanov@nginx.com {
5941908Smax.romanov@nginx.com if (!msg->close_fd) {
5951908Smax.romanov@nginx.com return;
5961908Smax.romanov@nginx.com }
5971908Smax.romanov@nginx.com
5981996St.nateldemoura@f5.com nxt_port_close_fds(msg->fd);
5991996St.nateldemoura@f5.com }
6001996St.nateldemoura@f5.com
6011908Smax.romanov@nginx.com
6021996St.nateldemoura@f5.com nxt_inline void
nxt_port_close_fds(nxt_fd_t * fd)6031996St.nateldemoura@f5.com nxt_port_close_fds(nxt_fd_t *fd)
6041996St.nateldemoura@f5.com {
6051996St.nateldemoura@f5.com if (fd[0] != -1) {
6061996St.nateldemoura@f5.com nxt_fd_close(fd[0]);
6071996St.nateldemoura@f5.com fd[0] = -1;
6081908Smax.romanov@nginx.com }
6091908Smax.romanov@nginx.com
6101996St.nateldemoura@f5.com if (fd[1] != -1) {
6111996St.nateldemoura@f5.com nxt_fd_close(fd[1]);
6121996St.nateldemoura@f5.com fd[1] = -1;
6131908Smax.romanov@nginx.com }
6141908Smax.romanov@nginx.com }
6151908Smax.romanov@nginx.com
6161908Smax.romanov@nginx.com
617592Sigor@sysoev.ru static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * b,size_t sent,nxt_bool_t mmap_mode)618592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
619592Sigor@sysoev.ru size_t sent, nxt_bool_t mmap_mode)
620592Sigor@sysoev.ru {
6211269Sigor@sysoev.ru size_t size;
6221269Sigor@sysoev.ru nxt_buf_t *next;
623592Sigor@sysoev.ru
624592Sigor@sysoev.ru while (b != NULL) {
625592Sigor@sysoev.ru
626592Sigor@sysoev.ru nxt_prefetch(b->next);
627592Sigor@sysoev.ru
628592Sigor@sysoev.ru if (!nxt_buf_is_sync(b)) {
629592Sigor@sysoev.ru
630592Sigor@sysoev.ru size = nxt_buf_used_size(b);
631592Sigor@sysoev.ru
632592Sigor@sysoev.ru if (size != 0) {
633592Sigor@sysoev.ru
634592Sigor@sysoev.ru if (sent == 0) {
635592Sigor@sysoev.ru break;
636592Sigor@sysoev.ru }
637592Sigor@sysoev.ru
638592Sigor@sysoev.ru if (nxt_buf_is_port_mmap(b) && mmap_mode) {
639592Sigor@sysoev.ru /*
640592Sigor@sysoev.ru * buffer has been sent to other side which is now
641592Sigor@sysoev.ru * responsible for shared memory bucket release
642592Sigor@sysoev.ru */
643592Sigor@sysoev.ru b->is_port_mmap_sent = 1;
644592Sigor@sysoev.ru }
645592Sigor@sysoev.ru
646592Sigor@sysoev.ru if (sent < size) {
647592Sigor@sysoev.ru
648592Sigor@sysoev.ru if (nxt_buf_is_mem(b)) {
649592Sigor@sysoev.ru b->mem.pos += sent;
650592Sigor@sysoev.ru }
651592Sigor@sysoev.ru
652592Sigor@sysoev.ru if (nxt_buf_is_file(b)) {
653592Sigor@sysoev.ru b->file_pos += sent;
654592Sigor@sysoev.ru }
655592Sigor@sysoev.ru
656592Sigor@sysoev.ru break;
657592Sigor@sysoev.ru }
658592Sigor@sysoev.ru
659592Sigor@sysoev.ru /* b->mem.free is NULL in file-only buffer. */
660592Sigor@sysoev.ru b->mem.pos = b->mem.free;
661592Sigor@sysoev.ru
662592Sigor@sysoev.ru if (nxt_buf_is_file(b)) {
663592Sigor@sysoev.ru b->file_pos = b->file_end;
664592Sigor@sysoev.ru }
665592Sigor@sysoev.ru
666592Sigor@sysoev.ru sent -= size;
667592Sigor@sysoev.ru }
668592Sigor@sysoev.ru }
669592Sigor@sysoev.ru
670592Sigor@sysoev.ru nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
671592Sigor@sysoev.ru
6721269Sigor@sysoev.ru next = b->next;
6731269Sigor@sysoev.ru b->next = NULL;
6741269Sigor@sysoev.ru b = next;
675592Sigor@sysoev.ru }
676592Sigor@sysoev.ru
677592Sigor@sysoev.ru return b;
678592Sigor@sysoev.ru }
679592Sigor@sysoev.ru
680592Sigor@sysoev.ru
6811125Smax.romanov@nginx.com static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t * port,nxt_port_send_msg_t * msg)6821125Smax.romanov@nginx.com nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
6831125Smax.romanov@nginx.com {
6841125Smax.romanov@nginx.com if (msg->allocated == 0) {
6851125Smax.romanov@nginx.com msg = nxt_port_msg_alloc(msg);
6861125Smax.romanov@nginx.com
6871125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) {
6881125Smax.romanov@nginx.com return NULL;
6891125Smax.romanov@nginx.com }
6901125Smax.romanov@nginx.com }
6911125Smax.romanov@nginx.com
6921125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex);
6931125Smax.romanov@nginx.com
6941125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link);
6951125Smax.romanov@nginx.com
6961125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex);
6971125Smax.romanov@nginx.com
6981125Smax.romanov@nginx.com return msg;
6991125Smax.romanov@nginx.com }
7001125Smax.romanov@nginx.com
7011125Smax.romanov@nginx.com
70211Sigor@sysoev.ru void
nxt_port_read_enable(nxt_task_t * task,nxt_port_t * port)70311Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
70411Sigor@sysoev.ru {
70511Sigor@sysoev.ru port->socket.fd = port->pair[0];
70611Sigor@sysoev.ru port->socket.log = &nxt_main_log;
70711Sigor@sysoev.ru
708141Smax.romanov@nginx.com port->engine = task->thread->engine;
709141Smax.romanov@nginx.com
710141Smax.romanov@nginx.com port->socket.read_work_queue = &port->engine->fast_work_queue;
7111555Smax.romanov@nginx.com port->socket.read_handler = port->queue != NULL
7121555Smax.romanov@nginx.com ? nxt_port_queue_read_handler
7131555Smax.romanov@nginx.com : nxt_port_read_handler;
71411Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler;
71511Sigor@sysoev.ru
716141Smax.romanov@nginx.com nxt_fd_event_enable_read(port->engine, &port->socket);
71711Sigor@sysoev.ru }
71811Sigor@sysoev.ru
71911Sigor@sysoev.ru
72011Sigor@sysoev.ru void
nxt_port_read_close(nxt_port_t * port)72111Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port)
72211Sigor@sysoev.ru {
723350Smax.romanov@nginx.com port->socket.read_ready = 0;
7241015Smax.romanov@nginx.com port->socket.read = NXT_EVENT_INACTIVE;
72513Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[0]);
72611Sigor@sysoev.ru port->pair[0] = -1;
72711Sigor@sysoev.ru }
72811Sigor@sysoev.ru
72911Sigor@sysoev.ru
73011Sigor@sysoev.ru static void
nxt_port_read_handler(nxt_task_t * task,void * obj,void * data)73111Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
73211Sigor@sysoev.ru {
73342Smax.romanov@nginx.com ssize_t n;
73442Smax.romanov@nginx.com nxt_buf_t *b;
7351996St.nateldemoura@f5.com nxt_int_t ret;
73642Smax.romanov@nginx.com nxt_port_t *port;
7371996St.nateldemoura@f5.com nxt_recv_oob_t oob;
7381996St.nateldemoura@f5.com nxt_port_recv_msg_t msg;
73942Smax.romanov@nginx.com struct iovec iov[2];
74011Sigor@sysoev.ru
741125Smax.romanov@nginx.com port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
74211Sigor@sysoev.ru
743141Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine);
744141Smax.romanov@nginx.com
74511Sigor@sysoev.ru for ( ;; ) {
74611Sigor@sysoev.ru b = nxt_port_buf_alloc(port);
74711Sigor@sysoev.ru
74811Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
74911Sigor@sysoev.ru /* TODO: disable event for some time */
75011Sigor@sysoev.ru }
75111Sigor@sysoev.ru
75242Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg;
75314Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t);
75411Sigor@sysoev.ru
75514Sigor@sysoev.ru iov[1].iov_base = b->mem.pos;
75614Sigor@sysoev.ru iov[1].iov_len = port->max_size;
75714Sigor@sysoev.ru
7581996St.nateldemoura@f5.com n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
75911Sigor@sysoev.ru
76011Sigor@sysoev.ru if (n > 0) {
7611996St.nateldemoura@f5.com msg.fd[0] = -1;
7621996St.nateldemoura@f5.com msg.fd[1] = -1;
7631996St.nateldemoura@f5.com
7641996St.nateldemoura@f5.com ret = nxt_socket_msg_oob_get(&oob, msg.fd,
7651996St.nateldemoura@f5.com nxt_recv_msg_cmsg_pid_ref(&msg));
7661996St.nateldemoura@f5.com if (nxt_slow_path(ret != NXT_OK)) {
7671996St.nateldemoura@f5.com nxt_alert(task, "failed to get oob data from %d",
7681996St.nateldemoura@f5.com port->socket.fd);
7691996St.nateldemoura@f5.com
7701996St.nateldemoura@f5.com nxt_port_close_fds(msg.fd);
7711996St.nateldemoura@f5.com
7721996St.nateldemoura@f5.com goto fail;
7731996St.nateldemoura@f5.com }
77442Smax.romanov@nginx.com
77542Smax.romanov@nginx.com msg.buf = b;
77682Smax.romanov@nginx.com msg.size = n;
77742Smax.romanov@nginx.com
77882Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg);
77911Sigor@sysoev.ru
780194Smax.romanov@nginx.com /*
781194Smax.romanov@nginx.com * To disable instant completion or buffer re-usage,
782194Smax.romanov@nginx.com * handler should reset 'msg.buf'.
783194Smax.romanov@nginx.com */
784194Smax.romanov@nginx.com if (msg.buf == b) {
78511Sigor@sysoev.ru nxt_port_buf_free(port, b);
78611Sigor@sysoev.ru }
78711Sigor@sysoev.ru
78811Sigor@sysoev.ru if (port->socket.read_ready) {
78911Sigor@sysoev.ru continue;
79011Sigor@sysoev.ru }
79111Sigor@sysoev.ru
79211Sigor@sysoev.ru return;
79311Sigor@sysoev.ru }
79411Sigor@sysoev.ru
79511Sigor@sysoev.ru if (n == NXT_AGAIN) {
79611Sigor@sysoev.ru nxt_port_buf_free(port, b);
79711Sigor@sysoev.ru
79812Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket);
79911Sigor@sysoev.ru return;
80011Sigor@sysoev.ru }
80111Sigor@sysoev.ru
8021996St.nateldemoura@f5.com fail:
8031996St.nateldemoura@f5.com /* n == 0 || error */
80411Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue,
80511Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL);
80611Sigor@sysoev.ru return;
80711Sigor@sysoev.ru }
80811Sigor@sysoev.ru }
80911Sigor@sysoev.ru
81011Sigor@sysoev.ru
8111555Smax.romanov@nginx.com static void
nxt_port_queue_read_handler(nxt_task_t * task,void * obj,void * data)8121555Smax.romanov@nginx.com nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
8131555Smax.romanov@nginx.com {
8141555Smax.romanov@nginx.com ssize_t n;
8151555Smax.romanov@nginx.com nxt_buf_t *b;
8161996St.nateldemoura@f5.com nxt_int_t ret;
8171555Smax.romanov@nginx.com nxt_port_t *port;
8181555Smax.romanov@nginx.com struct iovec iov[2];
8191996St.nateldemoura@f5.com nxt_recv_oob_t oob;
8201555Smax.romanov@nginx.com nxt_port_queue_t *queue;
8211555Smax.romanov@nginx.com nxt_port_recv_msg_t msg, *smsg;
8221555Smax.romanov@nginx.com uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
8231555Smax.romanov@nginx.com
8241555Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket);
8251555Smax.romanov@nginx.com msg.port = port;
8261555Smax.romanov@nginx.com
8271555Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine);
8281555Smax.romanov@nginx.com
8291555Smax.romanov@nginx.com queue = port->queue;
8301555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, 1);
8311555Smax.romanov@nginx.com
8321555Smax.romanov@nginx.com for ( ;; ) {
8331555Smax.romanov@nginx.com
8341555Smax.romanov@nginx.com if (port->from_socket == 0) {
8351555Smax.romanov@nginx.com n = nxt_port_queue_recv(queue, qmsg);
8361555Smax.romanov@nginx.com
8371555Smax.romanov@nginx.com if (n < 0 && !port->socket.read_ready) {
8381555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, -1);
8391555Smax.romanov@nginx.com
8401555Smax.romanov@nginx.com n = nxt_port_queue_recv(queue, qmsg);
8411555Smax.romanov@nginx.com if (n < 0) {
8421555Smax.romanov@nginx.com return;
8431555Smax.romanov@nginx.com }
8441555Smax.romanov@nginx.com
8451555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, 1);
8461555Smax.romanov@nginx.com }
8471555Smax.romanov@nginx.com
8481555Smax.romanov@nginx.com if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
8491555Smax.romanov@nginx.com port->from_socket++;
8501555Smax.romanov@nginx.com
8511555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
8521555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
8531555Smax.romanov@nginx.com port->from_socket);
8541555Smax.romanov@nginx.com
8551555Smax.romanov@nginx.com continue;
8561555Smax.romanov@nginx.com }
8571555Smax.romanov@nginx.com
8581555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: dequeue %d",
8591555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
8601555Smax.romanov@nginx.com (int) n);
8611555Smax.romanov@nginx.com
8621555Smax.romanov@nginx.com } else {
8631555Smax.romanov@nginx.com if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
8641555Smax.romanov@nginx.com msg.port_msg = smsg->port_msg;
8651555Smax.romanov@nginx.com b = smsg->buf;
8661555Smax.romanov@nginx.com n = smsg->size;
8671558Smax.romanov@nginx.com msg.fd[0] = smsg->fd[0];
8681558Smax.romanov@nginx.com msg.fd[1] = smsg->fd[1];
8691555Smax.romanov@nginx.com
8701555Smax.romanov@nginx.com smsg->size = 0;
8711555Smax.romanov@nginx.com
8721555Smax.romanov@nginx.com port->from_socket--;
8731555Smax.romanov@nginx.com
8741555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
8751555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
8761555Smax.romanov@nginx.com (int) n);
8771555Smax.romanov@nginx.com
8781555Smax.romanov@nginx.com goto process;
8791555Smax.romanov@nginx.com }
8801555Smax.romanov@nginx.com
8811555Smax.romanov@nginx.com n = -1;
8821555Smax.romanov@nginx.com }
8831555Smax.romanov@nginx.com
8841555Smax.romanov@nginx.com if (n < 0 && !port->socket.read_ready) {
8851555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, -1);
8861555Smax.romanov@nginx.com return;
8871555Smax.romanov@nginx.com }
8881555Smax.romanov@nginx.com
8891555Smax.romanov@nginx.com b = nxt_port_buf_alloc(port);
8901555Smax.romanov@nginx.com
8911555Smax.romanov@nginx.com if (nxt_slow_path(b == NULL)) {
8921555Smax.romanov@nginx.com /* TODO: disable event for some time */
8931555Smax.romanov@nginx.com }
8941555Smax.romanov@nginx.com
8951555Smax.romanov@nginx.com if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
8961555Smax.romanov@nginx.com nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
8971555Smax.romanov@nginx.com
8981555Smax.romanov@nginx.com if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
8991555Smax.romanov@nginx.com nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
9001555Smax.romanov@nginx.com n - sizeof(nxt_port_msg_t));
9011555Smax.romanov@nginx.com }
9021555Smax.romanov@nginx.com
9031555Smax.romanov@nginx.com } else {
9041555Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg;
9051555Smax.romanov@nginx.com iov[0].iov_len = sizeof(nxt_port_msg_t);
9061555Smax.romanov@nginx.com
9071555Smax.romanov@nginx.com iov[1].iov_base = b->mem.pos;
9081555Smax.romanov@nginx.com iov[1].iov_len = port->max_size;
9091555Smax.romanov@nginx.com
9101996St.nateldemoura@f5.com n = nxt_socketpair_recv(&port->socket, iov, 2, &oob);
9111996St.nateldemoura@f5.com
9121996St.nateldemoura@f5.com if (n > 0) {
9131996St.nateldemoura@f5.com msg.fd[0] = -1;
9141996St.nateldemoura@f5.com msg.fd[1] = -1;
9151996St.nateldemoura@f5.com
9161996St.nateldemoura@f5.com ret = nxt_socket_msg_oob_get(&oob, msg.fd,
9171996St.nateldemoura@f5.com nxt_recv_msg_cmsg_pid_ref(&msg));
9181996St.nateldemoura@f5.com if (nxt_slow_path(ret != NXT_OK)) {
9191996St.nateldemoura@f5.com nxt_alert(task, "failed to get oob data from %d",
9201996St.nateldemoura@f5.com port->socket.fd);
9211996St.nateldemoura@f5.com
9221996St.nateldemoura@f5.com nxt_port_close_fds(msg.fd);
9231996St.nateldemoura@f5.com
9241996St.nateldemoura@f5.com return;
9251996St.nateldemoura@f5.com }
9261996St.nateldemoura@f5.com }
9271555Smax.romanov@nginx.com
9281555Smax.romanov@nginx.com if (n == (ssize_t) sizeof(nxt_port_msg_t)
9291555Smax.romanov@nginx.com && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
9301555Smax.romanov@nginx.com {
9311555Smax.romanov@nginx.com nxt_port_buf_free(port, b);
9321555Smax.romanov@nginx.com
9331555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
9341555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
9351555Smax.romanov@nginx.com (int) n);
9361555Smax.romanov@nginx.com
9371555Smax.romanov@nginx.com continue;
9381555Smax.romanov@nginx.com }
9391555Smax.romanov@nginx.com
9401555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
9411555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
9421555Smax.romanov@nginx.com (int) n);
9431555Smax.romanov@nginx.com
9441555Smax.romanov@nginx.com if (n > 0) {
9451555Smax.romanov@nginx.com if (port->from_socket == 0) {
9461555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: suspend message %d",
9471555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd,
9481555Smax.romanov@nginx.com (int) n);
9491555Smax.romanov@nginx.com
9501555Smax.romanov@nginx.com smsg = port->socket_msg;
9511555Smax.romanov@nginx.com
9521555Smax.romanov@nginx.com if (nxt_slow_path(smsg == NULL)) {
9531555Smax.romanov@nginx.com smsg = nxt_mp_alloc(port->mem_pool,
9541555Smax.romanov@nginx.com sizeof(nxt_port_recv_msg_t));
9551555Smax.romanov@nginx.com
9561555Smax.romanov@nginx.com if (nxt_slow_path(smsg == NULL)) {
9571555Smax.romanov@nginx.com nxt_alert(task, "port{%d,%d} %d: suspend message "
9581555Smax.romanov@nginx.com "failed",
9591555Smax.romanov@nginx.com (int) port->pid, (int) port->id,
9601555Smax.romanov@nginx.com port->socket.fd);
9611555Smax.romanov@nginx.com
9621555Smax.romanov@nginx.com return;
9631555Smax.romanov@nginx.com }
9641555Smax.romanov@nginx.com
9651555Smax.romanov@nginx.com port->socket_msg = smsg;
9661555Smax.romanov@nginx.com
9671555Smax.romanov@nginx.com } else {
9681555Smax.romanov@nginx.com if (nxt_slow_path(smsg->size != 0)) {
9691555Smax.romanov@nginx.com nxt_alert(task, "port{%d,%d} %d: too many suspend "
9701555Smax.romanov@nginx.com "messages",
9711555Smax.romanov@nginx.com (int) port->pid, (int) port->id,
9721555Smax.romanov@nginx.com port->socket.fd);
9731555Smax.romanov@nginx.com
9741555Smax.romanov@nginx.com return;
9751555Smax.romanov@nginx.com }
9761555Smax.romanov@nginx.com }
9771555Smax.romanov@nginx.com
9781555Smax.romanov@nginx.com smsg->port_msg = msg.port_msg;
9791555Smax.romanov@nginx.com smsg->buf = b;
9801555Smax.romanov@nginx.com smsg->size = n;
9811558Smax.romanov@nginx.com smsg->fd[0] = msg.fd[0];
9821558Smax.romanov@nginx.com smsg->fd[1] = msg.fd[1];
9831555Smax.romanov@nginx.com
9841555Smax.romanov@nginx.com continue;
9851555Smax.romanov@nginx.com }
9861555Smax.romanov@nginx.com
9871555Smax.romanov@nginx.com port->from_socket--;
9881555Smax.romanov@nginx.com }
9891555Smax.romanov@nginx.com }
9901555Smax.romanov@nginx.com
9911555Smax.romanov@nginx.com process:
9921555Smax.romanov@nginx.com
9931555Smax.romanov@nginx.com if (n > 0) {
9941555Smax.romanov@nginx.com msg.buf = b;
9951555Smax.romanov@nginx.com msg.size = n;
9961555Smax.romanov@nginx.com
9971555Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg);
9981555Smax.romanov@nginx.com
9991555Smax.romanov@nginx.com /*
10001555Smax.romanov@nginx.com * To disable instant completion or buffer re-usage,
10011555Smax.romanov@nginx.com * handler should reset 'msg.buf'.
10021555Smax.romanov@nginx.com */
10031555Smax.romanov@nginx.com if (msg.buf == b) {
10041555Smax.romanov@nginx.com nxt_port_buf_free(port, b);
10051555Smax.romanov@nginx.com }
10061555Smax.romanov@nginx.com
10071555Smax.romanov@nginx.com continue;
10081555Smax.romanov@nginx.com }
10091555Smax.romanov@nginx.com
10101555Smax.romanov@nginx.com if (n == NXT_AGAIN) {
10111555Smax.romanov@nginx.com nxt_port_buf_free(port, b);
10121555Smax.romanov@nginx.com
10131555Smax.romanov@nginx.com nxt_fd_event_enable_read(task->thread->engine, &port->socket);
10141555Smax.romanov@nginx.com
10151555Smax.romanov@nginx.com continue;
10161555Smax.romanov@nginx.com }
10171555Smax.romanov@nginx.com
10181555Smax.romanov@nginx.com /* n == 0 || n == NXT_ERROR */
10191555Smax.romanov@nginx.com
10201555Smax.romanov@nginx.com nxt_work_queue_add(&task->thread->engine->fast_work_queue,
10211555Smax.romanov@nginx.com nxt_port_error_handler, task, &port->socket, NULL);
10221555Smax.romanov@nginx.com return;
10231555Smax.romanov@nginx.com }
10241555Smax.romanov@nginx.com }
10251555Smax.romanov@nginx.com
10261555Smax.romanov@nginx.com
10271005Smax.romanov@nginx.com typedef struct {
10281005Smax.romanov@nginx.com uint32_t stream;
10291005Smax.romanov@nginx.com uint32_t pid;
10301005Smax.romanov@nginx.com } nxt_port_frag_key_t;
10311005Smax.romanov@nginx.com
10321005Smax.romanov@nginx.com
1033352Smax.romanov@nginx.com static nxt_int_t
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t * lhq,void * data)1034352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
1035352Smax.romanov@nginx.com {
1036352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg;
10371005Smax.romanov@nginx.com nxt_port_frag_key_t *frag_key;
1038352Smax.romanov@nginx.com
1039352Smax.romanov@nginx.com fmsg = data;
10401005Smax.romanov@nginx.com frag_key = (nxt_port_frag_key_t *) lhq->key.start;
1041352Smax.romanov@nginx.com
10421005Smax.romanov@nginx.com if (lhq->key.length == sizeof(nxt_port_frag_key_t)
10431005Smax.romanov@nginx.com && frag_key->stream == fmsg->port_msg.stream
10441005Smax.romanov@nginx.com && frag_key->pid == (uint32_t) fmsg->port_msg.pid)
1045352Smax.romanov@nginx.com {
1046352Smax.romanov@nginx.com return NXT_OK;
1047352Smax.romanov@nginx.com }
1048352Smax.romanov@nginx.com
1049352Smax.romanov@nginx.com return NXT_DECLINED;
1050352Smax.romanov@nginx.com }
1051352Smax.romanov@nginx.com
1052352Smax.romanov@nginx.com
1053352Smax.romanov@nginx.com static void *
nxt_port_lvlhsh_frag_alloc(void * ctx,size_t size)1054352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
1055352Smax.romanov@nginx.com {
10561084Smax.romanov@nginx.com return nxt_mp_align(ctx, size, size);
1057352Smax.romanov@nginx.com }
1058352Smax.romanov@nginx.com
1059352Smax.romanov@nginx.com
1060352Smax.romanov@nginx.com static void
nxt_port_lvlhsh_frag_free(void * ctx,void * p)1061352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p)
1062352Smax.romanov@nginx.com {
1063389Smax.romanov@nginx.com nxt_mp_free(ctx, p);
1064352Smax.romanov@nginx.com }
1065352Smax.romanov@nginx.com
1066352Smax.romanov@nginx.com
1067352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = {
1068352Smax.romanov@nginx.com NXT_LVLHSH_DEFAULT,
1069352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test,
1070352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc,
1071352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free,
1072352Smax.romanov@nginx.com };
1073352Smax.romanov@nginx.com
1074352Smax.romanov@nginx.com
1075352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
nxt_port_frag_start(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)1076352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
1077352Smax.romanov@nginx.com nxt_port_recv_msg_t *msg)
1078352Smax.romanov@nginx.com {
1079352Smax.romanov@nginx.com nxt_int_t res;
1080352Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
1081352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg;
10821005Smax.romanov@nginx.com nxt_port_frag_key_t frag_key;
1083352Smax.romanov@nginx.com
1084352Smax.romanov@nginx.com nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);
1085352Smax.romanov@nginx.com
1086352Smax.romanov@nginx.com fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
1087352Smax.romanov@nginx.com
1088352Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) {
1089352Smax.romanov@nginx.com return NULL;
1090352Smax.romanov@nginx.com }
1091352Smax.romanov@nginx.com
1092352Smax.romanov@nginx.com *fmsg = *msg;
1093352Smax.romanov@nginx.com
10941005Smax.romanov@nginx.com frag_key.stream = fmsg->port_msg.stream;
10951005Smax.romanov@nginx.com frag_key.pid = fmsg->port_msg.pid;
10961005Smax.romanov@nginx.com
10971005Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
10981005Smax.romanov@nginx.com lhq.key.length = sizeof(nxt_port_frag_key_t);
10991005Smax.romanov@nginx.com lhq.key.start = (u_char *) &frag_key;
1100352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto;
1101352Smax.romanov@nginx.com lhq.replace = 0;
1102352Smax.romanov@nginx.com lhq.value = fmsg;
1103352Smax.romanov@nginx.com lhq.pool = port->mem_pool;
1104352Smax.romanov@nginx.com
1105352Smax.romanov@nginx.com res = nxt_lvlhsh_insert(&port->frags, &lhq);
1106352Smax.romanov@nginx.com
1107352Smax.romanov@nginx.com switch (res) {
1108352Smax.romanov@nginx.com
1109352Smax.romanov@nginx.com case NXT_OK:
1110352Smax.romanov@nginx.com return fmsg;
1111352Smax.romanov@nginx.com
1112352Smax.romanov@nginx.com case NXT_DECLINED:
1113352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
1114352Smax.romanov@nginx.com fmsg->port_msg.stream);
1115352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg);
1116352Smax.romanov@nginx.com
1117352Smax.romanov@nginx.com return NULL;
1118352Smax.romanov@nginx.com
1119352Smax.romanov@nginx.com default:
1120352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
1121352Smax.romanov@nginx.com fmsg->port_msg.stream);
1122352Smax.romanov@nginx.com
1123352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg);
1124352Smax.romanov@nginx.com
1125352Smax.romanov@nginx.com return NULL;
1126352Smax.romanov@nginx.com
1127352Smax.romanov@nginx.com }
1128352Smax.romanov@nginx.com }
1129352Smax.romanov@nginx.com
1130352Smax.romanov@nginx.com
1131352Smax.romanov@nginx.com static nxt_port_recv_msg_t *
nxt_port_frag_find(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)11321005Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
1133352Smax.romanov@nginx.com {
11341005Smax.romanov@nginx.com nxt_int_t res;
11351005Smax.romanov@nginx.com nxt_bool_t last;
11361005Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
11371005Smax.romanov@nginx.com nxt_port_frag_key_t frag_key;
11381005Smax.romanov@nginx.com
11391005Smax.romanov@nginx.com last = msg->port_msg.mf == 0;
1140352Smax.romanov@nginx.com
11411005Smax.romanov@nginx.com nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
11421005Smax.romanov@nginx.com msg->port_msg.stream);
1143352Smax.romanov@nginx.com
11441005Smax.romanov@nginx.com frag_key.stream = msg->port_msg.stream;
11451005Smax.romanov@nginx.com frag_key.pid = msg->port_msg.pid;
11461005Smax.romanov@nginx.com
11471005Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
11481005Smax.romanov@nginx.com lhq.key.length = sizeof(nxt_port_frag_key_t);
11491005Smax.romanov@nginx.com lhq.key.start = (u_char *) &frag_key;
1150352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto;
1151352Smax.romanov@nginx.com lhq.pool = port->mem_pool;
1152352Smax.romanov@nginx.com
1153352Smax.romanov@nginx.com res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) :
1154352Smax.romanov@nginx.com nxt_lvlhsh_find(&port->frags, &lhq);
1155352Smax.romanov@nginx.com
1156352Smax.romanov@nginx.com switch (res) {
1157352Smax.romanov@nginx.com
1158352Smax.romanov@nginx.com case NXT_OK:
1159352Smax.romanov@nginx.com return lhq.value;
1160352Smax.romanov@nginx.com
1161352Smax.romanov@nginx.com default:
11621005Smax.romanov@nginx.com nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
11631005Smax.romanov@nginx.com frag_key.stream);
1164352Smax.romanov@nginx.com
1165352Smax.romanov@nginx.com return NULL;
1166352Smax.romanov@nginx.com }
1167352Smax.romanov@nginx.com }
1168352Smax.romanov@nginx.com
1169352Smax.romanov@nginx.com
117011Sigor@sysoev.ru static void
nxt_port_read_msg_process(nxt_task_t * task,nxt_port_t * port,nxt_port_recv_msg_t * msg)117111Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
117282Smax.romanov@nginx.com nxt_port_recv_msg_t *msg)
117311Sigor@sysoev.ru {
11741269Sigor@sysoev.ru nxt_buf_t *b, *orig_b, *next;
1175352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg;
117611Sigor@sysoev.ru
117782Smax.romanov@nginx.com if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
1178564Svbart@nginx.com nxt_alert(task, "port %d: too small message:%uz",
1179564Svbart@nginx.com port->socket.fd, msg->size);
1180423Smax.romanov@nginx.com
11811996St.nateldemoura@f5.com nxt_port_close_fds(msg->fd);
11821553Smax.romanov@nginx.com
1183423Smax.romanov@nginx.com return;
118411Sigor@sysoev.ru }
118511Sigor@sysoev.ru
118642Smax.romanov@nginx.com /* adjust size to actual buffer used size */
118782Smax.romanov@nginx.com msg->size -= sizeof(nxt_port_msg_t);
118842Smax.romanov@nginx.com
118942Smax.romanov@nginx.com b = orig_b = msg->buf;
119082Smax.romanov@nginx.com b->mem.free += msg->size;
119142Smax.romanov@nginx.com
11921555Smax.romanov@nginx.com msg->cancelled = 0;
119311Sigor@sysoev.ru
1194352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.nf != 0)) {
1195423Smax.romanov@nginx.com
11961005Smax.romanov@nginx.com fmsg = nxt_port_frag_find(task, port, msg);
1197352Smax.romanov@nginx.com
1198551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) {
1199551Smax.romanov@nginx.com goto fmsg_failed;
1200551Smax.romanov@nginx.com }
1201423Smax.romanov@nginx.com
1202423Smax.romanov@nginx.com if (nxt_fast_path(fmsg->cancelled == 0)) {
1203423Smax.romanov@nginx.com
1204423Smax.romanov@nginx.com if (msg->port_msg.mmap) {
1205423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg);
1206423Smax.romanov@nginx.com }
1207423Smax.romanov@nginx.com
1208423Smax.romanov@nginx.com nxt_buf_chain_add(&fmsg->buf, msg->buf);
1209423Smax.romanov@nginx.com
1210423Smax.romanov@nginx.com fmsg->size += msg->size;
1211423Smax.romanov@nginx.com msg->buf = NULL;
1212423Smax.romanov@nginx.com b = NULL;
1213423Smax.romanov@nginx.com
1214423Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) {
1215423Smax.romanov@nginx.com
1216423Smax.romanov@nginx.com b = fmsg->buf;
1217423Smax.romanov@nginx.com
1218423Smax.romanov@nginx.com port->handler(task, fmsg);
1219423Smax.romanov@nginx.com
1220423Smax.romanov@nginx.com msg->buf = fmsg->buf;
12211558Smax.romanov@nginx.com msg->fd[0] = fmsg->fd[0];
12221558Smax.romanov@nginx.com msg->fd[1] = fmsg->fd[1];
1223974Smax.romanov@nginx.com
1224974Smax.romanov@nginx.com /*
1225974Smax.romanov@nginx.com * To disable instant completion or buffer re-usage,
1226974Smax.romanov@nginx.com * handler should reset 'msg.buf'.
1227974Smax.romanov@nginx.com */
1228974Smax.romanov@nginx.com if (!msg->port_msg.mmap && msg->buf == b) {
1229974Smax.romanov@nginx.com nxt_port_buf_free(port, b);
1230974Smax.romanov@nginx.com }
1231423Smax.romanov@nginx.com }
1232352Smax.romanov@nginx.com }
1233352Smax.romanov@nginx.com
1234352Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) {
1235352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg);
1236352Smax.romanov@nginx.com }
1237352Smax.romanov@nginx.com } else {
1238352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.mf != 0)) {
1239423Smax.romanov@nginx.com
1240423Smax.romanov@nginx.com if (msg->port_msg.mmap && msg->cancelled == 0) {
1241423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg);
1242423Smax.romanov@nginx.com b = msg->buf;
1243423Smax.romanov@nginx.com }
1244423Smax.romanov@nginx.com
1245352Smax.romanov@nginx.com fmsg = nxt_port_frag_start(task, port, msg);
1246352Smax.romanov@nginx.com
1247551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) {
1248551Smax.romanov@nginx.com goto fmsg_failed;
1249551Smax.romanov@nginx.com }
1250352Smax.romanov@nginx.com
1251352Smax.romanov@nginx.com fmsg->port_msg.nf = 0;
1252352Smax.romanov@nginx.com fmsg->port_msg.mf = 0;
1253352Smax.romanov@nginx.com
1254423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) {
1255423Smax.romanov@nginx.com msg->buf = NULL;
12561558Smax.romanov@nginx.com msg->fd[0] = -1;
12571558Smax.romanov@nginx.com msg->fd[1] = -1;
1258423Smax.romanov@nginx.com b = NULL;
1259423Smax.romanov@nginx.com
1260423Smax.romanov@nginx.com } else {
12611996St.nateldemoura@f5.com nxt_port_close_fds(msg->fd);
1262423Smax.romanov@nginx.com }
1263352Smax.romanov@nginx.com } else {
1264423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) {
1265423Smax.romanov@nginx.com
1266423Smax.romanov@nginx.com if (msg->port_msg.mmap) {
1267423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg);
1268423Smax.romanov@nginx.com b = msg->buf;
1269423Smax.romanov@nginx.com }
1270423Smax.romanov@nginx.com
1271423Smax.romanov@nginx.com port->handler(task, msg);
1272423Smax.romanov@nginx.com }
1273352Smax.romanov@nginx.com }
1274352Smax.romanov@nginx.com }
127542Smax.romanov@nginx.com
1276551Smax.romanov@nginx.com fmsg_failed:
1277551Smax.romanov@nginx.com
127882Smax.romanov@nginx.com if (msg->port_msg.mmap && orig_b != b) {
127942Smax.romanov@nginx.com
1280194Smax.romanov@nginx.com /*
1281194Smax.romanov@nginx.com * To disable instant buffer completion,
1282194Smax.romanov@nginx.com * handler should reset 'msg->buf'.
1283194Smax.romanov@nginx.com */
1284194Smax.romanov@nginx.com if (msg->buf == b) {
1285194Smax.romanov@nginx.com /* complete mmap buffers */
12861269Sigor@sysoev.ru while (b != NULL) {
1287194Smax.romanov@nginx.com nxt_debug(task, "complete buffer %p", b);
1288194Smax.romanov@nginx.com
1289194Smax.romanov@nginx.com nxt_work_queue_add(port->socket.read_work_queue,
1290194Smax.romanov@nginx.com b->completion_handler, task, b, b->parent);
12911269Sigor@sysoev.ru
12921269Sigor@sysoev.ru next = b->next;
12931269Sigor@sysoev.ru b->next = NULL;
12941269Sigor@sysoev.ru b = next;
1295194Smax.romanov@nginx.com }
129642Smax.romanov@nginx.com }
1297194Smax.romanov@nginx.com
1298194Smax.romanov@nginx.com /* restore original buf */
1299194Smax.romanov@nginx.com msg->buf = orig_b;
130042Smax.romanov@nginx.com }
130111Sigor@sysoev.ru }
130211Sigor@sysoev.ru
130311Sigor@sysoev.ru
130411Sigor@sysoev.ru static nxt_buf_t *
nxt_port_buf_alloc(nxt_port_t * port)130511Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port)
130611Sigor@sysoev.ru {
130711Sigor@sysoev.ru nxt_buf_t *b;
130811Sigor@sysoev.ru
130911Sigor@sysoev.ru if (port->free_bufs != NULL) {
131011Sigor@sysoev.ru b = port->free_bufs;
131111Sigor@sysoev.ru port->free_bufs = b->next;
131211Sigor@sysoev.ru
131311Sigor@sysoev.ru b->mem.pos = b->mem.start;
131411Sigor@sysoev.ru b->mem.free = b->mem.start;
131542Smax.romanov@nginx.com b->next = NULL;
131611Sigor@sysoev.ru } else {
131711Sigor@sysoev.ru b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
131811Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
131911Sigor@sysoev.ru return NULL;
132011Sigor@sysoev.ru }
132111Sigor@sysoev.ru }
132211Sigor@sysoev.ru
132311Sigor@sysoev.ru return b;
132411Sigor@sysoev.ru }
132511Sigor@sysoev.ru
132611Sigor@sysoev.ru
132711Sigor@sysoev.ru static void
nxt_port_buf_free(nxt_port_t * port,nxt_buf_t * b)132811Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
132911Sigor@sysoev.ru {
1330974Smax.romanov@nginx.com nxt_buf_chain_add(&b, port->free_bufs);
133111Sigor@sysoev.ru port->free_bufs = b;
133211Sigor@sysoev.ru }
133311Sigor@sysoev.ru
133411Sigor@sysoev.ru
133511Sigor@sysoev.ru static void
nxt_port_error_handler(nxt_task_t * task,void * obj,void * data)133611Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
133711Sigor@sysoev.ru {
1338343Smax.romanov@nginx.com int use_delta;
13391269Sigor@sysoev.ru nxt_buf_t *b, *next;
1340197Smax.romanov@nginx.com nxt_port_t *port;
1341197Smax.romanov@nginx.com nxt_work_queue_t *wq;
1342197Smax.romanov@nginx.com nxt_port_send_msg_t *msg;
1343197Smax.romanov@nginx.com
1344125Smax.romanov@nginx.com nxt_debug(task, "port error handler %p", obj);
134511Sigor@sysoev.ru /* TODO */
1346197Smax.romanov@nginx.com
1347197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket);
1348197Smax.romanov@nginx.com
1349343Smax.romanov@nginx.com use_delta = 0;
1350343Smax.romanov@nginx.com
1351343Smax.romanov@nginx.com if (obj == data) {
1352343Smax.romanov@nginx.com use_delta--;
1353343Smax.romanov@nginx.com }
1354197Smax.romanov@nginx.com
1355343Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue;
1356343Smax.romanov@nginx.com
1357343Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex);
1358343Smax.romanov@nginx.com
1359343Smax.romanov@nginx.com nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
1360197Smax.romanov@nginx.com
13611908Smax.romanov@nginx.com nxt_port_msg_close_fd(msg);
13621553Smax.romanov@nginx.com
13631269Sigor@sysoev.ru for (b = msg->buf; b != NULL; b = next) {
13641269Sigor@sysoev.ru next = b->next;
13651269Sigor@sysoev.ru b->next = NULL;
13661269Sigor@sysoev.ru
1367197Smax.romanov@nginx.com if (nxt_buf_is_sync(b)) {
1368197Smax.romanov@nginx.com continue;
1369197Smax.romanov@nginx.com }
1370197Smax.romanov@nginx.com
1371197Smax.romanov@nginx.com nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1372197Smax.romanov@nginx.com }
1373197Smax.romanov@nginx.com
1374197Smax.romanov@nginx.com nxt_queue_remove(&msg->link);
1375343Smax.romanov@nginx.com use_delta--;
13761125Smax.romanov@nginx.com
13771125Smax.romanov@nginx.com nxt_port_release_send_msg(msg);
1378197Smax.romanov@nginx.com
1379197Smax.romanov@nginx.com } nxt_queue_loop;
1380343Smax.romanov@nginx.com
1381343Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex);
1382343Smax.romanov@nginx.com
1383343Smax.romanov@nginx.com if (use_delta != 0) {
1384343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta);
1385343Smax.romanov@nginx.com }
138611Sigor@sysoev.ru }
1387