10Sigor@sysoev.ru
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru */
60Sigor@sysoev.ru
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru
90Sigor@sysoev.ru
101Sigor@sysoev.ru static void nxt_stream_source_connected(nxt_task_t *task, void *obj,
110Sigor@sysoev.ru void *data);
121Sigor@sysoev.ru static void nxt_stream_source_write_ready(nxt_task_t *task, void *obj,
130Sigor@sysoev.ru void *data);
141Sigor@sysoev.ru static void nxt_stream_source_read_ready(nxt_task_t *task, void *obj,
150Sigor@sysoev.ru void *data);
160Sigor@sysoev.ru static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
170Sigor@sysoev.ru nxt_event_conn_t *c);
181Sigor@sysoev.ru static void nxt_stream_source_buf_completion(nxt_task_t *task, void *obj,
190Sigor@sysoev.ru void *data);
201Sigor@sysoev.ru static void nxt_stream_source_read_done(nxt_task_t *task, void *obj,
210Sigor@sysoev.ru void *data);
221Sigor@sysoev.ru static void nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data);
231Sigor@sysoev.ru static void nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data);
241Sigor@sysoev.ru static void nxt_stream_source_error(nxt_task_t *task, void *obj, void *data);
251Sigor@sysoev.ru static void nxt_stream_source_close(nxt_task_t *task,
260Sigor@sysoev.ru nxt_stream_source_t *stream);
270Sigor@sysoev.ru
280Sigor@sysoev.ru
290Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_connect_state;
300Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_request_write_state;
310Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_ready_state;
320Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_read_state;
330Sigor@sysoev.ru
340Sigor@sysoev.ru
350Sigor@sysoev.ru void
nxt_stream_source_connect(nxt_task_t * task,nxt_stream_source_t * stream)361Sigor@sysoev.ru nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream)
370Sigor@sysoev.ru {
380Sigor@sysoev.ru nxt_thread_t *thr;
390Sigor@sysoev.ru nxt_event_conn_t *c;
400Sigor@sysoev.ru nxt_upstream_source_t *us;
410Sigor@sysoev.ru
420Sigor@sysoev.ru thr = nxt_thread();
430Sigor@sysoev.ru
440Sigor@sysoev.ru us = stream->upstream;
450Sigor@sysoev.ru
460Sigor@sysoev.ru if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
471Sigor@sysoev.ru nxt_log(task, NXT_LOG_ERR,
481Sigor@sysoev.ru "%d buffers %uDK each are not enough to read upstream response",
491Sigor@sysoev.ru us->buffers.max, us->buffers.size / 1024);
500Sigor@sysoev.ru goto fail;
510Sigor@sysoev.ru }
520Sigor@sysoev.ru
530Sigor@sysoev.ru c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
540Sigor@sysoev.ru if (nxt_slow_path(c == NULL)) {
550Sigor@sysoev.ru goto fail;
560Sigor@sysoev.ru }
570Sigor@sysoev.ru
580Sigor@sysoev.ru stream->conn = c;
590Sigor@sysoev.ru c->socket.data = stream;
600Sigor@sysoev.ru
61*62Sigor@sysoev.ru nxt_conn_work_queue_set(c, us->work_queue);
620Sigor@sysoev.ru
630Sigor@sysoev.ru c->remote = us->peer->sockaddr;
640Sigor@sysoev.ru c->write_state = &nxt_stream_source_connect_state;
650Sigor@sysoev.ru
661Sigor@sysoev.ru nxt_event_conn_connect(task, c);
670Sigor@sysoev.ru return;
680Sigor@sysoev.ru
690Sigor@sysoev.ru fail:
700Sigor@sysoev.ru
711Sigor@sysoev.ru stream->error_handler(task, stream);
720Sigor@sysoev.ru }
730Sigor@sysoev.ru
740Sigor@sysoev.ru
750Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_connect_state
760Sigor@sysoev.ru nxt_aligned(64) =
770Sigor@sysoev.ru {
780Sigor@sysoev.ru NXT_EVENT_NO_BUF_PROCESS,
790Sigor@sysoev.ru NXT_EVENT_TIMER_AUTORESET,
800Sigor@sysoev.ru
810Sigor@sysoev.ru nxt_stream_source_connected,
820Sigor@sysoev.ru nxt_stream_source_refused,
830Sigor@sysoev.ru nxt_stream_source_error,
840Sigor@sysoev.ru
850Sigor@sysoev.ru NULL, /* timeout */
860Sigor@sysoev.ru NULL, /* timeout value */
870Sigor@sysoev.ru 0, /* connect_timeout */
880Sigor@sysoev.ru };
890Sigor@sysoev.ru
900Sigor@sysoev.ru
910Sigor@sysoev.ru static void
nxt_stream_source_connected(nxt_task_t * task,void * obj,void * data)921Sigor@sysoev.ru nxt_stream_source_connected(nxt_task_t *task, void *obj, void *data)
930Sigor@sysoev.ru {
940Sigor@sysoev.ru nxt_event_conn_t *c;
950Sigor@sysoev.ru nxt_stream_source_t *stream;
960Sigor@sysoev.ru
970Sigor@sysoev.ru c = obj;
980Sigor@sysoev.ru stream = data;
990Sigor@sysoev.ru
1001Sigor@sysoev.ru nxt_debug(task, "stream source connected fd:%d", c->socket.fd);
1010Sigor@sysoev.ru
1020Sigor@sysoev.ru c->read_state = &nxt_stream_source_response_ready_state;
1030Sigor@sysoev.ru c->write = stream->out;
1040Sigor@sysoev.ru c->write_state = &nxt_stream_source_request_write_state;
1050Sigor@sysoev.ru
1061Sigor@sysoev.ru if (task->thread->engine->batch != 0) {
1071Sigor@sysoev.ru nxt_event_conn_write(task, c);
1080Sigor@sysoev.ru
1090Sigor@sysoev.ru } else {
1100Sigor@sysoev.ru stream->read_queued = 1;
1111Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread,
1121Sigor@sysoev.ru &task->thread->engine->read_work_queue,
1131Sigor@sysoev.ru c->io->read, task, c, stream);
1140Sigor@sysoev.ru
1151Sigor@sysoev.ru c->io->write(task, c, stream);
1160Sigor@sysoev.ru }
1170Sigor@sysoev.ru }
1180Sigor@sysoev.ru
1190Sigor@sysoev.ru
1200Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_request_write_state
1210Sigor@sysoev.ru nxt_aligned(64) =
1220Sigor@sysoev.ru {
1230Sigor@sysoev.ru NXT_EVENT_NO_BUF_PROCESS,
1240Sigor@sysoev.ru NXT_EVENT_TIMER_AUTORESET,
1250Sigor@sysoev.ru
1260Sigor@sysoev.ru nxt_stream_source_write_ready,
1270Sigor@sysoev.ru NULL,
1280Sigor@sysoev.ru nxt_stream_source_error,
1290Sigor@sysoev.ru
1300Sigor@sysoev.ru NULL, /* timeout */
1310Sigor@sysoev.ru NULL, /* timeout value */
1320Sigor@sysoev.ru 0, /* connect_timeout */
1330Sigor@sysoev.ru };
1340Sigor@sysoev.ru
1350Sigor@sysoev.ru
1360Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_ready_state
1370Sigor@sysoev.ru nxt_aligned(64) =
1380Sigor@sysoev.ru {
1390Sigor@sysoev.ru NXT_EVENT_NO_BUF_PROCESS,
1400Sigor@sysoev.ru NXT_EVENT_TIMER_AUTORESET,
1410Sigor@sysoev.ru
1420Sigor@sysoev.ru nxt_stream_source_read_ready,
1430Sigor@sysoev.ru nxt_stream_source_closed,
1440Sigor@sysoev.ru nxt_stream_source_error,
1450Sigor@sysoev.ru
1460Sigor@sysoev.ru NULL, /* timeout */
1470Sigor@sysoev.ru NULL, /* timeout value */
1480Sigor@sysoev.ru 0, /* connect_timeout */
1490Sigor@sysoev.ru };
1500Sigor@sysoev.ru
1510Sigor@sysoev.ru
1520Sigor@sysoev.ru static void
nxt_stream_source_write_ready(nxt_task_t * task,void * obj,void * data)1531Sigor@sysoev.ru nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
1540Sigor@sysoev.ru {
1550Sigor@sysoev.ru nxt_event_conn_t *c;
1560Sigor@sysoev.ru
1570Sigor@sysoev.ru c = obj;
1580Sigor@sysoev.ru
1591Sigor@sysoev.ru nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
1600Sigor@sysoev.ru
161*62Sigor@sysoev.ru nxt_conn_read(task, c);
1620Sigor@sysoev.ru }
1630Sigor@sysoev.ru
1640Sigor@sysoev.ru
1650Sigor@sysoev.ru static void
nxt_stream_source_read_ready(nxt_task_t * task,void * obj,void * data)1661Sigor@sysoev.ru nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
1670Sigor@sysoev.ru {
1680Sigor@sysoev.ru nxt_int_t ret;
1690Sigor@sysoev.ru nxt_buf_t *b;
1700Sigor@sysoev.ru nxt_buf_pool_t *buffers;
1710Sigor@sysoev.ru nxt_event_conn_t *c;
1720Sigor@sysoev.ru nxt_stream_source_t *stream;
1730Sigor@sysoev.ru
1740Sigor@sysoev.ru c = obj;
1750Sigor@sysoev.ru stream = data;
1760Sigor@sysoev.ru stream->read_queued = 0;
1770Sigor@sysoev.ru
1781Sigor@sysoev.ru nxt_debug(task, "stream source read ready fd:%d", c->socket.fd);
1790Sigor@sysoev.ru
1800Sigor@sysoev.ru if (c->read == NULL) {
1810Sigor@sysoev.ru
1820Sigor@sysoev.ru buffers = &stream->upstream->buffers;
1830Sigor@sysoev.ru
1840Sigor@sysoev.ru ret = nxt_buf_pool_mem_alloc(buffers, 0);
1850Sigor@sysoev.ru
1860Sigor@sysoev.ru if (nxt_slow_path(ret != NXT_OK)) {
1870Sigor@sysoev.ru
1880Sigor@sysoev.ru if (nxt_slow_path(ret == NXT_ERROR)) {
1890Sigor@sysoev.ru goto fail;
1900Sigor@sysoev.ru }
1910Sigor@sysoev.ru
1920Sigor@sysoev.ru /* ret == NXT_AGAIN */
1930Sigor@sysoev.ru
1941Sigor@sysoev.ru nxt_debug(task, "stream source flush");
1950Sigor@sysoev.ru
1960Sigor@sysoev.ru b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
1970Sigor@sysoev.ru
1980Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
1990Sigor@sysoev.ru goto fail;
2000Sigor@sysoev.ru }
2010Sigor@sysoev.ru
2021Sigor@sysoev.ru nxt_event_fd_block_read(task->thread->engine, &c->socket);
2030Sigor@sysoev.ru
2041Sigor@sysoev.ru nxt_source_filter(task->thread, c->write_work_queue, task,
2051Sigor@sysoev.ru stream->next, b);
2060Sigor@sysoev.ru return;
2070Sigor@sysoev.ru }
2080Sigor@sysoev.ru
2090Sigor@sysoev.ru c->read = buffers->current;
2100Sigor@sysoev.ru buffers->current = NULL;
2110Sigor@sysoev.ru }
2120Sigor@sysoev.ru
2130Sigor@sysoev.ru c->read_state = &nxt_stream_source_response_read_state;
2140Sigor@sysoev.ru
215*62Sigor@sysoev.ru nxt_conn_read(task, c);
2160Sigor@sysoev.ru return;
2170Sigor@sysoev.ru
2180Sigor@sysoev.ru fail:
2190Sigor@sysoev.ru
2201Sigor@sysoev.ru nxt_stream_source_close(task, stream);
2210Sigor@sysoev.ru }
2220Sigor@sysoev.ru
2230Sigor@sysoev.ru
2240Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_read_state
2250Sigor@sysoev.ru nxt_aligned(64) =
2260Sigor@sysoev.ru {
2270Sigor@sysoev.ru NXT_EVENT_NO_BUF_PROCESS,
2280Sigor@sysoev.ru NXT_EVENT_TIMER_AUTORESET,
2290Sigor@sysoev.ru
2300Sigor@sysoev.ru nxt_stream_source_read_done,
2310Sigor@sysoev.ru nxt_stream_source_closed,
2320Sigor@sysoev.ru nxt_stream_source_error,
2330Sigor@sysoev.ru
2340Sigor@sysoev.ru NULL, /* timeout */
2350Sigor@sysoev.ru NULL, /* timeout value */
2360Sigor@sysoev.ru 0, /* connect_timeout */
2370Sigor@sysoev.ru };
2380Sigor@sysoev.ru
2390Sigor@sysoev.ru
2400Sigor@sysoev.ru static void
nxt_stream_source_read_done(nxt_task_t * task,void * obj,void * data)2411Sigor@sysoev.ru nxt_stream_source_read_done(nxt_task_t *task, void *obj, void *data)
2420Sigor@sysoev.ru {
2430Sigor@sysoev.ru nxt_buf_t *b;
2440Sigor@sysoev.ru nxt_bool_t batch;
2450Sigor@sysoev.ru nxt_event_conn_t *c;
2460Sigor@sysoev.ru nxt_stream_source_t *stream;
2470Sigor@sysoev.ru
2480Sigor@sysoev.ru c = obj;
2490Sigor@sysoev.ru stream = data;
2500Sigor@sysoev.ru
2511Sigor@sysoev.ru nxt_debug(task, "stream source read done fd:%d", c->socket.fd);
2520Sigor@sysoev.ru
2530Sigor@sysoev.ru if (c->read != NULL) {
2540Sigor@sysoev.ru b = nxt_stream_source_process_buffers(stream, c);
2550Sigor@sysoev.ru
2560Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
2571Sigor@sysoev.ru nxt_stream_source_close(task, stream);
2580Sigor@sysoev.ru return;
2590Sigor@sysoev.ru }
2600Sigor@sysoev.ru
2611Sigor@sysoev.ru batch = (task->thread->engine->batch != 0);
2620Sigor@sysoev.ru
2630Sigor@sysoev.ru if (batch) {
2641Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread,
2651Sigor@sysoev.ru stream->upstream->work_queue,
2660Sigor@sysoev.ru nxt_source_filter_handler,
2671Sigor@sysoev.ru task, stream->next, b);
2680Sigor@sysoev.ru }
2690Sigor@sysoev.ru
2700Sigor@sysoev.ru if (!stream->read_queued) {
2710Sigor@sysoev.ru stream->read_queued = 1;
2721Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread,
2731Sigor@sysoev.ru stream->upstream->work_queue,
2740Sigor@sysoev.ru nxt_stream_source_read_ready,
2751Sigor@sysoev.ru task, c, stream);
2760Sigor@sysoev.ru }
2770Sigor@sysoev.ru
2780Sigor@sysoev.ru if (!batch) {
2791Sigor@sysoev.ru stream->next->filter(task, stream->next->context, b);
2800Sigor@sysoev.ru }
2810Sigor@sysoev.ru }
2820Sigor@sysoev.ru }
2830Sigor@sysoev.ru
2840Sigor@sysoev.ru
2850Sigor@sysoev.ru static nxt_buf_t *
nxt_stream_source_process_buffers(nxt_stream_source_t * stream,nxt_event_conn_t * c)2860Sigor@sysoev.ru nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
2870Sigor@sysoev.ru nxt_event_conn_t *c)
2880Sigor@sysoev.ru {
2890Sigor@sysoev.ru size_t size, nbytes;
2900Sigor@sysoev.ru nxt_buf_t *b, *in, *head, **prev;
2910Sigor@sysoev.ru
2920Sigor@sysoev.ru nbytes = c->nbytes;
2930Sigor@sysoev.ru prev = &head;
2940Sigor@sysoev.ru
2950Sigor@sysoev.ru do {
2960Sigor@sysoev.ru b = nxt_buf_mem_alloc(stream->upstream->buffers.mem_pool, 0, 0);
2970Sigor@sysoev.ru
2980Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
2990Sigor@sysoev.ru return NULL;
3000Sigor@sysoev.ru }
3010Sigor@sysoev.ru
3020Sigor@sysoev.ru *prev = b;
3030Sigor@sysoev.ru
3040Sigor@sysoev.ru b->data = stream;
3050Sigor@sysoev.ru b->completion_handler = nxt_stream_source_buf_completion;
3060Sigor@sysoev.ru
3070Sigor@sysoev.ru in = c->read;
3080Sigor@sysoev.ru in->retain++;
3090Sigor@sysoev.ru b->parent = in;
3100Sigor@sysoev.ru
3110Sigor@sysoev.ru b->mem.pos = in->mem.free;
3120Sigor@sysoev.ru b->mem.start = in->mem.free;
3130Sigor@sysoev.ru
3140Sigor@sysoev.ru size = nxt_buf_mem_free_size(&in->mem);
3150Sigor@sysoev.ru
3160Sigor@sysoev.ru if (nbytes < size) {
3170Sigor@sysoev.ru in->mem.free += nbytes;
3180Sigor@sysoev.ru
3190Sigor@sysoev.ru b->mem.free = in->mem.free;
3200Sigor@sysoev.ru b->mem.end = in->mem.free;
3210Sigor@sysoev.ru
3220Sigor@sysoev.ru break;
3230Sigor@sysoev.ru }
3240Sigor@sysoev.ru
3250Sigor@sysoev.ru in->mem.free = in->mem.end;
3260Sigor@sysoev.ru
3270Sigor@sysoev.ru b->mem.free = in->mem.free;
3280Sigor@sysoev.ru b->mem.end = in->mem.free;
3290Sigor@sysoev.ru nbytes -= size;
3300Sigor@sysoev.ru
3310Sigor@sysoev.ru prev = &b->next;
3320Sigor@sysoev.ru c->read = in->next;
3330Sigor@sysoev.ru in->next = NULL;
3340Sigor@sysoev.ru
3350Sigor@sysoev.ru } while (c->read != NULL);
3360Sigor@sysoev.ru
3370Sigor@sysoev.ru return head;
3380Sigor@sysoev.ru }
3390Sigor@sysoev.ru
3400Sigor@sysoev.ru
3410Sigor@sysoev.ru static void
nxt_stream_source_buf_completion(nxt_task_t * task,void * obj,void * data)3421Sigor@sysoev.ru nxt_stream_source_buf_completion(nxt_task_t *task, void *obj, void *data)
3430Sigor@sysoev.ru {
3440Sigor@sysoev.ru size_t size;
3450Sigor@sysoev.ru nxt_buf_t *b, *parent;
3460Sigor@sysoev.ru nxt_stream_source_t *stream;
3470Sigor@sysoev.ru
3480Sigor@sysoev.ru b = obj;
3490Sigor@sysoev.ru parent = data;
3500Sigor@sysoev.ru
3510Sigor@sysoev.ru #if 0
3521Sigor@sysoev.ru nxt_debug(thr->log,
3530Sigor@sysoev.ru "stream source buf completion: %p parent:%p retain:%uD",
3540Sigor@sysoev.ru b, parent, parent->retain);
3550Sigor@sysoev.ru #endif
3560Sigor@sysoev.ru
3570Sigor@sysoev.ru stream = b->data;
3580Sigor@sysoev.ru
3590Sigor@sysoev.ru /* A parent is a buffer where stream reads data. */
3600Sigor@sysoev.ru
3610Sigor@sysoev.ru parent->mem.pos = b->mem.pos;
3620Sigor@sysoev.ru parent->retain--;
3630Sigor@sysoev.ru
3640Sigor@sysoev.ru if (parent->retain == 0 && !stream->conn->socket.closed) {
3650Sigor@sysoev.ru size = nxt_buf_mem_size(&parent->mem);
3660Sigor@sysoev.ru
3670Sigor@sysoev.ru parent->mem.pos = parent->mem.start;
3680Sigor@sysoev.ru parent->mem.free = parent->mem.start;
3690Sigor@sysoev.ru
3700Sigor@sysoev.ru /*
3710Sigor@sysoev.ru * A buffer's original size can be changed by filters
3720Sigor@sysoev.ru * so reuse the buffer only if it is still large enough.
3730Sigor@sysoev.ru */
3740Sigor@sysoev.ru if (size >= 256 || size >= stream->upstream->buffers.size) {
3750Sigor@sysoev.ru
3760Sigor@sysoev.ru if (stream->conn->read != parent) {
3770Sigor@sysoev.ru nxt_buf_chain_add(&stream->conn->read, parent);
3780Sigor@sysoev.ru }
3790Sigor@sysoev.ru
3800Sigor@sysoev.ru if (!stream->read_queued) {
3810Sigor@sysoev.ru stream->read_queued = 1;
3821Sigor@sysoev.ru nxt_thread_work_queue_add(task->thread,
3831Sigor@sysoev.ru stream->upstream->work_queue,
3840Sigor@sysoev.ru nxt_stream_source_read_ready,
3851Sigor@sysoev.ru task, stream->conn,
3861Sigor@sysoev.ru stream->conn->socket.data);
3870Sigor@sysoev.ru }
3880Sigor@sysoev.ru }
3890Sigor@sysoev.ru }
3900Sigor@sysoev.ru
3910Sigor@sysoev.ru nxt_buf_free(stream->upstream->buffers.mem_pool, b);
3920Sigor@sysoev.ru }
3930Sigor@sysoev.ru
3940Sigor@sysoev.ru
3950Sigor@sysoev.ru static void
nxt_stream_source_refused(nxt_task_t * task,void * obj,void * data)3961Sigor@sysoev.ru nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data)
3970Sigor@sysoev.ru {
3980Sigor@sysoev.ru nxt_stream_source_t *stream;
3990Sigor@sysoev.ru
4000Sigor@sysoev.ru stream = data;
4010Sigor@sysoev.ru
4020Sigor@sysoev.ru #if (NXT_DEBUG)
4030Sigor@sysoev.ru {
4040Sigor@sysoev.ru nxt_event_conn_t *c;
4050Sigor@sysoev.ru
4060Sigor@sysoev.ru c = obj;
4070Sigor@sysoev.ru
4081Sigor@sysoev.ru nxt_debug(task, "stream source refused fd:%d", c->socket.fd);
4090Sigor@sysoev.ru }
4100Sigor@sysoev.ru #endif
4110Sigor@sysoev.ru
4121Sigor@sysoev.ru nxt_stream_source_close(task, stream);
4130Sigor@sysoev.ru }
4140Sigor@sysoev.ru
4150Sigor@sysoev.ru
4160Sigor@sysoev.ru static void
nxt_stream_source_closed(nxt_task_t * task,void * obj,void * data)4171Sigor@sysoev.ru nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data)
4180Sigor@sysoev.ru {
4190Sigor@sysoev.ru nxt_buf_t *b;
4200Sigor@sysoev.ru nxt_event_conn_t *c;
4210Sigor@sysoev.ru nxt_stream_source_t *stream;
4220Sigor@sysoev.ru
4230Sigor@sysoev.ru c = obj;
4240Sigor@sysoev.ru stream = data;
4250Sigor@sysoev.ru
4261Sigor@sysoev.ru nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
4270Sigor@sysoev.ru
428*62Sigor@sysoev.ru nxt_conn_close(task, c);
4290Sigor@sysoev.ru
4300Sigor@sysoev.ru b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
4310Sigor@sysoev.ru NXT_BUF_SYNC_LAST);
4320Sigor@sysoev.ru
4330Sigor@sysoev.ru if (nxt_slow_path(b == NULL)) {
4341Sigor@sysoev.ru stream->error_handler(task, stream);
4350Sigor@sysoev.ru return;
4360Sigor@sysoev.ru }
4370Sigor@sysoev.ru
4381Sigor@sysoev.ru nxt_source_filter(task->thread, c->write_work_queue, task, stream->next, b);
4390Sigor@sysoev.ru }
4400Sigor@sysoev.ru
4410Sigor@sysoev.ru
4420Sigor@sysoev.ru static void
nxt_stream_source_error(nxt_task_t * task,void * obj,void * data)4431Sigor@sysoev.ru nxt_stream_source_error(nxt_task_t *task, void *obj, void *data)
4440Sigor@sysoev.ru {
4450Sigor@sysoev.ru nxt_stream_source_t *stream;
4460Sigor@sysoev.ru
4470Sigor@sysoev.ru stream = data;
4480Sigor@sysoev.ru
4490Sigor@sysoev.ru #if (NXT_DEBUG)
4500Sigor@sysoev.ru {
4510Sigor@sysoev.ru nxt_event_fd_t *ev;
4520Sigor@sysoev.ru
4530Sigor@sysoev.ru ev = obj;
4540Sigor@sysoev.ru
4551Sigor@sysoev.ru nxt_debug(task, "stream source error fd:%d", ev->fd);
4560Sigor@sysoev.ru }
4570Sigor@sysoev.ru #endif
4580Sigor@sysoev.ru
4591Sigor@sysoev.ru nxt_stream_source_close(task, stream);
4600Sigor@sysoev.ru }
4610Sigor@sysoev.ru
4620Sigor@sysoev.ru
4630Sigor@sysoev.ru static void
nxt_stream_source_close(nxt_task_t * task,nxt_stream_source_t * stream)4641Sigor@sysoev.ru nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
4650Sigor@sysoev.ru {
466*62Sigor@sysoev.ru nxt_conn_close(task, stream->conn);
4670Sigor@sysoev.ru
4681Sigor@sysoev.ru stream->error_handler(task, stream);
4690Sigor@sysoev.ru }
4700Sigor@sysoev.ru
4710Sigor@sysoev.ru
4720Sigor@sysoev.ru void
nxt_source_filter_handler(nxt_task_t * task,void * obj,void * data)4731Sigor@sysoev.ru nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data)
4740Sigor@sysoev.ru {
4750Sigor@sysoev.ru nxt_source_hook_t *next;
4760Sigor@sysoev.ru
4770Sigor@sysoev.ru next = obj;
4780Sigor@sysoev.ru
4791Sigor@sysoev.ru next->filter(task, next->context, data);
4800Sigor@sysoev.ru }
481