xref: /unit/src/nxt_conn_read.c (revision 836:ecd3c5bbf7d8)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 void
11 nxt_conn_wait(nxt_conn_t *c)
12 {
13     nxt_event_engine_t      *engine;
14     const nxt_conn_state_t  *state;
15 
16     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17               c->socket.fd, c->socket.read_ready);
18 
19     engine = c->socket.task->thread->engine;
20     state = c->read_state;
21 
22     if (c->socket.read_ready) {
23         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24                            c->socket.task, c, c->socket.data);
25         return;
26     }
27 
28     c->socket.read_handler = state->ready_handler;
29     c->socket.error_handler = state->error_handler;
30 
31     nxt_conn_timer(engine, c, state, &c->read_timer);
32 
33     nxt_fd_event_enable_read(engine, &c->socket);
34 }
35 
36 
37 void
38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39 {
40     ssize_t                 n;
41     nxt_conn_t              *c;
42     nxt_event_engine_t      *engine;
43     nxt_work_handler_t      handler;
44     const nxt_conn_state_t  *state;
45 
46     c = obj;
47 
48     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
49               c->socket.fd, c->socket.read_ready, c->socket.closed);
50 
51     if (c->socket.error != 0) {
52         return;
53     }
54 
55     engine = task->thread->engine;
56 
57     /*
58      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
59      * because the function can be called by nxt_kqueue_conn_io_read().
60      */
61     c->socket.read_handler = c->io->read;
62     state = c->read_state;
63     c->socket.error_handler = state->error_handler;
64 
65     if (c->socket.read_ready) {
66 
67         if (state->io_read_handler == NULL) {
68             n = c->io->recvbuf(c, c->read);
69 
70         } else {
71             n = state->io_read_handler(c);
72             /* The state can be changed by io_read_handler. */
73             state = c->read_state;
74         }
75 
76         if (n > 0) {
77             c->nbytes = n;
78 
79             nxt_recvbuf_update(c->read, n);
80 
81             nxt_fd_event_block_read(engine, &c->socket);
82 
83             if (state->timer_autoreset) {
84                 nxt_timer_disable(engine, &c->read_timer);
85             }
86 
87             nxt_work_queue_add(c->read_work_queue,
88                                state->ready_handler, task, c, data);
89             return;
90         }
91 
92         if (n != NXT_AGAIN) {
93             /* n == 0 or n == NXT_ERROR. */
94             handler = (n == 0) ? state->close_handler : state->error_handler;
95 
96             nxt_fd_event_block_read(engine, &c->socket);
97             nxt_timer_disable(engine, &c->read_timer);
98 
99             nxt_work_queue_add(&engine->fast_work_queue,
100                                handler, task, c, data);
101             return;
102         }
103 
104         /* n == NXT_AGAIN. */
105 
106         if (c->socket.read_ready) {
107             /*
108              * SSL/TLS library can return NXT_AGAIN if renegotiation
109              * occured during read operation, it toggled write event
110              * internally so only read timer should be set.
111              */
112             if (!c->read_timer.enabled) {
113                 nxt_conn_timer(engine, c, state, &c->read_timer);
114             }
115 
116             return;
117         }
118     }
119 
120     if (nxt_fd_event_is_disabled(c->socket.read)) {
121         nxt_fd_event_enable_read(engine, &c->socket);
122     }
123 
124     if (state->timer_autoreset || !c->read_timer.enabled) {
125         nxt_conn_timer(engine, c, state, &c->read_timer);
126     }
127 }
128 
129 
130 ssize_t
131 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
132 {
133     ssize_t                 n;
134     nxt_err_t               err;
135     nxt_uint_t              niov;
136     struct iovec            iov[NXT_IOBUF_MAX];
137     nxt_recvbuf_coalesce_t  rb;
138 
139     rb.buf = b;
140     rb.iobuf = iov;
141     rb.nmax = NXT_IOBUF_MAX;
142     rb.size = 0;
143 
144     niov = nxt_recvbuf_mem_coalesce(&rb);
145 
146     if (niov == 1) {
147         /* Disposal of surplus kernel iovec copy-in operation. */
148         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
149     }
150 
151     for ( ;; ) {
152         n = readv(c->socket.fd, iov, niov);
153 
154         err = (n == -1) ? nxt_socket_errno : 0;
155 
156         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
157 
158         if (n > 0) {
159             if ((size_t) n < rb.size) {
160                 c->socket.read_ready = 0;
161             }
162 
163             return n;
164         }
165 
166         if (n == 0) {
167             c->socket.closed = 1;
168             c->socket.read_ready = 0;
169             return n;
170         }
171 
172         /* n == -1 */
173 
174         switch (err) {
175 
176         case NXT_EAGAIN:
177             nxt_debug(c->socket.task, "readv() %E", err);
178             c->socket.read_ready = 0;
179             return NXT_AGAIN;
180 
181         case NXT_EINTR:
182             nxt_debug(c->socket.task, "readv() %E", err);
183             continue;
184 
185         default:
186             c->socket.error = err;
187             nxt_log(c->socket.task, nxt_socket_error_level(err),
188                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
189 
190             return NXT_ERROR;
191         }
192     }
193 }
194 
195 
196 ssize_t
197 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
198 {
199     ssize_t    n;
200     nxt_err_t  err;
201 
202     for ( ;; ) {
203         n = recv(c->socket.fd, buf, size, flags);
204 
205         err = (n == -1) ? nxt_socket_errno : 0;
206 
207         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
208                   c->socket.fd, buf, size, flags, n);
209 
210         if (n > 0) {
211             if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
212                 c->socket.read_ready = 0;
213             }
214 
215             return n;
216         }
217 
218         if (n == 0) {
219             c->socket.closed = 1;
220 
221             if ((flags & MSG_PEEK) == 0) {
222                 c->socket.read_ready = 0;
223             }
224 
225             return n;
226         }
227 
228         /* n == -1 */
229 
230         switch (err) {
231 
232         case NXT_EAGAIN:
233             nxt_debug(c->socket.task, "recv() %E", err);
234             c->socket.read_ready = 0;
235 
236             return NXT_AGAIN;
237 
238         case NXT_EINTR:
239             nxt_debug(c->socket.task, "recv() %E", err);
240             continue;
241 
242         default:
243             c->socket.error = err;
244             nxt_log(c->socket.task, nxt_socket_error_level(err),
245                     "recv(%d, %p, %uz, %ui) failed %E",
246                     c->socket.fd, buf, size, flags, err);
247 
248             return NXT_ERROR;
249         }
250     }
251 }
252