xref: /unit/src/nxt_stream_source.c (revision 0)
1*0Sigor@sysoev.ru 
2*0Sigor@sysoev.ru /*
3*0Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
4*0Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
5*0Sigor@sysoev.ru  */
6*0Sigor@sysoev.ru 
7*0Sigor@sysoev.ru #include <nxt_main.h>
8*0Sigor@sysoev.ru 
9*0Sigor@sysoev.ru 
10*0Sigor@sysoev.ru static void nxt_stream_source_connected(nxt_thread_t *thr, void *obj,
11*0Sigor@sysoev.ru     void *data);
12*0Sigor@sysoev.ru static void nxt_stream_source_write_ready(nxt_thread_t *thr, void *obj,
13*0Sigor@sysoev.ru     void *data);
14*0Sigor@sysoev.ru static void nxt_stream_source_read_ready(nxt_thread_t *thr, void *obj,
15*0Sigor@sysoev.ru     void *data);
16*0Sigor@sysoev.ru static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
17*0Sigor@sysoev.ru     nxt_event_conn_t *c);
18*0Sigor@sysoev.ru static void nxt_stream_source_buf_completion(nxt_thread_t *thr, void *obj,
19*0Sigor@sysoev.ru     void *data);
20*0Sigor@sysoev.ru static void nxt_stream_source_read_done(nxt_thread_t *thr, void *obj,
21*0Sigor@sysoev.ru     void *data);
22*0Sigor@sysoev.ru static void nxt_stream_source_refused(nxt_thread_t *thr, void *obj, void *data);
23*0Sigor@sysoev.ru static void nxt_stream_source_closed(nxt_thread_t *thr, void *obj, void *data);
24*0Sigor@sysoev.ru static void nxt_stream_source_error(nxt_thread_t *thr, void *obj, void *data);
25*0Sigor@sysoev.ru static void nxt_stream_source_close(nxt_thread_t *thr,
26*0Sigor@sysoev.ru     nxt_stream_source_t *stream);
27*0Sigor@sysoev.ru 
28*0Sigor@sysoev.ru 
29*0Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_connect_state;
30*0Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_request_write_state;
31*0Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_response_ready_state;
32*0Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_response_read_state;
33*0Sigor@sysoev.ru 
34*0Sigor@sysoev.ru 
35*0Sigor@sysoev.ru void
36*0Sigor@sysoev.ru nxt_stream_source_connect(nxt_stream_source_t *stream)
37*0Sigor@sysoev.ru {
38*0Sigor@sysoev.ru     nxt_thread_t          *thr;
39*0Sigor@sysoev.ru     nxt_event_conn_t      *c;
40*0Sigor@sysoev.ru     nxt_upstream_source_t  *us;
41*0Sigor@sysoev.ru 
42*0Sigor@sysoev.ru     thr = nxt_thread();
43*0Sigor@sysoev.ru 
44*0Sigor@sysoev.ru     us = stream->upstream;
45*0Sigor@sysoev.ru 
46*0Sigor@sysoev.ru     if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
47*0Sigor@sysoev.ru         nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
48*0Sigor@sysoev.ru                              "are not enough to read upstream response",
49*0Sigor@sysoev.ru                              us->buffers.max, us->buffers.size / 1024);
50*0Sigor@sysoev.ru         goto fail;
51*0Sigor@sysoev.ru     }
52*0Sigor@sysoev.ru 
53*0Sigor@sysoev.ru     c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54*0Sigor@sysoev.ru     if (nxt_slow_path(c == NULL)) {
55*0Sigor@sysoev.ru         goto fail;
56*0Sigor@sysoev.ru     }
57*0Sigor@sysoev.ru 
58*0Sigor@sysoev.ru     stream->conn = c;
59*0Sigor@sysoev.ru     c->socket.data = stream;
60*0Sigor@sysoev.ru 
61*0Sigor@sysoev.ru     nxt_event_conn_work_queue_set(c, us->work_queue);
62*0Sigor@sysoev.ru 
63*0Sigor@sysoev.ru     c->remote = us->peer->sockaddr;
64*0Sigor@sysoev.ru     c->write_state = &nxt_stream_source_connect_state;
65*0Sigor@sysoev.ru 
66*0Sigor@sysoev.ru     nxt_event_conn_connect(thr, c);
67*0Sigor@sysoev.ru     return;
68*0Sigor@sysoev.ru 
69*0Sigor@sysoev.ru fail:
70*0Sigor@sysoev.ru 
71*0Sigor@sysoev.ru     stream->error_handler(stream);
72*0Sigor@sysoev.ru }
73*0Sigor@sysoev.ru 
74*0Sigor@sysoev.ru 
75*0Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_connect_state
76*0Sigor@sysoev.ru     nxt_aligned(64) =
77*0Sigor@sysoev.ru {
78*0Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
79*0Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
80*0Sigor@sysoev.ru 
81*0Sigor@sysoev.ru     nxt_stream_source_connected,
82*0Sigor@sysoev.ru     nxt_stream_source_refused,
83*0Sigor@sysoev.ru     nxt_stream_source_error,
84*0Sigor@sysoev.ru 
85*0Sigor@sysoev.ru     NULL, /* timeout */
86*0Sigor@sysoev.ru     NULL, /* timeout value */
87*0Sigor@sysoev.ru     0, /* connect_timeout */
88*0Sigor@sysoev.ru };
89*0Sigor@sysoev.ru 
90*0Sigor@sysoev.ru 
91*0Sigor@sysoev.ru static void
92*0Sigor@sysoev.ru nxt_stream_source_connected(nxt_thread_t *thr, void *obj, void *data)
93*0Sigor@sysoev.ru {
94*0Sigor@sysoev.ru     nxt_event_conn_t     *c;
95*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
96*0Sigor@sysoev.ru 
97*0Sigor@sysoev.ru     c = obj;
98*0Sigor@sysoev.ru     stream = data;
99*0Sigor@sysoev.ru 
100*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "stream source connected fd:%d", c->socket.fd);
101*0Sigor@sysoev.ru 
102*0Sigor@sysoev.ru     c->read_state = &nxt_stream_source_response_ready_state;
103*0Sigor@sysoev.ru     c->write = stream->out;
104*0Sigor@sysoev.ru     c->write_state = &nxt_stream_source_request_write_state;
105*0Sigor@sysoev.ru 
106*0Sigor@sysoev.ru     if (thr->engine->batch != 0) {
107*0Sigor@sysoev.ru         nxt_event_conn_write(thr, c);
108*0Sigor@sysoev.ru 
109*0Sigor@sysoev.ru     } else {
110*0Sigor@sysoev.ru         stream->read_queued = 1;
111*0Sigor@sysoev.ru         nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue,
112*0Sigor@sysoev.ru                                   c->io->read, c, stream, thr->log);
113*0Sigor@sysoev.ru 
114*0Sigor@sysoev.ru         c->io->write(thr, c, stream);
115*0Sigor@sysoev.ru     }
116*0Sigor@sysoev.ru }
117*0Sigor@sysoev.ru 
118*0Sigor@sysoev.ru 
119*0Sigor@sysoev.ru static const nxt_event_conn_state_t  nxt_stream_source_request_write_state
120*0Sigor@sysoev.ru     nxt_aligned(64) =
121*0Sigor@sysoev.ru {
122*0Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
123*0Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
124*0Sigor@sysoev.ru 
125*0Sigor@sysoev.ru     nxt_stream_source_write_ready,
126*0Sigor@sysoev.ru     NULL,
127*0Sigor@sysoev.ru     nxt_stream_source_error,
128*0Sigor@sysoev.ru 
129*0Sigor@sysoev.ru     NULL, /* timeout */
130*0Sigor@sysoev.ru     NULL, /* timeout value */
131*0Sigor@sysoev.ru     0, /* connect_timeout */
132*0Sigor@sysoev.ru };
133*0Sigor@sysoev.ru 
134*0Sigor@sysoev.ru 
135*0Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_ready_state
136*0Sigor@sysoev.ru     nxt_aligned(64) =
137*0Sigor@sysoev.ru {
138*0Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
139*0Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
140*0Sigor@sysoev.ru 
141*0Sigor@sysoev.ru     nxt_stream_source_read_ready,
142*0Sigor@sysoev.ru     nxt_stream_source_closed,
143*0Sigor@sysoev.ru     nxt_stream_source_error,
144*0Sigor@sysoev.ru 
145*0Sigor@sysoev.ru     NULL, /* timeout */
146*0Sigor@sysoev.ru     NULL, /* timeout value */
147*0Sigor@sysoev.ru     0, /* connect_timeout */
148*0Sigor@sysoev.ru };
149*0Sigor@sysoev.ru 
150*0Sigor@sysoev.ru 
151*0Sigor@sysoev.ru static void
152*0Sigor@sysoev.ru nxt_stream_source_write_ready(nxt_thread_t *thr, void *obj, void *data)
153*0Sigor@sysoev.ru {
154*0Sigor@sysoev.ru     nxt_event_conn_t  *c;
155*0Sigor@sysoev.ru 
156*0Sigor@sysoev.ru     c = obj;
157*0Sigor@sysoev.ru 
158*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "stream source write ready fd:%d", c->socket.fd);
159*0Sigor@sysoev.ru 
160*0Sigor@sysoev.ru     nxt_event_conn_read(thr, c);
161*0Sigor@sysoev.ru }
162*0Sigor@sysoev.ru 
163*0Sigor@sysoev.ru 
164*0Sigor@sysoev.ru static void
165*0Sigor@sysoev.ru nxt_stream_source_read_ready(nxt_thread_t *thr, void *obj, void *data)
166*0Sigor@sysoev.ru {
167*0Sigor@sysoev.ru     nxt_int_t            ret;
168*0Sigor@sysoev.ru     nxt_buf_t            *b;
169*0Sigor@sysoev.ru     nxt_buf_pool_t       *buffers;
170*0Sigor@sysoev.ru     nxt_event_conn_t     *c;
171*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
172*0Sigor@sysoev.ru 
173*0Sigor@sysoev.ru     c = obj;
174*0Sigor@sysoev.ru     stream = data;
175*0Sigor@sysoev.ru     stream->read_queued = 0;
176*0Sigor@sysoev.ru 
177*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "stream source read ready fd:%d", c->socket.fd);
178*0Sigor@sysoev.ru 
179*0Sigor@sysoev.ru     if (c->read == NULL) {
180*0Sigor@sysoev.ru 
181*0Sigor@sysoev.ru         buffers = &stream->upstream->buffers;
182*0Sigor@sysoev.ru 
183*0Sigor@sysoev.ru         ret = nxt_buf_pool_mem_alloc(buffers, 0);
184*0Sigor@sysoev.ru 
185*0Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
186*0Sigor@sysoev.ru 
187*0Sigor@sysoev.ru             if (nxt_slow_path(ret == NXT_ERROR)) {
188*0Sigor@sysoev.ru                 goto fail;
189*0Sigor@sysoev.ru             }
190*0Sigor@sysoev.ru 
191*0Sigor@sysoev.ru             /* ret == NXT_AGAIN */
192*0Sigor@sysoev.ru 
193*0Sigor@sysoev.ru             nxt_log_debug(thr->log, "stream source flush");
194*0Sigor@sysoev.ru 
195*0Sigor@sysoev.ru             b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
196*0Sigor@sysoev.ru 
197*0Sigor@sysoev.ru             if (nxt_slow_path(b == NULL)) {
198*0Sigor@sysoev.ru                 goto fail;
199*0Sigor@sysoev.ru             }
200*0Sigor@sysoev.ru 
201*0Sigor@sysoev.ru             nxt_event_fd_block_read(thr->engine, &c->socket);
202*0Sigor@sysoev.ru 
203*0Sigor@sysoev.ru             nxt_source_filter(thr, c->write_work_queue, stream->next, b);
204*0Sigor@sysoev.ru             return;
205*0Sigor@sysoev.ru         }
206*0Sigor@sysoev.ru 
207*0Sigor@sysoev.ru         c->read = buffers->current;
208*0Sigor@sysoev.ru         buffers->current = NULL;
209*0Sigor@sysoev.ru     }
210*0Sigor@sysoev.ru 
211*0Sigor@sysoev.ru     c->read_state = &nxt_stream_source_response_read_state;
212*0Sigor@sysoev.ru 
213*0Sigor@sysoev.ru     nxt_event_conn_read(thr, c);
214*0Sigor@sysoev.ru     return;
215*0Sigor@sysoev.ru 
216*0Sigor@sysoev.ru fail:
217*0Sigor@sysoev.ru 
218*0Sigor@sysoev.ru     nxt_stream_source_close(thr, stream);
219*0Sigor@sysoev.ru }
220*0Sigor@sysoev.ru 
221*0Sigor@sysoev.ru 
222*0Sigor@sysoev.ru static const nxt_event_conn_state_t nxt_stream_source_response_read_state
223*0Sigor@sysoev.ru     nxt_aligned(64) =
224*0Sigor@sysoev.ru {
225*0Sigor@sysoev.ru     NXT_EVENT_NO_BUF_PROCESS,
226*0Sigor@sysoev.ru     NXT_EVENT_TIMER_AUTORESET,
227*0Sigor@sysoev.ru 
228*0Sigor@sysoev.ru     nxt_stream_source_read_done,
229*0Sigor@sysoev.ru     nxt_stream_source_closed,
230*0Sigor@sysoev.ru     nxt_stream_source_error,
231*0Sigor@sysoev.ru 
232*0Sigor@sysoev.ru     NULL, /* timeout */
233*0Sigor@sysoev.ru     NULL, /* timeout value */
234*0Sigor@sysoev.ru     0, /* connect_timeout */
235*0Sigor@sysoev.ru };
236*0Sigor@sysoev.ru 
237*0Sigor@sysoev.ru 
238*0Sigor@sysoev.ru static void
239*0Sigor@sysoev.ru nxt_stream_source_read_done(nxt_thread_t *thr, void *obj, void *data)
240*0Sigor@sysoev.ru {
241*0Sigor@sysoev.ru     nxt_buf_t            *b;
242*0Sigor@sysoev.ru     nxt_bool_t           batch;
243*0Sigor@sysoev.ru     nxt_event_conn_t     *c;
244*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
245*0Sigor@sysoev.ru 
246*0Sigor@sysoev.ru     c = obj;
247*0Sigor@sysoev.ru     stream = data;
248*0Sigor@sysoev.ru 
249*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "stream source read done fd:%d", c->socket.fd);
250*0Sigor@sysoev.ru 
251*0Sigor@sysoev.ru     if (c->read != NULL) {
252*0Sigor@sysoev.ru         b = nxt_stream_source_process_buffers(stream, c);
253*0Sigor@sysoev.ru 
254*0Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
255*0Sigor@sysoev.ru             nxt_stream_source_close(thr, stream);
256*0Sigor@sysoev.ru             return;
257*0Sigor@sysoev.ru         }
258*0Sigor@sysoev.ru 
259*0Sigor@sysoev.ru         batch = (thr->engine->batch != 0);
260*0Sigor@sysoev.ru 
261*0Sigor@sysoev.ru         if (batch) {
262*0Sigor@sysoev.ru             nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
263*0Sigor@sysoev.ru                                       nxt_source_filter_handler,
264*0Sigor@sysoev.ru                                       stream->next, b, thr->log);
265*0Sigor@sysoev.ru         }
266*0Sigor@sysoev.ru 
267*0Sigor@sysoev.ru         if (!stream->read_queued) {
268*0Sigor@sysoev.ru             stream->read_queued = 1;
269*0Sigor@sysoev.ru             nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
270*0Sigor@sysoev.ru                                       nxt_stream_source_read_ready,
271*0Sigor@sysoev.ru                                       c, stream, thr->log);
272*0Sigor@sysoev.ru         }
273*0Sigor@sysoev.ru 
274*0Sigor@sysoev.ru         if (!batch) {
275*0Sigor@sysoev.ru             stream->next->filter(thr, stream->next->context, b);
276*0Sigor@sysoev.ru         }
277*0Sigor@sysoev.ru     }
278*0Sigor@sysoev.ru }
279*0Sigor@sysoev.ru 
280*0Sigor@sysoev.ru 
281*0Sigor@sysoev.ru static nxt_buf_t *
282*0Sigor@sysoev.ru nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
283*0Sigor@sysoev.ru     nxt_event_conn_t *c)
284*0Sigor@sysoev.ru {
285*0Sigor@sysoev.ru     size_t     size, nbytes;
286*0Sigor@sysoev.ru     nxt_buf_t  *b, *in, *head, **prev;
287*0Sigor@sysoev.ru 
288*0Sigor@sysoev.ru     nbytes = c->nbytes;
289*0Sigor@sysoev.ru     prev = &head;
290*0Sigor@sysoev.ru 
291*0Sigor@sysoev.ru     do {
292*0Sigor@sysoev.ru         b = nxt_buf_mem_alloc(stream->upstream->buffers.mem_pool, 0, 0);
293*0Sigor@sysoev.ru 
294*0Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
295*0Sigor@sysoev.ru             return NULL;
296*0Sigor@sysoev.ru         }
297*0Sigor@sysoev.ru 
298*0Sigor@sysoev.ru         *prev = b;
299*0Sigor@sysoev.ru 
300*0Sigor@sysoev.ru         b->data = stream;
301*0Sigor@sysoev.ru         b->completion_handler = nxt_stream_source_buf_completion;
302*0Sigor@sysoev.ru 
303*0Sigor@sysoev.ru         in = c->read;
304*0Sigor@sysoev.ru         in->retain++;
305*0Sigor@sysoev.ru         b->parent = in;
306*0Sigor@sysoev.ru 
307*0Sigor@sysoev.ru         b->mem.pos = in->mem.free;
308*0Sigor@sysoev.ru         b->mem.start = in->mem.free;
309*0Sigor@sysoev.ru 
310*0Sigor@sysoev.ru         size = nxt_buf_mem_free_size(&in->mem);
311*0Sigor@sysoev.ru 
312*0Sigor@sysoev.ru         if (nbytes < size) {
313*0Sigor@sysoev.ru             in->mem.free += nbytes;
314*0Sigor@sysoev.ru 
315*0Sigor@sysoev.ru             b->mem.free = in->mem.free;
316*0Sigor@sysoev.ru             b->mem.end = in->mem.free;
317*0Sigor@sysoev.ru 
318*0Sigor@sysoev.ru             break;
319*0Sigor@sysoev.ru         }
320*0Sigor@sysoev.ru 
321*0Sigor@sysoev.ru         in->mem.free = in->mem.end;
322*0Sigor@sysoev.ru 
323*0Sigor@sysoev.ru         b->mem.free = in->mem.free;
324*0Sigor@sysoev.ru         b->mem.end = in->mem.free;
325*0Sigor@sysoev.ru         nbytes -= size;
326*0Sigor@sysoev.ru 
327*0Sigor@sysoev.ru         prev = &b->next;
328*0Sigor@sysoev.ru         c->read = in->next;
329*0Sigor@sysoev.ru         in->next = NULL;
330*0Sigor@sysoev.ru 
331*0Sigor@sysoev.ru     } while (c->read != NULL);
332*0Sigor@sysoev.ru 
333*0Sigor@sysoev.ru     return head;
334*0Sigor@sysoev.ru }
335*0Sigor@sysoev.ru 
336*0Sigor@sysoev.ru 
337*0Sigor@sysoev.ru static void
338*0Sigor@sysoev.ru nxt_stream_source_buf_completion(nxt_thread_t *thr, void *obj, void *data)
339*0Sigor@sysoev.ru {
340*0Sigor@sysoev.ru     size_t               size;
341*0Sigor@sysoev.ru     nxt_buf_t            *b, *parent;
342*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
343*0Sigor@sysoev.ru 
344*0Sigor@sysoev.ru     b = obj;
345*0Sigor@sysoev.ru     parent = data;
346*0Sigor@sysoev.ru 
347*0Sigor@sysoev.ru #if 0
348*0Sigor@sysoev.ru     nxt_log_debug(thr->log,
349*0Sigor@sysoev.ru                   "stream source buf completion: %p parent:%p retain:%uD",
350*0Sigor@sysoev.ru                   b, parent, parent->retain);
351*0Sigor@sysoev.ru #endif
352*0Sigor@sysoev.ru 
353*0Sigor@sysoev.ru     stream = b->data;
354*0Sigor@sysoev.ru 
355*0Sigor@sysoev.ru     /* A parent is a buffer where stream reads data. */
356*0Sigor@sysoev.ru 
357*0Sigor@sysoev.ru     parent->mem.pos = b->mem.pos;
358*0Sigor@sysoev.ru     parent->retain--;
359*0Sigor@sysoev.ru 
360*0Sigor@sysoev.ru     if (parent->retain == 0 && !stream->conn->socket.closed) {
361*0Sigor@sysoev.ru         size = nxt_buf_mem_size(&parent->mem);
362*0Sigor@sysoev.ru 
363*0Sigor@sysoev.ru         parent->mem.pos = parent->mem.start;
364*0Sigor@sysoev.ru         parent->mem.free = parent->mem.start;
365*0Sigor@sysoev.ru 
366*0Sigor@sysoev.ru         /*
367*0Sigor@sysoev.ru          * A buffer's original size can be changed by filters
368*0Sigor@sysoev.ru          * so reuse the buffer only if it is still large enough.
369*0Sigor@sysoev.ru          */
370*0Sigor@sysoev.ru         if (size >= 256 || size >= stream->upstream->buffers.size) {
371*0Sigor@sysoev.ru 
372*0Sigor@sysoev.ru             if (stream->conn->read != parent) {
373*0Sigor@sysoev.ru                 nxt_buf_chain_add(&stream->conn->read, parent);
374*0Sigor@sysoev.ru             }
375*0Sigor@sysoev.ru 
376*0Sigor@sysoev.ru             if (!stream->read_queued) {
377*0Sigor@sysoev.ru                 stream->read_queued = 1;
378*0Sigor@sysoev.ru                 nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
379*0Sigor@sysoev.ru                                           nxt_stream_source_read_ready,
380*0Sigor@sysoev.ru                                           stream->conn,
381*0Sigor@sysoev.ru                                           stream->conn->socket.data,
382*0Sigor@sysoev.ru                                           stream->conn->socket.log);
383*0Sigor@sysoev.ru             }
384*0Sigor@sysoev.ru         }
385*0Sigor@sysoev.ru     }
386*0Sigor@sysoev.ru 
387*0Sigor@sysoev.ru     nxt_buf_free(stream->upstream->buffers.mem_pool, b);
388*0Sigor@sysoev.ru }
389*0Sigor@sysoev.ru 
390*0Sigor@sysoev.ru 
391*0Sigor@sysoev.ru static void
392*0Sigor@sysoev.ru nxt_stream_source_refused(nxt_thread_t *thr, void *obj, void *data)
393*0Sigor@sysoev.ru {
394*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
395*0Sigor@sysoev.ru 
396*0Sigor@sysoev.ru     stream = data;
397*0Sigor@sysoev.ru 
398*0Sigor@sysoev.ru #if (NXT_DEBUG)
399*0Sigor@sysoev.ru     {
400*0Sigor@sysoev.ru         nxt_event_conn_t  *c;
401*0Sigor@sysoev.ru 
402*0Sigor@sysoev.ru         c = obj;
403*0Sigor@sysoev.ru 
404*0Sigor@sysoev.ru         nxt_log_debug(thr->log, "stream source refused fd:%d", c->socket.fd);
405*0Sigor@sysoev.ru     }
406*0Sigor@sysoev.ru #endif
407*0Sigor@sysoev.ru 
408*0Sigor@sysoev.ru     nxt_stream_source_close(thr, stream);
409*0Sigor@sysoev.ru }
410*0Sigor@sysoev.ru 
411*0Sigor@sysoev.ru 
412*0Sigor@sysoev.ru static void
413*0Sigor@sysoev.ru nxt_stream_source_closed(nxt_thread_t *thr, void *obj, void *data)
414*0Sigor@sysoev.ru {
415*0Sigor@sysoev.ru     nxt_buf_t            *b;
416*0Sigor@sysoev.ru     nxt_event_conn_t     *c;
417*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
418*0Sigor@sysoev.ru 
419*0Sigor@sysoev.ru     c = obj;
420*0Sigor@sysoev.ru     stream = data;
421*0Sigor@sysoev.ru 
422*0Sigor@sysoev.ru     nxt_log_debug(thr->log, "stream source closed fd:%d", c->socket.fd);
423*0Sigor@sysoev.ru 
424*0Sigor@sysoev.ru     nxt_event_conn_close(thr, c);
425*0Sigor@sysoev.ru 
426*0Sigor@sysoev.ru     b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
427*0Sigor@sysoev.ru                            NXT_BUF_SYNC_LAST);
428*0Sigor@sysoev.ru 
429*0Sigor@sysoev.ru     if (nxt_slow_path(b == NULL)) {
430*0Sigor@sysoev.ru         stream->error_handler(stream);
431*0Sigor@sysoev.ru         return;
432*0Sigor@sysoev.ru     }
433*0Sigor@sysoev.ru 
434*0Sigor@sysoev.ru     nxt_source_filter(thr, c->write_work_queue, stream->next, b);
435*0Sigor@sysoev.ru }
436*0Sigor@sysoev.ru 
437*0Sigor@sysoev.ru 
438*0Sigor@sysoev.ru static void
439*0Sigor@sysoev.ru nxt_stream_source_error(nxt_thread_t *thr, void *obj, void *data)
440*0Sigor@sysoev.ru {
441*0Sigor@sysoev.ru     nxt_stream_source_t  *stream;
442*0Sigor@sysoev.ru 
443*0Sigor@sysoev.ru     stream = data;
444*0Sigor@sysoev.ru 
445*0Sigor@sysoev.ru #if (NXT_DEBUG)
446*0Sigor@sysoev.ru     {
447*0Sigor@sysoev.ru         nxt_event_fd_t  *ev;
448*0Sigor@sysoev.ru 
449*0Sigor@sysoev.ru         ev = obj;
450*0Sigor@sysoev.ru 
451*0Sigor@sysoev.ru         nxt_log_debug(thr->log, "stream source error fd:%d", ev->fd);
452*0Sigor@sysoev.ru     }
453*0Sigor@sysoev.ru #endif
454*0Sigor@sysoev.ru 
455*0Sigor@sysoev.ru     nxt_stream_source_close(thr, stream);
456*0Sigor@sysoev.ru }
457*0Sigor@sysoev.ru 
458*0Sigor@sysoev.ru 
459*0Sigor@sysoev.ru static void
460*0Sigor@sysoev.ru nxt_stream_source_close(nxt_thread_t *thr, nxt_stream_source_t *stream)
461*0Sigor@sysoev.ru {
462*0Sigor@sysoev.ru     nxt_event_conn_close(thr, stream->conn);
463*0Sigor@sysoev.ru 
464*0Sigor@sysoev.ru     stream->error_handler(stream);
465*0Sigor@sysoev.ru }
466*0Sigor@sysoev.ru 
467*0Sigor@sysoev.ru 
468*0Sigor@sysoev.ru void
469*0Sigor@sysoev.ru nxt_source_filter_handler(nxt_thread_t *thr, void *obj, void *data)
470*0Sigor@sysoev.ru {
471*0Sigor@sysoev.ru     nxt_source_hook_t  *next;
472*0Sigor@sysoev.ru 
473*0Sigor@sysoev.ru     next = obj;
474*0Sigor@sysoev.ru 
475*0Sigor@sysoev.ru     next->filter(thr, next->context, data);
476*0Sigor@sysoev.ru }
477