xref: /unit/src/nxt_runtime.c (revision 2646:b003301015ea)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_regex.h>
14 
15 
/* Forward declarations of the file-local (static) functions of this file. */
16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
17     nxt_runtime_t *rt);
18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
19     nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28     nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
34     nxt_runtime_t *rt);
35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
36     nxt_file_name_t *pid_file);
37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
38     nxt_runtime_cont_t cont);
39 static void nxt_runtime_thread_pool_init(void);
40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
41     void *data);
42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
43 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
44 
45 
46 nxt_int_t
nxt_runtime_create(nxt_task_t * task)47 nxt_runtime_create(nxt_task_t *task)
48 {
49     nxt_mp_t               *mp;
50     nxt_int_t              ret;
51     nxt_array_t            *listen_sockets;
52     nxt_runtime_t          *rt;
53     nxt_app_lang_module_t  *lang;
54 
55     mp = nxt_mp_create(1024, 128, 256, 32);
56     if (nxt_slow_path(mp == NULL)) {
57         return NXT_ERROR;
58     }
59 
60     rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
61     if (nxt_slow_path(rt == NULL)) {
62         goto fail;
63     }
64 
65     task->thread->runtime = rt;
66     rt->mem_pool = mp;
67 
68     nxt_thread_mutex_create(&rt->processes_mutex);
69 
70     rt->services = nxt_services_init(mp);
71     if (nxt_slow_path(rt->services == NULL)) {
72         goto fail;
73     }
74 
75     rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
76     if (nxt_slow_path(rt->languages == NULL)) {
77         goto fail;
78     }
79 
80     /* Should not fail. */
81     lang = nxt_array_add(rt->languages);
82     lang->type = NXT_APP_EXTERNAL;
83     lang->version = (u_char *) "";
84     lang->file = NULL;
85     lang->module = &nxt_external_module;
86 
87     lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t));
88     if (nxt_slow_path(lang->mounts == NULL)) {
89         goto fail;
90     }
91 
92     listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
93     if (nxt_slow_path(listen_sockets == NULL)) {
94         goto fail;
95     }
96 
97     rt->listen_sockets = listen_sockets;
98 
99     ret = nxt_runtime_inherited_listen_sockets(task, rt);
100     if (nxt_slow_path(ret != NXT_OK)) {
101         goto fail;
102     }
103 
104     if (nxt_runtime_hostname(task, rt) != NXT_OK) {
105         goto fail;
106     }
107 
108     if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
109         goto fail;
110     }
111 
112     if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
113         goto fail;
114     }
115 
116     if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
117         goto fail;
118     }
119 
120     rt->start = nxt_runtime_initial_start;
121 
122     if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
123         goto fail;
124     }
125 
126     if (nxt_port_rpc_init() != NXT_OK) {
127         goto fail;
128     }
129 
130     if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) {
131         goto fail;
132     }
133 
134     if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) {
135         goto fail;
136     }
137 
138     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
139                        nxt_runtime_start, task, rt, NULL);
140 
141     return NXT_OK;
142 
143 fail:
144 
145     nxt_mp_destroy(mp);
146 
147     return NXT_ERROR;
148 }
149 
150 
151 static nxt_int_t
nxt_runtime_inherited_listen_sockets(nxt_task_t * task,nxt_runtime_t * rt)152 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
153 {
154     u_char               *v, *p;
155     nxt_int_t            type;
156     nxt_array_t          *inherited_sockets;
157     nxt_socket_t         s;
158     nxt_listen_socket_t  *ls;
159 
160     v = (u_char *) getenv("NGINX");
161 
162     if (v == NULL) {
163         return nxt_runtime_systemd_listen_sockets(task, rt);
164     }
165 
166     nxt_alert(task, "using inherited listen sockets: %s", v);
167 
168     inherited_sockets = nxt_array_create(rt->mem_pool,
169                                          1, sizeof(nxt_listen_socket_t));
170     if (inherited_sockets == NULL) {
171         return NXT_ERROR;
172     }
173 
174     rt->inherited_sockets = inherited_sockets;
175 
176     for (p = v; *p != '\0'; p++) {
177 
178         if (*p == ';') {
179             s = nxt_int_parse(v, p - v);
180 
181             if (nxt_slow_path(s < 0)) {
182                 nxt_alert(task, "invalid socket number \"%s\" "
183                           "in NGINX environment variable, "
184                           "ignoring the rest of the variable", v);
185                 return NXT_ERROR;
186             }
187 
188             v = p + 1;
189 
190             ls = nxt_array_zero_add(inherited_sockets);
191             if (nxt_slow_path(ls == NULL)) {
192                 return NXT_ERROR;
193             }
194 
195             ls->socket = s;
196 
197             ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
198             if (nxt_slow_path(ls->sockaddr == NULL)) {
199                 return NXT_ERROR;
200             }
201 
202             type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
203             if (nxt_slow_path(type == -1)) {
204                 return NXT_ERROR;
205             }
206 
207             ls->sockaddr->type = (uint16_t) type;
208         }
209     }
210 
211     return NXT_OK;
212 }
213 
214 
215 static nxt_int_t
nxt_runtime_systemd_listen_sockets(nxt_task_t * task,nxt_runtime_t * rt)216 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
217 {
218     u_char               *nfd, *pid;
219     nxt_int_t            n;
220     nxt_array_t          *inherited_sockets;
221     nxt_socket_t         s;
222     nxt_listen_socket_t  *ls;
223 
224     /*
225      * Number of listening sockets passed.  The socket
226      * descriptors start from number 3 and are sequential.
227      */
228     nfd = (u_char *) getenv("LISTEN_FDS");
229     if (nfd == NULL) {
230         return NXT_OK;
231     }
232 
233     /* The pid of the service process. */
234     pid = (u_char *) getenv("LISTEN_PID");
235     if (pid == NULL) {
236         return NXT_OK;
237     }
238 
239     n = nxt_int_parse(nfd, nxt_strlen(nfd));
240     if (n < 0) {
241         return NXT_OK;
242     }
243 
244     if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
245         return NXT_OK;
246     }
247 
248     nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
249 
250     inherited_sockets = nxt_array_create(rt->mem_pool,
251                                          n, sizeof(nxt_listen_socket_t));
252     if (inherited_sockets == NULL) {
253         return NXT_ERROR;
254     }
255 
256     rt->inherited_sockets = inherited_sockets;
257 
258     for (s = 3; s < n; s++) {
259         ls = nxt_array_zero_add(inherited_sockets);
260         if (nxt_slow_path(ls == NULL)) {
261             return NXT_ERROR;
262         }
263 
264         ls->socket = s;
265 
266         ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
267         if (nxt_slow_path(ls->sockaddr == NULL)) {
268             return NXT_ERROR;
269         }
270 
271         ls->sockaddr->type = SOCK_STREAM;
272     }
273 
274     return NXT_OK;
275 }
276 
277 
278 static nxt_int_t
nxt_runtime_event_engines(nxt_task_t * task,nxt_runtime_t * rt)279 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
280 {
281     nxt_thread_t                 *thread;
282     nxt_event_engine_t           *engine;
283     const nxt_event_interface_t  *interface;
284 
285     interface = nxt_service_get(rt->services, "engine", NULL);
286 
287     if (nxt_slow_path(interface == NULL)) {
288         /* TODO: log */
289         return NXT_ERROR;
290     }
291 
292     engine = nxt_event_engine_create(task, interface,
293                                      nxt_main_process_signals, 0, 0);
294 
295     if (nxt_slow_path(engine == NULL)) {
296         return NXT_ERROR;
297     }
298 
299     thread = task->thread;
300     thread->engine = engine;
301 #if 0
302     thread->fiber = &engine->fibers->fiber;
303 #endif
304 
305     engine->id = rt->last_engine_id++;
306     engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
307 
308     nxt_queue_init(&rt->engines);
309     nxt_queue_insert_tail(&rt->engines, &engine->link);
310 
311     return NXT_OK;
312 }
313 
314 
315 static nxt_int_t
nxt_runtime_thread_pools(nxt_thread_t * thr,nxt_runtime_t * rt)316 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
317 {
318     nxt_int_t    ret;
319     nxt_array_t  *thread_pools;
320 
321     thread_pools = nxt_array_create(rt->mem_pool, 1,
322                                     sizeof(nxt_thread_pool_t *));
323 
324     if (nxt_slow_path(thread_pools == NULL)) {
325         return NXT_ERROR;
326     }
327 
328     rt->thread_pools = thread_pools;
329     ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
330 
331     if (nxt_slow_path(ret != NXT_OK)) {
332         return NXT_ERROR;
333     }
334 
335     return NXT_OK;
336 }
337 
338 
339 static void
nxt_runtime_start(nxt_task_t * task,void * obj,void * data)340 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
341 {
342     nxt_runtime_t  *rt;
343 
344     rt = obj;
345 
346     nxt_debug(task, "rt conf done");
347 
348     task->thread->log->ctx_handler = NULL;
349     task->thread->log->ctx = NULL;
350 
351     if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
352         goto fail;
353     }
354 
355     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
356         goto fail;
357     }
358 
359     /*
360      * Thread pools should be destroyed before starting worker
361      * processes, because thread pool semaphores will stick in
362      * locked state in new processes after fork().
363      */
364     nxt_runtime_thread_pool_destroy(task, rt, rt->start);
365 
366     return;
367 
368 fail:
369 
370     nxt_runtime_quit(task, 1);
371 }
372 
373 
374 static void
nxt_runtime_initial_start(nxt_task_t * task,nxt_uint_t status)375 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
376 {
377     nxt_int_t                    ret;
378     nxt_thread_t                 *thr;
379     nxt_runtime_t                *rt;
380     const nxt_event_interface_t  *interface;
381 
382     thr = task->thread;
383     rt = thr->runtime;
384 
385     if (rt->inherited_sockets == NULL && rt->daemon) {
386 
387         if (nxt_process_daemon(task) != NXT_OK) {
388             goto fail;
389         }
390 
391         /*
392          * An event engine should be updated after fork()
393          * even if an event facility was not changed because:
394          * 1) inherited kqueue descriptor is invalid,
395          * 2) the signal thread is not inherited.
396          */
397         interface = nxt_service_get(rt->services, "engine", rt->engine);
398         if (interface == NULL) {
399             goto fail;
400         }
401 
402         ret = nxt_event_engine_change(task->thread->engine, interface,
403                                       rt->batch);
404         if (ret != NXT_OK) {
405             goto fail;
406         }
407     }
408 
409     ret = nxt_runtime_pid_file_create(task, rt->pid_file);
410     if (ret != NXT_OK) {
411         goto fail;
412     }
413 
414     if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
415         goto fail;
416     }
417 
418     thr->engine->max_connections = rt->engine_connections;
419 
420     if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
421         return;
422     }
423 
424 fail:
425 
426     nxt_runtime_quit(task, 1);
427 }
428 
429 
430 void
nxt_runtime_quit(nxt_task_t * task,nxt_uint_t status)431 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
432 {
433     nxt_bool_t          done;
434     nxt_runtime_t       *rt;
435     nxt_event_engine_t  *engine;
436 
437     rt = task->thread->runtime;
438     rt->status |= status;
439     engine = task->thread->engine;
440 
441     nxt_debug(task, "exiting");
442 
443     done = 1;
444 
445     if (!engine->shutdown) {
446         engine->shutdown = 1;
447 
448         if (!nxt_array_is_empty(rt->thread_pools)) {
449             nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
450             done = 0;
451         }
452 
453         if (rt->type == NXT_PROCESS_MAIN) {
454             nxt_runtime_stop_all_processes(task, rt);
455             done = 0;
456         }
457     }
458 
459     nxt_runtime_close_idle_connections(engine);
460 
461     if (done) {
462         nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
463                            task, rt, engine);
464     }
465 }
466 
467 
468 static void
nxt_runtime_close_idle_connections(nxt_event_engine_t * engine)469 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
470 {
471     nxt_conn_t        *c;
472     nxt_queue_t       *idle;
473     nxt_queue_link_t  *link, *next;
474 
475     nxt_debug(&engine->task, "close idle connections");
476 
477     idle = &engine->idle_connections;
478 
479     for (link = nxt_queue_first(idle);
480          link != nxt_queue_tail(idle);
481          link = next)
482     {
483         next = nxt_queue_next(link);
484         c = nxt_queue_link_data(link, nxt_conn_t, link);
485 
486         if (!c->socket.read_ready) {
487             nxt_queue_remove(link);
488             nxt_conn_close(engine, c);
489         }
490     }
491 }
492 
493 
494 void
nxt_runtime_stop_app_processes(nxt_task_t * task,nxt_runtime_t * rt)495 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
496 {
497     nxt_port_t          *port;
498     nxt_process_t       *process;
499     nxt_process_init_t  *init;
500 
501     nxt_runtime_process_each(rt, process) {
502 
503         init = nxt_process_init(process);
504 
505         if (init->type == NXT_PROCESS_APP
506             || init->type == NXT_PROCESS_PROTOTYPE)
507         {
508 
509             nxt_process_port_each(process, port) {
510 
511                 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
512                                              0, 0, NULL);
513 
514             } nxt_process_port_loop;
515         }
516 
517     } nxt_runtime_process_loop;
518 }
519 
520 
521 static void
nxt_runtime_stop_all_processes(nxt_task_t * task,nxt_runtime_t * rt)522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
523 {
524     nxt_port_t     *port;
525     nxt_process_t  *process;
526 
527     nxt_runtime_process_each(rt, process) {
528 
529         nxt_process_port_each(process, port) {
530 
531             nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid);
532 
533             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
534                                          0, NULL);
535 
536         } nxt_process_port_loop;
537 
538     } nxt_runtime_process_loop;
539 }
540 
541 
542 static void
nxt_runtime_exit(nxt_task_t * task,void * obj,void * data)543 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
544 {
545     int                 status, engine_count;
546     nxt_runtime_t       *rt;
547     nxt_process_t       *process;
548     nxt_event_engine_t  *engine;
549 
550     rt = obj;
551     engine = data;
552 
553     nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
554 
555     if (!nxt_array_is_empty(rt->thread_pools)) {
556         return;
557     }
558 
559     if (rt->type == NXT_PROCESS_MAIN) {
560         if (rt->pid_file != NULL) {
561             nxt_file_delete(rt->pid_file);
562         }
563 
564 #if (NXT_HAVE_UNIX_DOMAIN)
565         {
566             size_t           i;
567             nxt_sockaddr_t   *sa;
568             nxt_file_name_t  *name;
569 
570             sa = rt->controller_listen;
571 
572             if (sa->u.sockaddr.sa_family == AF_UNIX) {
573                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
574                 (void) nxt_file_delete(name);
575             }
576 
577             for (i = 0; i < rt->listen_sockets->nelts; i++) {
578                 nxt_listen_socket_t  *ls;
579 
580                 ls = (nxt_listen_socket_t *) rt->listen_sockets->elts + i;
581                 sa = ls->sockaddr;
582 
583                 if (sa->u.sockaddr.sa_family != AF_UNIX
584                     || sa->u.sockaddr_un.sun_path[0] == '\0')
585                 {
586                     continue;
587                 }
588 
589                 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
590                 (void) nxt_file_delete(name);
591             }
592         }
593 #endif
594     }
595 
596     if (!engine->event.signal_support) {
597         nxt_event_engine_signals_stop(engine);
598     }
599 
600     nxt_runtime_process_each(rt, process) {
601 
602         nxt_runtime_process_remove(rt, process);
603         nxt_process_close_ports(task, process);
604 
605     } nxt_runtime_process_loop;
606 
607     status = rt->status;
608 
609     engine_count = 0;
610 
611     nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
612 
613         engine_count++;
614 
615     } nxt_queue_loop;
616 
617     if (engine_count <= 1) {
618         if (rt->port_by_type[rt->type] != NULL) {
619             nxt_port_use(task, rt->port_by_type[rt->type], -1);
620         }
621 
622         nxt_thread_mutex_destroy(&rt->processes_mutex);
623 
624         nxt_mp_destroy(rt->mem_pool);
625     }
626 
627     nxt_debug(task, "exit: %d", status);
628 
629     exit(status);
630     nxt_unreachable();
631 }
632 
633 
634 static nxt_int_t
nxt_runtime_event_engine_change(nxt_task_t * task,nxt_runtime_t * rt)635 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
636 {
637     nxt_event_engine_t           *engine;
638     const nxt_event_interface_t  *interface;
639 
640     engine = task->thread->engine;
641 
642     if (engine->batch == rt->batch
643         && nxt_strcmp(engine->event.name, rt->engine) == 0)
644     {
645         return NXT_OK;
646     }
647 
648     interface = nxt_service_get(rt->services, "engine", rt->engine);
649 
650     if (interface != NULL) {
651         return nxt_event_engine_change(engine, interface, rt->batch);
652     }
653 
654     return NXT_ERROR;
655 }
656 
657 
658 void
nxt_runtime_event_engine_free(nxt_runtime_t * rt)659 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
660 {
661     nxt_queue_link_t    *link;
662     nxt_event_engine_t  *engine;
663 
664     link = nxt_queue_first(&rt->engines);
665     nxt_queue_remove(link);
666 
667     engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
668     nxt_event_engine_free(engine);
669 }
670 
671 
672 nxt_int_t
nxt_runtime_thread_pool_create(nxt_thread_t * thr,nxt_runtime_t * rt,nxt_uint_t max_threads,nxt_nsec_t timeout)673 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
674     nxt_uint_t max_threads, nxt_nsec_t timeout)
675 {
676     nxt_thread_pool_t   *thread_pool, **tp;
677 
678     tp = nxt_array_add(rt->thread_pools);
679     if (tp == NULL) {
680         return NXT_ERROR;
681     }
682 
683     thread_pool = nxt_thread_pool_create(max_threads, timeout,
684                                          nxt_runtime_thread_pool_init,
685                                          thr->engine,
686                                          nxt_runtime_thread_pool_exit);
687 
688     if (nxt_fast_path(thread_pool != NULL)) {
689         *tp = thread_pool;
690     }
691 
692     return NXT_OK;
693 }
694 
695 
696 static void
nxt_runtime_thread_pool_destroy(nxt_task_t * task,nxt_runtime_t * rt,nxt_runtime_cont_t cont)697 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
698     nxt_runtime_cont_t cont)
699 {
700     nxt_uint_t         n;
701     nxt_thread_pool_t  **tp;
702 
703     rt->continuation = cont;
704 
705     n = rt->thread_pools->nelts;
706 
707     if (n == 0) {
708         cont(task, 0);
709         return;
710     }
711 
712     tp = rt->thread_pools->elts;
713 
714     do {
715         nxt_thread_pool_destroy(*tp);
716 
717         tp++;
718         n--;
719     } while (n != 0);
720 }
721 
722 
/*
 * Thread pool init callback passed to nxt_thread_pool_create();
 * intentionally does nothing.
 */
static void
nxt_runtime_thread_pool_init(void)
{
    /* void */
}
727 
728 
729 static void
nxt_runtime_thread_pool_exit(nxt_task_t * task,void * obj,void * data)730 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
731 {
732     nxt_uint_t           i, n;
733     nxt_runtime_t        *rt;
734     nxt_thread_pool_t    *tp, **thread_pools;
735     nxt_thread_handle_t  handle;
736 
737     tp = obj;
738 
739     if (data != NULL) {
740         handle = (nxt_thread_handle_t) (uintptr_t) data;
741         nxt_thread_wait(handle);
742     }
743 
744     rt = task->thread->runtime;
745 
746     thread_pools = rt->thread_pools->elts;
747     n = rt->thread_pools->nelts;
748 
749     nxt_debug(task, "thread pools: %ui", n);
750 
751     for (i = 0; i < n; i++) {
752 
753         if (tp == thread_pools[i]) {
754             nxt_array_remove(rt->thread_pools, &thread_pools[i]);
755 
756             nxt_free(tp);
757 
758             if (n == 1) {
759                 /* The last thread pool. */
760                 rt->continuation(task, 0);
761             }
762 
763             return;
764         }
765     }
766 }
767 
768 
769 static nxt_int_t
nxt_runtime_conf_init(nxt_task_t * task,nxt_runtime_t * rt)770 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
771 {
772     nxt_int_t                    ret;
773     nxt_str_t                    control;
774     nxt_uint_t                   n;
775     nxt_file_t                   *file;
776     const char                   *slash;
777     nxt_sockaddr_t               *sa;
778     nxt_file_name_str_t          file_name;
779     const nxt_event_interface_t  *interface;
780 
781     rt->daemon = 1;
782     rt->engine_connections = 256;
783     rt->auxiliary_threads = 2;
784     rt->user_cred.user = NXT_USER;
785     rt->group = NXT_GROUP;
786     rt->pid = NXT_PID;
787     rt->log = NXT_LOG;
788     rt->modules = NXT_MODULESDIR;
789     rt->state = NXT_STATEDIR;
790     rt->control = NXT_CONTROL_SOCK;
791     rt->tmp = NXT_TMPDIR;
792 
793     nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
794 
795     if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
796         return NXT_ERROR;
797     }
798 
799     if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
800         return NXT_ERROR;
801     }
802 
803     if (rt->capabilities.setid) {
804         ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
805                                 rt->group);
806 
807         if (nxt_slow_path(ret != NXT_OK)) {
808             return NXT_ERROR;
809         }
810 
811     } else {
812         nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
813                 "cannot use arbitrary user and group.");
814     }
815 
816     /* An engine's parameters. */
817 
818     interface = nxt_service_get(rt->services, "engine", rt->engine);
819     if (interface == NULL) {
820         return NXT_ERROR;
821     }
822 
823     rt->engine = interface->name;
824 
825     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
826     if (nxt_slow_path(ret != NXT_OK)) {
827         return NXT_ERROR;
828     }
829 
830     rt->pid_file = file_name.start;
831 
832     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
833     if (nxt_slow_path(ret != NXT_OK)) {
834         return NXT_ERROR;
835     }
836 
837     file = nxt_list_first(rt->log_files);
838     file->name = file_name.start;
839 
840     slash = "";
841     n = nxt_strlen(rt->modules);
842 
843     if (n > 1 && rt->modules[n - 1] != '/') {
844         slash = "/";
845     }
846 
847     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
848                                rt->modules, slash);
849     if (nxt_slow_path(ret != NXT_OK)) {
850         return NXT_ERROR;
851     }
852 
853     rt->modules = (char *) file_name.start;
854 
855     slash = "";
856     n = nxt_strlen(rt->state);
857 
858     if (n > 1 && rt->state[n - 1] != '/') {
859         slash = "/";
860     }
861 
862     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z",
863                                rt->state, slash);
864     if (nxt_slow_path(ret != NXT_OK)) {
865         return NXT_ERROR;
866     }
867 
868     rt->ver = (char *) file_name.start;
869 
870     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver);
871     if (nxt_slow_path(ret != NXT_OK)) {
872         return NXT_ERROR;
873     }
874 
875     rt->ver_tmp = (char *) file_name.start;
876 
877     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
878                                rt->state, slash);
879     if (nxt_slow_path(ret != NXT_OK)) {
880         return NXT_ERROR;
881     }
882 
883     rt->conf = (char *) file_name.start;
884 
885     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
886     if (nxt_slow_path(ret != NXT_OK)) {
887         return NXT_ERROR;
888     }
889 
890     rt->conf_tmp = (char *) file_name.start;
891 
892     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
893                                rt->state, slash);
894     if (nxt_slow_path(ret != NXT_OK)) {
895         return NXT_ERROR;
896     }
897 
898     ret = mkdir((char *) file_name.start, S_IRWXU);
899 
900     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
901         rt->certs.length = file_name.len;
902         rt->certs.start = file_name.start;
903 
904     } else {
905         nxt_alert(task, "Unable to create certificates storage directory: "
906                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
907     }
908 
909     ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sscripts/%Z",
910                                rt->state, slash);
911     if (nxt_slow_path(ret != NXT_OK)) {
912         return NXT_ERROR;
913     }
914 
915     ret = mkdir((char *) file_name.start, S_IRWXU);
916 
917     if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
918         rt->scripts.length = file_name.len;
919         rt->scripts.start = file_name.start;
920 
921     } else {
922         nxt_alert(task, "Unable to create scripts storage directory: "
923                   "mkdir(%s) failed %E", file_name.start, nxt_errno);
924     }
925 
926     control.length = nxt_strlen(rt->control);
927     control.start = (u_char *) rt->control;
928 
929     sa = nxt_sockaddr_parse(rt->mem_pool, &control);
930     if (nxt_slow_path(sa == NULL)) {
931         return NXT_ERROR;
932     }
933 
934     sa->type = SOCK_STREAM;
935 
936     rt->controller_listen = sa;
937 
938     if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
939         return NXT_ERROR;
940     }
941 
942     return NXT_OK;
943 }
944 
945 
946 static nxt_int_t
nxt_runtime_conf_read_cmd(nxt_task_t * task,nxt_runtime_t * rt)947 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
948 {
949     char    *p, **argv;
950     u_char  *end;
951     u_char  buf[1024];
952 
953     static const char  version[] =
954         "unit version: " NXT_VERSION "\n"
955         "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
956 
957     static const char  no_control[] =
958                        "option \"--control\" requires socket address\n";
959     static const char  no_control_mode[] =
960                         "option \"--control-mode\" requires a mode\n";
961     static const char  no_control_user[] =
962                         "option \"--control-user\" requires a username\n";
963     static const char  no_control_group[] =
964                         "option \"--control-group\" requires a group name\n";
965     static const char  no_user[] = "option \"--user\" requires username\n";
966     static const char  no_group[] = "option \"--group\" requires group name\n";
967     static const char  no_pid[] = "option \"--pid\" requires filename\n";
968     static const char  no_log[] = "option \"--log\" requires filename\n";
969     static const char  no_modules[] =
970                        "option \"--modulesdir\" requires directory\n";
971     static const char  no_state[] =
972                        "option \"--statedir\" requires directory\n";
973     static const char  no_tmp[] = "option \"--tmpdir\" requires directory\n";
974 
975     static const char  modules_deprecated[] =
976            "option \"--modules\" is deprecated; use \"--modulesdir\" instead\n";
977     static const char  state_deprecated[] =
978            "option \"--state\" is deprecated; use \"--statedir\" instead\n";
979     static const char  tmp_deprecated[] =
980            "option \"--tmp\" is deprecated; use \"--tmpdir\" instead\n";
981 
982     static const char  help[] =
983         "\n"
984         "unit options:\n"
985         "\n"
986         "  --version            print unit version and configure options\n"
987         "\n"
988         "  --no-daemon          run unit in non-daemon mode\n"
989         "\n"
990         "  --control ADDRESS    set address of control API socket\n"
991         "                       default: \"" NXT_CONTROL_SOCK "\"\n"
992         "\n"
993         "  --control-mode MODE  set mode of the control API socket\n"
994         "                       default: 0600\n"
995         "\n"
996         "  --control-user USER    set the owner of the control API socket\n"
997         "\n"
998         "  --control-group GROUP  set the group of the control API socket\n"
999         "\n"
1000         "  --pid FILE           set pid filename\n"
1001         "                       default: \"" NXT_PID "\"\n"
1002         "\n"
1003         "  --log FILE           set log filename\n"
1004         "                       default: \"" NXT_LOG "\"\n"
1005         "\n"
1006         "  --modulesdir DIR     set modules directory name\n"
1007         "                       default: \"" NXT_MODULESDIR "\"\n"
1008         "\n"
1009         "  --statedir DIR       set state directory name\n"
1010         "                       default: \"" NXT_STATEDIR "\"\n"
1011         "\n"
1012         "  --tmpdir DIR         set tmp directory name\n"
1013         "                       default: \"" NXT_TMPDIR "\"\n"
1014         "\n"
1015         "  --modules DIR        [deprecated] synonym for --modulesdir\n"
1016         "  --state DIR          [deprecated] synonym for --statedir\n"
1017         "  --tmp DIR            [deprecated] synonym for --tmpdir\n"
1018         "\n"
1019         "  --user USER          set non-privileged processes to run"
1020                                 " as specified user\n"
1021         "                       default: \"" NXT_USER "\"\n"
1022         "\n"
1023         "  --group GROUP        set non-privileged processes to run"
1024                                 " as specified group\n"
1025         "                       default: ";
1026 
1027     static const char  group[] = "\"" NXT_GROUP "\"\n\n";
1028     static const char  primary[] = "user's primary group\n\n";
1029 
1030     argv = &nxt_process_argv[1];
1031 
1032     while (*argv != NULL) {
1033         p = *argv++;
1034 
1035         if (nxt_strcmp(p, "--control") == 0) {
1036             if (*argv == NULL) {
1037                 write(STDERR_FILENO, no_control, nxt_length(no_control));
1038                 return NXT_ERROR;
1039             }
1040 
1041             p = *argv++;
1042 
1043             rt->control = p;
1044 
1045             continue;
1046         }
1047 
1048         if (nxt_strcmp(p, "--control-mode") == 0) {
1049             if (*argv == NULL) {
1050                 write(STDERR_FILENO, no_control_mode,
1051                       nxt_length(no_control_mode));
1052                 return NXT_ERROR;
1053             }
1054 
1055             p = *argv++;
1056 
1057             rt->control_mode = strtoul(p, NULL, 8);
1058 
1059             continue;
1060         }
1061 
1062         if (nxt_strcmp(p, "--control-user") == 0) {
1063             if (*argv == NULL) {
1064                 write(STDERR_FILENO, no_control_user,
1065                       nxt_length(no_control_user));
1066                 return NXT_ERROR;
1067             }
1068 
1069             p = *argv++;
1070 
1071             rt->control_user = p;
1072 
1073             continue;
1074         }
1075 
1076         if (nxt_strcmp(p, "--control-group") == 0) {
1077             if (*argv == NULL) {
1078                 write(STDERR_FILENO, no_control_group,
1079                       nxt_length(no_control_group));
1080                 return NXT_ERROR;
1081             }
1082 
1083             p = *argv++;
1084 
1085             rt->control_group = p;
1086 
1087             continue;
1088         }
1089 
1090         if (nxt_strcmp(p, "--user") == 0) {
1091             if (*argv == NULL) {
1092                 write(STDERR_FILENO, no_user, nxt_length(no_user));
1093                 return NXT_ERROR;
1094             }
1095 
1096             p = *argv++;
1097 
1098             rt->user_cred.user = p;
1099 
1100             continue;
1101         }
1102 
1103         if (nxt_strcmp(p, "--group") == 0) {
1104             if (*argv == NULL) {
1105                 write(STDERR_FILENO, no_group, nxt_length(no_group));
1106                 return NXT_ERROR;
1107             }
1108 
1109             p = *argv++;
1110 
1111             rt->group = p;
1112 
1113             continue;
1114         }
1115 
1116         if (nxt_strcmp(p, "--pid") == 0) {
1117             if (*argv == NULL) {
1118                 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
1119                 return NXT_ERROR;
1120             }
1121 
1122             p = *argv++;
1123 
1124             rt->pid = p;
1125 
1126             continue;
1127         }
1128 
1129         if (nxt_strcmp(p, "--log") == 0) {
1130             if (*argv == NULL) {
1131                 write(STDERR_FILENO, no_log, nxt_length(no_log));
1132                 return NXT_ERROR;
1133             }
1134 
1135             p = *argv++;
1136 
1137             rt->log = p;
1138 
1139             continue;
1140         }
1141 
1142         if (nxt_strcmp(p, "--modules") == 0) {
1143             write(STDERR_FILENO, modules_deprecated,
1144                   nxt_length(modules_deprecated));
1145             goto modulesdir;
1146         }
1147 
1148         if (nxt_strcmp(p, "--modulesdir") == 0) {
1149 modulesdir:
1150             if (*argv == NULL) {
1151                 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1152                 return NXT_ERROR;
1153             }
1154 
1155             p = *argv++;
1156 
1157             rt->modules = p;
1158 
1159             continue;
1160         }
1161 
1162         if (nxt_strcmp(p, "--state") == 0) {
1163             write(STDERR_FILENO, state_deprecated,
1164                   nxt_length(state_deprecated));
1165             goto statedir;
1166         }
1167 
1168         if (nxt_strcmp(p, "--statedir") == 0) {
1169 statedir:
1170             if (*argv == NULL) {
1171                 write(STDERR_FILENO, no_state, nxt_length(no_state));
1172                 return NXT_ERROR;
1173             }
1174 
1175             p = *argv++;
1176 
1177             rt->state = p;
1178 
1179             continue;
1180         }
1181 
1182         if (nxt_strcmp(p, "--tmp") == 0) {
1183             write(STDERR_FILENO, tmp_deprecated, nxt_length(tmp_deprecated));
1184             goto tmpdir;
1185         }
1186 
1187         if (nxt_strcmp(p, "--tmpdir") == 0) {
1188 tmpdir:
1189             if (*argv == NULL) {
1190                 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1191                 return NXT_ERROR;
1192             }
1193 
1194             p = *argv++;
1195 
1196             rt->tmp = p;
1197 
1198             continue;
1199         }
1200 
1201         if (nxt_strcmp(p, "--no-daemon") == 0) {
1202             rt->daemon = 0;
1203             continue;
1204         }
1205 
1206         if (nxt_strcmp(p, "--version") == 0) {
1207             write(STDERR_FILENO, version, nxt_length(version));
1208             exit(0);
1209         }
1210 
1211         if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1212             write(STDOUT_FILENO, help, nxt_length(help));
1213 
1214             if (sizeof(NXT_GROUP) == 1) {
1215                 write(STDOUT_FILENO, primary, nxt_length(primary));
1216 
1217             } else {
1218                 write(STDOUT_FILENO, group, nxt_length(group));
1219             }
1220 
1221             exit(0);
1222         }
1223 
1224         end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1225                           "try \"%s -h\" for available options\n",
1226                           p, nxt_process_argv[0]);
1227 
1228         write(STDERR_FILENO, buf, end - buf);
1229 
1230         return NXT_ERROR;
1231     }
1232 
1233     return NXT_OK;
1234 }
1235 
1236 
1237 nxt_listen_socket_t *
nxt_runtime_listen_socket_add(nxt_runtime_t * rt,nxt_sockaddr_t * sa)1238 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1239 {
1240     nxt_mp_t             *mp;
1241     nxt_listen_socket_t  *ls;
1242 
1243     ls = nxt_array_zero_add(rt->listen_sockets);
1244     if (ls == NULL) {
1245         return NULL;
1246     }
1247 
1248     mp = rt->mem_pool;
1249 
1250     ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1251                                        sa->length);
1252     if (ls->sockaddr == NULL) {
1253         return NULL;
1254     }
1255 
1256     ls->sockaddr->type = sa->type;
1257 
1258     nxt_sockaddr_text(ls->sockaddr);
1259 
1260     ls->socket = -1;
1261     ls->backlog = NXT_LISTEN_BACKLOG;
1262 
1263     return ls;
1264 }
1265 
1266 
1267 static nxt_int_t
nxt_runtime_hostname(nxt_task_t * task,nxt_runtime_t * rt)1268 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1269 {
1270     size_t  length;
1271     char    hostname[NXT_MAXHOSTNAMELEN + 1];
1272 
1273     if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1274         nxt_alert(task, "gethostname() failed %E", nxt_errno);
1275         return NXT_ERROR;
1276     }
1277 
1278     /*
1279      * Linux gethostname(2):
1280      *
1281      *    If the null-terminated hostname is too large to fit,
1282      *    then the name is truncated, and no error is returned.
1283      *
1284      * For this reason an additional byte is reserved in the buffer.
1285      */
1286     hostname[NXT_MAXHOSTNAMELEN] = '\0';
1287 
1288     length = nxt_strlen(hostname);
1289     rt->hostname.length = length;
1290 
1291     rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1292 
1293     if (rt->hostname.start != NULL) {
1294         nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1295         return NXT_OK;
1296     }
1297 
1298     return NXT_ERROR;
1299 }
1300 
1301 
1302 static nxt_int_t
nxt_runtime_log_files_init(nxt_runtime_t * rt)1303 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1304 {
1305     nxt_file_t  *file;
1306     nxt_list_t  *log_files;
1307 
1308     log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1309 
1310     if (nxt_fast_path(log_files != NULL)) {
1311         rt->log_files = log_files;
1312 
1313         /* Preallocate the main log.  This allocation cannot fail. */
1314         file = nxt_list_zero_add(log_files);
1315 
1316         file->fd = NXT_FILE_INVALID;
1317         file->log_level = NXT_LOG_ALERT;
1318 
1319         return NXT_OK;
1320     }
1321 
1322     return NXT_ERROR;
1323 }
1324 
1325 
1326 nxt_file_t *
nxt_runtime_log_file_add(nxt_runtime_t * rt,nxt_str_t * name)1327 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1328 {
1329     nxt_int_t            ret;
1330     nxt_file_t           *file;
1331     nxt_file_name_str_t  file_name;
1332 
1333     ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1334 
1335     if (nxt_slow_path(ret != NXT_OK)) {
1336         return NULL;
1337     }
1338 
1339     nxt_list_each(file, rt->log_files) {
1340 
1341         /* STUB: hardecoded case sensitive/insensitive. */
1342 
1343         if (file->name != NULL
1344             && nxt_file_name_eq(file->name, file_name.start))
1345         {
1346             return file;
1347         }
1348 
1349     } nxt_list_loop;
1350 
1351     file = nxt_list_zero_add(rt->log_files);
1352 
1353     if (nxt_slow_path(file == NULL)) {
1354         return NULL;
1355     }
1356 
1357     file->fd = NXT_FILE_INVALID;
1358     file->log_level = NXT_LOG_ALERT;
1359     file->name = file_name.start;
1360 
1361     return file;
1362 }
1363 
1364 
1365 static nxt_int_t
nxt_runtime_log_files_create(nxt_task_t * task,nxt_runtime_t * rt)1366 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1367 {
1368     nxt_int_t   ret;
1369     nxt_file_t  *file;
1370 
1371     nxt_list_each(file, rt->log_files) {
1372 
1373         ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1374                             NXT_FILE_OWNER_ACCESS);
1375 
1376         if (ret != NXT_OK) {
1377             return NXT_ERROR;
1378         }
1379 
1380     } nxt_list_loop;
1381 
1382     file = nxt_list_first(rt->log_files);
1383 
1384     return nxt_file_stderr(file);
1385 }
1386 
1387 
1388 nxt_int_t
nxt_runtime_listen_sockets_create(nxt_task_t * task,nxt_runtime_t * rt)1389 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1390 {
1391     nxt_int_t            ret;
1392     nxt_uint_t           c, p, ncurr, nprev;
1393     nxt_listen_socket_t  *curr, *prev;
1394 
1395     curr = rt->listen_sockets->elts;
1396     ncurr = rt->listen_sockets->nelts;
1397 
1398     if (rt->inherited_sockets != NULL) {
1399         prev = rt->inherited_sockets->elts;
1400         nprev = rt->inherited_sockets->nelts;
1401 
1402     } else {
1403         prev = NULL;
1404         nprev = 0;
1405     }
1406 
1407     for (c = 0; c < ncurr; c++) {
1408 
1409         for (p = 0; p < nprev; p++) {
1410 
1411             if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1412 
1413                 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1414                 if (ret != NXT_OK) {
1415                     return NXT_ERROR;
1416                 }
1417 
1418                 goto next;
1419             }
1420         }
1421 
1422         if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1423             return NXT_ERROR;
1424         }
1425 
1426     next:
1427 
1428         continue;
1429     }
1430 
1431     return NXT_OK;
1432 }
1433 
1434 
1435 nxt_int_t
nxt_runtime_listen_sockets_enable(nxt_task_t * task,nxt_runtime_t * rt)1436 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1437 {
1438     nxt_uint_t           i, n;
1439     nxt_listen_socket_t  *ls;
1440 
1441     ls = rt->listen_sockets->elts;
1442     n = rt->listen_sockets->nelts;
1443 
1444     for (i = 0; i < n; i++) {
1445         if (ls[i].flags == NXT_NONBLOCK) {
1446             if (nxt_listen_event(task, &ls[i]) == NULL) {
1447                 return NXT_ERROR;
1448             }
1449         }
1450     }
1451 
1452     return NXT_OK;
1453 }
1454 
1455 
1456 nxt_str_t *
nxt_current_directory(nxt_mp_t * mp)1457 nxt_current_directory(nxt_mp_t *mp)
1458 {
1459     size_t     length;
1460     u_char     *p;
1461     nxt_str_t  *name;
1462     char       buf[NXT_MAX_PATH_LEN];
1463 
1464     length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1465 
1466     if (nxt_fast_path(length != 0)) {
1467         name = nxt_str_alloc(mp, length + 1);
1468 
1469         if (nxt_fast_path(name != NULL)) {
1470             p = nxt_cpymem(name->start, buf, length);
1471             *p = '/';
1472 
1473             return name;
1474         }
1475     }
1476 
1477     return NULL;
1478 }
1479 
1480 
1481 static nxt_int_t
nxt_runtime_pid_file_create(nxt_task_t * task,nxt_file_name_t * pid_file)1482 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1483 {
1484     ssize_t     length;
1485     nxt_int_t   n;
1486     nxt_file_t  file;
1487     u_char      pid[NXT_INT64_T_LEN + nxt_length("\n")];
1488 
1489     nxt_memzero(&file, sizeof(nxt_file_t));
1490 
1491     file.name = pid_file;
1492 
1493     nxt_fs_mkdir_parent(pid_file, 0755);
1494 
1495     n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1496                       NXT_FILE_DEFAULT_ACCESS);
1497 
1498     if (n != NXT_OK) {
1499         return NXT_ERROR;
1500     }
1501 
1502     length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1503 
1504     if (nxt_file_write(&file, pid, length, 0) != length) {
1505         return NXT_ERROR;
1506     }
1507 
1508     nxt_file_close(task, &file);
1509 
1510     return NXT_OK;
1511 }
1512 
1513 
1514 void
nxt_runtime_process_release(nxt_runtime_t * rt,nxt_process_t * process)1515 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1516 {
1517     nxt_process_t  *child;
1518 
1519     if (process->registered == 1) {
1520         nxt_runtime_process_remove(rt, process);
1521     }
1522 
1523     if (process->link.next != NULL) {
1524         nxt_queue_remove(&process->link);
1525     }
1526 
1527     nxt_queue_each(child, &process->children, nxt_process_t, link) {
1528         nxt_queue_remove(&child->link);
1529         child->link.next = NULL;
1530     } nxt_queue_loop;
1531 
1532     nxt_assert(process->use_count == 0);
1533     nxt_assert(process->registered == 0);
1534     nxt_assert(nxt_queue_is_empty(&process->ports));
1535 
1536     nxt_port_mmaps_destroy(&process->incoming, 1);
1537 
1538     nxt_thread_mutex_destroy(&process->incoming.mutex);
1539 
1540     /* processes from nxt_runtime_process_get() have no memory pool */
1541     if (process->mem_pool != NULL) {
1542         nxt_mp_destroy(process->mem_pool);
1543     }
1544 
1545     nxt_mp_free(rt->mem_pool, process);
1546 }
1547 
1548 
1549 static nxt_int_t
nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t * lhq,void * data)1550 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1551 {
1552     nxt_process_t  *process;
1553 
1554     process = data;
1555 
1556     if (lhq->key.length == sizeof(nxt_pid_t)
1557         && *(nxt_pid_t *) lhq->key.start == process->pid)
1558     {
1559         return NXT_OK;
1560     }
1561 
1562     return NXT_DECLINED;
1563 }
1564 
/*
 * Level-hash prototype shared by all lookups in the runtime process
 * table.  Keys are compared by nxt_runtime_lvlhsh_pid_test(); the last
 * two members are presumably the allocation and release hooks used by
 * the hash itself (confirm against nxt_lvlhsh_proto_t).  The object is
 * aligned to 64 bytes.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_processes_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_runtime_lvlhsh_pid_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
1571 
1572 
1573 nxt_inline void
nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t * lhq,nxt_pid_t * pid)1574 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1575 {
1576     lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1577     lhq->key.length = sizeof(*pid);
1578     lhq->key.start = (u_char *) pid;
1579     lhq->proto = &lvlhsh_processes_proto;
1580 }
1581 
1582 
1583 nxt_process_t *
nxt_runtime_process_find(nxt_runtime_t * rt,nxt_pid_t pid)1584 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1585 {
1586     nxt_process_t       *process;
1587     nxt_lvlhsh_query_t  lhq;
1588 
1589     process = NULL;
1590 
1591     nxt_runtime_process_lhq_pid(&lhq, &pid);
1592 
1593     nxt_thread_mutex_lock(&rt->processes_mutex);
1594 
1595     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1596         process = lhq.value;
1597 
1598     } else {
1599         nxt_thread_log_debug("process %PI not found", pid);
1600     }
1601 
1602     nxt_thread_mutex_unlock(&rt->processes_mutex);
1603 
1604     return process;
1605 }
1606 
1607 
1608 static nxt_process_t *
nxt_runtime_process_get(nxt_runtime_t * rt,nxt_pid_t pid)1609 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1610 {
1611     nxt_process_t       *process;
1612     nxt_lvlhsh_query_t  lhq;
1613 
1614     nxt_runtime_process_lhq_pid(&lhq, &pid);
1615 
1616     nxt_thread_mutex_lock(&rt->processes_mutex);
1617 
1618     if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1619         nxt_thread_log_debug("process %PI found", pid);
1620 
1621         nxt_thread_mutex_unlock(&rt->processes_mutex);
1622 
1623         process = lhq.value;
1624         process->use_count++;
1625 
1626         return process;
1627     }
1628 
1629     process = nxt_process_new(rt);
1630     if (nxt_slow_path(process == NULL)) {
1631 
1632         nxt_thread_mutex_unlock(&rt->processes_mutex);
1633 
1634         return NULL;
1635     }
1636 
1637     process->pid = pid;
1638 
1639     lhq.replace = 0;
1640     lhq.value = process;
1641     lhq.pool = rt->mem_pool;
1642 
1643     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1644 
1645     case NXT_OK:
1646         if (rt->nprocesses == 0) {
1647             rt->mprocess = process;
1648         }
1649 
1650         rt->nprocesses++;
1651 
1652         process->registered = 1;
1653 
1654         nxt_thread_log_debug("process %PI insert", pid);
1655         break;
1656 
1657     default:
1658         nxt_thread_log_debug("process %PI insert failed", pid);
1659         break;
1660     }
1661 
1662     nxt_thread_mutex_unlock(&rt->processes_mutex);
1663 
1664     return process;
1665 }
1666 
1667 
1668 void
nxt_runtime_process_add(nxt_task_t * task,nxt_process_t * process)1669 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1670 {
1671     nxt_port_t          *port;
1672     nxt_runtime_t       *rt;
1673     nxt_lvlhsh_query_t  lhq;
1674 
1675     nxt_assert(process->registered == 0);
1676 
1677     rt = task->thread->runtime;
1678 
1679     nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1680 
1681     lhq.replace = 0;
1682     lhq.value = process;
1683     lhq.pool = rt->mem_pool;
1684 
1685     nxt_thread_mutex_lock(&rt->processes_mutex);
1686 
1687     switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1688 
1689     case NXT_OK:
1690         if (rt->nprocesses == 0) {
1691             rt->mprocess = process;
1692         }
1693 
1694         rt->nprocesses++;
1695 
1696         nxt_process_port_each(process, port) {
1697 
1698             port->pid = process->pid;
1699 
1700             nxt_runtime_port_add(task, port);
1701 
1702         } nxt_process_port_loop;
1703 
1704         process->registered = 1;
1705 
1706         nxt_debug(task, "process %PI added", process->pid);
1707         break;
1708 
1709     default:
1710         nxt_alert(task, "process %PI failed to add", process->pid);
1711         break;
1712     }
1713 
1714     nxt_thread_mutex_unlock(&rt->processes_mutex);
1715 }
1716 
1717 
1718 void
nxt_runtime_process_remove(nxt_runtime_t * rt,nxt_process_t * process)1719 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1720 {
1721     nxt_pid_t           pid;
1722     nxt_lvlhsh_query_t  lhq;
1723 
1724     nxt_assert(process->registered != 0);
1725 
1726     pid = process->pid;
1727 
1728     nxt_runtime_process_lhq_pid(&lhq, &pid);
1729 
1730     lhq.pool = rt->mem_pool;
1731 
1732     nxt_thread_mutex_lock(&rt->processes_mutex);
1733 
1734     switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1735 
1736     case NXT_OK:
1737         nxt_assert(lhq.value == process);
1738 
1739         rt->nprocesses--;
1740 
1741         process->registered = 0;
1742 
1743         nxt_thread_log_debug("process %PI removed", pid);
1744         break;
1745 
1746     default:
1747         nxt_thread_log_alert("process %PI remove failed", pid);
1748         break;
1749     }
1750 
1751     nxt_thread_mutex_unlock(&rt->processes_mutex);
1752 }
1753 
1754 
1755 nxt_process_t *
nxt_runtime_process_first(nxt_runtime_t * rt,nxt_lvlhsh_each_t * lhe)1756 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1757 {
1758     nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1759 
1760     return nxt_runtime_process_next(rt, lhe);
1761 }
1762 
1763 
1764 nxt_port_t *
nxt_runtime_process_port_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_pid_t pid,nxt_port_id_t id,nxt_process_type_t type)1765 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1766     nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1767 {
1768     nxt_port_t     *port;
1769     nxt_process_t  *process;
1770 
1771     process = nxt_runtime_process_get(rt, pid);
1772     if (nxt_slow_path(process == NULL)) {
1773         return NULL;
1774     }
1775 
1776     port = nxt_port_new(task, id, pid, type);
1777     if (nxt_slow_path(port == NULL)) {
1778         nxt_process_use(task, process, -1);
1779         return NULL;
1780     }
1781 
1782     nxt_process_port_add(task, process, port);
1783 
1784     nxt_process_use(task, process, -1);
1785 
1786     nxt_runtime_port_add(task, port);
1787 
1788     nxt_port_use(task, port, -1);
1789 
1790     return port;
1791 }
1792 
1793 
1794 static void
nxt_runtime_port_add(nxt_task_t * task,nxt_port_t * port)1795 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1796 {
1797     nxt_int_t      res;
1798     nxt_runtime_t  *rt;
1799 
1800     rt = task->thread->runtime;
1801 
1802     res = nxt_port_hash_add(&rt->ports, port);
1803 
1804     if (res != NXT_OK) {
1805         return;
1806     }
1807 
1808     rt->port_by_type[port->type] = port;
1809 
1810     nxt_port_use(task, port, 1);
1811 }
1812 
1813 
1814 void
nxt_runtime_port_remove(nxt_task_t * task,nxt_port_t * port)1815 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1816 {
1817     nxt_int_t      res;
1818     nxt_runtime_t  *rt;
1819 
1820     rt = task->thread->runtime;
1821 
1822     res = nxt_port_hash_remove(&rt->ports, port);
1823 
1824     if (res != NXT_OK) {
1825         return;
1826     }
1827 
1828     if (rt->port_by_type[port->type] == port) {
1829         rt->port_by_type[port->type] = NULL;
1830     }
1831 
1832     nxt_port_use(task, port, -1);
1833 }
1834 
1835 
1836 nxt_port_t *
nxt_runtime_port_find(nxt_runtime_t * rt,nxt_pid_t pid,nxt_port_id_t port_id)1837 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1838     nxt_port_id_t port_id)
1839 {
1840     return nxt_port_hash_find(&rt->ports, pid, port_id);
1841 }
1842