xref: /unit/src/nxt_runtime.c (revision 1781:e1f459d7469b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_regex.h>
14 
15 
16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
17     nxt_runtime_t *rt);
18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
19     nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28     nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
34     nxt_runtime_t *rt);
35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
36     nxt_file_name_t *pid_file);
37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
38     nxt_runtime_cont_t cont);
39 static void nxt_runtime_thread_pool_init(void);
40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
41     void *data);
42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
43 static void nxt_runtime_process_remove(nxt_runtime_t *rt,
44     nxt_process_t *process);
45 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
46 
47 
48 nxt_int_t
49 nxt_runtime_create(nxt_task_t *task)
50 {
51     nxt_mp_t               *mp;
52     nxt_int_t              ret;
53     nxt_array_t            *listen_sockets;
54     nxt_runtime_t          *rt;
55     nxt_app_lang_module_t  *lang;
56 
57     mp = nxt_mp_create(1024, 128, 256, 32);
58     if (nxt_slow_path(mp == NULL)) {
59         return NXT_ERROR;
60     }
61 
62     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
63     if (nxt_slow_path(rt == NULL)) {
64         goto fail;
65     }
66 
67     task->thread->runtime = rt;
68     rt->mem_pool = mp;
69 
70     nxt_thread_mutex_create(&rt->processes_mutex);
71 
72     rt->services = nxt_services_init(mp);
73     if (nxt_slow_path(rt->services == NULL)) {
74         goto fail;
75     }
76 
77     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
78     if (nxt_slow_path(rt->languages == NULL)) {
79         goto fail;
80     }
81 
82     /* Should not fail. */
83     lang = nxt_array_add(rt->languages);
84     lang->type = NXT_APP_EXTERNAL;
85     lang->version = (u_char *) "";
86     lang->file = NULL;
87     lang->module = &nxt_external_module;
88 
89     lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t));
90     if (nxt_slow_path(lang->mounts == NULL)) {
91         goto fail;
92     }
93 
94     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
95     if (nxt_slow_path(listen_sockets == NULL)) {
96         goto fail;
97     }
98 
99     rt->listen_sockets = listen_sockets;
100 
101     ret = nxt_runtime_inherited_listen_sockets(task, rt);
102     if (nxt_slow_path(ret != NXT_OK)) {
103         goto fail;
104     }
105 
106     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
107         goto fail;
108     }
109 
110     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
111         goto fail;
112     }
113 
114     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
115         goto fail;
116     }
117 
118     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
119         goto fail;
120     }
121 
122     rt->start = nxt_runtime_initial_start;
123 
124     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
125         goto fail;
126     }
127 
128     if (nxt_port_rpc_init() != NXT_OK) {
129         goto fail;
130     }
131 
132     if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) {
133         goto fail;
134     }
135 
136     if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) {
137         goto fail;
138     }
139 
140     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
141                        nxt_runtime_start, task, rt, NULL);
142 
143     return NXT_OK;
144 
145 fail:
146 
147     nxt_mp_destroy(mp);
148 
149     return NXT_ERROR;
150 }
151 
152 
153 static nxt_int_t
154 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
155 {
156     u_char               *v, *p;
157     nxt_int_t            type;
158     nxt_array_t          *inherited_sockets;
159     nxt_socket_t         s;
160     nxt_listen_socket_t  *ls;
161 
162     v = (u_char *) getenv("NGINX");
163 
164     if (v == NULL) {
165         return nxt_runtime_systemd_listen_sockets(task, rt);
166     }
167 
168     nxt_alert(task, "using inherited listen sockets: %s", v);
169 
170     inherited_sockets = nxt_array_create(rt->mem_pool,
171                                          1, sizeof(nxt_listen_socket_t));
172     if (inherited_sockets == NULL) {
173         return NXT_ERROR;
174     }
175 
176     rt->inherited_sockets = inherited_sockets;
177 
178     for (p = v; *p != '\0'; p++) {
179 
180         if (*p == ';') {
181             s = nxt_int_parse(v, p - v);
182 
183             if (nxt_slow_path(s < 0)) {
184                 nxt_alert(task, "invalid socket number \"%s\" "
185                           "in NGINX environment variable, "
186                           "ignoring the rest of the variable", v);
187                 return NXT_ERROR;
188             }
189 
190             v = p + 1;
191 
192             ls = nxt_array_zero_add(inherited_sockets);
193             if (nxt_slow_path(ls == NULL)) {
194                 return NXT_ERROR;
195             }
196 
197             ls->socket = s;
198 
199             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
200             if (nxt_slow_path(ls->sockaddr == NULL)) {
201                 return NXT_ERROR;
202             }
203 
204             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
205             if (nxt_slow_path(type == -1)) {
206                 return NXT_ERROR;
207             }
208 
209             ls->sockaddr->type = (uint16_t) type;
210         }
211     }
212 
213     return NXT_OK;
214 }
215 
216 
217 static nxt_int_t
218 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
219 {
220     u_char               *nfd, *pid;
221     nxt_int_t            n;
222     nxt_array_t          *inherited_sockets;
223     nxt_socket_t         s;
224     nxt_listen_socket_t  *ls;
225 
226     /*
227      * Number of listening sockets passed.  The socket
228      * descriptors start from number 3 and are sequential.
229      */
230     nfd = (u_char *) getenv("LISTEN_FDS");
231     if (nfd == NULL) {
232         return NXT_OK;
233     }
234 
235     /* The pid of the service process. */
236     pid = (u_char *) getenv("LISTEN_PID");
237     if (pid == NULL) {
238         return NXT_OK;
239     }
240 
241     n = nxt_int_parse(nfd, nxt_strlen(nfd));
242     if (n < 0) {
243         return NXT_OK;
244     }
245 
246     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
247         return NXT_OK;
248     }
249 
250     nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
251 
252     inherited_sockets = nxt_array_create(rt->mem_pool,
253                                          n, sizeof(nxt_listen_socket_t));
254     if (inherited_sockets == NULL) {
255         return NXT_ERROR;
256     }
257 
258     rt->inherited_sockets = inherited_sockets;
259 
260     for (s = 3; s < n; s++) {
261         ls = nxt_array_zero_add(inherited_sockets);
262         if (nxt_slow_path(ls == NULL)) {
263             return NXT_ERROR;
264         }
265 
266         ls->socket = s;
267 
268         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
269         if (nxt_slow_path(ls->sockaddr == NULL)) {
270             return NXT_ERROR;
271         }
272 
273         ls->sockaddr->type = SOCK_STREAM;
274     }
275 
276     return NXT_OK;
277 }
278 
279 
280 static nxt_int_t
281 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
282 {
283     nxt_thread_t                 *thread;
284     nxt_event_engine_t           *engine;
285     const nxt_event_interface_t  *interface;
286 
287     interface = nxt_service_get(rt->services, "engine", NULL);
288 
289     if (nxt_slow_path(interface == NULL)) {
290         /* TODO: log */
291         return NXT_ERROR;
292     }
293 
294     engine = nxt_event_engine_create(task, interface,
295                                      nxt_main_process_signals, 0, 0);
296 
297     if (nxt_slow_path(engine == NULL)) {
298         return NXT_ERROR;
299     }
300 
301     thread = task->thread;
302     thread->engine = engine;
303 #if 0
304     thread->fiber = &engine->fibers->fiber;
305 #endif
306 
307     engine->id = rt->last_engine_id++;
308     engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
309 
310     nxt_queue_init(&rt->engines);
311     nxt_queue_insert_tail(&rt->engines, &engine->link);
312 
313     return NXT_OK;
314 }
315 
316 
317 static nxt_int_t
318 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
319 {
320     nxt_int_t    ret;
321     nxt_array_t  *thread_pools;
322 
323     thread_pools = nxt_array_create(rt->mem_pool, 1,
324                                     sizeof(nxt_thread_pool_t *));
325 
326     if (nxt_slow_path(thread_pools == NULL)) {
327         return NXT_ERROR;
328     }
329 
330     rt->thread_pools = thread_pools;
331     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
332 
333     if (nxt_slow_path(ret != NXT_OK)) {
334         return NXT_ERROR;
335     }
336 
337     return NXT_OK;
338 }
339 
340 
341 static void
342 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
343 {
344     nxt_runtime_t  *rt;
345 
346     rt = obj;
347 
348     nxt_debug(task, "rt conf done");
349 
350     task->thread->log->ctx_handler = NULL;
351     task->thread->log->ctx = NULL;
352 
353     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
354         goto fail;
355     }
356 
357     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
358         goto fail;
359     }
360 
361     /*
362      * Thread pools should be destroyed before starting worker
363      * processes, because thread pool semaphores will stick in
364      * locked state in new processes after fork().
365      */
366     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
367 
368     return;
369 
370 fail:
371 
372     nxt_runtime_quit(task, 1);
373 }
374 
375 
376 static void
377 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
378 {
379     nxt_int_t                    ret;
380     nxt_thread_t                 *thr;
381     nxt_runtime_t                *rt;
382     const nxt_event_interface_t  *interface;
383 
384     thr = task->thread;
385     rt = thr->runtime;
386 
387     if (rt->inherited_sockets == NULL && rt->daemon) {
388 
389         if (nxt_process_daemon(task) != NXT_OK) {
390             goto fail;
391         }
392 
393         /*
394          * An event engine should be updated after fork()
395          * even if an event facility was not changed because:
396          * 1) inherited kqueue descriptor is invalid,
397          * 2) the signal thread is not inherited.
398          */
399         interface = nxt_service_get(rt->services, "engine", rt->engine);
400         if (interface == NULL) {
401             goto fail;
402         }
403 
404         ret = nxt_event_engine_change(task->thread->engine, interface,
405                                       rt->batch);
406         if (ret != NXT_OK) {
407             goto fail;
408         }
409     }
410 
411     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
412     if (ret != NXT_OK) {
413         goto fail;
414     }
415 
416     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
417         goto fail;
418     }
419 
420     thr->engine->max_connections = rt->engine_connections;
421 
422     if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
423         return;
424     }
425 
426 fail:
427 
428     nxt_runtime_quit(task, 1);
429 }
430 
431 
432 void
433 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
434 {
435     nxt_bool_t          done;
436     nxt_runtime_t       *rt;
437     nxt_event_engine_t  *engine;
438 
439     rt = task->thread->runtime;
440     rt->status |= status;
441     engine = task->thread->engine;
442 
443     nxt_debug(task, "exiting");
444 
445     done = 1;
446 
447     if (!engine->shutdown) {
448         engine->shutdown = 1;
449 
450         if (!nxt_array_is_empty(rt->thread_pools)) {
451             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
452             done = 0;
453         }
454 
455         if (rt->type == NXT_PROCESS_MAIN) {
456             nxt_runtime_stop_all_processes(task, rt);
457             done = 0;
458         }
459     }
460 
461     nxt_runtime_close_idle_connections(engine);
462 
463     if (done) {
464         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
465                            task, rt, engine);
466     }
467 }
468 
469 
470 static void
471 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
472 {
473     nxt_conn_t        *c;
474     nxt_queue_t       *idle;
475     nxt_queue_link_t  *link, *next;
476 
477     nxt_debug(&engine->task, "close idle connections");
478 
479     idle = &engine->idle_connections;
480 
481     for (link = nxt_queue_first(idle);
482          link != nxt_queue_tail(idle);
483          link = next)
484     {
485         next = nxt_queue_next(link);
486         c = nxt_queue_link_data(link, nxt_conn_t, link);
487 
488         if (!c->socket.read_ready) {
489             nxt_queue_remove(link);
490             nxt_conn_close(engine, c);
491         }
492     }
493 }
494 
495 
496 void
497 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
498 {
499     nxt_port_t          *port;
500     nxt_process_t       *process;
501     nxt_process_init_t  *init;
502 
503     nxt_runtime_process_each(rt, process) {
504 
505         init = nxt_process_init(process);
506 
507         if (init->type == NXT_PROCESS_APP) {
508 
509             nxt_process_port_each(process, port) {
510 
511                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
512                                              0, 0, NULL);
513 
514             } nxt_process_port_loop;
515         }
516 
517     } nxt_runtime_process_loop;
518 }
519 
520 
521 static void
522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
523 {
524     nxt_port_t     *port;
525     nxt_process_t  *process;
526 
527     nxt_runtime_process_each(rt, process) {
528 
529         nxt_process_port_each(process, port) {
530 
531             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
532                                          0, NULL);
533 
534         } nxt_process_port_loop;
535 
536     } nxt_runtime_process_loop;
537 }
538 
539 
540 static void
541 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
542 {
543     int                 status, engine_count;
544     nxt_runtime_t       *rt;
545     nxt_process_t       *process;
546     nxt_event_engine_t  *engine;
547 
548     rt = obj;
549     engine = data;
550 
551     nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
552 
553     if (!nxt_array_is_empty(rt->thread_pools)) {
554         return;
555     }
556 
557     if (rt->type == NXT_PROCESS_MAIN) {
558         if (rt->pid_file != NULL) {
559             nxt_file_delete(rt->pid_file);
560         }
561 
562 #if (NXT_HAVE_UNIX_DOMAIN)
563         {
564             nxt_sockaddr_t   *sa;
565             nxt_file_name_t  *name;
566 
567             sa = rt->controller_listen;
568 
569             if (sa->u.sockaddr.sa_family == AF_UNIX) {
570                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
571                 (void) nxt_file_delete(name);
572             }
573         }
574 #endif
575     }
576 
577     if (!engine->event.signal_support) {
578         nxt_event_engine_signals_stop(engine);
579     }
580 
581     nxt_runtime_process_each(rt, process) {
582 
583         nxt_process_close_ports(task, process);
584 
585     } nxt_runtime_process_loop;
586 
587     status = rt->status;
588 
589     engine_count = 0;
590 
591     nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
592 
593         engine_count++;
594 
595     } nxt_queue_loop;
596 
597     if (engine_count <= 1) {
598         if (rt->port_by_type[rt->type] != NULL) {
599             nxt_port_use(task, rt->port_by_type[rt->type], -1);
600         }
601 
602         nxt_thread_mutex_destroy(&rt->processes_mutex);
603 
604         nxt_mp_destroy(rt->mem_pool);
605     }
606 
607     nxt_debug(task, "exit: %d", status);
608 
609     exit(status);
610     nxt_unreachable();
611 }
612 
613 
614 static nxt_int_t
615 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
616 {
617     nxt_event_engine_t           *engine;
618     const nxt_event_interface_t  *interface;
619 
620     engine = task->thread->engine;
621 
622     if (engine->batch == rt->batch
623         && nxt_strcmp(engine->event.name, rt->engine) == 0)
624     {
625         return NXT_OK;
626     }
627 
628     interface = nxt_service_get(rt->services, "engine", rt->engine);
629 
630     if (interface != NULL) {
631         return nxt_event_engine_change(engine, interface, rt->batch);
632     }
633 
634     return NXT_ERROR;
635 }
636 
637 
638 void
639 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
640 {
641     nxt_queue_link_t    *link;
642     nxt_event_engine_t  *engine;
643 
644     link = nxt_queue_first(&rt->engines);
645     nxt_queue_remove(link);
646 
647     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
648     nxt_event_engine_free(engine);
649 }
650 
651 
652 nxt_int_t
653 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
654     nxt_uint_t max_threads, nxt_nsec_t timeout)
655 {
656     nxt_thread_pool_t   *thread_pool, **tp;
657 
658     tp = nxt_array_add(rt->thread_pools);
659     if (tp == NULL) {
660         return NXT_ERROR;
661     }
662 
663     thread_pool = nxt_thread_pool_create(max_threads, timeout,
664                                          nxt_runtime_thread_pool_init,
665                                          thr->engine,
666                                          nxt_runtime_thread_pool_exit);
667 
668     if (nxt_fast_path(thread_pool != NULL)) {
669         *tp = thread_pool;
670     }
671 
672     return NXT_OK;
673 }
674 
675 
676 static void
677 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
678     nxt_runtime_cont_t cont)
679 {
680     nxt_uint_t         n;
681     nxt_thread_pool_t  **tp;
682 
683     rt->continuation = cont;
684 
685     n = rt->thread_pools->nelts;
686 
687     if (n == 0) {
688         cont(task, 0);
689         return;
690     }
691 
692     tp = rt->thread_pools->elts;
693 
694     do {
695         nxt_thread_pool_destroy(*tp);
696 
697         tp++;
698         n--;
699     } while (n != 0);
700 }
701 
702 
703 static void
704 nxt_runtime_thread_pool_init(void)
705 {
706 }
707 
708 
709 static void
710 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
711 {
712     nxt_uint_t           i, n;
713     nxt_runtime_t        *rt;
714     nxt_thread_pool_t    *tp, **thread_pools;
715     nxt_thread_handle_t  handle;
716 
717     tp = obj;
718 
719     if (data != NULL) {
720         handle = (nxt_thread_handle_t) (uintptr_t) data;
721         nxt_thread_wait(handle);
722     }
723 
724     rt = task->thread->runtime;
725 
726     thread_pools = rt->thread_pools->elts;
727     n = rt->thread_pools->nelts;
728 
729     nxt_debug(task, "thread pools: %ui", n);
730 
731     for (i = 0; i < n; i++) {
732 
733         if (tp == thread_pools[i]) {
734             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
735 
736             nxt_free(tp);
737 
738             if (n == 1) {
739                 /* The last thread pool. */
740                 rt->continuation(task, 0);
741             }
742 
743             return;
744         }
745     }
746 }
747 
748 
749 static nxt_int_t
750 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
751 {
752     nxt_int_t                    ret;
753     nxt_str_t                    control;
754     nxt_uint_t                   n;
755     nxt_file_t                   *file;
756     const char                   *slash;
757     nxt_sockaddr_t               *sa;
758     nxt_file_name_str_t          file_name;
759     const nxt_event_interface_t  *interface;
760 
761     rt->daemon = 1;
762     rt->engine_connections = 256;
763     rt->auxiliary_threads = 2;
764     rt->user_cred.user = NXT_USER;
765     rt->group = NXT_GROUP;
766     rt->pid = NXT_PID;
767     rt->log = NXT_LOG;
768     rt->modules = NXT_MODULES;
769     rt->state = NXT_STATE;
770     rt->control = NXT_CONTROL_SOCK;
771     rt->tmp = NXT_TMP;
772 
773     nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
774 
775     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
776         return NXT_ERROR;
777     }
778 
779     if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
780         return NXT_ERROR;
781     }
782 
783     if (rt->capabilities.setid) {
784         ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
785                                 rt->group);
786 
787         if (nxt_slow_path(ret != NXT_OK)) {
788             return NXT_ERROR;
789         }
790 
791     } else {
792         nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
793                 "cannot use arbitrary user and group.");
794     }
795 
796     /* An engine's parameters. */
797 
798     interface = nxt_service_get(rt->services, "engine", rt->engine);
799     if (interface == NULL) {
800         return NXT_ERROR;
801     }
802 
803     rt->engine = interface->name;
804 
805     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
806     if (nxt_slow_path(ret != NXT_OK)) {
807         return NXT_ERROR;
808     }
809 
810     rt->pid_file = file_name.start;
811 
812     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
813     if (nxt_slow_path(ret != NXT_OK)) {
814         return NXT_ERROR;
815     }
816 
817     file = nxt_list_first(rt->log_files);
818     file->name = file_name.start;
819 
820     slash = "";
821     n = nxt_strlen(rt->modules);
822 
823     if (n > 1 && rt->modules[n - 1] != '/') {
824         slash = "/";
825     }
826 
827     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
828                                rt->modules, slash);
829     if (nxt_slow_path(ret != NXT_OK)) {
830         return NXT_ERROR;
831     }
832 
833     rt->modules = (char *) file_name.start;
834 
835     slash = "";
836     n = nxt_strlen(rt->state);
837 
838     if (n > 1 && rt->state[n - 1] != '/') {
839         slash = "/";
840     }
841 
842     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
843                                rt->state, slash);
844     if (nxt_slow_path(ret != NXT_OK)) {
845         return NXT_ERROR;
846     }
847 
848     rt->conf = (char *) file_name.start;
849 
850     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
851     if (nxt_slow_path(ret != NXT_OK)) {
852         return NXT_ERROR;
853     }
854 
855     rt->conf_tmp = (char *) file_name.start;
856 
857     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
858                                rt->state, slash);
859     if (nxt_slow_path(ret != NXT_OK)) {
860         return NXT_ERROR;
861     }
862 
863     ret = mkdir((char *) file_name.start, S_IRWXU);
864 
865     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
866         rt->certs.length = file_name.len;
867         rt->certs.start = file_name.start;
868 
869     } else {
870         nxt_alert(task, "Unable to create certificates storage directory: "
871                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
872     }
873 
874     control.length = nxt_strlen(rt->control);
875     control.start = (u_char *) rt->control;
876 
877     sa = nxt_sockaddr_parse(rt->mem_pool, &control);
878     if (nxt_slow_path(sa == NULL)) {
879         return NXT_ERROR;
880     }
881 
882     sa->type = SOCK_STREAM;
883 
884     rt->controller_listen = sa;
885 
886     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
887         return NXT_ERROR;
888     }
889 
890     return NXT_OK;
891 }
892 
893 
894 static nxt_int_t
895 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
896 {
897     char    *p, **argv;
898     u_char  *end;
899     u_char  buf[1024];
900 
901     static const char  version[] =
902         "unit version: " NXT_VERSION "\n"
903         "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
904 
905     static const char  no_control[] =
906                        "option \"--control\" requires socket address\n";
907     static const char  no_user[] = "option \"--user\" requires username\n";
908     static const char  no_group[] = "option \"--group\" requires group name\n";
909     static const char  no_pid[] = "option \"--pid\" requires filename\n";
910     static const char  no_log[] = "option \"--log\" requires filename\n";
911     static const char  no_modules[] =
912                        "option \"--modules\" requires directory\n";
913     static const char  no_state[] = "option \"--state\" requires directory\n";
914     static const char  no_tmp[] = "option \"--tmp\" requires directory\n";
915 
916     static const char  help[] =
917         "\n"
918         "unit options:\n"
919         "\n"
920         "  --version            print unit version and configure options\n"
921         "\n"
922         "  --no-daemon          run unit in non-daemon mode\n"
923         "\n"
924         "  --control ADDRESS    set address of control API socket\n"
925         "                       default: \"" NXT_CONTROL_SOCK "\"\n"
926         "\n"
927         "  --pid FILE           set pid filename\n"
928         "                       default: \"" NXT_PID "\"\n"
929         "\n"
930         "  --log FILE           set log filename\n"
931         "                       default: \"" NXT_LOG "\"\n"
932         "\n"
933         "  --modules DIRECTORY  set modules directory name\n"
934         "                       default: \"" NXT_MODULES "\"\n"
935         "\n"
936         "  --state DIRECTORY    set state directory name\n"
937         "                       default: \"" NXT_STATE "\"\n"
938         "\n"
939         "  --tmp DIRECTORY      set tmp directory name\n"
940         "                       default: \"" NXT_TMP "\"\n"
941         "\n"
942         "  --user USER          set non-privileged processes to run"
943                                 " as specified user\n"
944         "                       default: \"" NXT_USER "\"\n"
945         "\n"
946         "  --group GROUP        set non-privileged processes to run"
947                                 " as specified group\n"
948         "                       default: ";
949 
950     static const char  group[] = "\"" NXT_GROUP "\"\n\n";
951     static const char  primary[] = "user's primary group\n\n";
952 
953     argv = &nxt_process_argv[1];
954 
955     while (*argv != NULL) {
956         p = *argv++;
957 
958         if (nxt_strcmp(p, "--control") == 0) {
959             if (*argv == NULL) {
960                 write(STDERR_FILENO, no_control, nxt_length(no_control));
961                 return NXT_ERROR;
962             }
963 
964             p = *argv++;
965 
966             rt->control = p;
967 
968             continue;
969         }
970 
971         if (nxt_strcmp(p, "--user") == 0) {
972             if (*argv == NULL) {
973                 write(STDERR_FILENO, no_user, nxt_length(no_user));
974                 return NXT_ERROR;
975             }
976 
977             p = *argv++;
978 
979             rt->user_cred.user = p;
980 
981             continue;
982         }
983 
984         if (nxt_strcmp(p, "--group") == 0) {
985             if (*argv == NULL) {
986                 write(STDERR_FILENO, no_group, nxt_length(no_group));
987                 return NXT_ERROR;
988             }
989 
990             p = *argv++;
991 
992             rt->group = p;
993 
994             continue;
995         }
996 
997         if (nxt_strcmp(p, "--pid") == 0) {
998             if (*argv == NULL) {
999                 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
1000                 return NXT_ERROR;
1001             }
1002 
1003             p = *argv++;
1004 
1005             rt->pid = p;
1006 
1007             continue;
1008         }
1009 
1010         if (nxt_strcmp(p, "--log") == 0) {
1011             if (*argv == NULL) {
1012                 write(STDERR_FILENO, no_log, nxt_length(no_log));
1013                 return NXT_ERROR;
1014             }
1015 
1016             p = *argv++;
1017 
1018             rt->log = p;
1019 
1020             continue;
1021         }
1022 
1023         if (nxt_strcmp(p, "--modules") == 0) {
1024             if (*argv == NULL) {
1025                 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1026                 return NXT_ERROR;
1027             }
1028 
1029             p = *argv++;
1030 
1031             rt->modules = p;
1032 
1033             continue;
1034         }
1035 
1036         if (nxt_strcmp(p, "--state") == 0) {
1037             if (*argv == NULL) {
1038                 write(STDERR_FILENO, no_state, nxt_length(no_state));
1039                 return NXT_ERROR;
1040             }
1041 
1042             p = *argv++;
1043 
1044             rt->state = p;
1045 
1046             continue;
1047         }
1048 
1049         if (nxt_strcmp(p, "--tmp") == 0) {
1050             if (*argv == NULL) {
1051                 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1052                 return NXT_ERROR;
1053             }
1054 
1055             p = *argv++;
1056 
1057             rt->tmp = p;
1058 
1059             continue;
1060         }
1061 
1062         if (nxt_strcmp(p, "--no-daemon") == 0) {
1063             rt->daemon = 0;
1064             continue;
1065         }
1066 
1067         if (nxt_strcmp(p, "--version") == 0) {
1068             write(STDERR_FILENO, version, nxt_length(version));
1069             exit(0);
1070         }
1071 
1072         if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1073             write(STDOUT_FILENO, help, nxt_length(help));
1074 
1075             if (sizeof(NXT_GROUP) == 1) {
1076                 write(STDOUT_FILENO, primary, nxt_length(primary));
1077 
1078             } else {
1079                 write(STDOUT_FILENO, group, nxt_length(group));
1080             }
1081 
1082             exit(0);
1083         }
1084 
1085         end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1086                           "try \"%s -h\" for available options\n",
1087                           p, nxt_process_argv[0]);
1088 
1089         write(STDERR_FILENO, buf, end - buf);
1090 
1091         return NXT_ERROR;
1092     }
1093 
1094     return NXT_OK;
1095 }
1096 
1097 
1098 nxt_listen_socket_t *
1099 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1100 {
1101     nxt_mp_t             *mp;
1102     nxt_listen_socket_t  *ls;
1103 
1104     ls = nxt_array_zero_add(rt->listen_sockets);
1105     if (ls == NULL) {
1106         return NULL;
1107     }
1108 
1109     mp = rt->mem_pool;
1110 
1111     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1112                                        sa->length);
1113     if (ls->sockaddr == NULL) {
1114         return NULL;
1115     }
1116 
1117     ls->sockaddr->type = sa->type;
1118 
1119     nxt_sockaddr_text(ls->sockaddr);
1120 
1121     ls->socket = -1;
1122     ls->backlog = NXT_LISTEN_BACKLOG;
1123 
1124     return ls;
1125 }
1126 
1127 
1128 static nxt_int_t
1129 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1130 {
1131     size_t  length;
1132     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1133 
1134     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1135         nxt_alert(task, "gethostname() failed %E", nxt_errno);
1136         return NXT_ERROR;
1137     }
1138 
1139     /*
1140      * Linux gethostname(2):
1141      *
1142      *    If the null-terminated hostname is too large to fit,
1143      *    then the name is truncated, and no error is returned.
1144      *
1145      * For this reason an additional byte is reserved in the buffer.
1146      */
1147     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1148 
1149     length = nxt_strlen(hostname);
1150     rt->hostname.length = length;
1151 
1152     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1153 
1154     if (rt->hostname.start != NULL) {
1155         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1156         return NXT_OK;
1157     }
1158 
1159     return NXT_ERROR;
1160 }
1161 
1162 
1163 static nxt_int_t
1164 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1165 {
1166     nxt_file_t  *file;
1167     nxt_list_t  *log_files;
1168 
1169     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1170 
1171     if (nxt_fast_path(log_files != NULL)) {
1172         rt->log_files = log_files;
1173 
1174         /* Preallocate the main log.  This allocation cannot fail. */
1175         file = nxt_list_zero_add(log_files);
1176 
1177         file->fd = NXT_FILE_INVALID;
1178         file->log_level = NXT_LOG_ALERT;
1179 
1180         return NXT_OK;
1181     }
1182 
1183     return NXT_ERROR;
1184 }
1185 
1186 
1187 nxt_file_t *
1188 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1189 {
1190     nxt_int_t            ret;
1191     nxt_file_t           *file;
1192     nxt_file_name_str_t  file_name;
1193 
1194     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1195 
1196     if (nxt_slow_path(ret != NXT_OK)) {
1197         return NULL;
1198     }
1199 
1200     nxt_list_each(file, rt->log_files) {
1201 
1202         /* STUB: hardecoded case sensitive/insensitive. */
1203 
1204         if (file->name != NULL
1205             && nxt_file_name_eq(file->name, file_name.start))
1206         {
1207             return file;
1208         }
1209 
1210     } nxt_list_loop;
1211 
1212     file = nxt_list_zero_add(rt->log_files);
1213 
1214     if (nxt_slow_path(file == NULL)) {
1215         return NULL;
1216     }
1217 
1218     file->fd = NXT_FILE_INVALID;
1219     file->log_level = NXT_LOG_ALERT;
1220     file->name = file_name.start;
1221 
1222     return file;
1223 }
1224 
1225 
1226 static nxt_int_t
1227 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1228 {
1229     nxt_int_t   ret;
1230     nxt_file_t  *file;
1231 
1232     nxt_list_each(file, rt->log_files) {
1233 
1234         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1235                             NXT_FILE_OWNER_ACCESS);
1236 
1237         if (ret != NXT_OK) {
1238             return NXT_ERROR;
1239         }
1240 
1241     } nxt_list_loop;
1242 
1243     file = nxt_list_first(rt->log_files);
1244 
1245     return nxt_file_stderr(file);
1246 }
1247 
1248 
1249 nxt_int_t
1250 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1251 {
1252     nxt_int_t            ret;
1253     nxt_uint_t           c, p, ncurr, nprev;
1254     nxt_listen_socket_t  *curr, *prev;
1255 
1256     curr = rt->listen_sockets->elts;
1257     ncurr = rt->listen_sockets->nelts;
1258 
1259     if (rt->inherited_sockets != NULL) {
1260         prev = rt->inherited_sockets->elts;
1261         nprev = rt->inherited_sockets->nelts;
1262 
1263     } else {
1264         prev = NULL;
1265         nprev = 0;
1266     }
1267 
1268     for (c = 0; c < ncurr; c++) {
1269 
1270         for (p = 0; p < nprev; p++) {
1271 
1272             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1273 
1274                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1275                 if (ret != NXT_OK) {
1276                     return NXT_ERROR;
1277                 }
1278 
1279                 goto next;
1280             }
1281         }
1282 
1283         if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1284             return NXT_ERROR;
1285         }
1286 
1287     next:
1288 
1289         continue;
1290     }
1291 
1292     return NXT_OK;
1293 }
1294 
1295 
1296 nxt_int_t
1297 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1298 {
1299     nxt_uint_t           i, n;
1300     nxt_listen_socket_t  *ls;
1301 
1302     ls = rt->listen_sockets->elts;
1303     n = rt->listen_sockets->nelts;
1304 
1305     for (i = 0; i < n; i++) {
1306         if (ls[i].flags == NXT_NONBLOCK) {
1307             if (nxt_listen_event(task, &ls[i]) == NULL) {
1308                 return NXT_ERROR;
1309             }
1310         }
1311     }
1312 
1313     return NXT_OK;
1314 }
1315 
1316 
1317 nxt_str_t *
1318 nxt_current_directory(nxt_mp_t *mp)
1319 {
1320     size_t     length;
1321     u_char     *p;
1322     nxt_str_t  *name;
1323     char       buf[NXT_MAX_PATH_LEN];
1324 
1325     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1326 
1327     if (nxt_fast_path(length != 0)) {
1328         name = nxt_str_alloc(mp, length + 1);
1329 
1330         if (nxt_fast_path(name != NULL)) {
1331             p = nxt_cpymem(name->start, buf, length);
1332             *p = '/';
1333 
1334             return name;
1335         }
1336     }
1337 
1338     return NULL;
1339 }
1340 
1341 
1342 static nxt_int_t
1343 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1344 {
1345     ssize_t     length;
1346     nxt_int_t   n;
1347     nxt_file_t  file;
1348     u_char      pid[NXT_INT64_T_LEN + nxt_length("\n")];
1349 
1350     nxt_memzero(&file, sizeof(nxt_file_t));
1351 
1352     file.name = pid_file;
1353 
1354     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1355                       NXT_FILE_DEFAULT_ACCESS);
1356 
1357     if (n != NXT_OK) {
1358         return NXT_ERROR;
1359     }
1360 
1361     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1362 
1363     if (nxt_file_write(&file, pid, length, 0) != length) {
1364         return NXT_ERROR;
1365     }
1366 
1367     nxt_file_close(task, &file);
1368 
1369     return NXT_OK;
1370 }
1371 
1372 
1373 nxt_process_t *
1374 nxt_runtime_process_new(nxt_runtime_t *rt)
1375 {
1376     nxt_process_t  *process;
1377 
1378     /* TODO: memory failures. */
1379 
1380     process = nxt_mp_zalloc(rt->mem_pool,
1381                             sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
1382 
1383     if (nxt_slow_path(process == NULL)) {
1384         return NULL;
1385     }
1386 
1387     nxt_queue_init(&process->ports);
1388 
1389     nxt_thread_mutex_create(&process->incoming.mutex);
1390 
1391     process->use_count = 1;
1392 
1393     return process;
1394 }
1395 
1396 
1397 void
1398 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1399 {
1400     if (process->registered == 1) {
1401         nxt_runtime_process_remove(rt, process);
1402     }
1403 
1404     nxt_assert(process->use_count == 0);
1405     nxt_assert(process->registered == 0);
1406 
1407     nxt_port_mmaps_destroy(&process->incoming, 1);
1408 
1409     nxt_thread_mutex_destroy(&process->incoming.mutex);
1410 
1411     /* processes from nxt_runtime_process_get() have no memory pool */
1412     if (process->mem_pool != NULL) {
1413         nxt_mp_destroy(process->mem_pool);
1414     }
1415 
1416     nxt_mp_free(rt->mem_pool, process);
1417 }
1418 
1419 
1420 static nxt_int_t
1421 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1422 {
1423     nxt_process_t  *process;
1424 
1425     process = data;
1426 
1427     if (lhq->key.length == sizeof(nxt_pid_t)
1428         && *(nxt_pid_t *) lhq->key.start == process->pid)
1429     {
1430         return NXT_OK;
1431     }
1432 
1433     return NXT_DECLINED;
1434 }
1435 
1436 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1437     NXT_LVLHSH_DEFAULT,
1438     nxt_runtime_lvlhsh_pid_test,
1439     nxt_lvlhsh_alloc,
1440     nxt_lvlhsh_free,
1441 };
1442 
1443 
1444 nxt_inline void
1445 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1446 {
1447     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1448     lhq->key.length = sizeof(*pid);
1449     lhq->key.start = (u_char *) pid;
1450     lhq->proto = &lvlhsh_processes_proto;
1451 }
1452 
1453 
1454 nxt_process_t *
1455 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1456 {
1457     nxt_process_t       *process;
1458     nxt_lvlhsh_query_t  lhq;
1459 
1460     process = NULL;
1461 
1462     nxt_runtime_process_lhq_pid(&lhq, &pid);
1463 
1464     nxt_thread_mutex_lock(&rt->processes_mutex);
1465 
1466     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1467         process = lhq.value;
1468 
1469     } else {
1470         nxt_thread_log_debug("process %PI not found", pid);
1471     }
1472 
1473     nxt_thread_mutex_unlock(&rt->processes_mutex);
1474 
1475     return process;
1476 }
1477 
1478 
1479 static nxt_process_t *
1480 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1481 {
1482     nxt_process_t       *process;
1483     nxt_lvlhsh_query_t  lhq;
1484 
1485     nxt_runtime_process_lhq_pid(&lhq, &pid);
1486 
1487     nxt_thread_mutex_lock(&rt->processes_mutex);
1488 
1489     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1490         nxt_thread_log_debug("process %PI found", pid);
1491 
1492         nxt_thread_mutex_unlock(&rt->processes_mutex);
1493 
1494         process = lhq.value;
1495         process->use_count++;
1496 
1497         return process;
1498     }
1499 
1500     process = nxt_runtime_process_new(rt);
1501     if (nxt_slow_path(process == NULL)) {
1502 
1503         nxt_thread_mutex_unlock(&rt->processes_mutex);
1504 
1505         return NULL;
1506     }
1507 
1508     process->pid = pid;
1509 
1510     lhq.replace = 0;
1511     lhq.value = process;
1512     lhq.pool = rt->mem_pool;
1513 
1514     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1515 
1516     case NXT_OK:
1517         if (rt->nprocesses == 0) {
1518             rt->mprocess = process;
1519         }
1520 
1521         rt->nprocesses++;
1522 
1523         process->registered = 1;
1524 
1525         nxt_thread_log_debug("process %PI insert", pid);
1526         break;
1527 
1528     default:
1529         nxt_thread_log_debug("process %PI insert failed", pid);
1530         break;
1531     }
1532 
1533     nxt_thread_mutex_unlock(&rt->processes_mutex);
1534 
1535     return process;
1536 }
1537 
1538 
1539 void
1540 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1541 {
1542     nxt_port_t          *port;
1543     nxt_runtime_t       *rt;
1544     nxt_lvlhsh_query_t  lhq;
1545 
1546     nxt_assert(process->registered == 0);
1547 
1548     rt = task->thread->runtime;
1549 
1550     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1551 
1552     lhq.replace = 0;
1553     lhq.value = process;
1554     lhq.pool = rt->mem_pool;
1555 
1556     nxt_thread_mutex_lock(&rt->processes_mutex);
1557 
1558     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1559 
1560     case NXT_OK:
1561         if (rt->nprocesses == 0) {
1562             rt->mprocess = process;
1563         }
1564 
1565         rt->nprocesses++;
1566 
1567         nxt_process_port_each(process, port) {
1568 
1569             port->pid = process->pid;
1570 
1571             nxt_runtime_port_add(task, port);
1572 
1573         } nxt_process_port_loop;
1574 
1575         process->registered = 1;
1576 
1577         nxt_thread_log_debug("process %PI added", process->pid);
1578         break;
1579 
1580     default:
1581         nxt_thread_log_debug("process %PI failed to add", process->pid);
1582         break;
1583     }
1584 
1585     nxt_thread_mutex_unlock(&rt->processes_mutex);
1586 }
1587 
1588 
1589 static void
1590 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1591 {
1592     nxt_pid_t           pid;
1593     nxt_lvlhsh_query_t  lhq;
1594 
1595     pid = process->pid;
1596 
1597     nxt_runtime_process_lhq_pid(&lhq, &pid);
1598 
1599     lhq.pool = rt->mem_pool;
1600 
1601     nxt_thread_mutex_lock(&rt->processes_mutex);
1602 
1603     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1604 
1605     case NXT_OK:
1606         rt->nprocesses--;
1607 
1608         process = lhq.value;
1609 
1610         process->registered = 0;
1611 
1612         nxt_thread_log_debug("process %PI removed", pid);
1613         break;
1614 
1615     default:
1616         nxt_thread_log_debug("process %PI remove failed", pid);
1617         break;
1618     }
1619 
1620     nxt_thread_mutex_unlock(&rt->processes_mutex);
1621 }
1622 
1623 
1624 nxt_process_t *
1625 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1626 {
1627     nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1628 
1629     return nxt_runtime_process_next(rt, lhe);
1630 }
1631 
1632 
1633 nxt_port_t *
1634 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1635     nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1636 {
1637     nxt_port_t     *port;
1638     nxt_process_t  *process;
1639 
1640     process = nxt_runtime_process_get(rt, pid);
1641     if (nxt_slow_path(process == NULL)) {
1642         return NULL;
1643     }
1644 
1645     port = nxt_port_new(task, id, pid, type);
1646     if (nxt_slow_path(port == NULL)) {
1647         nxt_process_use(task, process, -1);
1648         return NULL;
1649     }
1650 
1651     nxt_process_port_add(task, process, port);
1652 
1653     nxt_process_use(task, process, -1);
1654 
1655     nxt_runtime_port_add(task, port);
1656 
1657     nxt_port_use(task, port, -1);
1658 
1659     return port;
1660 }
1661 
1662 
1663 static void
1664 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1665 {
1666     nxt_int_t      res;
1667     nxt_runtime_t  *rt;
1668 
1669     rt = task->thread->runtime;
1670 
1671     res = nxt_port_hash_add(&rt->ports, port);
1672 
1673     if (res != NXT_OK) {
1674         return;
1675     }
1676 
1677     rt->port_by_type[port->type] = port;
1678 
1679     nxt_port_use(task, port, 1);
1680 }
1681 
1682 
1683 void
1684 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1685 {
1686     nxt_int_t      res;
1687     nxt_runtime_t  *rt;
1688 
1689     rt = task->thread->runtime;
1690 
1691     res = nxt_port_hash_remove(&rt->ports, port);
1692 
1693     if (res != NXT_OK) {
1694         return;
1695     }
1696 
1697     if (rt->port_by_type[port->type] == port) {
1698         rt->port_by_type[port->type] = NULL;
1699     }
1700 
1701     nxt_port_use(task, port, -1);
1702 }
1703 
1704 
1705 nxt_port_t *
1706 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1707     nxt_port_id_t port_id)
1708 {
1709     return nxt_port_hash_find(&rt->ports, pid, port_id);
1710 }
1711