1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_port.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_regex.h>
14
15
16 static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
17 nxt_runtime_t *rt);
18 static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
19 nxt_runtime_t *rt);
20 static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
21 static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
22 static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
23 static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
24 static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
25 static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
26 static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
27 static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
28 nxt_runtime_t *rt);
29 static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
30 static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
31 static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
32 static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
33 static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
34 nxt_runtime_t *rt);
35 static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
36 nxt_file_name_t *pid_file);
37 static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
38 nxt_runtime_cont_t cont);
39 static void nxt_runtime_thread_pool_init(void);
40 static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
41 void *data);
42 static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
43 static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
44
45
46 nxt_int_t
nxt_runtime_create(nxt_task_t * task)47 nxt_runtime_create(nxt_task_t *task)
48 {
49 nxt_mp_t *mp;
50 nxt_int_t ret;
51 nxt_array_t *listen_sockets;
52 nxt_runtime_t *rt;
53 nxt_app_lang_module_t *lang;
54
55 mp = nxt_mp_create(1024, 128, 256, 32);
56 if (nxt_slow_path(mp == NULL)) {
57 return NXT_ERROR;
58 }
59
60 rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
61 if (nxt_slow_path(rt == NULL)) {
62 goto fail;
63 }
64
65 task->thread->runtime = rt;
66 rt->mem_pool = mp;
67
68 nxt_thread_mutex_create(&rt->processes_mutex);
69
70 rt->services = nxt_services_init(mp);
71 if (nxt_slow_path(rt->services == NULL)) {
72 goto fail;
73 }
74
75 rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
76 if (nxt_slow_path(rt->languages == NULL)) {
77 goto fail;
78 }
79
80 /* Should not fail. */
81 lang = nxt_array_add(rt->languages);
82 lang->type = NXT_APP_EXTERNAL;
83 lang->version = (u_char *) "";
84 lang->file = NULL;
85 lang->module = &nxt_external_module;
86
87 lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t));
88 if (nxt_slow_path(lang->mounts == NULL)) {
89 goto fail;
90 }
91
92 listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
93 if (nxt_slow_path(listen_sockets == NULL)) {
94 goto fail;
95 }
96
97 rt->listen_sockets = listen_sockets;
98
99 ret = nxt_runtime_inherited_listen_sockets(task, rt);
100 if (nxt_slow_path(ret != NXT_OK)) {
101 goto fail;
102 }
103
104 if (nxt_runtime_hostname(task, rt) != NXT_OK) {
105 goto fail;
106 }
107
108 if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
109 goto fail;
110 }
111
112 if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
113 goto fail;
114 }
115
116 if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
117 goto fail;
118 }
119
120 rt->start = nxt_runtime_initial_start;
121
122 if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
123 goto fail;
124 }
125
126 if (nxt_port_rpc_init() != NXT_OK) {
127 goto fail;
128 }
129
130 if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) {
131 goto fail;
132 }
133
134 if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) {
135 goto fail;
136 }
137
138 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
139 nxt_runtime_start, task, rt, NULL);
140
141 return NXT_OK;
142
143 fail:
144
145 nxt_mp_destroy(mp);
146
147 return NXT_ERROR;
148 }
149
150
151 static nxt_int_t
nxt_runtime_inherited_listen_sockets(nxt_task_t * task,nxt_runtime_t * rt)152 nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
153 {
154 u_char *v, *p;
155 nxt_int_t type;
156 nxt_array_t *inherited_sockets;
157 nxt_socket_t s;
158 nxt_listen_socket_t *ls;
159
160 v = (u_char *) getenv("NGINX");
161
162 if (v == NULL) {
163 return nxt_runtime_systemd_listen_sockets(task, rt);
164 }
165
166 nxt_alert(task, "using inherited listen sockets: %s", v);
167
168 inherited_sockets = nxt_array_create(rt->mem_pool,
169 1, sizeof(nxt_listen_socket_t));
170 if (inherited_sockets == NULL) {
171 return NXT_ERROR;
172 }
173
174 rt->inherited_sockets = inherited_sockets;
175
176 for (p = v; *p != '\0'; p++) {
177
178 if (*p == ';') {
179 s = nxt_int_parse(v, p - v);
180
181 if (nxt_slow_path(s < 0)) {
182 nxt_alert(task, "invalid socket number \"%s\" "
183 "in NGINX environment variable, "
184 "ignoring the rest of the variable", v);
185 return NXT_ERROR;
186 }
187
188 v = p + 1;
189
190 ls = nxt_array_zero_add(inherited_sockets);
191 if (nxt_slow_path(ls == NULL)) {
192 return NXT_ERROR;
193 }
194
195 ls->socket = s;
196
197 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
198 if (nxt_slow_path(ls->sockaddr == NULL)) {
199 return NXT_ERROR;
200 }
201
202 type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
203 if (nxt_slow_path(type == -1)) {
204 return NXT_ERROR;
205 }
206
207 ls->sockaddr->type = (uint16_t) type;
208 }
209 }
210
211 return NXT_OK;
212 }
213
214
215 static nxt_int_t
nxt_runtime_systemd_listen_sockets(nxt_task_t * task,nxt_runtime_t * rt)216 nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
217 {
218 u_char *nfd, *pid;
219 nxt_int_t n;
220 nxt_array_t *inherited_sockets;
221 nxt_socket_t s;
222 nxt_listen_socket_t *ls;
223
224 /*
225 * Number of listening sockets passed. The socket
226 * descriptors start from number 3 and are sequential.
227 */
228 nfd = (u_char *) getenv("LISTEN_FDS");
229 if (nfd == NULL) {
230 return NXT_OK;
231 }
232
233 /* The pid of the service process. */
234 pid = (u_char *) getenv("LISTEN_PID");
235 if (pid == NULL) {
236 return NXT_OK;
237 }
238
239 n = nxt_int_parse(nfd, nxt_strlen(nfd));
240 if (n < 0) {
241 return NXT_OK;
242 }
243
244 if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
245 return NXT_OK;
246 }
247
248 nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
249
250 inherited_sockets = nxt_array_create(rt->mem_pool,
251 n, sizeof(nxt_listen_socket_t));
252 if (inherited_sockets == NULL) {
253 return NXT_ERROR;
254 }
255
256 rt->inherited_sockets = inherited_sockets;
257
258 for (s = 3; s < n; s++) {
259 ls = nxt_array_zero_add(inherited_sockets);
260 if (nxt_slow_path(ls == NULL)) {
261 return NXT_ERROR;
262 }
263
264 ls->socket = s;
265
266 ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
267 if (nxt_slow_path(ls->sockaddr == NULL)) {
268 return NXT_ERROR;
269 }
270
271 ls->sockaddr->type = SOCK_STREAM;
272 }
273
274 return NXT_OK;
275 }
276
277
278 static nxt_int_t
nxt_runtime_event_engines(nxt_task_t * task,nxt_runtime_t * rt)279 nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
280 {
281 nxt_thread_t *thread;
282 nxt_event_engine_t *engine;
283 const nxt_event_interface_t *interface;
284
285 interface = nxt_service_get(rt->services, "engine", NULL);
286
287 if (nxt_slow_path(interface == NULL)) {
288 /* TODO: log */
289 return NXT_ERROR;
290 }
291
292 engine = nxt_event_engine_create(task, interface,
293 nxt_main_process_signals, 0, 0);
294
295 if (nxt_slow_path(engine == NULL)) {
296 return NXT_ERROR;
297 }
298
299 thread = task->thread;
300 thread->engine = engine;
301 #if 0
302 thread->fiber = &engine->fibers->fiber;
303 #endif
304
305 engine->id = rt->last_engine_id++;
306 engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
307
308 nxt_queue_init(&rt->engines);
309 nxt_queue_insert_tail(&rt->engines, &engine->link);
310
311 return NXT_OK;
312 }
313
314
315 static nxt_int_t
nxt_runtime_thread_pools(nxt_thread_t * thr,nxt_runtime_t * rt)316 nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
317 {
318 nxt_int_t ret;
319 nxt_array_t *thread_pools;
320
321 thread_pools = nxt_array_create(rt->mem_pool, 1,
322 sizeof(nxt_thread_pool_t *));
323
324 if (nxt_slow_path(thread_pools == NULL)) {
325 return NXT_ERROR;
326 }
327
328 rt->thread_pools = thread_pools;
329 ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
330
331 if (nxt_slow_path(ret != NXT_OK)) {
332 return NXT_ERROR;
333 }
334
335 return NXT_OK;
336 }
337
338
339 static void
nxt_runtime_start(nxt_task_t * task,void * obj,void * data)340 nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
341 {
342 nxt_runtime_t *rt;
343
344 rt = obj;
345
346 nxt_debug(task, "rt conf done");
347
348 task->thread->log->ctx_handler = NULL;
349 task->thread->log->ctx = NULL;
350
351 if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
352 goto fail;
353 }
354
355 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
356 goto fail;
357 }
358
359 /*
360 * Thread pools should be destroyed before starting worker
361 * processes, because thread pool semaphores will stick in
362 * locked state in new processes after fork().
363 */
364 nxt_runtime_thread_pool_destroy(task, rt, rt->start);
365
366 return;
367
368 fail:
369
370 nxt_runtime_quit(task, 1);
371 }
372
373
374 static void
nxt_runtime_initial_start(nxt_task_t * task,nxt_uint_t status)375 nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
376 {
377 nxt_int_t ret;
378 nxt_thread_t *thr;
379 nxt_runtime_t *rt;
380 const nxt_event_interface_t *interface;
381
382 thr = task->thread;
383 rt = thr->runtime;
384
385 if (rt->inherited_sockets == NULL && rt->daemon) {
386
387 if (nxt_process_daemon(task) != NXT_OK) {
388 goto fail;
389 }
390
391 /*
392 * An event engine should be updated after fork()
393 * even if an event facility was not changed because:
394 * 1) inherited kqueue descriptor is invalid,
395 * 2) the signal thread is not inherited.
396 */
397 interface = nxt_service_get(rt->services, "engine", rt->engine);
398 if (interface == NULL) {
399 goto fail;
400 }
401
402 ret = nxt_event_engine_change(task->thread->engine, interface,
403 rt->batch);
404 if (ret != NXT_OK) {
405 goto fail;
406 }
407 }
408
409 ret = nxt_runtime_pid_file_create(task, rt->pid_file);
410 if (ret != NXT_OK) {
411 goto fail;
412 }
413
414 if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
415 goto fail;
416 }
417
418 thr->engine->max_connections = rt->engine_connections;
419
420 if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
421 return;
422 }
423
424 fail:
425
426 nxt_runtime_quit(task, 1);
427 }
428
429
430 void
nxt_runtime_quit(nxt_task_t * task,nxt_uint_t status)431 nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
432 {
433 nxt_bool_t done;
434 nxt_runtime_t *rt;
435 nxt_event_engine_t *engine;
436
437 rt = task->thread->runtime;
438 rt->status |= status;
439 engine = task->thread->engine;
440
441 nxt_debug(task, "exiting");
442
443 done = 1;
444
445 if (!engine->shutdown) {
446 engine->shutdown = 1;
447
448 if (!nxt_array_is_empty(rt->thread_pools)) {
449 nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
450 done = 0;
451 }
452
453 if (rt->type == NXT_PROCESS_MAIN) {
454 nxt_runtime_stop_all_processes(task, rt);
455 done = 0;
456 }
457 }
458
459 nxt_runtime_close_idle_connections(engine);
460
461 if (done) {
462 nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
463 task, rt, engine);
464 }
465 }
466
467
468 static void
nxt_runtime_close_idle_connections(nxt_event_engine_t * engine)469 nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
470 {
471 nxt_conn_t *c;
472 nxt_queue_t *idle;
473 nxt_queue_link_t *link, *next;
474
475 nxt_debug(&engine->task, "close idle connections");
476
477 idle = &engine->idle_connections;
478
479 for (link = nxt_queue_first(idle);
480 link != nxt_queue_tail(idle);
481 link = next)
482 {
483 next = nxt_queue_next(link);
484 c = nxt_queue_link_data(link, nxt_conn_t, link);
485
486 if (!c->socket.read_ready) {
487 nxt_queue_remove(link);
488 nxt_conn_close(engine, c);
489 }
490 }
491 }
492
493
494 void
nxt_runtime_stop_app_processes(nxt_task_t * task,nxt_runtime_t * rt)495 nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
496 {
497 nxt_port_t *port;
498 nxt_process_t *process;
499 nxt_process_init_t *init;
500
501 nxt_runtime_process_each(rt, process) {
502
503 init = nxt_process_init(process);
504
505 if (init->type == NXT_PROCESS_APP
506 || init->type == NXT_PROCESS_PROTOTYPE)
507 {
508
509 nxt_process_port_each(process, port) {
510
511 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
512 0, 0, NULL);
513
514 } nxt_process_port_loop;
515 }
516
517 } nxt_runtime_process_loop;
518 }
519
520
521 static void
nxt_runtime_stop_all_processes(nxt_task_t * task,nxt_runtime_t * rt)522 nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
523 {
524 nxt_port_t *port;
525 nxt_process_t *process;
526
527 nxt_runtime_process_each(rt, process) {
528
529 nxt_process_port_each(process, port) {
530
531 nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid);
532
533 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
534 0, NULL);
535
536 } nxt_process_port_loop;
537
538 } nxt_runtime_process_loop;
539 }
540
541
542 static void
nxt_runtime_exit(nxt_task_t * task,void * obj,void * data)543 nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
544 {
545 int status, engine_count;
546 nxt_runtime_t *rt;
547 nxt_process_t *process;
548 nxt_event_engine_t *engine;
549
550 rt = obj;
551 engine = data;
552
553 nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
554
555 if (!nxt_array_is_empty(rt->thread_pools)) {
556 return;
557 }
558
559 if (rt->type == NXT_PROCESS_MAIN) {
560 if (rt->pid_file != NULL) {
561 nxt_file_delete(rt->pid_file);
562 }
563
564 #if (NXT_HAVE_UNIX_DOMAIN)
565 {
566 size_t i;
567 nxt_sockaddr_t *sa;
568 nxt_file_name_t *name;
569
570 sa = rt->controller_listen;
571
572 if (sa->u.sockaddr.sa_family == AF_UNIX) {
573 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
574 (void) nxt_file_delete(name);
575 }
576
577 for (i = 0; i < rt->listen_sockets->nelts; i++) {
578 nxt_listen_socket_t *ls;
579
580 ls = (nxt_listen_socket_t *) rt->listen_sockets->elts + i;
581 sa = ls->sockaddr;
582
583 if (sa->u.sockaddr.sa_family != AF_UNIX
584 || sa->u.sockaddr_un.sun_path[0] == '\0')
585 {
586 continue;
587 }
588
589 name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
590 (void) nxt_file_delete(name);
591 }
592 }
593 #endif
594 }
595
596 if (!engine->event.signal_support) {
597 nxt_event_engine_signals_stop(engine);
598 }
599
600 nxt_runtime_process_each(rt, process) {
601
602 nxt_runtime_process_remove(rt, process);
603 nxt_process_close_ports(task, process);
604
605 } nxt_runtime_process_loop;
606
607 status = rt->status;
608
609 engine_count = 0;
610
611 nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
612
613 engine_count++;
614
615 } nxt_queue_loop;
616
617 if (engine_count <= 1) {
618 if (rt->port_by_type[rt->type] != NULL) {
619 nxt_port_use(task, rt->port_by_type[rt->type], -1);
620 }
621
622 nxt_thread_mutex_destroy(&rt->processes_mutex);
623
624 nxt_mp_destroy(rt->mem_pool);
625 }
626
627 nxt_debug(task, "exit: %d", status);
628
629 exit(status);
630 nxt_unreachable();
631 }
632
633
634 static nxt_int_t
nxt_runtime_event_engine_change(nxt_task_t * task,nxt_runtime_t * rt)635 nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
636 {
637 nxt_event_engine_t *engine;
638 const nxt_event_interface_t *interface;
639
640 engine = task->thread->engine;
641
642 if (engine->batch == rt->batch
643 && nxt_strcmp(engine->event.name, rt->engine) == 0)
644 {
645 return NXT_OK;
646 }
647
648 interface = nxt_service_get(rt->services, "engine", rt->engine);
649
650 if (interface != NULL) {
651 return nxt_event_engine_change(engine, interface, rt->batch);
652 }
653
654 return NXT_ERROR;
655 }
656
657
658 void
nxt_runtime_event_engine_free(nxt_runtime_t * rt)659 nxt_runtime_event_engine_free(nxt_runtime_t *rt)
660 {
661 nxt_queue_link_t *link;
662 nxt_event_engine_t *engine;
663
664 link = nxt_queue_first(&rt->engines);
665 nxt_queue_remove(link);
666
667 engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
668 nxt_event_engine_free(engine);
669 }
670
671
672 nxt_int_t
nxt_runtime_thread_pool_create(nxt_thread_t * thr,nxt_runtime_t * rt,nxt_uint_t max_threads,nxt_nsec_t timeout)673 nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
674 nxt_uint_t max_threads, nxt_nsec_t timeout)
675 {
676 nxt_thread_pool_t *thread_pool, **tp;
677
678 tp = nxt_array_add(rt->thread_pools);
679 if (tp == NULL) {
680 return NXT_ERROR;
681 }
682
683 thread_pool = nxt_thread_pool_create(max_threads, timeout,
684 nxt_runtime_thread_pool_init,
685 thr->engine,
686 nxt_runtime_thread_pool_exit);
687
688 if (nxt_fast_path(thread_pool != NULL)) {
689 *tp = thread_pool;
690 }
691
692 return NXT_OK;
693 }
694
695
696 static void
nxt_runtime_thread_pool_destroy(nxt_task_t * task,nxt_runtime_t * rt,nxt_runtime_cont_t cont)697 nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
698 nxt_runtime_cont_t cont)
699 {
700 nxt_uint_t n;
701 nxt_thread_pool_t **tp;
702
703 rt->continuation = cont;
704
705 n = rt->thread_pools->nelts;
706
707 if (n == 0) {
708 cont(task, 0);
709 return;
710 }
711
712 tp = rt->thread_pools->elts;
713
714 do {
715 nxt_thread_pool_destroy(*tp);
716
717 tp++;
718 n--;
719 } while (n != 0);
720 }
721
722
723 static void
nxt_runtime_thread_pool_init(void)724 nxt_runtime_thread_pool_init(void)
725 {
726 }
727
728
729 static void
nxt_runtime_thread_pool_exit(nxt_task_t * task,void * obj,void * data)730 nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
731 {
732 nxt_uint_t i, n;
733 nxt_runtime_t *rt;
734 nxt_thread_pool_t *tp, **thread_pools;
735 nxt_thread_handle_t handle;
736
737 tp = obj;
738
739 if (data != NULL) {
740 handle = (nxt_thread_handle_t) (uintptr_t) data;
741 nxt_thread_wait(handle);
742 }
743
744 rt = task->thread->runtime;
745
746 thread_pools = rt->thread_pools->elts;
747 n = rt->thread_pools->nelts;
748
749 nxt_debug(task, "thread pools: %ui", n);
750
751 for (i = 0; i < n; i++) {
752
753 if (tp == thread_pools[i]) {
754 nxt_array_remove(rt->thread_pools, &thread_pools[i]);
755
756 nxt_free(tp);
757
758 if (n == 1) {
759 /* The last thread pool. */
760 rt->continuation(task, 0);
761 }
762
763 return;
764 }
765 }
766 }
767
768
769 static nxt_int_t
nxt_runtime_conf_init(nxt_task_t * task,nxt_runtime_t * rt)770 nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
771 {
772 nxt_int_t ret;
773 nxt_str_t control;
774 nxt_uint_t n;
775 nxt_file_t *file;
776 const char *slash;
777 nxt_sockaddr_t *sa;
778 nxt_file_name_str_t file_name;
779 const nxt_event_interface_t *interface;
780
781 rt->daemon = 1;
782 rt->engine_connections = 256;
783 rt->auxiliary_threads = 2;
784 rt->user_cred.user = NXT_USER;
785 rt->group = NXT_GROUP;
786 rt->pid = NXT_PID;
787 rt->log = NXT_LOG;
788 rt->modules = NXT_MODULESDIR;
789 rt->state = NXT_STATEDIR;
790 rt->control = NXT_CONTROL_SOCK;
791 rt->tmp = NXT_TMPDIR;
792
793 nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
794
795 if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
796 return NXT_ERROR;
797 }
798
799 if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
800 return NXT_ERROR;
801 }
802
803 if (rt->capabilities.setid) {
804 ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
805 rt->group);
806
807 if (nxt_slow_path(ret != NXT_OK)) {
808 return NXT_ERROR;
809 }
810
811 } else {
812 nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
813 "cannot use arbitrary user and group.");
814 }
815
816 /* An engine's parameters. */
817
818 interface = nxt_service_get(rt->services, "engine", rt->engine);
819 if (interface == NULL) {
820 return NXT_ERROR;
821 }
822
823 rt->engine = interface->name;
824
825 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
826 if (nxt_slow_path(ret != NXT_OK)) {
827 return NXT_ERROR;
828 }
829
830 rt->pid_file = file_name.start;
831
832 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
833 if (nxt_slow_path(ret != NXT_OK)) {
834 return NXT_ERROR;
835 }
836
837 file = nxt_list_first(rt->log_files);
838 file->name = file_name.start;
839
840 slash = "";
841 n = nxt_strlen(rt->modules);
842
843 if (n > 1 && rt->modules[n - 1] != '/') {
844 slash = "/";
845 }
846
847 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
848 rt->modules, slash);
849 if (nxt_slow_path(ret != NXT_OK)) {
850 return NXT_ERROR;
851 }
852
853 rt->modules = (char *) file_name.start;
854
855 slash = "";
856 n = nxt_strlen(rt->state);
857
858 if (n > 1 && rt->state[n - 1] != '/') {
859 slash = "/";
860 }
861
862 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z",
863 rt->state, slash);
864 if (nxt_slow_path(ret != NXT_OK)) {
865 return NXT_ERROR;
866 }
867
868 rt->ver = (char *) file_name.start;
869
870 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver);
871 if (nxt_slow_path(ret != NXT_OK)) {
872 return NXT_ERROR;
873 }
874
875 rt->ver_tmp = (char *) file_name.start;
876
877 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
878 rt->state, slash);
879 if (nxt_slow_path(ret != NXT_OK)) {
880 return NXT_ERROR;
881 }
882
883 rt->conf = (char *) file_name.start;
884
885 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
886 if (nxt_slow_path(ret != NXT_OK)) {
887 return NXT_ERROR;
888 }
889
890 rt->conf_tmp = (char *) file_name.start;
891
892 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
893 rt->state, slash);
894 if (nxt_slow_path(ret != NXT_OK)) {
895 return NXT_ERROR;
896 }
897
898 ret = mkdir((char *) file_name.start, S_IRWXU);
899
900 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
901 rt->certs.length = file_name.len;
902 rt->certs.start = file_name.start;
903
904 } else {
905 nxt_alert(task, "Unable to create certificates storage directory: "
906 "mkdir(%s) failed %E", file_name.start, nxt_errno);
907 }
908
909 ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sscripts/%Z",
910 rt->state, slash);
911 if (nxt_slow_path(ret != NXT_OK)) {
912 return NXT_ERROR;
913 }
914
915 ret = mkdir((char *) file_name.start, S_IRWXU);
916
917 if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
918 rt->scripts.length = file_name.len;
919 rt->scripts.start = file_name.start;
920
921 } else {
922 nxt_alert(task, "Unable to create scripts storage directory: "
923 "mkdir(%s) failed %E", file_name.start, nxt_errno);
924 }
925
926 control.length = nxt_strlen(rt->control);
927 control.start = (u_char *) rt->control;
928
929 sa = nxt_sockaddr_parse(rt->mem_pool, &control);
930 if (nxt_slow_path(sa == NULL)) {
931 return NXT_ERROR;
932 }
933
934 sa->type = SOCK_STREAM;
935
936 rt->controller_listen = sa;
937
938 if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
939 return NXT_ERROR;
940 }
941
942 return NXT_OK;
943 }
944
945
946 static nxt_int_t
nxt_runtime_conf_read_cmd(nxt_task_t * task,nxt_runtime_t * rt)947 nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
948 {
949 char *p, **argv;
950 u_char *end;
951 u_char buf[1024];
952
953 static const char version[] =
954 "unit version: " NXT_VERSION "\n"
955 "configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
956
957 static const char no_control[] =
958 "option \"--control\" requires socket address\n";
959 static const char no_user[] = "option \"--user\" requires username\n";
960 static const char no_group[] = "option \"--group\" requires group name\n";
961 static const char no_pid[] = "option \"--pid\" requires filename\n";
962 static const char no_log[] = "option \"--log\" requires filename\n";
963 static const char no_modules[] =
964 "option \"--modulesdir\" requires directory\n";
965 static const char no_state[] =
966 "option \"--statedir\" requires directory\n";
967 static const char no_tmp[] = "option \"--tmpdir\" requires directory\n";
968
969 static const char modules_deprecated[] =
970 "option \"--modules\" is deprecated; use \"--modulesdir\" instead\n";
971 static const char state_deprecated[] =
972 "option \"--state\" is deprecated; use \"--statedir\" instead\n";
973 static const char tmp_deprecated[] =
974 "option \"--tmp\" is deprecated; use \"--tmpdir\" instead\n";
975
976 static const char help[] =
977 "\n"
978 "unit options:\n"
979 "\n"
980 " --version print unit version and configure options\n"
981 "\n"
982 " --no-daemon run unit in non-daemon mode\n"
983 "\n"
984 " --control ADDRESS set address of control API socket\n"
985 " default: \"" NXT_CONTROL_SOCK "\"\n"
986 "\n"
987 " --pid FILE set pid filename\n"
988 " default: \"" NXT_PID "\"\n"
989 "\n"
990 " --log FILE set log filename\n"
991 " default: \"" NXT_LOG "\"\n"
992 "\n"
993 " --modulesdir DIR set modules directory name\n"
994 " default: \"" NXT_MODULESDIR "\"\n"
995 "\n"
996 " --statedir DIR set state directory name\n"
997 " default: \"" NXT_STATEDIR "\"\n"
998 "\n"
999 " --tmpdir DIR set tmp directory name\n"
1000 " default: \"" NXT_TMPDIR "\"\n"
1001 "\n"
1002 " --modules DIR [deprecated] synonym for --modulesdir\n"
1003 " --state DIR [deprecated] synonym for --statedir\n"
1004 " --tmp DIR [deprecated] synonym for --tmpdir\n"
1005 "\n"
1006 " --user USER set non-privileged processes to run"
1007 " as specified user\n"
1008 " default: \"" NXT_USER "\"\n"
1009 "\n"
1010 " --group GROUP set non-privileged processes to run"
1011 " as specified group\n"
1012 " default: ";
1013
1014 static const char group[] = "\"" NXT_GROUP "\"\n\n";
1015 static const char primary[] = "user's primary group\n\n";
1016
1017 argv = &nxt_process_argv[1];
1018
1019 while (*argv != NULL) {
1020 p = *argv++;
1021
1022 if (nxt_strcmp(p, "--control") == 0) {
1023 if (*argv == NULL) {
1024 write(STDERR_FILENO, no_control, nxt_length(no_control));
1025 return NXT_ERROR;
1026 }
1027
1028 p = *argv++;
1029
1030 rt->control = p;
1031
1032 continue;
1033 }
1034
1035 if (nxt_strcmp(p, "--user") == 0) {
1036 if (*argv == NULL) {
1037 write(STDERR_FILENO, no_user, nxt_length(no_user));
1038 return NXT_ERROR;
1039 }
1040
1041 p = *argv++;
1042
1043 rt->user_cred.user = p;
1044
1045 continue;
1046 }
1047
1048 if (nxt_strcmp(p, "--group") == 0) {
1049 if (*argv == NULL) {
1050 write(STDERR_FILENO, no_group, nxt_length(no_group));
1051 return NXT_ERROR;
1052 }
1053
1054 p = *argv++;
1055
1056 rt->group = p;
1057
1058 continue;
1059 }
1060
1061 if (nxt_strcmp(p, "--pid") == 0) {
1062 if (*argv == NULL) {
1063 write(STDERR_FILENO, no_pid, nxt_length(no_pid));
1064 return NXT_ERROR;
1065 }
1066
1067 p = *argv++;
1068
1069 rt->pid = p;
1070
1071 continue;
1072 }
1073
1074 if (nxt_strcmp(p, "--log") == 0) {
1075 if (*argv == NULL) {
1076 write(STDERR_FILENO, no_log, nxt_length(no_log));
1077 return NXT_ERROR;
1078 }
1079
1080 p = *argv++;
1081
1082 rt->log = p;
1083
1084 continue;
1085 }
1086
1087 if (nxt_strcmp(p, "--modules") == 0) {
1088 write(STDERR_FILENO, modules_deprecated,
1089 nxt_length(modules_deprecated));
1090 goto modulesdir;
1091 }
1092
1093 if (nxt_strcmp(p, "--modulesdir") == 0) {
1094 modulesdir:
1095 if (*argv == NULL) {
1096 write(STDERR_FILENO, no_modules, nxt_length(no_modules));
1097 return NXT_ERROR;
1098 }
1099
1100 p = *argv++;
1101
1102 rt->modules = p;
1103
1104 continue;
1105 }
1106
1107 if (nxt_strcmp(p, "--state") == 0) {
1108 write(STDERR_FILENO, state_deprecated,
1109 nxt_length(state_deprecated));
1110 goto statedir;
1111 }
1112
1113 if (nxt_strcmp(p, "--statedir") == 0) {
1114 statedir:
1115 if (*argv == NULL) {
1116 write(STDERR_FILENO, no_state, nxt_length(no_state));
1117 return NXT_ERROR;
1118 }
1119
1120 p = *argv++;
1121
1122 rt->state = p;
1123
1124 continue;
1125 }
1126
1127 if (nxt_strcmp(p, "--tmp") == 0) {
1128 write(STDERR_FILENO, tmp_deprecated, nxt_length(tmp_deprecated));
1129 goto tmpdir;
1130 }
1131
1132 if (nxt_strcmp(p, "--tmpdir") == 0) {
1133 tmpdir:
1134 if (*argv == NULL) {
1135 write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
1136 return NXT_ERROR;
1137 }
1138
1139 p = *argv++;
1140
1141 rt->tmp = p;
1142
1143 continue;
1144 }
1145
1146 if (nxt_strcmp(p, "--no-daemon") == 0) {
1147 rt->daemon = 0;
1148 continue;
1149 }
1150
1151 if (nxt_strcmp(p, "--version") == 0) {
1152 write(STDERR_FILENO, version, nxt_length(version));
1153 exit(0);
1154 }
1155
1156 if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
1157 write(STDOUT_FILENO, help, nxt_length(help));
1158
1159 if (sizeof(NXT_GROUP) == 1) {
1160 write(STDOUT_FILENO, primary, nxt_length(primary));
1161
1162 } else {
1163 write(STDOUT_FILENO, group, nxt_length(group));
1164 }
1165
1166 exit(0);
1167 }
1168
1169 end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
1170 "try \"%s -h\" for available options\n",
1171 p, nxt_process_argv[0]);
1172
1173 write(STDERR_FILENO, buf, end - buf);
1174
1175 return NXT_ERROR;
1176 }
1177
1178 return NXT_OK;
1179 }
1180
1181
1182 nxt_listen_socket_t *
nxt_runtime_listen_socket_add(nxt_runtime_t * rt,nxt_sockaddr_t * sa)1183 nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
1184 {
1185 nxt_mp_t *mp;
1186 nxt_listen_socket_t *ls;
1187
1188 ls = nxt_array_zero_add(rt->listen_sockets);
1189 if (ls == NULL) {
1190 return NULL;
1191 }
1192
1193 mp = rt->mem_pool;
1194
1195 ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
1196 sa->length);
1197 if (ls->sockaddr == NULL) {
1198 return NULL;
1199 }
1200
1201 ls->sockaddr->type = sa->type;
1202
1203 nxt_sockaddr_text(ls->sockaddr);
1204
1205 ls->socket = -1;
1206 ls->backlog = NXT_LISTEN_BACKLOG;
1207
1208 return ls;
1209 }
1210
1211
1212 static nxt_int_t
nxt_runtime_hostname(nxt_task_t * task,nxt_runtime_t * rt)1213 nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
1214 {
1215 size_t length;
1216 char hostname[NXT_MAXHOSTNAMELEN + 1];
1217
1218 if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
1219 nxt_alert(task, "gethostname() failed %E", nxt_errno);
1220 return NXT_ERROR;
1221 }
1222
1223 /*
1224 * Linux gethostname(2):
1225 *
1226 * If the null-terminated hostname is too large to fit,
1227 * then the name is truncated, and no error is returned.
1228 *
1229 * For this reason an additional byte is reserved in the buffer.
1230 */
1231 hostname[NXT_MAXHOSTNAMELEN] = '\0';
1232
1233 length = nxt_strlen(hostname);
1234 rt->hostname.length = length;
1235
1236 rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
1237
1238 if (rt->hostname.start != NULL) {
1239 nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
1240 return NXT_OK;
1241 }
1242
1243 return NXT_ERROR;
1244 }
1245
1246
1247 static nxt_int_t
nxt_runtime_log_files_init(nxt_runtime_t * rt)1248 nxt_runtime_log_files_init(nxt_runtime_t *rt)
1249 {
1250 nxt_file_t *file;
1251 nxt_list_t *log_files;
1252
1253 log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
1254
1255 if (nxt_fast_path(log_files != NULL)) {
1256 rt->log_files = log_files;
1257
1258 /* Preallocate the main log. This allocation cannot fail. */
1259 file = nxt_list_zero_add(log_files);
1260
1261 file->fd = NXT_FILE_INVALID;
1262 file->log_level = NXT_LOG_ALERT;
1263
1264 return NXT_OK;
1265 }
1266
1267 return NXT_ERROR;
1268 }
1269
1270
1271 nxt_file_t *
nxt_runtime_log_file_add(nxt_runtime_t * rt,nxt_str_t * name)1272 nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
1273 {
1274 nxt_int_t ret;
1275 nxt_file_t *file;
1276 nxt_file_name_str_t file_name;
1277
1278 ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
1279
1280 if (nxt_slow_path(ret != NXT_OK)) {
1281 return NULL;
1282 }
1283
1284 nxt_list_each(file, rt->log_files) {
1285
1286 /* STUB: hardecoded case sensitive/insensitive. */
1287
1288 if (file->name != NULL
1289 && nxt_file_name_eq(file->name, file_name.start))
1290 {
1291 return file;
1292 }
1293
1294 } nxt_list_loop;
1295
1296 file = nxt_list_zero_add(rt->log_files);
1297
1298 if (nxt_slow_path(file == NULL)) {
1299 return NULL;
1300 }
1301
1302 file->fd = NXT_FILE_INVALID;
1303 file->log_level = NXT_LOG_ALERT;
1304 file->name = file_name.start;
1305
1306 return file;
1307 }
1308
1309
1310 static nxt_int_t
nxt_runtime_log_files_create(nxt_task_t * task,nxt_runtime_t * rt)1311 nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
1312 {
1313 nxt_int_t ret;
1314 nxt_file_t *file;
1315
1316 nxt_list_each(file, rt->log_files) {
1317
1318 ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
1319 NXT_FILE_OWNER_ACCESS);
1320
1321 if (ret != NXT_OK) {
1322 return NXT_ERROR;
1323 }
1324
1325 } nxt_list_loop;
1326
1327 file = nxt_list_first(rt->log_files);
1328
1329 return nxt_file_stderr(file);
1330 }
1331
1332
1333 nxt_int_t
nxt_runtime_listen_sockets_create(nxt_task_t * task,nxt_runtime_t * rt)1334 nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
1335 {
1336 nxt_int_t ret;
1337 nxt_uint_t c, p, ncurr, nprev;
1338 nxt_listen_socket_t *curr, *prev;
1339
1340 curr = rt->listen_sockets->elts;
1341 ncurr = rt->listen_sockets->nelts;
1342
1343 if (rt->inherited_sockets != NULL) {
1344 prev = rt->inherited_sockets->elts;
1345 nprev = rt->inherited_sockets->nelts;
1346
1347 } else {
1348 prev = NULL;
1349 nprev = 0;
1350 }
1351
1352 for (c = 0; c < ncurr; c++) {
1353
1354 for (p = 0; p < nprev; p++) {
1355
1356 if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
1357
1358 ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
1359 if (ret != NXT_OK) {
1360 return NXT_ERROR;
1361 }
1362
1363 goto next;
1364 }
1365 }
1366
1367 if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
1368 return NXT_ERROR;
1369 }
1370
1371 next:
1372
1373 continue;
1374 }
1375
1376 return NXT_OK;
1377 }
1378
1379
1380 nxt_int_t
nxt_runtime_listen_sockets_enable(nxt_task_t * task,nxt_runtime_t * rt)1381 nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
1382 {
1383 nxt_uint_t i, n;
1384 nxt_listen_socket_t *ls;
1385
1386 ls = rt->listen_sockets->elts;
1387 n = rt->listen_sockets->nelts;
1388
1389 for (i = 0; i < n; i++) {
1390 if (ls[i].flags == NXT_NONBLOCK) {
1391 if (nxt_listen_event(task, &ls[i]) == NULL) {
1392 return NXT_ERROR;
1393 }
1394 }
1395 }
1396
1397 return NXT_OK;
1398 }
1399
1400
1401 nxt_str_t *
nxt_current_directory(nxt_mp_t * mp)1402 nxt_current_directory(nxt_mp_t *mp)
1403 {
1404 size_t length;
1405 u_char *p;
1406 nxt_str_t *name;
1407 char buf[NXT_MAX_PATH_LEN];
1408
1409 length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
1410
1411 if (nxt_fast_path(length != 0)) {
1412 name = nxt_str_alloc(mp, length + 1);
1413
1414 if (nxt_fast_path(name != NULL)) {
1415 p = nxt_cpymem(name->start, buf, length);
1416 *p = '/';
1417
1418 return name;
1419 }
1420 }
1421
1422 return NULL;
1423 }
1424
1425
1426 static nxt_int_t
nxt_runtime_pid_file_create(nxt_task_t * task,nxt_file_name_t * pid_file)1427 nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
1428 {
1429 ssize_t length;
1430 nxt_int_t n;
1431 nxt_file_t file;
1432 u_char pid[NXT_INT64_T_LEN + nxt_length("\n")];
1433
1434 nxt_memzero(&file, sizeof(nxt_file_t));
1435
1436 file.name = pid_file;
1437
1438 nxt_fs_mkdir_parent(pid_file, 0755);
1439
1440 n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
1441 NXT_FILE_DEFAULT_ACCESS);
1442
1443 if (n != NXT_OK) {
1444 return NXT_ERROR;
1445 }
1446
1447 length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
1448
1449 if (nxt_file_write(&file, pid, length, 0) != length) {
1450 return NXT_ERROR;
1451 }
1452
1453 nxt_file_close(task, &file);
1454
1455 return NXT_OK;
1456 }
1457
1458
1459 void
nxt_runtime_process_release(nxt_runtime_t * rt,nxt_process_t * process)1460 nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
1461 {
1462 nxt_process_t *child;
1463
1464 if (process->registered == 1) {
1465 nxt_runtime_process_remove(rt, process);
1466 }
1467
1468 if (process->link.next != NULL) {
1469 nxt_queue_remove(&process->link);
1470 }
1471
1472 nxt_queue_each(child, &process->children, nxt_process_t, link) {
1473 nxt_queue_remove(&child->link);
1474 child->link.next = NULL;
1475 } nxt_queue_loop;
1476
1477 nxt_assert(process->use_count == 0);
1478 nxt_assert(process->registered == 0);
1479 nxt_assert(nxt_queue_is_empty(&process->ports));
1480
1481 nxt_port_mmaps_destroy(&process->incoming, 1);
1482
1483 nxt_thread_mutex_destroy(&process->incoming.mutex);
1484
1485 /* processes from nxt_runtime_process_get() have no memory pool */
1486 if (process->mem_pool != NULL) {
1487 nxt_mp_destroy(process->mem_pool);
1488 }
1489
1490 nxt_mp_free(rt->mem_pool, process);
1491 }
1492
1493
1494 static nxt_int_t
nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t * lhq,void * data)1495 nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1496 {
1497 nxt_process_t *process;
1498
1499 process = data;
1500
1501 if (lhq->key.length == sizeof(nxt_pid_t)
1502 && *(nxt_pid_t *) lhq->key.start == process->pid)
1503 {
1504 return NXT_OK;
1505 }
1506
1507 return NXT_DECLINED;
1508 }
1509
1510 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1511 NXT_LVLHSH_DEFAULT,
1512 nxt_runtime_lvlhsh_pid_test,
1513 nxt_lvlhsh_alloc,
1514 nxt_lvlhsh_free,
1515 };
1516
1517
1518 nxt_inline void
nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t * lhq,nxt_pid_t * pid)1519 nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1520 {
1521 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
1522 lhq->key.length = sizeof(*pid);
1523 lhq->key.start = (u_char *) pid;
1524 lhq->proto = &lvlhsh_processes_proto;
1525 }
1526
1527
1528 nxt_process_t *
nxt_runtime_process_find(nxt_runtime_t * rt,nxt_pid_t pid)1529 nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
1530 {
1531 nxt_process_t *process;
1532 nxt_lvlhsh_query_t lhq;
1533
1534 process = NULL;
1535
1536 nxt_runtime_process_lhq_pid(&lhq, &pid);
1537
1538 nxt_thread_mutex_lock(&rt->processes_mutex);
1539
1540 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1541 process = lhq.value;
1542
1543 } else {
1544 nxt_thread_log_debug("process %PI not found", pid);
1545 }
1546
1547 nxt_thread_mutex_unlock(&rt->processes_mutex);
1548
1549 return process;
1550 }
1551
1552
1553 static nxt_process_t *
nxt_runtime_process_get(nxt_runtime_t * rt,nxt_pid_t pid)1554 nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
1555 {
1556 nxt_process_t *process;
1557 nxt_lvlhsh_query_t lhq;
1558
1559 nxt_runtime_process_lhq_pid(&lhq, &pid);
1560
1561 nxt_thread_mutex_lock(&rt->processes_mutex);
1562
1563 if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
1564 nxt_thread_log_debug("process %PI found", pid);
1565
1566 nxt_thread_mutex_unlock(&rt->processes_mutex);
1567
1568 process = lhq.value;
1569 process->use_count++;
1570
1571 return process;
1572 }
1573
1574 process = nxt_process_new(rt);
1575 if (nxt_slow_path(process == NULL)) {
1576
1577 nxt_thread_mutex_unlock(&rt->processes_mutex);
1578
1579 return NULL;
1580 }
1581
1582 process->pid = pid;
1583
1584 lhq.replace = 0;
1585 lhq.value = process;
1586 lhq.pool = rt->mem_pool;
1587
1588 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1589
1590 case NXT_OK:
1591 if (rt->nprocesses == 0) {
1592 rt->mprocess = process;
1593 }
1594
1595 rt->nprocesses++;
1596
1597 process->registered = 1;
1598
1599 nxt_thread_log_debug("process %PI insert", pid);
1600 break;
1601
1602 default:
1603 nxt_thread_log_debug("process %PI insert failed", pid);
1604 break;
1605 }
1606
1607 nxt_thread_mutex_unlock(&rt->processes_mutex);
1608
1609 return process;
1610 }
1611
1612
1613 void
nxt_runtime_process_add(nxt_task_t * task,nxt_process_t * process)1614 nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
1615 {
1616 nxt_port_t *port;
1617 nxt_runtime_t *rt;
1618 nxt_lvlhsh_query_t lhq;
1619
1620 nxt_assert(process->registered == 0);
1621
1622 rt = task->thread->runtime;
1623
1624 nxt_runtime_process_lhq_pid(&lhq, &process->pid);
1625
1626 lhq.replace = 0;
1627 lhq.value = process;
1628 lhq.pool = rt->mem_pool;
1629
1630 nxt_thread_mutex_lock(&rt->processes_mutex);
1631
1632 switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
1633
1634 case NXT_OK:
1635 if (rt->nprocesses == 0) {
1636 rt->mprocess = process;
1637 }
1638
1639 rt->nprocesses++;
1640
1641 nxt_process_port_each(process, port) {
1642
1643 port->pid = process->pid;
1644
1645 nxt_runtime_port_add(task, port);
1646
1647 } nxt_process_port_loop;
1648
1649 process->registered = 1;
1650
1651 nxt_debug(task, "process %PI added", process->pid);
1652 break;
1653
1654 default:
1655 nxt_alert(task, "process %PI failed to add", process->pid);
1656 break;
1657 }
1658
1659 nxt_thread_mutex_unlock(&rt->processes_mutex);
1660 }
1661
1662
1663 void
nxt_runtime_process_remove(nxt_runtime_t * rt,nxt_process_t * process)1664 nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
1665 {
1666 nxt_pid_t pid;
1667 nxt_lvlhsh_query_t lhq;
1668
1669 nxt_assert(process->registered != 0);
1670
1671 pid = process->pid;
1672
1673 nxt_runtime_process_lhq_pid(&lhq, &pid);
1674
1675 lhq.pool = rt->mem_pool;
1676
1677 nxt_thread_mutex_lock(&rt->processes_mutex);
1678
1679 switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
1680
1681 case NXT_OK:
1682 nxt_assert(lhq.value == process);
1683
1684 rt->nprocesses--;
1685
1686 process->registered = 0;
1687
1688 nxt_thread_log_debug("process %PI removed", pid);
1689 break;
1690
1691 default:
1692 nxt_thread_log_alert("process %PI remove failed", pid);
1693 break;
1694 }
1695
1696 nxt_thread_mutex_unlock(&rt->processes_mutex);
1697 }
1698
1699
1700 nxt_process_t *
nxt_runtime_process_first(nxt_runtime_t * rt,nxt_lvlhsh_each_t * lhe)1701 nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
1702 {
1703 nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
1704
1705 return nxt_runtime_process_next(rt, lhe);
1706 }
1707
1708
1709 nxt_port_t *
nxt_runtime_process_port_create(nxt_task_t * task,nxt_runtime_t * rt,nxt_pid_t pid,nxt_port_id_t id,nxt_process_type_t type)1710 nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
1711 nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
1712 {
1713 nxt_port_t *port;
1714 nxt_process_t *process;
1715
1716 process = nxt_runtime_process_get(rt, pid);
1717 if (nxt_slow_path(process == NULL)) {
1718 return NULL;
1719 }
1720
1721 port = nxt_port_new(task, id, pid, type);
1722 if (nxt_slow_path(port == NULL)) {
1723 nxt_process_use(task, process, -1);
1724 return NULL;
1725 }
1726
1727 nxt_process_port_add(task, process, port);
1728
1729 nxt_process_use(task, process, -1);
1730
1731 nxt_runtime_port_add(task, port);
1732
1733 nxt_port_use(task, port, -1);
1734
1735 return port;
1736 }
1737
1738
1739 static void
nxt_runtime_port_add(nxt_task_t * task,nxt_port_t * port)1740 nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
1741 {
1742 nxt_int_t res;
1743 nxt_runtime_t *rt;
1744
1745 rt = task->thread->runtime;
1746
1747 res = nxt_port_hash_add(&rt->ports, port);
1748
1749 if (res != NXT_OK) {
1750 return;
1751 }
1752
1753 rt->port_by_type[port->type] = port;
1754
1755 nxt_port_use(task, port, 1);
1756 }
1757
1758
1759 void
nxt_runtime_port_remove(nxt_task_t * task,nxt_port_t * port)1760 nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
1761 {
1762 nxt_int_t res;
1763 nxt_runtime_t *rt;
1764
1765 rt = task->thread->runtime;
1766
1767 res = nxt_port_hash_remove(&rt->ports, port);
1768
1769 if (res != NXT_OK) {
1770 return;
1771 }
1772
1773 if (rt->port_by_type[port->type] == port) {
1774 rt->port_by_type[port->type] = NULL;
1775 }
1776
1777 nxt_port_use(task, port, -1);
1778 }
1779
1780
1781 nxt_port_t *
nxt_runtime_port_find(nxt_runtime_t * rt,nxt_pid_t pid,nxt_port_id_t port_id)1782 nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
1783 nxt_port_id_t port_id)
1784 {
1785 return nxt_port_hash_find(&rt->ports, pid, port_id);
1786 }
1787