nxt_runtime.c (697:b73f6d3709c2) nxt_runtime.c (703:2d536dde84d2)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
25static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
26 nxt_runtime_t *rt);
27static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
28static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
31static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
32 nxt_runtime_t *rt);
33static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
34 nxt_file_name_t *pid_file);
35static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
36 nxt_runtime_cont_t cont);
37static void nxt_runtime_thread_pool_init(void);
38static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
39 void *data);
40static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
41 nxt_process_t *process);
42static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
43 nxt_pid_t pid);
44
45
46nxt_int_t
47nxt_runtime_create(nxt_task_t *task)
48{
49 nxt_mp_t *mp;
50 nxt_int_t ret;
51 nxt_array_t *listen_sockets;
52 nxt_runtime_t *rt;
53 nxt_app_lang_module_t *lang;
54
55 mp = nxt_mp_create(1024, 128, 256, 32);
56
57 if (nxt_slow_path(mp == NULL)) {
58 return NXT_ERROR;
59 }
60
61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62 if (nxt_slow_path(rt == NULL)) {
63 return NXT_ERROR;
64 }
65
66 task->thread->runtime = rt;
67 rt->mem_pool = mp;
68
69 nxt_thread_mutex_create(&rt->processes_mutex);
70
71 rt->services = nxt_services_init(mp);
72 if (nxt_slow_path(rt->services == NULL)) {
73 goto fail;
74 }
75
76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77 if (nxt_slow_path(rt->languages == NULL)) {
78 goto fail;
79 }
80
81 /* Should not fail. */
82 lang = nxt_array_add(rt->languages);
83 lang->type = NXT_APP_GO;
84 lang->version = (u_char *) "";
85 lang->file = NULL;
86 lang->module = &nxt_go_module;
87
88 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
89 if (nxt_slow_path(listen_sockets == NULL)) {
90 goto fail;
91 }
92
93 rt->listen_sockets = listen_sockets;
94
95 ret = nxt_runtime_inherited_listen_sockets(task, rt);
96 if (nxt_slow_path(ret != NXT_OK)) {
97 goto fail;
98 }
99
100 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
101 goto fail;
102 }
103
104 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
105 goto fail;
106 }
107
108 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
109 goto fail;
110 }
111
112 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
113 goto fail;
114 }
115
116 rt->start = nxt_runtime_initial_start;
117
118 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
119 goto fail;
120 }
121
122 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
123 nxt_runtime_start, task, rt, NULL);
124
125 return NXT_OK;
126
127fail:
128
129 nxt_mp_destroy(mp);
130
131 return NXT_ERROR;
132}
133
134
135static nxt_int_t
136nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
137{
138 u_char *v, *p;
139 nxt_int_t type;
140 nxt_array_t *inherited_sockets;
141 nxt_socket_t s;
142 nxt_listen_socket_t *ls;
143
144 v = (u_char *) getenv("NGINX");
145
146 if (v == NULL) {
147 return nxt_runtime_systemd_listen_sockets(task, rt);
148 }
149
150 nxt_alert(task, "using inherited listen sockets: %s", v);
151
152 inherited_sockets = nxt_array_create(rt->mem_pool,
153 1, sizeof(nxt_listen_socket_t));
154 if (inherited_sockets == NULL) {
155 return NXT_ERROR;
156 }
157
158 rt->inherited_sockets = inherited_sockets;
159
160 for (p = v; *p != '\0'; p++) {
161
162 if (*p == ';') {
163 s = nxt_int_parse(v, p - v);
164
165 if (nxt_slow_path(s < 0)) {
166 nxt_alert(task, "invalid socket number \"%s\" "
167 "in NGINX environment variable, "
168 "ignoring the rest of the variable", v);
169 return NXT_ERROR;
170 }
171
172 v = p + 1;
173
174 ls = nxt_array_zero_add(inherited_sockets);
175 if (nxt_slow_path(ls == NULL)) {
176 return NXT_ERROR;
177 }
178
179 ls->socket = s;
180
181 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
182 if (nxt_slow_path(ls->sockaddr == NULL)) {
183 return NXT_ERROR;
184 }
185
186 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
187 if (nxt_slow_path(type == -1)) {
188 return NXT_ERROR;
189 }
190
191 ls->sockaddr->type = (uint16_t) type;
192 }
193 }
194
195 return NXT_OK;
196}
197
198
199static nxt_int_t
200nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
201{
202 u_char *nfd, *pid;
203 nxt_int_t n;
204 nxt_array_t *inherited_sockets;
205 nxt_socket_t s;
206 nxt_listen_socket_t *ls;
207
208 /*
209 * Number of listening sockets passed. The socket
210 * descriptors start from number 3 and are sequential.
211 */
212 nfd = (u_char *) getenv("LISTEN_FDS");
213 if (nfd == NULL) {
214 return NXT_OK;
215 }
216
217 /* The pid of the service process. */
218 pid = (u_char *) getenv("LISTEN_PID");
219 if (pid == NULL) {
220 return NXT_OK;
221 }
222
223 n = nxt_int_parse(nfd, nxt_strlen(nfd));
224 if (n < 0) {
225 return NXT_OK;
226 }
227
228 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
229 return NXT_OK;
230 }
231
232 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
233
234 inherited_sockets = nxt_array_create(rt->mem_pool,
235 n, sizeof(nxt_listen_socket_t));
236 if (inherited_sockets == NULL) {
237 return NXT_ERROR;
238 }
239
240 rt->inherited_sockets = inherited_sockets;
241
242 for (s = 3; s < n; s++) {
243 ls = nxt_array_zero_add(inherited_sockets);
244 if (nxt_slow_path(ls == NULL)) {
245 return NXT_ERROR;
246 }
247
248 ls->socket = s;
249
250 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
251 if (nxt_slow_path(ls->sockaddr == NULL)) {
252 return NXT_ERROR;
253 }
254
255 ls->sockaddr->type = SOCK_STREAM;
256 }
257
258 return NXT_OK;
259}
260
261
262static nxt_int_t
263nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
264{
265 nxt_thread_t *thread;
266 nxt_event_engine_t *engine;
267 const nxt_event_interface_t *interface;
268
269 interface = nxt_service_get(rt->services, "engine", NULL);
270
271 if (nxt_slow_path(interface == NULL)) {
272 /* TODO: log */
273 return NXT_ERROR;
274 }
275
276 engine = nxt_event_engine_create(task, interface,
277 nxt_main_process_signals, 0, 0);
278
279 if (nxt_slow_path(engine == NULL)) {
280 return NXT_ERROR;
281 }
282
283 thread = task->thread;
284 thread->engine = engine;
285#if 0
286 thread->fiber = &engine->fibers->fiber;
287#endif
288
289 engine->id = rt->last_engine_id++;
290 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
291
292 nxt_queue_init(&rt->engines);
293 nxt_queue_insert_tail(&rt->engines, &engine->link);
294
295 return NXT_OK;
296}
297
298
299static nxt_int_t
300nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
301{
302 nxt_int_t ret;
303 nxt_array_t *thread_pools;
304
305 thread_pools = nxt_array_create(rt->mem_pool, 1,
306 sizeof(nxt_thread_pool_t *));
307
308 if (nxt_slow_path(thread_pools == NULL)) {
309 return NXT_ERROR;
310 }
311
312 rt->thread_pools = thread_pools;
313 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
314
315 if (nxt_slow_path(ret != NXT_OK)) {
316 return NXT_ERROR;
317 }
318
319 return NXT_OK;
320}
321
322
323static void
324nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
325{
326 nxt_runtime_t *rt;
327
328 rt = obj;
329
330 nxt_debug(task, "rt conf done");
331
332 task->thread->log->ctx_handler = NULL;
333 task->thread->log->ctx = NULL;
334
335 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
336 goto fail;
337 }
338
339 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
340 goto fail;
341 }
342
343 /*
344 * Thread pools should be destroyed before starting worker
345 * processes, because thread pool semaphores will stick in
346 * locked state in new processes after fork().
347 */
348 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
349
350 return;
351
352fail:
353
354 nxt_runtime_quit(task, 1);
355}
356
357
358static void
359nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
360{
361 nxt_int_t ret;
362 nxt_thread_t *thr;
363 nxt_runtime_t *rt;
364 const nxt_event_interface_t *interface;
365
366 thr = task->thread;
367 rt = thr->runtime;
368
369 if (rt->inherited_sockets == NULL && rt->daemon) {
370
371 if (nxt_process_daemon(task) != NXT_OK) {
372 goto fail;
373 }
374
375 /*
376 * An event engine should be updated after fork()
377 * even if an event facility was not changed because:
378 * 1) inherited kqueue descriptor is invalid,
379 * 2) the signal thread is not inherited.
380 */
381 interface = nxt_service_get(rt->services, "engine", rt->engine);
382 if (interface == NULL) {
383 goto fail;
384 }
385
386 ret = nxt_event_engine_change(task->thread->engine, interface,
387 rt->batch);
388 if (ret != NXT_OK) {
389 goto fail;
390 }
391 }
392
393 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
394 if (ret != NXT_OK) {
395 goto fail;
396 }
397
398 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
399 goto fail;
400 }
401
402 thr->engine->max_connections = rt->engine_connections;
403
404 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
405 return;
406 }
407
408fail:
409
410 nxt_runtime_quit(task, 1);
411}
412
413
414void
415nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
416{
417 nxt_bool_t done;
418 nxt_runtime_t *rt;
419 nxt_event_engine_t *engine;
420
421 rt = task->thread->runtime;
422 rt->status |= status;
423 engine = task->thread->engine;
424
425 nxt_debug(task, "exiting");
426
427 done = 1;
428
429 if (!engine->shutdown) {
430 engine->shutdown = 1;
431
432 if (!nxt_array_is_empty(rt->thread_pools)) {
433 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
434 done = 0;
435 }
436
437 if (rt->type == NXT_PROCESS_MAIN) {
438 nxt_main_stop_worker_processes(task, rt);
439 done = 0;
440 }
441 }
442
443 nxt_runtime_close_idle_connections(engine);
444
445 if (done) {
446 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
447 task, rt, engine);
448 }
449}
450
451
452static void
453nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
454{
455 nxt_conn_t *c;
456 nxt_queue_t *idle;
457 nxt_queue_link_t *link, *next;
458
459 nxt_debug(&engine->task, "close idle connections");
460
461 idle = &engine->idle_connections;
462
463 for (link = nxt_queue_head(idle);
464 link != nxt_queue_tail(idle);
465 link = next)
466 {
467 next = nxt_queue_next(link);
468 c = nxt_queue_link_data(link, nxt_conn_t, link);
469
470 if (!c->socket.read_ready) {
471 nxt_queue_remove(link);
472 nxt_conn_close(engine, c);
473 }
474 }
475}
476
477
478static void
479nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
480{
481 int status;
482 nxt_runtime_t *rt;
483 nxt_process_t *process;
484 nxt_event_engine_t *engine;
485
486 rt = obj;
487 engine = data;
488
489 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
490
491 if (!nxt_array_is_empty(rt->thread_pools)) {
492 return;
493 }
494
495 if (rt->type == NXT_PROCESS_MAIN) {
496 if (rt->pid_file != NULL) {
497 nxt_file_delete(rt->pid_file);
498 }
499
500#if (NXT_HAVE_UNIX_DOMAIN)
501 {
502 nxt_sockaddr_t *sa;
503 nxt_file_name_t *name;
504
505 sa = rt->controller_listen;
506
507 if (sa->u.sockaddr.sa_family == AF_UNIX) {
508 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
509 (void) nxt_file_delete(name);
510 }
511 }
512#endif
513 }
514
515 if (!engine->event.signal_support) {
516 nxt_event_engine_signals_stop(engine);
517 }
518
519 nxt_runtime_process_each(rt, process) {
520
521 nxt_process_close_ports(task, process);
522
523 } nxt_runtime_process_loop;
524
525 nxt_thread_mutex_destroy(&rt->processes_mutex);
526
527 status = rt->status;
528 nxt_mp_destroy(rt->mem_pool);
529
530 nxt_debug(task, "exit: %d", status);
531
532 exit(status);
533 nxt_unreachable();
534}
535
536
537static nxt_int_t
538nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
539{
540 nxt_event_engine_t *engine;
541 const nxt_event_interface_t *interface;
542
543 engine = task->thread->engine;
544
545 if (engine->batch == rt->batch
546 && nxt_strcmp(engine->event.name, rt->engine) == 0)
547 {
548 return NXT_OK;
549 }
550
551 interface = nxt_service_get(rt->services, "engine", rt->engine);
552
553 if (interface != NULL) {
554 return nxt_event_engine_change(engine, interface, rt->batch);
555 }
556
557 return NXT_ERROR;
558}
559
560
561void
562nxt_runtime_event_engine_free(nxt_runtime_t *rt)
563{
564 nxt_queue_link_t *link;
565 nxt_event_engine_t *engine;
566
567 link = nxt_queue_first(&rt->engines);
568 nxt_queue_remove(link);
569
570 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
571 nxt_event_engine_free(engine);
572}
573
574
575nxt_int_t
576nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
577 nxt_uint_t max_threads, nxt_nsec_t timeout)
578{
579 nxt_thread_pool_t *thread_pool, **tp;
580
581 tp = nxt_array_add(rt->thread_pools);
582 if (tp == NULL) {
583 return NXT_ERROR;
584 }
585
586 thread_pool = nxt_thread_pool_create(max_threads, timeout,
587 nxt_runtime_thread_pool_init,
588 thr->engine,
589 nxt_runtime_thread_pool_exit);
590
591 if (nxt_fast_path(thread_pool != NULL)) {
592 *tp = thread_pool;
593 }
594
595 return NXT_OK;
596}
597
598
599static void
600nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
601 nxt_runtime_cont_t cont)
602{
603 nxt_uint_t n;
604 nxt_thread_pool_t **tp;
605
606 rt->continuation = cont;
607
608 n = rt->thread_pools->nelts;
609
610 if (n == 0) {
611 cont(task, 0);
612 return;
613 }
614
615 tp = rt->thread_pools->elts;
616
617 do {
618 nxt_thread_pool_destroy(*tp);
619
620 tp++;
621 n--;
622 } while (n != 0);
623}
624
625
626static void
627nxt_runtime_thread_pool_init(void)
628{
629#if (NXT_REGEX)
630 nxt_regex_init(0);
631#endif
632}
633
634
635static void
636nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
637{
638 nxt_uint_t i, n;
639 nxt_runtime_t *rt;
640 nxt_thread_pool_t *tp, **thread_pools;
641 nxt_thread_handle_t handle;
642
643 tp = obj;
644
645 if (data != NULL) {
646 handle = (nxt_thread_handle_t) (uintptr_t) data;
647 nxt_thread_wait(handle);
648 }
649
650 rt = task->thread->runtime;
651
652 thread_pools = rt->thread_pools->elts;
653 n = rt->thread_pools->nelts;
654
655 nxt_debug(task, "thread pools: %ui", n);
656
657 for (i = 0; i < n; i++) {
658
659 if (tp == thread_pools[i]) {
660 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
661
662 if (n == 1) {
663 /* The last thread pool. */
664 rt->continuation(task, 0);
665 }
666
667 return;
668 }
669 }
670}
671
672
673static nxt_int_t
674nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
675{
676 nxt_int_t ret;
677 nxt_str_t control;
678 nxt_uint_t n;
679 nxt_file_t *file;
680 const char *slash;
681 nxt_sockaddr_t *sa;
682 nxt_file_name_str_t file_name;
683 const nxt_event_interface_t *interface;
684
685 rt->daemon = 1;
686 rt->engine_connections = 256;
687 rt->auxiliary_threads = 2;
688 rt->user_cred.user = NXT_USER;
689 rt->group = NXT_GROUP;
690 rt->pid = NXT_PID;
691 rt->log = NXT_LOG;
692 rt->modules = NXT_MODULES;
693 rt->state = NXT_STATE;
694 rt->control = NXT_CONTROL_SOCK;
695
696 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
697 return NXT_ERROR;
698 }
699
700 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
701 return NXT_ERROR;
702 }
703
704 /* An engine's parameters. */
705
706 interface = nxt_service_get(rt->services, "engine", rt->engine);
707 if (interface == NULL) {
708 return NXT_ERROR;
709 }
710
711 rt->engine = interface->name;
712
713 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
714 if (nxt_slow_path(ret != NXT_OK)) {
715 return NXT_ERROR;
716 }
717
718 rt->pid_file = file_name.start;
719
720 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
721 if (nxt_slow_path(ret != NXT_OK)) {
722 return NXT_ERROR;
723 }
724
725 file = nxt_list_first(rt->log_files);
726 file->name = file_name.start;
727
728 slash = "";
729 n = nxt_strlen(rt->modules);
730
731 if (n > 1 && rt->modules[n - 1] != '/') {
732 slash = "/";
733 }
734
735 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
736 rt->modules, slash);
737 if (nxt_slow_path(ret != NXT_OK)) {
738 return NXT_ERROR;
739 }
740
741 rt->modules = (char *) file_name.start;
742
743 slash = "";
744 n = nxt_strlen(rt->state);
745
746 if (n > 1 && rt->state[n - 1] != '/') {
747 slash = "/";
748 }
749
750 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
751 rt->state, slash);
752 if (nxt_slow_path(ret != NXT_OK)) {
753 return NXT_ERROR;
754 }
755
756 rt->conf = (char *) file_name.start;
757
758 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
759 if (nxt_slow_path(ret != NXT_OK)) {
760 return NXT_ERROR;
761 }
762
763 rt->conf_tmp = (char *) file_name.start;
764
765 control.length = nxt_strlen(rt->control);
766 control.start = (u_char *) rt->control;
767
768 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
769 if (nxt_slow_path(sa == NULL)) {
770 return NXT_ERROR;
771 }
772
773 sa->type = SOCK_STREAM;
774
775 rt->controller_listen = sa;
776
777 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
778 return NXT_ERROR;
779 }
780
781 return NXT_OK;
782}
783
784
785static nxt_int_t
786nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
787{
788 char *p, **argv;
789 u_char *end;
790 u_char buf[1024];
791
792 static const char version[] =
793 "unit version: " NXT_VERSION "\n"
794 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
795
796 static const char no_control[] =
797 "option \"--control\" requires socket address\n";
798 static const char no_user[] = "option \"--user\" requires username\n";
799 static const char no_group[] = "option \"--group\" requires group name\n";
800 static const char no_pid[] = "option \"--pid\" requires filename\n";
801 static const char no_log[] = "option \"--log\" requires filename\n";
802 static const char no_modules[] =
803 "option \"--modules\" requires directory\n";
804 static const char no_state[] = "option \"--state\" requires directory\n";
805
806 static const char help[] =
807 "\n"
808 "unit options:\n"
809 "\n"
810 " --version print unit version and configure options\n"
811 "\n"
812 " --no-daemon run unit in non-daemon mode\n"
813 "\n"
814 " --control ADDRESS set address of control API socket\n"
815 " default: \"" NXT_CONTROL_SOCK "\"\n"
816 "\n"
817 " --pid FILE set pid filename\n"
818 " default: \"" NXT_PID "\"\n"
819 "\n"
820 " --log FILE set log filename\n"
821 " default: \"" NXT_LOG "\"\n"
822 "\n"
823 " --modules DIRECTORY set modules directory name\n"
824 " default: \"" NXT_MODULES "\"\n"
825 "\n"
826 " --state DIRECTORY set state directory name\n"
827 " default: \"" NXT_STATE "\"\n"
828 "\n"
829 " --user USER set non-privileged processes to run"
830 " as specified user\n"
831 " default: \"" NXT_USER "\"\n"
832 "\n"
833 " --group GROUP set non-privileged processes to run"
834 " as specified group\n"
835 " default: ";
836
837 static const char group[] = "\"" NXT_GROUP "\"\n\n";
838 static const char primary[] = "user's primary group\n\n";
839
840 argv = &nxt_process_argv[1];
841
842 while (*argv != NULL) {
843 p = *argv++;
844
845 if (nxt_strcmp(p, "--control") == 0) {
846 if (*argv == NULL) {
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_main_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
23static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
24static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
25static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
26 nxt_runtime_t *rt);
27static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
28static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
31static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
32 nxt_runtime_t *rt);
33static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
34 nxt_file_name_t *pid_file);
35static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
36 nxt_runtime_cont_t cont);
37static void nxt_runtime_thread_pool_init(void);
38static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
39 void *data);
40static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
41 nxt_process_t *process);
42static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
43 nxt_pid_t pid);
44
45
46nxt_int_t
47nxt_runtime_create(nxt_task_t *task)
48{
49 nxt_mp_t *mp;
50 nxt_int_t ret;
51 nxt_array_t *listen_sockets;
52 nxt_runtime_t *rt;
53 nxt_app_lang_module_t *lang;
54
55 mp = nxt_mp_create(1024, 128, 256, 32);
56
57 if (nxt_slow_path(mp == NULL)) {
58 return NXT_ERROR;
59 }
60
61 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
62 if (nxt_slow_path(rt == NULL)) {
63 return NXT_ERROR;
64 }
65
66 task->thread->runtime = rt;
67 rt->mem_pool = mp;
68
69 nxt_thread_mutex_create(&rt->processes_mutex);
70
71 rt->services = nxt_services_init(mp);
72 if (nxt_slow_path(rt->services == NULL)) {
73 goto fail;
74 }
75
76 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
77 if (nxt_slow_path(rt->languages == NULL)) {
78 goto fail;
79 }
80
81 /* Should not fail. */
82 lang = nxt_array_add(rt->languages);
83 lang->type = NXT_APP_GO;
84 lang->version = (u_char *) "";
85 lang->file = NULL;
86 lang->module = &nxt_go_module;
87
88 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
89 if (nxt_slow_path(listen_sockets == NULL)) {
90 goto fail;
91 }
92
93 rt->listen_sockets = listen_sockets;
94
95 ret = nxt_runtime_inherited_listen_sockets(task, rt);
96 if (nxt_slow_path(ret != NXT_OK)) {
97 goto fail;
98 }
99
100 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
101 goto fail;
102 }
103
104 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
105 goto fail;
106 }
107
108 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
109 goto fail;
110 }
111
112 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
113 goto fail;
114 }
115
116 rt->start = nxt_runtime_initial_start;
117
118 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
119 goto fail;
120 }
121
122 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
123 nxt_runtime_start, task, rt, NULL);
124
125 return NXT_OK;
126
127fail:
128
129 nxt_mp_destroy(mp);
130
131 return NXT_ERROR;
132}
133
134
135static nxt_int_t
136nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
137{
138 u_char *v, *p;
139 nxt_int_t type;
140 nxt_array_t *inherited_sockets;
141 nxt_socket_t s;
142 nxt_listen_socket_t *ls;
143
144 v = (u_char *) getenv("NGINX");
145
146 if (v == NULL) {
147 return nxt_runtime_systemd_listen_sockets(task, rt);
148 }
149
150 nxt_alert(task, "using inherited listen sockets: %s", v);
151
152 inherited_sockets = nxt_array_create(rt->mem_pool,
153 1, sizeof(nxt_listen_socket_t));
154 if (inherited_sockets == NULL) {
155 return NXT_ERROR;
156 }
157
158 rt->inherited_sockets = inherited_sockets;
159
160 for (p = v; *p != '\0'; p++) {
161
162 if (*p == ';') {
163 s = nxt_int_parse(v, p - v);
164
165 if (nxt_slow_path(s < 0)) {
166 nxt_alert(task, "invalid socket number \"%s\" "
167 "in NGINX environment variable, "
168 "ignoring the rest of the variable", v);
169 return NXT_ERROR;
170 }
171
172 v = p + 1;
173
174 ls = nxt_array_zero_add(inherited_sockets);
175 if (nxt_slow_path(ls == NULL)) {
176 return NXT_ERROR;
177 }
178
179 ls->socket = s;
180
181 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
182 if (nxt_slow_path(ls->sockaddr == NULL)) {
183 return NXT_ERROR;
184 }
185
186 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
187 if (nxt_slow_path(type == -1)) {
188 return NXT_ERROR;
189 }
190
191 ls->sockaddr->type = (uint16_t) type;
192 }
193 }
194
195 return NXT_OK;
196}
197
198
199static nxt_int_t
200nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
201{
202 u_char *nfd, *pid;
203 nxt_int_t n;
204 nxt_array_t *inherited_sockets;
205 nxt_socket_t s;
206 nxt_listen_socket_t *ls;
207
208 /*
209 * Number of listening sockets passed. The socket
210 * descriptors start from number 3 and are sequential.
211 */
212 nfd = (u_char *) getenv("LISTEN_FDS");
213 if (nfd == NULL) {
214 return NXT_OK;
215 }
216
217 /* The pid of the service process. */
218 pid = (u_char *) getenv("LISTEN_PID");
219 if (pid == NULL) {
220 return NXT_OK;
221 }
222
223 n = nxt_int_parse(nfd, nxt_strlen(nfd));
224 if (n < 0) {
225 return NXT_OK;
226 }
227
228 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
229 return NXT_OK;
230 }
231
232 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
233
234 inherited_sockets = nxt_array_create(rt->mem_pool,
235 n, sizeof(nxt_listen_socket_t));
236 if (inherited_sockets == NULL) {
237 return NXT_ERROR;
238 }
239
240 rt->inherited_sockets = inherited_sockets;
241
242 for (s = 3; s < n; s++) {
243 ls = nxt_array_zero_add(inherited_sockets);
244 if (nxt_slow_path(ls == NULL)) {
245 return NXT_ERROR;
246 }
247
248 ls->socket = s;
249
250 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
251 if (nxt_slow_path(ls->sockaddr == NULL)) {
252 return NXT_ERROR;
253 }
254
255 ls->sockaddr->type = SOCK_STREAM;
256 }
257
258 return NXT_OK;
259}
260
261
262static nxt_int_t
263nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
264{
265 nxt_thread_t *thread;
266 nxt_event_engine_t *engine;
267 const nxt_event_interface_t *interface;
268
269 interface = nxt_service_get(rt->services, "engine", NULL);
270
271 if (nxt_slow_path(interface == NULL)) {
272 /* TODO: log */
273 return NXT_ERROR;
274 }
275
276 engine = nxt_event_engine_create(task, interface,
277 nxt_main_process_signals, 0, 0);
278
279 if (nxt_slow_path(engine == NULL)) {
280 return NXT_ERROR;
281 }
282
283 thread = task->thread;
284 thread->engine = engine;
285#if 0
286 thread->fiber = &engine->fibers->fiber;
287#endif
288
289 engine->id = rt->last_engine_id++;
290 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
291
292 nxt_queue_init(&rt->engines);
293 nxt_queue_insert_tail(&rt->engines, &engine->link);
294
295 return NXT_OK;
296}
297
298
299static nxt_int_t
300nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
301{
302 nxt_int_t ret;
303 nxt_array_t *thread_pools;
304
305 thread_pools = nxt_array_create(rt->mem_pool, 1,
306 sizeof(nxt_thread_pool_t *));
307
308 if (nxt_slow_path(thread_pools == NULL)) {
309 return NXT_ERROR;
310 }
311
312 rt->thread_pools = thread_pools;
313 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
314
315 if (nxt_slow_path(ret != NXT_OK)) {
316 return NXT_ERROR;
317 }
318
319 return NXT_OK;
320}
321
322
323static void
324nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
325{
326 nxt_runtime_t *rt;
327
328 rt = obj;
329
330 nxt_debug(task, "rt conf done");
331
332 task->thread->log->ctx_handler = NULL;
333 task->thread->log->ctx = NULL;
334
335 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
336 goto fail;
337 }
338
339 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
340 goto fail;
341 }
342
343 /*
344 * Thread pools should be destroyed before starting worker
345 * processes, because thread pool semaphores will stick in
346 * locked state in new processes after fork().
347 */
348 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
349
350 return;
351
352fail:
353
354 nxt_runtime_quit(task, 1);
355}
356
357
358static void
359nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
360{
361 nxt_int_t ret;
362 nxt_thread_t *thr;
363 nxt_runtime_t *rt;
364 const nxt_event_interface_t *interface;
365
366 thr = task->thread;
367 rt = thr->runtime;
368
369 if (rt->inherited_sockets == NULL && rt->daemon) {
370
371 if (nxt_process_daemon(task) != NXT_OK) {
372 goto fail;
373 }
374
375 /*
376 * An event engine should be updated after fork()
377 * even if an event facility was not changed because:
378 * 1) inherited kqueue descriptor is invalid,
379 * 2) the signal thread is not inherited.
380 */
381 interface = nxt_service_get(rt->services, "engine", rt->engine);
382 if (interface == NULL) {
383 goto fail;
384 }
385
386 ret = nxt_event_engine_change(task->thread->engine, interface,
387 rt->batch);
388 if (ret != NXT_OK) {
389 goto fail;
390 }
391 }
392
393 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
394 if (ret != NXT_OK) {
395 goto fail;
396 }
397
398 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
399 goto fail;
400 }
401
402 thr->engine->max_connections = rt->engine_connections;
403
404 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
405 return;
406 }
407
408fail:
409
410 nxt_runtime_quit(task, 1);
411}
412
413
414void
415nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
416{
417 nxt_bool_t done;
418 nxt_runtime_t *rt;
419 nxt_event_engine_t *engine;
420
421 rt = task->thread->runtime;
422 rt->status |= status;
423 engine = task->thread->engine;
424
425 nxt_debug(task, "exiting");
426
427 done = 1;
428
429 if (!engine->shutdown) {
430 engine->shutdown = 1;
431
432 if (!nxt_array_is_empty(rt->thread_pools)) {
433 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
434 done = 0;
435 }
436
437 if (rt->type == NXT_PROCESS_MAIN) {
438 nxt_main_stop_worker_processes(task, rt);
439 done = 0;
440 }
441 }
442
443 nxt_runtime_close_idle_connections(engine);
444
445 if (done) {
446 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
447 task, rt, engine);
448 }
449}
450
451
452static void
453nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
454{
455 nxt_conn_t *c;
456 nxt_queue_t *idle;
457 nxt_queue_link_t *link, *next;
458
459 nxt_debug(&engine->task, "close idle connections");
460
461 idle = &engine->idle_connections;
462
463 for (link = nxt_queue_head(idle);
464 link != nxt_queue_tail(idle);
465 link = next)
466 {
467 next = nxt_queue_next(link);
468 c = nxt_queue_link_data(link, nxt_conn_t, link);
469
470 if (!c->socket.read_ready) {
471 nxt_queue_remove(link);
472 nxt_conn_close(engine, c);
473 }
474 }
475}
476
477
478static void
479nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
480{
481 int status;
482 nxt_runtime_t *rt;
483 nxt_process_t *process;
484 nxt_event_engine_t *engine;
485
486 rt = obj;
487 engine = data;
488
489 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
490
491 if (!nxt_array_is_empty(rt->thread_pools)) {
492 return;
493 }
494
495 if (rt->type == NXT_PROCESS_MAIN) {
496 if (rt->pid_file != NULL) {
497 nxt_file_delete(rt->pid_file);
498 }
499
500#if (NXT_HAVE_UNIX_DOMAIN)
501 {
502 nxt_sockaddr_t *sa;
503 nxt_file_name_t *name;
504
505 sa = rt->controller_listen;
506
507 if (sa->u.sockaddr.sa_family == AF_UNIX) {
508 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
509 (void) nxt_file_delete(name);
510 }
511 }
512#endif
513 }
514
515 if (!engine->event.signal_support) {
516 nxt_event_engine_signals_stop(engine);
517 }
518
519 nxt_runtime_process_each(rt, process) {
520
521 nxt_process_close_ports(task, process);
522
523 } nxt_runtime_process_loop;
524
525 nxt_thread_mutex_destroy(&rt->processes_mutex);
526
527 status = rt->status;
528 nxt_mp_destroy(rt->mem_pool);
529
530 nxt_debug(task, "exit: %d", status);
531
532 exit(status);
533 nxt_unreachable();
534}
535
536
537static nxt_int_t
538nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
539{
540 nxt_event_engine_t *engine;
541 const nxt_event_interface_t *interface;
542
543 engine = task->thread->engine;
544
545 if (engine->batch == rt->batch
546 && nxt_strcmp(engine->event.name, rt->engine) == 0)
547 {
548 return NXT_OK;
549 }
550
551 interface = nxt_service_get(rt->services, "engine", rt->engine);
552
553 if (interface != NULL) {
554 return nxt_event_engine_change(engine, interface, rt->batch);
555 }
556
557 return NXT_ERROR;
558}
559
560
561void
562nxt_runtime_event_engine_free(nxt_runtime_t *rt)
563{
564 nxt_queue_link_t *link;
565 nxt_event_engine_t *engine;
566
567 link = nxt_queue_first(&rt->engines);
568 nxt_queue_remove(link);
569
570 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
571 nxt_event_engine_free(engine);
572}
573
574
575nxt_int_t
576nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
577 nxt_uint_t max_threads, nxt_nsec_t timeout)
578{
579 nxt_thread_pool_t *thread_pool, **tp;
580
581 tp = nxt_array_add(rt->thread_pools);
582 if (tp == NULL) {
583 return NXT_ERROR;
584 }
585
586 thread_pool = nxt_thread_pool_create(max_threads, timeout,
587 nxt_runtime_thread_pool_init,
588 thr->engine,
589 nxt_runtime_thread_pool_exit);
590
591 if (nxt_fast_path(thread_pool != NULL)) {
592 *tp = thread_pool;
593 }
594
595 return NXT_OK;
596}
597
598
599static void
600nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
601 nxt_runtime_cont_t cont)
602{
603 nxt_uint_t n;
604 nxt_thread_pool_t **tp;
605
606 rt->continuation = cont;
607
608 n = rt->thread_pools->nelts;
609
610 if (n == 0) {
611 cont(task, 0);
612 return;
613 }
614
615 tp = rt->thread_pools->elts;
616
617 do {
618 nxt_thread_pool_destroy(*tp);
619
620 tp++;
621 n--;
622 } while (n != 0);
623}
624
625
626static void
627nxt_runtime_thread_pool_init(void)
628{
629#if (NXT_REGEX)
630 nxt_regex_init(0);
631#endif
632}
633
634
635static void
636nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
637{
638 nxt_uint_t i, n;
639 nxt_runtime_t *rt;
640 nxt_thread_pool_t *tp, **thread_pools;
641 nxt_thread_handle_t handle;
642
643 tp = obj;
644
645 if (data != NULL) {
646 handle = (nxt_thread_handle_t) (uintptr_t) data;
647 nxt_thread_wait(handle);
648 }
649
650 rt = task->thread->runtime;
651
652 thread_pools = rt->thread_pools->elts;
653 n = rt->thread_pools->nelts;
654
655 nxt_debug(task, "thread pools: %ui", n);
656
657 for (i = 0; i < n; i++) {
658
659 if (tp == thread_pools[i]) {
660 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
661
662 if (n == 1) {
663 /* The last thread pool. */
664 rt->continuation(task, 0);
665 }
666
667 return;
668 }
669 }
670}
671
672
673static nxt_int_t
674nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
675{
676 nxt_int_t ret;
677 nxt_str_t control;
678 nxt_uint_t n;
679 nxt_file_t *file;
680 const char *slash;
681 nxt_sockaddr_t *sa;
682 nxt_file_name_str_t file_name;
683 const nxt_event_interface_t *interface;
684
685 rt->daemon = 1;
686 rt->engine_connections = 256;
687 rt->auxiliary_threads = 2;
688 rt->user_cred.user = NXT_USER;
689 rt->group = NXT_GROUP;
690 rt->pid = NXT_PID;
691 rt->log = NXT_LOG;
692 rt->modules = NXT_MODULES;
693 rt->state = NXT_STATE;
694 rt->control = NXT_CONTROL_SOCK;
695
696 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
697 return NXT_ERROR;
698 }
699
700 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
701 return NXT_ERROR;
702 }
703
704 /* An engine's parameters. */
705
706 interface = nxt_service_get(rt->services, "engine", rt->engine);
707 if (interface == NULL) {
708 return NXT_ERROR;
709 }
710
711 rt->engine = interface->name;
712
713 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
714 if (nxt_slow_path(ret != NXT_OK)) {
715 return NXT_ERROR;
716 }
717
718 rt->pid_file = file_name.start;
719
720 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
721 if (nxt_slow_path(ret != NXT_OK)) {
722 return NXT_ERROR;
723 }
724
725 file = nxt_list_first(rt->log_files);
726 file->name = file_name.start;
727
728 slash = "";
729 n = nxt_strlen(rt->modules);
730
731 if (n > 1 && rt->modules[n - 1] != '/') {
732 slash = "/";
733 }
734
735 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
736 rt->modules, slash);
737 if (nxt_slow_path(ret != NXT_OK)) {
738 return NXT_ERROR;
739 }
740
741 rt->modules = (char *) file_name.start;
742
743 slash = "";
744 n = nxt_strlen(rt->state);
745
746 if (n > 1 && rt->state[n - 1] != '/') {
747 slash = "/";
748 }
749
750 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
751 rt->state, slash);
752 if (nxt_slow_path(ret != NXT_OK)) {
753 return NXT_ERROR;
754 }
755
756 rt->conf = (char *) file_name.start;
757
758 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
759 if (nxt_slow_path(ret != NXT_OK)) {
760 return NXT_ERROR;
761 }
762
763 rt->conf_tmp = (char *) file_name.start;
764
765 control.length = nxt_strlen(rt->control);
766 control.start = (u_char *) rt->control;
767
768 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
769 if (nxt_slow_path(sa == NULL)) {
770 return NXT_ERROR;
771 }
772
773 sa->type = SOCK_STREAM;
774
775 rt->controller_listen = sa;
776
777 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
778 return NXT_ERROR;
779 }
780
781 return NXT_OK;
782}
783
784
785static nxt_int_t
786nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
787{
788 char *p, **argv;
789 u_char *end;
790 u_char buf[1024];
791
792 static const char version[] =
793 "unit version: " NXT_VERSION "\n"
794 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
795
796 static const char no_control[] =
797 "option \"--control\" requires socket address\n";
798 static const char no_user[] = "option \"--user\" requires username\n";
799 static const char no_group[] = "option \"--group\" requires group name\n";
800 static const char no_pid[] = "option \"--pid\" requires filename\n";
801 static const char no_log[] = "option \"--log\" requires filename\n";
802 static const char no_modules[] =
803 "option \"--modules\" requires directory\n";
804 static const char no_state[] = "option \"--state\" requires directory\n";
805
806 static const char help[] =
807 "\n"
808 "unit options:\n"
809 "\n"
810 " --version print unit version and configure options\n"
811 "\n"
812 " --no-daemon run unit in non-daemon mode\n"
813 "\n"
814 " --control ADDRESS set address of control API socket\n"
815 " default: \"" NXT_CONTROL_SOCK "\"\n"
816 "\n"
817 " --pid FILE set pid filename\n"
818 " default: \"" NXT_PID "\"\n"
819 "\n"
820 " --log FILE set log filename\n"
821 " default: \"" NXT_LOG "\"\n"
822 "\n"
823 " --modules DIRECTORY set modules directory name\n"
824 " default: \"" NXT_MODULES "\"\n"
825 "\n"
826 " --state DIRECTORY set state directory name\n"
827 " default: \"" NXT_STATE "\"\n"
828 "\n"
829 " --user USER set non-privileged processes to run"
830 " as specified user\n"
831 " default: \"" NXT_USER "\"\n"
832 "\n"
833 " --group GROUP set non-privileged processes to run"
834 " as specified group\n"
835 " default: ";
836
837 static const char group[] = "\"" NXT_GROUP "\"\n\n";
838 static const char primary[] = "user's primary group\n\n";
839
840 argv = &nxt_process_argv[1];
841
842 while (*argv != NULL) {
843 p = *argv++;
844
845 if (nxt_strcmp(p, "--control") == 0) {
846 if (*argv == NULL) {
847 write(STDERR_FILENO, no_control, sizeof(no_control) - 1);
847 write(STDERR_FILENO, no_control, nxt_length(no_control));
848 return NXT_ERROR;
849 }
850
851 p = *argv++;
852
853 rt->control = p;
854
855 continue;
856 }
857
858 if (nxt_strcmp(p, "--upstream") == 0) {
859 if (*argv == NULL) {
860 nxt_alert(task, "no argument for option \"--upstream\"");
861 return NXT_ERROR;
862 }
863
864 p = *argv++;
865
866 rt->upstream.length = nxt_strlen(p);
867 rt->upstream.start = (u_char *) p;
868
869 continue;
870 }
871
872 if (nxt_strcmp(p, "--user") == 0) {
873 if (*argv == NULL) {
848 return NXT_ERROR;
849 }
850
851 p = *argv++;
852
853 rt->control = p;
854
855 continue;
856 }
857
858 if (nxt_strcmp(p, "--upstream") == 0) {
859 if (*argv == NULL) {
860 nxt_alert(task, "no argument for option \"--upstream\"");
861 return NXT_ERROR;
862 }
863
864 p = *argv++;
865
866 rt->upstream.length = nxt_strlen(p);
867 rt->upstream.start = (u_char *) p;
868
869 continue;
870 }
871
872 if (nxt_strcmp(p, "--user") == 0) {
873 if (*argv == NULL) {
874 write(STDERR_FILENO, no_user, sizeof(no_user) - 1);
874 write(STDERR_FILENO, no_user, nxt_length(no_user));
875 return NXT_ERROR;
876 }
877
878 p = *argv++;
879
880 rt->user_cred.user = p;
881
882 continue;
883 }
884
885 if (nxt_strcmp(p, "--group") == 0) {
886 if (*argv == NULL) {
875 return NXT_ERROR;
876 }
877
878 p = *argv++;
879
880 rt->user_cred.user = p;
881
882 continue;
883 }
884
885 if (nxt_strcmp(p, "--group") == 0) {
886 if (*argv == NULL) {
887 write(STDERR_FILENO, no_group, sizeof(no_group) - 1);
887 write(STDERR_FILENO, no_group, nxt_length(no_group));
888 return NXT_ERROR;
889 }
890
891 p = *argv++;
892
893 rt->group = p;
894
895 continue;
896 }
897
898 if (nxt_strcmp(p, "--pid") == 0) {
899 if (*argv == NULL) {
888 return NXT_ERROR;
889 }
890
891 p = *argv++;
892
893 rt->group = p;
894
895 continue;
896 }
897
898 if (nxt_strcmp(p, "--pid") == 0) {
899 if (*argv == NULL) {
900 write(STDERR_FILENO, no_pid, sizeof(no_pid) - 1);
900 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
901 return NXT_ERROR;
902 }
903
904 p = *argv++;
905
906 rt->pid = p;
907
908 continue;
909 }
910
911 if (nxt_strcmp(p, "--log") == 0) {
912 if (*argv == NULL) {
901 return NXT_ERROR;
902 }
903
904 p = *argv++;
905
906 rt->pid = p;
907
908 continue;
909 }
910
911 if (nxt_strcmp(p, "--log") == 0) {
912 if (*argv == NULL) {
913 write(STDERR_FILENO, no_log, sizeof(no_log) - 1);
913 write(STDERR_FILENO, no_log, nxt_length(no_log));
914 return NXT_ERROR;
915 }
916
917 p = *argv++;
918
919 rt->log = p;
920
921 continue;
922 }
923
924 if (nxt_strcmp(p, "--modules") == 0) {
925 if (*argv == NULL) {
914 return NXT_ERROR;
915 }
916
917 p = *argv++;
918
919 rt->log = p;
920
921 continue;
922 }
923
924 if (nxt_strcmp(p, "--modules") == 0) {
925 if (*argv == NULL) {
926 write(STDERR_FILENO, no_modules, sizeof(no_modules) - 1);
926 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
927 return NXT_ERROR;
928 }
929
930 p = *argv++;
931
932 rt->modules = p;
933
934 continue;
935 }
936
937 if (nxt_strcmp(p, "--state") == 0) {
938 if (*argv == NULL) {
927 return NXT_ERROR;
928 }
929
930 p = *argv++;
931
932 rt->modules = p;
933
934 continue;
935 }
936
937 if (nxt_strcmp(p, "--state") == 0) {
938 if (*argv == NULL) {
939 write(STDERR_FILENO, no_state, sizeof(no_state) - 1);
939 write(STDERR_FILENO, no_state, nxt_length(no_state));
940 return NXT_ERROR;
941 }
942
943 p = *argv++;
944
945 rt->state = p;
946
947 continue;
948 }
949
950 if (nxt_strcmp(p, "--no-daemon") == 0) {
951 rt->daemon = 0;
952 continue;
953 }
954
955 if (nxt_strcmp(p, "--version") == 0) {
940 return NXT_ERROR;
941 }
942
943 p = *argv++;
944
945 rt->state = p;
946
947 continue;
948 }
949
950 if (nxt_strcmp(p, "--no-daemon") == 0) {
951 rt->daemon = 0;
952 continue;
953 }
954
955 if (nxt_strcmp(p, "--version") == 0) {
956 write(STDERR_FILENO, version, sizeof(version) - 1);
956 write(STDERR_FILENO, version, nxt_length(version));
957 exit(0);
958 }
959
960 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
957 exit(0);
958 }
959
960 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
961 write(STDOUT_FILENO, help, sizeof(help) - 1);
961 write(STDOUT_FILENO, help, nxt_length(help));
962
963 if (sizeof(NXT_GROUP) == 1) {
962
963 if (sizeof(NXT_GROUP) == 1) {
964 write(STDOUT_FILENO, primary, sizeof(primary) - 1);
964 write(STDOUT_FILENO, primary, nxt_length(primary));
965
966 } else {
965
966 } else {
967 write(STDOUT_FILENO, group, sizeof(group) - 1);
967 write(STDOUT_FILENO, group, nxt_length(group));
968 }
969
970 exit(0);
971 }
972
973 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
974 "try \"%s -h\" for available options\n",
975 p, nxt_process_argv[0]);
976
977 write(STDERR_FILENO, buf, end - buf);
978
979 return NXT_ERROR;
980 }
981
982 return NXT_OK;
983}
984
985
986nxt_listen_socket_t *
987nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
988{
989 nxt_mp_t *mp;
990 nxt_listen_socket_t *ls;
991
992 ls = nxt_array_zero_add(rt->listen_sockets);
993 if (ls == NULL) {
994 return NULL;
995 }
996
997 mp = rt->mem_pool;
998
999 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1000 sa->length);
1001 if (ls->sockaddr == NULL) {
1002 return NULL;
1003 }
1004
1005 ls->sockaddr->type = sa->type;
1006
1007 nxt_sockaddr_text(ls->sockaddr);
1008
1009 ls->socket = -1;
1010 ls->backlog = NXT_LISTEN_BACKLOG;
1011
1012 return ls;
1013}
1014
1015
1016static nxt_int_t
1017nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1018{
1019 size_t length;
1020 char hostname[NXT_MAXHOSTNAMELEN + 1];
1021
1022 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1023 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1024 return NXT_ERROR;
1025 }
1026
1027 /*
1028 * Linux gethostname(2):
1029 *
1030 * If the null-terminated hostname is too large to fit,
1031 * then the name is truncated, and no error is returned.
1032 *
1033 * For this reason an additional byte is reserved in the buffer.
1034 */
1035 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1036
1037 length = nxt_strlen(hostname);
1038 rt->hostname.length = length;
1039
1040 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1041
1042 if (rt->hostname.start != NULL) {
1043 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1044 return NXT_OK;
1045 }
1046
1047 return NXT_ERROR;
1048}
1049
1050
1051static nxt_int_t
1052nxt_runtime_log_files_init(nxt_runtime_t *rt)
1053{
1054 nxt_file_t *file;
1055 nxt_list_t *log_files;
1056
1057 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1058
1059 if (nxt_fast_path(log_files != NULL)) {
1060 rt->log_files = log_files;
1061
1062 /* Preallocate the main log. This allocation cannot fail. */
1063 file = nxt_list_zero_add(log_files);
1064
1065 file->fd = NXT_FILE_INVALID;
1066 file->log_level = NXT_LOG_ALERT;
1067
1068 return NXT_OK;
1069 }
1070
1071 return NXT_ERROR;
1072}
1073
1074
1075nxt_file_t *
1076nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1077{
1078 nxt_int_t ret;
1079 nxt_file_t *file;
1080 nxt_file_name_str_t file_name;
1081
1082 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1083
1084 if (nxt_slow_path(ret != NXT_OK)) {
1085 return NULL;
1086 }
1087
1088 nxt_list_each(file, rt->log_files) {
1089
1090 /* STUB: hardecoded case sensitive/insensitive. */
1091
1092 if (file->name != NULL
1093 && nxt_file_name_eq(file->name, file_name.start))
1094 {
1095 return file;
1096 }
1097
1098 } nxt_list_loop;
1099
1100 file = nxt_list_zero_add(rt->log_files);
1101
1102 if (nxt_slow_path(file == NULL)) {
1103 return NULL;
1104 }
1105
1106 file->fd = NXT_FILE_INVALID;
1107 file->log_level = NXT_LOG_ALERT;
1108 file->name = file_name.start;
1109
1110 return file;
1111}
1112
1113
1114static nxt_int_t
1115nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1116{
1117 nxt_int_t ret;
1118 nxt_file_t *file;
1119
1120 nxt_list_each(file, rt->log_files) {
1121
1122 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1123 NXT_FILE_OWNER_ACCESS);
1124
1125 if (ret != NXT_OK) {
1126 return NXT_ERROR;
1127 }
1128
1129 } nxt_list_loop;
1130
1131 file = nxt_list_first(rt->log_files);
1132
1133 return nxt_file_stderr(file);
1134}
1135
1136
1137nxt_int_t
1138nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1139{
1140 nxt_int_t ret;
1141 nxt_uint_t c, p, ncurr, nprev;
1142 nxt_listen_socket_t *curr, *prev;
1143
1144 curr = rt->listen_sockets->elts;
1145 ncurr = rt->listen_sockets->nelts;
1146
1147 if (rt->inherited_sockets != NULL) {
1148 prev = rt->inherited_sockets->elts;
1149 nprev = rt->inherited_sockets->nelts;
1150
1151 } else {
1152 prev = NULL;
1153 nprev = 0;
1154 }
1155
1156 for (c = 0; c < ncurr; c++) {
1157
1158 for (p = 0; p < nprev; p++) {
1159
1160 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1161
1162 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1163 if (ret != NXT_OK) {
1164 return NXT_ERROR;
1165 }
1166
1167 goto next;
1168 }
1169 }
1170
1171 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1172 return NXT_ERROR;
1173 }
1174
1175 next:
1176
1177 continue;
1178 }
1179
1180 return NXT_OK;
1181}
1182
1183
1184nxt_int_t
1185nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1186{
1187 nxt_uint_t i, n;
1188 nxt_listen_socket_t *ls;
1189
1190 ls = rt->listen_sockets->elts;
1191 n = rt->listen_sockets->nelts;
1192
1193 for (i = 0; i < n; i++) {
1194 if (ls[i].flags == NXT_NONBLOCK) {
1195 if (nxt_listen_event(task, &ls[i]) == NULL) {
1196 return NXT_ERROR;
1197 }
1198 }
1199 }
1200
1201 return NXT_OK;
1202}
1203
1204
1205nxt_str_t *
1206nxt_current_directory(nxt_mp_t *mp)
1207{
1208 size_t length;
1209 u_char *p;
1210 nxt_str_t *name;
1211 char buf[NXT_MAX_PATH_LEN];
1212
1213 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1214
1215 if (nxt_fast_path(length != 0)) {
1216 name = nxt_str_alloc(mp, length + 1);
1217
1218 if (nxt_fast_path(name != NULL)) {
1219 p = nxt_cpymem(name->start, buf, length);
1220 *p = '/';
1221
1222 return name;
1223 }
1224 }
1225
1226 return NULL;
1227}
1228
1229
1230static nxt_int_t
1231nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1232{
1233 ssize_t length;
1234 nxt_int_t n;
1235 nxt_file_t file;
1236 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE];
1237
1238 nxt_memzero(&file, sizeof(nxt_file_t));
1239
1240 file.name = pid_file;
1241
1242 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1243 NXT_FILE_DEFAULT_ACCESS);
1244
1245 if (n != NXT_OK) {
1246 return NXT_ERROR;
1247 }
1248
1249 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1250
1251 if (nxt_file_write(&file, pid, length, 0) != length) {
1252 return NXT_ERROR;
1253 }
1254
1255 nxt_file_close(task, &file);
1256
1257 return NXT_OK;
1258}
1259
1260
1261nxt_process_t *
1262nxt_runtime_process_new(nxt_runtime_t *rt)
1263{
1264 nxt_process_t *process;
1265
1266 /* TODO: memory failures. */
1267
1268 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1269 if (nxt_slow_path(process == NULL)) {
1270 return NULL;
1271 }
1272
1273 nxt_queue_init(&process->ports);
1274
1275 nxt_thread_mutex_create(&process->incoming.mutex);
1276 nxt_thread_mutex_create(&process->outgoing.mutex);
1277 nxt_thread_mutex_create(&process->cp_mutex);
1278
1279 process->use_count = 1;
1280
1281 return process;
1282}
1283
1284
1285static void
1286nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1287{
1288 nxt_port_t *port;
1289
1290 nxt_assert(process->use_count == 0);
1291 nxt_assert(process->registered == 0);
1292
1293 nxt_port_mmaps_destroy(&process->incoming, 1);
1294 nxt_port_mmaps_destroy(&process->outgoing, 1);
1295
1296 do {
1297 port = nxt_port_hash_retrieve(&process->connected_ports);
1298
1299 } while (port != NULL);
1300
1301 nxt_thread_mutex_destroy(&process->incoming.mutex);
1302 nxt_thread_mutex_destroy(&process->outgoing.mutex);
1303 nxt_thread_mutex_destroy(&process->cp_mutex);
1304
1305 nxt_mp_free(rt->mem_pool, process);
1306}
1307
1308
1309static nxt_int_t
1310nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1311{
1312 nxt_process_t *process;
1313
1314 process = data;
1315
1316 if (lhq->key.length == sizeof(nxt_pid_t)
1317 && *(nxt_pid_t *) lhq->key.start == process->pid)
1318 {
1319 return NXT_OK;
1320 }
1321
1322 return NXT_DECLINED;
1323}
1324
1325static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1326 NXT_LVLHSH_DEFAULT,
1327 nxt_runtime_lvlhsh_pid_test,
1328 nxt_lvlhsh_alloc,
1329 nxt_lvlhsh_free,
1330};
1331
1332
1333nxt_inline void
1334nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1335{
1336 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1337 lhq->key.length = sizeof(*pid);
1338 lhq->key.start = (u_char *) pid;
1339 lhq->proto = &lvlhsh_processes_proto;
1340}
1341
1342
1343nxt_process_t *
1344nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1345{
1346 nxt_process_t *process;
1347 nxt_lvlhsh_query_t lhq;
1348
1349 process = NULL;
1350
1351 nxt_runtime_process_lhq_pid(&lhq, &pid);
1352
1353 nxt_thread_mutex_lock(&rt->processes_mutex);
1354
1355 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1356 process = lhq.value;
1357
1358 } else {
1359 nxt_thread_log_debug("process %PI not found", pid);
1360 }
1361
1362 nxt_thread_mutex_unlock(&rt->processes_mutex);
1363
1364 return process;
1365}
1366
1367
1368nxt_process_t *
1369nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1370{
1371 nxt_process_t *process;
1372 nxt_lvlhsh_query_t lhq;
1373
1374 nxt_runtime_process_lhq_pid(&lhq, &pid);
1375
1376 nxt_thread_mutex_lock(&rt->processes_mutex);
1377
1378 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1379 nxt_thread_log_debug("process %PI found", pid);
1380
1381 nxt_thread_mutex_unlock(&rt->processes_mutex);
1382
1383 process = lhq.value;
1384 process->use_count++;
1385
1386 return process;
1387 }
1388
1389 process = nxt_runtime_process_new(rt);
1390 if (nxt_slow_path(process == NULL)) {
1391
1392 nxt_thread_mutex_unlock(&rt->processes_mutex);
1393
1394 return NULL;
1395 }
1396
1397 process->pid = pid;
1398
1399 lhq.replace = 0;
1400 lhq.value = process;
1401 lhq.pool = rt->mem_pool;
1402
1403 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1404
1405 case NXT_OK:
1406 if (rt->nprocesses == 0) {
1407 rt->mprocess = process;
1408 }
1409
1410 rt->nprocesses++;
1411
1412 process->registered = 1;
1413
1414 nxt_thread_log_debug("process %PI insert", pid);
1415 break;
1416
1417 default:
1418 nxt_thread_log_debug("process %PI insert failed", pid);
1419 break;
1420 }
1421
1422 nxt_thread_mutex_unlock(&rt->processes_mutex);
1423
1424 return process;
1425}
1426
1427
1428void
1429nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1430{
1431 nxt_port_t *port;
1432 nxt_runtime_t *rt;
1433 nxt_lvlhsh_query_t lhq;
1434
1435 nxt_assert(process->registered == 0);
1436
1437 rt = task->thread->runtime;
1438
1439 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1440
1441 lhq.replace = 0;
1442 lhq.value = process;
1443 lhq.pool = rt->mem_pool;
1444
1445 nxt_thread_mutex_lock(&rt->processes_mutex);
1446
1447 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1448
1449 case NXT_OK:
1450 if (rt->nprocesses == 0) {
1451 rt->mprocess = process;
1452 }
1453
1454 rt->nprocesses++;
1455
1456 nxt_process_port_each(process, port) {
1457
1458 port->pid = process->pid;
1459
1460 nxt_runtime_port_add(task, port);
1461
1462 } nxt_process_port_loop;
1463
1464 process->registered = 1;
1465
1466 nxt_thread_log_debug("process %PI added", process->pid);
1467 break;
1468
1469 default:
1470 nxt_thread_log_debug("process %PI failed to add", process->pid);
1471 break;
1472 }
1473
1474 nxt_thread_mutex_unlock(&rt->processes_mutex);
1475}
1476
1477
1478static nxt_process_t *
1479nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1480{
1481 nxt_process_t *process;
1482 nxt_lvlhsh_query_t lhq;
1483
1484 process = NULL;
1485
1486 nxt_runtime_process_lhq_pid(&lhq, &pid);
1487
1488 lhq.pool = rt->mem_pool;
1489
1490 nxt_thread_mutex_lock(&rt->processes_mutex);
1491
1492 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1493
1494 case NXT_OK:
1495 rt->nprocesses--;
1496
1497 process = lhq.value;
1498
1499 process->registered = 0;
1500
1501 nxt_thread_log_debug("process %PI removed", pid);
1502 break;
1503
1504 default:
1505 nxt_thread_log_debug("process %PI remove failed", pid);
1506 break;
1507 }
1508
1509 nxt_thread_mutex_unlock(&rt->processes_mutex);
1510
1511 return process;
1512}
1513
1514
1515void
1516nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
1517{
1518 nxt_runtime_t *rt;
1519
1520 process->use_count += i;
1521
1522 if (process->use_count == 0) {
1523 rt = task->thread->runtime;
1524
1525 if (process->registered == 1) {
1526 nxt_runtime_process_remove_pid(rt, process->pid);
1527 }
1528
1529 nxt_runtime_process_destroy(rt, process);
1530 }
1531}
1532
1533
1534nxt_process_t *
1535nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1536{
1537 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1538
1539 return nxt_runtime_process_next(rt, lhe);
1540}
1541
1542
1543void
1544nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1545{
1546 nxt_int_t res;
1547 nxt_runtime_t *rt;
1548
1549 rt = task->thread->runtime;
1550
1551 res = nxt_port_hash_add(&rt->ports, port);
1552
1553 if (res != NXT_OK) {
1554 return;
1555 }
1556
1557 rt->port_by_type[port->type] = port;
1558
1559 nxt_port_use(task, port, 1);
1560}
1561
1562
1563void
1564nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1565{
1566 nxt_int_t res;
1567 nxt_runtime_t *rt;
1568
1569 rt = task->thread->runtime;
1570
1571 res = nxt_port_hash_remove(&rt->ports, port);
1572
1573 if (res != NXT_OK) {
1574 return;
1575 }
1576
1577 if (rt->port_by_type[port->type] == port) {
1578 rt->port_by_type[port->type] = NULL;
1579 }
1580
1581 nxt_port_use(task, port, -1);
1582}
1583
1584
1585nxt_port_t *
1586nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1587 nxt_port_id_t port_id)
1588{
1589 return nxt_port_hash_find(&rt->ports, pid, port_id);
1590}
968 }
969
970 exit(0);
971 }
972
973 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
974 "try \"%s -h\" for available options\n",
975 p, nxt_process_argv[0]);
976
977 write(STDERR_FILENO, buf, end - buf);
978
979 return NXT_ERROR;
980 }
981
982 return NXT_OK;
983}
984
985
986nxt_listen_socket_t *
987nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
988{
989 nxt_mp_t *mp;
990 nxt_listen_socket_t *ls;
991
992 ls = nxt_array_zero_add(rt->listen_sockets);
993 if (ls == NULL) {
994 return NULL;
995 }
996
997 mp = rt->mem_pool;
998
999 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1000 sa->length);
1001 if (ls->sockaddr == NULL) {
1002 return NULL;
1003 }
1004
1005 ls->sockaddr->type = sa->type;
1006
1007 nxt_sockaddr_text(ls->sockaddr);
1008
1009 ls->socket = -1;
1010 ls->backlog = NXT_LISTEN_BACKLOG;
1011
1012 return ls;
1013}
1014
1015
1016static nxt_int_t
1017nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1018{
1019 size_t length;
1020 char hostname[NXT_MAXHOSTNAMELEN + 1];
1021
1022 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1023 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1024 return NXT_ERROR;
1025 }
1026
1027 /*
1028 * Linux gethostname(2):
1029 *
1030 * If the null-terminated hostname is too large to fit,
1031 * then the name is truncated, and no error is returned.
1032 *
1033 * For this reason an additional byte is reserved in the buffer.
1034 */
1035 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1036
1037 length = nxt_strlen(hostname);
1038 rt->hostname.length = length;
1039
1040 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1041
1042 if (rt->hostname.start != NULL) {
1043 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1044 return NXT_OK;
1045 }
1046
1047 return NXT_ERROR;
1048}
1049
1050
1051static nxt_int_t
1052nxt_runtime_log_files_init(nxt_runtime_t *rt)
1053{
1054 nxt_file_t *file;
1055 nxt_list_t *log_files;
1056
1057 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1058
1059 if (nxt_fast_path(log_files != NULL)) {
1060 rt->log_files = log_files;
1061
1062 /* Preallocate the main log. This allocation cannot fail. */
1063 file = nxt_list_zero_add(log_files);
1064
1065 file->fd = NXT_FILE_INVALID;
1066 file->log_level = NXT_LOG_ALERT;
1067
1068 return NXT_OK;
1069 }
1070
1071 return NXT_ERROR;
1072}
1073
1074
1075nxt_file_t *
1076nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1077{
1078 nxt_int_t ret;
1079 nxt_file_t *file;
1080 nxt_file_name_str_t file_name;
1081
1082 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1083
1084 if (nxt_slow_path(ret != NXT_OK)) {
1085 return NULL;
1086 }
1087
1088 nxt_list_each(file, rt->log_files) {
1089
1090 /* STUB: hardecoded case sensitive/insensitive. */
1091
1092 if (file->name != NULL
1093 && nxt_file_name_eq(file->name, file_name.start))
1094 {
1095 return file;
1096 }
1097
1098 } nxt_list_loop;
1099
1100 file = nxt_list_zero_add(rt->log_files);
1101
1102 if (nxt_slow_path(file == NULL)) {
1103 return NULL;
1104 }
1105
1106 file->fd = NXT_FILE_INVALID;
1107 file->log_level = NXT_LOG_ALERT;
1108 file->name = file_name.start;
1109
1110 return file;
1111}
1112
1113
1114static nxt_int_t
1115nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1116{
1117 nxt_int_t ret;
1118 nxt_file_t *file;
1119
1120 nxt_list_each(file, rt->log_files) {
1121
1122 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1123 NXT_FILE_OWNER_ACCESS);
1124
1125 if (ret != NXT_OK) {
1126 return NXT_ERROR;
1127 }
1128
1129 } nxt_list_loop;
1130
1131 file = nxt_list_first(rt->log_files);
1132
1133 return nxt_file_stderr(file);
1134}
1135
1136
1137nxt_int_t
1138nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1139{
1140 nxt_int_t ret;
1141 nxt_uint_t c, p, ncurr, nprev;
1142 nxt_listen_socket_t *curr, *prev;
1143
1144 curr = rt->listen_sockets->elts;
1145 ncurr = rt->listen_sockets->nelts;
1146
1147 if (rt->inherited_sockets != NULL) {
1148 prev = rt->inherited_sockets->elts;
1149 nprev = rt->inherited_sockets->nelts;
1150
1151 } else {
1152 prev = NULL;
1153 nprev = 0;
1154 }
1155
1156 for (c = 0; c < ncurr; c++) {
1157
1158 for (p = 0; p < nprev; p++) {
1159
1160 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1161
1162 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1163 if (ret != NXT_OK) {
1164 return NXT_ERROR;
1165 }
1166
1167 goto next;
1168 }
1169 }
1170
1171 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1172 return NXT_ERROR;
1173 }
1174
1175 next:
1176
1177 continue;
1178 }
1179
1180 return NXT_OK;
1181}
1182
1183
1184nxt_int_t
1185nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1186{
1187 nxt_uint_t i, n;
1188 nxt_listen_socket_t *ls;
1189
1190 ls = rt->listen_sockets->elts;
1191 n = rt->listen_sockets->nelts;
1192
1193 for (i = 0; i < n; i++) {
1194 if (ls[i].flags == NXT_NONBLOCK) {
1195 if (nxt_listen_event(task, &ls[i]) == NULL) {
1196 return NXT_ERROR;
1197 }
1198 }
1199 }
1200
1201 return NXT_OK;
1202}
1203
1204
1205nxt_str_t *
1206nxt_current_directory(nxt_mp_t *mp)
1207{
1208 size_t length;
1209 u_char *p;
1210 nxt_str_t *name;
1211 char buf[NXT_MAX_PATH_LEN];
1212
1213 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1214
1215 if (nxt_fast_path(length != 0)) {
1216 name = nxt_str_alloc(mp, length + 1);
1217
1218 if (nxt_fast_path(name != NULL)) {
1219 p = nxt_cpymem(name->start, buf, length);
1220 *p = '/';
1221
1222 return name;
1223 }
1224 }
1225
1226 return NULL;
1227}
1228
1229
1230static nxt_int_t
1231nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1232{
1233 ssize_t length;
1234 nxt_int_t n;
1235 nxt_file_t file;
1236 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE];
1237
1238 nxt_memzero(&file, sizeof(nxt_file_t));
1239
1240 file.name = pid_file;
1241
1242 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1243 NXT_FILE_DEFAULT_ACCESS);
1244
1245 if (n != NXT_OK) {
1246 return NXT_ERROR;
1247 }
1248
1249 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1250
1251 if (nxt_file_write(&file, pid, length, 0) != length) {
1252 return NXT_ERROR;
1253 }
1254
1255 nxt_file_close(task, &file);
1256
1257 return NXT_OK;
1258}
1259
1260
1261nxt_process_t *
1262nxt_runtime_process_new(nxt_runtime_t *rt)
1263{
1264 nxt_process_t *process;
1265
1266 /* TODO: memory failures. */
1267
1268 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1269 if (nxt_slow_path(process == NULL)) {
1270 return NULL;
1271 }
1272
1273 nxt_queue_init(&process->ports);
1274
1275 nxt_thread_mutex_create(&process->incoming.mutex);
1276 nxt_thread_mutex_create(&process->outgoing.mutex);
1277 nxt_thread_mutex_create(&process->cp_mutex);
1278
1279 process->use_count = 1;
1280
1281 return process;
1282}
1283
1284
1285static void
1286nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1287{
1288 nxt_port_t *port;
1289
1290 nxt_assert(process->use_count == 0);
1291 nxt_assert(process->registered == 0);
1292
1293 nxt_port_mmaps_destroy(&process->incoming, 1);
1294 nxt_port_mmaps_destroy(&process->outgoing, 1);
1295
1296 do {
1297 port = nxt_port_hash_retrieve(&process->connected_ports);
1298
1299 } while (port != NULL);
1300
1301 nxt_thread_mutex_destroy(&process->incoming.mutex);
1302 nxt_thread_mutex_destroy(&process->outgoing.mutex);
1303 nxt_thread_mutex_destroy(&process->cp_mutex);
1304
1305 nxt_mp_free(rt->mem_pool, process);
1306}
1307
1308
1309static nxt_int_t
1310nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1311{
1312 nxt_process_t *process;
1313
1314 process = data;
1315
1316 if (lhq->key.length == sizeof(nxt_pid_t)
1317 && *(nxt_pid_t *) lhq->key.start == process->pid)
1318 {
1319 return NXT_OK;
1320 }
1321
1322 return NXT_DECLINED;
1323}
1324
1325static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1326 NXT_LVLHSH_DEFAULT,
1327 nxt_runtime_lvlhsh_pid_test,
1328 nxt_lvlhsh_alloc,
1329 nxt_lvlhsh_free,
1330};
1331
1332
1333nxt_inline void
1334nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1335{
1336 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1337 lhq->key.length = sizeof(*pid);
1338 lhq->key.start = (u_char *) pid;
1339 lhq->proto = &lvlhsh_processes_proto;
1340}
1341
1342
1343nxt_process_t *
1344nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1345{
1346 nxt_process_t *process;
1347 nxt_lvlhsh_query_t lhq;
1348
1349 process = NULL;
1350
1351 nxt_runtime_process_lhq_pid(&lhq, &pid);
1352
1353 nxt_thread_mutex_lock(&rt->processes_mutex);
1354
1355 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1356 process = lhq.value;
1357
1358 } else {
1359 nxt_thread_log_debug("process %PI not found", pid);
1360 }
1361
1362 nxt_thread_mutex_unlock(&rt->processes_mutex);
1363
1364 return process;
1365}
1366
1367
1368nxt_process_t *
1369nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1370{
1371 nxt_process_t *process;
1372 nxt_lvlhsh_query_t lhq;
1373
1374 nxt_runtime_process_lhq_pid(&lhq, &pid);
1375
1376 nxt_thread_mutex_lock(&rt->processes_mutex);
1377
1378 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1379 nxt_thread_log_debug("process %PI found", pid);
1380
1381 nxt_thread_mutex_unlock(&rt->processes_mutex);
1382
1383 process = lhq.value;
1384 process->use_count++;
1385
1386 return process;
1387 }
1388
1389 process = nxt_runtime_process_new(rt);
1390 if (nxt_slow_path(process == NULL)) {
1391
1392 nxt_thread_mutex_unlock(&rt->processes_mutex);
1393
1394 return NULL;
1395 }
1396
1397 process->pid = pid;
1398
1399 lhq.replace = 0;
1400 lhq.value = process;
1401 lhq.pool = rt->mem_pool;
1402
1403 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1404
1405 case NXT_OK:
1406 if (rt->nprocesses == 0) {
1407 rt->mprocess = process;
1408 }
1409
1410 rt->nprocesses++;
1411
1412 process->registered = 1;
1413
1414 nxt_thread_log_debug("process %PI insert", pid);
1415 break;
1416
1417 default:
1418 nxt_thread_log_debug("process %PI insert failed", pid);
1419 break;
1420 }
1421
1422 nxt_thread_mutex_unlock(&rt->processes_mutex);
1423
1424 return process;
1425}
1426
1427
1428void
1429nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1430{
1431 nxt_port_t *port;
1432 nxt_runtime_t *rt;
1433 nxt_lvlhsh_query_t lhq;
1434
1435 nxt_assert(process->registered == 0);
1436
1437 rt = task->thread->runtime;
1438
1439 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1440
1441 lhq.replace = 0;
1442 lhq.value = process;
1443 lhq.pool = rt->mem_pool;
1444
1445 nxt_thread_mutex_lock(&rt->processes_mutex);
1446
1447 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1448
1449 case NXT_OK:
1450 if (rt->nprocesses == 0) {
1451 rt->mprocess = process;
1452 }
1453
1454 rt->nprocesses++;
1455
1456 nxt_process_port_each(process, port) {
1457
1458 port->pid = process->pid;
1459
1460 nxt_runtime_port_add(task, port);
1461
1462 } nxt_process_port_loop;
1463
1464 process->registered = 1;
1465
1466 nxt_thread_log_debug("process %PI added", process->pid);
1467 break;
1468
1469 default:
1470 nxt_thread_log_debug("process %PI failed to add", process->pid);
1471 break;
1472 }
1473
1474 nxt_thread_mutex_unlock(&rt->processes_mutex);
1475}
1476
1477
1478static nxt_process_t *
1479nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1480{
1481 nxt_process_t *process;
1482 nxt_lvlhsh_query_t lhq;
1483
1484 process = NULL;
1485
1486 nxt_runtime_process_lhq_pid(&lhq, &pid);
1487
1488 lhq.pool = rt->mem_pool;
1489
1490 nxt_thread_mutex_lock(&rt->processes_mutex);
1491
1492 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1493
1494 case NXT_OK:
1495 rt->nprocesses--;
1496
1497 process = lhq.value;
1498
1499 process->registered = 0;
1500
1501 nxt_thread_log_debug("process %PI removed", pid);
1502 break;
1503
1504 default:
1505 nxt_thread_log_debug("process %PI remove failed", pid);
1506 break;
1507 }
1508
1509 nxt_thread_mutex_unlock(&rt->processes_mutex);
1510
1511 return process;
1512}
1513
1514
1515void
1516nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
1517{
1518 nxt_runtime_t *rt;
1519
1520 process->use_count += i;
1521
1522 if (process->use_count == 0) {
1523 rt = task->thread->runtime;
1524
1525 if (process->registered == 1) {
1526 nxt_runtime_process_remove_pid(rt, process->pid);
1527 }
1528
1529 nxt_runtime_process_destroy(rt, process);
1530 }
1531}
1532
1533
1534nxt_process_t *
1535nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1536{
1537 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1538
1539 return nxt_runtime_process_next(rt, lhe);
1540}
1541
1542
1543void
1544nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1545{
1546 nxt_int_t res;
1547 nxt_runtime_t *rt;
1548
1549 rt = task->thread->runtime;
1550
1551 res = nxt_port_hash_add(&rt->ports, port);
1552
1553 if (res != NXT_OK) {
1554 return;
1555 }
1556
1557 rt->port_by_type[port->type] = port;
1558
1559 nxt_port_use(task, port, 1);
1560}
1561
1562
1563void
1564nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1565{
1566 nxt_int_t res;
1567 nxt_runtime_t *rt;
1568
1569 rt = task->thread->runtime;
1570
1571 res = nxt_port_hash_remove(&rt->ports, port);
1572
1573 if (res != NXT_OK) {
1574 return;
1575 }
1576
1577 if (rt->port_by_type[port->type] == port) {
1578 rt->port_by_type[port->type] = NULL;
1579 }
1580
1581 nxt_port_use(task, port, -1);
1582}
1583
1584
1585nxt_port_t *
1586nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1587 nxt_port_id_t port_id)
1588{
1589 return nxt_port_hash_find(&rt->ports, pid, port_id);
1590}