
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


static void nxt_conn_write_timer_handler(nxt_task_t *task, void *obj,
    void *data);
static ssize_t nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb);
static ssize_t nxt_sendfile(int fd, int s, off_t pos, size_t size);


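/*
 * nxt_conn_io_write() is the generic connection write handler: it is
 * registered as c->socket.write_handler and repeatedly calls
 * c->io->sendbuf() on the c->write buffer chain until the chain is
 * exhausted, the socket is no longer write-ready, an error occurs, or
 * the 10 MiB per-call limit is reached.
 */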
void
nxt_conn_io_write(nxt_task_t *task, void *obj, void *data)
{
    ssize_t             ret;
    nxt_buf_t           *b;
    nxt_conn_t          *c;
    nxt_sendbuf_t       sb;
    nxt_event_engine_t  *engine;

    c = obj;

    nxt_debug(task, "conn write fd:%d er:%d bl:%d",
              c->socket.fd, c->socket.error, c->block_write);

    if (c->socket.error != 0 || c->block_write) {
        goto error;
    }

    if (!c->socket.write_ready || c->write == NULL) {
        return;
    }

    engine = task->thread->engine;

    c->socket.write_handler = nxt_conn_io_write;
    c->socket.error_handler = c->write_state->error_handler;

    b = c->write;

    sb.socket = c->socket.fd;
    sb.error = 0;
    sb.sent = 0;
    sb.size = 0;
    sb.buf = b;
#if (NXT_TLS)
    sb.tls = c->u.tls;
#endif
    sb.limit = 10 * 1024 * 1024;
    sb.ready = 1;
    sb.sync = 0;

    do {
        ret = c->io->sendbuf(task, &sb);

        c->socket.write_ready = sb.ready;
        c->socket.error = sb.error;

        if (ret < 0) {
            /* ret == NXT_AGAIN || ret == NXT_ERROR. */
            break;
        }

        sb.sent += ret;
        sb.limit -= ret;

        b = nxt_sendbuf_update(b, ret);

        if (b == NULL) {
            nxt_fd_event_block_write(engine, &c->socket);
            break;
        }

        sb.buf = b;

        if (!c->socket.write_ready) {
            ret = NXT_AGAIN;
            break;
        }

    } while (sb.limit != 0);

    nxt_debug(task, "event conn: %z sent:%O", ret, sb.sent);

    if (sb.sent != 0) {
        if (c->write_state->timer_autoreset) {
            nxt_timer_disable(engine, &c->write_timer);
        }
    }

    if (ret != NXT_ERROR) {

        if (sb.limit == 0) {
            /*
             * Postpone writing until the next event poll to allow
             * processing of other received events and to get new events.
             */
            c->write_timer.handler = nxt_conn_write_timer_handler;
            nxt_timer_add(engine, &c->write_timer, 0);

        } else if (ret == NXT_AGAIN) {
            /*
             * SSL libraries may require toggling either the write or the
             * read event if renegotiation occurs during an SSL write
             * operation.  That case is handled at the c->io->send() level.
             * The timer can be set here because it should be set only for
             * the write direction.
             */
            nxt_conn_timer(engine, c, c->write_state, &c->write_timer);

            if (nxt_fd_event_is_disabled(c->socket.write)) {
                nxt_fd_event_enable_write(engine, &c->socket);
            }
        }
    }

    if (ret == 0 || sb.sent != 0) {
        /*
         * ret == 0 means a sync buffer was processed.
         * ret == NXT_ERROR is ignored here if some data was sent,
         * the error will be handled on the next nxt_conn_write() call.
         */
        c->sent += sb.sent;
        nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
                           task, c, data);
        return;
    }

    if (ret != NXT_ERROR) {
        return;
    }

    nxt_fd_event_block_write(engine, &c->socket);

error:

    nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
                       task, c, data);
}


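/*
 * A zero-timeout write timer handler: nxt_conn_io_write() schedules it when
 * the per-call send limit is hit, so writing resumes on the next engine
 * cycle instead of monopolizing the current one.
 */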
static void
nxt_conn_write_timer_handler(nxt_task_t *task, void *obj, void *data)
{
    nxt_conn_t   *c;
    nxt_timer_t  *timer;

    timer = obj;

    nxt_debug(task, "conn write timer");

    c = nxt_write_timer_conn(timer);
    c->delayed = 0;

    c->io->write(task, c, c->socket.data);
}


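/*
 * The default sendbuf() implementation: it coalesces memory buffers into an
 * iovec array and writes them with writev()/send(); if the chain starts with
 * a file buffer, it falls through to sendfile().
 */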
ssize_t
nxt_conn_io_sendbuf(nxt_task_t *task, nxt_sendbuf_t *sb)
{
    nxt_uint_t    niov;
    struct iovec  iov[NXT_IOBUF_MAX];

    niov = nxt_sendbuf_mem_coalesce0(task, sb, iov, NXT_IOBUF_MAX);

    if (niov == 0 && sb->sync) {
        return 0;
    }

    if (niov == 0 && nxt_buf_is_file(sb->buf)) {
        return nxt_conn_io_sendfile(task, sb);
    }

    return nxt_conn_io_writev(task, sb, iov, niov);
}


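/*
 * Sends the current file buffer with the platform sendfile() wrapper below,
 * retrying on EINTR.  A short send clears sb->ready; a zero return means the
 * file was truncated and is treated as an error.
 */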
static ssize_t
nxt_conn_io_sendfile(nxt_task_t *task, nxt_sendbuf_t *sb)
{
    size_t     size;
    ssize_t    n;
    nxt_buf_t  *b;
    nxt_err_t  err;

    b = sb->buf;

    for ( ;; ) {
        size = b->file_end - b->file_pos;

        n = nxt_sendfile(b->file->fd, sb->socket, b->file_pos, size);

        err = (n == -1) ? nxt_errno : 0;

        nxt_debug(task, "sendfile(%FD, %d, @%O, %uz): %z",
                  b->file->fd, sb->socket, b->file_pos, size, n);

        if (n > 0) {
            if (n < (ssize_t) size) {
                sb->ready = 0;
            }

            return n;
        }

        if (nxt_slow_path(n == 0)) {
            nxt_alert(task, "sendfile() reported that file was truncated at %O",
                      b->file_pos);

            return NXT_ERROR;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            sb->ready = 0;
            nxt_debug(task, "sendfile() %E", err);

            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(task, "sendfile() %E", err);
            continue;

        default:
            sb->error = err;
            nxt_log(task, nxt_socket_error_level(err),
                    "sendfile(%FD, %d, @%O, %uz) failed %E",
                    b->file->fd, sb->socket, b->file_pos, size, err);

            return NXT_ERROR;
        }
    }
}


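/*
 * A thin portability wrapper over the OS sendfile() variants (macOS, FreeBSD,
 * Linux).  On platforms without sendfile() it emulates the call by mmap()ing
 * the file region and writing it to the socket.  Returns the number of bytes
 * sent, or -1 with errno set.
 */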
static ssize_t
nxt_sendfile(int fd, int s, off_t pos, size_t size)
{
    ssize_t  res;

#if (NXT_HAVE_MACOSX_SENDFILE)

    off_t  sent = size;

    int  rc = sendfile(fd, s, pos, &sent, NULL, 0);

    res = (rc == 0 || sent > 0) ? sent : -1;

#elif (NXT_HAVE_FREEBSD_SENDFILE)

    off_t  sent = 0;

    int  rc = sendfile(fd, s, pos, size, NULL, &sent, 0);

    res = (rc == 0 || sent > 0) ? sent : -1;

#elif (NXT_HAVE_LINUX_SENDFILE)

    res = sendfile(s, fd, &pos, size);

#else

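    /*
     * No native sendfile(): map the file region (aligned down to a page
     * boundary) and write it to the socket with a plain write().
     */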
    int    err;
    void   *map;
    off_t  page_off;

    page_off = pos % nxt_pagesize;

    map = nxt_mem_mmap(NULL, size + page_off, PROT_READ, MAP_SHARED, fd,
                       pos - page_off);
    if (nxt_slow_path(map == MAP_FAILED)) {
        return -1;
    }

    res = write(s, nxt_pointer_to(map, page_off), size);

    /* Back up and restore errno to catch socket errors at the upper level. */
    err = errno;
    nxt_mem_munmap(map, size + page_off);
    errno = err;

#endif

    return res;
}


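/*
 * Writes the coalesced iovec array with writev(), retrying on EINTR.
 * A single-element array is passed straight to nxt_conn_io_send() to skip
 * the kernel iovec copy-in.
 */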
ssize_t
nxt_conn_io_writev(nxt_task_t *task, nxt_sendbuf_t *sb, struct iovec *iov,
    nxt_uint_t niov)
{
    ssize_t    n;
    nxt_err_t  err;

    if (niov == 1) {
        /* Avoid a surplus kernel iovec copy-in operation. */
        return nxt_conn_io_send(task, sb, iov[0].iov_base, iov[0].iov_len);
    }

    for ( ;; ) {
        n = writev(sb->socket, iov, niov);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(task, "writev(%d, %ui): %z", sb->socket, niov, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            sb->ready = 0;
            nxt_debug(task, "writev() %E", err);

            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(task, "writev() %E", err);
            continue;

        default:
            sb->error = err;
            nxt_log(task, nxt_socket_error_level(err),
                    "writev(%d, %ui) failed %E", sb->socket, niov, err);

            return NXT_ERROR;
        }
    }
}


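/*
 * Sends a single memory buffer with send(), retrying on EINTR.  EAGAIN clears
 * sb->ready and returns NXT_AGAIN; any other error is stored in sb->error.
 */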
ssize_t
nxt_conn_io_send(nxt_task_t *task, nxt_sendbuf_t *sb, void *buf, size_t size)
{
    ssize_t    n;
    nxt_err_t  err;

    for ( ;; ) {
        n = send(sb->socket, buf, size, 0);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(task, "send(%d, %p, %uz): %z", sb->socket, buf, size, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            sb->ready = 0;
            nxt_debug(task, "send() %E", err);

            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(task, "send() %E", err);
            continue;

        default:
            sb->error = err;
            nxt_log(task, nxt_socket_error_level(err),
                    "send(%d, %p, %uz) failed %E", sb->socket, buf, size, err);

            return NXT_ERROR;
        }
    }
}


/* Obsolete interfaces. */

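/*
 * Computes the number of bytes that may be written now under the connection's
 * rate limit: the configured limit is corrected by the current average rate,
 * optionally extended by limit_after/max_limit, and finally capped by
 * c->max_chunk.
 */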
size_t
nxt_event_conn_write_limit(nxt_conn_t *c)
{
    ssize_t                 limit, correction;
    nxt_event_write_rate_t  *rate;

    rate = c->rate;

    if (rate == NULL) {
        return c->max_chunk;
    }

    limit = rate->limit;
    correction = limit - (size_t) rate->average;

    nxt_debug(c->socket.task, "event conn correction:%z average:%0.3f",
              correction, rate->average);

    limit += correction;

    if (limit <= 0) {
        return 0;
    }

    if (rate->limit_after != 0) {
        limit += rate->limit_after;
        limit = nxt_min((size_t) limit, rate->max_limit);
    }

    return nxt_min((size_t) limit, c->max_chunk);
}


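/* A stub in the obsolete interface: it always reports the write as not delayed. */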
nxt_bool_t
nxt_event_conn_write_delayed(nxt_event_engine_t *engine, nxt_conn_t *c,
    size_t sent)
{
    return 0;
}


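/*
 * Obsolete counterpart of nxt_conn_io_sendbuf(): coalesces the buffer chain
 * into an iovec array and writes it with nxt_event_conn_io_writev().
 */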
ssize_t
nxt_event_conn_io_sendbuf(nxt_conn_t *c, nxt_buf_t *b, size_t limit)
{
    nxt_uint_t              niob;
    struct iovec            iob[NXT_IOBUF_MAX];
    nxt_sendbuf_coalesce_t  sb;

    sb.buf = b;
    sb.iobuf = iob;
    sb.nmax = NXT_IOBUF_MAX;
    sb.sync = 0;
    sb.size = 0;
    sb.limit = limit;

    niob = nxt_sendbuf_mem_coalesce(c->socket.task, &sb);

    if (niob == 0 && sb.sync) {
        return 0;
    }

    return nxt_event_conn_io_writev(c, iob, niob);
}


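/*
 * Obsolete counterpart of nxt_conn_io_writev(): identical logic, but errors
 * and readiness are recorded directly on c->socket.
 */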
ssize_t
nxt_event_conn_io_writev(nxt_conn_t *c, nxt_iobuf_t *iob, nxt_uint_t niob)
{
    ssize_t    n;
    nxt_err_t  err;

    if (niob == 1) {
        /* Avoid a surplus kernel iovec copy-in operation. */
        return nxt_event_conn_io_send(c, iob->iov_base, iob->iov_len);
    }

    for ( ;; ) {
        n = writev(c->socket.fd, iob, niob);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(c->socket.task, "writev(%d, %ui): %z", c->socket.fd, niob, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            nxt_debug(c->socket.task, "writev() %E", err);
            c->socket.write_ready = 0;
            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(c->socket.task, "writev() %E", err);
            continue;

        default:
            c->socket.error = err;
            nxt_log(c->socket.task, nxt_socket_error_level(err),
                    "writev(%d, %ui) failed %E", c->socket.fd, niob, err);
            return NXT_ERROR;
        }
    }
}


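/*
 * Obsolete counterpart of nxt_conn_io_send(): sends a single buffer with
 * send() and records errors and readiness directly on c->socket.
 */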
ssize_t
nxt_event_conn_io_send(nxt_conn_t *c, void *buf, size_t size)
{
    ssize_t    n;
    nxt_err_t  err;

    for ( ;; ) {
        n = send(c->socket.fd, buf, size, 0);

        err = (n == -1) ? nxt_socket_errno : 0;

        nxt_debug(c->socket.task, "send(%d, %p, %uz): %z",
                  c->socket.fd, buf, size, n);

        if (n > 0) {
            return n;
        }

        /* n == -1 */

        switch (err) {

        case NXT_EAGAIN:
            nxt_debug(c->socket.task, "send() %E", err);
            c->socket.write_ready = 0;
            return NXT_AGAIN;

        case NXT_EINTR:
            nxt_debug(c->socket.task, "send() %E", err);
            continue;

        default:
            c->socket.error = err;
            nxt_log(c->socket.task, nxt_socket_error_level(err),
                    "send(%d, %p, %uz) failed %E",
                    c->socket.fd, buf, size, err);
            return NXT_ERROR;
        }
    }
}