nxt_event_conn_job_sendfile.c (56:92b4984ca3c1) nxt_event_conn_job_sendfile.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10#if (NXT_THREADS)
11
12typedef struct {
13 nxt_job_t job;
14 nxt_buf_t *out;
15 size_t sent;
16 size_t limit;
17 nxt_work_handler_t ready_handler;
18} nxt_job_sendfile_t;
19
20
21static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
22 void *data);
23static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
24 void *data);
25static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
26 void *data);
27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10#if (NXT_THREADS)
11
12typedef struct {
13 nxt_job_t job;
14 nxt_buf_t *out;
15 size_t sent;
16 size_t limit;
17 nxt_work_handler_t ready_handler;
18} nxt_job_sendfile_t;
19
20
21static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
22 void *data);
23static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
24 void *data);
25static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
26 void *data);
27static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
28 nxt_event_conn_t *c, nxt_buf_t *b);
28 nxt_conn_t *c, nxt_buf_t *b);
29
30
31void
29
30
31void
32nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_event_conn_t *c)
32nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c)
33{
34 nxt_fd_event_disable(task->thread->engine, &c->socket);
35
36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37 nxt_event_conn_job_sendfile_start(task, c, NULL);
38}
39
40
41static void
42nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
43{
33{
34 nxt_fd_event_disable(task->thread->engine, &c->socket);
35
36 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
37 nxt_event_conn_job_sendfile_start(task, c, NULL);
38}
39
40
41static void
42nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
43{
44 nxt_conn_t *c;
44 nxt_iobuf_t b;
45 nxt_iobuf_t b;
45 nxt_event_conn_t *c;
46 nxt_job_sendfile_t *jbs;
47 nxt_sendbuf_coalesce_t sb;
48
49 c = obj;
50
51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
52
53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54
55 if (nxt_slow_path(jbs == NULL)) {
56 c->write_state->error_handler(task, c, NULL);
57 return;
58 }
59
60 c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61 c->socket.error_handler = c->write_state->error_handler;
62
63 jbs->job.data = c;
64 nxt_job_set_name(&jbs->job, "job sendfile");
65
66 jbs->limit = nxt_event_conn_write_limit(c);
67
68 if (jbs->limit != 0) {
69
70 sb.buf = c->write;
71 sb.iobuf = &b;
72 sb.nmax = 1;
73 sb.sync = 0;
74 sb.size = 0;
75 sb.limit = jbs->limit;
76
77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
78
79 jbs->job.thread_pool = c->u.thread_pool;
80 jbs->job.log = c->socket.log;
81 jbs->out = c->write;
82 c->write = NULL;
83 jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84
85 c->blocked = 1;
86
87 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
88 return;
89 }
90 }
91
92 nxt_event_conn_job_sendfile_return(task, jbs, c);
93}
94
95
96static void
97nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
98{
99 ssize_t ret;
100 nxt_buf_t *b;
101 nxt_bool_t first;
46 nxt_job_sendfile_t *jbs;
47 nxt_sendbuf_coalesce_t sb;
48
49 c = obj;
50
51 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
52
53 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
54
55 if (nxt_slow_path(jbs == NULL)) {
56 c->write_state->error_handler(task, c, NULL);
57 return;
58 }
59
60 c->socket.write_handler = nxt_event_conn_job_sendfile_start;
61 c->socket.error_handler = c->write_state->error_handler;
62
63 jbs->job.data = c;
64 nxt_job_set_name(&jbs->job, "job sendfile");
65
66 jbs->limit = nxt_event_conn_write_limit(c);
67
68 if (jbs->limit != 0) {
69
70 sb.buf = c->write;
71 sb.iobuf = &b;
72 sb.nmax = 1;
73 sb.sync = 0;
74 sb.size = 0;
75 sb.limit = jbs->limit;
76
77 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
78
79 jbs->job.thread_pool = c->u.thread_pool;
80 jbs->job.log = c->socket.log;
81 jbs->out = c->write;
82 c->write = NULL;
83 jbs->ready_handler = nxt_event_conn_job_sendfile_return;
84
85 c->blocked = 1;
86
87 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
88 return;
89 }
90 }
91
92 nxt_event_conn_job_sendfile_return(task, jbs, c);
93}
94
95
96static void
97nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
98{
99 ssize_t ret;
100 nxt_buf_t *b;
101 nxt_bool_t first;
102 nxt_event_conn_t *c;
102 nxt_conn_t *c;
103 nxt_job_sendfile_t *jbs;
104
105 jbs = obj;
106 c = data;
107
108 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
109
110 first = c->socket.write_ready;
111 b = jbs->out;
112
113 do {
114 ret = c->io->sendbuf(c, b, jbs->limit);
115
116 if (ret == NXT_AGAIN) {
117 break;
118 }
119
120 if (nxt_slow_path(ret == NXT_ERROR)) {
121 goto done;
122 }
123
124 jbs->sent += ret;
125 jbs->limit -= ret;
126
127 b = nxt_sendbuf_update(b, ret);
128
129 if (b == NULL) {
130 goto done;
131 }
132
133 if (jbs->limit == 0) {
134
135 if (c->rate == NULL) {
136 jbs->limit = c->max_chunk;
137 goto fast;
138 }
139
140 goto done;
141 }
142
143 } while (c->socket.write_ready);
144
145 if (first && task->thread->thread_pool->work_queue.head != NULL) {
146 goto fast;
147 }
148
149done:
150
151 nxt_job_return(task, &jbs->job, jbs->ready_handler);
152 return;
153
154fast:
155
156 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
157 jbs->job.task, jbs, c);
158
159 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
160}
161
162
163static void
164nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
165{
166 size_t sent;
167 nxt_buf_t *b;
168 nxt_bool_t done;
103 nxt_job_sendfile_t *jbs;
104
105 jbs = obj;
106 c = data;
107
108 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
109
110 first = c->socket.write_ready;
111 b = jbs->out;
112
113 do {
114 ret = c->io->sendbuf(c, b, jbs->limit);
115
116 if (ret == NXT_AGAIN) {
117 break;
118 }
119
120 if (nxt_slow_path(ret == NXT_ERROR)) {
121 goto done;
122 }
123
124 jbs->sent += ret;
125 jbs->limit -= ret;
126
127 b = nxt_sendbuf_update(b, ret);
128
129 if (b == NULL) {
130 goto done;
131 }
132
133 if (jbs->limit == 0) {
134
135 if (c->rate == NULL) {
136 jbs->limit = c->max_chunk;
137 goto fast;
138 }
139
140 goto done;
141 }
142
143 } while (c->socket.write_ready);
144
145 if (first && task->thread->thread_pool->work_queue.head != NULL) {
146 goto fast;
147 }
148
149done:
150
151 nxt_job_return(task, &jbs->job, jbs->ready_handler);
152 return;
153
154fast:
155
156 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
157 jbs->job.task, jbs, c);
158
159 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
160}
161
162
163static void
164nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
165{
166 size_t sent;
167 nxt_buf_t *b;
168 nxt_bool_t done;
169 nxt_event_conn_t *c;
169 nxt_conn_t *c;
170 nxt_job_sendfile_t *jbs;
171
172 jbs = obj;
173 c = data;
174
175 c->blocked = 0;
176
177 sent = jbs->sent;
178 c->sent += sent;
179
180 nxt_debug(task, "event conn sendfile sent:%z", sent);
181
182 b = jbs->out;
183
184 /* The job must be destroyed before connection error handler. */
185 nxt_job_destroy(task, jbs);
186
187 if (0 /* STUB: c->write_state->process_buffers */) {
188 b = nxt_event_conn_job_sendfile_completion(task, c, b);
189
190 done = (b == NULL);
191
192 /* Add data which might be added after sendfile job has started. */
193 nxt_buf_chain_add(&b, c->write);
194 c->write = b;
195
196 if (done) {
197 /* All data has been sent. */
198
199 if (b != NULL) {
200 /* But new data has been added. */
201 nxt_event_conn_job_sendfile_start(task, c, NULL);
202 }
203
204 return;
205 }
206 }
207
208 if (sent != 0 && c->write_state->timer_autoreset) {
209 nxt_timer_disable(task->thread->engine, &c->write_timer);
210 }
211
212 if (c->socket.error == 0
213 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
214 {
170 nxt_job_sendfile_t *jbs;
171
172 jbs = obj;
173 c = data;
174
175 c->blocked = 0;
176
177 sent = jbs->sent;
178 c->sent += sent;
179
180 nxt_debug(task, "event conn sendfile sent:%z", sent);
181
182 b = jbs->out;
183
184 /* The job must be destroyed before connection error handler. */
185 nxt_job_destroy(task, jbs);
186
187 if (0 /* STUB: c->write_state->process_buffers */) {
188 b = nxt_event_conn_job_sendfile_completion(task, c, b);
189
190 done = (b == NULL);
191
192 /* Add data which might be added after sendfile job has started. */
193 nxt_buf_chain_add(&b, c->write);
194 c->write = b;
195
196 if (done) {
197 /* All data has been sent. */
198
199 if (b != NULL) {
200 /* But new data has been added. */
201 nxt_event_conn_job_sendfile_start(task, c, NULL);
202 }
203
204 return;
205 }
206 }
207
208 if (sent != 0 && c->write_state->timer_autoreset) {
209 nxt_timer_disable(task->thread->engine, &c->write_timer);
210 }
211
212 if (c->socket.error == 0
213 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
214 {
215 nxt_event_conn_timer(task->thread->engine, c, c->write_state,
216 &c->write_timer);
215 nxt_conn_timer(task->thread->engine, c, c->write_state,
216 &c->write_timer);
217
218 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket);
219 }
220
221 if (sent != 0) {
222 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
223 task, c, c->socket.data);
224 /*
225 * Fall through if first operations were
226 * successful but the last one failed.
227 */
228 }
229
230 if (nxt_slow_path(c->socket.error != 0)) {
231 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
232 task, c, c->socket.data);
233 }
234}
235
236
237static nxt_buf_t *
217
218 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket);
219 }
220
221 if (sent != 0) {
222 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
223 task, c, c->socket.data);
224 /*
225 * Fall through if first operations were
226 * successful but the last one failed.
227 */
228 }
229
230 if (nxt_slow_path(c->socket.error != 0)) {
231 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
232 task, c, c->socket.data);
233 }
234}
235
236
237static nxt_buf_t *
238nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
238nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c,
239 nxt_buf_t *b)
240{
241 while (b != NULL) {
242
243 nxt_prefetch(b->next);
244
245 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
246 break;
247
248 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
249 break;
250 }
251
252 nxt_work_queue_add(c->write_work_queue,
253 b->completion_handler, task, b, b->parent);
254
255 b = b->next;
256 }
257
258 return b;
259}
260
261#endif
239 nxt_buf_t *b)
240{
241 while (b != NULL) {
242
243 nxt_prefetch(b->next);
244
245 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
246 break;
247
248 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
249 break;
250 }
251
252 nxt_work_queue_add(c->write_work_queue,
253 b->completion_handler, task, b, b->parent);
254
255 b = b->next;
256 }
257
258 return b;
259}
260
261#endif