1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
11 void *data);
12 static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb);
13 static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size);
14
15
16 void
nxt_conn_io_write(nxt_task_t * task,void * obj,void * data)17 nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
18 {
19 ssize_t ret;
20 nxt_buf_t *b;
21 nxt_conn_t *c;
22 nxt_sendbuf_t sb;
23 nxt_event_engine_t *engine;
24
25 c = obj;
26
27 nxt_debug(task, "conn write fd:%d er:%d bl:%d",
28 c->socket.fd, c->socket.error, c->block_write);
29
30 if (c->socket.error != 0 || c->block_write) {
31 goto error;
32 }
33
34 if (!c->socket.write_ready || c->write == NULL) {
35 return;
36 }
37
38 engine = task->thread->engine;
39
40 c->socket.write_handler = nxt_conn_io_write;
41 c->socket.error_handler = c->write_state->error_handler;
42
43 b = c->write;
44
45 sb.socket = c->socket.fd;
46 sb.error = 0;
47 sb.sent = 0;
48 sb.size = 0;
49 sb.buf = b;
50 #if (NXT_TLS)
51 sb.tls = c->u.tls;
52 #endif
53 sb.limit = 10 * 1024 * 1024;
54 sb.ready = 1;
55 sb.sync = 0;
56
57 do {
58 ret = c->io->sendbuf(task, &sb);
59
60 c->socket.write_ready = sb.ready;
61 c->socket.error = sb.error;
62
63 if (ret < 0) {
64 /* ret == NXT_AGAIN || ret == NXT_ERROR. */
65 break;
66 }
67
68 sb.sent += ret;
69 sb.limit -= ret;
70
71 b = nxt_sendbuf_update(b, ret);
72
73 if (b == NULL) {
74 nxt_fd_event_block_write(engine, &c->socket);
75 break;
76 }
77
78 sb.buf = b;
79
80 if (!c->socket.write_ready) {
81 ret = NXT_AGAIN;
82 break;
83 }
84
85 } while (sb.limit != 0);
86
87 nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);
88
89 if (sb.sent != 0) {
90 if (c->write_state->timer_autoreset) {
91 nxt_timer_disable(engine, &c->write_timer);
92 }
93 }
94
95 if (ret != NXT_ERROR) {
96
97 if (sb.limit == 0) {
98 /*
99 * Postpone writing until next event poll to allow to
100 * process other received events and to get new events.
101 */
102 c->write_timer.handler = nxt_conn_write_timer_handler;
103 nxt_timer_add(engine, &c->write_timer, 0);
104
105 } else if (ret == NXT_AGAIN) {
106 /*
107 * SSL libraries can require to toggle either write or read
108 * event if renegotiation occurs during SSL write operation.
109 * This case is handled on the c->io->send() level. Timer
110 * can be set here because it should be set only for write
111 * direction.
112 */
113 nxt_conn_timer(engine, c, c->write_state, &c->write_timer);
114
115 if (nxt_fd_event_is_disabled(c->socket.write)) {
116 nxt_fd_event_enable_write(engine, &c->socket);
117 }
118 }
119 }
120
121 if (ret == 0 || sb.sent != 0) {
122 /*
123 * ret == 0 means a sync buffer was processed.
124 * ret == NXT_ERROR is ignored here if some data was sent,
125 * the error will be handled on the next nxt_conn_write() call.
126 */
127 c->sent += sb.sent;
128 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
129 task, c, data);
130 return;
131 }
132
133 if (ret != NXT_ERROR) {
134 return;
135 }
136
137 nxt_fd_event_block_write(engine, &c->socket);
138
139 error:
140
141 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
142 task, c, data);
143 }
144
145
146 static void
nxt_conn_write_timer_handler(nxt_task_t * task,void * obj,void * data)147 nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
148 {
149 nxt_conn_t *c;
150 nxt_timer_t *timer;
151
152 timer = obj;
153
154 nxt_debug(task, "conn write timer");
155
156 c = nxt_write_timer_conn(timer);
157 c->delayed = 0;
158
159 c->io->write(task, c, c->socket.data);
160 }
161
162
163 ssize_t
nxt_conn_io_sendbuf(nxt_task_t * task,nxt_sendbuf_t * sb)164 nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
165 {
166 nxt_uint_t niov;
167 struct iovec iov[NXT_IOBUF_MAX];
168
169 niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);
170
171 if (niov == 0 && sb->sync) {
172 return 0;
173 }
174
175 /*
176 * XXX Temporary fix for <https://github.com/nginx/unit/issues/1125>
177 */
178 if (niov == 0 && sb->buf == NULL) {
179 return 0;
180 }
181
182 if (niov == 0 && nxt_buf_is_file(sb->buf)) {
183 return nxt_conn_io_sendfile(task, sb);
184 }
185
186 return nxt_conn_io_writev(task, sb, iov, niov);
187 }
188
189
190 static ssize_t
nxt_conn_io_sendfile(nxt_task_t * task,nxt_sendbuf_t * sb)191 nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb)
192 {
193 size_t size;
194 ssize_t n;
195 nxt_buf_t *b;
196 nxt_err_t err;
197
198 b = sb->buf;
199
200 for ( ;; ) {
201 size = b->file_end - b->file_pos;
202
203 n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size);
204
205 err = (n == -1) ? nxt_errno : 0;
206
207 nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z",
208 b->file->fd, sb->socket, b->file_pos, size, n);
209
210 if (n > 0) {
211 if (n < (ssize_t) size) {
212 sb->ready = 0;
213 }
214
215 return n;
216 }
217
218 if (nxt_slow_path(n == 0)) {
219 nxt_alert(task, "sendfile() reported that file was truncated at %O",
220 b->file_pos);
221
222 return NXT_ERROR;
223 }
224
225 /* n == -1 */
226
227 switch (err) {
228
229 case NXT_EAGAIN:
230 sb->ready = 0;
231 nxt_debug(task, "sendfile() %E", err);
232
233 return NXT_AGAIN;
234
235 case NXT_EINTR:
236 nxt_debug(task, "sendfile() %E", err);
237 continue;
238
239 default:
240 sb->error = err;
241 nxt_log(task, nxt_socket_error_level(err),
242 "sendfile(%FD, %d, @%O, %uz) failed %E",
243 b->file->fd, sb->socket, b->file_pos, size, err);
244
245 return NXT_ERROR;
246 }
247 }
248 }
249
250
251 static ssize_t
nxt_sendfile(int fd,int s,off_t pos,size_t size)252 nxt_sendfile(int fd, int s, off_t pos, size_t size)
253 {
254 ssize_t res;
255
256 #if (NXT_HAVE_MACOSX_SENDFILE)
257
258 off_t sent = size;
259
260 int rc = sendfile(fd, s, pos, &sent, NULL, 0);
261
262 res = (rc == 0 || sent > 0) ? sent : -1;
263
264 #elif (NXT_HAVE_FREEBSD_SENDFILE)
265
266 off_t sent = 0;
267
268 int rc = sendfile(fd, s, pos, size, NULL, &sent, 0);
269
270 res = (rc == 0 || sent > 0) ? sent : -1;
271
272 #elif (NXT_HAVE_LINUX_SENDFILE)
273
274 res = sendfile(s, fd, &pos, size);
275
276 #else
277
278 int err;
279 void *map;
280 off_t page_off;
281
282 page_off = pos % nxt_pagesize;
283
284 map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd,
285 pos - page_off);
286 if (nxt_slow_path(map == MAP_FAILED)) {
287 return -1;
288 }
289
290 res = write(s, nxt_pointer_to(map, page_off), size);
291
292 /* Backup and restore errno to catch socket errors in the upper level. */
293 err = errno;
294 nxt_mem_munmap(map, size + page_off);
295 errno = err;
296
297 #endif
298
299 return res;
300 }
301
302
303 ssize_t
nxt_conn_io_writev(nxt_task_t * task,nxt_sendbuf_t * sb,struct iovec * iov,nxt_uint_t niov)304 nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
305 nxt_uint_t niov)
306 {
307 ssize_t n;
308 nxt_err_t err;
309
310 if (niov == 1) {
311 /* Disposal of surplus kernel iovec copy-in operation. */
312 return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
313 }
314
315 for ( ;; ) {
316 n = writev(sb->socket, iov, niov);
317
318 err = (n == -1) ? nxt_socket_errno : 0;
319
320 nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);
321
322 if (n > 0) {
323 return n;
324 }
325
326 /* n == -1 */
327
328 switch (err) {
329
330 case NXT_EAGAIN:
331 sb->ready = 0;
332 nxt_debug(task, "writev() %E", err);
333
334 return NXT_AGAIN;
335
336 case NXT_EINTR:
337 nxt_debug(task, "writev() %E", err);
338 continue;
339
340 default:
341 sb->error = err;
342 nxt_log(task, nxt_socket_error_level(err),
343 "writev(%d, %ui) failed %E", sb->socket, niov, err);
344
345 return NXT_ERROR;
346 }
347 }
348 }
349
350
351 ssize_t
nxt_conn_io_send(nxt_task_t * task,nxt_sendbuf_t * sb,void * buf,size_t size)352 nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
353 {
354 ssize_t n;
355 nxt_err_t err;
356
357 for ( ;; ) {
358 n = send(sb->socket, buf, size, 0);
359
360 err = (n == -1) ? nxt_socket_errno : 0;
361
362 nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);
363
364 if (n > 0) {
365 return n;
366 }
367
368 /* n == -1 */
369
370 switch (err) {
371
372 case NXT_EAGAIN:
373 sb->ready = 0;
374 nxt_debug(task, "send() %E", err);
375
376 return NXT_AGAIN;
377
378 case NXT_EINTR:
379 nxt_debug(task, "send() %E", err);
380 continue;
381
382 default:
383 sb->error = err;
384 nxt_log(task, nxt_socket_error_level(err),
385 "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);
386
387 return NXT_ERROR;
388 }
389 }
390 }
391
392
393 /* Obsolete interfaces. */
394
395 size_t
nxt_event_conn_write_limit(nxt_conn_t * c)396 nxt_event_conn_write_limit(nxt_conn_t *c)
397 {
398 ssize_t limit, correction;
399 nxt_event_write_rate_t *rate;
400
401 rate = c->rate;
402
403 if (rate == NULL) {
404 return c->max_chunk;
405 }
406
407 limit = rate->limit;
408 correction = limit - (size_t) rate->average;
409
410 nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
411 correction, rate->average);
412
413 limit += correction;
414
415 if (limit <= 0) {
416 return 0;
417 }
418
419 if (rate->limit_after != 0) {
420 limit += rate->limit_after;
421 limit = nxt_min((size_t) limit, rate->max_limit);
422 }
423
424 return nxt_min((size_t) limit, c->max_chunk);
425 }
426
427
428 nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t * engine,nxt_conn_t * c,size_t sent)429 nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
430 size_t sent)
431 {
432 return 0;
433 }
434
435
436 ssize_t
nxt_event_conn_io_sendbuf(nxt_conn_t * c,nxt_buf_t * b,size_t limit)437 nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
438 {
439 nxt_uint_t niob;
440 struct iovec iob[NXT_IOBUF_MAX];
441 nxt_sendbuf_coalesce_t sb;
442
443 sb.buf = b;
444 sb.iobuf = iob;
445 sb.nmax = NXT_IOBUF_MAX;
446 sb.sync = 0;
447 sb.size = 0;
448 sb.limit = limit;
449
450 niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);
451
452 if (niob == 0 && sb.sync) {
453 return 0;
454 }
455
456 return nxt_event_conn_io_writev(c, iob, niob);
457 }
458
459
460 ssize_t
nxt_event_conn_io_writev(nxt_conn_t * c,nxt_iobuf_t * iob,nxt_uint_t niob)461 nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
462 {
463 ssize_t n;
464 nxt_err_t err;
465
466 if (niob == 1) {
467 /* Disposal of surplus kernel iovec copy-in operation. */
468 return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
469 }
470
471 for ( ;; ) {
472 n = writev(c->socket.fd, iob, niob);
473
474 err = (n == -1) ? nxt_socket_errno : 0;
475
476 nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);
477
478 if (n > 0) {
479 return n;
480 }
481
482 /* n == -1 */
483
484 switch (err) {
485
486 case NXT_EAGAIN:
487 nxt_debug(c->socket.task, "writev() %E", err);
488 c->socket.write_ready = 0;
489 return NXT_AGAIN;
490
491 case NXT_EINTR:
492 nxt_debug(c->socket.task, "writev() %E", err);
493 continue;
494
495 default:
496 c->socket.error = err;
497 nxt_log(c->socket.task, nxt_socket_error_level(err),
498 "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
499 return NXT_ERROR;
500 }
501 }
502 }
503
504
505 ssize_t
nxt_event_conn_io_send(nxt_conn_t * c,void * buf,size_t size)506 nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
507 {
508 ssize_t n;
509 nxt_err_t err;
510
511 for ( ;; ) {
512 n = send(c->socket.fd, buf, size, 0);
513
514 err = (n == -1) ? nxt_socket_errno : 0;
515
516 nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
517 c->socket.fd, buf, size, n);
518
519 if (n > 0) {
520 return n;
521 }
522
523 /* n == -1 */
524
525 switch (err) {
526
527 case NXT_EAGAIN:
528 nxt_debug(c->socket.task, "send() %E", err);
529 c->socket.write_ready = 0;
530 return NXT_AGAIN;
531
532 case NXT_EINTR:
533 nxt_debug(c->socket.task, "send() %E", err);
534 continue;
535
536 default:
537 c->socket.error = err;
538 nxt_log(c->socket.task, nxt_socket_error_level(err),
539 "send(%d, %p, %uz) failed %E",
540 c->socket.fd, buf, size, err);
541 return NXT_ERROR;
542 }
543 }
544 }
545