xref: /unit/src/nxt_runtime.c (revision 1251:5ce938471a4e)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 
14 
15 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16     nxt_runtime_t *rt);
17 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18     nxt_runtime_t *rt);
19 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
25 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
26     nxt_runtime_t *rt);
27 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
28 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
32     nxt_runtime_t *rt);
33 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
34     nxt_file_name_t *pid_file);
35 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
36     nxt_runtime_cont_t cont);
37 static void nxt_runtime_thread_pool_init(void);
38 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
39     void *data);
40 static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
41     nxt_process_t *process);
42 static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
43     nxt_pid_t pid);
44 
45 
46 nxt_int_t
47 nxt_runtime_create(nxt_task_t *task)
48 {
49     nxt_mp_t               *mp;
50     nxt_int_t              ret;
51     nxt_array_t            *listen_sockets;
52     nxt_runtime_t          *rt;
53     nxt_app_lang_module_t  *lang;
54 
55     mp = nxt_mp_create(1024, 128, 256, 32);
56     if (nxt_slow_path(mp == NULL)) {
57         return NXT_ERROR;
58     }
59 
60     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
61     if (nxt_slow_path(rt == NULL)) {
62         goto fail;
63     }
64 
65     task->thread->runtime = rt;
66     rt->mem_pool = mp;
67 
68     nxt_thread_mutex_create(&rt->processes_mutex);
69 
70     rt->services = nxt_services_init(mp);
71     if (nxt_slow_path(rt->services == NULL)) {
72         goto fail;
73     }
74 
75     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
76     if (nxt_slow_path(rt->languages == NULL)) {
77         goto fail;
78     }
79 
80     /* Should not fail. */
81     lang = nxt_array_add(rt->languages);
82     lang->type = NXT_APP_EXTERNAL;
83     lang->version = (u_char *) "";
84     lang->file = NULL;
85     lang->module = &nxt_external_module;
86 
87     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
88     if (nxt_slow_path(listen_sockets == NULL)) {
89         goto fail;
90     }
91 
92     rt->listen_sockets = listen_sockets;
93 
94     ret = nxt_runtime_inherited_listen_sockets(task, rt);
95     if (nxt_slow_path(ret != NXT_OK)) {
96         goto fail;
97     }
98 
99     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
100         goto fail;
101     }
102 
103     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
104         goto fail;
105     }
106 
107     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
108         goto fail;
109     }
110 
111     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
112         goto fail;
113     }
114 
115     rt->start = nxt_runtime_initial_start;
116 
117     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
118         goto fail;
119     }
120 
121     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
122                        nxt_runtime_start, task, rt, NULL);
123 
124     return NXT_OK;
125 
126 fail:
127 
128     nxt_mp_destroy(mp);
129 
130     return NXT_ERROR;
131 }
132 
133 
134 static nxt_int_t
135 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
136 {
137     u_char               *v, *p;
138     nxt_int_t            type;
139     nxt_array_t          *inherited_sockets;
140     nxt_socket_t         s;
141     nxt_listen_socket_t  *ls;
142 
143     v = (u_char *) getenv("NGINX");
144 
145     if (v == NULL) {
146         return nxt_runtime_systemd_listen_sockets(task, rt);
147     }
148 
149     nxt_alert(task, "using inherited listen sockets: %s", v);
150 
151     inherited_sockets = nxt_array_create(rt->mem_pool,
152                                          1, sizeof(nxt_listen_socket_t));
153     if (inherited_sockets == NULL) {
154         return NXT_ERROR;
155     }
156 
157     rt->inherited_sockets = inherited_sockets;
158 
159     for (p = v; *p != '\0'; p++) {
160 
161         if (*p == ';') {
162             s = nxt_int_parse(v, p - v);
163 
164             if (nxt_slow_path(s < 0)) {
165                 nxt_alert(task, "invalid socket number \"%s\" "
166                           "in NGINX environment variable, "
167                           "ignoring the rest of the variable", v);
168                 return NXT_ERROR;
169             }
170 
171             v = p + 1;
172 
173             ls = nxt_array_zero_add(inherited_sockets);
174             if (nxt_slow_path(ls == NULL)) {
175                 return NXT_ERROR;
176             }
177 
178             ls->socket = s;
179 
180             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
181             if (nxt_slow_path(ls->sockaddr == NULL)) {
182                 return NXT_ERROR;
183             }
184 
185             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
186             if (nxt_slow_path(type == -1)) {
187                 return NXT_ERROR;
188             }
189 
190             ls->sockaddr->type = (uint16_t) type;
191         }
192     }
193 
194     return NXT_OK;
195 }
196 
197 
198 static nxt_int_t
199 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
200 {
201     u_char               *nfd, *pid;
202     nxt_int_t            n;
203     nxt_array_t          *inherited_sockets;
204     nxt_socket_t         s;
205     nxt_listen_socket_t  *ls;
206 
207     /*
208      * Number of listening sockets passed.  The socket
209      * descriptors start from number 3 and are sequential.
210      */
211     nfd = (u_char *) getenv("LISTEN_FDS");
212     if (nfd == NULL) {
213         return NXT_OK;
214     }
215 
216     /* The pid of the service process. */
217     pid = (u_char *) getenv("LISTEN_PID");
218     if (pid == NULL) {
219         return NXT_OK;
220     }
221 
222     n = nxt_int_parse(nfd, nxt_strlen(nfd));
223     if (n < 0) {
224         return NXT_OK;
225     }
226 
227     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
228         return NXT_OK;
229     }
230 
231     nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
232 
233     inherited_sockets = nxt_array_create(rt->mem_pool,
234                                          n, sizeof(nxt_listen_socket_t));
235     if (inherited_sockets == NULL) {
236         return NXT_ERROR;
237     }
238 
239     rt->inherited_sockets = inherited_sockets;
240 
241     for (s = 3; s < n; s++) {
242         ls = nxt_array_zero_add(inherited_sockets);
243         if (nxt_slow_path(ls == NULL)) {
244             return NXT_ERROR;
245         }
246 
247         ls->socket = s;
248 
249         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
250         if (nxt_slow_path(ls->sockaddr == NULL)) {
251             return NXT_ERROR;
252         }
253 
254         ls->sockaddr->type = SOCK_STREAM;
255     }
256 
257     return NXT_OK;
258 }
259 
260 
261 static nxt_int_t
262 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
263 {
264     nxt_thread_t                 *thread;
265     nxt_event_engine_t           *engine;
266     const nxt_event_interface_t  *interface;
267 
268     interface = nxt_service_get(rt->services, "engine", NULL);
269 
270     if (nxt_slow_path(interface == NULL)) {
271         /* TODO: log */
272         return NXT_ERROR;
273     }
274 
275     engine = nxt_event_engine_create(task, interface,
276                                      nxt_main_process_signals, 0, 0);
277 
278     if (nxt_slow_path(engine == NULL)) {
279         return NXT_ERROR;
280     }
281 
282     thread = task->thread;
283     thread->engine = engine;
284 #if 0
285     thread->fiber = &engine->fibers->fiber;
286 #endif
287 
288     engine->id = rt->last_engine_id++;
289     engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
290 
291     nxt_queue_init(&rt->engines);
292     nxt_queue_insert_tail(&rt->engines, &engine->link);
293 
294     return NXT_OK;
295 }
296 
297 
298 static nxt_int_t
299 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
300 {
301     nxt_int_t    ret;
302     nxt_array_t  *thread_pools;
303 
304     thread_pools = nxt_array_create(rt->mem_pool, 1,
305                                     sizeof(nxt_thread_pool_t *));
306 
307     if (nxt_slow_path(thread_pools == NULL)) {
308         return NXT_ERROR;
309     }
310 
311     rt->thread_pools = thread_pools;
312     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
313 
314     if (nxt_slow_path(ret != NXT_OK)) {
315         return NXT_ERROR;
316     }
317 
318     return NXT_OK;
319 }
320 
321 
322 static void
323 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
324 {
325     nxt_runtime_t  *rt;
326 
327     rt = obj;
328 
329     nxt_debug(task, "rt conf done");
330 
331     task->thread->log->ctx_handler = NULL;
332     task->thread->log->ctx = NULL;
333 
334     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
335         goto fail;
336     }
337 
338     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
339         goto fail;
340     }
341 
342     /*
343      * Thread pools should be destroyed before starting worker
344      * processes, because thread pool semaphores will stick in
345      * locked state in new processes after fork().
346      */
347     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
348 
349     return;
350 
351 fail:
352 
353     nxt_runtime_quit(task, 1);
354 }
355 
356 
357 static void
358 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
359 {
360     nxt_int_t                    ret;
361     nxt_thread_t                 *thr;
362     nxt_runtime_t                *rt;
363     const nxt_event_interface_t  *interface;
364 
365     thr = task->thread;
366     rt = thr->runtime;
367 
368     if (rt->inherited_sockets == NULL && rt->daemon) {
369 
370         if (nxt_process_daemon(task) != NXT_OK) {
371             goto fail;
372         }
373 
374         /*
375          * An event engine should be updated after fork()
376          * even if an event facility was not changed because:
377          * 1) inherited kqueue descriptor is invalid,
378          * 2) the signal thread is not inherited.
379          */
380         interface = nxt_service_get(rt->services, "engine", rt->engine);
381         if (interface == NULL) {
382             goto fail;
383         }
384 
385         ret = nxt_event_engine_change(task->thread->engine, interface,
386                                       rt->batch);
387         if (ret != NXT_OK) {
388             goto fail;
389         }
390     }
391 
392     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
393     if (ret != NXT_OK) {
394         goto fail;
395     }
396 
397     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
398         goto fail;
399     }
400 
401     thr->engine->max_connections = rt->engine_connections;
402 
403     if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
404         return;
405     }
406 
407 fail:
408 
409     nxt_runtime_quit(task, 1);
410 }
411 
412 
413 void
414 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
415 {
416     nxt_bool_t          done;
417     nxt_runtime_t       *rt;
418     nxt_event_engine_t  *engine;
419 
420     rt = task->thread->runtime;
421     rt->status |= status;
422     engine = task->thread->engine;
423 
424     nxt_debug(task, "exiting");
425 
426     done = 1;
427 
428     if (!engine->shutdown) {
429         engine->shutdown = 1;
430 
431         if (!nxt_array_is_empty(rt->thread_pools)) {
432             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
433             done = 0;
434         }
435 
436         if (rt->type == NXT_PROCESS_MAIN) {
437             nxt_main_stop_all_processes(task, rt);
438             done = 0;
439         }
440     }
441 
442     nxt_runtime_close_idle_connections(engine);
443 
444     if (done) {
445         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
446                            task, rt, engine);
447     }
448 }
449 
450 
451 static void
452 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
453 {
454     nxt_conn_t        *c;
455     nxt_queue_t       *idle;
456     nxt_queue_link_t  *link, *next;
457 
458     nxt_debug(&engine->task, "close idle connections");
459 
460     idle = &engine->idle_connections;
461 
462     for (link = nxt_queue_first(idle);
463          link != nxt_queue_tail(idle);
464          link = next)
465     {
466         next = nxt_queue_next(link);
467         c = nxt_queue_link_data(link, nxt_conn_t, link);
468 
469         if (!c->socket.read_ready) {
470             nxt_queue_remove(link);
471             nxt_conn_close(engine, c);
472         }
473     }
474 }
475 
476 
477 static void
478 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
479 {
480     int                 status;
481     nxt_runtime_t       *rt;
482     nxt_process_t       *process;
483     nxt_event_engine_t  *engine;
484 
485     rt = obj;
486     engine = data;
487 
488     nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
489 
490     if (!nxt_array_is_empty(rt->thread_pools)) {
491         return;
492     }
493 
494     if (rt->type == NXT_PROCESS_MAIN) {
495         if (rt->pid_file != NULL) {
496             nxt_file_delete(rt->pid_file);
497         }
498 
499 #if (NXT_HAVE_UNIX_DOMAIN)
500         {
501             nxt_sockaddr_t   *sa;
502             nxt_file_name_t  *name;
503 
504             sa = rt->controller_listen;
505 
506             if (sa->u.sockaddr.sa_family == AF_UNIX) {
507                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
508                 (void) nxt_file_delete(name);
509             }
510         }
511 #endif
512     }
513 
514     if (!engine->event.signal_support) {
515         nxt_event_engine_signals_stop(engine);
516     }
517 
518     nxt_runtime_process_each(rt, process) {
519 
520         nxt_process_close_ports(task, process);
521 
522     } nxt_runtime_process_loop;
523 
524     nxt_thread_mutex_destroy(&rt->processes_mutex);
525 
526     status = rt->status;
527     nxt_mp_destroy(rt->mem_pool);
528 
529     nxt_debug(task, "exit: %d", status);
530 
531     exit(status);
532     nxt_unreachable();
533 }
534 
535 
536 static nxt_int_t
537 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
538 {
539     nxt_event_engine_t           *engine;
540     const nxt_event_interface_t  *interface;
541 
542     engine = task->thread->engine;
543 
544     if (engine->batch == rt->batch
545         && nxt_strcmp(engine->event.name, rt->engine) == 0)
546     {
547         return NXT_OK;
548     }
549 
550     interface = nxt_service_get(rt->services, "engine", rt->engine);
551 
552     if (interface != NULL) {
553         return nxt_event_engine_change(engine, interface, rt->batch);
554     }
555 
556     return NXT_ERROR;
557 }
558 
559 
560 void
561 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
562 {
563     nxt_queue_link_t    *link;
564     nxt_event_engine_t  *engine;
565 
566     link = nxt_queue_first(&rt->engines);
567     nxt_queue_remove(link);
568 
569     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
570     nxt_event_engine_free(engine);
571 }
572 
573 
574 nxt_int_t
575 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
576     nxt_uint_t max_threads, nxt_nsec_t timeout)
577 {
578     nxt_thread_pool_t   *thread_pool, **tp;
579 
580     tp = nxt_array_add(rt->thread_pools);
581     if (tp == NULL) {
582         return NXT_ERROR;
583     }
584 
585     thread_pool = nxt_thread_pool_create(max_threads, timeout,
586                                          nxt_runtime_thread_pool_init,
587                                          thr->engine,
588                                          nxt_runtime_thread_pool_exit);
589 
590     if (nxt_fast_path(thread_pool != NULL)) {
591         *tp = thread_pool;
592     }
593 
594     return NXT_OK;
595 }
596 
597 
598 static void
599 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
600     nxt_runtime_cont_t cont)
601 {
602     nxt_uint_t         n;
603     nxt_thread_pool_t  **tp;
604 
605     rt->continuation = cont;
606 
607     n = rt->thread_pools->nelts;
608 
609     if (n == 0) {
610         cont(task, 0);
611         return;
612     }
613 
614     tp = rt->thread_pools->elts;
615 
616     do {
617         nxt_thread_pool_destroy(*tp);
618 
619         tp++;
620         n--;
621     } while (n != 0);
622 }
623 
624 
625 static void
626 nxt_runtime_thread_pool_init(void)
627 {
628 #if (NXT_REGEX)
629     nxt_regex_init(0);
630 #endif
631 }
632 
633 
634 static void
635 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
636 {
637     nxt_uint_t           i, n;
638     nxt_runtime_t        *rt;
639     nxt_thread_pool_t    *tp, **thread_pools;
640     nxt_thread_handle_t  handle;
641 
642     tp = obj;
643 
644     if (data != NULL) {
645         handle = (nxt_thread_handle_t) (uintptr_t) data;
646         nxt_thread_wait(handle);
647     }
648 
649     rt = task->thread->runtime;
650 
651     thread_pools = rt->thread_pools->elts;
652     n = rt->thread_pools->nelts;
653 
654     nxt_debug(task, "thread pools: %ui", n);
655 
656     for (i = 0; i < n; i++) {
657 
658         if (tp == thread_pools[i]) {
659             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
660 
661             nxt_free(tp);
662 
663             if (n == 1) {
664                 /* The last thread pool. */
665                 rt->continuation(task, 0);
666             }
667 
668             return;
669         }
670     }
671 }
672 
673 
674 static nxt_int_t
675 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
676 {
677     nxt_int_t                    ret;
678     nxt_str_t                    control;
679     nxt_uint_t                   n;
680     nxt_file_t                   *file;
681     const char                   *slash;
682     nxt_sockaddr_t               *sa;
683     nxt_file_name_str_t          file_name;
684     const nxt_event_interface_t  *interface;
685 
686     rt->daemon = 1;
687     rt->engine_connections = 256;
688     rt->auxiliary_threads = 2;
689     rt->user_cred.user = NXT_USER;
690     rt->group = NXT_GROUP;
691     rt->pid = NXT_PID;
692     rt->log = NXT_LOG;
693     rt->modules = NXT_MODULES;
694     rt->state = NXT_STATE;
695     rt->control = NXT_CONTROL_SOCK;
696 
697     nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
698 
699     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
700         return NXT_ERROR;
701     }
702 
703     if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
704         return NXT_ERROR;
705     }
706 
707     if (rt->capabilities.setid) {
708         if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
709             return NXT_ERROR;
710         }
711 
712     } else {
713         nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
714                 "cannot use arbitrary user and group.");
715     }
716 
717     /* An engine's parameters. */
718 
719     interface = nxt_service_get(rt->services, "engine", rt->engine);
720     if (interface == NULL) {
721         return NXT_ERROR;
722     }
723 
724     rt->engine = interface->name;
725 
726     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
727     if (nxt_slow_path(ret != NXT_OK)) {
728         return NXT_ERROR;
729     }
730 
731     rt->pid_file = file_name.start;
732 
733     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
734     if (nxt_slow_path(ret != NXT_OK)) {
735         return NXT_ERROR;
736     }
737 
738     file = nxt_list_first(rt->log_files);
739     file->name = file_name.start;
740 
741     slash = "";
742     n = nxt_strlen(rt->modules);
743 
744     if (n > 1 && rt->modules[n - 1] != '/') {
745         slash = "/";
746     }
747 
748     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
749                                rt->modules, slash);
750     if (nxt_slow_path(ret != NXT_OK)) {
751         return NXT_ERROR;
752     }
753 
754     rt->modules = (char *) file_name.start;
755 
756     slash = "";
757     n = nxt_strlen(rt->state);
758 
759     if (n > 1 && rt->state[n - 1] != '/') {
760         slash = "/";
761     }
762 
763     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
764                                rt->state, slash);
765     if (nxt_slow_path(ret != NXT_OK)) {
766         return NXT_ERROR;
767     }
768 
769     rt->conf = (char *) file_name.start;
770 
771     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
772     if (nxt_slow_path(ret != NXT_OK)) {
773         return NXT_ERROR;
774     }
775 
776     rt->conf_tmp = (char *) file_name.start;
777 
778     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
779                                rt->state, slash);
780     if (nxt_slow_path(ret != NXT_OK)) {
781         return NXT_ERROR;
782     }
783 
784     ret = mkdir((char *) file_name.start, S_IRWXU);
785 
786     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
787         rt->certs.length = file_name.len;
788         rt->certs.start = file_name.start;
789 
790     } else {
791         nxt_alert(task, "Unable to create certificates storage directory: "
792                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
793     }
794 
795     control.length = nxt_strlen(rt->control);
796     control.start = (u_char *) rt->control;
797 
798     sa = nxt_sockaddr_parse(rt->mem_pool, &control);
799     if (nxt_slow_path(sa == NULL)) {
800         return NXT_ERROR;
801     }
802 
803     sa->type = SOCK_STREAM;
804 
805     rt->controller_listen = sa;
806 
807     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
808         return NXT_ERROR;
809     }
810 
811     return NXT_OK;
812 }
813 
814 
815 static nxt_int_t
816 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
817 {
818     char    *p, **argv;
819     u_char  *end;
820     u_char  buf[1024];
821 
822     static const char  version[] =
823         "unit version: " NXT_VERSION "\n"
824         "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
825 
826     static const char  no_control[] =
827                        "option \"--control\" requires socket address\n";
828     static const char  no_user[] = "option \"--user\" requires username\n";
829     static const char  no_group[] = "option \"--group\" requires group name\n";
830     static const char  no_pid[] = "option \"--pid\" requires filename\n";
831     static const char  no_log[] = "option \"--log\" requires filename\n";
832     static const char  no_modules[] =
833                        "option \"--modules\" requires directory\n";
834     static const char  no_state[] = "option \"--state\" requires directory\n";
835 
836     static const char  help[] =
837         "\n"
838         "unit options:\n"
839         "\n"
840         "  --version            print unit version and configure options\n"
841         "\n"
842         "  --no-daemon          run unit in non-daemon mode\n"
843         "\n"
844         "  --control ADDRESS    set address of control API socket\n"
845         "                       default: \"" NXT_CONTROL_SOCK "\"\n"
846         "\n"
847         "  --pid FILE           set pid filename\n"
848         "                       default: \"" NXT_PID "\"\n"
849         "\n"
850         "  --log FILE           set log filename\n"
851         "                       default: \"" NXT_LOG "\"\n"
852         "\n"
853         "  --modules DIRECTORY  set modules directory name\n"
854         "                       default: \"" NXT_MODULES "\"\n"
855         "\n"
856         "  --state DIRECTORY    set state directory name\n"
857         "                       default: \"" NXT_STATE "\"\n"
858         "\n"
859         "  --user USER          set non-privileged processes to run"
860                                 " as specified user\n"
861         "                       default: \"" NXT_USER "\"\n"
862         "\n"
863         "  --group GROUP        set non-privileged processes to run"
864                                 " as specified group\n"
865         "                       default: ";
866 
867     static const char  group[] = "\"" NXT_GROUP "\"\n\n";
868     static const char  primary[] = "user's primary group\n\n";
869 
870     argv = &nxt_process_argv[1];
871 
872     while (*argv != NULL) {
873         p = *argv++;
874 
875         if (nxt_strcmp(p, "--control") == 0) {
876             if (*argv == NULL) {
877                 write(STDERR_FILENO, no_control, nxt_length(no_control));
878                 return NXT_ERROR;
879             }
880 
881             p = *argv++;
882 
883             rt->control = p;
884 
885             continue;
886         }
887 
888         if (nxt_strcmp(p, "--user") == 0) {
889             if (*argv == NULL) {
890                 write(STDERR_FILENO, no_user, nxt_length(no_user));
891                 return NXT_ERROR;
892             }
893 
894             p = *argv++;
895 
896             rt->user_cred.user = p;
897 
898             continue;
899         }
900 
901         if (nxt_strcmp(p, "--group") == 0) {
902             if (*argv == NULL) {
903                 write(STDERR_FILENO, no_group, nxt_length(no_group));
904                 return NXT_ERROR;
905             }
906 
907             p = *argv++;
908 
909             rt->group = p;
910 
911             continue;
912         }
913 
914         if (nxt_strcmp(p, "--pid") == 0) {
915             if (*argv == NULL) {
916                 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
917                 return NXT_ERROR;
918             }
919 
920             p = *argv++;
921 
922             rt->pid = p;
923 
924             continue;
925         }
926 
927         if (nxt_strcmp(p, "--log") == 0) {
928             if (*argv == NULL) {
929                 write(STDERR_FILENO, no_log, nxt_length(no_log));
930                 return NXT_ERROR;
931             }
932 
933             p = *argv++;
934 
935             rt->log = p;
936 
937             continue;
938         }
939 
940         if (nxt_strcmp(p, "--modules") == 0) {
941             if (*argv == NULL) {
942                 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
943                 return NXT_ERROR;
944             }
945 
946             p = *argv++;
947 
948             rt->modules = p;
949 
950             continue;
951         }
952 
953         if (nxt_strcmp(p, "--state") == 0) {
954             if (*argv == NULL) {
955                 write(STDERR_FILENO, no_state, nxt_length(no_state));
956                 return NXT_ERROR;
957             }
958 
959             p = *argv++;
960 
961             rt->state = p;
962 
963             continue;
964         }
965 
966         if (nxt_strcmp(p, "--no-daemon") == 0) {
967             rt->daemon = 0;
968             continue;
969         }
970 
971         if (nxt_strcmp(p, "--version") == 0) {
972             write(STDERR_FILENO, version, nxt_length(version));
973             exit(0);
974         }
975 
976         if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
977             write(STDOUT_FILENO, help, nxt_length(help));
978 
979             if (sizeof(NXT_GROUP) == 1) {
980                 write(STDOUT_FILENO, primary, nxt_length(primary));
981 
982             } else {
983                 write(STDOUT_FILENO, group, nxt_length(group));
984             }
985 
986             exit(0);
987         }
988 
989         end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
990                           "try \"%s -h\" for available options\n",
991                           p, nxt_process_argv[0]);
992 
993         write(STDERR_FILENO, buf, end - buf);
994 
995         return NXT_ERROR;
996     }
997 
998     return NXT_OK;
999 }
1000 
1001 
1002 nxt_listen_socket_t *
1003 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1004 {
1005     nxt_mp_t             *mp;
1006     nxt_listen_socket_t  *ls;
1007 
1008     ls = nxt_array_zero_add(rt->listen_sockets);
1009     if (ls == NULL) {
1010         return NULL;
1011     }
1012 
1013     mp = rt->mem_pool;
1014 
1015     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1016                                        sa->length);
1017     if (ls->sockaddr == NULL) {
1018         return NULL;
1019     }
1020 
1021     ls->sockaddr->type = sa->type;
1022 
1023     nxt_sockaddr_text(ls->sockaddr);
1024 
1025     ls->socket = -1;
1026     ls->backlog = NXT_LISTEN_BACKLOG;
1027 
1028     return ls;
1029 }
1030 
1031 
1032 static nxt_int_t
1033 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1034 {
1035     size_t  length;
1036     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1037 
1038     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1039         nxt_alert(task, "gethostname() failed %E", nxt_errno);
1040         return NXT_ERROR;
1041     }
1042 
1043     /*
1044      * Linux gethostname(2):
1045      *
1046      *    If the null-terminated hostname is too large to fit,
1047      *    then the name is truncated, and no error is returned.
1048      *
1049      * For this reason an additional byte is reserved in the buffer.
1050      */
1051     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1052 
1053     length = nxt_strlen(hostname);
1054     rt->hostname.length = length;
1055 
1056     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1057 
1058     if (rt->hostname.start != NULL) {
1059         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1060         return NXT_OK;
1061     }
1062 
1063     return NXT_ERROR;
1064 }
1065 
1066 
1067 static nxt_int_t
1068 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1069 {
1070     nxt_file_t  *file;
1071     nxt_list_t  *log_files;
1072 
1073     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1074 
1075     if (nxt_fast_path(log_files != NULL)) {
1076         rt->log_files = log_files;
1077 
1078         /* Preallocate the main log.  This allocation cannot fail. */
1079         file = nxt_list_zero_add(log_files);
1080 
1081         file->fd = NXT_FILE_INVALID;
1082         file->log_level = NXT_LOG_ALERT;
1083 
1084         return NXT_OK;
1085     }
1086 
1087     return NXT_ERROR;
1088 }
1089 
1090 
1091 nxt_file_t *
1092 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1093 {
1094     nxt_int_t            ret;
1095     nxt_file_t           *file;
1096     nxt_file_name_str_t  file_name;
1097 
1098     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1099 
1100     if (nxt_slow_path(ret != NXT_OK)) {
1101         return NULL;
1102     }
1103 
1104     nxt_list_each(file, rt->log_files) {
1105 
1106         /* STUB: hardecoded case sensitive/insensitive. */
1107 
1108         if (file->name != NULL
1109             && nxt_file_name_eq(file->name, file_name.start))
1110         {
1111             return file;
1112         }
1113 
1114     } nxt_list_loop;
1115 
1116     file = nxt_list_zero_add(rt->log_files);
1117 
1118     if (nxt_slow_path(file == NULL)) {
1119         return NULL;
1120     }
1121 
1122     file->fd = NXT_FILE_INVALID;
1123     file->log_level = NXT_LOG_ALERT;
1124     file->name = file_name.start;
1125 
1126     return file;
1127 }
1128 
1129 
1130 static nxt_int_t
1131 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1132 {
1133     nxt_int_t   ret;
1134     nxt_file_t  *file;
1135 
1136     nxt_list_each(file, rt->log_files) {
1137 
1138         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1139                             NXT_FILE_OWNER_ACCESS);
1140 
1141         if (ret != NXT_OK) {
1142             return NXT_ERROR;
1143         }
1144 
1145     } nxt_list_loop;
1146 
1147     file = nxt_list_first(rt->log_files);
1148 
1149     return nxt_file_stderr(file);
1150 }
1151 
1152 
1153 nxt_int_t
1154 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1155 {
1156     nxt_int_t            ret;
1157     nxt_uint_t           c, p, ncurr, nprev;
1158     nxt_listen_socket_t  *curr, *prev;
1159 
1160     curr = rt->listen_sockets->elts;
1161     ncurr = rt->listen_sockets->nelts;
1162 
1163     if (rt->inherited_sockets != NULL) {
1164         prev = rt->inherited_sockets->elts;
1165         nprev = rt->inherited_sockets->nelts;
1166 
1167     } else {
1168         prev = NULL;
1169         nprev = 0;
1170     }
1171 
1172     for (c = 0; c < ncurr; c++) {
1173 
1174         for (p = 0; p < nprev; p++) {
1175 
1176             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1177 
1178                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1179                 if (ret != NXT_OK) {
1180                     return NXT_ERROR;
1181                 }
1182 
1183                 goto next;
1184             }
1185         }
1186 
1187         if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1188             return NXT_ERROR;
1189         }
1190 
1191     next:
1192 
1193         continue;
1194     }
1195 
1196     return NXT_OK;
1197 }
1198 
1199 
1200 nxt_int_t
1201 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1202 {
1203     nxt_uint_t           i, n;
1204     nxt_listen_socket_t  *ls;
1205 
1206     ls = rt->listen_sockets->elts;
1207     n = rt->listen_sockets->nelts;
1208 
1209     for (i = 0; i < n; i++) {
1210         if (ls[i].flags == NXT_NONBLOCK) {
1211             if (nxt_listen_event(task, &ls[i]) == NULL) {
1212                 return NXT_ERROR;
1213             }
1214         }
1215     }
1216 
1217     return NXT_OK;
1218 }
1219 
1220 
1221 nxt_str_t *
1222 nxt_current_directory(nxt_mp_t *mp)
1223 {
1224     size_t     length;
1225     u_char     *p;
1226     nxt_str_t  *name;
1227     char       buf[NXT_MAX_PATH_LEN];
1228 
1229     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1230 
1231     if (nxt_fast_path(length != 0)) {
1232         name = nxt_str_alloc(mp, length + 1);
1233 
1234         if (nxt_fast_path(name != NULL)) {
1235             p = nxt_cpymem(name->start, buf, length);
1236             *p = '/';
1237 
1238             return name;
1239         }
1240     }
1241 
1242     return NULL;
1243 }
1244 
1245 
1246 static nxt_int_t
1247 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1248 {
1249     ssize_t     length;
1250     nxt_int_t   n;
1251     nxt_file_t  file;
1252     u_char      pid[NXT_INT64_T_LEN + nxt_length("\n")];
1253 
1254     nxt_memzero(&file, sizeof(nxt_file_t));
1255 
1256     file.name = pid_file;
1257 
1258     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1259                       NXT_FILE_DEFAULT_ACCESS);
1260 
1261     if (n != NXT_OK) {
1262         return NXT_ERROR;
1263     }
1264 
1265     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1266 
1267     if (nxt_file_write(&file, pid, length, 0) != length) {
1268         return NXT_ERROR;
1269     }
1270 
1271     nxt_file_close(task, &file);
1272 
1273     return NXT_OK;
1274 }
1275 
1276 
1277 nxt_process_t *
1278 nxt_runtime_process_new(nxt_runtime_t *rt)
1279 {
1280     nxt_process_t  *process;
1281 
1282     /* TODO: memory failures. */
1283 
1284     process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1285     if (nxt_slow_path(process == NULL)) {
1286         return NULL;
1287     }
1288 
1289     nxt_queue_init(&process->ports);
1290 
1291     nxt_thread_mutex_create(&process->incoming.mutex);
1292     nxt_thread_mutex_create(&process->outgoing.mutex);
1293     nxt_thread_mutex_create(&process->cp_mutex);
1294 
1295     process->use_count = 1;
1296 
1297     return process;
1298 }
1299 
1300 
1301 static void
1302 nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1303 {
1304     nxt_port_t  *port;
1305 
1306     nxt_assert(process->use_count == 0);
1307     nxt_assert(process->registered == 0);
1308 
1309     nxt_port_mmaps_destroy(&process->incoming, 1);
1310     nxt_port_mmaps_destroy(&process->outgoing, 1);
1311 
1312     do {
1313         port = nxt_port_hash_retrieve(&process->connected_ports);
1314 
1315     } while (port != NULL);
1316 
1317     nxt_thread_mutex_destroy(&process->incoming.mutex);
1318     nxt_thread_mutex_destroy(&process->outgoing.mutex);
1319     nxt_thread_mutex_destroy(&process->cp_mutex);
1320 
1321     nxt_mp_free(rt->mem_pool, process);
1322 }
1323 
1324 
1325 static nxt_int_t
1326 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1327 {
1328     nxt_process_t  *process;
1329 
1330     process = data;
1331 
1332     if (lhq->key.length == sizeof(nxt_pid_t)
1333         && *(nxt_pid_t *) lhq->key.start == process->pid)
1334     {
1335         return NXT_OK;
1336     }
1337 
1338     return NXT_DECLINED;
1339 }
1340 
1341 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1342     NXT_LVLHSH_DEFAULT,
1343     nxt_runtime_lvlhsh_pid_test,
1344     nxt_lvlhsh_alloc,
1345     nxt_lvlhsh_free,
1346 };
1347 
1348 
1349 nxt_inline void
1350 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1351 {
1352     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1353     lhq->key.length = sizeof(*pid);
1354     lhq->key.start = (u_char *) pid;
1355     lhq->proto = &lvlhsh_processes_proto;
1356 }
1357 
1358 
1359 nxt_process_t *
1360 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1361 {
1362     nxt_process_t       *process;
1363     nxt_lvlhsh_query_t  lhq;
1364 
1365     process = NULL;
1366 
1367     nxt_runtime_process_lhq_pid(&lhq, &pid);
1368 
1369     nxt_thread_mutex_lock(&rt->processes_mutex);
1370 
1371     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1372         process = lhq.value;
1373 
1374     } else {
1375         nxt_thread_log_debug("process %PI not found", pid);
1376     }
1377 
1378     nxt_thread_mutex_unlock(&rt->processes_mutex);
1379 
1380     return process;
1381 }
1382 
1383 
1384 nxt_process_t *
1385 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1386 {
1387     nxt_process_t       *process;
1388     nxt_lvlhsh_query_t  lhq;
1389 
1390     nxt_runtime_process_lhq_pid(&lhq, &pid);
1391 
1392     nxt_thread_mutex_lock(&rt->processes_mutex);
1393 
1394     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1395         nxt_thread_log_debug("process %PI found", pid);
1396 
1397         nxt_thread_mutex_unlock(&rt->processes_mutex);
1398 
1399         process = lhq.value;
1400         process->use_count++;
1401 
1402         return process;
1403     }
1404 
1405     process = nxt_runtime_process_new(rt);
1406     if (nxt_slow_path(process == NULL)) {
1407 
1408         nxt_thread_mutex_unlock(&rt->processes_mutex);
1409 
1410         return NULL;
1411     }
1412 
1413     process->pid = pid;
1414 
1415     lhq.replace = 0;
1416     lhq.value = process;
1417     lhq.pool = rt->mem_pool;
1418 
1419     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1420 
1421     case NXT_OK:
1422         if (rt->nprocesses == 0) {
1423             rt->mprocess = process;
1424         }
1425 
1426         rt->nprocesses++;
1427 
1428         process->registered = 1;
1429 
1430         nxt_thread_log_debug("process %PI insert", pid);
1431         break;
1432 
1433     default:
1434         nxt_thread_log_debug("process %PI insert failed", pid);
1435         break;
1436     }
1437 
1438     nxt_thread_mutex_unlock(&rt->processes_mutex);
1439 
1440     return process;
1441 }
1442 
1443 
1444 void
1445 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1446 {
1447     nxt_port_t          *port;
1448     nxt_runtime_t       *rt;
1449     nxt_lvlhsh_query_t  lhq;
1450 
1451     nxt_assert(process->registered == 0);
1452 
1453     rt = task->thread->runtime;
1454 
1455     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1456 
1457     lhq.replace = 0;
1458     lhq.value = process;
1459     lhq.pool = rt->mem_pool;
1460 
1461     nxt_thread_mutex_lock(&rt->processes_mutex);
1462 
1463     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1464 
1465     case NXT_OK:
1466         if (rt->nprocesses == 0) {
1467             rt->mprocess = process;
1468         }
1469 
1470         rt->nprocesses++;
1471 
1472         nxt_process_port_each(process, port) {
1473 
1474             port->pid = process->pid;
1475 
1476             nxt_runtime_port_add(task, port);
1477 
1478         } nxt_process_port_loop;
1479 
1480         process->registered = 1;
1481 
1482         nxt_thread_log_debug("process %PI added", process->pid);
1483         break;
1484 
1485     default:
1486         nxt_thread_log_debug("process %PI failed to add", process->pid);
1487         break;
1488     }
1489 
1490     nxt_thread_mutex_unlock(&rt->processes_mutex);
1491 }
1492 
1493 
1494 static nxt_process_t *
1495 nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1496 {
1497     nxt_process_t       *process;
1498     nxt_lvlhsh_query_t  lhq;
1499 
1500     process = NULL;
1501 
1502     nxt_runtime_process_lhq_pid(&lhq, &pid);
1503 
1504     lhq.pool = rt->mem_pool;
1505 
1506     nxt_thread_mutex_lock(&rt->processes_mutex);
1507 
1508     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1509 
1510     case NXT_OK:
1511         rt->nprocesses--;
1512 
1513         process = lhq.value;
1514 
1515         process->registered = 0;
1516 
1517         nxt_thread_log_debug("process %PI removed", pid);
1518         break;
1519 
1520     default:
1521         nxt_thread_log_debug("process %PI remove failed", pid);
1522         break;
1523     }
1524 
1525     nxt_thread_mutex_unlock(&rt->processes_mutex);
1526 
1527     return process;
1528 }
1529 
1530 
1531 void
1532 nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
1533 {
1534     nxt_runtime_t  *rt;
1535 
1536     process->use_count += i;
1537 
1538     if (process->use_count == 0) {
1539         rt = task->thread->runtime;
1540 
1541         if (process->registered == 1) {
1542             nxt_runtime_process_remove_pid(rt, process->pid);
1543         }
1544 
1545         nxt_runtime_process_destroy(rt, process);
1546     }
1547 }
1548 
1549 
1550 nxt_process_t *
1551 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1552 {
1553     nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1554 
1555     return nxt_runtime_process_next(rt, lhe);
1556 }
1557 
1558 
1559 void
1560 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1561 {
1562     nxt_int_t      res;
1563     nxt_runtime_t  *rt;
1564 
1565     rt = task->thread->runtime;
1566 
1567     res = nxt_port_hash_add(&rt->ports, port);
1568 
1569     if (res != NXT_OK) {
1570         return;
1571     }
1572 
1573     rt->port_by_type[port->type] = port;
1574 
1575     nxt_port_use(task, port, 1);
1576 }
1577 
1578 
1579 void
1580 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1581 {
1582     nxt_int_t      res;
1583     nxt_runtime_t  *rt;
1584 
1585     rt = task->thread->runtime;
1586 
1587     res = nxt_port_hash_remove(&rt->ports, port);
1588 
1589     if (res != NXT_OK) {
1590         return;
1591     }
1592 
1593     if (rt->port_by_type[port->type] == port) {
1594         rt->port_by_type[port->type] = NULL;
1595     }
1596 
1597     nxt_port_use(task, port, -1);
1598 }
1599 
1600 
1601 nxt_port_t *
1602 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1603     nxt_port_id_t port_id)
1604 {
1605     return nxt_port_hash_find(&rt->ports, pid, port_id);
1606 }
1607