xref: /unit/src/nxt_main_process.c (revision 1788:0fc09c5d1314)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 #include <sys/mount.h>
18 
19 
20 typedef struct {
21     nxt_socket_t        socket;
22     nxt_socket_error_t  error;
23     u_char              *start;
24     u_char              *end;
25 } nxt_listening_socket_t;
26 
27 
28 typedef struct {
29     nxt_uint_t          size;
30     nxt_conf_map_t      *map;
31 } nxt_conf_app_map_t;
32 
33 
34 extern nxt_port_handlers_t  nxt_controller_process_port_handlers;
35 extern nxt_port_handlers_t  nxt_router_process_port_handlers;
36 
37 
38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
39     nxt_runtime_t *rt);
40 static void nxt_main_process_title(nxt_task_t *task);
41 static nxt_int_t nxt_main_process_create(nxt_task_t *task,
42     const nxt_process_init_t init);
43 static nxt_int_t nxt_main_start_process(nxt_task_t *task,
44     nxt_process_t *process);
45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
49     void *data);
50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
51     void *data);
52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
53     void *data);
54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
55     void *data);
56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
57 static void nxt_main_port_socket_handler(nxt_task_t *task,
58     nxt_port_recv_msg_t *msg);
59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
60     nxt_listening_socket_t *ls);
61 static void nxt_main_port_modules_handler(nxt_task_t *task,
62     nxt_port_recv_msg_t *msg);
63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
64 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
65     nxt_port_recv_msg_t *msg);
66 static void nxt_main_port_access_log_handler(nxt_task_t *task,
67     nxt_port_recv_msg_t *msg);
68 
69 const nxt_sig_event_t  nxt_main_process_signals[] = {
70     nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
71     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
72     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
73     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
74     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
75     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
76     nxt_event_signal_end,
77 };
78 
79 
80 static nxt_bool_t  nxt_exiting;
81 
82 
83 nxt_int_t
84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
85     nxt_runtime_t *rt)
86 {
87     rt->type = NXT_PROCESS_MAIN;
88 
89     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
90         return NXT_ERROR;
91     }
92 
93     nxt_main_process_title(task);
94 
95     /*
96      * The discovery process will send a message processed by
97      * nxt_main_port_modules_handler() which starts the controller
98      * and router processes.
99      */
100     return nxt_main_process_create(task, nxt_discovery_process);
101 }
102 
103 
104 static nxt_conf_map_t  nxt_common_app_conf[] = {
105     {
106         nxt_string("type"),
107         NXT_CONF_MAP_STR,
108         offsetof(nxt_common_app_conf_t, type),
109     },
110 
111     {
112         nxt_string("user"),
113         NXT_CONF_MAP_STR,
114         offsetof(nxt_common_app_conf_t, user),
115     },
116 
117     {
118         nxt_string("group"),
119         NXT_CONF_MAP_STR,
120         offsetof(nxt_common_app_conf_t, group),
121     },
122 
123     {
124         nxt_string("working_directory"),
125         NXT_CONF_MAP_CSTRZ,
126         offsetof(nxt_common_app_conf_t, working_directory),
127     },
128 
129     {
130         nxt_string("environment"),
131         NXT_CONF_MAP_PTR,
132         offsetof(nxt_common_app_conf_t, environment),
133     },
134 
135     {
136         nxt_string("isolation"),
137         NXT_CONF_MAP_PTR,
138         offsetof(nxt_common_app_conf_t, isolation),
139     },
140 
141     {
142         nxt_string("limits"),
143         NXT_CONF_MAP_PTR,
144         offsetof(nxt_common_app_conf_t, limits),
145     },
146 
147 };
148 
149 
150 static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
151     {
152         nxt_string("shm"),
153         NXT_CONF_MAP_SIZE,
154         offsetof(nxt_common_app_conf_t, shm_limit),
155     },
156 
157 };
158 
159 
160 static nxt_conf_map_t  nxt_external_app_conf[] = {
161     {
162         nxt_string("executable"),
163         NXT_CONF_MAP_CSTRZ,
164         offsetof(nxt_common_app_conf_t, u.external.executable),
165     },
166 
167     {
168         nxt_string("arguments"),
169         NXT_CONF_MAP_PTR,
170         offsetof(nxt_common_app_conf_t, u.external.arguments),
171     },
172 
173 };
174 
175 
176 static nxt_conf_map_t  nxt_python_app_conf[] = {
177     {
178         nxt_string("home"),
179         NXT_CONF_MAP_CSTRZ,
180         offsetof(nxt_common_app_conf_t, u.python.home),
181     },
182 
183     {
184         nxt_string("path"),
185         NXT_CONF_MAP_PTR,
186         offsetof(nxt_common_app_conf_t, u.python.path),
187     },
188 
189     {
190         nxt_string("module"),
191         NXT_CONF_MAP_STR,
192         offsetof(nxt_common_app_conf_t, u.python.module),
193     },
194 
195     {
196         nxt_string("callable"),
197         NXT_CONF_MAP_CSTRZ,
198         offsetof(nxt_common_app_conf_t, u.python.callable),
199     },
200 
201     {
202         nxt_string("protocol"),
203         NXT_CONF_MAP_STR,
204         offsetof(nxt_common_app_conf_t, u.python.protocol),
205     },
206 
207     {
208         nxt_string("threads"),
209         NXT_CONF_MAP_INT32,
210         offsetof(nxt_common_app_conf_t, u.python.threads),
211     },
212 
213     {
214         nxt_string("thread_stack_size"),
215         NXT_CONF_MAP_INT32,
216         offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
217     },
218 };
219 
220 
221 static nxt_conf_map_t  nxt_php_app_conf[] = {
222     {
223         nxt_string("targets"),
224         NXT_CONF_MAP_PTR,
225         offsetof(nxt_common_app_conf_t, u.php.targets),
226     },
227 
228     {
229         nxt_string("options"),
230         NXT_CONF_MAP_PTR,
231         offsetof(nxt_common_app_conf_t, u.php.options),
232     },
233 };
234 
235 
236 static nxt_conf_map_t  nxt_perl_app_conf[] = {
237     {
238         nxt_string("script"),
239         NXT_CONF_MAP_CSTRZ,
240         offsetof(nxt_common_app_conf_t, u.perl.script),
241     },
242 
243     {
244         nxt_string("threads"),
245         NXT_CONF_MAP_INT32,
246         offsetof(nxt_common_app_conf_t, u.perl.threads),
247     },
248 
249     {
250         nxt_string("thread_stack_size"),
251         NXT_CONF_MAP_INT32,
252         offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
253     },
254 };
255 
256 
257 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
258     {
259         nxt_string("script"),
260         NXT_CONF_MAP_STR,
261         offsetof(nxt_common_app_conf_t, u.ruby.script),
262     },
263     {
264         nxt_string("threads"),
265         NXT_CONF_MAP_INT32,
266         offsetof(nxt_common_app_conf_t, u.ruby.threads),
267     },
268 };
269 
270 
271 static nxt_conf_map_t  nxt_java_app_conf[] = {
272     {
273         nxt_string("classpath"),
274         NXT_CONF_MAP_PTR,
275         offsetof(nxt_common_app_conf_t, u.java.classpath),
276     },
277     {
278         nxt_string("webapp"),
279         NXT_CONF_MAP_CSTRZ,
280         offsetof(nxt_common_app_conf_t, u.java.webapp),
281     },
282     {
283         nxt_string("options"),
284         NXT_CONF_MAP_PTR,
285         offsetof(nxt_common_app_conf_t, u.java.options),
286     },
287     {
288         nxt_string("unit_jars"),
289         NXT_CONF_MAP_CSTRZ,
290         offsetof(nxt_common_app_conf_t, u.java.unit_jars),
291     },
292     {
293         nxt_string("threads"),
294         NXT_CONF_MAP_INT32,
295         offsetof(nxt_common_app_conf_t, u.java.threads),
296     },
297     {
298         nxt_string("thread_stack_size"),
299         NXT_CONF_MAP_INT32,
300         offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
301     },
302 
303 };
304 
305 
306 static nxt_conf_app_map_t  nxt_app_maps[] = {
307     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
308     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
309     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
310     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
311     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
312     { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
313 };
314 
315 
316 static void
317 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
318 {
319     nxt_debug(task, "main data: %*s",
320               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
321 }
322 
323 
324 static void
325 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
326 {
327     u_char                 *start, *p, ch;
328     size_t                 type_len;
329     nxt_int_t              ret;
330     nxt_buf_t              *b;
331     nxt_port_t             *port;
332     nxt_runtime_t          *rt;
333     nxt_process_t          *process;
334     nxt_app_type_t         idx;
335     nxt_conf_value_t       *conf;
336     nxt_process_init_t     *init;
337     nxt_common_app_conf_t  *app_conf;
338 
339     ret = NXT_ERROR;
340 
341     rt = task->thread->runtime;
342 
343     process = nxt_main_process_new(task, rt);
344     if (nxt_slow_path(process == NULL)) {
345         return;
346     }
347 
348     init = nxt_process_init(process);
349 
350     *init = nxt_app_process;
351 
352     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
353     if (b == NULL) {
354         goto failed;
355     }
356 
357     nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
358               b->mem.pos);
359 
360     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
361     if (nxt_slow_path(app_conf == NULL)) {
362         goto failed;
363     }
364 
365     start = b->mem.pos;
366 
367     app_conf->name.start = start;
368     app_conf->name.length = nxt_strlen(start);
369 
370     init->name = (const char *) start;
371 
372     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
373                                  + sizeof("\"\" application") + 1);
374 
375     if (nxt_slow_path(process->name == NULL)) {
376         goto failed;
377     }
378 
379     p = (u_char *) process->name;
380     *p++ = '"';
381     p = nxt_cpymem(p, init->name, app_conf->name.length);
382     p = nxt_cpymem(p, "\" application", 13);
383     *p = '\0';
384 
385     app_conf->shm_limit = 100 * 1024 * 1024;
386 
387     start += app_conf->name.length + 1;
388 
389     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
390     if (conf == NULL) {
391         nxt_alert(task, "router app configuration parsing error");
392 
393         goto failed;
394     }
395 
396     rt = task->thread->runtime;
397 
398     app_conf->user.start  = (u_char*)rt->user_cred.user;
399     app_conf->user.length = nxt_strlen(rt->user_cred.user);
400 
401     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
402                               nxt_nitems(nxt_common_app_conf), app_conf);
403 
404     if (ret != NXT_OK) {
405         nxt_alert(task, "failed to map common app conf received from router");
406         goto failed;
407     }
408 
409     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
410         ch = app_conf->type.start[type_len];
411 
412         if (ch == ' ' || nxt_isdigit(ch)) {
413             break;
414         }
415     }
416 
417     idx = nxt_app_parse_type(app_conf->type.start, type_len);
418 
419     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
420         nxt_alert(task, "invalid app type %d received from router", (int) idx);
421         goto failed;
422     }
423 
424     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
425                               nxt_app_maps[idx].size, app_conf);
426 
427     if (nxt_slow_path(ret != NXT_OK)) {
428         nxt_alert(task, "failed to map app conf received from router");
429         goto failed;
430     }
431 
432     if (app_conf->limits != NULL) {
433         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
434                                   nxt_common_app_limits_conf,
435                                   nxt_nitems(nxt_common_app_limits_conf),
436                                   app_conf);
437 
438         if (nxt_slow_path(ret != NXT_OK)) {
439             nxt_alert(task, "failed to map app limits received from router");
440             goto failed;
441         }
442     }
443 
444     app_conf->self = conf;
445 
446     process->stream = msg->port_msg.stream;
447     process->data.app = app_conf;
448 
449     ret = nxt_main_start_process(task, process);
450     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
451         return;
452     }
453 
454 failed:
455 
456     nxt_process_use(task, process, -1);
457 
458     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
459                                  msg->port_msg.reply_port);
460 
461     if (nxt_fast_path(port != NULL)) {
462         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
463                               -1, msg->port_msg.stream, 0, NULL);
464     }
465 }
466 
467 
468 static void
469 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
470 {
471     nxt_port_t     *port;
472     nxt_process_t  *process;
473     nxt_runtime_t  *rt;
474 
475     rt = task->thread->runtime;
476 
477     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
478     if (nxt_slow_path(process == NULL)) {
479         return;
480     }
481 
482     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
483 
484     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
485                                  msg->port_msg.reply_port);
486 
487 
488     if (nxt_slow_path(port == NULL)) {
489         return;
490     }
491 
492 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
493     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
494         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
495                                                    process->user_cred,
496                                                    &process->isolation.clone)
497                           != NXT_OK))
498         {
499             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
500                                          -1, msg->port_msg.stream, 0, NULL);
501             return;
502         }
503      }
504 
505 #endif
506 
507     process->state = NXT_PROCESS_STATE_CREATED;
508 
509     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
510                                  -1, msg->port_msg.stream, 0, NULL);
511 }
512 
513 
514 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
515     .data             = nxt_port_main_data_handler,
516     .process_created  = nxt_main_process_created_handler,
517     .process_ready    = nxt_port_process_ready_handler,
518     .start_process    = nxt_port_main_start_process_handler,
519     .socket           = nxt_main_port_socket_handler,
520     .modules          = nxt_main_port_modules_handler,
521     .conf_store       = nxt_main_port_conf_store_handler,
522 #if (NXT_TLS)
523     .cert_get         = nxt_cert_store_get_handler,
524     .cert_delete      = nxt_cert_store_delete_handler,
525 #endif
526     .access_log       = nxt_main_port_access_log_handler,
527     .rpc_ready        = nxt_port_rpc_handler,
528     .rpc_error        = nxt_port_rpc_handler,
529 };
530 
531 
532 static nxt_int_t
533 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
534 {
535     nxt_int_t      ret;
536     nxt_port_t     *port;
537     nxt_process_t  *process;
538 
539     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
540                                            NXT_PROCESS_MAIN);
541     if (nxt_slow_path(port == NULL)) {
542         return NXT_ERROR;
543     }
544 
545     process = port->process;
546 
547     ret = nxt_port_socket_init(task, port, 0);
548     if (nxt_slow_path(ret != NXT_OK)) {
549         nxt_port_use(task, port, -1);
550         return ret;
551     }
552 
553     /*
554      * A main process port.  A write port is not closed
555      * since it should be inherited by processes.
556      */
557     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
558 
559     process->state = NXT_PROCESS_STATE_READY;
560 
561     return NXT_OK;
562 }
563 
564 
565 static void
566 nxt_main_process_title(nxt_task_t *task)
567 {
568     u_char      *p, *end;
569     nxt_uint_t  i;
570     u_char      title[2048];
571 
572     end = title + sizeof(title) - 1;
573 
574     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
575                     nxt_process_argv[0]);
576 
577     for (i = 1; nxt_process_argv[i] != NULL; i++) {
578         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
579     }
580 
581     if (p < end) {
582         *p++ = ']';
583     }
584 
585     *p = '\0';
586 
587     nxt_process_title(task, "%s", title);
588 }
589 
590 
591 static nxt_int_t
592 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
593 {
594     nxt_int_t           ret;
595     nxt_runtime_t       *rt;
596     nxt_process_t       *process;
597     nxt_process_init_t  *pinit;
598 
599     rt = task->thread->runtime;
600 
601     process = nxt_main_process_new(task, rt);
602     if (nxt_slow_path(process == NULL)) {
603         return NXT_ERROR;
604     }
605 
606     process->name = init.name;
607     process->user_cred = &rt->user_cred;
608 
609     pinit = nxt_process_init(process);
610     *pinit = init;
611 
612     ret = nxt_main_start_process(task, process);
613     if (nxt_slow_path(ret == NXT_ERROR)) {
614         nxt_process_use(task, process, -1);
615     }
616 
617     return ret;
618 }
619 
620 
621 static nxt_process_t *
622 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
623 {
624     nxt_process_t  *process;
625 
626     process = nxt_runtime_process_new(rt);
627     if (nxt_slow_path(process == NULL)) {
628         return NULL;
629     }
630 
631     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
632     if (process->mem_pool == NULL) {
633         nxt_process_use(task, process, -1);
634         return NULL;
635     }
636 
637     return process;
638 }
639 
640 
641 static nxt_int_t
642 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
643 {
644     nxt_mp_t            *tmp_mp;
645     nxt_int_t           ret;
646     nxt_pid_t           pid;
647     nxt_port_t          *port;
648     nxt_process_init_t  *init;
649 
650     init = nxt_process_init(process);
651 
652     port = nxt_port_new(task, 0, 0, init->type);
653     if (nxt_slow_path(port == NULL)) {
654         return NXT_ERROR;
655     }
656 
657     nxt_process_port_add(task, process, port);
658 
659     ret = nxt_port_socket_init(task, port, 0);
660     if (nxt_slow_path(ret != NXT_OK)) {
661         goto free_port;
662     }
663 
664     tmp_mp = nxt_mp_create(1024, 128, 256, 32);
665     if (nxt_slow_path(tmp_mp == NULL)) {
666         ret = NXT_ERROR;
667 
668         goto close_port;
669     }
670 
671     if (init->prefork) {
672         ret = init->prefork(task, process, tmp_mp);
673         if (nxt_slow_path(ret != NXT_OK)) {
674             goto free_mempool;
675         }
676     }
677 
678     pid = nxt_process_create(task, process);
679 
680     switch (pid) {
681 
682     case -1:
683         ret = NXT_ERROR;
684         break;
685 
686     case 0:
687         /* The child process: return to the event engine work queue loop. */
688 
689         nxt_process_use(task, process, -1);
690 
691         ret = NXT_AGAIN;
692         break;
693 
694     default:
695         /* The main process created a new process. */
696 
697         nxt_process_use(task, process, -1);
698 
699         nxt_port_read_close(port);
700         nxt_port_write_enable(task, port);
701 
702         ret = NXT_OK;
703         break;
704     }
705 
706 free_mempool:
707 
708     nxt_mp_destroy(tmp_mp);
709 
710 close_port:
711 
712     if (nxt_slow_path(ret == NXT_ERROR)) {
713         nxt_port_close(task, port);
714     }
715 
716 free_port:
717 
718     nxt_port_use(task, port, -1);
719 
720     return ret;
721 }
722 
723 
724 static void
725 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
726 {
727     nxt_debug(task, "sigterm handler signo:%d (%s)",
728               (int) (uintptr_t) obj, data);
729 
730     /* TODO: fast exit. */
731 
732     nxt_exiting = 1;
733 
734     nxt_runtime_quit(task, 0);
735 }
736 
737 
738 static void
739 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
740 {
741     nxt_debug(task, "sigquit handler signo:%d (%s)",
742               (int) (uintptr_t) obj, data);
743 
744     /* TODO: graceful exit. */
745 
746     nxt_exiting = 1;
747 
748     nxt_runtime_quit(task, 0);
749 }
750 
751 
752 static void
753 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
754 {
755     nxt_mp_t        *mp;
756     nxt_int_t       ret;
757     nxt_uint_t      n;
758     nxt_port_t      *port;
759     nxt_file_t      *file, *new_file;
760     nxt_array_t     *new_files;
761     nxt_runtime_t   *rt;
762 
763     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
764             (int) (uintptr_t) obj, data, "log files rotation");
765 
766     rt = task->thread->runtime;
767 
768     port = rt->port_by_type[NXT_PROCESS_ROUTER];
769 
770     if (nxt_fast_path(port != NULL)) {
771         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
772                                      -1, 0, 0, NULL);
773     }
774 
775     mp = nxt_mp_create(1024, 128, 256, 32);
776     if (mp == NULL) {
777         return;
778     }
779 
780     n = nxt_list_nelts(rt->log_files);
781 
782     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
783     if (new_files == NULL) {
784         nxt_mp_destroy(mp);
785         return;
786     }
787 
788     nxt_list_each(file, rt->log_files) {
789 
790         /* This allocation cannot fail. */
791         new_file = nxt_array_add(new_files);
792 
793         new_file->name = file->name;
794         new_file->fd = NXT_FILE_INVALID;
795         new_file->log_level = NXT_LOG_ALERT;
796 
797         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
798                             NXT_FILE_OWNER_ACCESS);
799 
800         if (ret != NXT_OK) {
801             goto fail;
802         }
803 
804     } nxt_list_loop;
805 
806     new_file = new_files->elts;
807 
808     ret = nxt_file_stderr(&new_file[0]);
809 
810     if (ret == NXT_OK) {
811         n = 0;
812 
813         nxt_list_each(file, rt->log_files) {
814 
815             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
816             /*
817              * The old log file descriptor must be closed at the moment
818              * when no other threads use it.  dup2() allows to use the
819              * old file descriptor for new log file.  This change is
820              * performed atomically in the kernel.
821              */
822             (void) nxt_file_redirect(file, new_file[n].fd);
823 
824             n++;
825 
826         } nxt_list_loop;
827 
828         nxt_mp_destroy(mp);
829         return;
830     }
831 
832 fail:
833 
834     new_file = new_files->elts;
835     n = new_files->nelts;
836 
837     while (n != 0) {
838         if (new_file->fd != NXT_FILE_INVALID) {
839             nxt_file_close(task, new_file);
840         }
841 
842         new_file++;
843         n--;
844     }
845 
846     nxt_mp_destroy(mp);
847 }
848 
849 
850 static void
851 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
852 {
853     int                    status;
854     nxt_err_t              err;
855     nxt_pid_t              pid;
856 
857     nxt_debug(task, "sigchld handler signo:%d (%s)",
858               (int) (uintptr_t) obj, data);
859 
860     for ( ;; ) {
861         pid = waitpid(-1, &status, WNOHANG);
862 
863         if (pid == -1) {
864 
865             switch (err = nxt_errno) {
866 
867             case NXT_ECHILD:
868                 return;
869 
870             case NXT_EINTR:
871                 continue;
872 
873             default:
874                 nxt_alert(task, "waitpid() failed: %E", err);
875                 return;
876             }
877         }
878 
879         nxt_debug(task, "waitpid(): %PI", pid);
880 
881         if (pid == 0) {
882             return;
883         }
884 
885         if (WTERMSIG(status)) {
886 #ifdef WCOREDUMP
887             nxt_alert(task, "process %PI exited on signal %d%s",
888                       pid, WTERMSIG(status),
889                       WCOREDUMP(status) ? " (core dumped)" : "");
890 #else
891             nxt_alert(task, "process %PI exited on signal %d",
892                       pid, WTERMSIG(status));
893 #endif
894 
895         } else {
896             nxt_trace(task, "process %PI exited with code %d",
897                       pid, WEXITSTATUS(status));
898         }
899 
900         nxt_main_cleanup_process(task, pid);
901     }
902 }
903 
904 
905 static void
906 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
907 {
908     nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
909               (int) (uintptr_t) obj, data);
910 }
911 
912 
913 static void
914 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
915 {
916     int                 stream;
917     nxt_int_t           ret;
918     nxt_buf_t           *buf;
919     nxt_port_t          *port;
920     const char          *name;
921     nxt_runtime_t       *rt;
922     nxt_process_t       *process;
923     nxt_process_init_t  init;
924 
925     rt = task->thread->runtime;
926 
927     process = nxt_runtime_process_find(rt, pid);
928     if (!process) {
929         return;
930     }
931 
932     if (process->isolation.cleanup != NULL) {
933         process->isolation.cleanup(task, process);
934     }
935 
936     name = process->name;
937     stream = process->stream;
938     init = *((nxt_process_init_t *) nxt_process_init(process));
939 
940     if (process->state == NXT_PROCESS_STATE_READY) {
941         process->stream = 0;
942     }
943 
944     nxt_process_close_ports(task, process);
945 
946     if (nxt_exiting) {
947         if (rt->nprocesses <= 1) {
948             nxt_runtime_quit(task, 0);
949         }
950 
951         return;
952     }
953 
954     nxt_runtime_process_each(rt, process) {
955 
956         if (process->pid == nxt_pid
957             || process->pid == pid
958             || nxt_queue_is_empty(&process->ports))
959         {
960             continue;
961         }
962 
963         port = nxt_process_port_first(process);
964 
965         if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
966             continue;
967         }
968 
969         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
970                                    sizeof(pid));
971 
972         if (nxt_slow_path(buf == NULL)) {
973             continue;
974         }
975 
976         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
977 
978         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
979                               stream, 0, buf);
980 
981     } nxt_runtime_process_loop;
982 
983     if (init.restart) {
984         ret = nxt_main_process_create(task, init);
985         if (nxt_slow_path(ret == NXT_ERROR)) {
986             nxt_alert(task, "failed to restart %s", name);
987         }
988     }
989 }
990 
991 
992 static void
993 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
994 {
995     size_t                  size;
996     nxt_int_t               ret;
997     nxt_buf_t               *b, *out;
998     nxt_port_t              *port;
999     nxt_sockaddr_t          *sa;
1000     nxt_port_msg_type_t     type;
1001     nxt_listening_socket_t  ls;
1002     u_char                  message[2048];
1003 
1004     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1005                                  msg->port_msg.reply_port);
1006     if (nxt_slow_path(port == NULL)) {
1007         return;
1008     }
1009 
1010     b = msg->buf;
1011     sa = (nxt_sockaddr_t *) b->mem.pos;
1012 
1013     /* TODO check b size and make plain */
1014 
1015     ls.socket = -1;
1016     ls.error = NXT_SOCKET_ERROR_SYSTEM;
1017     ls.start = message;
1018     ls.end = message + sizeof(message);
1019 
1020     nxt_debug(task, "listening socket \"%*s\"",
1021               (size_t) sa->length, nxt_sockaddr_start(sa));
1022 
1023     ret = nxt_main_listening_socket(sa, &ls);
1024 
1025     if (ret == NXT_OK) {
1026         nxt_debug(task, "socket(\"%*s\"): %d",
1027                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1028 
1029         out = NULL;
1030 
1031         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1032 
1033     } else {
1034         size = ls.end - ls.start;
1035 
1036         nxt_alert(task, "%*s", size, ls.start);
1037 
1038         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1039                                    size + 1);
1040         if (nxt_fast_path(out != NULL)) {
1041             *out->mem.free++ = (uint8_t) ls.error;
1042 
1043             out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1044         }
1045 
1046         type = NXT_PORT_MSG_RPC_ERROR;
1047     }
1048 
1049     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1050                           0, out);
1051 }
1052 
1053 
1054 static nxt_int_t
1055 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1056 {
1057     nxt_err_t         err;
1058     nxt_socket_t      s;
1059 
1060     const socklen_t   length = sizeof(int);
1061     static const int  enable = 1;
1062 
1063     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1064 
1065     if (nxt_slow_path(s == -1)) {
1066         err = nxt_errno;
1067 
1068 #if (NXT_INET6)
1069 
1070         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1071             ls->error = NXT_SOCKET_ERROR_NOINET6;
1072         }
1073 
1074 #endif
1075 
1076         ls->end = nxt_sprintf(ls->start, ls->end,
1077                               "socket(\\\"%*s\\\") failed %E",
1078                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1079 
1080         return NXT_ERROR;
1081     }
1082 
1083     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1084         ls->end = nxt_sprintf(ls->start, ls->end,
1085                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1086                               (size_t) sa->length, nxt_sockaddr_start(sa),
1087                               nxt_errno);
1088         goto fail;
1089     }
1090 
1091 #if (NXT_INET6)
1092 
1093     if (sa->u.sockaddr.sa_family == AF_INET6) {
1094 
1095         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1096             ls->end = nxt_sprintf(ls->start, ls->end,
1097                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1098                                (size_t) sa->length, nxt_sockaddr_start(sa),
1099                                nxt_errno);
1100             goto fail;
1101         }
1102     }
1103 
1104 #endif
1105 
1106     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1107         err = nxt_errno;
1108 
1109 #if (NXT_HAVE_UNIX_DOMAIN)
1110 
1111         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1112             switch (err) {
1113 
1114             case EACCES:
1115                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1116                 break;
1117 
1118             case ENOENT:
1119             case ENOTDIR:
1120                 ls->error = NXT_SOCKET_ERROR_PATH;
1121                 break;
1122             }
1123 
1124         } else
1125 #endif
1126         {
1127             switch (err) {
1128 
1129             case EACCES:
1130                 ls->error = NXT_SOCKET_ERROR_PORT;
1131                 break;
1132 
1133             case EADDRINUSE:
1134                 ls->error = NXT_SOCKET_ERROR_INUSE;
1135                 break;
1136 
1137             case EADDRNOTAVAIL:
1138                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1139                 break;
1140             }
1141         }
1142 
1143         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1144                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1145         goto fail;
1146     }
1147 
1148 #if (NXT_HAVE_UNIX_DOMAIN)
1149 
1150     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1151         char     *filename;
1152         mode_t   access;
1153 
1154         filename = sa->u.sockaddr_un.sun_path;
1155         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1156 
1157         if (chmod(filename, access) != 0) {
1158             ls->end = nxt_sprintf(ls->start, ls->end,
1159                                   "chmod(\\\"%s\\\") failed %E",
1160                                   filename, nxt_errno);
1161             goto fail;
1162         }
1163     }
1164 
1165 #endif
1166 
1167     ls->socket = s;
1168 
1169     return NXT_OK;
1170 
1171 fail:
1172 
1173     (void) close(s);
1174 
1175     return NXT_ERROR;
1176 }
1177 
1178 
1179 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1180     {
1181         nxt_string("type"),
1182         NXT_CONF_MAP_INT,
1183         offsetof(nxt_app_lang_module_t, type),
1184     },
1185 
1186     {
1187         nxt_string("version"),
1188         NXT_CONF_MAP_CSTRZ,
1189         offsetof(nxt_app_lang_module_t, version),
1190     },
1191 
1192     {
1193         nxt_string("file"),
1194         NXT_CONF_MAP_CSTRZ,
1195         offsetof(nxt_app_lang_module_t, file),
1196     },
1197 };
1198 
1199 
1200 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1201     {
1202         nxt_string("src"),
1203         NXT_CONF_MAP_CSTRZ,
1204         offsetof(nxt_fs_mount_t, src),
1205     },
1206     {
1207         nxt_string("dst"),
1208         NXT_CONF_MAP_CSTRZ,
1209         offsetof(nxt_fs_mount_t, dst),
1210     },
1211     {
1212         nxt_string("name"),
1213         NXT_CONF_MAP_CSTRZ,
1214         offsetof(nxt_fs_mount_t, name),
1215     },
1216     {
1217         nxt_string("type"),
1218         NXT_CONF_MAP_INT,
1219         offsetof(nxt_fs_mount_t, type),
1220     },
1221     {
1222         nxt_string("flags"),
1223         NXT_CONF_MAP_INT,
1224         offsetof(nxt_fs_mount_t, flags),
1225     },
1226     {
1227         nxt_string("data"),
1228         NXT_CONF_MAP_CSTRZ,
1229         offsetof(nxt_fs_mount_t, data),
1230     },
1231 };
1232 
1233 
1234 static void
1235 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1236 {
1237     uint32_t               index, jindex, nmounts;
1238     nxt_mp_t               *mp;
1239     nxt_int_t              ret;
1240     nxt_buf_t              *b;
1241     nxt_port_t             *port;
1242     nxt_runtime_t          *rt;
1243     nxt_fs_mount_t         *mnt;
1244     nxt_conf_value_t       *conf, *root, *value, *mounts;
1245     nxt_app_lang_module_t  *lang;
1246 
1247     static nxt_str_t root_path = nxt_string("/");
1248     static nxt_str_t mounts_name = nxt_string("mounts");
1249 
1250     rt = task->thread->runtime;
1251 
1252     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1253         return;
1254     }
1255 
1256     if (nxt_exiting) {
1257         nxt_debug(task, "ignoring discovered modules, exiting");
1258         return;
1259     }
1260 
1261     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1262                                  msg->port_msg.reply_port);
1263 
1264     if (nxt_fast_path(port != NULL)) {
1265         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1266                                      msg->port_msg.stream, 0, NULL);
1267     }
1268 
1269     b = msg->buf;
1270 
1271     if (b == NULL) {
1272         return;
1273     }
1274 
1275     mp = nxt_mp_create(1024, 128, 256, 32);
1276     if (mp == NULL) {
1277         return;
1278     }
1279 
1280     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1281 
1282     if (b == NULL) {
1283         return;
1284     }
1285 
1286     nxt_debug(task, "application languages: \"%*s\"",
1287               b->mem.free - b->mem.pos, b->mem.pos);
1288 
1289     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1290     if (conf == NULL) {
1291         goto fail;
1292     }
1293 
1294     root = nxt_conf_get_path(conf, &root_path);
1295     if (root == NULL) {
1296         goto fail;
1297     }
1298 
1299     for (index = 0; /* void */ ; index++) {
1300         value = nxt_conf_get_array_element(root, index);
1301         if (value == NULL) {
1302             break;
1303         }
1304 
1305         lang = nxt_array_zero_add(rt->languages);
1306         if (lang == NULL) {
1307             goto fail;
1308         }
1309 
1310         lang->module = NULL;
1311 
1312         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1313                                   nxt_nitems(nxt_app_lang_module_map), lang);
1314 
1315         if (ret != NXT_OK) {
1316             goto fail;
1317         }
1318 
1319         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1320         if (mounts == NULL) {
1321             nxt_alert(task, "missing mounts from discovery message.");
1322             goto fail;
1323         }
1324 
1325         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1326             nxt_alert(task, "invalid mounts type from discovery message.");
1327             goto fail;
1328         }
1329 
1330         nmounts = nxt_conf_array_elements_count(mounts);
1331 
1332         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1333                                         sizeof(nxt_fs_mount_t));
1334 
1335         if (lang->mounts == NULL) {
1336             goto fail;
1337         }
1338 
1339         for (jindex = 0; /* */; jindex++) {
1340             value = nxt_conf_get_array_element(mounts, jindex);
1341             if (value == NULL) {
1342                 break;
1343             }
1344 
1345             mnt = nxt_array_zero_add(lang->mounts);
1346             if (mnt == NULL) {
1347                 goto fail;
1348             }
1349 
1350             mnt->builtin = 1;
1351             mnt->deps = 1;
1352 
1353             ret = nxt_conf_map_object(rt->mem_pool, value,
1354                                       nxt_app_lang_mounts_map,
1355                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1356 
1357             if (ret != NXT_OK) {
1358                 goto fail;
1359             }
1360         }
1361 
1362         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1363                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1364     }
1365 
1366     qsort(rt->languages->elts, rt->languages->nelts,
1367           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1368 
1369 fail:
1370 
1371     nxt_mp_destroy(mp);
1372 
1373     ret = nxt_main_process_create(task, nxt_controller_process);
1374     if (ret == NXT_OK) {
1375         ret = nxt_main_process_create(task, nxt_router_process);
1376     }
1377 
1378     if (nxt_slow_path(ret == NXT_ERROR)) {
1379         nxt_exiting = 1;
1380 
1381         nxt_runtime_quit(task, 1);
1382     }
1383 }
1384 
1385 
1386 static int nxt_cdecl
1387 nxt_app_lang_compare(const void *v1, const void *v2)
1388 {
1389     int                          n;
1390     const nxt_app_lang_module_t  *lang1, *lang2;
1391 
1392     lang1 = v1;
1393     lang2 = v2;
1394 
1395     n = lang1->type - lang2->type;
1396 
1397     if (n != 0) {
1398         return n;
1399     }
1400 
1401     n = nxt_strverscmp(lang1->version, lang2->version);
1402 
1403     /* Negate result to move higher versions to the beginning. */
1404 
1405     return -n;
1406 }
1407 
1408 
1409 static void
1410 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1411 {
1412     void           *p;
1413     size_t         size;
1414     ssize_t        n;
1415     nxt_int_t      ret;
1416     nxt_file_t     file;
1417     nxt_runtime_t  *rt;
1418 
1419     p = MAP_FAILED;
1420 
1421     /*
1422      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1423      * initialized in 'cleanup' section.
1424      */
1425     size = 0;
1426 
1427     if (nxt_slow_path(msg->fd[0] == -1)) {
1428         nxt_alert(task, "conf_store_handler: invalid shm fd");
1429         goto error;
1430     }
1431 
1432     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1433         nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1434                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
1435         goto error;
1436     }
1437 
1438     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1439 
1440     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1441 
1442     nxt_fd_close(msg->fd[0]);
1443     msg->fd[0] = -1;
1444 
1445     if (nxt_slow_path(p == MAP_FAILED)) {
1446         goto error;
1447     }
1448 
1449     nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1450 
1451     nxt_memzero(&file, sizeof(nxt_file_t));
1452 
1453     rt = task->thread->runtime;
1454 
1455     file.name = (nxt_file_name_t *) rt->conf_tmp;
1456 
1457     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1458                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1459                       != NXT_OK))
1460     {
1461         goto error;
1462     }
1463 
1464     n = nxt_file_write(&file, p, size, 0);
1465 
1466     nxt_file_close(task, &file);
1467 
1468     if (nxt_slow_path(n != (ssize_t) size)) {
1469         (void) nxt_file_delete(file.name);
1470         goto error;
1471     }
1472 
1473     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1474 
1475     if (nxt_fast_path(ret == NXT_OK)) {
1476         goto cleanup;
1477     }
1478 
1479 error:
1480 
1481     nxt_alert(task, "failed to store current configuration");
1482 
1483 cleanup:
1484 
1485     if (p != MAP_FAILED) {
1486         nxt_mem_munmap(p, size);
1487     }
1488 
1489     if (msg->fd[0] != -1) {
1490         nxt_fd_close(msg->fd[0]);
1491         msg->fd[0] = -1;
1492     }
1493 }
1494 
1495 
1496 static void
1497 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1498 {
1499     u_char               *path;
1500     nxt_int_t            ret;
1501     nxt_file_t           file;
1502     nxt_port_t           *port;
1503     nxt_port_msg_type_t  type;
1504 
1505     nxt_debug(task, "opening access log file");
1506 
1507     path = msg->buf->mem.pos;
1508 
1509     nxt_memzero(&file, sizeof(nxt_file_t));
1510 
1511     file.name = (nxt_file_name_t *) path;
1512     file.log_level = NXT_LOG_ERR;
1513 
1514     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1515                         NXT_FILE_OWNER_ACCESS);
1516 
1517     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1518                            : NXT_PORT_MSG_RPC_ERROR;
1519 
1520     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1521                                  msg->port_msg.reply_port);
1522 
1523     if (nxt_fast_path(port != NULL)) {
1524         (void) nxt_port_socket_write(task, port, type, file.fd,
1525                                      msg->port_msg.stream, 0, NULL);
1526     }
1527 }
1528