xref: /unit/src/nxt_runtime.c (revision 1489:4a3ec07f4b19)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 
14 
15 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16     nxt_runtime_t *rt);
17 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18     nxt_runtime_t *rt);
19 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
25 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
26 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
27     nxt_runtime_t *rt);
28 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
32 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
33     nxt_runtime_t *rt);
34 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
35     nxt_file_name_t *pid_file);
36 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
37     nxt_runtime_cont_t cont);
38 static void nxt_runtime_thread_pool_init(void);
39 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
40     void *data);
41 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
42 static void nxt_runtime_process_remove(nxt_runtime_t *rt,
43     nxt_process_t *process);
44 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
45 
46 
47 nxt_int_t
48 nxt_runtime_create(nxt_task_t *task)
49 {
50     nxt_mp_t               *mp;
51     nxt_int_t              ret;
52     nxt_array_t            *listen_sockets;
53     nxt_runtime_t          *rt;
54     nxt_app_lang_module_t  *lang;
55 
56     mp = nxt_mp_create(1024, 128, 256, 32);
57     if (nxt_slow_path(mp == NULL)) {
58         return NXT_ERROR;
59     }
60 
61     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62     if (nxt_slow_path(rt == NULL)) {
63         goto fail;
64     }
65 
66     task->thread->runtime = rt;
67     rt->mem_pool = mp;
68 
69     nxt_thread_mutex_create(&rt->processes_mutex);
70 
71     rt->services = nxt_services_init(mp);
72     if (nxt_slow_path(rt->services == NULL)) {
73         goto fail;
74     }
75 
76     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77     if (nxt_slow_path(rt->languages == NULL)) {
78         goto fail;
79     }
80 
81     /* Should not fail. */
82     lang = nxt_array_add(rt->languages);
83     lang->type = NXT_APP_EXTERNAL;
84     lang->version = (u_char *) "";
85     lang->file = NULL;
86     lang->module = &nxt_external_module;
87     lang->mounts = NULL;
88 
89     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
90     if (nxt_slow_path(listen_sockets == NULL)) {
91         goto fail;
92     }
93 
94     rt->listen_sockets = listen_sockets;
95 
96     ret = nxt_runtime_inherited_listen_sockets(task, rt);
97     if (nxt_slow_path(ret != NXT_OK)) {
98         goto fail;
99     }
100 
101     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
102         goto fail;
103     }
104 
105     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
106         goto fail;
107     }
108 
109     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
110         goto fail;
111     }
112 
113     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
114         goto fail;
115     }
116 
117     rt->start = nxt_runtime_initial_start;
118 
119     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
120         goto fail;
121     }
122 
123     if (nxt_port_rpc_init() != NXT_OK) {
124         goto fail;
125     }
126 
127     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
128                        nxt_runtime_start, task, rt, NULL);
129 
130     return NXT_OK;
131 
132 fail:
133 
134     nxt_mp_destroy(mp);
135 
136     return NXT_ERROR;
137 }
138 
139 
140 static nxt_int_t
141 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
142 {
143     u_char               *v, *p;
144     nxt_int_t            type;
145     nxt_array_t          *inherited_sockets;
146     nxt_socket_t         s;
147     nxt_listen_socket_t  *ls;
148 
149     v = (u_char *) getenv("NGINX");
150 
151     if (v == NULL) {
152         return nxt_runtime_systemd_listen_sockets(task, rt);
153     }
154 
155     nxt_alert(task, "using inherited listen sockets: %s", v);
156 
157     inherited_sockets = nxt_array_create(rt->mem_pool,
158                                          1, sizeof(nxt_listen_socket_t));
159     if (inherited_sockets == NULL) {
160         return NXT_ERROR;
161     }
162 
163     rt->inherited_sockets = inherited_sockets;
164 
165     for (p = v; *p != '\0'; p++) {
166 
167         if (*p == ';') {
168             s = nxt_int_parse(v, p - v);
169 
170             if (nxt_slow_path(s < 0)) {
171                 nxt_alert(task, "invalid socket number \"%s\" "
172                           "in NGINX environment variable, "
173                           "ignoring the rest of the variable", v);
174                 return NXT_ERROR;
175             }
176 
177             v = p + 1;
178 
179             ls = nxt_array_zero_add(inherited_sockets);
180             if (nxt_slow_path(ls == NULL)) {
181                 return NXT_ERROR;
182             }
183 
184             ls->socket = s;
185 
186             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
187             if (nxt_slow_path(ls->sockaddr == NULL)) {
188                 return NXT_ERROR;
189             }
190 
191             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
192             if (nxt_slow_path(type == -1)) {
193                 return NXT_ERROR;
194             }
195 
196             ls->sockaddr->type = (uint16_t) type;
197         }
198     }
199 
200     return NXT_OK;
201 }
202 
203 
204 static nxt_int_t
205 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
206 {
207     u_char               *nfd, *pid;
208     nxt_int_t            n;
209     nxt_array_t          *inherited_sockets;
210     nxt_socket_t         s;
211     nxt_listen_socket_t  *ls;
212 
213     /*
214      * Number of listening sockets passed.  The socket
215      * descriptors start from number 3 and are sequential.
216      */
217     nfd = (u_char *) getenv("LISTEN_FDS");
218     if (nfd == NULL) {
219         return NXT_OK;
220     }
221 
222     /* The pid of the service process. */
223     pid = (u_char *) getenv("LISTEN_PID");
224     if (pid == NULL) {
225         return NXT_OK;
226     }
227 
228     n = nxt_int_parse(nfd, nxt_strlen(nfd));
229     if (n < 0) {
230         return NXT_OK;
231     }
232 
233     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
234         return NXT_OK;
235     }
236 
237     nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
238 
239     inherited_sockets = nxt_array_create(rt->mem_pool,
240                                          n, sizeof(nxt_listen_socket_t));
241     if (inherited_sockets == NULL) {
242         return NXT_ERROR;
243     }
244 
245     rt->inherited_sockets = inherited_sockets;
246 
247     for (s = 3; s < n; s++) {
248         ls = nxt_array_zero_add(inherited_sockets);
249         if (nxt_slow_path(ls == NULL)) {
250             return NXT_ERROR;
251         }
252 
253         ls->socket = s;
254 
255         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
256         if (nxt_slow_path(ls->sockaddr == NULL)) {
257             return NXT_ERROR;
258         }
259 
260         ls->sockaddr->type = SOCK_STREAM;
261     }
262 
263     return NXT_OK;
264 }
265 
266 
267 static nxt_int_t
268 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
269 {
270     nxt_thread_t                 *thread;
271     nxt_event_engine_t           *engine;
272     const nxt_event_interface_t  *interface;
273 
274     interface = nxt_service_get(rt->services, "engine", NULL);
275 
276     if (nxt_slow_path(interface == NULL)) {
277         /* TODO: log */
278         return NXT_ERROR;
279     }
280 
281     engine = nxt_event_engine_create(task, interface,
282                                      nxt_main_process_signals, 0, 0);
283 
284     if (nxt_slow_path(engine == NULL)) {
285         return NXT_ERROR;
286     }
287 
288     thread = task->thread;
289     thread->engine = engine;
290 #if 0
291     thread->fiber = &engine->fibers->fiber;
292 #endif
293 
294     engine->id = rt->last_engine_id++;
295     engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
296 
297     nxt_queue_init(&rt->engines);
298     nxt_queue_insert_tail(&rt->engines, &engine->link);
299 
300     return NXT_OK;
301 }
302 
303 
304 static nxt_int_t
305 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
306 {
307     nxt_int_t    ret;
308     nxt_array_t  *thread_pools;
309 
310     thread_pools = nxt_array_create(rt->mem_pool, 1,
311                                     sizeof(nxt_thread_pool_t *));
312 
313     if (nxt_slow_path(thread_pools == NULL)) {
314         return NXT_ERROR;
315     }
316 
317     rt->thread_pools = thread_pools;
318     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
319 
320     if (nxt_slow_path(ret != NXT_OK)) {
321         return NXT_ERROR;
322     }
323 
324     return NXT_OK;
325 }
326 
327 
328 static void
329 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
330 {
331     nxt_runtime_t  *rt;
332 
333     rt = obj;
334 
335     nxt_debug(task, "rt conf done");
336 
337     task->thread->log->ctx_handler = NULL;
338     task->thread->log->ctx = NULL;
339 
340     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
341         goto fail;
342     }
343 
344     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
345         goto fail;
346     }
347 
348     /*
349      * Thread pools should be destroyed before starting worker
350      * processes, because thread pool semaphores will stick in
351      * locked state in new processes after fork().
352      */
353     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
354 
355     return;
356 
357 fail:
358 
359     nxt_runtime_quit(task, 1);
360 }
361 
362 
363 static void
364 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
365 {
366     nxt_int_t                    ret;
367     nxt_thread_t                 *thr;
368     nxt_runtime_t                *rt;
369     const nxt_event_interface_t  *interface;
370 
371     thr = task->thread;
372     rt = thr->runtime;
373 
374     if (rt->inherited_sockets == NULL && rt->daemon) {
375 
376         if (nxt_process_daemon(task) != NXT_OK) {
377             goto fail;
378         }
379 
380         /*
381          * An event engine should be updated after fork()
382          * even if an event facility was not changed because:
383          * 1) inherited kqueue descriptor is invalid,
384          * 2) the signal thread is not inherited.
385          */
386         interface = nxt_service_get(rt->services, "engine", rt->engine);
387         if (interface == NULL) {
388             goto fail;
389         }
390 
391         ret = nxt_event_engine_change(task->thread->engine, interface,
392                                       rt->batch);
393         if (ret != NXT_OK) {
394             goto fail;
395         }
396     }
397 
398     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
399     if (ret != NXT_OK) {
400         goto fail;
401     }
402 
403     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
404         goto fail;
405     }
406 
407     thr->engine->max_connections = rt->engine_connections;
408 
409     if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
410         return;
411     }
412 
413 fail:
414 
415     nxt_runtime_quit(task, 1);
416 }
417 
418 
419 void
420 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
421 {
422     nxt_bool_t          done;
423     nxt_runtime_t       *rt;
424     nxt_event_engine_t  *engine;
425 
426     rt = task->thread->runtime;
427     rt->status |= status;
428     engine = task->thread->engine;
429 
430     nxt_debug(task, "exiting");
431 
432     done = 1;
433 
434     if (!engine->shutdown) {
435         engine->shutdown = 1;
436 
437         if (!nxt_array_is_empty(rt->thread_pools)) {
438             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
439             done = 0;
440         }
441 
442         if (rt->type == NXT_PROCESS_MAIN) {
443             nxt_runtime_stop_all_processes(task, rt);
444             done = 0;
445         }
446     }
447 
448     nxt_runtime_close_idle_connections(engine);
449 
450     if (done) {
451         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
452                            task, rt, engine);
453     }
454 }
455 
456 
457 static void
458 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
459 {
460     nxt_conn_t        *c;
461     nxt_queue_t       *idle;
462     nxt_queue_link_t  *link, *next;
463 
464     nxt_debug(&engine->task, "close idle connections");
465 
466     idle = &engine->idle_connections;
467 
468     for (link = nxt_queue_first(idle);
469          link != nxt_queue_tail(idle);
470          link = next)
471     {
472         next = nxt_queue_next(link);
473         c = nxt_queue_link_data(link, nxt_conn_t, link);
474 
475         if (!c->socket.read_ready) {
476             nxt_queue_remove(link);
477             nxt_conn_close(engine, c);
478         }
479     }
480 }
481 
482 
483 void
484 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
485 {
486     nxt_port_t          *port;
487     nxt_process_t       *process;
488     nxt_process_init_t  *init;
489 
490     nxt_runtime_process_each(rt, process) {
491 
492         init = nxt_process_init(process);
493 
494         if (init->type == NXT_PROCESS_APP) {
495 
496             nxt_process_port_each(process, port) {
497 
498                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
499                                              0, 0, NULL);
500 
501             } nxt_process_port_loop;
502         }
503 
504     } nxt_runtime_process_loop;
505 }
506 
507 
508 static void
509 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
510 {
511     nxt_port_t     *port;
512     nxt_process_t  *process;
513 
514     nxt_runtime_process_each(rt, process) {
515 
516         nxt_process_port_each(process, port) {
517 
518             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
519                                          0, NULL);
520 
521         } nxt_process_port_loop;
522 
523     } nxt_runtime_process_loop;
524 }
525 
526 
527 static void
528 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
529 {
530     int                 status;
531     nxt_runtime_t       *rt;
532     nxt_process_t       *process;
533     nxt_event_engine_t  *engine;
534 
535     rt = obj;
536     engine = data;
537 
538     nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
539 
540     if (!nxt_array_is_empty(rt->thread_pools)) {
541         return;
542     }
543 
544     if (rt->type == NXT_PROCESS_MAIN) {
545         if (rt->pid_file != NULL) {
546             nxt_file_delete(rt->pid_file);
547         }
548 
549 #if (NXT_HAVE_UNIX_DOMAIN)
550         {
551             nxt_sockaddr_t   *sa;
552             nxt_file_name_t  *name;
553 
554             sa = rt->controller_listen;
555 
556             if (sa->u.sockaddr.sa_family == AF_UNIX) {
557                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
558                 (void) nxt_file_delete(name);
559             }
560         }
561 #endif
562     }
563 
564     if (!engine->event.signal_support) {
565         nxt_event_engine_signals_stop(engine);
566     }
567 
568     nxt_runtime_process_each(rt, process) {
569 
570         nxt_process_close_ports(task, process);
571 
572     } nxt_runtime_process_loop;
573 
574     if (rt->port_by_type[rt->type] != NULL) {
575         nxt_port_use(task, rt->port_by_type[rt->type], -1);
576     }
577 
578     nxt_thread_mutex_destroy(&rt->processes_mutex);
579 
580     status = rt->status;
581     nxt_mp_destroy(rt->mem_pool);
582 
583     nxt_debug(task, "exit: %d", status);
584 
585     exit(status);
586     nxt_unreachable();
587 }
588 
589 
590 static nxt_int_t
591 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
592 {
593     nxt_event_engine_t           *engine;
594     const nxt_event_interface_t  *interface;
595 
596     engine = task->thread->engine;
597 
598     if (engine->batch == rt->batch
599         && nxt_strcmp(engine->event.name, rt->engine) == 0)
600     {
601         return NXT_OK;
602     }
603 
604     interface = nxt_service_get(rt->services, "engine", rt->engine);
605 
606     if (interface != NULL) {
607         return nxt_event_engine_change(engine, interface, rt->batch);
608     }
609 
610     return NXT_ERROR;
611 }
612 
613 
614 void
615 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
616 {
617     nxt_queue_link_t    *link;
618     nxt_event_engine_t  *engine;
619 
620     link = nxt_queue_first(&rt->engines);
621     nxt_queue_remove(link);
622 
623     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
624     nxt_event_engine_free(engine);
625 }
626 
627 
628 nxt_int_t
629 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
630     nxt_uint_t max_threads, nxt_nsec_t timeout)
631 {
632     nxt_thread_pool_t   *thread_pool, **tp;
633 
634     tp = nxt_array_add(rt->thread_pools);
635     if (tp == NULL) {
636         return NXT_ERROR;
637     }
638 
639     thread_pool = nxt_thread_pool_create(max_threads, timeout,
640                                          nxt_runtime_thread_pool_init,
641                                          thr->engine,
642                                          nxt_runtime_thread_pool_exit);
643 
644     if (nxt_fast_path(thread_pool != NULL)) {
645         *tp = thread_pool;
646     }
647 
648     return NXT_OK;
649 }
650 
651 
652 static void
653 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
654     nxt_runtime_cont_t cont)
655 {
656     nxt_uint_t         n;
657     nxt_thread_pool_t  **tp;
658 
659     rt->continuation = cont;
660 
661     n = rt->thread_pools->nelts;
662 
663     if (n == 0) {
664         cont(task, 0);
665         return;
666     }
667 
668     tp = rt->thread_pools->elts;
669 
670     do {
671         nxt_thread_pool_destroy(*tp);
672 
673         tp++;
674         n--;
675     } while (n != 0);
676 }
677 
678 
679 static void
680 nxt_runtime_thread_pool_init(void)
681 {
682 #if (NXT_REGEX)
683     nxt_regex_init(0);
684 #endif
685 }
686 
687 
688 static void
689 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
690 {
691     nxt_uint_t           i, n;
692     nxt_runtime_t        *rt;
693     nxt_thread_pool_t    *tp, **thread_pools;
694     nxt_thread_handle_t  handle;
695 
696     tp = obj;
697 
698     if (data != NULL) {
699         handle = (nxt_thread_handle_t) (uintptr_t) data;
700         nxt_thread_wait(handle);
701     }
702 
703     rt = task->thread->runtime;
704 
705     thread_pools = rt->thread_pools->elts;
706     n = rt->thread_pools->nelts;
707 
708     nxt_debug(task, "thread pools: %ui", n);
709 
710     for (i = 0; i < n; i++) {
711 
712         if (tp == thread_pools[i]) {
713             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
714 
715             nxt_free(tp);
716 
717             if (n == 1) {
718                 /* The last thread pool. */
719                 rt->continuation(task, 0);
720             }
721 
722             return;
723         }
724     }
725 }
726 
727 
728 static nxt_int_t
729 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
730 {
731     nxt_int_t                    ret;
732     nxt_str_t                    control;
733     nxt_uint_t                   n;
734     nxt_file_t                   *file;
735     const char                   *slash;
736     nxt_sockaddr_t               *sa;
737     nxt_file_name_str_t          file_name;
738     const nxt_event_interface_t  *interface;
739 
740     rt->daemon = 1;
741     rt->engine_connections = 256;
742     rt->auxiliary_threads = 2;
743     rt->user_cred.user = NXT_USER;
744     rt->group = NXT_GROUP;
745     rt->pid = NXT_PID;
746     rt->log = NXT_LOG;
747     rt->modules = NXT_MODULES;
748     rt->state = NXT_STATE;
749     rt->control = NXT_CONTROL_SOCK;
750     rt->tmp = NXT_TMP;
751 
752     nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
753 
754     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
755         return NXT_ERROR;
756     }
757 
758     if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
759         return NXT_ERROR;
760     }
761 
762     if (rt->capabilities.setid) {
763         ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
764                                 rt->group);
765 
766         if (nxt_slow_path(ret != NXT_OK)) {
767             return NXT_ERROR;
768         }
769 
770     } else {
771         nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
772                 "cannot use arbitrary user and group.");
773     }
774 
775     /* An engine's parameters. */
776 
777     interface = nxt_service_get(rt->services, "engine", rt->engine);
778     if (interface == NULL) {
779         return NXT_ERROR;
780     }
781 
782     rt->engine = interface->name;
783 
784     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
785     if (nxt_slow_path(ret != NXT_OK)) {
786         return NXT_ERROR;
787     }
788 
789     rt->pid_file = file_name.start;
790 
791     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
792     if (nxt_slow_path(ret != NXT_OK)) {
793         return NXT_ERROR;
794     }
795 
796     file = nxt_list_first(rt->log_files);
797     file->name = file_name.start;
798 
799     slash = "";
800     n = nxt_strlen(rt->modules);
801 
802     if (n > 1 && rt->modules[n - 1] != '/') {
803         slash = "/";
804     }
805 
806     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
807                                rt->modules, slash);
808     if (nxt_slow_path(ret != NXT_OK)) {
809         return NXT_ERROR;
810     }
811 
812     rt->modules = (char *) file_name.start;
813 
814     slash = "";
815     n = nxt_strlen(rt->state);
816 
817     if (n > 1 && rt->state[n - 1] != '/') {
818         slash = "/";
819     }
820 
821     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
822                                rt->state, slash);
823     if (nxt_slow_path(ret != NXT_OK)) {
824         return NXT_ERROR;
825     }
826 
827     rt->conf = (char *) file_name.start;
828 
829     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
830     if (nxt_slow_path(ret != NXT_OK)) {
831         return NXT_ERROR;
832     }
833 
834     rt->conf_tmp = (char *) file_name.start;
835 
836     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
837                                rt->state, slash);
838     if (nxt_slow_path(ret != NXT_OK)) {
839         return NXT_ERROR;
840     }
841 
842     ret = mkdir((char *) file_name.start, S_IRWXU);
843 
844     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
845         rt->certs.length = file_name.len;
846         rt->certs.start = file_name.start;
847 
848     } else {
849         nxt_alert(task, "Unable to create certificates storage directory: "
850                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
851     }
852 
853     control.length = nxt_strlen(rt->control);
854     control.start = (u_char *) rt->control;
855 
856     sa = nxt_sockaddr_parse(rt->mem_pool, &control);
857     if (nxt_slow_path(sa == NULL)) {
858         return NXT_ERROR;
859     }
860 
861     sa->type = SOCK_STREAM;
862 
863     rt->controller_listen = sa;
864 
865     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
866         return NXT_ERROR;
867     }
868 
869     return NXT_OK;
870 }
871 
872 
873 static nxt_int_t
874 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
875 {
876     char    *p, **argv;
877     u_char  *end;
878     u_char  buf[1024];
879 
880     static const char  version[] =
881         "unit version: " NXT_VERSION "\n"
882         "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
883 
884     static const char  no_control[] =
885                        "option \"--control\" requires socket address\n";
886     static const char  no_user[] = "option \"--user\" requires username\n";
887     static const char  no_group[] = "option \"--group\" requires group name\n";
888     static const char  no_pid[] = "option \"--pid\" requires filename\n";
889     static const char  no_log[] = "option \"--log\" requires filename\n";
890     static const char  no_modules[] =
891                        "option \"--modules\" requires directory\n";
892     static const char  no_state[] = "option \"--state\" requires directory\n";
893     static const char  no_tmp[] = "option \"--tmp\" requires directory\n";
894 
895     static const char  help[] =
896         "\n"
897         "unit options:\n"
898         "\n"
899         "  --version            print unit version and configure options\n"
900         "\n"
901         "  --no-daemon          run unit in non-daemon mode\n"
902         "\n"
903         "  --control ADDRESS    set address of control API socket\n"
904         "                       default: \"" NXT_CONTROL_SOCK "\"\n"
905         "\n"
906         "  --pid FILE           set pid filename\n"
907         "                       default: \"" NXT_PID "\"\n"
908         "\n"
909         "  --log FILE           set log filename\n"
910         "                       default: \"" NXT_LOG "\"\n"
911         "\n"
912         "  --modules DIRECTORY  set modules directory name\n"
913         "                       default: \"" NXT_MODULES "\"\n"
914         "\n"
915         "  --state DIRECTORY    set state directory name\n"
916         "                       default: \"" NXT_STATE "\"\n"
917         "\n"
918         "  --tmp DIRECTORY      set tmp directory name\n"
919         "                       default: \"" NXT_TMP "\"\n"
920         "\n"
921         "  --user USER          set non-privileged processes to run"
922                                 " as specified user\n"
923         "                       default: \"" NXT_USER "\"\n"
924         "\n"
925         "  --group GROUP        set non-privileged processes to run"
926                                 " as specified group\n"
927         "                       default: ";
928 
929     static const char  group[] = "\"" NXT_GROUP "\"\n\n";
930     static const char  primary[] = "user's primary group\n\n";
931 
932     argv = &nxt_process_argv[1];
933 
934     while (*argv != NULL) {
935         p = *argv++;
936 
937         if (nxt_strcmp(p, "--control") == 0) {
938             if (*argv == NULL) {
939                 write(STDERR_FILENO, no_control, nxt_length(no_control));
940                 return NXT_ERROR;
941             }
942 
943             p = *argv++;
944 
945             rt->control = p;
946 
947             continue;
948         }
949 
950         if (nxt_strcmp(p, "--user") == 0) {
951             if (*argv == NULL) {
952                 write(STDERR_FILENO, no_user, nxt_length(no_user));
953                 return NXT_ERROR;
954             }
955 
956             p = *argv++;
957 
958             rt->user_cred.user = p;
959 
960             continue;
961         }
962 
963         if (nxt_strcmp(p, "--group") == 0) {
964             if (*argv == NULL) {
965                 write(STDERR_FILENO, no_group, nxt_length(no_group));
966                 return NXT_ERROR;
967             }
968 
969             p = *argv++;
970 
971             rt->group = p;
972 
973             continue;
974         }
975 
976         if (nxt_strcmp(p, "--pid") == 0) {
977             if (*argv == NULL) {
978                 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
979                 return NXT_ERROR;
980             }
981 
982             p = *argv++;
983 
984             rt->pid = p;
985 
986             continue;
987         }
988 
989         if (nxt_strcmp(p, "--log") == 0) {
990             if (*argv == NULL) {
991                 write(STDERR_FILENO, no_log, nxt_length(no_log));
992                 return NXT_ERROR;
993             }
994 
995             p = *argv++;
996 
997             rt->log = p;
998 
999             continue;
1000         }
1001 
1002         if (nxt_strcmp(p, "--modules") == 0) {
1003             if (*argv == NULL) {
1004                 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1005                 return NXT_ERROR;
1006             }
1007 
1008             p = *argv++;
1009 
1010             rt->modules = p;
1011 
1012             continue;
1013         }
1014 
1015         if (nxt_strcmp(p, "--state") == 0) {
1016             if (*argv == NULL) {
1017                 write(STDERR_FILENO, no_state, nxt_length(no_state));
1018                 return NXT_ERROR;
1019             }
1020 
1021             p = *argv++;
1022 
1023             rt->state = p;
1024 
1025             continue;
1026         }
1027 
1028         if (nxt_strcmp(p, "--tmp") == 0) {
1029             if (*argv == NULL) {
1030                 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1031                 return NXT_ERROR;
1032             }
1033 
1034             p = *argv++;
1035 
1036             rt->tmp = p;
1037 
1038             continue;
1039         }
1040 
1041         if (nxt_strcmp(p, "--no-daemon") == 0) {
1042             rt->daemon = 0;
1043             continue;
1044         }
1045 
1046         if (nxt_strcmp(p, "--version") == 0) {
1047             write(STDERR_FILENO, version, nxt_length(version));
1048             exit(0);
1049         }
1050 
1051         if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1052             write(STDOUT_FILENO, help, nxt_length(help));
1053 
1054             if (sizeof(NXT_GROUP) == 1) {
1055                 write(STDOUT_FILENO, primary, nxt_length(primary));
1056 
1057             } else {
1058                 write(STDOUT_FILENO, group, nxt_length(group));
1059             }
1060 
1061             exit(0);
1062         }
1063 
1064         end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1065                           "try \"%s -h\" for available options\n",
1066                           p, nxt_process_argv[0]);
1067 
1068         write(STDERR_FILENO, buf, end - buf);
1069 
1070         return NXT_ERROR;
1071     }
1072 
1073     return NXT_OK;
1074 }
1075 
1076 
1077 nxt_listen_socket_t *
1078 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1079 {
1080     nxt_mp_t             *mp;
1081     nxt_listen_socket_t  *ls;
1082 
1083     ls = nxt_array_zero_add(rt->listen_sockets);
1084     if (ls == NULL) {
1085         return NULL;
1086     }
1087 
1088     mp = rt->mem_pool;
1089 
1090     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1091                                        sa->length);
1092     if (ls->sockaddr == NULL) {
1093         return NULL;
1094     }
1095 
1096     ls->sockaddr->type = sa->type;
1097 
1098     nxt_sockaddr_text(ls->sockaddr);
1099 
1100     ls->socket = -1;
1101     ls->backlog = NXT_LISTEN_BACKLOG;
1102 
1103     return ls;
1104 }
1105 
1106 
1107 static nxt_int_t
1108 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1109 {
1110     size_t  length;
1111     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1112 
1113     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1114         nxt_alert(task, "gethostname() failed %E", nxt_errno);
1115         return NXT_ERROR;
1116     }
1117 
1118     /*
1119      * Linux gethostname(2):
1120      *
1121      *    If the null-terminated hostname is too large to fit,
1122      *    then the name is truncated, and no error is returned.
1123      *
1124      * For this reason an additional byte is reserved in the buffer.
1125      */
1126     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1127 
1128     length = nxt_strlen(hostname);
1129     rt->hostname.length = length;
1130 
1131     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1132 
1133     if (rt->hostname.start != NULL) {
1134         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1135         return NXT_OK;
1136     }
1137 
1138     return NXT_ERROR;
1139 }
1140 
1141 
1142 static nxt_int_t
1143 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1144 {
1145     nxt_file_t  *file;
1146     nxt_list_t  *log_files;
1147 
1148     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1149 
1150     if (nxt_fast_path(log_files != NULL)) {
1151         rt->log_files = log_files;
1152 
1153         /* Preallocate the main log.  This allocation cannot fail. */
1154         file = nxt_list_zero_add(log_files);
1155 
1156         file->fd = NXT_FILE_INVALID;
1157         file->log_level = NXT_LOG_ALERT;
1158 
1159         return NXT_OK;
1160     }
1161 
1162     return NXT_ERROR;
1163 }
1164 
1165 
1166 nxt_file_t *
1167 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1168 {
1169     nxt_int_t            ret;
1170     nxt_file_t           *file;
1171     nxt_file_name_str_t  file_name;
1172 
1173     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1174 
1175     if (nxt_slow_path(ret != NXT_OK)) {
1176         return NULL;
1177     }
1178 
1179     nxt_list_each(file, rt->log_files) {
1180 
1181         /* STUB: hardecoded case sensitive/insensitive. */
1182 
1183         if (file->name != NULL
1184             && nxt_file_name_eq(file->name, file_name.start))
1185         {
1186             return file;
1187         }
1188 
1189     } nxt_list_loop;
1190 
1191     file = nxt_list_zero_add(rt->log_files);
1192 
1193     if (nxt_slow_path(file == NULL)) {
1194         return NULL;
1195     }
1196 
1197     file->fd = NXT_FILE_INVALID;
1198     file->log_level = NXT_LOG_ALERT;
1199     file->name = file_name.start;
1200 
1201     return file;
1202 }
1203 
1204 
1205 static nxt_int_t
1206 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1207 {
1208     nxt_int_t   ret;
1209     nxt_file_t  *file;
1210 
1211     nxt_list_each(file, rt->log_files) {
1212 
1213         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1214                             NXT_FILE_OWNER_ACCESS);
1215 
1216         if (ret != NXT_OK) {
1217             return NXT_ERROR;
1218         }
1219 
1220     } nxt_list_loop;
1221 
1222     file = nxt_list_first(rt->log_files);
1223 
1224     return nxt_file_stderr(file);
1225 }
1226 
1227 
1228 nxt_int_t
1229 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1230 {
1231     nxt_int_t            ret;
1232     nxt_uint_t           c, p, ncurr, nprev;
1233     nxt_listen_socket_t  *curr, *prev;
1234 
1235     curr = rt->listen_sockets->elts;
1236     ncurr = rt->listen_sockets->nelts;
1237 
1238     if (rt->inherited_sockets != NULL) {
1239         prev = rt->inherited_sockets->elts;
1240         nprev = rt->inherited_sockets->nelts;
1241 
1242     } else {
1243         prev = NULL;
1244         nprev = 0;
1245     }
1246 
1247     for (c = 0; c < ncurr; c++) {
1248 
1249         for (p = 0; p < nprev; p++) {
1250 
1251             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1252 
1253                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1254                 if (ret != NXT_OK) {
1255                     return NXT_ERROR;
1256                 }
1257 
1258                 goto next;
1259             }
1260         }
1261 
1262         if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1263             return NXT_ERROR;
1264         }
1265 
1266     next:
1267 
1268         continue;
1269     }
1270 
1271     return NXT_OK;
1272 }
1273 
1274 
1275 nxt_int_t
1276 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1277 {
1278     nxt_uint_t           i, n;
1279     nxt_listen_socket_t  *ls;
1280 
1281     ls = rt->listen_sockets->elts;
1282     n = rt->listen_sockets->nelts;
1283 
1284     for (i = 0; i < n; i++) {
1285         if (ls[i].flags == NXT_NONBLOCK) {
1286             if (nxt_listen_event(task, &ls[i]) == NULL) {
1287                 return NXT_ERROR;
1288             }
1289         }
1290     }
1291 
1292     return NXT_OK;
1293 }
1294 
1295 
1296 nxt_str_t *
1297 nxt_current_directory(nxt_mp_t *mp)
1298 {
1299     size_t     length;
1300     u_char     *p;
1301     nxt_str_t  *name;
1302     char       buf[NXT_MAX_PATH_LEN];
1303 
1304     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1305 
1306     if (nxt_fast_path(length != 0)) {
1307         name = nxt_str_alloc(mp, length + 1);
1308 
1309         if (nxt_fast_path(name != NULL)) {
1310             p = nxt_cpymem(name->start, buf, length);
1311             *p = '/';
1312 
1313             return name;
1314         }
1315     }
1316 
1317     return NULL;
1318 }
1319 
1320 
1321 static nxt_int_t
1322 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1323 {
1324     ssize_t     length;
1325     nxt_int_t   n;
1326     nxt_file_t  file;
1327     u_char      pid[NXT_INT64_T_LEN + nxt_length("\n")];
1328 
1329     nxt_memzero(&file, sizeof(nxt_file_t));
1330 
1331     file.name = pid_file;
1332 
1333     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1334                       NXT_FILE_DEFAULT_ACCESS);
1335 
1336     if (n != NXT_OK) {
1337         return NXT_ERROR;
1338     }
1339 
1340     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1341 
1342     if (nxt_file_write(&file, pid, length, 0) != length) {
1343         return NXT_ERROR;
1344     }
1345 
1346     nxt_file_close(task, &file);
1347 
1348     return NXT_OK;
1349 }
1350 
1351 
1352 nxt_process_t *
1353 nxt_runtime_process_new(nxt_runtime_t *rt)
1354 {
1355     nxt_process_t  *process;
1356 
1357     /* TODO: memory failures. */
1358 
1359     process = nxt_mp_zalloc(rt->mem_pool,
1360                             sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
1361 
1362     if (nxt_slow_path(process == NULL)) {
1363         return NULL;
1364     }
1365 
1366     nxt_queue_init(&process->ports);
1367 
1368     nxt_thread_mutex_create(&process->incoming.mutex);
1369     nxt_thread_mutex_create(&process->outgoing.mutex);
1370     nxt_thread_mutex_create(&process->cp_mutex);
1371 
1372     process->use_count = 1;
1373 
1374     return process;
1375 }
1376 
1377 
1378 void
1379 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1380 {
1381     nxt_port_t  *port;
1382 
1383     if (process->registered == 1) {
1384         nxt_runtime_process_remove(rt, process);
1385     }
1386 
1387     nxt_assert(process->use_count == 0);
1388     nxt_assert(process->registered == 0);
1389 
1390     nxt_port_mmaps_destroy(&process->incoming, 1);
1391     nxt_port_mmaps_destroy(&process->outgoing, 1);
1392 
1393     do {
1394         port = nxt_port_hash_retrieve(&process->connected_ports);
1395 
1396     } while (port != NULL);
1397 
1398     nxt_thread_mutex_destroy(&process->incoming.mutex);
1399     nxt_thread_mutex_destroy(&process->outgoing.mutex);
1400     nxt_thread_mutex_destroy(&process->cp_mutex);
1401 
1402     /* processes from nxt_runtime_process_get() have no memory pool */
1403     if (process->mem_pool != NULL) {
1404         nxt_mp_destroy(process->mem_pool);
1405     }
1406 
1407     nxt_mp_free(rt->mem_pool, process);
1408 }
1409 
1410 
1411 static nxt_int_t
1412 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1413 {
1414     nxt_process_t  *process;
1415 
1416     process = data;
1417 
1418     if (lhq->key.length == sizeof(nxt_pid_t)
1419         && *(nxt_pid_t *) lhq->key.start == process->pid)
1420     {
1421         return NXT_OK;
1422     }
1423 
1424     return NXT_DECLINED;
1425 }
1426 
1427 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1428     NXT_LVLHSH_DEFAULT,
1429     nxt_runtime_lvlhsh_pid_test,
1430     nxt_lvlhsh_alloc,
1431     nxt_lvlhsh_free,
1432 };
1433 
1434 
1435 nxt_inline void
1436 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1437 {
1438     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1439     lhq->key.length = sizeof(*pid);
1440     lhq->key.start = (u_char *) pid;
1441     lhq->proto = &lvlhsh_processes_proto;
1442 }
1443 
1444 
1445 nxt_process_t *
1446 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1447 {
1448     nxt_process_t       *process;
1449     nxt_lvlhsh_query_t  lhq;
1450 
1451     process = NULL;
1452 
1453     nxt_runtime_process_lhq_pid(&lhq, &pid);
1454 
1455     nxt_thread_mutex_lock(&rt->processes_mutex);
1456 
1457     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1458         process = lhq.value;
1459 
1460     } else {
1461         nxt_thread_log_debug("process %PI not found", pid);
1462     }
1463 
1464     nxt_thread_mutex_unlock(&rt->processes_mutex);
1465 
1466     return process;
1467 }
1468 
1469 
1470 static nxt_process_t *
1471 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1472 {
1473     nxt_process_t       *process;
1474     nxt_lvlhsh_query_t  lhq;
1475 
1476     nxt_runtime_process_lhq_pid(&lhq, &pid);
1477 
1478     nxt_thread_mutex_lock(&rt->processes_mutex);
1479 
1480     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1481         nxt_thread_log_debug("process %PI found", pid);
1482 
1483         nxt_thread_mutex_unlock(&rt->processes_mutex);
1484 
1485         process = lhq.value;
1486         process->use_count++;
1487 
1488         return process;
1489     }
1490 
1491     process = nxt_runtime_process_new(rt);
1492     if (nxt_slow_path(process == NULL)) {
1493 
1494         nxt_thread_mutex_unlock(&rt->processes_mutex);
1495 
1496         return NULL;
1497     }
1498 
1499     process->pid = pid;
1500 
1501     lhq.replace = 0;
1502     lhq.value = process;
1503     lhq.pool = rt->mem_pool;
1504 
1505     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1506 
1507     case NXT_OK:
1508         if (rt->nprocesses == 0) {
1509             rt->mprocess = process;
1510         }
1511 
1512         rt->nprocesses++;
1513 
1514         process->registered = 1;
1515 
1516         nxt_thread_log_debug("process %PI insert", pid);
1517         break;
1518 
1519     default:
1520         nxt_thread_log_debug("process %PI insert failed", pid);
1521         break;
1522     }
1523 
1524     nxt_thread_mutex_unlock(&rt->processes_mutex);
1525 
1526     return process;
1527 }
1528 
1529 
1530 void
1531 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1532 {
1533     nxt_port_t          *port;
1534     nxt_runtime_t       *rt;
1535     nxt_lvlhsh_query_t  lhq;
1536 
1537     nxt_assert(process->registered == 0);
1538 
1539     rt = task->thread->runtime;
1540 
1541     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1542 
1543     lhq.replace = 0;
1544     lhq.value = process;
1545     lhq.pool = rt->mem_pool;
1546 
1547     nxt_thread_mutex_lock(&rt->processes_mutex);
1548 
1549     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1550 
1551     case NXT_OK:
1552         if (rt->nprocesses == 0) {
1553             rt->mprocess = process;
1554         }
1555 
1556         rt->nprocesses++;
1557 
1558         nxt_process_port_each(process, port) {
1559 
1560             port->pid = process->pid;
1561 
1562             nxt_runtime_port_add(task, port);
1563 
1564         } nxt_process_port_loop;
1565 
1566         process->registered = 1;
1567 
1568         nxt_thread_log_debug("process %PI added", process->pid);
1569         break;
1570 
1571     default:
1572         nxt_thread_log_debug("process %PI failed to add", process->pid);
1573         break;
1574     }
1575 
1576     nxt_thread_mutex_unlock(&rt->processes_mutex);
1577 }
1578 
1579 
1580 static void
1581 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1582 {
1583     nxt_pid_t           pid;
1584     nxt_lvlhsh_query_t  lhq;
1585 
1586     pid = process->pid;
1587 
1588     nxt_runtime_process_lhq_pid(&lhq, &pid);
1589 
1590     lhq.pool = rt->mem_pool;
1591 
1592     nxt_thread_mutex_lock(&rt->processes_mutex);
1593 
1594     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1595 
1596     case NXT_OK:
1597         rt->nprocesses--;
1598 
1599         process = lhq.value;
1600 
1601         process->registered = 0;
1602 
1603         nxt_thread_log_debug("process %PI removed", pid);
1604         break;
1605 
1606     default:
1607         nxt_thread_log_debug("process %PI remove failed", pid);
1608         break;
1609     }
1610 
1611     nxt_thread_mutex_unlock(&rt->processes_mutex);
1612 }
1613 
1614 
1615 nxt_process_t *
1616 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1617 {
1618     nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1619 
1620     return nxt_runtime_process_next(rt, lhe);
1621 }
1622 
1623 
1624 nxt_port_t *
1625 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1626     nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1627 {
1628     nxt_port_t     *port;
1629     nxt_process_t  *process;
1630 
1631     process = nxt_runtime_process_get(rt, pid);
1632     if (nxt_slow_path(process == NULL)) {
1633         return NULL;
1634     }
1635 
1636     port = nxt_port_new(task, id, pid, type);
1637     if (nxt_slow_path(port == NULL)) {
1638         nxt_process_use(task, process, -1);
1639         return NULL;
1640     }
1641 
1642     nxt_process_port_add(task, process, port);
1643 
1644     nxt_process_use(task, process, -1);
1645 
1646     nxt_runtime_port_add(task, port);
1647 
1648     nxt_port_use(task, port, -1);
1649 
1650     return port;
1651 }
1652 
1653 
1654 static void
1655 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1656 {
1657     nxt_int_t      res;
1658     nxt_runtime_t  *rt;
1659 
1660     rt = task->thread->runtime;
1661 
1662     res = nxt_port_hash_add(&rt->ports, port);
1663 
1664     if (res != NXT_OK) {
1665         return;
1666     }
1667 
1668     rt->port_by_type[port->type] = port;
1669 
1670     nxt_port_use(task, port, 1);
1671 }
1672 
1673 
1674 void
1675 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1676 {
1677     nxt_int_t      res;
1678     nxt_runtime_t  *rt;
1679 
1680     rt = task->thread->runtime;
1681 
1682     res = nxt_port_hash_remove(&rt->ports, port);
1683 
1684     if (res != NXT_OK) {
1685         return;
1686     }
1687 
1688     if (rt->port_by_type[port->type] == port) {
1689         rt->port_by_type[port->type] = NULL;
1690     }
1691 
1692     nxt_port_use(task, port, -1);
1693 }
1694 
1695 
1696 nxt_port_t *
1697 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1698     nxt_port_id_t port_id)
1699 {
1700     return nxt_port_hash_find(&rt->ports, pid, port_id);
1701 }
1702