111Sigor@sysoev.ru 211Sigor@sysoev.ru /* 311Sigor@sysoev.ru * Copyright (C) Igor Sysoev 411Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 511Sigor@sysoev.ru */ 611Sigor@sysoev.ru 711Sigor@sysoev.ru #include <nxt_main.h> 811Sigor@sysoev.ru 911Sigor@sysoev.ru 10*1125Smax.romanov@nginx.com static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 11*1125Smax.romanov@nginx.com nxt_port_send_msg_t *msg); 12*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); 1311Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 14*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); 15592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, 16592Sigor@sysoev.ru nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); 17*1125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, 18*1125Smax.romanov@nginx.com nxt_port_send_msg_t *msg); 1911Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 2011Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 2182Smax.romanov@nginx.com nxt_port_recv_msg_t *msg); 2211Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 2311Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 2411Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 2511Sigor@sysoev.ru 2611Sigor@sysoev.ru 2714Sigor@sysoev.ru nxt_int_t 2814Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 2911Sigor@sysoev.ru { 3065Sigor@sysoev.ru nxt_int_t sndbuf, rcvbuf, size; 3165Sigor@sysoev.ru nxt_socket_t snd, rcv; 3211Sigor@sysoev.ru 3314Sigor@sysoev.ru port->socket.task = task; 3414Sigor@sysoev.ru 3514Sigor@sysoev.ru port->pair[0] = -1; 3614Sigor@sysoev.ru port->pair[1] = -1; 3714Sigor@sysoev.ru 3813Sigor@sysoev.ru if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 3911Sigor@sysoev.ru goto socketpair_fail; 4011Sigor@sysoev.ru } 4111Sigor@sysoev.ru 4211Sigor@sysoev.ru snd = port->pair[1]; 4311Sigor@sysoev.ru 4413Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 4511Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 4611Sigor@sysoev.ru goto getsockopt_fail; 4711Sigor@sysoev.ru } 4811Sigor@sysoev.ru 4911Sigor@sysoev.ru rcv = port->pair[0]; 5011Sigor@sysoev.ru 5113Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 5211Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 5311Sigor@sysoev.ru goto getsockopt_fail; 5411Sigor@sysoev.ru } 5511Sigor@sysoev.ru 5611Sigor@sysoev.ru if (max_size == 0) { 5711Sigor@sysoev.ru max_size = 16 * 1024; 5811Sigor@sysoev.ru } 5911Sigor@sysoev.ru 6011Sigor@sysoev.ru if ((size_t) sndbuf < max_size) { 6111Sigor@sysoev.ru /* 6211Sigor@sysoev.ru * On Unix domain sockets 6311Sigor@sysoev.ru * Linux uses 224K on both send and receive directions; 6411Sigor@sysoev.ru * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 6511Sigor@sysoev.ru * on send direction and 4K buffer size on receive direction; 6611Sigor@sysoev.ru * Solaris uses 16K on send direction and 5K on receive direction. 6711Sigor@sysoev.ru */ 6813Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 6913Sigor@sysoev.ru max_size); 7011Sigor@sysoev.ru 7113Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 7211Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 7311Sigor@sysoev.ru goto getsockopt_fail; 7411Sigor@sysoev.ru } 7511Sigor@sysoev.ru 7611Sigor@sysoev.ru size = sndbuf * 4; 7711Sigor@sysoev.ru 7811Sigor@sysoev.ru if (rcvbuf < size) { 7913Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 8013Sigor@sysoev.ru size); 8111Sigor@sysoev.ru 8213Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 8311Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 8411Sigor@sysoev.ru goto getsockopt_fail; 8511Sigor@sysoev.ru } 8611Sigor@sysoev.ru } 8711Sigor@sysoev.ru } 8811Sigor@sysoev.ru 8911Sigor@sysoev.ru port->max_size = nxt_min(max_size, (size_t) sndbuf); 9011Sigor@sysoev.ru port->max_share = (64 * 1024); 9111Sigor@sysoev.ru 9214Sigor@sysoev.ru return NXT_OK; 9311Sigor@sysoev.ru 9411Sigor@sysoev.ru getsockopt_fail: 9511Sigor@sysoev.ru 9613Sigor@sysoev.ru nxt_socket_close(task, port->pair[0]); 9713Sigor@sysoev.ru nxt_socket_close(task, port->pair[1]); 9811Sigor@sysoev.ru 9911Sigor@sysoev.ru socketpair_fail: 10011Sigor@sysoev.ru 10114Sigor@sysoev.ru return NXT_ERROR; 10211Sigor@sysoev.ru } 10311Sigor@sysoev.ru 10411Sigor@sysoev.ru 10511Sigor@sysoev.ru void 10611Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port) 10711Sigor@sysoev.ru { 10813Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->socket.fd); 10965Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool); 11011Sigor@sysoev.ru } 11111Sigor@sysoev.ru 11211Sigor@sysoev.ru 11311Sigor@sysoev.ru void 11411Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 11511Sigor@sysoev.ru { 11611Sigor@sysoev.ru port->socket.fd = port->pair[1]; 11711Sigor@sysoev.ru port->socket.log = &nxt_main_log; 11811Sigor@sysoev.ru port->socket.write_ready = 1; 11911Sigor@sysoev.ru 120141Smax.romanov@nginx.com port->engine = task->thread->engine; 121141Smax.romanov@nginx.com 122141Smax.romanov@nginx.com port->socket.write_work_queue = &port->engine->fast_work_queue; 12311Sigor@sysoev.ru port->socket.write_handler = nxt_port_write_handler; 12411Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 12511Sigor@sysoev.ru } 12611Sigor@sysoev.ru 12711Sigor@sysoev.ru 12811Sigor@sysoev.ru void 12911Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port) 13011Sigor@sysoev.ru { 13113Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[1]); 13211Sigor@sysoev.ru port->pair[1] = -1; 13311Sigor@sysoev.ru } 13411Sigor@sysoev.ru 13511Sigor@sysoev.ru 136122Smax.romanov@nginx.com static void 137*1125Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_port_send_msg_t *msg) 138122Smax.romanov@nginx.com { 139*1125Smax.romanov@nginx.com if (msg->allocated) { 140*1125Smax.romanov@nginx.com nxt_free(msg); 141344Smax.romanov@nginx.com } 142122Smax.romanov@nginx.com } 143122Smax.romanov@nginx.com 144122Smax.romanov@nginx.com 14511Sigor@sysoev.ru nxt_int_t 146423Smax.romanov@nginx.com nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 147423Smax.romanov@nginx.com nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b, 148423Smax.romanov@nginx.com void *tracking) 14911Sigor@sysoev.ru { 150*1125Smax.romanov@nginx.com nxt_int_t res; 151*1125Smax.romanov@nginx.com nxt_port_send_msg_t msg; 15211Sigor@sysoev.ru 153344Smax.romanov@nginx.com msg.link.next = NULL; 154344Smax.romanov@nginx.com msg.link.prev = NULL; 155122Smax.romanov@nginx.com 156344Smax.romanov@nginx.com msg.buf = b; 157*1125Smax.romanov@nginx.com msg.share = 0; 158344Smax.romanov@nginx.com msg.fd = fd; 159344Smax.romanov@nginx.com msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 160*1125Smax.romanov@nginx.com msg.allocated = 0; 16111Sigor@sysoev.ru 162423Smax.romanov@nginx.com if (tracking != NULL) { 163423Smax.romanov@nginx.com nxt_port_mmap_tracking_write(msg.tracking_msg, tracking); 164423Smax.romanov@nginx.com } 165423Smax.romanov@nginx.com 166344Smax.romanov@nginx.com msg.port_msg.stream = stream; 167344Smax.romanov@nginx.com msg.port_msg.pid = nxt_pid; 168344Smax.romanov@nginx.com msg.port_msg.reply_port = reply_port; 169344Smax.romanov@nginx.com msg.port_msg.type = type & NXT_PORT_MSG_MASK; 170344Smax.romanov@nginx.com msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 171344Smax.romanov@nginx.com msg.port_msg.mmap = 0; 172352Smax.romanov@nginx.com msg.port_msg.nf = 0; 173352Smax.romanov@nginx.com msg.port_msg.mf = 0; 174423Smax.romanov@nginx.com msg.port_msg.tracking = tracking != NULL; 17511Sigor@sysoev.ru 176*1125Smax.romanov@nginx.com res = nxt_port_msg_chk_insert(task, port, &msg); 177*1125Smax.romanov@nginx.com if (nxt_fast_path(res == NXT_DECLINED)) { 178344Smax.romanov@nginx.com nxt_port_write_handler(task, &port->socket, &msg); 179*1125Smax.romanov@nginx.com res = NXT_OK; 18011Sigor@sysoev.ru } 18111Sigor@sysoev.ru 182*1125Smax.romanov@nginx.com return res; 183*1125Smax.romanov@nginx.com } 184*1125Smax.romanov@nginx.com 185*1125Smax.romanov@nginx.com 186*1125Smax.romanov@nginx.com static nxt_int_t 187*1125Smax.romanov@nginx.com nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 188*1125Smax.romanov@nginx.com nxt_port_send_msg_t *msg) 189*1125Smax.romanov@nginx.com { 190*1125Smax.romanov@nginx.com nxt_int_t res; 191*1125Smax.romanov@nginx.com 192*1125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 193*1125Smax.romanov@nginx.com 194*1125Smax.romanov@nginx.com if (nxt_fast_path(port->socket.write_ready 195*1125Smax.romanov@nginx.com && nxt_queue_is_empty(&port->messages))) 196*1125Smax.romanov@nginx.com { 197*1125Smax.romanov@nginx.com res = NXT_DECLINED; 198*1125Smax.romanov@nginx.com 199*1125Smax.romanov@nginx.com } else { 200*1125Smax.romanov@nginx.com msg = nxt_port_msg_alloc(msg); 201*1125Smax.romanov@nginx.com 202*1125Smax.romanov@nginx.com if (nxt_fast_path(msg != NULL)) { 203*1125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 204*1125Smax.romanov@nginx.com nxt_port_use(task, port, 1); 205*1125Smax.romanov@nginx.com res = NXT_OK; 206*1125Smax.romanov@nginx.com 207*1125Smax.romanov@nginx.com } else { 208*1125Smax.romanov@nginx.com res = NXT_ERROR; 209*1125Smax.romanov@nginx.com } 210*1125Smax.romanov@nginx.com } 211*1125Smax.romanov@nginx.com 212*1125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 213*1125Smax.romanov@nginx.com 214*1125Smax.romanov@nginx.com return res; 215*1125Smax.romanov@nginx.com } 216*1125Smax.romanov@nginx.com 217*1125Smax.romanov@nginx.com 218*1125Smax.romanov@nginx.com static nxt_port_send_msg_t * 219*1125Smax.romanov@nginx.com nxt_port_msg_alloc(nxt_port_send_msg_t *m) 220*1125Smax.romanov@nginx.com { 221*1125Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 222*1125Smax.romanov@nginx.com 223*1125Smax.romanov@nginx.com msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); 224*1125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 225*1125Smax.romanov@nginx.com return NULL; 226*1125Smax.romanov@nginx.com } 227*1125Smax.romanov@nginx.com 228*1125Smax.romanov@nginx.com *msg = *m; 229*1125Smax.romanov@nginx.com 230*1125Smax.romanov@nginx.com msg->allocated = 1; 231*1125Smax.romanov@nginx.com 232*1125Smax.romanov@nginx.com return msg; 23311Sigor@sysoev.ru } 23411Sigor@sysoev.ru 23511Sigor@sysoev.ru 23611Sigor@sysoev.ru static void 237343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 238343Smax.romanov@nginx.com { 239343Smax.romanov@nginx.com nxt_fd_event_block_write(task->thread->engine, &port->socket); 240343Smax.romanov@nginx.com } 241343Smax.romanov@nginx.com 242343Smax.romanov@nginx.com 243343Smax.romanov@nginx.com static void 244343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 245343Smax.romanov@nginx.com { 246343Smax.romanov@nginx.com nxt_fd_event_enable_write(task->thread->engine, &port->socket); 247343Smax.romanov@nginx.com } 248343Smax.romanov@nginx.com 249343Smax.romanov@nginx.com 250343Smax.romanov@nginx.com static void 25111Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 25211Sigor@sysoev.ru { 253343Smax.romanov@nginx.com int use_delta; 254197Smax.romanov@nginx.com size_t plain_size; 25511Sigor@sysoev.ru ssize_t n; 256*1125Smax.romanov@nginx.com uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; 257343Smax.romanov@nginx.com nxt_bool_t block_write, enable_write; 25811Sigor@sysoev.ru nxt_port_t *port; 259*1125Smax.romanov@nginx.com struct iovec iov[NXT_IOBUF_MAX * 10]; 260127Smax.romanov@nginx.com nxt_work_queue_t *wq; 261125Smax.romanov@nginx.com nxt_port_method_t m; 26211Sigor@sysoev.ru nxt_port_send_msg_t *msg; 26311Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb; 26442Smax.romanov@nginx.com 265197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 26611Sigor@sysoev.ru 267343Smax.romanov@nginx.com block_write = 0; 268343Smax.romanov@nginx.com enable_write = 0; 269343Smax.romanov@nginx.com use_delta = 0; 270343Smax.romanov@nginx.com 271344Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 272344Smax.romanov@nginx.com 27311Sigor@sysoev.ru do { 274*1125Smax.romanov@nginx.com if (data) { 275*1125Smax.romanov@nginx.com msg = data; 276*1125Smax.romanov@nginx.com 277*1125Smax.romanov@nginx.com } else { 278*1125Smax.romanov@nginx.com msg = nxt_port_msg_first(port); 27911Sigor@sysoev.ru 280*1125Smax.romanov@nginx.com if (msg == NULL) { 281*1125Smax.romanov@nginx.com block_write = 1; 282*1125Smax.romanov@nginx.com goto cleanup; 283*1125Smax.romanov@nginx.com } 28411Sigor@sysoev.ru } 28511Sigor@sysoev.ru 286*1125Smax.romanov@nginx.com next_fragment: 287*1125Smax.romanov@nginx.com 28814Sigor@sysoev.ru iov[0].iov_base = &msg->port_msg; 28914Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 29011Sigor@sysoev.ru 29111Sigor@sysoev.ru sb.buf = msg->buf; 29214Sigor@sysoev.ru sb.iobuf = &iov[1]; 29311Sigor@sysoev.ru sb.nmax = NXT_IOBUF_MAX - 1; 29411Sigor@sysoev.ru sb.sync = 0; 29511Sigor@sysoev.ru sb.last = 0; 29642Smax.romanov@nginx.com sb.size = 0; 29711Sigor@sysoev.ru sb.limit = port->max_size; 29811Sigor@sysoev.ru 299352Smax.romanov@nginx.com sb.limit_reached = 0; 300352Smax.romanov@nginx.com sb.nmax_reached = 0; 301352Smax.romanov@nginx.com 30242Smax.romanov@nginx.com m = nxt_port_mmap_get_method(task, port, msg->buf); 30342Smax.romanov@nginx.com 30442Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP) { 30542Smax.romanov@nginx.com sb.limit = (1ULL << 31) - 1; 306352Smax.romanov@nginx.com sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 307352Smax.romanov@nginx.com port->max_size / PORT_MMAP_MIN_SIZE); 30842Smax.romanov@nginx.com } 30942Smax.romanov@nginx.com 310423Smax.romanov@nginx.com if (msg->port_msg.tracking) { 311423Smax.romanov@nginx.com iov[0].iov_len += sizeof(msg->tracking_msg); 312423Smax.romanov@nginx.com } 313423Smax.romanov@nginx.com 3141002Smax.romanov@nginx.com sb.limit -= iov[0].iov_len; 3151002Smax.romanov@nginx.com 31642Smax.romanov@nginx.com nxt_sendbuf_mem_coalesce(task, &sb); 31742Smax.romanov@nginx.com 31842Smax.romanov@nginx.com plain_size = sb.size; 31942Smax.romanov@nginx.com 32042Smax.romanov@nginx.com /* 32142Smax.romanov@nginx.com * Send through mmap enabled only when payload 32242Smax.romanov@nginx.com * is bigger than PORT_MMAP_MIN_SIZE. 32342Smax.romanov@nginx.com */ 32442Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 325*1125Smax.romanov@nginx.com nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); 32642Smax.romanov@nginx.com 32742Smax.romanov@nginx.com } else { 32842Smax.romanov@nginx.com m = NXT_PORT_METHOD_PLAIN; 32942Smax.romanov@nginx.com } 33011Sigor@sysoev.ru 331189Smax.romanov@nginx.com msg->port_msg.last |= sb.last; 332352Smax.romanov@nginx.com msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 33311Sigor@sysoev.ru 33442Smax.romanov@nginx.com n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 33511Sigor@sysoev.ru 33611Sigor@sysoev.ru if (n > 0) { 33742Smax.romanov@nginx.com if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 338564Svbart@nginx.com nxt_alert(task, "port %d: short write: %z instead of %uz", 339564Svbart@nginx.com port->socket.fd, n, sb.size + iov[0].iov_len); 34011Sigor@sysoev.ru goto fail; 34111Sigor@sysoev.ru } 34211Sigor@sysoev.ru 343189Smax.romanov@nginx.com if (msg->fd != -1 && msg->close_fd != 0) { 344189Smax.romanov@nginx.com nxt_fd_close(msg->fd); 345189Smax.romanov@nginx.com 346189Smax.romanov@nginx.com msg->fd = -1; 347189Smax.romanov@nginx.com } 348189Smax.romanov@nginx.com 349592Sigor@sysoev.ru msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, 350*1125Smax.romanov@nginx.com m == NXT_PORT_METHOD_MMAP); 35111Sigor@sysoev.ru 35211Sigor@sysoev.ru if (msg->buf != NULL) { 353352Smax.romanov@nginx.com nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 354352Smax.romanov@nginx.com msg->port_msg.stream); 355352Smax.romanov@nginx.com 35611Sigor@sysoev.ru /* 35711Sigor@sysoev.ru * A file descriptor is sent only 35811Sigor@sysoev.ru * in the first message of a stream. 35911Sigor@sysoev.ru */ 36011Sigor@sysoev.ru msg->fd = -1; 36111Sigor@sysoev.ru msg->share += n; 362352Smax.romanov@nginx.com msg->port_msg.nf = 1; 363423Smax.romanov@nginx.com msg->port_msg.tracking = 0; 36411Sigor@sysoev.ru 36511Sigor@sysoev.ru if (msg->share >= port->max_share) { 36611Sigor@sysoev.ru msg->share = 0; 367344Smax.romanov@nginx.com 368344Smax.romanov@nginx.com if (msg->link.next != NULL) { 369*1125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 370*1125Smax.romanov@nginx.com 371344Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 372*1125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 373*1125Smax.romanov@nginx.com 374*1125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 375344Smax.romanov@nginx.com 376*1125Smax.romanov@nginx.com } else { 377*1125Smax.romanov@nginx.com msg = nxt_port_msg_insert_tail(port, msg); 378*1125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 379*1125Smax.romanov@nginx.com goto fail; 380*1125Smax.romanov@nginx.com } 381*1125Smax.romanov@nginx.com 382344Smax.romanov@nginx.com use_delta++; 383344Smax.romanov@nginx.com } 384*1125Smax.romanov@nginx.com 385*1125Smax.romanov@nginx.com } else { 386*1125Smax.romanov@nginx.com goto next_fragment; 38711Sigor@sysoev.ru } 38811Sigor@sysoev.ru 38911Sigor@sysoev.ru } else { 390344Smax.romanov@nginx.com if (msg->link.next != NULL) { 391*1125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 392*1125Smax.romanov@nginx.com 393344Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 394*1125Smax.romanov@nginx.com msg->link.next = NULL; 395*1125Smax.romanov@nginx.com 396*1125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 397*1125Smax.romanov@nginx.com 398344Smax.romanov@nginx.com use_delta--; 399344Smax.romanov@nginx.com } 400*1125Smax.romanov@nginx.com 401*1125Smax.romanov@nginx.com nxt_port_release_send_msg(msg); 402*1125Smax.romanov@nginx.com } 403*1125Smax.romanov@nginx.com 404*1125Smax.romanov@nginx.com if (data != NULL) { 405*1125Smax.romanov@nginx.com goto cleanup; 40611Sigor@sysoev.ru } 40711Sigor@sysoev.ru 4081004Smax.romanov@nginx.com } else { 409*1125Smax.romanov@nginx.com if (nxt_slow_path(n == NXT_ERROR)) { 410*1125Smax.romanov@nginx.com goto fail; 411344Smax.romanov@nginx.com } 4121004Smax.romanov@nginx.com 413*1125Smax.romanov@nginx.com if (msg->link.next == NULL) { 414*1125Smax.romanov@nginx.com msg = nxt_port_msg_insert_tail(port, msg); 415*1125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 416*1125Smax.romanov@nginx.com goto fail; 417*1125Smax.romanov@nginx.com } 418*1125Smax.romanov@nginx.com 419*1125Smax.romanov@nginx.com use_delta++; 4201004Smax.romanov@nginx.com } 42111Sigor@sysoev.ru } 42211Sigor@sysoev.ru 42311Sigor@sysoev.ru } while (port->socket.write_ready); 42411Sigor@sysoev.ru 42512Sigor@sysoev.ru if (nxt_fd_event_is_disabled(port->socket.write)) { 426343Smax.romanov@nginx.com enable_write = 1; 42711Sigor@sysoev.ru } 42811Sigor@sysoev.ru 429*1125Smax.romanov@nginx.com goto cleanup; 43011Sigor@sysoev.ru 43111Sigor@sysoev.ru fail: 43211Sigor@sysoev.ru 433343Smax.romanov@nginx.com use_delta++; 434343Smax.romanov@nginx.com 435344Smax.romanov@nginx.com nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 436343Smax.romanov@nginx.com &port->socket); 437343Smax.romanov@nginx.com 438*1125Smax.romanov@nginx.com cleanup: 439343Smax.romanov@nginx.com 440343Smax.romanov@nginx.com if (block_write && nxt_fd_event_is_active(port->socket.write)) { 441343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 442343Smax.romanov@nginx.com } 443343Smax.romanov@nginx.com 444343Smax.romanov@nginx.com if (enable_write) { 445343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 446343Smax.romanov@nginx.com } 447343Smax.romanov@nginx.com 448343Smax.romanov@nginx.com if (use_delta != 0) { 449343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta); 450343Smax.romanov@nginx.com } 45111Sigor@sysoev.ru } 45211Sigor@sysoev.ru 45311Sigor@sysoev.ru 454*1125Smax.romanov@nginx.com static nxt_port_send_msg_t * 455*1125Smax.romanov@nginx.com nxt_port_msg_first(nxt_port_t *port) 456*1125Smax.romanov@nginx.com { 457*1125Smax.romanov@nginx.com nxt_queue_link_t *lnk; 458*1125Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 459*1125Smax.romanov@nginx.com 460*1125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 461*1125Smax.romanov@nginx.com 462*1125Smax.romanov@nginx.com lnk = nxt_queue_first(&port->messages); 463*1125Smax.romanov@nginx.com 464*1125Smax.romanov@nginx.com if (lnk == nxt_queue_tail(&port->messages)) { 465*1125Smax.romanov@nginx.com msg = NULL; 466*1125Smax.romanov@nginx.com 467*1125Smax.romanov@nginx.com } else { 468*1125Smax.romanov@nginx.com msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 469*1125Smax.romanov@nginx.com } 470*1125Smax.romanov@nginx.com 471*1125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 472*1125Smax.romanov@nginx.com 473*1125Smax.romanov@nginx.com return msg; 474*1125Smax.romanov@nginx.com } 475*1125Smax.romanov@nginx.com 476*1125Smax.romanov@nginx.com 477592Sigor@sysoev.ru static nxt_buf_t * 478592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 479592Sigor@sysoev.ru size_t sent, nxt_bool_t mmap_mode) 480592Sigor@sysoev.ru { 481592Sigor@sysoev.ru size_t size; 482592Sigor@sysoev.ru 483592Sigor@sysoev.ru while (b != NULL) { 484592Sigor@sysoev.ru 485592Sigor@sysoev.ru nxt_prefetch(b->next); 486592Sigor@sysoev.ru 487592Sigor@sysoev.ru if (!nxt_buf_is_sync(b)) { 488592Sigor@sysoev.ru 489592Sigor@sysoev.ru size = nxt_buf_used_size(b); 490592Sigor@sysoev.ru 491592Sigor@sysoev.ru if (size != 0) { 492592Sigor@sysoev.ru 493592Sigor@sysoev.ru if (sent == 0) { 494592Sigor@sysoev.ru break; 495592Sigor@sysoev.ru } 496592Sigor@sysoev.ru 497592Sigor@sysoev.ru if (nxt_buf_is_port_mmap(b) && mmap_mode) { 498592Sigor@sysoev.ru /* 499592Sigor@sysoev.ru * buffer has been sent to other side which is now 500592Sigor@sysoev.ru * responsible for shared memory bucket release 501592Sigor@sysoev.ru */ 502592Sigor@sysoev.ru b->is_port_mmap_sent = 1; 503592Sigor@sysoev.ru } 504592Sigor@sysoev.ru 505592Sigor@sysoev.ru if (sent < size) { 506592Sigor@sysoev.ru 507592Sigor@sysoev.ru if (nxt_buf_is_mem(b)) { 508592Sigor@sysoev.ru b->mem.pos += sent; 509592Sigor@sysoev.ru } 510592Sigor@sysoev.ru 511592Sigor@sysoev.ru if (nxt_buf_is_file(b)) { 512592Sigor@sysoev.ru b->file_pos += sent; 513592Sigor@sysoev.ru } 514592Sigor@sysoev.ru 515592Sigor@sysoev.ru break; 516592Sigor@sysoev.ru } 517592Sigor@sysoev.ru 518592Sigor@sysoev.ru /* b->mem.free is NULL in file-only buffer. */ 519592Sigor@sysoev.ru b->mem.pos = b->mem.free; 520592Sigor@sysoev.ru 521592Sigor@sysoev.ru if (nxt_buf_is_file(b)) { 522592Sigor@sysoev.ru b->file_pos = b->file_end; 523592Sigor@sysoev.ru } 524592Sigor@sysoev.ru 525592Sigor@sysoev.ru sent -= size; 526592Sigor@sysoev.ru } 527592Sigor@sysoev.ru } 528592Sigor@sysoev.ru 529592Sigor@sysoev.ru nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 530592Sigor@sysoev.ru 531592Sigor@sysoev.ru b = b->next; 532592Sigor@sysoev.ru } 533592Sigor@sysoev.ru 534592Sigor@sysoev.ru return b; 535592Sigor@sysoev.ru } 536592Sigor@sysoev.ru 537592Sigor@sysoev.ru 538*1125Smax.romanov@nginx.com static nxt_port_send_msg_t * 539*1125Smax.romanov@nginx.com nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) 540*1125Smax.romanov@nginx.com { 541*1125Smax.romanov@nginx.com if (msg->allocated == 0) { 542*1125Smax.romanov@nginx.com msg = nxt_port_msg_alloc(msg); 543*1125Smax.romanov@nginx.com 544*1125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 545*1125Smax.romanov@nginx.com return NULL; 546*1125Smax.romanov@nginx.com } 547*1125Smax.romanov@nginx.com } 548*1125Smax.romanov@nginx.com 549*1125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 550*1125Smax.romanov@nginx.com 551*1125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 552*1125Smax.romanov@nginx.com 553*1125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 554*1125Smax.romanov@nginx.com 555*1125Smax.romanov@nginx.com return msg; 556*1125Smax.romanov@nginx.com } 557*1125Smax.romanov@nginx.com 558*1125Smax.romanov@nginx.com 55911Sigor@sysoev.ru void 56011Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 56111Sigor@sysoev.ru { 56211Sigor@sysoev.ru port->socket.fd = port->pair[0]; 56311Sigor@sysoev.ru port->socket.log = &nxt_main_log; 56411Sigor@sysoev.ru 565141Smax.romanov@nginx.com port->engine = task->thread->engine; 566141Smax.romanov@nginx.com 567141Smax.romanov@nginx.com port->socket.read_work_queue = &port->engine->fast_work_queue; 56811Sigor@sysoev.ru port->socket.read_handler = nxt_port_read_handler; 56911Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 57011Sigor@sysoev.ru 571141Smax.romanov@nginx.com nxt_fd_event_enable_read(port->engine, &port->socket); 57211Sigor@sysoev.ru } 57311Sigor@sysoev.ru 57411Sigor@sysoev.ru 57511Sigor@sysoev.ru void 57611Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port) 57711Sigor@sysoev.ru { 578350Smax.romanov@nginx.com port->socket.read_ready = 0; 5791015Smax.romanov@nginx.com port->socket.read = NXT_EVENT_INACTIVE; 58013Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[0]); 58111Sigor@sysoev.ru port->pair[0] = -1; 58211Sigor@sysoev.ru } 58311Sigor@sysoev.ru 58411Sigor@sysoev.ru 58511Sigor@sysoev.ru static void 58611Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 58711Sigor@sysoev.ru { 58842Smax.romanov@nginx.com ssize_t n; 58942Smax.romanov@nginx.com nxt_buf_t *b; 59042Smax.romanov@nginx.com nxt_port_t *port; 59142Smax.romanov@nginx.com struct iovec iov[2]; 59242Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 59311Sigor@sysoev.ru 594125Smax.romanov@nginx.com port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 59511Sigor@sysoev.ru 596141Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine); 597141Smax.romanov@nginx.com 59811Sigor@sysoev.ru for ( ;; ) { 59911Sigor@sysoev.ru 60011Sigor@sysoev.ru b = nxt_port_buf_alloc(port); 60111Sigor@sysoev.ru 60211Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 60311Sigor@sysoev.ru /* TODO: disable event for some time */ 60411Sigor@sysoev.ru } 60511Sigor@sysoev.ru 60642Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg; 60714Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 60811Sigor@sysoev.ru 60914Sigor@sysoev.ru iov[1].iov_base = b->mem.pos; 61014Sigor@sysoev.ru iov[1].iov_len = port->max_size; 61114Sigor@sysoev.ru 61242Smax.romanov@nginx.com n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); 61311Sigor@sysoev.ru 61411Sigor@sysoev.ru if (n > 0) { 61542Smax.romanov@nginx.com 61642Smax.romanov@nginx.com msg.buf = b; 61782Smax.romanov@nginx.com msg.size = n; 61842Smax.romanov@nginx.com 61982Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg); 62011Sigor@sysoev.ru 621194Smax.romanov@nginx.com /* 622194Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 623194Smax.romanov@nginx.com * handler should reset 'msg.buf'. 624194Smax.romanov@nginx.com */ 625194Smax.romanov@nginx.com if (msg.buf == b) { 62611Sigor@sysoev.ru nxt_port_buf_free(port, b); 62711Sigor@sysoev.ru } 62811Sigor@sysoev.ru 62911Sigor@sysoev.ru if (port->socket.read_ready) { 63011Sigor@sysoev.ru continue; 63111Sigor@sysoev.ru } 63211Sigor@sysoev.ru 63311Sigor@sysoev.ru return; 63411Sigor@sysoev.ru } 63511Sigor@sysoev.ru 63611Sigor@sysoev.ru if (n == NXT_AGAIN) { 63711Sigor@sysoev.ru nxt_port_buf_free(port, b); 63811Sigor@sysoev.ru 63912Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket); 64011Sigor@sysoev.ru return; 64111Sigor@sysoev.ru } 64211Sigor@sysoev.ru 64311Sigor@sysoev.ru /* n == 0 || n == NXT_ERROR */ 64411Sigor@sysoev.ru 64511Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 64611Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 64711Sigor@sysoev.ru return; 64811Sigor@sysoev.ru } 64911Sigor@sysoev.ru } 65011Sigor@sysoev.ru 65111Sigor@sysoev.ru 6521005Smax.romanov@nginx.com typedef struct { 6531005Smax.romanov@nginx.com uint32_t stream; 6541005Smax.romanov@nginx.com uint32_t pid; 6551005Smax.romanov@nginx.com } nxt_port_frag_key_t; 6561005Smax.romanov@nginx.com 6571005Smax.romanov@nginx.com 658352Smax.romanov@nginx.com static nxt_int_t 659352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 660352Smax.romanov@nginx.com { 661352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 6621005Smax.romanov@nginx.com nxt_port_frag_key_t *frag_key; 663352Smax.romanov@nginx.com 664352Smax.romanov@nginx.com fmsg = data; 6651005Smax.romanov@nginx.com frag_key = (nxt_port_frag_key_t *) lhq->key.start; 666352Smax.romanov@nginx.com 6671005Smax.romanov@nginx.com if (lhq->key.length == sizeof(nxt_port_frag_key_t) 6681005Smax.romanov@nginx.com && frag_key->stream == fmsg->port_msg.stream 6691005Smax.romanov@nginx.com && frag_key->pid == (uint32_t) fmsg->port_msg.pid) 670352Smax.romanov@nginx.com { 671352Smax.romanov@nginx.com return NXT_OK; 672352Smax.romanov@nginx.com } 673352Smax.romanov@nginx.com 674352Smax.romanov@nginx.com return NXT_DECLINED; 675352Smax.romanov@nginx.com } 676352Smax.romanov@nginx.com 677352Smax.romanov@nginx.com 678352Smax.romanov@nginx.com static void * 679352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 680352Smax.romanov@nginx.com { 6811084Smax.romanov@nginx.com return nxt_mp_align(ctx, size, size); 682352Smax.romanov@nginx.com } 683352Smax.romanov@nginx.com 684352Smax.romanov@nginx.com 685352Smax.romanov@nginx.com static void 686352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p) 687352Smax.romanov@nginx.com { 688389Smax.romanov@nginx.com nxt_mp_free(ctx, p); 689352Smax.romanov@nginx.com } 690352Smax.romanov@nginx.com 691352Smax.romanov@nginx.com 692352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 693352Smax.romanov@nginx.com NXT_LVLHSH_DEFAULT, 694352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test, 695352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc, 696352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free, 697352Smax.romanov@nginx.com }; 698352Smax.romanov@nginx.com 699352Smax.romanov@nginx.com 700352Smax.romanov@nginx.com static nxt_port_recv_msg_t * 701352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 702352Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 703352Smax.romanov@nginx.com { 704352Smax.romanov@nginx.com nxt_int_t res; 705352Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 706352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 7071005Smax.romanov@nginx.com nxt_port_frag_key_t frag_key; 708352Smax.romanov@nginx.com 709352Smax.romanov@nginx.com nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 710352Smax.romanov@nginx.com 711352Smax.romanov@nginx.com fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 712352Smax.romanov@nginx.com 713352Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 714352Smax.romanov@nginx.com return NULL; 715352Smax.romanov@nginx.com } 716352Smax.romanov@nginx.com 717352Smax.romanov@nginx.com *fmsg = *msg; 718352Smax.romanov@nginx.com 7191005Smax.romanov@nginx.com frag_key.stream = fmsg->port_msg.stream; 7201005Smax.romanov@nginx.com frag_key.pid = fmsg->port_msg.pid; 7211005Smax.romanov@nginx.com 7221005Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 7231005Smax.romanov@nginx.com lhq.key.length = sizeof(nxt_port_frag_key_t); 7241005Smax.romanov@nginx.com lhq.key.start = (u_char *) &frag_key; 725352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto; 726352Smax.romanov@nginx.com lhq.replace = 0; 727352Smax.romanov@nginx.com lhq.value = fmsg; 728352Smax.romanov@nginx.com lhq.pool = port->mem_pool; 729352Smax.romanov@nginx.com 730352Smax.romanov@nginx.com res = nxt_lvlhsh_insert(&port->frags, &lhq); 731352Smax.romanov@nginx.com 732352Smax.romanov@nginx.com switch (res) { 733352Smax.romanov@nginx.com 734352Smax.romanov@nginx.com case NXT_OK: 735352Smax.romanov@nginx.com return fmsg; 736352Smax.romanov@nginx.com 737352Smax.romanov@nginx.com case NXT_DECLINED: 738352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 739352Smax.romanov@nginx.com fmsg->port_msg.stream); 740352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 741352Smax.romanov@nginx.com 742352Smax.romanov@nginx.com return NULL; 743352Smax.romanov@nginx.com 744352Smax.romanov@nginx.com default: 745352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 746352Smax.romanov@nginx.com fmsg->port_msg.stream); 747352Smax.romanov@nginx.com 748352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 749352Smax.romanov@nginx.com 750352Smax.romanov@nginx.com return NULL; 751352Smax.romanov@nginx.com 752352Smax.romanov@nginx.com } 753352Smax.romanov@nginx.com } 754352Smax.romanov@nginx.com 755352Smax.romanov@nginx.com 756352Smax.romanov@nginx.com static nxt_port_recv_msg_t * 7571005Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) 758352Smax.romanov@nginx.com { 7591005Smax.romanov@nginx.com nxt_int_t res; 7601005Smax.romanov@nginx.com nxt_bool_t last; 7611005Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 7621005Smax.romanov@nginx.com nxt_port_frag_key_t frag_key; 7631005Smax.romanov@nginx.com 7641005Smax.romanov@nginx.com last = msg->port_msg.mf == 0; 765352Smax.romanov@nginx.com 7661005Smax.romanov@nginx.com nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", 7671005Smax.romanov@nginx.com msg->port_msg.stream); 768352Smax.romanov@nginx.com 7691005Smax.romanov@nginx.com frag_key.stream = msg->port_msg.stream; 7701005Smax.romanov@nginx.com frag_key.pid = msg->port_msg.pid; 7711005Smax.romanov@nginx.com 7721005Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 7731005Smax.romanov@nginx.com lhq.key.length = sizeof(nxt_port_frag_key_t); 7741005Smax.romanov@nginx.com lhq.key.start = (u_char *) &frag_key; 775352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto; 776352Smax.romanov@nginx.com lhq.pool = port->mem_pool; 777352Smax.romanov@nginx.com 778352Smax.romanov@nginx.com res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 779352Smax.romanov@nginx.com nxt_lvlhsh_find(&port->frags, &lhq); 780352Smax.romanov@nginx.com 781352Smax.romanov@nginx.com switch (res) { 782352Smax.romanov@nginx.com 783352Smax.romanov@nginx.com case NXT_OK: 784352Smax.romanov@nginx.com return lhq.value; 785352Smax.romanov@nginx.com 786352Smax.romanov@nginx.com default: 7871005Smax.romanov@nginx.com nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", 7881005Smax.romanov@nginx.com frag_key.stream); 789352Smax.romanov@nginx.com 790352Smax.romanov@nginx.com return NULL; 791352Smax.romanov@nginx.com } 792352Smax.romanov@nginx.com } 793352Smax.romanov@nginx.com 794352Smax.romanov@nginx.com 79511Sigor@sysoev.ru static void 79611Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 79782Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 79811Sigor@sysoev.ru { 799352Smax.romanov@nginx.com nxt_buf_t *b, *orig_b; 800352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 80111Sigor@sysoev.ru 80282Smax.romanov@nginx.com if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 803564Svbart@nginx.com nxt_alert(task, "port %d: too small message:%uz", 804564Svbart@nginx.com port->socket.fd, msg->size); 805423Smax.romanov@nginx.com 806423Smax.romanov@nginx.com if (msg->fd != -1) { 807423Smax.romanov@nginx.com nxt_fd_close(msg->fd); 808423Smax.romanov@nginx.com } 809423Smax.romanov@nginx.com 810423Smax.romanov@nginx.com return; 81111Sigor@sysoev.ru } 81211Sigor@sysoev.ru 81342Smax.romanov@nginx.com /* adjust size to actual buffer used size */ 81482Smax.romanov@nginx.com msg->size -= sizeof(nxt_port_msg_t); 81542Smax.romanov@nginx.com 81642Smax.romanov@nginx.com b = orig_b = msg->buf; 81782Smax.romanov@nginx.com b->mem.free += msg->size; 81842Smax.romanov@nginx.com 819423Smax.romanov@nginx.com if (msg->port_msg.tracking) { 820423Smax.romanov@nginx.com msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0; 821423Smax.romanov@nginx.com 822423Smax.romanov@nginx.com } else { 823423Smax.romanov@nginx.com msg->cancelled = 0; 82442Smax.romanov@nginx.com } 82511Sigor@sysoev.ru 826352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.nf != 0)) { 827423Smax.romanov@nginx.com 8281005Smax.romanov@nginx.com fmsg = nxt_port_frag_find(task, port, msg); 829352Smax.romanov@nginx.com 830551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 831551Smax.romanov@nginx.com goto fmsg_failed; 832551Smax.romanov@nginx.com } 833423Smax.romanov@nginx.com 834423Smax.romanov@nginx.com if (nxt_fast_path(fmsg->cancelled == 0)) { 835423Smax.romanov@nginx.com 836423Smax.romanov@nginx.com if (msg->port_msg.mmap) { 837423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 838423Smax.romanov@nginx.com } 839423Smax.romanov@nginx.com 840423Smax.romanov@nginx.com nxt_buf_chain_add(&fmsg->buf, msg->buf); 841423Smax.romanov@nginx.com 842423Smax.romanov@nginx.com fmsg->size += msg->size; 843423Smax.romanov@nginx.com msg->buf = NULL; 844423Smax.romanov@nginx.com b = NULL; 845423Smax.romanov@nginx.com 846423Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) { 847423Smax.romanov@nginx.com 848423Smax.romanov@nginx.com b = fmsg->buf; 849423Smax.romanov@nginx.com 850423Smax.romanov@nginx.com port->handler(task, fmsg); 851423Smax.romanov@nginx.com 852423Smax.romanov@nginx.com msg->buf = fmsg->buf; 853423Smax.romanov@nginx.com msg->fd = fmsg->fd; 854974Smax.romanov@nginx.com 855974Smax.romanov@nginx.com /* 856974Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 857974Smax.romanov@nginx.com * handler should reset 'msg.buf'. 858974Smax.romanov@nginx.com */ 859974Smax.romanov@nginx.com if (!msg->port_msg.mmap && msg->buf == b) { 860974Smax.romanov@nginx.com nxt_port_buf_free(port, b); 861974Smax.romanov@nginx.com } 862423Smax.romanov@nginx.com } 863352Smax.romanov@nginx.com } 864352Smax.romanov@nginx.com 865352Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) { 866352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 867352Smax.romanov@nginx.com } 868352Smax.romanov@nginx.com } else { 869352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.mf != 0)) { 870423Smax.romanov@nginx.com 871423Smax.romanov@nginx.com if (msg->port_msg.mmap && msg->cancelled == 0) { 872423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 873423Smax.romanov@nginx.com b = msg->buf; 874423Smax.romanov@nginx.com } 875423Smax.romanov@nginx.com 876352Smax.romanov@nginx.com fmsg = nxt_port_frag_start(task, port, msg); 877352Smax.romanov@nginx.com 878551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 879551Smax.romanov@nginx.com goto fmsg_failed; 880551Smax.romanov@nginx.com } 881352Smax.romanov@nginx.com 882352Smax.romanov@nginx.com fmsg->port_msg.nf = 0; 883352Smax.romanov@nginx.com fmsg->port_msg.mf = 0; 884352Smax.romanov@nginx.com 885423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) { 886423Smax.romanov@nginx.com msg->buf = NULL; 887423Smax.romanov@nginx.com msg->fd = -1; 888423Smax.romanov@nginx.com b = NULL; 889423Smax.romanov@nginx.com 890423Smax.romanov@nginx.com } else { 891423Smax.romanov@nginx.com if (msg->fd != -1) { 892423Smax.romanov@nginx.com nxt_fd_close(msg->fd); 893423Smax.romanov@nginx.com } 894423Smax.romanov@nginx.com } 895352Smax.romanov@nginx.com } else { 896423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) { 897423Smax.romanov@nginx.com 898423Smax.romanov@nginx.com if (msg->port_msg.mmap) { 899423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 900423Smax.romanov@nginx.com b = msg->buf; 901423Smax.romanov@nginx.com } 902423Smax.romanov@nginx.com 903423Smax.romanov@nginx.com port->handler(task, msg); 904423Smax.romanov@nginx.com } 905352Smax.romanov@nginx.com } 906352Smax.romanov@nginx.com } 90742Smax.romanov@nginx.com 908551Smax.romanov@nginx.com fmsg_failed: 909551Smax.romanov@nginx.com 91082Smax.romanov@nginx.com if (msg->port_msg.mmap && orig_b != b) { 91142Smax.romanov@nginx.com 912194Smax.romanov@nginx.com /* 913194Smax.romanov@nginx.com * To disable instant buffer completion, 914194Smax.romanov@nginx.com * handler should reset 'msg->buf'. 915194Smax.romanov@nginx.com */ 916194Smax.romanov@nginx.com if (msg->buf == b) { 917194Smax.romanov@nginx.com /* complete mmap buffers */ 918194Smax.romanov@nginx.com for (; b != NULL; b = b->next) { 919194Smax.romanov@nginx.com nxt_debug(task, "complete buffer %p", b); 920194Smax.romanov@nginx.com 921194Smax.romanov@nginx.com nxt_work_queue_add(port->socket.read_work_queue, 922194Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 923194Smax.romanov@nginx.com } 92442Smax.romanov@nginx.com } 925194Smax.romanov@nginx.com 926194Smax.romanov@nginx.com /* restore original buf */ 927194Smax.romanov@nginx.com msg->buf = orig_b; 92842Smax.romanov@nginx.com } 92911Sigor@sysoev.ru } 93011Sigor@sysoev.ru 93111Sigor@sysoev.ru 93211Sigor@sysoev.ru static nxt_buf_t * 93311Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port) 93411Sigor@sysoev.ru { 93511Sigor@sysoev.ru nxt_buf_t *b; 93611Sigor@sysoev.ru 93711Sigor@sysoev.ru if (port->free_bufs != NULL) { 93811Sigor@sysoev.ru b = port->free_bufs; 93911Sigor@sysoev.ru port->free_bufs = b->next; 94011Sigor@sysoev.ru 94111Sigor@sysoev.ru b->mem.pos = b->mem.start; 94211Sigor@sysoev.ru b->mem.free = b->mem.start; 94342Smax.romanov@nginx.com b->next = NULL; 94411Sigor@sysoev.ru } else { 94511Sigor@sysoev.ru b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 94611Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 94711Sigor@sysoev.ru return NULL; 94811Sigor@sysoev.ru } 94911Sigor@sysoev.ru } 95011Sigor@sysoev.ru 95111Sigor@sysoev.ru return b; 95211Sigor@sysoev.ru } 95311Sigor@sysoev.ru 95411Sigor@sysoev.ru 95511Sigor@sysoev.ru static void 95611Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 95711Sigor@sysoev.ru { 958974Smax.romanov@nginx.com nxt_buf_chain_add(&b, port->free_bufs); 95911Sigor@sysoev.ru port->free_bufs = b; 96011Sigor@sysoev.ru } 96111Sigor@sysoev.ru 96211Sigor@sysoev.ru 96311Sigor@sysoev.ru static void 96411Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 96511Sigor@sysoev.ru { 966343Smax.romanov@nginx.com int use_delta; 967197Smax.romanov@nginx.com nxt_buf_t *b; 968197Smax.romanov@nginx.com nxt_port_t *port; 969197Smax.romanov@nginx.com nxt_work_queue_t *wq; 970197Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 971197Smax.romanov@nginx.com 972125Smax.romanov@nginx.com nxt_debug(task, "port error handler %p", obj); 97311Sigor@sysoev.ru /* TODO */ 974197Smax.romanov@nginx.com 975197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 976197Smax.romanov@nginx.com 977343Smax.romanov@nginx.com use_delta = 0; 978343Smax.romanov@nginx.com 979343Smax.romanov@nginx.com if (obj == data) { 980343Smax.romanov@nginx.com use_delta--; 981343Smax.romanov@nginx.com } 982197Smax.romanov@nginx.com 983343Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 984343Smax.romanov@nginx.com 985343Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 986343Smax.romanov@nginx.com 987343Smax.romanov@nginx.com nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 988197Smax.romanov@nginx.com 989521Szelenkov@nginx.com for (b = msg->buf; b != NULL; b = b->next) { 990197Smax.romanov@nginx.com if (nxt_buf_is_sync(b)) { 991197Smax.romanov@nginx.com continue; 992197Smax.romanov@nginx.com } 993197Smax.romanov@nginx.com 994197Smax.romanov@nginx.com nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 995197Smax.romanov@nginx.com } 996197Smax.romanov@nginx.com 997197Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 998343Smax.romanov@nginx.com use_delta--; 999*1125Smax.romanov@nginx.com 1000*1125Smax.romanov@nginx.com nxt_port_release_send_msg(msg); 1001197Smax.romanov@nginx.com 1002197Smax.romanov@nginx.com } nxt_queue_loop; 1003343Smax.romanov@nginx.com 1004343Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 1005343Smax.romanov@nginx.com 1006343Smax.romanov@nginx.com if (use_delta != 0) { 1007343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta); 1008343Smax.romanov@nginx.com } 100911Sigor@sysoev.ru } 1010