nxt_runtime.c (196:7d6a0c9b661e) nxt_runtime.c (211:6738bb76ae0a)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_master_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task);
23static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task,
24 nxt_runtime_t *rt);
25static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
26static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28 nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task,
32 nxt_mp_t *mp, nxt_str_t *addr);
33static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task,
34 nxt_mp_t *mp, nxt_str_t *addr);
35static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task,
36 nxt_mp_t *mp, nxt_str_t *addr);
37static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task,
38 nxt_mp_t *mp, nxt_str_t *addr);
39static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
40static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
41static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
42 nxt_runtime_t *rt);
43static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
44 nxt_file_name_t *pid_file);
45
46#if (NXT_THREADS)
47static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
48 nxt_runtime_cont_t cont);
49#endif
50
51static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
52 nxt_process_t *process);
53static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
54 nxt_pid_t pid);
55
56
57nxt_int_t
58nxt_runtime_create(nxt_task_t *task)
59{
60 nxt_mp_t *mp;
61 nxt_int_t ret;
62 nxt_array_t *listen_sockets;
63 nxt_runtime_t *rt;
64
65 mp = nxt_mp_create(1024, 128, 256, 32);
66
67 if (nxt_slow_path(mp == NULL)) {
68 return NXT_ERROR;
69 }
70
71 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
72 if (nxt_slow_path(rt == NULL)) {
73 return NXT_ERROR;
74 }
75
76 task->thread->runtime = rt;
77 rt->mem_pool = mp;
78
79 nxt_thread_mutex_create(&rt->processes_mutex);
80
81 rt->prefix = nxt_current_directory(mp);
82 if (nxt_slow_path(rt->prefix == NULL)) {
83 goto fail;
84 }
85
86 rt->conf_prefix = rt->prefix;
87
88 rt->services = nxt_services_init(mp);
89 if (nxt_slow_path(rt->services == NULL)) {
90 goto fail;
91 }
92
93 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
94 if (nxt_slow_path(listen_sockets == NULL)) {
95 goto fail;
96 }
97
98 rt->listen_sockets = listen_sockets;
99
100 ret = nxt_runtime_inherited_listen_sockets(task, rt);
101 if (nxt_slow_path(ret != NXT_OK)) {
102 goto fail;
103 }
104
105 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
106 goto fail;
107 }
108
109 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
110 goto fail;
111 }
112
113 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
114 goto fail;
115 }
116
117 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
118 goto fail;
119 }
120
121 rt->start = nxt_runtime_initial_start;
122
123 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
124 nxt_runtime_start, task, rt, NULL);
125
126 return NXT_OK;
127
128fail:
129
130 nxt_mp_destroy(mp);
131
132 return NXT_ERROR;
133}
134
135
136static nxt_int_t
137nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
138{
139 u_char *v, *p;
140 nxt_int_t type;
141 nxt_array_t *inherited_sockets;
142 nxt_socket_t s;
143 nxt_listen_socket_t *ls;
144
145 v = (u_char *) getenv("NGINX");
146
147 if (v == NULL) {
148 return nxt_runtime_systemd_listen_sockets(task, rt);
149 }
150
151 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v);
152
153 inherited_sockets = nxt_array_create(rt->mem_pool,
154 1, sizeof(nxt_listen_socket_t));
155 if (inherited_sockets == NULL) {
156 return NXT_ERROR;
157 }
158
159 rt->inherited_sockets = inherited_sockets;
160
161 for (p = v; *p != '\0'; p++) {
162
163 if (*p == ';') {
164 s = nxt_int_parse(v, p - v);
165
166 if (nxt_slow_path(s < 0)) {
167 nxt_log(task, NXT_LOG_CRIT, "invalid socket number "
168 "\"%s\" in NGINX environment variable, "
169 "ignoring the rest of the variable", v);
170 return NXT_ERROR;
171 }
172
173 v = p + 1;
174
175 ls = nxt_array_zero_add(inherited_sockets);
176 if (nxt_slow_path(ls == NULL)) {
177 return NXT_ERROR;
178 }
179
180 ls->socket = s;
181
182 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
183 if (nxt_slow_path(ls->sockaddr == NULL)) {
184 return NXT_ERROR;
185 }
186
187 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
188 if (nxt_slow_path(type == -1)) {
189 return NXT_ERROR;
190 }
191
192 ls->sockaddr->type = (uint16_t) type;
193 }
194 }
195
196 return NXT_OK;
197}
198
199
200static nxt_int_t
201nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
202{
203 u_char *nfd, *pid;
204 nxt_int_t n;
205 nxt_array_t *inherited_sockets;
206 nxt_socket_t s;
207 nxt_listen_socket_t *ls;
208
209 /*
210 * Number of listening sockets passed. The socket
211 * descriptors start from number 3 and are sequential.
212 */
213 nfd = (u_char *) getenv("LISTEN_FDS");
214 if (nfd == NULL) {
215 return NXT_OK;
216 }
217
218 /* The pid of the service process. */
219 pid = (u_char *) getenv("LISTEN_PID");
220 if (pid == NULL) {
221 return NXT_OK;
222 }
223
224 n = nxt_int_parse(nfd, nxt_strlen(nfd));
225 if (n < 0) {
226 return NXT_OK;
227 }
228
229 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
230 return NXT_OK;
231 }
232
233 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n);
234
235 inherited_sockets = nxt_array_create(rt->mem_pool,
236 n, sizeof(nxt_listen_socket_t));
237 if (inherited_sockets == NULL) {
238 return NXT_ERROR;
239 }
240
241 rt->inherited_sockets = inherited_sockets;
242
243 for (s = 3; s < n; s++) {
244 ls = nxt_array_zero_add(inherited_sockets);
245 if (nxt_slow_path(ls == NULL)) {
246 return NXT_ERROR;
247 }
248
249 ls->socket = s;
250
251 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
252 if (nxt_slow_path(ls->sockaddr == NULL)) {
253 return NXT_ERROR;
254 }
255
256 ls->sockaddr->type = SOCK_STREAM;
257 }
258
259 return NXT_OK;
260}
261
262
263static nxt_int_t
264nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
265{
266 nxt_thread_t *thread;
267 nxt_event_engine_t *engine;
268 const nxt_event_interface_t *interface;
269
270 interface = nxt_service_get(rt->services, "engine", NULL);
271
272 if (nxt_slow_path(interface == NULL)) {
273 /* TODO: log */
274 return NXT_ERROR;
275 }
276
277 engine = nxt_event_engine_create(task, interface,
278 nxt_master_process_signals, 0, 0);
279
280 if (nxt_slow_path(engine == NULL)) {
281 return NXT_ERROR;
282 }
283
284 thread = task->thread;
285 thread->engine = engine;
286 thread->fiber = &engine->fibers->fiber;
287
288 engine->id = rt->last_engine_id++;
289
290 nxt_queue_init(&rt->engines);
291 nxt_queue_insert_tail(&rt->engines, &engine->link);
292
293 return NXT_OK;
294}
295
296
297static nxt_int_t
298nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
299{
300#if (NXT_THREADS)
301 nxt_int_t ret;
302 nxt_array_t *thread_pools;
303
304 thread_pools = nxt_array_create(rt->mem_pool, 1,
305 sizeof(nxt_thread_pool_t *));
306
307 if (nxt_slow_path(thread_pools == NULL)) {
308 return NXT_ERROR;
309 }
310
311 rt->thread_pools = thread_pools;
312 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
313
314 if (nxt_slow_path(ret != NXT_OK)) {
315 return NXT_ERROR;
316 }
317
318#endif
319
320 return NXT_OK;
321}
322
323
324static void
325nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
326{
327 nxt_uint_t i;
328 nxt_runtime_t *rt;
329
330 rt = obj;
331
332 nxt_debug(task, "rt conf done");
333
334 task->thread->log->ctx_handler = NULL;
335 task->thread->log->ctx = NULL;
336
337 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
338 goto fail;
339 }
340
341 for (i = 0; i < nxt_init_modules_n; i++) {
342 if (nxt_init_modules[i](task->thread, rt) != NXT_OK) {
343 goto fail;
344 }
345 }
346
347 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
348 goto fail;
349 }
350
351 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
352 goto fail;
353 }
354
355#if (NXT_THREADS)
356
357 /*
358 * Thread pools should be destroyed before starting worker
359 * processes, because thread pool semaphores will stick in
360 * locked state in new processes after fork().
361 */
362 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
363
364#else
365
366 rt->start(task->thread, rt);
367
368#endif
369
370 return;
371
372fail:
373
374 nxt_runtime_quit(task);
375}
376
377
378static void
379nxt_runtime_initial_start(nxt_task_t *task)
380{
381 nxt_int_t ret;
382 nxt_thread_t *thr;
383 nxt_runtime_t *rt;
384 const nxt_event_interface_t *interface;
385
386 thr = task->thread;
387 rt = thr->runtime;
388
389 if (rt->inherited_sockets == NULL && rt->daemon) {
390
391 if (nxt_process_daemon(task) != NXT_OK) {
392 goto fail;
393 }
394
395 /*
396 * An event engine should be updated after fork()
397 * even if an event facility was not changed because:
398 * 1) inherited kqueue descriptor is invalid,
399 * 2) the signal thread is not inherited.
400 */
401 interface = nxt_service_get(rt->services, "engine", rt->engine);
402 if (interface == NULL) {
403 goto fail;
404 }
405
406 ret = nxt_event_engine_change(task->thread->engine, interface,
407 rt->batch);
408 if (ret != NXT_OK) {
409 goto fail;
410 }
411 }
412
413 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
414 if (ret != NXT_OK) {
415 goto fail;
416 }
417
418 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
419 goto fail;
420 }
421
422 thr->engine->max_connections = rt->engine_connections;
423
424 if (rt->master_process) {
425 if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) {
426 return;
427 }
428
429 } else {
430 nxt_single_process_start(thr, task, rt);
431 return;
432 }
433
434fail:
435
436 nxt_runtime_quit(task);
437}
438
439
440static void
441nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt)
442{
443#if (NXT_THREADS)
444 nxt_int_t ret;
445
446 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads,
447 60000 * 1000000LL);
448
449 if (nxt_slow_path(ret != NXT_OK)) {
450 nxt_runtime_quit(task);
451 return;
452 }
453
454#endif
455
456 rt->types |= (1U << NXT_PROCESS_SINGLE);
457
458 nxt_runtime_listen_sockets_enable(task, rt);
459
460 return;
461}
462
463
464void
465nxt_runtime_quit(nxt_task_t *task)
466{
467 nxt_bool_t done;
468 nxt_runtime_t *rt;
469 nxt_event_engine_t *engine;
470
471 rt = task->thread->runtime;
472 engine = task->thread->engine;
473
474 nxt_debug(task, "exiting");
475
476 done = 1;
477
478 if (!engine->shutdown) {
479 engine->shutdown = 1;
480
481#if (NXT_THREADS)
482
483 if (!nxt_array_is_empty(rt->thread_pools)) {
484 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
485 done = 0;
486 }
487
488#endif
489
490 if (nxt_runtime_is_master(rt)) {
491 nxt_master_stop_worker_processes(task, rt);
492 done = 0;
493 }
494 }
495
496 nxt_runtime_close_idle_connections(engine);
497
498 if (done) {
499 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
500 task, rt, engine);
501 }
502}
503
504
505static void
506nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
507{
508 nxt_conn_t *c;
509 nxt_queue_t *idle;
510 nxt_queue_link_t *link, *next;
511
512 nxt_debug(&engine->task, "close idle connections");
513
514 idle = &engine->idle_connections;
515
516 for (link = nxt_queue_head(idle);
517 link != nxt_queue_tail(idle);
518 link = next)
519 {
520 next = nxt_queue_next(link);
521 c = nxt_queue_link_data(link, nxt_conn_t, link);
522
523 if (!c->socket.read_ready) {
524 nxt_queue_remove(link);
525 nxt_conn_close(engine, c);
526 }
527 }
528}
529
530
531static void
532nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
533{
534 nxt_runtime_t *rt;
535 nxt_process_t *process;
536 nxt_event_engine_t *engine;
537
538 rt = obj;
539 engine = data;
540
541#if (NXT_THREADS)
542
543 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
544
545 if (!nxt_array_is_empty(rt->thread_pools)) {
546 return;
547 }
548
549#endif
550
551 if (nxt_runtime_is_master(rt)) {
552 if (rt->pid_file != NULL) {
553 nxt_file_delete(rt->pid_file);
554 }
555 }
556
557 if (!engine->event.signal_support) {
558 nxt_event_engine_signals_stop(engine);
559 }
560
561 nxt_runtime_process_each(rt, process) {
562
563 nxt_runtime_process_remove(rt, process);
564
565 } nxt_runtime_process_loop;
566
567 nxt_thread_mutex_destroy(&rt->processes_mutex);
568
569 nxt_mp_destroy(rt->mem_pool);
570
571 nxt_debug(task, "exit");
572
573 exit(0);
574 nxt_unreachable();
575}
576
577
578static nxt_int_t
579nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
580{
581 nxt_event_engine_t *engine;
582 const nxt_event_interface_t *interface;
583
584 engine = task->thread->engine;
585
586 if (engine->batch == rt->batch
587 && nxt_strcmp(engine->event.name, rt->engine) == 0)
588 {
589 return NXT_OK;
590 }
591
592 interface = nxt_service_get(rt->services, "engine", rt->engine);
593
594 if (interface != NULL) {
595 return nxt_event_engine_change(engine, interface, rt->batch);
596 }
597
598 return NXT_ERROR;
599}
600
601
602void
603nxt_runtime_event_engine_free(nxt_runtime_t *rt)
604{
605 nxt_queue_link_t *link;
606 nxt_event_engine_t *engine;
607
608 link = nxt_queue_first(&rt->engines);
609 nxt_queue_remove(link);
610
611 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
612 nxt_event_engine_free(engine);
613}
614
615
616#if (NXT_THREADS)
617
618static void nxt_runtime_thread_pool_init(void);
619static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
620 void *data);
621
622
623nxt_int_t
624nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
625 nxt_uint_t max_threads, nxt_nsec_t timeout)
626{
627 nxt_thread_pool_t *thread_pool, **tp;
628
629 tp = nxt_array_add(rt->thread_pools);
630 if (tp == NULL) {
631 return NXT_ERROR;
632 }
633
634 thread_pool = nxt_thread_pool_create(max_threads, timeout,
635 nxt_runtime_thread_pool_init,
636 thr->engine,
637 nxt_runtime_thread_pool_exit);
638
639 if (nxt_fast_path(thread_pool != NULL)) {
640 *tp = thread_pool;
641 }
642
643 return NXT_OK;
644}
645
646
647static void
648nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
649 nxt_runtime_cont_t cont)
650{
651 nxt_uint_t n;
652 nxt_thread_pool_t **tp;
653
654 rt->continuation = cont;
655
656 n = rt->thread_pools->nelts;
657
658 if (n == 0) {
659 cont(task);
660 return;
661 }
662
663 tp = rt->thread_pools->elts;
664
665 do {
666 nxt_thread_pool_destroy(*tp);
667
668 tp++;
669 n--;
670 } while (n != 0);
671}
672
673
674static void
675nxt_runtime_thread_pool_init(void)
676{
677#if (NXT_REGEX)
678 nxt_regex_init(0);
679#endif
680}
681
682
683static void
684nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
685{
686 nxt_uint_t i, n;
687 nxt_runtime_t *rt;
688 nxt_thread_pool_t *tp, **thread_pools;
689 nxt_thread_handle_t handle;
690
691 tp = obj;
692
693 if (data != NULL) {
694 handle = (nxt_thread_handle_t) (uintptr_t) data;
695 nxt_thread_wait(handle);
696 }
697
698 rt = task->thread->runtime;
699
700 thread_pools = rt->thread_pools->elts;
701 n = rt->thread_pools->nelts;
702
703 nxt_debug(task, "thread pools: %ui", n);
704
705 for (i = 0; i < n; i++) {
706
707 if (tp == thread_pools[i]) {
708 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
709
710 if (n == 1) {
711 /* The last thread pool. */
712 rt->continuation(task);
713 }
714
715 return;
716 }
717 }
718}
719
720#endif
721
722
723static nxt_int_t
724nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
725{
726 nxt_int_t ret;
727 nxt_str_t *prefix;
728 nxt_file_t *file;
729 nxt_file_name_str_t file_name;
730 const nxt_event_interface_t *interface;
731
732 rt->daemon = 1;
733 rt->master_process = 1;
734 rt->engine_connections = 256;
735 rt->worker_processes = 1;
736 rt->auxiliary_threads = 2;
737 rt->user_cred.user = "nobody";
738 rt->group = NULL;
739 rt->pid = "nginext.pid";
740 rt->error_log = "error.log";
741
742 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
743 return NXT_ERROR;
744 }
745
746 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
747 return NXT_ERROR;
748 }
749
750 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
751 return NXT_ERROR;
752 }
753
754 /* An engine's parameters. */
755
756 interface = nxt_service_get(rt->services, "engine", rt->engine);
757 if (interface == NULL) {
758 return NXT_ERROR;
759 }
760
761 rt->engine = interface->name;
762
763 prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix;
764
765 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z",
766 prefix, rt->pid);
767 if (nxt_slow_path(ret != NXT_OK)) {
768 return NXT_ERROR;
769 }
770
771 rt->pid_file = file_name.start;
772
773 prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix;
774
775 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z",
776 prefix, rt->error_log);
777 if (nxt_slow_path(ret != NXT_OK)) {
778 return NXT_ERROR;
779 }
780
781 file = nxt_list_first(rt->log_files);
782 file->name = file_name.start;
783
784 return NXT_OK;
785}
786
787
788static nxt_int_t
789nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
790{
791 char *p, **argv;
792 nxt_int_t n;
793 nxt_str_t addr;
794 nxt_sockaddr_t *sa;
795
796 argv = nxt_process_argv;
797
798 while (*argv != NULL) {
799 p = *argv++;
800
801 if (nxt_strcmp(p, "--listen") == 0) {
802 if (*argv == NULL) {
803 nxt_log(task, NXT_LOG_CRIT,
804 "no argument for option \"--listen\"");
805 return NXT_ERROR;
806 }
807
808 p = *argv++;
809
810 addr.length = nxt_strlen(p);
811 addr.start = (u_char *) p;
812
813 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr);
814
815 if (sa == NULL) {
816 return NXT_ERROR;
817 }
818
819 rt->controller_listen = sa;
820
821 continue;
822 }
823
824 if (nxt_strcmp(p, "--upstream") == 0) {
825 if (*argv == NULL) {
826 nxt_log(task, NXT_LOG_CRIT,
827 "no argument for option \"--upstream\"");
828 return NXT_ERROR;
829 }
830
831 p = *argv++;
832
833 rt->upstream.length = nxt_strlen(p);
834 rt->upstream.start = (u_char *) p;
835
836 continue;
837 }
838
839 if (nxt_strcmp(p, "--workers") == 0) {
840 if (*argv == NULL) {
841 nxt_log(task, NXT_LOG_CRIT,
842 "no argument for option \"--workers\"");
843 return NXT_ERROR;
844 }
845
846 p = *argv++;
847 n = nxt_int_parse((u_char *) p, nxt_strlen(p));
848
849 if (n < 1) {
850 nxt_log(task, NXT_LOG_CRIT,
851 "invalid number of workers: \"%s\"", p);
852 return NXT_ERROR;
853 }
854
855 rt->worker_processes = n;
856
857 continue;
858 }
859
860 if (nxt_strcmp(p, "--user") == 0) {
861 if (*argv == NULL) {
862 nxt_log(task, NXT_LOG_CRIT,
863 "no argument for option \"--user\"");
864 return NXT_ERROR;
865 }
866
867 p = *argv++;
868
869 rt->user_cred.user = p;
870
871 continue;
872 }
873
874 if (nxt_strcmp(p, "--group") == 0) {
875 if (*argv == NULL) {
876 nxt_log(task, NXT_LOG_CRIT,
877 "no argument for option \"--group\"");
878 return NXT_ERROR;
879 }
880
881 p = *argv++;
882
883 rt->group = p;
884
885 continue;
886 }
887
888 if (nxt_strcmp(p, "--pid") == 0) {
889 if (*argv == NULL) {
890 nxt_log(task, NXT_LOG_CRIT,
891 "no argument for option \"--pid\"");
892 return NXT_ERROR;
893 }
894
895 p = *argv++;
896
897 rt->pid = p;
898
899 continue;
900 }
901
902 if (nxt_strcmp(p, "--log") == 0) {
903 if (*argv == NULL) {
904 nxt_log(task, NXT_LOG_CRIT,
905 "no argument for option \"--log\"");
906 return NXT_ERROR;
907 }
908
909 p = *argv++;
910
911 rt->error_log = p;
912
913 continue;
914 }
915
916 if (nxt_strcmp(p, "--no-daemonize") == 0) {
917 rt->daemon = 0;
918 continue;
919 }
920 }
921
922 return NXT_OK;
923}
924
925
926static nxt_sockaddr_t *
927nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
928{
929 u_char *p;
930 size_t length;
931
932 length = addr->length;
933 p = addr->start;
934
935 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) {
936 return nxt_runtime_sockaddr_unix_parse(task, mp, addr);
937 }
938
939 if (length != 0 && *p == '[') {
940 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr);
941 }
942
943 return nxt_runtime_sockaddr_inet_parse(task, mp, addr);
944}
945
946
947static nxt_sockaddr_t *
948nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
949{
950#if (NXT_HAVE_UNIX_DOMAIN)
951 u_char *p;
952 size_t length, socklen;
953 nxt_sockaddr_t *sa;
954
955 /*
956 * Actual sockaddr_un length can be lesser or even larger than defined
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_port.h>
11#include <nxt_master_process.h>
12#include <nxt_router.h>
13
14
15static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
16 nxt_runtime_t *rt);
17static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
18 nxt_runtime_t *rt);
19static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
20static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
21static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
22static void nxt_runtime_initial_start(nxt_task_t *task);
23static void nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task,
24 nxt_runtime_t *rt);
25static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
26static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28 nxt_runtime_t *rt);
29static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31static nxt_sockaddr_t *nxt_runtime_sockaddr_parse(nxt_task_t *task,
32 nxt_mp_t *mp, nxt_str_t *addr);
33static nxt_sockaddr_t *nxt_runtime_sockaddr_unix_parse(nxt_task_t *task,
34 nxt_mp_t *mp, nxt_str_t *addr);
35static nxt_sockaddr_t *nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task,
36 nxt_mp_t *mp, nxt_str_t *addr);
37static nxt_sockaddr_t *nxt_runtime_sockaddr_inet_parse(nxt_task_t *task,
38 nxt_mp_t *mp, nxt_str_t *addr);
39static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
40static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
41static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
42 nxt_runtime_t *rt);
43static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
44 nxt_file_name_t *pid_file);
45
46#if (NXT_THREADS)
47static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
48 nxt_runtime_cont_t cont);
49#endif
50
51static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
52 nxt_process_t *process);
53static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
54 nxt_pid_t pid);
55
56
57nxt_int_t
58nxt_runtime_create(nxt_task_t *task)
59{
60 nxt_mp_t *mp;
61 nxt_int_t ret;
62 nxt_array_t *listen_sockets;
63 nxt_runtime_t *rt;
64
65 mp = nxt_mp_create(1024, 128, 256, 32);
66
67 if (nxt_slow_path(mp == NULL)) {
68 return NXT_ERROR;
69 }
70
71 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
72 if (nxt_slow_path(rt == NULL)) {
73 return NXT_ERROR;
74 }
75
76 task->thread->runtime = rt;
77 rt->mem_pool = mp;
78
79 nxt_thread_mutex_create(&rt->processes_mutex);
80
81 rt->prefix = nxt_current_directory(mp);
82 if (nxt_slow_path(rt->prefix == NULL)) {
83 goto fail;
84 }
85
86 rt->conf_prefix = rt->prefix;
87
88 rt->services = nxt_services_init(mp);
89 if (nxt_slow_path(rt->services == NULL)) {
90 goto fail;
91 }
92
93 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
94 if (nxt_slow_path(listen_sockets == NULL)) {
95 goto fail;
96 }
97
98 rt->listen_sockets = listen_sockets;
99
100 ret = nxt_runtime_inherited_listen_sockets(task, rt);
101 if (nxt_slow_path(ret != NXT_OK)) {
102 goto fail;
103 }
104
105 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
106 goto fail;
107 }
108
109 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
110 goto fail;
111 }
112
113 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
114 goto fail;
115 }
116
117 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
118 goto fail;
119 }
120
121 rt->start = nxt_runtime_initial_start;
122
123 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
124 nxt_runtime_start, task, rt, NULL);
125
126 return NXT_OK;
127
128fail:
129
130 nxt_mp_destroy(mp);
131
132 return NXT_ERROR;
133}
134
135
136static nxt_int_t
137nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
138{
139 u_char *v, *p;
140 nxt_int_t type;
141 nxt_array_t *inherited_sockets;
142 nxt_socket_t s;
143 nxt_listen_socket_t *ls;
144
145 v = (u_char *) getenv("NGINX");
146
147 if (v == NULL) {
148 return nxt_runtime_systemd_listen_sockets(task, rt);
149 }
150
151 nxt_log(task, NXT_LOG_CRIT, "using inherited listen sockets: %s", v);
152
153 inherited_sockets = nxt_array_create(rt->mem_pool,
154 1, sizeof(nxt_listen_socket_t));
155 if (inherited_sockets == NULL) {
156 return NXT_ERROR;
157 }
158
159 rt->inherited_sockets = inherited_sockets;
160
161 for (p = v; *p != '\0'; p++) {
162
163 if (*p == ';') {
164 s = nxt_int_parse(v, p - v);
165
166 if (nxt_slow_path(s < 0)) {
167 nxt_log(task, NXT_LOG_CRIT, "invalid socket number "
168 "\"%s\" in NGINX environment variable, "
169 "ignoring the rest of the variable", v);
170 return NXT_ERROR;
171 }
172
173 v = p + 1;
174
175 ls = nxt_array_zero_add(inherited_sockets);
176 if (nxt_slow_path(ls == NULL)) {
177 return NXT_ERROR;
178 }
179
180 ls->socket = s;
181
182 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
183 if (nxt_slow_path(ls->sockaddr == NULL)) {
184 return NXT_ERROR;
185 }
186
187 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
188 if (nxt_slow_path(type == -1)) {
189 return NXT_ERROR;
190 }
191
192 ls->sockaddr->type = (uint16_t) type;
193 }
194 }
195
196 return NXT_OK;
197}
198
199
200static nxt_int_t
201nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
202{
203 u_char *nfd, *pid;
204 nxt_int_t n;
205 nxt_array_t *inherited_sockets;
206 nxt_socket_t s;
207 nxt_listen_socket_t *ls;
208
209 /*
210 * Number of listening sockets passed. The socket
211 * descriptors start from number 3 and are sequential.
212 */
213 nfd = (u_char *) getenv("LISTEN_FDS");
214 if (nfd == NULL) {
215 return NXT_OK;
216 }
217
218 /* The pid of the service process. */
219 pid = (u_char *) getenv("LISTEN_PID");
220 if (pid == NULL) {
221 return NXT_OK;
222 }
223
224 n = nxt_int_parse(nfd, nxt_strlen(nfd));
225 if (n < 0) {
226 return NXT_OK;
227 }
228
229 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
230 return NXT_OK;
231 }
232
233 nxt_log(task, NXT_LOG_INFO, "using %s systemd listen sockets", n);
234
235 inherited_sockets = nxt_array_create(rt->mem_pool,
236 n, sizeof(nxt_listen_socket_t));
237 if (inherited_sockets == NULL) {
238 return NXT_ERROR;
239 }
240
241 rt->inherited_sockets = inherited_sockets;
242
243 for (s = 3; s < n; s++) {
244 ls = nxt_array_zero_add(inherited_sockets);
245 if (nxt_slow_path(ls == NULL)) {
246 return NXT_ERROR;
247 }
248
249 ls->socket = s;
250
251 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
252 if (nxt_slow_path(ls->sockaddr == NULL)) {
253 return NXT_ERROR;
254 }
255
256 ls->sockaddr->type = SOCK_STREAM;
257 }
258
259 return NXT_OK;
260}
261
262
263static nxt_int_t
264nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
265{
266 nxt_thread_t *thread;
267 nxt_event_engine_t *engine;
268 const nxt_event_interface_t *interface;
269
270 interface = nxt_service_get(rt->services, "engine", NULL);
271
272 if (nxt_slow_path(interface == NULL)) {
273 /* TODO: log */
274 return NXT_ERROR;
275 }
276
277 engine = nxt_event_engine_create(task, interface,
278 nxt_master_process_signals, 0, 0);
279
280 if (nxt_slow_path(engine == NULL)) {
281 return NXT_ERROR;
282 }
283
284 thread = task->thread;
285 thread->engine = engine;
286 thread->fiber = &engine->fibers->fiber;
287
288 engine->id = rt->last_engine_id++;
289
290 nxt_queue_init(&rt->engines);
291 nxt_queue_insert_tail(&rt->engines, &engine->link);
292
293 return NXT_OK;
294}
295
296
297static nxt_int_t
298nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
299{
300#if (NXT_THREADS)
301 nxt_int_t ret;
302 nxt_array_t *thread_pools;
303
304 thread_pools = nxt_array_create(rt->mem_pool, 1,
305 sizeof(nxt_thread_pool_t *));
306
307 if (nxt_slow_path(thread_pools == NULL)) {
308 return NXT_ERROR;
309 }
310
311 rt->thread_pools = thread_pools;
312 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
313
314 if (nxt_slow_path(ret != NXT_OK)) {
315 return NXT_ERROR;
316 }
317
318#endif
319
320 return NXT_OK;
321}
322
323
324static void
325nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
326{
327 nxt_uint_t i;
328 nxt_runtime_t *rt;
329
330 rt = obj;
331
332 nxt_debug(task, "rt conf done");
333
334 task->thread->log->ctx_handler = NULL;
335 task->thread->log->ctx = NULL;
336
337 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
338 goto fail;
339 }
340
341 for (i = 0; i < nxt_init_modules_n; i++) {
342 if (nxt_init_modules[i](task->thread, rt) != NXT_OK) {
343 goto fail;
344 }
345 }
346
347 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
348 goto fail;
349 }
350
351 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
352 goto fail;
353 }
354
355#if (NXT_THREADS)
356
357 /*
358 * Thread pools should be destroyed before starting worker
359 * processes, because thread pool semaphores will stick in
360 * locked state in new processes after fork().
361 */
362 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
363
364#else
365
366 rt->start(task->thread, rt);
367
368#endif
369
370 return;
371
372fail:
373
374 nxt_runtime_quit(task);
375}
376
377
378static void
379nxt_runtime_initial_start(nxt_task_t *task)
380{
381 nxt_int_t ret;
382 nxt_thread_t *thr;
383 nxt_runtime_t *rt;
384 const nxt_event_interface_t *interface;
385
386 thr = task->thread;
387 rt = thr->runtime;
388
389 if (rt->inherited_sockets == NULL && rt->daemon) {
390
391 if (nxt_process_daemon(task) != NXT_OK) {
392 goto fail;
393 }
394
395 /*
396 * An event engine should be updated after fork()
397 * even if an event facility was not changed because:
398 * 1) inherited kqueue descriptor is invalid,
399 * 2) the signal thread is not inherited.
400 */
401 interface = nxt_service_get(rt->services, "engine", rt->engine);
402 if (interface == NULL) {
403 goto fail;
404 }
405
406 ret = nxt_event_engine_change(task->thread->engine, interface,
407 rt->batch);
408 if (ret != NXT_OK) {
409 goto fail;
410 }
411 }
412
413 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
414 if (ret != NXT_OK) {
415 goto fail;
416 }
417
418 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
419 goto fail;
420 }
421
422 thr->engine->max_connections = rt->engine_connections;
423
424 if (rt->master_process) {
425 if (nxt_master_process_start(thr, task, rt) != NXT_ERROR) {
426 return;
427 }
428
429 } else {
430 nxt_single_process_start(thr, task, rt);
431 return;
432 }
433
434fail:
435
436 nxt_runtime_quit(task);
437}
438
439
440static void
441nxt_single_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_runtime_t *rt)
442{
443#if (NXT_THREADS)
444 nxt_int_t ret;
445
446 ret = nxt_runtime_thread_pool_create(thr, rt, rt->auxiliary_threads,
447 60000 * 1000000LL);
448
449 if (nxt_slow_path(ret != NXT_OK)) {
450 nxt_runtime_quit(task);
451 return;
452 }
453
454#endif
455
456 rt->types |= (1U << NXT_PROCESS_SINGLE);
457
458 nxt_runtime_listen_sockets_enable(task, rt);
459
460 return;
461}
462
463
464void
465nxt_runtime_quit(nxt_task_t *task)
466{
467 nxt_bool_t done;
468 nxt_runtime_t *rt;
469 nxt_event_engine_t *engine;
470
471 rt = task->thread->runtime;
472 engine = task->thread->engine;
473
474 nxt_debug(task, "exiting");
475
476 done = 1;
477
478 if (!engine->shutdown) {
479 engine->shutdown = 1;
480
481#if (NXT_THREADS)
482
483 if (!nxt_array_is_empty(rt->thread_pools)) {
484 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
485 done = 0;
486 }
487
488#endif
489
490 if (nxt_runtime_is_master(rt)) {
491 nxt_master_stop_worker_processes(task, rt);
492 done = 0;
493 }
494 }
495
496 nxt_runtime_close_idle_connections(engine);
497
498 if (done) {
499 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
500 task, rt, engine);
501 }
502}
503
504
505static void
506nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
507{
508 nxt_conn_t *c;
509 nxt_queue_t *idle;
510 nxt_queue_link_t *link, *next;
511
512 nxt_debug(&engine->task, "close idle connections");
513
514 idle = &engine->idle_connections;
515
516 for (link = nxt_queue_head(idle);
517 link != nxt_queue_tail(idle);
518 link = next)
519 {
520 next = nxt_queue_next(link);
521 c = nxt_queue_link_data(link, nxt_conn_t, link);
522
523 if (!c->socket.read_ready) {
524 nxt_queue_remove(link);
525 nxt_conn_close(engine, c);
526 }
527 }
528}
529
530
531static void
532nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
533{
534 nxt_runtime_t *rt;
535 nxt_process_t *process;
536 nxt_event_engine_t *engine;
537
538 rt = obj;
539 engine = data;
540
541#if (NXT_THREADS)
542
543 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
544
545 if (!nxt_array_is_empty(rt->thread_pools)) {
546 return;
547 }
548
549#endif
550
551 if (nxt_runtime_is_master(rt)) {
552 if (rt->pid_file != NULL) {
553 nxt_file_delete(rt->pid_file);
554 }
555 }
556
557 if (!engine->event.signal_support) {
558 nxt_event_engine_signals_stop(engine);
559 }
560
561 nxt_runtime_process_each(rt, process) {
562
563 nxt_runtime_process_remove(rt, process);
564
565 } nxt_runtime_process_loop;
566
567 nxt_thread_mutex_destroy(&rt->processes_mutex);
568
569 nxt_mp_destroy(rt->mem_pool);
570
571 nxt_debug(task, "exit");
572
573 exit(0);
574 nxt_unreachable();
575}
576
577
578static nxt_int_t
579nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
580{
581 nxt_event_engine_t *engine;
582 const nxt_event_interface_t *interface;
583
584 engine = task->thread->engine;
585
586 if (engine->batch == rt->batch
587 && nxt_strcmp(engine->event.name, rt->engine) == 0)
588 {
589 return NXT_OK;
590 }
591
592 interface = nxt_service_get(rt->services, "engine", rt->engine);
593
594 if (interface != NULL) {
595 return nxt_event_engine_change(engine, interface, rt->batch);
596 }
597
598 return NXT_ERROR;
599}
600
601
602void
603nxt_runtime_event_engine_free(nxt_runtime_t *rt)
604{
605 nxt_queue_link_t *link;
606 nxt_event_engine_t *engine;
607
608 link = nxt_queue_first(&rt->engines);
609 nxt_queue_remove(link);
610
611 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
612 nxt_event_engine_free(engine);
613}
614
615
616#if (NXT_THREADS)
617
618static void nxt_runtime_thread_pool_init(void);
619static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
620 void *data);
621
622
623nxt_int_t
624nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
625 nxt_uint_t max_threads, nxt_nsec_t timeout)
626{
627 nxt_thread_pool_t *thread_pool, **tp;
628
629 tp = nxt_array_add(rt->thread_pools);
630 if (tp == NULL) {
631 return NXT_ERROR;
632 }
633
634 thread_pool = nxt_thread_pool_create(max_threads, timeout,
635 nxt_runtime_thread_pool_init,
636 thr->engine,
637 nxt_runtime_thread_pool_exit);
638
639 if (nxt_fast_path(thread_pool != NULL)) {
640 *tp = thread_pool;
641 }
642
643 return NXT_OK;
644}
645
646
647static void
648nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
649 nxt_runtime_cont_t cont)
650{
651 nxt_uint_t n;
652 nxt_thread_pool_t **tp;
653
654 rt->continuation = cont;
655
656 n = rt->thread_pools->nelts;
657
658 if (n == 0) {
659 cont(task);
660 return;
661 }
662
663 tp = rt->thread_pools->elts;
664
665 do {
666 nxt_thread_pool_destroy(*tp);
667
668 tp++;
669 n--;
670 } while (n != 0);
671}
672
673
674static void
675nxt_runtime_thread_pool_init(void)
676{
677#if (NXT_REGEX)
678 nxt_regex_init(0);
679#endif
680}
681
682
683static void
684nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
685{
686 nxt_uint_t i, n;
687 nxt_runtime_t *rt;
688 nxt_thread_pool_t *tp, **thread_pools;
689 nxt_thread_handle_t handle;
690
691 tp = obj;
692
693 if (data != NULL) {
694 handle = (nxt_thread_handle_t) (uintptr_t) data;
695 nxt_thread_wait(handle);
696 }
697
698 rt = task->thread->runtime;
699
700 thread_pools = rt->thread_pools->elts;
701 n = rt->thread_pools->nelts;
702
703 nxt_debug(task, "thread pools: %ui", n);
704
705 for (i = 0; i < n; i++) {
706
707 if (tp == thread_pools[i]) {
708 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
709
710 if (n == 1) {
711 /* The last thread pool. */
712 rt->continuation(task);
713 }
714
715 return;
716 }
717 }
718}
719
720#endif
721
722
723static nxt_int_t
724nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
725{
726 nxt_int_t ret;
727 nxt_str_t *prefix;
728 nxt_file_t *file;
729 nxt_file_name_str_t file_name;
730 const nxt_event_interface_t *interface;
731
732 rt->daemon = 1;
733 rt->master_process = 1;
734 rt->engine_connections = 256;
735 rt->worker_processes = 1;
736 rt->auxiliary_threads = 2;
737 rt->user_cred.user = "nobody";
738 rt->group = NULL;
739 rt->pid = "nginext.pid";
740 rt->error_log = "error.log";
741
742 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
743 return NXT_ERROR;
744 }
745
746 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
747 return NXT_ERROR;
748 }
749
750 if (nxt_user_cred_get(task, &rt->user_cred, rt->group) != NXT_OK) {
751 return NXT_ERROR;
752 }
753
754 /* An engine's parameters. */
755
756 interface = nxt_service_get(rt->services, "engine", rt->engine);
757 if (interface == NULL) {
758 return NXT_ERROR;
759 }
760
761 rt->engine = interface->name;
762
763 prefix = nxt_file_name_is_absolute(rt->pid) ? NULL : rt->prefix;
764
765 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z",
766 prefix, rt->pid);
767 if (nxt_slow_path(ret != NXT_OK)) {
768 return NXT_ERROR;
769 }
770
771 rt->pid_file = file_name.start;
772
773 prefix = nxt_file_name_is_absolute(rt->error_log) ? NULL : rt->prefix;
774
775 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%s%Z",
776 prefix, rt->error_log);
777 if (nxt_slow_path(ret != NXT_OK)) {
778 return NXT_ERROR;
779 }
780
781 file = nxt_list_first(rt->log_files);
782 file->name = file_name.start;
783
784 return NXT_OK;
785}
786
787
788static nxt_int_t
789nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
790{
791 char *p, **argv;
792 nxt_int_t n;
793 nxt_str_t addr;
794 nxt_sockaddr_t *sa;
795
796 argv = nxt_process_argv;
797
798 while (*argv != NULL) {
799 p = *argv++;
800
801 if (nxt_strcmp(p, "--listen") == 0) {
802 if (*argv == NULL) {
803 nxt_log(task, NXT_LOG_CRIT,
804 "no argument for option \"--listen\"");
805 return NXT_ERROR;
806 }
807
808 p = *argv++;
809
810 addr.length = nxt_strlen(p);
811 addr.start = (u_char *) p;
812
813 sa = nxt_runtime_sockaddr_parse(task, rt->mem_pool, &addr);
814
815 if (sa == NULL) {
816 return NXT_ERROR;
817 }
818
819 rt->controller_listen = sa;
820
821 continue;
822 }
823
824 if (nxt_strcmp(p, "--upstream") == 0) {
825 if (*argv == NULL) {
826 nxt_log(task, NXT_LOG_CRIT,
827 "no argument for option \"--upstream\"");
828 return NXT_ERROR;
829 }
830
831 p = *argv++;
832
833 rt->upstream.length = nxt_strlen(p);
834 rt->upstream.start = (u_char *) p;
835
836 continue;
837 }
838
839 if (nxt_strcmp(p, "--workers") == 0) {
840 if (*argv == NULL) {
841 nxt_log(task, NXT_LOG_CRIT,
842 "no argument for option \"--workers\"");
843 return NXT_ERROR;
844 }
845
846 p = *argv++;
847 n = nxt_int_parse((u_char *) p, nxt_strlen(p));
848
849 if (n < 1) {
850 nxt_log(task, NXT_LOG_CRIT,
851 "invalid number of workers: \"%s\"", p);
852 return NXT_ERROR;
853 }
854
855 rt->worker_processes = n;
856
857 continue;
858 }
859
860 if (nxt_strcmp(p, "--user") == 0) {
861 if (*argv == NULL) {
862 nxt_log(task, NXT_LOG_CRIT,
863 "no argument for option \"--user\"");
864 return NXT_ERROR;
865 }
866
867 p = *argv++;
868
869 rt->user_cred.user = p;
870
871 continue;
872 }
873
874 if (nxt_strcmp(p, "--group") == 0) {
875 if (*argv == NULL) {
876 nxt_log(task, NXT_LOG_CRIT,
877 "no argument for option \"--group\"");
878 return NXT_ERROR;
879 }
880
881 p = *argv++;
882
883 rt->group = p;
884
885 continue;
886 }
887
888 if (nxt_strcmp(p, "--pid") == 0) {
889 if (*argv == NULL) {
890 nxt_log(task, NXT_LOG_CRIT,
891 "no argument for option \"--pid\"");
892 return NXT_ERROR;
893 }
894
895 p = *argv++;
896
897 rt->pid = p;
898
899 continue;
900 }
901
902 if (nxt_strcmp(p, "--log") == 0) {
903 if (*argv == NULL) {
904 nxt_log(task, NXT_LOG_CRIT,
905 "no argument for option \"--log\"");
906 return NXT_ERROR;
907 }
908
909 p = *argv++;
910
911 rt->error_log = p;
912
913 continue;
914 }
915
916 if (nxt_strcmp(p, "--no-daemonize") == 0) {
917 rt->daemon = 0;
918 continue;
919 }
920 }
921
922 return NXT_OK;
923}
924
925
926static nxt_sockaddr_t *
927nxt_runtime_sockaddr_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
928{
929 u_char *p;
930 size_t length;
931
932 length = addr->length;
933 p = addr->start;
934
935 if (length >= 5 && nxt_memcmp(p, "unix:", 5) == 0) {
936 return nxt_runtime_sockaddr_unix_parse(task, mp, addr);
937 }
938
939 if (length != 0 && *p == '[') {
940 return nxt_runtime_sockaddr_inet6_parse(task, mp, addr);
941 }
942
943 return nxt_runtime_sockaddr_inet_parse(task, mp, addr);
944}
945
946
947static nxt_sockaddr_t *
948nxt_runtime_sockaddr_unix_parse(nxt_task_t *task, nxt_mp_t *mp, nxt_str_t *addr)
949{
950#if (NXT_HAVE_UNIX_DOMAIN)
951 u_char *p;
952 size_t length, socklen;
953 nxt_sockaddr_t *sa;
954
955 /*
956 * Actual sockaddr_un length can be lesser or even larger than defined
957 * struct sockaddr_un length (see comment in unix/nxt_socket.h). So
957 * struct sockaddr_un length (see comment in nxt_socket.h). So
958 * limit maximum Unix domain socket address length by defined sun_path[]
959 * length because some OSes accept addresses twice larger than defined
960 * struct sockaddr_un. Also reserve space for a trailing zero to avoid
961 * ambiguity, since many OSes accept Unix domain socket addresses
962 * without a trailing zero.
963 */
964 const size_t max_len = sizeof(struct sockaddr_un)
965 - offsetof(struct sockaddr_un, sun_path) - 1;
966
967 /* cutting "unix:" */
968 length = addr->length - 5;
969 p = addr->start + 5;
970
971 if (length == 0) {
972 nxt_log(task, NXT_LOG_CRIT,
973 "unix domain socket \"%V\" name is invalid", addr);
974 return NULL;
975 }
976
977 if (length > max_len) {
978 nxt_log(task, NXT_LOG_CRIT,
979 "unix domain socket \"%V\" name is too long", addr);
980 return NULL;
981 }
982
983 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1;
984
985#if (NXT_LINUX)
986
987 /*
988 * Linux unix(7):
989 *
990 * abstract: an abstract socket address is distinguished by the fact
991 * that sun_path[0] is a null byte ('\0'). The socket's address in
992 * this namespace is given by the additional bytes in sun_path that
993 * are covered by the specified length of the address structure.
994 * (Null bytes in the name have no special significance.)
995 */
996 if (p[0] == '@') {
997 p[0] = '\0';
998 socklen--;
999 }
1000
1001#endif
1002
1003 sa = nxt_sockaddr_alloc(mp, socklen, addr->length);
1004
1005 if (nxt_slow_path(sa == NULL)) {
1006 return NULL;
1007 }
1008
1009 sa->type = SOCK_STREAM;
1010
1011 sa->u.sockaddr_un.sun_family = AF_UNIX;
1012 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length);
1013
1014 return sa;
1015
1016#else /* !(NXT_HAVE_UNIX_DOMAIN) */
1017
1018 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported",
1019 addr);
1020
1021 return NULL;
1022
1023#endif
1024}
1025
1026
1027static nxt_sockaddr_t *
1028nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp,
1029 nxt_str_t *addr)
1030{
1031#if (NXT_INET6)
1032 u_char *p, *addr_end;
1033 size_t length;
1034 nxt_int_t port;
1035 nxt_sockaddr_t *sa;
1036 struct in6_addr *in6_addr;
1037
1038 length = addr->length - 1;
1039 p = addr->start + 1;
1040
1041 addr_end = nxt_memchr(p, ']', length);
1042
1043 if (addr_end == NULL) {
1044 goto invalid_address;
1045 }
1046
1047 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6),
1048 NXT_INET6_ADDR_STR_LEN);
1049 if (nxt_slow_path(sa == NULL)) {
1050 return NULL;
1051 }
1052
1053 in6_addr = &sa->u.sockaddr_in6.sin6_addr;
1054
1055 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) {
1056 goto invalid_address;
1057 }
1058
1059 port = 0;
1060 p = addr_end + 1;
1061 length = (p + length) - p;
1062
1063 if (length == 0) {
1064 goto found;
1065 }
1066
1067 if (*p == ':') {
1068 port = nxt_int_parse(p + 1, length - 1);
1069
1070 if (port >= 1 && port <= 65535) {
1071 goto found;
1072 }
1073 }
1074
1075 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr);
1076
1077 return NULL;
1078
1079found:
1080
1081 sa->type = SOCK_STREAM;
1082
1083 sa->u.sockaddr_in6.sin6_family = AF_INET6;
1084 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port);
1085
1086 return sa;
1087
1088invalid_address:
1089
1090 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr);
1091
1092 return NULL;
1093
1094#else
1095
1096 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr);
1097
1098 return NULL;
1099
1100#endif
1101}
1102
1103
1104static nxt_sockaddr_t *
1105nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp,
1106 nxt_str_t *string)
1107{
1108 u_char *p, *ip;
1109 size_t length;
1110 in_addr_t addr;
1111 nxt_int_t port;
1112 nxt_sockaddr_t *sa;
1113
1114 addr = INADDR_ANY;
1115
1116 length = string->length;
1117 ip = string->start;
1118
1119 p = nxt_memchr(ip, ':', length);
1120
1121 if (p == NULL) {
1122
1123 /* single value port, or address */
1124
1125 port = nxt_int_parse(ip, length);
1126
1127 if (port > 0) {
1128 /* "*:XX" */
1129
1130 if (port < 1 || port > 65535) {
1131 goto invalid_port;
1132 }
1133
1134 } else {
1135 /* "x.x.x.x" */
1136
1137 addr = nxt_inet_addr(ip, length);
1138
1139 if (addr == INADDR_NONE) {
1140 goto invalid_port;
1141 }
1142
1143 port = 8080;
1144 }
1145
1146 } else {
1147
1148 /* x.x.x.x:XX */
1149
1150 p++;
1151 length = (ip + length) - p;
1152 port = nxt_int_parse(p, length);
1153
1154 if (port < 1 || port > 65535) {
1155 goto invalid_port;
1156 }
1157
1158 length = (p - 1) - ip;
1159
1160 if (length != 1 || ip[0] != '*') {
1161 addr = nxt_inet_addr(ip, length);
1162
1163 if (addr == INADDR_NONE) {
1164 goto invalid_addr;
1165 }
1166
1167 /* "x.x.x.x:XX" */
1168 }
1169 }
1170
1171 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
1172 NXT_INET_ADDR_STR_LEN);
1173 if (nxt_slow_path(sa == NULL)) {
1174 return NULL;
1175 }
1176
1177 sa->type = SOCK_STREAM;
1178
1179 sa->u.sockaddr_in.sin_family = AF_INET;
1180 sa->u.sockaddr_in.sin_port = htons((in_port_t) port);
1181 sa->u.sockaddr_in.sin_addr.s_addr = addr;
1182
1183 return sa;
1184
1185invalid_port:
1186
1187 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string);
1188
1189 return NULL;
1190
1191invalid_addr:
1192
1193 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string);
1194
1195 return NULL;
1196}
1197
1198
1199nxt_listen_socket_t *
1200nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1201{
1202 nxt_mp_t *mp;
1203 nxt_listen_socket_t *ls;
1204
1205 ls = nxt_array_zero_add(rt->listen_sockets);
1206 if (ls == NULL) {
1207 return NULL;
1208 }
1209
1210 mp = rt->mem_pool;
1211
1212 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1213 sa->length);
1214 if (ls->sockaddr == NULL) {
1215 return NULL;
1216 }
1217
1218 ls->sockaddr->type = sa->type;
1219
1220 nxt_sockaddr_text(ls->sockaddr);
1221
1222 ls->socket = -1;
1223 ls->backlog = NXT_LISTEN_BACKLOG;
1224
1225 return ls;
1226}
1227
1228
1229static nxt_int_t
1230nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1231{
1232 size_t length;
1233 char hostname[NXT_MAXHOSTNAMELEN + 1];
1234
1235 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1236 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno);
1237 return NXT_ERROR;
1238 }
1239
1240 /*
1241 * Linux gethostname(2):
1242 *
1243 * If the null-terminated hostname is too large to fit,
1244 * then the name is truncated, and no error is returned.
1245 *
1246 * For this reason an additional byte is reserved in the buffer.
1247 */
1248 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1249
1250 length = nxt_strlen(hostname);
1251 rt->hostname.length = length;
1252
1253 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1254
1255 if (rt->hostname.start != NULL) {
1256 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1257 return NXT_OK;
1258 }
1259
1260 return NXT_ERROR;
1261}
1262
1263
1264static nxt_int_t
1265nxt_runtime_log_files_init(nxt_runtime_t *rt)
1266{
1267 nxt_file_t *file;
1268 nxt_list_t *log_files;
1269
1270 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1271
1272 if (nxt_fast_path(log_files != NULL)) {
1273 rt->log_files = log_files;
1274
1275 /* Preallocate the main error_log. This allocation cannot fail. */
1276 file = nxt_list_zero_add(log_files);
1277
1278 file->fd = NXT_FILE_INVALID;
1279 file->log_level = NXT_LOG_CRIT;
1280
1281 return NXT_OK;
1282 }
1283
1284 return NXT_ERROR;
1285}
1286
1287
1288nxt_file_t *
1289nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1290{
1291 nxt_int_t ret;
1292 nxt_str_t *prefix;
1293 nxt_file_t *file;
1294 nxt_file_name_str_t file_name;
1295
1296 prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix;
1297
1298 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z",
1299 prefix, name);
1300
1301 if (nxt_slow_path(ret != NXT_OK)) {
1302 return NULL;
1303 }
1304
1305 nxt_list_each(file, rt->log_files) {
1306
1307 /* STUB: hardecoded case sensitive/insensitive. */
1308
1309 if (file->name != NULL
1310 && nxt_file_name_eq(file->name, file_name.start))
1311 {
1312 return file;
1313 }
1314
1315 } nxt_list_loop;
1316
1317 file = nxt_list_zero_add(rt->log_files);
1318
1319 if (nxt_slow_path(file == NULL)) {
1320 return NULL;
1321 }
1322
1323 file->fd = NXT_FILE_INVALID;
1324 file->log_level = NXT_LOG_CRIT;
1325 file->name = file_name.start;
1326
1327 return file;
1328}
1329
1330
1331static nxt_int_t
1332nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1333{
1334 nxt_int_t ret;
1335 nxt_file_t *file;
1336
1337 nxt_list_each(file, rt->log_files) {
1338
1339 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1340 NXT_FILE_OWNER_ACCESS);
1341
1342 if (ret != NXT_OK) {
1343 return NXT_ERROR;
1344 }
1345
1346 } nxt_list_loop;
1347
1348 file = nxt_list_first(rt->log_files);
1349
1350 return nxt_file_stderr(file);
1351}
1352
1353
1354nxt_int_t
1355nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1356{
1357 nxt_int_t ret;
1358 nxt_uint_t c, p, ncurr, nprev;
1359 nxt_listen_socket_t *curr, *prev;
1360
1361 curr = rt->listen_sockets->elts;
1362 ncurr = rt->listen_sockets->nelts;
1363
1364 if (rt->inherited_sockets != NULL) {
1365 prev = rt->inherited_sockets->elts;
1366 nprev = rt->inherited_sockets->nelts;
1367
1368 } else {
1369 prev = NULL;
1370 nprev = 0;
1371 }
1372
1373 for (c = 0; c < ncurr; c++) {
1374
1375 for (p = 0; p < nprev; p++) {
1376
1377 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1378
1379 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1380 if (ret != NXT_OK) {
1381 return NXT_ERROR;
1382 }
1383
1384 goto next;
1385 }
1386 }
1387
1388 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1389 return NXT_ERROR;
1390 }
1391
1392 next:
1393
1394 continue;
1395 }
1396
1397 return NXT_OK;
1398}
1399
1400
1401nxt_int_t
1402nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1403{
1404 nxt_uint_t i, n;
1405 nxt_listen_socket_t *ls;
1406
1407 ls = rt->listen_sockets->elts;
1408 n = rt->listen_sockets->nelts;
1409
1410 for (i = 0; i < n; i++) {
1411 if (ls[i].flags == NXT_NONBLOCK) {
1412 if (nxt_listen_event(task, &ls[i]) == NULL) {
1413 return NXT_ERROR;
1414 }
1415 }
1416 }
1417
1418 return NXT_OK;
1419}
1420
1421
1422nxt_str_t *
1423nxt_current_directory(nxt_mp_t *mp)
1424{
1425 size_t length;
1426 u_char *p;
1427 nxt_str_t *name;
1428 char buf[NXT_MAX_PATH_LEN];
1429
1430 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1431
1432 if (nxt_fast_path(length != 0)) {
1433 name = nxt_str_alloc(mp, length + 1);
1434
1435 if (nxt_fast_path(name != NULL)) {
1436 p = nxt_cpymem(name->start, buf, length);
1437 *p = '/';
1438
1439 return name;
1440 }
1441 }
1442
1443 return NULL;
1444}
1445
1446
1447static nxt_int_t
1448nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1449{
1450 ssize_t length;
1451 nxt_int_t n;
1452 nxt_file_t file;
1453 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE];
1454
1455 nxt_memzero(&file, sizeof(nxt_file_t));
1456
1457 file.name = pid_file;
1458
1459 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1460 NXT_FILE_DEFAULT_ACCESS);
1461
1462 if (n != NXT_OK) {
1463 return NXT_ERROR;
1464 }
1465
1466 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1467
1468 if (nxt_file_write(&file, pid, length, 0) != length) {
1469 return NXT_ERROR;
1470 }
1471
1472 nxt_file_close(task, &file);
1473
1474 return NXT_OK;
1475}
1476
1477
1478nxt_process_t *
1479nxt_runtime_process_new(nxt_runtime_t *rt)
1480{
1481 nxt_process_t *process;
1482
1483 /* TODO: memory failures. */
1484
1485 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1486 if (nxt_slow_path(process == NULL)) {
1487 return NULL;
1488 }
1489
1490 nxt_queue_init(&process->ports);
1491
1492 nxt_thread_mutex_create(&process->incoming_mutex);
1493 nxt_thread_mutex_create(&process->outgoing_mutex);
1494 nxt_thread_mutex_create(&process->cp_mutex);
1495
1496 return process;
1497}
1498
1499
1500static void
1501nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1502{
1503 nxt_assert(process->port_cleanups == 0);
1504 nxt_assert(process->registered == 0);
1505
1506 nxt_port_mmaps_destroy(process->incoming, 1);
1507 nxt_port_mmaps_destroy(process->outgoing, 1);
1508
1509 if (process->cp_mem_pool != NULL) {
1510 nxt_mp_thread_adopt(process->cp_mem_pool);
1511
1512 nxt_mp_destroy(process->cp_mem_pool);
1513 }
1514
1515 nxt_thread_mutex_destroy(&process->incoming_mutex);
1516 nxt_thread_mutex_destroy(&process->outgoing_mutex);
1517 nxt_thread_mutex_destroy(&process->cp_mutex);
1518
1519 nxt_mp_free(rt->mem_pool, process);
1520}
1521
1522
1523static nxt_int_t
1524nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1525{
1526 nxt_process_t *process;
1527
1528 process = data;
1529
1530 if (lhq->key.length == sizeof(nxt_pid_t) &&
1531 *(nxt_pid_t *) lhq->key.start == process->pid) {
1532 return NXT_OK;
1533 }
1534
1535 return NXT_DECLINED;
1536}
1537
1538static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1539 NXT_LVLHSH_DEFAULT,
1540 nxt_runtime_lvlhsh_pid_test,
1541 nxt_lvlhsh_alloc,
1542 nxt_lvlhsh_free,
1543};
1544
1545
1546nxt_inline void
1547nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1548{
1549 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1550 lhq->key.length = sizeof(*pid);
1551 lhq->key.start = (u_char *) pid;
1552 lhq->proto = &lvlhsh_processes_proto;
1553}
1554
1555
1556nxt_process_t *
1557nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1558{
1559 nxt_process_t *process;
1560 nxt_lvlhsh_query_t lhq;
1561
1562 process = NULL;
1563
1564 nxt_runtime_process_lhq_pid(&lhq, &pid);
1565
1566 nxt_thread_mutex_lock(&rt->processes_mutex);
1567
1568 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1569 process = lhq.value;
1570
1571 } else {
1572 nxt_thread_log_debug("process %PI not found", pid);
1573 }
1574
1575 nxt_thread_mutex_unlock(&rt->processes_mutex);
1576
1577 return process;
1578}
1579
1580
1581nxt_process_t *
1582nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1583{
1584 nxt_process_t *process;
1585 nxt_lvlhsh_query_t lhq;
1586
1587 nxt_runtime_process_lhq_pid(&lhq, &pid);
1588
1589 nxt_thread_mutex_lock(&rt->processes_mutex);
1590
1591 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1592 nxt_thread_log_debug("process %PI found", pid);
1593
1594 nxt_thread_mutex_unlock(&rt->processes_mutex);
1595 return lhq.value;
1596 }
1597
1598 process = nxt_runtime_process_new(rt);
1599 if (nxt_slow_path(process == NULL)) {
1600 return NULL;
1601 }
1602
1603 process->pid = pid;
1604
1605 lhq.replace = 0;
1606 lhq.value = process;
1607 lhq.pool = rt->mem_pool;
1608
1609 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1610
1611 case NXT_OK:
1612 if (rt->nprocesses == 0) {
1613 rt->mprocess = process;
1614 }
1615
1616 rt->nprocesses++;
1617
1618 process->registered = 1;
1619
1620 nxt_thread_log_debug("process %PI insert", pid);
1621 break;
1622
1623 default:
1624 nxt_thread_log_debug("process %PI insert failed", pid);
1625 break;
1626 }
1627
1628 nxt_thread_mutex_unlock(&rt->processes_mutex);
1629
1630 return process;
1631}
1632
1633
1634void
1635nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
1636{
1637 nxt_port_t *port;
1638 nxt_lvlhsh_query_t lhq;
1639
1640 nxt_assert(process->registered == 0);
1641
1642 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1643
1644 lhq.replace = 0;
1645 lhq.value = process;
1646 lhq.pool = rt->mem_pool;
1647
1648 nxt_thread_mutex_lock(&rt->processes_mutex);
1649
1650 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1651
1652 case NXT_OK:
1653 if (rt->nprocesses == 0) {
1654 rt->mprocess = process;
1655 }
1656
1657 rt->nprocesses++;
1658
1659 nxt_process_port_each(process, port) {
1660
1661 port->pid = process->pid;
1662
1663 nxt_runtime_port_add(rt, port);
1664
1665 } nxt_process_port_loop;
1666
1667 process->registered = 1;
1668
1669 nxt_thread_log_debug("process %PI added", process->pid);
1670 break;
1671
1672 default:
1673 nxt_thread_log_debug("process %PI failed to add", process->pid);
1674 break;
1675 }
1676
1677 nxt_thread_mutex_unlock(&rt->processes_mutex);
1678}
1679
1680
1681static nxt_process_t *
1682nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1683{
1684 nxt_process_t *process;
1685 nxt_lvlhsh_query_t lhq;
1686
1687 process = NULL;
1688
1689 nxt_runtime_process_lhq_pid(&lhq, &pid);
1690
1691 lhq.pool = rt->mem_pool;
1692
1693 nxt_thread_mutex_lock(&rt->processes_mutex);
1694
1695 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1696
1697 case NXT_OK:
1698 rt->nprocesses--;
1699
1700 process = lhq.value;
1701
1702 process->registered = 0;
1703
1704 nxt_thread_log_debug("process %PI removed", pid);
1705 break;
1706
1707 default:
1708 nxt_thread_log_debug("process %PI remove failed", pid);
1709 break;
1710 }
1711
1712 nxt_thread_mutex_unlock(&rt->processes_mutex);
1713
1714 return process;
1715}
1716
1717
1718void
1719nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1720{
1721 nxt_port_t *port;
1722
1723 if (process->port_cleanups == 0) {
1724 if (process->registered == 1) {
1725 nxt_runtime_process_remove_pid(rt, process->pid);
1726 }
1727
1728 nxt_runtime_process_destroy(rt, process);
1729
1730 } else {
1731 nxt_process_port_each(process, port) {
1732
1733 nxt_runtime_port_remove(rt, port);
1734
1735 nxt_port_release(port);
1736
1737 } nxt_process_port_loop;
1738 }
1739}
1740
1741
1742nxt_process_t *
1743nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1744{
1745 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t));
1746
1747 lhe->proto = &lvlhsh_processes_proto;
1748
1749 return nxt_runtime_process_next(rt, lhe);
1750}
1751
1752
1753nxt_port_t *
1754nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1755{
1756 return nxt_port_hash_first(&rt->ports, lhe);
1757}
1758
1759
1760void
1761nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
1762{
1763 nxt_port_hash_add(&rt->ports, rt->mem_pool, port);
1764
1765 rt->port_by_type[port->type] = port;
1766}
1767
1768
1769void
1770nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
1771{
1772 nxt_port_hash_remove(&rt->ports, rt->mem_pool, port);
1773
1774 if (rt->port_by_type[port->type] == port) {
1775 rt->port_by_type[port->type] = NULL;
1776 }
1777}
1778
1779
1780nxt_port_t *
1781nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1782 nxt_port_id_t port_id)
1783{
1784 return nxt_port_hash_find(&rt->ports, pid, port_id);
1785}
958 * limit maximum Unix domain socket address length by defined sun_path[]
959 * length because some OSes accept addresses twice larger than defined
960 * struct sockaddr_un. Also reserve space for a trailing zero to avoid
961 * ambiguity, since many OSes accept Unix domain socket addresses
962 * without a trailing zero.
963 */
964 const size_t max_len = sizeof(struct sockaddr_un)
965 - offsetof(struct sockaddr_un, sun_path) - 1;
966
967 /* cutting "unix:" */
968 length = addr->length - 5;
969 p = addr->start + 5;
970
971 if (length == 0) {
972 nxt_log(task, NXT_LOG_CRIT,
973 "unix domain socket \"%V\" name is invalid", addr);
974 return NULL;
975 }
976
977 if (length > max_len) {
978 nxt_log(task, NXT_LOG_CRIT,
979 "unix domain socket \"%V\" name is too long", addr);
980 return NULL;
981 }
982
983 socklen = offsetof(struct sockaddr_un, sun_path) + length + 1;
984
985#if (NXT_LINUX)
986
987 /*
988 * Linux unix(7):
989 *
990 * abstract: an abstract socket address is distinguished by the fact
991 * that sun_path[0] is a null byte ('\0'). The socket's address in
992 * this namespace is given by the additional bytes in sun_path that
993 * are covered by the specified length of the address structure.
994 * (Null bytes in the name have no special significance.)
995 */
996 if (p[0] == '@') {
997 p[0] = '\0';
998 socklen--;
999 }
1000
1001#endif
1002
1003 sa = nxt_sockaddr_alloc(mp, socklen, addr->length);
1004
1005 if (nxt_slow_path(sa == NULL)) {
1006 return NULL;
1007 }
1008
1009 sa->type = SOCK_STREAM;
1010
1011 sa->u.sockaddr_un.sun_family = AF_UNIX;
1012 nxt_memcpy(sa->u.sockaddr_un.sun_path, p, length);
1013
1014 return sa;
1015
1016#else /* !(NXT_HAVE_UNIX_DOMAIN) */
1017
1018 nxt_log(task, NXT_LOG_CRIT, "unix domain socket \"%V\" is not supported",
1019 addr);
1020
1021 return NULL;
1022
1023#endif
1024}
1025
1026
1027static nxt_sockaddr_t *
1028nxt_runtime_sockaddr_inet6_parse(nxt_task_t *task, nxt_mp_t *mp,
1029 nxt_str_t *addr)
1030{
1031#if (NXT_INET6)
1032 u_char *p, *addr_end;
1033 size_t length;
1034 nxt_int_t port;
1035 nxt_sockaddr_t *sa;
1036 struct in6_addr *in6_addr;
1037
1038 length = addr->length - 1;
1039 p = addr->start + 1;
1040
1041 addr_end = nxt_memchr(p, ']', length);
1042
1043 if (addr_end == NULL) {
1044 goto invalid_address;
1045 }
1046
1047 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in6),
1048 NXT_INET6_ADDR_STR_LEN);
1049 if (nxt_slow_path(sa == NULL)) {
1050 return NULL;
1051 }
1052
1053 in6_addr = &sa->u.sockaddr_in6.sin6_addr;
1054
1055 if (nxt_inet6_addr(in6_addr, p, addr_end - p) != NXT_OK) {
1056 goto invalid_address;
1057 }
1058
1059 port = 0;
1060 p = addr_end + 1;
1061 length = (p + length) - p;
1062
1063 if (length == 0) {
1064 goto found;
1065 }
1066
1067 if (*p == ':') {
1068 port = nxt_int_parse(p + 1, length - 1);
1069
1070 if (port >= 1 && port <= 65535) {
1071 goto found;
1072 }
1073 }
1074
1075 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", addr);
1076
1077 return NULL;
1078
1079found:
1080
1081 sa->type = SOCK_STREAM;
1082
1083 sa->u.sockaddr_in6.sin6_family = AF_INET6;
1084 sa->u.sockaddr_in6.sin6_port = htons((in_port_t) port);
1085
1086 return sa;
1087
1088invalid_address:
1089
1090 nxt_log(task, NXT_LOG_CRIT, "invalid IPv6 address in \"%V\"", addr);
1091
1092 return NULL;
1093
1094#else
1095
1096 nxt_log(task, NXT_LOG_CRIT, "IPv6 socket \"%V\" is not supported", addr);
1097
1098 return NULL;
1099
1100#endif
1101}
1102
1103
1104static nxt_sockaddr_t *
1105nxt_runtime_sockaddr_inet_parse(nxt_task_t *task, nxt_mp_t *mp,
1106 nxt_str_t *string)
1107{
1108 u_char *p, *ip;
1109 size_t length;
1110 in_addr_t addr;
1111 nxt_int_t port;
1112 nxt_sockaddr_t *sa;
1113
1114 addr = INADDR_ANY;
1115
1116 length = string->length;
1117 ip = string->start;
1118
1119 p = nxt_memchr(ip, ':', length);
1120
1121 if (p == NULL) {
1122
1123 /* single value port, or address */
1124
1125 port = nxt_int_parse(ip, length);
1126
1127 if (port > 0) {
1128 /* "*:XX" */
1129
1130 if (port < 1 || port > 65535) {
1131 goto invalid_port;
1132 }
1133
1134 } else {
1135 /* "x.x.x.x" */
1136
1137 addr = nxt_inet_addr(ip, length);
1138
1139 if (addr == INADDR_NONE) {
1140 goto invalid_port;
1141 }
1142
1143 port = 8080;
1144 }
1145
1146 } else {
1147
1148 /* x.x.x.x:XX */
1149
1150 p++;
1151 length = (ip + length) - p;
1152 port = nxt_int_parse(p, length);
1153
1154 if (port < 1 || port > 65535) {
1155 goto invalid_port;
1156 }
1157
1158 length = (p - 1) - ip;
1159
1160 if (length != 1 || ip[0] != '*') {
1161 addr = nxt_inet_addr(ip, length);
1162
1163 if (addr == INADDR_NONE) {
1164 goto invalid_addr;
1165 }
1166
1167 /* "x.x.x.x:XX" */
1168 }
1169 }
1170
1171 sa = nxt_sockaddr_alloc(mp, sizeof(struct sockaddr_in),
1172 NXT_INET_ADDR_STR_LEN);
1173 if (nxt_slow_path(sa == NULL)) {
1174 return NULL;
1175 }
1176
1177 sa->type = SOCK_STREAM;
1178
1179 sa->u.sockaddr_in.sin_family = AF_INET;
1180 sa->u.sockaddr_in.sin_port = htons((in_port_t) port);
1181 sa->u.sockaddr_in.sin_addr.s_addr = addr;
1182
1183 return sa;
1184
1185invalid_port:
1186
1187 nxt_log(task, NXT_LOG_CRIT, "invalid port in \"%V\"", string);
1188
1189 return NULL;
1190
1191invalid_addr:
1192
1193 nxt_log(task, NXT_LOG_CRIT, "invalid address in \"%V\"", string);
1194
1195 return NULL;
1196}
1197
1198
1199nxt_listen_socket_t *
1200nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1201{
1202 nxt_mp_t *mp;
1203 nxt_listen_socket_t *ls;
1204
1205 ls = nxt_array_zero_add(rt->listen_sockets);
1206 if (ls == NULL) {
1207 return NULL;
1208 }
1209
1210 mp = rt->mem_pool;
1211
1212 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1213 sa->length);
1214 if (ls->sockaddr == NULL) {
1215 return NULL;
1216 }
1217
1218 ls->sockaddr->type = sa->type;
1219
1220 nxt_sockaddr_text(ls->sockaddr);
1221
1222 ls->socket = -1;
1223 ls->backlog = NXT_LISTEN_BACKLOG;
1224
1225 return ls;
1226}
1227
1228
1229static nxt_int_t
1230nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1231{
1232 size_t length;
1233 char hostname[NXT_MAXHOSTNAMELEN + 1];
1234
1235 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1236 nxt_log(task, NXT_LOG_CRIT, "gethostname() failed %E", nxt_errno);
1237 return NXT_ERROR;
1238 }
1239
1240 /*
1241 * Linux gethostname(2):
1242 *
1243 * If the null-terminated hostname is too large to fit,
1244 * then the name is truncated, and no error is returned.
1245 *
1246 * For this reason an additional byte is reserved in the buffer.
1247 */
1248 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1249
1250 length = nxt_strlen(hostname);
1251 rt->hostname.length = length;
1252
1253 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1254
1255 if (rt->hostname.start != NULL) {
1256 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1257 return NXT_OK;
1258 }
1259
1260 return NXT_ERROR;
1261}
1262
1263
1264static nxt_int_t
1265nxt_runtime_log_files_init(nxt_runtime_t *rt)
1266{
1267 nxt_file_t *file;
1268 nxt_list_t *log_files;
1269
1270 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1271
1272 if (nxt_fast_path(log_files != NULL)) {
1273 rt->log_files = log_files;
1274
1275 /* Preallocate the main error_log. This allocation cannot fail. */
1276 file = nxt_list_zero_add(log_files);
1277
1278 file->fd = NXT_FILE_INVALID;
1279 file->log_level = NXT_LOG_CRIT;
1280
1281 return NXT_OK;
1282 }
1283
1284 return NXT_ERROR;
1285}
1286
1287
1288nxt_file_t *
1289nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1290{
1291 nxt_int_t ret;
1292 nxt_str_t *prefix;
1293 nxt_file_t *file;
1294 nxt_file_name_str_t file_name;
1295
1296 prefix = nxt_file_name_is_absolute(name->start) ? NULL : rt->prefix;
1297
1298 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%V%V%Z",
1299 prefix, name);
1300
1301 if (nxt_slow_path(ret != NXT_OK)) {
1302 return NULL;
1303 }
1304
1305 nxt_list_each(file, rt->log_files) {
1306
1307 /* STUB: hardecoded case sensitive/insensitive. */
1308
1309 if (file->name != NULL
1310 && nxt_file_name_eq(file->name, file_name.start))
1311 {
1312 return file;
1313 }
1314
1315 } nxt_list_loop;
1316
1317 file = nxt_list_zero_add(rt->log_files);
1318
1319 if (nxt_slow_path(file == NULL)) {
1320 return NULL;
1321 }
1322
1323 file->fd = NXT_FILE_INVALID;
1324 file->log_level = NXT_LOG_CRIT;
1325 file->name = file_name.start;
1326
1327 return file;
1328}
1329
1330
1331static nxt_int_t
1332nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1333{
1334 nxt_int_t ret;
1335 nxt_file_t *file;
1336
1337 nxt_list_each(file, rt->log_files) {
1338
1339 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1340 NXT_FILE_OWNER_ACCESS);
1341
1342 if (ret != NXT_OK) {
1343 return NXT_ERROR;
1344 }
1345
1346 } nxt_list_loop;
1347
1348 file = nxt_list_first(rt->log_files);
1349
1350 return nxt_file_stderr(file);
1351}
1352
1353
1354nxt_int_t
1355nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1356{
1357 nxt_int_t ret;
1358 nxt_uint_t c, p, ncurr, nprev;
1359 nxt_listen_socket_t *curr, *prev;
1360
1361 curr = rt->listen_sockets->elts;
1362 ncurr = rt->listen_sockets->nelts;
1363
1364 if (rt->inherited_sockets != NULL) {
1365 prev = rt->inherited_sockets->elts;
1366 nprev = rt->inherited_sockets->nelts;
1367
1368 } else {
1369 prev = NULL;
1370 nprev = 0;
1371 }
1372
1373 for (c = 0; c < ncurr; c++) {
1374
1375 for (p = 0; p < nprev; p++) {
1376
1377 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1378
1379 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1380 if (ret != NXT_OK) {
1381 return NXT_ERROR;
1382 }
1383
1384 goto next;
1385 }
1386 }
1387
1388 if (nxt_listen_socket_create(task, &curr[c], 0) != NXT_OK) {
1389 return NXT_ERROR;
1390 }
1391
1392 next:
1393
1394 continue;
1395 }
1396
1397 return NXT_OK;
1398}
1399
1400
1401nxt_int_t
1402nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1403{
1404 nxt_uint_t i, n;
1405 nxt_listen_socket_t *ls;
1406
1407 ls = rt->listen_sockets->elts;
1408 n = rt->listen_sockets->nelts;
1409
1410 for (i = 0; i < n; i++) {
1411 if (ls[i].flags == NXT_NONBLOCK) {
1412 if (nxt_listen_event(task, &ls[i]) == NULL) {
1413 return NXT_ERROR;
1414 }
1415 }
1416 }
1417
1418 return NXT_OK;
1419}
1420
1421
1422nxt_str_t *
1423nxt_current_directory(nxt_mp_t *mp)
1424{
1425 size_t length;
1426 u_char *p;
1427 nxt_str_t *name;
1428 char buf[NXT_MAX_PATH_LEN];
1429
1430 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1431
1432 if (nxt_fast_path(length != 0)) {
1433 name = nxt_str_alloc(mp, length + 1);
1434
1435 if (nxt_fast_path(name != NULL)) {
1436 p = nxt_cpymem(name->start, buf, length);
1437 *p = '/';
1438
1439 return name;
1440 }
1441 }
1442
1443 return NULL;
1444}
1445
1446
1447static nxt_int_t
1448nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1449{
1450 ssize_t length;
1451 nxt_int_t n;
1452 nxt_file_t file;
1453 u_char pid[NXT_INT64_T_LEN + NXT_LINEFEED_SIZE];
1454
1455 nxt_memzero(&file, sizeof(nxt_file_t));
1456
1457 file.name = pid_file;
1458
1459 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1460 NXT_FILE_DEFAULT_ACCESS);
1461
1462 if (n != NXT_OK) {
1463 return NXT_ERROR;
1464 }
1465
1466 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1467
1468 if (nxt_file_write(&file, pid, length, 0) != length) {
1469 return NXT_ERROR;
1470 }
1471
1472 nxt_file_close(task, &file);
1473
1474 return NXT_OK;
1475}
1476
1477
1478nxt_process_t *
1479nxt_runtime_process_new(nxt_runtime_t *rt)
1480{
1481 nxt_process_t *process;
1482
1483 /* TODO: memory failures. */
1484
1485 process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t));
1486 if (nxt_slow_path(process == NULL)) {
1487 return NULL;
1488 }
1489
1490 nxt_queue_init(&process->ports);
1491
1492 nxt_thread_mutex_create(&process->incoming_mutex);
1493 nxt_thread_mutex_create(&process->outgoing_mutex);
1494 nxt_thread_mutex_create(&process->cp_mutex);
1495
1496 return process;
1497}
1498
1499
1500static void
1501nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
1502{
1503 nxt_assert(process->port_cleanups == 0);
1504 nxt_assert(process->registered == 0);
1505
1506 nxt_port_mmaps_destroy(process->incoming, 1);
1507 nxt_port_mmaps_destroy(process->outgoing, 1);
1508
1509 if (process->cp_mem_pool != NULL) {
1510 nxt_mp_thread_adopt(process->cp_mem_pool);
1511
1512 nxt_mp_destroy(process->cp_mem_pool);
1513 }
1514
1515 nxt_thread_mutex_destroy(&process->incoming_mutex);
1516 nxt_thread_mutex_destroy(&process->outgoing_mutex);
1517 nxt_thread_mutex_destroy(&process->cp_mutex);
1518
1519 nxt_mp_free(rt->mem_pool, process);
1520}
1521
1522
1523static nxt_int_t
1524nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1525{
1526 nxt_process_t *process;
1527
1528 process = data;
1529
1530 if (lhq->key.length == sizeof(nxt_pid_t) &&
1531 *(nxt_pid_t *) lhq->key.start == process->pid) {
1532 return NXT_OK;
1533 }
1534
1535 return NXT_DECLINED;
1536}
1537
1538static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1539 NXT_LVLHSH_DEFAULT,
1540 nxt_runtime_lvlhsh_pid_test,
1541 nxt_lvlhsh_alloc,
1542 nxt_lvlhsh_free,
1543};
1544
1545
1546nxt_inline void
1547nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1548{
1549 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1550 lhq->key.length = sizeof(*pid);
1551 lhq->key.start = (u_char *) pid;
1552 lhq->proto = &lvlhsh_processes_proto;
1553}
1554
1555
1556nxt_process_t *
1557nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1558{
1559 nxt_process_t *process;
1560 nxt_lvlhsh_query_t lhq;
1561
1562 process = NULL;
1563
1564 nxt_runtime_process_lhq_pid(&lhq, &pid);
1565
1566 nxt_thread_mutex_lock(&rt->processes_mutex);
1567
1568 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1569 process = lhq.value;
1570
1571 } else {
1572 nxt_thread_log_debug("process %PI not found", pid);
1573 }
1574
1575 nxt_thread_mutex_unlock(&rt->processes_mutex);
1576
1577 return process;
1578}
1579
1580
1581nxt_process_t *
1582nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1583{
1584 nxt_process_t *process;
1585 nxt_lvlhsh_query_t lhq;
1586
1587 nxt_runtime_process_lhq_pid(&lhq, &pid);
1588
1589 nxt_thread_mutex_lock(&rt->processes_mutex);
1590
1591 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1592 nxt_thread_log_debug("process %PI found", pid);
1593
1594 nxt_thread_mutex_unlock(&rt->processes_mutex);
1595 return lhq.value;
1596 }
1597
1598 process = nxt_runtime_process_new(rt);
1599 if (nxt_slow_path(process == NULL)) {
1600 return NULL;
1601 }
1602
1603 process->pid = pid;
1604
1605 lhq.replace = 0;
1606 lhq.value = process;
1607 lhq.pool = rt->mem_pool;
1608
1609 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1610
1611 case NXT_OK:
1612 if (rt->nprocesses == 0) {
1613 rt->mprocess = process;
1614 }
1615
1616 rt->nprocesses++;
1617
1618 process->registered = 1;
1619
1620 nxt_thread_log_debug("process %PI insert", pid);
1621 break;
1622
1623 default:
1624 nxt_thread_log_debug("process %PI insert failed", pid);
1625 break;
1626 }
1627
1628 nxt_thread_mutex_unlock(&rt->processes_mutex);
1629
1630 return process;
1631}
1632
1633
1634void
1635nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
1636{
1637 nxt_port_t *port;
1638 nxt_lvlhsh_query_t lhq;
1639
1640 nxt_assert(process->registered == 0);
1641
1642 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1643
1644 lhq.replace = 0;
1645 lhq.value = process;
1646 lhq.pool = rt->mem_pool;
1647
1648 nxt_thread_mutex_lock(&rt->processes_mutex);
1649
1650 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1651
1652 case NXT_OK:
1653 if (rt->nprocesses == 0) {
1654 rt->mprocess = process;
1655 }
1656
1657 rt->nprocesses++;
1658
1659 nxt_process_port_each(process, port) {
1660
1661 port->pid = process->pid;
1662
1663 nxt_runtime_port_add(rt, port);
1664
1665 } nxt_process_port_loop;
1666
1667 process->registered = 1;
1668
1669 nxt_thread_log_debug("process %PI added", process->pid);
1670 break;
1671
1672 default:
1673 nxt_thread_log_debug("process %PI failed to add", process->pid);
1674 break;
1675 }
1676
1677 nxt_thread_mutex_unlock(&rt->processes_mutex);
1678}
1679
1680
1681static nxt_process_t *
1682nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
1683{
1684 nxt_process_t *process;
1685 nxt_lvlhsh_query_t lhq;
1686
1687 process = NULL;
1688
1689 nxt_runtime_process_lhq_pid(&lhq, &pid);
1690
1691 lhq.pool = rt->mem_pool;
1692
1693 nxt_thread_mutex_lock(&rt->processes_mutex);
1694
1695 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1696
1697 case NXT_OK:
1698 rt->nprocesses--;
1699
1700 process = lhq.value;
1701
1702 process->registered = 0;
1703
1704 nxt_thread_log_debug("process %PI removed", pid);
1705 break;
1706
1707 default:
1708 nxt_thread_log_debug("process %PI remove failed", pid);
1709 break;
1710 }
1711
1712 nxt_thread_mutex_unlock(&rt->processes_mutex);
1713
1714 return process;
1715}
1716
1717
1718void
1719nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1720{
1721 nxt_port_t *port;
1722
1723 if (process->port_cleanups == 0) {
1724 if (process->registered == 1) {
1725 nxt_runtime_process_remove_pid(rt, process->pid);
1726 }
1727
1728 nxt_runtime_process_destroy(rt, process);
1729
1730 } else {
1731 nxt_process_port_each(process, port) {
1732
1733 nxt_runtime_port_remove(rt, port);
1734
1735 nxt_port_release(port);
1736
1737 } nxt_process_port_loop;
1738 }
1739}
1740
1741
1742nxt_process_t *
1743nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1744{
1745 nxt_memzero(lhe, sizeof(nxt_lvlhsh_each_t));
1746
1747 lhe->proto = &lvlhsh_processes_proto;
1748
1749 return nxt_runtime_process_next(rt, lhe);
1750}
1751
1752
1753nxt_port_t *
1754nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1755{
1756 return nxt_port_hash_first(&rt->ports, lhe);
1757}
1758
1759
1760void
1761nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
1762{
1763 nxt_port_hash_add(&rt->ports, rt->mem_pool, port);
1764
1765 rt->port_by_type[port->type] = port;
1766}
1767
1768
1769void
1770nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
1771{
1772 nxt_port_hash_remove(&rt->ports, rt->mem_pool, port);
1773
1774 if (rt->port_by_type[port->type] == port) {
1775 rt->port_by_type[port->type] = NULL;
1776 }
1777}
1778
1779
1780nxt_port_t *
1781nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1782 nxt_port_id_t port_id)
1783{
1784 return nxt_port_hash_find(&rt->ports, pid, port_id);
1785}