111Sigor@sysoev.ru 211Sigor@sysoev.ru /* 311Sigor@sysoev.ru * Copyright (C) Igor Sysoev 411Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 511Sigor@sysoev.ru */ 611Sigor@sysoev.ru 711Sigor@sysoev.ru #include <nxt_main.h> 811Sigor@sysoev.ru 911Sigor@sysoev.ru 1011Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 11592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, 12592Sigor@sysoev.ru nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); 1311Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 1411Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 1582Smax.romanov@nginx.com nxt_port_recv_msg_t *msg); 1611Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 1711Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 1811Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 1911Sigor@sysoev.ru 2011Sigor@sysoev.ru 2114Sigor@sysoev.ru nxt_int_t 2214Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 2311Sigor@sysoev.ru { 2465Sigor@sysoev.ru nxt_int_t sndbuf, rcvbuf, size; 2565Sigor@sysoev.ru nxt_socket_t snd, rcv; 2611Sigor@sysoev.ru 2714Sigor@sysoev.ru port->socket.task = task; 2814Sigor@sysoev.ru 2914Sigor@sysoev.ru port->pair[0] = -1; 3014Sigor@sysoev.ru port->pair[1] = -1; 3114Sigor@sysoev.ru 3213Sigor@sysoev.ru if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 3311Sigor@sysoev.ru goto socketpair_fail; 3411Sigor@sysoev.ru } 3511Sigor@sysoev.ru 3611Sigor@sysoev.ru snd = port->pair[1]; 3711Sigor@sysoev.ru 3813Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 3911Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 4011Sigor@sysoev.ru goto getsockopt_fail; 4111Sigor@sysoev.ru } 4211Sigor@sysoev.ru 4311Sigor@sysoev.ru rcv = port->pair[0]; 4411Sigor@sysoev.ru 4513Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 4611Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 4711Sigor@sysoev.ru goto getsockopt_fail; 4811Sigor@sysoev.ru } 4911Sigor@sysoev.ru 5011Sigor@sysoev.ru if (max_size == 0) { 5111Sigor@sysoev.ru max_size = 16 * 1024; 5211Sigor@sysoev.ru } 5311Sigor@sysoev.ru 5411Sigor@sysoev.ru if ((size_t) sndbuf < max_size) { 5511Sigor@sysoev.ru /* 5611Sigor@sysoev.ru * On Unix domain sockets 5711Sigor@sysoev.ru * Linux uses 224K on both send and receive directions; 5811Sigor@sysoev.ru * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 5911Sigor@sysoev.ru * on send direction and 4K buffer size on receive direction; 6011Sigor@sysoev.ru * Solaris uses 16K on send direction and 5K on receive direction. 6111Sigor@sysoev.ru */ 6213Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 6313Sigor@sysoev.ru max_size); 6411Sigor@sysoev.ru 6513Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 6611Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 6711Sigor@sysoev.ru goto getsockopt_fail; 6811Sigor@sysoev.ru } 6911Sigor@sysoev.ru 7011Sigor@sysoev.ru size = sndbuf * 4; 7111Sigor@sysoev.ru 7211Sigor@sysoev.ru if (rcvbuf < size) { 7313Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 7413Sigor@sysoev.ru size); 7511Sigor@sysoev.ru 7613Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 7711Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 7811Sigor@sysoev.ru goto getsockopt_fail; 7911Sigor@sysoev.ru } 8011Sigor@sysoev.ru } 8111Sigor@sysoev.ru } 8211Sigor@sysoev.ru 8311Sigor@sysoev.ru port->max_size = nxt_min(max_size, (size_t) sndbuf); 8411Sigor@sysoev.ru port->max_share = (64 * 1024); 8511Sigor@sysoev.ru 8614Sigor@sysoev.ru return NXT_OK; 8711Sigor@sysoev.ru 8811Sigor@sysoev.ru getsockopt_fail: 8911Sigor@sysoev.ru 9013Sigor@sysoev.ru nxt_socket_close(task, port->pair[0]); 9113Sigor@sysoev.ru nxt_socket_close(task, port->pair[1]); 9211Sigor@sysoev.ru 9311Sigor@sysoev.ru socketpair_fail: 9411Sigor@sysoev.ru 9514Sigor@sysoev.ru return NXT_ERROR; 9611Sigor@sysoev.ru } 9711Sigor@sysoev.ru 9811Sigor@sysoev.ru 9911Sigor@sysoev.ru void 10011Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port) 10111Sigor@sysoev.ru { 10213Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->socket.fd); 10365Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool); 10411Sigor@sysoev.ru } 10511Sigor@sysoev.ru 10611Sigor@sysoev.ru 10711Sigor@sysoev.ru void 10811Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 10911Sigor@sysoev.ru { 11011Sigor@sysoev.ru port->socket.fd = port->pair[1]; 11111Sigor@sysoev.ru port->socket.log = &nxt_main_log; 11211Sigor@sysoev.ru port->socket.write_ready = 1; 11311Sigor@sysoev.ru 114141Smax.romanov@nginx.com port->engine = task->thread->engine; 115141Smax.romanov@nginx.com 116141Smax.romanov@nginx.com port->socket.write_work_queue = &port->engine->fast_work_queue; 11711Sigor@sysoev.ru port->socket.write_handler = nxt_port_write_handler; 11811Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 119197Smax.romanov@nginx.com 120197Smax.romanov@nginx.com if (port->iov == NULL) { 121613Svbart@nginx.com port->iov = nxt_mp_get(port->mem_pool, 122613Svbart@nginx.com sizeof(struct iovec) * NXT_IOBUF_MAX * 10); 123613Svbart@nginx.com port->mmsg_buf = nxt_mp_get(port->mem_pool, 124613Svbart@nginx.com sizeof(uint32_t) * 3 * NXT_IOBUF_MAX * 10); 125197Smax.romanov@nginx.com } 12611Sigor@sysoev.ru } 12711Sigor@sysoev.ru 12811Sigor@sysoev.ru 12911Sigor@sysoev.ru void 13011Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port) 13111Sigor@sysoev.ru { 13213Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[1]); 13311Sigor@sysoev.ru port->pair[1] = -1; 13411Sigor@sysoev.ru } 13511Sigor@sysoev.ru 13611Sigor@sysoev.ru 137122Smax.romanov@nginx.com static void 138122Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data) 139122Smax.romanov@nginx.com { 140122Smax.romanov@nginx.com nxt_event_engine_t *engine; 141122Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 142122Smax.romanov@nginx.com 143122Smax.romanov@nginx.com msg = obj; 144122Smax.romanov@nginx.com engine = data; 145122Smax.romanov@nginx.com 146564Svbart@nginx.com nxt_assert(data == msg->work.data); 147122Smax.romanov@nginx.com 148122Smax.romanov@nginx.com if (engine != task->thread->engine) { 149122Smax.romanov@nginx.com 150122Smax.romanov@nginx.com nxt_debug(task, "current thread is %PT, expected %PT", 151122Smax.romanov@nginx.com task->thread->tid, engine->task.thread->tid); 152122Smax.romanov@nginx.com 153122Smax.romanov@nginx.com nxt_event_engine_post(engine, &msg->work); 154122Smax.romanov@nginx.com 155122Smax.romanov@nginx.com return; 156122Smax.romanov@nginx.com } 157122Smax.romanov@nginx.com 158430Sigor@sysoev.ru nxt_mp_free(engine->mem_pool, obj); 159430Sigor@sysoev.ru nxt_mp_release(engine->mem_pool); 160344Smax.romanov@nginx.com } 161344Smax.romanov@nginx.com 162344Smax.romanov@nginx.com 163344Smax.romanov@nginx.com static nxt_port_send_msg_t * 164344Smax.romanov@nginx.com nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m) 165344Smax.romanov@nginx.com { 166430Sigor@sysoev.ru nxt_mp_t *mp; 167344Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 168344Smax.romanov@nginx.com 169430Sigor@sysoev.ru mp = task->thread->engine->mem_pool; 170430Sigor@sysoev.ru 171430Sigor@sysoev.ru msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t)); 172344Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 173344Smax.romanov@nginx.com return NULL; 174344Smax.romanov@nginx.com } 175344Smax.romanov@nginx.com 176430Sigor@sysoev.ru nxt_mp_retain(mp); 177430Sigor@sysoev.ru 178344Smax.romanov@nginx.com msg->link.next = NULL; 179344Smax.romanov@nginx.com msg->link.prev = NULL; 180344Smax.romanov@nginx.com 181344Smax.romanov@nginx.com msg->buf = m->buf; 182978Smax.romanov@nginx.com msg->share = m->share; 183344Smax.romanov@nginx.com msg->fd = m->fd; 184344Smax.romanov@nginx.com msg->close_fd = m->close_fd; 185344Smax.romanov@nginx.com msg->port_msg = m->port_msg; 186344Smax.romanov@nginx.com 187344Smax.romanov@nginx.com msg->work.next = NULL; 188344Smax.romanov@nginx.com msg->work.handler = nxt_port_release_send_msg; 189344Smax.romanov@nginx.com msg->work.task = task; 190344Smax.romanov@nginx.com msg->work.obj = msg; 191344Smax.romanov@nginx.com msg->work.data = task->thread->engine; 192344Smax.romanov@nginx.com 193344Smax.romanov@nginx.com return msg; 194344Smax.romanov@nginx.com } 195344Smax.romanov@nginx.com 196344Smax.romanov@nginx.com 197344Smax.romanov@nginx.com static nxt_port_send_msg_t * 198344Smax.romanov@nginx.com nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) 199344Smax.romanov@nginx.com { 200344Smax.romanov@nginx.com if (msg->work.data == NULL) { 201344Smax.romanov@nginx.com msg = nxt_port_msg_create(task, msg); 202344Smax.romanov@nginx.com } 203344Smax.romanov@nginx.com 204344Smax.romanov@nginx.com if (msg != NULL) { 205344Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 206344Smax.romanov@nginx.com } 207344Smax.romanov@nginx.com 208344Smax.romanov@nginx.com return msg; 209344Smax.romanov@nginx.com } 210344Smax.romanov@nginx.com 211344Smax.romanov@nginx.com 212344Smax.romanov@nginx.com static nxt_port_send_msg_t * 213344Smax.romanov@nginx.com nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg) 214344Smax.romanov@nginx.com { 215344Smax.romanov@nginx.com nxt_queue_link_t *lnk; 216344Smax.romanov@nginx.com 217344Smax.romanov@nginx.com lnk = nxt_queue_first(&port->messages); 218344Smax.romanov@nginx.com 219344Smax.romanov@nginx.com if (lnk == nxt_queue_tail(&port->messages)) { 220344Smax.romanov@nginx.com return msg; 221344Smax.romanov@nginx.com } 222344Smax.romanov@nginx.com 223344Smax.romanov@nginx.com return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 224122Smax.romanov@nginx.com } 225122Smax.romanov@nginx.com 226122Smax.romanov@nginx.com 22711Sigor@sysoev.ru nxt_int_t 228423Smax.romanov@nginx.com nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 229423Smax.romanov@nginx.com nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, 230423Smax.romanov@nginx.com void *tracking) 23111Sigor@sysoev.ru { 232344Smax.romanov@nginx.com nxt_port_send_msg_t msg, *res; 23311Sigor@sysoev.ru 234344Smax.romanov@nginx.com msg.link.next = NULL; 235344Smax.romanov@nginx.com msg.link.prev = NULL; 236122Smax.romanov@nginx.com 237344Smax.romanov@nginx.com msg.buf = b; 238344Smax.romanov@nginx.com msg.fd = fd; 239344Smax.romanov@nginx.com msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 240344Smax.romanov@nginx.com msg.share = 0; 24111Sigor@sysoev.ru 242423Smax.romanov@nginx.com if (tracking != NULL) { 243423Smax.romanov@nginx.com nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); 244423Smax.romanov@nginx.com } 245423Smax.romanov@nginx.com 246344Smax.romanov@nginx.com msg.port_msg.stream = stream; 247344Smax.romanov@nginx.com msg.port_msg.pid = nxt_pid; 248344Smax.romanov@nginx.com msg.port_msg.reply_port = reply_port; 249344Smax.romanov@nginx.com msg.port_msg.type = type & NXT_PORT_MSG_MASK; 250344Smax.romanov@nginx.com msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 251344Smax.romanov@nginx.com msg.port_msg.mmap = 0; 252352Smax.romanov@nginx.com msg.port_msg.nf = 0; 253352Smax.romanov@nginx.com msg.port_msg.mf = 0; 254423Smax.romanov@nginx.com msg.port_msg.tracking = tracking != NULL; 25511Sigor@sysoev.ru 256344Smax.romanov@nginx.com msg.work.data = NULL; 257343Smax.romanov@nginx.com 25811Sigor@sysoev.ru if (port->socket.write_ready) { 259344Smax.romanov@nginx.com nxt_port_write_handler(task, &port->socket, &msg); 260344Smax.romanov@nginx.com } else { 261344Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 262344Smax.romanov@nginx.com 263344Smax.romanov@nginx.com res = nxt_port_msg_push(task, port, &msg); 264344Smax.romanov@nginx.com 265344Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 266344Smax.romanov@nginx.com 267344Smax.romanov@nginx.com if (res == NULL) { 268344Smax.romanov@nginx.com return NXT_ERROR; 269344Smax.romanov@nginx.com } 270344Smax.romanov@nginx.com 271344Smax.romanov@nginx.com nxt_port_use(task, port, 1); 27211Sigor@sysoev.ru } 27311Sigor@sysoev.ru 27411Sigor@sysoev.ru return NXT_OK; 27511Sigor@sysoev.ru } 27611Sigor@sysoev.ru 27711Sigor@sysoev.ru 27811Sigor@sysoev.ru static void 279343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 280343Smax.romanov@nginx.com { 281343Smax.romanov@nginx.com nxt_fd_event_block_write(task->thread->engine, &port->socket); 282343Smax.romanov@nginx.com } 283343Smax.romanov@nginx.com 284343Smax.romanov@nginx.com 285343Smax.romanov@nginx.com static void 286343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 287343Smax.romanov@nginx.com { 288343Smax.romanov@nginx.com nxt_fd_event_enable_write(task->thread->engine, &port->socket); 289343Smax.romanov@nginx.com } 290343Smax.romanov@nginx.com 291343Smax.romanov@nginx.com 292343Smax.romanov@nginx.com static void 29311Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 29411Sigor@sysoev.ru { 295343Smax.romanov@nginx.com int use_delta; 296197Smax.romanov@nginx.com size_t plain_size; 29711Sigor@sysoev.ru ssize_t n; 298343Smax.romanov@nginx.com nxt_bool_t block_write, enable_write; 29911Sigor@sysoev.ru nxt_port_t *port; 300197Smax.romanov@nginx.com struct iovec *iov; 301127Smax.romanov@nginx.com nxt_work_queue_t *wq; 302125Smax.romanov@nginx.com nxt_port_method_t m; 30311Sigor@sysoev.ru nxt_port_send_msg_t *msg; 30411Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb; 30542Smax.romanov@nginx.com 306197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 30711Sigor@sysoev.ru 308343Smax.romanov@nginx.com block_write = 0; 309343Smax.romanov@nginx.com enable_write = 0; 310343Smax.romanov@nginx.com use_delta = 0; 311343Smax.romanov@nginx.com 312343Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 313343Smax.romanov@nginx.com 314197Smax.romanov@nginx.com iov = port->iov; 31511Sigor@sysoev.ru 316344Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 317344Smax.romanov@nginx.com 31811Sigor@sysoev.ru do { 319344Smax.romanov@nginx.com msg = nxt_port_msg_first(task, port, data); 32011Sigor@sysoev.ru 321344Smax.romanov@nginx.com if (msg == NULL) { 322343Smax.romanov@nginx.com block_write = 1; 323343Smax.romanov@nginx.com goto unlock_mutex; 32411Sigor@sysoev.ru } 32511Sigor@sysoev.ru 32614Sigor@sysoev.ru iov[0].iov_base = &msg->port_msg; 32714Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 32811Sigor@sysoev.ru 32911Sigor@sysoev.ru sb.buf = msg->buf; 33014Sigor@sysoev.ru sb.iobuf = &iov[1]; 33111Sigor@sysoev.ru sb.nmax = NXT_IOBUF_MAX - 1; 33211Sigor@sysoev.ru sb.sync = 0; 33311Sigor@sysoev.ru sb.last = 0; 33442Smax.romanov@nginx.com sb.size = 0; 33511Sigor@sysoev.ru sb.limit = port->max_size; 33611Sigor@sysoev.ru 337352Smax.romanov@nginx.com sb.limit_reached = 0; 338352Smax.romanov@nginx.com sb.nmax_reached = 0; 339352Smax.romanov@nginx.com 34042Smax.romanov@nginx.com m = nxt_port_mmap_get_method(task, port, msg->buf); 34142Smax.romanov@nginx.com 34242Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP) { 34342Smax.romanov@nginx.com sb.limit = (1ULL << 31) - 1; 344352Smax.romanov@nginx.com sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 345352Smax.romanov@nginx.com port->max_size / PORT_MMAP_MIN_SIZE); 34642Smax.romanov@nginx.com } 34742Smax.romanov@nginx.com 348423Smax.romanov@nginx.com if (msg->port_msg.tracking) { 349423Smax.romanov@nginx.com iov[0].iov_len += sizeof(msg->tracking_msg); 350423Smax.romanov@nginx.com } 351423Smax.romanov@nginx.com 352*1002Smax.romanov@nginx.com sb.limit -= iov[0].iov_len; 353*1002Smax.romanov@nginx.com 35442Smax.romanov@nginx.com nxt_sendbuf_mem_coalesce(task, &sb); 35542Smax.romanov@nginx.com 35642Smax.romanov@nginx.com plain_size = sb.size; 35742Smax.romanov@nginx.com 35842Smax.romanov@nginx.com /* 35942Smax.romanov@nginx.com * Send through mmap enabled only when payload 36042Smax.romanov@nginx.com * is bigger than PORT_MMAP_MIN_SIZE. 36142Smax.romanov@nginx.com */ 36242Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 36342Smax.romanov@nginx.com nxt_port_mmap_write(task, port, msg, &sb); 36442Smax.romanov@nginx.com 36542Smax.romanov@nginx.com } else { 36642Smax.romanov@nginx.com m = NXT_PORT_METHOD_PLAIN; 36742Smax.romanov@nginx.com } 36811Sigor@sysoev.ru 369189Smax.romanov@nginx.com msg->port_msg.last |= sb.last; 370352Smax.romanov@nginx.com msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 37111Sigor@sysoev.ru 37242Smax.romanov@nginx.com n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 37311Sigor@sysoev.ru 37411Sigor@sysoev.ru if (n > 0) { 37542Smax.romanov@nginx.com if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 376564Svbart@nginx.com nxt_alert(task, "port %d: short write: %z instead of %uz", 377564Svbart@nginx.com port->socket.fd, n, sb.size + iov[0].iov_len); 37811Sigor@sysoev.ru goto fail; 37911Sigor@sysoev.ru } 38011Sigor@sysoev.ru 381189Smax.romanov@nginx.com if (msg->fd != -1 && msg->close_fd != 0) { 382189Smax.romanov@nginx.com nxt_fd_close(msg->fd); 383189Smax.romanov@nginx.com 384189Smax.romanov@nginx.com msg->fd = -1; 385189Smax.romanov@nginx.com } 386189Smax.romanov@nginx.com 387592Sigor@sysoev.ru msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, 388203Smax.romanov@nginx.com m == NXT_PORT_METHOD_MMAP); 38911Sigor@sysoev.ru 39011Sigor@sysoev.ru if (msg->buf != NULL) { 391352Smax.romanov@nginx.com nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 392352Smax.romanov@nginx.com msg->port_msg.stream); 393352Smax.romanov@nginx.com 39411Sigor@sysoev.ru /* 39511Sigor@sysoev.ru * A file descriptor is sent only 39611Sigor@sysoev.ru * in the first message of a stream. 39711Sigor@sysoev.ru */ 39811Sigor@sysoev.ru msg->fd = -1; 39911Sigor@sysoev.ru msg->share += n; 400352Smax.romanov@nginx.com msg->port_msg.nf = 1; 401423Smax.romanov@nginx.com msg->port_msg.tracking = 0; 40211Sigor@sysoev.ru 40311Sigor@sysoev.ru if (msg->share >= port->max_share) { 40411Sigor@sysoev.ru msg->share = 0; 405344Smax.romanov@nginx.com 406344Smax.romanov@nginx.com if (msg->link.next != NULL) { 407344Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 408344Smax.romanov@nginx.com use_delta--; 409344Smax.romanov@nginx.com } 410344Smax.romanov@nginx.com data = NULL; 411344Smax.romanov@nginx.com 412344Smax.romanov@nginx.com if (nxt_port_msg_push(task, port, msg) != NULL) { 413344Smax.romanov@nginx.com use_delta++; 414344Smax.romanov@nginx.com } 41511Sigor@sysoev.ru } 41611Sigor@sysoev.ru 41711Sigor@sysoev.ru } else { 418344Smax.romanov@nginx.com if (msg->link.next != NULL) { 419344Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 420344Smax.romanov@nginx.com use_delta--; 421344Smax.romanov@nginx.com nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, 422344Smax.romanov@nginx.com msg->work.data); 423344Smax.romanov@nginx.com } 424344Smax.romanov@nginx.com data = NULL; 42511Sigor@sysoev.ru } 42611Sigor@sysoev.ru 42711Sigor@sysoev.ru } else if (nxt_slow_path(n == NXT_ERROR)) { 428344Smax.romanov@nginx.com if (msg->link.next == NULL) { 429344Smax.romanov@nginx.com if (nxt_port_msg_push(task, port, msg) != NULL) { 430344Smax.romanov@nginx.com use_delta++; 431344Smax.romanov@nginx.com } 432344Smax.romanov@nginx.com } 43311Sigor@sysoev.ru goto fail; 43411Sigor@sysoev.ru } 43511Sigor@sysoev.ru 43611Sigor@sysoev.ru /* n == NXT_AGAIN */ 43711Sigor@sysoev.ru 43811Sigor@sysoev.ru } while (port->socket.write_ready); 43911Sigor@sysoev.ru 44012Sigor@sysoev.ru if (nxt_fd_event_is_disabled(port->socket.write)) { 441343Smax.romanov@nginx.com enable_write = 1; 44211Sigor@sysoev.ru } 44311Sigor@sysoev.ru 444343Smax.romanov@nginx.com goto unlock_mutex; 44511Sigor@sysoev.ru 44611Sigor@sysoev.ru fail: 44711Sigor@sysoev.ru 448343Smax.romanov@nginx.com use_delta++; 449343Smax.romanov@nginx.com 450344Smax.romanov@nginx.com nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 451343Smax.romanov@nginx.com &port->socket); 452343Smax.romanov@nginx.com 453343Smax.romanov@nginx.com unlock_mutex: 454343Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 455343Smax.romanov@nginx.com 456343Smax.romanov@nginx.com if (block_write && nxt_fd_event_is_active(port->socket.write)) { 457343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 458343Smax.romanov@nginx.com } 459343Smax.romanov@nginx.com 460343Smax.romanov@nginx.com if (enable_write) { 461343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 462343Smax.romanov@nginx.com } 463343Smax.romanov@nginx.com 464343Smax.romanov@nginx.com if (use_delta != 0) { 465343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta); 466343Smax.romanov@nginx.com } 46711Sigor@sysoev.ru } 46811Sigor@sysoev.ru 46911Sigor@sysoev.ru 470592Sigor@sysoev.ru static nxt_buf_t * 471592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 472592Sigor@sysoev.ru size_t sent, nxt_bool_t mmap_mode) 473592Sigor@sysoev.ru { 474592Sigor@sysoev.ru size_t size; 475592Sigor@sysoev.ru 476592Sigor@sysoev.ru while (b != NULL) { 477592Sigor@sysoev.ru 478592Sigor@sysoev.ru nxt_prefetch(b->next); 479592Sigor@sysoev.ru 480592Sigor@sysoev.ru if (!nxt_buf_is_sync(b)) { 481592Sigor@sysoev.ru 482592Sigor@sysoev.ru size = nxt_buf_used_size(b); 483592Sigor@sysoev.ru 484592Sigor@sysoev.ru if (size != 0) { 485592Sigor@sysoev.ru 486592Sigor@sysoev.ru if (sent == 0) { 487592Sigor@sysoev.ru break; 488592Sigor@sysoev.ru } 489592Sigor@sysoev.ru 490592Sigor@sysoev.ru if (nxt_buf_is_port_mmap(b) && mmap_mode) { 491592Sigor@sysoev.ru /* 492592Sigor@sysoev.ru * buffer has been sent to other side which is now 493592Sigor@sysoev.ru * responsible for shared memory bucket release 494592Sigor@sysoev.ru */ 495592Sigor@sysoev.ru b->is_port_mmap_sent = 1; 496592Sigor@sysoev.ru } 497592Sigor@sysoev.ru 498592Sigor@sysoev.ru if (sent < size) { 499592Sigor@sysoev.ru 500592Sigor@sysoev.ru if (nxt_buf_is_mem(b)) { 501592Sigor@sysoev.ru b->mem.pos += sent; 502592Sigor@sysoev.ru } 503592Sigor@sysoev.ru 504592Sigor@sysoev.ru if (nxt_buf_is_file(b)) { 505592Sigor@sysoev.ru b->file_pos += sent; 506592Sigor@sysoev.ru } 507592Sigor@sysoev.ru 508592Sigor@sysoev.ru break; 509592Sigor@sysoev.ru } 510592Sigor@sysoev.ru 511592Sigor@sysoev.ru /* b->mem.free is NULL in file-only buffer. */ 512592Sigor@sysoev.ru b->mem.pos = b->mem.free; 513592Sigor@sysoev.ru 514592Sigor@sysoev.ru if (nxt_buf_is_file(b)) { 515592Sigor@sysoev.ru b->file_pos = b->file_end; 516592Sigor@sysoev.ru } 517592Sigor@sysoev.ru 518592Sigor@sysoev.ru sent -= size; 519592Sigor@sysoev.ru } 520592Sigor@sysoev.ru } 521592Sigor@sysoev.ru 522592Sigor@sysoev.ru nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 523592Sigor@sysoev.ru 524592Sigor@sysoev.ru b = b->next; 525592Sigor@sysoev.ru } 526592Sigor@sysoev.ru 527592Sigor@sysoev.ru return b; 528592Sigor@sysoev.ru } 529592Sigor@sysoev.ru 530592Sigor@sysoev.ru 53111Sigor@sysoev.ru void 53211Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 53311Sigor@sysoev.ru { 53411Sigor@sysoev.ru port->socket.fd = port->pair[0]; 53511Sigor@sysoev.ru port->socket.log = &nxt_main_log; 53611Sigor@sysoev.ru 537141Smax.romanov@nginx.com port->engine = task->thread->engine; 538141Smax.romanov@nginx.com 539141Smax.romanov@nginx.com port->socket.read_work_queue = &port->engine->fast_work_queue; 54011Sigor@sysoev.ru port->socket.read_handler = nxt_port_read_handler; 54111Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 54211Sigor@sysoev.ru 543141Smax.romanov@nginx.com nxt_fd_event_enable_read(port->engine, &port->socket); 54411Sigor@sysoev.ru } 54511Sigor@sysoev.ru 54611Sigor@sysoev.ru 54711Sigor@sysoev.ru void 54811Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port) 54911Sigor@sysoev.ru { 550350Smax.romanov@nginx.com port->socket.read_ready = 0; 55113Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[0]); 55211Sigor@sysoev.ru port->pair[0] = -1; 55311Sigor@sysoev.ru } 55411Sigor@sysoev.ru 55511Sigor@sysoev.ru 55611Sigor@sysoev.ru static void 55711Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 55811Sigor@sysoev.ru { 55942Smax.romanov@nginx.com ssize_t n; 56042Smax.romanov@nginx.com nxt_buf_t *b; 56142Smax.romanov@nginx.com nxt_port_t *port; 56242Smax.romanov@nginx.com struct iovec iov[2]; 56342Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 56411Sigor@sysoev.ru 565125Smax.romanov@nginx.com port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 56611Sigor@sysoev.ru 567141Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine); 568141Smax.romanov@nginx.com 56911Sigor@sysoev.ru for ( ;; ) { 57011Sigor@sysoev.ru 57111Sigor@sysoev.ru b = nxt_port_buf_alloc(port); 57211Sigor@sysoev.ru 57311Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 57411Sigor@sysoev.ru /* TODO: disable event for some time */ 57511Sigor@sysoev.ru } 57611Sigor@sysoev.ru 57742Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg; 57814Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 57911Sigor@sysoev.ru 58014Sigor@sysoev.ru iov[1].iov_base = b->mem.pos; 58114Sigor@sysoev.ru iov[1].iov_len = port->max_size; 58214Sigor@sysoev.ru 58342Smax.romanov@nginx.com n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 58411Sigor@sysoev.ru 58511Sigor@sysoev.ru if (n > 0) { 58642Smax.romanov@nginx.com 58742Smax.romanov@nginx.com msg.buf = b; 58882Smax.romanov@nginx.com msg.size = n; 58942Smax.romanov@nginx.com 59082Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg); 59111Sigor@sysoev.ru 592194Smax.romanov@nginx.com /* 593194Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 594194Smax.romanov@nginx.com * handler should reset 'msg.buf'. 595194Smax.romanov@nginx.com */ 596194Smax.romanov@nginx.com if (msg.buf == b) { 59711Sigor@sysoev.ru nxt_port_buf_free(port, b); 59811Sigor@sysoev.ru } 59911Sigor@sysoev.ru 60011Sigor@sysoev.ru if (port->socket.read_ready) { 60111Sigor@sysoev.ru continue; 60211Sigor@sysoev.ru } 60311Sigor@sysoev.ru 60411Sigor@sysoev.ru return; 60511Sigor@sysoev.ru } 60611Sigor@sysoev.ru 60711Sigor@sysoev.ru if (n == NXT_AGAIN) { 60811Sigor@sysoev.ru nxt_port_buf_free(port, b); 60911Sigor@sysoev.ru 61012Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket); 61111Sigor@sysoev.ru return; 61211Sigor@sysoev.ru } 61311Sigor@sysoev.ru 61411Sigor@sysoev.ru /* n == 0 || n == NXT_ERROR */ 61511Sigor@sysoev.ru 61611Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 61711Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 61811Sigor@sysoev.ru return; 61911Sigor@sysoev.ru } 62011Sigor@sysoev.ru } 62111Sigor@sysoev.ru 62211Sigor@sysoev.ru 623352Smax.romanov@nginx.com static nxt_int_t 624352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 625352Smax.romanov@nginx.com { 626352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 627352Smax.romanov@nginx.com 628352Smax.romanov@nginx.com fmsg = data; 629352Smax.romanov@nginx.com 630352Smax.romanov@nginx.com if (lhq->key.length == sizeof(uint32_t) 631352Smax.romanov@nginx.com && *(uint32_t *) lhq->key.start == fmsg->port_msg.stream) 632352Smax.romanov@nginx.com { 633352Smax.romanov@nginx.com return NXT_OK; 634352Smax.romanov@nginx.com } 635352Smax.romanov@nginx.com 636352Smax.romanov@nginx.com return NXT_DECLINED; 637352Smax.romanov@nginx.com } 638352Smax.romanov@nginx.com 639352Smax.romanov@nginx.com 640352Smax.romanov@nginx.com static void * 641352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 642352Smax.romanov@nginx.com { 643352Smax.romanov@nginx.com return nxt_mp_alloc(ctx, size); 644352Smax.romanov@nginx.com } 645352Smax.romanov@nginx.com 646352Smax.romanov@nginx.com 647352Smax.romanov@nginx.com static void 648352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p) 649352Smax.romanov@nginx.com { 650389Smax.romanov@nginx.com nxt_mp_free(ctx, p); 651352Smax.romanov@nginx.com } 652352Smax.romanov@nginx.com 653352Smax.romanov@nginx.com 654352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 655352Smax.romanov@nginx.com NXT_LVLHSH_DEFAULT, 656352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test, 657352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc, 658352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free, 659352Smax.romanov@nginx.com }; 660352Smax.romanov@nginx.com 661352Smax.romanov@nginx.com 662352Smax.romanov@nginx.com static nxt_port_recv_msg_t * 663352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 664352Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 665352Smax.romanov@nginx.com { 666352Smax.romanov@nginx.com nxt_int_t res; 667352Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 668352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 669352Smax.romanov@nginx.com 670352Smax.romanov@nginx.com nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 671352Smax.romanov@nginx.com 672352Smax.romanov@nginx.com fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 673352Smax.romanov@nginx.com 674352Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 675352Smax.romanov@nginx.com return NULL; 676352Smax.romanov@nginx.com } 677352Smax.romanov@nginx.com 678352Smax.romanov@nginx.com *fmsg = *msg; 679352Smax.romanov@nginx.com 680352Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&fmsg->port_msg.stream, sizeof(uint32_t)); 681352Smax.romanov@nginx.com lhq.key.length = sizeof(uint32_t); 682352Smax.romanov@nginx.com lhq.key.start = (u_char *) &fmsg->port_msg.stream; 683352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto; 684352Smax.romanov@nginx.com lhq.replace = 0; 685352Smax.romanov@nginx.com lhq.value = fmsg; 686352Smax.romanov@nginx.com lhq.pool = port->mem_pool; 687352Smax.romanov@nginx.com 688352Smax.romanov@nginx.com res = nxt_lvlhsh_insert(&port->frags, &lhq); 689352Smax.romanov@nginx.com 690352Smax.romanov@nginx.com switch (res) { 691352Smax.romanov@nginx.com 692352Smax.romanov@nginx.com case NXT_OK: 693352Smax.romanov@nginx.com return fmsg; 694352Smax.romanov@nginx.com 695352Smax.romanov@nginx.com case NXT_DECLINED: 696352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 697352Smax.romanov@nginx.com fmsg->port_msg.stream); 698352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 699352Smax.romanov@nginx.com 700352Smax.romanov@nginx.com return NULL; 701352Smax.romanov@nginx.com 702352Smax.romanov@nginx.com default: 703352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 704352Smax.romanov@nginx.com fmsg->port_msg.stream); 705352Smax.romanov@nginx.com 706352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 707352Smax.romanov@nginx.com 708352Smax.romanov@nginx.com return NULL; 709352Smax.romanov@nginx.com 710352Smax.romanov@nginx.com } 711352Smax.romanov@nginx.com } 712352Smax.romanov@nginx.com 713352Smax.romanov@nginx.com 714352Smax.romanov@nginx.com static nxt_port_recv_msg_t * 715352Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, uint32_t stream, 716352Smax.romanov@nginx.com nxt_bool_t last) 717352Smax.romanov@nginx.com { 718352Smax.romanov@nginx.com nxt_int_t res; 719352Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 720352Smax.romanov@nginx.com 721352Smax.romanov@nginx.com nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", stream); 722352Smax.romanov@nginx.com 723352Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&stream, sizeof(uint32_t)); 724352Smax.romanov@nginx.com lhq.key.length = sizeof(uint32_t); 725352Smax.romanov@nginx.com lhq.key.start = (u_char *) &stream; 726352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto; 727352Smax.romanov@nginx.com lhq.pool = port->mem_pool; 728352Smax.romanov@nginx.com 729352Smax.romanov@nginx.com res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 730352Smax.romanov@nginx.com nxt_lvlhsh_find(&port->frags, &lhq); 731352Smax.romanov@nginx.com 732352Smax.romanov@nginx.com switch (res) { 733352Smax.romanov@nginx.com 734352Smax.romanov@nginx.com case NXT_OK: 735352Smax.romanov@nginx.com return lhq.value; 736352Smax.romanov@nginx.com 737352Smax.romanov@nginx.com default: 738551Smax.romanov@nginx.com nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", stream); 739352Smax.romanov@nginx.com 740352Smax.romanov@nginx.com return NULL; 741352Smax.romanov@nginx.com } 742352Smax.romanov@nginx.com } 743352Smax.romanov@nginx.com 744352Smax.romanov@nginx.com 74511Sigor@sysoev.ru static void 74611Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 74782Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 74811Sigor@sysoev.ru { 749352Smax.romanov@nginx.com nxt_buf_t *b, *orig_b; 750352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 75111Sigor@sysoev.ru 75282Smax.romanov@nginx.com if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 753564Svbart@nginx.com nxt_alert(task, "port %d: too small message:%uz", 754564Svbart@nginx.com port->socket.fd, msg->size); 755423Smax.romanov@nginx.com 756423Smax.romanov@nginx.com if (msg->fd != -1) { 757423Smax.romanov@nginx.com nxt_fd_close(msg->fd); 758423Smax.romanov@nginx.com } 759423Smax.romanov@nginx.com 760423Smax.romanov@nginx.com return; 76111Sigor@sysoev.ru } 76211Sigor@sysoev.ru 76342Smax.romanov@nginx.com /* adjust size to actual buffer used size */ 76482Smax.romanov@nginx.com msg->size -= sizeof(nxt_port_msg_t); 76542Smax.romanov@nginx.com 76642Smax.romanov@nginx.com b = orig_b = msg->buf; 76782Smax.romanov@nginx.com b->mem.free += msg->size; 76842Smax.romanov@nginx.com 769423Smax.romanov@nginx.com if (msg->port_msg.tracking) { 770423Smax.romanov@nginx.com msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; 771423Smax.romanov@nginx.com 772423Smax.romanov@nginx.com } else { 773423Smax.romanov@nginx.com msg->cancelled = 0; 77442Smax.romanov@nginx.com } 77511Sigor@sysoev.ru 776352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.nf != 0)) { 777423Smax.romanov@nginx.com 778352Smax.romanov@nginx.com fmsg = nxt_port_frag_find(task, port, msg->port_msg.stream, 779352Smax.romanov@nginx.com msg->port_msg.mf == 0); 780352Smax.romanov@nginx.com 781551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 782551Smax.romanov@nginx.com goto fmsg_failed; 783551Smax.romanov@nginx.com } 784423Smax.romanov@nginx.com 785423Smax.romanov@nginx.com if (nxt_fast_path(fmsg->cancelled == 0)) { 786423Smax.romanov@nginx.com 787423Smax.romanov@nginx.com if (msg->port_msg.mmap) { 788423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 789423Smax.romanov@nginx.com } 790423Smax.romanov@nginx.com 791423Smax.romanov@nginx.com nxt_buf_chain_add(&fmsg->buf, msg->buf); 792423Smax.romanov@nginx.com 793423Smax.romanov@nginx.com fmsg->size += msg->size; 794423Smax.romanov@nginx.com msg->buf = NULL; 795423Smax.romanov@nginx.com b = NULL; 796423Smax.romanov@nginx.com 797423Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) { 798423Smax.romanov@nginx.com 799423Smax.romanov@nginx.com b = fmsg->buf; 800423Smax.romanov@nginx.com 801423Smax.romanov@nginx.com port->handler(task, fmsg); 802423Smax.romanov@nginx.com 803423Smax.romanov@nginx.com msg->buf = fmsg->buf; 804423Smax.romanov@nginx.com msg->fd = fmsg->fd; 805974Smax.romanov@nginx.com 806974Smax.romanov@nginx.com /* 807974Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 808974Smax.romanov@nginx.com * handler should reset 'msg.buf'. 809974Smax.romanov@nginx.com */ 810974Smax.romanov@nginx.com if (!msg->port_msg.mmap && msg->buf == b) { 811974Smax.romanov@nginx.com nxt_port_buf_free(port, b); 812974Smax.romanov@nginx.com } 813423Smax.romanov@nginx.com } 814352Smax.romanov@nginx.com } 815352Smax.romanov@nginx.com 816352Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) { 817352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 818352Smax.romanov@nginx.com } 819352Smax.romanov@nginx.com } else { 820352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.mf != 0)) { 821423Smax.romanov@nginx.com 822423Smax.romanov@nginx.com if (msg->port_msg.mmap && msg->cancelled == 0) { 823423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 824423Smax.romanov@nginx.com b = msg->buf; 825423Smax.romanov@nginx.com } 826423Smax.romanov@nginx.com 827352Smax.romanov@nginx.com fmsg = nxt_port_frag_start(task, port, msg); 828352Smax.romanov@nginx.com 829551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 830551Smax.romanov@nginx.com goto fmsg_failed; 831551Smax.romanov@nginx.com } 832352Smax.romanov@nginx.com 833352Smax.romanov@nginx.com fmsg->port_msg.nf = 0; 834352Smax.romanov@nginx.com fmsg->port_msg.mf = 0; 835352Smax.romanov@nginx.com 836423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) { 837423Smax.romanov@nginx.com msg->buf = NULL; 838423Smax.romanov@nginx.com msg->fd = -1; 839423Smax.romanov@nginx.com b = NULL; 840423Smax.romanov@nginx.com 841423Smax.romanov@nginx.com } else { 842423Smax.romanov@nginx.com if (msg->fd != -1) { 843423Smax.romanov@nginx.com nxt_fd_close(msg->fd); 844423Smax.romanov@nginx.com } 845423Smax.romanov@nginx.com } 846352Smax.romanov@nginx.com } else { 847423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) { 848423Smax.romanov@nginx.com 849423Smax.romanov@nginx.com if (msg->port_msg.mmap) { 850423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 851423Smax.romanov@nginx.com b = msg->buf; 852423Smax.romanov@nginx.com } 853423Smax.romanov@nginx.com 854423Smax.romanov@nginx.com port->handler(task, msg); 855423Smax.romanov@nginx.com } 856352Smax.romanov@nginx.com } 857352Smax.romanov@nginx.com } 85842Smax.romanov@nginx.com 859551Smax.romanov@nginx.com fmsg_failed: 860551Smax.romanov@nginx.com 86182Smax.romanov@nginx.com if (msg->port_msg.mmap && orig_b != b) { 86242Smax.romanov@nginx.com 863194Smax.romanov@nginx.com /* 864194Smax.romanov@nginx.com * To disable instant buffer completion, 865194Smax.romanov@nginx.com * handler should reset 'msg->buf'. 866194Smax.romanov@nginx.com */ 867194Smax.romanov@nginx.com if (msg->buf == b) { 868194Smax.romanov@nginx.com /* complete mmap buffers */ 869194Smax.romanov@nginx.com for (; b != NULL; b = b->next) { 870194Smax.romanov@nginx.com nxt_debug(task, "complete buffer %p", b); 871194Smax.romanov@nginx.com 872194Smax.romanov@nginx.com nxt_work_queue_add(port->socket.read_work_queue, 873194Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 874194Smax.romanov@nginx.com } 87542Smax.romanov@nginx.com } 876194Smax.romanov@nginx.com 877194Smax.romanov@nginx.com /* restore original buf */ 878194Smax.romanov@nginx.com msg->buf = orig_b; 87942Smax.romanov@nginx.com } 88011Sigor@sysoev.ru } 88111Sigor@sysoev.ru 88211Sigor@sysoev.ru 88311Sigor@sysoev.ru static nxt_buf_t * 88411Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port) 88511Sigor@sysoev.ru { 88611Sigor@sysoev.ru nxt_buf_t *b; 88711Sigor@sysoev.ru 88811Sigor@sysoev.ru if (port->free_bufs != NULL) { 88911Sigor@sysoev.ru b = port->free_bufs; 89011Sigor@sysoev.ru port->free_bufs = b->next; 89111Sigor@sysoev.ru 89211Sigor@sysoev.ru b->mem.pos = b->mem.start; 89311Sigor@sysoev.ru b->mem.free = b->mem.start; 89442Smax.romanov@nginx.com b->next = NULL; 89511Sigor@sysoev.ru } else { 89611Sigor@sysoev.ru b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 89711Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 89811Sigor@sysoev.ru return NULL; 89911Sigor@sysoev.ru } 90011Sigor@sysoev.ru } 90111Sigor@sysoev.ru 90211Sigor@sysoev.ru return b; 90311Sigor@sysoev.ru } 90411Sigor@sysoev.ru 90511Sigor@sysoev.ru 90611Sigor@sysoev.ru static void 90711Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 90811Sigor@sysoev.ru { 909974Smax.romanov@nginx.com nxt_buf_chain_add(&b, port->free_bufs); 91011Sigor@sysoev.ru port->free_bufs = b; 91111Sigor@sysoev.ru } 91211Sigor@sysoev.ru 91311Sigor@sysoev.ru 91411Sigor@sysoev.ru static void 91511Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 91611Sigor@sysoev.ru { 917343Smax.romanov@nginx.com int use_delta; 918197Smax.romanov@nginx.com nxt_buf_t *b; 919197Smax.romanov@nginx.com nxt_port_t *port; 920197Smax.romanov@nginx.com nxt_work_queue_t *wq; 921197Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 922197Smax.romanov@nginx.com 923125Smax.romanov@nginx.com nxt_debug(task, "port error handler %p", obj); 92411Sigor@sysoev.ru /* TODO */ 925197Smax.romanov@nginx.com 926197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 927197Smax.romanov@nginx.com 928343Smax.romanov@nginx.com use_delta = 0; 929343Smax.romanov@nginx.com 930343Smax.romanov@nginx.com if (obj == data) { 931343Smax.romanov@nginx.com use_delta--; 932343Smax.romanov@nginx.com } 933197Smax.romanov@nginx.com 934343Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 935343Smax.romanov@nginx.com 936343Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 937343Smax.romanov@nginx.com 938343Smax.romanov@nginx.com nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 939197Smax.romanov@nginx.com 940521Szelenkov@nginx.com for (b = msg->buf; b != NULL; b = b->next) { 941197Smax.romanov@nginx.com if (nxt_buf_is_sync(b)) { 942197Smax.romanov@nginx.com continue; 943197Smax.romanov@nginx.com } 944197Smax.romanov@nginx.com 945197Smax.romanov@nginx.com nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 946197Smax.romanov@nginx.com } 947197Smax.romanov@nginx.com 948197Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 949343Smax.romanov@nginx.com use_delta--; 950197Smax.romanov@nginx.com nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg, 951344Smax.romanov@nginx.com msg->work.data); 952197Smax.romanov@nginx.com 953197Smax.romanov@nginx.com } nxt_queue_loop; 954343Smax.romanov@nginx.com 955343Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 956343Smax.romanov@nginx.com 957343Smax.romanov@nginx.com if (use_delta != 0) { 958343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta); 959343Smax.romanov@nginx.com } 96011Sigor@sysoev.ru } 961