xref: /unit/src/nxt_conn_write.c (revision 2652:29e22091601c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
11     void *data);
12 static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb);
13 static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size);
14 
15 
16 void
nxt_conn_io_write(nxt_task_t * task,void * obj,void * data)17 nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
18 {
19     ssize_t             ret;
20     nxt_buf_t           *b;
21     nxt_conn_t          *c;
22     nxt_sendbuf_t       sb;
23     nxt_event_engine_t  *engine;
24 
25     c = obj;
26 
27     nxt_debug(task, "conn write fd:%d er:%d bl:%d",
28               c->socket.fd, c->socket.error, c->block_write);
29 
30     if (c->socket.error != 0 || c->block_write) {
31         goto error;
32     }
33 
34     if (!c->socket.write_ready || c->write == NULL) {
35         return;
36     }
37 
38     engine = task->thread->engine;
39 
40     c->socket.write_handler = nxt_conn_io_write;
41     c->socket.error_handler = c->write_state->error_handler;
42 
43     b = c->write;
44 
45     sb.socket = c->socket.fd;
46     sb.error = 0;
47     sb.sent = 0;
48     sb.size = 0;
49     sb.buf = b;
50 #if (NXT_TLS)
51     sb.tls = c->u.tls;
52 #endif
53     sb.limit = 10 * 1024 * 1024;
54     sb.ready = 1;
55     sb.sync = 0;
56 
57     do {
58         ret = c->io->sendbuf(task, &sb);
59 
60         c->socket.write_ready = sb.ready;
61         c->socket.error = sb.error;
62 
63         if (ret < 0) {
64             /* ret == NXT_AGAIN || ret == NXT_ERROR. */
65             break;
66         }
67 
68         sb.sent += ret;
69         sb.limit -= ret;
70 
71         b = nxt_sendbuf_update(b, ret);
72 
73         if (b == NULL) {
74             nxt_fd_event_block_write(engine, &c->socket);
75             break;
76         }
77 
78         sb.buf = b;
79 
80         if (!c->socket.write_ready) {
81             ret = NXT_AGAIN;
82             break;
83         }
84 
85     } while (sb.limit != 0);
86 
87     nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);
88 
89     if (sb.sent != 0) {
90         if (c->write_state->timer_autoreset) {
91             nxt_timer_disable(engine, &c->write_timer);
92         }
93     }
94 
95     if (ret != NXT_ERROR) {
96 
97         if (sb.limit == 0) {
98             /*
99              * Postpone writing until next event poll to allow to
100              * process other received events and to get new events.
101              */
102             c->write_timer.handler = nxt_conn_write_timer_handler;
103             nxt_timer_add(engine, &c->write_timer, 0);
104 
105         } else if (ret == NXT_AGAIN) {
106             /*
107              * SSL libraries can require to toggle either write or read
108              * event if renegotiation occurs during SSL write operation.
109              * This case is handled on the c->io->send() level.  Timer
110              * can be set here because it should be set only for write
111              * direction.
112              */
113             nxt_conn_timer(engine, c, c->write_state, &c->write_timer);
114 
115             if (nxt_fd_event_is_disabled(c->socket.write)) {
116                 nxt_fd_event_enable_write(engine, &c->socket);
117             }
118         }
119     }
120 
121     if (ret == 0 || sb.sent != 0) {
122         /*
123          * ret == 0 means a sync buffer was processed.
124          * ret == NXT_ERROR is ignored here if some data was sent,
125          * the error will be handled on the next nxt_conn_write() call.
126          */
127         c->sent += sb.sent;
128         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
129                            task, c, data);
130         return;
131     }
132 
133     if (ret != NXT_ERROR) {
134         return;
135     }
136 
137     nxt_fd_event_block_write(engine, &c->socket);
138 
139 error:
140 
141     nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
142                        task, c, data);
143 }
144 
145 
146 static void
nxt_conn_write_timer_handler(nxt_task_t * task,void * obj,void * data)147 nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
148 {
149     nxt_conn_t   *c;
150     nxt_timer_t  *timer;
151 
152     timer = obj;
153 
154     nxt_debug(task, "conn write timer");
155 
156     c = nxt_write_timer_conn(timer);
157     c->delayed = 0;
158 
159     c->io->write(task, c, c->socket.data);
160 }
161 
162 
163 ssize_t
nxt_conn_io_sendbuf(nxt_task_t * task,nxt_sendbuf_t * sb)164 nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
165 {
166     nxt_uint_t    niov;
167     struct iovec  iov[NXT_IOBUF_MAX];
168 
169     niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
170 
171     if (niov == 0 && sb->sync) {
172         return 0;
173     }
174 
175     /*
176      * XXX Temporary fix for <https://github.com/nginx/unit/issues/1125>
177      */
178     if (niov == 0 && sb->buf == NULL) {
179         return 0;
180     }
181 
182     if (niov == 0 && nxt_buf_is_file(sb->buf)) {
183         return nxt_conn_io_sendfile(task, sb);
184     }
185 
186     return nxt_conn_io_writev(task, sb, iov, niov);
187 }
188 
189 
190 static ssize_t
nxt_conn_io_sendfile(nxt_task_t * task,nxt_sendbuf_t * sb)191 nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb)
192 {
193     size_t     size;
194     ssize_t    n;
195     nxt_buf_t  *b;
196     nxt_err_t  err;
197 
198     b = sb->buf;
199 
200     for ( ;; ) {
201         size = b->file_end - b->file_pos;
202 
203         n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size);
204 
205         err = (n == -1) ? nxt_errno : 0;
206 
207         nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z",
208                   b->file->fd, sb->socket, b->file_pos, size, n);
209 
210         if (n > 0) {
211             if (n < (ssize_t) size) {
212                 sb->ready = 0;
213             }
214 
215             return n;
216         }
217 
218         if (nxt_slow_path(n == 0)) {
219             nxt_alert(task, "sendfile() reported that file was truncated at %O",
220                       b->file_pos);
221 
222             return NXT_ERROR;
223         }
224 
225         /* n == -1 */
226 
227         switch (err) {
228 
229         case NXT_EAGAIN:
230             sb->ready = 0;
231             nxt_debug(task, "sendfile() %E", err);
232 
233             return NXT_AGAIN;
234 
235         case NXT_EINTR:
236             nxt_debug(task, "sendfile() %E", err);
237             continue;
238 
239         default:
240             sb->error = err;
241             nxt_log(task, nxt_socket_error_level(err),
242                     "sendfile(%FD, %d, @%O, %uz) failed %E",
243                     b->file->fd, sb->socket, b->file_pos, size, err);
244 
245             return NXT_ERROR;
246         }
247     }
248 }
249 
250 
251 static ssize_t
nxt_sendfile(int fd,int s,off_t pos,size_t size)252 nxt_sendfile(int fd, int s, off_t pos, size_t size)
253 {
254     ssize_t  res;
255 
256 #if (NXT_HAVE_MACOSX_SENDFILE)
257 
258     off_t sent = size;
259 
260     int rc = sendfile(fd, s, pos, &sent, NULL, 0);
261 
262     res = (rc == 0 || sent > 0) ? sent : -1;
263 
264 #elif (NXT_HAVE_FREEBSD_SENDFILE)
265 
266     off_t sent = 0;
267 
268     int rc = sendfile(fd, s, pos, size, NULL, &sent, 0);
269 
270     res = (rc == 0 || sent > 0) ? sent : -1;
271 
272 #elif (NXT_HAVE_LINUX_SENDFILE)
273 
274     res = sendfile(s, fd, &pos, size);
275 
276 #else
277 
278     int    err;
279     void   *map;
280     off_t  page_off;
281 
282     page_off = pos % nxt_pagesize;
283 
284     map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd,
285                        pos - page_off);
286     if (nxt_slow_path(map == MAP_FAILED)) {
287         return -1;
288     }
289 
290     res = write(s, nxt_pointer_to(map, page_off), size);
291 
292     /* Backup and restore errno to catch socket errors in the upper level. */
293     err = errno;
294     nxt_mem_munmap(map, size + page_off);
295     errno = err;
296 
297 #endif
298 
299     return res;
300 }
301 
302 
303 ssize_t
nxt_conn_io_writev(nxt_task_t * task,nxt_sendbuf_t * sb,struct iovec * iov,nxt_uint_t niov)304 nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
305     nxt_uint_t niov)
306 {
307     ssize_t    n;
308     nxt_err_t  err;
309 
310     if (niov == 1) {
311         /* Disposal of surplus kernel iovec copy-in operation. */
312         return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
313     }
314 
315     for ( ;; ) {
316         n = writev(sb->socket, iov, niov);
317 
318         err = (n == -1) ? nxt_socket_errno : 0;
319 
320         nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);
321 
322         if (n > 0) {
323             return n;
324         }
325 
326         /* n == -1 */
327 
328         switch (err) {
329 
330         case NXT_EAGAIN:
331             sb->ready = 0;
332             nxt_debug(task, "writev() %E", err);
333 
334             return NXT_AGAIN;
335 
336         case NXT_EINTR:
337             nxt_debug(task, "writev() %E", err);
338             continue;
339 
340         default:
341             sb->error = err;
342             nxt_log(task, nxt_socket_error_level(err),
343                     "writev(%d, %ui) failed %E", sb->socket, niov, err);
344 
345             return NXT_ERROR;
346         }
347     }
348 }
349 
350 
351 ssize_t
nxt_conn_io_send(nxt_task_t * task,nxt_sendbuf_t * sb,void * buf,size_t size)352 nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
353 {
354     ssize_t    n;
355     nxt_err_t  err;
356 
357     for ( ;; ) {
358         n = send(sb->socket, buf, size, 0);
359 
360         err = (n == -1) ? nxt_socket_errno : 0;
361 
362         nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
363 
364         if (n > 0) {
365             return n;
366         }
367 
368         /* n == -1 */
369 
370         switch (err) {
371 
372         case NXT_EAGAIN:
373             sb->ready = 0;
374             nxt_debug(task, "send() %E", err);
375 
376             return NXT_AGAIN;
377 
378         case NXT_EINTR:
379             nxt_debug(task, "send() %E", err);
380             continue;
381 
382         default:
383             sb->error = err;
384             nxt_log(task, nxt_socket_error_level(err),
385                     "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
386 
387             return NXT_ERROR;
388         }
389     }
390 }
391 
392 
393 /* Obsolete interfaces. */
394 
395 size_t
nxt_event_conn_write_limit(nxt_conn_t * c)396 nxt_event_conn_write_limit(nxt_conn_t *c)
397 {
398     ssize_t                 limit, correction;
399     nxt_event_write_rate_t  *rate;
400 
401     rate = c->rate;
402 
403     if (rate == NULL) {
404         return c->max_chunk;
405     }
406 
407     limit = rate->limit;
408     correction = limit - (size_t) rate->average;
409 
410     nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
411               correction, rate->average);
412 
413     limit += correction;
414 
415     if (limit <= 0) {
416         return 0;
417     }
418 
419     if (rate->limit_after != 0) {
420         limit += rate->limit_after;
421         limit = nxt_min((size_t) limit, rate->max_limit);
422     }
423 
424     return nxt_min((size_t) limit, c->max_chunk);
425 }
426 
427 
428 nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t * engine,nxt_conn_t * c,size_t sent)429 nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
430     size_t sent)
431 {
432     return 0;
433 }
434 
435 
436 ssize_t
nxt_event_conn_io_sendbuf(nxt_conn_t * c,nxt_buf_t * b,size_t limit)437 nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
438 {
439     nxt_uint_t              niob;
440     struct iovec            iob[NXT_IOBUF_MAX];
441     nxt_sendbuf_coalesce_t  sb;
442 
443     sb.buf = b;
444     sb.iobuf = iob;
445     sb.nmax = NXT_IOBUF_MAX;
446     sb.sync = 0;
447     sb.size = 0;
448     sb.limit = limit;
449 
450     niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
451 
452     if (niob == 0 && sb.sync) {
453         return 0;
454     }
455 
456     return nxt_event_conn_io_writev(c, iob, niob);
457 }
458 
459 
460 ssize_t
nxt_event_conn_io_writev(nxt_conn_t * c,nxt_iobuf_t * iob,nxt_uint_t niob)461 nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
462 {
463     ssize_t    n;
464     nxt_err_t  err;
465 
466     if (niob == 1) {
467         /* Disposal of surplus kernel iovec copy-in operation. */
468         return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
469     }
470 
471     for ( ;; ) {
472         n = writev(c->socket.fd, iob, niob);
473 
474         err = (n == -1) ? nxt_socket_errno : 0;
475 
476         nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);
477 
478         if (n > 0) {
479             return n;
480         }
481 
482         /* n == -1 */
483 
484         switch (err) {
485 
486         case NXT_EAGAIN:
487             nxt_debug(c->socket.task, "writev() %E", err);
488             c->socket.write_ready = 0;
489             return NXT_AGAIN;
490 
491         case NXT_EINTR:
492             nxt_debug(c->socket.task, "writev() %E", err);
493             continue;
494 
495         default:
496             c->socket.error = err;
497             nxt_log(c->socket.task, nxt_socket_error_level(err),
498                     "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
499             return NXT_ERROR;
500         }
501     }
502 }
503 
504 
505 ssize_t
nxt_event_conn_io_send(nxt_conn_t * c,void * buf,size_t size)506 nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
507 {
508     ssize_t    n;
509     nxt_err_t  err;
510 
511     for ( ;; ) {
512         n = send(c->socket.fd, buf, size, 0);
513 
514         err = (n == -1) ? nxt_socket_errno : 0;
515 
516         nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
517                   c->socket.fd, buf, size, n);
518 
519         if (n > 0) {
520             return n;
521         }
522 
523         /* n == -1 */
524 
525         switch (err) {
526 
527         case NXT_EAGAIN:
528             nxt_debug(c->socket.task, "send() %E", err);
529             c->socket.write_ready = 0;
530             return NXT_AGAIN;
531 
532         case NXT_EINTR:
533             nxt_debug(c->socket.task, "send() %E", err);
534             continue;
535 
536         default:
537             c->socket.error = err;
538             nxt_log(c->socket.task, nxt_socket_error_level(err),
539                     "send(%d, %p, %uz) failed %E",
540                     c->socket.fd, buf, size, err);
541             return NXT_ERROR;
542         }
543     }
544 }
545