xref: /unit/src/nxt_conn_read.c (revision 741:8856c08e95aa)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 void
11 nxt_conn_wait(nxt_conn_t *c)
12 {
13     nxt_event_engine_t      *engine;
14     const nxt_conn_state_t  *state;
15 
16     nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17               c->socket.fd, c->socket.read_ready);
18 
19     engine = c->socket.task->thread->engine;
20     state = c->read_state;
21 
22     if (c->socket.read_ready) {
23         nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24                            c->socket.task, c, c->socket.data);
25         return;
26     }
27 
28     c->socket.read_handler = state->ready_handler;
29     c->socket.error_handler = state->error_handler;
30 
31     nxt_conn_timer(engine, c, state, &c->read_timer);
32 
33     nxt_fd_event_enable_read(engine, &c->socket);
34 }
35 
36 
37 void
38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39 {
40     ssize_t                 n;
41     nxt_conn_t              *c;
42     nxt_work_queue_t        *wq;
43     nxt_event_engine_t      *engine;
44     nxt_work_handler_t      handler;
45     const nxt_conn_state_t  *state;
46 
47     c = obj;
48 
49     nxt_debug(task, "conn read fd:%d rdy:%d cl:%d",
50               c->socket.fd, c->socket.read_ready, c->socket.closed);
51 
52     engine = task->thread->engine;
53 
54     state = c->read_state;
55 
56     if (c->socket.read_ready) {
57 
58         if (state->io_read_handler == NULL) {
59             n = c->io->recvbuf(c, c->read);
60 
61         } else {
62             n = state->io_read_handler(c);
63             /* The state can be changed by io_read_handler. */
64             state = c->read_state;
65         }
66 
67         if (n > 0) {
68             c->nbytes = n;
69 
70             nxt_recvbuf_update(c->read, n);
71 
72             nxt_fd_event_block_read(engine, &c->socket);
73 
74             if (state->timer_autoreset) {
75                 nxt_timer_disable(engine, &c->read_timer);
76             }
77 
78             wq = c->read_work_queue;
79             handler = state->ready_handler;
80 
81             nxt_work_queue_add(wq, handler, task, c, data);
82 
83             return;
84         }
85 
86         if (n != NXT_AGAIN) {
87             nxt_fd_event_block_read(engine, &c->socket);
88             nxt_timer_disable(engine, &c->read_timer);
89 
90             wq = &engine->fast_work_queue;
91 
92             handler = (n == 0) ? state->close_handler : state->error_handler;
93 
94             nxt_work_queue_add(wq, handler, task, c, data);
95 
96             return;
97         }
98     }
99 
100     /*
101      * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
102      * because the function can be called by nxt_kqueue_conn_io_read().
103      */
104     c->socket.read_handler = c->io->read;
105     c->socket.error_handler = state->error_handler;
106 
107     if (c->read_timer.state == NXT_TIMER_DISABLED
108         || nxt_fd_event_is_disabled(c->socket.read))
109     {
110         /* Timer may be set or reset. */
111         nxt_conn_timer(engine, c, state, &c->read_timer);
112 
113         if (nxt_fd_event_is_disabled(c->socket.read)) {
114             nxt_fd_event_enable_read(engine, &c->socket);
115         }
116     }
117 
118     return;
119 }
120 
121 
122 ssize_t
123 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
124 {
125     ssize_t                 n;
126     nxt_err_t               err;
127     nxt_uint_t              niov;
128     struct iovec            iov[NXT_IOBUF_MAX];
129     nxt_recvbuf_coalesce_t  rb;
130 
131     rb.buf = b;
132     rb.iobuf = iov;
133     rb.nmax = NXT_IOBUF_MAX;
134     rb.size = 0;
135 
136     niov = nxt_recvbuf_mem_coalesce(&rb);
137 
138     if (niov == 1) {
139         /* Disposal of surplus kernel iovec copy-in operation. */
140         return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
141     }
142 
143     for ( ;; ) {
144         n = readv(c->socket.fd, iov, niov);
145 
146         err = (n == -1) ? nxt_socket_errno : 0;
147 
148         nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
149 
150         if (n > 0) {
151             if ((size_t) n < rb.size) {
152                 c->socket.read_ready = 0;
153             }
154 
155             return n;
156         }
157 
158         if (n == 0) {
159             c->socket.closed = 1;
160             c->socket.read_ready = 0;
161             return n;
162         }
163 
164         /* n == -1 */
165 
166         switch (err) {
167 
168         case NXT_EAGAIN:
169             nxt_debug(c->socket.task, "readv() %E", err);
170             c->socket.read_ready = 0;
171             return NXT_AGAIN;
172 
173         case NXT_EINTR:
174             nxt_debug(c->socket.task, "readv() %E", err);
175             continue;
176 
177         default:
178             c->socket.error = err;
179             nxt_log(c->socket.task, nxt_socket_error_level(err),
180                     "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
181 
182             return NXT_ERROR;
183         }
184     }
185 }
186 
187 
188 ssize_t
189 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
190 {
191     ssize_t    n;
192     nxt_err_t  err;
193 
194     for ( ;; ) {
195         n = recv(c->socket.fd, buf, size, flags);
196 
197         err = (n == -1) ? nxt_socket_errno : 0;
198 
199         nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
200                   c->socket.fd, buf, size, flags, n);
201 
202         if (n > 0) {
203             if ((size_t) n < size) {
204                 c->socket.read_ready = 0;
205             }
206 
207             return n;
208         }
209 
210         if (n == 0) {
211             c->socket.closed = 1;
212             c->socket.read_ready = 0;
213 
214             return n;
215         }
216 
217         /* n == -1 */
218 
219         switch (err) {
220 
221         case NXT_EAGAIN:
222             nxt_debug(c->socket.task, "recv() %E", err);
223             c->socket.read_ready = 0;
224 
225             return NXT_AGAIN;
226 
227         case NXT_EINTR:
228             nxt_debug(c->socket.task, "recv() %E", err);
229             continue;
230 
231         default:
232             c->socket.error = err;
233             nxt_log(c->socket.task, nxt_socket_error_level(err),
234                     "recv(%d, %p, %uz, %ui) failed %E",
235                     c->socket.fd, buf, size, flags, err);
236 
237             return NXT_ERROR;
238         }
239     }
240 }
241