xref: /unit/src/nxt_event_conn_job_sendfile.c (revision 7:be7025f805f4)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 #if (NXT_THREADS)
11 
12 typedef struct {
13     nxt_job_t           job;
14     nxt_buf_t           *out;
15     size_t              sent;
16     size_t              limit;
17     nxt_work_handler_t  ready_handler;
18 } nxt_job_sendfile_t;
19 
20 
21 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
22     void *data);
23 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
24     void *data);
25 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
26     void *data);
27 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
28     nxt_event_conn_t *c, nxt_buf_t *b);
29 
30 
31 void
32 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
33 {
34     nxt_event_fd_disable(task->thread->engine, &c->socket);
35 
36     /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37     nxt_event_conn_job_sendfile_start(task, c, NULL);
38 }
39 
40 
41 static void
42 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
43 {
44     nxt_iobuf_t             b;
45     nxt_event_conn_t        *c;
46     nxt_job_sendfile_t      *jbs;
47     nxt_sendbuf_coalesce_t  sb;
48 
49     c = obj;
50 
51     nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
52 
53     jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54 
55     if (nxt_slow_path(jbs == NULL)) {
56         c->write_state->error_handler(task, c, NULL);
57         return;
58     }
59 
60     c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61     c->socket.error_handler = c->write_state->error_handler;
62 
63     jbs->job.data = c;
64     nxt_job_set_name(&jbs->job, "job sendfile");
65 
66     jbs->limit = nxt_event_conn_write_limit(c);
67 
68     if (jbs->limit != 0) {
69 
70         sb.buf = c->write;
71         sb.iobuf = &b;
72         sb.nmax = 1;
73         sb.sync = 0;
74         sb.size = 0;
75         sb.limit = jbs->limit;
76 
77         if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
78 
79             jbs->job.thread_pool = c->u.thread_pool;
80             jbs->job.log = c->socket.log;
81             jbs->out = c->write;
82             c->write = NULL;
83             jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84 
85             c->blocked = 1;
86 
87             nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
88             return;
89         }
90     }
91 
92     nxt_event_conn_job_sendfile_return(task, jbs, c);
93 }
94 
95 
96 static void
97 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
98 {
99     ssize_t             ret;
100     nxt_buf_t           *b;
101     nxt_bool_t          first;
102     nxt_event_conn_t    *c;
103     nxt_job_sendfile_t  *jbs;
104 
105     jbs = obj;
106     c = data;
107 
108     nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
109 
110     first = c->socket.write_ready;
111     b = jbs->out;
112 
113     do {
114         ret = c->io->sendbuf(c, b, jbs->limit);
115 
116         if (ret == NXT_AGAIN) {
117             break;
118         }
119 
120         if (nxt_slow_path(ret == NXT_ERROR)) {
121             goto done;
122         }
123 
124         jbs->sent += ret;
125         jbs->limit -= ret;
126 
127         b = nxt_sendbuf_update(b, ret);
128 
129         if (b == NULL) {
130             goto done;
131         }
132 
133         if (jbs->limit == 0) {
134 
135             if (c->rate == NULL) {
136                 jbs->limit = c->max_chunk;
137                 goto fast;
138             }
139 
140             goto done;
141         }
142 
143     } while (c->socket.write_ready);
144 
145     if (first && task->thread->thread_pool->work_queue.head != NULL) {
146         goto fast;
147     }
148 
149 done:
150 
151     nxt_job_return(task, &jbs->job, jbs->ready_handler);
152     return;
153 
154 fast:
155 
156     nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
157                  jbs->job.task, jbs, c);
158 
159     nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
160 }
161 
162 
163 static void
164 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
165 {
166     size_t              sent;
167     nxt_buf_t           *b;
168     nxt_bool_t          done;
169     nxt_event_conn_t    *c;
170     nxt_job_sendfile_t  *jbs;
171 
172     jbs = obj;
173     c = data;
174 
175     c->blocked = 0;
176 
177     sent = jbs->sent;
178     c->sent += sent;
179 
180     nxt_debug(task, "event conn sendfile sent:%z", sent);
181 
182     b = jbs->out;
183 
184     /* The job must be destroyed before connection error handler. */
185     nxt_job_destroy(jbs);
186 
187     if (c->write_state->process_buffers) {
188         b = nxt_event_conn_job_sendfile_completion(task, c, b);
189 
190         done = (b == NULL);
191 
192         /* Add data which might be added after sendfile job has started. */
193         nxt_buf_chain_add(&b, c->write);
194         c->write = b;
195 
196         if (done) {
197             /* All data has been sent. */
198 
199             if (b != NULL) {
200                 /* But new data has been added. */
201                 nxt_event_conn_job_sendfile_start(task, c, NULL);
202             }
203 
204             return;
205         }
206     }
207 
208     if (sent != 0 && c->write_state->autoreset_timer) {
209         nxt_timer_disable(task->thread->engine, &c->write_timer);
210     }
211 
212     if (c->socket.error == 0
213         && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
214     {
215         nxt_event_conn_timer(task->thread->engine, c, c->write_state,
216                              &c->write_timer);
217 
218         nxt_event_fd_oneshot_write(task->thread->engine, &c->socket);
219     }
220 
221     if (sent != 0) {
222         nxt_event_conn_io_handle(task->thread, c->write_work_queue,
223                                  c->write_state->ready_handler,
224                                  task, c, c->socket.data);
225         /*
226          * Fall through if first operations were
227          * successful but the last one failed.
228          */
229     }
230 
231     if (nxt_slow_path(c->socket.error != 0)) {
232         nxt_event_conn_io_handle(task->thread, c->write_work_queue,
233                                  c->write_state->error_handler,
234                                  task, c, c->socket.data);
235     }
236 }
237 
238 
239 static nxt_buf_t *
240 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
241     nxt_buf_t *b)
242 {
243     while (b != NULL) {
244 
245         nxt_prefetch(b->next);
246 
247         if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
248             break;
249 
250         } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
251             break;
252         }
253 
254         nxt_work_queue_add(c->write_work_queue,
255                            b->completion_handler, task, b, b->parent);
256 
257         b = b->next;
258     }
259 
260     return b;
261 }
262 
263 #endif
264