xref: /unit/src/nxt_main_process.c (revision 1787:db82aa07015c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_router.h>
13 #if (NXT_TLS)
14 #include <nxt_cert.h>
15 #endif
16 
17 #include <sys/mount.h>
18 
19 
20 typedef struct {
21     nxt_socket_t        socket;
22     nxt_socket_error_t  error;
23     u_char              *start;
24     u_char              *end;
25 } nxt_listening_socket_t;
26 
27 
28 typedef struct {
29     nxt_uint_t          size;
30     nxt_conf_map_t      *map;
31 } nxt_conf_app_map_t;
32 
33 
34 extern nxt_port_handlers_t  nxt_controller_process_port_handlers;
35 extern nxt_port_handlers_t  nxt_router_process_port_handlers;
36 
37 
38 static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
39     nxt_runtime_t *rt);
40 static void nxt_main_process_title(nxt_task_t *task);
41 static nxt_int_t nxt_main_process_create(nxt_task_t *task,
42     const nxt_process_init_t init);
43 static nxt_int_t nxt_main_start_process(nxt_task_t *task,
44     nxt_process_t *process);
45 static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
46 static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
47     void *data);
48 static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
49     void *data);
50 static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
51     void *data);
52 static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
53     void *data);
54 static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
55     void *data);
56 static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
57 static void nxt_main_port_socket_handler(nxt_task_t *task,
58     nxt_port_recv_msg_t *msg);
59 static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
60     nxt_listening_socket_t *ls);
61 static void nxt_main_port_modules_handler(nxt_task_t *task,
62     nxt_port_recv_msg_t *msg);
63 static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
64 static void nxt_main_port_conf_store_handler(nxt_task_t *task,
65     nxt_port_recv_msg_t *msg);
66 static void nxt_main_port_access_log_handler(nxt_task_t *task,
67     nxt_port_recv_msg_t *msg);
68 
69 const nxt_sig_event_t  nxt_main_process_signals[] = {
70     nxt_event_signal(SIGHUP,  nxt_main_process_signal_handler),
71     nxt_event_signal(SIGINT,  nxt_main_process_sigterm_handler),
72     nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
73     nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
74     nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
75     nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
76     nxt_event_signal_end,
77 };
78 
79 
80 static nxt_bool_t  nxt_exiting;
81 
82 
83 nxt_int_t
84 nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
85     nxt_runtime_t *rt)
86 {
87     rt->type = NXT_PROCESS_MAIN;
88 
89     if (nxt_main_process_port_create(task, rt) != NXT_OK) {
90         return NXT_ERROR;
91     }
92 
93     nxt_main_process_title(task);
94 
95     /*
96      * The discovery process will send a message processed by
97      * nxt_main_port_modules_handler() which starts the controller
98      * and router processes.
99      */
100     return nxt_main_process_create(task, nxt_discovery_process);
101 }
102 
103 
104 static nxt_conf_map_t  nxt_common_app_conf[] = {
105     {
106         nxt_string("type"),
107         NXT_CONF_MAP_STR,
108         offsetof(nxt_common_app_conf_t, type),
109     },
110 
111     {
112         nxt_string("user"),
113         NXT_CONF_MAP_STR,
114         offsetof(nxt_common_app_conf_t, user),
115     },
116 
117     {
118         nxt_string("group"),
119         NXT_CONF_MAP_STR,
120         offsetof(nxt_common_app_conf_t, group),
121     },
122 
123     {
124         nxt_string("working_directory"),
125         NXT_CONF_MAP_CSTRZ,
126         offsetof(nxt_common_app_conf_t, working_directory),
127     },
128 
129     {
130         nxt_string("environment"),
131         NXT_CONF_MAP_PTR,
132         offsetof(nxt_common_app_conf_t, environment),
133     },
134 
135     {
136         nxt_string("isolation"),
137         NXT_CONF_MAP_PTR,
138         offsetof(nxt_common_app_conf_t, isolation),
139     },
140 
141     {
142         nxt_string("limits"),
143         NXT_CONF_MAP_PTR,
144         offsetof(nxt_common_app_conf_t, limits),
145     },
146 
147 };
148 
149 
150 static nxt_conf_map_t  nxt_common_app_limits_conf[] = {
151     {
152         nxt_string("shm"),
153         NXT_CONF_MAP_SIZE,
154         offsetof(nxt_common_app_conf_t, shm_limit),
155     },
156 
157 };
158 
159 
160 static nxt_conf_map_t  nxt_external_app_conf[] = {
161     {
162         nxt_string("executable"),
163         NXT_CONF_MAP_CSTRZ,
164         offsetof(nxt_common_app_conf_t, u.external.executable),
165     },
166 
167     {
168         nxt_string("arguments"),
169         NXT_CONF_MAP_PTR,
170         offsetof(nxt_common_app_conf_t, u.external.arguments),
171     },
172 
173 };
174 
175 
176 static nxt_conf_map_t  nxt_python_app_conf[] = {
177     {
178         nxt_string("home"),
179         NXT_CONF_MAP_CSTRZ,
180         offsetof(nxt_common_app_conf_t, u.python.home),
181     },
182 
183     {
184         nxt_string("path"),
185         NXT_CONF_MAP_PTR,
186         offsetof(nxt_common_app_conf_t, u.python.path),
187     },
188 
189     {
190         nxt_string("module"),
191         NXT_CONF_MAP_STR,
192         offsetof(nxt_common_app_conf_t, u.python.module),
193     },
194 
195     {
196         nxt_string("callable"),
197         NXT_CONF_MAP_CSTRZ,
198         offsetof(nxt_common_app_conf_t, u.python.callable),
199     },
200 
201     {
202         nxt_string("protocol"),
203         NXT_CONF_MAP_STR,
204         offsetof(nxt_common_app_conf_t, u.python.protocol),
205     },
206 
207     {
208         nxt_string("threads"),
209         NXT_CONF_MAP_INT32,
210         offsetof(nxt_common_app_conf_t, u.python.threads),
211     },
212 
213     {
214         nxt_string("thread_stack_size"),
215         NXT_CONF_MAP_INT32,
216         offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
217     },
218 };
219 
220 
221 static nxt_conf_map_t  nxt_php_app_conf[] = {
222     {
223         nxt_string("targets"),
224         NXT_CONF_MAP_PTR,
225         offsetof(nxt_common_app_conf_t, u.php.targets),
226     },
227 
228     {
229         nxt_string("options"),
230         NXT_CONF_MAP_PTR,
231         offsetof(nxt_common_app_conf_t, u.php.options),
232     },
233 };
234 
235 
236 static nxt_conf_map_t  nxt_perl_app_conf[] = {
237     {
238         nxt_string("script"),
239         NXT_CONF_MAP_CSTRZ,
240         offsetof(nxt_common_app_conf_t, u.perl.script),
241     },
242 
243     {
244         nxt_string("threads"),
245         NXT_CONF_MAP_INT32,
246         offsetof(nxt_common_app_conf_t, u.perl.threads),
247     },
248 
249     {
250         nxt_string("thread_stack_size"),
251         NXT_CONF_MAP_INT32,
252         offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
253     },
254 };
255 
256 
257 static nxt_conf_map_t  nxt_ruby_app_conf[] = {
258     {
259         nxt_string("script"),
260         NXT_CONF_MAP_STR,
261         offsetof(nxt_common_app_conf_t, u.ruby.script),
262     },
263     {
264         nxt_string("threads"),
265         NXT_CONF_MAP_INT32,
266         offsetof(nxt_common_app_conf_t, u.ruby.threads),
267     },
268 };
269 
270 
271 static nxt_conf_map_t  nxt_java_app_conf[] = {
272     {
273         nxt_string("classpath"),
274         NXT_CONF_MAP_PTR,
275         offsetof(nxt_common_app_conf_t, u.java.classpath),
276     },
277     {
278         nxt_string("webapp"),
279         NXT_CONF_MAP_CSTRZ,
280         offsetof(nxt_common_app_conf_t, u.java.webapp),
281     },
282     {
283         nxt_string("options"),
284         NXT_CONF_MAP_PTR,
285         offsetof(nxt_common_app_conf_t, u.java.options),
286     },
287     {
288         nxt_string("unit_jars"),
289         NXT_CONF_MAP_CSTRZ,
290         offsetof(nxt_common_app_conf_t, u.java.unit_jars),
291     },
292     {
293         nxt_string("threads"),
294         NXT_CONF_MAP_INT32,
295         offsetof(nxt_common_app_conf_t, u.java.threads),
296     },
297     {
298         nxt_string("thread_stack_size"),
299         NXT_CONF_MAP_INT32,
300         offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
301     },
302 
303 };
304 
305 
306 static nxt_conf_app_map_t  nxt_app_maps[] = {
307     { nxt_nitems(nxt_external_app_conf),  nxt_external_app_conf },
308     { nxt_nitems(nxt_python_app_conf),    nxt_python_app_conf },
309     { nxt_nitems(nxt_php_app_conf),       nxt_php_app_conf },
310     { nxt_nitems(nxt_perl_app_conf),      nxt_perl_app_conf },
311     { nxt_nitems(nxt_ruby_app_conf),      nxt_ruby_app_conf },
312     { nxt_nitems(nxt_java_app_conf),      nxt_java_app_conf },
313 };
314 
315 
316 static void
317 nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
318 {
319     nxt_debug(task, "main data: %*s",
320               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
321 }
322 
323 
324 static void
325 nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
326 {
327     u_char                 *start, *p, ch;
328     size_t                 type_len;
329     nxt_int_t              ret;
330     nxt_buf_t              *b;
331     nxt_port_t             *port;
332     nxt_runtime_t          *rt;
333     nxt_process_t          *process;
334     nxt_app_type_t         idx;
335     nxt_conf_value_t       *conf;
336     nxt_process_init_t     *init;
337     nxt_common_app_conf_t  *app_conf;
338 
339     ret = NXT_ERROR;
340 
341     rt = task->thread->runtime;
342 
343     process = nxt_main_process_new(task, rt);
344     if (nxt_slow_path(process == NULL)) {
345         return;
346     }
347 
348     init = nxt_process_init(process);
349 
350     *init = nxt_app_process;
351 
352     b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
353     if (b == NULL) {
354         goto failed;
355     }
356 
357     nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
358               b->mem.pos);
359 
360     app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
361     if (nxt_slow_path(app_conf == NULL)) {
362         goto failed;
363     }
364 
365     start = b->mem.pos;
366 
367     app_conf->name.start = start;
368     app_conf->name.length = nxt_strlen(start);
369 
370     init->name = (const char *) start;
371 
372     process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
373                                  + sizeof("\"\" application") + 1);
374 
375     if (nxt_slow_path(process->name == NULL)) {
376         goto failed;
377     }
378 
379     p = (u_char *) process->name;
380     *p++ = '"';
381     p = nxt_cpymem(p, init->name, app_conf->name.length);
382     p = nxt_cpymem(p, "\" application", 13);
383     *p = '\0';
384 
385     app_conf->shm_limit = 100 * 1024 * 1024;
386 
387     start += app_conf->name.length + 1;
388 
389     conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
390     if (conf == NULL) {
391         nxt_alert(task, "router app configuration parsing error");
392 
393         goto failed;
394     }
395 
396     rt = task->thread->runtime;
397 
398     app_conf->user.start  = (u_char*)rt->user_cred.user;
399     app_conf->user.length = nxt_strlen(rt->user_cred.user);
400 
401     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
402                               nxt_nitems(nxt_common_app_conf), app_conf);
403 
404     if (ret != NXT_OK) {
405         nxt_alert(task, "failed to map common app conf received from router");
406         goto failed;
407     }
408 
409     for (type_len = 0; type_len != app_conf->type.length; type_len++) {
410         ch = app_conf->type.start[type_len];
411 
412         if (ch == ' ' || nxt_isdigit(ch)) {
413             break;
414         }
415     }
416 
417     idx = nxt_app_parse_type(app_conf->type.start, type_len);
418 
419     if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
420         nxt_alert(task, "invalid app type %d received from router", (int) idx);
421         goto failed;
422     }
423 
424     ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
425                               nxt_app_maps[idx].size, app_conf);
426 
427     if (nxt_slow_path(ret != NXT_OK)) {
428         nxt_alert(task, "failed to map app conf received from router");
429         goto failed;
430     }
431 
432     if (app_conf->limits != NULL) {
433         ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
434                                   nxt_common_app_limits_conf,
435                                   nxt_nitems(nxt_common_app_limits_conf),
436                                   app_conf);
437 
438         if (nxt_slow_path(ret != NXT_OK)) {
439             nxt_alert(task, "failed to map app limits received from router");
440             goto failed;
441         }
442     }
443 
444     app_conf->self = conf;
445 
446     process->stream = msg->port_msg.stream;
447     process->data.app = app_conf;
448 
449     ret = nxt_main_start_process(task, process);
450     if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
451         return;
452     }
453 
454 failed:
455 
456     nxt_process_use(task, process, -1);
457 
458     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
459                                  msg->port_msg.reply_port);
460 
461     if (nxt_fast_path(port != NULL)) {
462         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
463                               -1, msg->port_msg.stream, 0, NULL);
464     }
465 }
466 
467 
468 static void
469 nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
470 {
471     nxt_port_t     *port;
472     nxt_process_t  *process;
473     nxt_runtime_t  *rt;
474 
475     rt = task->thread->runtime;
476 
477     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
478     if (nxt_slow_path(process == NULL)) {
479         return;
480     }
481 
482     nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
483 
484     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
485                                  msg->port_msg.reply_port);
486 
487 
488     if (nxt_slow_path(port == NULL)) {
489         return;
490     }
491 
492 #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
493     if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
494         if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
495                                                    process->user_cred,
496                                                    &process->isolation.clone)
497                           != NXT_OK))
498         {
499             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
500                                          -1, msg->port_msg.stream, 0, NULL);
501             return;
502         }
503      }
504 
505 #endif
506 
507     process->state = NXT_PROCESS_STATE_CREATED;
508 
509     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
510                                  -1, msg->port_msg.stream, 0, NULL);
511 }
512 
513 
514 static nxt_port_handlers_t  nxt_main_process_port_handlers = {
515     .data             = nxt_port_main_data_handler,
516     .process_created  = nxt_main_process_created_handler,
517     .process_ready    = nxt_port_process_ready_handler,
518     .start_process    = nxt_port_main_start_process_handler,
519     .socket           = nxt_main_port_socket_handler,
520     .modules          = nxt_main_port_modules_handler,
521     .conf_store       = nxt_main_port_conf_store_handler,
522 #if (NXT_TLS)
523     .cert_get         = nxt_cert_store_get_handler,
524     .cert_delete      = nxt_cert_store_delete_handler,
525 #endif
526     .access_log       = nxt_main_port_access_log_handler,
527     .rpc_ready        = nxt_port_rpc_handler,
528     .rpc_error        = nxt_port_rpc_handler,
529 };
530 
531 
532 static nxt_int_t
533 nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
534 {
535     nxt_int_t      ret;
536     nxt_port_t     *port;
537     nxt_process_t  *process;
538 
539     port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
540                                            NXT_PROCESS_MAIN);
541     if (nxt_slow_path(port == NULL)) {
542         return NXT_ERROR;
543     }
544 
545     process = port->process;
546 
547     ret = nxt_port_socket_init(task, port, 0);
548     if (nxt_slow_path(ret != NXT_OK)) {
549         nxt_port_use(task, port, -1);
550         return ret;
551     }
552 
553     /*
554      * A main process port.  A write port is not closed
555      * since it should be inherited by processes.
556      */
557     nxt_port_enable(task, port, &nxt_main_process_port_handlers);
558 
559     process->state = NXT_PROCESS_STATE_READY;
560 
561     return NXT_OK;
562 }
563 
564 
565 static void
566 nxt_main_process_title(nxt_task_t *task)
567 {
568     u_char      *p, *end;
569     nxt_uint_t  i;
570     u_char      title[2048];
571 
572     end = title + sizeof(title) - 1;
573 
574     p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
575                     nxt_process_argv[0]);
576 
577     for (i = 1; nxt_process_argv[i] != NULL; i++) {
578         p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
579     }
580 
581     if (p < end) {
582         *p++ = ']';
583     }
584 
585     *p = '\0';
586 
587     nxt_process_title(task, "%s", title);
588 }
589 
590 
591 static nxt_int_t
592 nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
593 {
594     nxt_int_t           ret;
595     nxt_runtime_t       *rt;
596     nxt_process_t       *process;
597     nxt_process_init_t  *pinit;
598 
599     rt = task->thread->runtime;
600 
601     process = nxt_main_process_new(task, rt);
602     if (nxt_slow_path(process == NULL)) {
603         return NXT_ERROR;
604     }
605 
606     process->name = init.name;
607     process->user_cred = &rt->user_cred;
608 
609     pinit = nxt_process_init(process);
610     *pinit = init;
611 
612     ret = nxt_main_start_process(task, process);
613     if (nxt_slow_path(ret == NXT_ERROR)) {
614         nxt_process_use(task, process, -1);
615     }
616 
617     return ret;
618 }
619 
620 
621 static nxt_process_t *
622 nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
623 {
624     nxt_process_t  *process;
625 
626     process = nxt_runtime_process_new(rt);
627     if (nxt_slow_path(process == NULL)) {
628         return NULL;
629     }
630 
631     process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
632     if (process->mem_pool == NULL) {
633         nxt_process_use(task, process, -1);
634         return NULL;
635     }
636 
637     return process;
638 }
639 
640 
641 static nxt_int_t
642 nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
643 {
644     nxt_mp_t            *tmp_mp;
645     nxt_int_t           ret;
646     nxt_pid_t           pid;
647     nxt_port_t          *port;
648     nxt_process_init_t  *init;
649 
650     init = nxt_process_init(process);
651 
652     port = nxt_port_new(task, 0, 0, init->type);
653     if (nxt_slow_path(port == NULL)) {
654         return NXT_ERROR;
655     }
656 
657     nxt_process_port_add(task, process, port);
658 
659     ret = nxt_port_socket_init(task, port, 0);
660     if (nxt_slow_path(ret != NXT_OK)) {
661         goto free_port;
662     }
663 
664     tmp_mp = nxt_mp_create(1024, 128, 256, 32);
665     if (nxt_slow_path(tmp_mp == NULL)) {
666         ret = NXT_ERROR;
667 
668         goto close_port;
669     }
670 
671     if (init->prefork) {
672         ret = init->prefork(task, process, tmp_mp);
673         if (nxt_slow_path(ret != NXT_OK)) {
674             goto free_mempool;
675         }
676     }
677 
678     pid = nxt_process_create(task, process);
679 
680     switch (pid) {
681 
682     case -1:
683         ret = NXT_ERROR;
684         break;
685 
686     case 0:
687         /* The child process: return to the event engine work queue loop. */
688 
689         nxt_process_use(task, process, -1);
690 
691         ret = NXT_AGAIN;
692         break;
693 
694     default:
695         /* The main process created a new process. */
696 
697         nxt_process_use(task, process, -1);
698 
699         nxt_port_read_close(port);
700         nxt_port_write_enable(task, port);
701 
702         ret = NXT_OK;
703         break;
704     }
705 
706 free_mempool:
707 
708     nxt_mp_destroy(tmp_mp);
709 
710 close_port:
711 
712     if (nxt_slow_path(ret == NXT_ERROR)) {
713         nxt_port_close(task, port);
714     }
715 
716 free_port:
717 
718     nxt_port_use(task, port, -1);
719 
720     return ret;
721 }
722 
723 
724 static void
725 nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
726 {
727     nxt_debug(task, "sigterm handler signo:%d (%s)",
728               (int) (uintptr_t) obj, data);
729 
730     /* TODO: fast exit. */
731 
732     nxt_exiting = 1;
733 
734     nxt_runtime_quit(task, 0);
735 }
736 
737 
738 static void
739 nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
740 {
741     nxt_debug(task, "sigquit handler signo:%d (%s)",
742               (int) (uintptr_t) obj, data);
743 
744     /* TODO: graceful exit. */
745 
746     nxt_exiting = 1;
747 
748     nxt_runtime_quit(task, 0);
749 }
750 
751 
752 static void
753 nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
754 {
755     nxt_mp_t        *mp;
756     nxt_int_t       ret;
757     nxt_uint_t      n;
758     nxt_port_t      *port;
759     nxt_file_t      *file, *new_file;
760     nxt_array_t     *new_files;
761     nxt_runtime_t   *rt;
762 
763     nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
764             (int) (uintptr_t) obj, data, "log files rotation");
765 
766     rt = task->thread->runtime;
767 
768     port = rt->port_by_type[NXT_PROCESS_ROUTER];
769 
770     if (nxt_fast_path(port != NULL)) {
771         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
772                                      -1, 0, 0, NULL);
773     }
774 
775     mp = nxt_mp_create(1024, 128, 256, 32);
776     if (mp == NULL) {
777         return;
778     }
779 
780     n = nxt_list_nelts(rt->log_files);
781 
782     new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
783     if (new_files == NULL) {
784         nxt_mp_destroy(mp);
785         return;
786     }
787 
788     nxt_list_each(file, rt->log_files) {
789 
790         /* This allocation cannot fail. */
791         new_file = nxt_array_add(new_files);
792 
793         new_file->name = file->name;
794         new_file->fd = NXT_FILE_INVALID;
795         new_file->log_level = NXT_LOG_ALERT;
796 
797         ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
798                             NXT_FILE_OWNER_ACCESS);
799 
800         if (ret != NXT_OK) {
801             goto fail;
802         }
803 
804     } nxt_list_loop;
805 
806     new_file = new_files->elts;
807 
808     ret = nxt_file_stderr(&new_file[0]);
809 
810     if (ret == NXT_OK) {
811         n = 0;
812 
813         nxt_list_each(file, rt->log_files) {
814 
815             nxt_port_change_log_file(task, rt, n, new_file[n].fd);
816             /*
817              * The old log file descriptor must be closed at the moment
818              * when no other threads use it.  dup2() allows to use the
819              * old file descriptor for new log file.  This change is
820              * performed atomically in the kernel.
821              */
822             (void) nxt_file_redirect(file, new_file[n].fd);
823 
824             n++;
825 
826         } nxt_list_loop;
827 
828         nxt_mp_destroy(mp);
829         return;
830     }
831 
832 fail:
833 
834     new_file = new_files->elts;
835     n = new_files->nelts;
836 
837     while (n != 0) {
838         if (new_file->fd != NXT_FILE_INVALID) {
839             nxt_file_close(task, new_file);
840         }
841 
842         new_file++;
843         n--;
844     }
845 
846     nxt_mp_destroy(mp);
847 }
848 
849 
850 static void
851 nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
852 {
853     int                    status;
854     nxt_err_t              err;
855     nxt_pid_t              pid;
856 
857     nxt_debug(task, "sigchld handler signo:%d (%s)",
858               (int) (uintptr_t) obj, data);
859 
860     for ( ;; ) {
861         pid = waitpid(-1, &status, WNOHANG);
862 
863         if (pid == -1) {
864 
865             switch (err = nxt_errno) {
866 
867             case NXT_ECHILD:
868                 return;
869 
870             case NXT_EINTR:
871                 continue;
872 
873             default:
874                 nxt_alert(task, "waitpid() failed: %E", err);
875                 return;
876             }
877         }
878 
879         nxt_debug(task, "waitpid(): %PI", pid);
880 
881         if (pid == 0) {
882             return;
883         }
884 
885         if (WTERMSIG(status)) {
886 #ifdef WCOREDUMP
887             nxt_alert(task, "process %PI exited on signal %d%s",
888                       pid, WTERMSIG(status),
889                       WCOREDUMP(status) ? " (core dumped)" : "");
890 #else
891             nxt_alert(task, "process %PI exited on signal %d",
892                       pid, WTERMSIG(status));
893 #endif
894 
895         } else {
896             nxt_trace(task, "process %PI exited with code %d",
897                       pid, WEXITSTATUS(status));
898         }
899 
900         nxt_main_cleanup_process(task, pid);
901     }
902 }
903 
904 
905 static void
906 nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
907 {
908     nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
909               (int) (uintptr_t) obj, data);
910 }
911 
912 
913 static void
914 nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
915 {
916     int                 stream;
917     nxt_int_t           ret;
918     nxt_buf_t           *buf;
919     nxt_port_t          *port;
920     const char          *name;
921     nxt_runtime_t       *rt;
922     nxt_process_t       *process;
923     nxt_process_init_t  init;
924 
925     rt = task->thread->runtime;
926 
927     process = nxt_runtime_process_find(rt, pid);
928     if (!process) {
929         return;
930     }
931 
932     if (process->isolation.cleanup != NULL) {
933         process->isolation.cleanup(task, process);
934     }
935 
936     name = process->name;
937     stream = process->stream;
938     init = *((nxt_process_init_t *) nxt_process_init(process));
939 
940     if (process->state == NXT_PROCESS_STATE_READY) {
941         process->stream = 0;
942     }
943 
944     nxt_process_close_ports(task, process);
945 
946     if (nxt_exiting) {
947         if (rt->nprocesses <= 1) {
948             nxt_runtime_quit(task, 0);
949         }
950 
951         return;
952     }
953 
954     nxt_runtime_process_each(rt, process) {
955 
956         if (process->pid == nxt_pid
957             || process->pid == pid
958             || nxt_queue_is_empty(&process->ports))
959         {
960             continue;
961         }
962 
963         port = nxt_process_port_first(process);
964 
965         if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
966             continue;
967         }
968 
969         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
970                                    sizeof(pid));
971 
972         if (nxt_slow_path(buf == NULL)) {
973             continue;
974         }
975 
976         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
977 
978         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
979                               stream, 0, buf);
980 
981     } nxt_runtime_process_loop;
982 
983     if (init.restart) {
984         ret = nxt_main_process_create(task, init);
985         if (nxt_slow_path(ret == NXT_ERROR)) {
986             nxt_alert(task, "failed to restart %s", name);
987         }
988     }
989 }
990 
991 
992 static void
993 nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
994 {
995     size_t                  size;
996     nxt_int_t               ret;
997     nxt_buf_t               *b, *out;
998     nxt_port_t              *port;
999     nxt_sockaddr_t          *sa;
1000     nxt_port_msg_type_t     type;
1001     nxt_listening_socket_t  ls;
1002     u_char                  message[2048];
1003 
1004     b = msg->buf;
1005     sa = (nxt_sockaddr_t *) b->mem.pos;
1006 
1007     /* TODO check b size and make plain */
1008 
1009     out = NULL;
1010 
1011     ls.socket = -1;
1012     ls.error = NXT_SOCKET_ERROR_SYSTEM;
1013     ls.start = message;
1014     ls.end = message + sizeof(message);
1015 
1016     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1017                                  msg->port_msg.reply_port);
1018 
1019     nxt_debug(task, "listening socket \"%*s\"",
1020               (size_t) sa->length, nxt_sockaddr_start(sa));
1021 
1022     ret = nxt_main_listening_socket(sa, &ls);
1023 
1024     if (ret == NXT_OK) {
1025         nxt_debug(task, "socket(\"%*s\"): %d",
1026                   (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1027 
1028         type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1029 
1030     } else {
1031         size = ls.end - ls.start;
1032 
1033         nxt_alert(task, "%*s", size, ls.start);
1034 
1035         out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1036                                    size + 1);
1037         if (nxt_slow_path(out == NULL)) {
1038             return;
1039         }
1040 
1041         *out->mem.free++ = (uint8_t) ls.error;
1042 
1043         out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1044 
1045         type = NXT_PORT_MSG_RPC_ERROR;
1046     }
1047 
1048     nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1049                           0, out);
1050 }
1051 
1052 
1053 static nxt_int_t
1054 nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1055 {
1056     nxt_err_t         err;
1057     nxt_socket_t      s;
1058 
1059     const socklen_t   length = sizeof(int);
1060     static const int  enable = 1;
1061 
1062     s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1063 
1064     if (nxt_slow_path(s == -1)) {
1065         err = nxt_errno;
1066 
1067 #if (NXT_INET6)
1068 
1069         if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1070             ls->error = NXT_SOCKET_ERROR_NOINET6;
1071         }
1072 
1073 #endif
1074 
1075         ls->end = nxt_sprintf(ls->start, ls->end,
1076                               "socket(\\\"%*s\\\") failed %E",
1077                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1078 
1079         return NXT_ERROR;
1080     }
1081 
1082     if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1083         ls->end = nxt_sprintf(ls->start, ls->end,
1084                               "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1085                               (size_t) sa->length, nxt_sockaddr_start(sa),
1086                               nxt_errno);
1087         goto fail;
1088     }
1089 
1090 #if (NXT_INET6)
1091 
1092     if (sa->u.sockaddr.sa_family == AF_INET6) {
1093 
1094         if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1095             ls->end = nxt_sprintf(ls->start, ls->end,
1096                                "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1097                                (size_t) sa->length, nxt_sockaddr_start(sa),
1098                                nxt_errno);
1099             goto fail;
1100         }
1101     }
1102 
1103 #endif
1104 
1105     if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1106         err = nxt_errno;
1107 
1108 #if (NXT_HAVE_UNIX_DOMAIN)
1109 
1110         if (sa->u.sockaddr.sa_family == AF_UNIX) {
1111             switch (err) {
1112 
1113             case EACCES:
1114                 ls->error = NXT_SOCKET_ERROR_ACCESS;
1115                 break;
1116 
1117             case ENOENT:
1118             case ENOTDIR:
1119                 ls->error = NXT_SOCKET_ERROR_PATH;
1120                 break;
1121             }
1122 
1123         } else
1124 #endif
1125         {
1126             switch (err) {
1127 
1128             case EACCES:
1129                 ls->error = NXT_SOCKET_ERROR_PORT;
1130                 break;
1131 
1132             case EADDRINUSE:
1133                 ls->error = NXT_SOCKET_ERROR_INUSE;
1134                 break;
1135 
1136             case EADDRNOTAVAIL:
1137                 ls->error = NXT_SOCKET_ERROR_NOADDR;
1138                 break;
1139             }
1140         }
1141 
1142         ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1143                               (size_t) sa->length, nxt_sockaddr_start(sa), err);
1144         goto fail;
1145     }
1146 
1147 #if (NXT_HAVE_UNIX_DOMAIN)
1148 
1149     if (sa->u.sockaddr.sa_family == AF_UNIX) {
1150         char     *filename;
1151         mode_t   access;
1152 
1153         filename = sa->u.sockaddr_un.sun_path;
1154         access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1155 
1156         if (chmod(filename, access) != 0) {
1157             ls->end = nxt_sprintf(ls->start, ls->end,
1158                                   "chmod(\\\"%s\\\") failed %E",
1159                                   filename, nxt_errno);
1160             goto fail;
1161         }
1162     }
1163 
1164 #endif
1165 
1166     ls->socket = s;
1167 
1168     return NXT_OK;
1169 
1170 fail:
1171 
1172     (void) close(s);
1173 
1174     return NXT_ERROR;
1175 }
1176 
1177 
1178 static nxt_conf_map_t  nxt_app_lang_module_map[] = {
1179     {
1180         nxt_string("type"),
1181         NXT_CONF_MAP_INT,
1182         offsetof(nxt_app_lang_module_t, type),
1183     },
1184 
1185     {
1186         nxt_string("version"),
1187         NXT_CONF_MAP_CSTRZ,
1188         offsetof(nxt_app_lang_module_t, version),
1189     },
1190 
1191     {
1192         nxt_string("file"),
1193         NXT_CONF_MAP_CSTRZ,
1194         offsetof(nxt_app_lang_module_t, file),
1195     },
1196 };
1197 
1198 
1199 static nxt_conf_map_t  nxt_app_lang_mounts_map[] = {
1200     {
1201         nxt_string("src"),
1202         NXT_CONF_MAP_CSTRZ,
1203         offsetof(nxt_fs_mount_t, src),
1204     },
1205     {
1206         nxt_string("dst"),
1207         NXT_CONF_MAP_CSTRZ,
1208         offsetof(nxt_fs_mount_t, dst),
1209     },
1210     {
1211         nxt_string("name"),
1212         NXT_CONF_MAP_CSTRZ,
1213         offsetof(nxt_fs_mount_t, name),
1214     },
1215     {
1216         nxt_string("type"),
1217         NXT_CONF_MAP_INT,
1218         offsetof(nxt_fs_mount_t, type),
1219     },
1220     {
1221         nxt_string("flags"),
1222         NXT_CONF_MAP_INT,
1223         offsetof(nxt_fs_mount_t, flags),
1224     },
1225     {
1226         nxt_string("data"),
1227         NXT_CONF_MAP_CSTRZ,
1228         offsetof(nxt_fs_mount_t, data),
1229     },
1230 };
1231 
1232 
1233 static void
1234 nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1235 {
1236     uint32_t               index, jindex, nmounts;
1237     nxt_mp_t               *mp;
1238     nxt_int_t              ret;
1239     nxt_buf_t              *b;
1240     nxt_port_t             *port;
1241     nxt_runtime_t          *rt;
1242     nxt_fs_mount_t         *mnt;
1243     nxt_conf_value_t       *conf, *root, *value, *mounts;
1244     nxt_app_lang_module_t  *lang;
1245 
1246     static nxt_str_t root_path = nxt_string("/");
1247     static nxt_str_t mounts_name = nxt_string("mounts");
1248 
1249     rt = task->thread->runtime;
1250 
1251     if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1252         return;
1253     }
1254 
1255     if (nxt_exiting) {
1256         nxt_debug(task, "ignoring discovered modules, exiting");
1257         return;
1258     }
1259 
1260     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1261                                  msg->port_msg.reply_port);
1262 
1263     if (nxt_fast_path(port != NULL)) {
1264         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1265                                      msg->port_msg.stream, 0, NULL);
1266     }
1267 
1268     b = msg->buf;
1269 
1270     if (b == NULL) {
1271         return;
1272     }
1273 
1274     mp = nxt_mp_create(1024, 128, 256, 32);
1275     if (mp == NULL) {
1276         return;
1277     }
1278 
1279     b = nxt_buf_chk_make_plain(mp, b, msg->size);
1280 
1281     if (b == NULL) {
1282         return;
1283     }
1284 
1285     nxt_debug(task, "application languages: \"%*s\"",
1286               b->mem.free - b->mem.pos, b->mem.pos);
1287 
1288     conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1289     if (conf == NULL) {
1290         goto fail;
1291     }
1292 
1293     root = nxt_conf_get_path(conf, &root_path);
1294     if (root == NULL) {
1295         goto fail;
1296     }
1297 
1298     for (index = 0; /* void */ ; index++) {
1299         value = nxt_conf_get_array_element(root, index);
1300         if (value == NULL) {
1301             break;
1302         }
1303 
1304         lang = nxt_array_zero_add(rt->languages);
1305         if (lang == NULL) {
1306             goto fail;
1307         }
1308 
1309         lang->module = NULL;
1310 
1311         ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1312                                   nxt_nitems(nxt_app_lang_module_map), lang);
1313 
1314         if (ret != NXT_OK) {
1315             goto fail;
1316         }
1317 
1318         mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1319         if (mounts == NULL) {
1320             nxt_alert(task, "missing mounts from discovery message.");
1321             goto fail;
1322         }
1323 
1324         if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1325             nxt_alert(task, "invalid mounts type from discovery message.");
1326             goto fail;
1327         }
1328 
1329         nmounts = nxt_conf_array_elements_count(mounts);
1330 
1331         lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1332                                         sizeof(nxt_fs_mount_t));
1333 
1334         if (lang->mounts == NULL) {
1335             goto fail;
1336         }
1337 
1338         for (jindex = 0; /* */; jindex++) {
1339             value = nxt_conf_get_array_element(mounts, jindex);
1340             if (value == NULL) {
1341                 break;
1342             }
1343 
1344             mnt = nxt_array_zero_add(lang->mounts);
1345             if (mnt == NULL) {
1346                 goto fail;
1347             }
1348 
1349             mnt->builtin = 1;
1350             mnt->deps = 1;
1351 
1352             ret = nxt_conf_map_object(rt->mem_pool, value,
1353                                       nxt_app_lang_mounts_map,
1354                                       nxt_nitems(nxt_app_lang_mounts_map), mnt);
1355 
1356             if (ret != NXT_OK) {
1357                 goto fail;
1358             }
1359         }
1360 
1361         nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1362                   lang->type, lang->version, lang->file, lang->mounts->nelts);
1363     }
1364 
1365     qsort(rt->languages->elts, rt->languages->nelts,
1366           sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1367 
1368 fail:
1369 
1370     nxt_mp_destroy(mp);
1371 
1372     ret = nxt_main_process_create(task, nxt_controller_process);
1373     if (ret == NXT_OK) {
1374         ret = nxt_main_process_create(task, nxt_router_process);
1375     }
1376 
1377     if (nxt_slow_path(ret == NXT_ERROR)) {
1378         nxt_exiting = 1;
1379 
1380         nxt_runtime_quit(task, 1);
1381     }
1382 }
1383 
1384 
1385 static int nxt_cdecl
1386 nxt_app_lang_compare(const void *v1, const void *v2)
1387 {
1388     int                          n;
1389     const nxt_app_lang_module_t  *lang1, *lang2;
1390 
1391     lang1 = v1;
1392     lang2 = v2;
1393 
1394     n = lang1->type - lang2->type;
1395 
1396     if (n != 0) {
1397         return n;
1398     }
1399 
1400     n = nxt_strverscmp(lang1->version, lang2->version);
1401 
1402     /* Negate result to move higher versions to the beginning. */
1403 
1404     return -n;
1405 }
1406 
1407 
1408 static void
1409 nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1410 {
1411     void           *p;
1412     size_t         size;
1413     ssize_t        n;
1414     nxt_int_t      ret;
1415     nxt_file_t     file;
1416     nxt_runtime_t  *rt;
1417 
1418     p = MAP_FAILED;
1419 
1420     /*
1421      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1422      * initialized in 'cleanup' section.
1423      */
1424     size = 0;
1425 
1426     if (nxt_slow_path(msg->fd[0] == -1)) {
1427         nxt_alert(task, "conf_store_handler: invalid shm fd");
1428         goto error;
1429     }
1430 
1431     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1432         nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1433                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
1434         goto error;
1435     }
1436 
1437     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1438 
1439     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1440 
1441     nxt_fd_close(msg->fd[0]);
1442     msg->fd[0] = -1;
1443 
1444     if (nxt_slow_path(p == MAP_FAILED)) {
1445         goto error;
1446     }
1447 
1448     nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1449 
1450     nxt_memzero(&file, sizeof(nxt_file_t));
1451 
1452     rt = task->thread->runtime;
1453 
1454     file.name = (nxt_file_name_t *) rt->conf_tmp;
1455 
1456     if (nxt_slow_path(nxt_file_open(task, &file, NXT_FILE_WRONLY,
1457                                     NXT_FILE_TRUNCATE, NXT_FILE_OWNER_ACCESS)
1458                       != NXT_OK))
1459     {
1460         goto error;
1461     }
1462 
1463     n = nxt_file_write(&file, p, size, 0);
1464 
1465     nxt_file_close(task, &file);
1466 
1467     if (nxt_slow_path(n != (ssize_t) size)) {
1468         (void) nxt_file_delete(file.name);
1469         goto error;
1470     }
1471 
1472     ret = nxt_file_rename(file.name, (nxt_file_name_t *) rt->conf);
1473 
1474     if (nxt_fast_path(ret == NXT_OK)) {
1475         goto cleanup;
1476     }
1477 
1478 error:
1479 
1480     nxt_alert(task, "failed to store current configuration");
1481 
1482 cleanup:
1483 
1484     if (p != MAP_FAILED) {
1485         nxt_mem_munmap(p, size);
1486     }
1487 
1488     if (msg->fd[0] != -1) {
1489         nxt_fd_close(msg->fd[0]);
1490         msg->fd[0] = -1;
1491     }
1492 }
1493 
1494 
1495 static void
1496 nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1497 {
1498     u_char               *path;
1499     nxt_int_t            ret;
1500     nxt_file_t           file;
1501     nxt_port_t           *port;
1502     nxt_port_msg_type_t  type;
1503 
1504     nxt_debug(task, "opening access log file");
1505 
1506     path = msg->buf->mem.pos;
1507 
1508     nxt_memzero(&file, sizeof(nxt_file_t));
1509 
1510     file.name = (nxt_file_name_t *) path;
1511     file.log_level = NXT_LOG_ERR;
1512 
1513     ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1514                         NXT_FILE_OWNER_ACCESS);
1515 
1516     type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1517                            : NXT_PORT_MSG_RPC_ERROR;
1518 
1519     port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1520                                  msg->port_msg.reply_port);
1521 
1522     if (nxt_fast_path(port != NULL)) {
1523         (void) nxt_port_socket_write(task, port, type, file.fd,
1524                                      msg->port_msg.stream, 0, NULL);
1525     }
1526 }
1527