xref: /unit/src/nxt_application.c (revision 216:07257705cd64)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_application.h>
12 #include <nxt_master_process.h>
13 
14 #include <glob.h>
15 
16 
17 typedef struct {
18     nxt_str_t   type;
19     nxt_str_t   version;
20     nxt_str_t   file;
21 } nxt_module_t;
22 
23 
24 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
25 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
26     nxt_array_t *modules, const char *name);
27 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
28     const char *name);
29 
30 
31 static nxt_thread_mutex_t        nxt_app_mutex;
32 static nxt_thread_cond_t         nxt_app_cond;
33 
34 static nxt_http_fields_hash_entry_t  nxt_app_request_fields[];
35 static nxt_http_fields_hash_t        *nxt_app_request_fields_hash;
36 
37 static nxt_application_module_t      *nxt_app;
38 
39 
40 nxt_int_t
41 nxt_discovery_start(nxt_task_t *task, void *data)
42 {
43     nxt_buf_t         *b;
44     nxt_port_t        *main_port;
45     nxt_runtime_t     *rt;
46 
47     nxt_debug(task, "DISCOVERY");
48 
49     b = nxt_discovery_modules(task, "build/nginext.*");
50 
51     rt = task->thread->runtime;
52     main_port = rt->port_by_type[NXT_PROCESS_MASTER];
53 
54     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
55                           0, -1, b);
56 
57     return NXT_OK;
58 }
59 
60 
61 static void
62 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
63 {
64     nxt_mp_t   *mp;
65     nxt_buf_t  *b;
66 
67     b = obj;
68     mp = b->data;
69 
70     nxt_mp_destroy(mp);
71 
72     exit(0);
73 }
74 
75 
76 static nxt_buf_t *
77 nxt_discovery_modules(nxt_task_t *task, const char *path)
78 {
79     char          *name;
80     u_char        *p, *end;
81     size_t        size;
82     glob_t        glb;
83     nxt_mp_t      *mp;
84     nxt_buf_t     *b;
85     nxt_int_t     ret;
86     nxt_uint_t    i, n;
87     nxt_array_t   *modules;
88     nxt_module_t  *module;
89 
90     b = NULL;
91 
92     mp = nxt_mp_create(1024, 128, 256, 32);
93     if (mp == NULL) {
94         return b;
95     }
96 
97     ret = glob(path, 0, NULL, &glb);
98 
99     if (ret == 0) {
100         n = glb.gl_pathc;
101 
102         modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
103         if (modules == NULL) {
104             goto fail;
105         }
106 
107         for (i = 0; i < n; i++) {
108             name = glb.gl_pathv[i];
109 
110             ret = nxt_discovery_module(task, mp, modules, name);
111             if (ret != NXT_OK) {
112                 goto fail;
113             }
114         }
115 
116         size = sizeof("[]") - 1;
117         module = modules->elts;
118         n = modules->nelts;
119 
120         for (i = 0; i < n; i++) {
121             nxt_debug(task, "module: %V %V %V",
122                       &module[i].type, &module[i].version, &module[i].file);
123 
124             size += sizeof("{\"type\": \"\",") - 1;
125             size += sizeof(" \"version\": \"\",") - 1;
126             size += sizeof(" \"file\": \"\"},") - 1;
127 
128             size += module[i].type.length
129                     + module[i].version.length
130                     + module[i].file.length;
131         }
132 
133         b = nxt_buf_mem_alloc(mp, size, 0);
134         if (b == NULL) {
135             goto fail;
136         }
137 
138         b->completion_handler = nxt_discovery_completion_handler;
139 
140         p = b->mem.free;
141         end = b->mem.end;
142         *p++ = '[';
143 
144         for (i = 0; i < n; i++) {
145             p = nxt_sprintf(p, end,
146                   "{\"type\": \"%V\", \"version\": \"%V\", \"file\": \"%V\"},",
147                   &module[i].type, &module[i].version, &module[i].file);
148         }
149 
150         *p++ = ']';
151         b->mem.free = p;
152     }
153 
154 fail:
155 
156     globfree(&glb);
157 
158     return b;
159 }
160 
161 
162 static nxt_int_t
163 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
164     const char *name)
165 {
166     void                      *dl;
167     nxt_str_t                 *s;
168     nxt_int_t                 ret;
169     nxt_uint_t                i, n;
170     nxt_module_t              *module;
171     nxt_application_module_t  *app;
172 
173     /*
174      * Only memory allocation failure should return NXT_ERROR.
175      * Any module processing errors are ignored.
176      */
177     ret = NXT_ERROR;
178 
179     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
180 
181     if (dl == NULL) {
182         nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"",
183                 name, dlerror());
184         return NXT_OK;
185     }
186 
187     app = dlsym(dl, "nxt_app_module");
188 
189     if (app != NULL) {
190         nxt_log(task, NXT_LOG_NOTICE, "module: %V \"%s\"",
191                 &app->version, name);
192 
193         module = modules->elts;
194         n = modules->nelts;
195 
196         for (i = 0; i < n; i++) {
197             if (nxt_strstr_eq(&app->version, &module[i].version)) {
198                 nxt_log(task, NXT_LOG_NOTICE,
199                         "ignoring %s module with the same "
200                         "application language version %V as in %s",
201                         name, &module[i].version, &module[i].file);
202 
203                 goto done;
204             }
205         }
206 
207         module = nxt_array_add(modules);
208         if (module == NULL) {
209             goto fail;
210         }
211 
212         s = nxt_str_dup(mp, &module->type, &app->type);
213         if (s == NULL) {
214             goto fail;
215         }
216 
217         s = nxt_str_dup(mp, &module->version, &app->version);
218         if (s == NULL) {
219             goto fail;
220         }
221 
222         module->file.length = nxt_strlen(name);
223 
224         module->file.start = nxt_mp_alloc(mp, module->file.length);
225         if (module->file.start == NULL) {
226             goto fail;
227         }
228 
229         nxt_memcpy(module->file.start, name, module->file.length);
230 
231     } else {
232         nxt_log(task, NXT_LOG_CRIT, "dlsym(\"%s\"), failed: \"%s\"",
233                 name, dlerror());
234     }
235 
236 done:
237 
238     ret = NXT_OK;
239 
240 fail:
241 
242     if (dlclose(dl) != 0) {
243         nxt_log(task, NXT_LOG_CRIT, "dlclose(\"%s\"), failed: \"%s\"",
244                 name, dlerror());
245     }
246 
247     return ret;
248 }
249 
250 
251 nxt_int_t
252 nxt_app_start(nxt_task_t *task, void *data)
253 {
254     nxt_int_t              ret;
255     nxt_app_lang_module_t  *lang;
256     nxt_common_app_conf_t  *app_conf;
257 
258     app_conf = data;
259 
260     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
261     if (nxt_slow_path(lang == NULL)) {
262         nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
263                 &app_conf->type);
264         return NXT_ERROR;
265     }
266 
267     nxt_app = lang->module;
268 
269     if (nxt_app == NULL) {
270         nxt_debug(task, "application language module: %V \"%s\"",
271                   &lang->version, lang->file);
272 
273         nxt_app = nxt_app_module_load(task, lang->file);
274     }
275 
276     if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
277         return NXT_ERROR;
278     }
279 
280     if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
281         return NXT_ERROR;
282     }
283 
284     ret = nxt_app->init(task, data);
285 
286     if (nxt_slow_path(ret != NXT_OK)) {
287         nxt_debug(task, "application init failed");
288 
289     } else {
290         nxt_debug(task, "application init done");
291     }
292 
293     return ret;
294 }
295 
296 
297 static nxt_app_module_t *
298 nxt_app_module_load(nxt_task_t *task, const char *name)
299 {
300     void  *dl;
301 
302     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
303 
304     if (dl != NULL) {
305         return dlsym(dl, "nxt_app_module");
306     }
307 
308     nxt_log(task, NXT_LOG_CRIT, "dlopen(\"%s\"), failed: \"%s\"",
309             name, dlerror());
310 
311     return NULL;
312 }
313 
314 
315 nxt_int_t
316 nxt_app_http_init(nxt_task_t *task, nxt_runtime_t *rt)
317 {
318     nxt_http_fields_hash_t  *hash;
319 
320     hash = nxt_http_fields_hash_create(nxt_app_request_fields, rt->mem_pool);
321     if (nxt_slow_path(hash == NULL)) {
322         return NXT_ERROR;
323     }
324 
325     nxt_app_request_fields_hash = hash;
326 
327     return NXT_OK;
328 }
329 
330 
331 void
332 nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
333 {
334     size_t          dump_size;
335     nxt_buf_t       *b;
336     nxt_port_t      *port;
337     nxt_app_rmsg_t  rmsg = { msg->buf };
338     nxt_app_wmsg_t  wmsg;
339 
340     b = msg->buf;
341     dump_size = b->mem.free - b->mem.pos;
342 
343     if (dump_size > 300) {
344         dump_size = 300;
345     }
346 
347     nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
348 
349     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
350                                  msg->port_msg.reply_port);
351     if (nxt_slow_path(port == NULL)) {
352         //
353     }
354 
355     wmsg.port = port;
356     wmsg.write = NULL;
357     wmsg.buf = &wmsg.write;
358     wmsg.stream = msg->port_msg.stream;
359 
360     nxt_app->run(task, &rmsg, &wmsg);
361 }
362 
363 
364 nxt_inline nxt_port_t *
365 nxt_app_msg_get_port(nxt_task_t *task, nxt_app_wmsg_t *msg)
366 {
367     return msg->port;
368 }
369 
370 
371 u_char *
372 nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
373 {
374     size_t      free_size;
375     u_char      *res;
376     nxt_buf_t   *b;
377     nxt_port_t  *port;
378 
379     res = NULL;
380 
381     do {
382         b = *msg->buf;
383 
384         if (b == NULL) {
385             port = nxt_app_msg_get_port(task, msg);
386             if (nxt_slow_path(port == NULL)) {
387                 return NULL;
388             }
389 
390             b = nxt_port_mmap_get_buf(task, port, size);
391             if (nxt_slow_path(b == NULL)) {
392                 return NULL;
393             }
394 
395             *msg->buf = b;
396 
397             free_size = nxt_buf_mem_free_size(&b->mem);
398 
399             if (nxt_slow_path(free_size < size)) {
400                 nxt_debug(task, "requested buffer too big (%z < %z)",
401                           free_size, size);
402                 return NULL;
403             }
404 
405         }
406 
407         free_size = nxt_buf_mem_free_size(&b->mem);
408 
409         if (free_size >= size) {
410             res = b->mem.free;
411             b->mem.free += size;
412 
413             return res;
414         }
415 
416         if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
417             res = b->mem.free;
418             b->mem.free += size;
419 
420             return res;
421         }
422 
423         msg->buf = &b->next;
424     } while(1);
425 }
426 
427 
428 nxt_int_t
429 nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
430 {
431     u_char  *dst;
432     size_t  dst_length;
433 
434     if (c != NULL) {
435         dst_length = size + (size < 128 ? 1 : 4) + 1;
436 
437         dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
438         if (nxt_slow_path(dst == NULL)) {
439             nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
440                       dst_length);
441             return NXT_ERROR;
442         }
443 
444         dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
445 
446         nxt_memcpy(dst, c, size);
447         dst[size] = 0;
448 
449         nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, (int)size, c);
450     } else {
451         dst_length = 1;
452 
453         dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
454         if (nxt_slow_path(dst == NULL)) {
455             nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
456                       dst_length);
457             return NXT_ERROR;
458         }
459 
460         dst = nxt_app_msg_write_length(dst, 0);
461 
462         nxt_debug(task, "nxt_app_msg_write: NULL");
463     }
464 
465     return NXT_OK;
466 }
467 
468 
469 nxt_int_t
470 nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
471     const nxt_str_t *prefix, const nxt_str_t *v)
472 {
473     u_char  *dst, *src;
474     size_t  i, length, dst_length;
475 
476     length = prefix->length + v->length;
477 
478     dst_length = length + (length < 128 ? 1 : 4) + 1;
479 
480     dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
481     if (nxt_slow_path(dst == NULL)) {
482         return NXT_ERROR;
483     }
484 
485     dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
486 
487     nxt_memcpy(dst, prefix->start, prefix->length);
488     dst += prefix->length;
489 
490     src = v->start;
491     for (i = 0; i < v->length; i++, dst++, src++) {
492 
493         if (*src >= 'a' && *src <= 'z') {
494             *dst = *src & ~0x20;
495             continue;
496         }
497 
498         if (*src == '-') {
499             *dst = '_';
500             continue;
501         }
502 
503         *dst = *src;
504     }
505 
506     *dst = 0;
507 
508     return NXT_OK;
509 }
510 
511 
512 nxt_int_t
513 nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
514 {
515     size_t     length;
516     nxt_buf_t  *buf;
517 
518     do {
519         buf = msg->buf;
520 
521         if (nxt_slow_path(buf == NULL)) {
522             return NXT_DONE;
523         }
524 
525         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
526             if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
527                 msg->buf = buf->next;
528                 continue;
529             }
530             return NXT_ERROR;
531         }
532 
533         if (buf->mem.pos[0] >= 128) {
534             if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
535                 return NXT_ERROR;
536             }
537         }
538 
539         break;
540     } while (1);
541 
542     buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, &length);
543 
544     if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t)length))
545     {
546         return NXT_ERROR;
547     }
548 
549     if (length > 0) {
550         str->start = buf->mem.pos;
551         str->length = length - 1;
552 
553         buf->mem.pos += length;
554 
555         nxt_debug(task, "nxt_read_str: %d %*s", (int)length - 1,
556                         (int)length - 1, str->start);
557     } else {
558         str->start = NULL;
559         str->length = 0;
560 
561         nxt_debug(task, "nxt_read_str: NULL");
562     }
563 
564     return NXT_OK;
565 }
566 
567 
568 size_t
569 nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
570     size_t size)
571 {
572     size_t     res, read_size;
573     nxt_buf_t  *buf;
574 
575     res = 0;
576 
577     while (size > 0) {
578         buf = msg->buf;
579 
580         if (nxt_slow_path(buf == NULL)) {
581             break;
582         }
583 
584         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
585             msg->buf = buf->next;
586             continue;
587         }
588 
589         read_size = nxt_buf_mem_used_size(&buf->mem);
590         read_size = nxt_min(read_size, size);
591 
592         dst = nxt_cpymem(dst, buf->mem.pos, read_size);
593 
594         size -= read_size;
595         buf->mem.pos += read_size;
596         res += read_size;
597     }
598 
599     nxt_debug(task, "nxt_read_raw: %uz", res);
600 
601     return res;
602 }
603 
604 
605 nxt_int_t
606 nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
607     nxt_str_t *v)
608 {
609     nxt_int_t rc;
610 
611     rc = nxt_app_msg_read_str(task, rmsg, n);
612     if (nxt_slow_path(rc != NXT_OK)) {
613         return rc;
614     }
615 
616     rc = nxt_app_msg_read_str(task, rmsg, v);
617     if (nxt_slow_path(rc != NXT_OK)) {
618         return rc;
619     }
620 
621     return rc;
622 }
623 
624 
625 nxt_int_t
626 nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
627 {
628     nxt_buf_t  *buf;
629 
630     do {
631         buf = msg->buf;
632 
633         if (nxt_slow_path(buf == NULL)) {
634             return NXT_DONE;
635         }
636 
637         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
638             if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
639                 msg->buf = buf->next;
640                 continue;
641             }
642             return NXT_ERROR;
643         }
644 
645         if (buf->mem.pos[0] >= 128) {
646             if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
647                 return NXT_ERROR;
648             }
649         }
650 
651         break;
652     } while (1);
653 
654     buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
655 
656     nxt_debug(task, "nxt_read_size: %d", (int)*size);
657 
658     return NXT_OK;
659 }
660 
661 
662 static nxt_int_t
663 nxt_app_request_content_length(void *ctx, nxt_http_field_t *field,
664     nxt_log_t *log)
665 {
666     nxt_str_t                 *v;
667     nxt_app_parse_ctx_t       *c;
668     nxt_app_request_header_t  *h;
669 
670     c = ctx;
671     h = &c->r.header;
672     v = &field->value;
673 
674     h->content_length = *v;
675     h->parsed_content_length = nxt_off_t_parse(v->start, v->length);
676 
677     return NXT_OK;
678 }
679 
680 
681 static nxt_int_t
682 nxt_app_request_content_type(void *ctx, nxt_http_field_t *field,
683     nxt_log_t *log)
684 {
685     nxt_app_parse_ctx_t       *c;
686     nxt_app_request_header_t  *h;
687 
688     c = ctx;
689     h = &c->r.header;
690 
691     h->content_type = field->value;
692 
693     return NXT_OK;
694 }
695 
696 
697 static nxt_int_t
698 nxt_app_request_cookie(void *ctx, nxt_http_field_t *field,
699     nxt_log_t *log)
700 {
701     nxt_app_parse_ctx_t       *c;
702     nxt_app_request_header_t  *h;
703 
704     c = ctx;
705     h = &c->r.header;
706 
707     h->cookie = field->value;
708 
709     return NXT_OK;
710 }
711 
712 
713 static nxt_int_t
714 nxt_app_request_host(void *ctx, nxt_http_field_t *field,
715     nxt_log_t *log)
716 {
717     nxt_app_parse_ctx_t       *c;
718     nxt_app_request_header_t  *h;
719 
720     c = ctx;
721     h = &c->r.header;
722 
723     h->host = field->value;
724 
725     return NXT_OK;
726 }
727 
728 
729 static nxt_http_fields_hash_entry_t  nxt_app_request_fields[] = {
730     { nxt_string("Content-Length"), &nxt_app_request_content_length, 0 },
731     { nxt_string("Content-Type"), &nxt_app_request_content_type, 0 },
732     { nxt_string("Cookie"), &nxt_app_request_cookie, 0 },
733     { nxt_string("Host"), &nxt_app_request_host, 0 },
734 
735     { nxt_null_string, NULL, 0 }
736 };
737 
738 
739 nxt_int_t
740 nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
741 {
742     nxt_int_t  rc;
743 
744     ctx->mem_pool = nxt_mp_create(1024, 128, 256, 32);
745 
746     rc = nxt_http_parse_request_init(&ctx->parser, ctx->mem_pool);
747     if (nxt_slow_path(rc != NXT_OK)) {
748         return rc;
749     }
750 
751     ctx->parser.fields_hash = nxt_app_request_fields_hash;
752 
753     return NXT_OK;
754 }
755 
756 
757 nxt_int_t
758 nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
759     nxt_buf_t *buf)
760 {
761     nxt_int_t                 rc;
762     nxt_app_request_body_t    *b;
763     nxt_http_request_parse_t  *p;
764     nxt_app_request_header_t  *h;
765 
766     p = &ctx->parser;
767     b = &ctx->r.body;
768     h = &ctx->r.header;
769 
770     nxt_assert(h->done == 0);
771 
772     rc = nxt_http_parse_request(p, &buf->mem);
773 
774     if (nxt_slow_path(rc != NXT_DONE)) {
775         return rc;
776     }
777 
778     rc = nxt_http_fields_process(p->fields, ctx, task->log);
779 
780     if (nxt_slow_path(rc != NXT_OK)) {
781         return rc;
782     }
783 
784     h->fields = p->fields;
785     h->done = 1;
786 
787     h->version.start = p->version.str;
788     h->version.length = nxt_strlen(p->version.str);
789 
790     h->method = p->method;
791 
792     h->target.start = p->target_start;
793     h->target.length = p->target_end - p->target_start;
794 
795     h->path = p->path;
796     h->query = p->args;
797 
798     if (h->parsed_content_length == 0) {
799         b->done = 1;
800 
801     }
802 
803     if (buf->mem.free == buf->mem.pos) {
804         return NXT_DONE;
805     }
806 
807     b->buf = buf;
808     b->done = nxt_buf_mem_used_size(&buf->mem) >=
809               h->parsed_content_length;
810 
811     if (b->done == 1) {
812         b->preread_size = nxt_buf_mem_used_size(&buf->mem);
813     }
814 
815     return NXT_DONE;
816 }
817 
818 
819 nxt_int_t
820 nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
821     nxt_buf_t *buf)
822 {
823     nxt_app_request_body_t    *b;
824     nxt_app_request_header_t  *h;
825 
826     b = &ctx->r.body;
827     h = &ctx->r.header;
828 
829     nxt_assert(h->done == 1);
830     nxt_assert(b->done == 0);
831 
832     b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >=
833               (size_t) h->parsed_content_length;
834 
835     if (b->done == 1) {
836         b->preread_size += nxt_buf_mem_used_size(&buf->mem);
837     }
838 
839     return b->done == 1 ? NXT_DONE : NXT_AGAIN;
840 }
841 
842 
843 nxt_int_t
844 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
845 {
846     nxt_mp_destroy(ctx->mem_pool);
847 
848     return NXT_OK;
849 }
850 
851 
852 nxt_int_t
853 nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
854 {
855     nxt_int_t   rc;
856     nxt_buf_t   *b;
857     nxt_port_t  *port;
858 
859     rc = NXT_OK;
860 
861     port = nxt_app_msg_get_port(task, msg);
862     if (nxt_slow_path(port == NULL)) {
863         return NXT_ERROR;
864     }
865 
866     if (nxt_slow_path(last == 1)) {
867         do {
868             b = *msg->buf;
869 
870             if (b == NULL) {
871                 b = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
872                 *msg->buf = b;
873                 break;
874             }
875 
876             msg->buf = &b->next;
877         } while(1);
878     }
879 
880     if (nxt_slow_path(msg->write != NULL)) {
881         rc = nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
882                                    -1, msg->stream, 0, msg->write);
883 
884         msg->write = NULL;
885         msg->buf = &msg->write;
886     }
887 
888     return rc;
889 }
890 
891 
892 nxt_int_t
893 nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
894     size_t size)
895 {
896     size_t      free_size, copy_size;
897     nxt_buf_t   *b;
898     nxt_port_t  *port;
899 
900     nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
901 
902     while (size > 0) {
903         b = *msg->buf;
904 
905         if (b == NULL) {
906             port = nxt_app_msg_get_port(task, msg);
907             if (nxt_slow_path(port == NULL)) {
908                 return NXT_ERROR;
909             }
910 
911             b = nxt_port_mmap_get_buf(task, port, size);
912             if (nxt_slow_path(b == NULL)) {
913                 return NXT_ERROR;
914             }
915 
916             *msg->buf = b;
917         }
918 
919         do {
920             free_size = nxt_buf_mem_free_size(&b->mem);
921 
922             if (free_size > 0) {
923                 copy_size = nxt_min(free_size, size);
924 
925                 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
926 
927                 size -= copy_size;
928                 c += copy_size;
929 
930                 if (size == 0) {
931                     return NXT_OK;
932                 }
933             }
934         } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
935 
936         msg->buf = &b->next;
937     }
938 
939     return NXT_OK;
940 }
941 
942 
943 nxt_app_lang_module_t *
944 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
945 {
946     u_char                 *p, *end, *version;
947     size_t                 type_length, version_length;
948     nxt_uint_t             i, n;
949     nxt_app_lang_module_t  *lang;
950 
951     end = name->start + name->length;
952     version = end;
953 
954     for (p = name->start; p < end; p++) {
955         if (*p == ' ') {
956             version = p + 1;
957             break;
958         }
959 
960         if (*p >= '0' && *p <= '9') {
961             version = p;
962             break;
963         }
964     }
965 
966     type_length = p - name->start;
967     version_length = end - version;
968 
969     lang = rt->languages->elts;
970     n = rt->languages->nelts;
971 
972     for (i = 0; i < n; i++) {
973         if (nxt_str_eq(&lang[i].type, name->start, type_length)
974             && nxt_str_start(&lang[i].version, version, version_length))
975         {
976             return &lang[i];
977         }
978     }
979 
980     return NULL;
981 }
982 
983 
984 nxt_app_type_t
985 nxt_app_parse_type(nxt_str_t *str)
986 {
987     if (nxt_str_eq(str, "python", 6)) {
988         return NXT_APP_PYTHON;
989 
990     } else if (nxt_str_eq(str, "php", 3)) {
991         return NXT_APP_PHP;
992 
993     } else if (nxt_str_eq(str, "go", 2)) {
994         return NXT_APP_GO;
995 
996     }
997 
998     return NXT_APP_UNKNOWN;
999 }
1000