xref: /unit/src/nxt_conn_write.c (revision 1577:604db78b62f9)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
11     void *data);
12 static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb);
13 static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size);
14 
15 
16 void
nxt_conn_io_write(nxt_task_t * task,void * obj,void * data)17 nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
18 {
19     ssize_t             ret;
20     nxt_buf_t           *b;
21     nxt_conn_t          *c;
22     nxt_sendbuf_t       sb;
23     nxt_event_engine_t  *engine;
24 
25     c = obj;
26 
27     nxt_debug(task, "conn write fd:%d er:%d bl:%d",
28               c->socket.fd, c->socket.error, c->block_write);
29 
30     if (c->socket.error != 0 || c->block_write) {
31         goto error;
32     }
33 
34     if (!c->socket.write_ready || c->write == NULL) {
35         return;
36     }
37 
38     engine = task->thread->engine;
39 
40     c->socket.write_handler = nxt_conn_io_write;
41     c->socket.error_handler = c->write_state->error_handler;
42 
43     b = c->write;
44 
45     sb.socket = c->socket.fd;
46     sb.error = 0;
47     sb.sent = 0;
48     sb.size = 0;
49     sb.buf = b;
50 #if (NXT_TLS)
51     sb.tls = c->u.tls;
52 #endif
53     sb.limit = 10 * 1024 * 1024;
54     sb.ready = 1;
55     sb.sync = 0;
56 
57     do {
58         ret = c->io->sendbuf(task, &sb);
59 
60         c->socket.write_ready = sb.ready;
61         c->socket.error = sb.error;
62 
63         if (ret < 0) {
64             /* ret == NXT_AGAIN || ret == NXT_ERROR. */
65             break;
66         }
67 
68         sb.sent += ret;
69         sb.limit -= ret;
70 
71         b = nxt_sendbuf_update(b, ret);
72 
73         if (b == NULL) {
74             nxt_fd_event_block_write(engine, &c->socket);
75             break;
76         }
77 
78         sb.buf = b;
79 
80         if (!c->socket.write_ready) {
81             ret = NXT_AGAIN;
82             break;
83         }
84 
85     } while (sb.limit != 0);
86 
87     nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);
88 
89     if (sb.sent != 0) {
90         if (c->write_state->timer_autoreset) {
91             nxt_timer_disable(engine, &c->write_timer);
92         }
93     }
94 
95     if (ret != NXT_ERROR) {
96 
97         if (sb.limit == 0) {
98             /*
99              * Postpone writing until next event poll to allow to
100              * process other recevied events and to get new events.
101              */
102             c->write_timer.handler = nxt_conn_write_timer_handler;
103             nxt_timer_add(engine, &c->write_timer, 0);
104 
105         } else if (ret == NXT_AGAIN) {
106             /*
107              * SSL libraries can require to toggle either write or read
108              * event if renegotiation occurs during SSL write operation.
109              * This case is handled on the c->io->send() level.  Timer
110              * can be set here because it should be set only for write
111              * direction.
112              */
113             nxt_conn_timer(engine, c, c->write_state, &c->write_timer);
114 
115             if (nxt_fd_event_is_disabled(c->socket.write)) {
116                 nxt_fd_event_enable_write(engine, &c->socket);
117             }
118         }
119     }
120 
121     if (ret == 0 || sb.sent != 0) {
122         /*
123          * ret == 0 means a sync buffer was processed.
124          * ret == NXT_ERROR is ignored here if some data was sent,
125          * the error will be handled on the next nxt_conn_write() call.
126          */
127         c->sent += sb.sent;
128         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
129                            task, c, data);
130         return;
131     }
132 
133     if (ret != NXT_ERROR) {
134         return;
135     }
136 
137     nxt_fd_event_block_write(engine, &c->socket);
138 
139 error:
140 
141     nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
142                        task, c, data);
143 }
144 
145 
146 static void
nxt_conn_write_timer_handler(nxt_task_t * task,void * obj,void * data)147 nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
148 {
149     nxt_conn_t   *c;
150     nxt_timer_t  *timer;
151 
152     timer = obj;
153 
154     nxt_debug(task, "conn write timer");
155 
156     c = nxt_write_timer_conn(timer);
157     c->delayed = 0;
158 
159     c->io->write(task, c, c->socket.data);
160 }
161 
162 
163 ssize_t
nxt_conn_io_sendbuf(nxt_task_t * task,nxt_sendbuf_t * sb)164 nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
165 {
166     nxt_uint_t    niov;
167     struct iovec  iov[NXT_IOBUF_MAX];
168 
169     niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
170 
171     if (niov == 0 && sb->sync) {
172         return 0;
173     }
174 
175     if (niov == 0 && nxt_buf_is_file(sb->buf)) {
176         return nxt_conn_io_sendfile(task, sb);
177     }
178 
179     return nxt_conn_io_writev(task, sb, iov, niov);
180 }
181 
182 
183 static ssize_t
nxt_conn_io_sendfile(nxt_task_t * task,nxt_sendbuf_t * sb)184 nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb)
185 {
186     size_t     size;
187     ssize_t    n;
188     nxt_buf_t  *b;
189     nxt_err_t  err;
190 
191     b = sb->buf;
192 
193     for ( ;; ) {
194         size = b->file_end - b->file_pos;
195 
196         n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size);
197 
198         err = (n == -1) ? nxt_errno : 0;
199 
200         nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z",
201                   b->file->fd, sb->socket, b->file_pos, size, n);
202 
203         if (n > 0) {
204             if (n < (ssize_t) size) {
205                 sb->ready = 0;
206             }
207 
208             return n;
209         }
210 
211         if (nxt_slow_path(n == 0)) {
212             nxt_alert(task, "sendfile() reported that file was truncated at %O",
213                       b->file_pos);
214 
215             return NXT_ERROR;
216         }
217 
218         /* n == -1 */
219 
220         switch (err) {
221 
222         case NXT_EAGAIN:
223             sb->ready = 0;
224             nxt_debug(task, "sendfile() %E", err);
225 
226             return NXT_AGAIN;
227 
228         case NXT_EINTR:
229             nxt_debug(task, "sendfile() %E", err);
230             continue;
231 
232         default:
233             sb->error = err;
234             nxt_log(task, nxt_socket_error_level(err),
235                     "sendfile(%FD, %d, @%O, %uz) failed %E",
236                     b->file->fd, sb->socket, b->file_pos, size, err);
237 
238             return NXT_ERROR;
239         }
240     }
241 }
242 
243 
244 static ssize_t
nxt_sendfile(int fd,int s,off_t pos,size_t size)245 nxt_sendfile(int fd, int s, off_t pos, size_t size)
246 {
247     ssize_t  res;
248 
249 #if (NXT_HAVE_MACOSX_SENDFILE)
250 
251     off_t sent = size;
252 
253     int rc = sendfile(fd, s, pos, &sent, NULL, 0);
254 
255     res = (rc == 0 || sent > 0) ? sent : -1;
256 
257 #elif (NXT_HAVE_FREEBSD_SENDFILE)
258 
259     off_t sent = 0;
260 
261     int rc = sendfile(fd, s, pos, size, NULL, &sent, 0);
262 
263     res = (rc == 0 || sent > 0) ? sent : -1;
264 
265 #elif (NXT_HAVE_LINUX_SENDFILE)
266 
267     res = sendfile(s, fd, &pos, size);
268 
269 #else
270 
271     int    err;
272     void   *map;
273     off_t  page_off;
274 
275     page_off = pos % nxt_pagesize;
276 
277     map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd,
278                        pos - page_off);
279     if (nxt_slow_path(map == MAP_FAILED)) {
280         return -1;
281     }
282 
283     res = write(s, nxt_pointer_to(map, page_off), size);
284 
285     /* Backup and restore errno to catch socket errors in the upper level. */
286     err = errno;
287     nxt_mem_munmap(map, size + page_off);
288     errno = err;
289 
290 #endif
291 
292     return res;
293 }
294 
295 
296 ssize_t
nxt_conn_io_writev(nxt_task_t * task,nxt_sendbuf_t * sb,struct iovec * iov,nxt_uint_t niov)297 nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
298     nxt_uint_t niov)
299 {
300     ssize_t    n;
301     nxt_err_t  err;
302 
303     if (niov == 1) {
304         /* Disposal of surplus kernel iovec copy-in operation. */
305         return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
306     }
307 
308     for ( ;; ) {
309         n = writev(sb->socket, iov, niov);
310 
311         err = (n == -1) ? nxt_socket_errno : 0;
312 
313         nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);
314 
315         if (n > 0) {
316             return n;
317         }
318 
319         /* n == -1 */
320 
321         switch (err) {
322 
323         case NXT_EAGAIN:
324             sb->ready = 0;
325             nxt_debug(task, "writev() %E", err);
326 
327             return NXT_AGAIN;
328 
329         case NXT_EINTR:
330             nxt_debug(task, "writev() %E", err);
331             continue;
332 
333         default:
334             sb->error = err;
335             nxt_log(task, nxt_socket_error_level(err),
336                     "writev(%d, %ui) failed %E", sb->socket, niov, err);
337 
338             return NXT_ERROR;
339         }
340     }
341 }
342 
343 
344 ssize_t
nxt_conn_io_send(nxt_task_t * task,nxt_sendbuf_t * sb,void * buf,size_t size)345 nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
346 {
347     ssize_t    n;
348     nxt_err_t  err;
349 
350     for ( ;; ) {
351         n = send(sb->socket, buf, size, 0);
352 
353         err = (n == -1) ? nxt_socket_errno : 0;
354 
355         nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
356 
357         if (n > 0) {
358             return n;
359         }
360 
361         /* n == -1 */
362 
363         switch (err) {
364 
365         case NXT_EAGAIN:
366             sb->ready = 0;
367             nxt_debug(task, "send() %E", err);
368 
369             return NXT_AGAIN;
370 
371         case NXT_EINTR:
372             nxt_debug(task, "send() %E", err);
373             continue;
374 
375         default:
376             sb->error = err;
377             nxt_log(task, nxt_socket_error_level(err),
378                     "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
379 
380             return NXT_ERROR;
381         }
382     }
383 }
384 
385 
386 /* Obsolete interfaces. */
387 
388 size_t
nxt_event_conn_write_limit(nxt_conn_t * c)389 nxt_event_conn_write_limit(nxt_conn_t *c)
390 {
391     ssize_t                 limit, correction;
392     nxt_event_write_rate_t  *rate;
393 
394     rate = c->rate;
395 
396     if (rate == NULL) {
397         return c->max_chunk;
398     }
399 
400     limit = rate->limit;
401     correction = limit - (size_t) rate->average;
402 
403     nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
404               correction, rate->average);
405 
406     limit += correction;
407 
408     if (limit <= 0) {
409         return 0;
410     }
411 
412     if (rate->limit_after != 0) {
413         limit += rate->limit_after;
414         limit = nxt_min((size_t) limit, rate->max_limit);
415     }
416 
417     return nxt_min((size_t) limit, c->max_chunk);
418 }
419 
420 
421 nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t * engine,nxt_conn_t * c,size_t sent)422 nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
423     size_t sent)
424 {
425     return 0;
426 }
427 
428 
429 ssize_t
nxt_event_conn_io_sendbuf(nxt_conn_t * c,nxt_buf_t * b,size_t limit)430 nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
431 {
432     nxt_uint_t              niob;
433     struct iovec            iob[NXT_IOBUF_MAX];
434     nxt_sendbuf_coalesce_t  sb;
435 
436     sb.buf = b;
437     sb.iobuf = iob;
438     sb.nmax = NXT_IOBUF_MAX;
439     sb.sync = 0;
440     sb.size = 0;
441     sb.limit = limit;
442 
443     niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
444 
445     if (niob == 0 && sb.sync) {
446         return 0;
447     }
448 
449     return nxt_event_conn_io_writev(c, iob, niob);
450 }
451 
452 
453 ssize_t
nxt_event_conn_io_writev(nxt_conn_t * c,nxt_iobuf_t * iob,nxt_uint_t niob)454 nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
455 {
456     ssize_t    n;
457     nxt_err_t  err;
458 
459     if (niob == 1) {
460         /* Disposal of surplus kernel iovec copy-in operation. */
461         return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
462     }
463 
464     for ( ;; ) {
465         n = writev(c->socket.fd, iob, niob);
466 
467         err = (n == -1) ? nxt_socket_errno : 0;
468 
469         nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);
470 
471         if (n > 0) {
472             return n;
473         }
474 
475         /* n == -1 */
476 
477         switch (err) {
478 
479         case NXT_EAGAIN:
480             nxt_debug(c->socket.task, "writev() %E", err);
481             c->socket.write_ready = 0;
482             return NXT_AGAIN;
483 
484         case NXT_EINTR:
485             nxt_debug(c->socket.task, "writev() %E", err);
486             continue;
487 
488         default:
489             c->socket.error = err;
490             nxt_log(c->socket.task, nxt_socket_error_level(err),
491                     "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
492             return NXT_ERROR;
493         }
494     }
495 }
496 
497 
498 ssize_t
nxt_event_conn_io_send(nxt_conn_t * c,void * buf,size_t size)499 nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
500 {
501     ssize_t    n;
502     nxt_err_t  err;
503 
504     for ( ;; ) {
505         n = send(c->socket.fd, buf, size, 0);
506 
507         err = (n == -1) ? nxt_socket_errno : 0;
508 
509         nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
510                   c->socket.fd, buf, size, n);
511 
512         if (n > 0) {
513             return n;
514         }
515 
516         /* n == -1 */
517 
518         switch (err) {
519 
520         case NXT_EAGAIN:
521             nxt_debug(c->socket.task, "send() %E", err);
522             c->socket.write_ready = 0;
523             return NXT_AGAIN;
524 
525         case NXT_EINTR:
526             nxt_debug(c->socket.task, "send() %E", err);
527             continue;
528 
529         default:
530             c->socket.error = err;
531             nxt_log(c->socket.task, nxt_socket_error_level(err),
532                     "send(%d, %p, %uz) failed %E",
533                     c->socket.fd, buf, size, err);
534             return NXT_ERROR;
535         }
536     }
537 }
538