xref: /unit/src/nxt_main_process.c (revision 259:9cf0e151e752)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_application.h>
13 
14 
15 typedef struct {
16     nxt_socket_t        socket;
17     nxt_socket_error_t  error;
18     u_char              *start;
19     u_char              *end;
20 } nxt_listening_socket_t;
21 
22 
23 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
24     nxt_runtime_t *rt);
25 static void nxt_main_process_title(nxt_task_t *task);
26 static nxt_int_t nxt_main_start_controller_process(nxt_task_t *task,
27     nxt_runtime_t *rt);
28 static nxt_int_t nxt_main_start_router_process(nxt_task_t *task,
29     nxt_runtime_t *rt);
30 static nxt_int_t nxt_main_start_discovery_process(nxt_task_t *task,
31     nxt_runtime_t *rt);
32 static nxt_int_t nxt_main_start_worker_process(nxt_task_t *task,
33     nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
34 static nxt_int_t nxt_main_create_worker_process(nxt_task_t *task,
35     nxt_runtime_t *rt, nxt_process_init_t *init);
36 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
37     void *data);
38 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
39     void *data);
40 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
41     void *data);
42 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
43     void *data);
44 static void nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid);
45 static void nxt_main_port_socket_handler(nxt_task_t *task,
46     nxt_port_recv_msg_t *msg);
47 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
48     nxt_listening_socket_t *ls);
49 static void nxt_main_port_modules_handler(nxt_task_t *task,
50     nxt_port_recv_msg_t *msg);
51 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
52 
53 
54 const nxt_sig_event_t  nxt_main_process_signals[] = {
55     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
56     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
57     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
58     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
59     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
60     nxt_event_signal_end,
61 };
62 
63 
64 static nxt_bool_t  nxt_exiting;
65 
66 
67 nxt_int_t
68 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
69     nxt_runtime_t *rt)
70 {
71     rt->types |= (1U << NXT_PROCESS_MAIN);
72 
73     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
74         return NXT_ERROR;
75     }
76 
77     nxt_main_process_title(task);
78 
79     /*
80      * The dicsovery process will send a message processed by
81      * nxt_main_port_modules_handler() which starts the controller
82      * and router processes.
83      */
84     return nxt_main_start_discovery_process(task, rt);
85 }
86 
87 
88 static nxt_conf_map_t  nxt_common_app_conf[] = {
89     {
90         nxt_string("type"),
91         NXT_CONF_MAP_STR,
92         offsetof(nxt_common_app_conf_t, type),
93     },
94 
95     {
96         nxt_string("user"),
97         NXT_CONF_MAP_STR,
98         offsetof(nxt_common_app_conf_t, user),
99     },
100 
101     {
102         nxt_string("group"),
103         NXT_CONF_MAP_STR,
104         offsetof(nxt_common_app_conf_t, group),
105     },
106 
107     {
108         nxt_string("workers"),
109         NXT_CONF_MAP_INT32,
110         offsetof(nxt_common_app_conf_t, workers),
111     },
112 
113     {
114         nxt_string("path"),
115         NXT_CONF_MAP_STR,
116         offsetof(nxt_common_app_conf_t, u.python.path),
117     },
118 
119     {
120         nxt_string("module"),
121         NXT_CONF_MAP_STR,
122         offsetof(nxt_common_app_conf_t, u.python.module),
123     },
124 
125     {
126         nxt_string("root"),
127         NXT_CONF_MAP_STR,
128         offsetof(nxt_common_app_conf_t, u.php.root),
129     },
130 
131     {
132         nxt_string("script"),
133         NXT_CONF_MAP_STR,
134         offsetof(nxt_common_app_conf_t, u.php.script),
135     },
136 
137     {
138         nxt_string("index"),
139         NXT_CONF_MAP_STR,
140         offsetof(nxt_common_app_conf_t, u.php.index),
141     },
142 
143     {
144         nxt_string("executable"),
145         NXT_CONF_MAP_STR,
146         offsetof(nxt_common_app_conf_t, u.go.executable),
147     },
148 };
149 
150 
151 static void
152 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
153 {
154     nxt_debug(task, "main data: %*s",
155               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
156 }
157 
158 
159 static void
160 nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
161 {
162     u_char                 *start;
163     nxt_mp_t               *mp;
164     nxt_int_t              ret;
165     nxt_buf_t              *b;
166     nxt_conf_value_t       *conf;
167     nxt_common_app_conf_t  app_conf;
168 
169     static nxt_str_t nobody = nxt_string("nobody");
170 
171     b = msg->buf;
172 
173     nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
174               b->mem.pos);
175 
176     mp = nxt_mp_create(1024, 128, 256, 32);
177 
178     nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
179 
180     start = b->mem.pos;
181 
182     app_conf.name.start = start;
183     app_conf.name.length = nxt_strlen(start);
184 
185     start += app_conf.name.length + 1;
186 
187     conf = nxt_conf_json_parse(mp, start, b->mem.free, NULL);
188 
189     if (conf == NULL) {
190         nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
191         return;
192     }
193 
194     app_conf.user = nobody;
195 
196     ret = nxt_conf_map_object(mp, conf, nxt_common_app_conf,
197                               nxt_nitems(nxt_common_app_conf), &app_conf);
198     if (ret != NXT_OK) {
199         nxt_log(task, NXT_LOG_CRIT, "root map error");
200         return;
201     }
202 
203     ret = nxt_main_start_worker_process(task, task->thread->runtime,
204                                         &app_conf, msg->port_msg.stream);
205 
206     nxt_mp_destroy(mp);
207 }
208 
209 
210 static nxt_port_handler_t  nxt_main_process_port_handlers[] = {
211     NULL, /* NXT_PORT_MSG_QUIT         */
212     NULL, /* NXT_PORT_MSG_NEW_PORT     */
213     NULL, /* NXT_PORT_MSG_CHANGE_FILE  */
214     NULL, /* NXT_PORT_MSG_MMAP         */
215     nxt_port_main_data_handler,
216     NULL, /* NXT_PORT_MSG_REMOVE_PID   */
217     nxt_port_ready_handler,
218     nxt_port_main_start_worker_handler,
219     nxt_main_port_socket_handler,
220     nxt_main_port_modules_handler,
221     nxt_port_rpc_handler,
222     nxt_port_rpc_handler,
223 };
224 
225 
226 static nxt_int_t
227 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
228 {
229     nxt_int_t      ret;
230     nxt_port_t     *port;
231     nxt_process_t  *process;
232 
233     process = nxt_runtime_process_get(rt, nxt_pid);
234     if (nxt_slow_path(process == NULL)) {
235         return NXT_ERROR;
236     }
237 
238     port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
239     if (nxt_slow_path(port == NULL)) {
240         return NXT_ERROR;
241     }
242 
243     ret = nxt_port_socket_init(task, port, 0);
244     if (nxt_slow_path(ret != NXT_OK)) {
245         return ret;
246     }
247 
248     nxt_process_port_add(task, process, port);
249 
250     nxt_runtime_port_add(rt, port);
251 
252     /*
253      * A main process port.  A write port is not closed
254      * since it should be inherited by worker processes.
255      */
256     nxt_port_enable(task, port, nxt_main_process_port_handlers);
257 
258     process->ready = 1;
259 
260     return NXT_OK;
261 }
262 
263 
264 static void
265 nxt_main_process_title(nxt_task_t *task)
266 {
267     u_char      *p, *end;
268     nxt_uint_t  i;
269     u_char      title[2048];
270 
271     end = title + sizeof(title) - 1;
272 
273     p = nxt_sprintf(title, end, "unit: main [%s", nxt_process_argv[0]);
274 
275     for (i = 1; nxt_process_argv[i] != NULL; i++) {
276         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
277     }
278 
279     if (p < end) {
280         *p++ = ']';
281     }
282 
283     *p = '\0';
284 
285     nxt_process_title(task, "%s", title);
286 }
287 
288 
289 static nxt_int_t
290 nxt_main_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
291 {
292     nxt_process_init_t  *init;
293 
294     init = nxt_malloc(sizeof(nxt_process_init_t));
295     if (nxt_slow_path(init == NULL)) {
296         return NXT_ERROR;
297     }
298 
299     init->start = nxt_controller_start;
300     init->name = "controller";
301     init->user_cred = &rt->user_cred;
302     init->port_handlers = nxt_controller_process_port_handlers;
303     init->signals = nxt_worker_process_signals;
304     init->type = NXT_PROCESS_CONTROLLER;
305     init->data = rt;
306     init->stream = 0;
307     init->restart = 1;
308 
309     return nxt_main_create_worker_process(task, rt, init);
310 }
311 
312 
313 static nxt_int_t
314 nxt_main_start_discovery_process(nxt_task_t *task, nxt_runtime_t *rt)
315 {
316     nxt_process_init_t  *init;
317 
318     init = nxt_malloc(sizeof(nxt_process_init_t));
319     if (nxt_slow_path(init == NULL)) {
320         return NXT_ERROR;
321     }
322 
323     init->start = nxt_discovery_start;
324     init->name = "discovery";
325     init->user_cred = &rt->user_cred;
326     init->port_handlers = nxt_discovery_process_port_handlers;
327     init->signals = nxt_worker_process_signals;
328     init->type = NXT_PROCESS_DISCOVERY;
329     init->data = rt;
330     init->stream = 0;
331     init->restart = 0;
332 
333     return nxt_main_create_worker_process(task, rt, init);
334 }
335 
336 
337 static nxt_int_t
338 nxt_main_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
339 {
340     nxt_process_init_t  *init;
341 
342     init = nxt_malloc(sizeof(nxt_process_init_t));
343     if (nxt_slow_path(init == NULL)) {
344         return NXT_ERROR;
345     }
346 
347     init->start = nxt_router_start;
348     init->name = "router";
349     init->user_cred = &rt->user_cred;
350     init->port_handlers = nxt_router_process_port_handlers;
351     init->signals = nxt_worker_process_signals;
352     init->type = NXT_PROCESS_ROUTER;
353     init->data = rt;
354     init->stream = 0;
355     init->restart = 1;
356 
357     return nxt_main_create_worker_process(task, rt, init);
358 }
359 
360 
361 static nxt_int_t
362 nxt_main_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
363     nxt_common_app_conf_t *app_conf, uint32_t stream)
364 {
365     char                *user, *group;
366     u_char              *title, *last, *end;
367     size_t              size;
368     nxt_process_init_t  *init;
369 
370     size = sizeof(nxt_process_init_t)
371            + sizeof(nxt_user_cred_t)
372            + app_conf->user.length + 1
373            + app_conf->group.length + 1
374            + app_conf->name.length + sizeof("\"\" application");
375 
376     init = nxt_malloc(size);
377     if (nxt_slow_path(init == NULL)) {
378         return NXT_ERROR;
379     }
380 
381     init->user_cred = nxt_pointer_to(init, sizeof(nxt_process_init_t));
382     user = nxt_pointer_to(init->user_cred, sizeof(nxt_user_cred_t));
383 
384     nxt_memcpy(user, app_conf->user.start, app_conf->user.length);
385     last = nxt_pointer_to(user, app_conf->user.length);
386     *last++ = '\0';
387 
388     init->user_cred->user = user;
389 
390     if (app_conf->group.start != NULL) {
391         group = (char *) last;
392 
393         nxt_memcpy(group, app_conf->group.start, app_conf->group.length);
394         last = nxt_pointer_to(group, app_conf->group.length);
395         *last++ = '\0';
396 
397     } else {
398         group = NULL;
399     }
400 
401     if (nxt_user_cred_get(task, init->user_cred, group) != NXT_OK) {
402         return NXT_ERROR;
403     }
404 
405     title = last;
406     end = title + app_conf->name.length + sizeof("\"\" application");
407 
408     nxt_sprintf(title, end, "\"%V\" application%Z", &app_conf->name);
409 
410     init->start = nxt_app_start;
411     init->name = (char *) title;
412     init->port_handlers = nxt_app_process_port_handlers;
413     init->signals = nxt_worker_process_signals;
414     init->type = NXT_PROCESS_WORKER;
415     init->data = app_conf;
416     init->stream = stream;
417     init->restart = 0;
418 
419     return nxt_main_create_worker_process(task, rt, init);
420 }
421 
422 
423 static nxt_int_t
424 nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
425     nxt_process_init_t *init)
426 {
427     nxt_int_t      ret;
428     nxt_pid_t      pid;
429     nxt_port_t     *port;
430     nxt_process_t  *process;
431 
432     /*
433      * TODO: remove process, init, ports from array on memory and fork failures.
434      */
435 
436     process = nxt_runtime_process_new(rt);
437     if (nxt_slow_path(process == NULL)) {
438         return NXT_ERROR;
439     }
440 
441     process->init = init;
442 
443     port = nxt_port_new(task, 0, 0, init->type);
444     if (nxt_slow_path(port == NULL)) {
445         nxt_runtime_process_remove(rt, process);
446         return NXT_ERROR;
447     }
448 
449     nxt_process_port_add(task, process, port);
450 
451     ret = nxt_port_socket_init(task, port, 0);
452     if (nxt_slow_path(ret != NXT_OK)) {
453         nxt_mp_release(port->mem_pool, port);
454         return ret;
455     }
456 
457     pid = nxt_process_create(task, process);
458 
459     switch (pid) {
460 
461     case -1:
462         return NXT_ERROR;
463 
464     case 0:
465         /* A worker process, return to the event engine work queue loop. */
466         return NXT_AGAIN;
467 
468     default:
469         /* The main process created a new process. */
470 
471         nxt_port_read_close(port);
472         nxt_port_write_enable(task, port);
473 
474         return NXT_OK;
475     }
476 }
477 
478 
479 void
480 nxt_main_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
481 {
482     nxt_port_t     *port;
483     nxt_process_t  *process;
484 
485     nxt_runtime_process_each(rt, process)
486     {
487         if (nxt_pid != process->pid) {
488             process->init = NULL;
489 
490             nxt_process_port_each(process, port) {
491 
492                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
493                                              -1, 0, 0, NULL);
494 
495             } nxt_process_port_loop;
496         }
497     }
498     nxt_runtime_process_loop;
499 }
500 
501 
502 
503 static void
504 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
505 {
506     nxt_debug(task, "sigterm handler signo:%d (%s)",
507               (int) (uintptr_t) obj, data);
508 
509     /* TODO: fast exit. */
510 
511     nxt_exiting = 1;
512 
513     nxt_runtime_quit(task);
514 }
515 
516 
517 static void
518 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
519 {
520     nxt_debug(task, "sigquit handler signo:%d (%s)",
521               (int) (uintptr_t) obj, data);
522 
523     /* TODO: graceful exit. */
524 
525     nxt_exiting = 1;
526 
527     nxt_runtime_quit(task);
528 }
529 
530 
531 static void
532 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
533 {
534     nxt_mp_t        *mp;
535     nxt_int_t       ret;
536     nxt_uint_t      n;
537     nxt_file_t      *file, *new_file;
538     nxt_runtime_t   *rt;
539     nxt_array_t     *new_files;
540 
541     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
542             (int) (uintptr_t) obj, data, "log files rotation");
543 
544     mp = nxt_mp_create(1024, 128, 256, 32);
545     if (mp == NULL) {
546         return;
547     }
548 
549     rt = task->thread->runtime;
550 
551     n = nxt_list_nelts(rt->log_files);
552 
553     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
554     if (new_files == NULL) {
555         nxt_mp_destroy(mp);
556         return;
557     }
558 
559     nxt_list_each(file, rt->log_files) {
560 
561         /* This allocation cannot fail. */
562         new_file = nxt_array_add(new_files);
563 
564         new_file->name = file->name;
565         new_file->fd = NXT_FILE_INVALID;
566         new_file->log_level = NXT_LOG_CRIT;
567 
568         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
569                             NXT_FILE_OWNER_ACCESS);
570 
571         if (ret != NXT_OK) {
572             goto fail;
573         }
574 
575     } nxt_list_loop;
576 
577     new_file = new_files->elts;
578 
579     ret = nxt_file_stderr(&new_file[0]);
580 
581     if (ret == NXT_OK) {
582         n = 0;
583 
584         nxt_list_each(file, rt->log_files) {
585 
586             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
587             /*
588              * The old log file descriptor must be closed at the moment
589              * when no other threads use it.  dup2() allows to use the
590              * old file descriptor for new log file.  This change is
591              * performed atomically in the kernel.
592              */
593             (void) nxt_file_redirect(file, new_file[n].fd);
594 
595             n++;
596 
597         } nxt_list_loop;
598 
599         nxt_mp_destroy(mp);
600         return;
601    }
602 
603 fail:
604 
605     new_file = new_files->elts;
606     n = new_files->nelts;
607 
608     while (n != 0) {
609         if (new_file->fd != NXT_FILE_INVALID) {
610             nxt_file_close(task, new_file);
611         }
612 
613         new_file++;
614         n--;
615     }
616 
617     nxt_mp_destroy(mp);
618 }
619 
620 
621 static void
622 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
623 {
624     int                    status;
625     nxt_err_t              err;
626     nxt_pid_t              pid;
627 
628     nxt_debug(task, "sigchld handler signo:%d (%s)",
629               (int) (uintptr_t) obj, data);
630 
631     for ( ;; ) {
632         pid = waitpid(-1, &status, WNOHANG);
633 
634         if (pid == -1) {
635 
636             switch (err = nxt_errno) {
637 
638             case NXT_ECHILD:
639                 return;
640 
641             case NXT_EINTR:
642                 continue;
643 
644             default:
645                 nxt_log(task, NXT_LOG_CRIT, "waitpid() failed: %E", err);
646                 return;
647             }
648         }
649 
650         nxt_debug(task, "waitpid(): %PI", pid);
651 
652         if (pid == 0) {
653             return;
654         }
655 
656         if (WTERMSIG(status)) {
657 #ifdef WCOREDUMP
658             nxt_log(task, NXT_LOG_CRIT, "process %PI exited on signal %d%s",
659                     pid, WTERMSIG(status),
660                     WCOREDUMP(status) ? " (core dumped)" : "");
661 #else
662             nxt_log(task, NXT_LOG_CRIT, "process %PI exited on signal %d",
663                     pid, WTERMSIG(status));
664 #endif
665 
666         } else {
667             nxt_trace(task, "process %PI exited with code %d",
668                       pid, WEXITSTATUS(status));
669         }
670 
671         nxt_main_cleanup_worker_process(task, pid);
672     }
673 }
674 
675 
676 static void
677 nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
678 {
679     nxt_buf_t           *buf;
680     nxt_port_t          *port;
681     nxt_runtime_t       *rt;
682     nxt_process_t       *process;
683     nxt_process_init_t  *init;
684 
685     rt = task->thread->runtime;
686 
687     process = nxt_runtime_process_find(rt, pid);
688 
689     if (process) {
690         init = process->init;
691 
692         nxt_runtime_process_remove(rt, process);
693 
694         if (!nxt_exiting) {
695             nxt_runtime_process_each(rt, process)
696             {
697                 if (process->pid == nxt_pid ||
698                     process->pid == pid ||
699                     nxt_queue_is_empty(&process->ports)) {
700                     continue;
701                 }
702 
703                 port = nxt_process_port_first(process);
704 
705                 buf = nxt_buf_mem_alloc(port->mem_pool, sizeof(pid), 0);
706                 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
707 
708                 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID,
709                                       -1, init->stream, 0, buf);
710             }
711             nxt_runtime_process_loop;
712         }
713 
714         if (nxt_exiting) {
715 
716             if (rt->nprocesses == 2) {
717                 nxt_runtime_quit(task);
718             }
719 
720         } else if (init != NULL) {
721             if (init->restart != 0) {
722                 (void) nxt_main_create_worker_process(task, rt, init);
723 
724             } else {
725                 nxt_free(init);
726             }
727         }
728     }
729 }
730 
731 
732 static void
733 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
734 {
735     size_t                  size;
736     nxt_int_t               ret;
737     nxt_buf_t               *b, *out;
738     nxt_port_t              *port;
739     nxt_sockaddr_t          *sa;
740     nxt_port_msg_type_t     type;
741     nxt_listening_socket_t  ls;
742     u_char                  message[2048];
743 
744     b = msg->buf;
745     sa = (nxt_sockaddr_t *) b->mem.pos;
746 
747     out = NULL;
748 
749     ls.socket = -1;
750     ls.error = NXT_SOCKET_ERROR_SYSTEM;
751     ls.start = message;
752     ls.end = message + sizeof(message);
753 
754     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
755                                  msg->port_msg.reply_port);
756 
757     nxt_debug(task, "listening socket \"%*s\"",
758               sa->length, nxt_sockaddr_start(sa));
759 
760     ret = nxt_main_listening_socket(sa, &ls);
761 
762     if (ret == NXT_OK) {
763         nxt_debug(task, "socket(\"%*s\"): %d",
764                   sa->length, nxt_sockaddr_start(sa), ls.socket);
765 
766         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
767 
768     } else {
769         size = ls.end - ls.start;
770 
771         nxt_log(task, NXT_LOG_CRIT, "%*s", size, ls.start);
772 
773         out = nxt_buf_mem_alloc(port->mem_pool, size + 1, 0);
774         if (nxt_slow_path(out == NULL)) {
775             return;
776         }
777 
778         *out->mem.free++ = (uint8_t) ls.error;
779 
780         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
781 
782         type = NXT_PORT_MSG_RPC_ERROR;
783     }
784 
785     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
786                           0, out);
787 }
788 
789 
790 static nxt_int_t
791 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
792 {
793     nxt_err_t         err;
794     nxt_socket_t      s;
795 
796     const socklen_t   length = sizeof(int);
797     static const int  enable = 1;
798 
799     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
800 
801     if (nxt_slow_path(s == -1)) {
802         err = nxt_errno;
803 
804 #if (NXT_INET6)
805 
806         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
807             ls->error = NXT_SOCKET_ERROR_NOINET6;
808         }
809 
810 #endif
811 
812         ls->end = nxt_sprintf(ls->start, ls->end,
813                               "socket(\\\"%*s\\\") failed %E",
814                               sa->length, nxt_sockaddr_start(sa), err);
815 
816         return NXT_ERROR;
817     }
818 
819     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
820         ls->end = nxt_sprintf(ls->start, ls->end,
821                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
822                               sa->length, nxt_sockaddr_start(sa), nxt_errno);
823         goto fail;
824     }
825 
826 #if (NXT_INET6)
827 
828     if (sa->u.sockaddr.sa_family == AF_INET6) {
829 
830         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
831             ls->end = nxt_sprintf(ls->start, ls->end,
832                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
833                                sa->length, nxt_sockaddr_start(sa), nxt_errno);
834             goto fail;
835         }
836     }
837 
838 #endif
839 
840     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
841         err = nxt_errno;
842 
843 #if (NXT_HAVE_UNIX_DOMAIN)
844 
845         if (sa->u.sockaddr.sa_family == AF_UNIX) {
846             switch (err) {
847 
848             case EACCES:
849                 ls->error = NXT_SOCKET_ERROR_ACCESS;
850                 break;
851 
852             case ENOENT:
853             case ENOTDIR:
854                 ls->error = NXT_SOCKET_ERROR_PATH;
855                 break;
856             }
857 
858             goto next;
859         }
860 
861 #endif
862 
863         switch (err) {
864 
865         case EACCES:
866             ls->error = NXT_SOCKET_ERROR_PORT;
867             break;
868 
869         case EADDRINUSE:
870             ls->error = NXT_SOCKET_ERROR_INUSE;
871             break;
872 
873         case EADDRNOTAVAIL:
874             ls->error = NXT_SOCKET_ERROR_NOADDR;
875             break;
876         }
877 
878         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
879                               sa->length, nxt_sockaddr_start(sa), err);
880         goto fail;
881     }
882 
883 #if (NXT_HAVE_UNIX_DOMAIN)
884 
885 next:
886 
887     if (sa->u.sockaddr.sa_family == AF_UNIX) {
888         char     *filename;
889         mode_t   access;
890 
891         filename = sa->u.sockaddr_un.sun_path;
892         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
893 
894         if (chmod(filename, access) != 0) {
895             ls->end = nxt_sprintf(ls->start, ls->end,
896                                   "chmod(\\\"%*s\\\") failed %E",
897                                   filename, nxt_errno);
898             goto fail;
899         }
900     }
901 
902 #endif
903 
904     ls->socket = s;
905 
906     return NXT_OK;
907 
908 fail:
909 
910     (void) close(s);
911 
912     return NXT_ERROR;
913 }
914 
915 
916 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
917     {
918         nxt_string("type"),
919         NXT_CONF_MAP_STR_COPY,
920         offsetof(nxt_app_lang_module_t, type),
921     },
922 
923     {
924         nxt_string("version"),
925         NXT_CONF_MAP_STR_COPY,
926         offsetof(nxt_app_lang_module_t, version),
927     },
928 
929     {
930         nxt_string("file"),
931         NXT_CONF_MAP_CSTRZ,
932         offsetof(nxt_app_lang_module_t, file),
933     },
934 };
935 
936 
937 static void
938 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
939 {
940     uint32_t               index;
941     nxt_mp_t               *mp;
942     nxt_int_t              ret;
943     nxt_buf_t              *b;
944     nxt_runtime_t          *rt;
945     nxt_conf_value_t       *conf, *root, *value;
946     nxt_app_lang_module_t  *lang;
947 
948     static nxt_str_t   root_path = nxt_string("/");
949 
950     rt = task->thread->runtime;
951 
952     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
953         return;
954     }
955 
956     b = msg->buf;
957 
958     if (b == NULL) {
959         return;
960     }
961 
962     nxt_debug(task, "application languages: \"%*s\"",
963               b->mem.free - b->mem.pos, b->mem.pos);
964 
965     mp = nxt_mp_create(1024, 128, 256, 32);
966     if (mp == NULL) {
967         return;
968     }
969 
970     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
971     if (conf == NULL) {
972         goto fail;
973     }
974 
975     root = nxt_conf_get_path(conf, &root_path);
976     if (root == NULL) {
977         goto fail;
978     }
979 
980     for (index = 0; /* void */ ; index++) {
981         value = nxt_conf_get_array_element(root, index);
982         if (value == NULL) {
983             break;
984         }
985 
986         lang = nxt_array_add(rt->languages);
987         if (lang == NULL) {
988             goto fail;
989         }
990 
991         lang->module = NULL;
992 
993         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
994                                   nxt_nitems(nxt_app_lang_module_map), lang);
995 
996         if (ret != NXT_OK) {
997             goto fail;
998         }
999 
1000         nxt_debug(task, "lang %V %V \"%s\"",
1001                   &lang->type, &lang->version, lang->file);
1002     }
1003 
1004     qsort(rt->languages->elts, rt->languages->nelts,
1005           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1006 
1007 fail:
1008 
1009     nxt_mp_destroy(mp);
1010 
1011     ret = nxt_main_start_controller_process(task, rt);
1012 
1013     if (ret == NXT_OK) {
1014         (void) nxt_main_start_router_process(task, rt);
1015     }
1016 }
1017 
1018 
1019 static int nxt_cdecl
1020 nxt_app_lang_compare(const void *v1, const void *v2)
1021 {
1022     int                          n;
1023     size_t                       length;
1024     const nxt_app_lang_module_t  *lang1, *lang2;
1025 
1026     lang1 = v1;
1027     lang2 = v2;
1028 
1029     if (lang1->type.length != lang2->type.length) {
1030         return lang1->type.length - lang2->type.length;
1031     }
1032 
1033     n = nxt_strncmp(lang1->type.start, lang2->type.start, lang1->type.length);
1034 
1035     if (n != 0) {
1036         return n;
1037     }
1038 
1039     length = nxt_min(lang1->version.length, lang2->version.length);
1040 
1041     n = nxt_strncmp(lang1->version.start, lang2->version.start, length);
1042 
1043     if (n == 0) {
1044         n = lang1->version.length - lang2->version.length;
1045     }
1046 
1047     /* Negate result to move higher versions to the beginning. */
1048 
1049     return -n;
1050 }
1051