nxt_main_process.c (2050:d1298cc3f385) nxt_main_process.c (2078:0996dd223cdd)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12#include <nxt_router.h>
13#include <nxt_port_queue.h>
14#if (NXT_TLS)
15#include <nxt_cert.h>
16#endif
17
18#include <sys/mount.h>
19
20
21typedef struct {
22 nxt_socket_t socket;
23 nxt_socket_error_t error;
24 u_char *start;
25 u_char *end;
26} nxt_listening_socket_t;
27
28
29typedef struct {
30 nxt_uint_t size;
31 nxt_conf_map_t *map;
32} nxt_conf_app_map_t;
33
34
35static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
36 nxt_runtime_t *rt);
37static void nxt_main_process_title(nxt_task_t *task);
38static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
39 void *data);
40static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
41 void *data);
42static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
43 void *data);
44static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
45 void *data);
46static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
47 void *data);
48static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
49static void nxt_main_port_socket_handler(nxt_task_t *task,
50 nxt_port_recv_msg_t *msg);
51static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
52 nxt_listening_socket_t *ls);
53static void nxt_main_port_modules_handler(nxt_task_t *task,
54 nxt_port_recv_msg_t *msg);
55static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
56static void nxt_main_process_whoami_handler(nxt_task_t *task,
57 nxt_port_recv_msg_t *msg);
58static void nxt_main_port_conf_store_handler(nxt_task_t *task,
59 nxt_port_recv_msg_t *msg);
60static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
61 const char *name, u_char *buf, size_t size);
62static void nxt_main_port_access_log_handler(nxt_task_t *task,
63 nxt_port_recv_msg_t *msg);
64
65const nxt_sig_event_t nxt_main_process_signals[] = {
66 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler),
67 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler),
68 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
69 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
70 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
71 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
72 nxt_event_signal_end,
73};
74
75
76nxt_uint_t nxt_conf_ver;
77
78static nxt_bool_t nxt_exiting;
79
80
81nxt_int_t
82nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
83 nxt_runtime_t *rt)
84{
85 rt->type = NXT_PROCESS_MAIN;
86
87 if (nxt_main_process_port_create(task, rt) != NXT_OK) {
88 return NXT_ERROR;
89 }
90
91 nxt_main_process_title(task);
92
93 /*
94 * The discovery process will send a message processed by
95 * nxt_main_port_modules_handler() which starts the controller
96 * and router processes.
97 */
98 return nxt_process_init_start(task, nxt_discovery_process);
99}
100
101
102static nxt_conf_map_t nxt_common_app_conf[] = {
103 {
104 nxt_string("type"),
105 NXT_CONF_MAP_STR,
106 offsetof(nxt_common_app_conf_t, type),
107 },
108
109 {
110 nxt_string("user"),
111 NXT_CONF_MAP_STR,
112 offsetof(nxt_common_app_conf_t, user),
113 },
114
115 {
116 nxt_string("group"),
117 NXT_CONF_MAP_STR,
118 offsetof(nxt_common_app_conf_t, group),
119 },
120
121 {
122 nxt_string("working_directory"),
123 NXT_CONF_MAP_CSTRZ,
124 offsetof(nxt_common_app_conf_t, working_directory),
125 },
126
127 {
128 nxt_string("environment"),
129 NXT_CONF_MAP_PTR,
130 offsetof(nxt_common_app_conf_t, environment),
131 },
132
133 {
134 nxt_string("isolation"),
135 NXT_CONF_MAP_PTR,
136 offsetof(nxt_common_app_conf_t, isolation),
137 },
138
139 {
140 nxt_string("limits"),
141 NXT_CONF_MAP_PTR,
142 offsetof(nxt_common_app_conf_t, limits),
143 },
144
145};
146
147
148static nxt_conf_map_t nxt_common_app_limits_conf[] = {
149 {
150 nxt_string("shm"),
151 NXT_CONF_MAP_SIZE,
152 offsetof(nxt_common_app_conf_t, shm_limit),
153 },
154
155 {
156 nxt_string("requests"),
157 NXT_CONF_MAP_INT32,
158 offsetof(nxt_common_app_conf_t, request_limit),
159 },
160
161};
162
163
164static nxt_conf_map_t nxt_external_app_conf[] = {
165 {
166 nxt_string("executable"),
167 NXT_CONF_MAP_CSTRZ,
168 offsetof(nxt_common_app_conf_t, u.external.executable),
169 },
170
171 {
172 nxt_string("arguments"),
173 NXT_CONF_MAP_PTR,
174 offsetof(nxt_common_app_conf_t, u.external.arguments),
175 },
176
177};
178
179
180static nxt_conf_map_t nxt_python_app_conf[] = {
181 {
182 nxt_string("home"),
183 NXT_CONF_MAP_CSTRZ,
184 offsetof(nxt_common_app_conf_t, u.python.home),
185 },
186
187 {
188 nxt_string("path"),
189 NXT_CONF_MAP_PTR,
190 offsetof(nxt_common_app_conf_t, u.python.path),
191 },
192
193 {
194 nxt_string("module"),
195 NXT_CONF_MAP_STR,
196 offsetof(nxt_common_app_conf_t, u.python.module),
197 },
198
199 {
200 nxt_string("callable"),
201 NXT_CONF_MAP_CSTRZ,
202 offsetof(nxt_common_app_conf_t, u.python.callable),
203 },
204
205 {
206 nxt_string("protocol"),
207 NXT_CONF_MAP_STR,
208 offsetof(nxt_common_app_conf_t, u.python.protocol),
209 },
210
211 {
212 nxt_string("threads"),
213 NXT_CONF_MAP_INT32,
214 offsetof(nxt_common_app_conf_t, u.python.threads),
215 },
216
217 {
218 nxt_string("targets"),
219 NXT_CONF_MAP_PTR,
220 offsetof(nxt_common_app_conf_t, u.python.targets),
221 },
222
223 {
224 nxt_string("thread_stack_size"),
225 NXT_CONF_MAP_INT32,
226 offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
227 },
228};
229
230
231static nxt_conf_map_t nxt_php_app_conf[] = {
232 {
233 nxt_string("targets"),
234 NXT_CONF_MAP_PTR,
235 offsetof(nxt_common_app_conf_t, u.php.targets),
236 },
237
238 {
239 nxt_string("options"),
240 NXT_CONF_MAP_PTR,
241 offsetof(nxt_common_app_conf_t, u.php.options),
242 },
243};
244
245
246static nxt_conf_map_t nxt_perl_app_conf[] = {
247 {
248 nxt_string("script"),
249 NXT_CONF_MAP_CSTRZ,
250 offsetof(nxt_common_app_conf_t, u.perl.script),
251 },
252
253 {
254 nxt_string("threads"),
255 NXT_CONF_MAP_INT32,
256 offsetof(nxt_common_app_conf_t, u.perl.threads),
257 },
258
259 {
260 nxt_string("thread_stack_size"),
261 NXT_CONF_MAP_INT32,
262 offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
263 },
264};
265
266
267static nxt_conf_map_t nxt_ruby_app_conf[] = {
268 {
269 nxt_string("script"),
270 NXT_CONF_MAP_STR,
271 offsetof(nxt_common_app_conf_t, u.ruby.script),
272 },
273 {
274 nxt_string("threads"),
275 NXT_CONF_MAP_INT32,
276 offsetof(nxt_common_app_conf_t, u.ruby.threads),
277 },
278 {
279 nxt_string("hooks"),
280 NXT_CONF_MAP_STR,
281 offsetof(nxt_common_app_conf_t, u.ruby.hooks),
282 }
283};
284
285
286static nxt_conf_map_t nxt_java_app_conf[] = {
287 {
288 nxt_string("classpath"),
289 NXT_CONF_MAP_PTR,
290 offsetof(nxt_common_app_conf_t, u.java.classpath),
291 },
292 {
293 nxt_string("webapp"),
294 NXT_CONF_MAP_CSTRZ,
295 offsetof(nxt_common_app_conf_t, u.java.webapp),
296 },
297 {
298 nxt_string("options"),
299 NXT_CONF_MAP_PTR,
300 offsetof(nxt_common_app_conf_t, u.java.options),
301 },
302 {
303 nxt_string("unit_jars"),
304 NXT_CONF_MAP_CSTRZ,
305 offsetof(nxt_common_app_conf_t, u.java.unit_jars),
306 },
307 {
308 nxt_string("threads"),
309 NXT_CONF_MAP_INT32,
310 offsetof(nxt_common_app_conf_t, u.java.threads),
311 },
312 {
313 nxt_string("thread_stack_size"),
314 NXT_CONF_MAP_INT32,
315 offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
316 },
317
318};
319
320
321static nxt_conf_app_map_t nxt_app_maps[] = {
322 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf },
323 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf },
324 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf },
325 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf },
326 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf },
327 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf },
328};
329
330
331static void
332nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
333{
334 nxt_debug(task, "main data: %*s",
335 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
336}
337
338
339static void
340nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
341{
342 void *mem;
343 nxt_port_t *port;
344
345 nxt_port_new_port_handler(task, msg);
346
347 port = msg->u.new_port;
348
349 if (port != NULL
350 && port->type == NXT_PROCESS_APP
351 && msg->fd[1] != -1)
352 {
353 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
354 PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
355 if (nxt_fast_path(mem != MAP_FAILED)) {
356 port->queue = mem;
357 }
358
359 nxt_fd_close(msg->fd[1]);
360 msg->fd[1] = -1;
361 }
362}
363
364
365static void
366nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
367{
368 u_char *start, *p, ch;
369 size_t type_len;
370 nxt_int_t ret;
371 nxt_buf_t *b;
372 nxt_port_t *port;
373 nxt_runtime_t *rt;
374 nxt_process_t *process;
375 nxt_app_type_t idx;
376 nxt_conf_value_t *conf;
377 nxt_process_init_t *init;
378 nxt_common_app_conf_t *app_conf;
379
380 rt = task->thread->runtime;
381
382 port = rt->port_by_type[NXT_PROCESS_ROUTER];
383 if (nxt_slow_path(port == NULL)) {
384 nxt_alert(task, "router port not found");
385 goto close_fds;
386 }
387
388 if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
389 nxt_alert(task, "process %PI cannot start processes",
390 nxt_recv_msg_cmsg_pid(msg));
391
392 goto close_fds;
393 }
394
395 process = nxt_process_new(rt);
396 if (nxt_slow_path(process == NULL)) {
397 goto close_fds;
398 }
399
400 process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
401 if (process->mem_pool == NULL) {
402 nxt_process_use(task, process, -1);
403 goto close_fds;
404 }
405
406 process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
407
408 init = nxt_process_init(process);
409
410 *init = nxt_proto_process;
411
412 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
413 if (b == NULL) {
414 goto failed;
415 }
416
417 nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
418 b->mem.pos);
419
420 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
421 if (nxt_slow_path(app_conf == NULL)) {
422 goto failed;
423 }
424
425 app_conf->shared_port_fd = msg->fd[0];
426 app_conf->shared_queue_fd = msg->fd[1];
427
428 start = b->mem.pos;
429
430 app_conf->name.start = start;
431 app_conf->name.length = nxt_strlen(start);
432
433 init->name = (const char *) start;
434
435 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
436 + sizeof("\"\" prototype") + 1);
437
438 if (nxt_slow_path(process->name == NULL)) {
439 goto failed;
440 }
441
442 p = (u_char *) process->name;
443 *p++ = '"';
444 p = nxt_cpymem(p, init->name, app_conf->name.length);
445 p = nxt_cpymem(p, "\" prototype", 11);
446 *p = '\0';
447
448 app_conf->shm_limit = 100 * 1024 * 1024;
449 app_conf->request_limit = 0;
450
451 start += app_conf->name.length + 1;
452
453 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
454 if (conf == NULL) {
455 nxt_alert(task, "router app configuration parsing error");
456
457 goto failed;
458 }
459
460 rt = task->thread->runtime;
461
462 app_conf->user.start = (u_char*)rt->user_cred.user;
463 app_conf->user.length = nxt_strlen(rt->user_cred.user);
464
465 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
466 nxt_nitems(nxt_common_app_conf), app_conf);
467
468 if (ret != NXT_OK) {
469 nxt_alert(task, "failed to map common app conf received from router");
470 goto failed;
471 }
472
473 for (type_len = 0; type_len != app_conf->type.length; type_len++) {
474 ch = app_conf->type.start[type_len];
475
476 if (ch == ' ' || nxt_isdigit(ch)) {
477 break;
478 }
479 }
480
481 idx = nxt_app_parse_type(app_conf->type.start, type_len);
482
483 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
484 nxt_alert(task, "invalid app type %d received from router", (int) idx);
485 goto failed;
486 }
487
488 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
489 nxt_app_maps[idx].size, app_conf);
490
491 if (nxt_slow_path(ret != NXT_OK)) {
492 nxt_alert(task, "failed to map app conf received from router");
493 goto failed;
494 }
495
496 if (app_conf->limits != NULL) {
497 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
498 nxt_common_app_limits_conf,
499 nxt_nitems(nxt_common_app_limits_conf),
500 app_conf);
501
502 if (nxt_slow_path(ret != NXT_OK)) {
503 nxt_alert(task, "failed to map app limits received from router");
504 goto failed;
505 }
506 }
507
508 app_conf->self = conf;
509
510 process->stream = msg->port_msg.stream;
511 process->data.app = app_conf;
512
513 ret = nxt_process_start(task, process);
514 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
515
516 /* Close shared port fds only in main process. */
517 if (ret == NXT_OK) {
518 nxt_fd_close(app_conf->shared_port_fd);
519 nxt_fd_close(app_conf->shared_queue_fd);
520 }
521
522 /* Avoid fds close in caller. */
523 msg->fd[0] = -1;
524 msg->fd[1] = -1;
525
526 return;
527 }
528
529failed:
530
531 nxt_process_use(task, process, -1);
532
533 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
534 msg->port_msg.reply_port);
535
536 if (nxt_fast_path(port != NULL)) {
537 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
538 -1, msg->port_msg.stream, 0, NULL);
539 }
540
541close_fds:
542
543 nxt_fd_close(msg->fd[0]);
544 msg->fd[0] = -1;
545
546 nxt_fd_close(msg->fd[1]);
547 msg->fd[1] = -1;
548}
549
550
551static void
552nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
553{
554 nxt_port_t *port;
555 nxt_process_t *process;
556 nxt_runtime_t *rt;
557
558 rt = task->thread->runtime;
559
560 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
561 msg->port_msg.reply_port);
562 if (nxt_slow_path(port == NULL)) {
563 return;
564 }
565
566 process = port->process;
567
568 nxt_assert(process != NULL);
569 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
570
571#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
572 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
573 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
574 process->user_cred,
575 &process->isolation.clone)
576 != NXT_OK))
577 {
578 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
579 -1, msg->port_msg.stream, 0, NULL);
580 return;
581 }
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12#include <nxt_router.h>
13#include <nxt_port_queue.h>
14#if (NXT_TLS)
15#include <nxt_cert.h>
16#endif
17
18#include <sys/mount.h>
19
20
21typedef struct {
22 nxt_socket_t socket;
23 nxt_socket_error_t error;
24 u_char *start;
25 u_char *end;
26} nxt_listening_socket_t;
27
28
29typedef struct {
30 nxt_uint_t size;
31 nxt_conf_map_t *map;
32} nxt_conf_app_map_t;
33
34
35static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
36 nxt_runtime_t *rt);
37static void nxt_main_process_title(nxt_task_t *task);
38static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
39 void *data);
40static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
41 void *data);
42static void nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj,
43 void *data);
44static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
45 void *data);
46static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
47 void *data);
48static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
49static void nxt_main_port_socket_handler(nxt_task_t *task,
50 nxt_port_recv_msg_t *msg);
51static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
52 nxt_listening_socket_t *ls);
53static void nxt_main_port_modules_handler(nxt_task_t *task,
54 nxt_port_recv_msg_t *msg);
55static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
56static void nxt_main_process_whoami_handler(nxt_task_t *task,
57 nxt_port_recv_msg_t *msg);
58static void nxt_main_port_conf_store_handler(nxt_task_t *task,
59 nxt_port_recv_msg_t *msg);
60static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
61 const char *name, u_char *buf, size_t size);
62static void nxt_main_port_access_log_handler(nxt_task_t *task,
63 nxt_port_recv_msg_t *msg);
64
65const nxt_sig_event_t nxt_main_process_signals[] = {
66 nxt_event_signal(SIGHUP, nxt_main_process_signal_handler),
67 nxt_event_signal(SIGINT, nxt_main_process_sigterm_handler),
68 nxt_event_signal(SIGQUIT, nxt_main_process_sigquit_handler),
69 nxt_event_signal(SIGTERM, nxt_main_process_sigterm_handler),
70 nxt_event_signal(SIGCHLD, nxt_main_process_sigchld_handler),
71 nxt_event_signal(SIGUSR1, nxt_main_process_sigusr1_handler),
72 nxt_event_signal_end,
73};
74
75
76nxt_uint_t nxt_conf_ver;
77
78static nxt_bool_t nxt_exiting;
79
80
81nxt_int_t
82nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
83 nxt_runtime_t *rt)
84{
85 rt->type = NXT_PROCESS_MAIN;
86
87 if (nxt_main_process_port_create(task, rt) != NXT_OK) {
88 return NXT_ERROR;
89 }
90
91 nxt_main_process_title(task);
92
93 /*
94 * The discovery process will send a message processed by
95 * nxt_main_port_modules_handler() which starts the controller
96 * and router processes.
97 */
98 return nxt_process_init_start(task, nxt_discovery_process);
99}
100
101
102static nxt_conf_map_t nxt_common_app_conf[] = {
103 {
104 nxt_string("type"),
105 NXT_CONF_MAP_STR,
106 offsetof(nxt_common_app_conf_t, type),
107 },
108
109 {
110 nxt_string("user"),
111 NXT_CONF_MAP_STR,
112 offsetof(nxt_common_app_conf_t, user),
113 },
114
115 {
116 nxt_string("group"),
117 NXT_CONF_MAP_STR,
118 offsetof(nxt_common_app_conf_t, group),
119 },
120
121 {
122 nxt_string("working_directory"),
123 NXT_CONF_MAP_CSTRZ,
124 offsetof(nxt_common_app_conf_t, working_directory),
125 },
126
127 {
128 nxt_string("environment"),
129 NXT_CONF_MAP_PTR,
130 offsetof(nxt_common_app_conf_t, environment),
131 },
132
133 {
134 nxt_string("isolation"),
135 NXT_CONF_MAP_PTR,
136 offsetof(nxt_common_app_conf_t, isolation),
137 },
138
139 {
140 nxt_string("limits"),
141 NXT_CONF_MAP_PTR,
142 offsetof(nxt_common_app_conf_t, limits),
143 },
144
145};
146
147
148static nxt_conf_map_t nxt_common_app_limits_conf[] = {
149 {
150 nxt_string("shm"),
151 NXT_CONF_MAP_SIZE,
152 offsetof(nxt_common_app_conf_t, shm_limit),
153 },
154
155 {
156 nxt_string("requests"),
157 NXT_CONF_MAP_INT32,
158 offsetof(nxt_common_app_conf_t, request_limit),
159 },
160
161};
162
163
164static nxt_conf_map_t nxt_external_app_conf[] = {
165 {
166 nxt_string("executable"),
167 NXT_CONF_MAP_CSTRZ,
168 offsetof(nxt_common_app_conf_t, u.external.executable),
169 },
170
171 {
172 nxt_string("arguments"),
173 NXT_CONF_MAP_PTR,
174 offsetof(nxt_common_app_conf_t, u.external.arguments),
175 },
176
177};
178
179
180static nxt_conf_map_t nxt_python_app_conf[] = {
181 {
182 nxt_string("home"),
183 NXT_CONF_MAP_CSTRZ,
184 offsetof(nxt_common_app_conf_t, u.python.home),
185 },
186
187 {
188 nxt_string("path"),
189 NXT_CONF_MAP_PTR,
190 offsetof(nxt_common_app_conf_t, u.python.path),
191 },
192
193 {
194 nxt_string("module"),
195 NXT_CONF_MAP_STR,
196 offsetof(nxt_common_app_conf_t, u.python.module),
197 },
198
199 {
200 nxt_string("callable"),
201 NXT_CONF_MAP_CSTRZ,
202 offsetof(nxt_common_app_conf_t, u.python.callable),
203 },
204
205 {
206 nxt_string("protocol"),
207 NXT_CONF_MAP_STR,
208 offsetof(nxt_common_app_conf_t, u.python.protocol),
209 },
210
211 {
212 nxt_string("threads"),
213 NXT_CONF_MAP_INT32,
214 offsetof(nxt_common_app_conf_t, u.python.threads),
215 },
216
217 {
218 nxt_string("targets"),
219 NXT_CONF_MAP_PTR,
220 offsetof(nxt_common_app_conf_t, u.python.targets),
221 },
222
223 {
224 nxt_string("thread_stack_size"),
225 NXT_CONF_MAP_INT32,
226 offsetof(nxt_common_app_conf_t, u.python.thread_stack_size),
227 },
228};
229
230
231static nxt_conf_map_t nxt_php_app_conf[] = {
232 {
233 nxt_string("targets"),
234 NXT_CONF_MAP_PTR,
235 offsetof(nxt_common_app_conf_t, u.php.targets),
236 },
237
238 {
239 nxt_string("options"),
240 NXT_CONF_MAP_PTR,
241 offsetof(nxt_common_app_conf_t, u.php.options),
242 },
243};
244
245
246static nxt_conf_map_t nxt_perl_app_conf[] = {
247 {
248 nxt_string("script"),
249 NXT_CONF_MAP_CSTRZ,
250 offsetof(nxt_common_app_conf_t, u.perl.script),
251 },
252
253 {
254 nxt_string("threads"),
255 NXT_CONF_MAP_INT32,
256 offsetof(nxt_common_app_conf_t, u.perl.threads),
257 },
258
259 {
260 nxt_string("thread_stack_size"),
261 NXT_CONF_MAP_INT32,
262 offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
263 },
264};
265
266
267static nxt_conf_map_t nxt_ruby_app_conf[] = {
268 {
269 nxt_string("script"),
270 NXT_CONF_MAP_STR,
271 offsetof(nxt_common_app_conf_t, u.ruby.script),
272 },
273 {
274 nxt_string("threads"),
275 NXT_CONF_MAP_INT32,
276 offsetof(nxt_common_app_conf_t, u.ruby.threads),
277 },
278 {
279 nxt_string("hooks"),
280 NXT_CONF_MAP_STR,
281 offsetof(nxt_common_app_conf_t, u.ruby.hooks),
282 }
283};
284
285
286static nxt_conf_map_t nxt_java_app_conf[] = {
287 {
288 nxt_string("classpath"),
289 NXT_CONF_MAP_PTR,
290 offsetof(nxt_common_app_conf_t, u.java.classpath),
291 },
292 {
293 nxt_string("webapp"),
294 NXT_CONF_MAP_CSTRZ,
295 offsetof(nxt_common_app_conf_t, u.java.webapp),
296 },
297 {
298 nxt_string("options"),
299 NXT_CONF_MAP_PTR,
300 offsetof(nxt_common_app_conf_t, u.java.options),
301 },
302 {
303 nxt_string("unit_jars"),
304 NXT_CONF_MAP_CSTRZ,
305 offsetof(nxt_common_app_conf_t, u.java.unit_jars),
306 },
307 {
308 nxt_string("threads"),
309 NXT_CONF_MAP_INT32,
310 offsetof(nxt_common_app_conf_t, u.java.threads),
311 },
312 {
313 nxt_string("thread_stack_size"),
314 NXT_CONF_MAP_INT32,
315 offsetof(nxt_common_app_conf_t, u.java.thread_stack_size),
316 },
317
318};
319
320
321static nxt_conf_app_map_t nxt_app_maps[] = {
322 { nxt_nitems(nxt_external_app_conf), nxt_external_app_conf },
323 { nxt_nitems(nxt_python_app_conf), nxt_python_app_conf },
324 { nxt_nitems(nxt_php_app_conf), nxt_php_app_conf },
325 { nxt_nitems(nxt_perl_app_conf), nxt_perl_app_conf },
326 { nxt_nitems(nxt_ruby_app_conf), nxt_ruby_app_conf },
327 { nxt_nitems(nxt_java_app_conf), nxt_java_app_conf },
328};
329
330
331static void
332nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
333{
334 nxt_debug(task, "main data: %*s",
335 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
336}
337
338
339static void
340nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
341{
342 void *mem;
343 nxt_port_t *port;
344
345 nxt_port_new_port_handler(task, msg);
346
347 port = msg->u.new_port;
348
349 if (port != NULL
350 && port->type == NXT_PROCESS_APP
351 && msg->fd[1] != -1)
352 {
353 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
354 PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
355 if (nxt_fast_path(mem != MAP_FAILED)) {
356 port->queue = mem;
357 }
358
359 nxt_fd_close(msg->fd[1]);
360 msg->fd[1] = -1;
361 }
362}
363
364
365static void
366nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
367{
368 u_char *start, *p, ch;
369 size_t type_len;
370 nxt_int_t ret;
371 nxt_buf_t *b;
372 nxt_port_t *port;
373 nxt_runtime_t *rt;
374 nxt_process_t *process;
375 nxt_app_type_t idx;
376 nxt_conf_value_t *conf;
377 nxt_process_init_t *init;
378 nxt_common_app_conf_t *app_conf;
379
380 rt = task->thread->runtime;
381
382 port = rt->port_by_type[NXT_PROCESS_ROUTER];
383 if (nxt_slow_path(port == NULL)) {
384 nxt_alert(task, "router port not found");
385 goto close_fds;
386 }
387
388 if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
389 nxt_alert(task, "process %PI cannot start processes",
390 nxt_recv_msg_cmsg_pid(msg));
391
392 goto close_fds;
393 }
394
395 process = nxt_process_new(rt);
396 if (nxt_slow_path(process == NULL)) {
397 goto close_fds;
398 }
399
400 process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
401 if (process->mem_pool == NULL) {
402 nxt_process_use(task, process, -1);
403 goto close_fds;
404 }
405
406 process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
407
408 init = nxt_process_init(process);
409
410 *init = nxt_proto_process;
411
412 b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
413 if (b == NULL) {
414 goto failed;
415 }
416
417 nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
418 b->mem.pos);
419
420 app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
421 if (nxt_slow_path(app_conf == NULL)) {
422 goto failed;
423 }
424
425 app_conf->shared_port_fd = msg->fd[0];
426 app_conf->shared_queue_fd = msg->fd[1];
427
428 start = b->mem.pos;
429
430 app_conf->name.start = start;
431 app_conf->name.length = nxt_strlen(start);
432
433 init->name = (const char *) start;
434
435 process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
436 + sizeof("\"\" prototype") + 1);
437
438 if (nxt_slow_path(process->name == NULL)) {
439 goto failed;
440 }
441
442 p = (u_char *) process->name;
443 *p++ = '"';
444 p = nxt_cpymem(p, init->name, app_conf->name.length);
445 p = nxt_cpymem(p, "\" prototype", 11);
446 *p = '\0';
447
448 app_conf->shm_limit = 100 * 1024 * 1024;
449 app_conf->request_limit = 0;
450
451 start += app_conf->name.length + 1;
452
453 conf = nxt_conf_json_parse(process->mem_pool, start, b->mem.free, NULL);
454 if (conf == NULL) {
455 nxt_alert(task, "router app configuration parsing error");
456
457 goto failed;
458 }
459
460 rt = task->thread->runtime;
461
462 app_conf->user.start = (u_char*)rt->user_cred.user;
463 app_conf->user.length = nxt_strlen(rt->user_cred.user);
464
465 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_common_app_conf,
466 nxt_nitems(nxt_common_app_conf), app_conf);
467
468 if (ret != NXT_OK) {
469 nxt_alert(task, "failed to map common app conf received from router");
470 goto failed;
471 }
472
473 for (type_len = 0; type_len != app_conf->type.length; type_len++) {
474 ch = app_conf->type.start[type_len];
475
476 if (ch == ' ' || nxt_isdigit(ch)) {
477 break;
478 }
479 }
480
481 idx = nxt_app_parse_type(app_conf->type.start, type_len);
482
483 if (nxt_slow_path(idx >= nxt_nitems(nxt_app_maps))) {
484 nxt_alert(task, "invalid app type %d received from router", (int) idx);
485 goto failed;
486 }
487
488 ret = nxt_conf_map_object(process->mem_pool, conf, nxt_app_maps[idx].map,
489 nxt_app_maps[idx].size, app_conf);
490
491 if (nxt_slow_path(ret != NXT_OK)) {
492 nxt_alert(task, "failed to map app conf received from router");
493 goto failed;
494 }
495
496 if (app_conf->limits != NULL) {
497 ret = nxt_conf_map_object(process->mem_pool, app_conf->limits,
498 nxt_common_app_limits_conf,
499 nxt_nitems(nxt_common_app_limits_conf),
500 app_conf);
501
502 if (nxt_slow_path(ret != NXT_OK)) {
503 nxt_alert(task, "failed to map app limits received from router");
504 goto failed;
505 }
506 }
507
508 app_conf->self = conf;
509
510 process->stream = msg->port_msg.stream;
511 process->data.app = app_conf;
512
513 ret = nxt_process_start(task, process);
514 if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
515
516 /* Close shared port fds only in main process. */
517 if (ret == NXT_OK) {
518 nxt_fd_close(app_conf->shared_port_fd);
519 nxt_fd_close(app_conf->shared_queue_fd);
520 }
521
522 /* Avoid fds close in caller. */
523 msg->fd[0] = -1;
524 msg->fd[1] = -1;
525
526 return;
527 }
528
529failed:
530
531 nxt_process_use(task, process, -1);
532
533 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
534 msg->port_msg.reply_port);
535
536 if (nxt_fast_path(port != NULL)) {
537 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
538 -1, msg->port_msg.stream, 0, NULL);
539 }
540
541close_fds:
542
543 nxt_fd_close(msg->fd[0]);
544 msg->fd[0] = -1;
545
546 nxt_fd_close(msg->fd[1]);
547 msg->fd[1] = -1;
548}
549
550
551static void
552nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
553{
554 nxt_port_t *port;
555 nxt_process_t *process;
556 nxt_runtime_t *rt;
557
558 rt = task->thread->runtime;
559
560 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
561 msg->port_msg.reply_port);
562 if (nxt_slow_path(port == NULL)) {
563 return;
564 }
565
566 process = port->process;
567
568 nxt_assert(process != NULL);
569 nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
570
571#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
572 if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
573 if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
574 process->user_cred,
575 &process->isolation.clone)
576 != NXT_OK))
577 {
578 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
579 -1, msg->port_msg.stream, 0, NULL);
580 return;
581 }
582 }
582 }
583
584#endif
585
586 process->state = NXT_PROCESS_STATE_CREATED;
587
588 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
589 -1, msg->port_msg.stream, 0, NULL);
590}
591
592
593static nxt_port_handlers_t nxt_main_process_port_handlers = {
594 .data = nxt_main_data_handler,
595 .new_port = nxt_main_new_port_handler,
596 .process_created = nxt_main_process_created_handler,
597 .process_ready = nxt_port_process_ready_handler,
598 .whoami = nxt_main_process_whoami_handler,
599 .remove_pid = nxt_port_remove_pid_handler,
600 .start_process = nxt_main_start_process_handler,
601 .socket = nxt_main_port_socket_handler,
602 .modules = nxt_main_port_modules_handler,
603 .conf_store = nxt_main_port_conf_store_handler,
604#if (NXT_TLS)
605 .cert_get = nxt_cert_store_get_handler,
606 .cert_delete = nxt_cert_store_delete_handler,
607#endif
608 .access_log = nxt_main_port_access_log_handler,
609 .rpc_ready = nxt_port_rpc_handler,
610 .rpc_error = nxt_port_rpc_handler,
611};
612
613
614static void
615nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
616{
617 nxt_buf_t *buf;
618 nxt_pid_t pid, ppid;
619 nxt_port_t *port;
620 nxt_runtime_t *rt;
621 nxt_process_t *pprocess;
622
623 nxt_assert(msg->port_msg.reply_port == 0);
624
625 if (nxt_slow_path(msg->buf == NULL
626 || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
627 {
628 nxt_alert(task, "whoami: buffer is NULL or unexpected size");
629 goto fail;
630 }
631
632 nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
633
634 rt = task->thread->runtime;
635
636 pprocess = nxt_runtime_process_find(rt, ppid);
637 if (nxt_slow_path(pprocess == NULL)) {
638 nxt_alert(task, "whoami: parent process %PI not found", ppid);
639 goto fail;
640 }
641
642 pid = nxt_recv_msg_cmsg_pid(msg);
643
644 nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
645 msg->fd[0]);
646
647 if (msg->fd[0] != -1) {
648 port = nxt_runtime_process_port_create(task, rt, pid, 0,
649 NXT_PROCESS_APP);
650 if (nxt_slow_path(port == NULL)) {
651 goto fail;
652 }
653
654 nxt_fd_nonblocking(task, msg->fd[0]);
655
656 port->pair[0] = -1;
657 port->pair[1] = msg->fd[0];
658 msg->fd[0] = -1;
659
660 port->max_size = 16 * 1024;
661 port->max_share = 64 * 1024;
662 port->socket.task = task;
663
664 nxt_port_write_enable(task, port);
665
666 } else {
667 port = nxt_runtime_port_find(rt, pid, 0);
668 if (nxt_slow_path(port == NULL)) {
669 goto fail;
670 }
671 }
672
673 if (ppid != nxt_pid) {
674 nxt_queue_insert_tail(&pprocess->children, &port->process->link);
675 }
676
677 buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
678 sizeof(nxt_pid_t), 0);
679 if (nxt_slow_path(buf == NULL)) {
680 goto fail;
681 }
682
683 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
684
685 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
686 msg->port_msg.stream, 0, buf);
687
688fail:
689
690 if (msg->fd[0] != -1) {
691 nxt_fd_close(msg->fd[0]);
692 }
693}
694
695
696static nxt_int_t
697nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
698{
699 nxt_int_t ret;
700 nxt_port_t *port;
701 nxt_process_t *process;
702
703 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
704 NXT_PROCESS_MAIN);
705 if (nxt_slow_path(port == NULL)) {
706 return NXT_ERROR;
707 }
708
709 process = port->process;
710
711 ret = nxt_port_socket_init(task, port, 0);
712 if (nxt_slow_path(ret != NXT_OK)) {
713 nxt_port_use(task, port, -1);
714 return ret;
715 }
716
717 /*
718 * A main process port. A write port is not closed
719 * since it should be inherited by processes.
720 */
721 nxt_port_enable(task, port, &nxt_main_process_port_handlers);
722
723 process->state = NXT_PROCESS_STATE_READY;
724
725 return NXT_OK;
726}
727
728
729static void
730nxt_main_process_title(nxt_task_t *task)
731{
732 u_char *p, *end;
733 nxt_uint_t i;
734 u_char title[2048];
735
736 end = title + sizeof(title) - 1;
737
738 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
739 nxt_process_argv[0]);
740
741 for (i = 1; nxt_process_argv[i] != NULL; i++) {
742 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
743 }
744
745 if (p < end) {
746 *p++ = ']';
747 }
748
749 *p = '\0';
750
751 nxt_process_title(task, "%s", title);
752}
753
754
755static void
756nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
757{
758 nxt_debug(task, "sigterm handler signo:%d (%s)",
759 (int) (uintptr_t) obj, data);
760
761 /* TODO: fast exit. */
762
763 nxt_exiting = 1;
764
765 nxt_runtime_quit(task, 0);
766}
767
768
769static void
770nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
771{
772 nxt_debug(task, "sigquit handler signo:%d (%s)",
773 (int) (uintptr_t) obj, data);
774
775 /* TODO: graceful exit. */
776
777 nxt_exiting = 1;
778
779 nxt_runtime_quit(task, 0);
780}
781
782
783static void
784nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
785{
786 nxt_mp_t *mp;
787 nxt_int_t ret;
788 nxt_uint_t n;
789 nxt_port_t *port;
790 nxt_file_t *file, *new_file;
791 nxt_array_t *new_files;
792 nxt_runtime_t *rt;
793
794 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
795 (int) (uintptr_t) obj, data, "log files rotation");
796
797 rt = task->thread->runtime;
798
799 port = rt->port_by_type[NXT_PROCESS_ROUTER];
800
801 if (nxt_fast_path(port != NULL)) {
802 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
803 -1, 0, 0, NULL);
804 }
805
806 mp = nxt_mp_create(1024, 128, 256, 32);
807 if (mp == NULL) {
808 return;
809 }
810
811 n = nxt_list_nelts(rt->log_files);
812
813 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
814 if (new_files == NULL) {
815 nxt_mp_destroy(mp);
816 return;
817 }
818
819 nxt_list_each(file, rt->log_files) {
820
821 /* This allocation cannot fail. */
822 new_file = nxt_array_add(new_files);
823
824 new_file->name = file->name;
825 new_file->fd = NXT_FILE_INVALID;
826 new_file->log_level = NXT_LOG_ALERT;
827
828 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
829 NXT_FILE_OWNER_ACCESS);
830
831 if (ret != NXT_OK) {
832 goto fail;
833 }
834
835 } nxt_list_loop;
836
837 new_file = new_files->elts;
838
839 ret = nxt_file_stderr(&new_file[0]);
840
841 if (ret == NXT_OK) {
842 n = 0;
843
844 nxt_list_each(file, rt->log_files) {
845
846 nxt_port_change_log_file(task, rt, n, new_file[n].fd);
847 /*
848 * The old log file descriptor must be closed at the moment
849 * when no other threads use it. dup2() allows to use the
850 * old file descriptor for new log file. This change is
851 * performed atomically in the kernel.
852 */
853 (void) nxt_file_redirect(file, new_file[n].fd);
854
855 n++;
856
857 } nxt_list_loop;
858
859 nxt_mp_destroy(mp);
860 return;
861 }
862
863fail:
864
865 new_file = new_files->elts;
866 n = new_files->nelts;
867
868 while (n != 0) {
869 if (new_file->fd != NXT_FILE_INVALID) {
870 nxt_file_close(task, new_file);
871 }
872
873 new_file++;
874 n--;
875 }
876
877 nxt_mp_destroy(mp);
878}
879
880
881static void
882nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
883{
884 int status;
885 nxt_int_t ret;
886 nxt_err_t err;
887 nxt_pid_t pid;
888 nxt_port_t *port;
889 nxt_queue_t children;
890 nxt_runtime_t *rt;
891 nxt_process_t *process, *child;
892 nxt_process_init_t init;
893
894 nxt_debug(task, "sigchld handler signo:%d (%s)",
895 (int) (uintptr_t) obj, data);
896
897 rt = task->thread->runtime;
898
899 for ( ;; ) {
900 pid = waitpid(-1, &status, WNOHANG);
901
902 if (pid == -1) {
903
904 switch (err = nxt_errno) {
905
906 case NXT_ECHILD:
907 return;
908
909 case NXT_EINTR:
910 continue;
911
912 default:
913 nxt_alert(task, "waitpid() failed: %E", err);
914 return;
915 }
916 }
917
918 nxt_debug(task, "waitpid(): %PI", pid);
919
920 if (pid == 0) {
921 return;
922 }
923
924 if (WTERMSIG(status)) {
925#ifdef WCOREDUMP
926 nxt_alert(task, "process %PI exited on signal %d%s",
927 pid, WTERMSIG(status),
928 WCOREDUMP(status) ? " (core dumped)" : "");
929#else
930 nxt_alert(task, "process %PI exited on signal %d",
931 pid, WTERMSIG(status));
932#endif
933
934 } else {
935 nxt_trace(task, "process %PI exited with code %d",
936 pid, WEXITSTATUS(status));
937 }
938
939 process = nxt_runtime_process_find(rt, pid);
940
941 if (process != NULL) {
942 nxt_main_process_cleanup(task, process);
943
944 if (process->state == NXT_PROCESS_STATE_READY) {
945 process->stream = 0;
946 }
947
948 nxt_queue_init(&children);
949
950 if (!nxt_queue_is_empty(&process->children)) {
951 nxt_queue_add(&children, &process->children);
952
953 nxt_queue_init(&process->children);
954
955 nxt_queue_each(child, &children, nxt_process_t, link) {
956 port = nxt_process_port_first(child);
957
958 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
959 -1, 0, 0, NULL);
960 } nxt_queue_loop;
961 }
962
963 if (nxt_exiting) {
964 nxt_process_close_ports(task, process);
965
966 nxt_queue_each(child, &children, nxt_process_t, link) {
967 nxt_queue_remove(&child->link);
968 child->link.next = NULL;
969
970 nxt_process_close_ports(task, child);
971 } nxt_queue_loop;
972
973 if (rt->nprocesses <= 1) {
974 nxt_runtime_quit(task, 0);
975
976 return;
977 }
978
979 continue;
980 }
981
982 nxt_port_remove_notify_others(task, process);
983
984 nxt_queue_each(child, &children, nxt_process_t, link) {
985 nxt_port_remove_notify_others(task, child);
986
987 nxt_queue_remove(&child->link);
988 child->link.next = NULL;
989
990 nxt_process_close_ports(task, child);
991 } nxt_queue_loop;
992
993 init = *(nxt_process_init_t *) nxt_process_init(process);
994
995 nxt_process_close_ports(task, process);
996
997 if (init.restart) {
998 ret = nxt_process_init_start(task, init);
999 if (nxt_slow_path(ret == NXT_ERROR)) {
1000 nxt_alert(task, "failed to restart %s", init.name);
1001 }
1002 }
1003 }
1004 }
1005}
1006
1007
1008static void
1009nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
1010{
1011 nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
1012 (int) (uintptr_t) obj, data);
1013}
1014
1015
1016static void
1017nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
1018{
1019 if (process->isolation.cleanup != NULL) {
1020 process->isolation.cleanup(task, process);
1021 }
1022}
1023
1024
1025static void
1026nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1027{
1028 size_t size;
1029 nxt_int_t ret;
1030 nxt_buf_t *b, *out;
1031 nxt_port_t *port;
1032 nxt_sockaddr_t *sa;
1033 nxt_port_msg_type_t type;
1034 nxt_listening_socket_t ls;
1035 u_char message[2048];
1036
1037 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1038 msg->port_msg.reply_port);
1039 if (nxt_slow_path(port == NULL)) {
1040 return;
1041 }
1042
1043 if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) {
1044 nxt_alert(task, "process %PI cannot create listener sockets",
1045 msg->port_msg.pid);
1046
1047 return;
1048 }
1049
1050 b = msg->buf;
1051 sa = (nxt_sockaddr_t *) b->mem.pos;
1052
1053 /* TODO check b size and make plain */
1054
1055 ls.socket = -1;
1056 ls.error = NXT_SOCKET_ERROR_SYSTEM;
1057 ls.start = message;
1058 ls.end = message + sizeof(message);
1059
1060 nxt_debug(task, "listening socket \"%*s\"",
1061 (size_t) sa->length, nxt_sockaddr_start(sa));
1062
1063 ret = nxt_main_listening_socket(sa, &ls);
1064
1065 if (ret == NXT_OK) {
1066 nxt_debug(task, "socket(\"%*s\"): %d",
1067 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1068
1069 out = NULL;
1070
1071 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1072
1073 } else {
1074 size = ls.end - ls.start;
1075
1076 nxt_alert(task, "%*s", size, ls.start);
1077
1078 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1079 size + 1);
1080 if (nxt_fast_path(out != NULL)) {
1081 *out->mem.free++ = (uint8_t) ls.error;
1082
1083 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1084 }
1085
1086 type = NXT_PORT_MSG_RPC_ERROR;
1087 }
1088
1089 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1090 0, out);
1091}
1092
1093
1094static nxt_int_t
1095nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1096{
1097 nxt_err_t err;
1098 nxt_socket_t s;
1099
1100 const socklen_t length = sizeof(int);
1101 static const int enable = 1;
1102
1103 s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1104
1105 if (nxt_slow_path(s == -1)) {
1106 err = nxt_errno;
1107
1108#if (NXT_INET6)
1109
1110 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1111 ls->error = NXT_SOCKET_ERROR_NOINET6;
1112 }
1113
1114#endif
1115
1116 ls->end = nxt_sprintf(ls->start, ls->end,
1117 "socket(\\\"%*s\\\") failed %E",
1118 (size_t) sa->length, nxt_sockaddr_start(sa), err);
1119
1120 return NXT_ERROR;
1121 }
1122
1123 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1124 ls->end = nxt_sprintf(ls->start, ls->end,
1125 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1126 (size_t) sa->length, nxt_sockaddr_start(sa),
1127 nxt_errno);
1128 goto fail;
1129 }
1130
1131#if (NXT_INET6)
1132
1133 if (sa->u.sockaddr.sa_family == AF_INET6) {
1134
1135 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1136 ls->end = nxt_sprintf(ls->start, ls->end,
1137 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1138 (size_t) sa->length, nxt_sockaddr_start(sa),
1139 nxt_errno);
1140 goto fail;
1141 }
1142 }
1143
1144#endif
1145
1146 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1147 err = nxt_errno;
1148
1149#if (NXT_HAVE_UNIX_DOMAIN)
1150
1151 if (sa->u.sockaddr.sa_family == AF_UNIX) {
1152 switch (err) {
1153
1154 case EACCES:
1155 ls->error = NXT_SOCKET_ERROR_ACCESS;
1156 break;
1157
1158 case ENOENT:
1159 case ENOTDIR:
1160 ls->error = NXT_SOCKET_ERROR_PATH;
1161 break;
1162 }
1163
1164 } else
1165#endif
1166 {
1167 switch (err) {
1168
1169 case EACCES:
1170 ls->error = NXT_SOCKET_ERROR_PORT;
1171 break;
1172
1173 case EADDRINUSE:
1174 ls->error = NXT_SOCKET_ERROR_INUSE;
1175 break;
1176
1177 case EADDRNOTAVAIL:
1178 ls->error = NXT_SOCKET_ERROR_NOADDR;
1179 break;
1180 }
1181 }
1182
1183 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1184 (size_t) sa->length, nxt_sockaddr_start(sa), err);
1185 goto fail;
1186 }
1187
1188#if (NXT_HAVE_UNIX_DOMAIN)
1189
1190 if (sa->u.sockaddr.sa_family == AF_UNIX) {
1191 char *filename;
1192 mode_t access;
1193
1194 filename = sa->u.sockaddr_un.sun_path;
1195 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1196
1197 if (chmod(filename, access) != 0) {
1198 ls->end = nxt_sprintf(ls->start, ls->end,
1199 "chmod(\\\"%s\\\") failed %E",
1200 filename, nxt_errno);
1201 goto fail;
1202 }
1203 }
1204
1205#endif
1206
1207 ls->socket = s;
1208
1209 return NXT_OK;
1210
1211fail:
1212
1213 (void) close(s);
1214
1215 return NXT_ERROR;
1216}
1217
1218
1219static nxt_conf_map_t nxt_app_lang_module_map[] = {
1220 {
1221 nxt_string("type"),
1222 NXT_CONF_MAP_INT,
1223 offsetof(nxt_app_lang_module_t, type),
1224 },
1225
1226 {
1227 nxt_string("version"),
1228 NXT_CONF_MAP_CSTRZ,
1229 offsetof(nxt_app_lang_module_t, version),
1230 },
1231
1232 {
1233 nxt_string("file"),
1234 NXT_CONF_MAP_CSTRZ,
1235 offsetof(nxt_app_lang_module_t, file),
1236 },
1237};
1238
1239
1240static nxt_conf_map_t nxt_app_lang_mounts_map[] = {
1241 {
1242 nxt_string("src"),
1243 NXT_CONF_MAP_CSTRZ,
1244 offsetof(nxt_fs_mount_t, src),
1245 },
1246 {
1247 nxt_string("dst"),
1248 NXT_CONF_MAP_CSTRZ,
1249 offsetof(nxt_fs_mount_t, dst),
1250 },
1251 {
1252 nxt_string("name"),
1253 NXT_CONF_MAP_CSTRZ,
1254 offsetof(nxt_fs_mount_t, name),
1255 },
1256 {
1257 nxt_string("type"),
1258 NXT_CONF_MAP_INT,
1259 offsetof(nxt_fs_mount_t, type),
1260 },
1261 {
1262 nxt_string("flags"),
1263 NXT_CONF_MAP_INT,
1264 offsetof(nxt_fs_mount_t, flags),
1265 },
1266 {
1267 nxt_string("data"),
1268 NXT_CONF_MAP_CSTRZ,
1269 offsetof(nxt_fs_mount_t, data),
1270 },
1271};
1272
1273
1274static void
1275nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1276{
1277 uint32_t index, jindex, nmounts;
1278 nxt_mp_t *mp;
1279 nxt_int_t ret;
1280 nxt_buf_t *b;
1281 nxt_port_t *port;
1282 nxt_runtime_t *rt;
1283 nxt_fs_mount_t *mnt;
1284 nxt_conf_value_t *conf, *root, *value, *mounts;
1285 nxt_app_lang_module_t *lang;
1286
1287 static nxt_str_t root_path = nxt_string("/");
1288 static nxt_str_t mounts_name = nxt_string("mounts");
1289
1290 rt = task->thread->runtime;
1291
1292 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1293 nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid);
1294 return;
1295 }
1296
1297 if (nxt_exiting) {
1298 nxt_debug(task, "ignoring discovered modules, exiting");
1299 return;
1300 }
1301
1302 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1303 msg->port_msg.reply_port);
1304
1305 if (nxt_fast_path(port != NULL)) {
1306 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1307 msg->port_msg.stream, 0, NULL);
1308 }
1309
1310 b = msg->buf;
1311
1312 if (b == NULL) {
1313 return;
1314 }
1315
1316 mp = nxt_mp_create(1024, 128, 256, 32);
1317 if (mp == NULL) {
1318 return;
1319 }
1320
1321 b = nxt_buf_chk_make_plain(mp, b, msg->size);
1322
1323 if (b == NULL) {
1324 return;
1325 }
1326
1327 nxt_debug(task, "application languages: \"%*s\"",
1328 b->mem.free - b->mem.pos, b->mem.pos);
1329
1330 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1331 if (conf == NULL) {
1332 goto fail;
1333 }
1334
1335 root = nxt_conf_get_path(conf, &root_path);
1336 if (root == NULL) {
1337 goto fail;
1338 }
1339
1340 for (index = 0; /* void */ ; index++) {
1341 value = nxt_conf_get_array_element(root, index);
1342 if (value == NULL) {
1343 break;
1344 }
1345
1346 lang = nxt_array_zero_add(rt->languages);
1347 if (lang == NULL) {
1348 goto fail;
1349 }
1350
1351 lang->module = NULL;
1352
1353 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1354 nxt_nitems(nxt_app_lang_module_map), lang);
1355
1356 if (ret != NXT_OK) {
1357 goto fail;
1358 }
1359
1360 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1361 if (mounts == NULL) {
1362 nxt_alert(task, "missing mounts from discovery message.");
1363 goto fail;
1364 }
1365
1366 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1367 nxt_alert(task, "invalid mounts type from discovery message.");
1368 goto fail;
1369 }
1370
1371 nmounts = nxt_conf_array_elements_count(mounts);
1372
1373 lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1374 sizeof(nxt_fs_mount_t));
1375
1376 if (lang->mounts == NULL) {
1377 goto fail;
1378 }
1379
1380 for (jindex = 0; /* */; jindex++) {
1381 value = nxt_conf_get_array_element(mounts, jindex);
1382 if (value == NULL) {
1383 break;
1384 }
1385
1386 mnt = nxt_array_zero_add(lang->mounts);
1387 if (mnt == NULL) {
1388 goto fail;
1389 }
1390
1391 mnt->builtin = 1;
1392 mnt->deps = 1;
1393
1394 ret = nxt_conf_map_object(rt->mem_pool, value,
1395 nxt_app_lang_mounts_map,
1396 nxt_nitems(nxt_app_lang_mounts_map), mnt);
1397
1398 if (ret != NXT_OK) {
1399 goto fail;
1400 }
1401 }
1402
1403 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1404 lang->type, lang->version, lang->file, lang->mounts->nelts);
1405 }
1406
1407 qsort(rt->languages->elts, rt->languages->nelts,
1408 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1409
1410fail:
1411
1412 nxt_mp_destroy(mp);
1413
1414 ret = nxt_process_init_start(task, nxt_controller_process);
1415 if (ret == NXT_OK) {
1416 ret = nxt_process_init_start(task, nxt_router_process);
1417 }
1418
1419 if (nxt_slow_path(ret == NXT_ERROR)) {
1420 nxt_exiting = 1;
1421
1422 nxt_runtime_quit(task, 1);
1423 }
1424}
1425
1426
1427static int nxt_cdecl
1428nxt_app_lang_compare(const void *v1, const void *v2)
1429{
1430 int n;
1431 const nxt_app_lang_module_t *lang1, *lang2;
1432
1433 lang1 = v1;
1434 lang2 = v2;
1435
1436 n = lang1->type - lang2->type;
1437
1438 if (n != 0) {
1439 return n;
1440 }
1441
1442 n = nxt_strverscmp(lang1->version, lang2->version);
1443
1444 /* Negate result to move higher versions to the beginning. */
1445
1446 return -n;
1447}
1448
1449
1450static void
1451nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1452{
1453 void *p;
1454 size_t n, size;
1455 nxt_int_t ret;
1456 nxt_port_t *ctl_port;
1457 nxt_runtime_t *rt;
1458 u_char ver[NXT_INT_T_LEN];
1459
1460 rt = task->thread->runtime;
1461
1462 ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1463
1464 if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) {
1465 nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid);
1466 return;
1467 }
1468
1469 p = MAP_FAILED;
1470
1471 /*
1472 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1473 * initialized in 'cleanup' section.
1474 */
1475 size = 0;
1476
1477 if (nxt_slow_path(msg->fd[0] == -1)) {
1478 nxt_alert(task, "conf_store_handler: invalid shm fd");
1479 goto error;
1480 }
1481
1482 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1483 nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1484 (int) nxt_buf_mem_used_size(&msg->buf->mem));
1485 goto error;
1486 }
1487
1488 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1489
1490 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1491
1492 nxt_fd_close(msg->fd[0]);
1493 msg->fd[0] = -1;
1494
1495 if (nxt_slow_path(p == MAP_FAILED)) {
1496 goto error;
1497 }
1498
1499 nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1500
1501 if (nxt_conf_ver != NXT_VERNUM) {
1502 n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver;
1503
1504 ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n);
1505 if (nxt_slow_path(ret != NXT_OK)) {
1506 goto error;
1507 }
1508
1509 nxt_conf_ver = NXT_VERNUM;
1510 }
1511
1512 ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size);
1513
1514 if (nxt_fast_path(ret == NXT_OK)) {
1515 goto cleanup;
1516 }
1517
1518error:
1519
1520 nxt_alert(task, "failed to store current configuration");
1521
1522cleanup:
1523
1524 if (p != MAP_FAILED) {
1525 nxt_mem_munmap(p, size);
1526 }
1527
1528 if (msg->fd[0] != -1) {
1529 nxt_fd_close(msg->fd[0]);
1530 msg->fd[0] = -1;
1531 }
1532}
1533
1534
1535static nxt_int_t
1536nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name,
1537 u_char *buf, size_t size)
1538{
1539 ssize_t n;
1540 nxt_int_t ret;
1541 nxt_file_t file;
1542
1543 nxt_memzero(&file, sizeof(nxt_file_t));
1544
1545 file.name = (nxt_file_name_t *) name;
1546
1547 ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE,
1548 NXT_FILE_OWNER_ACCESS);
1549 if (nxt_slow_path(ret != NXT_OK)) {
1550 return NXT_ERROR;
1551 }
1552
1553 n = nxt_file_write(&file, buf, size, 0);
1554
1555 nxt_file_close(task, &file);
1556
1557 if (nxt_slow_path(n != (ssize_t) size)) {
1558 (void) nxt_file_delete(file.name);
1559 return NXT_ERROR;
1560 }
1561
1562 return nxt_file_rename(file.name, (nxt_file_name_t *) name);
1563}
1564
1565
1566static void
1567nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1568{
1569 u_char *path;
1570 nxt_int_t ret;
1571 nxt_file_t file;
1572 nxt_port_t *port;
1573 nxt_port_msg_type_t type;
1574
1575 nxt_debug(task, "opening access log file");
1576
1577 path = msg->buf->mem.pos;
1578
1579 nxt_memzero(&file, sizeof(nxt_file_t));
1580
1581 file.name = (nxt_file_name_t *) path;
1582 file.log_level = NXT_LOG_ERR;
1583
1584 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1585 NXT_FILE_OWNER_ACCESS);
1586
1587 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1588 : NXT_PORT_MSG_RPC_ERROR;
1589
1590 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1591 msg->port_msg.reply_port);
1592
1593 if (nxt_fast_path(port != NULL)) {
1594 (void) nxt_port_socket_write(task, port, type, file.fd,
1595 msg->port_msg.stream, 0, NULL);
1596 }
1597}
583
584#endif
585
586 process->state = NXT_PROCESS_STATE_CREATED;
587
588 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST,
589 -1, msg->port_msg.stream, 0, NULL);
590}
591
592
593static nxt_port_handlers_t nxt_main_process_port_handlers = {
594 .data = nxt_main_data_handler,
595 .new_port = nxt_main_new_port_handler,
596 .process_created = nxt_main_process_created_handler,
597 .process_ready = nxt_port_process_ready_handler,
598 .whoami = nxt_main_process_whoami_handler,
599 .remove_pid = nxt_port_remove_pid_handler,
600 .start_process = nxt_main_start_process_handler,
601 .socket = nxt_main_port_socket_handler,
602 .modules = nxt_main_port_modules_handler,
603 .conf_store = nxt_main_port_conf_store_handler,
604#if (NXT_TLS)
605 .cert_get = nxt_cert_store_get_handler,
606 .cert_delete = nxt_cert_store_delete_handler,
607#endif
608 .access_log = nxt_main_port_access_log_handler,
609 .rpc_ready = nxt_port_rpc_handler,
610 .rpc_error = nxt_port_rpc_handler,
611};
612
613
614static void
615nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
616{
617 nxt_buf_t *buf;
618 nxt_pid_t pid, ppid;
619 nxt_port_t *port;
620 nxt_runtime_t *rt;
621 nxt_process_t *pprocess;
622
623 nxt_assert(msg->port_msg.reply_port == 0);
624
625 if (nxt_slow_path(msg->buf == NULL
626 || nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
627 {
628 nxt_alert(task, "whoami: buffer is NULL or unexpected size");
629 goto fail;
630 }
631
632 nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
633
634 rt = task->thread->runtime;
635
636 pprocess = nxt_runtime_process_find(rt, ppid);
637 if (nxt_slow_path(pprocess == NULL)) {
638 nxt_alert(task, "whoami: parent process %PI not found", ppid);
639 goto fail;
640 }
641
642 pid = nxt_recv_msg_cmsg_pid(msg);
643
644 nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
645 msg->fd[0]);
646
647 if (msg->fd[0] != -1) {
648 port = nxt_runtime_process_port_create(task, rt, pid, 0,
649 NXT_PROCESS_APP);
650 if (nxt_slow_path(port == NULL)) {
651 goto fail;
652 }
653
654 nxt_fd_nonblocking(task, msg->fd[0]);
655
656 port->pair[0] = -1;
657 port->pair[1] = msg->fd[0];
658 msg->fd[0] = -1;
659
660 port->max_size = 16 * 1024;
661 port->max_share = 64 * 1024;
662 port->socket.task = task;
663
664 nxt_port_write_enable(task, port);
665
666 } else {
667 port = nxt_runtime_port_find(rt, pid, 0);
668 if (nxt_slow_path(port == NULL)) {
669 goto fail;
670 }
671 }
672
673 if (ppid != nxt_pid) {
674 nxt_queue_insert_tail(&pprocess->children, &port->process->link);
675 }
676
677 buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
678 sizeof(nxt_pid_t), 0);
679 if (nxt_slow_path(buf == NULL)) {
680 goto fail;
681 }
682
683 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
684
685 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
686 msg->port_msg.stream, 0, buf);
687
688fail:
689
690 if (msg->fd[0] != -1) {
691 nxt_fd_close(msg->fd[0]);
692 }
693}
694
695
696static nxt_int_t
697nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
698{
699 nxt_int_t ret;
700 nxt_port_t *port;
701 nxt_process_t *process;
702
703 port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
704 NXT_PROCESS_MAIN);
705 if (nxt_slow_path(port == NULL)) {
706 return NXT_ERROR;
707 }
708
709 process = port->process;
710
711 ret = nxt_port_socket_init(task, port, 0);
712 if (nxt_slow_path(ret != NXT_OK)) {
713 nxt_port_use(task, port, -1);
714 return ret;
715 }
716
717 /*
718 * A main process port. A write port is not closed
719 * since it should be inherited by processes.
720 */
721 nxt_port_enable(task, port, &nxt_main_process_port_handlers);
722
723 process->state = NXT_PROCESS_STATE_READY;
724
725 return NXT_OK;
726}
727
728
729static void
730nxt_main_process_title(nxt_task_t *task)
731{
732 u_char *p, *end;
733 nxt_uint_t i;
734 u_char title[2048];
735
736 end = title + sizeof(title) - 1;
737
738 p = nxt_sprintf(title, end, "unit: main v" NXT_VERSION " [%s",
739 nxt_process_argv[0]);
740
741 for (i = 1; nxt_process_argv[i] != NULL; i++) {
742 p = nxt_sprintf(p, end, " %s", nxt_process_argv[i]);
743 }
744
745 if (p < end) {
746 *p++ = ']';
747 }
748
749 *p = '\0';
750
751 nxt_process_title(task, "%s", title);
752}
753
754
755static void
756nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
757{
758 nxt_debug(task, "sigterm handler signo:%d (%s)",
759 (int) (uintptr_t) obj, data);
760
761 /* TODO: fast exit. */
762
763 nxt_exiting = 1;
764
765 nxt_runtime_quit(task, 0);
766}
767
768
769static void
770nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
771{
772 nxt_debug(task, "sigquit handler signo:%d (%s)",
773 (int) (uintptr_t) obj, data);
774
775 /* TODO: graceful exit. */
776
777 nxt_exiting = 1;
778
779 nxt_runtime_quit(task, 0);
780}
781
782
783static void
784nxt_main_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
785{
786 nxt_mp_t *mp;
787 nxt_int_t ret;
788 nxt_uint_t n;
789 nxt_port_t *port;
790 nxt_file_t *file, *new_file;
791 nxt_array_t *new_files;
792 nxt_runtime_t *rt;
793
794 nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
795 (int) (uintptr_t) obj, data, "log files rotation");
796
797 rt = task->thread->runtime;
798
799 port = rt->port_by_type[NXT_PROCESS_ROUTER];
800
801 if (nxt_fast_path(port != NULL)) {
802 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_ACCESS_LOG,
803 -1, 0, 0, NULL);
804 }
805
806 mp = nxt_mp_create(1024, 128, 256, 32);
807 if (mp == NULL) {
808 return;
809 }
810
811 n = nxt_list_nelts(rt->log_files);
812
813 new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
814 if (new_files == NULL) {
815 nxt_mp_destroy(mp);
816 return;
817 }
818
819 nxt_list_each(file, rt->log_files) {
820
821 /* This allocation cannot fail. */
822 new_file = nxt_array_add(new_files);
823
824 new_file->name = file->name;
825 new_file->fd = NXT_FILE_INVALID;
826 new_file->log_level = NXT_LOG_ALERT;
827
828 ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
829 NXT_FILE_OWNER_ACCESS);
830
831 if (ret != NXT_OK) {
832 goto fail;
833 }
834
835 } nxt_list_loop;
836
837 new_file = new_files->elts;
838
839 ret = nxt_file_stderr(&new_file[0]);
840
841 if (ret == NXT_OK) {
842 n = 0;
843
844 nxt_list_each(file, rt->log_files) {
845
846 nxt_port_change_log_file(task, rt, n, new_file[n].fd);
847 /*
848 * The old log file descriptor must be closed at the moment
849 * when no other threads use it. dup2() allows to use the
850 * old file descriptor for new log file. This change is
851 * performed atomically in the kernel.
852 */
853 (void) nxt_file_redirect(file, new_file[n].fd);
854
855 n++;
856
857 } nxt_list_loop;
858
859 nxt_mp_destroy(mp);
860 return;
861 }
862
863fail:
864
865 new_file = new_files->elts;
866 n = new_files->nelts;
867
868 while (n != 0) {
869 if (new_file->fd != NXT_FILE_INVALID) {
870 nxt_file_close(task, new_file);
871 }
872
873 new_file++;
874 n--;
875 }
876
877 nxt_mp_destroy(mp);
878}
879
880
881static void
882nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
883{
884 int status;
885 nxt_int_t ret;
886 nxt_err_t err;
887 nxt_pid_t pid;
888 nxt_port_t *port;
889 nxt_queue_t children;
890 nxt_runtime_t *rt;
891 nxt_process_t *process, *child;
892 nxt_process_init_t init;
893
894 nxt_debug(task, "sigchld handler signo:%d (%s)",
895 (int) (uintptr_t) obj, data);
896
897 rt = task->thread->runtime;
898
899 for ( ;; ) {
900 pid = waitpid(-1, &status, WNOHANG);
901
902 if (pid == -1) {
903
904 switch (err = nxt_errno) {
905
906 case NXT_ECHILD:
907 return;
908
909 case NXT_EINTR:
910 continue;
911
912 default:
913 nxt_alert(task, "waitpid() failed: %E", err);
914 return;
915 }
916 }
917
918 nxt_debug(task, "waitpid(): %PI", pid);
919
920 if (pid == 0) {
921 return;
922 }
923
924 if (WTERMSIG(status)) {
925#ifdef WCOREDUMP
926 nxt_alert(task, "process %PI exited on signal %d%s",
927 pid, WTERMSIG(status),
928 WCOREDUMP(status) ? " (core dumped)" : "");
929#else
930 nxt_alert(task, "process %PI exited on signal %d",
931 pid, WTERMSIG(status));
932#endif
933
934 } else {
935 nxt_trace(task, "process %PI exited with code %d",
936 pid, WEXITSTATUS(status));
937 }
938
939 process = nxt_runtime_process_find(rt, pid);
940
941 if (process != NULL) {
942 nxt_main_process_cleanup(task, process);
943
944 if (process->state == NXT_PROCESS_STATE_READY) {
945 process->stream = 0;
946 }
947
948 nxt_queue_init(&children);
949
950 if (!nxt_queue_is_empty(&process->children)) {
951 nxt_queue_add(&children, &process->children);
952
953 nxt_queue_init(&process->children);
954
955 nxt_queue_each(child, &children, nxt_process_t, link) {
956 port = nxt_process_port_first(child);
957
958 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
959 -1, 0, 0, NULL);
960 } nxt_queue_loop;
961 }
962
963 if (nxt_exiting) {
964 nxt_process_close_ports(task, process);
965
966 nxt_queue_each(child, &children, nxt_process_t, link) {
967 nxt_queue_remove(&child->link);
968 child->link.next = NULL;
969
970 nxt_process_close_ports(task, child);
971 } nxt_queue_loop;
972
973 if (rt->nprocesses <= 1) {
974 nxt_runtime_quit(task, 0);
975
976 return;
977 }
978
979 continue;
980 }
981
982 nxt_port_remove_notify_others(task, process);
983
984 nxt_queue_each(child, &children, nxt_process_t, link) {
985 nxt_port_remove_notify_others(task, child);
986
987 nxt_queue_remove(&child->link);
988 child->link.next = NULL;
989
990 nxt_process_close_ports(task, child);
991 } nxt_queue_loop;
992
993 init = *(nxt_process_init_t *) nxt_process_init(process);
994
995 nxt_process_close_ports(task, process);
996
997 if (init.restart) {
998 ret = nxt_process_init_start(task, init);
999 if (nxt_slow_path(ret == NXT_ERROR)) {
1000 nxt_alert(task, "failed to restart %s", init.name);
1001 }
1002 }
1003 }
1004 }
1005}
1006
1007
1008static void
1009nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
1010{
1011 nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
1012 (int) (uintptr_t) obj, data);
1013}
1014
1015
1016static void
1017nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
1018{
1019 if (process->isolation.cleanup != NULL) {
1020 process->isolation.cleanup(task, process);
1021 }
1022}
1023
1024
1025static void
1026nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1027{
1028 size_t size;
1029 nxt_int_t ret;
1030 nxt_buf_t *b, *out;
1031 nxt_port_t *port;
1032 nxt_sockaddr_t *sa;
1033 nxt_port_msg_type_t type;
1034 nxt_listening_socket_t ls;
1035 u_char message[2048];
1036
1037 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1038 msg->port_msg.reply_port);
1039 if (nxt_slow_path(port == NULL)) {
1040 return;
1041 }
1042
1043 if (nxt_slow_path(port->type != NXT_PROCESS_ROUTER)) {
1044 nxt_alert(task, "process %PI cannot create listener sockets",
1045 msg->port_msg.pid);
1046
1047 return;
1048 }
1049
1050 b = msg->buf;
1051 sa = (nxt_sockaddr_t *) b->mem.pos;
1052
1053 /* TODO check b size and make plain */
1054
1055 ls.socket = -1;
1056 ls.error = NXT_SOCKET_ERROR_SYSTEM;
1057 ls.start = message;
1058 ls.end = message + sizeof(message);
1059
1060 nxt_debug(task, "listening socket \"%*s\"",
1061 (size_t) sa->length, nxt_sockaddr_start(sa));
1062
1063 ret = nxt_main_listening_socket(sa, &ls);
1064
1065 if (ret == NXT_OK) {
1066 nxt_debug(task, "socket(\"%*s\"): %d",
1067 (size_t) sa->length, nxt_sockaddr_start(sa), ls.socket);
1068
1069 out = NULL;
1070
1071 type = NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD;
1072
1073 } else {
1074 size = ls.end - ls.start;
1075
1076 nxt_alert(task, "%*s", size, ls.start);
1077
1078 out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
1079 size + 1);
1080 if (nxt_fast_path(out != NULL)) {
1081 *out->mem.free++ = (uint8_t) ls.error;
1082
1083 out->mem.free = nxt_cpymem(out->mem.free, ls.start, size);
1084 }
1085
1086 type = NXT_PORT_MSG_RPC_ERROR;
1087 }
1088
1089 nxt_port_socket_write(task, port, type, ls.socket, msg->port_msg.stream,
1090 0, out);
1091}
1092
1093
1094static nxt_int_t
1095nxt_main_listening_socket(nxt_sockaddr_t *sa, nxt_listening_socket_t *ls)
1096{
1097 nxt_err_t err;
1098 nxt_socket_t s;
1099
1100 const socklen_t length = sizeof(int);
1101 static const int enable = 1;
1102
1103 s = socket(sa->u.sockaddr.sa_family, sa->type, 0);
1104
1105 if (nxt_slow_path(s == -1)) {
1106 err = nxt_errno;
1107
1108#if (NXT_INET6)
1109
1110 if (err == EAFNOSUPPORT && sa->u.sockaddr.sa_family == AF_INET6) {
1111 ls->error = NXT_SOCKET_ERROR_NOINET6;
1112 }
1113
1114#endif
1115
1116 ls->end = nxt_sprintf(ls->start, ls->end,
1117 "socket(\\\"%*s\\\") failed %E",
1118 (size_t) sa->length, nxt_sockaddr_start(sa), err);
1119
1120 return NXT_ERROR;
1121 }
1122
1123 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &enable, length) != 0) {
1124 ls->end = nxt_sprintf(ls->start, ls->end,
1125 "setsockopt(\\\"%*s\\\", SO_REUSEADDR) failed %E",
1126 (size_t) sa->length, nxt_sockaddr_start(sa),
1127 nxt_errno);
1128 goto fail;
1129 }
1130
1131#if (NXT_INET6)
1132
1133 if (sa->u.sockaddr.sa_family == AF_INET6) {
1134
1135 if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &enable, length) != 0) {
1136 ls->end = nxt_sprintf(ls->start, ls->end,
1137 "setsockopt(\\\"%*s\\\", IPV6_V6ONLY) failed %E",
1138 (size_t) sa->length, nxt_sockaddr_start(sa),
1139 nxt_errno);
1140 goto fail;
1141 }
1142 }
1143
1144#endif
1145
1146 if (bind(s, &sa->u.sockaddr, sa->socklen) != 0) {
1147 err = nxt_errno;
1148
1149#if (NXT_HAVE_UNIX_DOMAIN)
1150
1151 if (sa->u.sockaddr.sa_family == AF_UNIX) {
1152 switch (err) {
1153
1154 case EACCES:
1155 ls->error = NXT_SOCKET_ERROR_ACCESS;
1156 break;
1157
1158 case ENOENT:
1159 case ENOTDIR:
1160 ls->error = NXT_SOCKET_ERROR_PATH;
1161 break;
1162 }
1163
1164 } else
1165#endif
1166 {
1167 switch (err) {
1168
1169 case EACCES:
1170 ls->error = NXT_SOCKET_ERROR_PORT;
1171 break;
1172
1173 case EADDRINUSE:
1174 ls->error = NXT_SOCKET_ERROR_INUSE;
1175 break;
1176
1177 case EADDRNOTAVAIL:
1178 ls->error = NXT_SOCKET_ERROR_NOADDR;
1179 break;
1180 }
1181 }
1182
1183 ls->end = nxt_sprintf(ls->start, ls->end, "bind(\\\"%*s\\\") failed %E",
1184 (size_t) sa->length, nxt_sockaddr_start(sa), err);
1185 goto fail;
1186 }
1187
1188#if (NXT_HAVE_UNIX_DOMAIN)
1189
1190 if (sa->u.sockaddr.sa_family == AF_UNIX) {
1191 char *filename;
1192 mode_t access;
1193
1194 filename = sa->u.sockaddr_un.sun_path;
1195 access = (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
1196
1197 if (chmod(filename, access) != 0) {
1198 ls->end = nxt_sprintf(ls->start, ls->end,
1199 "chmod(\\\"%s\\\") failed %E",
1200 filename, nxt_errno);
1201 goto fail;
1202 }
1203 }
1204
1205#endif
1206
1207 ls->socket = s;
1208
1209 return NXT_OK;
1210
1211fail:
1212
1213 (void) close(s);
1214
1215 return NXT_ERROR;
1216}
1217
1218
1219static nxt_conf_map_t nxt_app_lang_module_map[] = {
1220 {
1221 nxt_string("type"),
1222 NXT_CONF_MAP_INT,
1223 offsetof(nxt_app_lang_module_t, type),
1224 },
1225
1226 {
1227 nxt_string("version"),
1228 NXT_CONF_MAP_CSTRZ,
1229 offsetof(nxt_app_lang_module_t, version),
1230 },
1231
1232 {
1233 nxt_string("file"),
1234 NXT_CONF_MAP_CSTRZ,
1235 offsetof(nxt_app_lang_module_t, file),
1236 },
1237};
1238
1239
1240static nxt_conf_map_t nxt_app_lang_mounts_map[] = {
1241 {
1242 nxt_string("src"),
1243 NXT_CONF_MAP_CSTRZ,
1244 offsetof(nxt_fs_mount_t, src),
1245 },
1246 {
1247 nxt_string("dst"),
1248 NXT_CONF_MAP_CSTRZ,
1249 offsetof(nxt_fs_mount_t, dst),
1250 },
1251 {
1252 nxt_string("name"),
1253 NXT_CONF_MAP_CSTRZ,
1254 offsetof(nxt_fs_mount_t, name),
1255 },
1256 {
1257 nxt_string("type"),
1258 NXT_CONF_MAP_INT,
1259 offsetof(nxt_fs_mount_t, type),
1260 },
1261 {
1262 nxt_string("flags"),
1263 NXT_CONF_MAP_INT,
1264 offsetof(nxt_fs_mount_t, flags),
1265 },
1266 {
1267 nxt_string("data"),
1268 NXT_CONF_MAP_CSTRZ,
1269 offsetof(nxt_fs_mount_t, data),
1270 },
1271};
1272
1273
1274static void
1275nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1276{
1277 uint32_t index, jindex, nmounts;
1278 nxt_mp_t *mp;
1279 nxt_int_t ret;
1280 nxt_buf_t *b;
1281 nxt_port_t *port;
1282 nxt_runtime_t *rt;
1283 nxt_fs_mount_t *mnt;
1284 nxt_conf_value_t *conf, *root, *value, *mounts;
1285 nxt_app_lang_module_t *lang;
1286
1287 static nxt_str_t root_path = nxt_string("/");
1288 static nxt_str_t mounts_name = nxt_string("mounts");
1289
1290 rt = task->thread->runtime;
1291
1292 if (msg->port_msg.pid != rt->port_by_type[NXT_PROCESS_DISCOVERY]->pid) {
1293 nxt_alert(task, "process %PI cannot send modules", msg->port_msg.pid);
1294 return;
1295 }
1296
1297 if (nxt_exiting) {
1298 nxt_debug(task, "ignoring discovered modules, exiting");
1299 return;
1300 }
1301
1302 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1303 msg->port_msg.reply_port);
1304
1305 if (nxt_fast_path(port != NULL)) {
1306 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
1307 msg->port_msg.stream, 0, NULL);
1308 }
1309
1310 b = msg->buf;
1311
1312 if (b == NULL) {
1313 return;
1314 }
1315
1316 mp = nxt_mp_create(1024, 128, 256, 32);
1317 if (mp == NULL) {
1318 return;
1319 }
1320
1321 b = nxt_buf_chk_make_plain(mp, b, msg->size);
1322
1323 if (b == NULL) {
1324 return;
1325 }
1326
1327 nxt_debug(task, "application languages: \"%*s\"",
1328 b->mem.free - b->mem.pos, b->mem.pos);
1329
1330 conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free, NULL);
1331 if (conf == NULL) {
1332 goto fail;
1333 }
1334
1335 root = nxt_conf_get_path(conf, &root_path);
1336 if (root == NULL) {
1337 goto fail;
1338 }
1339
1340 for (index = 0; /* void */ ; index++) {
1341 value = nxt_conf_get_array_element(root, index);
1342 if (value == NULL) {
1343 break;
1344 }
1345
1346 lang = nxt_array_zero_add(rt->languages);
1347 if (lang == NULL) {
1348 goto fail;
1349 }
1350
1351 lang->module = NULL;
1352
1353 ret = nxt_conf_map_object(rt->mem_pool, value, nxt_app_lang_module_map,
1354 nxt_nitems(nxt_app_lang_module_map), lang);
1355
1356 if (ret != NXT_OK) {
1357 goto fail;
1358 }
1359
1360 mounts = nxt_conf_get_object_member(value, &mounts_name, NULL);
1361 if (mounts == NULL) {
1362 nxt_alert(task, "missing mounts from discovery message.");
1363 goto fail;
1364 }
1365
1366 if (nxt_conf_type(mounts) != NXT_CONF_ARRAY) {
1367 nxt_alert(task, "invalid mounts type from discovery message.");
1368 goto fail;
1369 }
1370
1371 nmounts = nxt_conf_array_elements_count(mounts);
1372
1373 lang->mounts = nxt_array_create(rt->mem_pool, nmounts,
1374 sizeof(nxt_fs_mount_t));
1375
1376 if (lang->mounts == NULL) {
1377 goto fail;
1378 }
1379
1380 for (jindex = 0; /* */; jindex++) {
1381 value = nxt_conf_get_array_element(mounts, jindex);
1382 if (value == NULL) {
1383 break;
1384 }
1385
1386 mnt = nxt_array_zero_add(lang->mounts);
1387 if (mnt == NULL) {
1388 goto fail;
1389 }
1390
1391 mnt->builtin = 1;
1392 mnt->deps = 1;
1393
1394 ret = nxt_conf_map_object(rt->mem_pool, value,
1395 nxt_app_lang_mounts_map,
1396 nxt_nitems(nxt_app_lang_mounts_map), mnt);
1397
1398 if (ret != NXT_OK) {
1399 goto fail;
1400 }
1401 }
1402
1403 nxt_debug(task, "lang %d %s \"%s\" (%d mounts)",
1404 lang->type, lang->version, lang->file, lang->mounts->nelts);
1405 }
1406
1407 qsort(rt->languages->elts, rt->languages->nelts,
1408 sizeof(nxt_app_lang_module_t), nxt_app_lang_compare);
1409
1410fail:
1411
1412 nxt_mp_destroy(mp);
1413
1414 ret = nxt_process_init_start(task, nxt_controller_process);
1415 if (ret == NXT_OK) {
1416 ret = nxt_process_init_start(task, nxt_router_process);
1417 }
1418
1419 if (nxt_slow_path(ret == NXT_ERROR)) {
1420 nxt_exiting = 1;
1421
1422 nxt_runtime_quit(task, 1);
1423 }
1424}
1425
1426
1427static int nxt_cdecl
1428nxt_app_lang_compare(const void *v1, const void *v2)
1429{
1430 int n;
1431 const nxt_app_lang_module_t *lang1, *lang2;
1432
1433 lang1 = v1;
1434 lang2 = v2;
1435
1436 n = lang1->type - lang2->type;
1437
1438 if (n != 0) {
1439 return n;
1440 }
1441
1442 n = nxt_strverscmp(lang1->version, lang2->version);
1443
1444 /* Negate result to move higher versions to the beginning. */
1445
1446 return -n;
1447}
1448
1449
1450static void
1451nxt_main_port_conf_store_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1452{
1453 void *p;
1454 size_t n, size;
1455 nxt_int_t ret;
1456 nxt_port_t *ctl_port;
1457 nxt_runtime_t *rt;
1458 u_char ver[NXT_INT_T_LEN];
1459
1460 rt = task->thread->runtime;
1461
1462 ctl_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1463
1464 if (nxt_slow_path(msg->port_msg.pid != ctl_port->pid)) {
1465 nxt_alert(task, "process %PI cannot store conf", msg->port_msg.pid);
1466 return;
1467 }
1468
1469 p = MAP_FAILED;
1470
1471 /*
1472 * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
1473 * initialized in 'cleanup' section.
1474 */
1475 size = 0;
1476
1477 if (nxt_slow_path(msg->fd[0] == -1)) {
1478 nxt_alert(task, "conf_store_handler: invalid shm fd");
1479 goto error;
1480 }
1481
1482 if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
1483 nxt_alert(task, "conf_store_handler: unexpected buffer size (%d)",
1484 (int) nxt_buf_mem_used_size(&msg->buf->mem));
1485 goto error;
1486 }
1487
1488 nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
1489
1490 p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
1491
1492 nxt_fd_close(msg->fd[0]);
1493 msg->fd[0] = -1;
1494
1495 if (nxt_slow_path(p == MAP_FAILED)) {
1496 goto error;
1497 }
1498
1499 nxt_debug(task, "conf_store_handler(%uz): %*s", size, size, p);
1500
1501 if (nxt_conf_ver != NXT_VERNUM) {
1502 n = nxt_sprintf(ver, ver + NXT_INT_T_LEN, "%d", NXT_VERNUM) - ver;
1503
1504 ret = nxt_main_file_store(task, rt->ver_tmp, rt->ver, ver, n);
1505 if (nxt_slow_path(ret != NXT_OK)) {
1506 goto error;
1507 }
1508
1509 nxt_conf_ver = NXT_VERNUM;
1510 }
1511
1512 ret = nxt_main_file_store(task, rt->conf_tmp, rt->conf, p, size);
1513
1514 if (nxt_fast_path(ret == NXT_OK)) {
1515 goto cleanup;
1516 }
1517
1518error:
1519
1520 nxt_alert(task, "failed to store current configuration");
1521
1522cleanup:
1523
1524 if (p != MAP_FAILED) {
1525 nxt_mem_munmap(p, size);
1526 }
1527
1528 if (msg->fd[0] != -1) {
1529 nxt_fd_close(msg->fd[0]);
1530 msg->fd[0] = -1;
1531 }
1532}
1533
1534
1535static nxt_int_t
1536nxt_main_file_store(nxt_task_t *task, const char *tmp_name, const char *name,
1537 u_char *buf, size_t size)
1538{
1539 ssize_t n;
1540 nxt_int_t ret;
1541 nxt_file_t file;
1542
1543 nxt_memzero(&file, sizeof(nxt_file_t));
1544
1545 file.name = (nxt_file_name_t *) name;
1546
1547 ret = nxt_file_open(task, &file, NXT_FILE_WRONLY, NXT_FILE_TRUNCATE,
1548 NXT_FILE_OWNER_ACCESS);
1549 if (nxt_slow_path(ret != NXT_OK)) {
1550 return NXT_ERROR;
1551 }
1552
1553 n = nxt_file_write(&file, buf, size, 0);
1554
1555 nxt_file_close(task, &file);
1556
1557 if (nxt_slow_path(n != (ssize_t) size)) {
1558 (void) nxt_file_delete(file.name);
1559 return NXT_ERROR;
1560 }
1561
1562 return nxt_file_rename(file.name, (nxt_file_name_t *) name);
1563}
1564
1565
1566static void
1567nxt_main_port_access_log_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1568{
1569 u_char *path;
1570 nxt_int_t ret;
1571 nxt_file_t file;
1572 nxt_port_t *port;
1573 nxt_port_msg_type_t type;
1574
1575 nxt_debug(task, "opening access log file");
1576
1577 path = msg->buf->mem.pos;
1578
1579 nxt_memzero(&file, sizeof(nxt_file_t));
1580
1581 file.name = (nxt_file_name_t *) path;
1582 file.log_level = NXT_LOG_ERR;
1583
1584 ret = nxt_file_open(task, &file, O_WRONLY | O_APPEND, O_CREAT,
1585 NXT_FILE_OWNER_ACCESS);
1586
1587 type = (ret == NXT_OK) ? NXT_PORT_MSG_RPC_READY_LAST | NXT_PORT_MSG_CLOSE_FD
1588 : NXT_PORT_MSG_RPC_ERROR;
1589
1590 port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
1591 msg->port_msg.reply_port);
1592
1593 if (nxt_fast_path(port != NULL)) {
1594 (void) nxt_port_socket_write(task, port, type, file.fd,
1595 msg->port_msg.stream, 0, NULL);
1596 }
1597}