Deleted Added
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 4 unchanged lines hidden (view full) ---

13 nxt_job_t job;
14 nxt_buf_t *out;
15 size_t sent;
16 size_t limit;
17 nxt_work_handler_t ready_handler;
18} nxt_job_sendfile_t;
19
20
21static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
22 void *data);
23static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
24 void *data);
25static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
26 void *data);
27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
28 nxt_event_conn_t *c, nxt_buf_t *b);
29
30
31void
32nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
33{
34 nxt_event_fd_disable(task->thread->engine, &c->socket);
35
36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37 nxt_event_conn_job_sendfile_start(task, c, NULL);
38}
39
40
41static void
42nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
43{
44 nxt_iobuf_t b;
45 nxt_event_conn_t *c;
46 nxt_job_sendfile_t *jbs;
47 nxt_sendbuf_coalesce_t sb;
48
49 c = obj;
50
51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
52
53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54
55 if (nxt_slow_path(jbs == NULL)) {
56 c->write_state->error_handler(task, c, NULL);
57 return;
58 }
59
60 c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61 c->socket.error_handler = c->write_state->error_handler;
62
63 jbs->job.data = c;
64 nxt_job_set_name(&jbs->job, "job sendfile");

--- 4 unchanged lines hidden (view full) ---

69
70 sb.buf = c->write;
71 sb.iobuf = &b;
72 sb.nmax = 1;
73 sb.sync = 0;
74 sb.size = 0;
75 sb.limit = jbs->limit;
76
77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
78
79 jbs->job.thread_pool = c->u.thread_pool;
80 jbs->job.log = c->socket.log;
81 jbs->out = c->write;
82 c->write = NULL;
83 jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84
85 c->blocked = 1;
86
87 if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
88 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
89 }
90
91 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
92 return;
93 }
94 }
95
96 nxt_event_conn_job_sendfile_return(task, jbs, c);
97}
98
99
100static void
101nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
102{
103 ssize_t ret;
104 nxt_buf_t *b;
105 nxt_bool_t first;
106 nxt_event_conn_t *c;
107 nxt_job_sendfile_t *jbs;
108
109 jbs = obj;
110 c = data;
111
112 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
113
114 first = c->socket.write_ready;
115 b = jbs->out;
116
117 do {
118 ret = c->io->sendbuf(c, b, jbs->limit);
119
120 if (ret == NXT_AGAIN) {

--- 20 unchanged lines hidden (view full) ---

141 goto fast;
142 }
143
144 goto done;
145 }
146
147 } while (c->socket.write_ready);
148
149 if (first && task->thread->thread_pool->work_queue.head != NULL) {
150 goto fast;
151 }
152
153done:
154
155 nxt_job_return(task, &jbs->job, jbs->ready_handler);
156 return;
157
158fast:
159
160 nxt_thread_pool_post(task->thread->thread_pool,
161 nxt_event_conn_job_sendfile_handler,
162 &jbs->job.task, jbs, c);
163}
164
165
166static void
167nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
168{
169 size_t sent;
170 nxt_buf_t *b;
171 nxt_bool_t done;
172 nxt_event_conn_t *c;
173 nxt_job_sendfile_t *jbs;
174
175 jbs = obj;
176 c = data;
177
178 c->blocked = 0;
179
180 sent = jbs->sent;
181 c->sent += sent;
182
183 nxt_debug(task, "event conn sendfile sent:%z", sent);
184
185 b = jbs->out;
186
187 /* The job must be destroyed before connection error handler. */
188 nxt_job_destroy(jbs);
189
190 if (c->write_state->process_buffers) {
191 b = nxt_event_conn_job_sendfile_completion(task, c, b);
192
193 done = (b == NULL);
194
195 /* Add data which might be added after sendfile job has started. */
196 nxt_buf_chain_add(&b, c->write);
197 c->write = b;
198
199 if (done) {
200 /* All data has been sent. */
201
202 if (b != NULL) {
203 /* But new data has been added. */
204 nxt_event_conn_job_sendfile_start(task, c, NULL);
205 }
206
207 return;
208 }
209 }
210
211 if (sent != 0 && c->write_state->autoreset_timer) {
212 nxt_event_timer_disable(&c->write_timer);
213
214 } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
215 c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
216 }
217
218 if (c->socket.error == 0
219 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
220 {
221 nxt_event_conn_timer(task->thread->engine, c, c->write_state,
222 &c->write_timer);
223
224 nxt_event_fd_oneshot_write(task->thread->engine, &c->socket);
225 }
226
227 if (sent != 0) {
228 nxt_event_conn_io_handle(task->thread, c->write_work_queue,
229 c->write_state->ready_handler,
230 task, c, c->socket.data);
231 /*
232 * Fall through if first operations were
233 * successful but the last one failed.
234 */
235 }
236
237 if (nxt_slow_path(c->socket.error != 0)) {
238 nxt_event_conn_io_handle(task->thread, c->write_work_queue,
239 c->write_state->error_handler,
240 task, c, c->socket.data);
241 }
242}
243
244
245static nxt_buf_t *
246nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
247 nxt_buf_t *b)
248{
249 while (b != NULL) {
250
251 nxt_prefetch(b->next);
252
253 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
254 break;
255
256 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
257 break;
258 }
259
260 nxt_thread_work_queue_add(task->thread, c->write_work_queue,
261 b->completion_handler, task, b, b->parent);
262
263 b = b->next;
264 }
265
266 return b;
267}
268
269#endif