1*62Sigor@sysoev.ru 2*62Sigor@sysoev.ru /* 3*62Sigor@sysoev.ru * Copyright (C) Igor Sysoev 4*62Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 5*62Sigor@sysoev.ru */ 6*62Sigor@sysoev.ru 7*62Sigor@sysoev.ru #include <nxt_main.h> 8*62Sigor@sysoev.ru 9*62Sigor@sysoev.ru 10*62Sigor@sysoev.ru void 11*62Sigor@sysoev.ru nxt_conn_wait(nxt_conn_t *c) 12*62Sigor@sysoev.ru { 13*62Sigor@sysoev.ru nxt_event_engine_t *engine; 14*62Sigor@sysoev.ru const nxt_conn_state_t *state; 15*62Sigor@sysoev.ru 16*62Sigor@sysoev.ru nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", 17*62Sigor@sysoev.ru c->socket.fd, c->socket.read_ready); 18*62Sigor@sysoev.ru 19*62Sigor@sysoev.ru engine = c->socket.task->thread->engine; 20*62Sigor@sysoev.ru state = c->read_state; 21*62Sigor@sysoev.ru 22*62Sigor@sysoev.ru if (c->socket.read_ready) { 23*62Sigor@sysoev.ru nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, 24*62Sigor@sysoev.ru c->socket.task, c, c->socket.data); 25*62Sigor@sysoev.ru return; 26*62Sigor@sysoev.ru } 27*62Sigor@sysoev.ru 28*62Sigor@sysoev.ru c->socket.read_handler = state->ready_handler; 29*62Sigor@sysoev.ru c->socket.error_handler = state->error_handler; 30*62Sigor@sysoev.ru 31*62Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer); 32*62Sigor@sysoev.ru 33*62Sigor@sysoev.ru nxt_fd_event_enable_read(engine, &c->socket); 34*62Sigor@sysoev.ru } 35*62Sigor@sysoev.ru 36*62Sigor@sysoev.ru 37*62Sigor@sysoev.ru void 38*62Sigor@sysoev.ru nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 39*62Sigor@sysoev.ru { 40*62Sigor@sysoev.ru ssize_t n; 41*62Sigor@sysoev.ru nxt_buf_t *b; 42*62Sigor@sysoev.ru nxt_conn_t *c; 43*62Sigor@sysoev.ru nxt_work_queue_t *wq; 44*62Sigor@sysoev.ru nxt_event_engine_t *engine; 45*62Sigor@sysoev.ru nxt_work_handler_t handler; 46*62Sigor@sysoev.ru const nxt_conn_state_t *state; 47*62Sigor@sysoev.ru 48*62Sigor@sysoev.ru c = obj; 49*62Sigor@sysoev.ru 50*62Sigor@sysoev.ru nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 51*62Sigor@sysoev.ru c->socket.fd, c->socket.read_ready, c->socket.closed); 52*62Sigor@sysoev.ru 53*62Sigor@sysoev.ru engine = task->thread->engine; 54*62Sigor@sysoev.ru 55*62Sigor@sysoev.ru state = c->read_state; 56*62Sigor@sysoev.ru 57*62Sigor@sysoev.ru if (c->socket.read_ready) { 58*62Sigor@sysoev.ru 59*62Sigor@sysoev.ru b = c->read; 60*62Sigor@sysoev.ru 61*62Sigor@sysoev.ru if (c->peek == 0) { 62*62Sigor@sysoev.ru n = c->io->recvbuf(c, b); 63*62Sigor@sysoev.ru 64*62Sigor@sysoev.ru } else { 65*62Sigor@sysoev.ru n = c->io->recv(c, b->mem.free, c->peek, MSG_PEEK); 66*62Sigor@sysoev.ru } 67*62Sigor@sysoev.ru 68*62Sigor@sysoev.ru if (n > 0) { 69*62Sigor@sysoev.ru c->nbytes = n; 70*62Sigor@sysoev.ru 71*62Sigor@sysoev.ru nxt_recvbuf_update(b, n); 72*62Sigor@sysoev.ru 73*62Sigor@sysoev.ru nxt_fd_event_block_read(engine, &c->socket); 74*62Sigor@sysoev.ru 75*62Sigor@sysoev.ru if (state->timer_autoreset) { 76*62Sigor@sysoev.ru nxt_timer_disable(engine, &c->read_timer); 77*62Sigor@sysoev.ru } 78*62Sigor@sysoev.ru 79*62Sigor@sysoev.ru wq = c->read_work_queue; 80*62Sigor@sysoev.ru handler = state->ready_handler; 81*62Sigor@sysoev.ru 82*62Sigor@sysoev.ru nxt_work_queue_add(wq, handler, task, c, data); 83*62Sigor@sysoev.ru 84*62Sigor@sysoev.ru return; 85*62Sigor@sysoev.ru } 86*62Sigor@sysoev.ru 87*62Sigor@sysoev.ru if (n != NXT_AGAIN) { 88*62Sigor@sysoev.ru nxt_fd_event_block_read(engine, &c->socket); 89*62Sigor@sysoev.ru nxt_timer_disable(engine, &c->read_timer); 90*62Sigor@sysoev.ru 91*62Sigor@sysoev.ru wq = &engine->fast_work_queue; 92*62Sigor@sysoev.ru 93*62Sigor@sysoev.ru handler = (n == 0) ? state->close_handler : state->error_handler; 94*62Sigor@sysoev.ru 95*62Sigor@sysoev.ru nxt_work_queue_add(wq, handler, task, c, data); 96*62Sigor@sysoev.ru 97*62Sigor@sysoev.ru return; 98*62Sigor@sysoev.ru } 99*62Sigor@sysoev.ru } 100*62Sigor@sysoev.ru 101*62Sigor@sysoev.ru /* 102*62Sigor@sysoev.ru * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 103*62Sigor@sysoev.ru * because the function can be called by nxt_kqueue_conn_io_read(). 104*62Sigor@sysoev.ru */ 105*62Sigor@sysoev.ru c->socket.read_handler = c->io->read; 106*62Sigor@sysoev.ru c->socket.error_handler = state->error_handler; 107*62Sigor@sysoev.ru 108*62Sigor@sysoev.ru if (c->read_timer.state == NXT_TIMER_DISABLED 109*62Sigor@sysoev.ru || nxt_fd_event_is_disabled(c->socket.read)) 110*62Sigor@sysoev.ru { 111*62Sigor@sysoev.ru /* Timer may be set or reset. */ 112*62Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer); 113*62Sigor@sysoev.ru 114*62Sigor@sysoev.ru if (nxt_fd_event_is_disabled(c->socket.read)) { 115*62Sigor@sysoev.ru nxt_fd_event_enable_read(engine, &c->socket); 116*62Sigor@sysoev.ru } 117*62Sigor@sysoev.ru } 118*62Sigor@sysoev.ru 119*62Sigor@sysoev.ru return; 120*62Sigor@sysoev.ru } 121*62Sigor@sysoev.ru 122*62Sigor@sysoev.ru 123*62Sigor@sysoev.ru ssize_t 124*62Sigor@sysoev.ru nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 125*62Sigor@sysoev.ru { 126*62Sigor@sysoev.ru ssize_t n; 127*62Sigor@sysoev.ru nxt_err_t err; 128*62Sigor@sysoev.ru nxt_uint_t niov; 129*62Sigor@sysoev.ru struct iovec iov[NXT_IOBUF_MAX]; 130*62Sigor@sysoev.ru nxt_recvbuf_coalesce_t rb; 131*62Sigor@sysoev.ru 132*62Sigor@sysoev.ru rb.buf = b; 133*62Sigor@sysoev.ru rb.iobuf = iov; 134*62Sigor@sysoev.ru rb.nmax = NXT_IOBUF_MAX; 135*62Sigor@sysoev.ru rb.size = 0; 136*62Sigor@sysoev.ru 137*62Sigor@sysoev.ru niov = nxt_recvbuf_mem_coalesce(&rb); 138*62Sigor@sysoev.ru 139*62Sigor@sysoev.ru if (niov == 1) { 140*62Sigor@sysoev.ru /* Disposal of surplus kernel iovec copy-in operation. */ 141*62Sigor@sysoev.ru return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); 142*62Sigor@sysoev.ru } 143*62Sigor@sysoev.ru 144*62Sigor@sysoev.ru for ( ;; ) { 145*62Sigor@sysoev.ru n = readv(c->socket.fd, iov, niov); 146*62Sigor@sysoev.ru 147*62Sigor@sysoev.ru err = (n == -1) ? nxt_socket_errno : 0; 148*62Sigor@sysoev.ru 149*62Sigor@sysoev.ru nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); 150*62Sigor@sysoev.ru 151*62Sigor@sysoev.ru if (n > 0) { 152*62Sigor@sysoev.ru if ((size_t) n < rb.size) { 153*62Sigor@sysoev.ru c->socket.read_ready = 0; 154*62Sigor@sysoev.ru } 155*62Sigor@sysoev.ru 156*62Sigor@sysoev.ru return n; 157*62Sigor@sysoev.ru } 158*62Sigor@sysoev.ru 159*62Sigor@sysoev.ru if (n == 0) { 160*62Sigor@sysoev.ru c->socket.closed = 1; 161*62Sigor@sysoev.ru c->socket.read_ready = 0; 162*62Sigor@sysoev.ru return n; 163*62Sigor@sysoev.ru } 164*62Sigor@sysoev.ru 165*62Sigor@sysoev.ru /* n == -1 */ 166*62Sigor@sysoev.ru 167*62Sigor@sysoev.ru switch (err) { 168*62Sigor@sysoev.ru 169*62Sigor@sysoev.ru case NXT_EAGAIN: 170*62Sigor@sysoev.ru nxt_debug(c->socket.task, "readv() %E", err); 171*62Sigor@sysoev.ru c->socket.read_ready = 0; 172*62Sigor@sysoev.ru return NXT_AGAIN; 173*62Sigor@sysoev.ru 174*62Sigor@sysoev.ru case NXT_EINTR: 175*62Sigor@sysoev.ru nxt_debug(c->socket.task, "readv() %E", err); 176*62Sigor@sysoev.ru continue; 177*62Sigor@sysoev.ru 178*62Sigor@sysoev.ru default: 179*62Sigor@sysoev.ru c->socket.error = err; 180*62Sigor@sysoev.ru nxt_log(c->socket.task, nxt_socket_error_level(err), 181*62Sigor@sysoev.ru "readv(%d, %ui) failed %E", c->socket.fd, niov, err); 182*62Sigor@sysoev.ru 183*62Sigor@sysoev.ru return NXT_ERROR; 184*62Sigor@sysoev.ru } 185*62Sigor@sysoev.ru } 186*62Sigor@sysoev.ru } 187*62Sigor@sysoev.ru 188*62Sigor@sysoev.ru 189*62Sigor@sysoev.ru ssize_t 190*62Sigor@sysoev.ru nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) 191*62Sigor@sysoev.ru { 192*62Sigor@sysoev.ru ssize_t n; 193*62Sigor@sysoev.ru nxt_err_t err; 194*62Sigor@sysoev.ru 195*62Sigor@sysoev.ru for ( ;; ) { 196*62Sigor@sysoev.ru n = recv(c->socket.fd, buf, size, flags); 197*62Sigor@sysoev.ru 198*62Sigor@sysoev.ru err = (n == -1) ? nxt_socket_errno : 0; 199*62Sigor@sysoev.ru 200*62Sigor@sysoev.ru nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 201*62Sigor@sysoev.ru c->socket.fd, buf, size, flags, n); 202*62Sigor@sysoev.ru 203*62Sigor@sysoev.ru if (n > 0) { 204*62Sigor@sysoev.ru if ((size_t) n < size) { 205*62Sigor@sysoev.ru c->socket.read_ready = 0; 206*62Sigor@sysoev.ru } 207*62Sigor@sysoev.ru 208*62Sigor@sysoev.ru return n; 209*62Sigor@sysoev.ru } 210*62Sigor@sysoev.ru 211*62Sigor@sysoev.ru if (n == 0) { 212*62Sigor@sysoev.ru c->socket.closed = 1; 213*62Sigor@sysoev.ru c->socket.read_ready = 0; 214*62Sigor@sysoev.ru 215*62Sigor@sysoev.ru return n; 216*62Sigor@sysoev.ru } 217*62Sigor@sysoev.ru 218*62Sigor@sysoev.ru /* n == -1 */ 219*62Sigor@sysoev.ru 220*62Sigor@sysoev.ru switch (err) { 221*62Sigor@sysoev.ru 222*62Sigor@sysoev.ru case NXT_EAGAIN: 223*62Sigor@sysoev.ru nxt_debug(c->socket.task, "recv() %E", err); 224*62Sigor@sysoev.ru c->socket.read_ready = 0; 225*62Sigor@sysoev.ru 226*62Sigor@sysoev.ru return NXT_AGAIN; 227*62Sigor@sysoev.ru 228*62Sigor@sysoev.ru case NXT_EINTR: 229*62Sigor@sysoev.ru nxt_debug(c->socket.task, "recv() %E", err); 230*62Sigor@sysoev.ru continue; 231*62Sigor@sysoev.ru 232*62Sigor@sysoev.ru default: 233*62Sigor@sysoev.ru c->socket.error = err; 234*62Sigor@sysoev.ru nxt_log(c->socket.task, nxt_socket_error_level(err), 235*62Sigor@sysoev.ru "recv(%d, %p, %uz, %ui) failed %E", 236*62Sigor@sysoev.ru c->socket.fd, buf, size, flags, err); 237*62Sigor@sysoev.ru 238*62Sigor@sysoev.ru return NXT_ERROR; 239*62Sigor@sysoev.ru } 240*62Sigor@sysoev.ru } 241*62Sigor@sysoev.ru } 242