111Sigor@sysoev.ru 211Sigor@sysoev.ru /* 311Sigor@sysoev.ru * Copyright (C) Igor Sysoev 411Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 511Sigor@sysoev.ru */ 611Sigor@sysoev.ru 711Sigor@sysoev.ru #include <nxt_main.h> 811Sigor@sysoev.ru 911Sigor@sysoev.ru 1011Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 1111Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 1211Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 1382Smax.romanov@nginx.com nxt_port_recv_msg_t *msg); 1411Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 1511Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 1611Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 1711Sigor@sysoev.ru 1811Sigor@sysoev.ru 1914Sigor@sysoev.ru nxt_int_t 2014Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 2111Sigor@sysoev.ru { 2265Sigor@sysoev.ru nxt_int_t sndbuf, rcvbuf, size; 2365Sigor@sysoev.ru nxt_socket_t snd, rcv; 2411Sigor@sysoev.ru 2514Sigor@sysoev.ru port->socket.task = task; 2614Sigor@sysoev.ru 2714Sigor@sysoev.ru port->pair[0] = -1; 2814Sigor@sysoev.ru port->pair[1] = -1; 2914Sigor@sysoev.ru 3013Sigor@sysoev.ru if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 3111Sigor@sysoev.ru goto socketpair_fail; 3211Sigor@sysoev.ru } 3311Sigor@sysoev.ru 3411Sigor@sysoev.ru snd = port->pair[1]; 3511Sigor@sysoev.ru 3613Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 3711Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 3811Sigor@sysoev.ru goto getsockopt_fail; 3911Sigor@sysoev.ru } 4011Sigor@sysoev.ru 4111Sigor@sysoev.ru rcv = port->pair[0]; 4211Sigor@sysoev.ru 4313Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 4411Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 4511Sigor@sysoev.ru goto getsockopt_fail; 4611Sigor@sysoev.ru } 4711Sigor@sysoev.ru 4811Sigor@sysoev.ru if (max_size == 0) { 4911Sigor@sysoev.ru max_size = 16 * 1024; 5011Sigor@sysoev.ru } 5111Sigor@sysoev.ru 5211Sigor@sysoev.ru if ((size_t) sndbuf < max_size) { 5311Sigor@sysoev.ru /* 5411Sigor@sysoev.ru * On Unix domain sockets 5511Sigor@sysoev.ru * Linux uses 224K on both send and receive directions; 5611Sigor@sysoev.ru * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 5711Sigor@sysoev.ru * on send direction and 4K buffer size on receive direction; 5811Sigor@sysoev.ru * Solaris uses 16K on send direction and 5K on receive direction. 5911Sigor@sysoev.ru */ 6013Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 6113Sigor@sysoev.ru max_size); 6211Sigor@sysoev.ru 6313Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 6411Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 6511Sigor@sysoev.ru goto getsockopt_fail; 6611Sigor@sysoev.ru } 6711Sigor@sysoev.ru 6811Sigor@sysoev.ru size = sndbuf * 4; 6911Sigor@sysoev.ru 7011Sigor@sysoev.ru if (rcvbuf < size) { 7113Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 7213Sigor@sysoev.ru size); 7311Sigor@sysoev.ru 7413Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 7511Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 7611Sigor@sysoev.ru goto getsockopt_fail; 7711Sigor@sysoev.ru } 7811Sigor@sysoev.ru } 7911Sigor@sysoev.ru } 8011Sigor@sysoev.ru 8111Sigor@sysoev.ru port->max_size = nxt_min(max_size, (size_t) sndbuf); 8211Sigor@sysoev.ru port->max_share = (64 * 1024); 8311Sigor@sysoev.ru 8414Sigor@sysoev.ru return NXT_OK; 8511Sigor@sysoev.ru 8611Sigor@sysoev.ru getsockopt_fail: 8711Sigor@sysoev.ru 8813Sigor@sysoev.ru nxt_socket_close(task, port->pair[0]); 8913Sigor@sysoev.ru nxt_socket_close(task, port->pair[1]); 9011Sigor@sysoev.ru 9111Sigor@sysoev.ru socketpair_fail: 9211Sigor@sysoev.ru 9314Sigor@sysoev.ru return NXT_ERROR; 9411Sigor@sysoev.ru } 9511Sigor@sysoev.ru 9611Sigor@sysoev.ru 9711Sigor@sysoev.ru void 9811Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port) 9911Sigor@sysoev.ru { 10013Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->socket.fd); 10165Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool); 10211Sigor@sysoev.ru } 10311Sigor@sysoev.ru 10411Sigor@sysoev.ru 10511Sigor@sysoev.ru void 10611Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 10711Sigor@sysoev.ru { 10811Sigor@sysoev.ru port->socket.fd = port->pair[1]; 10911Sigor@sysoev.ru port->socket.log = &nxt_main_log; 11011Sigor@sysoev.ru port->socket.write_ready = 1; 11111Sigor@sysoev.ru 112141Smax.romanov@nginx.com port->engine = task->thread->engine; 113141Smax.romanov@nginx.com 114141Smax.romanov@nginx.com port->socket.write_work_queue = &port->engine->fast_work_queue; 11511Sigor@sysoev.ru port->socket.write_handler = nxt_port_write_handler; 11611Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 11711Sigor@sysoev.ru } 11811Sigor@sysoev.ru 11911Sigor@sysoev.ru 12011Sigor@sysoev.ru void 12111Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port) 12211Sigor@sysoev.ru { 12313Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[1]); 12411Sigor@sysoev.ru port->pair[1] = -1; 12511Sigor@sysoev.ru } 12611Sigor@sysoev.ru 12711Sigor@sysoev.ru 128122Smax.romanov@nginx.com static void 129122Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) 130122Smax.romanov@nginx.com { 131122Smax.romanov@nginx.com nxt_event_engine_t *engine; 132122Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 133122Smax.romanov@nginx.com 134122Smax.romanov@nginx.com msg = obj; 135122Smax.romanov@nginx.com engine = data; 136122Smax.romanov@nginx.com 137122Smax.romanov@nginx.com #if (NXT_DEBUG) 138122Smax.romanov@nginx.com if (nxt_slow_path(data != msg->engine)) { 139122Smax.romanov@nginx.com nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)", 140122Smax.romanov@nginx.com data, msg->engine); 141122Smax.romanov@nginx.com nxt_abort(); 142122Smax.romanov@nginx.com } 143122Smax.romanov@nginx.com #endif 144122Smax.romanov@nginx.com 145122Smax.romanov@nginx.com if (engine != task->thread->engine) { 146122Smax.romanov@nginx.com 147122Smax.romanov@nginx.com nxt_debug(task, "current thread is %PT, expected %PT", 148122Smax.romanov@nginx.com task->thread->tid, engine->task.thread->tid); 149122Smax.romanov@nginx.com 150122Smax.romanov@nginx.com nxt_event_engine_post(engine, &msg->work); 151122Smax.romanov@nginx.com 152122Smax.romanov@nginx.com return; 153122Smax.romanov@nginx.com } 154122Smax.romanov@nginx.com 155122Smax.romanov@nginx.com nxt_mp_release(msg->mem_pool, obj); 156122Smax.romanov@nginx.com } 157122Smax.romanov@nginx.com 158122Smax.romanov@nginx.com 15911Sigor@sysoev.ru nxt_int_t 16014Sigor@sysoev.ru nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 16142Smax.romanov@nginx.com nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b) 16211Sigor@sysoev.ru { 16311Sigor@sysoev.ru nxt_port_send_msg_t *msg; 16411Sigor@sysoev.ru 165163Smax.romanov@nginx.com nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 16611Sigor@sysoev.ru 16789Smax.romanov@nginx.com if (msg->port_msg.stream == stream && 16889Smax.romanov@nginx.com msg->port_msg.reply_port == reply_port) { 169189Smax.romanov@nginx.com 170189Smax.romanov@nginx.com nxt_assert(msg->port_msg.last == 0); 171189Smax.romanov@nginx.com 17211Sigor@sysoev.ru /* 17311Sigor@sysoev.ru * An fd is ignored since a file descriptor 17411Sigor@sysoev.ru * must be sent only in the first message of a stream. 17511Sigor@sysoev.ru */ 17611Sigor@sysoev.ru nxt_buf_chain_add(&msg->buf, b); 17711Sigor@sysoev.ru 178189Smax.romanov@nginx.com msg->port_msg.last |= (type & NXT_PORT_MSG_LAST) != 0; 179189Smax.romanov@nginx.com 18011Sigor@sysoev.ru return NXT_OK; 18111Sigor@sysoev.ru } 182163Smax.romanov@nginx.com 183163Smax.romanov@nginx.com } nxt_queue_loop; 18411Sigor@sysoev.ru 185122Smax.romanov@nginx.com msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t)); 18611Sigor@sysoev.ru if (nxt_slow_path(msg == NULL)) { 18711Sigor@sysoev.ru return NXT_ERROR; 18811Sigor@sysoev.ru } 18911Sigor@sysoev.ru 190122Smax.romanov@nginx.com msg->link.next = NULL; 191122Smax.romanov@nginx.com msg->link.prev = NULL; 192122Smax.romanov@nginx.com 19311Sigor@sysoev.ru msg->buf = b; 19411Sigor@sysoev.ru msg->fd = fd; 195189Smax.romanov@nginx.com msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 19611Sigor@sysoev.ru msg->share = 0; 197122Smax.romanov@nginx.com 198122Smax.romanov@nginx.com msg->work.next = NULL; 199122Smax.romanov@nginx.com msg->work.handler = nxt_port_release_send_msg; 200122Smax.romanov@nginx.com msg->work.task = task; 201122Smax.romanov@nginx.com msg->work.obj = msg; 202122Smax.romanov@nginx.com msg->work.data = task->thread->engine; 203122Smax.romanov@nginx.com 204122Smax.romanov@nginx.com msg->engine = task->thread->engine; 20583Smax.romanov@nginx.com msg->mem_pool = port->mem_pool; 20611Sigor@sysoev.ru 20711Sigor@sysoev.ru msg->port_msg.stream = stream; 20842Smax.romanov@nginx.com msg->port_msg.pid = nxt_pid; 20942Smax.romanov@nginx.com msg->port_msg.reply_port = reply_port; 210189Smax.romanov@nginx.com msg->port_msg.type = type & NXT_PORT_MSG_MASK; 211189Smax.romanov@nginx.com msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 21242Smax.romanov@nginx.com msg->port_msg.mmap = 0; 21311Sigor@sysoev.ru 21411Sigor@sysoev.ru nxt_queue_insert_tail(&port->messages, &msg->link); 21511Sigor@sysoev.ru 21611Sigor@sysoev.ru if (port->socket.write_ready) { 217125Smax.romanov@nginx.com nxt_port_write_handler(task, &port->socket, NULL); 21811Sigor@sysoev.ru } 21911Sigor@sysoev.ru 22011Sigor@sysoev.ru return NXT_OK; 22111Sigor@sysoev.ru } 22211Sigor@sysoev.ru 22311Sigor@sysoev.ru 22411Sigor@sysoev.ru static void 22511Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 22611Sigor@sysoev.ru { 22711Sigor@sysoev.ru ssize_t n; 22811Sigor@sysoev.ru nxt_port_t *port; 22989Smax.romanov@nginx.com struct iovec iov[NXT_IOBUF_MAX * 10]; 230127Smax.romanov@nginx.com nxt_work_queue_t *wq; 23111Sigor@sysoev.ru nxt_queue_link_t *link; 232125Smax.romanov@nginx.com nxt_port_method_t m; 23311Sigor@sysoev.ru nxt_port_send_msg_t *msg; 23411Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb; 23542Smax.romanov@nginx.com 23642Smax.romanov@nginx.com size_t plain_size; 23742Smax.romanov@nginx.com nxt_buf_t *plain_buf; 23811Sigor@sysoev.ru 239125Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 24011Sigor@sysoev.ru 24111Sigor@sysoev.ru do { 24211Sigor@sysoev.ru link = nxt_queue_first(&port->messages); 24311Sigor@sysoev.ru 24411Sigor@sysoev.ru if (link == nxt_queue_tail(&port->messages)) { 24512Sigor@sysoev.ru nxt_fd_event_block_write(task->thread->engine, &port->socket); 24611Sigor@sysoev.ru return; 24711Sigor@sysoev.ru } 24811Sigor@sysoev.ru 249163Smax.romanov@nginx.com msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link); 25011Sigor@sysoev.ru 25114Sigor@sysoev.ru iov[0].iov_base = &msg->port_msg; 25214Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 25311Sigor@sysoev.ru 25411Sigor@sysoev.ru sb.buf = msg->buf; 25514Sigor@sysoev.ru sb.iobuf = &iov[1]; 25611Sigor@sysoev.ru sb.nmax = NXT_IOBUF_MAX - 1; 25711Sigor@sysoev.ru sb.sync = 0; 25811Sigor@sysoev.ru sb.last = 0; 25942Smax.romanov@nginx.com sb.size = 0; 26011Sigor@sysoev.ru sb.limit = port->max_size; 26111Sigor@sysoev.ru 26242Smax.romanov@nginx.com m = nxt_port_mmap_get_method(task, port, msg->buf); 26342Smax.romanov@nginx.com 26442Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP) { 26542Smax.romanov@nginx.com sb.limit = (1ULL << 31) - 1; 26689Smax.romanov@nginx.com sb.nmax = NXT_IOBUF_MAX * 10 - 1; 26742Smax.romanov@nginx.com } 26842Smax.romanov@nginx.com 26942Smax.romanov@nginx.com nxt_sendbuf_mem_coalesce(task, &sb); 27042Smax.romanov@nginx.com 27142Smax.romanov@nginx.com plain_size = sb.size; 27242Smax.romanov@nginx.com plain_buf = msg->buf; 27342Smax.romanov@nginx.com 27442Smax.romanov@nginx.com /* 27542Smax.romanov@nginx.com * Send through mmap enabled only when payload 27642Smax.romanov@nginx.com * is bigger than PORT_MMAP_MIN_SIZE. 27742Smax.romanov@nginx.com */ 27842Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 27942Smax.romanov@nginx.com nxt_port_mmap_write(task, port, msg, &sb); 28042Smax.romanov@nginx.com 28142Smax.romanov@nginx.com } else { 28242Smax.romanov@nginx.com m = NXT_PORT_METHOD_PLAIN; 28342Smax.romanov@nginx.com } 28411Sigor@sysoev.ru 285189Smax.romanov@nginx.com msg->port_msg.last |= sb.last; 28611Sigor@sysoev.ru 28742Smax.romanov@nginx.com n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 28811Sigor@sysoev.ru 28911Sigor@sysoev.ru if (n > 0) { 29042Smax.romanov@nginx.com if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 29111Sigor@sysoev.ru nxt_log(task, NXT_LOG_CRIT, 29211Sigor@sysoev.ru "port %d: short write: %z instead of %uz", 29342Smax.romanov@nginx.com port->socket.fd, n, sb.size + iov[0].iov_len); 29411Sigor@sysoev.ru goto fail; 29511Sigor@sysoev.ru } 29611Sigor@sysoev.ru 297189Smax.romanov@nginx.com if (msg->fd != -1 && msg->close_fd != 0) { 298189Smax.romanov@nginx.com nxt_fd_close(msg->fd); 299189Smax.romanov@nginx.com 300189Smax.romanov@nginx.com msg->fd = -1; 301189Smax.romanov@nginx.com } 302189Smax.romanov@nginx.com 303127Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 304127Smax.romanov@nginx.com 30542Smax.romanov@nginx.com if (msg->buf != plain_buf) { 30642Smax.romanov@nginx.com /* 30742Smax.romanov@nginx.com * Complete crafted mmap_msgs buf and restore msg->buf 30842Smax.romanov@nginx.com * for regular completion call. 30942Smax.romanov@nginx.com */ 310127Smax.romanov@nginx.com nxt_port_mmap_completion(task, wq, msg->buf); 31142Smax.romanov@nginx.com 31242Smax.romanov@nginx.com msg->buf = plain_buf; 31342Smax.romanov@nginx.com } 31442Smax.romanov@nginx.com 315127Smax.romanov@nginx.com msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size); 31611Sigor@sysoev.ru 31711Sigor@sysoev.ru if (msg->buf != NULL) { 31811Sigor@sysoev.ru /* 31911Sigor@sysoev.ru * A file descriptor is sent only 32011Sigor@sysoev.ru * in the first message of a stream. 32111Sigor@sysoev.ru */ 32211Sigor@sysoev.ru msg->fd = -1; 32311Sigor@sysoev.ru msg->share += n; 32411Sigor@sysoev.ru 32511Sigor@sysoev.ru if (msg->share >= port->max_share) { 32611Sigor@sysoev.ru msg->share = 0; 32711Sigor@sysoev.ru nxt_queue_remove(link); 32811Sigor@sysoev.ru nxt_queue_insert_tail(&port->messages, link); 32911Sigor@sysoev.ru } 33011Sigor@sysoev.ru 33111Sigor@sysoev.ru } else { 33211Sigor@sysoev.ru nxt_queue_remove(link); 333122Smax.romanov@nginx.com nxt_port_release_send_msg(task, msg, msg->engine); 33411Sigor@sysoev.ru } 33511Sigor@sysoev.ru 33611Sigor@sysoev.ru } else if (nxt_slow_path(n == NXT_ERROR)) { 33711Sigor@sysoev.ru goto fail; 33811Sigor@sysoev.ru } 33911Sigor@sysoev.ru 34011Sigor@sysoev.ru /* n == NXT_AGAIN */ 34111Sigor@sysoev.ru 34211Sigor@sysoev.ru } while (port->socket.write_ready); 34311Sigor@sysoev.ru 34412Sigor@sysoev.ru if (nxt_fd_event_is_disabled(port->socket.write)) { 345141Smax.romanov@nginx.com /* TODO task->thread->engine or port->engine ? */ 34612Sigor@sysoev.ru nxt_fd_event_enable_write(task->thread->engine, &port->socket); 34711Sigor@sysoev.ru } 34811Sigor@sysoev.ru 34911Sigor@sysoev.ru return; 35011Sigor@sysoev.ru 35111Sigor@sysoev.ru fail: 35211Sigor@sysoev.ru 35311Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 35411Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 35511Sigor@sysoev.ru } 35611Sigor@sysoev.ru 35711Sigor@sysoev.ru 35811Sigor@sysoev.ru void 35911Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 36011Sigor@sysoev.ru { 36111Sigor@sysoev.ru port->socket.fd = port->pair[0]; 36211Sigor@sysoev.ru port->socket.log = &nxt_main_log; 36311Sigor@sysoev.ru 364141Smax.romanov@nginx.com port->engine = task->thread->engine; 365141Smax.romanov@nginx.com 366141Smax.romanov@nginx.com port->socket.read_work_queue = &port->engine->fast_work_queue; 36711Sigor@sysoev.ru port->socket.read_handler = nxt_port_read_handler; 36811Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 36911Sigor@sysoev.ru 370141Smax.romanov@nginx.com nxt_fd_event_enable_read(port->engine, &port->socket); 37111Sigor@sysoev.ru } 37211Sigor@sysoev.ru 37311Sigor@sysoev.ru 37411Sigor@sysoev.ru void 37511Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port) 37611Sigor@sysoev.ru { 37713Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[0]); 37811Sigor@sysoev.ru port->pair[0] = -1; 37911Sigor@sysoev.ru } 38011Sigor@sysoev.ru 38111Sigor@sysoev.ru 38211Sigor@sysoev.ru static void 38311Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 38411Sigor@sysoev.ru { 38542Smax.romanov@nginx.com ssize_t n; 38642Smax.romanov@nginx.com nxt_buf_t *b; 38742Smax.romanov@nginx.com nxt_port_t *port; 38842Smax.romanov@nginx.com struct iovec iov[2]; 38942Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 39011Sigor@sysoev.ru 391125Smax.romanov@nginx.com port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 39211Sigor@sysoev.ru 393141Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine); 394141Smax.romanov@nginx.com 39511Sigor@sysoev.ru for ( ;; ) { 39611Sigor@sysoev.ru 39711Sigor@sysoev.ru b = nxt_port_buf_alloc(port); 39811Sigor@sysoev.ru 39911Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 40011Sigor@sysoev.ru /* TODO: disable event for some time */ 40111Sigor@sysoev.ru } 40211Sigor@sysoev.ru 40342Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg; 40414Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 40511Sigor@sysoev.ru 40614Sigor@sysoev.ru iov[1].iov_base = b->mem.pos; 40714Sigor@sysoev.ru iov[1].iov_len = port->max_size; 40814Sigor@sysoev.ru 40942Smax.romanov@nginx.com n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 41011Sigor@sysoev.ru 41111Sigor@sysoev.ru if (n > 0) { 41242Smax.romanov@nginx.com 41342Smax.romanov@nginx.com msg.buf = b; 41482Smax.romanov@nginx.com msg.size = n; 41542Smax.romanov@nginx.com 41682Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg); 41711Sigor@sysoev.ru 418*194Smax.romanov@nginx.com /* 419*194Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 420*194Smax.romanov@nginx.com * handler should reset 'msg.buf'. 421*194Smax.romanov@nginx.com */ 422*194Smax.romanov@nginx.com if (msg.buf == b) { 42311Sigor@sysoev.ru nxt_port_buf_free(port, b); 42411Sigor@sysoev.ru } 42511Sigor@sysoev.ru 42611Sigor@sysoev.ru if (port->socket.read_ready) { 42711Sigor@sysoev.ru continue; 42811Sigor@sysoev.ru } 42911Sigor@sysoev.ru 43011Sigor@sysoev.ru return; 43111Sigor@sysoev.ru } 43211Sigor@sysoev.ru 43311Sigor@sysoev.ru if (n == NXT_AGAIN) { 43411Sigor@sysoev.ru nxt_port_buf_free(port, b); 43511Sigor@sysoev.ru 43612Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket); 43711Sigor@sysoev.ru return; 43811Sigor@sysoev.ru } 43911Sigor@sysoev.ru 44011Sigor@sysoev.ru /* n == 0 || n == NXT_ERROR */ 44111Sigor@sysoev.ru 44211Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 44311Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 44411Sigor@sysoev.ru return; 44511Sigor@sysoev.ru } 44611Sigor@sysoev.ru } 44711Sigor@sysoev.ru 44811Sigor@sysoev.ru 44911Sigor@sysoev.ru static void 45011Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 45182Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 45211Sigor@sysoev.ru { 45342Smax.romanov@nginx.com nxt_buf_t *b; 45442Smax.romanov@nginx.com nxt_buf_t *orig_b; 45511Sigor@sysoev.ru 45682Smax.romanov@nginx.com if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 45782Smax.romanov@nginx.com nxt_log(task, NXT_LOG_CRIT, 45882Smax.romanov@nginx.com "port %d: too small message:%uz", port->socket.fd, msg->size); 45911Sigor@sysoev.ru goto fail; 46011Sigor@sysoev.ru } 46111Sigor@sysoev.ru 46242Smax.romanov@nginx.com /* adjust size to actual buffer used size */ 46382Smax.romanov@nginx.com msg->size -= sizeof(nxt_port_msg_t); 46442Smax.romanov@nginx.com 46542Smax.romanov@nginx.com b = orig_b = msg->buf; 46682Smax.romanov@nginx.com b->mem.free += msg->size; 46742Smax.romanov@nginx.com 46842Smax.romanov@nginx.com if (msg->port_msg.mmap) { 46982Smax.romanov@nginx.com nxt_port_mmap_read(task, port, msg); 47042Smax.romanov@nginx.com b = msg->buf; 47142Smax.romanov@nginx.com } 47211Sigor@sysoev.ru 47342Smax.romanov@nginx.com port->handler(task, msg); 47442Smax.romanov@nginx.com 47582Smax.romanov@nginx.com if (msg->port_msg.mmap && orig_b != b) { 47642Smax.romanov@nginx.com 477*194Smax.romanov@nginx.com /* 478*194Smax.romanov@nginx.com * To disable instant buffer completion, 479*194Smax.romanov@nginx.com * handler should reset 'msg->buf'. 480*194Smax.romanov@nginx.com */ 481*194Smax.romanov@nginx.com if (msg->buf == b) { 482*194Smax.romanov@nginx.com /* complete mmap buffers */ 483*194Smax.romanov@nginx.com for (; b != NULL; b = b->next) { 484*194Smax.romanov@nginx.com nxt_debug(task, "complete buffer %p", b); 485*194Smax.romanov@nginx.com 486*194Smax.romanov@nginx.com nxt_work_queue_add(port->socket.read_work_queue, 487*194Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 488*194Smax.romanov@nginx.com } 48942Smax.romanov@nginx.com } 490*194Smax.romanov@nginx.com 491*194Smax.romanov@nginx.com /* restore original buf */ 492*194Smax.romanov@nginx.com msg->buf = orig_b; 49342Smax.romanov@nginx.com } 49411Sigor@sysoev.ru 49511Sigor@sysoev.ru return; 49611Sigor@sysoev.ru 49711Sigor@sysoev.ru fail: 49811Sigor@sysoev.ru 49942Smax.romanov@nginx.com if (msg->fd != -1) { 50042Smax.romanov@nginx.com nxt_fd_close(msg->fd); 50111Sigor@sysoev.ru } 50211Sigor@sysoev.ru } 50311Sigor@sysoev.ru 50411Sigor@sysoev.ru 50511Sigor@sysoev.ru static nxt_buf_t * 50611Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port) 50711Sigor@sysoev.ru { 50811Sigor@sysoev.ru nxt_buf_t *b; 50911Sigor@sysoev.ru 51011Sigor@sysoev.ru if (port->free_bufs != NULL) { 51111Sigor@sysoev.ru b = port->free_bufs; 51211Sigor@sysoev.ru port->free_bufs = b->next; 51311Sigor@sysoev.ru 51411Sigor@sysoev.ru b->mem.pos = b->mem.start; 51511Sigor@sysoev.ru b->mem.free = b->mem.start; 51642Smax.romanov@nginx.com b->next = NULL; 51711Sigor@sysoev.ru } else { 51811Sigor@sysoev.ru b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 51911Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 52011Sigor@sysoev.ru return NULL; 52111Sigor@sysoev.ru } 52211Sigor@sysoev.ru } 52311Sigor@sysoev.ru 52411Sigor@sysoev.ru return b; 52511Sigor@sysoev.ru } 52611Sigor@sysoev.ru 52711Sigor@sysoev.ru 52811Sigor@sysoev.ru static void 52911Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 53011Sigor@sysoev.ru { 53111Sigor@sysoev.ru b->next = port->free_bufs; 53211Sigor@sysoev.ru port->free_bufs = b; 53311Sigor@sysoev.ru } 53411Sigor@sysoev.ru 53511Sigor@sysoev.ru 53611Sigor@sysoev.ru static void 53711Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 53811Sigor@sysoev.ru { 539125Smax.romanov@nginx.com nxt_debug(task, "port error handler %p", obj); 54011Sigor@sysoev.ru /* TODO */ 54111Sigor@sysoev.ru } 542