xref: /unit/src/nxt_application.c (revision 723:c48e0ee3a8b3)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_port_memory_int.h>
16 
17 #include <glob.h>
18 
19 
20 typedef struct {
21     nxt_app_type_t  type;
22     nxt_str_t       version;
23     nxt_str_t       file;
24 } nxt_module_t;
25 
26 
27 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
28 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
29     nxt_array_t *modules, const char *name);
30 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
31     void *data);
32 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
33     void *data);
34 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
35     const char *name);
36 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
37 
38 static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data);
39 
40 
41 static uint32_t  compat[] = {
42     NXT_VERNUM, NXT_DEBUG,
43 };
44 
45 
46 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
47 
48 
49 static nxt_thread_mutex_t        nxt_app_mutex;
50 static nxt_thread_cond_t         nxt_app_cond;
51 
52 static nxt_application_module_t  *nxt_app;
53 
54 
55 nxt_int_t
56 nxt_discovery_start(nxt_task_t *task, void *data)
57 {
58     uint32_t       stream;
59     nxt_buf_t      *b;
60     nxt_int_t      ret;
61     nxt_port_t     *main_port, *discovery_port;
62     nxt_runtime_t  *rt;
63 
64     nxt_debug(task, "DISCOVERY");
65 
66     rt = task->thread->runtime;
67 
68     b = nxt_discovery_modules(task, rt->modules);
69     if (nxt_slow_path(b == NULL)) {
70         return NXT_ERROR;
71     }
72 
73     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
74     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
75 
76     stream = nxt_port_rpc_register_handler(task, discovery_port,
77                                            nxt_discovery_quit,
78                                            nxt_discovery_quit,
79                                            main_port->pid, NULL);
80 
81     if (nxt_slow_path(stream == 0)) {
82         return NXT_ERROR;
83     }
84 
85     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
86                                 stream, discovery_port->id, b);
87 
88     if (nxt_slow_path(ret != NXT_OK)) {
89         nxt_port_rpc_cancel(task, discovery_port, stream);
90         return NXT_ERROR;
91     }
92 
93     return NXT_OK;
94 }
95 
96 
97 static nxt_buf_t *
98 nxt_discovery_modules(nxt_task_t *task, const char *path)
99 {
100     char          *name;
101     u_char        *p, *end;
102     size_t        size;
103     glob_t        glb;
104     nxt_mp_t      *mp;
105     nxt_buf_t     *b;
106     nxt_int_t     ret;
107     nxt_uint_t    i, n;
108     nxt_array_t   *modules;
109     nxt_module_t  *module;
110 
111     b = NULL;
112 
113     mp = nxt_mp_create(1024, 128, 256, 32);
114     if (mp == NULL) {
115         return b;
116     }
117 
118     ret = glob(path, 0, NULL, &glb);
119 
120     n = glb.gl_pathc;
121 
122     if (ret != 0) {
123         nxt_log(task, NXT_LOG_NOTICE,
124                 "no modules matching: \"%s\" found", path);
125         n = 0;
126     }
127 
128     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
129     if (modules == NULL) {
130         goto fail;
131     }
132 
133     for (i = 0; i < n; i++) {
134         name = glb.gl_pathv[i];
135 
136         ret = nxt_discovery_module(task, mp, modules, name);
137         if (ret != NXT_OK) {
138             goto fail;
139         }
140     }
141 
142     size = nxt_length("[]");
143     module = modules->elts;
144     n = modules->nelts;
145 
146     for (i = 0; i < n; i++) {
147         nxt_debug(task, "module: %d %V %V",
148                   module[i].type, &module[i].version, &module[i].file);
149 
150         size += nxt_length("{\"type\": ,");
151         size += nxt_length(" \"version\": \"\",");
152         size += nxt_length(" \"file\": \"\"},");
153 
154         size += NXT_INT_T_LEN
155                 + module[i].version.length
156                 + module[i].file.length;
157     }
158 
159     b = nxt_buf_mem_alloc(mp, size, 0);
160     if (b == NULL) {
161         goto fail;
162     }
163 
164     b->completion_handler = nxt_discovery_completion_handler;
165 
166     p = b->mem.free;
167     end = b->mem.end;
168     *p++ = '[';
169 
170     for (i = 0; i < n; i++) {
171         p = nxt_sprintf(p, end,
172                       "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},",
173                       module[i].type, &module[i].version, &module[i].file);
174     }
175 
176     *p++ = ']';
177     b->mem.free = p;
178 
179 fail:
180 
181     globfree(&glb);
182 
183     return b;
184 }
185 
186 
187 static nxt_int_t
188 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
189     const char *name)
190 {
191     void                      *dl;
192     nxt_str_t                 version;
193     nxt_int_t                 ret;
194     nxt_uint_t                i, n;
195     nxt_module_t              *module;
196     nxt_app_type_t            type;
197     nxt_application_module_t  *app;
198 
199     /*
200      * Only memory allocation failure should return NXT_ERROR.
201      * Any module processing errors are ignored.
202      */
203     ret = NXT_ERROR;
204 
205     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
206 
207     if (dl == NULL) {
208         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
209         return NXT_OK;
210     }
211 
212     app = dlsym(dl, "nxt_app_module");
213 
214     if (app != NULL) {
215         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
216                 &app->type, app->version, name);
217 
218         if (app->compat_length != sizeof(compat)
219             || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
220         {
221             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
222 
223             goto done;
224         }
225 
226         type = nxt_app_parse_type(app->type.start, app->type.length);
227 
228         if (type == NXT_APP_UNKNOWN) {
229             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
230 
231             goto done;
232         }
233 
234         module = modules->elts;
235         n = modules->nelts;
236 
237         version.start = (u_char *) app->version;
238         version.length = nxt_strlen(app->version);
239 
240         for (i = 0; i < n; i++) {
241             if (type == module[i].type
242                 && nxt_strstr_eq(&module[i].version, &version))
243             {
244                 nxt_log(task, NXT_LOG_NOTICE,
245                         "ignoring %s module with the same "
246                         "application language version %V %V as in %V",
247                         name, &app->type, &version, &module[i].file);
248 
249                 goto done;
250             }
251         }
252 
253         module = nxt_array_add(modules);
254         if (module == NULL) {
255             goto fail;
256         }
257 
258         module->type = type;
259 
260         nxt_str_dup(mp, &module->version, &version);
261         if (module->version.start == NULL) {
262             goto fail;
263         }
264 
265         module->file.length = nxt_strlen(name);
266 
267         module->file.start = nxt_mp_alloc(mp, module->file.length);
268         if (module->file.start == NULL) {
269             goto fail;
270         }
271 
272         nxt_memcpy(module->file.start, name, module->file.length);
273 
274     } else {
275         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
276     }
277 
278 done:
279 
280     ret = NXT_OK;
281 
282 fail:
283 
284     if (dlclose(dl) != 0) {
285         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
286     }
287 
288     return ret;
289 }
290 
291 
292 static void
293 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
294 {
295     nxt_mp_t   *mp;
296     nxt_buf_t  *b;
297 
298     b = obj;
299     mp = b->data;
300 
301     nxt_mp_destroy(mp);
302 }
303 
304 
305 static void
306 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
307 {
308     nxt_worker_process_quit_handler(task, msg);
309 }
310 
311 
312 nxt_int_t
313 nxt_app_start(nxt_task_t *task, void *data)
314 {
315     nxt_int_t              ret;
316     nxt_app_lang_module_t  *lang;
317     nxt_common_app_conf_t  *app_conf;
318 
319     app_conf = data;
320 
321     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
322     if (nxt_slow_path(lang == NULL)) {
323         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
324         return NXT_ERROR;
325     }
326 
327     nxt_app = lang->module;
328 
329     if (nxt_app == NULL) {
330         nxt_debug(task, "application language module: %s \"%s\"",
331                   lang->version, lang->file);
332 
333         nxt_app = nxt_app_module_load(task, lang->file);
334     }
335 
336     if (app_conf->working_directory != NULL
337         && app_conf->working_directory[0] != 0)
338     {
339         ret = chdir(app_conf->working_directory);
340 
341         if (nxt_slow_path(ret != 0)) {
342             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
343                     app_conf->working_directory, nxt_errno);
344 
345             return NXT_ERROR;
346         }
347     }
348 
349     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
350                       != NXT_OK))
351     {
352         nxt_alert(task, "failed to set environment");
353         return NXT_ERROR;
354     }
355 
356     if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
357         return NXT_ERROR;
358     }
359 
360     if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
361         return NXT_ERROR;
362     }
363 
364     ret = nxt_app->init(task, data);
365 
366     if (nxt_slow_path(ret != NXT_OK)) {
367         nxt_debug(task, "application init failed");
368 
369     } else {
370         nxt_debug(task, "application init done");
371     }
372 
373     return ret;
374 }
375 
376 
377 static nxt_app_module_t *
378 nxt_app_module_load(nxt_task_t *task, const char *name)
379 {
380     void  *dl;
381 
382     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
383 
384     if (dl != NULL) {
385         return dlsym(dl, "nxt_app_module");
386     }
387 
388     nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
389 
390     return NULL;
391 }
392 
393 
394 static nxt_int_t
395 nxt_app_set_environment(nxt_conf_value_t *environment)
396 {
397     char              *env, *p;
398     uint32_t          next;
399     nxt_str_t         name, value;
400     nxt_conf_value_t  *value_obj;
401 
402     if (environment != NULL) {
403         next = 0;
404 
405         for ( ;; ) {
406             value_obj = nxt_conf_next_object_member(environment, &name, &next);
407             if (value_obj == NULL) {
408                 break;
409             }
410 
411             nxt_conf_get_string(value_obj, &value);
412 
413             env = nxt_malloc(name.length + value.length + 2);
414             if (nxt_slow_path(env == NULL)) {
415                 return NXT_ERROR;
416             }
417 
418             p = nxt_cpymem(env, name.start, name.length);
419             *p++ = '=';
420             p = nxt_cpymem(p, value.start, value.length);
421             *p = '\0';
422 
423             if (nxt_slow_path(putenv(env) != 0)) {
424                 return NXT_ERROR;
425             }
426         }
427     }
428 
429     return NXT_OK;
430 }
431 
432 
433 void
434 nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
435 {
436     if (nxt_app->atexit != NULL) {
437         nxt_app->atexit(task);
438     }
439 
440     nxt_worker_process_quit_handler(task, msg);
441 }
442 
443 
444 void
445 nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
446 {
447     size_t          dump_size;
448     nxt_int_t       res;
449     nxt_buf_t       *b;
450     nxt_port_t      *port;
451     nxt_app_rmsg_t  rmsg = { msg->buf };
452     nxt_app_wmsg_t  wmsg;
453 
454     b = msg->buf;
455     dump_size = b->mem.free - b->mem.pos;
456 
457     if (dump_size > 300) {
458         dump_size = 300;
459     }
460 
461     nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
462 
463     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
464                                  msg->port_msg.reply_port);
465     if (nxt_slow_path(port == NULL)) {
466         nxt_debug(task, "stream #%uD: reply port %d not found",
467                   msg->port_msg.stream, msg->port_msg.reply_port);
468         return;
469     }
470 
471     wmsg.port = port;
472     wmsg.write = NULL;
473     wmsg.buf = &wmsg.write;
474     wmsg.stream = msg->port_msg.stream;
475 
476     res = nxt_app->run(task, &rmsg, &wmsg);
477 
478     if (nxt_slow_path(res != NXT_OK)) {
479         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
480                               msg->port_msg.stream, 0, NULL);
481     }
482 }
483 
484 
485 u_char *
486 nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
487 {
488     size_t      free_size;
489     u_char      *res;
490     nxt_buf_t   *b;
491 
492     res = NULL;
493 
494     do {
495         b = *msg->buf;
496 
497         if (b == NULL) {
498             b = nxt_port_mmap_get_buf(task, msg->port, size);
499             if (nxt_slow_path(b == NULL)) {
500                 return NULL;
501             }
502 
503             *msg->buf = b;
504         }
505 
506         free_size = nxt_buf_mem_free_size(&b->mem);
507 
508         if (free_size >= size) {
509             res = b->mem.free;
510             b->mem.free += size;
511 
512             return res;
513         }
514 
515         if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
516             res = b->mem.free;
517             b->mem.free += size;
518 
519             return res;
520         }
521 
522         msg->buf = &b->next;
523     } while(1);
524 }
525 
526 
527 nxt_int_t
528 nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
529 {
530     u_char  *dst;
531     size_t  dst_length;
532 
533     if (c != NULL) {
534         dst_length = size + (size < 128 ? 1 : 4) + 1;
535 
536         dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
537         if (nxt_slow_path(dst == NULL)) {
538             nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
539                       dst_length);
540             return NXT_ERROR;
541         }
542 
543         dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
544 
545         nxt_memcpy(dst, c, size);
546         dst[size] = 0;
547 
548         nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, size, c);
549 
550     } else {
551         dst_length = 1;
552 
553         dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
554         if (nxt_slow_path(dst == NULL)) {
555             nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
556                       dst_length);
557             return NXT_ERROR;
558         }
559 
560         dst = nxt_app_msg_write_length(dst, 0);
561 
562         nxt_debug(task, "nxt_app_msg_write: NULL");
563     }
564 
565     return NXT_OK;
566 }
567 
568 
569 nxt_int_t
570 nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
571     const nxt_str_t *prefix, u_char *c, size_t size)
572 {
573     u_char  *dst, *src;
574     size_t  i, length, dst_length;
575 
576     length = prefix->length + size;
577 
578     dst_length = length + (length < 128 ? 1 : 4) + 1;
579 
580     dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
581     if (nxt_slow_path(dst == NULL)) {
582         return NXT_ERROR;
583     }
584 
585     dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
586 
587     nxt_memcpy(dst, prefix->start, prefix->length);
588     dst += prefix->length;
589 
590     src = c;
591     for (i = 0; i < size; i++, dst++, src++) {
592 
593         if (*src >= 'a' && *src <= 'z') {
594             *dst = *src & ~0x20;
595             continue;
596         }
597 
598         if (*src == '-') {
599             *dst = '_';
600             continue;
601         }
602 
603         *dst = *src;
604     }
605 
606     *dst = 0;
607 
608     return NXT_OK;
609 }
610 
611 
612 nxt_inline nxt_int_t
613 nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
614 {
615     nxt_buf_t  *buf;
616 
617     do {
618         buf = msg->buf;
619 
620         if (nxt_slow_path(buf == NULL)) {
621             return NXT_DONE;
622         }
623 
624         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
625             if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
626                 msg->buf = buf->next;
627                 continue;
628             }
629             return NXT_ERROR;
630         }
631 
632         if (buf->mem.pos[0] >= 128) {
633             if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
634                 return NXT_ERROR;
635             }
636         }
637 
638         break;
639     } while (1);
640 
641     buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
642 
643     return NXT_OK;
644 }
645 
646 
647 nxt_int_t
648 nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
649 {
650     size_t      length;
651     nxt_int_t   ret;
652     nxt_buf_t  *buf;
653 
654     ret = nxt_app_msg_read_size_(task, msg, &length);
655     if (ret != NXT_OK) {
656         return ret;
657     }
658 
659     buf = msg->buf;
660 
661     if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) {
662         return NXT_ERROR;
663     }
664 
665     if (length > 0) {
666         str->start = buf->mem.pos;
667         str->length = length - 1;
668 
669         buf->mem.pos += length;
670 
671         nxt_debug(task, "nxt_read_str: %uz %*s", length - 1,
672                         length - 1, str->start);
673 
674     } else {
675         str->start = NULL;
676         str->length = 0;
677 
678         nxt_debug(task, "nxt_read_str: NULL");
679     }
680 
681     return NXT_OK;
682 }
683 
684 
685 size_t
686 nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
687     size_t size)
688 {
689     size_t     res, read_size;
690     nxt_buf_t  *buf;
691 
692     res = 0;
693 
694     while (size > 0) {
695         buf = msg->buf;
696 
697         if (nxt_slow_path(buf == NULL)) {
698             break;
699         }
700 
701         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
702             msg->buf = buf->next;
703             continue;
704         }
705 
706         read_size = nxt_buf_mem_used_size(&buf->mem);
707         read_size = nxt_min(read_size, size);
708 
709         dst = nxt_cpymem(dst, buf->mem.pos, read_size);
710 
711         size -= read_size;
712         buf->mem.pos += read_size;
713         res += read_size;
714     }
715 
716     nxt_debug(task, "nxt_read_raw: %uz", res);
717 
718     return res;
719 }
720 
721 
722 nxt_int_t
723 nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
724     nxt_str_t *v)
725 {
726     nxt_int_t rc;
727 
728     rc = nxt_app_msg_read_str(task, rmsg, n);
729     if (nxt_slow_path(rc != NXT_OK)) {
730         return rc;
731     }
732 
733     rc = nxt_app_msg_read_str(task, rmsg, v);
734     if (nxt_slow_path(rc != NXT_OK)) {
735         return rc;
736     }
737 
738     return rc;
739 }
740 
741 
742 nxt_int_t
743 nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
744 {
745     nxt_int_t  ret;
746 
747     ret = nxt_app_msg_read_size_(task, msg, size);
748 
749     nxt_debug(task, "nxt_read_size: %d", (int) *size);
750 
751     return ret;
752 }
753 
754 
755 nxt_int_t
756 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
757 {
758     ar->timer.handler = nxt_app_http_release;
759     nxt_timer_add(task->thread->engine, &ar->timer, 0);
760 
761     return NXT_OK;
762 }
763 
764 
765 static void
766 nxt_app_http_release(nxt_task_t *task, void *obj, void *data)
767 {
768     nxt_timer_t          *timer;
769     nxt_app_parse_ctx_t  *ar;
770 
771     timer = obj;
772 
773     nxt_debug(task, "http app release");
774 
775     ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
776 
777     nxt_mp_release(ar->request->mem_pool);
778 }
779 
780 
781 nxt_int_t
782 nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
783 {
784     nxt_int_t   rc;
785     nxt_buf_t   *b;
786 
787     rc = NXT_OK;
788 
789     if (nxt_slow_path(last == 1)) {
790         do {
791             b = *msg->buf;
792 
793             if (b == NULL) {
794                 b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST);
795                 *msg->buf = b;
796                 break;
797             }
798 
799             msg->buf = &b->next;
800         } while(1);
801     }
802 
803     if (nxt_slow_path(msg->write != NULL)) {
804         rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA,
805                                    -1, msg->stream, 0, msg->write);
806 
807         msg->write = NULL;
808         msg->buf = &msg->write;
809     }
810 
811     return rc;
812 }
813 
814 
815 nxt_int_t
816 nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
817     size_t size)
818 {
819     size_t     free_size, copy_size;
820     nxt_buf_t  *b;
821 
822     nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
823 
824     while (size > 0) {
825         b = *msg->buf;
826 
827         if (b == NULL) {
828             free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
829 
830             b = nxt_port_mmap_get_buf(task, msg->port, free_size);
831             if (nxt_slow_path(b == NULL)) {
832                 return NXT_ERROR;
833             }
834 
835             *msg->buf = b;
836 
837         } else {
838             free_size = nxt_buf_mem_free_size(&b->mem);
839 
840             if (free_size < size
841                 && nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK)
842             {
843                 free_size = nxt_buf_mem_free_size(&b->mem);
844             }
845         }
846 
847         if (free_size > 0) {
848             copy_size = nxt_min(free_size, size);
849 
850             b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
851 
852             size -= copy_size;
853             c += copy_size;
854 
855             if (size == 0) {
856                 return NXT_OK;
857             }
858         }
859 
860         msg->buf = &b->next;
861     }
862 
863     return NXT_OK;
864 }
865 
866 
867 nxt_app_lang_module_t *
868 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
869 {
870     u_char                 *p, *end, *version;
871     size_t                 version_length;
872     nxt_uint_t             i, n;
873     nxt_app_type_t         type;
874     nxt_app_lang_module_t  *lang;
875 
876     end = name->start + name->length;
877     version = end;
878 
879     for (p = name->start; p < end; p++) {
880         if (*p == ' ') {
881             version = p + 1;
882             break;
883         }
884 
885         if (*p >= '0' && *p <= '9') {
886             version = p;
887             break;
888         }
889     }
890 
891     type = nxt_app_parse_type(name->start, p - name->start);
892 
893     if (type == NXT_APP_UNKNOWN) {
894         return NULL;
895     }
896 
897     version_length = end - version;
898 
899     lang = rt->languages->elts;
900     n = rt->languages->nelts;
901 
902     for (i = 0; i < n; i++) {
903 
904         /*
905          * Versions are sorted in descending order
906          * so first match chooses the highest version.
907          */
908 
909         if (lang[i].type == type
910             && nxt_strvers_match(lang[i].version, version, version_length))
911         {
912             return &lang[i];
913         }
914     }
915 
916     return NULL;
917 }
918 
919 
920 nxt_app_type_t
921 nxt_app_parse_type(u_char *p, size_t length)
922 {
923     nxt_str_t str;
924 
925     str.length = length;
926     str.start = p;
927 
928     if (nxt_str_eq(&str, "python", 6)) {
929         return NXT_APP_PYTHON;
930 
931     } else if (nxt_str_eq(&str, "php", 3)) {
932         return NXT_APP_PHP;
933 
934     } else if (nxt_str_eq(&str, "go", 2)) {
935         return NXT_APP_GO;
936 
937     } else if (nxt_str_eq(&str, "perl", 4)) {
938         return NXT_APP_PERL;
939 
940     } else if (nxt_str_eq(&str, "ruby", 4)) {
941         return NXT_APP_RUBY;
942     }
943 
944     return NXT_APP_UNKNOWN;
945 }
946