111Sigor@sysoev.ru 211Sigor@sysoev.ru /* 311Sigor@sysoev.ru * Copyright (C) Igor Sysoev 411Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 511Sigor@sysoev.ru */ 611Sigor@sysoev.ru 711Sigor@sysoev.ru #include <nxt_main.h> 81555Smax.romanov@nginx.com #include <nxt_port_queue.h> 91832Smax.romanov@nginx.com #include <nxt_port_memory_int.h> 1011Sigor@sysoev.ru 1111Sigor@sysoev.ru 121832Smax.romanov@nginx.com #define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \ 131832Smax.romanov@nginx.com (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t)) 141832Smax.romanov@nginx.com 151832Smax.romanov@nginx.com 161832Smax.romanov@nginx.com static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b); 171832Smax.romanov@nginx.com static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, 181832Smax.romanov@nginx.com void *qbuf, nxt_buf_t *b); 191125Smax.romanov@nginx.com static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 201125Smax.romanov@nginx.com nxt_port_send_msg_t *msg); 211125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m); 2211Sigor@sysoev.ru static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); 231125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port); 24592Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task, 25592Sigor@sysoev.ru nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode); 261125Smax.romanov@nginx.com static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port, 271125Smax.romanov@nginx.com nxt_port_send_msg_t *msg); 2811Sigor@sysoev.ru static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); 291555Smax.romanov@nginx.com static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj, 301555Smax.romanov@nginx.com void *data); 3111Sigor@sysoev.ru static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 3282Smax.romanov@nginx.com nxt_port_recv_msg_t *msg); 3311Sigor@sysoev.ru static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port); 3411Sigor@sysoev.ru static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b); 3511Sigor@sysoev.ru static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data); 3611Sigor@sysoev.ru 3711Sigor@sysoev.ru 3814Sigor@sysoev.ru nxt_int_t 3914Sigor@sysoev.ru nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size) 4011Sigor@sysoev.ru { 4165Sigor@sysoev.ru nxt_int_t sndbuf, rcvbuf, size; 4265Sigor@sysoev.ru nxt_socket_t snd, rcv; 4311Sigor@sysoev.ru 4414Sigor@sysoev.ru port->socket.task = task; 4514Sigor@sysoev.ru 4614Sigor@sysoev.ru port->pair[0] = -1; 4714Sigor@sysoev.ru port->pair[1] = -1; 4814Sigor@sysoev.ru 4913Sigor@sysoev.ru if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) { 5011Sigor@sysoev.ru goto socketpair_fail; 5111Sigor@sysoev.ru } 5211Sigor@sysoev.ru 5311Sigor@sysoev.ru snd = port->pair[1]; 5411Sigor@sysoev.ru 5513Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 5611Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 5711Sigor@sysoev.ru goto getsockopt_fail; 5811Sigor@sysoev.ru } 5911Sigor@sysoev.ru 6011Sigor@sysoev.ru rcv = port->pair[0]; 6111Sigor@sysoev.ru 6213Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 6311Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 6411Sigor@sysoev.ru goto getsockopt_fail; 6511Sigor@sysoev.ru } 6611Sigor@sysoev.ru 6711Sigor@sysoev.ru if (max_size == 0) { 6811Sigor@sysoev.ru max_size = 16 * 1024; 6911Sigor@sysoev.ru } 7011Sigor@sysoev.ru 7111Sigor@sysoev.ru if ((size_t) sndbuf < max_size) { 7211Sigor@sysoev.ru /* 7311Sigor@sysoev.ru * On Unix domain sockets 7411Sigor@sysoev.ru * Linux uses 224K on both send and receive directions; 7511Sigor@sysoev.ru * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size 7611Sigor@sysoev.ru * on send direction and 4K buffer size on receive direction; 7711Sigor@sysoev.ru * Solaris uses 16K on send direction and 5K on receive direction. 7811Sigor@sysoev.ru */ 7913Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF, 8013Sigor@sysoev.ru max_size); 8111Sigor@sysoev.ru 8213Sigor@sysoev.ru sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF); 8311Sigor@sysoev.ru if (nxt_slow_path(sndbuf < 0)) { 8411Sigor@sysoev.ru goto getsockopt_fail; 8511Sigor@sysoev.ru } 8611Sigor@sysoev.ru 8711Sigor@sysoev.ru size = sndbuf * 4; 8811Sigor@sysoev.ru 8911Sigor@sysoev.ru if (rcvbuf < size) { 9013Sigor@sysoev.ru (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF, 9113Sigor@sysoev.ru size); 9211Sigor@sysoev.ru 9313Sigor@sysoev.ru rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF); 9411Sigor@sysoev.ru if (nxt_slow_path(rcvbuf < 0)) { 9511Sigor@sysoev.ru goto getsockopt_fail; 9611Sigor@sysoev.ru } 9711Sigor@sysoev.ru } 9811Sigor@sysoev.ru } 9911Sigor@sysoev.ru 10011Sigor@sysoev.ru port->max_size = nxt_min(max_size, (size_t) sndbuf); 10111Sigor@sysoev.ru port->max_share = (64 * 1024); 10211Sigor@sysoev.ru 10314Sigor@sysoev.ru return NXT_OK; 10411Sigor@sysoev.ru 10511Sigor@sysoev.ru getsockopt_fail: 10611Sigor@sysoev.ru 10713Sigor@sysoev.ru nxt_socket_close(task, port->pair[0]); 10813Sigor@sysoev.ru nxt_socket_close(task, port->pair[1]); 10911Sigor@sysoev.ru 11011Sigor@sysoev.ru socketpair_fail: 11111Sigor@sysoev.ru 11214Sigor@sysoev.ru return NXT_ERROR; 11311Sigor@sysoev.ru } 11411Sigor@sysoev.ru 11511Sigor@sysoev.ru 11611Sigor@sysoev.ru void 11711Sigor@sysoev.ru nxt_port_destroy(nxt_port_t *port) 11811Sigor@sysoev.ru { 11913Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->socket.fd); 12065Sigor@sysoev.ru nxt_mp_destroy(port->mem_pool); 12111Sigor@sysoev.ru } 12211Sigor@sysoev.ru 12311Sigor@sysoev.ru 12411Sigor@sysoev.ru void 12511Sigor@sysoev.ru nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port) 12611Sigor@sysoev.ru { 12711Sigor@sysoev.ru port->socket.fd = port->pair[1]; 12811Sigor@sysoev.ru port->socket.log = &nxt_main_log; 12911Sigor@sysoev.ru port->socket.write_ready = 1; 13011Sigor@sysoev.ru 131141Smax.romanov@nginx.com port->engine = task->thread->engine; 132141Smax.romanov@nginx.com 133141Smax.romanov@nginx.com port->socket.write_work_queue = &port->engine->fast_work_queue; 13411Sigor@sysoev.ru port->socket.write_handler = nxt_port_write_handler; 13511Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 13611Sigor@sysoev.ru } 13711Sigor@sysoev.ru 13811Sigor@sysoev.ru 13911Sigor@sysoev.ru void 14011Sigor@sysoev.ru nxt_port_write_close(nxt_port_t *port) 14111Sigor@sysoev.ru { 14213Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[1]); 14311Sigor@sysoev.ru port->pair[1] = -1; 14411Sigor@sysoev.ru } 14511Sigor@sysoev.ru 14611Sigor@sysoev.ru 147122Smax.romanov@nginx.com static void 1481125Smax.romanov@nginx.com nxt_port_release_send_msg(nxt_port_send_msg_t *msg) 149122Smax.romanov@nginx.com { 1501125Smax.romanov@nginx.com if (msg->allocated) { 1511125Smax.romanov@nginx.com nxt_free(msg); 152344Smax.romanov@nginx.com } 153122Smax.romanov@nginx.com } 154122Smax.romanov@nginx.com 155122Smax.romanov@nginx.com 15611Sigor@sysoev.ru nxt_int_t 1571555Smax.romanov@nginx.com nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, 1581555Smax.romanov@nginx.com nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port, 1591555Smax.romanov@nginx.com nxt_buf_t *b) 16011Sigor@sysoev.ru { 1611555Smax.romanov@nginx.com int notify; 1621832Smax.romanov@nginx.com uint8_t qmsg_size; 1631125Smax.romanov@nginx.com nxt_int_t res; 1641125Smax.romanov@nginx.com nxt_port_send_msg_t msg; 1651832Smax.romanov@nginx.com struct { 1661832Smax.romanov@nginx.com nxt_port_msg_t pm; 1671832Smax.romanov@nginx.com uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE]; 1681832Smax.romanov@nginx.com } qmsg; 16911Sigor@sysoev.ru 170344Smax.romanov@nginx.com msg.link.next = NULL; 171344Smax.romanov@nginx.com msg.link.prev = NULL; 172122Smax.romanov@nginx.com 173344Smax.romanov@nginx.com msg.buf = b; 1741125Smax.romanov@nginx.com msg.share = 0; 1751558Smax.romanov@nginx.com msg.fd[0] = fd; 1761558Smax.romanov@nginx.com msg.fd[1] = fd2; 177344Smax.romanov@nginx.com msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; 1781125Smax.romanov@nginx.com msg.allocated = 0; 17911Sigor@sysoev.ru 180344Smax.romanov@nginx.com msg.port_msg.stream = stream; 181344Smax.romanov@nginx.com msg.port_msg.pid = nxt_pid; 182344Smax.romanov@nginx.com msg.port_msg.reply_port = reply_port; 183344Smax.romanov@nginx.com msg.port_msg.type = type & NXT_PORT_MSG_MASK; 184344Smax.romanov@nginx.com msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0; 185344Smax.romanov@nginx.com msg.port_msg.mmap = 0; 186352Smax.romanov@nginx.com msg.port_msg.nf = 0; 187352Smax.romanov@nginx.com msg.port_msg.mf = 0; 1881555Smax.romanov@nginx.com 1891555Smax.romanov@nginx.com if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) { 1901555Smax.romanov@nginx.com 1911832Smax.romanov@nginx.com if (fd == -1 && nxt_port_can_enqueue_buf(b)) { 1921832Smax.romanov@nginx.com qmsg.pm = msg.port_msg; 1931832Smax.romanov@nginx.com 1941832Smax.romanov@nginx.com qmsg_size = sizeof(qmsg.pm); 1951832Smax.romanov@nginx.com 1961555Smax.romanov@nginx.com if (b != NULL) { 1971832Smax.romanov@nginx.com qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b); 1981555Smax.romanov@nginx.com } 1991555Smax.romanov@nginx.com 2001832Smax.romanov@nginx.com res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, ¬ify); 2011555Smax.romanov@nginx.com 2021555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d", 2031555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 2041832Smax.romanov@nginx.com (int) qmsg_size, notify, res); 2051832Smax.romanov@nginx.com 2061832Smax.romanov@nginx.com if (b != NULL && nxt_fast_path(res == NXT_OK)) { 2071832Smax.romanov@nginx.com if (qmsg.pm.mmap) { 2081832Smax.romanov@nginx.com b->is_port_mmap_sent = 1; 2091832Smax.romanov@nginx.com } 2101832Smax.romanov@nginx.com 2111832Smax.romanov@nginx.com b->mem.pos = b->mem.free; 2121832Smax.romanov@nginx.com 2131832Smax.romanov@nginx.com nxt_work_queue_add(&task->thread->engine->fast_work_queue, 2141832Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 2151832Smax.romanov@nginx.com } 2161555Smax.romanov@nginx.com 2171555Smax.romanov@nginx.com if (notify == 0) { 2181555Smax.romanov@nginx.com return res; 2191555Smax.romanov@nginx.com } 2201555Smax.romanov@nginx.com 2211555Smax.romanov@nginx.com msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE; 2221555Smax.romanov@nginx.com msg.buf = NULL; 2231555Smax.romanov@nginx.com 2241555Smax.romanov@nginx.com } else { 2251832Smax.romanov@nginx.com qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET; 2261555Smax.romanov@nginx.com 2271832Smax.romanov@nginx.com res = nxt_port_queue_send(port->queue, qmsg.buf, 1, ¬ify); 2281555Smax.romanov@nginx.com 2291555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d", 2301555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 2311555Smax.romanov@nginx.com notify, res); 2321560Smax.romanov@nginx.com 2331560Smax.romanov@nginx.com if (nxt_slow_path(res == NXT_AGAIN)) { 2341560Smax.romanov@nginx.com return NXT_AGAIN; 2351560Smax.romanov@nginx.com } 2361555Smax.romanov@nginx.com } 2371555Smax.romanov@nginx.com } 23811Sigor@sysoev.ru 2391125Smax.romanov@nginx.com res = nxt_port_msg_chk_insert(task, port, &msg); 2401125Smax.romanov@nginx.com if (nxt_fast_path(res == NXT_DECLINED)) { 241344Smax.romanov@nginx.com nxt_port_write_handler(task, &port->socket, &msg); 2421125Smax.romanov@nginx.com res = NXT_OK; 24311Sigor@sysoev.ru } 24411Sigor@sysoev.ru 2451125Smax.romanov@nginx.com return res; 2461125Smax.romanov@nginx.com } 2471125Smax.romanov@nginx.com 2481125Smax.romanov@nginx.com 2491832Smax.romanov@nginx.com static nxt_bool_t 2501832Smax.romanov@nginx.com nxt_port_can_enqueue_buf(nxt_buf_t *b) 2511832Smax.romanov@nginx.com { 2521832Smax.romanov@nginx.com if (b == NULL) { 2531832Smax.romanov@nginx.com return 1; 2541832Smax.romanov@nginx.com } 2551832Smax.romanov@nginx.com 2561832Smax.romanov@nginx.com if (b->next != NULL) { 2571832Smax.romanov@nginx.com return 0; 2581832Smax.romanov@nginx.com } 2591832Smax.romanov@nginx.com 2601832Smax.romanov@nginx.com return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE 2611832Smax.romanov@nginx.com || nxt_buf_is_port_mmap(b)); 2621832Smax.romanov@nginx.com } 2631832Smax.romanov@nginx.com 2641832Smax.romanov@nginx.com 2651832Smax.romanov@nginx.com static uint8_t 2661832Smax.romanov@nginx.com nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf, 2671832Smax.romanov@nginx.com nxt_buf_t *b) 2681832Smax.romanov@nginx.com { 2691832Smax.romanov@nginx.com ssize_t size; 2701832Smax.romanov@nginx.com nxt_port_mmap_msg_t *mm; 2711832Smax.romanov@nginx.com nxt_port_mmap_header_t *hdr; 2721832Smax.romanov@nginx.com nxt_port_mmap_handler_t *mmap_handler; 2731832Smax.romanov@nginx.com 2741832Smax.romanov@nginx.com size = nxt_buf_mem_used_size(&b->mem); 2751832Smax.romanov@nginx.com 2761832Smax.romanov@nginx.com if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) { 2771832Smax.romanov@nginx.com nxt_memcpy(qbuf, b->mem.pos, size); 2781832Smax.romanov@nginx.com 2791832Smax.romanov@nginx.com return size; 2801832Smax.romanov@nginx.com } 2811832Smax.romanov@nginx.com 2821832Smax.romanov@nginx.com mmap_handler = b->parent; 2831832Smax.romanov@nginx.com hdr = mmap_handler->hdr; 2841832Smax.romanov@nginx.com mm = qbuf; 2851832Smax.romanov@nginx.com 2861832Smax.romanov@nginx.com mm->mmap_id = hdr->id; 2871832Smax.romanov@nginx.com mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos); 2881832Smax.romanov@nginx.com mm->size = nxt_buf_mem_used_size(&b->mem); 2891832Smax.romanov@nginx.com 2901832Smax.romanov@nginx.com pm->mmap = 1; 2911832Smax.romanov@nginx.com 2921832Smax.romanov@nginx.com nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id, 2931832Smax.romanov@nginx.com mm->size); 2941832Smax.romanov@nginx.com 2951832Smax.romanov@nginx.com return sizeof(nxt_port_mmap_msg_t); 2961832Smax.romanov@nginx.com } 2971832Smax.romanov@nginx.com 2981832Smax.romanov@nginx.com 2991125Smax.romanov@nginx.com static nxt_int_t 3001125Smax.romanov@nginx.com nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port, 3011125Smax.romanov@nginx.com nxt_port_send_msg_t *msg) 3021125Smax.romanov@nginx.com { 3031125Smax.romanov@nginx.com nxt_int_t res; 3041125Smax.romanov@nginx.com 3051125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 3061125Smax.romanov@nginx.com 3071125Smax.romanov@nginx.com if (nxt_fast_path(port->socket.write_ready 3081125Smax.romanov@nginx.com && nxt_queue_is_empty(&port->messages))) 3091125Smax.romanov@nginx.com { 3101125Smax.romanov@nginx.com res = NXT_DECLINED; 3111125Smax.romanov@nginx.com 3121125Smax.romanov@nginx.com } else { 3131125Smax.romanov@nginx.com msg = nxt_port_msg_alloc(msg); 3141125Smax.romanov@nginx.com 3151125Smax.romanov@nginx.com if (nxt_fast_path(msg != NULL)) { 3161125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 3171125Smax.romanov@nginx.com nxt_port_use(task, port, 1); 3181125Smax.romanov@nginx.com res = NXT_OK; 3191125Smax.romanov@nginx.com 3201125Smax.romanov@nginx.com } else { 3211125Smax.romanov@nginx.com res = NXT_ERROR; 3221125Smax.romanov@nginx.com } 3231125Smax.romanov@nginx.com } 3241125Smax.romanov@nginx.com 3251125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 3261125Smax.romanov@nginx.com 3271125Smax.romanov@nginx.com return res; 3281125Smax.romanov@nginx.com } 3291125Smax.romanov@nginx.com 3301125Smax.romanov@nginx.com 3311125Smax.romanov@nginx.com static nxt_port_send_msg_t * 3321125Smax.romanov@nginx.com nxt_port_msg_alloc(nxt_port_send_msg_t *m) 3331125Smax.romanov@nginx.com { 3341125Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 3351125Smax.romanov@nginx.com 3361125Smax.romanov@nginx.com msg = nxt_malloc(sizeof(nxt_port_send_msg_t)); 3371125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 3381125Smax.romanov@nginx.com return NULL; 3391125Smax.romanov@nginx.com } 3401125Smax.romanov@nginx.com 3411125Smax.romanov@nginx.com *msg = *m; 3421125Smax.romanov@nginx.com 3431125Smax.romanov@nginx.com msg->allocated = 1; 3441125Smax.romanov@nginx.com 3451125Smax.romanov@nginx.com return msg; 34611Sigor@sysoev.ru } 34711Sigor@sysoev.ru 34811Sigor@sysoev.ru 34911Sigor@sysoev.ru static void 350343Smax.romanov@nginx.com nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data) 351343Smax.romanov@nginx.com { 352343Smax.romanov@nginx.com nxt_fd_event_block_write(task->thread->engine, &port->socket); 353343Smax.romanov@nginx.com } 354343Smax.romanov@nginx.com 355343Smax.romanov@nginx.com 356343Smax.romanov@nginx.com static void 357343Smax.romanov@nginx.com nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data) 358343Smax.romanov@nginx.com { 359343Smax.romanov@nginx.com nxt_fd_event_enable_write(task->thread->engine, &port->socket); 360343Smax.romanov@nginx.com } 361343Smax.romanov@nginx.com 362343Smax.romanov@nginx.com 363343Smax.romanov@nginx.com static void 36411Sigor@sysoev.ru nxt_port_write_handler(nxt_task_t *task, void *obj, void *data) 36511Sigor@sysoev.ru { 366343Smax.romanov@nginx.com int use_delta; 367197Smax.romanov@nginx.com size_t plain_size; 36811Sigor@sysoev.ru ssize_t n; 3691125Smax.romanov@nginx.com uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10]; 370343Smax.romanov@nginx.com nxt_bool_t block_write, enable_write; 37111Sigor@sysoev.ru nxt_port_t *port; 3721125Smax.romanov@nginx.com struct iovec iov[NXT_IOBUF_MAX * 10]; 373127Smax.romanov@nginx.com nxt_work_queue_t *wq; 374125Smax.romanov@nginx.com nxt_port_method_t m; 37511Sigor@sysoev.ru nxt_port_send_msg_t *msg; 37611Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb; 37742Smax.romanov@nginx.com 378197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 37911Sigor@sysoev.ru 380343Smax.romanov@nginx.com block_write = 0; 381343Smax.romanov@nginx.com enable_write = 0; 382343Smax.romanov@nginx.com use_delta = 0; 383343Smax.romanov@nginx.com 384344Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 385344Smax.romanov@nginx.com 38611Sigor@sysoev.ru do { 3871125Smax.romanov@nginx.com if (data) { 3881125Smax.romanov@nginx.com msg = data; 3891125Smax.romanov@nginx.com 3901125Smax.romanov@nginx.com } else { 3911125Smax.romanov@nginx.com msg = nxt_port_msg_first(port); 39211Sigor@sysoev.ru 3931125Smax.romanov@nginx.com if (msg == NULL) { 3941125Smax.romanov@nginx.com block_write = 1; 3951125Smax.romanov@nginx.com goto cleanup; 3961125Smax.romanov@nginx.com } 39711Sigor@sysoev.ru } 39811Sigor@sysoev.ru 3991125Smax.romanov@nginx.com next_fragment: 4001125Smax.romanov@nginx.com 40114Sigor@sysoev.ru iov[0].iov_base = &msg->port_msg; 40214Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 40311Sigor@sysoev.ru 40411Sigor@sysoev.ru sb.buf = msg->buf; 40514Sigor@sysoev.ru sb.iobuf = &iov[1]; 40611Sigor@sysoev.ru sb.nmax = NXT_IOBUF_MAX - 1; 40711Sigor@sysoev.ru sb.sync = 0; 40811Sigor@sysoev.ru sb.last = 0; 40942Smax.romanov@nginx.com sb.size = 0; 41011Sigor@sysoev.ru sb.limit = port->max_size; 41111Sigor@sysoev.ru 412352Smax.romanov@nginx.com sb.limit_reached = 0; 413352Smax.romanov@nginx.com sb.nmax_reached = 0; 414352Smax.romanov@nginx.com 41542Smax.romanov@nginx.com m = nxt_port_mmap_get_method(task, port, msg->buf); 41642Smax.romanov@nginx.com 41742Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP) { 41842Smax.romanov@nginx.com sb.limit = (1ULL << 31) - 1; 419352Smax.romanov@nginx.com sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1, 420352Smax.romanov@nginx.com port->max_size / PORT_MMAP_MIN_SIZE); 42142Smax.romanov@nginx.com } 42242Smax.romanov@nginx.com 4231002Smax.romanov@nginx.com sb.limit -= iov[0].iov_len; 4241002Smax.romanov@nginx.com 42542Smax.romanov@nginx.com nxt_sendbuf_mem_coalesce(task, &sb); 42642Smax.romanov@nginx.com 42742Smax.romanov@nginx.com plain_size = sb.size; 42842Smax.romanov@nginx.com 42942Smax.romanov@nginx.com /* 43042Smax.romanov@nginx.com * Send through mmap enabled only when payload 43142Smax.romanov@nginx.com * is bigger than PORT_MMAP_MIN_SIZE. 43242Smax.romanov@nginx.com */ 43342Smax.romanov@nginx.com if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) { 4341125Smax.romanov@nginx.com nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf); 43542Smax.romanov@nginx.com 43642Smax.romanov@nginx.com } else { 43742Smax.romanov@nginx.com m = NXT_PORT_METHOD_PLAIN; 43842Smax.romanov@nginx.com } 43911Sigor@sysoev.ru 440189Smax.romanov@nginx.com msg->port_msg.last |= sb.last; 441352Smax.romanov@nginx.com msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; 44211Sigor@sysoev.ru 4431558Smax.romanov@nginx.com n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1); 44411Sigor@sysoev.ru 44511Sigor@sysoev.ru if (n > 0) { 44642Smax.romanov@nginx.com if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { 447564Svbart@nginx.com nxt_alert(task, "port %d: short write: %z instead of %uz", 448564Svbart@nginx.com port->socket.fd, n, sb.size + iov[0].iov_len); 44911Sigor@sysoev.ru goto fail; 45011Sigor@sysoev.ru } 45111Sigor@sysoev.ru 4521558Smax.romanov@nginx.com if (msg->close_fd) { 4531558Smax.romanov@nginx.com if (msg->fd[0] != -1) { 4541558Smax.romanov@nginx.com nxt_fd_close(msg->fd[0]); 455189Smax.romanov@nginx.com 4561558Smax.romanov@nginx.com msg->fd[0] = -1; 4571558Smax.romanov@nginx.com } 458189Smax.romanov@nginx.com 4591558Smax.romanov@nginx.com if (msg->fd[1] != -1) { 4601558Smax.romanov@nginx.com nxt_fd_close(msg->fd[1]); 4611553Smax.romanov@nginx.com 4621558Smax.romanov@nginx.com msg->fd[1] = -1; 4631558Smax.romanov@nginx.com } 4641553Smax.romanov@nginx.com } 4651553Smax.romanov@nginx.com 466592Sigor@sysoev.ru msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, 4671125Smax.romanov@nginx.com m == NXT_PORT_METHOD_MMAP); 46811Sigor@sysoev.ru 46911Sigor@sysoev.ru if (msg->buf != NULL) { 470352Smax.romanov@nginx.com nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd, 471352Smax.romanov@nginx.com msg->port_msg.stream); 472352Smax.romanov@nginx.com 47311Sigor@sysoev.ru /* 47411Sigor@sysoev.ru * A file descriptor is sent only 47511Sigor@sysoev.ru * in the first message of a stream. 47611Sigor@sysoev.ru */ 4771558Smax.romanov@nginx.com msg->fd[0] = -1; 4781558Smax.romanov@nginx.com msg->fd[1] = -1; 47911Sigor@sysoev.ru msg->share += n; 480352Smax.romanov@nginx.com msg->port_msg.nf = 1; 48111Sigor@sysoev.ru 48211Sigor@sysoev.ru if (msg->share >= port->max_share) { 48311Sigor@sysoev.ru msg->share = 0; 484344Smax.romanov@nginx.com 485344Smax.romanov@nginx.com if (msg->link.next != NULL) { 4861125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 4871125Smax.romanov@nginx.com 488344Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 4891125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 4901125Smax.romanov@nginx.com 4911125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 492344Smax.romanov@nginx.com 4931125Smax.romanov@nginx.com } else { 4941125Smax.romanov@nginx.com msg = nxt_port_msg_insert_tail(port, msg); 4951125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 4961125Smax.romanov@nginx.com goto fail; 4971125Smax.romanov@nginx.com } 4981125Smax.romanov@nginx.com 499344Smax.romanov@nginx.com use_delta++; 500344Smax.romanov@nginx.com } 5011125Smax.romanov@nginx.com 5021125Smax.romanov@nginx.com } else { 5031125Smax.romanov@nginx.com goto next_fragment; 50411Sigor@sysoev.ru } 50511Sigor@sysoev.ru 50611Sigor@sysoev.ru } else { 507344Smax.romanov@nginx.com if (msg->link.next != NULL) { 5081125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 5091125Smax.romanov@nginx.com 510344Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 5111125Smax.romanov@nginx.com msg->link.next = NULL; 5121125Smax.romanov@nginx.com 5131125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 5141125Smax.romanov@nginx.com 515344Smax.romanov@nginx.com use_delta--; 516344Smax.romanov@nginx.com } 5171125Smax.romanov@nginx.com 5181125Smax.romanov@nginx.com nxt_port_release_send_msg(msg); 5191125Smax.romanov@nginx.com } 5201125Smax.romanov@nginx.com 5211125Smax.romanov@nginx.com if (data != NULL) { 5221125Smax.romanov@nginx.com goto cleanup; 52311Sigor@sysoev.ru } 52411Sigor@sysoev.ru 5251004Smax.romanov@nginx.com } else { 5261125Smax.romanov@nginx.com if (nxt_slow_path(n == NXT_ERROR)) { 527*1907Smax.romanov@nginx.com if (msg->link.next == NULL) { 528*1907Smax.romanov@nginx.com if (msg->close_fd) { 529*1907Smax.romanov@nginx.com if (msg->fd[0] != -1) { 530*1907Smax.romanov@nginx.com nxt_fd_close(msg->fd[0]); 531*1907Smax.romanov@nginx.com 532*1907Smax.romanov@nginx.com msg->fd[0] = -1; 533*1907Smax.romanov@nginx.com } 534*1907Smax.romanov@nginx.com 535*1907Smax.romanov@nginx.com if (msg->fd[1] != -1) { 536*1907Smax.romanov@nginx.com nxt_fd_close(msg->fd[1]); 537*1907Smax.romanov@nginx.com 538*1907Smax.romanov@nginx.com msg->fd[1] = -1; 539*1907Smax.romanov@nginx.com } 540*1907Smax.romanov@nginx.com } 541*1907Smax.romanov@nginx.com 542*1907Smax.romanov@nginx.com nxt_port_release_send_msg(msg); 543*1907Smax.romanov@nginx.com } 544*1907Smax.romanov@nginx.com 5451125Smax.romanov@nginx.com goto fail; 546344Smax.romanov@nginx.com } 5471004Smax.romanov@nginx.com 5481125Smax.romanov@nginx.com if (msg->link.next == NULL) { 5491125Smax.romanov@nginx.com msg = nxt_port_msg_insert_tail(port, msg); 5501125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 5511125Smax.romanov@nginx.com goto fail; 5521125Smax.romanov@nginx.com } 5531125Smax.romanov@nginx.com 5541125Smax.romanov@nginx.com use_delta++; 5551004Smax.romanov@nginx.com } 55611Sigor@sysoev.ru } 55711Sigor@sysoev.ru 55811Sigor@sysoev.ru } while (port->socket.write_ready); 55911Sigor@sysoev.ru 56012Sigor@sysoev.ru if (nxt_fd_event_is_disabled(port->socket.write)) { 561343Smax.romanov@nginx.com enable_write = 1; 56211Sigor@sysoev.ru } 56311Sigor@sysoev.ru 5641125Smax.romanov@nginx.com goto cleanup; 56511Sigor@sysoev.ru 56611Sigor@sysoev.ru fail: 56711Sigor@sysoev.ru 568343Smax.romanov@nginx.com use_delta++; 569343Smax.romanov@nginx.com 570344Smax.romanov@nginx.com nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket, 571343Smax.romanov@nginx.com &port->socket); 572343Smax.romanov@nginx.com 5731125Smax.romanov@nginx.com cleanup: 574343Smax.romanov@nginx.com 575343Smax.romanov@nginx.com if (block_write && nxt_fd_event_is_active(port->socket.write)) { 576343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_block_write, NULL); 577343Smax.romanov@nginx.com } 578343Smax.romanov@nginx.com 579343Smax.romanov@nginx.com if (enable_write) { 580343Smax.romanov@nginx.com nxt_port_post(task, port, nxt_port_fd_enable_write, NULL); 581343Smax.romanov@nginx.com } 582343Smax.romanov@nginx.com 583343Smax.romanov@nginx.com if (use_delta != 0) { 584343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta); 585343Smax.romanov@nginx.com } 58611Sigor@sysoev.ru } 58711Sigor@sysoev.ru 58811Sigor@sysoev.ru 5891125Smax.romanov@nginx.com static nxt_port_send_msg_t * 5901125Smax.romanov@nginx.com nxt_port_msg_first(nxt_port_t *port) 5911125Smax.romanov@nginx.com { 5921125Smax.romanov@nginx.com nxt_queue_link_t *lnk; 5931125Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 5941125Smax.romanov@nginx.com 5951125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 5961125Smax.romanov@nginx.com 5971125Smax.romanov@nginx.com lnk = nxt_queue_first(&port->messages); 5981125Smax.romanov@nginx.com 5991125Smax.romanov@nginx.com if (lnk == nxt_queue_tail(&port->messages)) { 6001125Smax.romanov@nginx.com msg = NULL; 6011125Smax.romanov@nginx.com 6021125Smax.romanov@nginx.com } else { 6031125Smax.romanov@nginx.com msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link); 6041125Smax.romanov@nginx.com } 6051125Smax.romanov@nginx.com 6061125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 6071125Smax.romanov@nginx.com 6081125Smax.romanov@nginx.com return msg; 6091125Smax.romanov@nginx.com } 6101125Smax.romanov@nginx.com 6111125Smax.romanov@nginx.com 612592Sigor@sysoev.ru static nxt_buf_t * 613592Sigor@sysoev.ru nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, 614592Sigor@sysoev.ru size_t sent, nxt_bool_t mmap_mode) 615592Sigor@sysoev.ru { 6161269Sigor@sysoev.ru size_t size; 6171269Sigor@sysoev.ru nxt_buf_t *next; 618592Sigor@sysoev.ru 619592Sigor@sysoev.ru while (b != NULL) { 620592Sigor@sysoev.ru 621592Sigor@sysoev.ru nxt_prefetch(b->next); 622592Sigor@sysoev.ru 623592Sigor@sysoev.ru if (!nxt_buf_is_sync(b)) { 624592Sigor@sysoev.ru 625592Sigor@sysoev.ru size = nxt_buf_used_size(b); 626592Sigor@sysoev.ru 627592Sigor@sysoev.ru if (size != 0) { 628592Sigor@sysoev.ru 629592Sigor@sysoev.ru if (sent == 0) { 630592Sigor@sysoev.ru break; 631592Sigor@sysoev.ru } 632592Sigor@sysoev.ru 633592Sigor@sysoev.ru if (nxt_buf_is_port_mmap(b) && mmap_mode) { 634592Sigor@sysoev.ru /* 635592Sigor@sysoev.ru * buffer has been sent to other side which is now 636592Sigor@sysoev.ru * responsible for shared memory bucket release 637592Sigor@sysoev.ru */ 638592Sigor@sysoev.ru b->is_port_mmap_sent = 1; 639592Sigor@sysoev.ru } 640592Sigor@sysoev.ru 641592Sigor@sysoev.ru if (sent < size) { 642592Sigor@sysoev.ru 643592Sigor@sysoev.ru if (nxt_buf_is_mem(b)) { 644592Sigor@sysoev.ru b->mem.pos += sent; 645592Sigor@sysoev.ru } 646592Sigor@sysoev.ru 647592Sigor@sysoev.ru if (nxt_buf_is_file(b)) { 648592Sigor@sysoev.ru b->file_pos += sent; 649592Sigor@sysoev.ru } 650592Sigor@sysoev.ru 651592Sigor@sysoev.ru break; 652592Sigor@sysoev.ru } 653592Sigor@sysoev.ru 654592Sigor@sysoev.ru /* b->mem.free is NULL in file-only buffer. */ 655592Sigor@sysoev.ru b->mem.pos = b->mem.free; 656592Sigor@sysoev.ru 657592Sigor@sysoev.ru if (nxt_buf_is_file(b)) { 658592Sigor@sysoev.ru b->file_pos = b->file_end; 659592Sigor@sysoev.ru } 660592Sigor@sysoev.ru 661592Sigor@sysoev.ru sent -= size; 662592Sigor@sysoev.ru } 663592Sigor@sysoev.ru } 664592Sigor@sysoev.ru 665592Sigor@sysoev.ru nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 666592Sigor@sysoev.ru 6671269Sigor@sysoev.ru next = b->next; 6681269Sigor@sysoev.ru b->next = NULL; 6691269Sigor@sysoev.ru b = next; 670592Sigor@sysoev.ru } 671592Sigor@sysoev.ru 672592Sigor@sysoev.ru return b; 673592Sigor@sysoev.ru } 674592Sigor@sysoev.ru 675592Sigor@sysoev.ru 6761125Smax.romanov@nginx.com static nxt_port_send_msg_t * 6771125Smax.romanov@nginx.com nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg) 6781125Smax.romanov@nginx.com { 6791125Smax.romanov@nginx.com if (msg->allocated == 0) { 6801125Smax.romanov@nginx.com msg = nxt_port_msg_alloc(msg); 6811125Smax.romanov@nginx.com 6821125Smax.romanov@nginx.com if (nxt_slow_path(msg == NULL)) { 6831125Smax.romanov@nginx.com return NULL; 6841125Smax.romanov@nginx.com } 6851125Smax.romanov@nginx.com } 6861125Smax.romanov@nginx.com 6871125Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 6881125Smax.romanov@nginx.com 6891125Smax.romanov@nginx.com nxt_queue_insert_tail(&port->messages, &msg->link); 6901125Smax.romanov@nginx.com 6911125Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 6921125Smax.romanov@nginx.com 6931125Smax.romanov@nginx.com return msg; 6941125Smax.romanov@nginx.com } 6951125Smax.romanov@nginx.com 6961125Smax.romanov@nginx.com 69711Sigor@sysoev.ru void 69811Sigor@sysoev.ru nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) 69911Sigor@sysoev.ru { 70011Sigor@sysoev.ru port->socket.fd = port->pair[0]; 70111Sigor@sysoev.ru port->socket.log = &nxt_main_log; 70211Sigor@sysoev.ru 703141Smax.romanov@nginx.com port->engine = task->thread->engine; 704141Smax.romanov@nginx.com 705141Smax.romanov@nginx.com port->socket.read_work_queue = &port->engine->fast_work_queue; 7061555Smax.romanov@nginx.com port->socket.read_handler = port->queue != NULL 7071555Smax.romanov@nginx.com ? nxt_port_queue_read_handler 7081555Smax.romanov@nginx.com : nxt_port_read_handler; 70911Sigor@sysoev.ru port->socket.error_handler = nxt_port_error_handler; 71011Sigor@sysoev.ru 711141Smax.romanov@nginx.com nxt_fd_event_enable_read(port->engine, &port->socket); 71211Sigor@sysoev.ru } 71311Sigor@sysoev.ru 71411Sigor@sysoev.ru 71511Sigor@sysoev.ru void 71611Sigor@sysoev.ru nxt_port_read_close(nxt_port_t *port) 71711Sigor@sysoev.ru { 718350Smax.romanov@nginx.com port->socket.read_ready = 0; 7191015Smax.romanov@nginx.com port->socket.read = NXT_EVENT_INACTIVE; 72013Sigor@sysoev.ru nxt_socket_close(port->socket.task, port->pair[0]); 72111Sigor@sysoev.ru port->pair[0] = -1; 72211Sigor@sysoev.ru } 72311Sigor@sysoev.ru 72411Sigor@sysoev.ru 72511Sigor@sysoev.ru static void 72611Sigor@sysoev.ru nxt_port_read_handler(nxt_task_t *task, void *obj, void *data) 72711Sigor@sysoev.ru { 72842Smax.romanov@nginx.com ssize_t n; 72942Smax.romanov@nginx.com nxt_buf_t *b; 73042Smax.romanov@nginx.com nxt_port_t *port; 73142Smax.romanov@nginx.com struct iovec iov[2]; 73242Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 73311Sigor@sysoev.ru 734125Smax.romanov@nginx.com port = msg.port = nxt_container_of(obj, nxt_port_t, socket); 73511Sigor@sysoev.ru 736141Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine); 737141Smax.romanov@nginx.com 73811Sigor@sysoev.ru for ( ;; ) { 73911Sigor@sysoev.ru 74011Sigor@sysoev.ru b = nxt_port_buf_alloc(port); 74111Sigor@sysoev.ru 74211Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 74311Sigor@sysoev.ru /* TODO: disable event for some time */ 74411Sigor@sysoev.ru } 74511Sigor@sysoev.ru 74642Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg; 74714Sigor@sysoev.ru iov[0].iov_len = sizeof(nxt_port_msg_t); 74811Sigor@sysoev.ru 74914Sigor@sysoev.ru iov[1].iov_base = b->mem.pos; 75014Sigor@sysoev.ru iov[1].iov_len = port->max_size; 75114Sigor@sysoev.ru 7521558Smax.romanov@nginx.com n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); 75311Sigor@sysoev.ru 75411Sigor@sysoev.ru if (n > 0) { 75542Smax.romanov@nginx.com 75642Smax.romanov@nginx.com msg.buf = b; 75782Smax.romanov@nginx.com msg.size = n; 75842Smax.romanov@nginx.com 75982Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg); 76011Sigor@sysoev.ru 761194Smax.romanov@nginx.com /* 762194Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 763194Smax.romanov@nginx.com * handler should reset 'msg.buf'. 764194Smax.romanov@nginx.com */ 765194Smax.romanov@nginx.com if (msg.buf == b) { 76611Sigor@sysoev.ru nxt_port_buf_free(port, b); 76711Sigor@sysoev.ru } 76811Sigor@sysoev.ru 76911Sigor@sysoev.ru if (port->socket.read_ready) { 77011Sigor@sysoev.ru continue; 77111Sigor@sysoev.ru } 77211Sigor@sysoev.ru 77311Sigor@sysoev.ru return; 77411Sigor@sysoev.ru } 77511Sigor@sysoev.ru 77611Sigor@sysoev.ru if (n == NXT_AGAIN) { 77711Sigor@sysoev.ru nxt_port_buf_free(port, b); 77811Sigor@sysoev.ru 77912Sigor@sysoev.ru nxt_fd_event_enable_read(task->thread->engine, &port->socket); 78011Sigor@sysoev.ru return; 78111Sigor@sysoev.ru } 78211Sigor@sysoev.ru 78311Sigor@sysoev.ru /* n == 0 || n == NXT_ERROR */ 78411Sigor@sysoev.ru 78511Sigor@sysoev.ru nxt_work_queue_add(&task->thread->engine->fast_work_queue, 78611Sigor@sysoev.ru nxt_port_error_handler, task, &port->socket, NULL); 78711Sigor@sysoev.ru return; 78811Sigor@sysoev.ru } 78911Sigor@sysoev.ru } 79011Sigor@sysoev.ru 79111Sigor@sysoev.ru 7921555Smax.romanov@nginx.com static void 7931555Smax.romanov@nginx.com nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data) 7941555Smax.romanov@nginx.com { 7951555Smax.romanov@nginx.com ssize_t n; 7961555Smax.romanov@nginx.com nxt_buf_t *b; 7971555Smax.romanov@nginx.com nxt_port_t *port; 7981555Smax.romanov@nginx.com struct iovec iov[2]; 7991555Smax.romanov@nginx.com nxt_port_queue_t *queue; 8001555Smax.romanov@nginx.com nxt_port_recv_msg_t msg, *smsg; 8011555Smax.romanov@nginx.com uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE]; 8021555Smax.romanov@nginx.com 8031555Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 8041555Smax.romanov@nginx.com msg.port = port; 8051555Smax.romanov@nginx.com 8061555Smax.romanov@nginx.com nxt_assert(port->engine == task->thread->engine); 8071555Smax.romanov@nginx.com 8081555Smax.romanov@nginx.com queue = port->queue; 8091555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, 1); 8101555Smax.romanov@nginx.com 8111555Smax.romanov@nginx.com for ( ;; ) { 8121555Smax.romanov@nginx.com 8131555Smax.romanov@nginx.com if (port->from_socket == 0) { 8141555Smax.romanov@nginx.com n = nxt_port_queue_recv(queue, qmsg); 8151555Smax.romanov@nginx.com 8161555Smax.romanov@nginx.com if (n < 0 && !port->socket.read_ready) { 8171555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, -1); 8181555Smax.romanov@nginx.com 8191555Smax.romanov@nginx.com n = nxt_port_queue_recv(queue, qmsg); 8201555Smax.romanov@nginx.com if (n < 0) { 8211555Smax.romanov@nginx.com return; 8221555Smax.romanov@nginx.com } 8231555Smax.romanov@nginx.com 8241555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, 1); 8251555Smax.romanov@nginx.com } 8261555Smax.romanov@nginx.com 8271555Smax.romanov@nginx.com if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) { 8281555Smax.romanov@nginx.com port->from_socket++; 8291555Smax.romanov@nginx.com 8301555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d", 8311555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 8321555Smax.romanov@nginx.com port->from_socket); 8331555Smax.romanov@nginx.com 8341555Smax.romanov@nginx.com continue; 8351555Smax.romanov@nginx.com } 8361555Smax.romanov@nginx.com 8371555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: dequeue %d", 8381555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 8391555Smax.romanov@nginx.com (int) n); 8401555Smax.romanov@nginx.com 8411555Smax.romanov@nginx.com } else { 8421555Smax.romanov@nginx.com if ((smsg = port->socket_msg) != NULL && smsg->size != 0) { 8431555Smax.romanov@nginx.com msg.port_msg = smsg->port_msg; 8441555Smax.romanov@nginx.com b = smsg->buf; 8451555Smax.romanov@nginx.com n = smsg->size; 8461558Smax.romanov@nginx.com msg.fd[0] = smsg->fd[0]; 8471558Smax.romanov@nginx.com msg.fd[1] = smsg->fd[1]; 8481555Smax.romanov@nginx.com 8491555Smax.romanov@nginx.com smsg->size = 0; 8501555Smax.romanov@nginx.com 8511555Smax.romanov@nginx.com port->from_socket--; 8521555Smax.romanov@nginx.com 8531555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: use suspended message %d", 8541555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 8551555Smax.romanov@nginx.com (int) n); 8561555Smax.romanov@nginx.com 8571555Smax.romanov@nginx.com goto process; 8581555Smax.romanov@nginx.com } 8591555Smax.romanov@nginx.com 8601555Smax.romanov@nginx.com n = -1; 8611555Smax.romanov@nginx.com } 8621555Smax.romanov@nginx.com 8631555Smax.romanov@nginx.com if (n < 0 && !port->socket.read_ready) { 8641555Smax.romanov@nginx.com nxt_atomic_fetch_add(&queue->nitems, -1); 8651555Smax.romanov@nginx.com return; 8661555Smax.romanov@nginx.com } 8671555Smax.romanov@nginx.com 8681555Smax.romanov@nginx.com b = nxt_port_buf_alloc(port); 8691555Smax.romanov@nginx.com 8701555Smax.romanov@nginx.com if (nxt_slow_path(b == NULL)) { 8711555Smax.romanov@nginx.com /* TODO: disable event for some time */ 8721555Smax.romanov@nginx.com } 8731555Smax.romanov@nginx.com 8741555Smax.romanov@nginx.com if (n >= (ssize_t) sizeof(nxt_port_msg_t)) { 8751555Smax.romanov@nginx.com nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t)); 8761555Smax.romanov@nginx.com 8771555Smax.romanov@nginx.com if (n > (ssize_t) sizeof(nxt_port_msg_t)) { 8781555Smax.romanov@nginx.com nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t), 8791555Smax.romanov@nginx.com n - sizeof(nxt_port_msg_t)); 8801555Smax.romanov@nginx.com } 8811555Smax.romanov@nginx.com 8821555Smax.romanov@nginx.com } else { 8831555Smax.romanov@nginx.com iov[0].iov_base = &msg.port_msg; 8841555Smax.romanov@nginx.com iov[0].iov_len = sizeof(nxt_port_msg_t); 8851555Smax.romanov@nginx.com 8861555Smax.romanov@nginx.com iov[1].iov_base = b->mem.pos; 8871555Smax.romanov@nginx.com iov[1].iov_len = port->max_size; 8881555Smax.romanov@nginx.com 8891558Smax.romanov@nginx.com n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2); 8901555Smax.romanov@nginx.com 8911555Smax.romanov@nginx.com if (n == (ssize_t) sizeof(nxt_port_msg_t) 8921555Smax.romanov@nginx.com && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) 8931555Smax.romanov@nginx.com { 8941555Smax.romanov@nginx.com nxt_port_buf_free(port, b); 8951555Smax.romanov@nginx.com 8961555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: recv %d read_queue", 8971555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 8981555Smax.romanov@nginx.com (int) n); 8991555Smax.romanov@nginx.com 9001555Smax.romanov@nginx.com continue; 9011555Smax.romanov@nginx.com } 9021555Smax.romanov@nginx.com 9031555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: recvmsg %d", 9041555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 9051555Smax.romanov@nginx.com (int) n); 9061555Smax.romanov@nginx.com 9071555Smax.romanov@nginx.com if (n > 0) { 9081555Smax.romanov@nginx.com if (port->from_socket == 0) { 9091555Smax.romanov@nginx.com nxt_debug(task, "port{%d,%d} %d: suspend message %d", 9101555Smax.romanov@nginx.com (int) port->pid, (int) port->id, port->socket.fd, 9111555Smax.romanov@nginx.com (int) n); 9121555Smax.romanov@nginx.com 9131555Smax.romanov@nginx.com smsg = port->socket_msg; 9141555Smax.romanov@nginx.com 9151555Smax.romanov@nginx.com if (nxt_slow_path(smsg == NULL)) { 9161555Smax.romanov@nginx.com smsg = nxt_mp_alloc(port->mem_pool, 9171555Smax.romanov@nginx.com sizeof(nxt_port_recv_msg_t)); 9181555Smax.romanov@nginx.com 9191555Smax.romanov@nginx.com if (nxt_slow_path(smsg == NULL)) { 9201555Smax.romanov@nginx.com nxt_alert(task, "port{%d,%d} %d: suspend message " 9211555Smax.romanov@nginx.com "failed", 9221555Smax.romanov@nginx.com (int) port->pid, (int) port->id, 9231555Smax.romanov@nginx.com port->socket.fd); 9241555Smax.romanov@nginx.com 9251555Smax.romanov@nginx.com return; 9261555Smax.romanov@nginx.com } 9271555Smax.romanov@nginx.com 9281555Smax.romanov@nginx.com port->socket_msg = smsg; 9291555Smax.romanov@nginx.com 9301555Smax.romanov@nginx.com } else { 9311555Smax.romanov@nginx.com if (nxt_slow_path(smsg->size != 0)) { 9321555Smax.romanov@nginx.com nxt_alert(task, "port{%d,%d} %d: too many suspend " 9331555Smax.romanov@nginx.com "messages", 9341555Smax.romanov@nginx.com (int) port->pid, (int) port->id, 9351555Smax.romanov@nginx.com port->socket.fd); 9361555Smax.romanov@nginx.com 9371555Smax.romanov@nginx.com return; 9381555Smax.romanov@nginx.com } 9391555Smax.romanov@nginx.com } 9401555Smax.romanov@nginx.com 9411555Smax.romanov@nginx.com smsg->port_msg = msg.port_msg; 9421555Smax.romanov@nginx.com smsg->buf = b; 9431555Smax.romanov@nginx.com smsg->size = n; 9441558Smax.romanov@nginx.com smsg->fd[0] = msg.fd[0]; 9451558Smax.romanov@nginx.com smsg->fd[1] = msg.fd[1]; 9461555Smax.romanov@nginx.com 9471555Smax.romanov@nginx.com continue; 9481555Smax.romanov@nginx.com } 9491555Smax.romanov@nginx.com 9501555Smax.romanov@nginx.com port->from_socket--; 9511555Smax.romanov@nginx.com } 9521555Smax.romanov@nginx.com } 9531555Smax.romanov@nginx.com 9541555Smax.romanov@nginx.com process: 9551555Smax.romanov@nginx.com 9561555Smax.romanov@nginx.com if (n > 0) { 9571555Smax.romanov@nginx.com msg.buf = b; 9581555Smax.romanov@nginx.com msg.size = n; 9591555Smax.romanov@nginx.com 9601555Smax.romanov@nginx.com nxt_port_read_msg_process(task, port, &msg); 9611555Smax.romanov@nginx.com 9621555Smax.romanov@nginx.com /* 9631555Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 9641555Smax.romanov@nginx.com * handler should reset 'msg.buf'. 9651555Smax.romanov@nginx.com */ 9661555Smax.romanov@nginx.com if (msg.buf == b) { 9671555Smax.romanov@nginx.com nxt_port_buf_free(port, b); 9681555Smax.romanov@nginx.com } 9691555Smax.romanov@nginx.com 9701555Smax.romanov@nginx.com continue; 9711555Smax.romanov@nginx.com } 9721555Smax.romanov@nginx.com 9731555Smax.romanov@nginx.com if (n == NXT_AGAIN) { 9741555Smax.romanov@nginx.com nxt_port_buf_free(port, b); 9751555Smax.romanov@nginx.com 9761555Smax.romanov@nginx.com nxt_fd_event_enable_read(task->thread->engine, &port->socket); 9771555Smax.romanov@nginx.com 9781555Smax.romanov@nginx.com continue; 9791555Smax.romanov@nginx.com } 9801555Smax.romanov@nginx.com 9811555Smax.romanov@nginx.com /* n == 0 || n == NXT_ERROR */ 9821555Smax.romanov@nginx.com 9831555Smax.romanov@nginx.com nxt_work_queue_add(&task->thread->engine->fast_work_queue, 9841555Smax.romanov@nginx.com nxt_port_error_handler, task, &port->socket, NULL); 9851555Smax.romanov@nginx.com return; 9861555Smax.romanov@nginx.com } 9871555Smax.romanov@nginx.com } 9881555Smax.romanov@nginx.com 9891555Smax.romanov@nginx.com 9901005Smax.romanov@nginx.com typedef struct { 9911005Smax.romanov@nginx.com uint32_t stream; 9921005Smax.romanov@nginx.com uint32_t pid; 9931005Smax.romanov@nginx.com } nxt_port_frag_key_t; 9941005Smax.romanov@nginx.com 9951005Smax.romanov@nginx.com 996352Smax.romanov@nginx.com static nxt_int_t 997352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data) 998352Smax.romanov@nginx.com { 999352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 10001005Smax.romanov@nginx.com nxt_port_frag_key_t *frag_key; 1001352Smax.romanov@nginx.com 1002352Smax.romanov@nginx.com fmsg = data; 10031005Smax.romanov@nginx.com frag_key = (nxt_port_frag_key_t *) lhq->key.start; 1004352Smax.romanov@nginx.com 10051005Smax.romanov@nginx.com if (lhq->key.length == sizeof(nxt_port_frag_key_t) 10061005Smax.romanov@nginx.com && frag_key->stream == fmsg->port_msg.stream 10071005Smax.romanov@nginx.com && frag_key->pid == (uint32_t) fmsg->port_msg.pid) 1008352Smax.romanov@nginx.com { 1009352Smax.romanov@nginx.com return NXT_OK; 1010352Smax.romanov@nginx.com } 1011352Smax.romanov@nginx.com 1012352Smax.romanov@nginx.com return NXT_DECLINED; 1013352Smax.romanov@nginx.com } 1014352Smax.romanov@nginx.com 1015352Smax.romanov@nginx.com 1016352Smax.romanov@nginx.com static void * 1017352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size) 1018352Smax.romanov@nginx.com { 10191084Smax.romanov@nginx.com return nxt_mp_align(ctx, size, size); 1020352Smax.romanov@nginx.com } 1021352Smax.romanov@nginx.com 1022352Smax.romanov@nginx.com 1023352Smax.romanov@nginx.com static void 1024352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free(void *ctx, void *p) 1025352Smax.romanov@nginx.com { 1026389Smax.romanov@nginx.com nxt_mp_free(ctx, p); 1027352Smax.romanov@nginx.com } 1028352Smax.romanov@nginx.com 1029352Smax.romanov@nginx.com 1030352Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t lvlhsh_frag_proto nxt_aligned(64) = { 1031352Smax.romanov@nginx.com NXT_LVLHSH_DEFAULT, 1032352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_test, 1033352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_alloc, 1034352Smax.romanov@nginx.com nxt_port_lvlhsh_frag_free, 1035352Smax.romanov@nginx.com }; 1036352Smax.romanov@nginx.com 1037352Smax.romanov@nginx.com 1038352Smax.romanov@nginx.com static nxt_port_recv_msg_t * 1039352Smax.romanov@nginx.com nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port, 1040352Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 1041352Smax.romanov@nginx.com { 1042352Smax.romanov@nginx.com nxt_int_t res; 1043352Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 1044352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 10451005Smax.romanov@nginx.com nxt_port_frag_key_t frag_key; 1046352Smax.romanov@nginx.com 1047352Smax.romanov@nginx.com nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream); 1048352Smax.romanov@nginx.com 1049352Smax.romanov@nginx.com fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t)); 1050352Smax.romanov@nginx.com 1051352Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 1052352Smax.romanov@nginx.com return NULL; 1053352Smax.romanov@nginx.com } 1054352Smax.romanov@nginx.com 1055352Smax.romanov@nginx.com *fmsg = *msg; 1056352Smax.romanov@nginx.com 10571005Smax.romanov@nginx.com frag_key.stream = fmsg->port_msg.stream; 10581005Smax.romanov@nginx.com frag_key.pid = fmsg->port_msg.pid; 10591005Smax.romanov@nginx.com 10601005Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 10611005Smax.romanov@nginx.com lhq.key.length = sizeof(nxt_port_frag_key_t); 10621005Smax.romanov@nginx.com lhq.key.start = (u_char *) &frag_key; 1063352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto; 1064352Smax.romanov@nginx.com lhq.replace = 0; 1065352Smax.romanov@nginx.com lhq.value = fmsg; 1066352Smax.romanov@nginx.com lhq.pool = port->mem_pool; 1067352Smax.romanov@nginx.com 1068352Smax.romanov@nginx.com res = nxt_lvlhsh_insert(&port->frags, &lhq); 1069352Smax.romanov@nginx.com 1070352Smax.romanov@nginx.com switch (res) { 1071352Smax.romanov@nginx.com 1072352Smax.romanov@nginx.com case NXT_OK: 1073352Smax.romanov@nginx.com return fmsg; 1074352Smax.romanov@nginx.com 1075352Smax.romanov@nginx.com case NXT_DECLINED: 1076352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD", 1077352Smax.romanov@nginx.com fmsg->port_msg.stream); 1078352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 1079352Smax.romanov@nginx.com 1080352Smax.romanov@nginx.com return NULL; 1081352Smax.romanov@nginx.com 1082352Smax.romanov@nginx.com default: 1083352Smax.romanov@nginx.com nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD", 1084352Smax.romanov@nginx.com fmsg->port_msg.stream); 1085352Smax.romanov@nginx.com 1086352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 1087352Smax.romanov@nginx.com 1088352Smax.romanov@nginx.com return NULL; 1089352Smax.romanov@nginx.com 1090352Smax.romanov@nginx.com } 1091352Smax.romanov@nginx.com } 1092352Smax.romanov@nginx.com 1093352Smax.romanov@nginx.com 1094352Smax.romanov@nginx.com static nxt_port_recv_msg_t * 10951005Smax.romanov@nginx.com nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg) 1096352Smax.romanov@nginx.com { 10971005Smax.romanov@nginx.com nxt_int_t res; 10981005Smax.romanov@nginx.com nxt_bool_t last; 10991005Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 11001005Smax.romanov@nginx.com nxt_port_frag_key_t frag_key; 11011005Smax.romanov@nginx.com 11021005Smax.romanov@nginx.com last = msg->port_msg.mf == 0; 1103352Smax.romanov@nginx.com 11041005Smax.romanov@nginx.com nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next", 11051005Smax.romanov@nginx.com msg->port_msg.stream); 1106352Smax.romanov@nginx.com 11071005Smax.romanov@nginx.com frag_key.stream = msg->port_msg.stream; 11081005Smax.romanov@nginx.com frag_key.pid = msg->port_msg.pid; 11091005Smax.romanov@nginx.com 11101005Smax.romanov@nginx.com lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t)); 11111005Smax.romanov@nginx.com lhq.key.length = sizeof(nxt_port_frag_key_t); 11121005Smax.romanov@nginx.com lhq.key.start = (u_char *) &frag_key; 1113352Smax.romanov@nginx.com lhq.proto = &lvlhsh_frag_proto; 1114352Smax.romanov@nginx.com lhq.pool = port->mem_pool; 1115352Smax.romanov@nginx.com 1116352Smax.romanov@nginx.com res = last != 0 ? nxt_lvlhsh_delete(&port->frags, &lhq) : 1117352Smax.romanov@nginx.com nxt_lvlhsh_find(&port->frags, &lhq); 1118352Smax.romanov@nginx.com 1119352Smax.romanov@nginx.com switch (res) { 1120352Smax.romanov@nginx.com 1121352Smax.romanov@nginx.com case NXT_OK: 1122352Smax.romanov@nginx.com return lhq.value; 1123352Smax.romanov@nginx.com 1124352Smax.romanov@nginx.com default: 11251005Smax.romanov@nginx.com nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found", 11261005Smax.romanov@nginx.com frag_key.stream); 1127352Smax.romanov@nginx.com 1128352Smax.romanov@nginx.com return NULL; 1129352Smax.romanov@nginx.com } 1130352Smax.romanov@nginx.com } 1131352Smax.romanov@nginx.com 1132352Smax.romanov@nginx.com 113311Sigor@sysoev.ru static void 113411Sigor@sysoev.ru nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, 113582Smax.romanov@nginx.com nxt_port_recv_msg_t *msg) 113611Sigor@sysoev.ru { 11371269Sigor@sysoev.ru nxt_buf_t *b, *orig_b, *next; 1138352Smax.romanov@nginx.com nxt_port_recv_msg_t *fmsg; 113911Sigor@sysoev.ru 114082Smax.romanov@nginx.com if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) { 1141564Svbart@nginx.com nxt_alert(task, "port %d: too small message:%uz", 1142564Svbart@nginx.com port->socket.fd, msg->size); 1143423Smax.romanov@nginx.com 11441558Smax.romanov@nginx.com if (msg->fd[0] != -1) { 11451558Smax.romanov@nginx.com nxt_fd_close(msg->fd[0]); 1146423Smax.romanov@nginx.com } 1147423Smax.romanov@nginx.com 11481558Smax.romanov@nginx.com if (msg->fd[1] != -1) { 11491558Smax.romanov@nginx.com nxt_fd_close(msg->fd[1]); 11501553Smax.romanov@nginx.com } 11511553Smax.romanov@nginx.com 1152423Smax.romanov@nginx.com return; 115311Sigor@sysoev.ru } 115411Sigor@sysoev.ru 115542Smax.romanov@nginx.com /* adjust size to actual buffer used size */ 115682Smax.romanov@nginx.com msg->size -= sizeof(nxt_port_msg_t); 115742Smax.romanov@nginx.com 115842Smax.romanov@nginx.com b = orig_b = msg->buf; 115982Smax.romanov@nginx.com b->mem.free += msg->size; 116042Smax.romanov@nginx.com 11611555Smax.romanov@nginx.com msg->cancelled = 0; 116211Sigor@sysoev.ru 1163352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.nf != 0)) { 1164423Smax.romanov@nginx.com 11651005Smax.romanov@nginx.com fmsg = nxt_port_frag_find(task, port, msg); 1166352Smax.romanov@nginx.com 1167551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 1168551Smax.romanov@nginx.com goto fmsg_failed; 1169551Smax.romanov@nginx.com } 1170423Smax.romanov@nginx.com 1171423Smax.romanov@nginx.com if (nxt_fast_path(fmsg->cancelled == 0)) { 1172423Smax.romanov@nginx.com 1173423Smax.romanov@nginx.com if (msg->port_msg.mmap) { 1174423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 1175423Smax.romanov@nginx.com } 1176423Smax.romanov@nginx.com 1177423Smax.romanov@nginx.com nxt_buf_chain_add(&fmsg->buf, msg->buf); 1178423Smax.romanov@nginx.com 1179423Smax.romanov@nginx.com fmsg->size += msg->size; 1180423Smax.romanov@nginx.com msg->buf = NULL; 1181423Smax.romanov@nginx.com b = NULL; 1182423Smax.romanov@nginx.com 1183423Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) { 1184423Smax.romanov@nginx.com 1185423Smax.romanov@nginx.com b = fmsg->buf; 1186423Smax.romanov@nginx.com 1187423Smax.romanov@nginx.com port->handler(task, fmsg); 1188423Smax.romanov@nginx.com 1189423Smax.romanov@nginx.com msg->buf = fmsg->buf; 11901558Smax.romanov@nginx.com msg->fd[0] = fmsg->fd[0]; 11911558Smax.romanov@nginx.com msg->fd[1] = fmsg->fd[1]; 1192974Smax.romanov@nginx.com 1193974Smax.romanov@nginx.com /* 1194974Smax.romanov@nginx.com * To disable instant completion or buffer re-usage, 1195974Smax.romanov@nginx.com * handler should reset 'msg.buf'. 1196974Smax.romanov@nginx.com */ 1197974Smax.romanov@nginx.com if (!msg->port_msg.mmap && msg->buf == b) { 1198974Smax.romanov@nginx.com nxt_port_buf_free(port, b); 1199974Smax.romanov@nginx.com } 1200423Smax.romanov@nginx.com } 1201352Smax.romanov@nginx.com } 1202352Smax.romanov@nginx.com 1203352Smax.romanov@nginx.com if (nxt_fast_path(msg->port_msg.mf == 0)) { 1204352Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, fmsg); 1205352Smax.romanov@nginx.com } 1206352Smax.romanov@nginx.com } else { 1207352Smax.romanov@nginx.com if (nxt_slow_path(msg->port_msg.mf != 0)) { 1208423Smax.romanov@nginx.com 1209423Smax.romanov@nginx.com if (msg->port_msg.mmap && msg->cancelled == 0) { 1210423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 1211423Smax.romanov@nginx.com b = msg->buf; 1212423Smax.romanov@nginx.com } 1213423Smax.romanov@nginx.com 1214352Smax.romanov@nginx.com fmsg = nxt_port_frag_start(task, port, msg); 1215352Smax.romanov@nginx.com 1216551Smax.romanov@nginx.com if (nxt_slow_path(fmsg == NULL)) { 1217551Smax.romanov@nginx.com goto fmsg_failed; 1218551Smax.romanov@nginx.com } 1219352Smax.romanov@nginx.com 1220352Smax.romanov@nginx.com fmsg->port_msg.nf = 0; 1221352Smax.romanov@nginx.com fmsg->port_msg.mf = 0; 1222352Smax.romanov@nginx.com 1223423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) { 1224423Smax.romanov@nginx.com msg->buf = NULL; 12251558Smax.romanov@nginx.com msg->fd[0] = -1; 12261558Smax.romanov@nginx.com msg->fd[1] = -1; 1227423Smax.romanov@nginx.com b = NULL; 1228423Smax.romanov@nginx.com 1229423Smax.romanov@nginx.com } else { 12301558Smax.romanov@nginx.com if (msg->fd[0] != -1) { 12311558Smax.romanov@nginx.com nxt_fd_close(msg->fd[0]); 1232423Smax.romanov@nginx.com } 12331553Smax.romanov@nginx.com 12341558Smax.romanov@nginx.com if (msg->fd[1] != -1) { 12351558Smax.romanov@nginx.com nxt_fd_close(msg->fd[1]); 12361553Smax.romanov@nginx.com } 1237423Smax.romanov@nginx.com } 1238352Smax.romanov@nginx.com } else { 1239423Smax.romanov@nginx.com if (nxt_fast_path(msg->cancelled == 0)) { 1240423Smax.romanov@nginx.com 1241423Smax.romanov@nginx.com if (msg->port_msg.mmap) { 1242423Smax.romanov@nginx.com nxt_port_mmap_read(task, msg); 1243423Smax.romanov@nginx.com b = msg->buf; 1244423Smax.romanov@nginx.com } 1245423Smax.romanov@nginx.com 1246423Smax.romanov@nginx.com port->handler(task, msg); 1247423Smax.romanov@nginx.com } 1248352Smax.romanov@nginx.com } 1249352Smax.romanov@nginx.com } 125042Smax.romanov@nginx.com 1251551Smax.romanov@nginx.com fmsg_failed: 1252551Smax.romanov@nginx.com 125382Smax.romanov@nginx.com if (msg->port_msg.mmap && orig_b != b) { 125442Smax.romanov@nginx.com 1255194Smax.romanov@nginx.com /* 1256194Smax.romanov@nginx.com * To disable instant buffer completion, 1257194Smax.romanov@nginx.com * handler should reset 'msg->buf'. 1258194Smax.romanov@nginx.com */ 1259194Smax.romanov@nginx.com if (msg->buf == b) { 1260194Smax.romanov@nginx.com /* complete mmap buffers */ 12611269Sigor@sysoev.ru while (b != NULL) { 1262194Smax.romanov@nginx.com nxt_debug(task, "complete buffer %p", b); 1263194Smax.romanov@nginx.com 1264194Smax.romanov@nginx.com nxt_work_queue_add(port->socket.read_work_queue, 1265194Smax.romanov@nginx.com b->completion_handler, task, b, b->parent); 12661269Sigor@sysoev.ru 12671269Sigor@sysoev.ru next = b->next; 12681269Sigor@sysoev.ru b->next = NULL; 12691269Sigor@sysoev.ru b = next; 1270194Smax.romanov@nginx.com } 127142Smax.romanov@nginx.com } 1272194Smax.romanov@nginx.com 1273194Smax.romanov@nginx.com /* restore original buf */ 1274194Smax.romanov@nginx.com msg->buf = orig_b; 127542Smax.romanov@nginx.com } 127611Sigor@sysoev.ru } 127711Sigor@sysoev.ru 127811Sigor@sysoev.ru 127911Sigor@sysoev.ru static nxt_buf_t * 128011Sigor@sysoev.ru nxt_port_buf_alloc(nxt_port_t *port) 128111Sigor@sysoev.ru { 128211Sigor@sysoev.ru nxt_buf_t *b; 128311Sigor@sysoev.ru 128411Sigor@sysoev.ru if (port->free_bufs != NULL) { 128511Sigor@sysoev.ru b = port->free_bufs; 128611Sigor@sysoev.ru port->free_bufs = b->next; 128711Sigor@sysoev.ru 128811Sigor@sysoev.ru b->mem.pos = b->mem.start; 128911Sigor@sysoev.ru b->mem.free = b->mem.start; 129042Smax.romanov@nginx.com b->next = NULL; 129111Sigor@sysoev.ru } else { 129211Sigor@sysoev.ru b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0); 129311Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) { 129411Sigor@sysoev.ru return NULL; 129511Sigor@sysoev.ru } 129611Sigor@sysoev.ru } 129711Sigor@sysoev.ru 129811Sigor@sysoev.ru return b; 129911Sigor@sysoev.ru } 130011Sigor@sysoev.ru 130111Sigor@sysoev.ru 130211Sigor@sysoev.ru static void 130311Sigor@sysoev.ru nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b) 130411Sigor@sysoev.ru { 1305974Smax.romanov@nginx.com nxt_buf_chain_add(&b, port->free_bufs); 130611Sigor@sysoev.ru port->free_bufs = b; 130711Sigor@sysoev.ru } 130811Sigor@sysoev.ru 130911Sigor@sysoev.ru 131011Sigor@sysoev.ru static void 131111Sigor@sysoev.ru nxt_port_error_handler(nxt_task_t *task, void *obj, void *data) 131211Sigor@sysoev.ru { 1313343Smax.romanov@nginx.com int use_delta; 13141269Sigor@sysoev.ru nxt_buf_t *b, *next; 1315197Smax.romanov@nginx.com nxt_port_t *port; 1316197Smax.romanov@nginx.com nxt_work_queue_t *wq; 1317197Smax.romanov@nginx.com nxt_port_send_msg_t *msg; 1318197Smax.romanov@nginx.com 1319125Smax.romanov@nginx.com nxt_debug(task, "port error handler %p", obj); 132011Sigor@sysoev.ru /* TODO */ 1321197Smax.romanov@nginx.com 1322197Smax.romanov@nginx.com port = nxt_container_of(obj, nxt_port_t, socket); 1323197Smax.romanov@nginx.com 1324343Smax.romanov@nginx.com use_delta = 0; 1325343Smax.romanov@nginx.com 1326343Smax.romanov@nginx.com if (obj == data) { 1327343Smax.romanov@nginx.com use_delta--; 1328343Smax.romanov@nginx.com } 1329197Smax.romanov@nginx.com 1330343Smax.romanov@nginx.com wq = &task->thread->engine->fast_work_queue; 1331343Smax.romanov@nginx.com 1332343Smax.romanov@nginx.com nxt_thread_mutex_lock(&port->write_mutex); 1333343Smax.romanov@nginx.com 1334343Smax.romanov@nginx.com nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { 1335197Smax.romanov@nginx.com 13361558Smax.romanov@nginx.com if (msg->close_fd) { 13371558Smax.romanov@nginx.com if (msg->fd[0] != -1) { 13381558Smax.romanov@nginx.com nxt_fd_close(msg->fd[0]); 13391485Smax.romanov@nginx.com 13401558Smax.romanov@nginx.com msg->fd[0] = -1; 13411558Smax.romanov@nginx.com } 13421485Smax.romanov@nginx.com 13431558Smax.romanov@nginx.com if (msg->fd[1] != -1) { 13441558Smax.romanov@nginx.com nxt_fd_close(msg->fd[1]); 13451553Smax.romanov@nginx.com 13461558Smax.romanov@nginx.com msg->fd[1] = -1; 13471558Smax.romanov@nginx.com } 13481553Smax.romanov@nginx.com } 13491553Smax.romanov@nginx.com 13501269Sigor@sysoev.ru for (b = msg->buf; b != NULL; b = next) { 13511269Sigor@sysoev.ru next = b->next; 13521269Sigor@sysoev.ru b->next = NULL; 13531269Sigor@sysoev.ru 1354197Smax.romanov@nginx.com if (nxt_buf_is_sync(b)) { 1355197Smax.romanov@nginx.com continue; 1356197Smax.romanov@nginx.com } 1357197Smax.romanov@nginx.com 1358197Smax.romanov@nginx.com nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent); 1359197Smax.romanov@nginx.com } 1360197Smax.romanov@nginx.com 1361197Smax.romanov@nginx.com nxt_queue_remove(&msg->link); 1362343Smax.romanov@nginx.com use_delta--; 13631125Smax.romanov@nginx.com 13641125Smax.romanov@nginx.com nxt_port_release_send_msg(msg); 1365197Smax.romanov@nginx.com 1366197Smax.romanov@nginx.com } nxt_queue_loop; 1367343Smax.romanov@nginx.com 1368343Smax.romanov@nginx.com nxt_thread_mutex_unlock(&port->write_mutex); 1369343Smax.romanov@nginx.com 1370343Smax.romanov@nginx.com if (use_delta != 0) { 1371343Smax.romanov@nginx.com nxt_port_use(task, port, use_delta); 1372343Smax.romanov@nginx.com } 137311Sigor@sysoev.ru } 1374