nxt_event_conn_job_sendfile.c (0:a63ceefd6ab0) nxt_event_conn_job_sendfile.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 4 unchanged lines hidden (view full) ---

13 nxt_job_t job;
14 nxt_buf_t *out;
15 size_t sent;
16 size_t limit;
17 nxt_work_handler_t ready_handler;
18} nxt_job_sendfile_t;
19
20
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 4 unchanged lines hidden (view full) ---

13 nxt_job_t job;
14 nxt_buf_t *out;
15 size_t sent;
16 size_t limit;
17 nxt_work_handler_t ready_handler;
18} nxt_job_sendfile_t;
19
20
21static void nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj,
21static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
22 void *data);
22 void *data);
23static void nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj,
23static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
24 void *data);
24 void *data);
25static void nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj,
25static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
26 void *data);
26 void *data);
27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr,
27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
28 nxt_event_conn_t *c, nxt_buf_t *b);
29
30
31void
28 nxt_event_conn_t *c, nxt_buf_t *b);
29
30
31void
32nxt_event_conn_job_sendfile(nxt_thread_t *thr, nxt_event_conn_t *c)
32nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
33{
33{
34 nxt_event_fd_disable(thr->engine, &c->socket);
34 nxt_event_fd_disable(task->thread->engine, &c->socket);
35
36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
35
36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37 nxt_event_conn_job_sendfile_start(thr, c, NULL);
37 nxt_event_conn_job_sendfile_start(task, c, NULL);
38}
39
40
41static void
38}
39
40
41static void
42nxt_event_conn_job_sendfile_start(nxt_thread_t *thr, void *obj, void *data)
42nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
43{
44 nxt_iobuf_t b;
45 nxt_event_conn_t *c;
46 nxt_job_sendfile_t *jbs;
47 nxt_sendbuf_coalesce_t sb;
48
49 c = obj;
50
43{
44 nxt_iobuf_t b;
45 nxt_event_conn_t *c;
46 nxt_job_sendfile_t *jbs;
47 nxt_sendbuf_coalesce_t sb;
48
49 c = obj;
50
51 nxt_log_debug(thr->log, "event conn sendfile fd:%d", c->socket.fd);
51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
52
53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54
55 if (nxt_slow_path(jbs == NULL)) {
52
53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54
55 if (nxt_slow_path(jbs == NULL)) {
56 c->write_state->error_handler(thr, c, NULL);
56 c->write_state->error_handler(task, c, NULL);
57 return;
58 }
59
60 c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61 c->socket.error_handler = c->write_state->error_handler;
62
63 jbs->job.data = c;
64 nxt_job_set_name(&jbs->job, "job sendfile");

--- 4 unchanged lines hidden (view full) ---

69
70 sb.buf = c->write;
71 sb.iobuf = &b;
72 sb.nmax = 1;
73 sb.sync = 0;
74 sb.size = 0;
75 sb.limit = jbs->limit;
76
57 return;
58 }
59
60 c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61 c->socket.error_handler = c->write_state->error_handler;
62
63 jbs->job.data = c;
64 nxt_job_set_name(&jbs->job, "job sendfile");

--- 4 unchanged lines hidden (view full) ---

69
70 sb.buf = c->write;
71 sb.iobuf = &b;
72 sb.nmax = 1;
73 sb.sync = 0;
74 sb.size = 0;
75 sb.limit = jbs->limit;
76
77 if (nxt_sendbuf_mem_coalesce(&sb) != 0 || !sb.sync) {
77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
78
79 jbs->job.thread_pool = c->u.thread_pool;
80 jbs->job.log = c->socket.log;
81 jbs->out = c->write;
82 c->write = NULL;
83 jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84
85 c->blocked = 1;
86
87 if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
88 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
89 }
90
78
79 jbs->job.thread_pool = c->u.thread_pool;
80 jbs->job.log = c->socket.log;
81 jbs->out = c->write;
82 c->write = NULL;
83 jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84
85 c->blocked = 1;
86
87 if (c->write_timer.state != NXT_EVENT_TIMER_DISABLED) {
88 c->write_timer.state = NXT_EVENT_TIMER_BLOCKED;
89 }
90
91 nxt_job_start(thr, &jbs->job, nxt_event_conn_job_sendfile_handler);
91 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
92 return;
93 }
94 }
95
92 return;
93 }
94 }
95
96 nxt_event_conn_job_sendfile_return(thr, jbs, c);
96 nxt_event_conn_job_sendfile_return(task, jbs, c);
97}
98
99
100static void
97}
98
99
100static void
101nxt_event_conn_job_sendfile_handler(nxt_thread_t *thr, void *obj, void *data)
101nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
102{
103 ssize_t ret;
104 nxt_buf_t *b;
105 nxt_bool_t first;
106 nxt_event_conn_t *c;
107 nxt_job_sendfile_t *jbs;
108
109 jbs = obj;
110 c = data;
111
102{
103 ssize_t ret;
104 nxt_buf_t *b;
105 nxt_bool_t first;
106 nxt_event_conn_t *c;
107 nxt_job_sendfile_t *jbs;
108
109 jbs = obj;
110 c = data;
111
112 nxt_log_debug(thr->log, "event conn job sendfile fd:%d", c->socket.fd);
112 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
113
114 first = c->socket.write_ready;
115 b = jbs->out;
116
117 do {
118 ret = c->io->sendbuf(c, b, jbs->limit);
119
120 if (ret == NXT_AGAIN) {

--- 20 unchanged lines hidden (view full) ---

141 goto fast;
142 }
143
144 goto done;
145 }
146
147 } while (c->socket.write_ready);
148
113
114 first = c->socket.write_ready;
115 b = jbs->out;
116
117 do {
118 ret = c->io->sendbuf(c, b, jbs->limit);
119
120 if (ret == NXT_AGAIN) {

--- 20 unchanged lines hidden (view full) ---

141 goto fast;
142 }
143
144 goto done;
145 }
146
147 } while (c->socket.write_ready);
148
149 if (first && thr->thread_pool->work_queue.head != NULL) {
149 if (first && task->thread->thread_pool->work_queue.head != NULL) {
150 goto fast;
151 }
152
153done:
154
150 goto fast;
151 }
152
153done:
154
155 nxt_job_return(thr, &jbs->job, jbs->ready_handler);
155 nxt_job_return(task, &jbs->job, jbs->ready_handler);
156 return;
157
158fast:
159
156 return;
157
158fast:
159
160 nxt_thread_pool_post(thr->thread_pool, nxt_event_conn_job_sendfile_handler,
161 jbs, c, thr->log);
160 nxt_thread_pool_post(task->thread->thread_pool,
161 nxt_event_conn_job_sendfile_handler,
162 &jbs->job.task, jbs, c);
162}
163
164
165static void
163}
164
165
166static void
166nxt_event_conn_job_sendfile_return(nxt_thread_t *thr, void *obj, void *data)
167nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
167{
168 size_t sent;
169 nxt_buf_t *b;
170 nxt_bool_t done;
171 nxt_event_conn_t *c;
172 nxt_job_sendfile_t *jbs;
173
174 jbs = obj;
175 c = data;
176
177 c->blocked = 0;
178
179 sent = jbs->sent;
180 c->sent += sent;
181
168{
169 size_t sent;
170 nxt_buf_t *b;
171 nxt_bool_t done;
172 nxt_event_conn_t *c;
173 nxt_job_sendfile_t *jbs;
174
175 jbs = obj;
176 c = data;
177
178 c->blocked = 0;
179
180 sent = jbs->sent;
181 c->sent += sent;
182
182 nxt_log_debug(thr->log, "event conn sendfile sent:%z", sent);
183 nxt_debug(task, "event conn sendfile sent:%z", sent);
183
184 b = jbs->out;
185
186 /* The job must be destroyed before connection error handler. */
187 nxt_job_destroy(jbs);
188
189 if (c->write_state->process_buffers) {
184
185 b = jbs->out;
186
187 /* The job must be destroyed before connection error handler. */
188 nxt_job_destroy(jbs);
189
190 if (c->write_state->process_buffers) {
190 b = nxt_event_conn_job_sendfile_completion(thr, c, b);
191 b = nxt_event_conn_job_sendfile_completion(task, c, b);
191
192 done = (b == NULL);
193
194 /* Add data which might be added after sendfile job has started. */
195 nxt_buf_chain_add(&b, c->write);
196 c->write = b;
197
198 if (done) {
199 /* All data has been sent. */
200
201 if (b != NULL) {
202 /* But new data has been added. */
192
193 done = (b == NULL);
194
195 /* Add data which might be added after sendfile job has started. */
196 nxt_buf_chain_add(&b, c->write);
197 c->write = b;
198
199 if (done) {
200 /* All data has been sent. */
201
202 if (b != NULL) {
203 /* But new data has been added. */
203 nxt_event_conn_job_sendfile_start(thr, c, NULL);
204 nxt_event_conn_job_sendfile_start(task, c, NULL);
204 }
205
206 return;
207 }
208 }
209
210 if (sent != 0 && c->write_state->autoreset_timer) {
211 nxt_event_timer_disable(&c->write_timer);
212
213 } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
214 c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
215 }
216
217 if (c->socket.error == 0
205 }
206
207 return;
208 }
209 }
210
211 if (sent != 0 && c->write_state->autoreset_timer) {
212 nxt_event_timer_disable(&c->write_timer);
213
214 } else if (c->write_timer.state == NXT_EVENT_TIMER_BLOCKED) {
215 c->write_timer.state = NXT_EVENT_TIMER_ACTIVE;
216 }
217
218 if (c->socket.error == 0
218 && !nxt_event_conn_write_delayed(thr->engine, c, sent))
219 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
219 {
220 {
220 nxt_event_conn_timer(thr->engine, c, c->write_state, &c->write_timer);
221 nxt_event_conn_timer(task->thread->engine, c, c->write_state,
222 &c->write_timer);
221
223
222 nxt_event_fd_oneshot_write(thr->engine, &c->socket);
224 nxt_event_fd_oneshot_write(task->thread->engine, &c->socket);
223 }
224
225 if (sent != 0) {
225 }
226
227 if (sent != 0) {
226 nxt_event_conn_io_handle(thr, c->write_work_queue,
228 nxt_event_conn_io_handle(task->thread, c->write_work_queue,
227 c->write_state->ready_handler,
229 c->write_state->ready_handler,
228 c, c->socket.data);
230 task, c, c->socket.data);
229 /*
230 * Fall through if first operations were
231 * successful but the last one failed.
232 */
233 }
234
235 if (nxt_slow_path(c->socket.error != 0)) {
231 /*
232 * Fall through if first operations were
233 * successful but the last one failed.
234 */
235 }
236
237 if (nxt_slow_path(c->socket.error != 0)) {
236 nxt_event_conn_io_handle(thr, c->write_work_queue,
238 nxt_event_conn_io_handle(task->thread, c->write_work_queue,
237 c->write_state->error_handler,
239 c->write_state->error_handler,
238 c, c->socket.data);
240 task, c, c->socket.data);
239 }
240}
241
242
243static nxt_buf_t *
241 }
242}
243
244
245static nxt_buf_t *
244nxt_event_conn_job_sendfile_completion(nxt_thread_t *thr, nxt_event_conn_t *c,
246nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
245 nxt_buf_t *b)
246{
247 while (b != NULL) {
248
249 nxt_prefetch(b->next);
250
251 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
252 break;
253
254 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
255 break;
256 }
257
247 nxt_buf_t *b)
248{
249 while (b != NULL) {
250
251 nxt_prefetch(b->next);
252
253 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
254 break;
255
256 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
257 break;
258 }
259
258 nxt_thread_work_queue_add(thr, c->write_work_queue,
259 b->completion_handler,
260 b, b->parent, thr->log);
260 nxt_thread_work_queue_add(task->thread, c->write_work_queue,
261 b->completion_handler, task, b, b->parent);
261
262 b = b->next;
263 }
264
265 return b;
266}
267
268#endif
262
263 b = b->next;
264 }
265
266 return b;
267}
268
269#endif