xref: /unit/src/nxt_conn_read.c (revision 771:f349b2d68e75)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 void
11 nxt_conn_wait(nxt_conn_t *c)
12 {
13     nxt_event_engine_t      *engine;
14     const nxt_conn_state_t  *state;
15 
16     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17               c->socket.fd, c->socket.read_ready);
18 
19     engine = c->socket.task->thread->engine;
20     state = c->read_state;
21 
22     if (c->socket.read_ready) {
23         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24                            c->socket.task, c, c->socket.data);
25         return;
26     }
27 
28     c->socket.read_handler = state->ready_handler;
29     c->socket.error_handler = state->error_handler;
30 
31     nxt_conn_timer(engine, c, state, &c->read_timer);
32 
33     nxt_fd_event_enable_read(engine, &c->socket);
34 }
35 
36 
37 void
38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39 {
40     ssize_t                 n;
41     nxt_conn_t              *c;
42     nxt_event_engine_t      *engine;
43     nxt_work_handler_t      handler;
44     const nxt_conn_state_t  *state;
45 
46     c = obj;
47 
48     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
49               c->socket.fd, c->socket.read_ready, c->socket.closed);
50 
51     engine = task->thread->engine;
52 
53     /*
54      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
55      * because the function can be called by nxt_kqueue_conn_io_read().
56      */
57     c->socket.read_handler = c->io->read;
58     state = c->read_state;
59     c->socket.error_handler = state->error_handler;
60 
61     if (c->socket.read_ready) {
62 
63         if (state->io_read_handler == NULL) {
64             n = c->io->recvbuf(c, c->read);
65 
66         } else {
67             n = state->io_read_handler(c);
68             /* The state can be changed by io_read_handler. */
69             state = c->read_state;
70         }
71 
72         if (n > 0) {
73             c->nbytes = n;
74 
75             nxt_recvbuf_update(c->read, n);
76 
77             nxt_fd_event_block_read(engine, &c->socket);
78 
79             if (state->timer_autoreset) {
80                 nxt_timer_disable(engine, &c->read_timer);
81             }
82 
83             nxt_work_queue_add(c->read_work_queue,
84                                state->ready_handler, task, c, data);
85             return;
86         }
87 
88         if (n != NXT_AGAIN) {
89             /* n == 0 or n == NXT_ERROR. */
90             handler = (n == 0) ? state->close_handler : state->error_handler;
91 
92             nxt_fd_event_block_read(engine, &c->socket);
93             nxt_timer_disable(engine, &c->read_timer);
94 
95             nxt_work_queue_add(&engine->fast_work_queue,
96                                handler, task, c, data);
97             return;
98         }
99 
100         /* n == NXT_AGAIN. */
101 
102         if (c->socket.read_ready) {
103             /*
104              * SSL/TLS library can return NXT_AGAIN if renegotiation
105              * occured during read operation, it toggled write event
106              * internally so only read timer should be set.
107              */
108             if (c->read_timer.state == NXT_TIMER_DISABLED) {
109                 nxt_conn_timer(engine, c, state, &c->read_timer);
110             }
111 
112             return;
113         }
114     }
115 
116     if (nxt_fd_event_is_disabled(c->socket.read)) {
117         nxt_fd_event_enable_read(engine, &c->socket);
118     }
119 
120     if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) {
121         nxt_conn_timer(engine, c, state, &c->read_timer);
122     }
123 }
124 
125 
126 ssize_t
127 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
128 {
129     ssize_t                 n;
130     nxt_err_t               err;
131     nxt_uint_t              niov;
132     struct iovec            iov[NXT_IOBUF_MAX];
133     nxt_recvbuf_coalesce_t  rb;
134 
135     rb.buf = b;
136     rb.iobuf = iov;
137     rb.nmax = NXT_IOBUF_MAX;
138     rb.size = 0;
139 
140     niov = nxt_recvbuf_mem_coalesce(&rb);
141 
142     if (niov == 1) {
143         /* Disposal of surplus kernel iovec copy-in operation. */
144         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
145     }
146 
147     for ( ;; ) {
148         n = readv(c->socket.fd, iov, niov);
149 
150         err = (n == -1) ? nxt_socket_errno : 0;
151 
152         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
153 
154         if (n > 0) {
155             if ((size_t) n < rb.size) {
156                 c->socket.read_ready = 0;
157             }
158 
159             return n;
160         }
161 
162         if (n == 0) {
163             c->socket.closed = 1;
164             c->socket.read_ready = 0;
165             return n;
166         }
167 
168         /* n == -1 */
169 
170         switch (err) {
171 
172         case NXT_EAGAIN:
173             nxt_debug(c->socket.task, "readv() %E", err);
174             c->socket.read_ready = 0;
175             return NXT_AGAIN;
176 
177         case NXT_EINTR:
178             nxt_debug(c->socket.task, "readv() %E", err);
179             continue;
180 
181         default:
182             c->socket.error = err;
183             nxt_log(c->socket.task, nxt_socket_error_level(err),
184                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
185 
186             return NXT_ERROR;
187         }
188     }
189 }
190 
191 
192 ssize_t
193 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
194 {
195     ssize_t    n;
196     nxt_err_t  err;
197 
198     for ( ;; ) {
199         n = recv(c->socket.fd, buf, size, flags);
200 
201         err = (n == -1) ? nxt_socket_errno : 0;
202 
203         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
204                   c->socket.fd, buf, size, flags, n);
205 
206         if (n > 0) {
207             if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
208                 c->socket.read_ready = 0;
209             }
210 
211             return n;
212         }
213 
214         if (n == 0) {
215             c->socket.closed = 1;
216 
217             if ((flags & MSG_PEEK) == 0) {
218                 c->socket.read_ready = 0;
219             }
220 
221             return n;
222         }
223 
224         /* n == -1 */
225 
226         switch (err) {
227 
228         case NXT_EAGAIN:
229             nxt_debug(c->socket.task, "recv() %E", err);
230             c->socket.read_ready = 0;
231 
232             return NXT_AGAIN;
233 
234         case NXT_EINTR:
235             nxt_debug(c->socket.task, "recv() %E", err);
236             continue;
237 
238         default:
239             c->socket.error = err;
240             nxt_log(c->socket.task, nxt_socket_error_level(err),
241                     "recv(%d, %p, %uz, %ui) failed %E",
242                     c->socket.fd, buf, size, flags, err);
243 
244             return NXT_ERROR;
245         }
246     }
247 }
248