xref: /unit/src/nxt_stream_source.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_stream_source_connected(nxt_thread_t *thr, void *obj,
11     void *data);
12 static void nxt_stream_source_write_ready(nxt_thread_t *thr, void *obj,
13     void *data);
14 static void nxt_stream_source_read_ready(nxt_thread_t *thr, void *obj,
15     void *data);
16 static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
17     nxt_event_conn_t *c);
18 static void nxt_stream_source_buf_completion(nxt_thread_t *thr, void *obj,
19     void *data);
20 static void nxt_stream_source_read_done(nxt_thread_t *thr, void *obj,
21     void *data);
22 static void nxt_stream_source_refused(nxt_thread_t *thr, void *obj, void *data);
23 static void nxt_stream_source_closed(nxt_thread_t *thr, void *obj, void *data);
24 static void nxt_stream_source_error(nxt_thread_t *thr, void *obj, void *data);
25 static void nxt_stream_source_close(nxt_thread_t *thr,
26     nxt_stream_source_t *stream);
27 
28 
29 static const nxt_event_conn_state_t  nxt_stream_source_connect_state;
30 static const nxt_event_conn_state_t  nxt_stream_source_request_write_state;
31 static const nxt_event_conn_state_t  nxt_stream_source_response_ready_state;
32 static const nxt_event_conn_state_t  nxt_stream_source_response_read_state;
33 
34 
35 void
36 nxt_stream_source_connect(nxt_stream_source_t *stream)
37 {
38     nxt_thread_t          *thr;
39     nxt_event_conn_t      *c;
40     nxt_upstream_source_t  *us;
41 
42     thr = nxt_thread();
43 
44     us = stream->upstream;
45 
46     if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
47         nxt_thread_log_error(NXT_LOG_ERR, "%d buffers %uDK each "
48                              "are not enough to read upstream response",
49                              us->buffers.max, us->buffers.size / 1024);
50         goto fail;
51     }
52 
53     c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54     if (nxt_slow_path(c == NULL)) {
55         goto fail;
56     }
57 
58     stream->conn = c;
59     c->socket.data = stream;
60 
61     nxt_event_conn_work_queue_set(c, us->work_queue);
62 
63     c->remote = us->peer->sockaddr;
64     c->write_state = &nxt_stream_source_connect_state;
65 
66     nxt_event_conn_connect(thr, c);
67     return;
68 
69 fail:
70 
71     stream->error_handler(stream);
72 }
73 
74 
75 static const nxt_event_conn_state_t  nxt_stream_source_connect_state
76     nxt_aligned(64) =
77 {
78     NXT_EVENT_NO_BUF_PROCESS,
79     NXT_EVENT_TIMER_AUTORESET,
80 
81     nxt_stream_source_connected,
82     nxt_stream_source_refused,
83     nxt_stream_source_error,
84 
85     NULL, /* timeout */
86     NULL, /* timeout value */
87     0, /* connect_timeout */
88 };
89 
90 
91 static void
92 nxt_stream_source_connected(nxt_thread_t *thr, void *obj, void *data)
93 {
94     nxt_event_conn_t     *c;
95     nxt_stream_source_t  *stream;
96 
97     c = obj;
98     stream = data;
99 
100     nxt_log_debug(thr->log, "stream source connected fd:%d", c->socket.fd);
101 
102     c->read_state = &nxt_stream_source_response_ready_state;
103     c->write = stream->out;
104     c->write_state = &nxt_stream_source_request_write_state;
105 
106     if (thr->engine->batch != 0) {
107         nxt_event_conn_write(thr, c);
108 
109     } else {
110         stream->read_queued = 1;
111         nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue,
112                                   c->io->read, c, stream, thr->log);
113 
114         c->io->write(thr, c, stream);
115     }
116 }
117 
118 
119 static const nxt_event_conn_state_t  nxt_stream_source_request_write_state
120     nxt_aligned(64) =
121 {
122     NXT_EVENT_NO_BUF_PROCESS,
123     NXT_EVENT_TIMER_AUTORESET,
124 
125     nxt_stream_source_write_ready,
126     NULL,
127     nxt_stream_source_error,
128 
129     NULL, /* timeout */
130     NULL, /* timeout value */
131     0, /* connect_timeout */
132 };
133 
134 
135 static const nxt_event_conn_state_t nxt_stream_source_response_ready_state
136     nxt_aligned(64) =
137 {
138     NXT_EVENT_NO_BUF_PROCESS,
139     NXT_EVENT_TIMER_AUTORESET,
140 
141     nxt_stream_source_read_ready,
142     nxt_stream_source_closed,
143     nxt_stream_source_error,
144 
145     NULL, /* timeout */
146     NULL, /* timeout value */
147     0, /* connect_timeout */
148 };
149 
150 
151 static void
152 nxt_stream_source_write_ready(nxt_thread_t *thr, void *obj, void *data)
153 {
154     nxt_event_conn_t  *c;
155 
156     c = obj;
157 
158     nxt_log_debug(thr->log, "stream source write ready fd:%d", c->socket.fd);
159 
160     nxt_event_conn_read(thr, c);
161 }
162 
163 
164 static void
165 nxt_stream_source_read_ready(nxt_thread_t *thr, void *obj, void *data)
166 {
167     nxt_int_t            ret;
168     nxt_buf_t            *b;
169     nxt_buf_pool_t       *buffers;
170     nxt_event_conn_t     *c;
171     nxt_stream_source_t  *stream;
172 
173     c = obj;
174     stream = data;
175     stream->read_queued = 0;
176 
177     nxt_log_debug(thr->log, "stream source read ready fd:%d", c->socket.fd);
178 
179     if (c->read == NULL) {
180 
181         buffers = &stream->upstream->buffers;
182 
183         ret = nxt_buf_pool_mem_alloc(buffers, 0);
184 
185         if (nxt_slow_path(ret != NXT_OK)) {
186 
187             if (nxt_slow_path(ret == NXT_ERROR)) {
188                 goto fail;
189             }
190 
191             /* ret == NXT_AGAIN */
192 
193             nxt_log_debug(thr->log, "stream source flush");
194 
195             b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
196 
197             if (nxt_slow_path(b == NULL)) {
198                 goto fail;
199             }
200 
201             nxt_event_fd_block_read(thr->engine, &c->socket);
202 
203             nxt_source_filter(thr, c->write_work_queue, stream->next, b);
204             return;
205         }
206 
207         c->read = buffers->current;
208         buffers->current = NULL;
209     }
210 
211     c->read_state = &nxt_stream_source_response_read_state;
212 
213     nxt_event_conn_read(thr, c);
214     return;
215 
216 fail:
217 
218     nxt_stream_source_close(thr, stream);
219 }
220 
221 
222 static const nxt_event_conn_state_t nxt_stream_source_response_read_state
223     nxt_aligned(64) =
224 {
225     NXT_EVENT_NO_BUF_PROCESS,
226     NXT_EVENT_TIMER_AUTORESET,
227 
228     nxt_stream_source_read_done,
229     nxt_stream_source_closed,
230     nxt_stream_source_error,
231 
232     NULL, /* timeout */
233     NULL, /* timeout value */
234     0, /* connect_timeout */
235 };
236 
237 
238 static void
239 nxt_stream_source_read_done(nxt_thread_t *thr, void *obj, void *data)
240 {
241     nxt_buf_t            *b;
242     nxt_bool_t           batch;
243     nxt_event_conn_t     *c;
244     nxt_stream_source_t  *stream;
245 
246     c = obj;
247     stream = data;
248 
249     nxt_log_debug(thr->log, "stream source read done fd:%d", c->socket.fd);
250 
251     if (c->read != NULL) {
252         b = nxt_stream_source_process_buffers(stream, c);
253 
254         if (nxt_slow_path(b == NULL)) {
255             nxt_stream_source_close(thr, stream);
256             return;
257         }
258 
259         batch = (thr->engine->batch != 0);
260 
261         if (batch) {
262             nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
263                                       nxt_source_filter_handler,
264                                       stream->next, b, thr->log);
265         }
266 
267         if (!stream->read_queued) {
268             stream->read_queued = 1;
269             nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
270                                       nxt_stream_source_read_ready,
271                                       c, stream, thr->log);
272         }
273 
274         if (!batch) {
275             stream->next->filter(thr, stream->next->context, b);
276         }
277     }
278 }
279 
280 
281 static nxt_buf_t *
282 nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
283     nxt_event_conn_t *c)
284 {
285     size_t     size, nbytes;
286     nxt_buf_t  *b, *in, *head, **prev;
287 
288     nbytes = c->nbytes;
289     prev = &head;
290 
291     do {
292         b = nxt_buf_mem_alloc(stream->upstream->buffers.mem_pool, 0, 0);
293 
294         if (nxt_slow_path(b == NULL)) {
295             return NULL;
296         }
297 
298         *prev = b;
299 
300         b->data = stream;
301         b->completion_handler = nxt_stream_source_buf_completion;
302 
303         in = c->read;
304         in->retain++;
305         b->parent = in;
306 
307         b->mem.pos = in->mem.free;
308         b->mem.start = in->mem.free;
309 
310         size = nxt_buf_mem_free_size(&in->mem);
311 
312         if (nbytes < size) {
313             in->mem.free += nbytes;
314 
315             b->mem.free = in->mem.free;
316             b->mem.end = in->mem.free;
317 
318             break;
319         }
320 
321         in->mem.free = in->mem.end;
322 
323         b->mem.free = in->mem.free;
324         b->mem.end = in->mem.free;
325         nbytes -= size;
326 
327         prev = &b->next;
328         c->read = in->next;
329         in->next = NULL;
330 
331     } while (c->read != NULL);
332 
333     return head;
334 }
335 
336 
337 static void
338 nxt_stream_source_buf_completion(nxt_thread_t *thr, void *obj, void *data)
339 {
340     size_t               size;
341     nxt_buf_t            *b, *parent;
342     nxt_stream_source_t  *stream;
343 
344     b = obj;
345     parent = data;
346 
347 #if 0
348     nxt_log_debug(thr->log,
349                   "stream source buf completion: %p parent:%p retain:%uD",
350                   b, parent, parent->retain);
351 #endif
352 
353     stream = b->data;
354 
355     /* A parent is a buffer where stream reads data. */
356 
357     parent->mem.pos = b->mem.pos;
358     parent->retain--;
359 
360     if (parent->retain == 0 && !stream->conn->socket.closed) {
361         size = nxt_buf_mem_size(&parent->mem);
362 
363         parent->mem.pos = parent->mem.start;
364         parent->mem.free = parent->mem.start;
365 
366         /*
367          * A buffer's original size can be changed by filters
368          * so reuse the buffer only if it is still large enough.
369          */
370         if (size >= 256 || size >= stream->upstream->buffers.size) {
371 
372             if (stream->conn->read != parent) {
373                 nxt_buf_chain_add(&stream->conn->read, parent);
374             }
375 
376             if (!stream->read_queued) {
377                 stream->read_queued = 1;
378                 nxt_thread_work_queue_add(thr, stream->upstream->work_queue,
379                                           nxt_stream_source_read_ready,
380                                           stream->conn,
381                                           stream->conn->socket.data,
382                                           stream->conn->socket.log);
383             }
384         }
385     }
386 
387     nxt_buf_free(stream->upstream->buffers.mem_pool, b);
388 }
389 
390 
391 static void
392 nxt_stream_source_refused(nxt_thread_t *thr, void *obj, void *data)
393 {
394     nxt_stream_source_t  *stream;
395 
396     stream = data;
397 
398 #if (NXT_DEBUG)
399     {
400         nxt_event_conn_t  *c;
401 
402         c = obj;
403 
404         nxt_log_debug(thr->log, "stream source refused fd:%d", c->socket.fd);
405     }
406 #endif
407 
408     nxt_stream_source_close(thr, stream);
409 }
410 
411 
412 static void
413 nxt_stream_source_closed(nxt_thread_t *thr, void *obj, void *data)
414 {
415     nxt_buf_t            *b;
416     nxt_event_conn_t     *c;
417     nxt_stream_source_t  *stream;
418 
419     c = obj;
420     stream = data;
421 
422     nxt_log_debug(thr->log, "stream source closed fd:%d", c->socket.fd);
423 
424     nxt_event_conn_close(thr, c);
425 
426     b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
427                            NXT_BUF_SYNC_LAST);
428 
429     if (nxt_slow_path(b == NULL)) {
430         stream->error_handler(stream);
431         return;
432     }
433 
434     nxt_source_filter(thr, c->write_work_queue, stream->next, b);
435 }
436 
437 
438 static void
439 nxt_stream_source_error(nxt_thread_t *thr, void *obj, void *data)
440 {
441     nxt_stream_source_t  *stream;
442 
443     stream = data;
444 
445 #if (NXT_DEBUG)
446     {
447         nxt_event_fd_t  *ev;
448 
449         ev = obj;
450 
451         nxt_log_debug(thr->log, "stream source error fd:%d", ev->fd);
452     }
453 #endif
454 
455     nxt_stream_source_close(thr, stream);
456 }
457 
458 
459 static void
460 nxt_stream_source_close(nxt_thread_t *thr, nxt_stream_source_t *stream)
461 {
462     nxt_event_conn_close(thr, stream->conn);
463 
464     stream->error_handler(stream);
465 }
466 
467 
468 void
469 nxt_source_filter_handler(nxt_thread_t *thr, void *obj, void *data)
470 {
471     nxt_source_hook_t  *next;
472 
473     next = obj;
474 
475     next->filter(thr, next->context, data);
476 }
477