1
2 /*
3 * Copyright (C) Max Romanov
4 * Copyright (C) Igor Sysoev
5 * Copyright (C) Valentin V. Bartenev
6 * Copyright (C) NGINX, Inc.
7 */
8
9 #include <nxt_main.h>
10 #include <nxt_runtime.h>
11 #include <nxt_main_process.h>
12 #include <nxt_router.h>
13 #include <nxt_http.h>
14 #include <nxt_application.h>
15 #include <nxt_unit.h>
16 #include <nxt_port_memory_int.h>
17 #include <nxt_isolation.h>
18
19 #include <glob.h>
20
21 #if (NXT_HAVE_PR_SET_NO_NEW_PRIVS)
22 #include <sys/prctl.h>
23 #endif
24
25
26 #ifdef WCOREDUMP
27 #define NXT_WCOREDUMP(s) WCOREDUMP(s)
28 #else
29 #define NXT_WCOREDUMP(s) 0
30 #endif
31
32
33 typedef struct {
34 nxt_app_type_t type;
35 nxt_str_t version;
36 nxt_str_t file;
37 nxt_array_t *mounts;
38 } nxt_module_t;
39
40
41 static nxt_int_t nxt_discovery_start(nxt_task_t *task,
42 nxt_process_data_t *data);
43 static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
44 static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
45 nxt_array_t *modules, const char *name);
46 static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
47 void *data);
48 static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
49 void *data);
50 static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
51 const char *name);
52 static nxt_int_t nxt_proto_setup(nxt_task_t *task, nxt_process_t *process);
53 static nxt_int_t nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data);
54 static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
55 static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
56 static void nxt_proto_start_process_handler(nxt_task_t *task,
57 nxt_port_recv_msg_t *msg);
58 static void nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
59 static void nxt_proto_process_created_handler(nxt_task_t *task,
60 nxt_port_recv_msg_t *msg);
61 static void nxt_proto_quit_children(nxt_task_t *task);
62 static nxt_process_t *nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid);
63 static void nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process);
64 static nxt_process_t *nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid);
65 static u_char *nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src);
66 static void nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data);
67 static void nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data);
68 static void nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data);
69
70
71 nxt_str_t nxt_server = nxt_string(NXT_SERVER);
72
73
74 static uint32_t compat[] = {
75 NXT_VERNUM, NXT_DEBUG,
76 };
77
78
79 static nxt_lvlhsh_t nxt_proto_processes;
80 static nxt_queue_t nxt_proto_children;
81 static nxt_bool_t nxt_proto_exiting;
82
83 static nxt_app_module_t *nxt_app;
84 static nxt_common_app_conf_t *nxt_app_conf;
85
86
87 static const nxt_port_handlers_t nxt_discovery_process_port_handlers = {
88 .quit = nxt_signal_quit_handler,
89 .new_port = nxt_port_new_port_handler,
90 .change_file = nxt_port_change_log_file_handler,
91 .mmap = nxt_port_mmap_handler,
92 .data = nxt_port_data_handler,
93 .remove_pid = nxt_port_remove_pid_handler,
94 .rpc_ready = nxt_port_rpc_handler,
95 .rpc_error = nxt_port_rpc_handler,
96 };
97
98
99 const nxt_sig_event_t nxt_prototype_signals[] = {
100 nxt_event_signal(SIGHUP, nxt_proto_signal_handler),
101 nxt_event_signal(SIGINT, nxt_proto_sigterm_handler),
102 nxt_event_signal(SIGQUIT, nxt_proto_sigterm_handler),
103 nxt_event_signal(SIGTERM, nxt_proto_sigterm_handler),
104 nxt_event_signal(SIGCHLD, nxt_proto_sigchld_handler),
105 nxt_event_signal_end,
106 };
107
108
109 static const nxt_port_handlers_t nxt_proto_process_port_handlers = {
110 .quit = nxt_proto_quit_handler,
111 .change_file = nxt_port_change_log_file_handler,
112 .new_port = nxt_port_new_port_handler,
113 .process_created = nxt_proto_process_created_handler,
114 .process_ready = nxt_port_process_ready_handler,
115 .remove_pid = nxt_port_remove_pid_handler,
116 .start_process = nxt_proto_start_process_handler,
117 .rpc_ready = nxt_port_rpc_handler,
118 .rpc_error = nxt_port_rpc_handler,
119 };
120
121
122 static const nxt_port_handlers_t nxt_app_process_port_handlers = {
123 .quit = nxt_signal_quit_handler,
124 .rpc_ready = nxt_port_rpc_handler,
125 .rpc_error = nxt_port_rpc_handler,
126 };
127
128
129 const nxt_process_init_t nxt_discovery_process = {
130 .name = "discovery",
131 .type = NXT_PROCESS_DISCOVERY,
132 .prefork = NULL,
133 .restart = 0,
134 .setup = nxt_process_core_setup,
135 .start = nxt_discovery_start,
136 .port_handlers = &nxt_discovery_process_port_handlers,
137 .signals = nxt_process_signals,
138 };
139
140
141 const nxt_process_init_t nxt_proto_process = {
142 .type = NXT_PROCESS_PROTOTYPE,
143 .prefork = nxt_isolation_main_prefork,
144 .restart = 0,
145 .setup = nxt_proto_setup,
146 .start = nxt_proto_start,
147 .port_handlers = &nxt_proto_process_port_handlers,
148 .signals = nxt_prototype_signals,
149 };
150
151
152 const nxt_process_init_t nxt_app_process = {
153 .type = NXT_PROCESS_APP,
154 .setup = nxt_app_setup,
155 .start = NULL,
156 .prefork = NULL,
157 .restart = 0,
158 .port_handlers = &nxt_app_process_port_handlers,
159 .signals = nxt_process_signals,
160 };
161
162
163 static nxt_int_t
nxt_discovery_start(nxt_task_t * task,nxt_process_data_t * data)164 nxt_discovery_start(nxt_task_t *task, nxt_process_data_t *data)
165 {
166 uint32_t stream;
167 nxt_buf_t *b;
168 nxt_int_t ret;
169 nxt_port_t *main_port, *discovery_port;
170 nxt_runtime_t *rt;
171
172 nxt_log(task, NXT_LOG_INFO, "discovery started");
173
174 rt = task->thread->runtime;
175
176 b = nxt_discovery_modules(task, rt->modules);
177 if (nxt_slow_path(b == NULL)) {
178 return NXT_ERROR;
179 }
180
181 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
182 discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
183
184 stream = nxt_port_rpc_register_handler(task, discovery_port,
185 nxt_discovery_quit,
186 nxt_discovery_quit,
187 main_port->pid, NULL);
188
189 if (nxt_slow_path(stream == 0)) {
190 return NXT_ERROR;
191 }
192
193 ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
194 stream, discovery_port->id, b);
195
196 if (nxt_slow_path(ret != NXT_OK)) {
197 nxt_port_rpc_cancel(task, discovery_port, stream);
198 return NXT_ERROR;
199 }
200
201 return NXT_OK;
202 }
203
204
205 static nxt_buf_t *
nxt_discovery_modules(nxt_task_t * task,const char * path)206 nxt_discovery_modules(nxt_task_t *task, const char *path)
207 {
208 char *name;
209 u_char *p, *end;
210 size_t size;
211 glob_t glb;
212 nxt_mp_t *mp;
213 nxt_buf_t *b;
214 nxt_int_t ret;
215 nxt_uint_t i, n, j;
216 nxt_array_t *modules, *mounts;
217 nxt_module_t *module;
218 nxt_fs_mount_t *mnt;
219
220 b = NULL;
221
222 mp = nxt_mp_create(1024, 128, 256, 32);
223 if (mp == NULL) {
224 return b;
225 }
226
227 ret = glob(path, 0, NULL, &glb);
228
229 n = glb.gl_pathc;
230
231 if (ret != 0) {
232 nxt_log(task, NXT_LOG_NOTICE,
233 "no modules matching: \"%s\" found", path);
234 n = 0;
235 }
236
237 modules = nxt_array_create(mp, n, sizeof(nxt_module_t));
238 if (modules == NULL) {
239 goto fail;
240 }
241
242 for (i = 0; i < n; i++) {
243 name = glb.gl_pathv[i];
244
245 ret = nxt_discovery_module(task, mp, modules, name);
246 if (ret != NXT_OK) {
247 goto fail;
248 }
249 }
250
251 size = nxt_length("[]");
252 module = modules->elts;
253 n = modules->nelts;
254
255 for (i = 0; i < n; i++) {
256 nxt_debug(task, "module: %d %V %V",
257 module[i].type, &module[i].version, &module[i].file);
258
259 size += nxt_length("{\"type\": ,");
260 size += nxt_length(" \"version\": \"\",");
261 size += nxt_length(" \"file\": \"\",");
262 size += nxt_length(" \"mounts\": []},");
263
264 size += NXT_INT_T_LEN
265 + module[i].version.length
266 + module[i].file.length;
267
268 mounts = module[i].mounts;
269
270 size += mounts->nelts * nxt_length("{\"src\": \"\", \"dst\": \"\", "
271 "\"type\": , \"name\": \"\", "
272 "\"flags\": , \"data\": \"\"},");
273
274 mnt = mounts->elts;
275
276 for (j = 0; j < mounts->nelts; j++) {
277 size += nxt_strlen(mnt[j].src) + nxt_strlen(mnt[j].dst)
278 + nxt_strlen(mnt[j].name) + (2 * NXT_INT_T_LEN)
279 + (mnt[j].data == NULL ? 0 : nxt_strlen(mnt[j].data));
280 }
281 }
282
283 b = nxt_buf_mem_alloc(mp, size, 0);
284 if (b == NULL) {
285 goto fail;
286 }
287
288 b->completion_handler = nxt_discovery_completion_handler;
289
290 p = b->mem.free;
291 end = b->mem.end;
292 *p++ = '[';
293
294 for (i = 0; i < n; i++) {
295 mounts = module[i].mounts;
296
297 p = nxt_sprintf(p, end, "{\"type\": %d, \"version\": \"%V\", "
298 "\"file\": \"%V\", \"mounts\": [",
299 module[i].type, &module[i].version, &module[i].file);
300
301 mnt = mounts->elts;
302 for (j = 0; j < mounts->nelts; j++) {
303 p = nxt_sprintf(p, end,
304 "{\"src\": \"%s\", \"dst\": \"%s\", "
305 "\"name\": \"%s\", \"type\": %d, \"flags\": %d, "
306 "\"data\": \"%s\"},",
307 mnt[j].src, mnt[j].dst, mnt[j].name, mnt[j].type,
308 mnt[j].flags,
309 mnt[j].data == NULL ? (u_char *) "" : mnt[j].data);
310 }
311
312 *p++ = ']';
313 *p++ = '}';
314 *p++ = ',';
315 }
316
317 *p++ = ']';
318
319 if (nxt_slow_path(p > end)) {
320 nxt_alert(task, "discovery write past the buffer");
321 goto fail;
322 }
323
324 b->mem.free = p;
325
326 fail:
327
328 globfree(&glb);
329
330 return b;
331 }
332
333
334 static nxt_int_t
nxt_discovery_module(nxt_task_t * task,nxt_mp_t * mp,nxt_array_t * modules,const char * name)335 nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, nxt_array_t *modules,
336 const char *name)
337 {
338 void *dl;
339 nxt_str_t version;
340 nxt_int_t ret;
341 nxt_uint_t i, j, n;
342 nxt_array_t *mounts;
343 nxt_module_t *module;
344 nxt_app_type_t type;
345 nxt_fs_mount_t *to;
346 nxt_app_module_t *app;
347 const nxt_fs_mount_t *from;
348
349 /*
350 * Only memory allocation failure should return NXT_ERROR.
351 * Any module processing errors are ignored.
352 */
353 ret = NXT_ERROR;
354
355 dl = dlopen(name, RTLD_GLOBAL | RTLD_NOW);
356
357 if (dl == NULL) {
358 nxt_alert(task, "dlopen(\"%s\"), failed: \"%s\"", name, dlerror());
359 return NXT_OK;
360 }
361
362 app = dlsym(dl, "nxt_app_module");
363
364 if (app != NULL) {
365 nxt_log(task, NXT_LOG_NOTICE, "module: %V %s \"%s\"",
366 &app->type, app->version, name);
367
368 if (app->compat_length != sizeof(compat)
369 || memcmp(app->compat, compat, sizeof(compat)) != 0)
370 {
371 nxt_log(task, NXT_LOG_NOTICE, "incompatible module %s", name);
372
373 goto done;
374 }
375
376 type = nxt_app_parse_type(app->type.start, app->type.length);
377
378 if (type == NXT_APP_UNKNOWN) {
379 nxt_log(task, NXT_LOG_NOTICE, "unknown module type %V", &app->type);
380
381 goto done;
382 }
383
384 module = modules->elts;
385 n = modules->nelts;
386
387 version.start = (u_char *) app->version;
388 version.length = nxt_strlen(app->version);
389
390 for (i = 0; i < n; i++) {
391 if (type == module[i].type
392 && nxt_strstr_eq(&module[i].version, &version))
393 {
394 nxt_log(task, NXT_LOG_NOTICE,
395 "ignoring %s module with the same "
396 "application language version %V %V as in %V",
397 name, &app->type, &version, &module[i].file);
398
399 goto done;
400 }
401 }
402
403 module = nxt_array_add(modules);
404 if (module == NULL) {
405 goto fail;
406 }
407
408 module->type = type;
409
410 nxt_str_dup(mp, &module->version, &version);
411 if (module->version.start == NULL) {
412 goto fail;
413 }
414
415 module->file.length = nxt_strlen(name);
416
417 module->file.start = nxt_mp_alloc(mp, module->file.length);
418 if (module->file.start == NULL) {
419 goto fail;
420 }
421
422 nxt_memcpy(module->file.start, name, module->file.length);
423
424 module->mounts = nxt_array_create(mp, app->nmounts,
425 sizeof(nxt_fs_mount_t));
426
427 if (nxt_slow_path(module->mounts == NULL)) {
428 goto fail;
429 }
430
431 mounts = module->mounts;
432
433 for (j = 0; j < app->nmounts; j++) {
434 from = &app->mounts[j];
435 to = nxt_array_zero_add(mounts);
436 if (nxt_slow_path(to == NULL)) {
437 goto fail;
438 }
439
440 to->src = nxt_cstr_dup(mp, to->src, from->src);
441 if (nxt_slow_path(to->src == NULL)) {
442 goto fail;
443 }
444
445 to->dst = nxt_cstr_dup(mp, to->dst, from->dst);
446 if (nxt_slow_path(to->dst == NULL)) {
447 goto fail;
448 }
449
450 to->name = nxt_cstr_dup(mp, to->name, from->name);
451 if (nxt_slow_path(to->name == NULL)) {
452 goto fail;
453 }
454
455 to->type = from->type;
456
457 if (from->data != NULL) {
458 to->data = nxt_cstr_dup(mp, to->data, from->data);
459 if (nxt_slow_path(to->data == NULL)) {
460 goto fail;
461 }
462 }
463
464 to->flags = from->flags;
465 }
466
467 } else {
468 nxt_alert(task, "dlsym(\"%s\"), failed: \"%s\"", name, dlerror());
469 }
470
471 done:
472
473 ret = NXT_OK;
474
475 fail:
476
477 if (dlclose(dl) != 0) {
478 nxt_alert(task, "dlclose(\"%s\"), failed: \"%s\"", name, dlerror());
479 }
480
481 return ret;
482 }
483
484
485 static void
nxt_discovery_completion_handler(nxt_task_t * task,void * obj,void * data)486 nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
487 {
488 nxt_mp_t *mp;
489 nxt_buf_t *b;
490
491 b = obj;
492 mp = b->data;
493
494 nxt_mp_destroy(mp);
495 }
496
497
498 static void
nxt_discovery_quit(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)499 nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
500 {
501 nxt_signal_quit_handler(task, msg);
502 }
503
504
505 static nxt_int_t
nxt_proto_setup(nxt_task_t * task,nxt_process_t * process)506 nxt_proto_setup(nxt_task_t *task, nxt_process_t *process)
507 {
508 nxt_int_t ret;
509 nxt_app_lang_module_t *lang;
510 nxt_common_app_conf_t *app_conf;
511
512 app_conf = process->data.app;
513
514 nxt_queue_init(&nxt_proto_children);
515
516 nxt_app_conf = app_conf;
517
518 lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
519 if (nxt_slow_path(lang == NULL)) {
520 nxt_alert(task, "unknown application type: \"%V\"", &app_conf->type);
521 return NXT_ERROR;
522 }
523
524 nxt_app = lang->module;
525
526 if (nxt_app == NULL) {
527 nxt_debug(task, "application language module: %s \"%s\"",
528 lang->version, lang->file);
529
530 nxt_app = nxt_app_module_load(task, lang->file);
531 if (nxt_slow_path(nxt_app == NULL)) {
532 return NXT_ERROR;
533 }
534 }
535
536 if (nxt_slow_path(nxt_app_set_environment(app_conf->environment)
537 != NXT_OK))
538 {
539 nxt_alert(task, "failed to set environment");
540 return NXT_ERROR;
541 }
542
543 if (nxt_app->setup != NULL) {
544 ret = nxt_app->setup(task, process, app_conf);
545 if (nxt_slow_path(ret != NXT_OK)) {
546 return ret;
547 }
548 }
549
550 #if (NXT_HAVE_ISOLATION_ROOTFS)
551 if (process->isolation.rootfs != NULL) {
552 if (process->isolation.mounts != NULL) {
553 ret = nxt_isolation_prepare_rootfs(task, process);
554 if (nxt_slow_path(ret != NXT_OK)) {
555 return ret;
556 }
557 }
558
559 ret = nxt_isolation_change_root(task, process);
560 if (nxt_slow_path(ret != NXT_OK)) {
561 return NXT_ERROR;
562 }
563 }
564 #endif
565
566 if (app_conf->working_directory != NULL
567 && app_conf->working_directory[0] != 0)
568 {
569 ret = chdir(app_conf->working_directory);
570
571 if (nxt_slow_path(ret != 0)) {
572 nxt_log(task, NXT_LOG_WARN, "chdir(%s) failed %E",
573 app_conf->working_directory, nxt_errno);
574
575 return NXT_ERROR;
576 }
577 }
578
579 process->state = NXT_PROCESS_STATE_CREATED;
580
581 return NXT_OK;
582 }
583
584
585 static nxt_int_t
nxt_proto_start(nxt_task_t * task,nxt_process_data_t * data)586 nxt_proto_start(nxt_task_t *task, nxt_process_data_t *data)
587 {
588 nxt_debug(task, "prototype waiting for clone messages");
589
590 return NXT_OK;
591 }
592
593
594 static void
nxt_proto_start_process_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)595 nxt_proto_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
596 {
597 u_char *p;
598 nxt_int_t ret;
599 nxt_port_t *port;
600 nxt_runtime_t *rt;
601 nxt_process_t *process;
602 nxt_process_init_t *init;
603
604 rt = task->thread->runtime;
605
606 process = nxt_process_new(rt);
607 if (nxt_slow_path(process == NULL)) {
608 goto failed;
609 }
610
611 process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
612 if (nxt_slow_path(process->mem_pool == NULL)) {
613 nxt_process_use(task, process, -1);
614 goto failed;
615 }
616
617 process->parent_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
618
619 init = nxt_process_init(process);
620 *init = nxt_app_process;
621
622 process->name = nxt_mp_alloc(process->mem_pool, nxt_app_conf->name.length
623 + sizeof("\"\" application") + 1);
624
625 if (nxt_slow_path(process->name == NULL)) {
626 nxt_process_use(task, process, -1);
627
628 goto failed;
629 }
630
631 init->start = nxt_app->start;
632
633 init->name = (const char *) nxt_app_conf->name.start;
634
635 p = (u_char *) process->name;
636 *p++ = '"';
637 p = nxt_cpymem(p, nxt_app_conf->name.start, nxt_app_conf->name.length);
638 p = nxt_cpymem(p, "\" application", 13);
639 *p = '\0';
640
641 process->user_cred = &rt->user_cred;
642
643 process->data.app = nxt_app_conf;
644 process->stream = msg->port_msg.stream;
645
646 init->siblings = &nxt_proto_children;
647
648 ret = nxt_process_start(task, process);
649 if (nxt_slow_path(ret == NXT_ERROR)) {
650 nxt_process_use(task, process, -1);
651
652 goto failed;
653 }
654
655 nxt_proto_process_add(task, process);
656
657 return;
658
659 failed:
660
661 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
662 msg->port_msg.reply_port);
663
664 if (nxt_fast_path(port != NULL)) {
665 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
666 -1, msg->port_msg.stream, 0, NULL);
667 }
668 }
669
670
671 static void
nxt_proto_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)672 nxt_proto_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
673 {
674 nxt_debug(task, "prototype quit handler");
675
676 nxt_proto_quit_children(task);
677
678 nxt_proto_exiting = 1;
679
680 if (nxt_queue_is_empty(&nxt_proto_children)) {
681 nxt_process_quit(task, 0);
682 }
683 }
684
685
686 static void
nxt_proto_quit_children(nxt_task_t * task)687 nxt_proto_quit_children(nxt_task_t *task)
688 {
689 nxt_port_t *port;
690 nxt_process_t *process;
691
692 nxt_queue_each(process, &nxt_proto_children, nxt_process_t, link) {
693 port = nxt_process_port_first(process);
694
695 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
696 -1, 0, 0, NULL);
697 }
698 nxt_queue_loop;
699 }
700
701
702 static void
nxt_proto_process_created_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)703 nxt_proto_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
704 {
705 nxt_pid_t isolated_pid, pid;
706 nxt_process_t *process;
707
708 isolated_pid = nxt_recv_msg_cmsg_pid(msg);
709
710 process = nxt_proto_process_find(task, isolated_pid);
711 if (nxt_slow_path(process == NULL)) {
712 return;
713 }
714
715 process->state = NXT_PROCESS_STATE_CREATED;
716
717 pid = msg->port_msg.pid;
718
719 if (process->pid != pid) {
720 nxt_debug(task, "app process %PI (aka %PI) is created", isolated_pid,
721 pid);
722
723 process->pid = pid;
724
725 } else {
726 nxt_debug(task, "app process %PI is created", isolated_pid);
727 }
728
729 if (!process->registered) {
730 nxt_assert(!nxt_queue_is_empty(&process->ports));
731
732 nxt_runtime_process_add(task, process);
733
734 nxt_port_use(task, nxt_process_port_first(process), -1);
735 }
736 }
737
738
739 static void
nxt_proto_signal_handler(nxt_task_t * task,void * obj,void * data)740 nxt_proto_signal_handler(nxt_task_t *task, void *obj, void *data)
741 {
742 nxt_trace(task, "signal signo:%d (%s) received, ignored",
743 (int) (uintptr_t) obj, data);
744 }
745
746
747 static void
nxt_proto_sigterm_handler(nxt_task_t * task,void * obj,void * data)748 nxt_proto_sigterm_handler(nxt_task_t *task, void *obj, void *data)
749 {
750 nxt_trace(task, "signal signo:%d (%s) received",
751 (int) (uintptr_t) obj, data);
752
753 nxt_proto_quit_children(task);
754
755 nxt_proto_exiting = 1;
756
757 if (nxt_queue_is_empty(&nxt_proto_children)) {
758 nxt_process_quit(task, 0);
759 }
760 }
761
762
763 static void
nxt_proto_sigchld_handler(nxt_task_t * task,void * obj,void * data)764 nxt_proto_sigchld_handler(nxt_task_t *task, void *obj, void *data)
765 {
766 int status;
767 nxt_err_t err;
768 nxt_pid_t pid;
769 nxt_port_t *port;
770 nxt_process_t *process;
771 nxt_runtime_t *rt;
772
773 rt = task->thread->runtime;
774
775 nxt_debug(task, "proto sigchld handler signo:%d (%s)",
776 (int) (uintptr_t) obj, data);
777
778 for ( ;; ) {
779 pid = waitpid(-1, &status, WNOHANG);
780
781 if (pid == -1) {
782
783 switch (err = nxt_errno) {
784
785 case NXT_ECHILD:
786 return;
787
788 case NXT_EINTR:
789 continue;
790
791 default:
792 nxt_alert(task, "waitpid() failed: %E", err);
793 return;
794 }
795 }
796
797 nxt_debug(task, "waitpid(): %PI", pid);
798
799 if (pid == 0) {
800 return;
801 }
802
803 process = nxt_proto_process_remove(task, pid);
804
805 if (WTERMSIG(status)) {
806 if (rt->is_pid_isolated) {
807 nxt_alert(task, "app process %PI (isolated %PI) "
808 "exited on signal %d%s",
809 process != NULL ? process->pid : 0,
810 pid, WTERMSIG(status),
811 NXT_WCOREDUMP(status) ? " (core dumped)" : "");
812
813 } else {
814 nxt_alert(task, "app process %PI exited on signal %d%s",
815 pid, WTERMSIG(status),
816 NXT_WCOREDUMP(status) ? " (core dumped)" : "");
817 }
818
819 } else {
820 if (rt->is_pid_isolated) {
821 nxt_trace(task, "app process %PI (isolated %PI) "
822 "exited with code %d",
823 process != NULL ? process->pid : 0,
824 pid, WEXITSTATUS(status));
825
826 } else {
827 nxt_trace(task, "app process %PI exited with code %d",
828 pid, WEXITSTATUS(status));
829 }
830 }
831
832 if (process == NULL) {
833 continue;
834 }
835
836 if (process->registered) {
837 port = NULL;
838
839 } else {
840 nxt_assert(!nxt_queue_is_empty(&process->ports));
841
842 port = nxt_process_port_first(process);
843 }
844
845 if (process->state != NXT_PROCESS_STATE_CREATING) {
846 nxt_port_remove_notify_others(task, process);
847 }
848
849 nxt_process_close_ports(task, process);
850
851 if (port != NULL) {
852 nxt_port_use(task, port, -1);
853 }
854
855 if (nxt_proto_exiting && nxt_queue_is_empty(&nxt_proto_children)) {
856 nxt_process_quit(task, 0);
857 return;
858 }
859 }
860 }
861
862
863 static nxt_app_module_t *
nxt_app_module_load(nxt_task_t * task,const char * name)864 nxt_app_module_load(nxt_task_t *task, const char *name)
865 {
866 char *err;
867 void *dl;
868 nxt_app_module_t *app;
869
870 dl = dlopen(name, RTLD_GLOBAL | RTLD_LAZY);
871
872 if (nxt_slow_path(dl == NULL)) {
873 err = dlerror();
874 nxt_alert(task, "dlopen(\"%s\") failed: \"%s\"",
875 name, err != NULL ? err : "(null)");
876 return NULL;
877 }
878
879 app = dlsym(dl, "nxt_app_module");
880
881 if (nxt_slow_path(app == NULL)) {
882 err = dlerror();
883 nxt_alert(task, "dlsym(\"%s\", \"nxt_app_module\") failed: \"%s\"",
884 name, err != NULL ? err : "(null)");
885
886 if (dlclose(dl) != 0) {
887 err = dlerror();
888 nxt_alert(task, "dlclose(\"%s\") failed: \"%s\"",
889 name, err != NULL ? err : "(null)");
890 }
891 }
892
893 return app;
894 }
895
896
897 static nxt_int_t
nxt_app_set_environment(nxt_conf_value_t * environment)898 nxt_app_set_environment(nxt_conf_value_t *environment)
899 {
900 char *env, *p;
901 uint32_t next;
902 nxt_str_t name, value;
903 nxt_conf_value_t *value_obj;
904
905 if (environment != NULL) {
906 next = 0;
907
908 for ( ;; ) {
909 value_obj = nxt_conf_next_object_member(environment, &name, &next);
910 if (value_obj == NULL) {
911 break;
912 }
913
914 nxt_conf_get_string(value_obj, &value);
915
916 env = nxt_malloc(name.length + value.length + 2);
917 if (nxt_slow_path(env == NULL)) {
918 return NXT_ERROR;
919 }
920
921 p = nxt_cpymem(env, name.start, name.length);
922 *p++ = '=';
923 p = nxt_cpymem(p, value.start, value.length);
924 *p = '\0';
925
926 if (nxt_slow_path(putenv(env) != 0)) {
927 return NXT_ERROR;
928 }
929 }
930 }
931
932 return NXT_OK;
933 }
934
935
936 nxt_int_t
nxt_app_set_logs(void)937 nxt_app_set_logs(void)
938 {
939 nxt_int_t ret;
940 nxt_file_t file;
941 nxt_task_t *task;
942 nxt_thread_t *thr;
943 nxt_process_t *process;
944 nxt_runtime_t *rt;
945 nxt_common_app_conf_t *app_conf;
946
947 thr = nxt_thread();
948
949 task = thr->task;
950
951 rt = task->thread->runtime;
952 if (!rt->daemon) {
953 return NXT_OK;
954 }
955
956 process = rt->port_by_type[NXT_PROCESS_PROTOTYPE]->process;
957 app_conf = process->data.app;
958
959 if (app_conf->stdout_log != NULL) {
960 nxt_memzero(&file, sizeof(nxt_file_t));
961 file.log_level = 1;
962 file.name = (u_char *) app_conf->stdout_log;
963 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 0666);
964 if (ret == NXT_ERROR) {
965 return NXT_ERROR;
966 }
967
968 nxt_file_stdout(&file);
969 nxt_file_close(task, &file);
970 }
971
972 if (app_conf->stderr_log != NULL) {
973 nxt_memzero(&file, sizeof(nxt_file_t));
974 file.log_level = 1;
975 file.name = (u_char *) app_conf->stderr_log;
976 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT, 0666);
977 if (ret == NXT_ERROR) {
978 return NXT_ERROR;
979 }
980
981 nxt_file_stderr(&file);
982 nxt_file_close(task, &file);
983 }
984
985 return NXT_OK;
986 }
987
988
989 static u_char *
nxt_cstr_dup(nxt_mp_t * mp,u_char * dst,u_char * src)990 nxt_cstr_dup(nxt_mp_t *mp, u_char *dst, u_char *src)
991 {
992 u_char *p;
993 size_t len;
994
995 len = nxt_strlen(src);
996
997 if (dst == NULL) {
998 dst = nxt_mp_alloc(mp, len + 1);
999 if (nxt_slow_path(dst == NULL)) {
1000 return NULL;
1001 }
1002 }
1003
1004 p = nxt_cpymem(dst, src, len);
1005 *p = '\0';
1006
1007 return dst;
1008 }
1009
1010
1011 static nxt_int_t
nxt_app_setup(nxt_task_t * task,nxt_process_t * process)1012 nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
1013 {
1014 nxt_process_init_t *init;
1015
1016 process->state = NXT_PROCESS_STATE_CREATED;
1017
1018 init = nxt_process_init(process);
1019
1020 return init->start(task, &process->data);
1021 }
1022
1023
1024 nxt_app_lang_module_t *
nxt_app_lang_module(nxt_runtime_t * rt,nxt_str_t * name)1025 nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
1026 {
1027 u_char *p, *end, *version;
1028 size_t version_length;
1029 nxt_uint_t i, n;
1030 nxt_app_type_t type;
1031 nxt_app_lang_module_t *lang;
1032
1033 end = name->start + name->length;
1034 version = end;
1035
1036 for (p = name->start; p < end; p++) {
1037 if (*p == ' ') {
1038 version = p + 1;
1039 break;
1040 }
1041
1042 if (*p >= '0' && *p <= '9') {
1043 version = p;
1044 break;
1045 }
1046 }
1047
1048 type = nxt_app_parse_type(name->start, p - name->start);
1049
1050 if (type == NXT_APP_UNKNOWN) {
1051 return NULL;
1052 }
1053
1054 version_length = end - version;
1055
1056 lang = rt->languages->elts;
1057 n = rt->languages->nelts;
1058
1059 for (i = 0; i < n; i++) {
1060
1061 /*
1062 * Versions are sorted in descending order
1063 * so first match chooses the highest version.
1064 */
1065
1066 if (lang[i].type == type
1067 && nxt_strvers_match(lang[i].version, version, version_length))
1068 {
1069 return &lang[i];
1070 }
1071 }
1072
1073 return NULL;
1074 }
1075
1076
1077 nxt_app_type_t
nxt_app_parse_type(u_char * p,size_t length)1078 nxt_app_parse_type(u_char *p, size_t length)
1079 {
1080 nxt_str_t str;
1081
1082 str.length = length;
1083 str.start = p;
1084
1085 if (nxt_str_eq(&str, "external", 8) || nxt_str_eq(&str, "go", 2)) {
1086 return NXT_APP_EXTERNAL;
1087
1088 } else if (nxt_str_eq(&str, "python", 6)) {
1089 return NXT_APP_PYTHON;
1090
1091 } else if (nxt_str_eq(&str, "php", 3)) {
1092 return NXT_APP_PHP;
1093
1094 } else if (nxt_str_eq(&str, "perl", 4)) {
1095 return NXT_APP_PERL;
1096
1097 } else if (nxt_str_eq(&str, "ruby", 4)) {
1098 return NXT_APP_RUBY;
1099
1100 } else if (nxt_str_eq(&str, "java", 4)) {
1101 return NXT_APP_JAVA;
1102 }
1103
1104 return NXT_APP_UNKNOWN;
1105 }
1106
1107
1108 nxt_int_t
nxt_unit_default_init(nxt_task_t * task,nxt_unit_init_t * init,nxt_common_app_conf_t * conf)1109 nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
1110 nxt_common_app_conf_t *conf)
1111 {
1112 nxt_port_t *my_port, *proto_port, *router_port;
1113 nxt_runtime_t *rt;
1114
1115 nxt_memzero(init, sizeof(nxt_unit_init_t));
1116
1117 rt = task->thread->runtime;
1118
1119 proto_port = rt->port_by_type[NXT_PROCESS_PROTOTYPE];
1120 if (nxt_slow_path(proto_port == NULL)) {
1121 return NXT_ERROR;
1122 }
1123
1124 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1125 if (nxt_slow_path(router_port == NULL)) {
1126 return NXT_ERROR;
1127 }
1128
1129 my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
1130 if (nxt_slow_path(my_port == NULL)) {
1131 return NXT_ERROR;
1132 }
1133
1134 init->ready_port.id.pid = proto_port->pid;
1135 init->ready_port.id.id = proto_port->id;
1136 init->ready_port.in_fd = -1;
1137 init->ready_port.out_fd = proto_port->pair[1];
1138
1139 init->ready_stream = my_port->process->stream;
1140
1141 init->router_port.id.pid = router_port->pid;
1142 init->router_port.id.id = router_port->id;
1143 init->router_port.in_fd = -1;
1144 init->router_port.out_fd = router_port->pair[1];
1145
1146 init->read_port.id.pid = my_port->pid;
1147 init->read_port.id.id = my_port->id;
1148 init->read_port.in_fd = my_port->pair[0];
1149 init->read_port.out_fd = my_port->pair[1];
1150
1151 init->shared_port_fd = conf->shared_port_fd;
1152 init->shared_queue_fd = conf->shared_queue_fd;
1153
1154 init->log_fd = 2;
1155
1156 init->shm_limit = conf->shm_limit;
1157 init->request_limit = conf->request_limit;
1158
1159 return NXT_OK;
1160 }
1161
1162
1163 static nxt_int_t
nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t * lhq,void * data)1164 nxt_proto_lvlhsh_isolated_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
1165 {
1166 nxt_pid_t *qpid;
1167 nxt_process_t *process;
1168
1169 process = data;
1170 qpid = (nxt_pid_t *) lhq->key.start;
1171
1172 if (*qpid == process->isolated_pid) {
1173 return NXT_OK;
1174 }
1175
1176 return NXT_DECLINED;
1177 }
1178
1179
1180 static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
1181 NXT_LVLHSH_DEFAULT,
1182 nxt_proto_lvlhsh_isolated_pid_test,
1183 nxt_lvlhsh_alloc,
1184 nxt_lvlhsh_free,
1185 };
1186
1187
1188 nxt_inline void
nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t * lhq,nxt_pid_t * pid)1189 nxt_proto_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
1190 {
1191 lhq->key_hash = nxt_murmur_hash2(pid, sizeof(nxt_pid_t));
1192 lhq->key.length = sizeof(nxt_pid_t);
1193 lhq->key.start = (u_char *) pid;
1194 lhq->proto = &lvlhsh_processes_proto;
1195 }
1196
1197
1198 static void
nxt_proto_process_add(nxt_task_t * task,nxt_process_t * process)1199 nxt_proto_process_add(nxt_task_t *task, nxt_process_t *process)
1200 {
1201 nxt_runtime_t *rt;
1202 nxt_lvlhsh_query_t lhq;
1203
1204 rt = task->thread->runtime;
1205
1206 nxt_proto_process_lhq_pid(&lhq, &process->isolated_pid);
1207
1208 lhq.replace = 0;
1209 lhq.value = process;
1210 lhq.pool = rt->mem_pool;
1211
1212 switch (nxt_lvlhsh_insert(&nxt_proto_processes, &lhq)) {
1213
1214 case NXT_OK:
1215 nxt_debug(task, "process (isolated %PI) added", process->isolated_pid);
1216
1217 nxt_queue_insert_tail(&nxt_proto_children, &process->link);
1218 break;
1219
1220 default:
1221 nxt_alert(task, "process (isolated %PI) failed to add",
1222 process->isolated_pid);
1223 break;
1224 }
1225 }
1226
1227
1228 static nxt_process_t *
nxt_proto_process_remove(nxt_task_t * task,nxt_pid_t pid)1229 nxt_proto_process_remove(nxt_task_t *task, nxt_pid_t pid)
1230 {
1231 nxt_runtime_t *rt;
1232 nxt_process_t *process;
1233 nxt_lvlhsh_query_t lhq;
1234
1235 nxt_proto_process_lhq_pid(&lhq, &pid);
1236
1237 rt = task->thread->runtime;
1238
1239 lhq.pool = rt->mem_pool;
1240
1241 switch (nxt_lvlhsh_delete(&nxt_proto_processes, &lhq)) {
1242
1243 case NXT_OK:
1244 nxt_debug(task, "process (isolated %PI) removed", pid);
1245
1246 process = lhq.value;
1247
1248 nxt_queue_remove(&process->link);
1249 process->link.next = NULL;
1250
1251 break;
1252
1253 default:
1254 nxt_debug(task, "process (isolated %PI) remove failed", pid);
1255 process = NULL;
1256 break;
1257 }
1258
1259 return process;
1260 }
1261
1262
1263 static nxt_process_t *
nxt_proto_process_find(nxt_task_t * task,nxt_pid_t pid)1264 nxt_proto_process_find(nxt_task_t *task, nxt_pid_t pid)
1265 {
1266 nxt_process_t *process;
1267 nxt_lvlhsh_query_t lhq;
1268
1269 nxt_proto_process_lhq_pid(&lhq, &pid);
1270
1271 if (nxt_lvlhsh_find(&nxt_proto_processes, &lhq) == NXT_OK) {
1272 process = lhq.value;
1273
1274 } else {
1275 nxt_debug(task, "process (isolated %PI) not found", pid);
1276
1277 process = NULL;
1278 }
1279
1280 return process;
1281 }
1282