xref: /unit/src/nxt_conn_read.c (revision 1268:dc403927ab0b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 void
nxt_conn_wait(nxt_conn_t * c)11 nxt_conn_wait(nxt_conn_t *c)
12 {
13     nxt_event_engine_t      *engine;
14     const nxt_conn_state_t  *state;
15 
16     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17               c->socket.fd, c->socket.read_ready);
18 
19     engine = c->socket.task->thread->engine;
20     state = c->read_state;
21 
22     if (c->socket.read_ready) {
23         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24                            c->socket.task, c, c->socket.data);
25         return;
26     }
27 
28     c->socket.read_handler = state->ready_handler;
29     c->socket.error_handler = state->error_handler;
30 
31     nxt_conn_timer(engine, c, state, &c->read_timer);
32 
33     nxt_fd_event_enable_read(engine, &c->socket);
34 }
35 
36 
37 void
nxt_conn_io_read(nxt_task_t * task,void * obj,void * data)38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39 {
40     ssize_t                 n;
41     nxt_conn_t              *c;
42     nxt_event_engine_t      *engine;
43     nxt_work_handler_t      handler;
44     const nxt_conn_state_t  *state;
45 
46     c = obj;
47 
48     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d",
49               c->socket.fd, c->socket.read_ready, c->socket.closed,
50               c->socket.error, c->block_read);
51 
52     if (c->socket.error != 0 || c->block_read) {
53         return;
54     }
55 
56     engine = task->thread->engine;
57 
58     /*
59      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
60      * because the function can be called by nxt_kqueue_conn_io_read().
61      */
62     c->socket.read_handler = c->io->read;
63     state = c->read_state;
64     c->socket.error_handler = state->error_handler;
65 
66     if (c->socket.read_ready) {
67 
68         if (state->io_read_handler == NULL) {
69             n = c->io->recvbuf(c, c->read);
70 
71         } else {
72             n = state->io_read_handler(task, c);
73             /* The state can be changed by io_read_handler. */
74             state = c->read_state;
75         }
76 
77         if (n > 0) {
78             c->nbytes = n;
79 
80             nxt_recvbuf_update(c->read, n);
81 
82             nxt_fd_event_block_read(engine, &c->socket);
83 
84             if (state->timer_autoreset) {
85                 nxt_timer_disable(engine, &c->read_timer);
86             }
87 
88             nxt_work_queue_add(c->read_work_queue,
89                                state->ready_handler, task, c, data);
90             return;
91         }
92 
93         if (n != NXT_AGAIN) {
94             /* n == 0 or n == NXT_ERROR. */
95             handler = (n == 0) ? state->close_handler : state->error_handler;
96 
97             nxt_fd_event_block_read(engine, &c->socket);
98             nxt_timer_disable(engine, &c->read_timer);
99 
100             nxt_work_queue_add(&engine->fast_work_queue,
101                                handler, task, c, data);
102             return;
103         }
104 
105         /* n == NXT_AGAIN. */
106 
107         if (c->socket.read_ready) {
108             /*
109              * SSL/TLS library can return NXT_AGAIN if renegotiation
110              * occured during read operation, it toggled write event
111              * internally so only read timer should be set.
112              */
113             if (!c->read_timer.enabled) {
114                 nxt_conn_timer(engine, c, state, &c->read_timer);
115             }
116 
117             return;
118         }
119     }
120 
121     if (nxt_fd_event_is_disabled(c->socket.read)) {
122         nxt_fd_event_enable_read(engine, &c->socket);
123     }
124 
125     if (state->timer_autoreset || !c->read_timer.enabled) {
126         nxt_conn_timer(engine, c, state, &c->read_timer);
127     }
128 }
129 
130 
131 ssize_t
nxt_conn_io_recvbuf(nxt_conn_t * c,nxt_buf_t * b)132 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
133 {
134     ssize_t                 n;
135     nxt_err_t               err;
136     nxt_uint_t              niov;
137     struct iovec            iov[NXT_IOBUF_MAX];
138     nxt_recvbuf_coalesce_t  rb;
139 
140     rb.buf = b;
141     rb.iobuf = iov;
142     rb.nmax = NXT_IOBUF_MAX;
143     rb.size = 0;
144 
145     niov = nxt_recvbuf_mem_coalesce(&rb);
146 
147     if (niov == 1) {
148         /* Disposal of surplus kernel iovec copy-in operation. */
149         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
150     }
151 
152     for ( ;; ) {
153         n = readv(c->socket.fd, iov, niov);
154 
155         err = (n == -1) ? nxt_socket_errno : 0;
156 
157         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
158 
159         if (n > 0) {
160             if ((size_t) n < rb.size) {
161                 c->socket.read_ready = 0;
162             }
163 
164             return n;
165         }
166 
167         if (n == 0) {
168             c->socket.closed = 1;
169             c->socket.read_ready = 0;
170             return n;
171         }
172 
173         /* n == -1 */
174 
175         switch (err) {
176 
177         case NXT_EAGAIN:
178             nxt_debug(c->socket.task, "readv() %E", err);
179             c->socket.read_ready = 0;
180             return NXT_AGAIN;
181 
182         case NXT_EINTR:
183             nxt_debug(c->socket.task, "readv() %E", err);
184             continue;
185 
186         default:
187             c->socket.error = err;
188             nxt_log(c->socket.task, nxt_socket_error_level(err),
189                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
190 
191             return NXT_ERROR;
192         }
193     }
194 }
195 
196 
197 ssize_t
nxt_conn_io_recv(nxt_conn_t * c,void * buf,size_t size,nxt_uint_t flags)198 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
199 {
200     ssize_t    n;
201     nxt_err_t  err;
202 
203     for ( ;; ) {
204         n = recv(c->socket.fd, buf, size, flags);
205 
206         err = (n == -1) ? nxt_socket_errno : 0;
207 
208         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
209                   c->socket.fd, buf, size, flags, n);
210 
211         if (n > 0) {
212             if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
213                 c->socket.read_ready = 0;
214             }
215 
216             return n;
217         }
218 
219         if (n == 0) {
220             c->socket.closed = 1;
221 
222             if ((flags & MSG_PEEK) == 0) {
223                 c->socket.read_ready = 0;
224             }
225 
226             return n;
227         }
228 
229         /* n == -1 */
230 
231         switch (err) {
232 
233         case NXT_EAGAIN:
234             nxt_debug(c->socket.task, "recv() %E", err);
235             c->socket.read_ready = 0;
236 
237             return NXT_AGAIN;
238 
239         case NXT_EINTR:
240             nxt_debug(c->socket.task, "recv() %E", err);
241             continue;
242 
243         default:
244             c->socket.error = err;
245             nxt_log(c->socket.task, nxt_socket_error_level(err),
246                     "recv(%d, %p, %uz, %ui) failed %E",
247                     c->socket.fd, buf, size, flags, err);
248 
249             return NXT_ERROR;
250         }
251     }
252 }
253