xref: /unit/src/nxt_conn_read.c (revision 809)
162Sigor@sysoev.ru 
262Sigor@sysoev.ru /*
362Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
462Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
562Sigor@sysoev.ru  */
662Sigor@sysoev.ru 
762Sigor@sysoev.ru #include <nxt_main.h>
862Sigor@sysoev.ru 
962Sigor@sysoev.ru 
1062Sigor@sysoev.ru void
1162Sigor@sysoev.ru nxt_conn_wait(nxt_conn_t *c)
1262Sigor@sysoev.ru {
1362Sigor@sysoev.ru     nxt_event_engine_t      *engine;
1462Sigor@sysoev.ru     const nxt_conn_state_t  *state;
1562Sigor@sysoev.ru 
1662Sigor@sysoev.ru     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
1762Sigor@sysoev.ru               c->socket.fd, c->socket.read_ready);
1862Sigor@sysoev.ru 
1962Sigor@sysoev.ru     engine = c->socket.task->thread->engine;
2062Sigor@sysoev.ru     state = c->read_state;
2162Sigor@sysoev.ru 
2262Sigor@sysoev.ru     if (c->socket.read_ready) {
2362Sigor@sysoev.ru         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
2462Sigor@sysoev.ru                            c->socket.task, c, c->socket.data);
2562Sigor@sysoev.ru         return;
2662Sigor@sysoev.ru     }
2762Sigor@sysoev.ru 
2862Sigor@sysoev.ru     c->socket.read_handler = state->ready_handler;
2962Sigor@sysoev.ru     c->socket.error_handler = state->error_handler;
3062Sigor@sysoev.ru 
3162Sigor@sysoev.ru     nxt_conn_timer(engine, c, state, &c->read_timer);
3262Sigor@sysoev.ru 
3362Sigor@sysoev.ru     nxt_fd_event_enable_read(engine, &c->socket);
3462Sigor@sysoev.ru }
3562Sigor@sysoev.ru 
3662Sigor@sysoev.ru 
3762Sigor@sysoev.ru void
3862Sigor@sysoev.ru nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
3962Sigor@sysoev.ru {
4062Sigor@sysoev.ru     ssize_t                 n;
4162Sigor@sysoev.ru     nxt_conn_t              *c;
4262Sigor@sysoev.ru     nxt_event_engine_t      *engine;
4362Sigor@sysoev.ru     nxt_work_handler_t      handler;
4462Sigor@sysoev.ru     const nxt_conn_state_t  *state;
4562Sigor@sysoev.ru 
4662Sigor@sysoev.ru     c = obj;
4762Sigor@sysoev.ru 
4862Sigor@sysoev.ru     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
4962Sigor@sysoev.ru               c->socket.fd, c->socket.read_ready, c->socket.closed);
5062Sigor@sysoev.ru 
5162Sigor@sysoev.ru     engine = task->thread->engine;
5262Sigor@sysoev.ru 
53771Sigor@sysoev.ru     /*
54771Sigor@sysoev.ru      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
55771Sigor@sysoev.ru      * because the function can be called by nxt_kqueue_conn_io_read().
56771Sigor@sysoev.ru      */
57771Sigor@sysoev.ru     c->socket.read_handler = c->io->read;
5862Sigor@sysoev.ru     state = c->read_state;
59771Sigor@sysoev.ru     c->socket.error_handler = state->error_handler;
6062Sigor@sysoev.ru 
6162Sigor@sysoev.ru     if (c->socket.read_ready) {
6262Sigor@sysoev.ru 
63629Sigor@sysoev.ru         if (state->io_read_handler == NULL) {
64629Sigor@sysoev.ru             n = c->io->recvbuf(c, c->read);
6562Sigor@sysoev.ru 
6662Sigor@sysoev.ru         } else {
67629Sigor@sysoev.ru             n = state->io_read_handler(c);
68741Sigor@sysoev.ru             /* The state can be changed by io_read_handler. */
69741Sigor@sysoev.ru             state = c->read_state;
7062Sigor@sysoev.ru         }
7162Sigor@sysoev.ru 
7262Sigor@sysoev.ru         if (n > 0) {
7362Sigor@sysoev.ru             c->nbytes = n;
7462Sigor@sysoev.ru 
75629Sigor@sysoev.ru             nxt_recvbuf_update(c->read, n);
7662Sigor@sysoev.ru 
7762Sigor@sysoev.ru             nxt_fd_event_block_read(engine, &c->socket);
7862Sigor@sysoev.ru 
7962Sigor@sysoev.ru             if (state->timer_autoreset) {
8062Sigor@sysoev.ru                 nxt_timer_disable(engine, &c->read_timer);
8162Sigor@sysoev.ru             }
8262Sigor@sysoev.ru 
83771Sigor@sysoev.ru             nxt_work_queue_add(c->read_work_queue,
84771Sigor@sysoev.ru                                state->ready_handler, task, c, data);
8562Sigor@sysoev.ru             return;
8662Sigor@sysoev.ru         }
8762Sigor@sysoev.ru 
8862Sigor@sysoev.ru         if (n != NXT_AGAIN) {
89771Sigor@sysoev.ru             /* n == 0 or n == NXT_ERROR. */
90771Sigor@sysoev.ru             handler = (n == 0) ? state->close_handler : state->error_handler;
91771Sigor@sysoev.ru 
9262Sigor@sysoev.ru             nxt_fd_event_block_read(engine, &c->socket);
9362Sigor@sysoev.ru             nxt_timer_disable(engine, &c->read_timer);
9462Sigor@sysoev.ru 
95771Sigor@sysoev.ru             nxt_work_queue_add(&engine->fast_work_queue,
96771Sigor@sysoev.ru                                handler, task, c, data);
97771Sigor@sysoev.ru             return;
98771Sigor@sysoev.ru         }
99771Sigor@sysoev.ru 
100771Sigor@sysoev.ru         /* n == NXT_AGAIN. */
10162Sigor@sysoev.ru 
102771Sigor@sysoev.ru         if (c->socket.read_ready) {
103771Sigor@sysoev.ru             /*
104771Sigor@sysoev.ru              * SSL/TLS library can return NXT_AGAIN if renegotiation
105771Sigor@sysoev.ru              * occured during read operation, it toggled write event
106771Sigor@sysoev.ru              * internally so only read timer should be set.
107771Sigor@sysoev.ru              */
108*809Svbart@nginx.com             if (!c->read_timer.enabled) {
109771Sigor@sysoev.ru                 nxt_conn_timer(engine, c, state, &c->read_timer);
110771Sigor@sysoev.ru             }
11162Sigor@sysoev.ru 
11262Sigor@sysoev.ru             return;
11362Sigor@sysoev.ru         }
11462Sigor@sysoev.ru     }
11562Sigor@sysoev.ru 
116766Sigor@sysoev.ru     if (nxt_fd_event_is_disabled(c->socket.read)) {
117766Sigor@sysoev.ru         nxt_fd_event_enable_read(engine, &c->socket);
11862Sigor@sysoev.ru     }
11962Sigor@sysoev.ru 
120*809Svbart@nginx.com     if (state->timer_autoreset || !c->read_timer.enabled) {
121766Sigor@sysoev.ru         nxt_conn_timer(engine, c, state, &c->read_timer);
122766Sigor@sysoev.ru     }
12362Sigor@sysoev.ru }
12462Sigor@sysoev.ru 
12562Sigor@sysoev.ru 
12662Sigor@sysoev.ru ssize_t
12762Sigor@sysoev.ru nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
12862Sigor@sysoev.ru {
12962Sigor@sysoev.ru     ssize_t                 n;
13062Sigor@sysoev.ru     nxt_err_t               err;
13162Sigor@sysoev.ru     nxt_uint_t              niov;
13262Sigor@sysoev.ru     struct iovec            iov[NXT_IOBUF_MAX];
13362Sigor@sysoev.ru     nxt_recvbuf_coalesce_t  rb;
13462Sigor@sysoev.ru 
13562Sigor@sysoev.ru     rb.buf = b;
13662Sigor@sysoev.ru     rb.iobuf = iov;
13762Sigor@sysoev.ru     rb.nmax = NXT_IOBUF_MAX;
13862Sigor@sysoev.ru     rb.size = 0;
13962Sigor@sysoev.ru 
14062Sigor@sysoev.ru     niov = nxt_recvbuf_mem_coalesce(&rb);
14162Sigor@sysoev.ru 
14262Sigor@sysoev.ru     if (niov == 1) {
14362Sigor@sysoev.ru         /* Disposal of surplus kernel iovec copy-in operation. */
14462Sigor@sysoev.ru         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
14562Sigor@sysoev.ru     }
14662Sigor@sysoev.ru 
14762Sigor@sysoev.ru     for ( ;; ) {
14862Sigor@sysoev.ru         n = readv(c->socket.fd, iov, niov);
14962Sigor@sysoev.ru 
15062Sigor@sysoev.ru         err = (n == -1) ? nxt_socket_errno : 0;
15162Sigor@sysoev.ru 
15262Sigor@sysoev.ru         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
15362Sigor@sysoev.ru 
15462Sigor@sysoev.ru         if (n > 0) {
15562Sigor@sysoev.ru             if ((size_t) n < rb.size) {
15662Sigor@sysoev.ru                 c->socket.read_ready = 0;
15762Sigor@sysoev.ru             }
15862Sigor@sysoev.ru 
15962Sigor@sysoev.ru             return n;
16062Sigor@sysoev.ru         }
16162Sigor@sysoev.ru 
16262Sigor@sysoev.ru         if (n == 0) {
16362Sigor@sysoev.ru             c->socket.closed = 1;
16462Sigor@sysoev.ru             c->socket.read_ready = 0;
16562Sigor@sysoev.ru             return n;
16662Sigor@sysoev.ru         }
16762Sigor@sysoev.ru 
16862Sigor@sysoev.ru         /* n == -1 */
16962Sigor@sysoev.ru 
17062Sigor@sysoev.ru         switch (err) {
17162Sigor@sysoev.ru 
17262Sigor@sysoev.ru         case NXT_EAGAIN:
17362Sigor@sysoev.ru             nxt_debug(c->socket.task, "readv() %E", err);
17462Sigor@sysoev.ru             c->socket.read_ready = 0;
17562Sigor@sysoev.ru             return NXT_AGAIN;
17662Sigor@sysoev.ru 
17762Sigor@sysoev.ru         case NXT_EINTR:
17862Sigor@sysoev.ru             nxt_debug(c->socket.task, "readv() %E", err);
17962Sigor@sysoev.ru             continue;
18062Sigor@sysoev.ru 
18162Sigor@sysoev.ru         default:
18262Sigor@sysoev.ru             c->socket.error = err;
18362Sigor@sysoev.ru             nxt_log(c->socket.task, nxt_socket_error_level(err),
18462Sigor@sysoev.ru                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
18562Sigor@sysoev.ru 
18662Sigor@sysoev.ru             return NXT_ERROR;
18762Sigor@sysoev.ru         }
18862Sigor@sysoev.ru     }
18962Sigor@sysoev.ru }
19062Sigor@sysoev.ru 
19162Sigor@sysoev.ru 
19262Sigor@sysoev.ru ssize_t
19362Sigor@sysoev.ru nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
19462Sigor@sysoev.ru {
19562Sigor@sysoev.ru     ssize_t    n;
19662Sigor@sysoev.ru     nxt_err_t  err;
19762Sigor@sysoev.ru 
19862Sigor@sysoev.ru     for ( ;; ) {
19962Sigor@sysoev.ru         n = recv(c->socket.fd, buf, size, flags);
20062Sigor@sysoev.ru 
20162Sigor@sysoev.ru         err = (n == -1) ? nxt_socket_errno : 0;
20262Sigor@sysoev.ru 
20362Sigor@sysoev.ru         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
20462Sigor@sysoev.ru                   c->socket.fd, buf, size, flags, n);
20562Sigor@sysoev.ru 
20662Sigor@sysoev.ru         if (n > 0) {
207771Sigor@sysoev.ru             if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
20862Sigor@sysoev.ru                 c->socket.read_ready = 0;
20962Sigor@sysoev.ru             }
21062Sigor@sysoev.ru 
21162Sigor@sysoev.ru             return n;
21262Sigor@sysoev.ru         }
21362Sigor@sysoev.ru 
21462Sigor@sysoev.ru         if (n == 0) {
21562Sigor@sysoev.ru             c->socket.closed = 1;
216771Sigor@sysoev.ru 
217771Sigor@sysoev.ru             if ((flags & MSG_PEEK) == 0) {
218771Sigor@sysoev.ru                 c->socket.read_ready = 0;
219771Sigor@sysoev.ru             }
22062Sigor@sysoev.ru 
22162Sigor@sysoev.ru             return n;
22262Sigor@sysoev.ru         }
22362Sigor@sysoev.ru 
22462Sigor@sysoev.ru         /* n == -1 */
22562Sigor@sysoev.ru 
22662Sigor@sysoev.ru         switch (err) {
22762Sigor@sysoev.ru 
22862Sigor@sysoev.ru         case NXT_EAGAIN:
22962Sigor@sysoev.ru             nxt_debug(c->socket.task, "recv() %E", err);
23062Sigor@sysoev.ru             c->socket.read_ready = 0;
23162Sigor@sysoev.ru 
23262Sigor@sysoev.ru             return NXT_AGAIN;
23362Sigor@sysoev.ru 
23462Sigor@sysoev.ru         case NXT_EINTR:
23562Sigor@sysoev.ru             nxt_debug(c->socket.task, "recv() %E", err);
23662Sigor@sysoev.ru             continue;
23762Sigor@sysoev.ru 
23862Sigor@sysoev.ru         default:
23962Sigor@sysoev.ru             c->socket.error = err;
24062Sigor@sysoev.ru             nxt_log(c->socket.task, nxt_socket_error_level(err),
24162Sigor@sysoev.ru                     "recv(%d, %p, %uz, %ui) failed %E",
24262Sigor@sysoev.ru                     c->socket.fd, buf, size, flags, err);
24362Sigor@sysoev.ru 
24462Sigor@sysoev.ru             return NXT_ERROR;
24562Sigor@sysoev.ru         }
24662Sigor@sysoev.ru     }
24762Sigor@sysoev.ru }
248