xref: /unit/src/nxt_controller.c (revision 2145:ee1e319c8ed8)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_cert.h>
13 
14 
15 typedef struct {
16     nxt_conf_value_t  *root;
17     nxt_mp_t          *pool;
18 } nxt_controller_conf_t;
19 
20 
21 typedef struct {
22     nxt_http_request_parse_t  parser;
23     size_t                    length;
24     nxt_controller_conf_t     conf;
25     nxt_conn_t                *conn;
26     nxt_queue_link_t          link;
27 } nxt_controller_request_t;
28 
29 
30 typedef struct {
31     nxt_uint_t        status;
32     nxt_conf_value_t  *conf;
33 
34     u_char            *title;
35     nxt_str_t         detail;
36     ssize_t           offset;
37     nxt_uint_t        line;
38     nxt_uint_t        column;
39 } nxt_controller_response_t;
40 
41 
42 static nxt_int_t nxt_controller_prefork(nxt_task_t *task,
43     nxt_process_t *process, nxt_mp_t *mp);
44 static nxt_int_t nxt_controller_file_read(nxt_task_t *task, const char *name,
45     nxt_str_t *str, nxt_mp_t *mp);
46 static nxt_int_t nxt_controller_start(nxt_task_t *task,
47     nxt_process_data_t *data);
48 static void nxt_controller_process_new_port_handler(nxt_task_t *task,
49     nxt_port_recv_msg_t *msg);
50 static void nxt_controller_send_current_conf(nxt_task_t *task);
51 static void nxt_controller_router_ready_handler(nxt_task_t *task,
52     nxt_port_recv_msg_t *msg);
53 static void nxt_controller_remove_pid_handler(nxt_task_t *task,
54     nxt_port_recv_msg_t *msg);
55 static nxt_int_t nxt_controller_conf_default(void);
56 static void nxt_controller_conf_init_handler(nxt_task_t *task,
57     nxt_port_recv_msg_t *msg, void *data);
58 static void nxt_controller_flush_requests(nxt_task_t *task);
59 static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp,
60     nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
61 
62 static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
63 static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
64 static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
65     uintptr_t data);
66 static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
67     void *data);
68 static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
69     void *data);
70 static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
71     void *data);
72 static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
73 static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
74     void *data);
75 static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
76     void *data);
77 static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
78 static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
79 
80 static nxt_int_t nxt_controller_request_content_length(void *ctx,
81     nxt_http_field_t *field, uintptr_t data);
82 
83 static void nxt_controller_process_request(nxt_task_t *task,
84     nxt_controller_request_t *req);
85 static void nxt_controller_process_config(nxt_task_t *task,
86     nxt_controller_request_t *req, nxt_str_t *path);
87 static nxt_bool_t nxt_controller_check_postpone_request(nxt_task_t *task);
88 #if (NXT_TLS)
89 static void nxt_controller_process_cert(nxt_task_t *task,
90     nxt_controller_request_t *req, nxt_str_t *path);
91 static void nxt_controller_process_cert_save(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg, void *data);
93 static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name);
94 static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj,
95     void *data);
96 #endif
97 static void nxt_controller_process_control(nxt_task_t *task,
98     nxt_controller_request_t *req, nxt_str_t *path);
99 static void nxt_controller_app_restart_handler(nxt_task_t *task,
100     nxt_port_recv_msg_t *msg, void *data);
101 static void nxt_controller_conf_handler(nxt_task_t *task,
102     nxt_port_recv_msg_t *msg, void *data);
103 static void nxt_controller_conf_store(nxt_task_t *task,
104     nxt_conf_value_t *conf);
105 static void nxt_controller_response(nxt_task_t *task,
106     nxt_controller_request_t *req, nxt_controller_response_t *resp);
107 static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
108     struct tm *tm, size_t size, const char *format);
109 
110 
111 static nxt_http_field_proc_t  nxt_controller_request_fields[] = {
112     { nxt_string("Content-Length"),
113       &nxt_controller_request_content_length, 0 },
114 };
115 
116 static nxt_lvlhsh_t            nxt_controller_fields_hash;
117 
118 static nxt_uint_t              nxt_controller_listening;
119 static nxt_uint_t              nxt_controller_router_ready;
120 static nxt_controller_conf_t   nxt_controller_conf;
121 static nxt_queue_t             nxt_controller_waiting_requests;
122 static nxt_bool_t              nxt_controller_waiting_init_conf;
123 
124 
125 static const nxt_event_conn_state_t  nxt_controller_conn_read_state;
126 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state;
127 static const nxt_event_conn_state_t  nxt_controller_conn_write_state;
128 static const nxt_event_conn_state_t  nxt_controller_conn_close_state;
129 
130 
131 static const nxt_port_handlers_t  nxt_controller_process_port_handlers = {
132     .quit           = nxt_signal_quit_handler,
133     .new_port       = nxt_controller_process_new_port_handler,
134     .change_file    = nxt_port_change_log_file_handler,
135     .mmap           = nxt_port_mmap_handler,
136     .process_ready  = nxt_controller_router_ready_handler,
137     .data           = nxt_port_data_handler,
138     .remove_pid     = nxt_controller_remove_pid_handler,
139     .rpc_ready      = nxt_port_rpc_handler,
140     .rpc_error      = nxt_port_rpc_handler,
141 };
142 
143 
144 const nxt_process_init_t  nxt_controller_process = {
145     .name           = "controller",
146     .type           = NXT_PROCESS_CONTROLLER,
147     .prefork        = nxt_controller_prefork,
148     .restart        = 1,
149     .setup          = nxt_process_core_setup,
150     .start          = nxt_controller_start,
151     .port_handlers  = &nxt_controller_process_port_handlers,
152     .signals        = nxt_process_signals,
153 };
154 
155 
156 static nxt_int_t
nxt_controller_prefork(nxt_task_t * task,nxt_process_t * process,nxt_mp_t * mp)157 nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
158 {
159     nxt_str_t              ver;
160     nxt_int_t              ret, num;
161     nxt_runtime_t          *rt;
162     nxt_controller_init_t  ctrl_init;
163 
164     nxt_log(task, NXT_LOG_INFO, "controller started");
165 
166     rt = task->thread->runtime;
167 
168     nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t));
169 
170     /*
171      * Since configuration version has only been introduced in 1.26,
172      * set the default version to 1.25.
173     */
174     nxt_conf_ver = 12500;
175 
176     ret = nxt_controller_file_read(task, rt->conf, &ctrl_init.conf, mp);
177     if (nxt_slow_path(ret == NXT_ERROR)) {
178         return NXT_ERROR;
179     }
180 
181     if (ret == NXT_OK) {
182         ret = nxt_controller_file_read(task, rt->ver, &ver, mp);
183         if (nxt_slow_path(ret == NXT_ERROR)) {
184             return NXT_ERROR;
185         }
186 
187         if (ret == NXT_OK) {
188             num = nxt_int_parse(ver.start, ver.length);
189 
190             if (nxt_slow_path(num < 0)) {
191                 nxt_alert(task, "failed to restore previous configuration: "
192                           "invalid version string \"%V\"", &ver);
193 
194                 nxt_str_null(&ctrl_init.conf);
195 
196             } else {
197                 nxt_conf_ver = num;
198             }
199         }
200     }
201 
202 #if (NXT_TLS)
203     ctrl_init.certs = nxt_cert_store_load(task, mp);
204 
205     nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt);
206 #endif
207 
208     process->data.controller = ctrl_init;
209 
210     return NXT_OK;
211 }
212 
213 
214 static nxt_int_t
nxt_controller_file_read(nxt_task_t * task,const char * name,nxt_str_t * str,nxt_mp_t * mp)215 nxt_controller_file_read(nxt_task_t *task, const char *name, nxt_str_t *str,
216     nxt_mp_t *mp)
217 {
218     ssize_t          n;
219     nxt_int_t        ret;
220     nxt_file_t       file;
221     nxt_file_info_t  fi;
222 
223     nxt_memzero(&file, sizeof(nxt_file_t));
224 
225     file.name = (nxt_file_name_t *) name;
226 
227     ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
228 
229     if (ret == NXT_OK) {
230         ret = nxt_file_info(&file, &fi);
231         if (nxt_slow_path(ret != NXT_OK)) {
232             goto fail;
233         }
234 
235         if (nxt_fast_path(nxt_is_file(&fi))) {
236             str->length = nxt_file_size(&fi);
237             str->start = nxt_mp_nget(mp, str->length);
238             if (nxt_slow_path(str->start == NULL)) {
239                 goto fail;
240             }
241 
242             n = nxt_file_read(&file, str->start, str->length, 0);
243             if (nxt_slow_path(n != (ssize_t) str->length)) {
244                 goto fail;
245             }
246 
247             nxt_file_close(task, &file);
248 
249             return NXT_OK;
250         }
251 
252         nxt_file_close(task, &file);
253     }
254 
255     return NXT_DECLINED;
256 
257 fail:
258 
259     nxt_file_close(task, &file);
260 
261     return NXT_ERROR;
262 }
263 
264 
265 #if (NXT_TLS)
266 
267 static void
nxt_controller_cert_cleanup(nxt_task_t * task,void * obj,void * data)268 nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data)
269 {
270     pid_t          main_pid;
271     nxt_array_t    *certs;
272     nxt_runtime_t  *rt;
273 
274     certs = obj;
275     rt = data;
276 
277     main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid;
278 
279     if (nxt_pid == main_pid && certs != NULL) {
280         nxt_cert_store_release(certs);
281     }
282 }
283 
284 #endif
285 
286 
287 static nxt_int_t
nxt_controller_start(nxt_task_t * task,nxt_process_data_t * data)288 nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data)
289 {
290     nxt_mp_t               *mp;
291     nxt_int_t              ret;
292     nxt_str_t              *json;
293     nxt_conf_value_t       *conf;
294     nxt_conf_validation_t  vldt;
295     nxt_controller_init_t  *init;
296 
297     ret = nxt_http_fields_hash(&nxt_controller_fields_hash,
298                                nxt_controller_request_fields,
299                                nxt_nitems(nxt_controller_request_fields));
300 
301     if (nxt_slow_path(ret != NXT_OK)) {
302         return NXT_ERROR;
303     }
304 
305     nxt_queue_init(&nxt_controller_waiting_requests);
306 
307     init = &data->controller;
308 
309 #if (NXT_TLS)
310     if (init->certs != NULL) {
311         nxt_cert_info_init(task, init->certs);
312         nxt_cert_store_release(init->certs);
313     }
314 #endif
315 
316     json = &init->conf;
317 
318     if (json->start == NULL) {
319         return NXT_OK;
320     }
321 
322     mp = nxt_mp_create(1024, 128, 256, 32);
323     if (nxt_slow_path(mp == NULL)) {
324         return NXT_ERROR;
325     }
326 
327     conf = nxt_conf_json_parse_str(mp, json);
328     if (nxt_slow_path(conf == NULL)) {
329         nxt_alert(task, "failed to restore previous configuration: "
330                   "file is corrupted or not enough memory");
331 
332         nxt_mp_destroy(mp);
333         return NXT_OK;
334     }
335 
336     nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
337 
338     vldt.pool = nxt_mp_create(1024, 128, 256, 32);
339     if (nxt_slow_path(vldt.pool == NULL)) {
340         nxt_mp_destroy(mp);
341         return NXT_ERROR;
342     }
343 
344     vldt.conf = conf;
345     vldt.conf_pool = mp;
346     vldt.ver = nxt_conf_ver;
347 
348     ret = nxt_conf_validate(&vldt);
349 
350     if (nxt_slow_path(ret != NXT_OK)) {
351 
352         if (ret == NXT_DECLINED) {
353             nxt_alert(task, "the previous configuration is invalid: %V",
354                       &vldt.error);
355 
356             nxt_mp_destroy(vldt.pool);
357             nxt_mp_destroy(mp);
358 
359             return NXT_OK;
360         }
361 
362         /* ret == NXT_ERROR */
363 
364         return NXT_ERROR;
365     }
366 
367     nxt_mp_destroy(vldt.pool);
368 
369     nxt_controller_conf.root = conf;
370     nxt_controller_conf.pool = mp;
371 
372     return NXT_OK;
373 }
374 
375 
376 static void
nxt_controller_process_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)377 nxt_controller_process_new_port_handler(nxt_task_t *task,
378     nxt_port_recv_msg_t *msg)
379 {
380     nxt_port_new_port_handler(task, msg);
381 
382     if (msg->u.new_port->type != NXT_PROCESS_ROUTER
383         || !nxt_controller_router_ready)
384     {
385         return;
386     }
387 
388     nxt_controller_send_current_conf(task);
389 }
390 
391 
392 static void
nxt_controller_send_current_conf(nxt_task_t * task)393 nxt_controller_send_current_conf(nxt_task_t *task)
394 {
395     nxt_int_t         rc;
396     nxt_runtime_t     *rt;
397     nxt_conf_value_t  *conf;
398 
399     conf = nxt_controller_conf.root;
400 
401     if (conf != NULL) {
402         rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf,
403                                       nxt_controller_conf_init_handler, NULL);
404 
405         if (nxt_fast_path(rc == NXT_OK)) {
406             nxt_controller_waiting_init_conf = 1;
407 
408             return;
409         }
410 
411         nxt_mp_destroy(nxt_controller_conf.pool);
412 
413         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
414             nxt_abort();
415         }
416     }
417 
418     if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
419         nxt_abort();
420     }
421 
422     rt = task->thread->runtime;
423 
424     if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
425         nxt_abort();
426     }
427 
428     nxt_controller_listening = 1;
429 
430     nxt_controller_flush_requests(task);
431 }
432 
433 
434 static void
nxt_controller_router_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)435 nxt_controller_router_ready_handler(nxt_task_t *task,
436     nxt_port_recv_msg_t *msg)
437 {
438     nxt_port_t     *router_port;
439     nxt_runtime_t  *rt;
440 
441     rt = task->thread->runtime;
442 
443     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
444 
445     nxt_controller_router_ready = 1;
446 
447     if (router_port != NULL) {
448         nxt_controller_send_current_conf(task);
449     }
450 }
451 
452 
453 static void
nxt_controller_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)454 nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
455 {
456     nxt_pid_t      pid;
457     nxt_process_t  *process;
458     nxt_runtime_t  *rt;
459 
460     rt = task->thread->runtime;
461 
462     nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid));
463 
464     nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid));
465 
466     process = nxt_runtime_process_find(rt, pid);
467     if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) {
468         nxt_controller_router_ready = 0;
469     }
470 
471     nxt_port_remove_pid_handler(task, msg);
472 }
473 
474 
475 static nxt_int_t
nxt_controller_conf_default(void)476 nxt_controller_conf_default(void)
477 {
478     nxt_mp_t          *mp;
479     nxt_conf_value_t  *conf;
480 
481     static const nxt_str_t json
482         = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
483 
484     mp = nxt_mp_create(1024, 128, 256, 32);
485 
486     if (nxt_slow_path(mp == NULL)) {
487         return NXT_ERROR;
488     }
489 
490     conf = nxt_conf_json_parse_str(mp, &json);
491 
492     if (nxt_slow_path(conf == NULL)) {
493         return NXT_ERROR;
494     }
495 
496     nxt_controller_conf.root = conf;
497     nxt_controller_conf.pool = mp;
498 
499     return NXT_OK;
500 }
501 
502 
503 static void
nxt_controller_conf_init_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)504 nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
505     void *data)
506 {
507     nxt_runtime_t  *rt;
508 
509     nxt_controller_waiting_init_conf = 0;
510 
511     if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
512         nxt_alert(task, "failed to apply previous configuration");
513 
514         nxt_mp_destroy(nxt_controller_conf.pool);
515 
516         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
517             nxt_abort();
518         }
519     }
520 
521     if (nxt_controller_listening == 0) {
522         rt = task->thread->runtime;
523 
524         if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
525                           == NULL))
526         {
527             nxt_abort();
528         }
529 
530         nxt_controller_listening = 1;
531     }
532 
533     nxt_controller_flush_requests(task);
534 }
535 
536 
537 static void
nxt_controller_flush_requests(nxt_task_t * task)538 nxt_controller_flush_requests(nxt_task_t *task)
539 {
540     nxt_queue_t               queue;
541     nxt_controller_request_t  *req;
542 
543     nxt_queue_init(&queue);
544     nxt_queue_add(&queue, &nxt_controller_waiting_requests);
545 
546     nxt_queue_init(&nxt_controller_waiting_requests);
547 
548     nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
549         nxt_controller_process_request(task, req);
550     } nxt_queue_loop;
551 }
552 
553 
554 static nxt_int_t
nxt_controller_conf_send(nxt_task_t * task,nxt_mp_t * mp,nxt_conf_value_t * conf,nxt_port_rpc_handler_t handler,void * data)555 nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf,
556     nxt_port_rpc_handler_t handler, void *data)
557 {
558     void           *mem;
559     u_char         *end;
560     size_t         size;
561     uint32_t       stream;
562     nxt_fd_t       fd;
563     nxt_int_t      rc;
564     nxt_buf_t      *b;
565     nxt_port_t     *router_port, *controller_port;
566     nxt_runtime_t  *rt;
567 
568     rt = task->thread->runtime;
569 
570     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
571 
572     nxt_assert(router_port != NULL);
573     nxt_assert(nxt_controller_router_ready);
574 
575     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
576 
577     size = nxt_conf_json_length(conf, NULL);
578 
579     b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0);
580     if (nxt_slow_path(b == NULL)) {
581         return NXT_ERROR;
582     }
583 
584     fd = nxt_shm_open(task, size);
585     if (nxt_slow_path(fd == -1)) {
586         return NXT_ERROR;
587     }
588 
589     mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
590     if (nxt_slow_path(mem == MAP_FAILED)) {
591         goto fail;
592     }
593 
594     end = nxt_conf_json_print(mem, conf, NULL);
595 
596     nxt_mem_munmap(mem, size);
597 
598     size = end - (u_char *) mem;
599 
600     b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
601 
602     stream = nxt_port_rpc_register_handler(task, controller_port,
603                                            handler, handler,
604                                            router_port->pid, data);
605     if (nxt_slow_path(stream == 0)) {
606         goto fail;
607     }
608 
609     rc = nxt_port_socket_write(task, router_port,
610                                NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD,
611                                fd, stream, controller_port->id, b);
612 
613     if (nxt_slow_path(rc != NXT_OK)) {
614         nxt_port_rpc_cancel(task, controller_port, stream);
615 
616         goto fail;
617     }
618 
619     return NXT_OK;
620 
621 fail:
622 
623     nxt_fd_close(fd);
624 
625     return NXT_ERROR;
626 }
627 
628 
629 nxt_int_t
nxt_runtime_controller_socket(nxt_task_t * task,nxt_runtime_t * rt)630 nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
631 {
632     nxt_listen_socket_t  *ls;
633 
634     ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
635     if (ls == NULL) {
636         return NXT_ERROR;
637     }
638 
639     ls->sockaddr = rt->controller_listen;
640 
641     nxt_listen_socket_remote_size(ls);
642 
643     ls->socket = -1;
644     ls->backlog = NXT_LISTEN_BACKLOG;
645     ls->read_after_accept = 1;
646     ls->flags = NXT_NONBLOCK;
647 
648 #if 0
649     /* STUB */
650     wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
651     if (wq == NULL) {
652         return NXT_ERROR;
653     }
654     nxt_work_queue_name(wq, "listen");
655     /**/
656 
657     ls->work_queue = wq;
658 #endif
659     ls->handler = nxt_controller_conn_init;
660 
661     if (nxt_listen_socket_create(task, rt->mem_pool, ls) != NXT_OK) {
662         return NXT_ERROR;
663     }
664 
665     rt->controller_socket = ls;
666 
667     return NXT_OK;
668 }
669 
670 
671 static void
nxt_controller_conn_init(nxt_task_t * task,void * obj,void * data)672 nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
673 {
674     nxt_buf_t                 *b;
675     nxt_conn_t                *c;
676     nxt_event_engine_t        *engine;
677     nxt_controller_request_t  *r;
678 
679     c = obj;
680 
681     nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
682 
683     r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
684     if (nxt_slow_path(r == NULL)) {
685         nxt_controller_conn_free(task, c, NULL);
686         return;
687     }
688 
689     r->conn = c;
690 
691     if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
692                       != NXT_OK))
693     {
694         nxt_controller_conn_free(task, c, NULL);
695         return;
696     }
697 
698     r->parser.encoded_slashes = 1;
699 
700     b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
701     if (nxt_slow_path(b == NULL)) {
702         nxt_controller_conn_free(task, c, NULL);
703         return;
704     }
705 
706     c->read = b;
707     c->socket.data = r;
708     c->socket.read_ready = 1;
709     c->read_state = &nxt_controller_conn_read_state;
710 
711     engine = task->thread->engine;
712     c->read_work_queue = &engine->read_work_queue;
713     c->write_work_queue = &engine->write_work_queue;
714 
715     nxt_conn_read(engine, c);
716 }
717 
718 
719 static const nxt_event_conn_state_t  nxt_controller_conn_read_state
720     nxt_aligned(64) =
721 {
722     .ready_handler = nxt_controller_conn_read,
723     .close_handler = nxt_controller_conn_close,
724     .error_handler = nxt_controller_conn_read_error,
725 
726     .timer_handler = nxt_controller_conn_read_timeout,
727     .timer_value = nxt_controller_conn_timeout_value,
728     .timer_data = 300 * 1000,
729 };
730 
731 
732 static void
nxt_controller_conn_read(nxt_task_t * task,void * obj,void * data)733 nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
734 {
735     size_t                    preread;
736     nxt_buf_t                 *b;
737     nxt_int_t                 rc;
738     nxt_conn_t                *c;
739     nxt_controller_request_t  *r;
740 
741     c = obj;
742     r = data;
743 
744     nxt_debug(task, "controller conn read");
745 
746     nxt_queue_remove(&c->link);
747     nxt_queue_self(&c->link);
748 
749     b = c->read;
750 
751     rc = nxt_http_parse_request(&r->parser, &b->mem);
752 
753     if (nxt_slow_path(rc != NXT_DONE)) {
754 
755         if (rc == NXT_AGAIN) {
756             if (nxt_buf_mem_free_size(&b->mem) == 0) {
757                 nxt_log(task, NXT_LOG_ERR, "too long request headers");
758                 nxt_controller_conn_close(task, c, r);
759                 return;
760             }
761 
762             nxt_conn_read(task->thread->engine, c);
763             return;
764         }
765 
766         /* rc == NXT_ERROR */
767 
768         nxt_log(task, NXT_LOG_ERR, "parsing error");
769 
770         nxt_controller_conn_close(task, c, r);
771         return;
772     }
773 
774     rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
775                                  r);
776 
777     if (nxt_slow_path(rc != NXT_OK)) {
778         nxt_controller_conn_close(task, c, r);
779         return;
780     }
781 
782     preread = nxt_buf_mem_used_size(&b->mem);
783 
784     nxt_debug(task, "controller request header parsing complete, "
785                     "body length: %uz, preread: %uz",
786                     r->length, preread);
787 
788     if (preread >= r->length) {
789         nxt_controller_process_request(task, r);
790         return;
791     }
792 
793     if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
794         b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
795         if (nxt_slow_path(b == NULL)) {
796             nxt_controller_conn_free(task, c, NULL);
797             return;
798         }
799 
800         b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
801 
802         c->read = b;
803     }
804 
805     c->read_state = &nxt_controller_conn_body_read_state;
806 
807     nxt_conn_read(task->thread->engine, c);
808 }
809 
810 
811 static nxt_msec_t
nxt_controller_conn_timeout_value(nxt_conn_t * c,uintptr_t data)812 nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
813 {
814     return (nxt_msec_t) data;
815 }
816 
817 
818 static void
nxt_controller_conn_read_error(nxt_task_t * task,void * obj,void * data)819 nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
820 {
821     nxt_conn_t  *c;
822 
823     c = obj;
824 
825     nxt_debug(task, "controller conn read error");
826 
827     nxt_controller_conn_close(task, c, data);
828 }
829 
830 
831 static void
nxt_controller_conn_read_timeout(nxt_task_t * task,void * obj,void * data)832 nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
833 {
834     nxt_timer_t  *timer;
835     nxt_conn_t   *c;
836 
837     timer = obj;
838 
839     c = nxt_read_timer_conn(timer);
840     c->socket.timedout = 1;
841     c->socket.closed = 1;
842 
843     nxt_debug(task, "controller conn read timeout");
844 
845     nxt_controller_conn_close(task, c, data);
846 }
847 
848 
849 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state
850     nxt_aligned(64) =
851 {
852     .ready_handler = nxt_controller_conn_body_read,
853     .close_handler = nxt_controller_conn_close,
854     .error_handler = nxt_controller_conn_read_error,
855 
856     .timer_handler = nxt_controller_conn_read_timeout,
857     .timer_value = nxt_controller_conn_timeout_value,
858     .timer_data = 60 * 1000,
859     .timer_autoreset = 1,
860 };
861 
862 
863 static void
nxt_controller_conn_body_read(nxt_task_t * task,void * obj,void * data)864 nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
865 {
866     size_t                    read;
867     nxt_buf_t                 *b;
868     nxt_conn_t                *c;
869     nxt_controller_request_t  *r;
870 
871     c = obj;
872     r = data;
873     b = c->read;
874 
875     read = nxt_buf_mem_used_size(&b->mem);
876 
877     nxt_debug(task, "controller conn body read: %uz of %uz",
878               read, r->length);
879 
880     if (read >= r->length) {
881         nxt_controller_process_request(task, r);
882         return;
883     }
884 
885     nxt_conn_read(task->thread->engine, c);
886 }
887 
888 
889 static const nxt_event_conn_state_t  nxt_controller_conn_write_state
890     nxt_aligned(64) =
891 {
892     .ready_handler = nxt_controller_conn_write,
893     .error_handler = nxt_controller_conn_write_error,
894 
895     .timer_handler = nxt_controller_conn_write_timeout,
896     .timer_value = nxt_controller_conn_timeout_value,
897     .timer_data = 60 * 1000,
898     .timer_autoreset = 1,
899 };
900 
901 
902 static void
nxt_controller_conn_write(nxt_task_t * task,void * obj,void * data)903 nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
904 {
905     nxt_buf_t   *b;
906     nxt_conn_t  *c;
907 
908     c = obj;
909 
910     nxt_debug(task, "controller conn write");
911 
912     b = c->write;
913 
914     if (b->mem.pos != b->mem.free) {
915         nxt_conn_write(task->thread->engine, c);
916         return;
917     }
918 
919     nxt_debug(task, "controller conn write complete");
920 
921     nxt_controller_conn_close(task, c, data);
922 }
923 
924 
925 static void
nxt_controller_conn_write_error(nxt_task_t * task,void * obj,void * data)926 nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
927 {
928     nxt_conn_t  *c;
929 
930     c = obj;
931 
932     nxt_debug(task, "controller conn write error");
933 
934     nxt_controller_conn_close(task, c, data);
935 }
936 
937 
938 static void
nxt_controller_conn_write_timeout(nxt_task_t * task,void * obj,void * data)939 nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
940 {
941     nxt_conn_t   *c;
942     nxt_timer_t  *timer;
943 
944     timer = obj;
945 
946     c = nxt_write_timer_conn(timer);
947     c->socket.timedout = 1;
948     c->socket.closed = 1;
949 
950     nxt_debug(task, "controller conn write timeout");
951 
952     nxt_controller_conn_close(task, c, data);
953 }
954 
955 
956 static const nxt_event_conn_state_t  nxt_controller_conn_close_state
957     nxt_aligned(64) =
958 {
959     .ready_handler = nxt_controller_conn_free,
960 };
961 
962 
963 static void
nxt_controller_conn_close(nxt_task_t * task,void * obj,void * data)964 nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
965 {
966     nxt_conn_t  *c;
967 
968     c = obj;
969 
970     nxt_debug(task, "controller conn close");
971 
972     nxt_queue_remove(&c->link);
973 
974     c->write_state = &nxt_controller_conn_close_state;
975 
976     nxt_conn_close(task->thread->engine, c);
977 }
978 
979 
980 static void
nxt_controller_conn_free(nxt_task_t * task,void * obj,void * data)981 nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
982 {
983     nxt_conn_t  *c;
984 
985     c = obj;
986 
987     nxt_debug(task, "controller conn free");
988 
989     nxt_sockaddr_cache_free(task->thread->engine, c);
990 
991     nxt_conn_free(task, c);
992 }
993 
994 
995 static nxt_int_t
nxt_controller_request_content_length(void * ctx,nxt_http_field_t * field,uintptr_t data)996 nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
997     uintptr_t data)
998 {
999     off_t                     length;
1000     nxt_controller_request_t  *r;
1001 
1002     r = ctx;
1003 
1004     length = nxt_off_t_parse(field->value, field->value_length);
1005 
1006     if (nxt_fast_path(length >= 0)) {
1007 
1008         if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
1009             nxt_log_error(NXT_LOG_ERR, &r->conn->log,
1010                           "Content-Length is too big");
1011             return NXT_ERROR;
1012         }
1013 
1014         r->length = length;
1015         return NXT_OK;
1016     }
1017 
1018     nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
1019 
1020     return NXT_ERROR;
1021 }
1022 
1023 
1024 static void
nxt_controller_process_request(nxt_task_t * task,nxt_controller_request_t * req)1025 nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
1026 {
1027     uint32_t                   i, count;
1028     nxt_str_t                  path;
1029     nxt_conn_t                 *c;
1030     nxt_conf_value_t           *value;
1031     nxt_controller_response_t  resp;
1032 #if (NXT_TLS)
1033     nxt_conf_value_t           *certs;
1034 
1035     static nxt_str_t certificates = nxt_string("certificates");
1036 #endif
1037     static nxt_str_t config = nxt_string("config");
1038 
1039     c = req->conn;
1040     path = req->parser.path;
1041 
1042     if (path.length > 1 && path.start[path.length - 1] == '/') {
1043         path.length--;
1044     }
1045 
1046     if (nxt_str_start(&path, "/config", 7)
1047         && (path.length == 7 || path.start[7] == '/'))
1048     {
1049         if (path.length == 7) {
1050             path.length = 1;
1051 
1052         } else {
1053             path.length -= 7;
1054             path.start += 7;
1055         }
1056 
1057         nxt_controller_process_config(task, req, &path);
1058         return;
1059     }
1060 
1061 #if (NXT_TLS)
1062 
1063     if (nxt_str_start(&path, "/certificates", 13)
1064         && (path.length == 13 || path.start[13] == '/'))
1065     {
1066         if (path.length == 13) {
1067             path.length = 1;
1068 
1069         } else {
1070             path.length -= 13;
1071             path.start += 13;
1072         }
1073 
1074         nxt_controller_process_cert(task, req, &path);
1075         return;
1076     }
1077 
1078 #endif
1079 
1080     if (nxt_str_start(&path, "/control/", 9)) {
1081         path.length -= 9;
1082         path.start += 9;
1083 
1084         nxt_controller_process_control(task, req, &path);
1085         return;
1086     }
1087 
1088     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1089 
1090     if (path.length == 1 && path.start[0] == '/') {
1091 
1092         if (!nxt_str_eq(&req->parser.method, "GET", 3)) {
1093             goto invalid_method;
1094         }
1095 
1096         count = 1;
1097 #if (NXT_TLS)
1098         count++;
1099 #endif
1100 
1101         value = nxt_conf_create_object(c->mem_pool, count);
1102         if (nxt_slow_path(value == NULL)) {
1103             goto alloc_fail;
1104         }
1105 
1106         i = 0;
1107 
1108 #if (NXT_TLS)
1109         certs = nxt_cert_info_get_all(c->mem_pool);
1110         if (nxt_slow_path(certs == NULL)) {
1111             goto alloc_fail;
1112         }
1113 
1114         nxt_conf_set_member(value, &certificates, certs, i++);
1115 #endif
1116 
1117         nxt_conf_set_member(value, &config, nxt_controller_conf.root, i);
1118 
1119         resp.status = 200;
1120         resp.conf = value;
1121 
1122         nxt_controller_response(task, req, &resp);
1123         return;
1124     }
1125 
1126     resp.status = 404;
1127     resp.title = (u_char *) "Value doesn't exist.";
1128     resp.offset = -1;
1129 
1130     nxt_controller_response(task, req, &resp);
1131     return;
1132 
1133 invalid_method:
1134 
1135     resp.status = 405;
1136     resp.title = (u_char *) "Invalid method.";
1137     resp.offset = -1;
1138 
1139     nxt_controller_response(task, req, &resp);
1140     return;
1141 
1142 alloc_fail:
1143 
1144     resp.status = 500;
1145     resp.title = (u_char *) "Memory allocation failed.";
1146     resp.offset = -1;
1147 
1148     nxt_controller_response(task, req, &resp);
1149     return;
1150 }
1151 
1152 
1153 static void
nxt_controller_process_config(nxt_task_t * task,nxt_controller_request_t * req,nxt_str_t * path)1154 nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
1155     nxt_str_t *path)
1156 {
1157     nxt_mp_t                   *mp;
1158     nxt_int_t                  rc;
1159     nxt_conn_t                 *c;
1160     nxt_bool_t                 post;
1161     nxt_buf_mem_t              *mbuf;
1162     nxt_conf_op_t              *ops;
1163     nxt_conf_value_t           *value;
1164     nxt_conf_validation_t      vldt;
1165     nxt_conf_json_error_t      error;
1166     nxt_controller_response_t  resp;
1167 
1168     static const nxt_str_t empty_obj = nxt_string("{}");
1169 
1170     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1171 
1172     c = req->conn;
1173 
1174     if (nxt_str_eq(&req->parser.method, "GET", 3)) {
1175 
1176         value = nxt_conf_get_path(nxt_controller_conf.root, path);
1177 
1178         if (value == NULL) {
1179             goto not_found;
1180         }
1181 
1182         resp.status = 200;
1183         resp.conf = value;
1184 
1185         nxt_controller_response(task, req, &resp);
1186         return;
1187     }
1188 
1189     if (nxt_str_eq(&req->parser.method, "POST", 4)) {
1190         if (path->length == 1) {
1191             goto not_allowed;
1192         }
1193 
1194         post = 1;
1195 
1196     } else {
1197         post = 0;
1198     }
1199 
1200     if (post || nxt_str_eq(&req->parser.method, "PUT", 3)) {
1201 
1202         if (nxt_controller_check_postpone_request(task)) {
1203             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
1204             return;
1205         }
1206 
1207         mp = nxt_mp_create(1024, 128, 256, 32);
1208 
1209         if (nxt_slow_path(mp == NULL)) {
1210             goto alloc_fail;
1211         }
1212 
1213         mbuf = &c->read->mem;
1214 
1215         nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
1216 
1217         /* Skip UTF-8 BOM. */
1218         if (nxt_buf_mem_used_size(mbuf) >= 3
1219             && nxt_memcmp(mbuf->pos, "\xEF\xBB\xBF", 3) == 0)
1220         {
1221             mbuf->pos += 3;
1222         }
1223 
1224         value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
1225 
1226         if (value == NULL) {
1227             nxt_mp_destroy(mp);
1228 
1229             if (error.pos == NULL) {
1230                 goto alloc_fail;
1231             }
1232 
1233             resp.status = 400;
1234             resp.title = (u_char *) "Invalid JSON.";
1235             resp.detail.length = nxt_strlen(error.detail);
1236             resp.detail.start = error.detail;
1237             resp.offset = error.pos - mbuf->pos;
1238 
1239             nxt_conf_json_position(mbuf->pos, error.pos,
1240                                    &resp.line, &resp.column);
1241 
1242             nxt_controller_response(task, req, &resp);
1243             return;
1244         }
1245 
1246         if (path->length != 1) {
1247             rc = nxt_conf_op_compile(c->mem_pool, &ops,
1248                                      nxt_controller_conf.root,
1249                                      path, value, post);
1250 
1251             if (rc != NXT_CONF_OP_OK) {
1252                 nxt_mp_destroy(mp);
1253 
1254                 switch (rc) {
1255                 case NXT_CONF_OP_NOT_FOUND:
1256                     goto not_found;
1257 
1258                 case NXT_CONF_OP_NOT_ALLOWED:
1259                     goto not_allowed;
1260                 }
1261 
1262                 /* rc == NXT_CONF_OP_ERROR */
1263                 goto alloc_fail;
1264             }
1265 
1266             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
1267 
1268             if (nxt_slow_path(value == NULL)) {
1269                 nxt_mp_destroy(mp);
1270                 goto alloc_fail;
1271             }
1272         }
1273 
1274         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
1275 
1276         vldt.conf = value;
1277         vldt.pool = c->mem_pool;
1278         vldt.conf_pool = mp;
1279         vldt.ver = NXT_VERNUM;
1280 
1281         rc = nxt_conf_validate(&vldt);
1282 
1283         if (nxt_slow_path(rc != NXT_OK)) {
1284             nxt_mp_destroy(mp);
1285 
1286             if (rc == NXT_DECLINED) {
1287                 resp.detail = vldt.error;
1288                 goto invalid_conf;
1289             }
1290 
1291             /* rc == NXT_ERROR */
1292             goto alloc_fail;
1293         }
1294 
1295         rc = nxt_controller_conf_send(task, mp, value,
1296                                       nxt_controller_conf_handler, req);
1297 
1298         if (nxt_slow_path(rc != NXT_OK)) {
1299             nxt_mp_destroy(mp);
1300 
1301             /* rc == NXT_ERROR */
1302             goto alloc_fail;
1303         }
1304 
1305         req->conf.root = value;
1306         req->conf.pool = mp;
1307 
1308         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1309 
1310         return;
1311     }
1312 
1313     if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
1314 
1315         if (nxt_controller_check_postpone_request(task)) {
1316             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
1317             return;
1318         }
1319 
1320         if (path->length == 1) {
1321             mp = nxt_mp_create(1024, 128, 256, 32);
1322 
1323             if (nxt_slow_path(mp == NULL)) {
1324                 goto alloc_fail;
1325             }
1326 
1327             value = nxt_conf_json_parse_str(mp, &empty_obj);
1328 
1329         } else {
1330             rc = nxt_conf_op_compile(c->mem_pool, &ops,
1331                                      nxt_controller_conf.root,
1332                                      path, NULL, 0);
1333 
1334             if (rc != NXT_OK) {
1335                 if (rc == NXT_CONF_OP_NOT_FOUND) {
1336                     goto not_found;
1337                 }
1338 
1339                 /* rc == NXT_CONF_OP_ERROR */
1340                 goto alloc_fail;
1341             }
1342 
1343             mp = nxt_mp_create(1024, 128, 256, 32);
1344 
1345             if (nxt_slow_path(mp == NULL)) {
1346                 goto alloc_fail;
1347             }
1348 
1349             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
1350         }
1351 
1352         if (nxt_slow_path(value == NULL)) {
1353             nxt_mp_destroy(mp);
1354             goto alloc_fail;
1355         }
1356 
1357         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
1358 
1359         vldt.conf = value;
1360         vldt.pool = c->mem_pool;
1361         vldt.conf_pool = mp;
1362         vldt.ver = NXT_VERNUM;
1363 
1364         rc = nxt_conf_validate(&vldt);
1365 
1366         if (nxt_slow_path(rc != NXT_OK)) {
1367             nxt_mp_destroy(mp);
1368 
1369             if (rc == NXT_DECLINED) {
1370                 resp.detail = vldt.error;
1371                 goto invalid_conf;
1372             }
1373 
1374             /* rc == NXT_ERROR */
1375             goto alloc_fail;
1376         }
1377 
1378         rc = nxt_controller_conf_send(task, mp, value,
1379                                       nxt_controller_conf_handler, req);
1380 
1381         if (nxt_slow_path(rc != NXT_OK)) {
1382             nxt_mp_destroy(mp);
1383 
1384             /* rc == NXT_ERROR */
1385             goto alloc_fail;
1386         }
1387 
1388         req->conf.root = value;
1389         req->conf.pool = mp;
1390 
1391         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1392 
1393         return;
1394     }
1395 
1396 not_allowed:
1397 
1398     resp.status = 405;
1399     resp.title = (u_char *) "Method isn't allowed.";
1400     resp.offset = -1;
1401 
1402     nxt_controller_response(task, req, &resp);
1403     return;
1404 
1405 not_found:
1406 
1407     resp.status = 404;
1408     resp.title = (u_char *) "Value doesn't exist.";
1409     resp.offset = -1;
1410 
1411     nxt_controller_response(task, req, &resp);
1412     return;
1413 
1414 invalid_conf:
1415 
1416     resp.status = 400;
1417     resp.title = (u_char *) "Invalid configuration.";
1418     resp.offset = -1;
1419 
1420     nxt_controller_response(task, req, &resp);
1421     return;
1422 
1423 alloc_fail:
1424 
1425     resp.status = 500;
1426     resp.title = (u_char *) "Memory allocation failed.";
1427     resp.offset = -1;
1428 
1429     nxt_controller_response(task, req, &resp);
1430 }
1431 
1432 
1433 static nxt_bool_t
nxt_controller_check_postpone_request(nxt_task_t * task)1434 nxt_controller_check_postpone_request(nxt_task_t *task)
1435 {
1436     nxt_port_t     *router_port;
1437     nxt_runtime_t  *rt;
1438 
1439     if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)
1440         || nxt_controller_waiting_init_conf
1441         || !nxt_controller_router_ready)
1442     {
1443         return 1;
1444     }
1445 
1446     rt = task->thread->runtime;
1447 
1448     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1449 
1450     return (router_port == NULL);
1451 }
1452 
1453 
1454 #if (NXT_TLS)
1455 
1456 static void
nxt_controller_process_cert(nxt_task_t * task,nxt_controller_request_t * req,nxt_str_t * path)1457 nxt_controller_process_cert(nxt_task_t *task,
1458     nxt_controller_request_t *req, nxt_str_t *path)
1459 {
1460     u_char                     *p;
1461     nxt_str_t                  name;
1462     nxt_int_t                  ret;
1463     nxt_conn_t                 *c;
1464     nxt_cert_t                 *cert;
1465     nxt_conf_value_t           *value;
1466     nxt_controller_response_t  resp;
1467 
1468     name.length = path->length - 1;
1469     name.start = path->start + 1;
1470 
1471     p = nxt_memchr(name.start, '/', name.length);
1472 
1473     if (p != NULL) {
1474         name.length = p - name.start;
1475 
1476         path->length -= p - path->start;
1477         path->start = p;
1478 
1479     } else {
1480         path = NULL;
1481     }
1482 
1483     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1484 
1485     c = req->conn;
1486 
1487     if (nxt_str_eq(&req->parser.method, "GET", 3)) {
1488 
1489         if (name.length != 0) {
1490             value = nxt_cert_info_get(&name);
1491             if (value == NULL) {
1492                 goto cert_not_found;
1493             }
1494 
1495             if (path != NULL) {
1496                 value = nxt_conf_get_path(value, path);
1497                 if (value == NULL) {
1498                     goto not_found;
1499                 }
1500             }
1501 
1502         } else {
1503             value = nxt_cert_info_get_all(c->mem_pool);
1504             if (value == NULL) {
1505                 goto alloc_fail;
1506             }
1507         }
1508 
1509         resp.status = 200;
1510         resp.conf = value;
1511 
1512         nxt_controller_response(task, req, &resp);
1513         return;
1514     }
1515 
1516     if (name.length == 0 || path != NULL) {
1517         goto invalid_name;
1518     }
1519 
1520     if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
1521         value = nxt_cert_info_get(&name);
1522         if (value != NULL) {
1523             goto exists_cert;
1524         }
1525 
1526         cert = nxt_cert_mem(task, &c->read->mem);
1527         if (cert == NULL) {
1528             goto invalid_cert;
1529         }
1530 
1531         ret = nxt_cert_info_save(&name, cert);
1532 
1533         nxt_cert_destroy(cert);
1534 
1535         if (nxt_slow_path(ret != NXT_OK)) {
1536             goto alloc_fail;
1537         }
1538 
1539         nxt_cert_store_get(task, &name, c->mem_pool,
1540                            nxt_controller_process_cert_save, req);
1541         return;
1542     }
1543 
1544     if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
1545 
1546         if (nxt_controller_cert_in_use(&name)) {
1547             goto cert_in_use;
1548         }
1549 
1550         if (nxt_cert_info_delete(&name) != NXT_OK) {
1551             goto cert_not_found;
1552         }
1553 
1554         nxt_cert_store_delete(task, &name, c->mem_pool);
1555 
1556         resp.status = 200;
1557         resp.title = (u_char *) "Certificate deleted.";
1558 
1559         nxt_controller_response(task, req, &resp);
1560         return;
1561     }
1562 
1563     resp.status = 405;
1564     resp.title = (u_char *) "Invalid method.";
1565     resp.offset = -1;
1566 
1567     nxt_controller_response(task, req, &resp);
1568     return;
1569 
1570 invalid_name:
1571 
1572     resp.status = 400;
1573     resp.title = (u_char *) "Invalid certificate name.";
1574     resp.offset = -1;
1575 
1576     nxt_controller_response(task, req, &resp);
1577     return;
1578 
1579 invalid_cert:
1580 
1581     resp.status = 400;
1582     resp.title = (u_char *) "Invalid certificate.";
1583     resp.offset = -1;
1584 
1585     nxt_controller_response(task, req, &resp);
1586     return;
1587 
1588 exists_cert:
1589 
1590     resp.status = 400;
1591     resp.title = (u_char *) "Certificate already exists.";
1592     resp.offset = -1;
1593 
1594     nxt_controller_response(task, req, &resp);
1595     return;
1596 
1597 cert_in_use:
1598 
1599     resp.status = 400;
1600     resp.title = (u_char *) "Certificate is used in the configuration.";
1601     resp.offset = -1;
1602 
1603     nxt_controller_response(task, req, &resp);
1604     return;
1605 
1606 cert_not_found:
1607 
1608     resp.status = 404;
1609     resp.title = (u_char *) "Certificate doesn't exist.";
1610     resp.offset = -1;
1611 
1612     nxt_controller_response(task, req, &resp);
1613     return;
1614 
1615 not_found:
1616 
1617     resp.status = 404;
1618     resp.title = (u_char *) "Invalid path.";
1619     resp.offset = -1;
1620 
1621     nxt_controller_response(task, req, &resp);
1622     return;
1623 
1624 alloc_fail:
1625 
1626     resp.status = 500;
1627     resp.title = (u_char *) "Memory allocation failed.";
1628     resp.offset = -1;
1629 
1630     nxt_controller_response(task, req, &resp);
1631     return;
1632 }
1633 
1634 
1635 static void
nxt_controller_process_cert_save(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)1636 nxt_controller_process_cert_save(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1637     void *data)
1638 {
1639     nxt_conn_t                *c;
1640     nxt_buf_mem_t             *mbuf;
1641     nxt_controller_request_t  *req;
1642     nxt_controller_response_t  resp;
1643 
1644     req = data;
1645 
1646     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1647 
1648     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
1649         resp.status = 500;
1650         resp.title = (u_char *) "Failed to store certificate.";
1651 
1652         nxt_controller_response(task, req, &resp);
1653         return;
1654     }
1655 
1656     c = req->conn;
1657 
1658     mbuf = &c->read->mem;
1659 
1660     nxt_fd_write(msg->fd[0], mbuf->pos, nxt_buf_mem_used_size(mbuf));
1661 
1662     nxt_fd_close(msg->fd[0]);
1663 
1664     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1665 
1666     resp.status = 200;
1667     resp.title = (u_char *) "Certificate chain uploaded.";
1668 
1669     nxt_controller_response(task, req, &resp);
1670 }
1671 
1672 
1673 static nxt_bool_t
nxt_controller_cert_in_use(nxt_str_t * name)1674 nxt_controller_cert_in_use(nxt_str_t *name)
1675 {
1676     uint32_t          next;
1677     nxt_str_t         str;
1678     nxt_conf_value_t  *listeners, *listener, *value;
1679 
1680     static nxt_str_t  listeners_path = nxt_string("/listeners");
1681     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1682 
1683     listeners = nxt_conf_get_path(nxt_controller_conf.root, &listeners_path);
1684 
1685     if (listeners != NULL) {
1686         next = 0;
1687 
1688         for ( ;; ) {
1689             listener = nxt_conf_next_object_member(listeners, &str, &next);
1690             if (listener == NULL) {
1691                 break;
1692             }
1693 
1694             value = nxt_conf_get_path(listener, &certificate_path);
1695             if (value == NULL) {
1696                 continue;
1697             }
1698 
1699             nxt_conf_get_string(value, &str);
1700 
1701             if (nxt_strstr_eq(&str, name)) {
1702                 return 1;
1703             }
1704         }
1705     }
1706 
1707     return 0;
1708 }
1709 
1710 #endif
1711 
1712 
1713 static void
nxt_controller_conf_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)1714 nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1715     void *data)
1716 {
1717     nxt_controller_request_t   *req;
1718     nxt_controller_response_t  resp;
1719 
1720     req = data;
1721 
1722     nxt_debug(task, "controller conf ready: %*s",
1723               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1724 
1725     nxt_queue_remove(&req->link);
1726 
1727     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1728 
1729     if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1730         nxt_mp_destroy(nxt_controller_conf.pool);
1731 
1732         nxt_controller_conf = req->conf;
1733 
1734         nxt_controller_conf_store(task, req->conf.root);
1735 
1736         resp.status = 200;
1737         resp.title = (u_char *) "Reconfiguration done.";
1738 
1739     } else {
1740         nxt_mp_destroy(req->conf.pool);
1741 
1742         resp.status = 500;
1743         resp.title = (u_char *) "Failed to apply new configuration.";
1744         resp.offset = -1;
1745     }
1746 
1747     nxt_controller_response(task, req, &resp);
1748 
1749     nxt_controller_flush_requests(task);
1750 }
1751 
1752 
1753 static void
nxt_controller_process_control(nxt_task_t * task,nxt_controller_request_t * req,nxt_str_t * path)1754 nxt_controller_process_control(nxt_task_t *task,
1755     nxt_controller_request_t *req, nxt_str_t *path)
1756 {
1757     uint32_t                   stream;
1758     nxt_buf_t                  *b;
1759     nxt_int_t                  rc;
1760     nxt_port_t                 *router_port, *controller_port;
1761     nxt_runtime_t              *rt;
1762     nxt_conf_value_t           *value;
1763     nxt_controller_response_t  resp;
1764 
1765     static nxt_str_t applications = nxt_string("applications");
1766 
1767     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1768 
1769     if (!nxt_str_eq(&req->parser.method, "GET", 3)) {
1770         goto not_allowed;
1771     }
1772 
1773     if (!nxt_str_start(path, "applications/", 13)
1774         || nxt_memcmp(path->start + path->length - 8, "/restart", 8) != 0)
1775     {
1776         goto not_found;
1777     }
1778 
1779     path->start += 13;
1780     path->length -= 13 + 8;
1781 
1782     if (nxt_controller_check_postpone_request(task)) {
1783         nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
1784         return;
1785     }
1786 
1787     value = nxt_controller_conf.root;
1788     if (value == NULL) {
1789         goto not_found;
1790     }
1791 
1792     value = nxt_conf_get_object_member(value, &applications, NULL);
1793     if (value == NULL) {
1794         goto not_found;
1795     }
1796 
1797     value = nxt_conf_get_object_member(value, path, NULL);
1798     if (value == NULL) {
1799         goto not_found;
1800     }
1801 
1802     b = nxt_buf_mem_alloc(req->conn->mem_pool, path->length, 0);
1803     if (nxt_slow_path(b == NULL)) {
1804         goto alloc_fail;
1805     }
1806 
1807     b->mem.free = nxt_cpymem(b->mem.pos, path->start, path->length);
1808 
1809     rt = task->thread->runtime;
1810 
1811     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1812     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1813 
1814     stream = nxt_port_rpc_register_handler(task, controller_port,
1815                                            nxt_controller_app_restart_handler,
1816                                            nxt_controller_app_restart_handler,
1817                                            router_port->pid, req);
1818     if (nxt_slow_path(stream == 0)) {
1819         goto alloc_fail;
1820     }
1821 
1822     rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_APP_RESTART,
1823                                -1, stream, 0, b);
1824     if (nxt_slow_path(rc != NXT_OK)) {
1825         nxt_port_rpc_cancel(task, controller_port, stream);
1826 
1827         goto fail;
1828     }
1829 
1830     nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1831 
1832     return;
1833 
1834 not_allowed:
1835 
1836     resp.status = 405;
1837     resp.title = (u_char *) "Method isn't allowed.";
1838     resp.offset = -1;
1839 
1840     nxt_controller_response(task, req, &resp);
1841     return;
1842 
1843 not_found:
1844 
1845     resp.status = 404;
1846     resp.title = (u_char *) "Value doesn't exist.";
1847     resp.offset = -1;
1848 
1849     nxt_controller_response(task, req, &resp);
1850     return;
1851 
1852 alloc_fail:
1853 
1854     resp.status = 500;
1855     resp.title = (u_char *) "Memory allocation failed.";
1856     resp.offset = -1;
1857 
1858     nxt_controller_response(task, req, &resp);
1859     return;
1860 
1861 fail:
1862 
1863     resp.status = 500;
1864     resp.title = (u_char *) "Send restart failed.";
1865     resp.offset = -1;
1866 
1867     nxt_controller_response(task, req, &resp);
1868 }
1869 
1870 
1871 static void
nxt_controller_app_restart_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg,void * data)1872 nxt_controller_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1873     void *data)
1874 {
1875     nxt_controller_request_t   *req;
1876     nxt_controller_response_t  resp;
1877 
1878     req = data;
1879 
1880     nxt_debug(task, "controller app restart handler");
1881 
1882     nxt_queue_remove(&req->link);
1883 
1884     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1885 
1886     if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1887         resp.status = 200;
1888         resp.title = (u_char *) "Ok";
1889 
1890     } else {
1891         resp.status = 500;
1892         resp.title = (u_char *) "Failed to restart app.";
1893         resp.offset = -1;
1894     }
1895 
1896     nxt_controller_response(task, req, &resp);
1897 
1898     nxt_controller_flush_requests(task);
1899 }
1900 
1901 
1902 static void
nxt_controller_conf_store(nxt_task_t * task,nxt_conf_value_t * conf)1903 nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1904 {
1905     void           *mem;
1906     u_char         *end;
1907     size_t         size;
1908     nxt_fd_t       fd;
1909     nxt_buf_t      *b;
1910     nxt_port_t     *main_port;
1911     nxt_runtime_t  *rt;
1912 
1913     rt = task->thread->runtime;
1914 
1915     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1916 
1917     size = nxt_conf_json_length(conf, NULL);
1918 
1919     fd = nxt_shm_open(task, size);
1920     if (nxt_slow_path(fd == -1)) {
1921         return;
1922     }
1923 
1924     mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1925     if (nxt_slow_path(mem == MAP_FAILED)) {
1926         goto fail;
1927     }
1928 
1929     end = nxt_conf_json_print(mem, conf, NULL);
1930 
1931     nxt_mem_munmap(mem, size);
1932 
1933     size = end - (u_char *) mem;
1934 
1935     b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, sizeof(size_t), 0);
1936     if (nxt_slow_path(b == NULL)) {
1937         goto fail;
1938     }
1939 
1940     b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
1941 
1942     (void) nxt_port_socket_write(task, main_port,
1943                                 NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_CLOSE_FD,
1944                                  fd, 0, -1, b);
1945 
1946     return;
1947 
1948 fail:
1949 
1950     nxt_fd_close(fd);
1951 }
1952 
1953 
1954 static void
nxt_controller_response(nxt_task_t * task,nxt_controller_request_t * req,nxt_controller_response_t * resp)1955 nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1956     nxt_controller_response_t *resp)
1957 {
1958     size_t                  size;
1959     nxt_str_t               status_line, str;
1960     nxt_buf_t               *b, *body;
1961     nxt_conn_t              *c;
1962     nxt_uint_t              n;
1963     nxt_conf_value_t        *value, *location;
1964     nxt_conf_json_pretty_t  pretty;
1965 
1966     static nxt_str_t  success_str = nxt_string("success");
1967     static nxt_str_t  error_str = nxt_string("error");
1968     static nxt_str_t  detail_str = nxt_string("detail");
1969     static nxt_str_t  location_str = nxt_string("location");
1970     static nxt_str_t  offset_str = nxt_string("offset");
1971     static nxt_str_t  line_str = nxt_string("line");
1972     static nxt_str_t  column_str = nxt_string("column");
1973 
1974     static nxt_time_string_t  date_cache = {
1975         (nxt_atomic_uint_t) -1,
1976         nxt_controller_date,
1977         "%s, %02d %s %4d %02d:%02d:%02d GMT",
1978         nxt_length("Wed, 31 Dec 1986 16:40:00 GMT"),
1979         NXT_THREAD_TIME_GMT,
1980         NXT_THREAD_TIME_SEC,
1981     };
1982 
1983     switch (resp->status) {
1984 
1985     case 200:
1986         nxt_str_set(&status_line, "200 OK");
1987         break;
1988 
1989     case 400:
1990         nxt_str_set(&status_line, "400 Bad Request");
1991         break;
1992 
1993     case 404:
1994         nxt_str_set(&status_line, "404 Not Found");
1995         break;
1996 
1997     case 405:
1998         nxt_str_set(&status_line, "405 Method Not Allowed");
1999         break;
2000 
2001     default:
2002         nxt_str_set(&status_line, "500 Internal Server Error");
2003         break;
2004     }
2005 
2006     c = req->conn;
2007     value = resp->conf;
2008 
2009     if (value == NULL) {
2010         n = 1
2011             + (resp->detail.length != 0)
2012             + (resp->status >= 400 && resp->offset != -1);
2013 
2014         value = nxt_conf_create_object(c->mem_pool, n);
2015 
2016         if (nxt_slow_path(value == NULL)) {
2017             nxt_controller_conn_close(task, c, req);
2018             return;
2019         }
2020 
2021         str.length = nxt_strlen(resp->title);
2022         str.start = resp->title;
2023 
2024         if (resp->status < 400) {
2025             nxt_conf_set_member_string(value, &success_str, &str, 0);
2026 
2027         } else {
2028             nxt_conf_set_member_string(value, &error_str, &str, 0);
2029         }
2030 
2031         n = 0;
2032 
2033         if (resp->detail.length != 0) {
2034             n++;
2035 
2036             nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
2037         }
2038 
2039         if (resp->status >= 400 && resp->offset != -1) {
2040             n++;
2041 
2042             location = nxt_conf_create_object(c->mem_pool,
2043                                               resp->line != 0 ? 3 : 1);
2044 
2045             nxt_conf_set_member(value, &location_str, location, n);
2046 
2047             nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
2048 
2049             if (resp->line != 0) {
2050                 nxt_conf_set_member_integer(location, &line_str,
2051                                             resp->line, 1);
2052 
2053                 nxt_conf_set_member_integer(location, &column_str,
2054                                             resp->column, 2);
2055             }
2056         }
2057     }
2058 
2059     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
2060 
2061     size = nxt_conf_json_length(value, &pretty) + 2;
2062 
2063     body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2064     if (nxt_slow_path(body == NULL)) {
2065         nxt_controller_conn_close(task, c, req);
2066         return;
2067     }
2068 
2069     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
2070 
2071     body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
2072 
2073     body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
2074 
2075     size = nxt_length("HTTP/1.1 " "\r\n") + status_line.length
2076            + nxt_length("Server: " NXT_SERVER "\r\n")
2077            + nxt_length("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n")
2078            + nxt_length("Content-Type: application/json\r\n")
2079            + nxt_length("Content-Length: " "\r\n") + NXT_SIZE_T_LEN
2080            + nxt_length("Connection: close\r\n")
2081            + nxt_length("\r\n");
2082 
2083     b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2084     if (nxt_slow_path(b == NULL)) {
2085         nxt_controller_conn_close(task, c, req);
2086         return;
2087     }
2088 
2089     b->next = body;
2090 
2091     nxt_str_set(&str, "HTTP/1.1 ");
2092 
2093     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2094     b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
2095                              status_line.length);
2096 
2097     nxt_str_set(&str, "\r\n"
2098                       "Server: " NXT_SERVER "\r\n"
2099                       "Date: ");
2100 
2101     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2102 
2103     b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
2104                                          b->mem.free);
2105 
2106     nxt_str_set(&str, "\r\n"
2107                       "Content-Type: application/json\r\n"
2108                       "Content-Length: ");
2109 
2110     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2111 
2112     b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
2113                               nxt_buf_mem_used_size(&body->mem));
2114 
2115     nxt_str_set(&str, "\r\n"
2116                       "Connection: close\r\n"
2117                       "\r\n");
2118 
2119     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2120 
2121     c->write = b;
2122     c->write_state = &nxt_controller_conn_write_state;
2123 
2124     nxt_conn_write(task->thread->engine, c);
2125 }
2126 
2127 
2128 static u_char *
nxt_controller_date(u_char * buf,nxt_realtime_t * now,struct tm * tm,size_t size,const char * format)2129 nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
2130     size_t size, const char *format)
2131 {
2132     static const char  *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
2133                                    "Sat" };
2134 
2135     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
2136                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
2137 
2138     return nxt_sprintf(buf, buf + size, format,
2139                        week[tm->tm_wday], tm->tm_mday,
2140                        month[tm->tm_mon], tm->tm_year + 1900,
2141                        tm->tm_hour, tm->tm_min, tm->tm_sec);
2142 }
2143