xref: /unit/src/nxt_conn_write.c (revision 726:e3972a4a9c73)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
11     void *data);
12 
13 
14 void
15 nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
16 {
17     ssize_t             ret;
18     nxt_buf_t           *b;
19     nxt_conn_t          *c;
20     nxt_sendbuf_t       sb;
21     nxt_event_engine_t  *engine;
22 
23     c = obj;
24 
25     nxt_debug(task, "conn write fd:%d", c->socket.fd);
26 
27     if (c->socket.error != 0) {
28         goto error;
29     }
30 
31     if (!c->socket.write_ready || c->write == NULL) {
32         return;
33     }
34 
35     engine = task->thread->engine;
36 
37     c->socket.write_handler = nxt_conn_io_write;
38     c->socket.error_handler = c->write_state->error_handler;
39 
40     b = c->write;
41 
42     sb.socket = c->socket.fd;
43     sb.error = 0;
44     sb.sent = 0;
45     sb.size = 0;
46     sb.buf = b;
47     sb.limit = 10 * 1024 * 1024;
48     sb.ready = 1;
49     sb.sync = 0;
50 
51     do {
52         ret = nxt_conn_io_sendbuf(task, &sb);
53 
54         c->socket.write_ready = sb.ready;
55         c->socket.error = sb.error;
56 
57         if (ret < 0) {
58             /* ret == NXT_AGAIN || ret == NXT_ERROR. */
59             break;
60         }
61 
62         sb.sent += ret;
63         sb.limit -= ret;
64 
65         b = nxt_sendbuf_update(b, ret);
66 
67         if (b == NULL) {
68             nxt_fd_event_block_write(engine, &c->socket);
69             break;
70         }
71 
72         sb.buf = b;
73 
74         if (!c->socket.write_ready) {
75             ret = NXT_AGAIN;
76             break;
77         }
78 
79     } while (sb.limit != 0);
80 
81     nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);
82 
83     if (sb.sent != 0) {
84         if (c->write_state->timer_autoreset) {
85             nxt_timer_disable(engine, &c->write_timer);
86         }
87     }
88 
89     if (ret != NXT_ERROR) {
90 
91         if (sb.limit == 0) {
92             /*
93              * Postpone writing until next event poll to allow to
94              * process other recevied events and to get new events.
95              */
96             c->write_timer.handler = nxt_conn_write_timer_handler;
97             nxt_timer_add(engine, &c->write_timer, 0);
98 
99         } else if (ret == NXT_AGAIN) {
100             /*
101              * SSL libraries can require to toggle either write or read
102              * event if renegotiation occurs during SSL write operation.
103              * This case is handled on the event_io->send() level.  Timer
104              * can be set here because it should be set only for write
105              * direction.
106              */
107             nxt_conn_timer(engine, c, c->write_state, &c->write_timer);
108 
109             if (nxt_fd_event_is_disabled(c->socket.write)) {
110                 nxt_fd_event_enable_write(engine, &c->socket);
111             }
112         }
113     }
114 
115     if (ret == 0 || sb.sent != 0) {
116         /*
117          * ret == 0 means a sync buffer was processed.
118          * ret == NXT_ERROR is ignored here if some data was sent,
119          * the error will be handled on the next nxt_conn_write() call.
120          */
121         c->sent += sb.sent;
122         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
123                            task, c, data);
124         return;
125     }
126 
127     /* ret == NXT_ERROR */
128 
129     nxt_fd_event_block_write(engine, &c->socket);
130 
131 error:
132 
133     nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
134                        task, c, data);
135 }
136 
137 
138 static void
139 nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
140 {
141     nxt_conn_t   *c;
142     nxt_timer_t  *timer;
143 
144     timer = obj;
145 
146     nxt_debug(task, "event conn conn timer");
147 
148     c = nxt_write_timer_conn(timer);
149     c->delayed = 0;
150 
151     c->io->write(task, c, c->socket.data);
152 }
153 
154 
155 ssize_t
156 nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
157 {
158     nxt_uint_t    niov;
159     struct iovec  iov[NXT_IOBUF_MAX];
160 
161     niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
162 
163     if (niov == 0 && sb->sync) {
164         return 0;
165     }
166 
167     return nxt_conn_io_writev(task, sb, iov, niov);
168 }
169 
170 
171 ssize_t
172 nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
173     nxt_uint_t niov)
174 {
175     ssize_t    n;
176     nxt_err_t  err;
177 
178     if (niov == 1) {
179         /* Disposal of surplus kernel iovec copy-in operation. */
180         return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
181     }
182 
183     for ( ;; ) {
184         n = writev(sb->socket, iov, niov);
185 
186         err = (n == -1) ? nxt_socket_errno : 0;
187 
188         nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);
189 
190         if (n > 0) {
191             return n;
192         }
193 
194         /* n == -1 */
195 
196         switch (err) {
197 
198         case NXT_EAGAIN:
199             sb->ready = 0;
200             nxt_debug(task, "writev() %E", err);
201 
202             return NXT_AGAIN;
203 
204         case NXT_EINTR:
205             nxt_debug(task, "writev() %E", err);
206             continue;
207 
208         default:
209             sb->error = err;
210             nxt_log(task, nxt_socket_error_level(err),
211                     "writev(%d, %ui) failed %E", sb->socket, niov, err);
212 
213             return NXT_ERROR;
214         }
215     }
216 }
217 
218 
219 ssize_t
220 nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
221 {
222     ssize_t    n;
223     nxt_err_t  err;
224 
225     for ( ;; ) {
226         n = send(sb->socket, buf, size, 0);
227 
228         err = (n == -1) ? nxt_socket_errno : 0;
229 
230         nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
231 
232         if (n > 0) {
233             return n;
234         }
235 
236         /* n == -1 */
237 
238         switch (err) {
239 
240         case NXT_EAGAIN:
241             sb->ready = 0;
242             nxt_debug(task, "send() %E", err);
243 
244             return NXT_AGAIN;
245 
246         case NXT_EINTR:
247             nxt_debug(task, "send() %E", err);
248             continue;
249 
250         default:
251             sb->error = err;
252             nxt_log(task, nxt_socket_error_level(err),
253                     "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
254 
255             return NXT_ERROR;
256         }
257     }
258 }
259 
260 
261 /* Obsolete interfaces. */
262 
263 size_t
264 nxt_event_conn_write_limit(nxt_conn_t *c)
265 {
266     ssize_t                 limit, correction;
267     nxt_event_write_rate_t  *rate;
268 
269     rate = c->rate;
270 
271     if (rate == NULL) {
272         return c->max_chunk;
273     }
274 
275     limit = rate->limit;
276     correction = limit - (size_t) rate->average;
277 
278     nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
279               correction, rate->average);
280 
281     limit += correction;
282 
283     if (limit <= 0) {
284         return 0;
285     }
286 
287     if (rate->limit_after != 0) {
288         limit += rate->limit_after;
289         limit = nxt_min((size_t) limit, rate->max_limit);
290     }
291 
292     return nxt_min((size_t) limit, c->max_chunk);
293 }
294 
295 
296 nxt_bool_t
297 nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
298     size_t sent)
299 {
300     return 0;
301 }
302 
303 
304 ssize_t
305 nxt_event_conn_io_write_chunk(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
306 {
307     ssize_t  ret;
308 
309     ret = c->io->sendbuf(c, b, limit);
310 
311     if ((ret == NXT_AGAIN || !c->socket.write_ready)
312         && nxt_fd_event_is_disabled(c->socket.write))
313     {
314         nxt_fd_event_enable_write(c->socket.task->thread->engine, &c->socket);
315     }
316 
317     return ret;
318 }
319 
320 
321 ssize_t
322 nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
323 {
324     nxt_uint_t              niob;
325     struct iovec            iob[NXT_IOBUF_MAX];
326     nxt_sendbuf_coalesce_t  sb;
327 
328     sb.buf = b;
329     sb.iobuf = iob;
330     sb.nmax = NXT_IOBUF_MAX;
331     sb.sync = 0;
332     sb.size = 0;
333     sb.limit = limit;
334 
335     niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
336 
337     if (niob == 0 && sb.sync) {
338         return 0;
339     }
340 
341     return nxt_event_conn_io_writev(c, iob, niob);
342 }
343 
344 
345 ssize_t
346 nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
347 {
348     ssize_t    n;
349     nxt_err_t  err;
350 
351     if (niob == 1) {
352         /* Disposal of surplus kernel iovec copy-in operation. */
353         return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
354     }
355 
356     for ( ;; ) {
357         n = writev(c->socket.fd, iob, niob);
358 
359         err = (n == -1) ? nxt_socket_errno : 0;
360 
361         nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);
362 
363         if (n > 0) {
364             return n;
365         }
366 
367         /* n == -1 */
368 
369         switch (err) {
370 
371         case NXT_EAGAIN:
372             nxt_debug(c->socket.task, "writev() %E", err);
373             c->socket.write_ready = 0;
374             return NXT_AGAIN;
375 
376         case NXT_EINTR:
377             nxt_debug(c->socket.task, "writev() %E", err);
378             continue;
379 
380         default:
381             c->socket.error = err;
382             nxt_log(c->socket.task, nxt_socket_error_level(err),
383                     "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
384             return NXT_ERROR;
385         }
386     }
387 }
388 
389 
390 ssize_t
391 nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
392 {
393     ssize_t    n;
394     nxt_err_t  err;
395 
396     for ( ;; ) {
397         n = send(c->socket.fd, buf, size, 0);
398 
399         err = (n == -1) ? nxt_socket_errno : 0;
400 
401         nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
402                   c->socket.fd, buf, size, n);
403 
404         if (n > 0) {
405             return n;
406         }
407 
408         /* n == -1 */
409 
410         switch (err) {
411 
412         case NXT_EAGAIN:
413             nxt_debug(c->socket.task, "send() %E", err);
414             c->socket.write_ready = 0;
415             return NXT_AGAIN;
416 
417         case NXT_EINTR:
418             nxt_debug(c->socket.task, "send() %E", err);
419             continue;
420 
421         default:
422             c->socket.error = err;
423             nxt_log(c->socket.task, nxt_socket_error_level(err),
424                     "send(%d, %p, %uz) failed %E",
425                     c->socket.fd, buf, size, err);
426             return NXT_ERROR;
427         }
428     }
429 }
430