xref: /unit/src/nxt_application.c (revision 703:2d536dde84d2)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) Igor Sysoev
5  * Copyright (C) Valentin V. Bartenev
6  * Copyright (C) NGINX, Inc.
7  */
8 
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 
16 #include <glob.h>
17 
18 
19 typedef struct {
20     nxt_app_type_t  type;
21     nxt_str_t       version;
22     nxt_str_t       file;
23 } nxt_module_t;
24 
25 
26 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
27 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
28     nxt_array_t *modules, const char *name);
29 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
30     void *data);
31 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
32     void *data);
33 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
34     const char *name);
35 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
36 
37 static void nxt_app_http_release(nxt_task_t *task, void *obj, void *data);
38 
39 
40 static uint32_t  compat[] = {
41     NXT_VERNUM, NXT_DEBUG,
42 };
43 
44 
45 nxt_str_t  nxt_server = nxt_string(NXT_SERVER);
46 
47 
48 static nxt_thread_mutex_t        nxt_app_mutex;
49 static nxt_thread_cond_t         nxt_app_cond;
50 
51 static nxt_application_module_t  *nxt_app;
52 
53 
54 nxt_int_t
55 nxt_discovery_start(nxt_task_t *task, void *data)
56 {
57     uint32_t       stream;
58     nxt_buf_t      *b;
59     nxt_int_t      ret;
60     nxt_port_t     *main_port, *discovery_port;
61     nxt_runtime_t  *rt;
62 
63     nxt_debug(task, "DISCOVERY");
64 
65     rt = task->thread->runtime;
66 
67     b = nxt_discovery_modules(task, rt->modules);
68     if (nxt_slow_path(b == NULL)) {
69         return NXT_ERROR;
70     }
71 
72     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
73     discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
74 
75     stream = nxt_port_rpc_register_handler(task, discovery_port,
76                                            nxt_discovery_quit,
77                                            nxt_discovery_quit,
78                                            main_port->pid, NULL);
79 
80     if (nxt_slow_path(stream == 0)) {
81         return NXT_ERROR;
82     }
83 
84     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
85                                 stream, discovery_port->id, b);
86 
87     if (nxt_slow_path(ret != NXT_OK)) {
88         nxt_port_rpc_cancel(task, discovery_port, stream);
89         return NXT_ERROR;
90     }
91 
92     return NXT_OK;
93 }
94 
95 
96 static nxt_buf_t *
97 nxt_discovery_modules(nxt_task_t *task, const char *path)
98 {
99     char          *name;
100     u_char        *p, *end;
101     size_t        size;
102     glob_t        glb;
103     nxt_mp_t      *mp;
104     nxt_buf_t     *b;
105     nxt_int_t     ret;
106     nxt_uint_t    i, n;
107     nxt_array_t   *modules;
108     nxt_module_t  *module;
109 
110     b = NULL;
111 
112     mp = nxt_mp_create(1024, 128, 256, 32);
113     if (mp == NULL) {
114         return b;
115     }
116 
117     ret = glob(path, 0, NULL, &glb);
118 
119     n = glb.gl_pathc;
120 
121     if (ret != 0) {
122         nxt_log(task, NXT_LOG_NOTICE,
123                 "no modules matching: \"%s\" found", path);
124         n = 0;
125     }
126 
127     modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
128     if (modules == NULL) {
129         goto fail;
130     }
131 
132     for (i = 0; i < n; i++) {
133         name = glb.gl_pathv[i];
134 
135         ret = nxt_discovery_module(task, mp, modules, name);
136         if (ret != NXT_OK) {
137             goto fail;
138         }
139     }
140 
141     size = nxt_length("[]");
142     module = modules->elts;
143     n = modules->nelts;
144 
145     for (i = 0; i < n; i++) {
146         nxt_debug(task, "module: %d %V %V",
147                   module[i].type, &module[i].version, &module[i].file);
148 
149         size += nxt_length("{\"type\": ,");
150         size += nxt_length(" \"version\": \"\",");
151         size += nxt_length(" \"file\": \"\"},");
152 
153         size += NXT_INT_T_LEN
154                 + module[i].version.length
155                 + module[i].file.length;
156     }
157 
158     b = nxt_buf_mem_alloc(mp, size, 0);
159     if (b == NULL) {
160         goto fail;
161     }
162 
163     b->completion_handler = nxt_discovery_completion_handler;
164 
165     p = b->mem.free;
166     end = b->mem.end;
167     *p++ = '[';
168 
169     for (i = 0; i < n; i++) {
170         p = nxt_sprintf(p, end,
171                       "{\"type\": %d, \"version\": \"%V\", \"file\": \"%V\"},",
172                       module[i].type, &module[i].version, &module[i].file);
173     }
174 
175     *p++ = ']';
176     b->mem.free = p;
177 
178 fail:
179 
180     globfree(&glb);
181 
182     return b;
183 }
184 
185 
186 static nxt_int_t
187 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
188     const char *name)
189 {
190     void                      *dl;
191     nxt_str_t                 version;
192     nxt_int_t                 ret;
193     nxt_uint_t                i, n;
194     nxt_module_t              *module;
195     nxt_app_type_t            type;
196     nxt_application_module_t  *app;
197 
198     /*
199      * Only memory allocation failure should return NXT_ERROR.
200      * Any module processing errors are ignored.
201      */
202     ret = NXT_ERROR;
203 
204     dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
205 
206     if (dl == NULL) {
207         nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
208         return NXT_OK;
209     }
210 
211     app = dlsym(dl, "nxt_app_module");
212 
213     if (app != NULL) {
214         nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
215                 &app->type, app->version, name);
216 
217         if (app->compat_length != sizeof(compat)
218             || nxt_memcmp(app->compat, compat, sizeof(compat)) != 0)
219         {
220             nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
221 
222             goto done;
223         }
224 
225         type = nxt_app_parse_type(app->type.start, app->type.length);
226 
227         if (type == NXT_APP_UNKNOWN) {
228             nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
229 
230             goto done;
231         }
232 
233         module = modules->elts;
234         n = modules->nelts;
235 
236         version.start = (u_char *) app->version;
237         version.length = nxt_strlen(app->version);
238 
239         for (i = 0; i < n; i++) {
240             if (type == module[i].type
241                 && nxt_strstr_eq(&module[i].version, &version))
242             {
243                 nxt_log(task, NXT_LOG_NOTICE,
244                         "ignoring %s module with the same "
245                         "application language version %V %V as in %V",
246                         name, &app->type, &version, &module[i].file);
247 
248                 goto done;
249             }
250         }
251 
252         module = nxt_array_add(modules);
253         if (module == NULL) {
254             goto fail;
255         }
256 
257         module->type = type;
258 
259         nxt_str_dup(mp, &module->version, &version);
260         if (module->version.start == NULL) {
261             goto fail;
262         }
263 
264         module->file.length = nxt_strlen(name);
265 
266         module->file.start = nxt_mp_alloc(mp, module->file.length);
267         if (module->file.start == NULL) {
268             goto fail;
269         }
270 
271         nxt_memcpy(module->file.start, name, module->file.length);
272 
273     } else {
274         nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
275     }
276 
277 done:
278 
279     ret = NXT_OK;
280 
281 fail:
282 
283     if (dlclose(dl) != 0) {
284         nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
285     }
286 
287     return ret;
288 }
289 
290 
291 static void
292 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
293 {
294     nxt_mp_t   *mp;
295     nxt_buf_t  *b;
296 
297     b = obj;
298     mp = b->data;
299 
300     nxt_mp_destroy(mp);
301 }
302 
303 
304 static void
305 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
306 {
307     nxt_worker_process_quit_handler(task, msg);
308 }
309 
310 
311 nxt_int_t
312 nxt_app_start(nxt_task_t *task, void *data)
313 {
314     nxt_int_t              ret;
315     nxt_app_lang_module_t  *lang;
316     nxt_common_app_conf_t  *app_conf;
317 
318     app_conf = data;
319 
320     lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
321     if (nxt_slow_path(lang == NULL)) {
322         nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
323         return NXT_ERROR;
324     }
325 
326     nxt_app = lang->module;
327 
328     if (nxt_app == NULL) {
329         nxt_debug(task, "application language module: %s \"%s\"",
330                   lang->version, lang->file);
331 
332         nxt_app = nxt_app_module_load(task, lang->file);
333     }
334 
335     if (app_conf->working_directory != NULL
336         && app_conf->working_directory[0] != 0)
337     {
338         ret = chdir(app_conf->working_directory);
339 
340         if (nxt_slow_path(ret != 0)) {
341             nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
342                     app_conf->working_directory, nxt_errno);
343 
344             return NXT_ERROR;
345         }
346     }
347 
348     if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
349                       != NXT_OK))
350     {
351         nxt_alert(task, "failed to set environment");
352         return NXT_ERROR;
353     }
354 
355     if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
356         return NXT_ERROR;
357     }
358 
359     if (nxt_slow_path(nxt_thread_cond_create(&nxt_app_cond) != NXT_OK)) {
360         return NXT_ERROR;
361     }
362 
363     ret = nxt_app->init(task, data);
364 
365     if (nxt_slow_path(ret != NXT_OK)) {
366         nxt_debug(task, "application init failed");
367 
368     } else {
369         nxt_debug(task, "application init done");
370     }
371 
372     return ret;
373 }
374 
375 
376 static nxt_app_module_t *
377 nxt_app_module_load(nxt_task_t *task, const char *name)
378 {
379     void  *dl;
380 
381     dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
382 
383     if (dl != NULL) {
384         return dlsym(dl, "nxt_app_module");
385     }
386 
387     nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
388 
389     return NULL;
390 }
391 
392 
393 static nxt_int_t
394 nxt_app_set_environment(nxt_conf_value_t *environment)
395 {
396     char              *env, *p;
397     uint32_t          next;
398     nxt_str_t         name, value;
399     nxt_conf_value_t  *value_obj;
400 
401     if (environment != NULL) {
402         next = 0;
403 
404         for ( ;; ) {
405             value_obj = nxt_conf_next_object_member(environment, &name, &next);
406             if (value_obj == NULL) {
407                 break;
408             }
409 
410             nxt_conf_get_string(value_obj, &value);
411 
412             env = nxt_malloc(name.length + value.length + 2);
413             if (nxt_slow_path(env == NULL)) {
414                 return NXT_ERROR;
415             }
416 
417             p = nxt_cpymem(env, name.start, name.length);
418             *p++ = '=';
419             p = nxt_cpymem(p, value.start, value.length);
420             *p = '\0';
421 
422             if (nxt_slow_path(putenv(env) != 0)) {
423                 return NXT_ERROR;
424             }
425         }
426     }
427 
428     return NXT_OK;
429 }
430 
431 
432 void
433 nxt_app_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
434 {
435     if (nxt_app->atexit != NULL) {
436         nxt_app->atexit(task);
437     }
438 
439     nxt_worker_process_quit_handler(task, msg);
440 }
441 
442 
443 void
444 nxt_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
445 {
446     size_t          dump_size;
447     nxt_int_t       res;
448     nxt_buf_t       *b;
449     nxt_port_t      *port;
450     nxt_app_rmsg_t  rmsg = { msg->buf };
451     nxt_app_wmsg_t  wmsg;
452 
453     b = msg->buf;
454     dump_size = b->mem.free - b->mem.pos;
455 
456     if (dump_size > 300) {
457         dump_size = 300;
458     }
459 
460     nxt_debug(task, "app data: %*s ...", dump_size, b->mem.pos);
461 
462     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
463                                  msg->port_msg.reply_port);
464     if (nxt_slow_path(port == NULL)) {
465         nxt_debug(task, "stream #%uD: reply port %d not found",
466                   msg->port_msg.stream, msg->port_msg.reply_port);
467         return;
468     }
469 
470     wmsg.port = port;
471     wmsg.write = NULL;
472     wmsg.buf = &wmsg.write;
473     wmsg.stream = msg->port_msg.stream;
474 
475     res = nxt_app->run(task, &rmsg, &wmsg);
476 
477     if (nxt_slow_path(res != NXT_OK)) {
478         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
479                               msg->port_msg.stream, 0, NULL);
480     }
481 }
482 
483 
484 u_char *
485 nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
486 {
487     size_t      free_size;
488     u_char      *res;
489     nxt_buf_t   *b;
490 
491     res = NULL;
492 
493     do {
494         b = *msg->buf;
495 
496         if (b == NULL) {
497             b = nxt_port_mmap_get_buf(task, msg->port, size);
498             if (nxt_slow_path(b == NULL)) {
499                 return NULL;
500             }
501 
502             *msg->buf = b;
503 
504             free_size = nxt_buf_mem_free_size(&b->mem);
505 
506             if (nxt_slow_path(free_size < size)) {
507                 nxt_log(task, NXT_LOG_WARN, "requested buffer too big "
508                         "(%z < %z)", free_size, size);
509                 return NULL;
510             }
511 
512         }
513 
514         free_size = nxt_buf_mem_free_size(&b->mem);
515 
516         if (free_size >= size) {
517             res = b->mem.free;
518             b->mem.free += size;
519 
520             return res;
521         }
522 
523         if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
524             res = b->mem.free;
525             b->mem.free += size;
526 
527             return res;
528         }
529 
530         msg->buf = &b->next;
531     } while(1);
532 }
533 
534 
535 nxt_int_t
536 nxt_app_msg_write(nxt_task_t *task, nxt_app_wmsg_t *msg, u_char *c, size_t size)
537 {
538     u_char  *dst;
539     size_t  dst_length;
540 
541     if (c != NULL) {
542         dst_length = size + (size < 128 ? 1 : 4) + 1;
543 
544         dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
545         if (nxt_slow_path(dst == NULL)) {
546             nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
547                       dst_length);
548             return NXT_ERROR;
549         }
550 
551         dst = nxt_app_msg_write_length(dst, size + 1); /* +1 for trailing 0 */
552 
553         nxt_memcpy(dst, c, size);
554         dst[size] = 0;
555 
556         nxt_debug(task, "nxt_app_msg_write: %uz %*s", size, size, c);
557 
558     } else {
559         dst_length = 1;
560 
561         dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
562         if (nxt_slow_path(dst == NULL)) {
563             nxt_debug(task, "nxt_app_msg_write: get_buf(%uz) failed",
564                       dst_length);
565             return NXT_ERROR;
566         }
567 
568         dst = nxt_app_msg_write_length(dst, 0);
569 
570         nxt_debug(task, "nxt_app_msg_write: NULL");
571     }
572 
573     return NXT_OK;
574 }
575 
576 
577 nxt_int_t
578 nxt_app_msg_write_prefixed_upcase(nxt_task_t *task, nxt_app_wmsg_t *msg,
579     const nxt_str_t *prefix, u_char *c, size_t size)
580 {
581     u_char  *dst, *src;
582     size_t  i, length, dst_length;
583 
584     length = prefix->length + size;
585 
586     dst_length = length + (length < 128 ? 1 : 4) + 1;
587 
588     dst = nxt_app_msg_write_get_buf(task, msg, dst_length);
589     if (nxt_slow_path(dst == NULL)) {
590         return NXT_ERROR;
591     }
592 
593     dst = nxt_app_msg_write_length(dst, length + 1); /* +1 for trailing 0 */
594 
595     nxt_memcpy(dst, prefix->start, prefix->length);
596     dst += prefix->length;
597 
598     src = c;
599     for (i = 0; i < size; i++, dst++, src++) {
600 
601         if (*src >= 'a' && *src <= 'z') {
602             *dst = *src & ~0x20;
603             continue;
604         }
605 
606         if (*src == '-') {
607             *dst = '_';
608             continue;
609         }
610 
611         *dst = *src;
612     }
613 
614     *dst = 0;
615 
616     return NXT_OK;
617 }
618 
619 
620 nxt_inline nxt_int_t
621 nxt_app_msg_read_size_(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
622 {
623     nxt_buf_t  *buf;
624 
625     do {
626         buf = msg->buf;
627 
628         if (nxt_slow_path(buf == NULL)) {
629             return NXT_DONE;
630         }
631 
632         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 1)) {
633             if (nxt_fast_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
634                 msg->buf = buf->next;
635                 continue;
636             }
637             return NXT_ERROR;
638         }
639 
640         if (buf->mem.pos[0] >= 128) {
641             if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < 4)) {
642                 return NXT_ERROR;
643             }
644         }
645 
646         break;
647     } while (1);
648 
649     buf->mem.pos = nxt_app_msg_read_length(buf->mem.pos, size);
650 
651     return NXT_OK;
652 }
653 
654 
655 nxt_int_t
656 nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
657 {
658     size_t      length;
659     nxt_int_t   ret;
660     nxt_buf_t  *buf;
661 
662     ret = nxt_app_msg_read_size_(task, msg, &length);
663     if (ret != NXT_OK) {
664         return ret;
665     }
666 
667     buf = msg->buf;
668 
669     if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) < (intptr_t) length)) {
670         return NXT_ERROR;
671     }
672 
673     if (length > 0) {
674         str->start = buf->mem.pos;
675         str->length = length - 1;
676 
677         buf->mem.pos += length;
678 
679         nxt_debug(task, "nxt_read_str: %uz %*s", length - 1,
680                         length - 1, str->start);
681 
682     } else {
683         str->start = NULL;
684         str->length = 0;
685 
686         nxt_debug(task, "nxt_read_str: NULL");
687     }
688 
689     return NXT_OK;
690 }
691 
692 
693 size_t
694 nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
695     size_t size)
696 {
697     size_t     res, read_size;
698     nxt_buf_t  *buf;
699 
700     res = 0;
701 
702     while (size > 0) {
703         buf = msg->buf;
704 
705         if (nxt_slow_path(buf == NULL)) {
706             break;
707         }
708 
709         if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
710             msg->buf = buf->next;
711             continue;
712         }
713 
714         read_size = nxt_buf_mem_used_size(&buf->mem);
715         read_size = nxt_min(read_size, size);
716 
717         dst = nxt_cpymem(dst, buf->mem.pos, read_size);
718 
719         size -= read_size;
720         buf->mem.pos += read_size;
721         res += read_size;
722     }
723 
724     nxt_debug(task, "nxt_read_raw: %uz", res);
725 
726     return res;
727 }
728 
729 
730 nxt_int_t
731 nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
732     nxt_str_t *v)
733 {
734     nxt_int_t rc;
735 
736     rc = nxt_app_msg_read_str(task, rmsg, n);
737     if (nxt_slow_path(rc != NXT_OK)) {
738         return rc;
739     }
740 
741     rc = nxt_app_msg_read_str(task, rmsg, v);
742     if (nxt_slow_path(rc != NXT_OK)) {
743         return rc;
744     }
745 
746     return rc;
747 }
748 
749 
750 nxt_int_t
751 nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *msg, size_t *size)
752 {
753     nxt_int_t  ret;
754 
755     ret = nxt_app_msg_read_size_(task, msg, size);
756 
757     nxt_debug(task, "nxt_read_size: %d", (int) *size);
758 
759     return ret;
760 }
761 
762 
763 nxt_int_t
764 nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
765 {
766     ar->timer.handler = nxt_app_http_release;
767     nxt_timer_add(task->thread->engine, &ar->timer, 0);
768 
769     return NXT_OK;
770 }
771 
772 
773 static void
774 nxt_app_http_release(nxt_task_t *task, void *obj, void *data)
775 {
776     nxt_timer_t          *timer;
777     nxt_app_parse_ctx_t  *ar;
778 
779     timer = obj;
780 
781     nxt_debug(task, "http app release");
782 
783     ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
784 
785     nxt_mp_release(ar->request->mem_pool);
786 }
787 
788 
789 nxt_int_t
790 nxt_app_msg_flush(nxt_task_t *task, nxt_app_wmsg_t *msg, nxt_bool_t last)
791 {
792     nxt_int_t   rc;
793     nxt_buf_t   *b;
794 
795     rc = NXT_OK;
796 
797     if (nxt_slow_path(last == 1)) {
798         do {
799             b = *msg->buf;
800 
801             if (b == NULL) {
802                 b = nxt_buf_sync_alloc(msg->port->mem_pool, NXT_BUF_SYNC_LAST);
803                 *msg->buf = b;
804                 break;
805             }
806 
807             msg->buf = &b->next;
808         } while(1);
809     }
810 
811     if (nxt_slow_path(msg->write != NULL)) {
812         rc = nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_DATA,
813                                    -1, msg->stream, 0, msg->write);
814 
815         msg->write = NULL;
816         msg->buf = &msg->write;
817     }
818 
819     return rc;
820 }
821 
822 
823 nxt_int_t
824 nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
825     size_t size)
826 {
827     size_t      free_size, copy_size;
828     nxt_buf_t   *b;
829 
830     nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
831 
832     while (size > 0) {
833         b = *msg->buf;
834 
835         if (b == NULL) {
836             b = nxt_port_mmap_get_buf(task, msg->port, size);
837             if (nxt_slow_path(b == NULL)) {
838                 return NXT_ERROR;
839             }
840 
841             *msg->buf = b;
842         }
843 
844         do {
845             free_size = nxt_buf_mem_free_size(&b->mem);
846 
847             if (free_size > 0) {
848                 copy_size = nxt_min(free_size, size);
849 
850                 b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
851 
852                 size -= copy_size;
853                 c += copy_size;
854 
855                 if (size == 0) {
856                     return NXT_OK;
857                 }
858             }
859         } while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
860 
861         msg->buf = &b->next;
862     }
863 
864     return NXT_OK;
865 }
866 
867 
868 nxt_app_lang_module_t *
869 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
870 {
871     u_char                 *p, *end, *version;
872     size_t                 version_length;
873     nxt_uint_t             i, n;
874     nxt_app_type_t         type;
875     nxt_app_lang_module_t  *lang;
876 
877     end = name->start + name->length;
878     version = end;
879 
880     for (p = name->start; p < end; p++) {
881         if (*p == ' ') {
882             version = p + 1;
883             break;
884         }
885 
886         if (*p >= '0' && *p <= '9') {
887             version = p;
888             break;
889         }
890     }
891 
892     type = nxt_app_parse_type(name->start, p - name->start);
893 
894     if (type == NXT_APP_UNKNOWN) {
895         return NULL;
896     }
897 
898     version_length = end - version;
899 
900     lang = rt->languages->elts;
901     n = rt->languages->nelts;
902 
903     for (i = 0; i < n; i++) {
904 
905         /*
906          * Versions are sorted in descending order
907          * so first match chooses the highest version.
908          */
909 
910         if (lang[i].type == type
911             && nxt_strvers_match(lang[i].version, version, version_length))
912         {
913             return &lang[i];
914         }
915     }
916 
917     return NULL;
918 }
919 
920 
921 nxt_app_type_t
922 nxt_app_parse_type(u_char *p, size_t length)
923 {
924     nxt_str_t str;
925 
926     str.length = length;
927     str.start = p;
928 
929     if (nxt_str_eq(&str, "python", 6)) {
930         return NXT_APP_PYTHON;
931 
932     } else if (nxt_str_eq(&str, "php", 3)) {
933         return NXT_APP_PHP;
934 
935     } else if (nxt_str_eq(&str, "go", 2)) {
936         return NXT_APP_GO;
937 
938     } else if (nxt_str_eq(&str, "perl", 4)) {
939         return NXT_APP_PERL;
940 
941     } else if (nxt_str_eq(&str, "ruby", 4)) {
942         return NXT_APP_RUBY;
943     }
944 
945     return NXT_APP_UNKNOWN;
946 }
947