xref: /unit/src/nxt_conn_read.c (revision 62:5e1efcc7b740)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 void
11 nxt_conn_wait(nxt_conn_t *c)
12 {
13     nxt_event_engine_t      *engine;
14     const nxt_conn_state_t  *state;
15 
16     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17               c->socket.fd, c->socket.read_ready);
18 
19     engine = c->socket.task->thread->engine;
20     state = c->read_state;
21 
22     if (c->socket.read_ready) {
23         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24                            c->socket.task, c, c->socket.data);
25         return;
26     }
27 
28     c->socket.read_handler = state->ready_handler;
29     c->socket.error_handler = state->error_handler;
30 
31     nxt_conn_timer(engine, c, state, &c->read_timer);
32 
33     nxt_fd_event_enable_read(engine, &c->socket);
34 }
35 
36 
37 void
38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39 {
40     ssize_t                 n;
41     nxt_buf_t               *b;
42     nxt_conn_t              *c;
43     nxt_work_queue_t        *wq;
44     nxt_event_engine_t      *engine;
45     nxt_work_handler_t      handler;
46     const nxt_conn_state_t  *state;
47 
48     c = obj;
49 
50     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
51               c->socket.fd, c->socket.read_ready, c->socket.closed);
52 
53     engine = task->thread->engine;
54 
55     state = c->read_state;
56 
57     if (c->socket.read_ready) {
58 
59         b = c->read;
60 
61         if (c->peek == 0)  {
62             n = c->io->recvbuf(c, b);
63 
64         } else {
65             n = c->io->recv(c, b->mem.free, c->peek, MSG_PEEK);
66         }
67 
68         if (n > 0) {
69             c->nbytes = n;
70 
71             nxt_recvbuf_update(b, n);
72 
73             nxt_fd_event_block_read(engine, &c->socket);
74 
75             if (state->timer_autoreset) {
76                 nxt_timer_disable(engine, &c->read_timer);
77             }
78 
79             wq = c->read_work_queue;
80             handler = state->ready_handler;
81 
82             nxt_work_queue_add(wq, handler, task, c, data);
83 
84             return;
85         }
86 
87         if (n != NXT_AGAIN) {
88             nxt_fd_event_block_read(engine, &c->socket);
89             nxt_timer_disable(engine, &c->read_timer);
90 
91             wq = &engine->fast_work_queue;
92 
93             handler = (n == 0) ? state->close_handler : state->error_handler;
94 
95             nxt_work_queue_add(wq, handler, task, c, data);
96 
97             return;
98         }
99     }
100 
101     /*
102      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
103      * because the function can be called by nxt_kqueue_conn_io_read().
104      */
105     c->socket.read_handler = c->io->read;
106     c->socket.error_handler = state->error_handler;
107 
108     if (c->read_timer.state == NXT_TIMER_DISABLED
109         || nxt_fd_event_is_disabled(c->socket.read))
110     {
111         /* Timer may be set or reset. */
112         nxt_conn_timer(engine, c, state, &c->read_timer);
113 
114         if (nxt_fd_event_is_disabled(c->socket.read)) {
115             nxt_fd_event_enable_read(engine, &c->socket);
116         }
117     }
118 
119     return;
120 }
121 
122 
123 ssize_t
124 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
125 {
126     ssize_t                 n;
127     nxt_err_t               err;
128     nxt_uint_t              niov;
129     struct iovec            iov[NXT_IOBUF_MAX];
130     nxt_recvbuf_coalesce_t  rb;
131 
132     rb.buf = b;
133     rb.iobuf = iov;
134     rb.nmax = NXT_IOBUF_MAX;
135     rb.size = 0;
136 
137     niov = nxt_recvbuf_mem_coalesce(&rb);
138 
139     if (niov == 1) {
140         /* Disposal of surplus kernel iovec copy-in operation. */
141         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
142     }
143 
144     for ( ;; ) {
145         n = readv(c->socket.fd, iov, niov);
146 
147         err = (n == -1) ? nxt_socket_errno : 0;
148 
149         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
150 
151         if (n > 0) {
152             if ((size_t) n < rb.size) {
153                 c->socket.read_ready = 0;
154             }
155 
156             return n;
157         }
158 
159         if (n == 0) {
160             c->socket.closed = 1;
161             c->socket.read_ready = 0;
162             return n;
163         }
164 
165         /* n == -1 */
166 
167         switch (err) {
168 
169         case NXT_EAGAIN:
170             nxt_debug(c->socket.task, "readv() %E", err);
171             c->socket.read_ready = 0;
172             return NXT_AGAIN;
173 
174         case NXT_EINTR:
175             nxt_debug(c->socket.task, "readv() %E", err);
176             continue;
177 
178         default:
179             c->socket.error = err;
180             nxt_log(c->socket.task, nxt_socket_error_level(err),
181                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
182 
183             return NXT_ERROR;
184         }
185     }
186 }
187 
188 
189 ssize_t
190 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
191 {
192     ssize_t    n;
193     nxt_err_t  err;
194 
195     for ( ;; ) {
196         n = recv(c->socket.fd, buf, size, flags);
197 
198         err = (n == -1) ? nxt_socket_errno : 0;
199 
200         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
201                   c->socket.fd, buf, size, flags, n);
202 
203         if (n > 0) {
204             if ((size_t) n < size) {
205                 c->socket.read_ready = 0;
206             }
207 
208             return n;
209         }
210 
211         if (n == 0) {
212             c->socket.closed = 1;
213             c->socket.read_ready = 0;
214 
215             return n;
216         }
217 
218         /* n == -1 */
219 
220         switch (err) {
221 
222         case NXT_EAGAIN:
223             nxt_debug(c->socket.task, "recv() %E", err);
224             c->socket.read_ready = 0;
225 
226             return NXT_AGAIN;
227 
228         case NXT_EINTR:
229             nxt_debug(c->socket.task, "recv() %E", err);
230             continue;
231 
232         default:
233             c->socket.error = err;
234             nxt_log(c->socket.task, nxt_socket_error_level(err),
235                     "recv(%d, %p, %uz, %ui) failed %E",
236                     c->socket.fd, buf, size, flags, err);
237 
238             return NXT_ERROR;
239         }
240     }
241 }
242