10Sigor@sysoev.ru 20Sigor@sysoev.ru /* 30Sigor@sysoev.ru * Copyright (C) Igor Sysoev 40Sigor@sysoev.ru * Copyright (C) NGINX, Inc. 50Sigor@sysoev.ru */ 60Sigor@sysoev.ru 70Sigor@sysoev.ru #include <nxt_main.h> 80Sigor@sysoev.ru 90Sigor@sysoev.ru 100Sigor@sysoev.ru #if (NXT_THREADS) 110Sigor@sysoev.ru 120Sigor@sysoev.ru typedef struct { 130Sigor@sysoev.ru nxt_job_t job; 140Sigor@sysoev.ru nxt_buf_t *out; 150Sigor@sysoev.ru size_t sent; 160Sigor@sysoev.ru size_t limit; 170Sigor@sysoev.ru nxt_work_handler_t ready_handler; 180Sigor@sysoev.ru } nxt_job_sendfile_t; 190Sigor@sysoev.ru 200Sigor@sysoev.ru 211Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, 220Sigor@sysoev.ru void *data); 231Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, 240Sigor@sysoev.ru void *data); 251Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, 260Sigor@sysoev.ru void *data); 271Sigor@sysoev.ru static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task, 280Sigor@sysoev.ru nxt_event_conn_t *c, nxt_buf_t *b); 290Sigor@sysoev.ru 300Sigor@sysoev.ru 310Sigor@sysoev.ru void 321Sigor@sysoev.ru nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c) 330Sigor@sysoev.ru { 341Sigor@sysoev.ru nxt_event_fd_disable(task->thread->engine, &c->socket); 350Sigor@sysoev.ru 360Sigor@sysoev.ru /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */ 371Sigor@sysoev.ru nxt_event_conn_job_sendfile_start(task, c, NULL); 380Sigor@sysoev.ru } 390Sigor@sysoev.ru 400Sigor@sysoev.ru 410Sigor@sysoev.ru static void 421Sigor@sysoev.ru nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data) 430Sigor@sysoev.ru { 440Sigor@sysoev.ru nxt_iobuf_t b; 450Sigor@sysoev.ru nxt_event_conn_t *c; 460Sigor@sysoev.ru nxt_job_sendfile_t *jbs; 470Sigor@sysoev.ru nxt_sendbuf_coalesce_t sb; 480Sigor@sysoev.ru 490Sigor@sysoev.ru c = obj; 500Sigor@sysoev.ru 511Sigor@sysoev.ru nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd); 520Sigor@sysoev.ru 530Sigor@sysoev.ru jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t)); 540Sigor@sysoev.ru 550Sigor@sysoev.ru if (nxt_slow_path(jbs == NULL)) { 561Sigor@sysoev.ru c->write_state->error_handler(task, c, NULL); 570Sigor@sysoev.ru return; 580Sigor@sysoev.ru } 590Sigor@sysoev.ru 600Sigor@sysoev.ru c->socket.write_handler = nxt_event_conn_job_sendfile_start; 610Sigor@sysoev.ru c->socket.error_handler = c->write_state->error_handler; 620Sigor@sysoev.ru 630Sigor@sysoev.ru jbs->job.data = c; 640Sigor@sysoev.ru nxt_job_set_name(&jbs->job, "job sendfile"); 650Sigor@sysoev.ru 660Sigor@sysoev.ru jbs->limit = nxt_event_conn_write_limit(c); 670Sigor@sysoev.ru 680Sigor@sysoev.ru if (jbs->limit != 0) { 690Sigor@sysoev.ru 700Sigor@sysoev.ru sb.buf = c->write; 710Sigor@sysoev.ru sb.iobuf = &b; 720Sigor@sysoev.ru sb.nmax = 1; 730Sigor@sysoev.ru sb.sync = 0; 740Sigor@sysoev.ru sb.size = 0; 750Sigor@sysoev.ru sb.limit = jbs->limit; 760Sigor@sysoev.ru 771Sigor@sysoev.ru if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) { 780Sigor@sysoev.ru 790Sigor@sysoev.ru jbs->job.thread_pool = c->u.thread_pool; 800Sigor@sysoev.ru jbs->job.log = c->socket.log; 810Sigor@sysoev.ru jbs->out = c->write; 820Sigor@sysoev.ru c->write = NULL; 830Sigor@sysoev.ru jbs->ready_handler = nxt_event_conn_job_sendfile_return; 840Sigor@sysoev.ru 850Sigor@sysoev.ru c->blocked = 1; 860Sigor@sysoev.ru 870Sigor@sysoev.ru if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) { 880Sigor@sysoev.ru c->write_timer.state = NXT_EVENT_TIMER_BLOCKED; 890Sigor@sysoev.ru } 900Sigor@sysoev.ru 911Sigor@sysoev.ru nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler); 920Sigor@sysoev.ru return; 930Sigor@sysoev.ru } 940Sigor@sysoev.ru } 950Sigor@sysoev.ru 961Sigor@sysoev.ru nxt_event_conn_job_sendfile_return(task, jbs, c); 970Sigor@sysoev.ru } 980Sigor@sysoev.ru 990Sigor@sysoev.ru 1000Sigor@sysoev.ru static void 1011Sigor@sysoev.ru nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data) 1020Sigor@sysoev.ru { 1030Sigor@sysoev.ru ssize_t ret; 1040Sigor@sysoev.ru nxt_buf_t *b; 1050Sigor@sysoev.ru nxt_bool_t first; 1060Sigor@sysoev.ru nxt_event_conn_t *c; 1070Sigor@sysoev.ru nxt_job_sendfile_t *jbs; 1080Sigor@sysoev.ru 1090Sigor@sysoev.ru jbs = obj; 1100Sigor@sysoev.ru c = data; 1110Sigor@sysoev.ru 1121Sigor@sysoev.ru nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd); 1130Sigor@sysoev.ru 1140Sigor@sysoev.ru first = c->socket.write_ready; 1150Sigor@sysoev.ru b = jbs->out; 1160Sigor@sysoev.ru 1170Sigor@sysoev.ru do { 1180Sigor@sysoev.ru ret = c->io->sendbuf(c, b, jbs->limit); 1190Sigor@sysoev.ru 1200Sigor@sysoev.ru if (ret == NXT_AGAIN) { 1210Sigor@sysoev.ru break; 1220Sigor@sysoev.ru } 1230Sigor@sysoev.ru 1240Sigor@sysoev.ru if (nxt_slow_path(ret == NXT_ERROR)) { 1250Sigor@sysoev.ru goto done; 1260Sigor@sysoev.ru } 1270Sigor@sysoev.ru 1280Sigor@sysoev.ru jbs->sent += ret; 1290Sigor@sysoev.ru jbs->limit -= ret; 1300Sigor@sysoev.ru 1310Sigor@sysoev.ru b = nxt_sendbuf_update(b, ret); 1320Sigor@sysoev.ru 1330Sigor@sysoev.ru if (b == NULL) { 1340Sigor@sysoev.ru goto done; 1350Sigor@sysoev.ru } 1360Sigor@sysoev.ru 1370Sigor@sysoev.ru if (jbs->limit == 0) { 1380Sigor@sysoev.ru 1390Sigor@sysoev.ru if (c->rate == NULL) { 1400Sigor@sysoev.ru jbs->limit = c->max_chunk; 1410Sigor@sysoev.ru goto fast; 1420Sigor@sysoev.ru } 1430Sigor@sysoev.ru 1440Sigor@sysoev.ru goto done; 1450Sigor@sysoev.ru } 1460Sigor@sysoev.ru 1470Sigor@sysoev.ru } while (c->socket.write_ready); 1480Sigor@sysoev.ru 1491Sigor@sysoev.ru if (first && task->thread->thread_pool->work_queue.head != NULL) { 1500Sigor@sysoev.ru goto fast; 1510Sigor@sysoev.ru } 1520Sigor@sysoev.ru 1530Sigor@sysoev.ru done: 1540Sigor@sysoev.ru 1551Sigor@sysoev.ru nxt_job_return(task, &jbs->job, jbs->ready_handler); 1560Sigor@sysoev.ru return; 1570Sigor@sysoev.ru 1580Sigor@sysoev.ru fast: 1590Sigor@sysoev.ru 160*4Sigor@sysoev.ru nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler, 161*4Sigor@sysoev.ru jbs->job.task, jbs, c); 162*4Sigor@sysoev.ru 163*4Sigor@sysoev.ru nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work); 1640Sigor@sysoev.ru } 1650Sigor@sysoev.ru 1660Sigor@sysoev.ru 1670Sigor@sysoev.ru static void 1681Sigor@sysoev.ru nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data) 1690Sigor@sysoev.ru { 1700Sigor@sysoev.ru size_t sent; 1710Sigor@sysoev.ru nxt_buf_t *b; 1720Sigor@sysoev.ru nxt_bool_t done; 1730Sigor@sysoev.ru nxt_event_conn_t *c; 1740Sigor@sysoev.ru nxt_job_sendfile_t *jbs; 1750Sigor@sysoev.ru 1760Sigor@sysoev.ru jbs = obj; 1770Sigor@sysoev.ru c = data; 1780Sigor@sysoev.ru 1790Sigor@sysoev.ru c->blocked = 0; 1800Sigor@sysoev.ru 1810Sigor@sysoev.ru sent = jbs->sent; 1820Sigor@sysoev.ru c->sent += sent; 1830Sigor@sysoev.ru 1841Sigor@sysoev.ru nxt_debug(task, "event conn sendfile sent:%z", sent); 1850Sigor@sysoev.ru 1860Sigor@sysoev.ru b = jbs->out; 1870Sigor@sysoev.ru 1880Sigor@sysoev.ru /* The job must be destroyed before connection error handler. */ 1890Sigor@sysoev.ru nxt_job_destroy(jbs); 1900Sigor@sysoev.ru 1910Sigor@sysoev.ru if (c->write_state->process_buffers) { 1921Sigor@sysoev.ru b = nxt_event_conn_job_sendfile_completion(task, c, b); 1930Sigor@sysoev.ru 1940Sigor@sysoev.ru done = (b == NULL); 1950Sigor@sysoev.ru 1960Sigor@sysoev.ru /* Add data which might be added after sendfile job has started. */ 1970Sigor@sysoev.ru nxt_buf_chain_add(&b, c->write); 1980Sigor@sysoev.ru c->write = b; 1990Sigor@sysoev.ru 2000Sigor@sysoev.ru if (done) { 2010Sigor@sysoev.ru /* All data has been sent. */ 2020Sigor@sysoev.ru 2030Sigor@sysoev.ru if (b != NULL) { 2040Sigor@sysoev.ru /* But new data has been added. */ 2051Sigor@sysoev.ru nxt_event_conn_job_sendfile_start(task, c, NULL); 2060Sigor@sysoev.ru } 2070Sigor@sysoev.ru 2080Sigor@sysoev.ru return; 2090Sigor@sysoev.ru } 2100Sigor@sysoev.ru } 2110Sigor@sysoev.ru 2120Sigor@sysoev.ru if (sent != 0 && c->write_state->autoreset_timer) { 2130Sigor@sysoev.ru nxt_event_timer_disable(&c->write_timer); 2140Sigor@sysoev.ru 2150Sigor@sysoev.ru } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) { 2160Sigor@sysoev.ru c->write_timer.state = NXT_EVENT_TIMER_ACTIVE; 2170Sigor@sysoev.ru } 2180Sigor@sysoev.ru 2190Sigor@sysoev.ru if (c->socket.error == 0 2201Sigor@sysoev.ru && !nxt_event_conn_write_delayed(task->thread->engine, c, sent)) 2210Sigor@sysoev.ru { 2221Sigor@sysoev.ru nxt_event_conn_timer(task->thread->engine, c, c->write_state, 2231Sigor@sysoev.ru &c->write_timer); 2240Sigor@sysoev.ru 2251Sigor@sysoev.ru nxt_event_fd_oneshot_write(task->thread->engine, &c->socket); 2260Sigor@sysoev.ru } 2270Sigor@sysoev.ru 2280Sigor@sysoev.ru if (sent != 0) { 2291Sigor@sysoev.ru nxt_event_conn_io_handle(task->thread, c->write_work_queue, 2300Sigor@sysoev.ru c->write_state->ready_handler, 2311Sigor@sysoev.ru task, c, c->socket.data); 2320Sigor@sysoev.ru /* 2330Sigor@sysoev.ru * Fall through if first operations were 2340Sigor@sysoev.ru * successful but the last one failed. 2350Sigor@sysoev.ru */ 2360Sigor@sysoev.ru } 2370Sigor@sysoev.ru 2380Sigor@sysoev.ru if (nxt_slow_path(c->socket.error != 0)) { 2391Sigor@sysoev.ru nxt_event_conn_io_handle(task->thread, c->write_work_queue, 2400Sigor@sysoev.ru c->write_state->error_handler, 2411Sigor@sysoev.ru task, c, c->socket.data); 2420Sigor@sysoev.ru } 2430Sigor@sysoev.ru } 2440Sigor@sysoev.ru 2450Sigor@sysoev.ru 2460Sigor@sysoev.ru static nxt_buf_t * 2471Sigor@sysoev.ru nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c, 2480Sigor@sysoev.ru nxt_buf_t *b) 2490Sigor@sysoev.ru { 2500Sigor@sysoev.ru while (b != NULL) { 2510Sigor@sysoev.ru 2520Sigor@sysoev.ru nxt_prefetch(b->next); 2530Sigor@sysoev.ru 2540Sigor@sysoev.ru if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) { 2550Sigor@sysoev.ru break; 2560Sigor@sysoev.ru 2570Sigor@sysoev.ru } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) { 2580Sigor@sysoev.ru break; 2590Sigor@sysoev.ru } 2600Sigor@sysoev.ru 261*4Sigor@sysoev.ru nxt_work_queue_add(c->write_work_queue, 262*4Sigor@sysoev.ru b->completion_handler, task, b, b->parent); 2630Sigor@sysoev.ru 2640Sigor@sysoev.ru b = b->next; 2650Sigor@sysoev.ru } 2660Sigor@sysoev.ru 2670Sigor@sysoev.ru return b; 2680Sigor@sysoev.ru } 2690Sigor@sysoev.ru 2700Sigor@sysoev.ru #endif 271