1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 void
nxt_conn_wait(nxt_conn_t * c)11 nxt_conn_wait(nxt_conn_t *c)
12 {
13 nxt_event_engine_t *engine;
14 const nxt_conn_state_t *state;
15
16 nxt_debug(c->socket.task, "conn wait fd:%d rdy:%d",
17 c->socket.fd, c->socket.read_ready);
18
19 engine = c->socket.task->thread->engine;
20 state = c->read_state;
21
22 if (c->socket.read_ready) {
23 nxt_work_queue_add(&engine->fast_work_queue, state->ready_handler,
24 c->socket.task, c, c->socket.data);
25 return;
26 }
27
28 c->socket.read_handler = state->ready_handler;
29 c->socket.error_handler = state->error_handler;
30
31 nxt_conn_timer(engine, c, state, &c->read_timer);
32
33 nxt_fd_event_enable_read(engine, &c->socket);
34 }
35
36
37 void
nxt_conn_io_read(nxt_task_t * task,void * obj,void * data)38 nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
39 {
40 ssize_t n;
41 nxt_conn_t *c;
42 nxt_event_engine_t *engine;
43 nxt_work_handler_t handler;
44 const nxt_conn_state_t *state;
45
46 c = obj;
47
48 nxt_debug(task, "conn read fd:%d rdy:%d cl:%d er:%d bl:%d",
49 c->socket.fd, c->socket.read_ready, c->socket.closed,
50 c->socket.error, c->block_read);
51
52 if (c->socket.error != 0 || c->block_read) {
53 return;
54 }
55
56 engine = task->thread->engine;
57
58 /*
59 * Here c->io->read() is assigned instead of direct nxt_conn_io_read()
60 * because the function can be called by nxt_kqueue_conn_io_read().
61 */
62 c->socket.read_handler = c->io->read;
63 state = c->read_state;
64 c->socket.error_handler = state->error_handler;
65
66 if (c->socket.read_ready) {
67
68 if (state->io_read_handler == NULL) {
69 n = c->io->recvbuf(c, c->read);
70
71 } else {
72 n = state->io_read_handler(task, c);
73 /* The state can be changed by io_read_handler. */
74 state = c->read_state;
75 }
76
77 if (n > 0) {
78 c->nbytes = n;
79
80 nxt_recvbuf_update(c->read, n);
81
82 nxt_fd_event_block_read(engine, &c->socket);
83
84 if (state->timer_autoreset) {
85 nxt_timer_disable(engine, &c->read_timer);
86 }
87
88 nxt_work_queue_add(c->read_work_queue,
89 state->ready_handler, task, c, data);
90 return;
91 }
92
93 if (n != NXT_AGAIN) {
94 /* n == 0 or n == NXT_ERROR. */
95 handler = (n == 0) ? state->close_handler : state->error_handler;
96
97 nxt_fd_event_block_read(engine, &c->socket);
98 nxt_timer_disable(engine, &c->read_timer);
99
100 nxt_work_queue_add(&engine->fast_work_queue,
101 handler, task, c, data);
102 return;
103 }
104
105 /* n == NXT_AGAIN. */
106
107 if (c->socket.read_ready) {
108 /*
109 * SSL/TLS library can return NXT_AGAIN if renegotiation
110 * occured during read operation, it toggled write event
111 * internally so only read timer should be set.
112 */
113 if (!c->read_timer.enabled) {
114 nxt_conn_timer(engine, c, state, &c->read_timer);
115 }
116
117 return;
118 }
119 }
120
121 if (nxt_fd_event_is_disabled(c->socket.read)) {
122 nxt_fd_event_enable_read(engine, &c->socket);
123 }
124
125 if (state->timer_autoreset || !c->read_timer.enabled) {
126 nxt_conn_timer(engine, c, state, &c->read_timer);
127 }
128 }
129
130
131 ssize_t
nxt_conn_io_recvbuf(nxt_conn_t * c,nxt_buf_t * b)132 nxt_conn_io_recvbuf(nxt_conn_t *c, nxt_buf_t *b)
133 {
134 ssize_t n;
135 nxt_err_t err;
136 nxt_uint_t niov;
137 struct iovec iov[NXT_IOBUF_MAX];
138 nxt_recvbuf_coalesce_t rb;
139
140 rb.buf = b;
141 rb.iobuf = iov;
142 rb.nmax = NXT_IOBUF_MAX;
143 rb.size = 0;
144
145 niov = nxt_recvbuf_mem_coalesce(&rb);
146
147 if (niov == 1) {
148 /* Disposal of surplus kernel iovec copy-in operation. */
149 return nxt_conn_io_recv(c, iov->iov_base, iov->iov_len, 0);
150 }
151
152 for ( ;; ) {
153 n = readv(c->socket.fd, iov, niov);
154
155 err = (n == -1) ? nxt_socket_errno : 0;
156
157 nxt_debug(c->socket.task, "readv(%d, %ui): %z", c->socket.fd, niov, n);
158
159 if (n > 0) {
160 if ((size_t) n < rb.size) {
161 c->socket.read_ready = 0;
162 }
163
164 return n;
165 }
166
167 if (n == 0) {
168 c->socket.closed = 1;
169 c->socket.read_ready = 0;
170 return n;
171 }
172
173 /* n == -1 */
174
175 switch (err) {
176
177 case NXT_EAGAIN:
178 nxt_debug(c->socket.task, "readv() %E", err);
179 c->socket.read_ready = 0;
180 return NXT_AGAIN;
181
182 case NXT_EINTR:
183 nxt_debug(c->socket.task, "readv() %E", err);
184 continue;
185
186 default:
187 c->socket.error = err;
188 nxt_log(c->socket.task, nxt_socket_error_level(err),
189 "readv(%d, %ui) failed %E", c->socket.fd, niov, err);
190
191 return NXT_ERROR;
192 }
193 }
194 }
195
196
197 ssize_t
nxt_conn_io_recv(nxt_conn_t * c,void * buf,size_t size,nxt_uint_t flags)198 nxt_conn_io_recv(nxt_conn_t *c, void *buf, size_t size, nxt_uint_t flags)
199 {
200 ssize_t n;
201 nxt_err_t err;
202
203 for ( ;; ) {
204 n = recv(c->socket.fd, buf, size, flags);
205
206 err = (n == -1) ? nxt_socket_errno : 0;
207
208 nxt_debug(c->socket.task, "recv(%d, %p, %uz, 0x%ui): %z",
209 c->socket.fd, buf, size, flags, n);
210
211 if (n > 0) {
212 if ((size_t) n < size && (flags & MSG_PEEK) == 0) {
213 c->socket.read_ready = 0;
214 }
215
216 return n;
217 }
218
219 if (n == 0) {
220 c->socket.closed = 1;
221
222 if ((flags & MSG_PEEK) == 0) {
223 c->socket.read_ready = 0;
224 }
225
226 return n;
227 }
228
229 /* n == -1 */
230
231 switch (err) {
232
233 case NXT_EAGAIN:
234 nxt_debug(c->socket.task, "recv() %E", err);
235 c->socket.read_ready = 0;
236
237 return NXT_AGAIN;
238
239 case NXT_EINTR:
240 nxt_debug(c->socket.task, "recv() %E", err);
241 continue;
242
243 default:
244 c->socket.error = err;
245 nxt_log(c->socket.task, nxt_socket_error_level(err),
246 "recv(%d, %p, %uz, %ui) failed %E",
247 c->socket.fd, buf, size, flags, err);
248
249 return NXT_ERROR;
250 }
251 }
252 }
253