nxt_runtime.c (1488:6976d36be926) nxt_runtime.c (1489:4a3ec07f4b19)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
25static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
26static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
27 nxt_runtime_t *rt);
28static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
31static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
32static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
33 nxt_runtime_t *rt);
34static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
35 nxt_file_name_t *pid_file);
36static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
37 nxt_runtime_cont_t cont);
38static void nxt_runtime_thread_pool_init(void);
39static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
40 void *data);
41static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
42static void nxt_runtime_process_remove(nxt_runtime_t *rt,
43 nxt_process_t *process);
44static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
45
46
47nxt_int_t
48nxt_runtime_create(nxt_task_t *task)
49{
50 nxt_mp_t *mp;
51 nxt_int_t ret;
52 nxt_array_t *listen_sockets;
53 nxt_runtime_t *rt;
54 nxt_app_lang_module_t *lang;
55
56 mp = nxt_mp_create(1024, 128, 256, 32);
57 if (nxt_slow_path(mp == NULL)) {
58 return NXT_ERROR;
59 }
60
61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62 if (nxt_slow_path(rt == NULL)) {
63 goto fail;
64 }
65
66 task->thread->runtime = rt;
67 rt->mem_pool = mp;
68
69 nxt_thread_mutex_create(&rt->processes_mutex);
70
71 rt->services = nxt_services_init(mp);
72 if (nxt_slow_path(rt->services == NULL)) {
73 goto fail;
74 }
75
76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77 if (nxt_slow_path(rt->languages == NULL)) {
78 goto fail;
79 }
80
81 /* Should not fail. */
82 lang = nxt_array_add(rt->languages);
83 lang->type = NXT_APP_EXTERNAL;
84 lang->version = (u_char *) "";
85 lang->file = NULL;
86 lang->module = &nxt_external_module;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
25static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
26static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
27 nxt_runtime_t *rt);
28static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
31static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
32static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
33 nxt_runtime_t *rt);
34static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
35 nxt_file_name_t *pid_file);
36static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
37 nxt_runtime_cont_t cont);
38static void nxt_runtime_thread_pool_init(void);
39static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
40 void *data);
41static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
42static void nxt_runtime_process_remove(nxt_runtime_t *rt,
43 nxt_process_t *process);
44static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
45
46
47nxt_int_t
48nxt_runtime_create(nxt_task_t *task)
49{
50 nxt_mp_t *mp;
51 nxt_int_t ret;
52 nxt_array_t *listen_sockets;
53 nxt_runtime_t *rt;
54 nxt_app_lang_module_t *lang;
55
56 mp = nxt_mp_create(1024, 128, 256, 32);
57 if (nxt_slow_path(mp == NULL)) {
58 return NXT_ERROR;
59 }
60
61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62 if (nxt_slow_path(rt == NULL)) {
63 goto fail;
64 }
65
66 task->thread->runtime = rt;
67 rt->mem_pool = mp;
68
69 nxt_thread_mutex_create(&rt->processes_mutex);
70
71 rt->services = nxt_services_init(mp);
72 if (nxt_slow_path(rt->services == NULL)) {
73 goto fail;
74 }
75
76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77 if (nxt_slow_path(rt->languages == NULL)) {
78 goto fail;
79 }
80
81 /* Should not fail. */
82 lang = nxt_array_add(rt->languages);
83 lang->type = NXT_APP_EXTERNAL;
84 lang->version = (u_char *) "";
85 lang->file = NULL;
86 lang->module = &nxt_external_module;
87 lang->mounts = NULL;
87
88 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
89 if (nxt_slow_path(listen_sockets == NULL)) {
90 goto fail;
91 }
92
93 rt->listen_sockets = listen_sockets;
94
95 ret = nxt_runtime_inherited_listen_sockets(task, rt);
96 if (nxt_slow_path(ret != NXT_OK)) {
97 goto fail;
98 }
99
100 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
101 goto fail;
102 }
103
104 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
105 goto fail;
106 }
107
108 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
109 goto fail;
110 }
111
112 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
113 goto fail;
114 }
115
116 rt->start = nxt_runtime_initial_start;
117
118 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
119 goto fail;
120 }
121
122 if (nxt_port_rpc_init() != NXT_OK) {
123 goto fail;
124 }
125
126 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
127 nxt_runtime_start, task, rt, NULL);
128
129 return NXT_OK;
130
131fail:
132
133 nxt_mp_destroy(mp);
134
135 return NXT_ERROR;
136}
137
138
139static nxt_int_t
140nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
141{
142 u_char *v, *p;
143 nxt_int_t type;
144 nxt_array_t *inherited_sockets;
145 nxt_socket_t s;
146 nxt_listen_socket_t *ls;
147
148 v = (u_char *) getenv("NGINX");
149
150 if (v == NULL) {
151 return nxt_runtime_systemd_listen_sockets(task, rt);
152 }
153
154 nxt_alert(task, "using inherited listen sockets: %s", v);
155
156 inherited_sockets = nxt_array_create(rt->mem_pool,
157 1, sizeof(nxt_listen_socket_t));
158 if (inherited_sockets == NULL) {
159 return NXT_ERROR;
160 }
161
162 rt->inherited_sockets = inherited_sockets;
163
164 for (p = v; *p != '\0'; p++) {
165
166 if (*p == ';') {
167 s = nxt_int_parse(v, p - v);
168
169 if (nxt_slow_path(s < 0)) {
170 nxt_alert(task, "invalid socket number \"%s\" "
171 "in NGINX environment variable, "
172 "ignoring the rest of the variable", v);
173 return NXT_ERROR;
174 }
175
176 v = p + 1;
177
178 ls = nxt_array_zero_add(inherited_sockets);
179 if (nxt_slow_path(ls == NULL)) {
180 return NXT_ERROR;
181 }
182
183 ls->socket = s;
184
185 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
186 if (nxt_slow_path(ls->sockaddr == NULL)) {
187 return NXT_ERROR;
188 }
189
190 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
191 if (nxt_slow_path(type == -1)) {
192 return NXT_ERROR;
193 }
194
195 ls->sockaddr->type = (uint16_t) type;
196 }
197 }
198
199 return NXT_OK;
200}
201
202
203static nxt_int_t
204nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
205{
206 u_char *nfd, *pid;
207 nxt_int_t n;
208 nxt_array_t *inherited_sockets;
209 nxt_socket_t s;
210 nxt_listen_socket_t *ls;
211
212 /*
213 * Number of listening sockets passed. The socket
214 * descriptors start from number 3 and are sequential.
215 */
216 nfd = (u_char *) getenv("LISTEN_FDS");
217 if (nfd == NULL) {
218 return NXT_OK;
219 }
220
221 /* The pid of the service process. */
222 pid = (u_char *) getenv("LISTEN_PID");
223 if (pid == NULL) {
224 return NXT_OK;
225 }
226
227 n = nxt_int_parse(nfd, nxt_strlen(nfd));
228 if (n < 0) {
229 return NXT_OK;
230 }
231
232 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
233 return NXT_OK;
234 }
235
236 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
237
238 inherited_sockets = nxt_array_create(rt->mem_pool,
239 n, sizeof(nxt_listen_socket_t));
240 if (inherited_sockets == NULL) {
241 return NXT_ERROR;
242 }
243
244 rt->inherited_sockets = inherited_sockets;
245
246 for (s = 3; s < n; s++) {
247 ls = nxt_array_zero_add(inherited_sockets);
248 if (nxt_slow_path(ls == NULL)) {
249 return NXT_ERROR;
250 }
251
252 ls->socket = s;
253
254 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
255 if (nxt_slow_path(ls->sockaddr == NULL)) {
256 return NXT_ERROR;
257 }
258
259 ls->sockaddr->type = SOCK_STREAM;
260 }
261
262 return NXT_OK;
263}
264
265
266static nxt_int_t
267nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
268{
269 nxt_thread_t *thread;
270 nxt_event_engine_t *engine;
271 const nxt_event_interface_t *interface;
272
273 interface = nxt_service_get(rt->services, "engine", NULL);
274
275 if (nxt_slow_path(interface == NULL)) {
276 /* TODO: log */
277 return NXT_ERROR;
278 }
279
280 engine = nxt_event_engine_create(task, interface,
281 nxt_main_process_signals, 0, 0);
282
283 if (nxt_slow_path(engine == NULL)) {
284 return NXT_ERROR;
285 }
286
287 thread = task->thread;
288 thread->engine = engine;
289#if 0
290 thread->fiber = &engine->fibers->fiber;
291#endif
292
293 engine->id = rt->last_engine_id++;
294 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
295
296 nxt_queue_init(&rt->engines);
297 nxt_queue_insert_tail(&rt->engines, &engine->link);
298
299 return NXT_OK;
300}
301
302
303static nxt_int_t
304nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
305{
306 nxt_int_t ret;
307 nxt_array_t *thread_pools;
308
309 thread_pools = nxt_array_create(rt->mem_pool, 1,
310 sizeof(nxt_thread_pool_t *));
311
312 if (nxt_slow_path(thread_pools == NULL)) {
313 return NXT_ERROR;
314 }
315
316 rt->thread_pools = thread_pools;
317 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
318
319 if (nxt_slow_path(ret != NXT_OK)) {
320 return NXT_ERROR;
321 }
322
323 return NXT_OK;
324}
325
326
327static void
328nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
329{
330 nxt_runtime_t *rt;
331
332 rt = obj;
333
334 nxt_debug(task, "rt conf done");
335
336 task->thread->log->ctx_handler = NULL;
337 task->thread->log->ctx = NULL;
338
339 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
340 goto fail;
341 }
342
343 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
344 goto fail;
345 }
346
347 /*
348 * Thread pools should be destroyed before starting worker
349 * processes, because thread pool semaphores will stick in
350 * locked state in new processes after fork().
351 */
352 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
353
354 return;
355
356fail:
357
358 nxt_runtime_quit(task, 1);
359}
360
361
362static void
363nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
364{
365 nxt_int_t ret;
366 nxt_thread_t *thr;
367 nxt_runtime_t *rt;
368 const nxt_event_interface_t *interface;
369
370 thr = task->thread;
371 rt = thr->runtime;
372
373 if (rt->inherited_sockets == NULL && rt->daemon) {
374
375 if (nxt_process_daemon(task) != NXT_OK) {
376 goto fail;
377 }
378
379 /*
380 * An event engine should be updated after fork()
381 * even if an event facility was not changed because:
382 * 1) inherited kqueue descriptor is invalid,
383 * 2) the signal thread is not inherited.
384 */
385 interface = nxt_service_get(rt->services, "engine", rt->engine);
386 if (interface == NULL) {
387 goto fail;
388 }
389
390 ret = nxt_event_engine_change(task->thread->engine, interface,
391 rt->batch);
392 if (ret != NXT_OK) {
393 goto fail;
394 }
395 }
396
397 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
398 if (ret != NXT_OK) {
399 goto fail;
400 }
401
402 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
403 goto fail;
404 }
405
406 thr->engine->max_connections = rt->engine_connections;
407
408 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
409 return;
410 }
411
412fail:
413
414 nxt_runtime_quit(task, 1);
415}
416
417
418void
419nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
420{
421 nxt_bool_t done;
422 nxt_runtime_t *rt;
423 nxt_event_engine_t *engine;
424
425 rt = task->thread->runtime;
426 rt->status |= status;
427 engine = task->thread->engine;
428
429 nxt_debug(task, "exiting");
430
431 done = 1;
432
433 if (!engine->shutdown) {
434 engine->shutdown = 1;
435
436 if (!nxt_array_is_empty(rt->thread_pools)) {
437 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
438 done = 0;
439 }
440
441 if (rt->type == NXT_PROCESS_MAIN) {
442 nxt_runtime_stop_all_processes(task, rt);
443 done = 0;
444 }
445 }
446
447 nxt_runtime_close_idle_connections(engine);
448
449 if (done) {
450 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
451 task, rt, engine);
452 }
453}
454
455
456static void
457nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
458{
459 nxt_conn_t *c;
460 nxt_queue_t *idle;
461 nxt_queue_link_t *link, *next;
462
463 nxt_debug(&engine->task, "close idle connections");
464
465 idle = &engine->idle_connections;
466
467 for (link = nxt_queue_first(idle);
468 link != nxt_queue_tail(idle);
469 link = next)
470 {
471 next = nxt_queue_next(link);
472 c = nxt_queue_link_data(link, nxt_conn_t, link);
473
474 if (!c->socket.read_ready) {
475 nxt_queue_remove(link);
476 nxt_conn_close(engine, c);
477 }
478 }
479}
480
481
482void
483nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
484{
485 nxt_port_t *port;
486 nxt_process_t *process;
487 nxt_process_init_t *init;
488
489 nxt_runtime_process_each(rt, process) {
490
491 init = nxt_process_init(process);
492
493 if (init->type == NXT_PROCESS_APP) {
494
495 nxt_process_port_each(process, port) {
496
497 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
498 0, 0, NULL);
499
500 } nxt_process_port_loop;
501 }
502
503 } nxt_runtime_process_loop;
504}
505
506
507static void
508nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
509{
510 nxt_port_t *port;
511 nxt_process_t *process;
512
513 nxt_runtime_process_each(rt, process) {
514
515 nxt_process_port_each(process, port) {
516
517 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
518 0, NULL);
519
520 } nxt_process_port_loop;
521
522 } nxt_runtime_process_loop;
523}
524
525
526static void
527nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
528{
529 int status;
530 nxt_runtime_t *rt;
531 nxt_process_t *process;
532 nxt_event_engine_t *engine;
533
534 rt = obj;
535 engine = data;
536
537 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
538
539 if (!nxt_array_is_empty(rt->thread_pools)) {
540 return;
541 }
542
543 if (rt->type == NXT_PROCESS_MAIN) {
544 if (rt->pid_file != NULL) {
545 nxt_file_delete(rt->pid_file);
546 }
547
548#if (NXT_HAVE_UNIX_DOMAIN)
549 {
550 nxt_sockaddr_t *sa;
551 nxt_file_name_t *name;
552
553 sa = rt->controller_listen;
554
555 if (sa->u.sockaddr.sa_family == AF_UNIX) {
556 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
557 (void) nxt_file_delete(name);
558 }
559 }
560#endif
561 }
562
563 if (!engine->event.signal_support) {
564 nxt_event_engine_signals_stop(engine);
565 }
566
567 nxt_runtime_process_each(rt, process) {
568
569 nxt_process_close_ports(task, process);
570
571 } nxt_runtime_process_loop;
572
573 if (rt->port_by_type[rt->type] != NULL) {
574 nxt_port_use(task, rt->port_by_type[rt->type], -1);
575 }
576
577 nxt_thread_mutex_destroy(&rt->processes_mutex);
578
579 status = rt->status;
580 nxt_mp_destroy(rt->mem_pool);
581
582 nxt_debug(task, "exit: %d", status);
583
584 exit(status);
585 nxt_unreachable();
586}
587
588
589static nxt_int_t
590nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
591{
592 nxt_event_engine_t *engine;
593 const nxt_event_interface_t *interface;
594
595 engine = task->thread->engine;
596
597 if (engine->batch == rt->batch
598 && nxt_strcmp(engine->event.name, rt->engine) == 0)
599 {
600 return NXT_OK;
601 }
602
603 interface = nxt_service_get(rt->services, "engine", rt->engine);
604
605 if (interface != NULL) {
606 return nxt_event_engine_change(engine, interface, rt->batch);
607 }
608
609 return NXT_ERROR;
610}
611
612
613void
614nxt_runtime_event_engine_free(nxt_runtime_t *rt)
615{
616 nxt_queue_link_t *link;
617 nxt_event_engine_t *engine;
618
619 link = nxt_queue_first(&rt->engines);
620 nxt_queue_remove(link);
621
622 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
623 nxt_event_engine_free(engine);
624}
625
626
627nxt_int_t
628nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
629 nxt_uint_t max_threads, nxt_nsec_t timeout)
630{
631 nxt_thread_pool_t *thread_pool, **tp;
632
633 tp = nxt_array_add(rt->thread_pools);
634 if (tp == NULL) {
635 return NXT_ERROR;
636 }
637
638 thread_pool = nxt_thread_pool_create(max_threads, timeout,
639 nxt_runtime_thread_pool_init,
640 thr->engine,
641 nxt_runtime_thread_pool_exit);
642
643 if (nxt_fast_path(thread_pool != NULL)) {
644 *tp = thread_pool;
645 }
646
647 return NXT_OK;
648}
649
650
651static void
652nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
653 nxt_runtime_cont_t cont)
654{
655 nxt_uint_t n;
656 nxt_thread_pool_t **tp;
657
658 rt->continuation = cont;
659
660 n = rt->thread_pools->nelts;
661
662 if (n == 0) {
663 cont(task, 0);
664 return;
665 }
666
667 tp = rt->thread_pools->elts;
668
669 do {
670 nxt_thread_pool_destroy(*tp);
671
672 tp++;
673 n--;
674 } while (n != 0);
675}
676
677
678static void
679nxt_runtime_thread_pool_init(void)
680{
681#if (NXT_REGEX)
682 nxt_regex_init(0);
683#endif
684}
685
686
687static void
688nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
689{
690 nxt_uint_t i, n;
691 nxt_runtime_t *rt;
692 nxt_thread_pool_t *tp, **thread_pools;
693 nxt_thread_handle_t handle;
694
695 tp = obj;
696
697 if (data != NULL) {
698 handle = (nxt_thread_handle_t) (uintptr_t) data;
699 nxt_thread_wait(handle);
700 }
701
702 rt = task->thread->runtime;
703
704 thread_pools = rt->thread_pools->elts;
705 n = rt->thread_pools->nelts;
706
707 nxt_debug(task, "thread pools: %ui", n);
708
709 for (i = 0; i < n; i++) {
710
711 if (tp == thread_pools[i]) {
712 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
713
714 nxt_free(tp);
715
716 if (n == 1) {
717 /* The last thread pool. */
718 rt->continuation(task, 0);
719 }
720
721 return;
722 }
723 }
724}
725
726
727static nxt_int_t
728nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
729{
730 nxt_int_t ret;
731 nxt_str_t control;
732 nxt_uint_t n;
733 nxt_file_t *file;
734 const char *slash;
735 nxt_sockaddr_t *sa;
736 nxt_file_name_str_t file_name;
737 const nxt_event_interface_t *interface;
738
739 rt->daemon = 1;
740 rt->engine_connections = 256;
741 rt->auxiliary_threads = 2;
742 rt->user_cred.user = NXT_USER;
743 rt->group = NXT_GROUP;
744 rt->pid = NXT_PID;
745 rt->log = NXT_LOG;
746 rt->modules = NXT_MODULES;
747 rt->state = NXT_STATE;
748 rt->control = NXT_CONTROL_SOCK;
749 rt->tmp = NXT_TMP;
750
751 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
752
753 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
754 return NXT_ERROR;
755 }
756
757 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
758 return NXT_ERROR;
759 }
760
761 if (rt->capabilities.setid) {
762 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
763 rt->group);
764
765 if (nxt_slow_path(ret != NXT_OK)) {
766 return NXT_ERROR;
767 }
768
769 } else {
770 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
771 "cannot use arbitrary user and group.");
772 }
773
774 /* An engine's parameters. */
775
776 interface = nxt_service_get(rt->services, "engine", rt->engine);
777 if (interface == NULL) {
778 return NXT_ERROR;
779 }
780
781 rt->engine = interface->name;
782
783 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
784 if (nxt_slow_path(ret != NXT_OK)) {
785 return NXT_ERROR;
786 }
787
788 rt->pid_file = file_name.start;
789
790 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
791 if (nxt_slow_path(ret != NXT_OK)) {
792 return NXT_ERROR;
793 }
794
795 file = nxt_list_first(rt->log_files);
796 file->name = file_name.start;
797
798 slash = "";
799 n = nxt_strlen(rt->modules);
800
801 if (n > 1 && rt->modules[n - 1] != '/') {
802 slash = "/";
803 }
804
805 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
806 rt->modules, slash);
807 if (nxt_slow_path(ret != NXT_OK)) {
808 return NXT_ERROR;
809 }
810
811 rt->modules = (char *) file_name.start;
812
813 slash = "";
814 n = nxt_strlen(rt->state);
815
816 if (n > 1 && rt->state[n - 1] != '/') {
817 slash = "/";
818 }
819
820 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
821 rt->state, slash);
822 if (nxt_slow_path(ret != NXT_OK)) {
823 return NXT_ERROR;
824 }
825
826 rt->conf = (char *) file_name.start;
827
828 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
829 if (nxt_slow_path(ret != NXT_OK)) {
830 return NXT_ERROR;
831 }
832
833 rt->conf_tmp = (char *) file_name.start;
834
835 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
836 rt->state, slash);
837 if (nxt_slow_path(ret != NXT_OK)) {
838 return NXT_ERROR;
839 }
840
841 ret = mkdir((char *) file_name.start, S_IRWXU);
842
843 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
844 rt->certs.length = file_name.len;
845 rt->certs.start = file_name.start;
846
847 } else {
848 nxt_alert(task, "Unable to create certificates storage directory: "
849 "mkdir(%s) failed %E", file_name.start, nxt_errno);
850 }
851
852 control.length = nxt_strlen(rt->control);
853 control.start = (u_char *) rt->control;
854
855 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
856 if (nxt_slow_path(sa == NULL)) {
857 return NXT_ERROR;
858 }
859
860 sa->type = SOCK_STREAM;
861
862 rt->controller_listen = sa;
863
864 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
865 return NXT_ERROR;
866 }
867
868 return NXT_OK;
869}
870
871
872static nxt_int_t
873nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
874{
875 char *p, **argv;
876 u_char *end;
877 u_char buf[1024];
878
879 static const char version[] =
880 "unit version: " NXT_VERSION "\n"
881 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
882
883 static const char no_control[] =
884 "option \"--control\" requires socket address\n";
885 static const char no_user[] = "option \"--user\" requires username\n";
886 static const char no_group[] = "option \"--group\" requires group name\n";
887 static const char no_pid[] = "option \"--pid\" requires filename\n";
888 static const char no_log[] = "option \"--log\" requires filename\n";
889 static const char no_modules[] =
890 "option \"--modules\" requires directory\n";
891 static const char no_state[] = "option \"--state\" requires directory\n";
892 static const char no_tmp[] = "option \"--tmp\" requires directory\n";
893
894 static const char help[] =
895 "\n"
896 "unit options:\n"
897 "\n"
898 " --version print unit version and configure options\n"
899 "\n"
900 " --no-daemon run unit in non-daemon mode\n"
901 "\n"
902 " --control ADDRESS set address of control API socket\n"
903 " default: \"" NXT_CONTROL_SOCK "\"\n"
904 "\n"
905 " --pid FILE set pid filename\n"
906 " default: \"" NXT_PID "\"\n"
907 "\n"
908 " --log FILE set log filename\n"
909 " default: \"" NXT_LOG "\"\n"
910 "\n"
911 " --modules DIRECTORY set modules directory name\n"
912 " default: \"" NXT_MODULES "\"\n"
913 "\n"
914 " --state DIRECTORY set state directory name\n"
915 " default: \"" NXT_STATE "\"\n"
916 "\n"
917 " --tmp DIRECTORY set tmp directory name\n"
918 " default: \"" NXT_TMP "\"\n"
919 "\n"
920 " --user USER set non-privileged processes to run"
921 " as specified user\n"
922 " default: \"" NXT_USER "\"\n"
923 "\n"
924 " --group GROUP set non-privileged processes to run"
925 " as specified group\n"
926 " default: ";
927
928 static const char group[] = "\"" NXT_GROUP "\"\n\n";
929 static const char primary[] = "user's primary group\n\n";
930
931 argv = &nxt_process_argv[1];
932
933 while (*argv != NULL) {
934 p = *argv++;
935
936 if (nxt_strcmp(p, "--control") == 0) {
937 if (*argv == NULL) {
938 write(STDERR_FILENO, no_control, nxt_length(no_control));
939 return NXT_ERROR;
940 }
941
942 p = *argv++;
943
944 rt->control = p;
945
946 continue;
947 }
948
949 if (nxt_strcmp(p, "--user") == 0) {
950 if (*argv == NULL) {
951 write(STDERR_FILENO, no_user, nxt_length(no_user));
952 return NXT_ERROR;
953 }
954
955 p = *argv++;
956
957 rt->user_cred.user = p;
958
959 continue;
960 }
961
962 if (nxt_strcmp(p, "--group") == 0) {
963 if (*argv == NULL) {
964 write(STDERR_FILENO, no_group, nxt_length(no_group));
965 return NXT_ERROR;
966 }
967
968 p = *argv++;
969
970 rt->group = p;
971
972 continue;
973 }
974
975 if (nxt_strcmp(p, "--pid") == 0) {
976 if (*argv == NULL) {
977 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
978 return NXT_ERROR;
979 }
980
981 p = *argv++;
982
983 rt->pid = p;
984
985 continue;
986 }
987
988 if (nxt_strcmp(p, "--log") == 0) {
989 if (*argv == NULL) {
990 write(STDERR_FILENO, no_log, nxt_length(no_log));
991 return NXT_ERROR;
992 }
993
994 p = *argv++;
995
996 rt->log = p;
997
998 continue;
999 }
1000
1001 if (nxt_strcmp(p, "--modules") == 0) {
1002 if (*argv == NULL) {
1003 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1004 return NXT_ERROR;
1005 }
1006
1007 p = *argv++;
1008
1009 rt->modules = p;
1010
1011 continue;
1012 }
1013
1014 if (nxt_strcmp(p, "--state") == 0) {
1015 if (*argv == NULL) {
1016 write(STDERR_FILENO, no_state, nxt_length(no_state));
1017 return NXT_ERROR;
1018 }
1019
1020 p = *argv++;
1021
1022 rt->state = p;
1023
1024 continue;
1025 }
1026
1027 if (nxt_strcmp(p, "--tmp") == 0) {
1028 if (*argv == NULL) {
1029 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1030 return NXT_ERROR;
1031 }
1032
1033 p = *argv++;
1034
1035 rt->tmp = p;
1036
1037 continue;
1038 }
1039
1040 if (nxt_strcmp(p, "--no-daemon") == 0) {
1041 rt->daemon = 0;
1042 continue;
1043 }
1044
1045 if (nxt_strcmp(p, "--version") == 0) {
1046 write(STDERR_FILENO, version, nxt_length(version));
1047 exit(0);
1048 }
1049
1050 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1051 write(STDOUT_FILENO, help, nxt_length(help));
1052
1053 if (sizeof(NXT_GROUP) == 1) {
1054 write(STDOUT_FILENO, primary, nxt_length(primary));
1055
1056 } else {
1057 write(STDOUT_FILENO, group, nxt_length(group));
1058 }
1059
1060 exit(0);
1061 }
1062
1063 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1064 "try \"%s -h\" for available options\n",
1065 p, nxt_process_argv[0]);
1066
1067 write(STDERR_FILENO, buf, end - buf);
1068
1069 return NXT_ERROR;
1070 }
1071
1072 return NXT_OK;
1073}
1074
1075
1076nxt_listen_socket_t *
1077nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1078{
1079 nxt_mp_t *mp;
1080 nxt_listen_socket_t *ls;
1081
1082 ls = nxt_array_zero_add(rt->listen_sockets);
1083 if (ls == NULL) {
1084 return NULL;
1085 }
1086
1087 mp = rt->mem_pool;
1088
1089 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1090 sa->length);
1091 if (ls->sockaddr == NULL) {
1092 return NULL;
1093 }
1094
1095 ls->sockaddr->type = sa->type;
1096
1097 nxt_sockaddr_text(ls->sockaddr);
1098
1099 ls->socket = -1;
1100 ls->backlog = NXT_LISTEN_BACKLOG;
1101
1102 return ls;
1103}
1104
1105
1106static nxt_int_t
1107nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1108{
1109 size_t length;
1110 char hostname[NXT_MAXHOSTNAMELEN + 1];
1111
1112 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1113 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1114 return NXT_ERROR;
1115 }
1116
1117 /*
1118 * Linux gethostname(2):
1119 *
1120 * If the null-terminated hostname is too large to fit,
1121 * then the name is truncated, and no error is returned.
1122 *
1123 * For this reason an additional byte is reserved in the buffer.
1124 */
1125 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1126
1127 length = nxt_strlen(hostname);
1128 rt->hostname.length = length;
1129
1130 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1131
1132 if (rt->hostname.start != NULL) {
1133 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1134 return NXT_OK;
1135 }
1136
1137 return NXT_ERROR;
1138}
1139
1140
1141static nxt_int_t
1142nxt_runtime_log_files_init(nxt_runtime_t *rt)
1143{
1144 nxt_file_t *file;
1145 nxt_list_t *log_files;
1146
1147 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1148
1149 if (nxt_fast_path(log_files != NULL)) {
1150 rt->log_files = log_files;
1151
1152 /* Preallocate the main log. This allocation cannot fail. */
1153 file = nxt_list_zero_add(log_files);
1154
1155 file->fd = NXT_FILE_INVALID;
1156 file->log_level = NXT_LOG_ALERT;
1157
1158 return NXT_OK;
1159 }
1160
1161 return NXT_ERROR;
1162}
1163
1164
1165nxt_file_t *
1166nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1167{
1168 nxt_int_t ret;
1169 nxt_file_t *file;
1170 nxt_file_name_str_t file_name;
1171
1172 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1173
1174 if (nxt_slow_path(ret != NXT_OK)) {
1175 return NULL;
1176 }
1177
1178 nxt_list_each(file, rt->log_files) {
1179
1180 /* STUB: hardecoded case sensitive/insensitive. */
1181
1182 if (file->name != NULL
1183 && nxt_file_name_eq(file->name, file_name.start))
1184 {
1185 return file;
1186 }
1187
1188 } nxt_list_loop;
1189
1190 file = nxt_list_zero_add(rt->log_files);
1191
1192 if (nxt_slow_path(file == NULL)) {
1193 return NULL;
1194 }
1195
1196 file->fd = NXT_FILE_INVALID;
1197 file->log_level = NXT_LOG_ALERT;
1198 file->name = file_name.start;
1199
1200 return file;
1201}
1202
1203
1204static nxt_int_t
1205nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1206{
1207 nxt_int_t ret;
1208 nxt_file_t *file;
1209
1210 nxt_list_each(file, rt->log_files) {
1211
1212 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1213 NXT_FILE_OWNER_ACCESS);
1214
1215 if (ret != NXT_OK) {
1216 return NXT_ERROR;
1217 }
1218
1219 } nxt_list_loop;
1220
1221 file = nxt_list_first(rt->log_files);
1222
1223 return nxt_file_stderr(file);
1224}
1225
1226
1227nxt_int_t
1228nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1229{
1230 nxt_int_t ret;
1231 nxt_uint_t c, p, ncurr, nprev;
1232 nxt_listen_socket_t *curr, *prev;
1233
1234 curr = rt->listen_sockets->elts;
1235 ncurr = rt->listen_sockets->nelts;
1236
1237 if (rt->inherited_sockets != NULL) {
1238 prev = rt->inherited_sockets->elts;
1239 nprev = rt->inherited_sockets->nelts;
1240
1241 } else {
1242 prev = NULL;
1243 nprev = 0;
1244 }
1245
1246 for (c = 0; c < ncurr; c++) {
1247
1248 for (p = 0; p < nprev; p++) {
1249
1250 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1251
1252 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1253 if (ret != NXT_OK) {
1254 return NXT_ERROR;
1255 }
1256
1257 goto next;
1258 }
1259 }
1260
1261 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1262 return NXT_ERROR;
1263 }
1264
1265 next:
1266
1267 continue;
1268 }
1269
1270 return NXT_OK;
1271}
1272
1273
1274nxt_int_t
1275nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1276{
1277 nxt_uint_t i, n;
1278 nxt_listen_socket_t *ls;
1279
1280 ls = rt->listen_sockets->elts;
1281 n = rt->listen_sockets->nelts;
1282
1283 for (i = 0; i < n; i++) {
1284 if (ls[i].flags == NXT_NONBLOCK) {
1285 if (nxt_listen_event(task, &ls[i]) == NULL) {
1286 return NXT_ERROR;
1287 }
1288 }
1289 }
1290
1291 return NXT_OK;
1292}
1293
1294
1295nxt_str_t *
1296nxt_current_directory(nxt_mp_t *mp)
1297{
1298 size_t length;
1299 u_char *p;
1300 nxt_str_t *name;
1301 char buf[NXT_MAX_PATH_LEN];
1302
1303 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1304
1305 if (nxt_fast_path(length != 0)) {
1306 name = nxt_str_alloc(mp, length + 1);
1307
1308 if (nxt_fast_path(name != NULL)) {
1309 p = nxt_cpymem(name->start, buf, length);
1310 *p = '/';
1311
1312 return name;
1313 }
1314 }
1315
1316 return NULL;
1317}
1318
1319
1320static nxt_int_t
1321nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1322{
1323 ssize_t length;
1324 nxt_int_t n;
1325 nxt_file_t file;
1326 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")];
1327
1328 nxt_memzero(&file, sizeof(nxt_file_t));
1329
1330 file.name = pid_file;
1331
1332 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1333 NXT_FILE_DEFAULT_ACCESS);
1334
1335 if (n != NXT_OK) {
1336 return NXT_ERROR;
1337 }
1338
1339 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1340
1341 if (nxt_file_write(&file, pid, length, 0) != length) {
1342 return NXT_ERROR;
1343 }
1344
1345 nxt_file_close(task, &file);
1346
1347 return NXT_OK;
1348}
1349
1350
1351nxt_process_t *
1352nxt_runtime_process_new(nxt_runtime_t *rt)
1353{
1354 nxt_process_t *process;
1355
1356 /* TODO: memory failures. */
1357
1358 process = nxt_mp_zalloc(rt->mem_pool,
1359 sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
1360
1361 if (nxt_slow_path(process == NULL)) {
1362 return NULL;
1363 }
1364
1365 nxt_queue_init(&process->ports);
1366
1367 nxt_thread_mutex_create(&process->incoming.mutex);
1368 nxt_thread_mutex_create(&process->outgoing.mutex);
1369 nxt_thread_mutex_create(&process->cp_mutex);
1370
1371 process->use_count = 1;
1372
1373 return process;
1374}
1375
1376
1377void
1378nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1379{
1380 nxt_port_t *port;
1381
1382 if (process->registered == 1) {
1383 nxt_runtime_process_remove(rt, process);
1384 }
1385
1386 nxt_assert(process->use_count == 0);
1387 nxt_assert(process->registered == 0);
1388
1389 nxt_port_mmaps_destroy(&process->incoming, 1);
1390 nxt_port_mmaps_destroy(&process->outgoing, 1);
1391
1392 do {
1393 port = nxt_port_hash_retrieve(&process->connected_ports);
1394
1395 } while (port != NULL);
1396
1397 nxt_thread_mutex_destroy(&process->incoming.mutex);
1398 nxt_thread_mutex_destroy(&process->outgoing.mutex);
1399 nxt_thread_mutex_destroy(&process->cp_mutex);
1400
1401 /* processes from nxt_runtime_process_get() have no memory pool */
1402 if (process->mem_pool != NULL) {
1403 nxt_mp_destroy(process->mem_pool);
1404 }
1405
1406 nxt_mp_free(rt->mem_pool, process);
1407}
1408
1409
1410static nxt_int_t
1411nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1412{
1413 nxt_process_t *process;
1414
1415 process = data;
1416
1417 if (lhq->key.length == sizeof(nxt_pid_t)
1418 && *(nxt_pid_t *) lhq->key.start == process->pid)
1419 {
1420 return NXT_OK;
1421 }
1422
1423 return NXT_DECLINED;
1424}
1425
1426static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1427 NXT_LVLHSH_DEFAULT,
1428 nxt_runtime_lvlhsh_pid_test,
1429 nxt_lvlhsh_alloc,
1430 nxt_lvlhsh_free,
1431};
1432
1433
1434nxt_inline void
1435nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1436{
1437 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1438 lhq->key.length = sizeof(*pid);
1439 lhq->key.start = (u_char *) pid;
1440 lhq->proto = &lvlhsh_processes_proto;
1441}
1442
1443
1444nxt_process_t *
1445nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1446{
1447 nxt_process_t *process;
1448 nxt_lvlhsh_query_t lhq;
1449
1450 process = NULL;
1451
1452 nxt_runtime_process_lhq_pid(&lhq, &pid);
1453
1454 nxt_thread_mutex_lock(&rt->processes_mutex);
1455
1456 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1457 process = lhq.value;
1458
1459 } else {
1460 nxt_thread_log_debug("process %PI not found", pid);
1461 }
1462
1463 nxt_thread_mutex_unlock(&rt->processes_mutex);
1464
1465 return process;
1466}
1467
1468
1469static nxt_process_t *
1470nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1471{
1472 nxt_process_t *process;
1473 nxt_lvlhsh_query_t lhq;
1474
1475 nxt_runtime_process_lhq_pid(&lhq, &pid);
1476
1477 nxt_thread_mutex_lock(&rt->processes_mutex);
1478
1479 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1480 nxt_thread_log_debug("process %PI found", pid);
1481
1482 nxt_thread_mutex_unlock(&rt->processes_mutex);
1483
1484 process = lhq.value;
1485 process->use_count++;
1486
1487 return process;
1488 }
1489
1490 process = nxt_runtime_process_new(rt);
1491 if (nxt_slow_path(process == NULL)) {
1492
1493 nxt_thread_mutex_unlock(&rt->processes_mutex);
1494
1495 return NULL;
1496 }
1497
1498 process->pid = pid;
1499
1500 lhq.replace = 0;
1501 lhq.value = process;
1502 lhq.pool = rt->mem_pool;
1503
1504 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1505
1506 case NXT_OK:
1507 if (rt->nprocesses == 0) {
1508 rt->mprocess = process;
1509 }
1510
1511 rt->nprocesses++;
1512
1513 process->registered = 1;
1514
1515 nxt_thread_log_debug("process %PI insert", pid);
1516 break;
1517
1518 default:
1519 nxt_thread_log_debug("process %PI insert failed", pid);
1520 break;
1521 }
1522
1523 nxt_thread_mutex_unlock(&rt->processes_mutex);
1524
1525 return process;
1526}
1527
1528
1529void
1530nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1531{
1532 nxt_port_t *port;
1533 nxt_runtime_t *rt;
1534 nxt_lvlhsh_query_t lhq;
1535
1536 nxt_assert(process->registered == 0);
1537
1538 rt = task->thread->runtime;
1539
1540 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1541
1542 lhq.replace = 0;
1543 lhq.value = process;
1544 lhq.pool = rt->mem_pool;
1545
1546 nxt_thread_mutex_lock(&rt->processes_mutex);
1547
1548 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1549
1550 case NXT_OK:
1551 if (rt->nprocesses == 0) {
1552 rt->mprocess = process;
1553 }
1554
1555 rt->nprocesses++;
1556
1557 nxt_process_port_each(process, port) {
1558
1559 port->pid = process->pid;
1560
1561 nxt_runtime_port_add(task, port);
1562
1563 } nxt_process_port_loop;
1564
1565 process->registered = 1;
1566
1567 nxt_thread_log_debug("process %PI added", process->pid);
1568 break;
1569
1570 default:
1571 nxt_thread_log_debug("process %PI failed to add", process->pid);
1572 break;
1573 }
1574
1575 nxt_thread_mutex_unlock(&rt->processes_mutex);
1576}
1577
1578
1579static void
1580nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1581{
1582 nxt_pid_t pid;
1583 nxt_lvlhsh_query_t lhq;
1584
1585 pid = process->pid;
1586
1587 nxt_runtime_process_lhq_pid(&lhq, &pid);
1588
1589 lhq.pool = rt->mem_pool;
1590
1591 nxt_thread_mutex_lock(&rt->processes_mutex);
1592
1593 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1594
1595 case NXT_OK:
1596 rt->nprocesses--;
1597
1598 process = lhq.value;
1599
1600 process->registered = 0;
1601
1602 nxt_thread_log_debug("process %PI removed", pid);
1603 break;
1604
1605 default:
1606 nxt_thread_log_debug("process %PI remove failed", pid);
1607 break;
1608 }
1609
1610 nxt_thread_mutex_unlock(&rt->processes_mutex);
1611}
1612
1613
1614nxt_process_t *
1615nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1616{
1617 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1618
1619 return nxt_runtime_process_next(rt, lhe);
1620}
1621
1622
1623nxt_port_t *
1624nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1625 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1626{
1627 nxt_port_t *port;
1628 nxt_process_t *process;
1629
1630 process = nxt_runtime_process_get(rt, pid);
1631 if (nxt_slow_path(process == NULL)) {
1632 return NULL;
1633 }
1634
1635 port = nxt_port_new(task, id, pid, type);
1636 if (nxt_slow_path(port == NULL)) {
1637 nxt_process_use(task, process, -1);
1638 return NULL;
1639 }
1640
1641 nxt_process_port_add(task, process, port);
1642
1643 nxt_process_use(task, process, -1);
1644
1645 nxt_runtime_port_add(task, port);
1646
1647 nxt_port_use(task, port, -1);
1648
1649 return port;
1650}
1651
1652
1653static void
1654nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1655{
1656 nxt_int_t res;
1657 nxt_runtime_t *rt;
1658
1659 rt = task->thread->runtime;
1660
1661 res = nxt_port_hash_add(&rt->ports, port);
1662
1663 if (res != NXT_OK) {
1664 return;
1665 }
1666
1667 rt->port_by_type[port->type] = port;
1668
1669 nxt_port_use(task, port, 1);
1670}
1671
1672
1673void
1674nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1675{
1676 nxt_int_t res;
1677 nxt_runtime_t *rt;
1678
1679 rt = task->thread->runtime;
1680
1681 res = nxt_port_hash_remove(&rt->ports, port);
1682
1683 if (res != NXT_OK) {
1684 return;
1685 }
1686
1687 if (rt->port_by_type[port->type] == port) {
1688 rt->port_by_type[port->type] = NULL;
1689 }
1690
1691 nxt_port_use(task, port, -1);
1692}
1693
1694
1695nxt_port_t *
1696nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1697 nxt_port_id_t port_id)
1698{
1699 return nxt_port_hash_find(&rt->ports, pid, port_id);
1700}
88
89 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
90 if (nxt_slow_path(listen_sockets == NULL)) {
91 goto fail;
92 }
93
94 rt->listen_sockets = listen_sockets;
95
96 ret = nxt_runtime_inherited_listen_sockets(task, rt);
97 if (nxt_slow_path(ret != NXT_OK)) {
98 goto fail;
99 }
100
101 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
102 goto fail;
103 }
104
105 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
106 goto fail;
107 }
108
109 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
110 goto fail;
111 }
112
113 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
114 goto fail;
115 }
116
117 rt->start = nxt_runtime_initial_start;
118
119 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
120 goto fail;
121 }
122
123 if (nxt_port_rpc_init() != NXT_OK) {
124 goto fail;
125 }
126
127 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
128 nxt_runtime_start, task, rt, NULL);
129
130 return NXT_OK;
131
132fail:
133
134 nxt_mp_destroy(mp);
135
136 return NXT_ERROR;
137}
138
139
140static nxt_int_t
141nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
142{
143 u_char *v, *p;
144 nxt_int_t type;
145 nxt_array_t *inherited_sockets;
146 nxt_socket_t s;
147 nxt_listen_socket_t *ls;
148
149 v = (u_char *) getenv("NGINX");
150
151 if (v == NULL) {
152 return nxt_runtime_systemd_listen_sockets(task, rt);
153 }
154
155 nxt_alert(task, "using inherited listen sockets: %s", v);
156
157 inherited_sockets = nxt_array_create(rt->mem_pool,
158 1, sizeof(nxt_listen_socket_t));
159 if (inherited_sockets == NULL) {
160 return NXT_ERROR;
161 }
162
163 rt->inherited_sockets = inherited_sockets;
164
165 for (p = v; *p != '\0'; p++) {
166
167 if (*p == ';') {
168 s = nxt_int_parse(v, p - v);
169
170 if (nxt_slow_path(s < 0)) {
171 nxt_alert(task, "invalid socket number \"%s\" "
172 "in NGINX environment variable, "
173 "ignoring the rest of the variable", v);
174 return NXT_ERROR;
175 }
176
177 v = p + 1;
178
179 ls = nxt_array_zero_add(inherited_sockets);
180 if (nxt_slow_path(ls == NULL)) {
181 return NXT_ERROR;
182 }
183
184 ls->socket = s;
185
186 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
187 if (nxt_slow_path(ls->sockaddr == NULL)) {
188 return NXT_ERROR;
189 }
190
191 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
192 if (nxt_slow_path(type == -1)) {
193 return NXT_ERROR;
194 }
195
196 ls->sockaddr->type = (uint16_t) type;
197 }
198 }
199
200 return NXT_OK;
201}
202
203
204static nxt_int_t
205nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
206{
207 u_char *nfd, *pid;
208 nxt_int_t n;
209 nxt_array_t *inherited_sockets;
210 nxt_socket_t s;
211 nxt_listen_socket_t *ls;
212
213 /*
214 * Number of listening sockets passed. The socket
215 * descriptors start from number 3 and are sequential.
216 */
217 nfd = (u_char *) getenv("LISTEN_FDS");
218 if (nfd == NULL) {
219 return NXT_OK;
220 }
221
222 /* The pid of the service process. */
223 pid = (u_char *) getenv("LISTEN_PID");
224 if (pid == NULL) {
225 return NXT_OK;
226 }
227
228 n = nxt_int_parse(nfd, nxt_strlen(nfd));
229 if (n < 0) {
230 return NXT_OK;
231 }
232
233 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
234 return NXT_OK;
235 }
236
237 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
238
239 inherited_sockets = nxt_array_create(rt->mem_pool,
240 n, sizeof(nxt_listen_socket_t));
241 if (inherited_sockets == NULL) {
242 return NXT_ERROR;
243 }
244
245 rt->inherited_sockets = inherited_sockets;
246
247 for (s = 3; s < n; s++) {
248 ls = nxt_array_zero_add(inherited_sockets);
249 if (nxt_slow_path(ls == NULL)) {
250 return NXT_ERROR;
251 }
252
253 ls->socket = s;
254
255 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
256 if (nxt_slow_path(ls->sockaddr == NULL)) {
257 return NXT_ERROR;
258 }
259
260 ls->sockaddr->type = SOCK_STREAM;
261 }
262
263 return NXT_OK;
264}
265
266
267static nxt_int_t
268nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
269{
270 nxt_thread_t *thread;
271 nxt_event_engine_t *engine;
272 const nxt_event_interface_t *interface;
273
274 interface = nxt_service_get(rt->services, "engine", NULL);
275
276 if (nxt_slow_path(interface == NULL)) {
277 /* TODO: log */
278 return NXT_ERROR;
279 }
280
281 engine = nxt_event_engine_create(task, interface,
282 nxt_main_process_signals, 0, 0);
283
284 if (nxt_slow_path(engine == NULL)) {
285 return NXT_ERROR;
286 }
287
288 thread = task->thread;
289 thread->engine = engine;
290#if 0
291 thread->fiber = &engine->fibers->fiber;
292#endif
293
294 engine->id = rt->last_engine_id++;
295 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
296
297 nxt_queue_init(&rt->engines);
298 nxt_queue_insert_tail(&rt->engines, &engine->link);
299
300 return NXT_OK;
301}
302
303
304static nxt_int_t
305nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
306{
307 nxt_int_t ret;
308 nxt_array_t *thread_pools;
309
310 thread_pools = nxt_array_create(rt->mem_pool, 1,
311 sizeof(nxt_thread_pool_t *));
312
313 if (nxt_slow_path(thread_pools == NULL)) {
314 return NXT_ERROR;
315 }
316
317 rt->thread_pools = thread_pools;
318 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
319
320 if (nxt_slow_path(ret != NXT_OK)) {
321 return NXT_ERROR;
322 }
323
324 return NXT_OK;
325}
326
327
328static void
329nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
330{
331 nxt_runtime_t *rt;
332
333 rt = obj;
334
335 nxt_debug(task, "rt conf done");
336
337 task->thread->log->ctx_handler = NULL;
338 task->thread->log->ctx = NULL;
339
340 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
341 goto fail;
342 }
343
344 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
345 goto fail;
346 }
347
348 /*
349 * Thread pools should be destroyed before starting worker
350 * processes, because thread pool semaphores will stick in
351 * locked state in new processes after fork().
352 */
353 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
354
355 return;
356
357fail:
358
359 nxt_runtime_quit(task, 1);
360}
361
362
363static void
364nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
365{
366 nxt_int_t ret;
367 nxt_thread_t *thr;
368 nxt_runtime_t *rt;
369 const nxt_event_interface_t *interface;
370
371 thr = task->thread;
372 rt = thr->runtime;
373
374 if (rt->inherited_sockets == NULL && rt->daemon) {
375
376 if (nxt_process_daemon(task) != NXT_OK) {
377 goto fail;
378 }
379
380 /*
381 * An event engine should be updated after fork()
382 * even if an event facility was not changed because:
383 * 1) inherited kqueue descriptor is invalid,
384 * 2) the signal thread is not inherited.
385 */
386 interface = nxt_service_get(rt->services, "engine", rt->engine);
387 if (interface == NULL) {
388 goto fail;
389 }
390
391 ret = nxt_event_engine_change(task->thread->engine, interface,
392 rt->batch);
393 if (ret != NXT_OK) {
394 goto fail;
395 }
396 }
397
398 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
399 if (ret != NXT_OK) {
400 goto fail;
401 }
402
403 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
404 goto fail;
405 }
406
407 thr->engine->max_connections = rt->engine_connections;
408
409 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
410 return;
411 }
412
413fail:
414
415 nxt_runtime_quit(task, 1);
416}
417
418
419void
420nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
421{
422 nxt_bool_t done;
423 nxt_runtime_t *rt;
424 nxt_event_engine_t *engine;
425
426 rt = task->thread->runtime;
427 rt->status |= status;
428 engine = task->thread->engine;
429
430 nxt_debug(task, "exiting");
431
432 done = 1;
433
434 if (!engine->shutdown) {
435 engine->shutdown = 1;
436
437 if (!nxt_array_is_empty(rt->thread_pools)) {
438 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
439 done = 0;
440 }
441
442 if (rt->type == NXT_PROCESS_MAIN) {
443 nxt_runtime_stop_all_processes(task, rt);
444 done = 0;
445 }
446 }
447
448 nxt_runtime_close_idle_connections(engine);
449
450 if (done) {
451 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
452 task, rt, engine);
453 }
454}
455
456
457static void
458nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
459{
460 nxt_conn_t *c;
461 nxt_queue_t *idle;
462 nxt_queue_link_t *link, *next;
463
464 nxt_debug(&engine->task, "close idle connections");
465
466 idle = &engine->idle_connections;
467
468 for (link = nxt_queue_first(idle);
469 link != nxt_queue_tail(idle);
470 link = next)
471 {
472 next = nxt_queue_next(link);
473 c = nxt_queue_link_data(link, nxt_conn_t, link);
474
475 if (!c->socket.read_ready) {
476 nxt_queue_remove(link);
477 nxt_conn_close(engine, c);
478 }
479 }
480}
481
482
483void
484nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
485{
486 nxt_port_t *port;
487 nxt_process_t *process;
488 nxt_process_init_t *init;
489
490 nxt_runtime_process_each(rt, process) {
491
492 init = nxt_process_init(process);
493
494 if (init->type == NXT_PROCESS_APP) {
495
496 nxt_process_port_each(process, port) {
497
498 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
499 0, 0, NULL);
500
501 } nxt_process_port_loop;
502 }
503
504 } nxt_runtime_process_loop;
505}
506
507
508static void
509nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
510{
511 nxt_port_t *port;
512 nxt_process_t *process;
513
514 nxt_runtime_process_each(rt, process) {
515
516 nxt_process_port_each(process, port) {
517
518 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
519 0, NULL);
520
521 } nxt_process_port_loop;
522
523 } nxt_runtime_process_loop;
524}
525
526
527static void
528nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
529{
530 int status;
531 nxt_runtime_t *rt;
532 nxt_process_t *process;
533 nxt_event_engine_t *engine;
534
535 rt = obj;
536 engine = data;
537
538 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
539
540 if (!nxt_array_is_empty(rt->thread_pools)) {
541 return;
542 }
543
544 if (rt->type == NXT_PROCESS_MAIN) {
545 if (rt->pid_file != NULL) {
546 nxt_file_delete(rt->pid_file);
547 }
548
549#if (NXT_HAVE_UNIX_DOMAIN)
550 {
551 nxt_sockaddr_t *sa;
552 nxt_file_name_t *name;
553
554 sa = rt->controller_listen;
555
556 if (sa->u.sockaddr.sa_family == AF_UNIX) {
557 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
558 (void) nxt_file_delete(name);
559 }
560 }
561#endif
562 }
563
564 if (!engine->event.signal_support) {
565 nxt_event_engine_signals_stop(engine);
566 }
567
568 nxt_runtime_process_each(rt, process) {
569
570 nxt_process_close_ports(task, process);
571
572 } nxt_runtime_process_loop;
573
574 if (rt->port_by_type[rt->type] != NULL) {
575 nxt_port_use(task, rt->port_by_type[rt->type], -1);
576 }
577
578 nxt_thread_mutex_destroy(&rt->processes_mutex);
579
580 status = rt->status;
581 nxt_mp_destroy(rt->mem_pool);
582
583 nxt_debug(task, "exit: %d", status);
584
585 exit(status);
586 nxt_unreachable();
587}
588
589
590static nxt_int_t
591nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
592{
593 nxt_event_engine_t *engine;
594 const nxt_event_interface_t *interface;
595
596 engine = task->thread->engine;
597
598 if (engine->batch == rt->batch
599 && nxt_strcmp(engine->event.name, rt->engine) == 0)
600 {
601 return NXT_OK;
602 }
603
604 interface = nxt_service_get(rt->services, "engine", rt->engine);
605
606 if (interface != NULL) {
607 return nxt_event_engine_change(engine, interface, rt->batch);
608 }
609
610 return NXT_ERROR;
611}
612
613
614void
615nxt_runtime_event_engine_free(nxt_runtime_t *rt)
616{
617 nxt_queue_link_t *link;
618 nxt_event_engine_t *engine;
619
620 link = nxt_queue_first(&rt->engines);
621 nxt_queue_remove(link);
622
623 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
624 nxt_event_engine_free(engine);
625}
626
627
628nxt_int_t
629nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
630 nxt_uint_t max_threads, nxt_nsec_t timeout)
631{
632 nxt_thread_pool_t *thread_pool, **tp;
633
634 tp = nxt_array_add(rt->thread_pools);
635 if (tp == NULL) {
636 return NXT_ERROR;
637 }
638
639 thread_pool = nxt_thread_pool_create(max_threads, timeout,
640 nxt_runtime_thread_pool_init,
641 thr->engine,
642 nxt_runtime_thread_pool_exit);
643
644 if (nxt_fast_path(thread_pool != NULL)) {
645 *tp = thread_pool;
646 }
647
648 return NXT_OK;
649}
650
651
652static void
653nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
654 nxt_runtime_cont_t cont)
655{
656 nxt_uint_t n;
657 nxt_thread_pool_t **tp;
658
659 rt->continuation = cont;
660
661 n = rt->thread_pools->nelts;
662
663 if (n == 0) {
664 cont(task, 0);
665 return;
666 }
667
668 tp = rt->thread_pools->elts;
669
670 do {
671 nxt_thread_pool_destroy(*tp);
672
673 tp++;
674 n--;
675 } while (n != 0);
676}
677
678
679static void
680nxt_runtime_thread_pool_init(void)
681{
682#if (NXT_REGEX)
683 nxt_regex_init(0);
684#endif
685}
686
687
688static void
689nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
690{
691 nxt_uint_t i, n;
692 nxt_runtime_t *rt;
693 nxt_thread_pool_t *tp, **thread_pools;
694 nxt_thread_handle_t handle;
695
696 tp = obj;
697
698 if (data != NULL) {
699 handle = (nxt_thread_handle_t) (uintptr_t) data;
700 nxt_thread_wait(handle);
701 }
702
703 rt = task->thread->runtime;
704
705 thread_pools = rt->thread_pools->elts;
706 n = rt->thread_pools->nelts;
707
708 nxt_debug(task, "thread pools: %ui", n);
709
710 for (i = 0; i < n; i++) {
711
712 if (tp == thread_pools[i]) {
713 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
714
715 nxt_free(tp);
716
717 if (n == 1) {
718 /* The last thread pool. */
719 rt->continuation(task, 0);
720 }
721
722 return;
723 }
724 }
725}
726
727
728static nxt_int_t
729nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
730{
731 nxt_int_t ret;
732 nxt_str_t control;
733 nxt_uint_t n;
734 nxt_file_t *file;
735 const char *slash;
736 nxt_sockaddr_t *sa;
737 nxt_file_name_str_t file_name;
738 const nxt_event_interface_t *interface;
739
740 rt->daemon = 1;
741 rt->engine_connections = 256;
742 rt->auxiliary_threads = 2;
743 rt->user_cred.user = NXT_USER;
744 rt->group = NXT_GROUP;
745 rt->pid = NXT_PID;
746 rt->log = NXT_LOG;
747 rt->modules = NXT_MODULES;
748 rt->state = NXT_STATE;
749 rt->control = NXT_CONTROL_SOCK;
750 rt->tmp = NXT_TMP;
751
752 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
753
754 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
755 return NXT_ERROR;
756 }
757
758 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
759 return NXT_ERROR;
760 }
761
762 if (rt->capabilities.setid) {
763 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
764 rt->group);
765
766 if (nxt_slow_path(ret != NXT_OK)) {
767 return NXT_ERROR;
768 }
769
770 } else {
771 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
772 "cannot use arbitrary user and group.");
773 }
774
775 /* An engine's parameters. */
776
777 interface = nxt_service_get(rt->services, "engine", rt->engine);
778 if (interface == NULL) {
779 return NXT_ERROR;
780 }
781
782 rt->engine = interface->name;
783
784 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
785 if (nxt_slow_path(ret != NXT_OK)) {
786 return NXT_ERROR;
787 }
788
789 rt->pid_file = file_name.start;
790
791 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
792 if (nxt_slow_path(ret != NXT_OK)) {
793 return NXT_ERROR;
794 }
795
796 file = nxt_list_first(rt->log_files);
797 file->name = file_name.start;
798
799 slash = "";
800 n = nxt_strlen(rt->modules);
801
802 if (n > 1 && rt->modules[n - 1] != '/') {
803 slash = "/";
804 }
805
806 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
807 rt->modules, slash);
808 if (nxt_slow_path(ret != NXT_OK)) {
809 return NXT_ERROR;
810 }
811
812 rt->modules = (char *) file_name.start;
813
814 slash = "";
815 n = nxt_strlen(rt->state);
816
817 if (n > 1 && rt->state[n - 1] != '/') {
818 slash = "/";
819 }
820
821 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
822 rt->state, slash);
823 if (nxt_slow_path(ret != NXT_OK)) {
824 return NXT_ERROR;
825 }
826
827 rt->conf = (char *) file_name.start;
828
829 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
830 if (nxt_slow_path(ret != NXT_OK)) {
831 return NXT_ERROR;
832 }
833
834 rt->conf_tmp = (char *) file_name.start;
835
836 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
837 rt->state, slash);
838 if (nxt_slow_path(ret != NXT_OK)) {
839 return NXT_ERROR;
840 }
841
842 ret = mkdir((char *) file_name.start, S_IRWXU);
843
844 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
845 rt->certs.length = file_name.len;
846 rt->certs.start = file_name.start;
847
848 } else {
849 nxt_alert(task, "Unable to create certificates storage directory: "
850 "mkdir(%s) failed %E", file_name.start, nxt_errno);
851 }
852
853 control.length = nxt_strlen(rt->control);
854 control.start = (u_char *) rt->control;
855
856 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
857 if (nxt_slow_path(sa == NULL)) {
858 return NXT_ERROR;
859 }
860
861 sa->type = SOCK_STREAM;
862
863 rt->controller_listen = sa;
864
865 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
866 return NXT_ERROR;
867 }
868
869 return NXT_OK;
870}
871
872
873static nxt_int_t
874nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
875{
876 char *p, **argv;
877 u_char *end;
878 u_char buf[1024];
879
880 static const char version[] =
881 "unit version: " NXT_VERSION "\n"
882 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
883
884 static const char no_control[] =
885 "option \"--control\" requires socket address\n";
886 static const char no_user[] = "option \"--user\" requires username\n";
887 static const char no_group[] = "option \"--group\" requires group name\n";
888 static const char no_pid[] = "option \"--pid\" requires filename\n";
889 static const char no_log[] = "option \"--log\" requires filename\n";
890 static const char no_modules[] =
891 "option \"--modules\" requires directory\n";
892 static const char no_state[] = "option \"--state\" requires directory\n";
893 static const char no_tmp[] = "option \"--tmp\" requires directory\n";
894
895 static const char help[] =
896 "\n"
897 "unit options:\n"
898 "\n"
899 " --version print unit version and configure options\n"
900 "\n"
901 " --no-daemon run unit in non-daemon mode\n"
902 "\n"
903 " --control ADDRESS set address of control API socket\n"
904 " default: \"" NXT_CONTROL_SOCK "\"\n"
905 "\n"
906 " --pid FILE set pid filename\n"
907 " default: \"" NXT_PID "\"\n"
908 "\n"
909 " --log FILE set log filename\n"
910 " default: \"" NXT_LOG "\"\n"
911 "\n"
912 " --modules DIRECTORY set modules directory name\n"
913 " default: \"" NXT_MODULES "\"\n"
914 "\n"
915 " --state DIRECTORY set state directory name\n"
916 " default: \"" NXT_STATE "\"\n"
917 "\n"
918 " --tmp DIRECTORY set tmp directory name\n"
919 " default: \"" NXT_TMP "\"\n"
920 "\n"
921 " --user USER set non-privileged processes to run"
922 " as specified user\n"
923 " default: \"" NXT_USER "\"\n"
924 "\n"
925 " --group GROUP set non-privileged processes to run"
926 " as specified group\n"
927 " default: ";
928
929 static const char group[] = "\"" NXT_GROUP "\"\n\n";
930 static const char primary[] = "user's primary group\n\n";
931
932 argv = &nxt_process_argv[1];
933
934 while (*argv != NULL) {
935 p = *argv++;
936
937 if (nxt_strcmp(p, "--control") == 0) {
938 if (*argv == NULL) {
939 write(STDERR_FILENO, no_control, nxt_length(no_control));
940 return NXT_ERROR;
941 }
942
943 p = *argv++;
944
945 rt->control = p;
946
947 continue;
948 }
949
950 if (nxt_strcmp(p, "--user") == 0) {
951 if (*argv == NULL) {
952 write(STDERR_FILENO, no_user, nxt_length(no_user));
953 return NXT_ERROR;
954 }
955
956 p = *argv++;
957
958 rt->user_cred.user = p;
959
960 continue;
961 }
962
963 if (nxt_strcmp(p, "--group") == 0) {
964 if (*argv == NULL) {
965 write(STDERR_FILENO, no_group, nxt_length(no_group));
966 return NXT_ERROR;
967 }
968
969 p = *argv++;
970
971 rt->group = p;
972
973 continue;
974 }
975
976 if (nxt_strcmp(p, "--pid") == 0) {
977 if (*argv == NULL) {
978 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
979 return NXT_ERROR;
980 }
981
982 p = *argv++;
983
984 rt->pid = p;
985
986 continue;
987 }
988
989 if (nxt_strcmp(p, "--log") == 0) {
990 if (*argv == NULL) {
991 write(STDERR_FILENO, no_log, nxt_length(no_log));
992 return NXT_ERROR;
993 }
994
995 p = *argv++;
996
997 rt->log = p;
998
999 continue;
1000 }
1001
1002 if (nxt_strcmp(p, "--modules") == 0) {
1003 if (*argv == NULL) {
1004 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1005 return NXT_ERROR;
1006 }
1007
1008 p = *argv++;
1009
1010 rt->modules = p;
1011
1012 continue;
1013 }
1014
1015 if (nxt_strcmp(p, "--state") == 0) {
1016 if (*argv == NULL) {
1017 write(STDERR_FILENO, no_state, nxt_length(no_state));
1018 return NXT_ERROR;
1019 }
1020
1021 p = *argv++;
1022
1023 rt->state = p;
1024
1025 continue;
1026 }
1027
1028 if (nxt_strcmp(p, "--tmp") == 0) {
1029 if (*argv == NULL) {
1030 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1031 return NXT_ERROR;
1032 }
1033
1034 p = *argv++;
1035
1036 rt->tmp = p;
1037
1038 continue;
1039 }
1040
1041 if (nxt_strcmp(p, "--no-daemon") == 0) {
1042 rt->daemon = 0;
1043 continue;
1044 }
1045
1046 if (nxt_strcmp(p, "--version") == 0) {
1047 write(STDERR_FILENO, version, nxt_length(version));
1048 exit(0);
1049 }
1050
1051 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1052 write(STDOUT_FILENO, help, nxt_length(help));
1053
1054 if (sizeof(NXT_GROUP) == 1) {
1055 write(STDOUT_FILENO, primary, nxt_length(primary));
1056
1057 } else {
1058 write(STDOUT_FILENO, group, nxt_length(group));
1059 }
1060
1061 exit(0);
1062 }
1063
1064 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1065 "try \"%s -h\" for available options\n",
1066 p, nxt_process_argv[0]);
1067
1068 write(STDERR_FILENO, buf, end - buf);
1069
1070 return NXT_ERROR;
1071 }
1072
1073 return NXT_OK;
1074}
1075
1076
1077nxt_listen_socket_t *
1078nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1079{
1080 nxt_mp_t *mp;
1081 nxt_listen_socket_t *ls;
1082
1083 ls = nxt_array_zero_add(rt->listen_sockets);
1084 if (ls == NULL) {
1085 return NULL;
1086 }
1087
1088 mp = rt->mem_pool;
1089
1090 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1091 sa->length);
1092 if (ls->sockaddr == NULL) {
1093 return NULL;
1094 }
1095
1096 ls->sockaddr->type = sa->type;
1097
1098 nxt_sockaddr_text(ls->sockaddr);
1099
1100 ls->socket = -1;
1101 ls->backlog = NXT_LISTEN_BACKLOG;
1102
1103 return ls;
1104}
1105
1106
1107static nxt_int_t
1108nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1109{
1110 size_t length;
1111 char hostname[NXT_MAXHOSTNAMELEN + 1];
1112
1113 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1114 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1115 return NXT_ERROR;
1116 }
1117
1118 /*
1119 * Linux gethostname(2):
1120 *
1121 * If the null-terminated hostname is too large to fit,
1122 * then the name is truncated, and no error is returned.
1123 *
1124 * For this reason an additional byte is reserved in the buffer.
1125 */
1126 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1127
1128 length = nxt_strlen(hostname);
1129 rt->hostname.length = length;
1130
1131 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1132
1133 if (rt->hostname.start != NULL) {
1134 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1135 return NXT_OK;
1136 }
1137
1138 return NXT_ERROR;
1139}
1140
1141
1142static nxt_int_t
1143nxt_runtime_log_files_init(nxt_runtime_t *rt)
1144{
1145 nxt_file_t *file;
1146 nxt_list_t *log_files;
1147
1148 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1149
1150 if (nxt_fast_path(log_files != NULL)) {
1151 rt->log_files = log_files;
1152
1153 /* Preallocate the main log. This allocation cannot fail. */
1154 file = nxt_list_zero_add(log_files);
1155
1156 file->fd = NXT_FILE_INVALID;
1157 file->log_level = NXT_LOG_ALERT;
1158
1159 return NXT_OK;
1160 }
1161
1162 return NXT_ERROR;
1163}
1164
1165
1166nxt_file_t *
1167nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1168{
1169 nxt_int_t ret;
1170 nxt_file_t *file;
1171 nxt_file_name_str_t file_name;
1172
1173 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1174
1175 if (nxt_slow_path(ret != NXT_OK)) {
1176 return NULL;
1177 }
1178
1179 nxt_list_each(file, rt->log_files) {
1180
1181 /* STUB: hardecoded case sensitive/insensitive. */
1182
1183 if (file->name != NULL
1184 && nxt_file_name_eq(file->name, file_name.start))
1185 {
1186 return file;
1187 }
1188
1189 } nxt_list_loop;
1190
1191 file = nxt_list_zero_add(rt->log_files);
1192
1193 if (nxt_slow_path(file == NULL)) {
1194 return NULL;
1195 }
1196
1197 file->fd = NXT_FILE_INVALID;
1198 file->log_level = NXT_LOG_ALERT;
1199 file->name = file_name.start;
1200
1201 return file;
1202}
1203
1204
1205static nxt_int_t
1206nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1207{
1208 nxt_int_t ret;
1209 nxt_file_t *file;
1210
1211 nxt_list_each(file, rt->log_files) {
1212
1213 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1214 NXT_FILE_OWNER_ACCESS);
1215
1216 if (ret != NXT_OK) {
1217 return NXT_ERROR;
1218 }
1219
1220 } nxt_list_loop;
1221
1222 file = nxt_list_first(rt->log_files);
1223
1224 return nxt_file_stderr(file);
1225}
1226
1227
1228nxt_int_t
1229nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1230{
1231 nxt_int_t ret;
1232 nxt_uint_t c, p, ncurr, nprev;
1233 nxt_listen_socket_t *curr, *prev;
1234
1235 curr = rt->listen_sockets->elts;
1236 ncurr = rt->listen_sockets->nelts;
1237
1238 if (rt->inherited_sockets != NULL) {
1239 prev = rt->inherited_sockets->elts;
1240 nprev = rt->inherited_sockets->nelts;
1241
1242 } else {
1243 prev = NULL;
1244 nprev = 0;
1245 }
1246
1247 for (c = 0; c < ncurr; c++) {
1248
1249 for (p = 0; p < nprev; p++) {
1250
1251 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1252
1253 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1254 if (ret != NXT_OK) {
1255 return NXT_ERROR;
1256 }
1257
1258 goto next;
1259 }
1260 }
1261
1262 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1263 return NXT_ERROR;
1264 }
1265
1266 next:
1267
1268 continue;
1269 }
1270
1271 return NXT_OK;
1272}
1273
1274
1275nxt_int_t
1276nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1277{
1278 nxt_uint_t i, n;
1279 nxt_listen_socket_t *ls;
1280
1281 ls = rt->listen_sockets->elts;
1282 n = rt->listen_sockets->nelts;
1283
1284 for (i = 0; i < n; i++) {
1285 if (ls[i].flags == NXT_NONBLOCK) {
1286 if (nxt_listen_event(task, &ls[i]) == NULL) {
1287 return NXT_ERROR;
1288 }
1289 }
1290 }
1291
1292 return NXT_OK;
1293}
1294
1295
1296nxt_str_t *
1297nxt_current_directory(nxt_mp_t *mp)
1298{
1299 size_t length;
1300 u_char *p;
1301 nxt_str_t *name;
1302 char buf[NXT_MAX_PATH_LEN];
1303
1304 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1305
1306 if (nxt_fast_path(length != 0)) {
1307 name = nxt_str_alloc(mp, length + 1);
1308
1309 if (nxt_fast_path(name != NULL)) {
1310 p = nxt_cpymem(name->start, buf, length);
1311 *p = '/';
1312
1313 return name;
1314 }
1315 }
1316
1317 return NULL;
1318}
1319
1320
1321static nxt_int_t
1322nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1323{
1324 ssize_t length;
1325 nxt_int_t n;
1326 nxt_file_t file;
1327 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")];
1328
1329 nxt_memzero(&file, sizeof(nxt_file_t));
1330
1331 file.name = pid_file;
1332
1333 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1334 NXT_FILE_DEFAULT_ACCESS);
1335
1336 if (n != NXT_OK) {
1337 return NXT_ERROR;
1338 }
1339
1340 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1341
1342 if (nxt_file_write(&file, pid, length, 0) != length) {
1343 return NXT_ERROR;
1344 }
1345
1346 nxt_file_close(task, &file);
1347
1348 return NXT_OK;
1349}
1350
1351
1352nxt_process_t *
1353nxt_runtime_process_new(nxt_runtime_t *rt)
1354{
1355 nxt_process_t *process;
1356
1357 /* TODO: memory failures. */
1358
1359 process = nxt_mp_zalloc(rt->mem_pool,
1360 sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
1361
1362 if (nxt_slow_path(process == NULL)) {
1363 return NULL;
1364 }
1365
1366 nxt_queue_init(&process->ports);
1367
1368 nxt_thread_mutex_create(&process->incoming.mutex);
1369 nxt_thread_mutex_create(&process->outgoing.mutex);
1370 nxt_thread_mutex_create(&process->cp_mutex);
1371
1372 process->use_count = 1;
1373
1374 return process;
1375}
1376
1377
1378void
1379nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1380{
1381 nxt_port_t *port;
1382
1383 if (process->registered == 1) {
1384 nxt_runtime_process_remove(rt, process);
1385 }
1386
1387 nxt_assert(process->use_count == 0);
1388 nxt_assert(process->registered == 0);
1389
1390 nxt_port_mmaps_destroy(&process->incoming, 1);
1391 nxt_port_mmaps_destroy(&process->outgoing, 1);
1392
1393 do {
1394 port = nxt_port_hash_retrieve(&process->connected_ports);
1395
1396 } while (port != NULL);
1397
1398 nxt_thread_mutex_destroy(&process->incoming.mutex);
1399 nxt_thread_mutex_destroy(&process->outgoing.mutex);
1400 nxt_thread_mutex_destroy(&process->cp_mutex);
1401
1402 /* processes from nxt_runtime_process_get() have no memory pool */
1403 if (process->mem_pool != NULL) {
1404 nxt_mp_destroy(process->mem_pool);
1405 }
1406
1407 nxt_mp_free(rt->mem_pool, process);
1408}
1409
1410
1411static nxt_int_t
1412nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1413{
1414 nxt_process_t *process;
1415
1416 process = data;
1417
1418 if (lhq->key.length == sizeof(nxt_pid_t)
1419 && *(nxt_pid_t *) lhq->key.start == process->pid)
1420 {
1421 return NXT_OK;
1422 }
1423
1424 return NXT_DECLINED;
1425}
1426
1427static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1428 NXT_LVLHSH_DEFAULT,
1429 nxt_runtime_lvlhsh_pid_test,
1430 nxt_lvlhsh_alloc,
1431 nxt_lvlhsh_free,
1432};
1433
1434
1435nxt_inline void
1436nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1437{
1438 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1439 lhq->key.length = sizeof(*pid);
1440 lhq->key.start = (u_char *) pid;
1441 lhq->proto = &lvlhsh_processes_proto;
1442}
1443
1444
1445nxt_process_t *
1446nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1447{
1448 nxt_process_t *process;
1449 nxt_lvlhsh_query_t lhq;
1450
1451 process = NULL;
1452
1453 nxt_runtime_process_lhq_pid(&lhq, &pid);
1454
1455 nxt_thread_mutex_lock(&rt->processes_mutex);
1456
1457 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1458 process = lhq.value;
1459
1460 } else {
1461 nxt_thread_log_debug("process %PI not found", pid);
1462 }
1463
1464 nxt_thread_mutex_unlock(&rt->processes_mutex);
1465
1466 return process;
1467}
1468
1469
1470static nxt_process_t *
1471nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1472{
1473 nxt_process_t *process;
1474 nxt_lvlhsh_query_t lhq;
1475
1476 nxt_runtime_process_lhq_pid(&lhq, &pid);
1477
1478 nxt_thread_mutex_lock(&rt->processes_mutex);
1479
1480 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1481 nxt_thread_log_debug("process %PI found", pid);
1482
1483 nxt_thread_mutex_unlock(&rt->processes_mutex);
1484
1485 process = lhq.value;
1486 process->use_count++;
1487
1488 return process;
1489 }
1490
1491 process = nxt_runtime_process_new(rt);
1492 if (nxt_slow_path(process == NULL)) {
1493
1494 nxt_thread_mutex_unlock(&rt->processes_mutex);
1495
1496 return NULL;
1497 }
1498
1499 process->pid = pid;
1500
1501 lhq.replace = 0;
1502 lhq.value = process;
1503 lhq.pool = rt->mem_pool;
1504
1505 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1506
1507 case NXT_OK:
1508 if (rt->nprocesses == 0) {
1509 rt->mprocess = process;
1510 }
1511
1512 rt->nprocesses++;
1513
1514 process->registered = 1;
1515
1516 nxt_thread_log_debug("process %PI insert", pid);
1517 break;
1518
1519 default:
1520 nxt_thread_log_debug("process %PI insert failed", pid);
1521 break;
1522 }
1523
1524 nxt_thread_mutex_unlock(&rt->processes_mutex);
1525
1526 return process;
1527}
1528
1529
1530void
1531nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1532{
1533 nxt_port_t *port;
1534 nxt_runtime_t *rt;
1535 nxt_lvlhsh_query_t lhq;
1536
1537 nxt_assert(process->registered == 0);
1538
1539 rt = task->thread->runtime;
1540
1541 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1542
1543 lhq.replace = 0;
1544 lhq.value = process;
1545 lhq.pool = rt->mem_pool;
1546
1547 nxt_thread_mutex_lock(&rt->processes_mutex);
1548
1549 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1550
1551 case NXT_OK:
1552 if (rt->nprocesses == 0) {
1553 rt->mprocess = process;
1554 }
1555
1556 rt->nprocesses++;
1557
1558 nxt_process_port_each(process, port) {
1559
1560 port->pid = process->pid;
1561
1562 nxt_runtime_port_add(task, port);
1563
1564 } nxt_process_port_loop;
1565
1566 process->registered = 1;
1567
1568 nxt_thread_log_debug("process %PI added", process->pid);
1569 break;
1570
1571 default:
1572 nxt_thread_log_debug("process %PI failed to add", process->pid);
1573 break;
1574 }
1575
1576 nxt_thread_mutex_unlock(&rt->processes_mutex);
1577}
1578
1579
1580static void
1581nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1582{
1583 nxt_pid_t pid;
1584 nxt_lvlhsh_query_t lhq;
1585
1586 pid = process->pid;
1587
1588 nxt_runtime_process_lhq_pid(&lhq, &pid);
1589
1590 lhq.pool = rt->mem_pool;
1591
1592 nxt_thread_mutex_lock(&rt->processes_mutex);
1593
1594 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1595
1596 case NXT_OK:
1597 rt->nprocesses--;
1598
1599 process = lhq.value;
1600
1601 process->registered = 0;
1602
1603 nxt_thread_log_debug("process %PI removed", pid);
1604 break;
1605
1606 default:
1607 nxt_thread_log_debug("process %PI remove failed", pid);
1608 break;
1609 }
1610
1611 nxt_thread_mutex_unlock(&rt->processes_mutex);
1612}
1613
1614
1615nxt_process_t *
1616nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1617{
1618 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1619
1620 return nxt_runtime_process_next(rt, lhe);
1621}
1622
1623
1624nxt_port_t *
1625nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1626 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1627{
1628 nxt_port_t *port;
1629 nxt_process_t *process;
1630
1631 process = nxt_runtime_process_get(rt, pid);
1632 if (nxt_slow_path(process == NULL)) {
1633 return NULL;
1634 }
1635
1636 port = nxt_port_new(task, id, pid, type);
1637 if (nxt_slow_path(port == NULL)) {
1638 nxt_process_use(task, process, -1);
1639 return NULL;
1640 }
1641
1642 nxt_process_port_add(task, process, port);
1643
1644 nxt_process_use(task, process, -1);
1645
1646 nxt_runtime_port_add(task, port);
1647
1648 nxt_port_use(task, port, -1);
1649
1650 return port;
1651}
1652
1653
1654static void
1655nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1656{
1657 nxt_int_t res;
1658 nxt_runtime_t *rt;
1659
1660 rt = task->thread->runtime;
1661
1662 res = nxt_port_hash_add(&rt->ports, port);
1663
1664 if (res != NXT_OK) {
1665 return;
1666 }
1667
1668 rt->port_by_type[port->type] = port;
1669
1670 nxt_port_use(task, port, 1);
1671}
1672
1673
1674void
1675nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1676{
1677 nxt_int_t res;
1678 nxt_runtime_t *rt;
1679
1680 rt = task->thread->runtime;
1681
1682 res = nxt_port_hash_remove(&rt->ports, port);
1683
1684 if (res != NXT_OK) {
1685 return;
1686 }
1687
1688 if (rt->port_by_type[port->type] == port) {
1689 rt->port_by_type[port->type] = NULL;
1690 }
1691
1692 nxt_port_use(task, port, -1);
1693}
1694
1695
1696nxt_port_t *
1697nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1698 nxt_port_id_t port_id)
1699{
1700 return nxt_port_hash_find(&rt->ports, pid, port_id);
1701}