xref: /unit/src/nxt_stream_source.c (revision 1)
10Sigor@sysoev.ru 
20Sigor@sysoev.ru /*
30Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
40Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
50Sigor@sysoev.ru  */
60Sigor@sysoev.ru 
70Sigor@sysoev.ru #include <nxt_main.h>
80Sigor@sysoev.ru 
90Sigor@sysoev.ru 
10*1Sigor@sysoev.ru static void nxt_stream_source_connected(nxt_task_t *task, void *obj,
110Sigor@sysoev.ru     void *data);
12*1Sigor@sysoev.ru static void nxt_stream_source_write_ready(nxt_task_t *task, void *obj,
130Sigor@sysoev.ru     void *data);
14*1Sigor@sysoev.ru static void nxt_stream_source_read_ready(nxt_task_t *task, void *obj,
150Sigor@sysoev.ru     void *data);
160Sigor@sysoev.ru static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
170Sigor@sysoev.ru     nxt_event_conn_t *c);
18*1Sigor@sysoev.ru static void nxt_stream_source_buf_completion(nxt_task_t *task, void *obj,
190Sigor@sysoev.ru     void *data);
20*1Sigor@sysoev.ru static void nxt_stream_source_read_done(nxt_task_t *task, void *obj,
210Sigor@sysoev.ru     void *data);
22*1Sigor@sysoev.ru static void nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data);
23*1Sigor@sysoev.ru static void nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data);
24*1Sigor@sysoev.ru static void nxt_stream_source_error(nxt_task_t *task, void *obj, void *data);
25*1Sigor@sysoev.ru static void nxt_stream_source_close(nxt_task_t *task,
260Sigor@sysoev.ru     nxt_stream_source_t *stream);
270Sigor@sysoev.ru 
280Sigor@sysoev.ru 
290Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_connect_state;
300Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_request_write_state;
310Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_response_ready_state;
320Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_response_read_state;
330Sigor@sysoev.ru 
340Sigor@sysoev.ru 
350Sigor@sysoev.ru void
36*1Sigor@sysoev.ru nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream)
370Sigor@sysoev.ru {
380Sigor@sysoev.ru     nxt_thread_t          *thr;
390Sigor@sysoev.ru     nxt_event_conn_t      *c;
400Sigor@sysoev.ru     nxt_upstream_source_t  *us;
410Sigor@sysoev.ru 
420Sigor@sysoev.ru     thr = nxt_thread();
430Sigor@sysoev.ru 
440Sigor@sysoev.ru     us = stream->upstream;
450Sigor@sysoev.ru 
460Sigor@sysoev.ru     if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
47*1Sigor@sysoev.ru         nxt_log(task, NXT_LOG_ERR,
48*1Sigor@sysoev.ru                 "%d buffers %uDK each are not enough to read upstream response",
49*1Sigor@sysoev.ru                 us->buffers.max, us->buffers.size / 1024);
500Sigor@sysoev.ru         goto fail;
510Sigor@sysoev.ru     }
520Sigor@sysoev.ru 
530Sigor@sysoev.ru     c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
540Sigor@sysoev.ru     if (nxt_slow_path(c == NULL)) {
550Sigor@sysoev.ru         goto fail;
560Sigor@sysoev.ru     }
570Sigor@sysoev.ru 
580Sigor@sysoev.ru     stream->conn = c;
590Sigor@sysoev.ru     c->socket.data = stream;
600Sigor@sysoev.ru 
610Sigor@sysoev.ru     nxt_event_conn_work_queue_set(c, us->work_queue);
620Sigor@sysoev.ru 
630Sigor@sysoev.ru     c->remote = us->peer->sockaddr;
640Sigor@sysoev.ru     c->write_state = &nxt_stream_source_connect_state;
650Sigor@sysoev.ru 
66*1Sigor@sysoev.ru     nxt_event_conn_connect(task, c);
670Sigor@sysoev.ru     return;
680Sigor@sysoev.ru 
690Sigor@sysoev.ru fail:
700Sigor@sysoev.ru 
71*1Sigor@sysoev.ru     stream->error_handler(task, stream);
720Sigor@sysoev.ru }
730Sigor@sysoev.ru 
740Sigor@sysoev.ru 
750Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_connect_state
760Sigor@sysoev.ru     nxt_aligned(64) =
770Sigor@sysoev.ru {
780Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
790Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
800Sigor@sysoev.ru 
810Sigor@sysoev.ru     nxt_stream_source_connected,
820Sigor@sysoev.ru     nxt_stream_source_refused,
830Sigor@sysoev.ru     nxt_stream_source_error,
840Sigor@sysoev.ru 
850Sigor@sysoev.ru     NULL, /* timeout */
860Sigor@sysoev.ru     NULL, /* timeout value */
870Sigor@sysoev.ru     0, /* connect_timeout */
880Sigor@sysoev.ru };
890Sigor@sysoev.ru 
900Sigor@sysoev.ru 
910Sigor@sysoev.ru static void
92*1Sigor@sysoev.ru nxt_stream_source_connected(nxt_task_t *task, void *obj, void *data)
930Sigor@sysoev.ru {
940Sigor@sysoev.ru     nxt_event_conn_t     *c;
950Sigor@sysoev.ru     nxt_stream_source_t  *stream;
960Sigor@sysoev.ru 
970Sigor@sysoev.ru     c = obj;
980Sigor@sysoev.ru     stream = data;
990Sigor@sysoev.ru 
100*1Sigor@sysoev.ru     nxt_debug(task, "stream source connected fd:%d", c->socket.fd);
1010Sigor@sysoev.ru 
1020Sigor@sysoev.ru     c->read_state = &nxt_stream_source_response_ready_state;
1030Sigor@sysoev.ru     c->write = stream->out;
1040Sigor@sysoev.ru     c->write_state = &nxt_stream_source_request_write_state;
1050Sigor@sysoev.ru 
106*1Sigor@sysoev.ru     if (task->thread->engine->batch != 0) {
107*1Sigor@sysoev.ru         nxt_event_conn_write(task, c);
1080Sigor@sysoev.ru 
1090Sigor@sysoev.ru     } else {
1100Sigor@sysoev.ru         stream->read_queued = 1;
111*1Sigor@sysoev.ru         nxt_thread_work_queue_add(task->thread,
112*1Sigor@sysoev.ru                                   &task->thread->engine->read_work_queue,
113*1Sigor@sysoev.ru                                   c->io->read, task, c, stream);
1140Sigor@sysoev.ru 
115*1Sigor@sysoev.ru         c->io->write(task, c, stream);
1160Sigor@sysoev.ru     }
1170Sigor@sysoev.ru }
1180Sigor@sysoev.ru 
1190Sigor@sysoev.ru 
1200Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_request_write_state
1210Sigor@sysoev.ru     nxt_aligned(64) =
1220Sigor@sysoev.ru {
1230Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
1240Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
1250Sigor@sysoev.ru 
1260Sigor@sysoev.ru     nxt_stream_source_write_ready,
1270Sigor@sysoev.ru     NULL,
1280Sigor@sysoev.ru     nxt_stream_source_error,
1290Sigor@sysoev.ru 
1300Sigor@sysoev.ru     NULL, /* timeout */
1310Sigor@sysoev.ru     NULL, /* timeout value */
1320Sigor@sysoev.ru     0, /* connect_timeout */
1330Sigor@sysoev.ru };
1340Sigor@sysoev.ru 
1350Sigor@sysoev.ru 
1360Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_ready_state
1370Sigor@sysoev.ru     nxt_aligned(64) =
1380Sigor@sysoev.ru {
1390Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
1400Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
1410Sigor@sysoev.ru 
1420Sigor@sysoev.ru     nxt_stream_source_read_ready,
1430Sigor@sysoev.ru     nxt_stream_source_closed,
1440Sigor@sysoev.ru     nxt_stream_source_error,
1450Sigor@sysoev.ru 
1460Sigor@sysoev.ru     NULL, /* timeout */
1470Sigor@sysoev.ru     NULL, /* timeout value */
1480Sigor@sysoev.ru     0, /* connect_timeout */
1490Sigor@sysoev.ru };
1500Sigor@sysoev.ru 
1510Sigor@sysoev.ru 
1520Sigor@sysoev.ru static void
153*1Sigor@sysoev.ru nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
1540Sigor@sysoev.ru {
1550Sigor@sysoev.ru     nxt_event_conn_t  *c;
1560Sigor@sysoev.ru 
1570Sigor@sysoev.ru     c = obj;
1580Sigor@sysoev.ru 
159*1Sigor@sysoev.ru     nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
1600Sigor@sysoev.ru 
161*1Sigor@sysoev.ru     nxt_event_conn_read(task, c);
1620Sigor@sysoev.ru }
1630Sigor@sysoev.ru 
1640Sigor@sysoev.ru 
1650Sigor@sysoev.ru static void
166*1Sigor@sysoev.ru nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
1670Sigor@sysoev.ru {
1680Sigor@sysoev.ru     nxt_int_t            ret;
1690Sigor@sysoev.ru     nxt_buf_t            *b;
1700Sigor@sysoev.ru     nxt_buf_pool_t       *buffers;
1710Sigor@sysoev.ru     nxt_event_conn_t     *c;
1720Sigor@sysoev.ru     nxt_stream_source_t  *stream;
1730Sigor@sysoev.ru 
1740Sigor@sysoev.ru     c = obj;
1750Sigor@sysoev.ru     stream = data;
1760Sigor@sysoev.ru     stream->read_queued = 0;
1770Sigor@sysoev.ru 
178*1Sigor@sysoev.ru     nxt_debug(task, "stream source read ready fd:%d", c->socket.fd);
1790Sigor@sysoev.ru 
1800Sigor@sysoev.ru     if (c->read == NULL) {
1810Sigor@sysoev.ru 
1820Sigor@sysoev.ru         buffers = &stream->upstream->buffers;
1830Sigor@sysoev.ru 
1840Sigor@sysoev.ru         ret = nxt_buf_pool_mem_alloc(buffers, 0);
1850Sigor@sysoev.ru 
1860Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
1870Sigor@sysoev.ru 
1880Sigor@sysoev.ru             if (nxt_slow_path(ret == NXT_ERROR)) {
1890Sigor@sysoev.ru                 goto fail;
1900Sigor@sysoev.ru             }
1910Sigor@sysoev.ru 
1920Sigor@sysoev.ru             /* ret == NXT_AGAIN */
1930Sigor@sysoev.ru 
194*1Sigor@sysoev.ru             nxt_debug(task, "stream source flush");
1950Sigor@sysoev.ru 
1960Sigor@sysoev.ru             b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
1970Sigor@sysoev.ru 
1980Sigor@sysoev.ru             if (nxt_slow_path(b == NULL)) {
1990Sigor@sysoev.ru                 goto fail;
2000Sigor@sysoev.ru             }
2010Sigor@sysoev.ru 
202*1Sigor@sysoev.ru             nxt_event_fd_block_read(task->thread->engine, &c->socket);
2030Sigor@sysoev.ru 
204*1Sigor@sysoev.ru             nxt_source_filter(task->thread, c->write_work_queue, task,
205*1Sigor@sysoev.ru                               stream->next, b);
2060Sigor@sysoev.ru             return;
2070Sigor@sysoev.ru         }
2080Sigor@sysoev.ru 
2090Sigor@sysoev.ru         c->read = buffers->current;
2100Sigor@sysoev.ru         buffers->current = NULL;
2110Sigor@sysoev.ru     }
2120Sigor@sysoev.ru 
2130Sigor@sysoev.ru     c->read_state = &nxt_stream_source_response_read_state;
2140Sigor@sysoev.ru 
215*1Sigor@sysoev.ru     nxt_event_conn_read(task, c);
2160Sigor@sysoev.ru     return;
2170Sigor@sysoev.ru 
2180Sigor@sysoev.ru fail:
2190Sigor@sysoev.ru 
220*1Sigor@sysoev.ru     nxt_stream_source_close(task, stream);
2210Sigor@sysoev.ru }
2220Sigor@sysoev.ru 
2230Sigor@sysoev.ru 
2240Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_read_state
2250Sigor@sysoev.ru     nxt_aligned(64) =
2260Sigor@sysoev.ru {
2270Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
2280Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
2290Sigor@sysoev.ru 
2300Sigor@sysoev.ru     nxt_stream_source_read_done,
2310Sigor@sysoev.ru     nxt_stream_source_closed,
2320Sigor@sysoev.ru     nxt_stream_source_error,
2330Sigor@sysoev.ru 
2340Sigor@sysoev.ru     NULL, /* timeout */
2350Sigor@sysoev.ru     NULL, /* timeout value */
2360Sigor@sysoev.ru     0, /* connect_timeout */
2370Sigor@sysoev.ru };
2380Sigor@sysoev.ru 
2390Sigor@sysoev.ru 
2400Sigor@sysoev.ru static void
241*1Sigor@sysoev.ru nxt_stream_source_read_done(nxt_task_t *task, void *obj, void *data)
2420Sigor@sysoev.ru {
2430Sigor@sysoev.ru     nxt_buf_t            *b;
2440Sigor@sysoev.ru     nxt_bool_t           batch;
2450Sigor@sysoev.ru     nxt_event_conn_t     *c;
2460Sigor@sysoev.ru     nxt_stream_source_t  *stream;
2470Sigor@sysoev.ru 
2480Sigor@sysoev.ru     c = obj;
2490Sigor@sysoev.ru     stream = data;
2500Sigor@sysoev.ru 
251*1Sigor@sysoev.ru     nxt_debug(task, "stream source read done fd:%d", c->socket.fd);
2520Sigor@sysoev.ru 
2530Sigor@sysoev.ru     if (c->read != NULL) {
2540Sigor@sysoev.ru         b = nxt_stream_source_process_buffers(stream, c);
2550Sigor@sysoev.ru 
2560Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
257*1Sigor@sysoev.ru             nxt_stream_source_close(task, stream);
2580Sigor@sysoev.ru             return;
2590Sigor@sysoev.ru         }
2600Sigor@sysoev.ru 
261*1Sigor@sysoev.ru         batch = (task->thread->engine->batch != 0);
2620Sigor@sysoev.ru 
2630Sigor@sysoev.ru         if (batch) {
264*1Sigor@sysoev.ru             nxt_thread_work_queue_add(task->thread,
265*1Sigor@sysoev.ru                                       stream->upstream->work_queue,
2660Sigor@sysoev.ru                                       nxt_source_filter_handler,
267*1Sigor@sysoev.ru                                       task, stream->next, b);
2680Sigor@sysoev.ru         }
2690Sigor@sysoev.ru 
2700Sigor@sysoev.ru         if (!stream->read_queued) {
2710Sigor@sysoev.ru             stream->read_queued = 1;
272*1Sigor@sysoev.ru             nxt_thread_work_queue_add(task->thread,
273*1Sigor@sysoev.ru                                       stream->upstream->work_queue,
2740Sigor@sysoev.ru                                       nxt_stream_source_read_ready,
275*1Sigor@sysoev.ru                                       task, c, stream);
2760Sigor@sysoev.ru         }
2770Sigor@sysoev.ru 
2780Sigor@sysoev.ru         if (!batch) {
279*1Sigor@sysoev.ru             stream->next->filter(task, stream->next->context, b);
2800Sigor@sysoev.ru         }
2810Sigor@sysoev.ru     }
2820Sigor@sysoev.ru }
2830Sigor@sysoev.ru 
2840Sigor@sysoev.ru 
2850Sigor@sysoev.ru static nxt_buf_t *
2860Sigor@sysoev.ru nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
2870Sigor@sysoev.ru     nxt_event_conn_t *c)
2880Sigor@sysoev.ru {
2890Sigor@sysoev.ru     size_t     size, nbytes;
2900Sigor@sysoev.ru     nxt_buf_t  *b, *in, *head, **prev;
2910Sigor@sysoev.ru 
2920Sigor@sysoev.ru     nbytes = c->nbytes;
2930Sigor@sysoev.ru     prev = &head;
2940Sigor@sysoev.ru 
2950Sigor@sysoev.ru     do {
2960Sigor@sysoev.ru         b = nxt_buf_mem_alloc(stream->upstream->buffers.mem_pool, 0, 0);
2970Sigor@sysoev.ru 
2980Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
2990Sigor@sysoev.ru             return NULL;
3000Sigor@sysoev.ru         }
3010Sigor@sysoev.ru 
3020Sigor@sysoev.ru         *prev = b;
3030Sigor@sysoev.ru 
3040Sigor@sysoev.ru         b->data = stream;
3050Sigor@sysoev.ru         b->completion_handler = nxt_stream_source_buf_completion;
3060Sigor@sysoev.ru 
3070Sigor@sysoev.ru         in = c->read;
3080Sigor@sysoev.ru         in->retain++;
3090Sigor@sysoev.ru         b->parent = in;
3100Sigor@sysoev.ru 
3110Sigor@sysoev.ru         b->mem.pos = in->mem.free;
3120Sigor@sysoev.ru         b->mem.start = in->mem.free;
3130Sigor@sysoev.ru 
3140Sigor@sysoev.ru         size = nxt_buf_mem_free_size(&in->mem);
3150Sigor@sysoev.ru 
3160Sigor@sysoev.ru         if (nbytes < size) {
3170Sigor@sysoev.ru             in->mem.free += nbytes;
3180Sigor@sysoev.ru 
3190Sigor@sysoev.ru             b->mem.free = in->mem.free;
3200Sigor@sysoev.ru             b->mem.end = in->mem.free;
3210Sigor@sysoev.ru 
3220Sigor@sysoev.ru             break;
3230Sigor@sysoev.ru         }
3240Sigor@sysoev.ru 
3250Sigor@sysoev.ru         in->mem.free = in->mem.end;
3260Sigor@sysoev.ru 
3270Sigor@sysoev.ru         b->mem.free = in->mem.free;
3280Sigor@sysoev.ru         b->mem.end = in->mem.free;
3290Sigor@sysoev.ru         nbytes -= size;
3300Sigor@sysoev.ru 
3310Sigor@sysoev.ru         prev = &b->next;
3320Sigor@sysoev.ru         c->read = in->next;
3330Sigor@sysoev.ru         in->next = NULL;
3340Sigor@sysoev.ru 
3350Sigor@sysoev.ru     } while (c->read != NULL);
3360Sigor@sysoev.ru 
3370Sigor@sysoev.ru     return head;
3380Sigor@sysoev.ru }
3390Sigor@sysoev.ru 
3400Sigor@sysoev.ru 
3410Sigor@sysoev.ru static void
342*1Sigor@sysoev.ru nxt_stream_source_buf_completion(nxt_task_t *task, void *obj, void *data)
3430Sigor@sysoev.ru {
3440Sigor@sysoev.ru     size_t               size;
3450Sigor@sysoev.ru     nxt_buf_t            *b, *parent;
3460Sigor@sysoev.ru     nxt_stream_source_t  *stream;
3470Sigor@sysoev.ru 
3480Sigor@sysoev.ru     b = obj;
3490Sigor@sysoev.ru     parent = data;
3500Sigor@sysoev.ru 
3510Sigor@sysoev.ru #if 0
352*1Sigor@sysoev.ru     nxt_debug(thr->log,
3530Sigor@sysoev.ru                   "stream source buf completion: %p parent:%p retain:%uD",
3540Sigor@sysoev.ru                   b, parent, parent->retain);
3550Sigor@sysoev.ru #endif
3560Sigor@sysoev.ru 
3570Sigor@sysoev.ru     stream = b->data;
3580Sigor@sysoev.ru 
3590Sigor@sysoev.ru     /* A parent is a buffer where stream reads data. */
3600Sigor@sysoev.ru 
3610Sigor@sysoev.ru     parent->mem.pos = b->mem.pos;
3620Sigor@sysoev.ru     parent->retain--;
3630Sigor@sysoev.ru 
3640Sigor@sysoev.ru     if (parent->retain == 0 && !stream->conn->socket.closed) {
3650Sigor@sysoev.ru         size = nxt_buf_mem_size(&parent->mem);
3660Sigor@sysoev.ru 
3670Sigor@sysoev.ru         parent->mem.pos = parent->mem.start;
3680Sigor@sysoev.ru         parent->mem.free = parent->mem.start;
3690Sigor@sysoev.ru 
3700Sigor@sysoev.ru         /*
3710Sigor@sysoev.ru          * A buffer's original size can be changed by filters
3720Sigor@sysoev.ru          * so reuse the buffer only if it is still large enough.
3730Sigor@sysoev.ru          */
3740Sigor@sysoev.ru         if (size >= 256 || size >= stream->upstream->buffers.size) {
3750Sigor@sysoev.ru 
3760Sigor@sysoev.ru             if (stream->conn->read != parent) {
3770Sigor@sysoev.ru                 nxt_buf_chain_add(&stream->conn->read, parent);
3780Sigor@sysoev.ru             }
3790Sigor@sysoev.ru 
3800Sigor@sysoev.ru             if (!stream->read_queued) {
3810Sigor@sysoev.ru                 stream->read_queued = 1;
382*1Sigor@sysoev.ru                 nxt_thread_work_queue_add(task->thread,
383*1Sigor@sysoev.ru                                           stream->upstream->work_queue,
3840Sigor@sysoev.ru                                           nxt_stream_source_read_ready,
385*1Sigor@sysoev.ru                                           task, stream->conn,
386*1Sigor@sysoev.ru                                           stream->conn->socket.data);
3870Sigor@sysoev.ru             }
3880Sigor@sysoev.ru         }
3890Sigor@sysoev.ru     }
3900Sigor@sysoev.ru 
3910Sigor@sysoev.ru     nxt_buf_free(stream->upstream->buffers.mem_pool, b);
3920Sigor@sysoev.ru }
3930Sigor@sysoev.ru 
3940Sigor@sysoev.ru 
3950Sigor@sysoev.ru static void
396*1Sigor@sysoev.ru nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data)
3970Sigor@sysoev.ru {
3980Sigor@sysoev.ru     nxt_stream_source_t  *stream;
3990Sigor@sysoev.ru 
4000Sigor@sysoev.ru     stream = data;
4010Sigor@sysoev.ru 
4020Sigor@sysoev.ru #if (NXT_DEBUG)
4030Sigor@sysoev.ru     {
4040Sigor@sysoev.ru         nxt_event_conn_t  *c;
4050Sigor@sysoev.ru 
4060Sigor@sysoev.ru         c = obj;
4070Sigor@sysoev.ru 
408*1Sigor@sysoev.ru         nxt_debug(task, "stream source refused fd:%d", c->socket.fd);
4090Sigor@sysoev.ru     }
4100Sigor@sysoev.ru #endif
4110Sigor@sysoev.ru 
412*1Sigor@sysoev.ru     nxt_stream_source_close(task, stream);
4130Sigor@sysoev.ru }
4140Sigor@sysoev.ru 
4150Sigor@sysoev.ru 
4160Sigor@sysoev.ru static void
417*1Sigor@sysoev.ru nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data)
4180Sigor@sysoev.ru {
4190Sigor@sysoev.ru     nxt_buf_t            *b;
4200Sigor@sysoev.ru     nxt_event_conn_t     *c;
4210Sigor@sysoev.ru     nxt_stream_source_t  *stream;
4220Sigor@sysoev.ru 
4230Sigor@sysoev.ru     c = obj;
4240Sigor@sysoev.ru     stream = data;
4250Sigor@sysoev.ru 
426*1Sigor@sysoev.ru     nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
4270Sigor@sysoev.ru 
428*1Sigor@sysoev.ru     nxt_event_conn_close(task, c);
4290Sigor@sysoev.ru 
4300Sigor@sysoev.ru     b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
4310Sigor@sysoev.ru                            NXT_BUF_SYNC_LAST);
4320Sigor@sysoev.ru 
4330Sigor@sysoev.ru     if (nxt_slow_path(b == NULL)) {
434*1Sigor@sysoev.ru         stream->error_handler(task, stream);
4350Sigor@sysoev.ru         return;
4360Sigor@sysoev.ru     }
4370Sigor@sysoev.ru 
438*1Sigor@sysoev.ru     nxt_source_filter(task->thread, c->write_work_queue, task, stream->next, b);
4390Sigor@sysoev.ru }
4400Sigor@sysoev.ru 
4410Sigor@sysoev.ru 
4420Sigor@sysoev.ru static void
443*1Sigor@sysoev.ru nxt_stream_source_error(nxt_task_t *task, void *obj, void *data)
4440Sigor@sysoev.ru {
4450Sigor@sysoev.ru     nxt_stream_source_t  *stream;
4460Sigor@sysoev.ru 
4470Sigor@sysoev.ru     stream = data;
4480Sigor@sysoev.ru 
4490Sigor@sysoev.ru #if (NXT_DEBUG)
4500Sigor@sysoev.ru     {
4510Sigor@sysoev.ru         nxt_event_fd_t  *ev;
4520Sigor@sysoev.ru 
4530Sigor@sysoev.ru         ev = obj;
4540Sigor@sysoev.ru 
455*1Sigor@sysoev.ru         nxt_debug(task, "stream source error fd:%d", ev->fd);
4560Sigor@sysoev.ru     }
4570Sigor@sysoev.ru #endif
4580Sigor@sysoev.ru 
459*1Sigor@sysoev.ru     nxt_stream_source_close(task, stream);
4600Sigor@sysoev.ru }
4610Sigor@sysoev.ru 
4620Sigor@sysoev.ru 
4630Sigor@sysoev.ru static void
464*1Sigor@sysoev.ru nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
4650Sigor@sysoev.ru {
466*1Sigor@sysoev.ru     nxt_event_conn_close(task, stream->conn);
4670Sigor@sysoev.ru 
468*1Sigor@sysoev.ru     stream->error_handler(task, stream);
4690Sigor@sysoev.ru }
4700Sigor@sysoev.ru 
4710Sigor@sysoev.ru 
4720Sigor@sysoev.ru void
473*1Sigor@sysoev.ru nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data)
4740Sigor@sysoev.ru {
4750Sigor@sysoev.ru     nxt_source_hook_t  *next;
4760Sigor@sysoev.ru 
4770Sigor@sysoev.ru     next = obj;
4780Sigor@sysoev.ru 
479*1Sigor@sysoev.ru     next->filter(task, next->context, data);
4800Sigor@sysoev.ru }
481