xref: /unit/src/nxt_stream_source.c (revision 62:5e1efcc7b740)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static void nxt_stream_source_connected(nxt_task_t *task, void *obj,
11     void *data);
12 static void nxt_stream_source_write_ready(nxt_task_t *task, void *obj,
13     void *data);
14 static void nxt_stream_source_read_ready(nxt_task_t *task, void *obj,
15     void *data);
16 static nxt_buf_t *nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
17     nxt_event_conn_t *c);
18 static void nxt_stream_source_buf_completion(nxt_task_t *task, void *obj,
19     void *data);
20 static void nxt_stream_source_read_done(nxt_task_t *task, void *obj,
21     void *data);
22 static void nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data);
23 static void nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data);
24 static void nxt_stream_source_error(nxt_task_t *task, void *obj, void *data);
25 static void nxt_stream_source_close(nxt_task_t *task,
26     nxt_stream_source_t *stream);
27 
28 
29 static const nxt_event_conn_state_t  nxt_stream_source_connect_state;
30 static const nxt_event_conn_state_t  nxt_stream_source_request_write_state;
31 static const nxt_event_conn_state_t  nxt_stream_source_response_ready_state;
32 static const nxt_event_conn_state_t  nxt_stream_source_response_read_state;
33 
34 
35 void
nxt_stream_source_connect(nxt_task_t * task,nxt_stream_source_t * stream)36 nxt_stream_source_connect(nxt_task_t *task, nxt_stream_source_t *stream)
37 {
38     nxt_thread_t          *thr;
39     nxt_event_conn_t      *c;
40     nxt_upstream_source_t  *us;
41 
42     thr = nxt_thread();
43 
44     us = stream->upstream;
45 
46     if (nxt_slow_path(!nxt_buf_pool_obtainable(&us->buffers))) {
47         nxt_log(task, NXT_LOG_ERR,
48                 "%d buffers %uDK each are not enough to read upstream response",
49                 us->buffers.max, us->buffers.size / 1024);
50         goto fail;
51     }
52 
53     c = nxt_event_conn_create(us->buffers.mem_pool, thr->log);
54     if (nxt_slow_path(c == NULL)) {
55         goto fail;
56     }
57 
58     stream->conn = c;
59     c->socket.data = stream;
60 
61     nxt_conn_work_queue_set(c, us->work_queue);
62 
63     c->remote = us->peer->sockaddr;
64     c->write_state = &nxt_stream_source_connect_state;
65 
66     nxt_event_conn_connect(task, c);
67     return;
68 
69 fail:
70 
71     stream->error_handler(task, stream);
72 }
73 
74 
75 static const nxt_event_conn_state_t  nxt_stream_source_connect_state
76     nxt_aligned(64) =
77 {
78     NXT_EVENT_NO_BUF_PROCESS,
79     NXT_EVENT_TIMER_AUTORESET,
80 
81     nxt_stream_source_connected,
82     nxt_stream_source_refused,
83     nxt_stream_source_error,
84 
85     NULL, /* timeout */
86     NULL, /* timeout value */
87     0, /* connect_timeout */
88 };
89 
90 
91 static void
nxt_stream_source_connected(nxt_task_t * task,void * obj,void * data)92 nxt_stream_source_connected(nxt_task_t *task, void *obj, void *data)
93 {
94     nxt_event_conn_t     *c;
95     nxt_stream_source_t  *stream;
96 
97     c = obj;
98     stream = data;
99 
100     nxt_debug(task, "stream source connected fd:%d", c->socket.fd);
101 
102     c->read_state = &nxt_stream_source_response_ready_state;
103     c->write = stream->out;
104     c->write_state = &nxt_stream_source_request_write_state;
105 
106     if (task->thread->engine->batch != 0) {
107         nxt_event_conn_write(task, c);
108 
109     } else {
110         stream->read_queued = 1;
111         nxt_thread_work_queue_add(task->thread,
112                                   &task->thread->engine->read_work_queue,
113                                   c->io->read, task, c, stream);
114 
115         c->io->write(task, c, stream);
116     }
117 }
118 
119 
120 static const nxt_event_conn_state_t  nxt_stream_source_request_write_state
121     nxt_aligned(64) =
122 {
123     NXT_EVENT_NO_BUF_PROCESS,
124     NXT_EVENT_TIMER_AUTORESET,
125 
126     nxt_stream_source_write_ready,
127     NULL,
128     nxt_stream_source_error,
129 
130     NULL, /* timeout */
131     NULL, /* timeout value */
132     0, /* connect_timeout */
133 };
134 
135 
136 static const nxt_event_conn_state_t nxt_stream_source_response_ready_state
137     nxt_aligned(64) =
138 {
139     NXT_EVENT_NO_BUF_PROCESS,
140     NXT_EVENT_TIMER_AUTORESET,
141 
142     nxt_stream_source_read_ready,
143     nxt_stream_source_closed,
144     nxt_stream_source_error,
145 
146     NULL, /* timeout */
147     NULL, /* timeout value */
148     0, /* connect_timeout */
149 };
150 
151 
152 static void
nxt_stream_source_write_ready(nxt_task_t * task,void * obj,void * data)153 nxt_stream_source_write_ready(nxt_task_t *task, void *obj, void *data)
154 {
155     nxt_event_conn_t  *c;
156 
157     c = obj;
158 
159     nxt_debug(task, "stream source write ready fd:%d", c->socket.fd);
160 
161     nxt_conn_read(task, c);
162 }
163 
164 
165 static void
nxt_stream_source_read_ready(nxt_task_t * task,void * obj,void * data)166 nxt_stream_source_read_ready(nxt_task_t *task, void *obj, void *data)
167 {
168     nxt_int_t            ret;
169     nxt_buf_t            *b;
170     nxt_buf_pool_t       *buffers;
171     nxt_event_conn_t     *c;
172     nxt_stream_source_t  *stream;
173 
174     c = obj;
175     stream = data;
176     stream->read_queued = 0;
177 
178     nxt_debug(task, "stream source read ready fd:%d", c->socket.fd);
179 
180     if (c->read == NULL) {
181 
182         buffers = &stream->upstream->buffers;
183 
184         ret = nxt_buf_pool_mem_alloc(buffers, 0);
185 
186         if (nxt_slow_path(ret != NXT_OK)) {
187 
188             if (nxt_slow_path(ret == NXT_ERROR)) {
189                 goto fail;
190             }
191 
192             /* ret == NXT_AGAIN */
193 
194             nxt_debug(task, "stream source flush");
195 
196             b = nxt_buf_sync_alloc(buffers->mem_pool, NXT_BUF_SYNC_NOBUF);
197 
198             if (nxt_slow_path(b == NULL)) {
199                 goto fail;
200             }
201 
202             nxt_event_fd_block_read(task->thread->engine, &c->socket);
203 
204             nxt_source_filter(task->thread, c->write_work_queue, task,
205                               stream->next, b);
206             return;
207         }
208 
209         c->read = buffers->current;
210         buffers->current = NULL;
211     }
212 
213     c->read_state = &nxt_stream_source_response_read_state;
214 
215     nxt_conn_read(task, c);
216     return;
217 
218 fail:
219 
220     nxt_stream_source_close(task, stream);
221 }
222 
223 
224 static const nxt_event_conn_state_t nxt_stream_source_response_read_state
225     nxt_aligned(64) =
226 {
227     NXT_EVENT_NO_BUF_PROCESS,
228     NXT_EVENT_TIMER_AUTORESET,
229 
230     nxt_stream_source_read_done,
231     nxt_stream_source_closed,
232     nxt_stream_source_error,
233 
234     NULL, /* timeout */
235     NULL, /* timeout value */
236     0, /* connect_timeout */
237 };
238 
239 
240 static void
nxt_stream_source_read_done(nxt_task_t * task,void * obj,void * data)241 nxt_stream_source_read_done(nxt_task_t *task, void *obj, void *data)
242 {
243     nxt_buf_t            *b;
244     nxt_bool_t           batch;
245     nxt_event_conn_t     *c;
246     nxt_stream_source_t  *stream;
247 
248     c = obj;
249     stream = data;
250 
251     nxt_debug(task, "stream source read done fd:%d", c->socket.fd);
252 
253     if (c->read != NULL) {
254         b = nxt_stream_source_process_buffers(stream, c);
255 
256         if (nxt_slow_path(b == NULL)) {
257             nxt_stream_source_close(task, stream);
258             return;
259         }
260 
261         batch = (task->thread->engine->batch != 0);
262 
263         if (batch) {
264             nxt_thread_work_queue_add(task->thread,
265                                       stream->upstream->work_queue,
266                                       nxt_source_filter_handler,
267                                       task, stream->next, b);
268         }
269 
270         if (!stream->read_queued) {
271             stream->read_queued = 1;
272             nxt_thread_work_queue_add(task->thread,
273                                       stream->upstream->work_queue,
274                                       nxt_stream_source_read_ready,
275                                       task, c, stream);
276         }
277 
278         if (!batch) {
279             stream->next->filter(task, stream->next->context, b);
280         }
281     }
282 }
283 
284 
285 static nxt_buf_t *
nxt_stream_source_process_buffers(nxt_stream_source_t * stream,nxt_event_conn_t * c)286 nxt_stream_source_process_buffers(nxt_stream_source_t *stream,
287     nxt_event_conn_t *c)
288 {
289     size_t     size, nbytes;
290     nxt_buf_t  *b, *in, *head, **prev;
291 
292     nbytes = c->nbytes;
293     prev = &head;
294 
295     do {
296         b = nxt_buf_mem_alloc(stream->upstream->buffers.mem_pool, 0, 0);
297 
298         if (nxt_slow_path(b == NULL)) {
299             return NULL;
300         }
301 
302         *prev = b;
303 
304         b->data = stream;
305         b->completion_handler = nxt_stream_source_buf_completion;
306 
307         in = c->read;
308         in->retain++;
309         b->parent = in;
310 
311         b->mem.pos = in->mem.free;
312         b->mem.start = in->mem.free;
313 
314         size = nxt_buf_mem_free_size(&in->mem);
315 
316         if (nbytes < size) {
317             in->mem.free += nbytes;
318 
319             b->mem.free = in->mem.free;
320             b->mem.end = in->mem.free;
321 
322             break;
323         }
324 
325         in->mem.free = in->mem.end;
326 
327         b->mem.free = in->mem.free;
328         b->mem.end = in->mem.free;
329         nbytes -= size;
330 
331         prev = &b->next;
332         c->read = in->next;
333         in->next = NULL;
334 
335     } while (c->read != NULL);
336 
337     return head;
338 }
339 
340 
341 static void
nxt_stream_source_buf_completion(nxt_task_t * task,void * obj,void * data)342 nxt_stream_source_buf_completion(nxt_task_t *task, void *obj, void *data)
343 {
344     size_t               size;
345     nxt_buf_t            *b, *parent;
346     nxt_stream_source_t  *stream;
347 
348     b = obj;
349     parent = data;
350 
351 #if 0
352     nxt_debug(thr->log,
353                   "stream source buf completion: %p parent:%p retain:%uD",
354                   b, parent, parent->retain);
355 #endif
356 
357     stream = b->data;
358 
359     /* A parent is a buffer where stream reads data. */
360 
361     parent->mem.pos = b->mem.pos;
362     parent->retain--;
363 
364     if (parent->retain == 0 && !stream->conn->socket.closed) {
365         size = nxt_buf_mem_size(&parent->mem);
366 
367         parent->mem.pos = parent->mem.start;
368         parent->mem.free = parent->mem.start;
369 
370         /*
371          * A buffer's original size can be changed by filters
372          * so reuse the buffer only if it is still large enough.
373          */
374         if (size >= 256 || size >= stream->upstream->buffers.size) {
375 
376             if (stream->conn->read != parent) {
377                 nxt_buf_chain_add(&stream->conn->read, parent);
378             }
379 
380             if (!stream->read_queued) {
381                 stream->read_queued = 1;
382                 nxt_thread_work_queue_add(task->thread,
383                                           stream->upstream->work_queue,
384                                           nxt_stream_source_read_ready,
385                                           task, stream->conn,
386                                           stream->conn->socket.data);
387             }
388         }
389     }
390 
391     nxt_buf_free(stream->upstream->buffers.mem_pool, b);
392 }
393 
394 
395 static void
nxt_stream_source_refused(nxt_task_t * task,void * obj,void * data)396 nxt_stream_source_refused(nxt_task_t *task, void *obj, void *data)
397 {
398     nxt_stream_source_t  *stream;
399 
400     stream = data;
401 
402 #if (NXT_DEBUG)
403     {
404         nxt_event_conn_t  *c;
405 
406         c = obj;
407 
408         nxt_debug(task, "stream source refused fd:%d", c->socket.fd);
409     }
410 #endif
411 
412     nxt_stream_source_close(task, stream);
413 }
414 
415 
416 static void
nxt_stream_source_closed(nxt_task_t * task,void * obj,void * data)417 nxt_stream_source_closed(nxt_task_t *task, void *obj, void *data)
418 {
419     nxt_buf_t            *b;
420     nxt_event_conn_t     *c;
421     nxt_stream_source_t  *stream;
422 
423     c = obj;
424     stream = data;
425 
426     nxt_debug(task, "stream source closed fd:%d", c->socket.fd);
427 
428     nxt_conn_close(task, c);
429 
430     b = nxt_buf_sync_alloc(stream->upstream->buffers.mem_pool,
431                            NXT_BUF_SYNC_LAST);
432 
433     if (nxt_slow_path(b == NULL)) {
434         stream->error_handler(task, stream);
435         return;
436     }
437 
438     nxt_source_filter(task->thread, c->write_work_queue, task, stream->next, b);
439 }
440 
441 
442 static void
nxt_stream_source_error(nxt_task_t * task,void * obj,void * data)443 nxt_stream_source_error(nxt_task_t *task, void *obj, void *data)
444 {
445     nxt_stream_source_t  *stream;
446 
447     stream = data;
448 
449 #if (NXT_DEBUG)
450     {
451         nxt_event_fd_t  *ev;
452 
453         ev = obj;
454 
455         nxt_debug(task, "stream source error fd:%d", ev->fd);
456     }
457 #endif
458 
459     nxt_stream_source_close(task, stream);
460 }
461 
462 
463 static void
nxt_stream_source_close(nxt_task_t * task,nxt_stream_source_t * stream)464 nxt_stream_source_close(nxt_task_t *task, nxt_stream_source_t *stream)
465 {
466     nxt_conn_close(task, stream->conn);
467 
468     stream->error_handler(task, stream);
469 }
470 
471 
472 void
nxt_source_filter_handler(nxt_task_t * task,void * obj,void * data)473 nxt_source_filter_handler(nxt_task_t *task, void *obj, void *data)
474 {
475     nxt_source_hook_t  *next;
476 
477     next = obj;
478 
479     next->filter(task, next->context, data);
480 }
481