xref: /unit/src/nxt_conn_read.c (revision 62)
1*62Sigor@sysoev.ru 
2*62Sigor@sysoev.ru /*
3*62Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*62Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*62Sigor@sysoev.ru  */
6*62Sigor@sysoev.ru 
7*62Sigor@sysoev.ru #include <nxt_main.h>
8*62Sigor@sysoev.ru 
9*62Sigor@sysoev.ru 
10*62Sigor@sysoev.ru void
11*62Sigor@sysoev.ru nxt_conn_wait(nxt_conn_t *c)
12*62Sigor@sysoev.ru {
13*62Sigor@sysoev.ru     nxt_event_engine_t      *engine;
14*62Sigor@sysoev.ru     const nxt_conn_state_t  *state;
15*62Sigor@sysoev.ru 
16*62Sigor@sysoev.ru     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17*62Sigor@sysoev.ru               c->socket.fd, c->socket.read_ready);
18*62Sigor@sysoev.ru 
19*62Sigor@sysoev.ru     engine = c->socket.task->thread->engine;
20*62Sigor@sysoev.ru     state = c->read_state;
21*62Sigor@sysoev.ru 
22*62Sigor@sysoev.ru     if (c->socket.read_ready) {
23*62Sigor@sysoev.ru         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24*62Sigor@sysoev.ru                            c->socket.task, c, c->socket.data);
25*62Sigor@sysoev.ru         return;
26*62Sigor@sysoev.ru     }
27*62Sigor@sysoev.ru 
28*62Sigor@sysoev.ru     c->socket.read_handler = state->ready_handler;
29*62Sigor@sysoev.ru     c->socket.error_handler = state->error_handler;
30*62Sigor@sysoev.ru 
31*62Sigor@sysoev.ru     nxt_conn_timer(engine, c, state, &c->read_timer);
32*62Sigor@sysoev.ru 
33*62Sigor@sysoev.ru     nxt_fd_event_enable_read(engine, &c->socket);
34*62Sigor@sysoev.ru }
35*62Sigor@sysoev.ru 
36*62Sigor@sysoev.ru 
37*62Sigor@sysoev.ru void
38*62Sigor@sysoev.ru nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39*62Sigor@sysoev.ru {
40*62Sigor@sysoev.ru     ssize_t                 n;
41*62Sigor@sysoev.ru     nxt_buf_t               *b;
42*62Sigor@sysoev.ru     nxt_conn_t              *c;
43*62Sigor@sysoev.ru     nxt_work_queue_t        *wq;
44*62Sigor@sysoev.ru     nxt_event_engine_t      *engine;
45*62Sigor@sysoev.ru     nxt_work_handler_t      handler;
46*62Sigor@sysoev.ru     const nxt_conn_state_t  *state;
47*62Sigor@sysoev.ru 
48*62Sigor@sysoev.ru     c = obj;
49*62Sigor@sysoev.ru 
50*62Sigor@sysoev.ru     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
51*62Sigor@sysoev.ru               c->socket.fd, c->socket.read_ready, c->socket.closed);
52*62Sigor@sysoev.ru 
53*62Sigor@sysoev.ru     engine = task->thread->engine;
54*62Sigor@sysoev.ru 
55*62Sigor@sysoev.ru     state = c->read_state;
56*62Sigor@sysoev.ru 
57*62Sigor@sysoev.ru     if (c->socket.read_ready) {
58*62Sigor@sysoev.ru 
59*62Sigor@sysoev.ru         b = c->read;
60*62Sigor@sysoev.ru 
61*62Sigor@sysoev.ru         if (c->peek == 0)  {
62*62Sigor@sysoev.ru             n = c->io->recvbuf(c, b);
63*62Sigor@sysoev.ru 
64*62Sigor@sysoev.ru         } else {
65*62Sigor@sysoev.ru             n = c->io->recv(c, b->mem.free, c->peek, MSG_PEEK);
66*62Sigor@sysoev.ru         }
67*62Sigor@sysoev.ru 
68*62Sigor@sysoev.ru         if (n > 0) {
69*62Sigor@sysoev.ru             c->nbytes = n;
70*62Sigor@sysoev.ru 
71*62Sigor@sysoev.ru             nxt_recvbuf_update(b, n);
72*62Sigor@sysoev.ru 
73*62Sigor@sysoev.ru             nxt_fd_event_block_read(engine, &c->socket);
74*62Sigor@sysoev.ru 
75*62Sigor@sysoev.ru             if (state->timer_autoreset) {
76*62Sigor@sysoev.ru                 nxt_timer_disable(engine, &c->read_timer);
77*62Sigor@sysoev.ru             }
78*62Sigor@sysoev.ru 
79*62Sigor@sysoev.ru             wq = c->read_work_queue;
80*62Sigor@sysoev.ru             handler = state->ready_handler;
81*62Sigor@sysoev.ru 
82*62Sigor@sysoev.ru             nxt_work_queue_add(wq, handler, task, c, data);
83*62Sigor@sysoev.ru 
84*62Sigor@sysoev.ru             return;
85*62Sigor@sysoev.ru         }
86*62Sigor@sysoev.ru 
87*62Sigor@sysoev.ru         if (n != NXT_AGAIN) {
88*62Sigor@sysoev.ru             nxt_fd_event_block_read(engine, &c->socket);
89*62Sigor@sysoev.ru             nxt_timer_disable(engine, &c->read_timer);
90*62Sigor@sysoev.ru 
91*62Sigor@sysoev.ru             wq = &engine->fast_work_queue;
92*62Sigor@sysoev.ru 
93*62Sigor@sysoev.ru             handler = (n == 0) ? state->close_handler : state->error_handler;
94*62Sigor@sysoev.ru 
95*62Sigor@sysoev.ru             nxt_work_queue_add(wq, handler, task, c, data);
96*62Sigor@sysoev.ru 
97*62Sigor@sysoev.ru             return;
98*62Sigor@sysoev.ru         }
99*62Sigor@sysoev.ru     }
100*62Sigor@sysoev.ru 
101*62Sigor@sysoev.ru     /*
102*62Sigor@sysoev.ru      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
103*62Sigor@sysoev.ru      * because the function can be called by nxt_kqueue_conn_io_read().
104*62Sigor@sysoev.ru      */
105*62Sigor@sysoev.ru     c->socket.read_handler = c->io->read;
106*62Sigor@sysoev.ru     c->socket.error_handler = state->error_handler;
107*62Sigor@sysoev.ru 
108*62Sigor@sysoev.ru     if (c->read_timer.state == NXT_TIMER_DISABLED
109*62Sigor@sysoev.ru         || nxt_fd_event_is_disabled(c->socket.read))
110*62Sigor@sysoev.ru     {
111*62Sigor@sysoev.ru         /* Timer may be set or reset. */
112*62Sigor@sysoev.ru         nxt_conn_timer(engine, c, state, &c->read_timer);
113*62Sigor@sysoev.ru 
114*62Sigor@sysoev.ru         if (nxt_fd_event_is_disabled(c->socket.read)) {
115*62Sigor@sysoev.ru             nxt_fd_event_enable_read(engine, &c->socket);
116*62Sigor@sysoev.ru         }
117*62Sigor@sysoev.ru     }
118*62Sigor@sysoev.ru 
119*62Sigor@sysoev.ru     return;
120*62Sigor@sysoev.ru }
121*62Sigor@sysoev.ru 
122*62Sigor@sysoev.ru 
123*62Sigor@sysoev.ru ssize_t
124*62Sigor@sysoev.ru nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
125*62Sigor@sysoev.ru {
126*62Sigor@sysoev.ru     ssize_t                 n;
127*62Sigor@sysoev.ru     nxt_err_t               err;
128*62Sigor@sysoev.ru     nxt_uint_t              niov;
129*62Sigor@sysoev.ru     struct iovec            iov[NXT_IOBUF_MAX];
130*62Sigor@sysoev.ru     nxt_recvbuf_coalesce_t  rb;
131*62Sigor@sysoev.ru 
132*62Sigor@sysoev.ru     rb.buf = b;
133*62Sigor@sysoev.ru     rb.iobuf = iov;
134*62Sigor@sysoev.ru     rb.nmax = NXT_IOBUF_MAX;
135*62Sigor@sysoev.ru     rb.size = 0;
136*62Sigor@sysoev.ru 
137*62Sigor@sysoev.ru     niov = nxt_recvbuf_mem_coalesce(&rb);
138*62Sigor@sysoev.ru 
139*62Sigor@sysoev.ru     if (niov == 1) {
140*62Sigor@sysoev.ru         /* Disposal of surplus kernel iovec copy-in operation. */
141*62Sigor@sysoev.ru         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
142*62Sigor@sysoev.ru     }
143*62Sigor@sysoev.ru 
144*62Sigor@sysoev.ru     for ( ;; ) {
145*62Sigor@sysoev.ru         n = readv(c->socket.fd, iov, niov);
146*62Sigor@sysoev.ru 
147*62Sigor@sysoev.ru         err = (n == -1) ? nxt_socket_errno : 0;
148*62Sigor@sysoev.ru 
149*62Sigor@sysoev.ru         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
150*62Sigor@sysoev.ru 
151*62Sigor@sysoev.ru         if (n > 0) {
152*62Sigor@sysoev.ru             if ((size_t) n < rb.size) {
153*62Sigor@sysoev.ru                 c->socket.read_ready = 0;
154*62Sigor@sysoev.ru             }
155*62Sigor@sysoev.ru 
156*62Sigor@sysoev.ru             return n;
157*62Sigor@sysoev.ru         }
158*62Sigor@sysoev.ru 
159*62Sigor@sysoev.ru         if (n == 0) {
160*62Sigor@sysoev.ru             c->socket.closed = 1;
161*62Sigor@sysoev.ru             c->socket.read_ready = 0;
162*62Sigor@sysoev.ru             return n;
163*62Sigor@sysoev.ru         }
164*62Sigor@sysoev.ru 
165*62Sigor@sysoev.ru         /* n == -1 */
166*62Sigor@sysoev.ru 
167*62Sigor@sysoev.ru         switch (err) {
168*62Sigor@sysoev.ru 
169*62Sigor@sysoev.ru         case NXT_EAGAIN:
170*62Sigor@sysoev.ru             nxt_debug(c->socket.task, "readv() %E", err);
171*62Sigor@sysoev.ru             c->socket.read_ready = 0;
172*62Sigor@sysoev.ru             return NXT_AGAIN;
173*62Sigor@sysoev.ru 
174*62Sigor@sysoev.ru         case NXT_EINTR:
175*62Sigor@sysoev.ru             nxt_debug(c->socket.task, "readv() %E", err);
176*62Sigor@sysoev.ru             continue;
177*62Sigor@sysoev.ru 
178*62Sigor@sysoev.ru         default:
179*62Sigor@sysoev.ru             c->socket.error = err;
180*62Sigor@sysoev.ru             nxt_log(c->socket.task, nxt_socket_error_level(err),
181*62Sigor@sysoev.ru                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
182*62Sigor@sysoev.ru 
183*62Sigor@sysoev.ru             return NXT_ERROR;
184*62Sigor@sysoev.ru         }
185*62Sigor@sysoev.ru     }
186*62Sigor@sysoev.ru }
187*62Sigor@sysoev.ru 
188*62Sigor@sysoev.ru 
189*62Sigor@sysoev.ru ssize_t
190*62Sigor@sysoev.ru nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
191*62Sigor@sysoev.ru {
192*62Sigor@sysoev.ru     ssize_t    n;
193*62Sigor@sysoev.ru     nxt_err_t  err;
194*62Sigor@sysoev.ru 
195*62Sigor@sysoev.ru     for ( ;; ) {
196*62Sigor@sysoev.ru         n = recv(c->socket.fd, buf, size, flags);
197*62Sigor@sysoev.ru 
198*62Sigor@sysoev.ru         err = (n == -1) ? nxt_socket_errno : 0;
199*62Sigor@sysoev.ru 
200*62Sigor@sysoev.ru         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
201*62Sigor@sysoev.ru                   c->socket.fd, buf, size, flags, n);
202*62Sigor@sysoev.ru 
203*62Sigor@sysoev.ru         if (n > 0) {
204*62Sigor@sysoev.ru             if ((size_t) n < size) {
205*62Sigor@sysoev.ru                 c->socket.read_ready = 0;
206*62Sigor@sysoev.ru             }
207*62Sigor@sysoev.ru 
208*62Sigor@sysoev.ru             return n;
209*62Sigor@sysoev.ru         }
210*62Sigor@sysoev.ru 
211*62Sigor@sysoev.ru         if (n == 0) {
212*62Sigor@sysoev.ru             c->socket.closed = 1;
213*62Sigor@sysoev.ru             c->socket.read_ready = 0;
214*62Sigor@sysoev.ru 
215*62Sigor@sysoev.ru             return n;
216*62Sigor@sysoev.ru         }
217*62Sigor@sysoev.ru 
218*62Sigor@sysoev.ru         /* n == -1 */
219*62Sigor@sysoev.ru 
220*62Sigor@sysoev.ru         switch (err) {
221*62Sigor@sysoev.ru 
222*62Sigor@sysoev.ru         case NXT_EAGAIN:
223*62Sigor@sysoev.ru             nxt_debug(c->socket.task, "recv() %E", err);
224*62Sigor@sysoev.ru             c->socket.read_ready = 0;
225*62Sigor@sysoev.ru 
226*62Sigor@sysoev.ru             return NXT_AGAIN;
227*62Sigor@sysoev.ru 
228*62Sigor@sysoev.ru         case NXT_EINTR:
229*62Sigor@sysoev.ru             nxt_debug(c->socket.task, "recv() %E", err);
230*62Sigor@sysoev.ru             continue;
231*62Sigor@sysoev.ru 
232*62Sigor@sysoev.ru         default:
233*62Sigor@sysoev.ru             c->socket.error = err;
234*62Sigor@sysoev.ru             nxt_log(c->socket.task, nxt_socket_error_level(err),
235*62Sigor@sysoev.ru                     "recv(%d, %p, %uz, %ui) failed %E",
236*62Sigor@sysoev.ru                     c->socket.fd, buf, size, flags, err);
237*62Sigor@sysoev.ru 
238*62Sigor@sysoev.ru             return NXT_ERROR;
239*62Sigor@sysoev.ru         }
240*62Sigor@sysoev.ru     }
241*62Sigor@sysoev.ru }
242