nxt_stream_source.c (1:fdc027c56872) nxt_stream_source.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 44 unchanged lines hidden (view full) ---

53 c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54 if (nxt_slow_path(c == NULL)) {
55 goto fail;
56 }
57
58 stream->conn = c;
59 c->socket.data = stream;
60
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8

--- 44 unchanged lines hidden (view full) ---

53 c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54 if (nxt_slow_path(c == NULL)) {
55 goto fail;
56 }
57
58 stream->conn = c;
59 c->socket.data = stream;
60
61 nxt_event_conn_work_queue_set(c, us->work_queue);
61 nxt_conn_work_queue_set(c, us->work_queue);
62
63 c->remote = us->peer->sockaddr;
64 c->write_state = &nxt_stream_source_connect_state;
65
66 nxt_event_conn_connect(task, c);
67 return;
68
69fail:

--- 83 unchanged lines hidden (view full) ---

153nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
154{
155 nxt_event_conn_t *c;
156
157 c = obj;
158
159 nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
160
62
63 c->remote = us->peer->sockaddr;
64 c->write_state = &nxt_stream_source_connect_state;
65
66 nxt_event_conn_connect(task, c);
67 return;
68
69fail:

--- 83 unchanged lines hidden (view full) ---

153nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
154{
155 nxt_event_conn_t *c;
156
157 c = obj;
158
159 nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
160
161 nxt_event_conn_read(task, c);
161 nxt_conn_read(task, c);
162}
163
164
165static void
166nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
167{
168 nxt_int_t ret;
169 nxt_buf_t *b;

--- 37 unchanged lines hidden (view full) ---

207 }
208
209 c->read = buffers->current;
210 buffers->current = NULL;
211 }
212
213 c->read_state = &nxt_stream_source_response_read_state;
214
162}
163
164
165static void
166nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
167{
168 nxt_int_t ret;
169 nxt_buf_t *b;

--- 37 unchanged lines hidden (view full) ---

207 }
208
209 c->read = buffers->current;
210 buffers->current = NULL;
211 }
212
213 c->read_state = &nxt_stream_source_response_read_state;
214
215 nxt_event_conn_read(task, c);
215 nxt_conn_read(task, c);
216 return;
217
218fail:
219
220 nxt_stream_source_close(task, stream);
221}
222
223

--- 196 unchanged lines hidden (view full) ---

420 nxt_event_conn_t *c;
421 nxt_stream_source_t *stream;
422
423 c = obj;
424 stream = data;
425
426 nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
427
216 return;
217
218fail:
219
220 nxt_stream_source_close(task, stream);
221}
222
223

--- 196 unchanged lines hidden (view full) ---

420 nxt_event_conn_t *c;
421 nxt_stream_source_t *stream;
422
423 c = obj;
424 stream = data;
425
426 nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
427
428 nxt_event_conn_close(task, c);
428 nxt_conn_close(task, c);
429
430 b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
431 NXT_BUF_SYNC_LAST);
432
433 if (nxt_slow_path(b == NULL)) {
434 stream->error_handler(task, stream);
435 return;
436 }

--- 21 unchanged lines hidden (view full) ---

458
459 nxt_stream_source_close(task, stream);
460}
461
462
463static void
464nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
465{
429
430 b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
431 NXT_BUF_SYNC_LAST);
432
433 if (nxt_slow_path(b == NULL)) {
434 stream->error_handler(task, stream);
435 return;
436 }

--- 21 unchanged lines hidden (view full) ---

458
459 nxt_stream_source_close(task, stream);
460}
461
462
463static void
464nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
465{
466 nxt_event_conn_close(task, stream->conn);
466 nxt_conn_close(task, stream->conn);
467
468 stream->error_handler(task, stream);
469}
470
471
472void
473nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data)
474{
475 nxt_source_hook_t *next;
476
477 next = obj;
478
479 next->filter(task, next->context, data);
480}
467
468 stream->error_handler(task, stream);
469}
470
471
472void
473nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data)
474{
475 nxt_source_hook_t *next;
476
477 next = obj;
478
479 next->filter(task, next->context, data);
480}