162Sigor@sysoev.ru 262Sigor@sysoev.ru /* 362Sigor@sysoev.ru * Copyright (C) Igor Sysoev 462Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 562Sigor@sysoev.ru */ 662Sigor@sysoev.ru 762Sigor@sysoev.ru #include <nxt_main.h> 862Sigor@sysoev.ru 962Sigor@sysoev.ru 1062Sigor@sysoev.ru void 1162Sigor@sysoev.ru nxt_conn_wait(nxt_conn_t *c) 1262Sigor@sysoev.ru { 1362Sigor@sysoev.ru nxt_event_engine_t *engine; 1462Sigor@sysoev.ru const nxt_conn_state_t *state; 1562Sigor@sysoev.ru 1662Sigor@sysoev.ru nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d", 1762Sigor@sysoev.ru c->socket.fd, c->socket.read_ready); 1862Sigor@sysoev.ru 1962Sigor@sysoev.ru engine = c->socket.task->thread->engine; 2062Sigor@sysoev.ru state = c->read_state; 2162Sigor@sysoev.ru 2262Sigor@sysoev.ru if (c->socket.read_ready) { 2362Sigor@sysoev.ru nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler, 2462Sigor@sysoev.ru c->socket.task, c, c->socket.data); 2562Sigor@sysoev.ru return; 2662Sigor@sysoev.ru } 2762Sigor@sysoev.ru 2862Sigor@sysoev.ru c->socket.read_handler = state->ready_handler; 2962Sigor@sysoev.ru c->socket.error_handler = state->error_handler; 3062Sigor@sysoev.ru 3162Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer); 3262Sigor@sysoev.ru 3362Sigor@sysoev.ru nxt_fd_event_enable_read(engine, &c->socket); 3462Sigor@sysoev.ru } 3562Sigor@sysoev.ru 3662Sigor@sysoev.ru 3762Sigor@sysoev.ru void 3862Sigor@sysoev.ru nxt_conn_io_read(nxt_task_t *task, void *obj, void *data) 3962Sigor@sysoev.ru { 4062Sigor@sysoev.ru ssize_t n; 4162Sigor@sysoev.ru nxt_conn_t *c; 4262Sigor@sysoev.ru nxt_work_queue_t *wq; 4362Sigor@sysoev.ru nxt_event_engine_t *engine; 4462Sigor@sysoev.ru nxt_work_handler_t handler; 4562Sigor@sysoev.ru const nxt_conn_state_t *state; 4662Sigor@sysoev.ru 4762Sigor@sysoev.ru c = obj; 4862Sigor@sysoev.ru 4962Sigor@sysoev.ru nxt_debug(task, "conn read fd:%d rdy:%d cl:%d", 5062Sigor@sysoev.ru c->socket.fd, c->socket.read_ready, c->socket.closed); 5162Sigor@sysoev.ru 5262Sigor@sysoev.ru engine = task->thread->engine; 5362Sigor@sysoev.ru 5462Sigor@sysoev.ru state = c->read_state; 5562Sigor@sysoev.ru 5662Sigor@sysoev.ru if (c->socket.read_ready) { 5762Sigor@sysoev.ru 58629Sigor@sysoev.ru if (state->io_read_handler == NULL) { 59629Sigor@sysoev.ru n = c->io->recvbuf(c, c->read); 6062Sigor@sysoev.ru 6162Sigor@sysoev.ru } else { 62629Sigor@sysoev.ru n = state->io_read_handler(c); 63*741Sigor@sysoev.ru /* The state can be changed by io_read_handler. */ 64*741Sigor@sysoev.ru state = c->read_state; 6562Sigor@sysoev.ru } 6662Sigor@sysoev.ru 6762Sigor@sysoev.ru if (n > 0) { 6862Sigor@sysoev.ru c->nbytes = n; 6962Sigor@sysoev.ru 70629Sigor@sysoev.ru nxt_recvbuf_update(c->read, n); 7162Sigor@sysoev.ru 7262Sigor@sysoev.ru nxt_fd_event_block_read(engine, &c->socket); 7362Sigor@sysoev.ru 7462Sigor@sysoev.ru if (state->timer_autoreset) { 7562Sigor@sysoev.ru nxt_timer_disable(engine, &c->read_timer); 7662Sigor@sysoev.ru } 7762Sigor@sysoev.ru 7862Sigor@sysoev.ru wq = c->read_work_queue; 7962Sigor@sysoev.ru handler = state->ready_handler; 8062Sigor@sysoev.ru 8162Sigor@sysoev.ru nxt_work_queue_add(wq, handler, task, c, data); 8262Sigor@sysoev.ru 8362Sigor@sysoev.ru return; 8462Sigor@sysoev.ru } 8562Sigor@sysoev.ru 8662Sigor@sysoev.ru if (n != NXT_AGAIN) { 8762Sigor@sysoev.ru nxt_fd_event_block_read(engine, &c->socket); 8862Sigor@sysoev.ru nxt_timer_disable(engine, &c->read_timer); 8962Sigor@sysoev.ru 9062Sigor@sysoev.ru wq = &engine->fast_work_queue; 9162Sigor@sysoev.ru 9262Sigor@sysoev.ru handler = (n == 0) ? state->close_handler : state->error_handler; 9362Sigor@sysoev.ru 9462Sigor@sysoev.ru nxt_work_queue_add(wq, handler, task, c, data); 9562Sigor@sysoev.ru 9662Sigor@sysoev.ru return; 9762Sigor@sysoev.ru } 9862Sigor@sysoev.ru } 9962Sigor@sysoev.ru 10062Sigor@sysoev.ru /* 10162Sigor@sysoev.ru * Here c->io->read() is assigned instead of direct nxt_conn_io_read() 10262Sigor@sysoev.ru * because the function can be called by nxt_kqueue_conn_io_read(). 10362Sigor@sysoev.ru */ 10462Sigor@sysoev.ru c->socket.read_handler = c->io->read; 10562Sigor@sysoev.ru c->socket.error_handler = state->error_handler; 10662Sigor@sysoev.ru 10762Sigor@sysoev.ru if (c->read_timer.state == NXT_TIMER_DISABLED 10862Sigor@sysoev.ru || nxt_fd_event_is_disabled(c->socket.read)) 10962Sigor@sysoev.ru { 11062Sigor@sysoev.ru /* Timer may be set or reset. */ 11162Sigor@sysoev.ru nxt_conn_timer(engine, c, state, &c->read_timer); 11262Sigor@sysoev.ru 11362Sigor@sysoev.ru if (nxt_fd_event_is_disabled(c->socket.read)) { 11462Sigor@sysoev.ru nxt_fd_event_enable_read(engine, &c->socket); 11562Sigor@sysoev.ru } 11662Sigor@sysoev.ru } 11762Sigor@sysoev.ru 11862Sigor@sysoev.ru return; 11962Sigor@sysoev.ru } 12062Sigor@sysoev.ru 12162Sigor@sysoev.ru 12262Sigor@sysoev.ru ssize_t 12362Sigor@sysoev.ru nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b) 12462Sigor@sysoev.ru { 12562Sigor@sysoev.ru ssize_t n; 12662Sigor@sysoev.ru nxt_err_t err; 12762Sigor@sysoev.ru nxt_uint_t niov; 12862Sigor@sysoev.ru struct iovec iov[NXT_IOBUF_MAX]; 12962Sigor@sysoev.ru nxt_recvbuf_coalesce_t rb; 13062Sigor@sysoev.ru 13162Sigor@sysoev.ru rb.buf = b; 13262Sigor@sysoev.ru rb.iobuf = iov; 13362Sigor@sysoev.ru rb.nmax = NXT_IOBUF_MAX; 13462Sigor@sysoev.ru rb.size = 0; 13562Sigor@sysoev.ru 13662Sigor@sysoev.ru niov = nxt_recvbuf_mem_coalesce(&rb); 13762Sigor@sysoev.ru 13862Sigor@sysoev.ru if (niov == 1) { 13962Sigor@sysoev.ru /* Disposal of surplus kernel iovec copy-in operation. */ 14062Sigor@sysoev.ru return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0); 14162Sigor@sysoev.ru } 14262Sigor@sysoev.ru 14362Sigor@sysoev.ru for ( ;; ) { 14462Sigor@sysoev.ru n = readv(c->socket.fd, iov, niov); 14562Sigor@sysoev.ru 14662Sigor@sysoev.ru err = (n == -1) ? nxt_socket_errno : 0; 14762Sigor@sysoev.ru 14862Sigor@sysoev.ru nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n); 14962Sigor@sysoev.ru 15062Sigor@sysoev.ru if (n > 0) { 15162Sigor@sysoev.ru if ((size_t) n < rb.size) { 15262Sigor@sysoev.ru c->socket.read_ready = 0; 15362Sigor@sysoev.ru } 15462Sigor@sysoev.ru 15562Sigor@sysoev.ru return n; 15662Sigor@sysoev.ru } 15762Sigor@sysoev.ru 15862Sigor@sysoev.ru if (n == 0) { 15962Sigor@sysoev.ru c->socket.closed = 1; 16062Sigor@sysoev.ru c->socket.read_ready = 0; 16162Sigor@sysoev.ru return n; 16262Sigor@sysoev.ru } 16362Sigor@sysoev.ru 16462Sigor@sysoev.ru /* n == -1 */ 16562Sigor@sysoev.ru 16662Sigor@sysoev.ru switch (err) { 16762Sigor@sysoev.ru 16862Sigor@sysoev.ru case NXT_EAGAIN: 16962Sigor@sysoev.ru nxt_debug(c->socket.task, "readv() %E", err); 17062Sigor@sysoev.ru c->socket.read_ready = 0; 17162Sigor@sysoev.ru return NXT_AGAIN; 17262Sigor@sysoev.ru 17362Sigor@sysoev.ru case NXT_EINTR: 17462Sigor@sysoev.ru nxt_debug(c->socket.task, "readv() %E", err); 17562Sigor@sysoev.ru continue; 17662Sigor@sysoev.ru 17762Sigor@sysoev.ru default: 17862Sigor@sysoev.ru c->socket.error = err; 17962Sigor@sysoev.ru nxt_log(c->socket.task, nxt_socket_error_level(err), 18062Sigor@sysoev.ru "readv(%d, %ui) failed %E", c->socket.fd, niov, err); 18162Sigor@sysoev.ru 18262Sigor@sysoev.ru return NXT_ERROR; 18362Sigor@sysoev.ru } 18462Sigor@sysoev.ru } 18562Sigor@sysoev.ru } 18662Sigor@sysoev.ru 18762Sigor@sysoev.ru 18862Sigor@sysoev.ru ssize_t 18962Sigor@sysoev.ru nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags) 19062Sigor@sysoev.ru { 19162Sigor@sysoev.ru ssize_t n; 19262Sigor@sysoev.ru nxt_err_t err; 19362Sigor@sysoev.ru 19462Sigor@sysoev.ru for ( ;; ) { 19562Sigor@sysoev.ru n = recv(c->socket.fd, buf, size, flags); 19662Sigor@sysoev.ru 19762Sigor@sysoev.ru err = (n == -1) ? nxt_socket_errno : 0; 19862Sigor@sysoev.ru 19962Sigor@sysoev.ru nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z", 20062Sigor@sysoev.ru c->socket.fd, buf, size, flags, n); 20162Sigor@sysoev.ru 20262Sigor@sysoev.ru if (n > 0) { 20362Sigor@sysoev.ru if ((size_t) n < size) { 20462Sigor@sysoev.ru c->socket.read_ready = 0; 20562Sigor@sysoev.ru } 20662Sigor@sysoev.ru 20762Sigor@sysoev.ru return n; 20862Sigor@sysoev.ru } 20962Sigor@sysoev.ru 21062Sigor@sysoev.ru if (n == 0) { 21162Sigor@sysoev.ru c->socket.closed = 1; 21262Sigor@sysoev.ru c->socket.read_ready = 0; 21362Sigor@sysoev.ru 21462Sigor@sysoev.ru return n; 21562Sigor@sysoev.ru } 21662Sigor@sysoev.ru 21762Sigor@sysoev.ru /* n == -1 */ 21862Sigor@sysoev.ru 21962Sigor@sysoev.ru switch (err) { 22062Sigor@sysoev.ru 22162Sigor@sysoev.ru case NXT_EAGAIN: 22262Sigor@sysoev.ru nxt_debug(c->socket.task, "recv() %E", err); 22362Sigor@sysoev.ru c->socket.read_ready = 0; 22462Sigor@sysoev.ru 22562Sigor@sysoev.ru return NXT_AGAIN; 22662Sigor@sysoev.ru 22762Sigor@sysoev.ru case NXT_EINTR: 22862Sigor@sysoev.ru nxt_debug(c->socket.task, "recv() %E", err); 22962Sigor@sysoev.ru continue; 23062Sigor@sysoev.ru 23162Sigor@sysoev.ru default: 23262Sigor@sysoev.ru c->socket.error = err; 23362Sigor@sysoev.ru nxt_log(c->socket.task, nxt_socket_error_level(err), 23462Sigor@sysoev.ru "recv(%d, %p, %uz, %ui) failed %E", 23562Sigor@sysoev.ru c->socket.fd, buf, size, flags, err); 23662Sigor@sysoev.ru 23762Sigor@sysoev.ru return NXT_ERROR; 23862Sigor@sysoev.ru } 23962Sigor@sysoev.ru } 24062Sigor@sysoev.ru } 241