xref: /unit/src/nxt_event_conn_job_sendfile.c (revision 223:bf98efe2c55c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
/*
 * State of one sendfile job.  The structure starts with the generic
 * nxt_job_t so that &jbs->job can be handed to the nxt_job_*() calls
 * used in this file.
 */
typedef struct {
    /* Generic job header; job.data carries the connection. */
    nxt_job_t           job;
    /* Buffer chain taken over from c->write when the job is started. */
    nxt_buf_t           *out;
    /* Running total of bytes reported as sent by c->io->sendbuf(). */
    size_t              sent;
    /* Bytes that may still be sent; decreased by each successful send. */
    size_t              limit;
    /* Handler passed to nxt_job_return() when the job finishes. */
    nxt_work_handler_t  ready_handler;
} nxt_job_sendfile_t;
17 
18 
19 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
26     nxt_conn_t *c, nxt_buf_t *b);
27 
28 
/*
 * Public entry point of the sendfile job for connection "c".
 *
 * Calls nxt_fd_event_disable() for the connection socket on the engine
 * of the current thread and then runs the start routine directly.
 */
void
nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c)
{
    nxt_fd_event_disable(task->thread->engine, &c->socket);

    /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
    nxt_event_conn_job_sendfile_start(task, c, NULL);
}
37 
38 
39 static void
40 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
41 {
42     nxt_conn_t              *c;
43     nxt_iobuf_t             b;
44     nxt_job_sendfile_t      *jbs;
45     nxt_sendbuf_coalesce_t  sb;
46 
47     c = obj;
48 
49     nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
50 
51     jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
52 
53     if (nxt_slow_path(jbs == NULL)) {
54         c->write_state->error_handler(task, c, NULL);
55         return;
56     }
57 
58     c->socket.write_handler = nxt_event_conn_job_sendfile_start;
59     c->socket.error_handler = c->write_state->error_handler;
60 
61     jbs->job.data = c;
62     nxt_job_set_name(&jbs->job, "job sendfile");
63 
64     jbs->limit = nxt_event_conn_write_limit(c);
65 
66     if (jbs->limit != 0) {
67 
68         sb.buf = c->write;
69         sb.iobuf = &b;
70         sb.nmax = 1;
71         sb.sync = 0;
72         sb.size = 0;
73         sb.limit = jbs->limit;
74 
75         if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
76 
77             jbs->job.thread_pool = c->u.thread_pool;
78             jbs->job.log = c->socket.log;
79             jbs->out = c->write;
80             c->write = NULL;
81             jbs->ready_handler = nxt_event_conn_job_sendfile_return;
82 
83             c->blocked = 1;
84 
85             nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
86             return;
87         }
88     }
89 
90     nxt_event_conn_job_sendfile_return(task, jbs, c);
91 }
92 
93 
94 static void
95 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
96 {
97     ssize_t             ret;
98     nxt_buf_t           *b;
99     nxt_bool_t          first;
100     nxt_conn_t          *c;
101     nxt_job_sendfile_t  *jbs;
102 
103     jbs = obj;
104     c = data;
105 
106     nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
107 
108     first = c->socket.write_ready;
109     b = jbs->out;
110 
111     do {
112         ret = c->io->sendbuf(c, b, jbs->limit);
113 
114         if (ret == NXT_AGAIN) {
115             break;
116         }
117 
118         if (nxt_slow_path(ret == NXT_ERROR)) {
119             goto done;
120         }
121 
122         jbs->sent += ret;
123         jbs->limit -= ret;
124 
125         b = nxt_sendbuf_update(b, ret);
126 
127         if (b == NULL) {
128             goto done;
129         }
130 
131         if (jbs->limit == 0) {
132 
133             if (c->rate == NULL) {
134                 jbs->limit = c->max_chunk;
135                 goto fast;
136             }
137 
138             goto done;
139         }
140 
141     } while (c->socket.write_ready);
142 
143     if (first && task->thread->thread_pool->work_queue.head != NULL) {
144         goto fast;
145     }
146 
147 done:
148 
149     nxt_job_return(task, &jbs->job, jbs->ready_handler);
150     return;
151 
152 fast:
153 
154     nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
155                  jbs->job.task, jbs, c);
156 
157     nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
158 }
159 
160 
161 static void
162 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
163 {
164     size_t              sent;
165     nxt_buf_t           *b;
166     nxt_bool_t          done;
167     nxt_conn_t          *c;
168     nxt_job_sendfile_t  *jbs;
169 
170     jbs = obj;
171     c = data;
172 
173     c->blocked = 0;
174 
175     sent = jbs->sent;
176     c->sent += sent;
177 
178     nxt_debug(task, "event conn sendfile sent:%z", sent);
179 
180     b = jbs->out;
181 
182     /* The job must be destroyed before connection error handler. */
183     nxt_job_destroy(task, jbs);
184 
185     if (0 /* STUB: c->write_state->process_buffers */) {
186         b = nxt_event_conn_job_sendfile_completion(task, c, b);
187 
188         done = (b == NULL);
189 
190         /* Add data which might be added after sendfile job has started. */
191         nxt_buf_chain_add(&b, c->write);
192         c->write = b;
193 
194         if (done) {
195             /* All data has been sent. */
196 
197             if (b != NULL) {
198                 /* But new data has been added. */
199                 nxt_event_conn_job_sendfile_start(task, c, NULL);
200             }
201 
202             return;
203         }
204     }
205 
206     if (sent != 0 && c->write_state->timer_autoreset) {
207         nxt_timer_disable(task->thread->engine, &c->write_timer);
208     }
209 
210     if (c->socket.error == 0
211         && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
212     {
213         nxt_conn_timer(task->thread->engine, c, c->write_state,
214                        &c->write_timer);
215 
216         nxt_fd_event_oneshot_write(task->thread->engine, &c->socket);
217     }
218 
219     if (sent != 0) {
220         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
221                            task, c, c->socket.data);
222         /*
223          * Fall through if first operations were
224          * successful but the last one failed.
225          */
226     }
227 
228     if (nxt_slow_path(c->socket.error != 0)) {
229         nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
230                            task, c, c->socket.data);
231     }
232 }
233 
234 
235 static nxt_buf_t *
236 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c,
237     nxt_buf_t *b)
238 {
239     while (b != NULL) {
240 
241         nxt_prefetch(b->next);
242 
243         if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
244             break;
245 
246         } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
247             break;
248         }
249 
250         nxt_work_queue_add(c->write_work_queue,
251                            b->completion_handler, task, b, b->parent);
252 
253         b = b->next;
254     }
255 
256     return b;
257 }
258