nxt_runtime.c (1545:78836321a126) nxt_runtime.c (1548:a745db447e56)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
25static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
26static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
27 nxt_runtime_t *rt);
28static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
31static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
32static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
33 nxt_runtime_t *rt);
34static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
35 nxt_file_name_t *pid_file);
36static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
37 nxt_runtime_cont_t cont);
38static void nxt_runtime_thread_pool_init(void);
39static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
40 void *data);
41static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
42static void nxt_runtime_process_remove(nxt_runtime_t *rt,
43 nxt_process_t *process);
44static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
45
46
47nxt_int_t
48nxt_runtime_create(nxt_task_t *task)
49{
50 nxt_mp_t *mp;
51 nxt_int_t ret;
52 nxt_array_t *listen_sockets;
53 nxt_runtime_t *rt;
54 nxt_app_lang_module_t *lang;
55
56 mp = nxt_mp_create(1024, 128, 256, 32);
57 if (nxt_slow_path(mp == NULL)) {
58 return NXT_ERROR;
59 }
60
61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62 if (nxt_slow_path(rt == NULL)) {
63 goto fail;
64 }
65
66 task->thread->runtime = rt;
67 rt->mem_pool = mp;
68
69 nxt_thread_mutex_create(&rt->processes_mutex);
70
71 rt->services = nxt_services_init(mp);
72 if (nxt_slow_path(rt->services == NULL)) {
73 goto fail;
74 }
75
76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77 if (nxt_slow_path(rt->languages == NULL)) {
78 goto fail;
79 }
80
81 /* Should not fail. */
82 lang = nxt_array_add(rt->languages);
83 lang->type = NXT_APP_EXTERNAL;
84 lang->version = (u_char *) "";
85 lang->file = NULL;
86 lang->module = &nxt_external_module;
87 lang->mounts = NULL;
88
89 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
90 if (nxt_slow_path(listen_sockets == NULL)) {
91 goto fail;
92 }
93
94 rt->listen_sockets = listen_sockets;
95
96 ret = nxt_runtime_inherited_listen_sockets(task, rt);
97 if (nxt_slow_path(ret != NXT_OK)) {
98 goto fail;
99 }
100
101 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
102 goto fail;
103 }
104
105 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
106 goto fail;
107 }
108
109 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
110 goto fail;
111 }
112
113 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
114 goto fail;
115 }
116
117 rt->start = nxt_runtime_initial_start;
118
119 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
120 goto fail;
121 }
122
123 if (nxt_port_rpc_init() != NXT_OK) {
124 goto fail;
125 }
126
127 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
128 nxt_runtime_start, task, rt, NULL);
129
130 return NXT_OK;
131
132fail:
133
134 nxt_mp_destroy(mp);
135
136 return NXT_ERROR;
137}
138
139
140static nxt_int_t
141nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
142{
143 u_char *v, *p;
144 nxt_int_t type;
145 nxt_array_t *inherited_sockets;
146 nxt_socket_t s;
147 nxt_listen_socket_t *ls;
148
149 v = (u_char *) getenv("NGINX");
150
151 if (v == NULL) {
152 return nxt_runtime_systemd_listen_sockets(task, rt);
153 }
154
155 nxt_alert(task, "using inherited listen sockets: %s", v);
156
157 inherited_sockets = nxt_array_create(rt->mem_pool,
158 1, sizeof(nxt_listen_socket_t));
159 if (inherited_sockets == NULL) {
160 return NXT_ERROR;
161 }
162
163 rt->inherited_sockets = inherited_sockets;
164
165 for (p = v; *p != '\0'; p++) {
166
167 if (*p == ';') {
168 s = nxt_int_parse(v, p - v);
169
170 if (nxt_slow_path(s < 0)) {
171 nxt_alert(task, "invalid socket number \"%s\" "
172 "in NGINX environment variable, "
173 "ignoring the rest of the variable", v);
174 return NXT_ERROR;
175 }
176
177 v = p + 1;
178
179 ls = nxt_array_zero_add(inherited_sockets);
180 if (nxt_slow_path(ls == NULL)) {
181 return NXT_ERROR;
182 }
183
184 ls->socket = s;
185
186 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
187 if (nxt_slow_path(ls->sockaddr == NULL)) {
188 return NXT_ERROR;
189 }
190
191 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
192 if (nxt_slow_path(type == -1)) {
193 return NXT_ERROR;
194 }
195
196 ls->sockaddr->type = (uint16_t) type;
197 }
198 }
199
200 return NXT_OK;
201}
202
203
204static nxt_int_t
205nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
206{
207 u_char *nfd, *pid;
208 nxt_int_t n;
209 nxt_array_t *inherited_sockets;
210 nxt_socket_t s;
211 nxt_listen_socket_t *ls;
212
213 /*
214 * Number of listening sockets passed. The socket
215 * descriptors start from number 3 and are sequential.
216 */
217 nfd = (u_char *) getenv("LISTEN_FDS");
218 if (nfd == NULL) {
219 return NXT_OK;
220 }
221
222 /* The pid of the service process. */
223 pid = (u_char *) getenv("LISTEN_PID");
224 if (pid == NULL) {
225 return NXT_OK;
226 }
227
228 n = nxt_int_parse(nfd, nxt_strlen(nfd));
229 if (n < 0) {
230 return NXT_OK;
231 }
232
233 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
234 return NXT_OK;
235 }
236
237 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
238
239 inherited_sockets = nxt_array_create(rt->mem_pool,
240 n, sizeof(nxt_listen_socket_t));
241 if (inherited_sockets == NULL) {
242 return NXT_ERROR;
243 }
244
245 rt->inherited_sockets = inherited_sockets;
246
247 for (s = 3; s < n; s++) {
248 ls = nxt_array_zero_add(inherited_sockets);
249 if (nxt_slow_path(ls == NULL)) {
250 return NXT_ERROR;
251 }
252
253 ls->socket = s;
254
255 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
256 if (nxt_slow_path(ls->sockaddr == NULL)) {
257 return NXT_ERROR;
258 }
259
260 ls->sockaddr->type = SOCK_STREAM;
261 }
262
263 return NXT_OK;
264}
265
266
267static nxt_int_t
268nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
269{
270 nxt_thread_t *thread;
271 nxt_event_engine_t *engine;
272 const nxt_event_interface_t *interface;
273
274 interface = nxt_service_get(rt->services, "engine", NULL);
275
276 if (nxt_slow_path(interface == NULL)) {
277 /* TODO: log */
278 return NXT_ERROR;
279 }
280
281 engine = nxt_event_engine_create(task, interface,
282 nxt_main_process_signals, 0, 0);
283
284 if (nxt_slow_path(engine == NULL)) {
285 return NXT_ERROR;
286 }
287
288 thread = task->thread;
289 thread->engine = engine;
290#if 0
291 thread->fiber = &engine->fibers->fiber;
292#endif
293
294 engine->id = rt->last_engine_id++;
295 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
296
297 nxt_queue_init(&rt->engines);
298 nxt_queue_insert_tail(&rt->engines, &engine->link);
299
300 return NXT_OK;
301}
302
303
304static nxt_int_t
305nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
306{
307 nxt_int_t ret;
308 nxt_array_t *thread_pools;
309
310 thread_pools = nxt_array_create(rt->mem_pool, 1,
311 sizeof(nxt_thread_pool_t *));
312
313 if (nxt_slow_path(thread_pools == NULL)) {
314 return NXT_ERROR;
315 }
316
317 rt->thread_pools = thread_pools;
318 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
319
320 if (nxt_slow_path(ret != NXT_OK)) {
321 return NXT_ERROR;
322 }
323
324 return NXT_OK;
325}
326
327
328static void
329nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
330{
331 nxt_runtime_t *rt;
332
333 rt = obj;
334
335 nxt_debug(task, "rt conf done");
336
337 task->thread->log->ctx_handler = NULL;
338 task->thread->log->ctx = NULL;
339
340 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
341 goto fail;
342 }
343
344 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
345 goto fail;
346 }
347
348 /*
349 * Thread pools should be destroyed before starting worker
350 * processes, because thread pool semaphores will stick in
351 * locked state in new processes after fork().
352 */
353 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
354
355 return;
356
357fail:
358
359 nxt_runtime_quit(task, 1);
360}
361
362
363static void
364nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
365{
366 nxt_int_t ret;
367 nxt_thread_t *thr;
368 nxt_runtime_t *rt;
369 const nxt_event_interface_t *interface;
370
371 thr = task->thread;
372 rt = thr->runtime;
373
374 if (rt->inherited_sockets == NULL && rt->daemon) {
375
376 if (nxt_process_daemon(task) != NXT_OK) {
377 goto fail;
378 }
379
380 /*
381 * An event engine should be updated after fork()
382 * even if an event facility was not changed because:
383 * 1) inherited kqueue descriptor is invalid,
384 * 2) the signal thread is not inherited.
385 */
386 interface = nxt_service_get(rt->services, "engine", rt->engine);
387 if (interface == NULL) {
388 goto fail;
389 }
390
391 ret = nxt_event_engine_change(task->thread->engine, interface,
392 rt->batch);
393 if (ret != NXT_OK) {
394 goto fail;
395 }
396 }
397
398 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
399 if (ret != NXT_OK) {
400 goto fail;
401 }
402
403 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
404 goto fail;
405 }
406
407 thr->engine->max_connections = rt->engine_connections;
408
409 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
410 return;
411 }
412
413fail:
414
415 nxt_runtime_quit(task, 1);
416}
417
418
419void
420nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
421{
422 nxt_bool_t done;
423 nxt_runtime_t *rt;
424 nxt_event_engine_t *engine;
425
426 rt = task->thread->runtime;
427 rt->status |= status;
428 engine = task->thread->engine;
429
430 nxt_debug(task, "exiting");
431
432 done = 1;
433
434 if (!engine->shutdown) {
435 engine->shutdown = 1;
436
437 if (!nxt_array_is_empty(rt->thread_pools)) {
438 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
439 done = 0;
440 }
441
442 if (rt->type == NXT_PROCESS_MAIN) {
443 nxt_runtime_stop_all_processes(task, rt);
444 done = 0;
445 }
446 }
447
448 nxt_runtime_close_idle_connections(engine);
449
450 if (done) {
451 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
452 task, rt, engine);
453 }
454}
455
456
457static void
458nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
459{
460 nxt_conn_t *c;
461 nxt_queue_t *idle;
462 nxt_queue_link_t *link, *next;
463
464 nxt_debug(&engine->task, "close idle connections");
465
466 idle = &engine->idle_connections;
467
468 for (link = nxt_queue_first(idle);
469 link != nxt_queue_tail(idle);
470 link = next)
471 {
472 next = nxt_queue_next(link);
473 c = nxt_queue_link_data(link, nxt_conn_t, link);
474
475 if (!c->socket.read_ready) {
476 nxt_queue_remove(link);
477 nxt_conn_close(engine, c);
478 }
479 }
480}
481
482
483void
484nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
485{
486 nxt_port_t *port;
487 nxt_process_t *process;
488 nxt_process_init_t *init;
489
490 nxt_runtime_process_each(rt, process) {
491
492 init = nxt_process_init(process);
493
494 if (init->type == NXT_PROCESS_APP) {
495
496 nxt_process_port_each(process, port) {
497
498 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
499 0, 0, NULL);
500
501 } nxt_process_port_loop;
502 }
503
504 } nxt_runtime_process_loop;
505}
506
507
508static void
509nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
510{
511 nxt_port_t *port;
512 nxt_process_t *process;
513
514 nxt_runtime_process_each(rt, process) {
515
516 nxt_process_port_each(process, port) {
517
518 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
519 0, NULL);
520
521 } nxt_process_port_loop;
522
523 } nxt_runtime_process_loop;
524}
525
526
527static void
528nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
529{
530 int status, engine_count;
531 nxt_runtime_t *rt;
532 nxt_process_t *process;
533 nxt_event_engine_t *engine;
534
535 rt = obj;
536 engine = data;
537
538 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
539
540 if (!nxt_array_is_empty(rt->thread_pools)) {
541 return;
542 }
543
544 if (rt->type == NXT_PROCESS_MAIN) {
545 if (rt->pid_file != NULL) {
546 nxt_file_delete(rt->pid_file);
547 }
548
549#if (NXT_HAVE_UNIX_DOMAIN)
550 {
551 nxt_sockaddr_t *sa;
552 nxt_file_name_t *name;
553
554 sa = rt->controller_listen;
555
556 if (sa->u.sockaddr.sa_family == AF_UNIX) {
557 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
558 (void) nxt_file_delete(name);
559 }
560 }
561#endif
562 }
563
564 if (!engine->event.signal_support) {
565 nxt_event_engine_signals_stop(engine);
566 }
567
568 nxt_runtime_process_each(rt, process) {
569
570 nxt_process_close_ports(task, process);
571
572 } nxt_runtime_process_loop;
573
574 status = rt->status;
575
576 engine_count = 0;
577
578 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
579
580 engine_count++;
581
582 } nxt_queue_loop;
583
584 if (engine_count <= 1) {
585 if (rt->port_by_type[rt->type] != NULL) {
586 nxt_port_use(task, rt->port_by_type[rt->type], -1);
587 }
588
589 nxt_thread_mutex_destroy(&rt->processes_mutex);
590
591 nxt_mp_destroy(rt->mem_pool);
592 }
593
594 nxt_debug(task, "exit: %d", status);
595
596 exit(status);
597 nxt_unreachable();
598}
599
600
601static nxt_int_t
602nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
603{
604 nxt_event_engine_t *engine;
605 const nxt_event_interface_t *interface;
606
607 engine = task->thread->engine;
608
609 if (engine->batch == rt->batch
610 && nxt_strcmp(engine->event.name, rt->engine) == 0)
611 {
612 return NXT_OK;
613 }
614
615 interface = nxt_service_get(rt->services, "engine", rt->engine);
616
617 if (interface != NULL) {
618 return nxt_event_engine_change(engine, interface, rt->batch);
619 }
620
621 return NXT_ERROR;
622}
623
624
625void
626nxt_runtime_event_engine_free(nxt_runtime_t *rt)
627{
628 nxt_queue_link_t *link;
629 nxt_event_engine_t *engine;
630
631 link = nxt_queue_first(&rt->engines);
632 nxt_queue_remove(link);
633
634 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
635 nxt_event_engine_free(engine);
636}
637
638
639nxt_int_t
640nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
641 nxt_uint_t max_threads, nxt_nsec_t timeout)
642{
643 nxt_thread_pool_t *thread_pool, **tp;
644
645 tp = nxt_array_add(rt->thread_pools);
646 if (tp == NULL) {
647 return NXT_ERROR;
648 }
649
650 thread_pool = nxt_thread_pool_create(max_threads, timeout,
651 nxt_runtime_thread_pool_init,
652 thr->engine,
653 nxt_runtime_thread_pool_exit);
654
655 if (nxt_fast_path(thread_pool != NULL)) {
656 *tp = thread_pool;
657 }
658
659 return NXT_OK;
660}
661
662
663static void
664nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
665 nxt_runtime_cont_t cont)
666{
667 nxt_uint_t n;
668 nxt_thread_pool_t **tp;
669
670 rt->continuation = cont;
671
672 n = rt->thread_pools->nelts;
673
674 if (n == 0) {
675 cont(task, 0);
676 return;
677 }
678
679 tp = rt->thread_pools->elts;
680
681 do {
682 nxt_thread_pool_destroy(*tp);
683
684 tp++;
685 n--;
686 } while (n != 0);
687}
688
689
690static void
691nxt_runtime_thread_pool_init(void)
692{
693#if (NXT_REGEX)
694 nxt_regex_init(0);
695#endif
696}
697
698
699static void
700nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
701{
702 nxt_uint_t i, n;
703 nxt_runtime_t *rt;
704 nxt_thread_pool_t *tp, **thread_pools;
705 nxt_thread_handle_t handle;
706
707 tp = obj;
708
709 if (data != NULL) {
710 handle = (nxt_thread_handle_t) (uintptr_t) data;
711 nxt_thread_wait(handle);
712 }
713
714 rt = task->thread->runtime;
715
716 thread_pools = rt->thread_pools->elts;
717 n = rt->thread_pools->nelts;
718
719 nxt_debug(task, "thread pools: %ui", n);
720
721 for (i = 0; i < n; i++) {
722
723 if (tp == thread_pools[i]) {
724 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
725
726 nxt_free(tp);
727
728 if (n == 1) {
729 /* The last thread pool. */
730 rt->continuation(task, 0);
731 }
732
733 return;
734 }
735 }
736}
737
738
739static nxt_int_t
740nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
741{
742 nxt_int_t ret;
743 nxt_str_t control;
744 nxt_uint_t n;
745 nxt_file_t *file;
746 const char *slash;
747 nxt_sockaddr_t *sa;
748 nxt_file_name_str_t file_name;
749 const nxt_event_interface_t *interface;
750
751 rt->daemon = 1;
752 rt->engine_connections = 256;
753 rt->auxiliary_threads = 2;
754 rt->user_cred.user = NXT_USER;
755 rt->group = NXT_GROUP;
756 rt->pid = NXT_PID;
757 rt->log = NXT_LOG;
758 rt->modules = NXT_MODULES;
759 rt->state = NXT_STATE;
760 rt->control = NXT_CONTROL_SOCK;
761 rt->tmp = NXT_TMP;
762
763 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
764
765 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
766 return NXT_ERROR;
767 }
768
769 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
770 return NXT_ERROR;
771 }
772
773 if (rt->capabilities.setid) {
774 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
775 rt->group);
776
777 if (nxt_slow_path(ret != NXT_OK)) {
778 return NXT_ERROR;
779 }
780
781 } else {
782 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
783 "cannot use arbitrary user and group.");
784 }
785
786 /* An engine's parameters. */
787
788 interface = nxt_service_get(rt->services, "engine", rt->engine);
789 if (interface == NULL) {
790 return NXT_ERROR;
791 }
792
793 rt->engine = interface->name;
794
795 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
796 if (nxt_slow_path(ret != NXT_OK)) {
797 return NXT_ERROR;
798 }
799
800 rt->pid_file = file_name.start;
801
802 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
803 if (nxt_slow_path(ret != NXT_OK)) {
804 return NXT_ERROR;
805 }
806
807 file = nxt_list_first(rt->log_files);
808 file->name = file_name.start;
809
810 slash = "";
811 n = nxt_strlen(rt->modules);
812
813 if (n > 1 && rt->modules[n - 1] != '/') {
814 slash = "/";
815 }
816
817 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
818 rt->modules, slash);
819 if (nxt_slow_path(ret != NXT_OK)) {
820 return NXT_ERROR;
821 }
822
823 rt->modules = (char *) file_name.start;
824
825 slash = "";
826 n = nxt_strlen(rt->state);
827
828 if (n > 1 && rt->state[n - 1] != '/') {
829 slash = "/";
830 }
831
832 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
833 rt->state, slash);
834 if (nxt_slow_path(ret != NXT_OK)) {
835 return NXT_ERROR;
836 }
837
838 rt->conf = (char *) file_name.start;
839
840 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
841 if (nxt_slow_path(ret != NXT_OK)) {
842 return NXT_ERROR;
843 }
844
845 rt->conf_tmp = (char *) file_name.start;
846
847 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
848 rt->state, slash);
849 if (nxt_slow_path(ret != NXT_OK)) {
850 return NXT_ERROR;
851 }
852
853 ret = mkdir((char *) file_name.start, S_IRWXU);
854
855 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
856 rt->certs.length = file_name.len;
857 rt->certs.start = file_name.start;
858
859 } else {
860 nxt_alert(task, "Unable to create certificates storage directory: "
861 "mkdir(%s) failed %E", file_name.start, nxt_errno);
862 }
863
864 control.length = nxt_strlen(rt->control);
865 control.start = (u_char *) rt->control;
866
867 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
868 if (nxt_slow_path(sa == NULL)) {
869 return NXT_ERROR;
870 }
871
872 sa->type = SOCK_STREAM;
873
874 rt->controller_listen = sa;
875
876 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
877 return NXT_ERROR;
878 }
879
880 return NXT_OK;
881}
882
883
884static nxt_int_t
885nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
886{
887 char *p, **argv;
888 u_char *end;
889 u_char buf[1024];
890
891 static const char version[] =
892 "unit version: " NXT_VERSION "\n"
893 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
894
895 static const char no_control[] =
896 "option \"--control\" requires socket address\n";
897 static const char no_user[] = "option \"--user\" requires username\n";
898 static const char no_group[] = "option \"--group\" requires group name\n";
899 static const char no_pid[] = "option \"--pid\" requires filename\n";
900 static const char no_log[] = "option \"--log\" requires filename\n";
901 static const char no_modules[] =
902 "option \"--modules\" requires directory\n";
903 static const char no_state[] = "option \"--state\" requires directory\n";
904 static const char no_tmp[] = "option \"--tmp\" requires directory\n";
905
906 static const char help[] =
907 "\n"
908 "unit options:\n"
909 "\n"
910 " --version print unit version and configure options\n"
911 "\n"
912 " --no-daemon run unit in non-daemon mode\n"
913 "\n"
914 " --control ADDRESS set address of control API socket\n"
915 " default: \"" NXT_CONTROL_SOCK "\"\n"
916 "\n"
917 " --pid FILE set pid filename\n"
918 " default: \"" NXT_PID "\"\n"
919 "\n"
920 " --log FILE set log filename\n"
921 " default: \"" NXT_LOG "\"\n"
922 "\n"
923 " --modules DIRECTORY set modules directory name\n"
924 " default: \"" NXT_MODULES "\"\n"
925 "\n"
926 " --state DIRECTORY set state directory name\n"
927 " default: \"" NXT_STATE "\"\n"
928 "\n"
929 " --tmp DIRECTORY set tmp directory name\n"
930 " default: \"" NXT_TMP "\"\n"
931 "\n"
932 " --user USER set non-privileged processes to run"
933 " as specified user\n"
934 " default: \"" NXT_USER "\"\n"
935 "\n"
936 " --group GROUP set non-privileged processes to run"
937 " as specified group\n"
938 " default: ";
939
940 static const char group[] = "\"" NXT_GROUP "\"\n\n";
941 static const char primary[] = "user's primary group\n\n";
942
943 argv = &nxt_process_argv[1];
944
945 while (*argv != NULL) {
946 p = *argv++;
947
948 if (nxt_strcmp(p, "--control") == 0) {
949 if (*argv == NULL) {
950 write(STDERR_FILENO, no_control, nxt_length(no_control));
951 return NXT_ERROR;
952 }
953
954 p = *argv++;
955
956 rt->control = p;
957
958 continue;
959 }
960
961 if (nxt_strcmp(p, "--user") == 0) {
962 if (*argv == NULL) {
963 write(STDERR_FILENO, no_user, nxt_length(no_user));
964 return NXT_ERROR;
965 }
966
967 p = *argv++;
968
969 rt->user_cred.user = p;
970
971 continue;
972 }
973
974 if (nxt_strcmp(p, "--group") == 0) {
975 if (*argv == NULL) {
976 write(STDERR_FILENO, no_group, nxt_length(no_group));
977 return NXT_ERROR;
978 }
979
980 p = *argv++;
981
982 rt->group = p;
983
984 continue;
985 }
986
987 if (nxt_strcmp(p, "--pid") == 0) {
988 if (*argv == NULL) {
989 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
990 return NXT_ERROR;
991 }
992
993 p = *argv++;
994
995 rt->pid = p;
996
997 continue;
998 }
999
1000 if (nxt_strcmp(p, "--log") == 0) {
1001 if (*argv == NULL) {
1002 write(STDERR_FILENO, no_log, nxt_length(no_log));
1003 return NXT_ERROR;
1004 }
1005
1006 p = *argv++;
1007
1008 rt->log = p;
1009
1010 continue;
1011 }
1012
1013 if (nxt_strcmp(p, "--modules") == 0) {
1014 if (*argv == NULL) {
1015 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1016 return NXT_ERROR;
1017 }
1018
1019 p = *argv++;
1020
1021 rt->modules = p;
1022
1023 continue;
1024 }
1025
1026 if (nxt_strcmp(p, "--state") == 0) {
1027 if (*argv == NULL) {
1028 write(STDERR_FILENO, no_state, nxt_length(no_state));
1029 return NXT_ERROR;
1030 }
1031
1032 p = *argv++;
1033
1034 rt->state = p;
1035
1036 continue;
1037 }
1038
1039 if (nxt_strcmp(p, "--tmp") == 0) {
1040 if (*argv == NULL) {
1041 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1042 return NXT_ERROR;
1043 }
1044
1045 p = *argv++;
1046
1047 rt->tmp = p;
1048
1049 continue;
1050 }
1051
1052 if (nxt_strcmp(p, "--no-daemon") == 0) {
1053 rt->daemon = 0;
1054 continue;
1055 }
1056
1057 if (nxt_strcmp(p, "--version") == 0) {
1058 write(STDERR_FILENO, version, nxt_length(version));
1059 exit(0);
1060 }
1061
1062 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1063 write(STDOUT_FILENO, help, nxt_length(help));
1064
1065 if (sizeof(NXT_GROUP) == 1) {
1066 write(STDOUT_FILENO, primary, nxt_length(primary));
1067
1068 } else {
1069 write(STDOUT_FILENO, group, nxt_length(group));
1070 }
1071
1072 exit(0);
1073 }
1074
1075 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1076 "try \"%s -h\" for available options\n",
1077 p, nxt_process_argv[0]);
1078
1079 write(STDERR_FILENO, buf, end - buf);
1080
1081 return NXT_ERROR;
1082 }
1083
1084 return NXT_OK;
1085}
1086
1087
1088nxt_listen_socket_t *
1089nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1090{
1091 nxt_mp_t *mp;
1092 nxt_listen_socket_t *ls;
1093
1094 ls = nxt_array_zero_add(rt->listen_sockets);
1095 if (ls == NULL) {
1096 return NULL;
1097 }
1098
1099 mp = rt->mem_pool;
1100
1101 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1102 sa->length);
1103 if (ls->sockaddr == NULL) {
1104 return NULL;
1105 }
1106
1107 ls->sockaddr->type = sa->type;
1108
1109 nxt_sockaddr_text(ls->sockaddr);
1110
1111 ls->socket = -1;
1112 ls->backlog = NXT_LISTEN_BACKLOG;
1113
1114 return ls;
1115}
1116
1117
1118static nxt_int_t
1119nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1120{
1121 size_t length;
1122 char hostname[NXT_MAXHOSTNAMELEN + 1];
1123
1124 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1125 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1126 return NXT_ERROR;
1127 }
1128
1129 /*
1130 * Linux gethostname(2):
1131 *
1132 * If the null-terminated hostname is too large to fit,
1133 * then the name is truncated, and no error is returned.
1134 *
1135 * For this reason an additional byte is reserved in the buffer.
1136 */
1137 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1138
1139 length = nxt_strlen(hostname);
1140 rt->hostname.length = length;
1141
1142 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1143
1144 if (rt->hostname.start != NULL) {
1145 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1146 return NXT_OK;
1147 }
1148
1149 return NXT_ERROR;
1150}
1151
1152
1153static nxt_int_t
1154nxt_runtime_log_files_init(nxt_runtime_t *rt)
1155{
1156 nxt_file_t *file;
1157 nxt_list_t *log_files;
1158
1159 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1160
1161 if (nxt_fast_path(log_files != NULL)) {
1162 rt->log_files = log_files;
1163
1164 /* Preallocate the main log. This allocation cannot fail. */
1165 file = nxt_list_zero_add(log_files);
1166
1167 file->fd = NXT_FILE_INVALID;
1168 file->log_level = NXT_LOG_ALERT;
1169
1170 return NXT_OK;
1171 }
1172
1173 return NXT_ERROR;
1174}
1175
1176
1177nxt_file_t *
1178nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1179{
1180 nxt_int_t ret;
1181 nxt_file_t *file;
1182 nxt_file_name_str_t file_name;
1183
1184 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1185
1186 if (nxt_slow_path(ret != NXT_OK)) {
1187 return NULL;
1188 }
1189
1190 nxt_list_each(file, rt->log_files) {
1191
1192 /* STUB: hardecoded case sensitive/insensitive. */
1193
1194 if (file->name != NULL
1195 && nxt_file_name_eq(file->name, file_name.start))
1196 {
1197 return file;
1198 }
1199
1200 } nxt_list_loop;
1201
1202 file = nxt_list_zero_add(rt->log_files);
1203
1204 if (nxt_slow_path(file == NULL)) {
1205 return NULL;
1206 }
1207
1208 file->fd = NXT_FILE_INVALID;
1209 file->log_level = NXT_LOG_ALERT;
1210 file->name = file_name.start;
1211
1212 return file;
1213}
1214
1215
1216static nxt_int_t
1217nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1218{
1219 nxt_int_t ret;
1220 nxt_file_t *file;
1221
1222 nxt_list_each(file, rt->log_files) {
1223
1224 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1225 NXT_FILE_OWNER_ACCESS);
1226
1227 if (ret != NXT_OK) {
1228 return NXT_ERROR;
1229 }
1230
1231 } nxt_list_loop;
1232
1233 file = nxt_list_first(rt->log_files);
1234
1235 return nxt_file_stderr(file);
1236}
1237
1238
1239nxt_int_t
1240nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1241{
1242 nxt_int_t ret;
1243 nxt_uint_t c, p, ncurr, nprev;
1244 nxt_listen_socket_t *curr, *prev;
1245
1246 curr = rt->listen_sockets->elts;
1247 ncurr = rt->listen_sockets->nelts;
1248
1249 if (rt->inherited_sockets != NULL) {
1250 prev = rt->inherited_sockets->elts;
1251 nprev = rt->inherited_sockets->nelts;
1252
1253 } else {
1254 prev = NULL;
1255 nprev = 0;
1256 }
1257
1258 for (c = 0; c < ncurr; c++) {
1259
1260 for (p = 0; p < nprev; p++) {
1261
1262 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1263
1264 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1265 if (ret != NXT_OK) {
1266 return NXT_ERROR;
1267 }
1268
1269 goto next;
1270 }
1271 }
1272
1273 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1274 return NXT_ERROR;
1275 }
1276
1277 next:
1278
1279 continue;
1280 }
1281
1282 return NXT_OK;
1283}
1284
1285
1286nxt_int_t
1287nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1288{
1289 nxt_uint_t i, n;
1290 nxt_listen_socket_t *ls;
1291
1292 ls = rt->listen_sockets->elts;
1293 n = rt->listen_sockets->nelts;
1294
1295 for (i = 0; i < n; i++) {
1296 if (ls[i].flags == NXT_NONBLOCK) {
1297 if (nxt_listen_event(task, &ls[i]) == NULL) {
1298 return NXT_ERROR;
1299 }
1300 }
1301 }
1302
1303 return NXT_OK;
1304}
1305
1306
1307nxt_str_t *
1308nxt_current_directory(nxt_mp_t *mp)
1309{
1310 size_t length;
1311 u_char *p;
1312 nxt_str_t *name;
1313 char buf[NXT_MAX_PATH_LEN];
1314
1315 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1316
1317 if (nxt_fast_path(length != 0)) {
1318 name = nxt_str_alloc(mp, length + 1);
1319
1320 if (nxt_fast_path(name != NULL)) {
1321 p = nxt_cpymem(name->start, buf, length);
1322 *p = '/';
1323
1324 return name;
1325 }
1326 }
1327
1328 return NULL;
1329}
1330
1331
1332static nxt_int_t
1333nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1334{
1335 ssize_t length;
1336 nxt_int_t n;
1337 nxt_file_t file;
1338 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")];
1339
1340 nxt_memzero(&file, sizeof(nxt_file_t));
1341
1342 file.name = pid_file;
1343
1344 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1345 NXT_FILE_DEFAULT_ACCESS);
1346
1347 if (n != NXT_OK) {
1348 return NXT_ERROR;
1349 }
1350
1351 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1352
1353 if (nxt_file_write(&file, pid, length, 0) != length) {
1354 return NXT_ERROR;
1355 }
1356
1357 nxt_file_close(task, &file);
1358
1359 return NXT_OK;
1360}
1361
1362
1363nxt_process_t *
1364nxt_runtime_process_new(nxt_runtime_t *rt)
1365{
1366 nxt_process_t *process;
1367
1368 /* TODO: memory failures. */
1369
1370 process = nxt_mp_zalloc(rt->mem_pool,
1371 sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
1372
1373 if (nxt_slow_path(process == NULL)) {
1374 return NULL;
1375 }
1376
1377 nxt_queue_init(&process->ports);
1378
1379 nxt_thread_mutex_create(&process->incoming.mutex);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
25static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
26static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
27 nxt_runtime_t *rt);
28static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
31static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
32static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
33 nxt_runtime_t *rt);
34static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
35 nxt_file_name_t *pid_file);
36static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
37 nxt_runtime_cont_t cont);
38static void nxt_runtime_thread_pool_init(void);
39static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
40 void *data);
41static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
42static void nxt_runtime_process_remove(nxt_runtime_t *rt,
43 nxt_process_t *process);
44static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
45
46
47nxt_int_t
48nxt_runtime_create(nxt_task_t *task)
49{
50 nxt_mp_t *mp;
51 nxt_int_t ret;
52 nxt_array_t *listen_sockets;
53 nxt_runtime_t *rt;
54 nxt_app_lang_module_t *lang;
55
56 mp = nxt_mp_create(1024, 128, 256, 32);
57 if (nxt_slow_path(mp == NULL)) {
58 return NXT_ERROR;
59 }
60
61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62 if (nxt_slow_path(rt == NULL)) {
63 goto fail;
64 }
65
66 task->thread->runtime = rt;
67 rt->mem_pool = mp;
68
69 nxt_thread_mutex_create(&rt->processes_mutex);
70
71 rt->services = nxt_services_init(mp);
72 if (nxt_slow_path(rt->services == NULL)) {
73 goto fail;
74 }
75
76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77 if (nxt_slow_path(rt->languages == NULL)) {
78 goto fail;
79 }
80
81 /* Should not fail. */
82 lang = nxt_array_add(rt->languages);
83 lang->type = NXT_APP_EXTERNAL;
84 lang->version = (u_char *) "";
85 lang->file = NULL;
86 lang->module = &nxt_external_module;
87 lang->mounts = NULL;
88
89 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
90 if (nxt_slow_path(listen_sockets == NULL)) {
91 goto fail;
92 }
93
94 rt->listen_sockets = listen_sockets;
95
96 ret = nxt_runtime_inherited_listen_sockets(task, rt);
97 if (nxt_slow_path(ret != NXT_OK)) {
98 goto fail;
99 }
100
101 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
102 goto fail;
103 }
104
105 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
106 goto fail;
107 }
108
109 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
110 goto fail;
111 }
112
113 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
114 goto fail;
115 }
116
117 rt->start = nxt_runtime_initial_start;
118
119 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
120 goto fail;
121 }
122
123 if (nxt_port_rpc_init() != NXT_OK) {
124 goto fail;
125 }
126
127 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
128 nxt_runtime_start, task, rt, NULL);
129
130 return NXT_OK;
131
132fail:
133
134 nxt_mp_destroy(mp);
135
136 return NXT_ERROR;
137}
138
139
140static nxt_int_t
141nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
142{
143 u_char *v, *p;
144 nxt_int_t type;
145 nxt_array_t *inherited_sockets;
146 nxt_socket_t s;
147 nxt_listen_socket_t *ls;
148
149 v = (u_char *) getenv("NGINX");
150
151 if (v == NULL) {
152 return nxt_runtime_systemd_listen_sockets(task, rt);
153 }
154
155 nxt_alert(task, "using inherited listen sockets: %s", v);
156
157 inherited_sockets = nxt_array_create(rt->mem_pool,
158 1, sizeof(nxt_listen_socket_t));
159 if (inherited_sockets == NULL) {
160 return NXT_ERROR;
161 }
162
163 rt->inherited_sockets = inherited_sockets;
164
165 for (p = v; *p != '\0'; p++) {
166
167 if (*p == ';') {
168 s = nxt_int_parse(v, p - v);
169
170 if (nxt_slow_path(s < 0)) {
171 nxt_alert(task, "invalid socket number \"%s\" "
172 "in NGINX environment variable, "
173 "ignoring the rest of the variable", v);
174 return NXT_ERROR;
175 }
176
177 v = p + 1;
178
179 ls = nxt_array_zero_add(inherited_sockets);
180 if (nxt_slow_path(ls == NULL)) {
181 return NXT_ERROR;
182 }
183
184 ls->socket = s;
185
186 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
187 if (nxt_slow_path(ls->sockaddr == NULL)) {
188 return NXT_ERROR;
189 }
190
191 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
192 if (nxt_slow_path(type == -1)) {
193 return NXT_ERROR;
194 }
195
196 ls->sockaddr->type = (uint16_t) type;
197 }
198 }
199
200 return NXT_OK;
201}
202
203
204static nxt_int_t
205nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
206{
207 u_char *nfd, *pid;
208 nxt_int_t n;
209 nxt_array_t *inherited_sockets;
210 nxt_socket_t s;
211 nxt_listen_socket_t *ls;
212
213 /*
214 * Number of listening sockets passed. The socket
215 * descriptors start from number 3 and are sequential.
216 */
217 nfd = (u_char *) getenv("LISTEN_FDS");
218 if (nfd == NULL) {
219 return NXT_OK;
220 }
221
222 /* The pid of the service process. */
223 pid = (u_char *) getenv("LISTEN_PID");
224 if (pid == NULL) {
225 return NXT_OK;
226 }
227
228 n = nxt_int_parse(nfd, nxt_strlen(nfd));
229 if (n < 0) {
230 return NXT_OK;
231 }
232
233 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
234 return NXT_OK;
235 }
236
237 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
238
239 inherited_sockets = nxt_array_create(rt->mem_pool,
240 n, sizeof(nxt_listen_socket_t));
241 if (inherited_sockets == NULL) {
242 return NXT_ERROR;
243 }
244
245 rt->inherited_sockets = inherited_sockets;
246
247 for (s = 3; s < n; s++) {
248 ls = nxt_array_zero_add(inherited_sockets);
249 if (nxt_slow_path(ls == NULL)) {
250 return NXT_ERROR;
251 }
252
253 ls->socket = s;
254
255 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
256 if (nxt_slow_path(ls->sockaddr == NULL)) {
257 return NXT_ERROR;
258 }
259
260 ls->sockaddr->type = SOCK_STREAM;
261 }
262
263 return NXT_OK;
264}
265
266
267static nxt_int_t
268nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
269{
270 nxt_thread_t *thread;
271 nxt_event_engine_t *engine;
272 const nxt_event_interface_t *interface;
273
274 interface = nxt_service_get(rt->services, "engine", NULL);
275
276 if (nxt_slow_path(interface == NULL)) {
277 /* TODO: log */
278 return NXT_ERROR;
279 }
280
281 engine = nxt_event_engine_create(task, interface,
282 nxt_main_process_signals, 0, 0);
283
284 if (nxt_slow_path(engine == NULL)) {
285 return NXT_ERROR;
286 }
287
288 thread = task->thread;
289 thread->engine = engine;
290#if 0
291 thread->fiber = &engine->fibers->fiber;
292#endif
293
294 engine->id = rt->last_engine_id++;
295 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
296
297 nxt_queue_init(&rt->engines);
298 nxt_queue_insert_tail(&rt->engines, &engine->link);
299
300 return NXT_OK;
301}
302
303
304static nxt_int_t
305nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
306{
307 nxt_int_t ret;
308 nxt_array_t *thread_pools;
309
310 thread_pools = nxt_array_create(rt->mem_pool, 1,
311 sizeof(nxt_thread_pool_t *));
312
313 if (nxt_slow_path(thread_pools == NULL)) {
314 return NXT_ERROR;
315 }
316
317 rt->thread_pools = thread_pools;
318 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
319
320 if (nxt_slow_path(ret != NXT_OK)) {
321 return NXT_ERROR;
322 }
323
324 return NXT_OK;
325}
326
327
328static void
329nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
330{
331 nxt_runtime_t *rt;
332
333 rt = obj;
334
335 nxt_debug(task, "rt conf done");
336
337 task->thread->log->ctx_handler = NULL;
338 task->thread->log->ctx = NULL;
339
340 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
341 goto fail;
342 }
343
344 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
345 goto fail;
346 }
347
348 /*
349 * Thread pools should be destroyed before starting worker
350 * processes, because thread pool semaphores will stick in
351 * locked state in new processes after fork().
352 */
353 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
354
355 return;
356
357fail:
358
359 nxt_runtime_quit(task, 1);
360}
361
362
363static void
364nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
365{
366 nxt_int_t ret;
367 nxt_thread_t *thr;
368 nxt_runtime_t *rt;
369 const nxt_event_interface_t *interface;
370
371 thr = task->thread;
372 rt = thr->runtime;
373
374 if (rt->inherited_sockets == NULL && rt->daemon) {
375
376 if (nxt_process_daemon(task) != NXT_OK) {
377 goto fail;
378 }
379
380 /*
381 * An event engine should be updated after fork()
382 * even if an event facility was not changed because:
383 * 1) inherited kqueue descriptor is invalid,
384 * 2) the signal thread is not inherited.
385 */
386 interface = nxt_service_get(rt->services, "engine", rt->engine);
387 if (interface == NULL) {
388 goto fail;
389 }
390
391 ret = nxt_event_engine_change(task->thread->engine, interface,
392 rt->batch);
393 if (ret != NXT_OK) {
394 goto fail;
395 }
396 }
397
398 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
399 if (ret != NXT_OK) {
400 goto fail;
401 }
402
403 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
404 goto fail;
405 }
406
407 thr->engine->max_connections = rt->engine_connections;
408
409 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
410 return;
411 }
412
413fail:
414
415 nxt_runtime_quit(task, 1);
416}
417
418
419void
420nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
421{
422 nxt_bool_t done;
423 nxt_runtime_t *rt;
424 nxt_event_engine_t *engine;
425
426 rt = task->thread->runtime;
427 rt->status |= status;
428 engine = task->thread->engine;
429
430 nxt_debug(task, "exiting");
431
432 done = 1;
433
434 if (!engine->shutdown) {
435 engine->shutdown = 1;
436
437 if (!nxt_array_is_empty(rt->thread_pools)) {
438 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
439 done = 0;
440 }
441
442 if (rt->type == NXT_PROCESS_MAIN) {
443 nxt_runtime_stop_all_processes(task, rt);
444 done = 0;
445 }
446 }
447
448 nxt_runtime_close_idle_connections(engine);
449
450 if (done) {
451 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
452 task, rt, engine);
453 }
454}
455
456
457static void
458nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
459{
460 nxt_conn_t *c;
461 nxt_queue_t *idle;
462 nxt_queue_link_t *link, *next;
463
464 nxt_debug(&engine->task, "close idle connections");
465
466 idle = &engine->idle_connections;
467
468 for (link = nxt_queue_first(idle);
469 link != nxt_queue_tail(idle);
470 link = next)
471 {
472 next = nxt_queue_next(link);
473 c = nxt_queue_link_data(link, nxt_conn_t, link);
474
475 if (!c->socket.read_ready) {
476 nxt_queue_remove(link);
477 nxt_conn_close(engine, c);
478 }
479 }
480}
481
482
483void
484nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
485{
486 nxt_port_t *port;
487 nxt_process_t *process;
488 nxt_process_init_t *init;
489
490 nxt_runtime_process_each(rt, process) {
491
492 init = nxt_process_init(process);
493
494 if (init->type == NXT_PROCESS_APP) {
495
496 nxt_process_port_each(process, port) {
497
498 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
499 0, 0, NULL);
500
501 } nxt_process_port_loop;
502 }
503
504 } nxt_runtime_process_loop;
505}
506
507
508static void
509nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
510{
511 nxt_port_t *port;
512 nxt_process_t *process;
513
514 nxt_runtime_process_each(rt, process) {
515
516 nxt_process_port_each(process, port) {
517
518 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
519 0, NULL);
520
521 } nxt_process_port_loop;
522
523 } nxt_runtime_process_loop;
524}
525
526
527static void
528nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
529{
530 int status, engine_count;
531 nxt_runtime_t *rt;
532 nxt_process_t *process;
533 nxt_event_engine_t *engine;
534
535 rt = obj;
536 engine = data;
537
538 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
539
540 if (!nxt_array_is_empty(rt->thread_pools)) {
541 return;
542 }
543
544 if (rt->type == NXT_PROCESS_MAIN) {
545 if (rt->pid_file != NULL) {
546 nxt_file_delete(rt->pid_file);
547 }
548
549#if (NXT_HAVE_UNIX_DOMAIN)
550 {
551 nxt_sockaddr_t *sa;
552 nxt_file_name_t *name;
553
554 sa = rt->controller_listen;
555
556 if (sa->u.sockaddr.sa_family == AF_UNIX) {
557 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
558 (void) nxt_file_delete(name);
559 }
560 }
561#endif
562 }
563
564 if (!engine->event.signal_support) {
565 nxt_event_engine_signals_stop(engine);
566 }
567
568 nxt_runtime_process_each(rt, process) {
569
570 nxt_process_close_ports(task, process);
571
572 } nxt_runtime_process_loop;
573
574 status = rt->status;
575
576 engine_count = 0;
577
578 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
579
580 engine_count++;
581
582 } nxt_queue_loop;
583
584 if (engine_count <= 1) {
585 if (rt->port_by_type[rt->type] != NULL) {
586 nxt_port_use(task, rt->port_by_type[rt->type], -1);
587 }
588
589 nxt_thread_mutex_destroy(&rt->processes_mutex);
590
591 nxt_mp_destroy(rt->mem_pool);
592 }
593
594 nxt_debug(task, "exit: %d", status);
595
596 exit(status);
597 nxt_unreachable();
598}
599
600
601static nxt_int_t
602nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
603{
604 nxt_event_engine_t *engine;
605 const nxt_event_interface_t *interface;
606
607 engine = task->thread->engine;
608
609 if (engine->batch == rt->batch
610 && nxt_strcmp(engine->event.name, rt->engine) == 0)
611 {
612 return NXT_OK;
613 }
614
615 interface = nxt_service_get(rt->services, "engine", rt->engine);
616
617 if (interface != NULL) {
618 return nxt_event_engine_change(engine, interface, rt->batch);
619 }
620
621 return NXT_ERROR;
622}
623
624
625void
626nxt_runtime_event_engine_free(nxt_runtime_t *rt)
627{
628 nxt_queue_link_t *link;
629 nxt_event_engine_t *engine;
630
631 link = nxt_queue_first(&rt->engines);
632 nxt_queue_remove(link);
633
634 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
635 nxt_event_engine_free(engine);
636}
637
638
639nxt_int_t
640nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
641 nxt_uint_t max_threads, nxt_nsec_t timeout)
642{
643 nxt_thread_pool_t *thread_pool, **tp;
644
645 tp = nxt_array_add(rt->thread_pools);
646 if (tp == NULL) {
647 return NXT_ERROR;
648 }
649
650 thread_pool = nxt_thread_pool_create(max_threads, timeout,
651 nxt_runtime_thread_pool_init,
652 thr->engine,
653 nxt_runtime_thread_pool_exit);
654
655 if (nxt_fast_path(thread_pool != NULL)) {
656 *tp = thread_pool;
657 }
658
659 return NXT_OK;
660}
661
662
663static void
664nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
665 nxt_runtime_cont_t cont)
666{
667 nxt_uint_t n;
668 nxt_thread_pool_t **tp;
669
670 rt->continuation = cont;
671
672 n = rt->thread_pools->nelts;
673
674 if (n == 0) {
675 cont(task, 0);
676 return;
677 }
678
679 tp = rt->thread_pools->elts;
680
681 do {
682 nxt_thread_pool_destroy(*tp);
683
684 tp++;
685 n--;
686 } while (n != 0);
687}
688
689
690static void
691nxt_runtime_thread_pool_init(void)
692{
693#if (NXT_REGEX)
694 nxt_regex_init(0);
695#endif
696}
697
698
699static void
700nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
701{
702 nxt_uint_t i, n;
703 nxt_runtime_t *rt;
704 nxt_thread_pool_t *tp, **thread_pools;
705 nxt_thread_handle_t handle;
706
707 tp = obj;
708
709 if (data != NULL) {
710 handle = (nxt_thread_handle_t) (uintptr_t) data;
711 nxt_thread_wait(handle);
712 }
713
714 rt = task->thread->runtime;
715
716 thread_pools = rt->thread_pools->elts;
717 n = rt->thread_pools->nelts;
718
719 nxt_debug(task, "thread pools: %ui", n);
720
721 for (i = 0; i < n; i++) {
722
723 if (tp == thread_pools[i]) {
724 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
725
726 nxt_free(tp);
727
728 if (n == 1) {
729 /* The last thread pool. */
730 rt->continuation(task, 0);
731 }
732
733 return;
734 }
735 }
736}
737
738
739static nxt_int_t
740nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
741{
742 nxt_int_t ret;
743 nxt_str_t control;
744 nxt_uint_t n;
745 nxt_file_t *file;
746 const char *slash;
747 nxt_sockaddr_t *sa;
748 nxt_file_name_str_t file_name;
749 const nxt_event_interface_t *interface;
750
751 rt->daemon = 1;
752 rt->engine_connections = 256;
753 rt->auxiliary_threads = 2;
754 rt->user_cred.user = NXT_USER;
755 rt->group = NXT_GROUP;
756 rt->pid = NXT_PID;
757 rt->log = NXT_LOG;
758 rt->modules = NXT_MODULES;
759 rt->state = NXT_STATE;
760 rt->control = NXT_CONTROL_SOCK;
761 rt->tmp = NXT_TMP;
762
763 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
764
765 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
766 return NXT_ERROR;
767 }
768
769 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
770 return NXT_ERROR;
771 }
772
773 if (rt->capabilities.setid) {
774 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
775 rt->group);
776
777 if (nxt_slow_path(ret != NXT_OK)) {
778 return NXT_ERROR;
779 }
780
781 } else {
782 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
783 "cannot use arbitrary user and group.");
784 }
785
786 /* An engine's parameters. */
787
788 interface = nxt_service_get(rt->services, "engine", rt->engine);
789 if (interface == NULL) {
790 return NXT_ERROR;
791 }
792
793 rt->engine = interface->name;
794
795 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
796 if (nxt_slow_path(ret != NXT_OK)) {
797 return NXT_ERROR;
798 }
799
800 rt->pid_file = file_name.start;
801
802 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
803 if (nxt_slow_path(ret != NXT_OK)) {
804 return NXT_ERROR;
805 }
806
807 file = nxt_list_first(rt->log_files);
808 file->name = file_name.start;
809
810 slash = "";
811 n = nxt_strlen(rt->modules);
812
813 if (n > 1 && rt->modules[n - 1] != '/') {
814 slash = "/";
815 }
816
817 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
818 rt->modules, slash);
819 if (nxt_slow_path(ret != NXT_OK)) {
820 return NXT_ERROR;
821 }
822
823 rt->modules = (char *) file_name.start;
824
825 slash = "";
826 n = nxt_strlen(rt->state);
827
828 if (n > 1 && rt->state[n - 1] != '/') {
829 slash = "/";
830 }
831
832 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
833 rt->state, slash);
834 if (nxt_slow_path(ret != NXT_OK)) {
835 return NXT_ERROR;
836 }
837
838 rt->conf = (char *) file_name.start;
839
840 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
841 if (nxt_slow_path(ret != NXT_OK)) {
842 return NXT_ERROR;
843 }
844
845 rt->conf_tmp = (char *) file_name.start;
846
847 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
848 rt->state, slash);
849 if (nxt_slow_path(ret != NXT_OK)) {
850 return NXT_ERROR;
851 }
852
853 ret = mkdir((char *) file_name.start, S_IRWXU);
854
855 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
856 rt->certs.length = file_name.len;
857 rt->certs.start = file_name.start;
858
859 } else {
860 nxt_alert(task, "Unable to create certificates storage directory: "
861 "mkdir(%s) failed %E", file_name.start, nxt_errno);
862 }
863
864 control.length = nxt_strlen(rt->control);
865 control.start = (u_char *) rt->control;
866
867 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
868 if (nxt_slow_path(sa == NULL)) {
869 return NXT_ERROR;
870 }
871
872 sa->type = SOCK_STREAM;
873
874 rt->controller_listen = sa;
875
876 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
877 return NXT_ERROR;
878 }
879
880 return NXT_OK;
881}
882
883
884static nxt_int_t
885nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
886{
887 char *p, **argv;
888 u_char *end;
889 u_char buf[1024];
890
891 static const char version[] =
892 "unit version: " NXT_VERSION "\n"
893 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
894
895 static const char no_control[] =
896 "option \"--control\" requires socket address\n";
897 static const char no_user[] = "option \"--user\" requires username\n";
898 static const char no_group[] = "option \"--group\" requires group name\n";
899 static const char no_pid[] = "option \"--pid\" requires filename\n";
900 static const char no_log[] = "option \"--log\" requires filename\n";
901 static const char no_modules[] =
902 "option \"--modules\" requires directory\n";
903 static const char no_state[] = "option \"--state\" requires directory\n";
904 static const char no_tmp[] = "option \"--tmp\" requires directory\n";
905
906 static const char help[] =
907 "\n"
908 "unit options:\n"
909 "\n"
910 " --version print unit version and configure options\n"
911 "\n"
912 " --no-daemon run unit in non-daemon mode\n"
913 "\n"
914 " --control ADDRESS set address of control API socket\n"
915 " default: \"" NXT_CONTROL_SOCK "\"\n"
916 "\n"
917 " --pid FILE set pid filename\n"
918 " default: \"" NXT_PID "\"\n"
919 "\n"
920 " --log FILE set log filename\n"
921 " default: \"" NXT_LOG "\"\n"
922 "\n"
923 " --modules DIRECTORY set modules directory name\n"
924 " default: \"" NXT_MODULES "\"\n"
925 "\n"
926 " --state DIRECTORY set state directory name\n"
927 " default: \"" NXT_STATE "\"\n"
928 "\n"
929 " --tmp DIRECTORY set tmp directory name\n"
930 " default: \"" NXT_TMP "\"\n"
931 "\n"
932 " --user USER set non-privileged processes to run"
933 " as specified user\n"
934 " default: \"" NXT_USER "\"\n"
935 "\n"
936 " --group GROUP set non-privileged processes to run"
937 " as specified group\n"
938 " default: ";
939
940 static const char group[] = "\"" NXT_GROUP "\"\n\n";
941 static const char primary[] = "user's primary group\n\n";
942
943 argv = &nxt_process_argv[1];
944
945 while (*argv != NULL) {
946 p = *argv++;
947
948 if (nxt_strcmp(p, "--control") == 0) {
949 if (*argv == NULL) {
950 write(STDERR_FILENO, no_control, nxt_length(no_control));
951 return NXT_ERROR;
952 }
953
954 p = *argv++;
955
956 rt->control = p;
957
958 continue;
959 }
960
961 if (nxt_strcmp(p, "--user") == 0) {
962 if (*argv == NULL) {
963 write(STDERR_FILENO, no_user, nxt_length(no_user));
964 return NXT_ERROR;
965 }
966
967 p = *argv++;
968
969 rt->user_cred.user = p;
970
971 continue;
972 }
973
974 if (nxt_strcmp(p, "--group") == 0) {
975 if (*argv == NULL) {
976 write(STDERR_FILENO, no_group, nxt_length(no_group));
977 return NXT_ERROR;
978 }
979
980 p = *argv++;
981
982 rt->group = p;
983
984 continue;
985 }
986
987 if (nxt_strcmp(p, "--pid") == 0) {
988 if (*argv == NULL) {
989 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
990 return NXT_ERROR;
991 }
992
993 p = *argv++;
994
995 rt->pid = p;
996
997 continue;
998 }
999
1000 if (nxt_strcmp(p, "--log") == 0) {
1001 if (*argv == NULL) {
1002 write(STDERR_FILENO, no_log, nxt_length(no_log));
1003 return NXT_ERROR;
1004 }
1005
1006 p = *argv++;
1007
1008 rt->log = p;
1009
1010 continue;
1011 }
1012
1013 if (nxt_strcmp(p, "--modules") == 0) {
1014 if (*argv == NULL) {
1015 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1016 return NXT_ERROR;
1017 }
1018
1019 p = *argv++;
1020
1021 rt->modules = p;
1022
1023 continue;
1024 }
1025
1026 if (nxt_strcmp(p, "--state") == 0) {
1027 if (*argv == NULL) {
1028 write(STDERR_FILENO, no_state, nxt_length(no_state));
1029 return NXT_ERROR;
1030 }
1031
1032 p = *argv++;
1033
1034 rt->state = p;
1035
1036 continue;
1037 }
1038
1039 if (nxt_strcmp(p, "--tmp") == 0) {
1040 if (*argv == NULL) {
1041 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1042 return NXT_ERROR;
1043 }
1044
1045 p = *argv++;
1046
1047 rt->tmp = p;
1048
1049 continue;
1050 }
1051
1052 if (nxt_strcmp(p, "--no-daemon") == 0) {
1053 rt->daemon = 0;
1054 continue;
1055 }
1056
1057 if (nxt_strcmp(p, "--version") == 0) {
1058 write(STDERR_FILENO, version, nxt_length(version));
1059 exit(0);
1060 }
1061
1062 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1063 write(STDOUT_FILENO, help, nxt_length(help));
1064
1065 if (sizeof(NXT_GROUP) == 1) {
1066 write(STDOUT_FILENO, primary, nxt_length(primary));
1067
1068 } else {
1069 write(STDOUT_FILENO, group, nxt_length(group));
1070 }
1071
1072 exit(0);
1073 }
1074
1075 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1076 "try \"%s -h\" for available options\n",
1077 p, nxt_process_argv[0]);
1078
1079 write(STDERR_FILENO, buf, end - buf);
1080
1081 return NXT_ERROR;
1082 }
1083
1084 return NXT_OK;
1085}
1086
1087
1088nxt_listen_socket_t *
1089nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1090{
1091 nxt_mp_t *mp;
1092 nxt_listen_socket_t *ls;
1093
1094 ls = nxt_array_zero_add(rt->listen_sockets);
1095 if (ls == NULL) {
1096 return NULL;
1097 }
1098
1099 mp = rt->mem_pool;
1100
1101 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1102 sa->length);
1103 if (ls->sockaddr == NULL) {
1104 return NULL;
1105 }
1106
1107 ls->sockaddr->type = sa->type;
1108
1109 nxt_sockaddr_text(ls->sockaddr);
1110
1111 ls->socket = -1;
1112 ls->backlog = NXT_LISTEN_BACKLOG;
1113
1114 return ls;
1115}
1116
1117
1118static nxt_int_t
1119nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1120{
1121 size_t length;
1122 char hostname[NXT_MAXHOSTNAMELEN + 1];
1123
1124 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1125 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1126 return NXT_ERROR;
1127 }
1128
1129 /*
1130 * Linux gethostname(2):
1131 *
1132 * If the null-terminated hostname is too large to fit,
1133 * then the name is truncated, and no error is returned.
1134 *
1135 * For this reason an additional byte is reserved in the buffer.
1136 */
1137 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1138
1139 length = nxt_strlen(hostname);
1140 rt->hostname.length = length;
1141
1142 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1143
1144 if (rt->hostname.start != NULL) {
1145 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1146 return NXT_OK;
1147 }
1148
1149 return NXT_ERROR;
1150}
1151
1152
1153static nxt_int_t
1154nxt_runtime_log_files_init(nxt_runtime_t *rt)
1155{
1156 nxt_file_t *file;
1157 nxt_list_t *log_files;
1158
1159 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1160
1161 if (nxt_fast_path(log_files != NULL)) {
1162 rt->log_files = log_files;
1163
1164 /* Preallocate the main log. This allocation cannot fail. */
1165 file = nxt_list_zero_add(log_files);
1166
1167 file->fd = NXT_FILE_INVALID;
1168 file->log_level = NXT_LOG_ALERT;
1169
1170 return NXT_OK;
1171 }
1172
1173 return NXT_ERROR;
1174}
1175
1176
1177nxt_file_t *
1178nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1179{
1180 nxt_int_t ret;
1181 nxt_file_t *file;
1182 nxt_file_name_str_t file_name;
1183
1184 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1185
1186 if (nxt_slow_path(ret != NXT_OK)) {
1187 return NULL;
1188 }
1189
1190 nxt_list_each(file, rt->log_files) {
1191
1192 /* STUB: hardecoded case sensitive/insensitive. */
1193
1194 if (file->name != NULL
1195 && nxt_file_name_eq(file->name, file_name.start))
1196 {
1197 return file;
1198 }
1199
1200 } nxt_list_loop;
1201
1202 file = nxt_list_zero_add(rt->log_files);
1203
1204 if (nxt_slow_path(file == NULL)) {
1205 return NULL;
1206 }
1207
1208 file->fd = NXT_FILE_INVALID;
1209 file->log_level = NXT_LOG_ALERT;
1210 file->name = file_name.start;
1211
1212 return file;
1213}
1214
1215
1216static nxt_int_t
1217nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1218{
1219 nxt_int_t ret;
1220 nxt_file_t *file;
1221
1222 nxt_list_each(file, rt->log_files) {
1223
1224 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1225 NXT_FILE_OWNER_ACCESS);
1226
1227 if (ret != NXT_OK) {
1228 return NXT_ERROR;
1229 }
1230
1231 } nxt_list_loop;
1232
1233 file = nxt_list_first(rt->log_files);
1234
1235 return nxt_file_stderr(file);
1236}
1237
1238
1239nxt_int_t
1240nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1241{
1242 nxt_int_t ret;
1243 nxt_uint_t c, p, ncurr, nprev;
1244 nxt_listen_socket_t *curr, *prev;
1245
1246 curr = rt->listen_sockets->elts;
1247 ncurr = rt->listen_sockets->nelts;
1248
1249 if (rt->inherited_sockets != NULL) {
1250 prev = rt->inherited_sockets->elts;
1251 nprev = rt->inherited_sockets->nelts;
1252
1253 } else {
1254 prev = NULL;
1255 nprev = 0;
1256 }
1257
1258 for (c = 0; c < ncurr; c++) {
1259
1260 for (p = 0; p < nprev; p++) {
1261
1262 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1263
1264 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1265 if (ret != NXT_OK) {
1266 return NXT_ERROR;
1267 }
1268
1269 goto next;
1270 }
1271 }
1272
1273 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1274 return NXT_ERROR;
1275 }
1276
1277 next:
1278
1279 continue;
1280 }
1281
1282 return NXT_OK;
1283}
1284
1285
1286nxt_int_t
1287nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1288{
1289 nxt_uint_t i, n;
1290 nxt_listen_socket_t *ls;
1291
1292 ls = rt->listen_sockets->elts;
1293 n = rt->listen_sockets->nelts;
1294
1295 for (i = 0; i < n; i++) {
1296 if (ls[i].flags == NXT_NONBLOCK) {
1297 if (nxt_listen_event(task, &ls[i]) == NULL) {
1298 return NXT_ERROR;
1299 }
1300 }
1301 }
1302
1303 return NXT_OK;
1304}
1305
1306
1307nxt_str_t *
1308nxt_current_directory(nxt_mp_t *mp)
1309{
1310 size_t length;
1311 u_char *p;
1312 nxt_str_t *name;
1313 char buf[NXT_MAX_PATH_LEN];
1314
1315 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1316
1317 if (nxt_fast_path(length != 0)) {
1318 name = nxt_str_alloc(mp, length + 1);
1319
1320 if (nxt_fast_path(name != NULL)) {
1321 p = nxt_cpymem(name->start, buf, length);
1322 *p = '/';
1323
1324 return name;
1325 }
1326 }
1327
1328 return NULL;
1329}
1330
1331
1332static nxt_int_t
1333nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1334{
1335 ssize_t length;
1336 nxt_int_t n;
1337 nxt_file_t file;
1338 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")];
1339
1340 nxt_memzero(&file, sizeof(nxt_file_t));
1341
1342 file.name = pid_file;
1343
1344 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1345 NXT_FILE_DEFAULT_ACCESS);
1346
1347 if (n != NXT_OK) {
1348 return NXT_ERROR;
1349 }
1350
1351 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1352
1353 if (nxt_file_write(&file, pid, length, 0) != length) {
1354 return NXT_ERROR;
1355 }
1356
1357 nxt_file_close(task, &file);
1358
1359 return NXT_OK;
1360}
1361
1362
1363nxt_process_t *
1364nxt_runtime_process_new(nxt_runtime_t *rt)
1365{
1366 nxt_process_t *process;
1367
1368 /* TODO: memory failures. */
1369
1370 process = nxt_mp_zalloc(rt->mem_pool,
1371 sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
1372
1373 if (nxt_slow_path(process == NULL)) {
1374 return NULL;
1375 }
1376
1377 nxt_queue_init(&process->ports);
1378
1379 nxt_thread_mutex_create(&process->incoming.mutex);
1380 nxt_thread_mutex_create(&process->outgoing.mutex);
1381 nxt_thread_mutex_create(&process->cp_mutex);
1382
1383 process->use_count = 1;
1384
1385 return process;
1386}
1387
1388
1389void
1390nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1391{
1392 if (process->registered == 1) {
1393 nxt_runtime_process_remove(rt, process);
1394 }
1395
1396 nxt_assert(process->use_count == 0);
1397 nxt_assert(process->registered == 0);
1398
1399 nxt_port_mmaps_destroy(&process->incoming, 1);
1380 nxt_thread_mutex_create(&process->cp_mutex);
1381
1382 process->use_count = 1;
1383
1384 return process;
1385}
1386
1387
1388void
1389nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1390{
1391 if (process->registered == 1) {
1392 nxt_runtime_process_remove(rt, process);
1393 }
1394
1395 nxt_assert(process->use_count == 0);
1396 nxt_assert(process->registered == 0);
1397
1398 nxt_port_mmaps_destroy(&process->incoming, 1);
1400 nxt_port_mmaps_destroy(&process->outgoing, 1);
1401
1402 nxt_thread_mutex_destroy(&process->incoming.mutex);
1399
1400 nxt_thread_mutex_destroy(&process->incoming.mutex);
1403 nxt_thread_mutex_destroy(&process->outgoing.mutex);
1404 nxt_thread_mutex_destroy(&process->cp_mutex);
1405
1406 /* processes from nxt_runtime_process_get() have no memory pool */
1407 if (process->mem_pool != NULL) {
1408 nxt_mp_destroy(process->mem_pool);
1409 }
1410
1411 nxt_mp_free(rt->mem_pool, process);
1412}
1413
1414
1415static nxt_int_t
1416nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1417{
1418 nxt_process_t *process;
1419
1420 process = data;
1421
1422 if (lhq->key.length == sizeof(nxt_pid_t)
1423 && *(nxt_pid_t *) lhq->key.start == process->pid)
1424 {
1425 return NXT_OK;
1426 }
1427
1428 return NXT_DECLINED;
1429}
1430
1431static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1432 NXT_LVLHSH_DEFAULT,
1433 nxt_runtime_lvlhsh_pid_test,
1434 nxt_lvlhsh_alloc,
1435 nxt_lvlhsh_free,
1436};
1437
1438
1439nxt_inline void
1440nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1441{
1442 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1443 lhq->key.length = sizeof(*pid);
1444 lhq->key.start = (u_char *) pid;
1445 lhq->proto = &lvlhsh_processes_proto;
1446}
1447
1448
1449nxt_process_t *
1450nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1451{
1452 nxt_process_t *process;
1453 nxt_lvlhsh_query_t lhq;
1454
1455 process = NULL;
1456
1457 nxt_runtime_process_lhq_pid(&lhq, &pid);
1458
1459 nxt_thread_mutex_lock(&rt->processes_mutex);
1460
1461 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1462 process = lhq.value;
1463
1464 } else {
1465 nxt_thread_log_debug("process %PI not found", pid);
1466 }
1467
1468 nxt_thread_mutex_unlock(&rt->processes_mutex);
1469
1470 return process;
1471}
1472
1473
1474static nxt_process_t *
1475nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1476{
1477 nxt_process_t *process;
1478 nxt_lvlhsh_query_t lhq;
1479
1480 nxt_runtime_process_lhq_pid(&lhq, &pid);
1481
1482 nxt_thread_mutex_lock(&rt->processes_mutex);
1483
1484 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1485 nxt_thread_log_debug("process %PI found", pid);
1486
1487 nxt_thread_mutex_unlock(&rt->processes_mutex);
1488
1489 process = lhq.value;
1490 process->use_count++;
1491
1492 return process;
1493 }
1494
1495 process = nxt_runtime_process_new(rt);
1496 if (nxt_slow_path(process == NULL)) {
1497
1498 nxt_thread_mutex_unlock(&rt->processes_mutex);
1499
1500 return NULL;
1501 }
1502
1503 process->pid = pid;
1504
1505 lhq.replace = 0;
1506 lhq.value = process;
1507 lhq.pool = rt->mem_pool;
1508
1509 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1510
1511 case NXT_OK:
1512 if (rt->nprocesses == 0) {
1513 rt->mprocess = process;
1514 }
1515
1516 rt->nprocesses++;
1517
1518 process->registered = 1;
1519
1520 nxt_thread_log_debug("process %PI insert", pid);
1521 break;
1522
1523 default:
1524 nxt_thread_log_debug("process %PI insert failed", pid);
1525 break;
1526 }
1527
1528 nxt_thread_mutex_unlock(&rt->processes_mutex);
1529
1530 return process;
1531}
1532
1533
1534void
1535nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1536{
1537 nxt_port_t *port;
1538 nxt_runtime_t *rt;
1539 nxt_lvlhsh_query_t lhq;
1540
1541 nxt_assert(process->registered == 0);
1542
1543 rt = task->thread->runtime;
1544
1545 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1546
1547 lhq.replace = 0;
1548 lhq.value = process;
1549 lhq.pool = rt->mem_pool;
1550
1551 nxt_thread_mutex_lock(&rt->processes_mutex);
1552
1553 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1554
1555 case NXT_OK:
1556 if (rt->nprocesses == 0) {
1557 rt->mprocess = process;
1558 }
1559
1560 rt->nprocesses++;
1561
1562 nxt_process_port_each(process, port) {
1563
1564 port->pid = process->pid;
1565
1566 nxt_runtime_port_add(task, port);
1567
1568 } nxt_process_port_loop;
1569
1570 process->registered = 1;
1571
1572 nxt_thread_log_debug("process %PI added", process->pid);
1573 break;
1574
1575 default:
1576 nxt_thread_log_debug("process %PI failed to add", process->pid);
1577 break;
1578 }
1579
1580 nxt_thread_mutex_unlock(&rt->processes_mutex);
1581}
1582
1583
1584static void
1585nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1586{
1587 nxt_pid_t pid;
1588 nxt_lvlhsh_query_t lhq;
1589
1590 pid = process->pid;
1591
1592 nxt_runtime_process_lhq_pid(&lhq, &pid);
1593
1594 lhq.pool = rt->mem_pool;
1595
1596 nxt_thread_mutex_lock(&rt->processes_mutex);
1597
1598 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1599
1600 case NXT_OK:
1601 rt->nprocesses--;
1602
1603 process = lhq.value;
1604
1605 process->registered = 0;
1606
1607 nxt_thread_log_debug("process %PI removed", pid);
1608 break;
1609
1610 default:
1611 nxt_thread_log_debug("process %PI remove failed", pid);
1612 break;
1613 }
1614
1615 nxt_thread_mutex_unlock(&rt->processes_mutex);
1616}
1617
1618
1619nxt_process_t *
1620nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1621{
1622 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1623
1624 return nxt_runtime_process_next(rt, lhe);
1625}
1626
1627
1628nxt_port_t *
1629nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1630 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1631{
1632 nxt_port_t *port;
1633 nxt_process_t *process;
1634
1635 process = nxt_runtime_process_get(rt, pid);
1636 if (nxt_slow_path(process == NULL)) {
1637 return NULL;
1638 }
1639
1640 port = nxt_port_new(task, id, pid, type);
1641 if (nxt_slow_path(port == NULL)) {
1642 nxt_process_use(task, process, -1);
1643 return NULL;
1644 }
1645
1646 nxt_process_port_add(task, process, port);
1647
1648 nxt_process_use(task, process, -1);
1649
1650 nxt_runtime_port_add(task, port);
1651
1652 nxt_port_use(task, port, -1);
1653
1654 return port;
1655}
1656
1657
1658static void
1659nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1660{
1661 nxt_int_t res;
1662 nxt_runtime_t *rt;
1663
1664 rt = task->thread->runtime;
1665
1666 res = nxt_port_hash_add(&rt->ports, port);
1667
1668 if (res != NXT_OK) {
1669 return;
1670 }
1671
1672 rt->port_by_type[port->type] = port;
1673
1674 nxt_port_use(task, port, 1);
1675}
1676
1677
1678void
1679nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1680{
1681 nxt_int_t res;
1682 nxt_runtime_t *rt;
1683
1684 rt = task->thread->runtime;
1685
1686 res = nxt_port_hash_remove(&rt->ports, port);
1687
1688 if (res != NXT_OK) {
1689 return;
1690 }
1691
1692 if (rt->port_by_type[port->type] == port) {
1693 rt->port_by_type[port->type] = NULL;
1694 }
1695
1696 nxt_port_use(task, port, -1);
1697}
1698
1699
1700nxt_port_t *
1701nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1702 nxt_port_id_t port_id)
1703{
1704 return nxt_port_hash_find(&rt->ports, pid, port_id);
1705}
1401 nxt_thread_mutex_destroy(&process->cp_mutex);
1402
1403 /* processes from nxt_runtime_process_get() have no memory pool */
1404 if (process->mem_pool != NULL) {
1405 nxt_mp_destroy(process->mem_pool);
1406 }
1407
1408 nxt_mp_free(rt->mem_pool, process);
1409}
1410
1411
1412static nxt_int_t
1413nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1414{
1415 nxt_process_t *process;
1416
1417 process = data;
1418
1419 if (lhq->key.length == sizeof(nxt_pid_t)
1420 && *(nxt_pid_t *) lhq->key.start == process->pid)
1421 {
1422 return NXT_OK;
1423 }
1424
1425 return NXT_DECLINED;
1426}
1427
1428static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1429 NXT_LVLHSH_DEFAULT,
1430 nxt_runtime_lvlhsh_pid_test,
1431 nxt_lvlhsh_alloc,
1432 nxt_lvlhsh_free,
1433};
1434
1435
1436nxt_inline void
1437nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1438{
1439 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1440 lhq->key.length = sizeof(*pid);
1441 lhq->key.start = (u_char *) pid;
1442 lhq->proto = &lvlhsh_processes_proto;
1443}
1444
1445
1446nxt_process_t *
1447nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1448{
1449 nxt_process_t *process;
1450 nxt_lvlhsh_query_t lhq;
1451
1452 process = NULL;
1453
1454 nxt_runtime_process_lhq_pid(&lhq, &pid);
1455
1456 nxt_thread_mutex_lock(&rt->processes_mutex);
1457
1458 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1459 process = lhq.value;
1460
1461 } else {
1462 nxt_thread_log_debug("process %PI not found", pid);
1463 }
1464
1465 nxt_thread_mutex_unlock(&rt->processes_mutex);
1466
1467 return process;
1468}
1469
1470
1471static nxt_process_t *
1472nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1473{
1474 nxt_process_t *process;
1475 nxt_lvlhsh_query_t lhq;
1476
1477 nxt_runtime_process_lhq_pid(&lhq, &pid);
1478
1479 nxt_thread_mutex_lock(&rt->processes_mutex);
1480
1481 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1482 nxt_thread_log_debug("process %PI found", pid);
1483
1484 nxt_thread_mutex_unlock(&rt->processes_mutex);
1485
1486 process = lhq.value;
1487 process->use_count++;
1488
1489 return process;
1490 }
1491
1492 process = nxt_runtime_process_new(rt);
1493 if (nxt_slow_path(process == NULL)) {
1494
1495 nxt_thread_mutex_unlock(&rt->processes_mutex);
1496
1497 return NULL;
1498 }
1499
1500 process->pid = pid;
1501
1502 lhq.replace = 0;
1503 lhq.value = process;
1504 lhq.pool = rt->mem_pool;
1505
1506 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1507
1508 case NXT_OK:
1509 if (rt->nprocesses == 0) {
1510 rt->mprocess = process;
1511 }
1512
1513 rt->nprocesses++;
1514
1515 process->registered = 1;
1516
1517 nxt_thread_log_debug("process %PI insert", pid);
1518 break;
1519
1520 default:
1521 nxt_thread_log_debug("process %PI insert failed", pid);
1522 break;
1523 }
1524
1525 nxt_thread_mutex_unlock(&rt->processes_mutex);
1526
1527 return process;
1528}
1529
1530
1531void
1532nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1533{
1534 nxt_port_t *port;
1535 nxt_runtime_t *rt;
1536 nxt_lvlhsh_query_t lhq;
1537
1538 nxt_assert(process->registered == 0);
1539
1540 rt = task->thread->runtime;
1541
1542 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1543
1544 lhq.replace = 0;
1545 lhq.value = process;
1546 lhq.pool = rt->mem_pool;
1547
1548 nxt_thread_mutex_lock(&rt->processes_mutex);
1549
1550 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1551
1552 case NXT_OK:
1553 if (rt->nprocesses == 0) {
1554 rt->mprocess = process;
1555 }
1556
1557 rt->nprocesses++;
1558
1559 nxt_process_port_each(process, port) {
1560
1561 port->pid = process->pid;
1562
1563 nxt_runtime_port_add(task, port);
1564
1565 } nxt_process_port_loop;
1566
1567 process->registered = 1;
1568
1569 nxt_thread_log_debug("process %PI added", process->pid);
1570 break;
1571
1572 default:
1573 nxt_thread_log_debug("process %PI failed to add", process->pid);
1574 break;
1575 }
1576
1577 nxt_thread_mutex_unlock(&rt->processes_mutex);
1578}
1579
1580
1581static void
1582nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1583{
1584 nxt_pid_t pid;
1585 nxt_lvlhsh_query_t lhq;
1586
1587 pid = process->pid;
1588
1589 nxt_runtime_process_lhq_pid(&lhq, &pid);
1590
1591 lhq.pool = rt->mem_pool;
1592
1593 nxt_thread_mutex_lock(&rt->processes_mutex);
1594
1595 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1596
1597 case NXT_OK:
1598 rt->nprocesses--;
1599
1600 process = lhq.value;
1601
1602 process->registered = 0;
1603
1604 nxt_thread_log_debug("process %PI removed", pid);
1605 break;
1606
1607 default:
1608 nxt_thread_log_debug("process %PI remove failed", pid);
1609 break;
1610 }
1611
1612 nxt_thread_mutex_unlock(&rt->processes_mutex);
1613}
1614
1615
1616nxt_process_t *
1617nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1618{
1619 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1620
1621 return nxt_runtime_process_next(rt, lhe);
1622}
1623
1624
1625nxt_port_t *
1626nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1627 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1628{
1629 nxt_port_t *port;
1630 nxt_process_t *process;
1631
1632 process = nxt_runtime_process_get(rt, pid);
1633 if (nxt_slow_path(process == NULL)) {
1634 return NULL;
1635 }
1636
1637 port = nxt_port_new(task, id, pid, type);
1638 if (nxt_slow_path(port == NULL)) {
1639 nxt_process_use(task, process, -1);
1640 return NULL;
1641 }
1642
1643 nxt_process_port_add(task, process, port);
1644
1645 nxt_process_use(task, process, -1);
1646
1647 nxt_runtime_port_add(task, port);
1648
1649 nxt_port_use(task, port, -1);
1650
1651 return port;
1652}
1653
1654
1655static void
1656nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1657{
1658 nxt_int_t res;
1659 nxt_runtime_t *rt;
1660
1661 rt = task->thread->runtime;
1662
1663 res = nxt_port_hash_add(&rt->ports, port);
1664
1665 if (res != NXT_OK) {
1666 return;
1667 }
1668
1669 rt->port_by_type[port->type] = port;
1670
1671 nxt_port_use(task, port, 1);
1672}
1673
1674
1675void
1676nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1677{
1678 nxt_int_t res;
1679 nxt_runtime_t *rt;
1680
1681 rt = task->thread->runtime;
1682
1683 res = nxt_port_hash_remove(&rt->ports, port);
1684
1685 if (res != NXT_OK) {
1686 return;
1687 }
1688
1689 if (rt->port_by_type[port->type] == port) {
1690 rt->port_by_type[port->type] = NULL;
1691 }
1692
1693 nxt_port_use(task, port, -1);
1694}
1695
1696
1697nxt_port_t *
1698nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1699 nxt_port_id_t port_id)
1700{
1701 return nxt_port_hash_find(&rt->ports, pid, port_id);
1702}