xref: /unit/src/nxt_application.c (revision 0:a63ceefd6ab0)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_cycle.h>
10 #include <nxt_application.h>
11 
12 
13 #define NXT_PARSE_AGAIN  (u_char *) -1
14 
15 
16 static void nxt_app_thread(void *ctx);
17 static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
18     nxt_log_t *log);
19 static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
20     nxt_log_t *log);
21 static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
22 static void nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data);
23 static void nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data);
24 static void nxt_app_delivery_ready(nxt_thread_t *thr, void *obj, void *data);
25 static void nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj,
26     void *data);
27 static void nxt_app_delivery_error(nxt_thread_t *thr, void *obj, void *data);
28 static void nxt_app_delivery_timeout(nxt_thread_t *thr, void *obj, void *data);
29 static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
30     uintptr_t data);
31 static void nxt_app_delivery_done(nxt_thread_t *thr, nxt_event_conn_t *c);
32 static void nxt_app_close_request(nxt_thread_t *thr, nxt_app_request_t *r);
33 
34 
35 typedef struct nxt_app_http_parse_state_s  nxt_app_http_parse_state_t;
36 
37 struct nxt_app_http_parse_state_s {
38     u_char     *pos;
39     nxt_int_t  (*handler)(nxt_app_request_header_t *h, u_char *start,
40                           u_char *end, nxt_app_http_parse_state_t *state);
41 };
42 
43 static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf,
44     size_t size);
45 static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h,
46     u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
47 static nxt_int_t nxt_app_http_parse_field_value(nxt_app_request_header_t *h,
48     u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
49 static nxt_int_t nxt_app_http_parse_field_name(nxt_app_request_header_t *h,
50     u_char *start, u_char *end, nxt_app_http_parse_state_t *state);
51 
52 static nxt_int_t nxt_app_http_process_headers(nxt_app_request_t *r);
53 
54 
55 static const nxt_event_conn_state_t  nxt_app_delivery_write_state;
56 
57 static nxt_application_module_t  *nxt_app = &nxt_python_module;
58 
59 static nxt_thread_mutex_t        nxt_app_mutex;
60 static nxt_thread_cond_t         nxt_app_cond;
61 
62 static nxt_buf_t                 *nxt_app_buf_free;
63 static nxt_buf_t                 *nxt_app_buf_done;
64 
65 static nxt_event_engine_t        *nxt_app_engine;
66 static nxt_mem_pool_t            *nxt_app_mem_pool;
67 
68 static nxt_uint_t                nxt_app_buf_current_number;
69 static nxt_uint_t                nxt_app_buf_max_number = 16;
70 
71 
72 nxt_int_t
73 nxt_app_start(nxt_cycle_t *cycle)
74 {
75     nxt_thread_link_t    *link;
76     nxt_thread_handle_t  handle;
77 
78     if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
79         return NXT_ERROR;
80     }
81 
82     if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
83         return NXT_ERROR;
84     }
85 
86     link = nxt_malloc(sizeof(nxt_thread_link_t));
87 
88     if (nxt_fast_path(link != NULL)) {
89         link->start = nxt_app_thread;
90         link->data = cycle;
91         link->engine = NULL;
92         link->exit = NULL;
93 
94         return nxt_thread_create(&handle, link);
95     }
96 
97     return NXT_ERROR;
98 }
99 
100 
101 #define SIZE  4096
102 
103 static void
104 nxt_app_thread(void *ctx)
105 {
106     ssize_t                 n;
107     nxt_err_t               err;
108     nxt_cycle_t             *cycle;
109     nxt_socket_t            s;
110     nxt_thread_t            *thr;
111     nxt_app_request_t       *r;
112     nxt_event_engine_t      **engines;
113     nxt_listen_socket_t     *ls;
114     u_char                  buf[SIZE];
115     const size_t            size = SIZE;
116     nxt_app_header_field_t  fields[128];
117 
118     thr = nxt_thread();
119 
120     nxt_log_debug(thr->log, "app thread");
121 
122     cycle = ctx;
123     engines = cycle->engines->elts;
124 
125     nxt_app_engine = engines[0];
126 
127     nxt_app_mem_pool = nxt_mem_pool_create(512);
128     if (nxt_slow_path(nxt_app_mem_pool == NULL)) {
129         return;
130     }
131 
132     if (nxt_slow_path(nxt_app->init(thr) != NXT_OK)) {
133         nxt_log_debug(thr->log, "application init failed");
134     }
135 
136     ls = cycle->listen_sockets->elts;
137 
138     for ( ;; ) {
139         s = accept(ls->socket, NULL, NULL);
140 
141         if (nxt_slow_path(s == -1)) {
142             err = nxt_socket_errno;
143 
144             nxt_log_error(NXT_LOG_ERR, thr->log, "accept(%d) failed %E",
145                           ls->socket, err);
146 
147             if (err == EBADF) {
148                 /* STUB: ls->socket has been closed on exit. */
149                 return;
150             }
151 
152             continue;
153         }
154 
155         nxt_log_debug(thr->log, "accept(%d): %d", ls->socket, s);
156 
157         n = recv(s, buf, size, 0);
158 
159         if (nxt_slow_path(n <= 0)) {
160             err = (n == 0) ? 0 : nxt_socket_errno;
161 
162             nxt_log_error(NXT_LOG_ERR, thr->log, "recv(%d, %uz) failed %E",
163                           s, size, err);
164             close(s);
165             continue;
166         }
167 
168         nxt_log_debug(thr->log, "recv(%d, %uz): %z", s, size, n);
169 
170         r = nxt_app_request_create(s, thr->log);
171         if (nxt_slow_path(r == NULL)) {
172             goto fail;
173         }
174 
175         r->header.fields = fields;
176 
177         //nxt_app->start(r);
178 
179         if (nxt_app_http_parse_request(r, buf, n) != NXT_OK) {
180             nxt_log_debug(thr->log, "nxt_app_http_parse_request() failed");
181             nxt_mem_pool_destroy(r->mem_pool);
182             goto fail;
183         }
184 
185         if (nxt_app_http_process_headers(r) != NXT_OK) {
186             nxt_log_debug(thr->log, "nxt_app_http_process_headers() failed");
187             nxt_mem_pool_destroy(r->mem_pool);
188             goto fail;
189         }
190 
191         nxt_app->run(r);
192 
193         if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) {
194             goto fail;
195         }
196 
197         continue;
198 
199     fail:
200 
201         close(s);
202         nxt_nanosleep(1000000000);  /* 1s */
203     }
204 }
205 
206 
207 static nxt_app_request_t *
208 nxt_app_request_create(nxt_socket_t s, nxt_log_t *log)
209 {
210     nxt_mem_pool_t     *mp;
211     nxt_event_conn_t   *c;
212     nxt_app_request_t  *r;
213 
214     mp = nxt_mem_pool_create(1024);
215     if (nxt_slow_path(mp == NULL)) {
216         return NULL;
217     }
218 
219     r = nxt_mem_zalloc(mp, sizeof(nxt_app_request_t));
220     if (nxt_slow_path(r == NULL)) {
221         return NULL;
222     }
223 
224     c = nxt_mem_zalloc(mp, sizeof(nxt_event_conn_t));
225     if (nxt_slow_path(c == NULL)) {
226         return NULL;
227     }
228 
229     c->socket.fd = s;
230     c->socket.data = r;
231 
232     r->mem_pool = mp;
233     r->event_conn = c;
234     r->log = log;
235 
236     return r;
237 }
238 
239 
240 static nxt_int_t
241 nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf, size_t size)
242 {
243     u_char                      *end;
244     ssize_t                     n;
245     nxt_err_t                   err;
246     nxt_socket_t                s;
247     nxt_app_http_parse_state_t  state;
248 
249     end = buf + size;
250 
251     state.pos = buf;
252     state.handler = nxt_app_http_parse_request_line;
253 
254     for ( ;; ) {
255         switch (state.handler(&r->header, state.pos, end, &state)) {
256 
257         case NXT_OK:
258             continue;
259 
260         case NXT_DONE:
261             r->body_preread.len = end - state.pos;
262             r->body_preread.data = state.pos;
263 
264             return NXT_OK;
265 
266         case NXT_AGAIN:
267             s = r->event_conn->socket.fd;
268             n = recv(s, end, SIZE - size, 0);
269 
270             if (nxt_slow_path(n <= 0)) {
271                 err = (n == 0) ? 0 : nxt_socket_errno;
272 
273                 nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
274                               s, size, err);
275 
276                 return NXT_ERROR;
277             }
278 
279             nxt_log_debug(r->log, "recv(%d, %uz): %z", s, SIZE - size, n);
280 
281             size += n;
282             end += n;
283 
284             continue;
285         }
286 
287         return NXT_ERROR;
288     }
289 }
290 
291 
292 static nxt_int_t
293 nxt_app_http_parse_request_line(nxt_app_request_header_t *h, u_char *start,
294     u_char *end, nxt_app_http_parse_state_t *state)
295 {
296     u_char  *p;
297 
298     for (p = start; /* void */; p++) {
299 
300         if (nxt_slow_path(p == end)) {
301             state->pos = p;
302             return NXT_AGAIN;
303         }
304 
305         if (*p == ' ') {
306             break;
307         }
308     }
309 
310     h->method.len = p - start;
311     h->method.data = start;
312 
313     start = p + 1;
314 
315     p = nxt_memchr(start, ' ', end - start);
316 
317     if (nxt_slow_path(p == NULL)) {
318         return NXT_AGAIN;
319     }
320 
321     h->path.len = p - start;
322     h->path.data = start;
323 
324     start = p + 1;
325 
326     if (nxt_slow_path((size_t) (end - start) < sizeof("HTTP/1.1\n") - 1)) {
327         return NXT_AGAIN;
328     }
329 
330     h->version.len = sizeof("HTTP/1.1") - 1;
331     h->version.data = start;
332 
333     p = start + sizeof("HTTP/1.1") - 1;
334 
335     if (nxt_slow_path(*p == '\n')) {
336         return nxt_app_http_parse_field_name(h, p + 1, end, state);
337     }
338 
339     if (nxt_slow_path(end - p < 2)) {
340         return NXT_AGAIN;
341     }
342 
343     return nxt_app_http_parse_field_name(h, p + 2, end, state);
344 }
345 
346 
347 static nxt_int_t
348 nxt_app_http_parse_field_name(nxt_app_request_header_t *h, u_char *start,
349     u_char *end, nxt_app_http_parse_state_t *state)
350 {
351     u_char                  *p;
352     nxt_app_header_field_t  *fld;
353 
354     if (nxt_slow_path(start == end)) {
355         goto again;
356     }
357 
358     if (nxt_slow_path(*start == '\n')) {
359         state->pos = start + 1;
360         return NXT_DONE;
361     }
362 
363     if (*start == '\r') {
364         if (nxt_slow_path(end - start < 2)) {
365             goto again;
366         }
367 
368         if (nxt_slow_path(start[1] != '\n')) {
369             return NXT_ERROR;
370         }
371 
372         state->pos = start + 2;
373         return NXT_DONE;
374     }
375 
376     p = nxt_memchr(start, ':', end - start);
377 
378     if (nxt_slow_path(p == NULL)) {
379         goto again;
380     }
381 
382     fld = &h->fields[h->fields_num];
383 
384     fld->name.len = p - start;
385     fld->name.data = start;
386 
387     return nxt_app_http_parse_field_value(h, p + 1, end, state);
388 
389 again:
390 
391     state->pos = start;
392     state->handler = nxt_app_http_parse_field_name;
393 
394     return NXT_AGAIN;
395 }
396 
397 
398 static nxt_int_t
399 nxt_app_http_parse_field_value(nxt_app_request_header_t *h, u_char *start,
400     u_char *end, nxt_app_http_parse_state_t *state)
401 {
402     u_char                  *p;
403     nxt_app_header_field_t  *fld;
404 
405     for ( ;; ) {
406         if (nxt_slow_path(start == end)) {
407             goto again;
408         }
409 
410         if (*start != ' ') {
411             break;
412         }
413 
414         start++;
415     }
416 
417     p = nxt_memchr(start, '\n', end - start);
418 
419     if (nxt_slow_path(p == NULL)) {
420         goto again;
421     }
422 
423     fld = &h->fields[h->fields_num];
424 
425     fld->value.len = p - start;
426     fld->value.data = start;
427 
428     fld->value.len -= (p[-1] == '\r');
429 
430     h->fields_num++;
431 
432     state->pos = p + 1;
433     state->handler = nxt_app_http_parse_field_name;
434 
435     return NXT_OK;
436 
437 again:
438 
439     state->pos = start;
440     state->handler = nxt_app_http_parse_field_value;
441 
442     return NXT_AGAIN;
443 }
444 
445 
446 static nxt_int_t
447 nxt_app_http_process_headers(nxt_app_request_t *r)
448 {
449     nxt_uint_t               i;
450     nxt_app_header_field_t  *fld;
451 
452     static const u_char content_length[14] = "Content-Length";
453     static const u_char content_type[12] = "Content-Type";
454 
455     for (i = 0; i < r->header.fields_num; i++) {
456         fld = &r->header.fields[i];
457 
458         if (fld->name.len == sizeof(content_length)
459             && nxt_memcasecmp(fld->name.data, content_length,
460                               sizeof(content_length)) == 0)
461         {
462             r->header.content_length = &fld->value;
463             r->body_rest = nxt_off_t_parse(fld->value.data, fld->value.len);
464             continue;
465         }
466 
467         if (fld->name.len == sizeof(content_type)
468             && nxt_memcasecmp(fld->name.data, content_type,
469                               sizeof(content_type)) == 0)
470         {
471             r->header.content_type = &fld->value;
472             continue;
473         }
474     }
475 
476     return NXT_OK;
477 }
478 
479 
480 static void
481 nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c, nxt_log_t *log)
482 {
483     static nxt_atomic_t  ident = 1;
484 
485     c->socket.write_ready = 1;
486 
487     c->socket.log = &c->log;
488     c->log = *log;
489 
490     /* The while loop skips possible uint32_t overflow. */
491 
492     while (c->log.ident == 0) {
493         c->log.ident = (uint32_t) nxt_atomic_fetch_add(&ident, 1);
494     }
495 
496     thr->engine->connections++;
497 
498     c->io = thr->engine->event->io;
499     c->max_chunk = NXT_INT32_T_MAX;
500     c->sendfile = NXT_CONN_SENDFILE_UNSET;
501 
502     c->socket.read_work_queue = &thr->engine->read_work_queue;
503     c->socket.write_work_queue = &thr->engine->write_work_queue;
504     c->read_work_queue = &thr->engine->read_work_queue;
505     c->write_work_queue = &thr->engine->write_work_queue;
506 
507     nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
508     nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
509 
510     nxt_log_debug(&c->log, "event connections: %uD", thr->engine->connections);
511 }
512 
513 
514 nxt_int_t
515 nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len)
516 {
517     size_t     preread;
518     ssize_t    n;
519     nxt_err_t  err;
520 
521     if ((off_t) len > r->body_rest) {
522         len = (size_t) r->body_rest;
523     }
524 
525     preread = 0;
526 
527     if (r->body_preread.len != 0) {
528         preread = nxt_min(r->body_preread.len, len);
529 
530         nxt_memcpy(data, r->body_preread.data, preread);
531 
532         r->body_preread.len -= preread;
533         r->body_preread.data += preread;
534 
535         r->body_rest -= preread;
536 
537         len -= preread;
538     }
539 
540     if (len == 0) {
541         return NXT_OK;
542     }
543 
544     n = recv(r->event_conn->socket.fd, data + preread, len, 0);
545 
546     if (nxt_slow_path(n < (ssize_t) len)) {
547         if (n <= 0) {
548             err = (n == 0) ? 0 : nxt_socket_errno;
549 
550             nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
551                           r->event_conn->socket.fd, len, err);
552 
553             return NXT_ERROR;
554         }
555 
556         nxt_log_error(NXT_LOG_ERR, r->log,
557                       "client prematurely closed connection");
558 
559         return NXT_ERROR;
560     }
561 
562     r->body_rest -= n;
563 
564     return NXT_OK;
565 }
566 
567 
568 nxt_int_t
569 nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
570 {
571     size_t      free;
572     nxt_err_t   err;
573     nxt_buf_t   *b, *out, **next;
574     nxt_uint_t  bufs;
575 
576     out = NULL;
577     next = &out;
578 
579     b = r->output_buf;
580 
581     if (b == NULL) {
582         bufs = 0;
583         goto get_buf;
584     }
585 
586     bufs = 1;
587 
588     for ( ;; ) {
589         free = nxt_buf_mem_free_size(&b->mem);
590 
591         if (free > len) {
592             b->mem.free = nxt_cpymem(b->mem.free, data, len);
593             break;
594         }
595 
596         b->mem.free = nxt_cpymem(b->mem.free, data, free);
597 
598         data += free;
599         len -= free;
600 
601         *next = b;
602         next = &b->next;
603 
604         if (len == 0) {
605             b = NULL;
606             break;
607         }
608 
609         if (bufs == nxt_app_buf_max_number) {
610             bufs = 0;
611             *next = NULL;
612 
613             nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
614                                   r->event_conn, out, &nxt_main_log);
615 
616             out = NULL;
617             next = &out;
618         }
619 
620     get_buf:
621 
622         if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) {
623             return NXT_ERROR;
624         }
625 
626         for ( ;; ) {
627             b = nxt_app_buf_free;
628 
629             if (b != NULL) {
630                 nxt_app_buf_free = b->next;
631                 break;
632             }
633 
634             if (nxt_app_buf_current_number < nxt_app_buf_max_number) {
635                 break;
636             }
637 
638             err = nxt_thread_cond_wait(&nxt_app_cond, &nxt_app_mutex,
639                                        NXT_INFINITE_NSEC);
640 
641             if (nxt_slow_path(err != 0)) {
642                 (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
643                 return NXT_ERROR;
644             }
645         }
646 
647         (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
648 
649         if (b == NULL) {
650             b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0);
651             if (nxt_slow_path(b == NULL)) {
652                 return NXT_ERROR;
653             }
654 
655             b->completion_handler = nxt_app_buf_complettion;
656 
657             nxt_app_buf_current_number++;
658         }
659 
660         bufs++;
661     }
662 
663     r->output_buf = b;
664 
665     if (out != NULL) {
666         *next = NULL;
667 
668         nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
669                               r->event_conn, out, &nxt_main_log);
670     }
671 
672     return NXT_OK;
673 }
674 
675 
676 static nxt_int_t
677 nxt_app_write_finish(nxt_app_request_t *r)
678 {
679     nxt_buf_t  *b, *out;
680 
681     b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST);
682     if (nxt_slow_path(b == NULL)) {
683         return NXT_ERROR;
684     }
685 
686     b->completion_handler = nxt_app_buf_complettion;
687     b->parent = (nxt_buf_t *) r;
688 
689     out = r->output_buf;
690 
691     if (out != NULL) {
692         r->output_buf = NULL;
693         out->next = b;
694 
695     } else {
696         out = b;
697     }
698 
699     nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
700                           r->event_conn, out, &nxt_main_log);
701 
702     return NXT_OK;
703 }
704 
705 
706 static void
707 nxt_app_buf_complettion(nxt_thread_t *thr, void *obj, void *data)
708 {
709     nxt_buf_t  *b;
710 
711     b = obj;
712 
713     nxt_log_debug(thr->log, "app buf completion");
714 
715     b->next = nxt_app_buf_done;
716     nxt_app_buf_done = b;
717 }
718 
719 
720 static void
721 nxt_app_delivery_handler(nxt_thread_t *thr, void *obj, void *data)
722 {
723     nxt_buf_t         *b;
724     nxt_mem_pool_t    *mp;
725     nxt_event_conn_t  *c;
726 
727     c = obj;
728     b = data;
729 
730     nxt_log_debug(thr->log, "app delivery handler");
731 
732     if (c->write != NULL) {
733         nxt_buf_chain_add(&c->write, b);
734         return;
735     }
736 
737     if (c->mem_pool == NULL) {
738         mp = nxt_mem_pool_create(256);
739         if (nxt_slow_path(mp == NULL)) {
740             close(c->socket.fd);
741             return;
742         }
743 
744         c->mem_pool = mp;
745         nxt_app_conn_update(thr, c, &nxt_main_log);
746     }
747 
748     if (c->socket.timedout || c->socket.error != 0) {
749         nxt_buf_chain_add(&nxt_app_buf_done, b);
750         nxt_thread_work_queue_add(thr, c->write_work_queue,
751                                   nxt_app_delivery_complettion, c, NULL,
752                                   thr->log);
753         return;
754     }
755 
756     c->write = b;
757     c->write_state = &nxt_app_delivery_write_state;
758 
759     nxt_event_conn_write(thr, c);
760 }
761 
762 
763 static const nxt_event_conn_state_t  nxt_app_delivery_write_state
764     nxt_aligned(64) =
765 {
766     NXT_EVENT_BUF_PROCESS,
767     NXT_EVENT_TIMER_AUTORESET,
768 
769     nxt_app_delivery_ready,
770     NULL,
771     nxt_app_delivery_error,
772 
773     nxt_app_delivery_timeout,
774     nxt_app_delivery_timer_value,
775     0,
776 };
777 
778 
779 static void
780 nxt_app_delivery_ready(nxt_thread_t *thr, void *obj, void *data)
781 {
782     nxt_event_conn_t  *c;
783 
784     c = obj;
785 
786     nxt_thread_work_queue_add(thr, c->write_work_queue,
787                               nxt_app_delivery_complettion, c, NULL, thr->log);
788 }
789 
790 
791 static void
792 nxt_app_delivery_complettion(nxt_thread_t *thr, void *obj, void *data)
793 {
794     nxt_buf_t          *b, *bn, *free;
795     nxt_app_request_t  *r;
796 
797     nxt_log_debug(thr->log, "app delivery complettion");
798 
799     free = NULL;
800 
801     for (b = nxt_app_buf_done; b; b = bn) {
802         bn = b->next;
803 
804         if (nxt_buf_is_mem(b)) {
805             b->mem.pos = b->mem.start;
806             b->mem.free = b->mem.start;
807 
808             b->next = free;
809             free = b;
810 
811             continue;
812         }
813 
814         if (nxt_buf_is_last(b)) {
815             r = (nxt_app_request_t *) b->parent;
816             nxt_app_close_request(thr, r);
817         }
818     }
819 
820     nxt_app_buf_done = NULL;
821 
822     if (free == NULL) {
823         return;
824     }
825 
826     if (nxt_slow_path(nxt_thread_mutex_lock(&nxt_app_mutex) != NXT_OK)) {
827         return;
828     }
829 
830     nxt_buf_chain_add(&nxt_app_buf_free, free);
831 
832     (void) nxt_thread_mutex_unlock(&nxt_app_mutex);
833 
834     nxt_thread_time_update(thr);
835 
836     (void) nxt_thread_cond_signal(&nxt_app_cond);
837 }
838 
839 
840 static void
841 nxt_app_delivery_error(nxt_thread_t *thr, void *obj, void *data)
842 {
843     nxt_event_conn_t  *c;
844 
845     c = obj;
846 
847     nxt_log_debug(thr->log, "app delivery error");
848 
849     nxt_app_delivery_done(thr, c);
850 }
851 
852 
853 static void
854 nxt_app_delivery_timeout(nxt_thread_t *thr, void *obj, void *data)
855 {
856     nxt_event_conn_t  *c;
857 
858     c = obj;
859 
860     nxt_log_debug(thr->log, "app delivery timeout");
861 
862     nxt_app_delivery_done(thr, c);
863 }
864 
865 
866 static nxt_msec_t
867 nxt_app_delivery_timer_value(nxt_event_conn_t *c, uintptr_t data)
868 {
869     /* 30000 ms */
870     return 30000;
871 }
872 
873 
874 static void
875 nxt_app_delivery_done(nxt_thread_t *thr, nxt_event_conn_t *c)
876 {
877     if (c->write == NULL) {
878         return;
879     }
880 
881     nxt_buf_chain_add(&nxt_app_buf_done, c->write);
882 
883     c->write = NULL;
884 
885     nxt_thread_work_queue_add(thr, c->write_work_queue,
886                               nxt_app_delivery_complettion, c, NULL, thr->log);
887 }
888 
889 
890 static void
891 nxt_app_close_request(nxt_thread_t *thr, nxt_app_request_t *r)
892 {
893     nxt_event_conn_t  *c;
894 
895     nxt_log_debug(thr->log, "app close connection");
896 
897     c = r->event_conn;
898 
899     nxt_event_conn_close(thr, c);
900 
901     nxt_mem_pool_destroy(c->mem_pool);
902     nxt_mem_pool_destroy(r->mem_pool);
903 }
904