xref: /unit/src/nxt_runtime.c (revision 216:07257705cd64)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_master_process.h>
12 #include <nxt_router.h>
13 
14 
/*
 * Forward declarations of the file-local helpers.
 */

/* Listen sockets inherited through the environment. */
static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
    nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
    nxt_runtime_t *rt);

/* Event engine, thread pools and the start/exit sequence. */
static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
static void nxt_runtime_initial_start(nxt_task_t *task);
static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task,
    nxt_runtime_t *rt);
static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
    nxt_runtime_t *rt);

/* Configuration defaults and command line parsing. */
static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);

/* Parsers of the "--listen" address forms. */
static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task,
    nxt_mp_t *mp, nxt_str_t *addr);
static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task,
    nxt_mp_t *mp, nxt_str_t *addr);
static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task,
    nxt_mp_t *mp, nxt_str_t *addr);
static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task,
    nxt_mp_t *mp, nxt_str_t *addr);

/* Host name, log files and the pid file. */
static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
    nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
    nxt_file_name_t *pid_file);

#if (NXT_THREADS)
/* Only available in builds with thread support. */
static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_runtime_cont_t cont);
#endif

/* Process bookkeeping. */
static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
    nxt_process_t *process);
static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
    nxt_pid_t pid);
55 
56 
57 nxt_int_t
58 nxt_runtime_create(nxt_task_t *task)
59 {
60     nxt_mp_t               *mp;
61     nxt_int_t              ret;
62     nxt_array_t            *listen_sockets;
63     nxt_runtime_t          *rt;
64     nxt_app_lang_module_t  *lang;
65 
66     mp = nxt_mp_create(1024, 128, 256, 32);
67 
68     if (nxt_slow_path(mp == NULL)) {
69         return NXT_ERROR;
70     }
71 
72     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
73     if (nxt_slow_path(rt == NULL)) {
74         return NXT_ERROR;
75     }
76 
77     task->thread->runtime = rt;
78     rt->mem_pool = mp;
79 
80     nxt_thread_mutex_create(&rt->processes_mutex);
81 
82     rt->prefix = nxt_current_directory(mp);
83     if (nxt_slow_path(rt->prefix == NULL)) {
84         goto fail;
85     }
86 
87     rt->conf_prefix = rt->prefix;
88 
89     rt->services = nxt_services_init(mp);
90     if (nxt_slow_path(rt->services == NULL)) {
91         goto fail;
92     }
93 
94     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
95     if (nxt_slow_path(rt->languages == NULL)) {
96         goto fail;
97     }
98 
99     /* Should not fail. */
100     lang = nxt_array_add(rt->languages);
101     lang->type = (nxt_str_t) nxt_string("go");
102     lang->version = (nxt_str_t) nxt_null_string;
103     lang->file = NULL;
104     lang->module = &nxt_go_module;
105 
106     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
107     if (nxt_slow_path(listen_sockets == NULL)) {
108         goto fail;
109     }
110 
111     rt->listen_sockets = listen_sockets;
112 
113     ret = nxt_runtime_inherited_listen_sockets(task, rt);
114     if (nxt_slow_path(ret != NXT_OK)) {
115         goto fail;
116     }
117 
118     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
119         goto fail;
120     }
121 
122     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
123         goto fail;
124     }
125 
126     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
127         goto fail;
128     }
129 
130     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
131         goto fail;
132     }
133 
134     rt->start = nxt_runtime_initial_start;
135 
136     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
137                        nxt_runtime_start, task, rt, NULL);
138 
139     return NXT_OK;
140 
141 fail:
142 
143     nxt_mp_destroy(mp);
144 
145     return NXT_ERROR;
146 }
147 
148 
149 static nxt_int_t
150 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
151 {
152     u_char               *v, *p;
153     nxt_int_t            type;
154     nxt_array_t          *inherited_sockets;
155     nxt_socket_t         s;
156     nxt_listen_socket_t  *ls;
157 
158     v = (u_char *) getenv("NGINX");
159 
160     if (v == NULL) {
161         return nxt_runtime_systemd_listen_sockets(task, rt);
162     }
163 
164     nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v);
165 
166     inherited_sockets = nxt_array_create(rt->mem_pool,
167                                          1, sizeof(nxt_listen_socket_t));
168     if (inherited_sockets == NULL) {
169         return NXT_ERROR;
170     }
171 
172     rt->inherited_sockets = inherited_sockets;
173 
174     for (p = v; *p != '\0'; p++) {
175 
176         if (*p == ';') {
177             s = nxt_int_parse(v, p - v);
178 
179             if (nxt_slow_path(s < 0)) {
180                 nxt_log(task, NXT_LOG_CRIT, "invalid socket number "
181                         "\"%s\" in NGINX environment variable, "
182                         "ignoring the rest of the variable", v);
183                 return NXT_ERROR;
184             }
185 
186             v = p + 1;
187 
188             ls = nxt_array_zero_add(inherited_sockets);
189             if (nxt_slow_path(ls == NULL)) {
190                 return NXT_ERROR;
191             }
192 
193             ls->socket = s;
194 
195             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
196             if (nxt_slow_path(ls->sockaddr == NULL)) {
197                 return NXT_ERROR;
198             }
199 
200             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
201             if (nxt_slow_path(type == -1)) {
202                 return NXT_ERROR;
203             }
204 
205             ls->sockaddr->type = (uint16_t) type;
206         }
207     }
208 
209     return NXT_OK;
210 }
211 
212 
213 static nxt_int_t
214 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
215 {
216     u_char               *nfd, *pid;
217     nxt_int_t            n;
218     nxt_array_t          *inherited_sockets;
219     nxt_socket_t         s;
220     nxt_listen_socket_t  *ls;
221 
222     /*
223      * Number of listening sockets passed.  The socket
224      * descriptors start from number 3 and are sequential.
225      */
226     nfd = (u_char *) getenv("LISTEN_FDS");
227     if (nfd == NULL) {
228         return NXT_OK;
229     }
230 
231     /* The pid of the service process. */
232     pid = (u_char *) getenv("LISTEN_PID");
233     if (pid == NULL) {
234         return NXT_OK;
235     }
236 
237     n = nxt_int_parse(nfd, nxt_strlen(nfd));
238     if (n < 0) {
239         return NXT_OK;
240     }
241 
242     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
243         return NXT_OK;
244     }
245 
246     nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n);
247 
248     inherited_sockets = nxt_array_create(rt->mem_pool,
249                                          n, sizeof(nxt_listen_socket_t));
250     if (inherited_sockets == NULL) {
251         return NXT_ERROR;
252     }
253 
254     rt->inherited_sockets = inherited_sockets;
255 
256     for (s = 3; s < n; s++) {
257         ls = nxt_array_zero_add(inherited_sockets);
258         if (nxt_slow_path(ls == NULL)) {
259             return NXT_ERROR;
260         }
261 
262         ls->socket = s;
263 
264         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
265         if (nxt_slow_path(ls->sockaddr == NULL)) {
266             return NXT_ERROR;
267         }
268 
269         ls->sockaddr->type = SOCK_STREAM;
270     }
271 
272     return NXT_OK;
273 }
274 
275 
276 static nxt_int_t
277 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
278 {
279     nxt_thread_t                 *thread;
280     nxt_event_engine_t           *engine;
281     const nxt_event_interface_t  *interface;
282 
283     interface = nxt_service_get(rt->services, "engine", NULL);
284 
285     if (nxt_slow_path(interface == NULL)) {
286         /* TODO: log */
287         return NXT_ERROR;
288     }
289 
290     engine = nxt_event_engine_create(task, interface,
291                                      nxt_master_process_signals, 0, 0);
292 
293     if (nxt_slow_path(engine == NULL)) {
294         return NXT_ERROR;
295     }
296 
297     thread = task->thread;
298     thread->engine = engine;
299     thread->fiber = &engine->fibers->fiber;
300 
301     engine->id = rt->last_engine_id++;
302 
303     nxt_queue_init(&rt->engines);
304     nxt_queue_insert_tail(&rt->engines, &engine->link);
305 
306     return NXT_OK;
307 }
308 
309 
310 static nxt_int_t
311 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
312 {
313 #if (NXT_THREADS)
314     nxt_int_t    ret;
315     nxt_array_t  *thread_pools;
316 
317     thread_pools = nxt_array_create(rt->mem_pool, 1,
318                                     sizeof(nxt_thread_pool_t *));
319 
320     if (nxt_slow_path(thread_pools == NULL)) {
321         return NXT_ERROR;
322     }
323 
324     rt->thread_pools = thread_pools;
325     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
326 
327     if (nxt_slow_path(ret != NXT_OK)) {
328         return NXT_ERROR;
329     }
330 
331 #endif
332 
333     return NXT_OK;
334 }
335 
336 
337 static void
338 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
339 {
340     nxt_runtime_t  *rt;
341 
342     rt = obj;
343 
344     nxt_debug(task, "rt conf done");
345 
346     task->thread->log->ctx_handler = NULL;
347     task->thread->log->ctx = NULL;
348 
349     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
350         goto fail;
351     }
352 
353     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
354         goto fail;
355     }
356 
357     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
358         goto fail;
359     }
360 
361 #if (NXT_THREADS)
362 
363     /*
364      * Thread pools should be destroyed before starting worker
365      * processes, because thread pool semaphores will stick in
366      * locked state in new processes after fork().
367      */
368     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
369 
370 #else
371 
372     rt->start(task->thread, rt);
373 
374 #endif
375 
376     return;
377 
378 fail:
379 
380     nxt_runtime_quit(task);
381 }
382 
383 
384 static void
385 nxt_runtime_initial_start(nxt_task_t *task)
386 {
387     nxt_int_t                    ret;
388     nxt_thread_t                 *thr;
389     nxt_runtime_t                *rt;
390     const nxt_event_interface_t  *interface;
391 
392     thr = task->thread;
393     rt = thr->runtime;
394 
395     if (rt->inherited_sockets == NULL && rt->daemon) {
396 
397         if (nxt_process_daemon(task) != NXT_OK) {
398             goto fail;
399         }
400 
401         /*
402          * An event engine should be updated after fork()
403          * even if an event facility was not changed because:
404          * 1) inherited kqueue descriptor is invalid,
405          * 2) the signal thread is not inherited.
406          */
407         interface = nxt_service_get(rt->services, "engine", rt->engine);
408         if (interface == NULL) {
409             goto fail;
410         }
411 
412         ret = nxt_event_engine_change(task->thread->engine, interface,
413                                       rt->batch);
414         if (ret != NXT_OK) {
415             goto fail;
416         }
417     }
418 
419     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
420     if (ret != NXT_OK) {
421         goto fail;
422     }
423 
424     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
425         goto fail;
426     }
427 
428     thr->engine->max_connections = rt->engine_connections;
429 
430     if (rt->master_process) {
431         if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) {
432             return;
433         }
434 
435     } else {
436         nxt_single_process_start(thr, task, rt);
437         return;
438     }
439 
440 fail:
441 
442     nxt_runtime_quit(task);
443 }
444 
445 
446 static void
447 nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt)
448 {
449 #if (NXT_THREADS)
450     nxt_int_t  ret;
451 
452     ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads,
453                                        60000 * 1000000LL);
454 
455     if (nxt_slow_path(ret != NXT_OK)) {
456         nxt_runtime_quit(task);
457         return;
458     }
459 
460 #endif
461 
462     rt->types |= (1U << NXT_PROCESS_SINGLE);
463 
464     nxt_runtime_listen_sockets_enable(task, rt);
465 
466     return;
467 }
468 
469 
/*
 * Starts or continues the shutdown of the runtime of the calling thread.
 *
 * On the first call for an engine (engine->shutdown is not set yet) the
 * flag is set, destruction of the thread pools is started with this very
 * function as the continuation, and, in the master process, the worker
 * processes are asked to stop.  Either of these pending operations
 * postpones the exit.
 *
 * Idle connections are closed on every call.  When nothing is pending,
 * nxt_runtime_exit() is queued on the engine's fast work queue.
 */
void
nxt_runtime_quit(nxt_task_t *task)
{
    nxt_bool_t          done;
    nxt_runtime_t       *rt;
    nxt_event_engine_t  *engine;

    rt = task->thread->runtime;
    engine = task->thread->engine;

    nxt_debug(task, "exiting");

    /* Cleared below when an asynchronous shutdown step has been started. */
    done = 1;

    if (!engine->shutdown) {
        engine->shutdown = 1;

#if (NXT_THREADS)

        if (!nxt_array_is_empty(rt->thread_pools)) {
            /* nxt_runtime_quit() is passed as the continuation. */
            nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
            done = 0;
        }

#endif

        if (nxt_runtime_is_master(rt)) {
            nxt_master_stop_worker_processes(task, rt);
            done = 0;
        }
    }

    nxt_runtime_close_idle_connections(engine);

    if (done) {
        /* rt is passed as "obj" and the engine as "data". */
        nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
                           task, rt, engine);
    }
}
509 
510 
511 static void
512 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
513 {
514     nxt_conn_t        *c;
515     nxt_queue_t       *idle;
516     nxt_queue_link_t  *link, *next;
517 
518     nxt_debug(&engine->task, "close idle connections");
519 
520     idle = &engine->idle_connections;
521 
522     for (link = nxt_queue_head(idle);
523          link != nxt_queue_tail(idle);
524          link = next)
525     {
526         next = nxt_queue_next(link);
527         c = nxt_queue_link_data(link, nxt_conn_t, link);
528 
529         if (!c->socket.read_ready) {
530             nxt_queue_remove(link);
531             nxt_conn_close(engine, c);
532         }
533     }
534 }
535 
536 
/*
 * Work queue handler that terminates the process; "obj" is the runtime
 * and "data" is the event engine, as queued by nxt_runtime_quit().
 *
 * In builds with thread support the handler returns without exiting
 * while rt->thread_pools is not empty.  Otherwise the master process
 * deletes its pid file, signal processing is stopped for engines whose
 * event facility has no signal support, all registered processes are
 * removed, the runtime mutex and memory pool are destroyed and the
 * process exits with status 0.
 */
static void
nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
{
    nxt_runtime_t       *rt;
    nxt_process_t       *process;
    nxt_event_engine_t  *engine;

    rt = obj;
    engine = data;

#if (NXT_THREADS)

    nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);

    /* Some thread pools are still alive: do not exit yet. */
    if (!nxt_array_is_empty(rt->thread_pools)) {
        return;
    }

#endif

    if (nxt_runtime_is_master(rt)) {
        if (rt->pid_file != NULL) {
            nxt_file_delete(rt->pid_file);
        }
    }

    if (!engine->event.signal_support) {
        nxt_event_engine_signals_stop(engine);
    }

    nxt_runtime_process_each(rt, process) {

        nxt_runtime_process_remove(rt, process);

    } nxt_runtime_process_loop;

    nxt_thread_mutex_destroy(&rt->processes_mutex);

    /* rt itself is allocated from this pool and must not be used below. */
    nxt_mp_destroy(rt->mem_pool);

    nxt_debug(task, "exit");

    exit(0);
    nxt_unreachable();
}
582 
583 
584 static nxt_int_t
585 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
586 {
587     nxt_event_engine_t           *engine;
588     const nxt_event_interface_t  *interface;
589 
590     engine = task->thread->engine;
591 
592     if (engine->batch == rt->batch
593         && nxt_strcmp(engine->event.name, rt->engine) == 0)
594     {
595         return NXT_OK;
596     }
597 
598     interface = nxt_service_get(rt->services, "engine", rt->engine);
599 
600     if (interface != NULL) {
601         return nxt_event_engine_change(engine, interface, rt->batch);
602     }
603 
604     return NXT_ERROR;
605 }
606 
607 
608 void
609 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
610 {
611     nxt_queue_link_t    *link;
612     nxt_event_engine_t  *engine;
613 
614     link = nxt_queue_first(&rt->engines);
615     nxt_queue_remove(link);
616 
617     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
618     nxt_event_engine_free(engine);
619 }
620 
621 
622 #if (NXT_THREADS)
623 
/* Thread pool callbacks passed to nxt_thread_pool_create() below. */
static void nxt_runtime_thread_pool_init(void);
static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
    void *data);
627 
628 
629 nxt_int_t
630 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
631     nxt_uint_t max_threads, nxt_nsec_t timeout)
632 {
633     nxt_thread_pool_t   *thread_pool, **tp;
634 
635     tp = nxt_array_add(rt->thread_pools);
636     if (tp == NULL) {
637         return NXT_ERROR;
638     }
639 
640     thread_pool = nxt_thread_pool_create(max_threads, timeout,
641                                          nxt_runtime_thread_pool_init,
642                                          thr->engine,
643                                          nxt_runtime_thread_pool_exit);
644 
645     if (nxt_fast_path(thread_pool != NULL)) {
646         *tp = thread_pool;
647     }
648 
649     return NXT_OK;
650 }
651 
652 
653 static void
654 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
655     nxt_runtime_cont_t cont)
656 {
657     nxt_uint_t         n;
658     nxt_thread_pool_t  **tp;
659 
660     rt->continuation = cont;
661 
662     n = rt->thread_pools->nelts;
663 
664     if (n == 0) {
665         cont(task);
666         return;
667     }
668 
669     tp = rt->thread_pools->elts;
670 
671     do {
672         nxt_thread_pool_destroy(*tp);
673 
674         tp++;
675         n--;
676     } while (n != 0);
677 }
678 
679 
/*
 * Thread pool init callback passed to nxt_thread_pool_create().
 * It only initializes the regex support when built with NXT_REGEX.
 */
static void
nxt_runtime_thread_pool_init(void)
{
#if (NXT_REGEX)
    nxt_regex_init(0);
#endif
}
687 
688 
689 static void
690 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
691 {
692     nxt_uint_t           i, n;
693     nxt_runtime_t        *rt;
694     nxt_thread_pool_t    *tp, **thread_pools;
695     nxt_thread_handle_t  handle;
696 
697     tp = obj;
698 
699     if (data != NULL) {
700         handle = (nxt_thread_handle_t) (uintptr_t) data;
701         nxt_thread_wait(handle);
702     }
703 
704     rt = task->thread->runtime;
705 
706     thread_pools = rt->thread_pools->elts;
707     n = rt->thread_pools->nelts;
708 
709     nxt_debug(task, "thread pools: %ui", n);
710 
711     for (i = 0; i < n; i++) {
712 
713         if (tp == thread_pools[i]) {
714             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
715 
716             if (n == 1) {
717                 /* The last thread pool. */
718                 rt->continuation(task);
719             }
720 
721             return;
722         }
723     }
724 }
725 
726 #endif
727 
728 
/*
 * Fills the runtime configuration.
 *
 * Built-in defaults are set first and may then be overridden by the
 * command line.  After that the controller socket and the user
 * credentials are set up, the configured event facility is resolved to
 * its canonical name, and the pid file and error log names are built.
 *
 * Returns NXT_OK or NXT_ERROR.
 */
static nxt_int_t
nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
{
    nxt_int_t                    ret;
    nxt_str_t                    *prefix;
    nxt_file_t                   *file;
    nxt_file_name_str_t          file_name;
    const nxt_event_interface_t  *interface;

    /* Defaults; nxt_runtime_conf_read_cmd() may override some of them. */
    rt->daemon = 1;
    rt->master_process = 1;
    rt->engine_connections = 256;
    rt->worker_processes = 1;
    rt->auxiliary_threads = 2;
    rt->user_cred.user = "nobody";
    rt->group = NULL;
    rt->pid = "nginext.pid";
    rt->error_log = "error.log";

    if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
        return NXT_ERROR;
    }

    if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
        return NXT_ERROR;
    }

    if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
        return NXT_ERROR;
    }

    /* An engine's parameters. */

    interface = nxt_service_get(rt->services, "engine", rt->engine);
    if (interface == NULL) {
        return NXT_ERROR;
    }

    /* Keep the name reported by the resolved interface. */
    rt->engine = interface->name;

    /* A relative pid file name is prepended with rt->prefix. */
    prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix;

    ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z",
                               prefix, rt->pid);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    rt->pid_file = file_name.start;

    /* The same rule applies to the error log name. */
    prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix;

    ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z",
                               prefix, rt->error_log);
    if (nxt_slow_path(ret != NXT_OK)) {
        return NXT_ERROR;
    }

    /* The error log name is stored in the first entry of rt->log_files. */
    file = nxt_list_first(rt->log_files);
    file->name = file_name.start;

    return NXT_OK;
}
792 
793 
794 static nxt_int_t
795 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
796 {
797     char            *p, **argv;
798     nxt_int_t       n;
799     nxt_str_t       addr;
800     nxt_sockaddr_t  *sa;
801 
802     argv = nxt_process_argv;
803 
804     while (*argv != NULL) {
805         p = *argv++;
806 
807         if (nxt_strcmp(p, "--listen") == 0) {
808             if (*argv == NULL) {
809                 nxt_log(task, NXT_LOG_CRIT,
810                          "no argument for option \"--listen\"");
811                 return NXT_ERROR;
812             }
813 
814             p = *argv++;
815 
816             addr.length = nxt_strlen(p);
817             addr.start = (u_char *) p;
818 
819             sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr);
820 
821             if (sa == NULL) {
822                 return NXT_ERROR;
823             }
824 
825             rt->controller_listen = sa;
826 
827             continue;
828         }
829 
830         if (nxt_strcmp(p, "--upstream") == 0) {
831             if (*argv == NULL) {
832                 nxt_log(task, NXT_LOG_CRIT,
833                               "no argument for option \"--upstream\"");
834                 return NXT_ERROR;
835             }
836 
837             p = *argv++;
838 
839             rt->upstream.length = nxt_strlen(p);
840             rt->upstream.start = (u_char *) p;
841 
842             continue;
843         }
844 
845         if (nxt_strcmp(p, "--workers") == 0) {
846             if (*argv == NULL) {
847                 nxt_log(task, NXT_LOG_CRIT,
848                         "no argument for option \"--workers\"");
849                 return NXT_ERROR;
850             }
851 
852             p = *argv++;
853             n = nxt_int_parse((u_char *) p, nxt_strlen(p));
854 
855             if (n < 1) {
856                 nxt_log(task, NXT_LOG_CRIT,
857                         "invalid number of workers: \"%s\"", p);
858                 return NXT_ERROR;
859             }
860 
861             rt->worker_processes = n;
862 
863             continue;
864         }
865 
866         if (nxt_strcmp(p, "--user") == 0) {
867             if (*argv == NULL) {
868                 nxt_log(task, NXT_LOG_CRIT,
869                         "no argument for option \"--user\"");
870                 return NXT_ERROR;
871             }
872 
873             p = *argv++;
874 
875             rt->user_cred.user = p;
876 
877             continue;
878         }
879 
880         if (nxt_strcmp(p, "--group") == 0) {
881             if (*argv == NULL) {
882                 nxt_log(task, NXT_LOG_CRIT,
883                         "no argument for option \"--group\"");
884                 return NXT_ERROR;
885             }
886 
887             p = *argv++;
888 
889             rt->group = p;
890 
891             continue;
892         }
893 
894         if (nxt_strcmp(p, "--pid") == 0) {
895             if (*argv == NULL) {
896                 nxt_log(task, NXT_LOG_CRIT,
897                         "no argument for option \"--pid\"");
898                 return NXT_ERROR;
899             }
900 
901             p = *argv++;
902 
903             rt->pid = p;
904 
905             continue;
906         }
907 
908         if (nxt_strcmp(p, "--log") == 0) {
909             if (*argv == NULL) {
910                 nxt_log(task, NXT_LOG_CRIT,
911                         "no argument for option \"--log\"");
912                 return NXT_ERROR;
913             }
914 
915             p = *argv++;
916 
917             rt->error_log = p;
918 
919             continue;
920         }
921 
922         if (nxt_strcmp(p, "--no-daemonize") == 0) {
923             rt->daemon = 0;
924             continue;
925         }
926     }
927 
928     return NXT_OK;
929 }
930 
931 
932 static nxt_sockaddr_t *
933 nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
934 {
935     u_char  *p;
936     size_t  length;
937 
938     length = addr->length;
939     p = addr->start;
940 
941     if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) {
942         return nxt_runtime_sockaddr_unix_parse(task, mp, addr);
943     }
944 
945     if (length != 0 && *p == '[') {
946         return nxt_runtime_sockaddr_inet6_parse(task, mp, addr);
947     }
948 
949     return nxt_runtime_sockaddr_inet_parse(task, mp, addr);
950 }
951 
952 
953 static nxt_sockaddr_t *
954 nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
955 {
956 #if (NXT_HAVE_UNIX_DOMAIN)
957     u_char          *p;
958     size_t          length, socklen;
959     nxt_sockaddr_t  *sa;
960 
961     /*
962      * Actual sockaddr_un length can be lesser or even larger than defined
963      * struct sockaddr_un length (see comment in nxt_socket.h).  So
964      * limit maximum Unix domain socket address length by defined sun_path[]
965      * length because some OSes accept addresses twice larger than defined
966      * struct sockaddr_un.  Also reserve space for a trailing zero to avoid
967      * ambiguity, since many OSes accept Unix domain socket addresses
968      * without a trailing zero.
969      */
970     const size_t max_len = sizeof(struct sockaddr_un)
971                            - offsetof(struct sockaddr_un, sun_path) - 1;
972 
973     /* cutting "unix:" */
974     length = addr->length - 5;
975     p = addr->start + 5;
976 
977     if (length == 0) {
978         nxt_log(task, NXT_LOG_CRIT,
979                 "unix domain socket \"%V\" name is invalid", addr);
980         return NULL;
981     }
982 
983     if (length > max_len) {
984         nxt_log(task, NXT_LOG_CRIT,
985                 "unix domain socket \"%V\" name is too long", addr);
986         return NULL;
987     }
988 
989     socklen = offsetof(struct sockaddr_un, sun_path) + length + 1;
990 
991 #if (NXT_LINUX)
992 
993     /*
994      * Linux unix(7):
995      *
996      *   abstract: an abstract socket address is distinguished by the fact
997      *   that sun_path[0] is a null byte ('\0').  The socket's address in
998      *   this namespace is given by the additional bytes in sun_path that
999      *   are covered by the specified length of the address structure.
1000      *   (Null bytes in the name have no special significance.)
1001      */
1002     if (p[0] == '@') {
1003         p[0] = '\0';
1004         socklen--;
1005     }
1006 
1007 #endif
1008 
1009     sa = nxt_sockaddr_alloc(mp, socklen, addr->length);
1010 
1011     if (nxt_slow_path(sa == NULL)) {
1012         return NULL;
1013     }
1014 
1015     sa->type = SOCK_STREAM;
1016 
1017     sa->u.sockaddr_un.sun_family = AF_UNIX;
1018     nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length);
1019 
1020     return sa;
1021 
1022 #else  /* !(NXT_HAVE_UNIX_DOMAIN) */
1023 
1024     nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported",
1025             addr);
1026 
1027     return NULL;
1028 
1029 #endif
1030 }
1031 
1032 
1033 static nxt_sockaddr_t *
1034 nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp,
1035     nxt_str_t *addr)
1036 {
1037 #if (NXT_INET6)
1038     u_char           *p, *addr_end;
1039     size_t           length;
1040     nxt_int_t        port;
1041     nxt_sockaddr_t   *sa;
1042     struct in6_addr  *in6_addr;
1043 
1044     length = addr->length - 1;
1045     p = addr->start + 1;
1046 
1047     addr_end = nxt_memchr(p, ']', length);
1048 
1049     if (addr_end == NULL) {
1050         goto invalid_address;
1051     }
1052 
1053     sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6),
1054                             NXT_INET6_ADDR_STR_LEN);
1055     if (nxt_slow_path(sa == NULL)) {
1056         return NULL;
1057     }
1058 
1059     in6_addr = &sa->u.sockaddr_in6.sin6_addr;
1060 
1061     if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) {
1062         goto invalid_address;
1063     }
1064 
1065     port = 0;
1066     p = addr_end + 1;
1067     length = (p + length) - p;
1068 
1069     if (length == 0) {
1070         goto found;
1071     }
1072 
1073     if (*p == ':') {
1074         port = nxt_int_parse(p + 1, length - 1);
1075 
1076         if (port >= 1 && port <= 65535) {
1077             goto found;
1078         }
1079     }
1080 
1081     nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr);
1082 
1083     return NULL;
1084 
1085 found:
1086 
1087     sa->type = SOCK_STREAM;
1088 
1089     sa->u.sockaddr_in6.sin6_family = AF_INET6;
1090     sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port);
1091 
1092     return sa;
1093 
1094 invalid_address:
1095 
1096     nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr);
1097 
1098     return NULL;
1099 
1100 #else
1101 
1102     nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr);
1103 
1104     return NULL;
1105 
1106 #endif
1107 }
1108 
1109 
1110 static nxt_sockaddr_t *
1111 nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp,
1112     nxt_str_t *string)
1113 {
1114     u_char          *p, *ip;
1115     size_t          length;
1116     in_addr_t       addr;
1117     nxt_int_t       port;
1118     nxt_sockaddr_t  *sa;
1119 
1120     addr = INADDR_ANY;
1121 
1122     length = string->length;
1123     ip = string->start;
1124 
1125     p = nxt_memchr(ip, ':', length);
1126 
1127     if (p == NULL) {
1128 
1129         /* single value port, or address */
1130 
1131         port = nxt_int_parse(ip, length);
1132 
1133         if (port > 0) {
1134             /* "*:XX" */
1135 
1136             if (port < 1 || port > 65535) {
1137                 goto invalid_port;
1138             }
1139 
1140         } else {
1141             /* "x.x.x.x" */
1142 
1143             addr = nxt_inet_addr(ip, length);
1144 
1145             if (addr == INADDR_NONE) {
1146                 goto invalid_port;
1147             }
1148 
1149             port = 8080;
1150         }
1151 
1152     } else {
1153 
1154         /* x.x.x.x:XX */
1155 
1156         p++;
1157         length = (ip + length) - p;
1158         port = nxt_int_parse(p, length);
1159 
1160         if (port < 1 || port > 65535) {
1161             goto invalid_port;
1162         }
1163 
1164         length = (p - 1) - ip;
1165 
1166         if (length != 1 || ip[0] != '*') {
1167             addr = nxt_inet_addr(ip, length);
1168 
1169             if (addr == INADDR_NONE) {
1170                 goto invalid_addr;
1171             }
1172 
1173             /* "x.x.x.x:XX" */
1174         }
1175     }
1176 
1177     sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
1178                             NXT_INET_ADDR_STR_LEN);
1179     if (nxt_slow_path(sa == NULL)) {
1180         return NULL;
1181     }
1182 
1183     sa->type = SOCK_STREAM;
1184 
1185     sa->u.sockaddr_in.sin_family = AF_INET;
1186     sa->u.sockaddr_in.sin_port = htons((in_port_t) port);
1187     sa->u.sockaddr_in.sin_addr.s_addr = addr;
1188 
1189     return sa;
1190 
1191 invalid_port:
1192 
1193     nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string);
1194 
1195     return NULL;
1196 
1197 invalid_addr:
1198 
1199     nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string);
1200 
1201     return NULL;
1202 }
1203 
1204 
1205 nxt_listen_socket_t *
1206 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1207 {
1208     nxt_mp_t             *mp;
1209     nxt_listen_socket_t  *ls;
1210 
1211     ls = nxt_array_zero_add(rt->listen_sockets);
1212     if (ls == NULL) {
1213         return NULL;
1214     }
1215 
1216     mp = rt->mem_pool;
1217 
1218     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1219                                        sa->length);
1220     if (ls->sockaddr == NULL) {
1221         return NULL;
1222     }
1223 
1224     ls->sockaddr->type = sa->type;
1225 
1226     nxt_sockaddr_text(ls->sockaddr);
1227 
1228     ls->socket = -1;
1229     ls->backlog = NXT_LISTEN_BACKLOG;
1230 
1231     return ls;
1232 }
1233 
1234 
1235 static nxt_int_t
1236 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1237 {
1238     size_t  length;
1239     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1240 
1241     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1242         nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno);
1243         return NXT_ERROR;
1244     }
1245 
1246     /*
1247      * Linux gethostname(2):
1248      *
1249      *    If the null-terminated hostname is too large to fit,
1250      *    then the name is truncated, and no error is returned.
1251      *
1252      * For this reason an additional byte is reserved in the buffer.
1253      */
1254     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1255 
1256     length = nxt_strlen(hostname);
1257     rt->hostname.length = length;
1258 
1259     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1260 
1261     if (rt->hostname.start != NULL) {
1262         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1263         return NXT_OK;
1264     }
1265 
1266     return NXT_ERROR;
1267 }
1268 
1269 
1270 static nxt_int_t
1271 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1272 {
1273     nxt_file_t  *file;
1274     nxt_list_t  *log_files;
1275 
1276     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1277 
1278     if (nxt_fast_path(log_files != NULL)) {
1279         rt->log_files = log_files;
1280 
1281         /* Preallocate the main error_log.  This allocation cannot fail. */
1282         file = nxt_list_zero_add(log_files);
1283 
1284         file->fd = NXT_FILE_INVALID;
1285         file->log_level = NXT_LOG_CRIT;
1286 
1287         return NXT_OK;
1288     }
1289 
1290     return NXT_ERROR;
1291 }
1292 
1293 
1294 nxt_file_t *
1295 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1296 {
1297     nxt_int_t            ret;
1298     nxt_str_t            *prefix;
1299     nxt_file_t           *file;
1300     nxt_file_name_str_t  file_name;
1301 
1302     prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix;
1303 
1304     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z",
1305                                prefix, name);
1306 
1307     if (nxt_slow_path(ret != NXT_OK)) {
1308         return NULL;
1309     }
1310 
1311     nxt_list_each(file, rt->log_files) {
1312 
1313         /* STUB: hardecoded case sensitive/insensitive. */
1314 
1315         if (file->name != NULL
1316             && nxt_file_name_eq(file->name, file_name.start))
1317         {
1318             return file;
1319         }
1320 
1321     } nxt_list_loop;
1322 
1323     file = nxt_list_zero_add(rt->log_files);
1324 
1325     if (nxt_slow_path(file == NULL)) {
1326         return NULL;
1327     }
1328 
1329     file->fd = NXT_FILE_INVALID;
1330     file->log_level = NXT_LOG_CRIT;
1331     file->name = file_name.start;
1332 
1333     return file;
1334 }
1335 
1336 
1337 static nxt_int_t
1338 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1339 {
1340     nxt_int_t   ret;
1341     nxt_file_t  *file;
1342 
1343     nxt_list_each(file, rt->log_files) {
1344 
1345         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1346                             NXT_FILE_OWNER_ACCESS);
1347 
1348         if (ret != NXT_OK) {
1349             return NXT_ERROR;
1350         }
1351 
1352     } nxt_list_loop;
1353 
1354     file = nxt_list_first(rt->log_files);
1355 
1356     return nxt_file_stderr(file);
1357 }
1358 
1359 
1360 nxt_int_t
1361 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1362 {
1363     nxt_int_t            ret;
1364     nxt_uint_t           c, p, ncurr, nprev;
1365     nxt_listen_socket_t  *curr, *prev;
1366 
1367     curr = rt->listen_sockets->elts;
1368     ncurr = rt->listen_sockets->nelts;
1369 
1370     if (rt->inherited_sockets != NULL) {
1371         prev = rt->inherited_sockets->elts;
1372         nprev = rt->inherited_sockets->nelts;
1373 
1374     } else {
1375         prev = NULL;
1376         nprev = 0;
1377     }
1378 
1379     for (c = 0; c < ncurr; c++) {
1380 
1381         for (p = 0; p < nprev; p++) {
1382 
1383             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1384 
1385                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1386                 if (ret != NXT_OK) {
1387                     return NXT_ERROR;
1388                 }
1389 
1390                 goto next;
1391             }
1392         }
1393 
1394         if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1395             return NXT_ERROR;
1396         }
1397 
1398     next:
1399 
1400         continue;
1401     }
1402 
1403     return NXT_OK;
1404 }
1405 
1406 
1407 nxt_int_t
1408 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1409 {
1410     nxt_uint_t           i, n;
1411     nxt_listen_socket_t  *ls;
1412 
1413     ls = rt->listen_sockets->elts;
1414     n = rt->listen_sockets->nelts;
1415 
1416     for (i = 0; i < n; i++) {
1417         if (ls[i].flags == NXT_NONBLOCK) {
1418             if (nxt_listen_event(task, &ls[i]) == NULL) {
1419                 return NXT_ERROR;
1420             }
1421         }
1422     }
1423 
1424     return NXT_OK;
1425 }
1426 
1427 
1428 nxt_str_t *
1429 nxt_current_directory(nxt_mp_t *mp)
1430 {
1431     size_t     length;
1432     u_char     *p;
1433     nxt_str_t  *name;
1434     char       buf[NXT_MAX_PATH_LEN];
1435 
1436     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1437 
1438     if (nxt_fast_path(length != 0)) {
1439         name = nxt_str_alloc(mp, length + 1);
1440 
1441         if (nxt_fast_path(name != NULL)) {
1442             p = nxt_cpymem(name->start, buf, length);
1443             *p = '/';
1444 
1445             return name;
1446         }
1447     }
1448 
1449     return NULL;
1450 }
1451 
1452 
1453 static nxt_int_t
1454 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1455 {
1456     ssize_t     length;
1457     nxt_int_t   n;
1458     nxt_file_t  file;
1459     u_char      pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE];
1460 
1461     nxt_memzero(&file, sizeof(nxt_file_t));
1462 
1463     file.name = pid_file;
1464 
1465     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1466                       NXT_FILE_DEFAULT_ACCESS);
1467 
1468     if (n != NXT_OK) {
1469         return NXT_ERROR;
1470     }
1471 
1472     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1473 
1474     if (nxt_file_write(&file, pid, length, 0) != length) {
1475         return NXT_ERROR;
1476     }
1477 
1478     nxt_file_close(task, &file);
1479 
1480     return NXT_OK;
1481 }
1482 
1483 
1484 nxt_process_t *
1485 nxt_runtime_process_new(nxt_runtime_t *rt)
1486 {
1487     nxt_process_t  *process;
1488 
1489     /* TODO: memory failures. */
1490 
1491     process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1492     if (nxt_slow_path(process == NULL)) {
1493         return NULL;
1494     }
1495 
1496     nxt_queue_init(&process->ports);
1497 
1498     nxt_thread_mutex_create(&process->incoming_mutex);
1499     nxt_thread_mutex_create(&process->outgoing_mutex);
1500     nxt_thread_mutex_create(&process->cp_mutex);
1501 
1502     return process;
1503 }
1504 
1505 
1506 static void
1507 nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1508 {
1509     nxt_assert(process->port_cleanups == 0);
1510     nxt_assert(process->registered == 0);
1511 
1512     nxt_port_mmaps_destroy(process->incoming, 1);
1513     nxt_port_mmaps_destroy(process->outgoing, 1);
1514 
1515     if (process->cp_mem_pool != NULL) {
1516         nxt_mp_thread_adopt(process->cp_mem_pool);
1517 
1518         nxt_mp_destroy(process->cp_mem_pool);
1519     }
1520 
1521     nxt_thread_mutex_destroy(&process->incoming_mutex);
1522     nxt_thread_mutex_destroy(&process->outgoing_mutex);
1523     nxt_thread_mutex_destroy(&process->cp_mutex);
1524 
1525     nxt_mp_free(rt->mem_pool, process);
1526 }
1527 
1528 
1529 static nxt_int_t
1530 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1531 {
1532     nxt_process_t  *process;
1533 
1534     process = data;
1535 
1536     if (lhq->key.length == sizeof(nxt_pid_t) &&
1537         *(nxt_pid_t *) lhq->key.start == process->pid) {
1538         return NXT_OK;
1539     }
1540 
1541     return NXT_DECLINED;
1542 }
1543 
/*
 * Level hash prototype used for the queries on rt->processes: entries are
 * matched against pid keys by nxt_runtime_lvlhsh_pid_test().
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_runtime_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
1550 
1551 
1552 nxt_inline void
1553 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1554 {
1555     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1556     lhq->key.length = sizeof(*pid);
1557     lhq->key.start = (u_char *) pid;
1558     lhq->proto = &lvlhsh_processes_proto;
1559 }
1560 
1561 
1562 nxt_process_t *
1563 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1564 {
1565     nxt_process_t       *process;
1566     nxt_lvlhsh_query_t  lhq;
1567 
1568     process = NULL;
1569 
1570     nxt_runtime_process_lhq_pid(&lhq, &pid);
1571 
1572     nxt_thread_mutex_lock(&rt->processes_mutex);
1573 
1574     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1575         process = lhq.value;
1576 
1577     } else {
1578         nxt_thread_log_debug("process %PI not found", pid);
1579     }
1580 
1581     nxt_thread_mutex_unlock(&rt->processes_mutex);
1582 
1583     return process;
1584 }
1585 
1586 
1587 nxt_process_t *
1588 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1589 {
1590     nxt_process_t       *process;
1591     nxt_lvlhsh_query_t  lhq;
1592 
1593     nxt_runtime_process_lhq_pid(&lhq, &pid);
1594 
1595     nxt_thread_mutex_lock(&rt->processes_mutex);
1596 
1597     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1598         nxt_thread_log_debug("process %PI found", pid);
1599 
1600         nxt_thread_mutex_unlock(&rt->processes_mutex);
1601         return lhq.value;
1602     }
1603 
1604     process = nxt_runtime_process_new(rt);
1605     if (nxt_slow_path(process == NULL)) {
1606         return NULL;
1607     }
1608 
1609     process->pid = pid;
1610 
1611     lhq.replace = 0;
1612     lhq.value = process;
1613     lhq.pool = rt->mem_pool;
1614 
1615     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1616 
1617     case NXT_OK:
1618         if (rt->nprocesses == 0) {
1619             rt->mprocess = process;
1620         }
1621 
1622         rt->nprocesses++;
1623 
1624         process->registered = 1;
1625 
1626         nxt_thread_log_debug("process %PI insert", pid);
1627         break;
1628 
1629     default:
1630         nxt_thread_log_debug("process %PI insert failed", pid);
1631         break;
1632     }
1633 
1634     nxt_thread_mutex_unlock(&rt->processes_mutex);
1635 
1636     return process;
1637 }
1638 
1639 
1640 void
1641 nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
1642 {
1643     nxt_port_t          *port;
1644     nxt_lvlhsh_query_t  lhq;
1645 
1646     nxt_assert(process->registered == 0);
1647 
1648     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1649 
1650     lhq.replace = 0;
1651     lhq.value = process;
1652     lhq.pool = rt->mem_pool;
1653 
1654     nxt_thread_mutex_lock(&rt->processes_mutex);
1655 
1656     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1657 
1658     case NXT_OK:
1659         if (rt->nprocesses == 0) {
1660             rt->mprocess = process;
1661         }
1662 
1663         rt->nprocesses++;
1664 
1665         nxt_process_port_each(process, port) {
1666 
1667             port->pid = process->pid;
1668 
1669             nxt_runtime_port_add(rt, port);
1670 
1671         } nxt_process_port_loop;
1672 
1673         process->registered = 1;
1674 
1675         nxt_thread_log_debug("process %PI added", process->pid);
1676         break;
1677 
1678     default:
1679         nxt_thread_log_debug("process %PI failed to add", process->pid);
1680         break;
1681     }
1682 
1683     nxt_thread_mutex_unlock(&rt->processes_mutex);
1684 }
1685 
1686 
1687 static nxt_process_t *
1688 nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1689 {
1690     nxt_process_t       *process;
1691     nxt_lvlhsh_query_t  lhq;
1692 
1693     process = NULL;
1694 
1695     nxt_runtime_process_lhq_pid(&lhq, &pid);
1696 
1697     lhq.pool = rt->mem_pool;
1698 
1699     nxt_thread_mutex_lock(&rt->processes_mutex);
1700 
1701     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1702 
1703     case NXT_OK:
1704         rt->nprocesses--;
1705 
1706         process = lhq.value;
1707 
1708         process->registered = 0;
1709 
1710         nxt_thread_log_debug("process %PI removed", pid);
1711         break;
1712 
1713     default:
1714         nxt_thread_log_debug("process %PI remove failed", pid);
1715         break;
1716     }
1717 
1718     nxt_thread_mutex_unlock(&rt->processes_mutex);
1719 
1720     return process;
1721 }
1722 
1723 
1724 void
1725 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1726 {
1727     nxt_port_t  *port;
1728 
1729     if (process->port_cleanups == 0) {
1730         if (process->registered == 1) {
1731             nxt_runtime_process_remove_pid(rt, process->pid);
1732         }
1733 
1734         nxt_runtime_process_destroy(rt, process);
1735 
1736     } else {
1737         nxt_process_port_each(process, port) {
1738 
1739             nxt_runtime_port_remove(rt, port);
1740 
1741             nxt_port_release(port);
1742 
1743         } nxt_process_port_loop;
1744     }
1745 }
1746 
1747 
1748 nxt_process_t *
1749 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1750 {
1751     nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t));
1752 
1753     lhe->proto = &lvlhsh_processes_proto;
1754 
1755     return nxt_runtime_process_next(rt, lhe);
1756 }
1757 
1758 
/*
 * Returns the result of nxt_port_hash_first() for the rt->ports hash and
 * the caller supplied state "lhe"; presumably the first port of an
 * iteration over all runtime ports.
 */
nxt_port_t *
nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
{
    return nxt_port_hash_first(&rt->ports, lhe);
}
1764 
1765 
/*
 * Adds "port" to the rt->ports hash, using the runtime memory pool, and
 * records it in rt->port_by_type as the port of its type, overwriting any
 * port stored there before.
 */
void
nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
{
    nxt_port_hash_add(&rt->ports, rt->mem_pool, port);

    /* The most recently added port of a type wins. */
    rt->port_by_type[port->type] = port;
}
1773 
1774 
1775 void
1776 nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
1777 {
1778     nxt_port_hash_remove(&rt->ports, rt->mem_pool, port);
1779 
1780     if (rt->port_by_type[port->type] == port) {
1781         rt->port_by_type[port->type] = NULL;
1782     }
1783 }
1784 
1785 
/*
 * Returns the result of nxt_port_hash_find() for the rt->ports hash and
 * the given pid and port id.
 */
nxt_port_t *
nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
    nxt_port_id_t port_id)
{
    return nxt_port_hash_find(&rt->ports, pid, port_id);
}
1792