xref: /unit/src/nxt_runtime.c (revision 277:6baa1731cc6f)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 
14 
15 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16     nxt_runtime_t *rt);
17 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18     nxt_runtime_t *rt);
19 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22 static void nxt_runtime_initial_start(nxt_task_t *task);
23 static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task,
24     nxt_runtime_t *rt);
25 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28     nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31 static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task,
32     nxt_mp_t *mp, nxt_str_t *addr);
33 static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task,
34     nxt_mp_t *mp, nxt_str_t *addr);
35 static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task,
36     nxt_mp_t *mp, nxt_str_t *addr);
37 static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task,
38     nxt_mp_t *mp, nxt_str_t *addr);
39 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
40 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
41 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
42     nxt_runtime_t *rt);
43 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
44     nxt_file_name_t *pid_file);
45 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
46     nxt_runtime_cont_t cont);
47 static void nxt_runtime_thread_pool_init(void);
48 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
49     void *data);
50 static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
51     nxt_process_t *process);
52 static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
53     nxt_pid_t pid);
54 
55 
56 nxt_int_t
57 nxt_runtime_create(nxt_task_t *task)
58 {
59     nxt_mp_t               *mp;
60     nxt_int_t              ret;
61     nxt_array_t            *listen_sockets;
62     nxt_runtime_t          *rt;
63     nxt_app_lang_module_t  *lang;
64 
65     mp = nxt_mp_create(1024, 128, 256, 32);
66 
67     if (nxt_slow_path(mp == NULL)) {
68         return NXT_ERROR;
69     }
70 
71     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
72     if (nxt_slow_path(rt == NULL)) {
73         return NXT_ERROR;
74     }
75 
76     task->thread->runtime = rt;
77     rt->mem_pool = mp;
78 
79     nxt_thread_mutex_create(&rt->processes_mutex);
80 
81     rt->services = nxt_services_init(mp);
82     if (nxt_slow_path(rt->services == NULL)) {
83         goto fail;
84     }
85 
86     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
87     if (nxt_slow_path(rt->languages == NULL)) {
88         goto fail;
89     }
90 
91     /* Should not fail. */
92     lang = nxt_array_add(rt->languages);
93     lang->type = (nxt_str_t) nxt_string("go");
94     lang->version = (nxt_str_t) nxt_null_string;
95     lang->file = NULL;
96     lang->module = &nxt_go_module;
97 
98     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
99     if (nxt_slow_path(listen_sockets == NULL)) {
100         goto fail;
101     }
102 
103     rt->listen_sockets = listen_sockets;
104 
105     ret = nxt_runtime_inherited_listen_sockets(task, rt);
106     if (nxt_slow_path(ret != NXT_OK)) {
107         goto fail;
108     }
109 
110     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
111         goto fail;
112     }
113 
114     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
115         goto fail;
116     }
117 
118     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
119         goto fail;
120     }
121 
122     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
123         goto fail;
124     }
125 
126     rt->start = nxt_runtime_initial_start;
127 
128     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
129         goto fail;
130     }
131 
132     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
133                        nxt_runtime_start, task, rt, NULL);
134 
135     return NXT_OK;
136 
137 fail:
138 
139     nxt_mp_destroy(mp);
140 
141     return NXT_ERROR;
142 }
143 
144 
145 static nxt_int_t
146 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
147 {
148     u_char               *v, *p;
149     nxt_int_t            type;
150     nxt_array_t          *inherited_sockets;
151     nxt_socket_t         s;
152     nxt_listen_socket_t  *ls;
153 
154     v = (u_char *) getenv("NGINX");
155 
156     if (v == NULL) {
157         return nxt_runtime_systemd_listen_sockets(task, rt);
158     }
159 
160     nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v);
161 
162     inherited_sockets = nxt_array_create(rt->mem_pool,
163                                          1, sizeof(nxt_listen_socket_t));
164     if (inherited_sockets == NULL) {
165         return NXT_ERROR;
166     }
167 
168     rt->inherited_sockets = inherited_sockets;
169 
170     for (p = v; *p != '\0'; p++) {
171 
172         if (*p == ';') {
173             s = nxt_int_parse(v, p - v);
174 
175             if (nxt_slow_path(s < 0)) {
176                 nxt_log(task, NXT_LOG_CRIT, "invalid socket number "
177                         "\"%s\" in NGINX environment variable, "
178                         "ignoring the rest of the variable", v);
179                 return NXT_ERROR;
180             }
181 
182             v = p + 1;
183 
184             ls = nxt_array_zero_add(inherited_sockets);
185             if (nxt_slow_path(ls == NULL)) {
186                 return NXT_ERROR;
187             }
188 
189             ls->socket = s;
190 
191             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
192             if (nxt_slow_path(ls->sockaddr == NULL)) {
193                 return NXT_ERROR;
194             }
195 
196             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
197             if (nxt_slow_path(type == -1)) {
198                 return NXT_ERROR;
199             }
200 
201             ls->sockaddr->type = (uint16_t) type;
202         }
203     }
204 
205     return NXT_OK;
206 }
207 
208 
209 static nxt_int_t
210 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
211 {
212     u_char               *nfd, *pid;
213     nxt_int_t            n;
214     nxt_array_t          *inherited_sockets;
215     nxt_socket_t         s;
216     nxt_listen_socket_t  *ls;
217 
218     /*
219      * Number of listening sockets passed.  The socket
220      * descriptors start from number 3 and are sequential.
221      */
222     nfd = (u_char *) getenv("LISTEN_FDS");
223     if (nfd == NULL) {
224         return NXT_OK;
225     }
226 
227     /* The pid of the service process. */
228     pid = (u_char *) getenv("LISTEN_PID");
229     if (pid == NULL) {
230         return NXT_OK;
231     }
232 
233     n = nxt_int_parse(nfd, nxt_strlen(nfd));
234     if (n < 0) {
235         return NXT_OK;
236     }
237 
238     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
239         return NXT_OK;
240     }
241 
242     nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n);
243 
244     inherited_sockets = nxt_array_create(rt->mem_pool,
245                                          n, sizeof(nxt_listen_socket_t));
246     if (inherited_sockets == NULL) {
247         return NXT_ERROR;
248     }
249 
250     rt->inherited_sockets = inherited_sockets;
251 
252     for (s = 3; s < n; s++) {
253         ls = nxt_array_zero_add(inherited_sockets);
254         if (nxt_slow_path(ls == NULL)) {
255             return NXT_ERROR;
256         }
257 
258         ls->socket = s;
259 
260         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
261         if (nxt_slow_path(ls->sockaddr == NULL)) {
262             return NXT_ERROR;
263         }
264 
265         ls->sockaddr->type = SOCK_STREAM;
266     }
267 
268     return NXT_OK;
269 }
270 
271 
272 static nxt_int_t
273 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
274 {
275     nxt_thread_t                 *thread;
276     nxt_event_engine_t           *engine;
277     const nxt_event_interface_t  *interface;
278 
279     interface = nxt_service_get(rt->services, "engine", NULL);
280 
281     if (nxt_slow_path(interface == NULL)) {
282         /* TODO: log */
283         return NXT_ERROR;
284     }
285 
286     engine = nxt_event_engine_create(task, interface,
287                                      nxt_main_process_signals, 0, 0);
288 
289     if (nxt_slow_path(engine == NULL)) {
290         return NXT_ERROR;
291     }
292 
293     thread = task->thread;
294     thread->engine = engine;
295     thread->fiber = &engine->fibers->fiber;
296 
297     engine->id = rt->last_engine_id++;
298 
299     nxt_queue_init(&rt->engines);
300     nxt_queue_insert_tail(&rt->engines, &engine->link);
301 
302     return NXT_OK;
303 }
304 
305 
306 static nxt_int_t
307 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
308 {
309     nxt_int_t    ret;
310     nxt_array_t  *thread_pools;
311 
312     thread_pools = nxt_array_create(rt->mem_pool, 1,
313                                     sizeof(nxt_thread_pool_t *));
314 
315     if (nxt_slow_path(thread_pools == NULL)) {
316         return NXT_ERROR;
317     }
318 
319     rt->thread_pools = thread_pools;
320     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
321 
322     if (nxt_slow_path(ret != NXT_OK)) {
323         return NXT_ERROR;
324     }
325 
326     return NXT_OK;
327 }
328 
329 
330 static void
331 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
332 {
333     nxt_runtime_t  *rt;
334 
335     rt = obj;
336 
337     nxt_debug(task, "rt conf done");
338 
339     task->thread->log->ctx_handler = NULL;
340     task->thread->log->ctx = NULL;
341 
342     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
343         goto fail;
344     }
345 
346     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
347         goto fail;
348     }
349 
350     /*
351      * Thread pools should be destroyed before starting worker
352      * processes, because thread pool semaphores will stick in
353      * locked state in new processes after fork().
354      */
355     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
356 
357     return;
358 
359 fail:
360 
361     nxt_runtime_quit(task);
362 }
363 
364 
365 static void
366 nxt_runtime_initial_start(nxt_task_t *task)
367 {
368     nxt_int_t                    ret;
369     nxt_thread_t                 *thr;
370     nxt_runtime_t                *rt;
371     const nxt_event_interface_t  *interface;
372 
373     thr = task->thread;
374     rt = thr->runtime;
375 
376     if (rt->inherited_sockets == NULL && rt->daemon) {
377 
378         if (nxt_process_daemon(task) != NXT_OK) {
379             goto fail;
380         }
381 
382         /*
383          * An event engine should be updated after fork()
384          * even if an event facility was not changed because:
385          * 1) inherited kqueue descriptor is invalid,
386          * 2) the signal thread is not inherited.
387          */
388         interface = nxt_service_get(rt->services, "engine", rt->engine);
389         if (interface == NULL) {
390             goto fail;
391         }
392 
393         ret = nxt_event_engine_change(task->thread->engine, interface,
394                                       rt->batch);
395         if (ret != NXT_OK) {
396             goto fail;
397         }
398     }
399 
400     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
401     if (ret != NXT_OK) {
402         goto fail;
403     }
404 
405     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
406         goto fail;
407     }
408 
409     thr->engine->max_connections = rt->engine_connections;
410 
411     if (rt->main_process) {
412         if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
413             return;
414         }
415 
416     } else {
417         nxt_single_process_start(thr, task, rt);
418         return;
419     }
420 
421 fail:
422 
423     nxt_runtime_quit(task);
424 }
425 
426 
427 static void
428 nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt)
429 {
430     nxt_int_t  ret;
431 
432     ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads,
433                                        60000 * 1000000LL);
434 
435     if (nxt_slow_path(ret != NXT_OK)) {
436         nxt_runtime_quit(task);
437         return;
438     }
439 
440     rt->types |= (1U << NXT_PROCESS_SINGLE);
441 
442     nxt_runtime_listen_sockets_enable(task, rt);
443 
444     return;
445 }
446 
447 
448 void
449 nxt_runtime_quit(nxt_task_t *task)
450 {
451     nxt_bool_t          done;
452     nxt_runtime_t       *rt;
453     nxt_event_engine_t  *engine;
454 
455     rt = task->thread->runtime;
456     engine = task->thread->engine;
457 
458     nxt_debug(task, "exiting");
459 
460     done = 1;
461 
462     if (!engine->shutdown) {
463         engine->shutdown = 1;
464 
465         if (!nxt_array_is_empty(rt->thread_pools)) {
466             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
467             done = 0;
468         }
469 
470         if (nxt_runtime_is_main(rt)) {
471             nxt_main_stop_worker_processes(task, rt);
472             done = 0;
473         }
474     }
475 
476     nxt_runtime_close_idle_connections(engine);
477 
478     if (done) {
479         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
480                            task, rt, engine);
481     }
482 }
483 
484 
485 static void
486 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
487 {
488     nxt_conn_t        *c;
489     nxt_queue_t       *idle;
490     nxt_queue_link_t  *link, *next;
491 
492     nxt_debug(&engine->task, "close idle connections");
493 
494     idle = &engine->idle_connections;
495 
496     for (link = nxt_queue_head(idle);
497          link != nxt_queue_tail(idle);
498          link = next)
499     {
500         next = nxt_queue_next(link);
501         c = nxt_queue_link_data(link, nxt_conn_t, link);
502 
503         if (!c->socket.read_ready) {
504             nxt_queue_remove(link);
505             nxt_conn_close(engine, c);
506         }
507     }
508 }
509 
510 
/*
 * Final stage of runtime shutdown; queued as a work queue handler by
 * nxt_runtime_quit() with the runtime as "obj" and the event engine as
 * "data".
 *
 * Does nothing while thread pools are still registered.  Otherwise the
 * main process removes its pid file and, for a Unix domain control
 * socket, the socket file; then signal handling is stopped where needed,
 * all known processes are removed, the runtime's mutex and memory pool
 * are destroyed and the process exits with status 0.
 */
static void
nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
{
    nxt_runtime_t       *rt;
    nxt_process_t       *process;
    nxt_event_engine_t  *engine;

    rt = obj;
    engine = data;

    nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);

    /* Some thread pools have not finished yet; exit is not possible now. */
    if (!nxt_array_is_empty(rt->thread_pools)) {
        return;
    }

    if (nxt_runtime_is_main(rt)) {
        if (rt->pid_file != NULL) {
            nxt_file_delete(rt->pid_file);
        }

#if (NXT_HAVE_UNIX_DOMAIN)
        {
            nxt_sockaddr_t   *sa;
            nxt_file_name_t  *name;

            /*
             * NOTE(review): unlike rt->pid_file above, controller_listen
             * is dereferenced without a NULL check -- confirm it is always
             * set by the time this handler can run.
             */
            sa = rt->controller_listen;

            /* Remove the control socket file left in the file system. */
            if (sa->u.sockaddr.sa_family == AF_UNIX) {
                name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
                (void) nxt_file_delete(name);
            }
        }
#endif
    }

    /* Stop the signal machinery only if the event facility lacks its own. */
    if (!engine->event.signal_support) {
        nxt_event_engine_signals_stop(engine);
    }

    nxt_runtime_process_each(rt, process) {

        nxt_runtime_process_remove(rt, process);

    } nxt_runtime_process_loop;

    nxt_thread_mutex_destroy(&rt->processes_mutex);

    /* "rt" itself was allocated from this pool and is invalid afterwards. */
    nxt_mp_destroy(rt->mem_pool);

    nxt_debug(task, "exit");

    exit(0);
    nxt_unreachable();
}
566 
567 
568 static nxt_int_t
569 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
570 {
571     nxt_event_engine_t           *engine;
572     const nxt_event_interface_t  *interface;
573 
574     engine = task->thread->engine;
575 
576     if (engine->batch == rt->batch
577         && nxt_strcmp(engine->event.name, rt->engine) == 0)
578     {
579         return NXT_OK;
580     }
581 
582     interface = nxt_service_get(rt->services, "engine", rt->engine);
583 
584     if (interface != NULL) {
585         return nxt_event_engine_change(engine, interface, rt->batch);
586     }
587 
588     return NXT_ERROR;
589 }
590 
591 
592 void
593 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
594 {
595     nxt_queue_link_t    *link;
596     nxt_event_engine_t  *engine;
597 
598     link = nxt_queue_first(&rt->engines);
599     nxt_queue_remove(link);
600 
601     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
602     nxt_event_engine_free(engine);
603 }
604 
605 
606 nxt_int_t
607 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
608     nxt_uint_t max_threads, nxt_nsec_t timeout)
609 {
610     nxt_thread_pool_t   *thread_pool, **tp;
611 
612     tp = nxt_array_add(rt->thread_pools);
613     if (tp == NULL) {
614         return NXT_ERROR;
615     }
616 
617     thread_pool = nxt_thread_pool_create(max_threads, timeout,
618                                          nxt_runtime_thread_pool_init,
619                                          thr->engine,
620                                          nxt_runtime_thread_pool_exit);
621 
622     if (nxt_fast_path(thread_pool != NULL)) {
623         *tp = thread_pool;
624     }
625 
626     return NXT_OK;
627 }
628 
629 
630 static void
631 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
632     nxt_runtime_cont_t cont)
633 {
634     nxt_uint_t         n;
635     nxt_thread_pool_t  **tp;
636 
637     rt->continuation = cont;
638 
639     n = rt->thread_pools->nelts;
640 
641     if (n == 0) {
642         cont(task);
643         return;
644     }
645 
646     tp = rt->thread_pools->elts;
647 
648     do {
649         nxt_thread_pool_destroy(*tp);
650 
651         tp++;
652         n--;
653     } while (n != 0);
654 }
655 
656 
/*
 * Initialization callback handed to nxt_thread_pool_create() by
 * nxt_runtime_thread_pool_create().  It only initializes regex support,
 * and only when the build defines NXT_REGEX.
 */
static void
nxt_runtime_thread_pool_init(void)
{
#if (NXT_REGEX)
    nxt_regex_init(0);
#endif
}
664 
665 
666 static void
667 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
668 {
669     nxt_uint_t           i, n;
670     nxt_runtime_t        *rt;
671     nxt_thread_pool_t    *tp, **thread_pools;
672     nxt_thread_handle_t  handle;
673 
674     tp = obj;
675 
676     if (data != NULL) {
677         handle = (nxt_thread_handle_t) (uintptr_t) data;
678         nxt_thread_wait(handle);
679     }
680 
681     rt = task->thread->runtime;
682 
683     thread_pools = rt->thread_pools->elts;
684     n = rt->thread_pools->nelts;
685 
686     nxt_debug(task, "thread pools: %ui", n);
687 
688     for (i = 0; i < n; i++) {
689 
690         if (tp == thread_pools[i]) {
691             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
692 
693             if (n == 1) {
694                 /* The last thread pool. */
695                 rt->continuation(task);
696             }
697 
698             return;
699         }
700     }
701 }
702 
703 
704 static nxt_int_t
705 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
706 {
707     nxt_int_t                    ret;
708     nxt_str_t                    control;
709     nxt_uint_t                   n;
710     nxt_file_t                   *file;
711     const char                   *slash;
712     nxt_sockaddr_t               *sa;
713     nxt_file_name_str_t          file_name;
714     const nxt_event_interface_t  *interface;
715 
716     rt->daemon = 1;
717     rt->main_process = 1;
718     rt->engine_connections = 256;
719     rt->auxiliary_threads = 2;
720     rt->user_cred.user = NXT_USER;
721     rt->group = NXT_GROUP;
722     rt->pid = NXT_PID;
723     rt->log = NXT_LOG;
724     rt->modules = NXT_MODULES;
725     rt->control = NXT_CONTROL_SOCK;
726 
727     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
728         return NXT_ERROR;
729     }
730 
731     if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
732         return NXT_ERROR;
733     }
734 
735     /* An engine's parameters. */
736 
737     interface = nxt_service_get(rt->services, "engine", rt->engine);
738     if (interface == NULL) {
739         return NXT_ERROR;
740     }
741 
742     rt->engine = interface->name;
743 
744     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
745     if (nxt_slow_path(ret != NXT_OK)) {
746         return NXT_ERROR;
747     }
748 
749     rt->pid_file = file_name.start;
750 
751     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
752     if (nxt_slow_path(ret != NXT_OK)) {
753         return NXT_ERROR;
754     }
755 
756     file = nxt_list_first(rt->log_files);
757     file->name = file_name.start;
758 
759     slash = "";
760     n = nxt_strlen(rt->modules);
761 
762     if (n > 1 && rt->modules[n - 1] != '/') {
763         slash = "/";
764     }
765 
766     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
767                                rt->modules, slash);
768     if (nxt_slow_path(ret != NXT_OK)) {
769         return NXT_ERROR;
770     }
771 
772     rt->modules = (char *) file_name.start;
773 
774     control.length = nxt_strlen(rt->control);
775     control.start = (u_char *) rt->control;
776 
777     sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &control);
778     if (nxt_slow_path(sa == NULL)) {
779         return NXT_ERROR;
780     }
781 
782     rt->controller_listen = sa;
783 
784     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
785         return NXT_ERROR;
786     }
787 
788     return NXT_OK;
789 }
790 
791 
/*
 * Parses the process command line (nxt_process_argv, starting after the
 * program name) and stores recognized option values in the runtime.
 *
 * Options taking a value: --control, --upstream, --user, --group, --pid,
 * --log and --modules.  The value pointers are stored as is, without
 * copying.  --no-daemon clears rt->daemon.  --version and --help print
 * their text and terminate the process with status 0.
 *
 * Returns NXT_OK when all arguments were consumed, NXT_ERROR on an option
 * with a missing value or on an unknown option (a message is written
 * first).
 */
static nxt_int_t
nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
{
    char    *p, **argv;
    u_char  *end;
    u_char  buf[1024];

    static const char  version[] =
        "unit version: " NXT_VERSION "\n"
        "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";

    static const char  no_control[] =
                       "option \"--control\" requires socket address\n";
    static const char  no_user[] = "option \"--user\" requires username\n";
    static const char  no_group[] = "option \"--group\" requires group name\n";
    static const char  no_pid[] = "option \"--pid\" requires filename\n";
    static const char  no_log[] = "option \"--log\" requires filename\n";
    static const char  no_modules[] =
                       "option \"--modules\" requires directory\n";

    /*
     * The help text deliberately ends in the middle of the "--group"
     * default line; the tail is chosen at run time below.
     */
    static const char  help[] =
        "\n"
        "unit options:\n"
        "\n"
        "  --version            print unit version and configure options\n"
        "\n"
        "  --no-daemon          run unit in non-daemon mode\n"
        "\n"
        "  --control ADDRESS    set address of control API socket\n"
        "                       default: \"" NXT_CONTROL_SOCK "\"\n"
        "\n"
        "  --pid FILE           set pid filename\n"
        "                       default: \"" NXT_PID "\"\n"
        "\n"
        "  --log FILE           set log filename\n"
        "                       default: \"" NXT_LOG "\"\n"
        "\n"
        "  --modules DIRECTORY  set modules directory name\n"
        "                       default: \"" NXT_MODULES "\"\n"
        "\n"
        "  --user USER          set non-privileged processes to run"
                                " as specified user\n"
        "                       default: \"" NXT_USER "\"\n"
        "\n"
        "  --group GROUP        set non-privileged processes to run"
                                " as specified group\n"
        "                       default: ";

    static const char  group[] = "\"" NXT_GROUP "\"\n\n";
    static const char  primary[] = "user's primary group\n\n";

    /* Skip argv[0], the program name. */
    argv = &nxt_process_argv[1];

    while (*argv != NULL) {
        p = *argv++;

        if (nxt_strcmp(p, "--control") == 0) {
            if (*argv == NULL) {
                write(STDERR_FILENO, no_control, sizeof(no_control) - 1);
                return NXT_ERROR;
            }

            p = *argv++;

            rt->control = p;

            continue;
        }

        /* Unlike the other options, this one reports through nxt_log(). */
        if (nxt_strcmp(p, "--upstream") == 0) {
            if (*argv == NULL) {
                nxt_log(task, NXT_LOG_CRIT,
                              "no argument for option \"--upstream\"");
                return NXT_ERROR;
            }

            p = *argv++;

            rt->upstream.length = nxt_strlen(p);
            rt->upstream.start = (u_char *) p;

            continue;
        }

        if (nxt_strcmp(p, "--user") == 0) {
            if (*argv == NULL) {
                write(STDERR_FILENO, no_user, sizeof(no_user) - 1);
                return NXT_ERROR;
            }

            p = *argv++;

            rt->user_cred.user = p;

            continue;
        }

        if (nxt_strcmp(p, "--group") == 0) {
            if (*argv == NULL) {
                write(STDERR_FILENO, no_group, sizeof(no_group) - 1);
                return NXT_ERROR;
            }

            p = *argv++;

            rt->group = p;

            continue;
        }

        if (nxt_strcmp(p, "--pid") == 0) {
            if (*argv == NULL) {
                write(STDERR_FILENO, no_pid, sizeof(no_pid) - 1);
                return NXT_ERROR;
            }

            p = *argv++;

            rt->pid = p;

            continue;
        }

        if (nxt_strcmp(p, "--log") == 0) {
            if (*argv == NULL) {
                write(STDERR_FILENO, no_log, sizeof(no_log) - 1);
                return NXT_ERROR;
            }

            p = *argv++;

            rt->log = p;

            continue;
        }

        if (nxt_strcmp(p, "--modules") == 0) {
            if (*argv == NULL) {
                write(STDERR_FILENO, no_modules, sizeof(no_modules) - 1);
                return NXT_ERROR;
            }

            p = *argv++;

            rt->modules = p;

            continue;
        }

        if (nxt_strcmp(p, "--no-daemon") == 0) {
            rt->daemon = 0;
            continue;
        }

        /* The version text goes to stderr, the help text to stdout. */
        if (nxt_strcmp(p, "--version") == 0) {
            write(STDERR_FILENO, version, sizeof(version) - 1);
            exit(0);
        }

        if (nxt_strcmp(p, "--help") == 0) {
            write(STDOUT_FILENO, help, sizeof(help) - 1);

            /* sizeof() == 1 means NXT_GROUP is the empty string. */
            if (sizeof(NXT_GROUP) == 1) {
                write(STDOUT_FILENO, primary, sizeof(primary) - 1);

            } else {
                write(STDOUT_FILENO, group, sizeof(group) - 1);
            }

            exit(0);
        }

        /* Anything else is an unknown option and aborts parsing. */
        end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\"\n", p);
        write(STDERR_FILENO, buf, end - buf);

        return NXT_ERROR;
    }

    return NXT_OK;
}
972 
973 
974 static nxt_sockaddr_t *
975 nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
976 {
977     u_char  *p;
978     size_t  length;
979 
980     length = addr->length;
981     p = addr->start;
982 
983     if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) {
984         return nxt_runtime_sockaddr_unix_parse(task, mp, addr);
985     }
986 
987     if (length != 0 && *p == '[') {
988         return nxt_runtime_sockaddr_inet6_parse(task, mp, addr);
989     }
990 
991     return nxt_runtime_sockaddr_inet_parse(task, mp, addr);
992 }
993 
994 
995 static nxt_sockaddr_t *
996 nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
997 {
998 #if (NXT_HAVE_UNIX_DOMAIN)
999     u_char          *p;
1000     size_t          length, socklen;
1001     nxt_sockaddr_t  *sa;
1002 
1003     /*
1004      * Actual sockaddr_un length can be lesser or even larger than defined
1005      * struct sockaddr_un length (see comment in nxt_socket.h).  So
1006      * limit maximum Unix domain socket address length by defined sun_path[]
1007      * length because some OSes accept addresses twice larger than defined
1008      * struct sockaddr_un.  Also reserve space for a trailing zero to avoid
1009      * ambiguity, since many OSes accept Unix domain socket addresses
1010      * without a trailing zero.
1011      */
1012     const size_t max_len = sizeof(struct sockaddr_un)
1013                            - offsetof(struct sockaddr_un, sun_path) - 1;
1014 
1015     /* cutting "unix:" */
1016     length = addr->length - 5;
1017     p = addr->start + 5;
1018 
1019     if (length == 0) {
1020         nxt_log(task, NXT_LOG_CRIT,
1021                 "unix domain socket \"%V\" name is invalid", addr);
1022         return NULL;
1023     }
1024 
1025     if (length > max_len) {
1026         nxt_log(task, NXT_LOG_CRIT,
1027                 "unix domain socket \"%V\" name is too long", addr);
1028         return NULL;
1029     }
1030 
1031     socklen = offsetof(struct sockaddr_un, sun_path) + length + 1;
1032 
1033 #if (NXT_LINUX)
1034 
1035     /*
1036      * Linux unix(7):
1037      *
1038      *   abstract: an abstract socket address is distinguished by the fact
1039      *   that sun_path[0] is a null byte ('\0').  The socket's address in
1040      *   this namespace is given by the additional bytes in sun_path that
1041      *   are covered by the specified length of the address structure.
1042      *   (Null bytes in the name have no special significance.)
1043      */
1044     if (p[0] == '@') {
1045         p[0] = '\0';
1046         socklen--;
1047     }
1048 
1049 #endif
1050 
1051     sa = nxt_sockaddr_alloc(mp, socklen, addr->length);
1052 
1053     if (nxt_slow_path(sa == NULL)) {
1054         return NULL;
1055     }
1056 
1057     sa->type = SOCK_STREAM;
1058 
1059     sa->u.sockaddr_un.sun_family = AF_UNIX;
1060     nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length);
1061 
1062     return sa;
1063 
1064 #else  /* !(NXT_HAVE_UNIX_DOMAIN) */
1065 
1066     nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported",
1067             addr);
1068 
1069     return NULL;
1070 
1071 #endif
1072 }
1073 
1074 
1075 static nxt_sockaddr_t *
1076 nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp,
1077     nxt_str_t *addr)
1078 {
1079 #if (NXT_INET6)
1080     u_char           *p, *addr_end;
1081     size_t           length;
1082     nxt_int_t        port;
1083     nxt_sockaddr_t   *sa;
1084     struct in6_addr  *in6_addr;
1085 
1086     length = addr->length - 1;
1087     p = addr->start + 1;
1088 
1089     addr_end = nxt_memchr(p, ']', length);
1090 
1091     if (addr_end == NULL) {
1092         goto invalid_address;
1093     }
1094 
1095     sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6),
1096                             NXT_INET6_ADDR_STR_LEN);
1097     if (nxt_slow_path(sa == NULL)) {
1098         return NULL;
1099     }
1100 
1101     in6_addr = &sa->u.sockaddr_in6.sin6_addr;
1102 
1103     if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) {
1104         goto invalid_address;
1105     }
1106 
1107     port = 0;
1108     p = addr_end + 1;
1109     length = (p + length) - p;
1110 
1111     if (length == 0) {
1112         goto found;
1113     }
1114 
1115     if (*p == ':') {
1116         port = nxt_int_parse(p + 1, length - 1);
1117 
1118         if (port >= 1 && port <= 65535) {
1119             goto found;
1120         }
1121     }
1122 
1123     nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr);
1124 
1125     return NULL;
1126 
1127 found:
1128 
1129     sa->type = SOCK_STREAM;
1130 
1131     sa->u.sockaddr_in6.sin6_family = AF_INET6;
1132     sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port);
1133 
1134     return sa;
1135 
1136 invalid_address:
1137 
1138     nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr);
1139 
1140     return NULL;
1141 
1142 #else
1143 
1144     nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr);
1145 
1146     return NULL;
1147 
1148 #endif
1149 }
1150 
1151 
1152 static nxt_sockaddr_t *
1153 nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp,
1154     nxt_str_t *string)
1155 {
1156     u_char          *p, *ip;
1157     size_t          length;
1158     in_addr_t       addr;
1159     nxt_int_t       port;
1160     nxt_sockaddr_t  *sa;
1161 
1162     addr = INADDR_ANY;
1163 
1164     length = string->length;
1165     ip = string->start;
1166 
1167     p = nxt_memchr(ip, ':', length);
1168 
1169     if (p == NULL) {
1170 
1171         /* single value port, or address */
1172 
1173         port = nxt_int_parse(ip, length);
1174 
1175         if (port > 0) {
1176             /* "*:XX" */
1177 
1178             if (port < 1 || port > 65535) {
1179                 goto invalid_port;
1180             }
1181 
1182         } else {
1183             /* "x.x.x.x" */
1184 
1185             addr = nxt_inet_addr(ip, length);
1186 
1187             if (addr == INADDR_NONE) {
1188                 goto invalid_port;
1189             }
1190 
1191             port = 8080;
1192         }
1193 
1194     } else {
1195 
1196         /* x.x.x.x:XX */
1197 
1198         p++;
1199         length = (ip + length) - p;
1200         port = nxt_int_parse(p, length);
1201 
1202         if (port < 1 || port > 65535) {
1203             goto invalid_port;
1204         }
1205 
1206         length = (p - 1) - ip;
1207 
1208         if (length != 1 || ip[0] != '*') {
1209             addr = nxt_inet_addr(ip, length);
1210 
1211             if (addr == INADDR_NONE) {
1212                 goto invalid_addr;
1213             }
1214 
1215             /* "x.x.x.x:XX" */
1216         }
1217     }
1218 
1219     sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
1220                             NXT_INET_ADDR_STR_LEN);
1221     if (nxt_slow_path(sa == NULL)) {
1222         return NULL;
1223     }
1224 
1225     sa->type = SOCK_STREAM;
1226 
1227     sa->u.sockaddr_in.sin_family = AF_INET;
1228     sa->u.sockaddr_in.sin_port = htons((in_port_t) port);
1229     sa->u.sockaddr_in.sin_addr.s_addr = addr;
1230 
1231     return sa;
1232 
1233 invalid_port:
1234 
1235     nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string);
1236 
1237     return NULL;
1238 
1239 invalid_addr:
1240 
1241     nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string);
1242 
1243     return NULL;
1244 }
1245 
1246 
1247 nxt_listen_socket_t *
1248 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1249 {
1250     nxt_mp_t             *mp;
1251     nxt_listen_socket_t  *ls;
1252 
1253     ls = nxt_array_zero_add(rt->listen_sockets);
1254     if (ls == NULL) {
1255         return NULL;
1256     }
1257 
1258     mp = rt->mem_pool;
1259 
1260     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1261                                        sa->length);
1262     if (ls->sockaddr == NULL) {
1263         return NULL;
1264     }
1265 
1266     ls->sockaddr->type = sa->type;
1267 
1268     nxt_sockaddr_text(ls->sockaddr);
1269 
1270     ls->socket = -1;
1271     ls->backlog = NXT_LISTEN_BACKLOG;
1272 
1273     return ls;
1274 }
1275 
1276 
1277 static nxt_int_t
1278 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1279 {
1280     size_t  length;
1281     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1282 
1283     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1284         nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno);
1285         return NXT_ERROR;
1286     }
1287 
1288     /*
1289      * Linux gethostname(2):
1290      *
1291      *    If the null-terminated hostname is too large to fit,
1292      *    then the name is truncated, and no error is returned.
1293      *
1294      * For this reason an additional byte is reserved in the buffer.
1295      */
1296     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1297 
1298     length = nxt_strlen(hostname);
1299     rt->hostname.length = length;
1300 
1301     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1302 
1303     if (rt->hostname.start != NULL) {
1304         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1305         return NXT_OK;
1306     }
1307 
1308     return NXT_ERROR;
1309 }
1310 
1311 
1312 static nxt_int_t
1313 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1314 {
1315     nxt_file_t  *file;
1316     nxt_list_t  *log_files;
1317 
1318     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1319 
1320     if (nxt_fast_path(log_files != NULL)) {
1321         rt->log_files = log_files;
1322 
1323         /* Preallocate the main log.  This allocation cannot fail. */
1324         file = nxt_list_zero_add(log_files);
1325 
1326         file->fd = NXT_FILE_INVALID;
1327         file->log_level = NXT_LOG_CRIT;
1328 
1329         return NXT_OK;
1330     }
1331 
1332     return NXT_ERROR;
1333 }
1334 
1335 
1336 nxt_file_t *
1337 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1338 {
1339     nxt_int_t            ret;
1340     nxt_file_t           *file;
1341     nxt_file_name_str_t  file_name;
1342 
1343     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1344 
1345     if (nxt_slow_path(ret != NXT_OK)) {
1346         return NULL;
1347     }
1348 
1349     nxt_list_each(file, rt->log_files) {
1350 
1351         /* STUB: hardecoded case sensitive/insensitive. */
1352 
1353         if (file->name != NULL
1354             && nxt_file_name_eq(file->name, file_name.start))
1355         {
1356             return file;
1357         }
1358 
1359     } nxt_list_loop;
1360 
1361     file = nxt_list_zero_add(rt->log_files);
1362 
1363     if (nxt_slow_path(file == NULL)) {
1364         return NULL;
1365     }
1366 
1367     file->fd = NXT_FILE_INVALID;
1368     file->log_level = NXT_LOG_CRIT;
1369     file->name = file_name.start;
1370 
1371     return file;
1372 }
1373 
1374 
1375 static nxt_int_t
1376 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1377 {
1378     nxt_int_t   ret;
1379     nxt_file_t  *file;
1380 
1381     nxt_list_each(file, rt->log_files) {
1382 
1383         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1384                             NXT_FILE_OWNER_ACCESS);
1385 
1386         if (ret != NXT_OK) {
1387             return NXT_ERROR;
1388         }
1389 
1390     } nxt_list_loop;
1391 
1392     file = nxt_list_first(rt->log_files);
1393 
1394     return nxt_file_stderr(file);
1395 }
1396 
1397 
1398 nxt_int_t
1399 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1400 {
1401     nxt_int_t            ret;
1402     nxt_uint_t           c, p, ncurr, nprev;
1403     nxt_listen_socket_t  *curr, *prev;
1404 
1405     curr = rt->listen_sockets->elts;
1406     ncurr = rt->listen_sockets->nelts;
1407 
1408     if (rt->inherited_sockets != NULL) {
1409         prev = rt->inherited_sockets->elts;
1410         nprev = rt->inherited_sockets->nelts;
1411 
1412     } else {
1413         prev = NULL;
1414         nprev = 0;
1415     }
1416 
1417     for (c = 0; c < ncurr; c++) {
1418 
1419         for (p = 0; p < nprev; p++) {
1420 
1421             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1422 
1423                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1424                 if (ret != NXT_OK) {
1425                     return NXT_ERROR;
1426                 }
1427 
1428                 goto next;
1429             }
1430         }
1431 
1432         if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1433             return NXT_ERROR;
1434         }
1435 
1436     next:
1437 
1438         continue;
1439     }
1440 
1441     return NXT_OK;
1442 }
1443 
1444 
1445 nxt_int_t
1446 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1447 {
1448     nxt_uint_t           i, n;
1449     nxt_listen_socket_t  *ls;
1450 
1451     ls = rt->listen_sockets->elts;
1452     n = rt->listen_sockets->nelts;
1453 
1454     for (i = 0; i < n; i++) {
1455         if (ls[i].flags == NXT_NONBLOCK) {
1456             if (nxt_listen_event(task, &ls[i]) == NULL) {
1457                 return NXT_ERROR;
1458             }
1459         }
1460     }
1461 
1462     return NXT_OK;
1463 }
1464 
1465 
1466 nxt_str_t *
1467 nxt_current_directory(nxt_mp_t *mp)
1468 {
1469     size_t     length;
1470     u_char     *p;
1471     nxt_str_t  *name;
1472     char       buf[NXT_MAX_PATH_LEN];
1473 
1474     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1475 
1476     if (nxt_fast_path(length != 0)) {
1477         name = nxt_str_alloc(mp, length + 1);
1478 
1479         if (nxt_fast_path(name != NULL)) {
1480             p = nxt_cpymem(name->start, buf, length);
1481             *p = '/';
1482 
1483             return name;
1484         }
1485     }
1486 
1487     return NULL;
1488 }
1489 
1490 
1491 static nxt_int_t
1492 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1493 {
1494     ssize_t     length;
1495     nxt_int_t   n;
1496     nxt_file_t  file;
1497     u_char      pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE];
1498 
1499     nxt_memzero(&file, sizeof(nxt_file_t));
1500 
1501     file.name = pid_file;
1502 
1503     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1504                       NXT_FILE_DEFAULT_ACCESS);
1505 
1506     if (n != NXT_OK) {
1507         return NXT_ERROR;
1508     }
1509 
1510     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1511 
1512     if (nxt_file_write(&file, pid, length, 0) != length) {
1513         return NXT_ERROR;
1514     }
1515 
1516     nxt_file_close(task, &file);
1517 
1518     return NXT_OK;
1519 }
1520 
1521 
1522 nxt_process_t *
1523 nxt_runtime_process_new(nxt_runtime_t *rt)
1524 {
1525     nxt_process_t  *process;
1526 
1527     /* TODO: memory failures. */
1528 
1529     process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1530     if (nxt_slow_path(process == NULL)) {
1531         return NULL;
1532     }
1533 
1534     nxt_queue_init(&process->ports);
1535 
1536     nxt_thread_mutex_create(&process->incoming_mutex);
1537     nxt_thread_mutex_create(&process->outgoing_mutex);
1538     nxt_thread_mutex_create(&process->cp_mutex);
1539 
1540     return process;
1541 }
1542 
1543 
1544 static void
1545 nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1546 {
1547     nxt_assert(process->port_cleanups == 0);
1548     nxt_assert(process->registered == 0);
1549 
1550     nxt_port_mmaps_destroy(process->incoming, 1);
1551     nxt_port_mmaps_destroy(process->outgoing, 1);
1552 
1553     if (process->cp_mem_pool != NULL) {
1554         nxt_mp_thread_adopt(process->cp_mem_pool);
1555 
1556         nxt_mp_destroy(process->cp_mem_pool);
1557     }
1558 
1559     nxt_thread_mutex_destroy(&process->incoming_mutex);
1560     nxt_thread_mutex_destroy(&process->outgoing_mutex);
1561     nxt_thread_mutex_destroy(&process->cp_mutex);
1562 
1563     nxt_mp_free(rt->mem_pool, process);
1564 }
1565 
1566 
1567 static nxt_int_t
1568 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1569 {
1570     nxt_process_t  *process;
1571 
1572     process = data;
1573 
1574     if (lhq->key.length == sizeof(nxt_pid_t)
1575         && *(nxt_pid_t *) lhq->key.start == process->pid)
1576     {
1577         return NXT_OK;
1578     }
1579 
1580     return NXT_DECLINED;
1581 }
1582 
1583 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1584     NXT_LVLHSH_DEFAULT,
1585     nxt_runtime_lvlhsh_pid_test,
1586     nxt_lvlhsh_alloc,
1587     nxt_lvlhsh_free,
1588 };
1589 
1590 
1591 nxt_inline void
1592 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1593 {
1594     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1595     lhq->key.length = sizeof(*pid);
1596     lhq->key.start = (u_char *) pid;
1597     lhq->proto = &lvlhsh_processes_proto;
1598 }
1599 
1600 
1601 nxt_process_t *
1602 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1603 {
1604     nxt_process_t       *process;
1605     nxt_lvlhsh_query_t  lhq;
1606 
1607     process = NULL;
1608 
1609     nxt_runtime_process_lhq_pid(&lhq, &pid);
1610 
1611     nxt_thread_mutex_lock(&rt->processes_mutex);
1612 
1613     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1614         process = lhq.value;
1615 
1616     } else {
1617         nxt_thread_log_debug("process %PI not found", pid);
1618     }
1619 
1620     nxt_thread_mutex_unlock(&rt->processes_mutex);
1621 
1622     return process;
1623 }
1624 
1625 
1626 nxt_process_t *
1627 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1628 {
1629     nxt_process_t       *process;
1630     nxt_lvlhsh_query_t  lhq;
1631 
1632     nxt_runtime_process_lhq_pid(&lhq, &pid);
1633 
1634     nxt_thread_mutex_lock(&rt->processes_mutex);
1635 
1636     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1637         nxt_thread_log_debug("process %PI found", pid);
1638 
1639         nxt_thread_mutex_unlock(&rt->processes_mutex);
1640         return lhq.value;
1641     }
1642 
1643     process = nxt_runtime_process_new(rt);
1644     if (nxt_slow_path(process == NULL)) {
1645         return NULL;
1646     }
1647 
1648     process->pid = pid;
1649 
1650     lhq.replace = 0;
1651     lhq.value = process;
1652     lhq.pool = rt->mem_pool;
1653 
1654     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1655 
1656     case NXT_OK:
1657         if (rt->nprocesses == 0) {
1658             rt->mprocess = process;
1659         }
1660 
1661         rt->nprocesses++;
1662 
1663         process->registered = 1;
1664 
1665         nxt_thread_log_debug("process %PI insert", pid);
1666         break;
1667 
1668     default:
1669         nxt_thread_log_debug("process %PI insert failed", pid);
1670         break;
1671     }
1672 
1673     nxt_thread_mutex_unlock(&rt->processes_mutex);
1674 
1675     return process;
1676 }
1677 
1678 
1679 void
1680 nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
1681 {
1682     nxt_port_t          *port;
1683     nxt_lvlhsh_query_t  lhq;
1684 
1685     nxt_assert(process->registered == 0);
1686 
1687     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1688 
1689     lhq.replace = 0;
1690     lhq.value = process;
1691     lhq.pool = rt->mem_pool;
1692 
1693     nxt_thread_mutex_lock(&rt->processes_mutex);
1694 
1695     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1696 
1697     case NXT_OK:
1698         if (rt->nprocesses == 0) {
1699             rt->mprocess = process;
1700         }
1701 
1702         rt->nprocesses++;
1703 
1704         nxt_process_port_each(process, port) {
1705 
1706             port->pid = process->pid;
1707 
1708             nxt_runtime_port_add(rt, port);
1709 
1710         } nxt_process_port_loop;
1711 
1712         process->registered = 1;
1713 
1714         nxt_thread_log_debug("process %PI added", process->pid);
1715         break;
1716 
1717     default:
1718         nxt_thread_log_debug("process %PI failed to add", process->pid);
1719         break;
1720     }
1721 
1722     nxt_thread_mutex_unlock(&rt->processes_mutex);
1723 }
1724 
1725 
1726 static nxt_process_t *
1727 nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1728 {
1729     nxt_process_t       *process;
1730     nxt_lvlhsh_query_t  lhq;
1731 
1732     process = NULL;
1733 
1734     nxt_runtime_process_lhq_pid(&lhq, &pid);
1735 
1736     lhq.pool = rt->mem_pool;
1737 
1738     nxt_thread_mutex_lock(&rt->processes_mutex);
1739 
1740     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1741 
1742     case NXT_OK:
1743         rt->nprocesses--;
1744 
1745         process = lhq.value;
1746 
1747         process->registered = 0;
1748 
1749         nxt_thread_log_debug("process %PI removed", pid);
1750         break;
1751 
1752     default:
1753         nxt_thread_log_debug("process %PI remove failed", pid);
1754         break;
1755     }
1756 
1757     nxt_thread_mutex_unlock(&rt->processes_mutex);
1758 
1759     return process;
1760 }
1761 
1762 
1763 void
1764 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1765 {
1766     nxt_port_t  *port;
1767 
1768     if (process->port_cleanups == 0) {
1769         if (process->registered == 1) {
1770             nxt_runtime_process_remove_pid(rt, process->pid);
1771         }
1772 
1773         nxt_runtime_process_destroy(rt, process);
1774 
1775     } else {
1776         nxt_process_port_each(process, port) {
1777 
1778             nxt_runtime_port_remove(rt, port);
1779 
1780             nxt_port_release(port);
1781 
1782         } nxt_process_port_loop;
1783     }
1784 }
1785 
1786 
1787 nxt_process_t *
1788 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1789 {
1790     nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t));
1791 
1792     lhe->proto = &lvlhsh_processes_proto;
1793 
1794     return nxt_runtime_process_next(rt, lhe);
1795 }
1796 
1797 
1798 nxt_port_t *
1799 nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1800 {
1801     return nxt_port_hash_first(&rt->ports, lhe);
1802 }
1803 
1804 
1805 void
1806 nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
1807 {
1808     nxt_port_hash_add(&rt->ports, rt->mem_pool, port);
1809 
1810     rt->port_by_type[port->type] = port;
1811 }
1812 
1813 
1814 void
1815 nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
1816 {
1817     nxt_port_hash_remove(&rt->ports, rt->mem_pool, port);
1818 
1819     if (rt->port_by_type[port->type] == port) {
1820         rt->port_by_type[port->type] = NULL;
1821     }
1822 }
1823 
1824 
1825 nxt_port_t *
1826 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1827     nxt_port_id_t port_id)
1828 {
1829     return nxt_port_hash_find(&rt->ports, pid, port_id);
1830 }
1831