111Sigor@sysoev.ru 211Sigor@sysoev.ru /* 311Sigor@sysoev.ru * Copyright (C) Igor Sysoev 411Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 511Sigor@sysoev.ru */ 611Sigor@sysoev.ru 711Sigor@sysoev.ru #include <nxt_main.h> 811Sigor@sysoev.ru 911Sigor@sysoev.ru 1011Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 1111Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 1211Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 1342Smax.romanov@nginx.com nxt_port_recv_msg_t *msg, size_t size); 1411Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 1511Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 1611Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 1711Sigor@sysoev.ru 1811Sigor@sysoev.ru 1914Sigor@sysoev.ru nxt_int_t 2014Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 2111Sigor@sysoev.ru { 22*65Sigor@sysoev.ru nxt_mp_t *mp; 23*65Sigor@sysoev.ru nxt_int_t sndbuf, rcvbuf, size; 24*65Sigor@sysoev.ru nxt_socket_t snd, rcv; 2511Sigor@sysoev.ru 2614Sigor@sysoev.ru port->socket.task = task; 2714Sigor@sysoev.ru 2814Sigor@sysoev.ru port->pair[0] = -1; 2914Sigor@sysoev.ru port->pair[1] = -1; 3014Sigor@sysoev.ru 3114Sigor@sysoev.ru nxt_queue_init(&port->messages); 3214Sigor@sysoev.ru 33*65Sigor@sysoev.ru mp = nxt_mp_create(1024, 128, 256, 32); 3414Sigor@sysoev.ru if (nxt_slow_path(mp == NULL)) { 3514Sigor@sysoev.ru return NXT_ERROR; 3611Sigor@sysoev.ru } 3711Sigor@sysoev.ru 3814Sigor@sysoev.ru port->mem_pool = mp; 3911Sigor@sysoev.ru 4013Sigor@sysoev.ru if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 4111Sigor@sysoev.ru goto socketpair_fail; 4211Sigor@sysoev.ru } 4311Sigor@sysoev.ru 4411Sigor@sysoev.ru snd = port->pair[1]; 4511Sigor@sysoev.ru 4613Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 4711Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 4811Sigor@sysoev.ru goto getsockopt_fail; 4911Sigor@sysoev.ru } 5011Sigor@sysoev.ru 5111Sigor@sysoev.ru rcv = port->pair[0]; 5211Sigor@sysoev.ru 5313Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 5411Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 5511Sigor@sysoev.ru goto getsockopt_fail; 5611Sigor@sysoev.ru } 5711Sigor@sysoev.ru 5811Sigor@sysoev.ru if (max_size == 0) { 5911Sigor@sysoev.ru max_size = 16 * 1024; 6011Sigor@sysoev.ru } 6111Sigor@sysoev.ru 6211Sigor@sysoev.ru if ((size_t) sndbuf < max_size) { 6311Sigor@sysoev.ru /* 6411Sigor@sysoev.ru * On Unix domain sockets 6511Sigor@sysoev.ru * Linux uses 224K on both send and receive directions; 6611Sigor@sysoev.ru * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 6711Sigor@sysoev.ru * on send direction and 4K buffer size on receive direction; 6811Sigor@sysoev.ru * Solaris uses 16K on send direction and 5K on receive direction. 6911Sigor@sysoev.ru */ 7013Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 7113Sigor@sysoev.ru max_size); 7211Sigor@sysoev.ru 7313Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 7411Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 7511Sigor@sysoev.ru goto getsockopt_fail; 7611Sigor@sysoev.ru } 7711Sigor@sysoev.ru 7811Sigor@sysoev.ru size = sndbuf * 4; 7911Sigor@sysoev.ru 8011Sigor@sysoev.ru if (rcvbuf < size) { 8113Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 8213Sigor@sysoev.ru size); 8311Sigor@sysoev.ru 8413Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 8511Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 8611Sigor@sysoev.ru goto getsockopt_fail; 8711Sigor@sysoev.ru } 8811Sigor@sysoev.ru } 8911Sigor@sysoev.ru } 9011Sigor@sysoev.ru 9111Sigor@sysoev.ru port->max_size = nxt_min(max_size, (size_t) sndbuf); 9211Sigor@sysoev.ru port->max_share = (64 * 1024); 9311Sigor@sysoev.ru 9414Sigor@sysoev.ru return NXT_OK; 9511Sigor@sysoev.ru 9611Sigor@sysoev.ru getsockopt_fail: 9711Sigor@sysoev.ru 9813Sigor@sysoev.ru nxt_socket_close(task, port->pair[0]); 9913Sigor@sysoev.ru nxt_socket_close(task, port->pair[1]); 10011Sigor@sysoev.ru 10111Sigor@sysoev.ru socketpair_fail: 10211Sigor@sysoev.ru 103*65Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool); 10411Sigor@sysoev.ru 10514Sigor@sysoev.ru return NXT_ERROR; 10611Sigor@sysoev.ru } 10711Sigor@sysoev.ru 10811Sigor@sysoev.ru 10911Sigor@sysoev.ru void 11011Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port) 11111Sigor@sysoev.ru { 11213Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->socket.fd); 113*65Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool); 11411Sigor@sysoev.ru } 11511Sigor@sysoev.ru 11611Sigor@sysoev.ru 11711Sigor@sysoev.ru void 11811Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 11911Sigor@sysoev.ru { 12011Sigor@sysoev.ru port->socket.fd = port->pair[1]; 12111Sigor@sysoev.ru port->socket.log = &nxt_main_log; 12211Sigor@sysoev.ru port->socket.write_ready = 1; 12311Sigor@sysoev.ru 12411Sigor@sysoev.ru port->socket.write_work_queue = &task->thread->engine->fast_work_queue; 12511Sigor@sysoev.ru port->socket.write_handler = nxt_port_write_handler; 12611Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 12711Sigor@sysoev.ru } 12811Sigor@sysoev.ru 12911Sigor@sysoev.ru 13011Sigor@sysoev.ru void 13111Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port) 13211Sigor@sysoev.ru { 13313Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[1]); 13411Sigor@sysoev.ru port->pair[1] = -1; 13511Sigor@sysoev.ru } 13611Sigor@sysoev.ru 13711Sigor@sysoev.ru 13811Sigor@sysoev.ru nxt_int_t 13914Sigor@sysoev.ru nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 14042Smax.romanov@nginx.com nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) 14111Sigor@sysoev.ru { 14211Sigor@sysoev.ru nxt_queue_link_t *link; 14311Sigor@sysoev.ru nxt_port_send_msg_t *msg; 14411Sigor@sysoev.ru 14511Sigor@sysoev.ru for (link = nxt_queue_first(&port->messages); 14611Sigor@sysoev.ru link != nxt_queue_tail(&port->messages); 14711Sigor@sysoev.ru link = nxt_queue_next(link)) 14811Sigor@sysoev.ru { 14911Sigor@sysoev.ru msg = (nxt_port_send_msg_t *) link; 15011Sigor@sysoev.ru 15111Sigor@sysoev.ru if (msg->port_msg.stream == stream) { 15211Sigor@sysoev.ru /* 15311Sigor@sysoev.ru * An fd is ignored since a file descriptor 15411Sigor@sysoev.ru * must be sent only in the first message of a stream. 15511Sigor@sysoev.ru */ 15611Sigor@sysoev.ru nxt_buf_chain_add(&msg->buf, b); 15711Sigor@sysoev.ru 15811Sigor@sysoev.ru return NXT_OK; 15911Sigor@sysoev.ru } 16011Sigor@sysoev.ru } 16111Sigor@sysoev.ru 162*65Sigor@sysoev.ru msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t)); 16311Sigor@sysoev.ru if (nxt_slow_path(msg == NULL)) { 16411Sigor@sysoev.ru return NXT_ERROR; 16511Sigor@sysoev.ru } 16611Sigor@sysoev.ru 16711Sigor@sysoev.ru msg->buf = b; 16811Sigor@sysoev.ru msg->fd = fd; 16911Sigor@sysoev.ru msg->share = 0; 17011Sigor@sysoev.ru 17111Sigor@sysoev.ru msg->port_msg.stream = stream; 17242Smax.romanov@nginx.com msg->port_msg.pid = nxt_pid; 17342Smax.romanov@nginx.com msg->port_msg.reply_port = reply_port; 17411Sigor@sysoev.ru msg->port_msg.type = type; 17511Sigor@sysoev.ru msg->port_msg.last = 0; 17642Smax.romanov@nginx.com msg->port_msg.mmap = 0; 17711Sigor@sysoev.ru 17811Sigor@sysoev.ru nxt_queue_insert_tail(&port->messages, &msg->link); 17911Sigor@sysoev.ru 18011Sigor@sysoev.ru if (port->socket.write_ready) { 18111Sigor@sysoev.ru nxt_port_write_handler(task, port, NULL); 18211Sigor@sysoev.ru } 18311Sigor@sysoev.ru 18411Sigor@sysoev.ru return NXT_OK; 18511Sigor@sysoev.ru } 18611Sigor@sysoev.ru 18711Sigor@sysoev.ru 18811Sigor@sysoev.ru static void 18911Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 19011Sigor@sysoev.ru { 19111Sigor@sysoev.ru ssize_t n; 19211Sigor@sysoev.ru nxt_port_t *port; 19314Sigor@sysoev.ru struct iovec iov[NXT_IOBUF_MAX]; 19411Sigor@sysoev.ru nxt_queue_link_t *link; 19511Sigor@sysoev.ru nxt_port_send_msg_t *msg; 19611Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb; 19742Smax.romanov@nginx.com nxt_port_method_t m; 19842Smax.romanov@nginx.com 19942Smax.romanov@nginx.com size_t plain_size; 20042Smax.romanov@nginx.com nxt_buf_t *plain_buf; 20111Sigor@sysoev.ru 20211Sigor@sysoev.ru port = obj; 20311Sigor@sysoev.ru 20411Sigor@sysoev.ru do { 20511Sigor@sysoev.ru link = nxt_queue_first(&port->messages); 20611Sigor@sysoev.ru 20711Sigor@sysoev.ru if (link == nxt_queue_tail(&port->messages)) { 20812Sigor@sysoev.ru nxt_fd_event_block_write(task->thread->engine, &port->socket); 20911Sigor@sysoev.ru return; 21011Sigor@sysoev.ru } 21111Sigor@sysoev.ru 21211Sigor@sysoev.ru msg = (nxt_port_send_msg_t *) link; 21311Sigor@sysoev.ru 21414Sigor@sysoev.ru iov[0].iov_base = &msg->port_msg; 21514Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 21611Sigor@sysoev.ru 21711Sigor@sysoev.ru sb.buf = msg->buf; 21814Sigor@sysoev.ru sb.iobuf = &iov[1]; 21911Sigor@sysoev.ru sb.nmax = NXT_IOBUF_MAX - 1; 22011Sigor@sysoev.ru sb.sync = 0; 22111Sigor@sysoev.ru sb.last = 0; 22242Smax.romanov@nginx.com sb.size = 0; 22311Sigor@sysoev.ru sb.limit = port->max_size; 22411Sigor@sysoev.ru 22542Smax.romanov@nginx.com m = nxt_port_mmap_get_method(task, port, msg->buf); 22642Smax.romanov@nginx.com 22742Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP) { 22842Smax.romanov@nginx.com sb.limit = (1ULL << 31) - 1; 22942Smax.romanov@nginx.com } 23042Smax.romanov@nginx.com 23142Smax.romanov@nginx.com nxt_sendbuf_mem_coalesce(task, &sb); 23242Smax.romanov@nginx.com 23342Smax.romanov@nginx.com plain_size = sb.size; 23442Smax.romanov@nginx.com plain_buf = msg->buf; 23542Smax.romanov@nginx.com 23642Smax.romanov@nginx.com /* 23742Smax.romanov@nginx.com * Send through mmap enabled only when payload 23842Smax.romanov@nginx.com * is bigger than PORT_MMAP_MIN_SIZE. 23942Smax.romanov@nginx.com */ 24042Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 24142Smax.romanov@nginx.com nxt_port_mmap_write(task, port, msg, &sb); 24242Smax.romanov@nginx.com 24342Smax.romanov@nginx.com } else { 24442Smax.romanov@nginx.com m = NXT_PORT_METHOD_PLAIN; 24542Smax.romanov@nginx.com } 24611Sigor@sysoev.ru 24711Sigor@sysoev.ru msg->port_msg.last = sb.last; 24811Sigor@sysoev.ru 24942Smax.romanov@nginx.com n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 25011Sigor@sysoev.ru 25111Sigor@sysoev.ru if (n > 0) { 25242Smax.romanov@nginx.com if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 25311Sigor@sysoev.ru nxt_log(task, NXT_LOG_CRIT, 25411Sigor@sysoev.ru "port %d: short write: %z instead of %uz", 25542Smax.romanov@nginx.com port->socket.fd, n, sb.size + iov[0].iov_len); 25611Sigor@sysoev.ru goto fail; 25711Sigor@sysoev.ru } 25811Sigor@sysoev.ru 25942Smax.romanov@nginx.com if (msg->buf != plain_buf) { 26042Smax.romanov@nginx.com /* 26142Smax.romanov@nginx.com * Complete crafted mmap_msgs buf and restore msg->buf 26242Smax.romanov@nginx.com * for regular completion call. 26342Smax.romanov@nginx.com */ 26442Smax.romanov@nginx.com nxt_port_mmap_completion(task, 26542Smax.romanov@nginx.com port->socket.write_work_queue, 26642Smax.romanov@nginx.com msg->buf); 26742Smax.romanov@nginx.com 26842Smax.romanov@nginx.com msg->buf = plain_buf; 26942Smax.romanov@nginx.com } 27042Smax.romanov@nginx.com 27111Sigor@sysoev.ru msg->buf = nxt_sendbuf_completion(task, 27211Sigor@sysoev.ru port->socket.write_work_queue, 27311Sigor@sysoev.ru msg->buf, 27442Smax.romanov@nginx.com plain_size); 27511Sigor@sysoev.ru 27611Sigor@sysoev.ru if (msg->buf != NULL) { 27711Sigor@sysoev.ru /* 27811Sigor@sysoev.ru * A file descriptor is sent only 27911Sigor@sysoev.ru * in the first message of a stream. 28011Sigor@sysoev.ru */ 28111Sigor@sysoev.ru msg->fd = -1; 28211Sigor@sysoev.ru msg->share += n; 28311Sigor@sysoev.ru 28411Sigor@sysoev.ru if (msg->share >= port->max_share) { 28511Sigor@sysoev.ru msg->share = 0; 28611Sigor@sysoev.ru nxt_queue_remove(link); 28711Sigor@sysoev.ru nxt_queue_insert_tail(&port->messages, link); 28811Sigor@sysoev.ru } 28911Sigor@sysoev.ru 29011Sigor@sysoev.ru } else { 29111Sigor@sysoev.ru nxt_queue_remove(link); 292*65Sigor@sysoev.ru nxt_mp_free(port->mem_pool, msg); 29311Sigor@sysoev.ru } 29411Sigor@sysoev.ru 29511Sigor@sysoev.ru } else if (nxt_slow_path(n == NXT_ERROR)) { 29611Sigor@sysoev.ru goto fail; 29711Sigor@sysoev.ru } 29811Sigor@sysoev.ru 29911Sigor@sysoev.ru /* n == NXT_AGAIN */ 30011Sigor@sysoev.ru 30111Sigor@sysoev.ru } while (port->socket.write_ready); 30211Sigor@sysoev.ru 30312Sigor@sysoev.ru if (nxt_fd_event_is_disabled(port->socket.write)) { 30412Sigor@sysoev.ru nxt_fd_event_enable_write(task->thread->engine, &port->socket); 30511Sigor@sysoev.ru } 30611Sigor@sysoev.ru 30711Sigor@sysoev.ru return; 30811Sigor@sysoev.ru 30911Sigor@sysoev.ru fail: 31011Sigor@sysoev.ru 31111Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 31211Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 31311Sigor@sysoev.ru } 31411Sigor@sysoev.ru 31511Sigor@sysoev.ru 31611Sigor@sysoev.ru void 31711Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 31811Sigor@sysoev.ru { 31911Sigor@sysoev.ru port->socket.fd = port->pair[0]; 32011Sigor@sysoev.ru port->socket.log = &nxt_main_log; 32111Sigor@sysoev.ru 32211Sigor@sysoev.ru port->socket.read_work_queue = &task->thread->engine->fast_work_queue; 32311Sigor@sysoev.ru port->socket.read_handler = nxt_port_read_handler; 32411Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 32511Sigor@sysoev.ru 32612Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket); 32711Sigor@sysoev.ru } 32811Sigor@sysoev.ru 32911Sigor@sysoev.ru 33011Sigor@sysoev.ru void 33111Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port) 33211Sigor@sysoev.ru { 33313Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[0]); 33411Sigor@sysoev.ru port->pair[0] = -1; 33511Sigor@sysoev.ru } 33611Sigor@sysoev.ru 33711Sigor@sysoev.ru 33811Sigor@sysoev.ru static void 33911Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 34011Sigor@sysoev.ru { 34142Smax.romanov@nginx.com ssize_t n; 34242Smax.romanov@nginx.com nxt_buf_t *b; 34342Smax.romanov@nginx.com nxt_port_t *port; 34442Smax.romanov@nginx.com struct iovec iov[2]; 34542Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 34611Sigor@sysoev.ru 34742Smax.romanov@nginx.com port = msg.port = obj; 34811Sigor@sysoev.ru 34911Sigor@sysoev.ru for ( ;; ) { 35011Sigor@sysoev.ru 35111Sigor@sysoev.ru b = nxt_port_buf_alloc(port); 35211Sigor@sysoev.ru 35311Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 35411Sigor@sysoev.ru /* TODO: disable event for some time */ 35511Sigor@sysoev.ru } 35611Sigor@sysoev.ru 35742Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg; 35814Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 35911Sigor@sysoev.ru 36014Sigor@sysoev.ru iov[1].iov_base = b->mem.pos; 36114Sigor@sysoev.ru iov[1].iov_len = port->max_size; 36214Sigor@sysoev.ru 36342Smax.romanov@nginx.com n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 36411Sigor@sysoev.ru 36511Sigor@sysoev.ru if (n > 0) { 36642Smax.romanov@nginx.com 36742Smax.romanov@nginx.com msg.buf = b; 36842Smax.romanov@nginx.com 36942Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg, n); 37011Sigor@sysoev.ru 37111Sigor@sysoev.ru if (b->mem.pos == b->mem.free) { 37211Sigor@sysoev.ru nxt_port_buf_free(port, b); 37311Sigor@sysoev.ru } 37411Sigor@sysoev.ru 37511Sigor@sysoev.ru if (port->socket.read_ready) { 37611Sigor@sysoev.ru continue; 37711Sigor@sysoev.ru } 37811Sigor@sysoev.ru 37911Sigor@sysoev.ru return; 38011Sigor@sysoev.ru } 38111Sigor@sysoev.ru 38211Sigor@sysoev.ru if (n == NXT_AGAIN) { 38311Sigor@sysoev.ru nxt_port_buf_free(port, b); 38411Sigor@sysoev.ru 38512Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket); 38611Sigor@sysoev.ru return; 38711Sigor@sysoev.ru } 38811Sigor@sysoev.ru 38911Sigor@sysoev.ru /* n == 0 || n == NXT_ERROR */ 39011Sigor@sysoev.ru 39111Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 39211Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 39311Sigor@sysoev.ru return; 39411Sigor@sysoev.ru } 39511Sigor@sysoev.ru } 39611Sigor@sysoev.ru 39711Sigor@sysoev.ru 39811Sigor@sysoev.ru static void 39911Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 40042Smax.romanov@nginx.com nxt_port_recv_msg_t *msg, size_t size) 40111Sigor@sysoev.ru { 40242Smax.romanov@nginx.com nxt_buf_t *b; 40342Smax.romanov@nginx.com nxt_buf_t *orig_b; 40442Smax.romanov@nginx.com nxt_buf_t **last_next; 40511Sigor@sysoev.ru 40611Sigor@sysoev.ru if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) { 40711Sigor@sysoev.ru nxt_log(port->socket.task, NXT_LOG_CRIT, 40811Sigor@sysoev.ru "port %d: too small message:%uz", port->socket.fd, size); 40911Sigor@sysoev.ru goto fail; 41011Sigor@sysoev.ru } 41111Sigor@sysoev.ru 41242Smax.romanov@nginx.com /* adjust size to actual buffer used size */ 41342Smax.romanov@nginx.com size -= sizeof(nxt_port_msg_t); 41442Smax.romanov@nginx.com 41542Smax.romanov@nginx.com b = orig_b = msg->buf; 41642Smax.romanov@nginx.com b->mem.free += size; 41742Smax.romanov@nginx.com 41842Smax.romanov@nginx.com if (msg->port_msg.mmap) { 41942Smax.romanov@nginx.com nxt_port_mmap_read(task, port, msg, size); 42042Smax.romanov@nginx.com b = msg->buf; 42142Smax.romanov@nginx.com } 42211Sigor@sysoev.ru 42342Smax.romanov@nginx.com last_next = &b->next; 42411Sigor@sysoev.ru 42542Smax.romanov@nginx.com if (msg->port_msg.last) { 42642Smax.romanov@nginx.com /* find reference to last next, the NULL one */ 42742Smax.romanov@nginx.com while (*last_next) { 42842Smax.romanov@nginx.com last_next = &(*last_next)->next; 42942Smax.romanov@nginx.com } 43042Smax.romanov@nginx.com 43142Smax.romanov@nginx.com *last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST); 43242Smax.romanov@nginx.com if (nxt_slow_path(*last_next == NULL)) { 43311Sigor@sysoev.ru goto fail; 43411Sigor@sysoev.ru } 43542Smax.romanov@nginx.com } 43611Sigor@sysoev.ru 43742Smax.romanov@nginx.com port->handler(task, msg); 43842Smax.romanov@nginx.com 43942Smax.romanov@nginx.com if (*last_next != NULL) { 44042Smax.romanov@nginx.com /* A sync buffer */ 44142Smax.romanov@nginx.com nxt_buf_free(port->mem_pool, *last_next); 44242Smax.romanov@nginx.com *last_next = NULL; 44311Sigor@sysoev.ru } 44411Sigor@sysoev.ru 44542Smax.romanov@nginx.com if (orig_b != b) { 44642Smax.romanov@nginx.com /* complete mmap buffers */ 44742Smax.romanov@nginx.com for (; b && nxt_buf_used_size(b) == 0; 44842Smax.romanov@nginx.com b = b->next) { 44942Smax.romanov@nginx.com nxt_debug(task, "complete buffer %p", b); 45042Smax.romanov@nginx.com 45142Smax.romanov@nginx.com nxt_work_queue_add(port->socket.read_work_queue, 45242Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 45342Smax.romanov@nginx.com } 45442Smax.romanov@nginx.com } 45511Sigor@sysoev.ru 45611Sigor@sysoev.ru return; 45711Sigor@sysoev.ru 45811Sigor@sysoev.ru fail: 45911Sigor@sysoev.ru 46042Smax.romanov@nginx.com if (msg->fd != -1) { 46142Smax.romanov@nginx.com nxt_fd_close(msg->fd); 46211Sigor@sysoev.ru } 46311Sigor@sysoev.ru } 46411Sigor@sysoev.ru 46511Sigor@sysoev.ru 46611Sigor@sysoev.ru static nxt_buf_t * 46711Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port) 46811Sigor@sysoev.ru { 46911Sigor@sysoev.ru nxt_buf_t *b; 47011Sigor@sysoev.ru 47111Sigor@sysoev.ru if (port->free_bufs != NULL) { 47211Sigor@sysoev.ru b = port->free_bufs; 47311Sigor@sysoev.ru port->free_bufs = b->next; 47411Sigor@sysoev.ru 47511Sigor@sysoev.ru b->mem.pos = b->mem.start; 47611Sigor@sysoev.ru b->mem.free = b->mem.start; 47742Smax.romanov@nginx.com b->next = NULL; 47811Sigor@sysoev.ru 47911Sigor@sysoev.ru } else { 48011Sigor@sysoev.ru b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 48111Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 48211Sigor@sysoev.ru return NULL; 48311Sigor@sysoev.ru } 48411Sigor@sysoev.ru } 48511Sigor@sysoev.ru 48611Sigor@sysoev.ru return b; 48711Sigor@sysoev.ru } 48811Sigor@sysoev.ru 48911Sigor@sysoev.ru 49011Sigor@sysoev.ru static void 49111Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 49211Sigor@sysoev.ru { 49311Sigor@sysoev.ru b->next = port->free_bufs; 49411Sigor@sysoev.ru port->free_bufs = b; 49511Sigor@sysoev.ru } 49611Sigor@sysoev.ru 49711Sigor@sysoev.ru 49811Sigor@sysoev.ru static void 49911Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 50011Sigor@sysoev.ru { 50111Sigor@sysoev.ru /* TODO */ 50211Sigor@sysoev.ru } 503