nxt_stream_source.c (0:a63ceefd6ab0) nxt_stream_source.c (1:fdc027c56872)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8
9
10static void nxt_stream_source_connected(nxt_thread_t *thr, void *obj,
10static void nxt_stream_source_connected(nxt_task_t *task, void *obj,
11 void *data);
11 void *data);
12static void nxt_stream_source_write_ready(nxt_thread_t *thr, void *obj,
12static void nxt_stream_source_write_ready(nxt_task_t *task, void *obj,
13 void *data);
13 void *data);
14static void nxt_stream_source_read_ready(nxt_thread_t *thr, void *obj,
14static void nxt_stream_source_read_ready(nxt_task_t *task, void *obj,
15 void *data);
16static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
17 nxt_event_conn_t *c);
15 void *data);
16static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
17 nxt_event_conn_t *c);
18static void nxt_stream_source_buf_completion(nxt_thread_t *thr, void *obj,
18static void nxt_stream_source_buf_completion(nxt_task_t *task, void *obj,
19 void *data);
19 void *data);
20static void nxt_stream_source_read_done(nxt_thread_t *thr, void *obj,
20static void nxt_stream_source_read_done(nxt_task_t *task, void *obj,
21 void *data);
21 void *data);
22static void nxt_stream_source_refused(nxt_thread_t *thr, void *obj, void *data);
23static void nxt_stream_source_closed(nxt_thread_t *thr, void *obj, void *data);
24static void nxt_stream_source_error(nxt_thread_t *thr, void *obj, void *data);
25static void nxt_stream_source_close(nxt_thread_t *thr,
22static void nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data);
23static void nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data);
24static void nxt_stream_source_error(nxt_task_t *task, void *obj, void *data);
25static void nxt_stream_source_close(nxt_task_t *task,
26 nxt_stream_source_t *stream);
27
28
29static const nxt_event_conn_state_t nxt_stream_source_connect_state;
30static const nxt_event_conn_state_t nxt_stream_source_request_write_state;
31static const nxt_event_conn_state_t nxt_stream_source_response_ready_state;
32static const nxt_event_conn_state_t nxt_stream_source_response_read_state;
33
34
35void
26 nxt_stream_source_t *stream);
27
28
29static const nxt_event_conn_state_t nxt_stream_source_connect_state;
30static const nxt_event_conn_state_t nxt_stream_source_request_write_state;
31static const nxt_event_conn_state_t nxt_stream_source_response_ready_state;
32static const nxt_event_conn_state_t nxt_stream_source_response_read_state;
33
34
35void
36nxt_stream_source_connect(nxt_stream_source_t *stream)
36nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream)
37{
38 nxt_thread_t *thr;
39 nxt_event_conn_t *c;
40 nxt_upstream_source_t *us;
41
42 thr = nxt_thread();
43
44 us = stream->upstream;
45
46 if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
37{
38 nxt_thread_t *thr;
39 nxt_event_conn_t *c;
40 nxt_upstream_source_t *us;
41
42 thr = nxt_thread();
43
44 us = stream->upstream;
45
46 if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
47 nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
48 "are not enough to read upstream response",
49 us->buffers.max, us->buffers.size / 1024);
47 nxt_log(task, NXT_LOG_ERR,
48 "%d buffers %uDK each are not enough to read upstream response",
49 us->buffers.max, us->buffers.size / 1024);
50 goto fail;
51 }
52
53 c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54 if (nxt_slow_path(c == NULL)) {
55 goto fail;
56 }
57
58 stream->conn = c;
59 c->socket.data = stream;
60
61 nxt_event_conn_work_queue_set(c, us->work_queue);
62
63 c->remote = us->peer->sockaddr;
64 c->write_state = &nxt_stream_source_connect_state;
65
50 goto fail;
51 }
52
53 c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54 if (nxt_slow_path(c == NULL)) {
55 goto fail;
56 }
57
58 stream->conn = c;
59 c->socket.data = stream;
60
61 nxt_event_conn_work_queue_set(c, us->work_queue);
62
63 c->remote = us->peer->sockaddr;
64 c->write_state = &nxt_stream_source_connect_state;
65
66 nxt_event_conn_connect(thr, c);
66 nxt_event_conn_connect(task, c);
67 return;
68
69fail:
70
67 return;
68
69fail:
70
71 stream->error_handler(stream);
71 stream->error_handler(task, stream);
72}
73
74
75static const nxt_event_conn_state_t nxt_stream_source_connect_state
76 nxt_aligned(64) =
77{
78 NXT_EVENT_NO_BUF_PROCESS,
79 NXT_EVENT_TIMER_AUTORESET,

--- 4 unchanged lines hidden (view full) ---

84
85 NULL, /* timeout */
86 NULL, /* timeout value */
87 0, /* connect_timeout */
88};
89
90
91static void
72}
73
74
75static const nxt_event_conn_state_t nxt_stream_source_connect_state
76 nxt_aligned(64) =
77{
78 NXT_EVENT_NO_BUF_PROCESS,
79 NXT_EVENT_TIMER_AUTORESET,

--- 4 unchanged lines hidden (view full) ---

84
85 NULL, /* timeout */
86 NULL, /* timeout value */
87 0, /* connect_timeout */
88};
89
90
91static void
92nxt_stream_source_connected(nxt_thread_t *thr, void *obj, void *data)
92nxt_stream_source_connected(nxt_task_t *task, void *obj, void *data)
93{
94 nxt_event_conn_t *c;
95 nxt_stream_source_t *stream;
96
97 c = obj;
98 stream = data;
99
93{
94 nxt_event_conn_t *c;
95 nxt_stream_source_t *stream;
96
97 c = obj;
98 stream = data;
99
100 nxt_log_debug(thr->log, "stream source connected fd:%d", c->socket.fd);
100 nxt_debug(task, "stream source connected fd:%d", c->socket.fd);
101
102 c->read_state = &nxt_stream_source_response_ready_state;
103 c->write = stream->out;
104 c->write_state = &nxt_stream_source_request_write_state;
105
101
102 c->read_state = &nxt_stream_source_response_ready_state;
103 c->write = stream->out;
104 c->write_state = &nxt_stream_source_request_write_state;
105
106 if (thr->engine->batch != 0) {
107 nxt_event_conn_write(thr, c);
106 if (task->thread->engine->batch != 0) {
107 nxt_event_conn_write(task, c);
108
109 } else {
110 stream->read_queued = 1;
108
109 } else {
110 stream->read_queued = 1;
111 nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue,
112 c->io->read, c, stream, thr->log);
111 nxt_thread_work_queue_add(task->thread,
112 &task->thread->engine->read_work_queue,
113 c->io->read, task, c, stream);
113
114
114 c->io->write(thr, c, stream);
115 c->io->write(task, c, stream);
115 }
116}
117
118
119static const nxt_event_conn_state_t nxt_stream_source_request_write_state
120 nxt_aligned(64) =
121{
122 NXT_EVENT_NO_BUF_PROCESS,

--- 21 unchanged lines hidden (view full) ---

144
145 NULL, /* timeout */
146 NULL, /* timeout value */
147 0, /* connect_timeout */
148};
149
150
151static void
116 }
117}
118
119
120static const nxt_event_conn_state_t nxt_stream_source_request_write_state
121 nxt_aligned(64) =
122{
123 NXT_EVENT_NO_BUF_PROCESS,

--- 21 unchanged lines hidden (view full) ---

145
146 NULL, /* timeout */
147 NULL, /* timeout value */
148 0, /* connect_timeout */
149};
150
151
152static void
152nxt_stream_source_write_ready(nxt_thread_t *thr, void *obj, void *data)
153nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
153{
154 nxt_event_conn_t *c;
155
156 c = obj;
157
154{
155 nxt_event_conn_t *c;
156
157 c = obj;
158
158 nxt_log_debug(thr->log, "stream source write ready fd:%d", c->socket.fd);
159 nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
159
160
160 nxt_event_conn_read(thr, c);
161 nxt_event_conn_read(task, c);
161}
162
163
164static void
162}
163
164
165static void
165nxt_stream_source_read_ready(nxt_thread_t *thr, void *obj, void *data)
166nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
166{
167 nxt_int_t ret;
168 nxt_buf_t *b;
169 nxt_buf_pool_t *buffers;
170 nxt_event_conn_t *c;
171 nxt_stream_source_t *stream;
172
173 c = obj;
174 stream = data;
175 stream->read_queued = 0;
176
167{
168 nxt_int_t ret;
169 nxt_buf_t *b;
170 nxt_buf_pool_t *buffers;
171 nxt_event_conn_t *c;
172 nxt_stream_source_t *stream;
173
174 c = obj;
175 stream = data;
176 stream->read_queued = 0;
177
177 nxt_log_debug(thr->log, "stream source read ready fd:%d", c->socket.fd);
178 nxt_debug(task, "stream source read ready fd:%d", c->socket.fd);
178
179 if (c->read == NULL) {
180
181 buffers = &stream->upstream->buffers;
182
183 ret = nxt_buf_pool_mem_alloc(buffers, 0);
184
185 if (nxt_slow_path(ret != NXT_OK)) {
186
187 if (nxt_slow_path(ret == NXT_ERROR)) {
188 goto fail;
189 }
190
191 /* ret == NXT_AGAIN */
192
179
180 if (c->read == NULL) {
181
182 buffers = &stream->upstream->buffers;
183
184 ret = nxt_buf_pool_mem_alloc(buffers, 0);
185
186 if (nxt_slow_path(ret != NXT_OK)) {
187
188 if (nxt_slow_path(ret == NXT_ERROR)) {
189 goto fail;
190 }
191
192 /* ret == NXT_AGAIN */
193
193 nxt_log_debug(thr->log, "stream source flush");
194 nxt_debug(task, "stream source flush");
194
195 b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
196
197 if (nxt_slow_path(b == NULL)) {
198 goto fail;
199 }
200
195
196 b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
197
198 if (nxt_slow_path(b == NULL)) {
199 goto fail;
200 }
201
201 nxt_event_fd_block_read(thr->engine, &c->socket);
202 nxt_event_fd_block_read(task->thread->engine, &c->socket);
202
203
203 nxt_source_filter(thr, c->write_work_queue, stream->next, b);
204 nxt_source_filter(task->thread, c->write_work_queue, task,
205 stream->next, b);
204 return;
205 }
206
207 c->read = buffers->current;
208 buffers->current = NULL;
209 }
210
211 c->read_state = &nxt_stream_source_response_read_state;
212
206 return;
207 }
208
209 c->read = buffers->current;
210 buffers->current = NULL;
211 }
212
213 c->read_state = &nxt_stream_source_response_read_state;
214
213 nxt_event_conn_read(thr, c);
215 nxt_event_conn_read(task, c);
214 return;
215
216fail:
217
216 return;
217
218fail:
219
218 nxt_stream_source_close(thr, stream);
220 nxt_stream_source_close(task, stream);
219}
220
221
222static const nxt_event_conn_state_t nxt_stream_source_response_read_state
223 nxt_aligned(64) =
224{
225 NXT_EVENT_NO_BUF_PROCESS,
226 NXT_EVENT_TIMER_AUTORESET,

--- 4 unchanged lines hidden (view full) ---

231
232 NULL, /* timeout */
233 NULL, /* timeout value */
234 0, /* connect_timeout */
235};
236
237
238static void
221}
222
223
224static const nxt_event_conn_state_t nxt_stream_source_response_read_state
225 nxt_aligned(64) =
226{
227 NXT_EVENT_NO_BUF_PROCESS,
228 NXT_EVENT_TIMER_AUTORESET,

--- 4 unchanged lines hidden (view full) ---

233
234 NULL, /* timeout */
235 NULL, /* timeout value */
236 0, /* connect_timeout */
237};
238
239
240static void
239nxt_stream_source_read_done(nxt_thread_t *thr, void *obj, void *data)
241nxt_stream_source_read_done(nxt_task_t *task, void *obj, void *data)
240{
241 nxt_buf_t *b;
242 nxt_bool_t batch;
243 nxt_event_conn_t *c;
244 nxt_stream_source_t *stream;
245
246 c = obj;
247 stream = data;
248
242{
243 nxt_buf_t *b;
244 nxt_bool_t batch;
245 nxt_event_conn_t *c;
246 nxt_stream_source_t *stream;
247
248 c = obj;
249 stream = data;
250
249 nxt_log_debug(thr->log, "stream source read done fd:%d", c->socket.fd);
251 nxt_debug(task, "stream source read done fd:%d", c->socket.fd);
250
251 if (c->read != NULL) {
252 b = nxt_stream_source_process_buffers(stream, c);
253
254 if (nxt_slow_path(b == NULL)) {
252
253 if (c->read != NULL) {
254 b = nxt_stream_source_process_buffers(stream, c);
255
256 if (nxt_slow_path(b == NULL)) {
255 nxt_stream_source_close(thr, stream);
257 nxt_stream_source_close(task, stream);
256 return;
257 }
258
258 return;
259 }
260
259 batch = (thr->engine->batch != 0);
261 batch = (task->thread->engine->batch != 0);
260
261 if (batch) {
262
263 if (batch) {
262 nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
264 nxt_thread_work_queue_add(task->thread,
265 stream->upstream->work_queue,
263 nxt_source_filter_handler,
266 nxt_source_filter_handler,
264 stream->next, b, thr->log);
267 task, stream->next, b);
265 }
266
267 if (!stream->read_queued) {
268 stream->read_queued = 1;
268 }
269
270 if (!stream->read_queued) {
271 stream->read_queued = 1;
269 nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
272 nxt_thread_work_queue_add(task->thread,
273 stream->upstream->work_queue,
270 nxt_stream_source_read_ready,
274 nxt_stream_source_read_ready,
271 c, stream, thr->log);
275 task, c, stream);
272 }
273
274 if (!batch) {
276 }
277
278 if (!batch) {
275 stream->next->filter(thr, stream->next->context, b);
279 stream->next->filter(task, stream->next->context, b);
276 }
277 }
278}
279
280
281static nxt_buf_t *
282nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
283 nxt_event_conn_t *c)

--- 46 unchanged lines hidden (view full) ---

330
331 } while (c->read != NULL);
332
333 return head;
334}
335
336
337static void
280 }
281 }
282}
283
284
285static nxt_buf_t *
286nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
287 nxt_event_conn_t *c)

--- 46 unchanged lines hidden (view full) ---

334
335 } while (c->read != NULL);
336
337 return head;
338}
339
340
341static void
338nxt_stream_source_buf_completion(nxt_thread_t *thr, void *obj, void *data)
342nxt_stream_source_buf_completion(nxt_task_t *task, void *obj, void *data)
339{
340 size_t size;
341 nxt_buf_t *b, *parent;
342 nxt_stream_source_t *stream;
343
344 b = obj;
345 parent = data;
346
347#if 0
343{
344 size_t size;
345 nxt_buf_t *b, *parent;
346 nxt_stream_source_t *stream;
347
348 b = obj;
349 parent = data;
350
351#if 0
348 nxt_log_debug(thr->log,
352 nxt_debug(thr->log,
349 "stream source buf completion: %p parent:%p retain:%uD",
350 b, parent, parent->retain);
351#endif
352
353 stream = b->data;
354
355 /* A parent is a buffer where stream reads data. */
356

--- 13 unchanged lines hidden (view full) ---

370 if (size >= 256 || size >= stream->upstream->buffers.size) {
371
372 if (stream->conn->read != parent) {
373 nxt_buf_chain_add(&stream->conn->read, parent);
374 }
375
376 if (!stream->read_queued) {
377 stream->read_queued = 1;
353 "stream source buf completion: %p parent:%p retain:%uD",
354 b, parent, parent->retain);
355#endif
356
357 stream = b->data;
358
359 /* A parent is a buffer where stream reads data. */
360

--- 13 unchanged lines hidden (view full) ---

374 if (size >= 256 || size >= stream->upstream->buffers.size) {
375
376 if (stream->conn->read != parent) {
377 nxt_buf_chain_add(&stream->conn->read, parent);
378 }
379
380 if (!stream->read_queued) {
381 stream->read_queued = 1;
378 nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
382 nxt_thread_work_queue_add(task->thread,
383 stream->upstream->work_queue,
379 nxt_stream_source_read_ready,
384 nxt_stream_source_read_ready,
380 stream->conn,
381 stream->conn->socket.data,
382 stream->conn->socket.log);
385 task, stream->conn,
386 stream->conn->socket.data);
383 }
384 }
385 }
386
387 nxt_buf_free(stream->upstream->buffers.mem_pool, b);
388}
389
390
391static void
387 }
388 }
389 }
390
391 nxt_buf_free(stream->upstream->buffers.mem_pool, b);
392}
393
394
395static void
392nxt_stream_source_refused(nxt_thread_t *thr, void *obj, void *data)
396nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data)
393{
394 nxt_stream_source_t *stream;
395
396 stream = data;
397
398#if (NXT_DEBUG)
399 {
400 nxt_event_conn_t *c;
401
402 c = obj;
403
397{
398 nxt_stream_source_t *stream;
399
400 stream = data;
401
402#if (NXT_DEBUG)
403 {
404 nxt_event_conn_t *c;
405
406 c = obj;
407
404 nxt_log_debug(thr->log, "stream source refused fd:%d", c->socket.fd);
408 nxt_debug(task, "stream source refused fd:%d", c->socket.fd);
405 }
406#endif
407
409 }
410#endif
411
408 nxt_stream_source_close(thr, stream);
412 nxt_stream_source_close(task, stream);
409}
410
411
412static void
413}
414
415
416static void
413nxt_stream_source_closed(nxt_thread_t *thr, void *obj, void *data)
417nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data)
414{
415 nxt_buf_t *b;
416 nxt_event_conn_t *c;
417 nxt_stream_source_t *stream;
418
419 c = obj;
420 stream = data;
421
418{
419 nxt_buf_t *b;
420 nxt_event_conn_t *c;
421 nxt_stream_source_t *stream;
422
423 c = obj;
424 stream = data;
425
422 nxt_log_debug(thr->log, "stream source closed fd:%d", c->socket.fd);
426 nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
423
427
424 nxt_event_conn_close(thr, c);
428 nxt_event_conn_close(task, c);
425
426 b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
427 NXT_BUF_SYNC_LAST);
428
429 if (nxt_slow_path(b == NULL)) {
429
430 b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
431 NXT_BUF_SYNC_LAST);
432
433 if (nxt_slow_path(b == NULL)) {
430 stream->error_handler(stream);
434 stream->error_handler(task, stream);
431 return;
432 }
433
435 return;
436 }
437
434 nxt_source_filter(thr, c->write_work_queue, stream->next, b);
438 nxt_source_filter(task->thread, c->write_work_queue, task, stream->next, b);
435}
436
437
438static void
439}
440
441
442static void
439nxt_stream_source_error(nxt_thread_t *thr, void *obj, void *data)
443nxt_stream_source_error(nxt_task_t *task, void *obj, void *data)
440{
441 nxt_stream_source_t *stream;
442
443 stream = data;
444
445#if (NXT_DEBUG)
446 {
447 nxt_event_fd_t *ev;
448
449 ev = obj;
450
444{
445 nxt_stream_source_t *stream;
446
447 stream = data;
448
449#if (NXT_DEBUG)
450 {
451 nxt_event_fd_t *ev;
452
453 ev = obj;
454
451 nxt_log_debug(thr->log, "stream source error fd:%d", ev->fd);
455 nxt_debug(task, "stream source error fd:%d", ev->fd);
452 }
453#endif
454
456 }
457#endif
458
455 nxt_stream_source_close(thr, stream);
459 nxt_stream_source_close(task, stream);
456}
457
458
459static void
460}
461
462
463static void
460nxt_stream_source_close(nxt_thread_t *thr, nxt_stream_source_t *stream)
464nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
461{
465{
462 nxt_event_conn_close(thr, stream->conn);
466 nxt_event_conn_close(task, stream->conn);
463
467
464 stream->error_handler(stream);
468 stream->error_handler(task, stream);
465}
466
467
468void
469}
470
471
472void
469nxt_source_filter_handler(nxt_thread_t *thr, void *obj, void *data)
473nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data)
470{
471 nxt_source_hook_t *next;
472
473 next = obj;
474
474{
475 nxt_source_hook_t *next;
476
477 next = obj;
478
475 next->filter(thr, next->context, data);
479 next->filter(task, next->context, data);
476}
480}