xref: /unit/src/nxt_runtime.c (revision 804:fe8d2dea28dd)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 
14 
15 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16     nxt_runtime_t *rt);
17 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18     nxt_runtime_t *rt);
19 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
25 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
26     nxt_runtime_t *rt);
27 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
28 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
32     nxt_runtime_t *rt);
33 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
34     nxt_file_name_t *pid_file);
35 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
36     nxt_runtime_cont_t cont);
37 static void nxt_runtime_thread_pool_init(void);
38 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
39     void *data);
40 static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
41     nxt_process_t *process);
42 static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
43     nxt_pid_t pid);
44 
45 
46 nxt_int_t
47 nxt_runtime_create(nxt_task_t *task)
48 {
49     nxt_mp_t               *mp;
50     nxt_int_t              ret;
51     nxt_array_t            *listen_sockets;
52     nxt_runtime_t          *rt;
53     nxt_app_lang_module_t  *lang;
54 
55     mp = nxt_mp_create(1024, 128, 256, 32);
56 
57     if (nxt_slow_path(mp == NULL)) {
58         return NXT_ERROR;
59     }
60 
61     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62     if (nxt_slow_path(rt == NULL)) {
63         return NXT_ERROR;
64     }
65 
66     task->thread->runtime = rt;
67     rt->mem_pool = mp;
68 
69     nxt_thread_mutex_create(&rt->processes_mutex);
70 
71     rt->services = nxt_services_init(mp);
72     if (nxt_slow_path(rt->services == NULL)) {
73         goto fail;
74     }
75 
76     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77     if (nxt_slow_path(rt->languages == NULL)) {
78         goto fail;
79     }
80 
81     /* Should not fail. */
82     lang = nxt_array_add(rt->languages);
83     lang->type = NXT_APP_EXTERNAL;
84     lang->version = (u_char *) "";
85     lang->file = NULL;
86     lang->module = &nxt_external_module;
87 
88     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
89     if (nxt_slow_path(listen_sockets == NULL)) {
90         goto fail;
91     }
92 
93     rt->listen_sockets = listen_sockets;
94 
95     ret = nxt_runtime_inherited_listen_sockets(task, rt);
96     if (nxt_slow_path(ret != NXT_OK)) {
97         goto fail;
98     }
99 
100     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
101         goto fail;
102     }
103 
104     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
105         goto fail;
106     }
107 
108     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
109         goto fail;
110     }
111 
112     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
113         goto fail;
114     }
115 
116     rt->start = nxt_runtime_initial_start;
117 
118     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
119         goto fail;
120     }
121 
122     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
123                        nxt_runtime_start, task, rt, NULL);
124 
125     return NXT_OK;
126 
127 fail:
128 
129     nxt_mp_destroy(mp);
130 
131     return NXT_ERROR;
132 }
133 
134 
135 static nxt_int_t
136 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
137 {
138     u_char               *v, *p;
139     nxt_int_t            type;
140     nxt_array_t          *inherited_sockets;
141     nxt_socket_t         s;
142     nxt_listen_socket_t  *ls;
143 
144     v = (u_char *) getenv("NGINX");
145 
146     if (v == NULL) {
147         return nxt_runtime_systemd_listen_sockets(task, rt);
148     }
149 
150     nxt_alert(task, "using inherited listen sockets: %s", v);
151 
152     inherited_sockets = nxt_array_create(rt->mem_pool,
153                                          1, sizeof(nxt_listen_socket_t));
154     if (inherited_sockets == NULL) {
155         return NXT_ERROR;
156     }
157 
158     rt->inherited_sockets = inherited_sockets;
159 
160     for (p = v; *p != '\0'; p++) {
161 
162         if (*p == ';') {
163             s = nxt_int_parse(v, p - v);
164 
165             if (nxt_slow_path(s < 0)) {
166                 nxt_alert(task, "invalid socket number \"%s\" "
167                           "in NGINX environment variable, "
168                           "ignoring the rest of the variable", v);
169                 return NXT_ERROR;
170             }
171 
172             v = p + 1;
173 
174             ls = nxt_array_zero_add(inherited_sockets);
175             if (nxt_slow_path(ls == NULL)) {
176                 return NXT_ERROR;
177             }
178 
179             ls->socket = s;
180 
181             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
182             if (nxt_slow_path(ls->sockaddr == NULL)) {
183                 return NXT_ERROR;
184             }
185 
186             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
187             if (nxt_slow_path(type == -1)) {
188                 return NXT_ERROR;
189             }
190 
191             ls->sockaddr->type = (uint16_t) type;
192         }
193     }
194 
195     return NXT_OK;
196 }
197 
198 
199 static nxt_int_t
200 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
201 {
202     u_char               *nfd, *pid;
203     nxt_int_t            n;
204     nxt_array_t          *inherited_sockets;
205     nxt_socket_t         s;
206     nxt_listen_socket_t  *ls;
207 
208     /*
209      * Number of listening sockets passed.  The socket
210      * descriptors start from number 3 and are sequential.
211      */
212     nfd = (u_char *) getenv("LISTEN_FDS");
213     if (nfd == NULL) {
214         return NXT_OK;
215     }
216 
217     /* The pid of the service process. */
218     pid = (u_char *) getenv("LISTEN_PID");
219     if (pid == NULL) {
220         return NXT_OK;
221     }
222 
223     n = nxt_int_parse(nfd, nxt_strlen(nfd));
224     if (n < 0) {
225         return NXT_OK;
226     }
227 
228     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
229         return NXT_OK;
230     }
231 
232     nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
233 
234     inherited_sockets = nxt_array_create(rt->mem_pool,
235                                          n, sizeof(nxt_listen_socket_t));
236     if (inherited_sockets == NULL) {
237         return NXT_ERROR;
238     }
239 
240     rt->inherited_sockets = inherited_sockets;
241 
242     for (s = 3; s < n; s++) {
243         ls = nxt_array_zero_add(inherited_sockets);
244         if (nxt_slow_path(ls == NULL)) {
245             return NXT_ERROR;
246         }
247 
248         ls->socket = s;
249 
250         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
251         if (nxt_slow_path(ls->sockaddr == NULL)) {
252             return NXT_ERROR;
253         }
254 
255         ls->sockaddr->type = SOCK_STREAM;
256     }
257 
258     return NXT_OK;
259 }
260 
261 
262 static nxt_int_t
263 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
264 {
265     nxt_thread_t                 *thread;
266     nxt_event_engine_t           *engine;
267     const nxt_event_interface_t  *interface;
268 
269     interface = nxt_service_get(rt->services, "engine", NULL);
270 
271     if (nxt_slow_path(interface == NULL)) {
272         /* TODO: log */
273         return NXT_ERROR;
274     }
275 
276     engine = nxt_event_engine_create(task, interface,
277                                      nxt_main_process_signals, 0, 0);
278 
279     if (nxt_slow_path(engine == NULL)) {
280         return NXT_ERROR;
281     }
282 
283     thread = task->thread;
284     thread->engine = engine;
285 #if 0
286     thread->fiber = &engine->fibers->fiber;
287 #endif
288 
289     engine->id = rt->last_engine_id++;
290     engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
291 
292     nxt_queue_init(&rt->engines);
293     nxt_queue_insert_tail(&rt->engines, &engine->link);
294 
295     return NXT_OK;
296 }
297 
298 
299 static nxt_int_t
300 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
301 {
302     nxt_int_t    ret;
303     nxt_array_t  *thread_pools;
304 
305     thread_pools = nxt_array_create(rt->mem_pool, 1,
306                                     sizeof(nxt_thread_pool_t *));
307 
308     if (nxt_slow_path(thread_pools == NULL)) {
309         return NXT_ERROR;
310     }
311 
312     rt->thread_pools = thread_pools;
313     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
314 
315     if (nxt_slow_path(ret != NXT_OK)) {
316         return NXT_ERROR;
317     }
318 
319     return NXT_OK;
320 }
321 
322 
323 static void
324 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
325 {
326     nxt_runtime_t  *rt;
327 
328     rt = obj;
329 
330     nxt_debug(task, "rt conf done");
331 
332     task->thread->log->ctx_handler = NULL;
333     task->thread->log->ctx = NULL;
334 
335     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
336         goto fail;
337     }
338 
339     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
340         goto fail;
341     }
342 
343     /*
344      * Thread pools should be destroyed before starting worker
345      * processes, because thread pool semaphores will stick in
346      * locked state in new processes after fork().
347      */
348     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
349 
350     return;
351 
352 fail:
353 
354     nxt_runtime_quit(task, 1);
355 }
356 
357 
358 static void
359 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
360 {
361     nxt_int_t                    ret;
362     nxt_thread_t                 *thr;
363     nxt_runtime_t                *rt;
364     const nxt_event_interface_t  *interface;
365 
366     thr = task->thread;
367     rt = thr->runtime;
368 
369     if (rt->inherited_sockets == NULL && rt->daemon) {
370 
371         if (nxt_process_daemon(task) != NXT_OK) {
372             goto fail;
373         }
374 
375         /*
376          * An event engine should be updated after fork()
377          * even if an event facility was not changed because:
378          * 1) inherited kqueue descriptor is invalid,
379          * 2) the signal thread is not inherited.
380          */
381         interface = nxt_service_get(rt->services, "engine", rt->engine);
382         if (interface == NULL) {
383             goto fail;
384         }
385 
386         ret = nxt_event_engine_change(task->thread->engine, interface,
387                                       rt->batch);
388         if (ret != NXT_OK) {
389             goto fail;
390         }
391     }
392 
393     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
394     if (ret != NXT_OK) {
395         goto fail;
396     }
397 
398     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
399         goto fail;
400     }
401 
402     thr->engine->max_connections = rt->engine_connections;
403 
404     if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
405         return;
406     }
407 
408 fail:
409 
410     nxt_runtime_quit(task, 1);
411 }
412 
413 
414 void
415 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
416 {
417     nxt_bool_t          done;
418     nxt_runtime_t       *rt;
419     nxt_event_engine_t  *engine;
420 
421     rt = task->thread->runtime;
422     rt->status |= status;
423     engine = task->thread->engine;
424 
425     nxt_debug(task, "exiting");
426 
427     done = 1;
428 
429     if (!engine->shutdown) {
430         engine->shutdown = 1;
431 
432         if (!nxt_array_is_empty(rt->thread_pools)) {
433             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
434             done = 0;
435         }
436 
437         if (rt->type == NXT_PROCESS_MAIN) {
438             nxt_main_stop_all_processes(task, rt);
439             done = 0;
440         }
441     }
442 
443     nxt_runtime_close_idle_connections(engine);
444 
445     if (done) {
446         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
447                            task, rt, engine);
448     }
449 }
450 
451 
452 static void
453 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
454 {
455     nxt_conn_t        *c;
456     nxt_queue_t       *idle;
457     nxt_queue_link_t  *link, *next;
458 
459     nxt_debug(&engine->task, "close idle connections");
460 
461     idle = &engine->idle_connections;
462 
463     for (link = nxt_queue_head(idle);
464          link != nxt_queue_tail(idle);
465          link = next)
466     {
467         next = nxt_queue_next(link);
468         c = nxt_queue_link_data(link, nxt_conn_t, link);
469 
470         if (!c->socket.read_ready) {
471             nxt_queue_remove(link);
472             nxt_conn_close(engine, c);
473         }
474     }
475 }
476 
477 
478 static void
479 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
480 {
481     int                 status;
482     nxt_runtime_t       *rt;
483     nxt_process_t       *process;
484     nxt_event_engine_t  *engine;
485 
486     rt = obj;
487     engine = data;
488 
489     nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
490 
491     if (!nxt_array_is_empty(rt->thread_pools)) {
492         return;
493     }
494 
495     if (rt->type == NXT_PROCESS_MAIN) {
496         if (rt->pid_file != NULL) {
497             nxt_file_delete(rt->pid_file);
498         }
499 
500 #if (NXT_HAVE_UNIX_DOMAIN)
501         {
502             nxt_sockaddr_t   *sa;
503             nxt_file_name_t  *name;
504 
505             sa = rt->controller_listen;
506 
507             if (sa->u.sockaddr.sa_family == AF_UNIX) {
508                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
509                 (void) nxt_file_delete(name);
510             }
511         }
512 #endif
513     }
514 
515     if (!engine->event.signal_support) {
516         nxt_event_engine_signals_stop(engine);
517     }
518 
519     nxt_runtime_process_each(rt, process) {
520 
521         nxt_process_close_ports(task, process);
522 
523     } nxt_runtime_process_loop;
524 
525     nxt_thread_mutex_destroy(&rt->processes_mutex);
526 
527     status = rt->status;
528     nxt_mp_destroy(rt->mem_pool);
529 
530     nxt_debug(task, "exit: %d", status);
531 
532     exit(status);
533     nxt_unreachable();
534 }
535 
536 
537 static nxt_int_t
538 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
539 {
540     nxt_event_engine_t           *engine;
541     const nxt_event_interface_t  *interface;
542 
543     engine = task->thread->engine;
544 
545     if (engine->batch == rt->batch
546         && nxt_strcmp(engine->event.name, rt->engine) == 0)
547     {
548         return NXT_OK;
549     }
550 
551     interface = nxt_service_get(rt->services, "engine", rt->engine);
552 
553     if (interface != NULL) {
554         return nxt_event_engine_change(engine, interface, rt->batch);
555     }
556 
557     return NXT_ERROR;
558 }
559 
560 
561 void
562 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
563 {
564     nxt_queue_link_t    *link;
565     nxt_event_engine_t  *engine;
566 
567     link = nxt_queue_first(&rt->engines);
568     nxt_queue_remove(link);
569 
570     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
571     nxt_event_engine_free(engine);
572 }
573 
574 
575 nxt_int_t
576 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
577     nxt_uint_t max_threads, nxt_nsec_t timeout)
578 {
579     nxt_thread_pool_t   *thread_pool, **tp;
580 
581     tp = nxt_array_add(rt->thread_pools);
582     if (tp == NULL) {
583         return NXT_ERROR;
584     }
585 
586     thread_pool = nxt_thread_pool_create(max_threads, timeout,
587                                          nxt_runtime_thread_pool_init,
588                                          thr->engine,
589                                          nxt_runtime_thread_pool_exit);
590 
591     if (nxt_fast_path(thread_pool != NULL)) {
592         *tp = thread_pool;
593     }
594 
595     return NXT_OK;
596 }
597 
598 
599 static void
600 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
601     nxt_runtime_cont_t cont)
602 {
603     nxt_uint_t         n;
604     nxt_thread_pool_t  **tp;
605 
606     rt->continuation = cont;
607 
608     n = rt->thread_pools->nelts;
609 
610     if (n == 0) {
611         cont(task, 0);
612         return;
613     }
614 
615     tp = rt->thread_pools->elts;
616 
617     do {
618         nxt_thread_pool_destroy(*tp);
619 
620         tp++;
621         n--;
622     } while (n != 0);
623 }
624 
625 
626 static void
627 nxt_runtime_thread_pool_init(void)
628 {
629 #if (NXT_REGEX)
630     nxt_regex_init(0);
631 #endif
632 }
633 
634 
635 static void
636 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
637 {
638     nxt_uint_t           i, n;
639     nxt_runtime_t        *rt;
640     nxt_thread_pool_t    *tp, **thread_pools;
641     nxt_thread_handle_t  handle;
642 
643     tp = obj;
644 
645     if (data != NULL) {
646         handle = (nxt_thread_handle_t) (uintptr_t) data;
647         nxt_thread_wait(handle);
648     }
649 
650     rt = task->thread->runtime;
651 
652     thread_pools = rt->thread_pools->elts;
653     n = rt->thread_pools->nelts;
654 
655     nxt_debug(task, "thread pools: %ui", n);
656 
657     for (i = 0; i < n; i++) {
658 
659         if (tp == thread_pools[i]) {
660             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
661 
662             if (n == 1) {
663                 /* The last thread pool. */
664                 rt->continuation(task, 0);
665             }
666 
667             return;
668         }
669     }
670 }
671 
672 
673 static nxt_int_t
674 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
675 {
676     nxt_int_t                    ret;
677     nxt_str_t                    control;
678     nxt_uint_t                   n;
679     nxt_file_t                   *file;
680     const char                   *slash;
681     nxt_sockaddr_t               *sa;
682     nxt_file_name_str_t          file_name;
683     const nxt_event_interface_t  *interface;
684 
685     rt->daemon = 1;
686     rt->engine_connections = 256;
687     rt->auxiliary_threads = 2;
688     rt->user_cred.user = NXT_USER;
689     rt->group = NXT_GROUP;
690     rt->pid = NXT_PID;
691     rt->log = NXT_LOG;
692     rt->modules = NXT_MODULES;
693     rt->state = NXT_STATE;
694     rt->control = NXT_CONTROL_SOCK;
695 
696     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
697         return NXT_ERROR;
698     }
699 
700     if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
701         return NXT_ERROR;
702     }
703 
704     /* An engine's parameters. */
705 
706     interface = nxt_service_get(rt->services, "engine", rt->engine);
707     if (interface == NULL) {
708         return NXT_ERROR;
709     }
710 
711     rt->engine = interface->name;
712 
713     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
714     if (nxt_slow_path(ret != NXT_OK)) {
715         return NXT_ERROR;
716     }
717 
718     rt->pid_file = file_name.start;
719 
720     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
721     if (nxt_slow_path(ret != NXT_OK)) {
722         return NXT_ERROR;
723     }
724 
725     file = nxt_list_first(rt->log_files);
726     file->name = file_name.start;
727 
728     slash = "";
729     n = nxt_strlen(rt->modules);
730 
731     if (n > 1 && rt->modules[n - 1] != '/') {
732         slash = "/";
733     }
734 
735     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
736                                rt->modules, slash);
737     if (nxt_slow_path(ret != NXT_OK)) {
738         return NXT_ERROR;
739     }
740 
741     rt->modules = (char *) file_name.start;
742 
743     slash = "";
744     n = nxt_strlen(rt->state);
745 
746     if (n > 1 && rt->state[n - 1] != '/') {
747         slash = "/";
748     }
749 
750     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
751                                rt->state, slash);
752     if (nxt_slow_path(ret != NXT_OK)) {
753         return NXT_ERROR;
754     }
755 
756     rt->conf = (char *) file_name.start;
757 
758     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
759     if (nxt_slow_path(ret != NXT_OK)) {
760         return NXT_ERROR;
761     }
762 
763     rt->conf_tmp = (char *) file_name.start;
764 
765     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
766                                rt->state, slash);
767     if (nxt_slow_path(ret != NXT_OK)) {
768         return NXT_ERROR;
769     }
770 
771     ret = mkdir((char *) file_name.start, S_IRWXU);
772 
773     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
774         rt->certs.length = file_name.len;
775         rt->certs.start = file_name.start;
776 
777     } else {
778         nxt_alert(task, "Unable to create certificates storage directory: "
779                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
780     }
781 
782     control.length = nxt_strlen(rt->control);
783     control.start = (u_char *) rt->control;
784 
785     sa = nxt_sockaddr_parse(rt->mem_pool, &control);
786     if (nxt_slow_path(sa == NULL)) {
787         return NXT_ERROR;
788     }
789 
790     sa->type = SOCK_STREAM;
791 
792     rt->controller_listen = sa;
793 
794     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
795         return NXT_ERROR;
796     }
797 
798     return NXT_OK;
799 }
800 
801 
802 static nxt_int_t
803 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
804 {
805     char    *p, **argv;
806     u_char  *end;
807     u_char  buf[1024];
808 
809     static const char  version[] =
810         "unit version: " NXT_VERSION "\n"
811         "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
812 
813     static const char  no_control[] =
814                        "option \"--control\" requires socket address\n";
815     static const char  no_user[] = "option \"--user\" requires username\n";
816     static const char  no_group[] = "option \"--group\" requires group name\n";
817     static const char  no_pid[] = "option \"--pid\" requires filename\n";
818     static const char  no_log[] = "option \"--log\" requires filename\n";
819     static const char  no_modules[] =
820                        "option \"--modules\" requires directory\n";
821     static const char  no_state[] = "option \"--state\" requires directory\n";
822 
823     static const char  help[] =
824         "\n"
825         "unit options:\n"
826         "\n"
827         "  --version            print unit version and configure options\n"
828         "\n"
829         "  --no-daemon          run unit in non-daemon mode\n"
830         "\n"
831         "  --control ADDRESS    set address of control API socket\n"
832         "                       default: \"" NXT_CONTROL_SOCK "\"\n"
833         "\n"
834         "  --pid FILE           set pid filename\n"
835         "                       default: \"" NXT_PID "\"\n"
836         "\n"
837         "  --log FILE           set log filename\n"
838         "                       default: \"" NXT_LOG "\"\n"
839         "\n"
840         "  --modules DIRECTORY  set modules directory name\n"
841         "                       default: \"" NXT_MODULES "\"\n"
842         "\n"
843         "  --state DIRECTORY    set state directory name\n"
844         "                       default: \"" NXT_STATE "\"\n"
845         "\n"
846         "  --user USER          set non-privileged processes to run"
847                                 " as specified user\n"
848         "                       default: \"" NXT_USER "\"\n"
849         "\n"
850         "  --group GROUP        set non-privileged processes to run"
851                                 " as specified group\n"
852         "                       default: ";
853 
854     static const char  group[] = "\"" NXT_GROUP "\"\n\n";
855     static const char  primary[] = "user's primary group\n\n";
856 
857     argv = &nxt_process_argv[1];
858 
859     while (*argv != NULL) {
860         p = *argv++;
861 
862         if (nxt_strcmp(p, "--control") == 0) {
863             if (*argv == NULL) {
864                 write(STDERR_FILENO, no_control, nxt_length(no_control));
865                 return NXT_ERROR;
866             }
867 
868             p = *argv++;
869 
870             rt->control = p;
871 
872             continue;
873         }
874 
875         if (nxt_strcmp(p, "--upstream") == 0) {
876             if (*argv == NULL) {
877                 nxt_alert(task, "no argument for option \"--upstream\"");
878                 return NXT_ERROR;
879             }
880 
881             p = *argv++;
882 
883             rt->upstream.length = nxt_strlen(p);
884             rt->upstream.start = (u_char *) p;
885 
886             continue;
887         }
888 
889         if (nxt_strcmp(p, "--user") == 0) {
890             if (*argv == NULL) {
891                 write(STDERR_FILENO, no_user, nxt_length(no_user));
892                 return NXT_ERROR;
893             }
894 
895             p = *argv++;
896 
897             rt->user_cred.user = p;
898 
899             continue;
900         }
901 
902         if (nxt_strcmp(p, "--group") == 0) {
903             if (*argv == NULL) {
904                 write(STDERR_FILENO, no_group, nxt_length(no_group));
905                 return NXT_ERROR;
906             }
907 
908             p = *argv++;
909 
910             rt->group = p;
911 
912             continue;
913         }
914 
915         if (nxt_strcmp(p, "--pid") == 0) {
916             if (*argv == NULL) {
917                 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
918                 return NXT_ERROR;
919             }
920 
921             p = *argv++;
922 
923             rt->pid = p;
924 
925             continue;
926         }
927 
928         if (nxt_strcmp(p, "--log") == 0) {
929             if (*argv == NULL) {
930                 write(STDERR_FILENO, no_log, nxt_length(no_log));
931                 return NXT_ERROR;
932             }
933 
934             p = *argv++;
935 
936             rt->log = p;
937 
938             continue;
939         }
940 
941         if (nxt_strcmp(p, "--modules") == 0) {
942             if (*argv == NULL) {
943                 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
944                 return NXT_ERROR;
945             }
946 
947             p = *argv++;
948 
949             rt->modules = p;
950 
951             continue;
952         }
953 
954         if (nxt_strcmp(p, "--state") == 0) {
955             if (*argv == NULL) {
956                 write(STDERR_FILENO, no_state, nxt_length(no_state));
957                 return NXT_ERROR;
958             }
959 
960             p = *argv++;
961 
962             rt->state = p;
963 
964             continue;
965         }
966 
967         if (nxt_strcmp(p, "--no-daemon") == 0) {
968             rt->daemon = 0;
969             continue;
970         }
971 
972         if (nxt_strcmp(p, "--version") == 0) {
973             write(STDERR_FILENO, version, nxt_length(version));
974             exit(0);
975         }
976 
977         if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
978             write(STDOUT_FILENO, help, nxt_length(help));
979 
980             if (sizeof(NXT_GROUP) == 1) {
981                 write(STDOUT_FILENO, primary, nxt_length(primary));
982 
983             } else {
984                 write(STDOUT_FILENO, group, nxt_length(group));
985             }
986 
987             exit(0);
988         }
989 
990         end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
991                           "try \"%s -h\" for available options\n",
992                           p, nxt_process_argv[0]);
993 
994         write(STDERR_FILENO, buf, end - buf);
995 
996         return NXT_ERROR;
997     }
998 
999     return NXT_OK;
1000 }
1001 
1002 
1003 nxt_listen_socket_t *
1004 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1005 {
1006     nxt_mp_t             *mp;
1007     nxt_listen_socket_t  *ls;
1008 
1009     ls = nxt_array_zero_add(rt->listen_sockets);
1010     if (ls == NULL) {
1011         return NULL;
1012     }
1013 
1014     mp = rt->mem_pool;
1015 
1016     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1017                                        sa->length);
1018     if (ls->sockaddr == NULL) {
1019         return NULL;
1020     }
1021 
1022     ls->sockaddr->type = sa->type;
1023 
1024     nxt_sockaddr_text(ls->sockaddr);
1025 
1026     ls->socket = -1;
1027     ls->backlog = NXT_LISTEN_BACKLOG;
1028 
1029     return ls;
1030 }
1031 
1032 
1033 static nxt_int_t
1034 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1035 {
1036     size_t  length;
1037     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1038 
1039     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1040         nxt_alert(task, "gethostname() failed %E", nxt_errno);
1041         return NXT_ERROR;
1042     }
1043 
1044     /*
1045      * Linux gethostname(2):
1046      *
1047      *    If the null-terminated hostname is too large to fit,
1048      *    then the name is truncated, and no error is returned.
1049      *
1050      * For this reason an additional byte is reserved in the buffer.
1051      */
1052     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1053 
1054     length = nxt_strlen(hostname);
1055     rt->hostname.length = length;
1056 
1057     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1058 
1059     if (rt->hostname.start != NULL) {
1060         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1061         return NXT_OK;
1062     }
1063 
1064     return NXT_ERROR;
1065 }
1066 
1067 
1068 static nxt_int_t
1069 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1070 {
1071     nxt_file_t  *file;
1072     nxt_list_t  *log_files;
1073 
1074     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1075 
1076     if (nxt_fast_path(log_files != NULL)) {
1077         rt->log_files = log_files;
1078 
1079         /* Preallocate the main log.  This allocation cannot fail. */
1080         file = nxt_list_zero_add(log_files);
1081 
1082         file->fd = NXT_FILE_INVALID;
1083         file->log_level = NXT_LOG_ALERT;
1084 
1085         return NXT_OK;
1086     }
1087 
1088     return NXT_ERROR;
1089 }
1090 
1091 
1092 nxt_file_t *
1093 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1094 {
1095     nxt_int_t            ret;
1096     nxt_file_t           *file;
1097     nxt_file_name_str_t  file_name;
1098 
1099     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1100 
1101     if (nxt_slow_path(ret != NXT_OK)) {
1102         return NULL;
1103     }
1104 
1105     nxt_list_each(file, rt->log_files) {
1106 
1107         /* STUB: hardecoded case sensitive/insensitive. */
1108 
1109         if (file->name != NULL
1110             && nxt_file_name_eq(file->name, file_name.start))
1111         {
1112             return file;
1113         }
1114 
1115     } nxt_list_loop;
1116 
1117     file = nxt_list_zero_add(rt->log_files);
1118 
1119     if (nxt_slow_path(file == NULL)) {
1120         return NULL;
1121     }
1122 
1123     file->fd = NXT_FILE_INVALID;
1124     file->log_level = NXT_LOG_ALERT;
1125     file->name = file_name.start;
1126 
1127     return file;
1128 }
1129 
1130 
1131 static nxt_int_t
1132 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1133 {
1134     nxt_int_t   ret;
1135     nxt_file_t  *file;
1136 
1137     nxt_list_each(file, rt->log_files) {
1138 
1139         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1140                             NXT_FILE_OWNER_ACCESS);
1141 
1142         if (ret != NXT_OK) {
1143             return NXT_ERROR;
1144         }
1145 
1146     } nxt_list_loop;
1147 
1148     file = nxt_list_first(rt->log_files);
1149 
1150     return nxt_file_stderr(file);
1151 }
1152 
1153 
1154 nxt_int_t
1155 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1156 {
1157     nxt_int_t            ret;
1158     nxt_uint_t           c, p, ncurr, nprev;
1159     nxt_listen_socket_t  *curr, *prev;
1160 
1161     curr = rt->listen_sockets->elts;
1162     ncurr = rt->listen_sockets->nelts;
1163 
1164     if (rt->inherited_sockets != NULL) {
1165         prev = rt->inherited_sockets->elts;
1166         nprev = rt->inherited_sockets->nelts;
1167 
1168     } else {
1169         prev = NULL;
1170         nprev = 0;
1171     }
1172 
1173     for (c = 0; c < ncurr; c++) {
1174 
1175         for (p = 0; p < nprev; p++) {
1176 
1177             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1178 
1179                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1180                 if (ret != NXT_OK) {
1181                     return NXT_ERROR;
1182                 }
1183 
1184                 goto next;
1185             }
1186         }
1187 
1188         if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1189             return NXT_ERROR;
1190         }
1191 
1192     next:
1193 
1194         continue;
1195     }
1196 
1197     return NXT_OK;
1198 }
1199 
1200 
1201 nxt_int_t
1202 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1203 {
1204     nxt_uint_t           i, n;
1205     nxt_listen_socket_t  *ls;
1206 
1207     ls = rt->listen_sockets->elts;
1208     n = rt->listen_sockets->nelts;
1209 
1210     for (i = 0; i < n; i++) {
1211         if (ls[i].flags == NXT_NONBLOCK) {
1212             if (nxt_listen_event(task, &ls[i]) == NULL) {
1213                 return NXT_ERROR;
1214             }
1215         }
1216     }
1217 
1218     return NXT_OK;
1219 }
1220 
1221 
1222 nxt_str_t *
1223 nxt_current_directory(nxt_mp_t *mp)
1224 {
1225     size_t     length;
1226     u_char     *p;
1227     nxt_str_t  *name;
1228     char       buf[NXT_MAX_PATH_LEN];
1229 
1230     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1231 
1232     if (nxt_fast_path(length != 0)) {
1233         name = nxt_str_alloc(mp, length + 1);
1234 
1235         if (nxt_fast_path(name != NULL)) {
1236             p = nxt_cpymem(name->start, buf, length);
1237             *p = '/';
1238 
1239             return name;
1240         }
1241     }
1242 
1243     return NULL;
1244 }
1245 
1246 
1247 static nxt_int_t
1248 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1249 {
1250     ssize_t     length;
1251     nxt_int_t   n;
1252     nxt_file_t  file;
1253     u_char      pid[NXT_INT64_T_LEN + nxt_length("\n")];
1254 
1255     nxt_memzero(&file, sizeof(nxt_file_t));
1256 
1257     file.name = pid_file;
1258 
1259     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1260                       NXT_FILE_DEFAULT_ACCESS);
1261 
1262     if (n != NXT_OK) {
1263         return NXT_ERROR;
1264     }
1265 
1266     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1267 
1268     if (nxt_file_write(&file, pid, length, 0) != length) {
1269         return NXT_ERROR;
1270     }
1271 
1272     nxt_file_close(task, &file);
1273 
1274     return NXT_OK;
1275 }
1276 
1277 
1278 nxt_process_t *
1279 nxt_runtime_process_new(nxt_runtime_t *rt)
1280 {
1281     nxt_process_t  *process;
1282 
1283     /* TODO: memory failures. */
1284 
1285     process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1286     if (nxt_slow_path(process == NULL)) {
1287         return NULL;
1288     }
1289 
1290     nxt_queue_init(&process->ports);
1291 
1292     nxt_thread_mutex_create(&process->incoming.mutex);
1293     nxt_thread_mutex_create(&process->outgoing.mutex);
1294     nxt_thread_mutex_create(&process->cp_mutex);
1295 
1296     process->use_count = 1;
1297 
1298     return process;
1299 }
1300 
1301 
1302 static void
1303 nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1304 {
1305     nxt_port_t  *port;
1306 
1307     nxt_assert(process->use_count == 0);
1308     nxt_assert(process->registered == 0);
1309 
1310     nxt_port_mmaps_destroy(&process->incoming, 1);
1311     nxt_port_mmaps_destroy(&process->outgoing, 1);
1312 
1313     do {
1314         port = nxt_port_hash_retrieve(&process->connected_ports);
1315 
1316     } while (port != NULL);
1317 
1318     nxt_thread_mutex_destroy(&process->incoming.mutex);
1319     nxt_thread_mutex_destroy(&process->outgoing.mutex);
1320     nxt_thread_mutex_destroy(&process->cp_mutex);
1321 
1322     nxt_mp_free(rt->mem_pool, process);
1323 }
1324 
1325 
1326 static nxt_int_t
1327 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1328 {
1329     nxt_process_t  *process;
1330 
1331     process = data;
1332 
1333     if (lhq->key.length == sizeof(nxt_pid_t)
1334         && *(nxt_pid_t *) lhq->key.start == process->pid)
1335     {
1336         return NXT_OK;
1337     }
1338 
1339     return NXT_DECLINED;
1340 }
1341 
1342 static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
1343     NXT_LVLHSH_DEFAULT,
1344     nxt_runtime_lvlhsh_pid_test,
1345     nxt_lvlhsh_alloc,
1346     nxt_lvlhsh_free,
1347 };
1348 
1349 
1350 nxt_inline void
1351 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1352 {
1353     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1354     lhq->key.length = sizeof(*pid);
1355     lhq->key.start = (u_char *) pid;
1356     lhq->proto = &lvlhsh_processes_proto;
1357 }
1358 
1359 
1360 nxt_process_t *
1361 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1362 {
1363     nxt_process_t       *process;
1364     nxt_lvlhsh_query_t  lhq;
1365 
1366     process = NULL;
1367 
1368     nxt_runtime_process_lhq_pid(&lhq, &pid);
1369 
1370     nxt_thread_mutex_lock(&rt->processes_mutex);
1371 
1372     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1373         process = lhq.value;
1374 
1375     } else {
1376         nxt_thread_log_debug("process %PI not found", pid);
1377     }
1378 
1379     nxt_thread_mutex_unlock(&rt->processes_mutex);
1380 
1381     return process;
1382 }
1383 
1384 
1385 nxt_process_t *
1386 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1387 {
1388     nxt_process_t       *process;
1389     nxt_lvlhsh_query_t  lhq;
1390 
1391     nxt_runtime_process_lhq_pid(&lhq, &pid);
1392 
1393     nxt_thread_mutex_lock(&rt->processes_mutex);
1394 
1395     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1396         nxt_thread_log_debug("process %PI found", pid);
1397 
1398         nxt_thread_mutex_unlock(&rt->processes_mutex);
1399 
1400         process = lhq.value;
1401         process->use_count++;
1402 
1403         return process;
1404     }
1405 
1406     process = nxt_runtime_process_new(rt);
1407     if (nxt_slow_path(process == NULL)) {
1408 
1409         nxt_thread_mutex_unlock(&rt->processes_mutex);
1410 
1411         return NULL;
1412     }
1413 
1414     process->pid = pid;
1415 
1416     lhq.replace = 0;
1417     lhq.value = process;
1418     lhq.pool = rt->mem_pool;
1419 
1420     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1421 
1422     case NXT_OK:
1423         if (rt->nprocesses == 0) {
1424             rt->mprocess = process;
1425         }
1426 
1427         rt->nprocesses++;
1428 
1429         process->registered = 1;
1430 
1431         nxt_thread_log_debug("process %PI insert", pid);
1432         break;
1433 
1434     default:
1435         nxt_thread_log_debug("process %PI insert failed", pid);
1436         break;
1437     }
1438 
1439     nxt_thread_mutex_unlock(&rt->processes_mutex);
1440 
1441     return process;
1442 }
1443 
1444 
1445 void
1446 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1447 {
1448     nxt_port_t          *port;
1449     nxt_runtime_t       *rt;
1450     nxt_lvlhsh_query_t  lhq;
1451 
1452     nxt_assert(process->registered == 0);
1453 
1454     rt = task->thread->runtime;
1455 
1456     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1457 
1458     lhq.replace = 0;
1459     lhq.value = process;
1460     lhq.pool = rt->mem_pool;
1461 
1462     nxt_thread_mutex_lock(&rt->processes_mutex);
1463 
1464     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1465 
1466     case NXT_OK:
1467         if (rt->nprocesses == 0) {
1468             rt->mprocess = process;
1469         }
1470 
1471         rt->nprocesses++;
1472 
1473         nxt_process_port_each(process, port) {
1474 
1475             port->pid = process->pid;
1476 
1477             nxt_runtime_port_add(task, port);
1478 
1479         } nxt_process_port_loop;
1480 
1481         process->registered = 1;
1482 
1483         nxt_thread_log_debug("process %PI added", process->pid);
1484         break;
1485 
1486     default:
1487         nxt_thread_log_debug("process %PI failed to add", process->pid);
1488         break;
1489     }
1490 
1491     nxt_thread_mutex_unlock(&rt->processes_mutex);
1492 }
1493 
1494 
1495 static nxt_process_t *
1496 nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1497 {
1498     nxt_process_t       *process;
1499     nxt_lvlhsh_query_t  lhq;
1500 
1501     process = NULL;
1502 
1503     nxt_runtime_process_lhq_pid(&lhq, &pid);
1504 
1505     lhq.pool = rt->mem_pool;
1506 
1507     nxt_thread_mutex_lock(&rt->processes_mutex);
1508 
1509     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1510 
1511     case NXT_OK:
1512         rt->nprocesses--;
1513 
1514         process = lhq.value;
1515 
1516         process->registered = 0;
1517 
1518         nxt_thread_log_debug("process %PI removed", pid);
1519         break;
1520 
1521     default:
1522         nxt_thread_log_debug("process %PI remove failed", pid);
1523         break;
1524     }
1525 
1526     nxt_thread_mutex_unlock(&rt->processes_mutex);
1527 
1528     return process;
1529 }
1530 
1531 
1532 void
1533 nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
1534 {
1535     nxt_runtime_t  *rt;
1536 
1537     process->use_count += i;
1538 
1539     if (process->use_count == 0) {
1540         rt = task->thread->runtime;
1541 
1542         if (process->registered == 1) {
1543             nxt_runtime_process_remove_pid(rt, process->pid);
1544         }
1545 
1546         nxt_runtime_process_destroy(rt, process);
1547     }
1548 }
1549 
1550 
1551 nxt_process_t *
1552 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1553 {
1554     nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1555 
1556     return nxt_runtime_process_next(rt, lhe);
1557 }
1558 
1559 
1560 void
1561 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1562 {
1563     nxt_int_t      res;
1564     nxt_runtime_t  *rt;
1565 
1566     rt = task->thread->runtime;
1567 
1568     res = nxt_port_hash_add(&rt->ports, port);
1569 
1570     if (res != NXT_OK) {
1571         return;
1572     }
1573 
1574     rt->port_by_type[port->type] = port;
1575 
1576     nxt_port_use(task, port, 1);
1577 }
1578 
1579 
1580 void
1581 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1582 {
1583     nxt_int_t      res;
1584     nxt_runtime_t  *rt;
1585 
1586     rt = task->thread->runtime;
1587 
1588     res = nxt_port_hash_remove(&rt->ports, port);
1589 
1590     if (res != NXT_OK) {
1591         return;
1592     }
1593 
1594     if (rt->port_by_type[port->type] == port) {
1595         rt->port_by_type[port->type] = NULL;
1596     }
1597 
1598     nxt_port_use(task, port, -1);
1599 }
1600 
1601 
1602 nxt_port_t *
1603 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1604     nxt_port_id_t port_id)
1605 {
1606     return nxt_port_hash_find(&rt->ports, pid, port_id);
1607 }
1608