1*0Sigor@sysoev.ru 
2*0Sigor@sysoev.ru /*
3*0Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*0Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*0Sigor@sysoev.ru  */
6*0Sigor@sysoev.ru 
7*0Sigor@sysoev.ru #include <nxt_main.h>
8*0Sigor@sysoev.ru 
9*0Sigor@sysoev.ru 
10*0Sigor@sysoev.ru #if (NXT_THREADS)
11*0Sigor@sysoev.ru 
12*0Sigor@sysoev.ru typedef struct {
13*0Sigor@sysoev.ru     nxt_job_t           job;
14*0Sigor@sysoev.ru     nxt_buf_t           *out;
15*0Sigor@sysoev.ru     size_t              sent;
16*0Sigor@sysoev.ru     size_t              limit;
17*0Sigor@sysoev.ru     nxt_work_handler_t  ready_handler;
18*0Sigor@sysoev.ru } nxt_job_sendfile_t;
19*0Sigor@sysoev.ru 
20*0Sigor@sysoev.ru 
21*0Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj,
22*0Sigor@sysoev.ru     void *data);
23*0Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj,
24*0Sigor@sysoev.ru     void *data);
25*0Sigor@sysoev.ru static void nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj,
26*0Sigor@sysoev.ru     void *data);
27*0Sigor@sysoev.ru static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr,
28*0Sigor@sysoev.ru     nxt_event_conn_t *c, nxt_buf_t *b);
29*0Sigor@sysoev.ru 
30*0Sigor@sysoev.ru 
31*0Sigor@sysoev.ru void
32*0Sigor@sysoev.ru nxt_event_conn_job_sendfile(nxt_thread_t *thr, nxt_event_conn_t *c)
33*0Sigor@sysoev.ru {
34*0Sigor@sysoev.ru     nxt_event_fd_disable(thr->engine, &c->socket);
35*0Sigor@sysoev.ru 
36*0Sigor@sysoev.ru     /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37*0Sigor@sysoev.ru     nxt_event_conn_job_sendfile_start(thr, c, NULL);
38*0Sigor@sysoev.ru }
39*0Sigor@sysoev.ru 
40*0Sigor@sysoev.ru 
41*0Sigor@sysoev.ru static void
42*0Sigor@sysoev.ru nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, void *data)
43*0Sigor@sysoev.ru {
44*0Sigor@sysoev.ru     nxt_iobuf_t             b;
45*0Sigor@sysoev.ru     nxt_event_conn_t        *c;
46*0Sigor@sysoev.ru     nxt_job_sendfile_t      *jbs;
47*0Sigor@sysoev.ru     nxt_sendbuf_coalesce_t  sb;
48*0Sigor@sysoev.ru 
49*0Sigor@sysoev.ru     c = obj;
50*0Sigor@sysoev.ru 
51*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "event conn sendfile fd:%d", c->socket.fd);
52*0Sigor@sysoev.ru 
53*0Sigor@sysoev.ru     jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54*0Sigor@sysoev.ru 
55*0Sigor@sysoev.ru     if (nxt_slow_path(jbs == NULL)) {
56*0Sigor@sysoev.ru         c->write_state->error_handler(thr, c, NULL);
57*0Sigor@sysoev.ru         return;
58*0Sigor@sysoev.ru     }
59*0Sigor@sysoev.ru 
60*0Sigor@sysoev.ru     c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61*0Sigor@sysoev.ru     c->socket.error_handler = c->write_state->error_handler;
62*0Sigor@sysoev.ru 
63*0Sigor@sysoev.ru     jbs->job.data = c;
64*0Sigor@sysoev.ru     nxt_job_set_name(&jbs->job, "job sendfile");
65*0Sigor@sysoev.ru 
66*0Sigor@sysoev.ru     jbs->limit = nxt_event_conn_write_limit(c);
67*0Sigor@sysoev.ru 
68*0Sigor@sysoev.ru     if (jbs->limit != 0) {
69*0Sigor@sysoev.ru 
70*0Sigor@sysoev.ru         sb.buf = c->write;
71*0Sigor@sysoev.ru         sb.iobuf = &b;
72*0Sigor@sysoev.ru         sb.nmax = 1;
73*0Sigor@sysoev.ru         sb.sync = 0;
74*0Sigor@sysoev.ru         sb.size = 0;
75*0Sigor@sysoev.ru         sb.limit = jbs->limit;
76*0Sigor@sysoev.ru 
77*0Sigor@sysoev.ru         if (nxt_sendbuf_mem_coalesce(&sb) != 0 || !sb.sync) {
78*0Sigor@sysoev.ru 
79*0Sigor@sysoev.ru             jbs->job.thread_pool = c->u.thread_pool;
80*0Sigor@sysoev.ru             jbs->job.log = c->socket.log;
81*0Sigor@sysoev.ru             jbs->out = c->write;
82*0Sigor@sysoev.ru             c->write = NULL;
83*0Sigor@sysoev.ru             jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84*0Sigor@sysoev.ru 
85*0Sigor@sysoev.ru             c->blocked = 1;
86*0Sigor@sysoev.ru 
87*0Sigor@sysoev.ru             if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
88*0Sigor@sysoev.ru                 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
89*0Sigor@sysoev.ru             }
90*0Sigor@sysoev.ru 
91*0Sigor@sysoev.ru             nxt_job_start(thr, &jbs->job, nxt_event_conn_job_sendfile_handler);
92*0Sigor@sysoev.ru             return;
93*0Sigor@sysoev.ru         }
94*0Sigor@sysoev.ru     }
95*0Sigor@sysoev.ru 
96*0Sigor@sysoev.ru     nxt_event_conn_job_sendfile_return(thr, jbs, c);
97*0Sigor@sysoev.ru }
98*0Sigor@sysoev.ru 
99*0Sigor@sysoev.ru 
100*0Sigor@sysoev.ru static void
101*0Sigor@sysoev.ru nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, void *data)
102*0Sigor@sysoev.ru {
103*0Sigor@sysoev.ru     ssize_t             ret;
104*0Sigor@sysoev.ru     nxt_buf_t           *b;
105*0Sigor@sysoev.ru     nxt_bool_t          first;
106*0Sigor@sysoev.ru     nxt_event_conn_t    *c;
107*0Sigor@sysoev.ru     nxt_job_sendfile_t  *jbs;
108*0Sigor@sysoev.ru 
109*0Sigor@sysoev.ru     jbs = obj;
110*0Sigor@sysoev.ru     c = data;
111*0Sigor@sysoev.ru 
112*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "event conn job sendfile fd:%d", c->socket.fd);
113*0Sigor@sysoev.ru 
114*0Sigor@sysoev.ru     first = c->socket.write_ready;
115*0Sigor@sysoev.ru     b = jbs->out;
116*0Sigor@sysoev.ru 
117*0Sigor@sysoev.ru     do {
118*0Sigor@sysoev.ru         ret = c->io->sendbuf(c, b, jbs->limit);
119*0Sigor@sysoev.ru 
120*0Sigor@sysoev.ru         if (ret == NXT_AGAIN) {
121*0Sigor@sysoev.ru             break;
122*0Sigor@sysoev.ru         }
123*0Sigor@sysoev.ru 
124*0Sigor@sysoev.ru         if (nxt_slow_path(ret == NXT_ERROR)) {
125*0Sigor@sysoev.ru             goto done;
126*0Sigor@sysoev.ru         }
127*0Sigor@sysoev.ru 
128*0Sigor@sysoev.ru         jbs->sent += ret;
129*0Sigor@sysoev.ru         jbs->limit -= ret;
130*0Sigor@sysoev.ru 
131*0Sigor@sysoev.ru         b = nxt_sendbuf_update(b, ret);
132*0Sigor@sysoev.ru 
133*0Sigor@sysoev.ru         if (b == NULL) {
134*0Sigor@sysoev.ru             goto done;
135*0Sigor@sysoev.ru         }
136*0Sigor@sysoev.ru 
137*0Sigor@sysoev.ru         if (jbs->limit == 0) {
138*0Sigor@sysoev.ru 
139*0Sigor@sysoev.ru             if (c->rate == NULL) {
140*0Sigor@sysoev.ru                 jbs->limit = c->max_chunk;
141*0Sigor@sysoev.ru                 goto fast;
142*0Sigor@sysoev.ru             }
143*0Sigor@sysoev.ru 
144*0Sigor@sysoev.ru             goto done;
145*0Sigor@sysoev.ru         }
146*0Sigor@sysoev.ru 
147*0Sigor@sysoev.ru     } while (c->socket.write_ready);
148*0Sigor@sysoev.ru 
149*0Sigor@sysoev.ru     if (first && thr->thread_pool->work_queue.head != NULL) {
150*0Sigor@sysoev.ru         goto fast;
151*0Sigor@sysoev.ru     }
152*0Sigor@sysoev.ru 
153*0Sigor@sysoev.ru done:
154*0Sigor@sysoev.ru 
155*0Sigor@sysoev.ru     nxt_job_return(thr, &jbs->job, jbs->ready_handler);
156*0Sigor@sysoev.ru     return;
157*0Sigor@sysoev.ru 
158*0Sigor@sysoev.ru fast:
159*0Sigor@sysoev.ru 
160*0Sigor@sysoev.ru     nxt_thread_pool_post(thr->thread_pool, nxt_event_conn_job_sendfile_handler,
161*0Sigor@sysoev.ru                          jbs, c, thr->log);
162*0Sigor@sysoev.ru }
163*0Sigor@sysoev.ru 
164*0Sigor@sysoev.ru 
165*0Sigor@sysoev.ru static void
166*0Sigor@sysoev.ru nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, void *data)
167*0Sigor@sysoev.ru {
168*0Sigor@sysoev.ru     size_t              sent;
169*0Sigor@sysoev.ru     nxt_buf_t           *b;
170*0Sigor@sysoev.ru     nxt_bool_t          done;
171*0Sigor@sysoev.ru     nxt_event_conn_t    *c;
172*0Sigor@sysoev.ru     nxt_job_sendfile_t  *jbs;
173*0Sigor@sysoev.ru 
174*0Sigor@sysoev.ru     jbs = obj;
175*0Sigor@sysoev.ru     c = data;
176*0Sigor@sysoev.ru 
177*0Sigor@sysoev.ru     c->blocked = 0;
178*0Sigor@sysoev.ru 
179*0Sigor@sysoev.ru     sent = jbs->sent;
180*0Sigor@sysoev.ru     c->sent += sent;
181*0Sigor@sysoev.ru 
182*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "event conn sendfile sent:%z", sent);
183*0Sigor@sysoev.ru 
184*0Sigor@sysoev.ru     b = jbs->out;
185*0Sigor@sysoev.ru 
186*0Sigor@sysoev.ru     /* The job must be destroyed before connection error handler. */
187*0Sigor@sysoev.ru     nxt_job_destroy(jbs);
188*0Sigor@sysoev.ru 
189*0Sigor@sysoev.ru     if (c->write_state->process_buffers) {
190*0Sigor@sysoev.ru         b = nxt_event_conn_job_sendfile_completion(thr, c, b);
191*0Sigor@sysoev.ru 
192*0Sigor@sysoev.ru         done = (b == NULL);
193*0Sigor@sysoev.ru 
194*0Sigor@sysoev.ru         /* Add data which might be added after sendfile job has started. */
195*0Sigor@sysoev.ru         nxt_buf_chain_add(&b, c->write);
196*0Sigor@sysoev.ru         c->write = b;
197*0Sigor@sysoev.ru 
198*0Sigor@sysoev.ru         if (done) {
199*0Sigor@sysoev.ru             /* All data has been sent. */
200*0Sigor@sysoev.ru 
201*0Sigor@sysoev.ru             if (b != NULL) {
202*0Sigor@sysoev.ru                 /* But new data has been added. */
203*0Sigor@sysoev.ru                 nxt_event_conn_job_sendfile_start(thr, c, NULL);
204*0Sigor@sysoev.ru             }
205*0Sigor@sysoev.ru 
206*0Sigor@sysoev.ru             return;
207*0Sigor@sysoev.ru         }
208*0Sigor@sysoev.ru     }
209*0Sigor@sysoev.ru 
210*0Sigor@sysoev.ru     if (sent != 0 && c->write_state->autoreset_timer) {
211*0Sigor@sysoev.ru         nxt_event_timer_disable(&c->write_timer);
212*0Sigor@sysoev.ru 
213*0Sigor@sysoev.ru     } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
214*0Sigor@sysoev.ru         c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
215*0Sigor@sysoev.ru     }
216*0Sigor@sysoev.ru 
217*0Sigor@sysoev.ru     if (c->socket.error == 0
218*0Sigor@sysoev.ru         && !nxt_event_conn_write_delayed(thr->engine, c, sent))
219*0Sigor@sysoev.ru     {
220*0Sigor@sysoev.ru         nxt_event_conn_timer(thr->engine, c, c->write_state, &c->write_timer);
221*0Sigor@sysoev.ru 
222*0Sigor@sysoev.ru         nxt_event_fd_oneshot_write(thr->engine, &c->socket);
223*0Sigor@sysoev.ru     }
224*0Sigor@sysoev.ru 
225*0Sigor@sysoev.ru     if (sent != 0) {
226*0Sigor@sysoev.ru         nxt_event_conn_io_handle(thr, c->write_work_queue,
227*0Sigor@sysoev.ru                                  c->write_state->ready_handler,
228*0Sigor@sysoev.ru                                  c, c->socket.data);
229*0Sigor@sysoev.ru         /*
230*0Sigor@sysoev.ru          * Fall through if first operations were
231*0Sigor@sysoev.ru          * successful but the last one failed.
232*0Sigor@sysoev.ru          */
233*0Sigor@sysoev.ru     }
234*0Sigor@sysoev.ru 
235*0Sigor@sysoev.ru     if (nxt_slow_path(c->socket.error != 0)) {
236*0Sigor@sysoev.ru         nxt_event_conn_io_handle(thr, c->write_work_queue,
237*0Sigor@sysoev.ru                                  c->write_state->error_handler,
238*0Sigor@sysoev.ru                                  c, c->socket.data);
239*0Sigor@sysoev.ru     }
240*0Sigor@sysoev.ru }
241*0Sigor@sysoev.ru 
242*0Sigor@sysoev.ru 
243*0Sigor@sysoev.ru static nxt_buf_t *
244*0Sigor@sysoev.ru nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, nxt_event_conn_t *c,
245*0Sigor@sysoev.ru     nxt_buf_t *b)
246*0Sigor@sysoev.ru {
247*0Sigor@sysoev.ru     while (b != NULL) {
248*0Sigor@sysoev.ru 
249*0Sigor@sysoev.ru         nxt_prefetch(b->next);
250*0Sigor@sysoev.ru 
251*0Sigor@sysoev.ru         if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
252*0Sigor@sysoev.ru             break;
253*0Sigor@sysoev.ru 
254*0Sigor@sysoev.ru         } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
255*0Sigor@sysoev.ru             break;
256*0Sigor@sysoev.ru         }
257*0Sigor@sysoev.ru 
258*0Sigor@sysoev.ru         nxt_thread_work_queue_add(thr, c->write_work_queue,
259*0Sigor@sysoev.ru                                   b->completion_handler,
260*0Sigor@sysoev.ru                                   b, b->parent, thr->log);
261*0Sigor@sysoev.ru 
262*0Sigor@sysoev.ru         b = b->next;
263*0Sigor@sysoev.ru     }
264*0Sigor@sysoev.ru 
265*0Sigor@sysoev.ru     return b;
266*0Sigor@sysoev.ru }
267*0Sigor@sysoev.ru 
268*0Sigor@sysoev.ru #endif
269