10Sigor@sysoev.ru 
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru  */
60Sigor@sysoev.ru 
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru 
90Sigor@sysoev.ru 
100Sigor@sysoev.ru #if (NXT_THREADS)
110Sigor@sysoev.ru 
120Sigor@sysoev.ru typedef struct {
130Sigor@sysoev.ru     nxt_job_t           job;
140Sigor@sysoev.ru     nxt_buf_t           *out;
150Sigor@sysoev.ru     size_t              sent;
160Sigor@sysoev.ru     size_t              limit;
170Sigor@sysoev.ru     nxt_work_handler_t  ready_handler;
180Sigor@sysoev.ru } nxt_job_sendfile_t;
190Sigor@sysoev.ru 
200Sigor@sysoev.ru 
21*1Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
220Sigor@sysoev.ru     void *data);
23*1Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
240Sigor@sysoev.ru     void *data);
25*1Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
260Sigor@sysoev.ru     void *data);
27*1Sigor@sysoev.ru static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
280Sigor@sysoev.ru     nxt_event_conn_t *c, nxt_buf_t *b);
290Sigor@sysoev.ru 
300Sigor@sysoev.ru 
310Sigor@sysoev.ru void
32*1Sigor@sysoev.ru nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
330Sigor@sysoev.ru {
34*1Sigor@sysoev.ru     nxt_event_fd_disable(task->thread->engine, &c->socket);
350Sigor@sysoev.ru 
360Sigor@sysoev.ru     /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37*1Sigor@sysoev.ru     nxt_event_conn_job_sendfile_start(task, c, NULL);
380Sigor@sysoev.ru }
390Sigor@sysoev.ru 
400Sigor@sysoev.ru 
410Sigor@sysoev.ru static void
42*1Sigor@sysoev.ru nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
430Sigor@sysoev.ru {
440Sigor@sysoev.ru     nxt_iobuf_t             b;
450Sigor@sysoev.ru     nxt_event_conn_t        *c;
460Sigor@sysoev.ru     nxt_job_sendfile_t      *jbs;
470Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
480Sigor@sysoev.ru 
490Sigor@sysoev.ru     c = obj;
500Sigor@sysoev.ru 
51*1Sigor@sysoev.ru     nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
520Sigor@sysoev.ru 
530Sigor@sysoev.ru     jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
540Sigor@sysoev.ru 
550Sigor@sysoev.ru     if (nxt_slow_path(jbs == NULL)) {
56*1Sigor@sysoev.ru         c->write_state->error_handler(task, c, NULL);
570Sigor@sysoev.ru         return;
580Sigor@sysoev.ru     }
590Sigor@sysoev.ru 
600Sigor@sysoev.ru     c->socket.write_handler = nxt_event_conn_job_sendfile_start;
610Sigor@sysoev.ru     c->socket.error_handler = c->write_state->error_handler;
620Sigor@sysoev.ru 
630Sigor@sysoev.ru     jbs->job.data = c;
640Sigor@sysoev.ru     nxt_job_set_name(&jbs->job, "job sendfile");
650Sigor@sysoev.ru 
660Sigor@sysoev.ru     jbs->limit = nxt_event_conn_write_limit(c);
670Sigor@sysoev.ru 
680Sigor@sysoev.ru     if (jbs->limit != 0) {
690Sigor@sysoev.ru 
700Sigor@sysoev.ru         sb.buf = c->write;
710Sigor@sysoev.ru         sb.iobuf = &b;
720Sigor@sysoev.ru         sb.nmax = 1;
730Sigor@sysoev.ru         sb.sync = 0;
740Sigor@sysoev.ru         sb.size = 0;
750Sigor@sysoev.ru         sb.limit = jbs->limit;
760Sigor@sysoev.ru 
77*1Sigor@sysoev.ru         if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
780Sigor@sysoev.ru 
790Sigor@sysoev.ru             jbs->job.thread_pool = c->u.thread_pool;
800Sigor@sysoev.ru             jbs->job.log = c->socket.log;
810Sigor@sysoev.ru             jbs->out = c->write;
820Sigor@sysoev.ru             c->write = NULL;
830Sigor@sysoev.ru             jbs->ready_handler = nxt_event_conn_job_sendfile_return;
840Sigor@sysoev.ru 
850Sigor@sysoev.ru             c->blocked = 1;
860Sigor@sysoev.ru 
870Sigor@sysoev.ru             if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
880Sigor@sysoev.ru                 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
890Sigor@sysoev.ru             }
900Sigor@sysoev.ru 
91*1Sigor@sysoev.ru             nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
920Sigor@sysoev.ru             return;
930Sigor@sysoev.ru         }
940Sigor@sysoev.ru     }
950Sigor@sysoev.ru 
96*1Sigor@sysoev.ru     nxt_event_conn_job_sendfile_return(task, jbs, c);
970Sigor@sysoev.ru }
980Sigor@sysoev.ru 
990Sigor@sysoev.ru 
1000Sigor@sysoev.ru static void
101*1Sigor@sysoev.ru nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
1020Sigor@sysoev.ru {
1030Sigor@sysoev.ru     ssize_t             ret;
1040Sigor@sysoev.ru     nxt_buf_t           *b;
1050Sigor@sysoev.ru     nxt_bool_t          first;
1060Sigor@sysoev.ru     nxt_event_conn_t    *c;
1070Sigor@sysoev.ru     nxt_job_sendfile_t  *jbs;
1080Sigor@sysoev.ru 
1090Sigor@sysoev.ru     jbs = obj;
1100Sigor@sysoev.ru     c = data;
1110Sigor@sysoev.ru 
112*1Sigor@sysoev.ru     nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
1130Sigor@sysoev.ru 
1140Sigor@sysoev.ru     first = c->socket.write_ready;
1150Sigor@sysoev.ru     b = jbs->out;
1160Sigor@sysoev.ru 
1170Sigor@sysoev.ru     do {
1180Sigor@sysoev.ru         ret = c->io->sendbuf(c, b, jbs->limit);
1190Sigor@sysoev.ru 
1200Sigor@sysoev.ru         if (ret == NXT_AGAIN) {
1210Sigor@sysoev.ru             break;
1220Sigor@sysoev.ru         }
1230Sigor@sysoev.ru 
1240Sigor@sysoev.ru         if (nxt_slow_path(ret == NXT_ERROR)) {
1250Sigor@sysoev.ru             goto done;
1260Sigor@sysoev.ru         }
1270Sigor@sysoev.ru 
1280Sigor@sysoev.ru         jbs->sent += ret;
1290Sigor@sysoev.ru         jbs->limit -= ret;
1300Sigor@sysoev.ru 
1310Sigor@sysoev.ru         b = nxt_sendbuf_update(b, ret);
1320Sigor@sysoev.ru 
1330Sigor@sysoev.ru         if (b == NULL) {
1340Sigor@sysoev.ru             goto done;
1350Sigor@sysoev.ru         }
1360Sigor@sysoev.ru 
1370Sigor@sysoev.ru         if (jbs->limit == 0) {
1380Sigor@sysoev.ru 
1390Sigor@sysoev.ru             if (c->rate == NULL) {
1400Sigor@sysoev.ru                 jbs->limit = c->max_chunk;
1410Sigor@sysoev.ru                 goto fast;
1420Sigor@sysoev.ru             }
1430Sigor@sysoev.ru 
1440Sigor@sysoev.ru             goto done;
1450Sigor@sysoev.ru         }
1460Sigor@sysoev.ru 
1470Sigor@sysoev.ru     } while (c->socket.write_ready);
1480Sigor@sysoev.ru 
149*1Sigor@sysoev.ru     if (first && task->thread->thread_pool->work_queue.head != NULL) {
1500Sigor@sysoev.ru         goto fast;
1510Sigor@sysoev.ru     }
1520Sigor@sysoev.ru 
1530Sigor@sysoev.ru done:
1540Sigor@sysoev.ru 
155*1Sigor@sysoev.ru     nxt_job_return(task, &jbs->job, jbs->ready_handler);
1560Sigor@sysoev.ru     return;
1570Sigor@sysoev.ru 
1580Sigor@sysoev.ru fast:
1590Sigor@sysoev.ru 
160*1Sigor@sysoev.ru     nxt_thread_pool_post(task->thread->thread_pool,
161*1Sigor@sysoev.ru                          nxt_event_conn_job_sendfile_handler,
162*1Sigor@sysoev.ru                          &jbs->job.task, jbs, c);
1630Sigor@sysoev.ru }
1640Sigor@sysoev.ru 
1650Sigor@sysoev.ru 
1660Sigor@sysoev.ru static void
167*1Sigor@sysoev.ru nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
1680Sigor@sysoev.ru {
1690Sigor@sysoev.ru     size_t              sent;
1700Sigor@sysoev.ru     nxt_buf_t           *b;
1710Sigor@sysoev.ru     nxt_bool_t          done;
1720Sigor@sysoev.ru     nxt_event_conn_t    *c;
1730Sigor@sysoev.ru     nxt_job_sendfile_t  *jbs;
1740Sigor@sysoev.ru 
1750Sigor@sysoev.ru     jbs = obj;
1760Sigor@sysoev.ru     c = data;
1770Sigor@sysoev.ru 
1780Sigor@sysoev.ru     c->blocked = 0;
1790Sigor@sysoev.ru 
1800Sigor@sysoev.ru     sent = jbs->sent;
1810Sigor@sysoev.ru     c->sent += sent;
1820Sigor@sysoev.ru 
183*1Sigor@sysoev.ru     nxt_debug(task, "event conn sendfile sent:%z", sent);
1840Sigor@sysoev.ru 
1850Sigor@sysoev.ru     b = jbs->out;
1860Sigor@sysoev.ru 
1870Sigor@sysoev.ru     /* The job must be destroyed before connection error handler. */
1880Sigor@sysoev.ru     nxt_job_destroy(jbs);
1890Sigor@sysoev.ru 
1900Sigor@sysoev.ru     if (c->write_state->process_buffers) {
191*1Sigor@sysoev.ru         b = nxt_event_conn_job_sendfile_completion(task, c, b);
1920Sigor@sysoev.ru 
1930Sigor@sysoev.ru         done = (b == NULL);
1940Sigor@sysoev.ru 
1950Sigor@sysoev.ru         /* Add data which might be added after sendfile job has started. */
1960Sigor@sysoev.ru         nxt_buf_chain_add(&b, c->write);
1970Sigor@sysoev.ru         c->write = b;
1980Sigor@sysoev.ru 
1990Sigor@sysoev.ru         if (done) {
2000Sigor@sysoev.ru             /* All data has been sent. */
2010Sigor@sysoev.ru 
2020Sigor@sysoev.ru             if (b != NULL) {
2030Sigor@sysoev.ru                 /* But new data has been added. */
204*1Sigor@sysoev.ru                 nxt_event_conn_job_sendfile_start(task, c, NULL);
2050Sigor@sysoev.ru             }
2060Sigor@sysoev.ru 
2070Sigor@sysoev.ru             return;
2080Sigor@sysoev.ru         }
2090Sigor@sysoev.ru     }
2100Sigor@sysoev.ru 
2110Sigor@sysoev.ru     if (sent != 0 && c->write_state->autoreset_timer) {
2120Sigor@sysoev.ru         nxt_event_timer_disable(&c->write_timer);
2130Sigor@sysoev.ru 
2140Sigor@sysoev.ru     } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
2150Sigor@sysoev.ru         c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
2160Sigor@sysoev.ru     }
2170Sigor@sysoev.ru 
2180Sigor@sysoev.ru     if (c->socket.error == 0
219*1Sigor@sysoev.ru         && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
2200Sigor@sysoev.ru     {
221*1Sigor@sysoev.ru         nxt_event_conn_timer(task->thread->engine, c, c->write_state,
222*1Sigor@sysoev.ru                              &c->write_timer);
2230Sigor@sysoev.ru 
224*1Sigor@sysoev.ru         nxt_event_fd_oneshot_write(task->thread->engine, &c->socket);
2250Sigor@sysoev.ru     }
2260Sigor@sysoev.ru 
2270Sigor@sysoev.ru     if (sent != 0) {
228*1Sigor@sysoev.ru         nxt_event_conn_io_handle(task->thread, c->write_work_queue,
2290Sigor@sysoev.ru                                  c->write_state->ready_handler,
230*1Sigor@sysoev.ru                                  task, c, c->socket.data);
2310Sigor@sysoev.ru         /*
2320Sigor@sysoev.ru          * Fall through if first operations were
2330Sigor@sysoev.ru          * successful but the last one failed.
2340Sigor@sysoev.ru          */
2350Sigor@sysoev.ru     }
2360Sigor@sysoev.ru 
2370Sigor@sysoev.ru     if (nxt_slow_path(c->socket.error != 0)) {
238*1Sigor@sysoev.ru         nxt_event_conn_io_handle(task->thread, c->write_work_queue,
2390Sigor@sysoev.ru                                  c->write_state->error_handler,
240*1Sigor@sysoev.ru                                  task, c, c->socket.data);
2410Sigor@sysoev.ru     }
2420Sigor@sysoev.ru }
2430Sigor@sysoev.ru 
2440Sigor@sysoev.ru 
2450Sigor@sysoev.ru static nxt_buf_t *
246*1Sigor@sysoev.ru nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
2470Sigor@sysoev.ru     nxt_buf_t *b)
2480Sigor@sysoev.ru {
2490Sigor@sysoev.ru     while (b != NULL) {
2500Sigor@sysoev.ru 
2510Sigor@sysoev.ru         nxt_prefetch(b->next);
2520Sigor@sysoev.ru 
2530Sigor@sysoev.ru         if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
2540Sigor@sysoev.ru             break;
2550Sigor@sysoev.ru 
2560Sigor@sysoev.ru         } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
2570Sigor@sysoev.ru             break;
2580Sigor@sysoev.ru         }
2590Sigor@sysoev.ru 
260*1Sigor@sysoev.ru         nxt_thread_work_queue_add(task->thread, c->write_work_queue,
261*1Sigor@sysoev.ru                                   b->completion_handler, task, b, b->parent);
2620Sigor@sysoev.ru 
2630Sigor@sysoev.ru         b = b->next;
2640Sigor@sysoev.ru     }
2650Sigor@sysoev.ru 
2660Sigor@sysoev.ru     return b;
2670Sigor@sysoev.ru }
2680Sigor@sysoev.ru 
2690Sigor@sysoev.ru #endif
270