xref: /unit/src/nxt_event_conn_job_sendfile.c (revision 4:76c63e9b6322)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 #if (NXT_THREADS)
11 
12 typedef struct {
13     nxt_job_t           job;
14     nxt_buf_t           *out;
15     size_t              sent;
16     size_t              limit;
17     nxt_work_handler_t  ready_handler;
18 } nxt_job_sendfile_t;
19 
20 
21 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
22     void *data);
23 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
26     void *data);
27 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
28     nxt_event_conn_t *c, nxt_buf_t *b);
29 
30 
31 void
32 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
33 {
34     nxt_event_fd_disable(task->thread->engine, &c->socket);
35 
36     /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37     nxt_event_conn_job_sendfile_start(task, c, NULL);
38 }
39 
40 
41 static void
42 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
43 {
44     nxt_iobuf_t             b;
45     nxt_event_conn_t        *c;
46     nxt_job_sendfile_t      *jbs;
47     nxt_sendbuf_coalesce_t  sb;
48 
49     c = obj;
50 
51     nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
52 
53     jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54 
55     if (nxt_slow_path(jbs == NULL)) {
56         c->write_state->error_handler(task, c, NULL);
57         return;
58     }
59 
60     c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61     c->socket.error_handler = c->write_state->error_handler;
62 
63     jbs->job.data = c;
64     nxt_job_set_name(&jbs->job, "job sendfile");
65 
66     jbs->limit = nxt_event_conn_write_limit(c);
67 
68     if (jbs->limit != 0) {
69 
70         sb.buf = c->write;
71         sb.iobuf = &b;
72         sb.nmax = 1;
73         sb.sync = 0;
74         sb.size = 0;
75         sb.limit = jbs->limit;
76 
77         if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
78 
79             jbs->job.thread_pool = c->u.thread_pool;
80             jbs->job.log = c->socket.log;
81             jbs->out = c->write;
82             c->write = NULL;
83             jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84 
85             c->blocked = 1;
86 
87             if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
88                 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
89             }
90 
91             nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
92             return;
93         }
94     }
95 
96     nxt_event_conn_job_sendfile_return(task, jbs, c);
97 }
98 
99 
100 static void
101 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
102 {
103     ssize_t             ret;
104     nxt_buf_t           *b;
105     nxt_bool_t          first;
106     nxt_event_conn_t    *c;
107     nxt_job_sendfile_t  *jbs;
108 
109     jbs = obj;
110     c = data;
111 
112     nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
113 
114     first = c->socket.write_ready;
115     b = jbs->out;
116 
117     do {
118         ret = c->io->sendbuf(c, b, jbs->limit);
119 
120         if (ret == NXT_AGAIN) {
121             break;
122         }
123 
124         if (nxt_slow_path(ret == NXT_ERROR)) {
125             goto done;
126         }
127 
128         jbs->sent += ret;
129         jbs->limit -= ret;
130 
131         b = nxt_sendbuf_update(b, ret);
132 
133         if (b == NULL) {
134             goto done;
135         }
136 
137         if (jbs->limit == 0) {
138 
139             if (c->rate == NULL) {
140                 jbs->limit = c->max_chunk;
141                 goto fast;
142             }
143 
144             goto done;
145         }
146 
147     } while (c->socket.write_ready);
148 
149     if (first && task->thread->thread_pool->work_queue.head != NULL) {
150         goto fast;
151     }
152 
153 done:
154 
155     nxt_job_return(task, &jbs->job, jbs->ready_handler);
156     return;
157 
158 fast:
159 
160     nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
161                  jbs->job.task, jbs, c);
162 
163     nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
164 }
165 
166 
167 static void
168 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
169 {
170     size_t              sent;
171     nxt_buf_t           *b;
172     nxt_bool_t          done;
173     nxt_event_conn_t    *c;
174     nxt_job_sendfile_t  *jbs;
175 
176     jbs = obj;
177     c = data;
178 
179     c->blocked = 0;
180 
181     sent = jbs->sent;
182     c->sent += sent;
183 
184     nxt_debug(task, "event conn sendfile sent:%z", sent);
185 
186     b = jbs->out;
187 
188     /* The job must be destroyed before connection error handler. */
189     nxt_job_destroy(jbs);
190 
191     if (c->write_state->process_buffers) {
192         b = nxt_event_conn_job_sendfile_completion(task, c, b);
193 
194         done = (b == NULL);
195 
196         /* Add data which might be added after sendfile job has started. */
197         nxt_buf_chain_add(&b, c->write);
198         c->write = b;
199 
200         if (done) {
201             /* All data has been sent. */
202 
203             if (b != NULL) {
204                 /* But new data has been added. */
205                 nxt_event_conn_job_sendfile_start(task, c, NULL);
206             }
207 
208             return;
209         }
210     }
211 
212     if (sent != 0 && c->write_state->autoreset_timer) {
213         nxt_event_timer_disable(&c->write_timer);
214 
215     } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
216         c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
217     }
218 
219     if (c->socket.error == 0
220         && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
221     {
222         nxt_event_conn_timer(task->thread->engine, c, c->write_state,
223                              &c->write_timer);
224 
225         nxt_event_fd_oneshot_write(task->thread->engine, &c->socket);
226     }
227 
228     if (sent != 0) {
229         nxt_event_conn_io_handle(task->thread, c->write_work_queue,
230                                  c->write_state->ready_handler,
231                                  task, c, c->socket.data);
232         /*
233          * Fall through if first operations were
234          * successful but the last one failed.
235          */
236     }
237 
238     if (nxt_slow_path(c->socket.error != 0)) {
239         nxt_event_conn_io_handle(task->thread, c->write_work_queue,
240                                  c->write_state->error_handler,
241                                  task, c, c->socket.data);
242     }
243 }
244 
245 
246 static nxt_buf_t *
247 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
248     nxt_buf_t *b)
249 {
250     while (b != NULL) {
251 
252         nxt_prefetch(b->next);
253 
254         if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
255             break;
256 
257         } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
258             break;
259         }
260 
261         nxt_work_queue_add(c->write_work_queue,
262                            b->completion_handler, task, b, b->parent);
263 
264         b = b->next;
265     }
266 
267     return b;
268 }
269 
270 #endif
271