xref: /unit/src/nxt_runtime.c (revision 2174:a7fb5d8a9590)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_regex.h>
14 
15 
16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
17     nxt_runtime_t *rt);
18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
19     nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28     nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
34     nxt_runtime_t *rt);
35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
36     nxt_file_name_t *pid_file);
37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
38     nxt_runtime_cont_t cont);
39 static void nxt_runtime_thread_pool_init(void);
40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
41     void *data);
42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
43 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
44 
45 
46 nxt_int_t
47 nxt_runtime_create(nxt_task_t *task)
48 {
49     nxt_mp_t               *mp;
50     nxt_int_t              ret;
51     nxt_array_t            *listen_sockets;
52     nxt_runtime_t          *rt;
53     nxt_app_lang_module_t  *lang;
54 
55     mp = nxt_mp_create(1024, 128, 256, 32);
56     if (nxt_slow_path(mp == NULL)) {
57         return NXT_ERROR;
58     }
59 
60     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
61     if (nxt_slow_path(rt == NULL)) {
62         goto fail;
63     }
64 
65     task->thread->runtime = rt;
66     rt->mem_pool = mp;
67 
68     nxt_thread_mutex_create(&rt->processes_mutex);
69 
70     rt->services = nxt_services_init(mp);
71     if (nxt_slow_path(rt->services == NULL)) {
72         goto fail;
73     }
74 
75     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
76     if (nxt_slow_path(rt->languages == NULL)) {
77         goto fail;
78     }
79 
80     /* Should not fail. */
81     lang = nxt_array_add(rt->languages);
82     lang->type = NXT_APP_EXTERNAL;
83     lang->version = (u_char *) "";
84     lang->file = NULL;
85     lang->module = &nxt_external_module;
86 
87     lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t));
88     if (nxt_slow_path(lang->mounts == NULL)) {
89         goto fail;
90     }
91 
92     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
93     if (nxt_slow_path(listen_sockets == NULL)) {
94         goto fail;
95     }
96 
97     rt->listen_sockets = listen_sockets;
98 
99     ret = nxt_runtime_inherited_listen_sockets(task, rt);
100     if (nxt_slow_path(ret != NXT_OK)) {
101         goto fail;
102     }
103 
104     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
105         goto fail;
106     }
107 
108     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
109         goto fail;
110     }
111 
112     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
113         goto fail;
114     }
115 
116     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
117         goto fail;
118     }
119 
120     rt->start = nxt_runtime_initial_start;
121 
122     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
123         goto fail;
124     }
125 
126     if (nxt_port_rpc_init() != NXT_OK) {
127         goto fail;
128     }
129 
130     if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) {
131         goto fail;
132     }
133 
134     if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) {
135         goto fail;
136     }
137 
138     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
139                        nxt_runtime_start, task, rt, NULL);
140 
141     return NXT_OK;
142 
143 fail:
144 
145     nxt_mp_destroy(mp);
146 
147     return NXT_ERROR;
148 }
149 
150 
151 static nxt_int_t
152 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
153 {
154     u_char               *v, *p;
155     nxt_int_t            type;
156     nxt_array_t          *inherited_sockets;
157     nxt_socket_t         s;
158     nxt_listen_socket_t  *ls;
159 
160     v = (u_char *) getenv("NGINX");
161 
162     if (v == NULL) {
163         return nxt_runtime_systemd_listen_sockets(task, rt);
164     }
165 
166     nxt_alert(task, "using inherited listen sockets: %s", v);
167 
168     inherited_sockets = nxt_array_create(rt->mem_pool,
169                                          1, sizeof(nxt_listen_socket_t));
170     if (inherited_sockets == NULL) {
171         return NXT_ERROR;
172     }
173 
174     rt->inherited_sockets = inherited_sockets;
175 
176     for (p = v; *p != '\0'; p++) {
177 
178         if (*p == ';') {
179             s = nxt_int_parse(v, p - v);
180 
181             if (nxt_slow_path(s < 0)) {
182                 nxt_alert(task, "invalid socket number \"%s\" "
183                           "in NGINX environment variable, "
184                           "ignoring the rest of the variable", v);
185                 return NXT_ERROR;
186             }
187 
188             v = p + 1;
189 
190             ls = nxt_array_zero_add(inherited_sockets);
191             if (nxt_slow_path(ls == NULL)) {
192                 return NXT_ERROR;
193             }
194 
195             ls->socket = s;
196 
197             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
198             if (nxt_slow_path(ls->sockaddr == NULL)) {
199                 return NXT_ERROR;
200             }
201 
202             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
203             if (nxt_slow_path(type == -1)) {
204                 return NXT_ERROR;
205             }
206 
207             ls->sockaddr->type = (uint16_t) type;
208         }
209     }
210 
211     return NXT_OK;
212 }
213 
214 
215 static nxt_int_t
216 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
217 {
218     u_char               *nfd, *pid;
219     nxt_int_t            n;
220     nxt_array_t          *inherited_sockets;
221     nxt_socket_t         s;
222     nxt_listen_socket_t  *ls;
223 
224     /*
225      * Number of listening sockets passed.  The socket
226      * descriptors start from number 3 and are sequential.
227      */
228     nfd = (u_char *) getenv("LISTEN_FDS");
229     if (nfd == NULL) {
230         return NXT_OK;
231     }
232 
233     /* The pid of the service process. */
234     pid = (u_char *) getenv("LISTEN_PID");
235     if (pid == NULL) {
236         return NXT_OK;
237     }
238 
239     n = nxt_int_parse(nfd, nxt_strlen(nfd));
240     if (n < 0) {
241         return NXT_OK;
242     }
243 
244     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
245         return NXT_OK;
246     }
247 
248     nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
249 
250     inherited_sockets = nxt_array_create(rt->mem_pool,
251                                          n, sizeof(nxt_listen_socket_t));
252     if (inherited_sockets == NULL) {
253         return NXT_ERROR;
254     }
255 
256     rt->inherited_sockets = inherited_sockets;
257 
258     for (s = 3; s < n; s++) {
259         ls = nxt_array_zero_add(inherited_sockets);
260         if (nxt_slow_path(ls == NULL)) {
261             return NXT_ERROR;
262         }
263 
264         ls->socket = s;
265 
266         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
267         if (nxt_slow_path(ls->sockaddr == NULL)) {
268             return NXT_ERROR;
269         }
270 
271         ls->sockaddr->type = SOCK_STREAM;
272     }
273 
274     return NXT_OK;
275 }
276 
277 
278 static nxt_int_t
279 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
280 {
281     nxt_thread_t                 *thread;
282     nxt_event_engine_t           *engine;
283     const nxt_event_interface_t  *interface;
284 
285     interface = nxt_service_get(rt->services, "engine", NULL);
286 
287     if (nxt_slow_path(interface == NULL)) {
288         /* TODO: log */
289         return NXT_ERROR;
290     }
291 
292     engine = nxt_event_engine_create(task, interface,
293                                      nxt_main_process_signals, 0, 0);
294 
295     if (nxt_slow_path(engine == NULL)) {
296         return NXT_ERROR;
297     }
298 
299     thread = task->thread;
300     thread->engine = engine;
301 #if 0
302     thread->fiber = &engine->fibers->fiber;
303 #endif
304 
305     engine->id = rt->last_engine_id++;
306     engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
307 
308     nxt_queue_init(&rt->engines);
309     nxt_queue_insert_tail(&rt->engines, &engine->link);
310 
311     return NXT_OK;
312 }
313 
314 
315 static nxt_int_t
316 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
317 {
318     nxt_int_t    ret;
319     nxt_array_t  *thread_pools;
320 
321     thread_pools = nxt_array_create(rt->mem_pool, 1,
322                                     sizeof(nxt_thread_pool_t *));
323 
324     if (nxt_slow_path(thread_pools == NULL)) {
325         return NXT_ERROR;
326     }
327 
328     rt->thread_pools = thread_pools;
329     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
330 
331     if (nxt_slow_path(ret != NXT_OK)) {
332         return NXT_ERROR;
333     }
334 
335     return NXT_OK;
336 }
337 
338 
339 static void
340 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
341 {
342     nxt_runtime_t  *rt;
343 
344     rt = obj;
345 
346     nxt_debug(task, "rt conf done");
347 
348     task->thread->log->ctx_handler = NULL;
349     task->thread->log->ctx = NULL;
350 
351     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
352         goto fail;
353     }
354 
355     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
356         goto fail;
357     }
358 
359     /*
360      * Thread pools should be destroyed before starting worker
361      * processes, because thread pool semaphores will stick in
362      * locked state in new processes after fork().
363      */
364     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
365 
366     return;
367 
368 fail:
369 
370     nxt_runtime_quit(task, 1);
371 }
372 
373 
374 static void
375 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
376 {
377     nxt_int_t                    ret;
378     nxt_thread_t                 *thr;
379     nxt_runtime_t                *rt;
380     const nxt_event_interface_t  *interface;
381 
382     thr = task->thread;
383     rt = thr->runtime;
384 
385     if (rt->inherited_sockets == NULL && rt->daemon) {
386 
387         if (nxt_process_daemon(task) != NXT_OK) {
388             goto fail;
389         }
390 
391         /*
392          * An event engine should be updated after fork()
393          * even if an event facility was not changed because:
394          * 1) inherited kqueue descriptor is invalid,
395          * 2) the signal thread is not inherited.
396          */
397         interface = nxt_service_get(rt->services, "engine", rt->engine);
398         if (interface == NULL) {
399             goto fail;
400         }
401 
402         ret = nxt_event_engine_change(task->thread->engine, interface,
403                                       rt->batch);
404         if (ret != NXT_OK) {
405             goto fail;
406         }
407     }
408 
409     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
410     if (ret != NXT_OK) {
411         goto fail;
412     }
413 
414     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
415         goto fail;
416     }
417 
418     thr->engine->max_connections = rt->engine_connections;
419 
420     if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
421         return;
422     }
423 
424 fail:
425 
426     nxt_runtime_quit(task, 1);
427 }
428 
429 
430 void
431 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
432 {
433     nxt_bool_t          done;
434     nxt_runtime_t       *rt;
435     nxt_event_engine_t  *engine;
436 
437     rt = task->thread->runtime;
438     rt->status |= status;
439     engine = task->thread->engine;
440 
441     nxt_debug(task, "exiting");
442 
443     done = 1;
444 
445     if (!engine->shutdown) {
446         engine->shutdown = 1;
447 
448         if (!nxt_array_is_empty(rt->thread_pools)) {
449             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
450             done = 0;
451         }
452 
453         if (rt->type == NXT_PROCESS_MAIN) {
454             nxt_runtime_stop_all_processes(task, rt);
455             done = 0;
456         }
457     }
458 
459     nxt_runtime_close_idle_connections(engine);
460 
461     if (done) {
462         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
463                            task, rt, engine);
464     }
465 }
466 
467 
468 static void
469 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
470 {
471     nxt_conn_t        *c;
472     nxt_queue_t       *idle;
473     nxt_queue_link_t  *link, *next;
474 
475     nxt_debug(&engine->task, "close idle connections");
476 
477     idle = &engine->idle_connections;
478 
479     for (link = nxt_queue_first(idle);
480          link != nxt_queue_tail(idle);
481          link = next)
482     {
483         next = nxt_queue_next(link);
484         c = nxt_queue_link_data(link, nxt_conn_t, link);
485 
486         if (!c->socket.read_ready) {
487             nxt_queue_remove(link);
488             nxt_conn_close(engine, c);
489         }
490     }
491 }
492 
493 
494 void
495 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
496 {
497     nxt_port_t          *port;
498     nxt_process_t       *process;
499     nxt_process_init_t  *init;
500 
501     nxt_runtime_process_each(rt, process) {
502 
503         init = nxt_process_init(process);
504 
505         if (init->type == NXT_PROCESS_APP
506             || init->type == NXT_PROCESS_PROTOTYPE)
507         {
508 
509             nxt_process_port_each(process, port) {
510 
511                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
512                                              0, 0, NULL);
513 
514             } nxt_process_port_loop;
515         }
516 
517     } nxt_runtime_process_loop;
518 }
519 
520 
521 static void
522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
523 {
524     nxt_port_t     *port;
525     nxt_process_t  *process;
526 
527     nxt_runtime_process_each(rt, process) {
528 
529         nxt_process_port_each(process, port) {
530 
531             nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid);
532 
533             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
534                                          0, NULL);
535 
536         } nxt_process_port_loop;
537 
538     } nxt_runtime_process_loop;
539 }
540 
541 
542 static void
543 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
544 {
545     int                 status, engine_count;
546     nxt_runtime_t       *rt;
547     nxt_process_t       *process;
548     nxt_event_engine_t  *engine;
549 
550     rt = obj;
551     engine = data;
552 
553     nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
554 
555     if (!nxt_array_is_empty(rt->thread_pools)) {
556         return;
557     }
558 
559     if (rt->type == NXT_PROCESS_MAIN) {
560         if (rt->pid_file != NULL) {
561             nxt_file_delete(rt->pid_file);
562         }
563 
564 #if (NXT_HAVE_UNIX_DOMAIN)
565         {
566             nxt_sockaddr_t   *sa;
567             nxt_file_name_t  *name;
568 
569             sa = rt->controller_listen;
570 
571             if (sa->u.sockaddr.sa_family == AF_UNIX) {
572                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
573                 (void) nxt_file_delete(name);
574             }
575         }
576 #endif
577     }
578 
579     if (!engine->event.signal_support) {
580         nxt_event_engine_signals_stop(engine);
581     }
582 
583     nxt_runtime_process_each(rt, process) {
584 
585         nxt_runtime_process_remove(rt, process);
586         nxt_process_close_ports(task, process);
587 
588     } nxt_runtime_process_loop;
589 
590     status = rt->status;
591 
592     engine_count = 0;
593 
594     nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
595 
596         engine_count++;
597 
598     } nxt_queue_loop;
599 
600     if (engine_count <= 1) {
601         if (rt->port_by_type[rt->type] != NULL) {
602             nxt_port_use(task, rt->port_by_type[rt->type], -1);
603         }
604 
605         nxt_thread_mutex_destroy(&rt->processes_mutex);
606 
607         nxt_mp_destroy(rt->mem_pool);
608     }
609 
610     nxt_debug(task, "exit: %d", status);
611 
612     exit(status);
613     nxt_unreachable();
614 }
615 
616 
617 static nxt_int_t
618 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
619 {
620     nxt_event_engine_t           *engine;
621     const nxt_event_interface_t  *interface;
622 
623     engine = task->thread->engine;
624 
625     if (engine->batch == rt->batch
626         && nxt_strcmp(engine->event.name, rt->engine) == 0)
627     {
628         return NXT_OK;
629     }
630 
631     interface = nxt_service_get(rt->services, "engine", rt->engine);
632 
633     if (interface != NULL) {
634         return nxt_event_engine_change(engine, interface, rt->batch);
635     }
636 
637     return NXT_ERROR;
638 }
639 
640 
641 void
642 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
643 {
644     nxt_queue_link_t    *link;
645     nxt_event_engine_t  *engine;
646 
647     link = nxt_queue_first(&rt->engines);
648     nxt_queue_remove(link);
649 
650     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
651     nxt_event_engine_free(engine);
652 }
653 
654 
655 nxt_int_t
656 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
657     nxt_uint_t max_threads, nxt_nsec_t timeout)
658 {
659     nxt_thread_pool_t   *thread_pool, **tp;
660 
661     tp = nxt_array_add(rt->thread_pools);
662     if (tp == NULL) {
663         return NXT_ERROR;
664     }
665 
666     thread_pool = nxt_thread_pool_create(max_threads, timeout,
667                                          nxt_runtime_thread_pool_init,
668                                          thr->engine,
669                                          nxt_runtime_thread_pool_exit);
670 
671     if (nxt_fast_path(thread_pool != NULL)) {
672         *tp = thread_pool;
673     }
674 
675     return NXT_OK;
676 }
677 
678 
679 static void
680 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
681     nxt_runtime_cont_t cont)
682 {
683     nxt_uint_t         n;
684     nxt_thread_pool_t  **tp;
685 
686     rt->continuation = cont;
687 
688     n = rt->thread_pools->nelts;
689 
690     if (n == 0) {
691         cont(task, 0);
692         return;
693     }
694 
695     tp = rt->thread_pools->elts;
696 
697     do {
698         nxt_thread_pool_destroy(*tp);
699 
700         tp++;
701         n--;
702     } while (n != 0);
703 }
704 
705 
706 static void
707 nxt_runtime_thread_pool_init(void)
708 {
709 }
710 
711 
712 static void
713 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
714 {
715     nxt_uint_t           i, n;
716     nxt_runtime_t        *rt;
717     nxt_thread_pool_t    *tp, **thread_pools;
718     nxt_thread_handle_t  handle;
719 
720     tp = obj;
721 
722     if (data != NULL) {
723         handle = (nxt_thread_handle_t) (uintptr_t) data;
724         nxt_thread_wait(handle);
725     }
726 
727     rt = task->thread->runtime;
728 
729     thread_pools = rt->thread_pools->elts;
730     n = rt->thread_pools->nelts;
731 
732     nxt_debug(task, "thread pools: %ui", n);
733 
734     for (i = 0; i < n; i++) {
735 
736         if (tp == thread_pools[i]) {
737             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
738 
739             nxt_free(tp);
740 
741             if (n == 1) {
742                 /* The last thread pool. */
743                 rt->continuation(task, 0);
744             }
745 
746             return;
747         }
748     }
749 }
750 
751 
752 static nxt_int_t
753 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
754 {
755     nxt_int_t                    ret;
756     nxt_str_t                    control;
757     nxt_uint_t                   n;
758     nxt_file_t                   *file;
759     const char                   *slash;
760     nxt_sockaddr_t               *sa;
761     nxt_file_name_str_t          file_name;
762     const nxt_event_interface_t  *interface;
763 
764     rt->daemon = 1;
765     rt->engine_connections = 256;
766     rt->auxiliary_threads = 2;
767     rt->user_cred.user = NXT_USER;
768     rt->group = NXT_GROUP;
769     rt->pid = NXT_PID;
770     rt->log = NXT_LOG;
771     rt->modules = NXT_MODULES;
772     rt->state = NXT_STATE;
773     rt->control = NXT_CONTROL_SOCK;
774     rt->tmp = NXT_TMP;
775 
776     nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
777 
778     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
779         return NXT_ERROR;
780     }
781 
782     if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
783         return NXT_ERROR;
784     }
785 
786     if (rt->capabilities.setid) {
787         ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
788                                 rt->group);
789 
790         if (nxt_slow_path(ret != NXT_OK)) {
791             return NXT_ERROR;
792         }
793 
794     } else {
795         nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
796                 "cannot use arbitrary user and group.");
797     }
798 
799     /* An engine's parameters. */
800 
801     interface = nxt_service_get(rt->services, "engine", rt->engine);
802     if (interface == NULL) {
803         return NXT_ERROR;
804     }
805 
806     rt->engine = interface->name;
807 
808     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
809     if (nxt_slow_path(ret != NXT_OK)) {
810         return NXT_ERROR;
811     }
812 
813     rt->pid_file = file_name.start;
814 
815     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
816     if (nxt_slow_path(ret != NXT_OK)) {
817         return NXT_ERROR;
818     }
819 
820     file = nxt_list_first(rt->log_files);
821     file->name = file_name.start;
822 
823     slash = "";
824     n = nxt_strlen(rt->modules);
825 
826     if (n > 1 && rt->modules[n - 1] != '/') {
827         slash = "/";
828     }
829 
830     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
831                                rt->modules, slash);
832     if (nxt_slow_path(ret != NXT_OK)) {
833         return NXT_ERROR;
834     }
835 
836     rt->modules = (char *) file_name.start;
837 
838     slash = "";
839     n = nxt_strlen(rt->state);
840 
841     if (n > 1 && rt->state[n - 1] != '/') {
842         slash = "/";
843     }
844 
845     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z",
846                                rt->state, slash);
847     if (nxt_slow_path(ret != NXT_OK)) {
848         return NXT_ERROR;
849     }
850 
851     rt->ver = (char *) file_name.start;
852 
853     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver);
854     if (nxt_slow_path(ret != NXT_OK)) {
855         return NXT_ERROR;
856     }
857 
858     rt->ver_tmp = (char *) file_name.start;
859 
860     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
861                                rt->state, slash);
862     if (nxt_slow_path(ret != NXT_OK)) {
863         return NXT_ERROR;
864     }
865 
866     rt->conf = (char *) file_name.start;
867 
868     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
869     if (nxt_slow_path(ret != NXT_OK)) {
870         return NXT_ERROR;
871     }
872 
873     rt->conf_tmp = (char *) file_name.start;
874 
875     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
876                                rt->state, slash);
877     if (nxt_slow_path(ret != NXT_OK)) {
878         return NXT_ERROR;
879     }
880 
881     ret = mkdir((char *) file_name.start, S_IRWXU);
882 
883     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
884         rt->certs.length = file_name.len;
885         rt->certs.start = file_name.start;
886 
887     } else {
888         nxt_alert(task, "Unable to create certificates storage directory: "
889                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
890     }
891 
892     control.length = nxt_strlen(rt->control);
893     control.start = (u_char *) rt->control;
894 
895     sa = nxt_sockaddr_parse(rt->mem_pool, &control);
896     if (nxt_slow_path(sa == NULL)) {
897         return NXT_ERROR;
898     }
899 
900     sa->type = SOCK_STREAM;
901 
902     rt->controller_listen = sa;
903 
904     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
905         return NXT_ERROR;
906     }
907 
908     return NXT_OK;
909 }
910 
911 
912 static nxt_int_t
913 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
914 {
915     char    *p, **argv;
916     u_char  *end;
917     u_char  buf[1024];
918 
919     static const char  version[] =
920         "unit version: " NXT_VERSION "\n"
921         "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
922 
923     static const char  no_control[] =
924                        "option \"--control\" requires socket address\n";
925     static const char  no_user[] = "option \"--user\" requires username\n";
926     static const char  no_group[] = "option \"--group\" requires group name\n";
927     static const char  no_pid[] = "option \"--pid\" requires filename\n";
928     static const char  no_log[] = "option \"--log\" requires filename\n";
929     static const char  no_modules[] =
930                        "option \"--modules\" requires directory\n";
931     static const char  no_state[] = "option \"--state\" requires directory\n";
932     static const char  no_tmp[] = "option \"--tmp\" requires directory\n";
933 
934     static const char  help[] =
935         "\n"
936         "unit options:\n"
937         "\n"
938         "  --version            print unit version and configure options\n"
939         "\n"
940         "  --no-daemon          run unit in non-daemon mode\n"
941         "\n"
942         "  --control ADDRESS    set address of control API socket\n"
943         "                       default: \"" NXT_CONTROL_SOCK "\"\n"
944         "\n"
945         "  --pid FILE           set pid filename\n"
946         "                       default: \"" NXT_PID "\"\n"
947         "\n"
948         "  --log FILE           set log filename\n"
949         "                       default: \"" NXT_LOG "\"\n"
950         "\n"
951         "  --modules DIRECTORY  set modules directory name\n"
952         "                       default: \"" NXT_MODULES "\"\n"
953         "\n"
954         "  --state DIRECTORY    set state directory name\n"
955         "                       default: \"" NXT_STATE "\"\n"
956         "\n"
957         "  --tmp DIRECTORY      set tmp directory name\n"
958         "                       default: \"" NXT_TMP "\"\n"
959         "\n"
960         "  --user USER          set non-privileged processes to run"
961                                 " as specified user\n"
962         "                       default: \"" NXT_USER "\"\n"
963         "\n"
964         "  --group GROUP        set non-privileged processes to run"
965                                 " as specified group\n"
966         "                       default: ";
967 
968     static const char  group[] = "\"" NXT_GROUP "\"\n\n";
969     static const char  primary[] = "user's primary group\n\n";
970 
971     argv = &nxt_process_argv[1];
972 
973     while (*argv != NULL) {
974         p = *argv++;
975 
976         if (nxt_strcmp(p, "--control") == 0) {
977             if (*argv == NULL) {
978                 write(STDERR_FILENO, no_control, nxt_length(no_control));
979                 return NXT_ERROR;
980             }
981 
982             p = *argv++;
983 
984             rt->control = p;
985 
986             continue;
987         }
988 
989         if (nxt_strcmp(p, "--user") == 0) {
990             if (*argv == NULL) {
991                 write(STDERR_FILENO, no_user, nxt_length(no_user));
992                 return NXT_ERROR;
993             }
994 
995             p = *argv++;
996 
997             rt->user_cred.user = p;
998 
999             continue;
1000         }
1001 
1002         if (nxt_strcmp(p, "--group") == 0) {
1003             if (*argv == NULL) {
1004                 write(STDERR_FILENO, no_group, nxt_length(no_group));
1005                 return NXT_ERROR;
1006             }
1007 
1008             p = *argv++;
1009 
1010             rt->group = p;
1011 
1012             continue;
1013         }
1014 
1015         if (nxt_strcmp(p, "--pid") == 0) {
1016             if (*argv == NULL) {
1017                 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
1018                 return NXT_ERROR;
1019             }
1020 
1021             p = *argv++;
1022 
1023             rt->pid = p;
1024 
1025             continue;
1026         }
1027 
1028         if (nxt_strcmp(p, "--log") == 0) {
1029             if (*argv == NULL) {
1030                 write(STDERR_FILENO, no_log, nxt_length(no_log));
1031                 return NXT_ERROR;
1032             }
1033 
1034             p = *argv++;
1035 
1036             rt->log = p;
1037 
1038             continue;
1039         }
1040 
1041         if (nxt_strcmp(p, "--modules") == 0) {
1042             if (*argv == NULL) {
1043                 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1044                 return NXT_ERROR;
1045             }
1046 
1047             p = *argv++;
1048 
1049             rt->modules = p;
1050 
1051             continue;
1052         }
1053 
1054         if (nxt_strcmp(p, "--state") == 0) {
1055             if (*argv == NULL) {
1056                 write(STDERR_FILENO, no_state, nxt_length(no_state));
1057                 return NXT_ERROR;
1058             }
1059 
1060             p = *argv++;
1061 
1062             rt->state = p;
1063 
1064             continue;
1065         }
1066 
1067         if (nxt_strcmp(p, "--tmp") == 0) {
1068             if (*argv == NULL) {
1069                 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1070                 return NXT_ERROR;
1071             }
1072 
1073             p = *argv++;
1074 
1075             rt->tmp = p;
1076 
1077             continue;
1078         }
1079 
1080         if (nxt_strcmp(p, "--no-daemon") == 0) {
1081             rt->daemon = 0;
1082             continue;
1083         }
1084 
1085         if (nxt_strcmp(p, "--version") == 0) {
1086             write(STDERR_FILENO, version, nxt_length(version));
1087             exit(0);
1088         }
1089 
1090         if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1091             write(STDOUT_FILENO, help, nxt_length(help));
1092 
1093             if (sizeof(NXT_GROUP) == 1) {
1094                 write(STDOUT_FILENO, primary, nxt_length(primary));
1095 
1096             } else {
1097                 write(STDOUT_FILENO, group, nxt_length(group));
1098             }
1099 
1100             exit(0);
1101         }
1102 
1103         end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1104                           "try \"%s -h\" for available options\n",
1105                           p, nxt_process_argv[0]);
1106 
1107         write(STDERR_FILENO, buf, end - buf);
1108 
1109         return NXT_ERROR;
1110     }
1111 
1112     return NXT_OK;
1113 }
1114 
1115 
1116 nxt_listen_socket_t *
1117 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1118 {
1119     nxt_mp_t             *mp;
1120     nxt_listen_socket_t  *ls;
1121 
1122     ls = nxt_array_zero_add(rt->listen_sockets);
1123     if (ls == NULL) {
1124         return NULL;
1125     }
1126 
1127     mp = rt->mem_pool;
1128 
1129     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1130                                        sa->length);
1131     if (ls->sockaddr == NULL) {
1132         return NULL;
1133     }
1134 
1135     ls->sockaddr->type = sa->type;
1136 
1137     nxt_sockaddr_text(ls->sockaddr);
1138 
1139     ls->socket = -1;
1140     ls->backlog = NXT_LISTEN_BACKLOG;
1141 
1142     return ls;
1143 }
1144 
1145 
1146 static nxt_int_t
1147 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1148 {
1149     size_t  length;
1150     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1151 
1152     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1153         nxt_alert(task, "gethostname() failed %E", nxt_errno);
1154         return NXT_ERROR;
1155     }
1156 
1157     /*
1158      * Linux gethostname(2):
1159      *
1160      *    If the null-terminated hostname is too large to fit,
1161      *    then the name is truncated, and no error is returned.
1162      *
1163      * For this reason an additional byte is reserved in the buffer.
1164      */
1165     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1166 
1167     length = nxt_strlen(hostname);
1168     rt->hostname.length = length;
1169 
1170     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1171 
1172     if (rt->hostname.start != NULL) {
1173         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1174         return NXT_OK;
1175     }
1176 
1177     return NXT_ERROR;
1178 }
1179 
1180 
1181 static nxt_int_t
1182 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1183 {
1184     nxt_file_t  *file;
1185     nxt_list_t  *log_files;
1186 
1187     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1188 
1189     if (nxt_fast_path(log_files != NULL)) {
1190         rt->log_files = log_files;
1191 
1192         /* Preallocate the main log.  This allocation cannot fail. */
1193         file = nxt_list_zero_add(log_files);
1194 
1195         file->fd = NXT_FILE_INVALID;
1196         file->log_level = NXT_LOG_ALERT;
1197 
1198         return NXT_OK;
1199     }
1200 
1201     return NXT_ERROR;
1202 }
1203 
1204 
1205 nxt_file_t *
1206 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1207 {
1208     nxt_int_t            ret;
1209     nxt_file_t           *file;
1210     nxt_file_name_str_t  file_name;
1211 
1212     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1213 
1214     if (nxt_slow_path(ret != NXT_OK)) {
1215         return NULL;
1216     }
1217 
1218     nxt_list_each(file, rt->log_files) {
1219 
1220         /* STUB: hardecoded case sensitive/insensitive. */
1221 
1222         if (file->name != NULL
1223             && nxt_file_name_eq(file->name, file_name.start))
1224         {
1225             return file;
1226         }
1227 
1228     } nxt_list_loop;
1229 
1230     file = nxt_list_zero_add(rt->log_files);
1231 
1232     if (nxt_slow_path(file == NULL)) {
1233         return NULL;
1234     }
1235 
1236     file->fd = NXT_FILE_INVALID;
1237     file->log_level = NXT_LOG_ALERT;
1238     file->name = file_name.start;
1239 
1240     return file;
1241 }
1242 
1243 
1244 static nxt_int_t
1245 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1246 {
1247     nxt_int_t   ret;
1248     nxt_file_t  *file;
1249 
1250     nxt_list_each(file, rt->log_files) {
1251 
1252         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1253                             NXT_FILE_OWNER_ACCESS);
1254 
1255         if (ret != NXT_OK) {
1256             return NXT_ERROR;
1257         }
1258 
1259     } nxt_list_loop;
1260 
1261     file = nxt_list_first(rt->log_files);
1262 
1263     return nxt_file_stderr(file);
1264 }
1265 
1266 
1267 nxt_int_t
1268 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1269 {
1270     nxt_int_t            ret;
1271     nxt_uint_t           c, p, ncurr, nprev;
1272     nxt_listen_socket_t  *curr, *prev;
1273 
1274     curr = rt->listen_sockets->elts;
1275     ncurr = rt->listen_sockets->nelts;
1276 
1277     if (rt->inherited_sockets != NULL) {
1278         prev = rt->inherited_sockets->elts;
1279         nprev = rt->inherited_sockets->nelts;
1280 
1281     } else {
1282         prev = NULL;
1283         nprev = 0;
1284     }
1285 
1286     for (c = 0; c < ncurr; c++) {
1287 
1288         for (p = 0; p < nprev; p++) {
1289 
1290             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1291 
1292                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1293                 if (ret != NXT_OK) {
1294                     return NXT_ERROR;
1295                 }
1296 
1297                 goto next;
1298             }
1299         }
1300 
1301         if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1302             return NXT_ERROR;
1303         }
1304 
1305     next:
1306 
1307         continue;
1308     }
1309 
1310     return NXT_OK;
1311 }
1312 
1313 
1314 nxt_int_t
1315 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1316 {
1317     nxt_uint_t           i, n;
1318     nxt_listen_socket_t  *ls;
1319 
1320     ls = rt->listen_sockets->elts;
1321     n = rt->listen_sockets->nelts;
1322 
1323     for (i = 0; i < n; i++) {
1324         if (ls[i].flags == NXT_NONBLOCK) {
1325             if (nxt_listen_event(task, &ls[i]) == NULL) {
1326                 return NXT_ERROR;
1327             }
1328         }
1329     }
1330 
1331     return NXT_OK;
1332 }
1333 
1334 
1335 nxt_str_t *
1336 nxt_current_directory(nxt_mp_t *mp)
1337 {
1338     size_t     length;
1339     u_char     *p;
1340     nxt_str_t  *name;
1341     char       buf[NXT_MAX_PATH_LEN];
1342 
1343     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1344 
1345     if (nxt_fast_path(length != 0)) {
1346         name = nxt_str_alloc(mp, length + 1);
1347 
1348         if (nxt_fast_path(name != NULL)) {
1349             p = nxt_cpymem(name->start, buf, length);
1350             *p = '/';
1351 
1352             return name;
1353         }
1354     }
1355 
1356     return NULL;
1357 }
1358 
1359 
1360 static nxt_int_t
1361 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1362 {
1363     ssize_t     length;
1364     nxt_int_t   n;
1365     nxt_file_t  file;
1366     u_char      pid[NXT_INT64_T_LEN + nxt_length("\n")];
1367 
1368     nxt_memzero(&file, sizeof(nxt_file_t));
1369 
1370     file.name = pid_file;
1371 
1372     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1373                       NXT_FILE_DEFAULT_ACCESS);
1374 
1375     if (n != NXT_OK) {
1376         return NXT_ERROR;
1377     }
1378 
1379     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1380 
1381     if (nxt_file_write(&file, pid, length, 0) != length) {
1382         return NXT_ERROR;
1383     }
1384 
1385     nxt_file_close(task, &file);
1386 
1387     return NXT_OK;
1388 }
1389 
1390 
1391 void
1392 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1393 {
1394     nxt_process_t  *child;
1395 
1396     if (process->registered == 1) {
1397         nxt_runtime_process_remove(rt, process);
1398     }
1399 
1400     if (process->link.next != NULL) {
1401         nxt_queue_remove(&process->link);
1402     }
1403 
1404     nxt_queue_each(child, &process->children, nxt_process_t, link) {
1405         nxt_queue_remove(&child->link);
1406         child->link.next = NULL;
1407     } nxt_queue_loop;
1408 
1409     nxt_assert(process->use_count == 0);
1410     nxt_assert(process->registered == 0);
1411     nxt_assert(nxt_queue_is_empty(&process->ports));
1412 
1413     nxt_port_mmaps_destroy(&process->incoming, 1);
1414 
1415     nxt_thread_mutex_destroy(&process->incoming.mutex);
1416 
1417     /* processes from nxt_runtime_process_get() have no memory pool */
1418     if (process->mem_pool != NULL) {
1419         nxt_mp_destroy(process->mem_pool);
1420     }
1421 
1422     nxt_mp_free(rt->mem_pool, process);
1423 }
1424 
1425 
1426 static nxt_int_t
1427 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1428 {
1429     nxt_process_t  *process;
1430 
1431     process = data;
1432 
1433     if (lhq->key.length == sizeof(nxt_pid_t)
1434         && *(nxt_pid_t *) lhq->key.start == process->pid)
1435     {
1436         return NXT_OK;
1437     }
1438 
1439     return NXT_DECLINED;
1440 }
1441 
1442 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1443     NXT_LVLHSH_DEFAULT,
1444     nxt_runtime_lvlhsh_pid_test,
1445     nxt_lvlhsh_alloc,
1446     nxt_lvlhsh_free,
1447 };
1448 
1449 
1450 nxt_inline void
1451 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1452 {
1453     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1454     lhq->key.length = sizeof(*pid);
1455     lhq->key.start = (u_char *) pid;
1456     lhq->proto = &lvlhsh_processes_proto;
1457 }
1458 
1459 
1460 nxt_process_t *
1461 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1462 {
1463     nxt_process_t       *process;
1464     nxt_lvlhsh_query_t  lhq;
1465 
1466     process = NULL;
1467 
1468     nxt_runtime_process_lhq_pid(&lhq, &pid);
1469 
1470     nxt_thread_mutex_lock(&rt->processes_mutex);
1471 
1472     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1473         process = lhq.value;
1474 
1475     } else {
1476         nxt_thread_log_debug("process %PI not found", pid);
1477     }
1478 
1479     nxt_thread_mutex_unlock(&rt->processes_mutex);
1480 
1481     return process;
1482 }
1483 
1484 
1485 static nxt_process_t *
1486 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1487 {
1488     nxt_process_t       *process;
1489     nxt_lvlhsh_query_t  lhq;
1490 
1491     nxt_runtime_process_lhq_pid(&lhq, &pid);
1492 
1493     nxt_thread_mutex_lock(&rt->processes_mutex);
1494 
1495     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1496         nxt_thread_log_debug("process %PI found", pid);
1497 
1498         nxt_thread_mutex_unlock(&rt->processes_mutex);
1499 
1500         process = lhq.value;
1501         process->use_count++;
1502 
1503         return process;
1504     }
1505 
1506     process = nxt_process_new(rt);
1507     if (nxt_slow_path(process == NULL)) {
1508 
1509         nxt_thread_mutex_unlock(&rt->processes_mutex);
1510 
1511         return NULL;
1512     }
1513 
1514     process->pid = pid;
1515 
1516     lhq.replace = 0;
1517     lhq.value = process;
1518     lhq.pool = rt->mem_pool;
1519 
1520     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1521 
1522     case NXT_OK:
1523         if (rt->nprocesses == 0) {
1524             rt->mprocess = process;
1525         }
1526 
1527         rt->nprocesses++;
1528 
1529         process->registered = 1;
1530 
1531         nxt_thread_log_debug("process %PI insert", pid);
1532         break;
1533 
1534     default:
1535         nxt_thread_log_debug("process %PI insert failed", pid);
1536         break;
1537     }
1538 
1539     nxt_thread_mutex_unlock(&rt->processes_mutex);
1540 
1541     return process;
1542 }
1543 
1544 
1545 void
1546 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1547 {
1548     nxt_port_t          *port;
1549     nxt_runtime_t       *rt;
1550     nxt_lvlhsh_query_t  lhq;
1551 
1552     nxt_assert(process->registered == 0);
1553 
1554     rt = task->thread->runtime;
1555 
1556     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1557 
1558     lhq.replace = 0;
1559     lhq.value = process;
1560     lhq.pool = rt->mem_pool;
1561 
1562     nxt_thread_mutex_lock(&rt->processes_mutex);
1563 
1564     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1565 
1566     case NXT_OK:
1567         if (rt->nprocesses == 0) {
1568             rt->mprocess = process;
1569         }
1570 
1571         rt->nprocesses++;
1572 
1573         nxt_process_port_each(process, port) {
1574 
1575             port->pid = process->pid;
1576 
1577             nxt_runtime_port_add(task, port);
1578 
1579         } nxt_process_port_loop;
1580 
1581         process->registered = 1;
1582 
1583         nxt_debug(task, "process %PI added", process->pid);
1584         break;
1585 
1586     default:
1587         nxt_alert(task, "process %PI failed to add", process->pid);
1588         break;
1589     }
1590 
1591     nxt_thread_mutex_unlock(&rt->processes_mutex);
1592 }
1593 
1594 
1595 void
1596 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1597 {
1598     nxt_pid_t           pid;
1599     nxt_lvlhsh_query_t  lhq;
1600 
1601     nxt_assert(process->registered != 0);
1602 
1603     pid = process->pid;
1604 
1605     nxt_runtime_process_lhq_pid(&lhq, &pid);
1606 
1607     lhq.pool = rt->mem_pool;
1608 
1609     nxt_thread_mutex_lock(&rt->processes_mutex);
1610 
1611     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1612 
1613     case NXT_OK:
1614         nxt_assert(lhq.value == process);
1615 
1616         rt->nprocesses--;
1617 
1618         process->registered = 0;
1619 
1620         nxt_thread_log_debug("process %PI removed", pid);
1621         break;
1622 
1623     default:
1624         nxt_thread_log_alert("process %PI remove failed", pid);
1625         break;
1626     }
1627 
1628     nxt_thread_mutex_unlock(&rt->processes_mutex);
1629 }
1630 
1631 
1632 nxt_process_t *
1633 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1634 {
1635     nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1636 
1637     return nxt_runtime_process_next(rt, lhe);
1638 }
1639 
1640 
1641 nxt_port_t *
1642 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1643     nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1644 {
1645     nxt_port_t     *port;
1646     nxt_process_t  *process;
1647 
1648     process = nxt_runtime_process_get(rt, pid);
1649     if (nxt_slow_path(process == NULL)) {
1650         return NULL;
1651     }
1652 
1653     port = nxt_port_new(task, id, pid, type);
1654     if (nxt_slow_path(port == NULL)) {
1655         nxt_process_use(task, process, -1);
1656         return NULL;
1657     }
1658 
1659     nxt_process_port_add(task, process, port);
1660 
1661     nxt_process_use(task, process, -1);
1662 
1663     nxt_runtime_port_add(task, port);
1664 
1665     nxt_port_use(task, port, -1);
1666 
1667     return port;
1668 }
1669 
1670 
1671 static void
1672 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1673 {
1674     nxt_int_t      res;
1675     nxt_runtime_t  *rt;
1676 
1677     rt = task->thread->runtime;
1678 
1679     res = nxt_port_hash_add(&rt->ports, port);
1680 
1681     if (res != NXT_OK) {
1682         return;
1683     }
1684 
1685     rt->port_by_type[port->type] = port;
1686 
1687     nxt_port_use(task, port, 1);
1688 }
1689 
1690 
1691 void
1692 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1693 {
1694     nxt_int_t      res;
1695     nxt_runtime_t  *rt;
1696 
1697     rt = task->thread->runtime;
1698 
1699     res = nxt_port_hash_remove(&rt->ports, port);
1700 
1701     if (res != NXT_OK) {
1702         return;
1703     }
1704 
1705     if (rt->port_by_type[port->type] == port) {
1706         rt->port_by_type[port->type] = NULL;
1707     }
1708 
1709     nxt_port_use(task, port, -1);
1710 }
1711 
1712 
1713 nxt_port_t *
1714 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1715     nxt_port_id_t port_id)
1716 {
1717     return nxt_port_hash_find(&rt->ports, pid, port_id);
1718 }
1719