xref: /unit/src/nxt_controller.c (revision 1926:6e85d6c0b8bb)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 #include <nxt_cert.h>
13 
14 
15 typedef struct {
16     nxt_conf_value_t  *root;
17     nxt_mp_t          *pool;
18 } nxt_controller_conf_t;
19 
20 
21 typedef struct {
22     nxt_http_request_parse_t  parser;
23     size_t                    length;
24     nxt_controller_conf_t     conf;
25     nxt_conn_t                *conn;
26     nxt_queue_link_t          link;
27 } nxt_controller_request_t;
28 
29 
30 typedef struct {
31     nxt_uint_t        status;
32     nxt_conf_value_t  *conf;
33 
34     u_char            *title;
35     nxt_str_t         detail;
36     ssize_t           offset;
37     nxt_uint_t        line;
38     nxt_uint_t        column;
39 } nxt_controller_response_t;
40 
41 
42 static nxt_int_t nxt_controller_prefork(nxt_task_t *task,
43     nxt_process_t *process, nxt_mp_t *mp);
44 static nxt_int_t nxt_controller_start(nxt_task_t *task,
45     nxt_process_data_t *data);
46 static void nxt_controller_process_new_port_handler(nxt_task_t *task,
47     nxt_port_recv_msg_t *msg);
48 static void nxt_controller_send_current_conf(nxt_task_t *task);
49 static void nxt_controller_router_ready_handler(nxt_task_t *task,
50     nxt_port_recv_msg_t *msg);
51 static void nxt_controller_remove_pid_handler(nxt_task_t *task,
52     nxt_port_recv_msg_t *msg);
53 static nxt_int_t nxt_controller_conf_default(void);
54 static void nxt_controller_conf_init_handler(nxt_task_t *task,
55     nxt_port_recv_msg_t *msg, void *data);
56 static void nxt_controller_flush_requests(nxt_task_t *task);
57 static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp,
58     nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
59 
60 static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
61 static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
62 static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
63     uintptr_t data);
64 static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
65     void *data);
66 static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
67     void *data);
68 static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
69     void *data);
70 static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
71 static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
72     void *data);
73 static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
74     void *data);
75 static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
76 static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
77 
78 static nxt_int_t nxt_controller_request_content_length(void *ctx,
79     nxt_http_field_t *field, uintptr_t data);
80 
81 static void nxt_controller_process_request(nxt_task_t *task,
82     nxt_controller_request_t *req);
83 static void nxt_controller_process_config(nxt_task_t *task,
84     nxt_controller_request_t *req, nxt_str_t *path);
85 static nxt_bool_t nxt_controller_check_postpone_request(nxt_task_t *task);
86 #if (NXT_TLS)
87 static void nxt_controller_process_cert(nxt_task_t *task,
88     nxt_controller_request_t *req, nxt_str_t *path);
89 static void nxt_controller_process_cert_save(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg, void *data);
91 static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name);
92 static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj,
93     void *data);
94 #endif
95 static void nxt_controller_process_control(nxt_task_t *task,
96     nxt_controller_request_t *req, nxt_str_t *path);
97 static void nxt_controller_app_restart_handler(nxt_task_t *task,
98     nxt_port_recv_msg_t *msg, void *data);
99 static void nxt_controller_conf_handler(nxt_task_t *task,
100     nxt_port_recv_msg_t *msg, void *data);
101 static void nxt_controller_conf_store(nxt_task_t *task,
102     nxt_conf_value_t *conf);
103 static void nxt_controller_response(nxt_task_t *task,
104     nxt_controller_request_t *req, nxt_controller_response_t *resp);
105 static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
106     struct tm *tm, size_t size, const char *format);
107 
108 
109 static nxt_http_field_proc_t  nxt_controller_request_fields[] = {
110     { nxt_string("Content-Length"),
111       &nxt_controller_request_content_length, 0 },
112 };
113 
114 static nxt_lvlhsh_t            nxt_controller_fields_hash;
115 
116 static nxt_uint_t              nxt_controller_listening;
117 static nxt_uint_t              nxt_controller_router_ready;
118 static nxt_controller_conf_t   nxt_controller_conf;
119 static nxt_queue_t             nxt_controller_waiting_requests;
120 static nxt_bool_t              nxt_controller_waiting_init_conf;
121 
122 
123 static const nxt_event_conn_state_t  nxt_controller_conn_read_state;
124 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state;
125 static const nxt_event_conn_state_t  nxt_controller_conn_write_state;
126 static const nxt_event_conn_state_t  nxt_controller_conn_close_state;
127 
128 
129 static const nxt_port_handlers_t  nxt_controller_process_port_handlers = {
130     .quit           = nxt_signal_quit_handler,
131     .new_port       = nxt_controller_process_new_port_handler,
132     .change_file    = nxt_port_change_log_file_handler,
133     .mmap           = nxt_port_mmap_handler,
134     .process_ready  = nxt_controller_router_ready_handler,
135     .data           = nxt_port_data_handler,
136     .remove_pid     = nxt_controller_remove_pid_handler,
137     .rpc_ready      = nxt_port_rpc_handler,
138     .rpc_error      = nxt_port_rpc_handler,
139 };
140 
141 
142 const nxt_process_init_t  nxt_controller_process = {
143     .name           = "controller",
144     .type           = NXT_PROCESS_CONTROLLER,
145     .prefork        = nxt_controller_prefork,
146     .restart        = 1,
147     .setup          = nxt_process_core_setup,
148     .start          = nxt_controller_start,
149     .port_handlers  = &nxt_controller_process_port_handlers,
150     .signals        = nxt_process_signals,
151 };
152 
153 
154 static nxt_int_t
155 nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
156 {
157     ssize_t                n;
158     nxt_int_t              ret;
159     nxt_str_t              *conf;
160     nxt_file_t             file;
161     nxt_runtime_t          *rt;
162     nxt_file_info_t        fi;
163     nxt_controller_init_t  ctrl_init;
164 
165     nxt_log(task, NXT_LOG_INFO, "controller started");
166 
167     rt = task->thread->runtime;
168 
169     nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t));
170 
171     conf = &ctrl_init.conf;
172 
173     nxt_memzero(&file, sizeof(nxt_file_t));
174 
175     file.name = (nxt_file_name_t *) rt->conf;
176 
177     ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
178 
179     if (ret == NXT_OK) {
180         ret = nxt_file_info(&file, &fi);
181 
182         if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) {
183             conf->length = nxt_file_size(&fi);
184             conf->start = nxt_mp_alloc(mp, conf->length);
185             if (nxt_slow_path(conf->start == NULL)) {
186                 nxt_file_close(task, &file);
187                 return NXT_ERROR;
188             }
189 
190             n = nxt_file_read(&file, conf->start, conf->length, 0);
191 
192             if (nxt_slow_path(n != (ssize_t) conf->length)) {
193                 conf->start = NULL;
194                 conf->length = 0;
195 
196                 nxt_alert(task, "failed to restore previous configuration: "
197                           "cannot read the file");
198             }
199         }
200 
201         nxt_file_close(task, &file);
202     }
203 
204 #if (NXT_TLS)
205     ctrl_init.certs = nxt_cert_store_load(task, mp);
206 
207     nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt);
208 #endif
209 
210     process->data.controller = ctrl_init;
211 
212     return NXT_OK;
213 }
214 
215 
216 #if (NXT_TLS)
217 
218 static void
219 nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data)
220 {
221     pid_t          main_pid;
222     nxt_array_t    *certs;
223     nxt_runtime_t  *rt;
224 
225     certs = obj;
226     rt = data;
227 
228     main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid;
229 
230     if (nxt_pid == main_pid && certs != NULL) {
231         nxt_cert_store_release(certs);
232     }
233 }
234 
235 #endif
236 
237 
238 static nxt_int_t
239 nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data)
240 {
241     nxt_mp_t               *mp;
242     nxt_int_t              ret;
243     nxt_str_t              *json;
244     nxt_conf_value_t       *conf;
245     nxt_conf_validation_t  vldt;
246     nxt_controller_init_t  *init;
247 
248     ret = nxt_http_fields_hash(&nxt_controller_fields_hash,
249                                nxt_controller_request_fields,
250                                nxt_nitems(nxt_controller_request_fields));
251 
252     if (nxt_slow_path(ret != NXT_OK)) {
253         return NXT_ERROR;
254     }
255 
256     nxt_queue_init(&nxt_controller_waiting_requests);
257 
258     init = &data->controller;
259 
260 #if (NXT_TLS)
261     if (init->certs != NULL) {
262         nxt_cert_info_init(task, init->certs);
263         nxt_cert_store_release(init->certs);
264     }
265 #endif
266 
267     json = &init->conf;
268 
269     if (json->start == NULL) {
270         return NXT_OK;
271     }
272 
273     mp = nxt_mp_create(1024, 128, 256, 32);
274     if (nxt_slow_path(mp == NULL)) {
275         return NXT_ERROR;
276     }
277 
278     conf = nxt_conf_json_parse_str(mp, json);
279     if (nxt_slow_path(conf == NULL)) {
280         nxt_alert(task, "failed to restore previous configuration: "
281                   "file is corrupted or not enough memory");
282 
283         nxt_mp_destroy(mp);
284         return NXT_OK;
285     }
286 
287     nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
288 
289     vldt.pool = nxt_mp_create(1024, 128, 256, 32);
290     if (nxt_slow_path(vldt.pool == NULL)) {
291         nxt_mp_destroy(mp);
292         return NXT_ERROR;
293     }
294 
295     vldt.conf = conf;
296 
297     ret = nxt_conf_validate(&vldt);
298 
299     if (nxt_slow_path(ret != NXT_OK)) {
300 
301         if (ret == NXT_DECLINED) {
302             nxt_alert(task, "the previous configuration is invalid: %V",
303                       &vldt.error);
304 
305             nxt_mp_destroy(vldt.pool);
306             nxt_mp_destroy(mp);
307 
308             return NXT_OK;
309         }
310 
311         /* ret == NXT_ERROR */
312 
313         return NXT_ERROR;
314     }
315 
316     nxt_mp_destroy(vldt.pool);
317 
318     nxt_controller_conf.root = conf;
319     nxt_controller_conf.pool = mp;
320 
321     return NXT_OK;
322 }
323 
324 
325 static void
326 nxt_controller_process_new_port_handler(nxt_task_t *task,
327     nxt_port_recv_msg_t *msg)
328 {
329     nxt_port_new_port_handler(task, msg);
330 
331     if (msg->u.new_port->type != NXT_PROCESS_ROUTER
332         || !nxt_controller_router_ready)
333     {
334         return;
335     }
336 
337     nxt_controller_send_current_conf(task);
338 }
339 
340 
341 static void
342 nxt_controller_send_current_conf(nxt_task_t *task)
343 {
344     nxt_int_t         rc;
345     nxt_runtime_t     *rt;
346     nxt_conf_value_t  *conf;
347 
348     conf = nxt_controller_conf.root;
349 
350     if (conf != NULL) {
351         rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf,
352                                       nxt_controller_conf_init_handler, NULL);
353 
354         if (nxt_fast_path(rc == NXT_OK)) {
355             nxt_controller_waiting_init_conf = 1;
356 
357             return;
358         }
359 
360         nxt_mp_destroy(nxt_controller_conf.pool);
361 
362         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
363             nxt_abort();
364         }
365     }
366 
367     if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
368         nxt_abort();
369     }
370 
371     rt = task->thread->runtime;
372 
373     if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
374         nxt_abort();
375     }
376 
377     nxt_controller_listening = 1;
378 
379     nxt_controller_flush_requests(task);
380 }
381 
382 
383 static void
384 nxt_controller_router_ready_handler(nxt_task_t *task,
385     nxt_port_recv_msg_t *msg)
386 {
387     nxt_port_t     *router_port;
388     nxt_runtime_t  *rt;
389 
390     rt = task->thread->runtime;
391 
392     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
393 
394     nxt_controller_router_ready = 1;
395 
396     if (router_port != NULL) {
397         nxt_controller_send_current_conf(task);
398     }
399 }
400 
401 
402 static void
403 nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
404 {
405     nxt_pid_t      pid;
406     nxt_process_t  *process;
407     nxt_runtime_t  *rt;
408 
409     rt = task->thread->runtime;
410 
411     nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid));
412 
413     nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid));
414 
415     process = nxt_runtime_process_find(rt, pid);
416     if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) {
417         nxt_controller_router_ready = 0;
418     }
419 
420     nxt_port_remove_pid_handler(task, msg);
421 }
422 
423 
424 static nxt_int_t
425 nxt_controller_conf_default(void)
426 {
427     nxt_mp_t          *mp;
428     nxt_conf_value_t  *conf;
429 
430     static const nxt_str_t json
431         = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
432 
433     mp = nxt_mp_create(1024, 128, 256, 32);
434 
435     if (nxt_slow_path(mp == NULL)) {
436         return NXT_ERROR;
437     }
438 
439     conf = nxt_conf_json_parse_str(mp, &json);
440 
441     if (nxt_slow_path(conf == NULL)) {
442         return NXT_ERROR;
443     }
444 
445     nxt_controller_conf.root = conf;
446     nxt_controller_conf.pool = mp;
447 
448     return NXT_OK;
449 }
450 
451 
452 static void
453 nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
454     void *data)
455 {
456     nxt_runtime_t  *rt;
457 
458     nxt_controller_waiting_init_conf = 0;
459 
460     if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
461         nxt_alert(task, "failed to apply previous configuration");
462 
463         nxt_mp_destroy(nxt_controller_conf.pool);
464 
465         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
466             nxt_abort();
467         }
468     }
469 
470     if (nxt_controller_listening == 0) {
471         rt = task->thread->runtime;
472 
473         if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
474                           == NULL))
475         {
476             nxt_abort();
477         }
478 
479         nxt_controller_listening = 1;
480     }
481 
482     nxt_controller_flush_requests(task);
483 }
484 
485 
486 static void
487 nxt_controller_flush_requests(nxt_task_t *task)
488 {
489     nxt_queue_t               queue;
490     nxt_controller_request_t  *req;
491 
492     nxt_queue_init(&queue);
493     nxt_queue_add(&queue, &nxt_controller_waiting_requests);
494 
495     nxt_queue_init(&nxt_controller_waiting_requests);
496 
497     nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
498         nxt_controller_process_request(task, req);
499     } nxt_queue_loop;
500 }
501 
502 
503 static nxt_int_t
504 nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf,
505     nxt_port_rpc_handler_t handler, void *data)
506 {
507     void           *mem;
508     u_char         *end;
509     size_t         size;
510     uint32_t       stream;
511     nxt_fd_t       fd;
512     nxt_int_t      rc;
513     nxt_buf_t      *b;
514     nxt_port_t     *router_port, *controller_port;
515     nxt_runtime_t  *rt;
516 
517     rt = task->thread->runtime;
518 
519     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
520 
521     nxt_assert(router_port != NULL);
522     nxt_assert(nxt_controller_router_ready);
523 
524     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
525 
526     size = nxt_conf_json_length(conf, NULL);
527 
528     b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0);
529     if (nxt_slow_path(b == NULL)) {
530         return NXT_ERROR;
531     }
532 
533     fd = nxt_shm_open(task, size);
534     if (nxt_slow_path(fd == -1)) {
535         return NXT_ERROR;
536     }
537 
538     mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
539     if (nxt_slow_path(mem == MAP_FAILED)) {
540         goto fail;
541     }
542 
543     end = nxt_conf_json_print(mem, conf, NULL);
544 
545     nxt_mem_munmap(mem, size);
546 
547     size = end - (u_char *) mem;
548 
549     b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
550 
551     stream = nxt_port_rpc_register_handler(task, controller_port,
552                                            handler, handler,
553                                            router_port->pid, data);
554     if (nxt_slow_path(stream == 0)) {
555         goto fail;
556     }
557 
558     rc = nxt_port_socket_write(task, router_port,
559                                NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD,
560                                fd, stream, controller_port->id, b);
561 
562     if (nxt_slow_path(rc != NXT_OK)) {
563         nxt_port_rpc_cancel(task, controller_port, stream);
564 
565         goto fail;
566     }
567 
568     return NXT_OK;
569 
570 fail:
571 
572     nxt_fd_close(fd);
573 
574     return NXT_ERROR;
575 }
576 
577 
578 nxt_int_t
579 nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
580 {
581     nxt_listen_socket_t  *ls;
582 
583     ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
584     if (ls == NULL) {
585         return NXT_ERROR;
586     }
587 
588     ls->sockaddr = rt->controller_listen;
589 
590     nxt_listen_socket_remote_size(ls);
591 
592     ls->socket = -1;
593     ls->backlog = NXT_LISTEN_BACKLOG;
594     ls->read_after_accept = 1;
595     ls->flags = NXT_NONBLOCK;
596 
597 #if 0
598     /* STUB */
599     wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
600     if (wq == NULL) {
601         return NXT_ERROR;
602     }
603     nxt_work_queue_name(wq, "listen");
604     /**/
605 
606     ls->work_queue = wq;
607 #endif
608     ls->handler = nxt_controller_conn_init;
609 
610     if (nxt_listen_socket_create(task, rt->mem_pool, ls) != NXT_OK) {
611         return NXT_ERROR;
612     }
613 
614     rt->controller_socket = ls;
615 
616     return NXT_OK;
617 }
618 
619 
620 static void
621 nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
622 {
623     nxt_buf_t                 *b;
624     nxt_conn_t                *c;
625     nxt_event_engine_t        *engine;
626     nxt_controller_request_t  *r;
627 
628     c = obj;
629 
630     nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
631 
632     r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
633     if (nxt_slow_path(r == NULL)) {
634         nxt_controller_conn_free(task, c, NULL);
635         return;
636     }
637 
638     r->conn = c;
639 
640     if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
641                       != NXT_OK))
642     {
643         nxt_controller_conn_free(task, c, NULL);
644         return;
645     }
646 
647     r->parser.encoded_slashes = 1;
648 
649     b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
650     if (nxt_slow_path(b == NULL)) {
651         nxt_controller_conn_free(task, c, NULL);
652         return;
653     }
654 
655     c->read = b;
656     c->socket.data = r;
657     c->socket.read_ready = 1;
658     c->read_state = &nxt_controller_conn_read_state;
659 
660     engine = task->thread->engine;
661     c->read_work_queue = &engine->read_work_queue;
662     c->write_work_queue = &engine->write_work_queue;
663 
664     nxt_conn_read(engine, c);
665 }
666 
667 
668 static const nxt_event_conn_state_t  nxt_controller_conn_read_state
669     nxt_aligned(64) =
670 {
671     .ready_handler = nxt_controller_conn_read,
672     .close_handler = nxt_controller_conn_close,
673     .error_handler = nxt_controller_conn_read_error,
674 
675     .timer_handler = nxt_controller_conn_read_timeout,
676     .timer_value = nxt_controller_conn_timeout_value,
677     .timer_data = 60 * 1000,
678 };
679 
680 
681 static void
682 nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
683 {
684     size_t                    preread;
685     nxt_buf_t                 *b;
686     nxt_int_t                 rc;
687     nxt_conn_t                *c;
688     nxt_controller_request_t  *r;
689 
690     c = obj;
691     r = data;
692 
693     nxt_debug(task, "controller conn read");
694 
695     nxt_queue_remove(&c->link);
696     nxt_queue_self(&c->link);
697 
698     b = c->read;
699 
700     rc = nxt_http_parse_request(&r->parser, &b->mem);
701 
702     if (nxt_slow_path(rc != NXT_DONE)) {
703 
704         if (rc == NXT_AGAIN) {
705             if (nxt_buf_mem_free_size(&b->mem) == 0) {
706                 nxt_log(task, NXT_LOG_ERR, "too long request headers");
707                 nxt_controller_conn_close(task, c, r);
708                 return;
709             }
710 
711             nxt_conn_read(task->thread->engine, c);
712             return;
713         }
714 
715         /* rc == NXT_ERROR */
716 
717         nxt_log(task, NXT_LOG_ERR, "parsing error");
718 
719         nxt_controller_conn_close(task, c, r);
720         return;
721     }
722 
723     rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
724                                  r);
725 
726     if (nxt_slow_path(rc != NXT_OK)) {
727         nxt_controller_conn_close(task, c, r);
728         return;
729     }
730 
731     preread = nxt_buf_mem_used_size(&b->mem);
732 
733     nxt_debug(task, "controller request header parsing complete, "
734                     "body length: %uz, preread: %uz",
735                     r->length, preread);
736 
737     if (preread >= r->length) {
738         nxt_controller_process_request(task, r);
739         return;
740     }
741 
742     if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
743         b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
744         if (nxt_slow_path(b == NULL)) {
745             nxt_controller_conn_free(task, c, NULL);
746             return;
747         }
748 
749         b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
750 
751         c->read = b;
752     }
753 
754     c->read_state = &nxt_controller_conn_body_read_state;
755 
756     nxt_conn_read(task->thread->engine, c);
757 }
758 
759 
760 static nxt_msec_t
761 nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
762 {
763     return (nxt_msec_t) data;
764 }
765 
766 
767 static void
768 nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
769 {
770     nxt_conn_t  *c;
771 
772     c = obj;
773 
774     nxt_debug(task, "controller conn read error");
775 
776     nxt_controller_conn_close(task, c, data);
777 }
778 
779 
780 static void
781 nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
782 {
783     nxt_timer_t  *timer;
784     nxt_conn_t   *c;
785 
786     timer = obj;
787 
788     c = nxt_read_timer_conn(timer);
789     c->socket.timedout = 1;
790     c->socket.closed = 1;
791 
792     nxt_debug(task, "controller conn read timeout");
793 
794     nxt_controller_conn_close(task, c, data);
795 }
796 
797 
798 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state
799     nxt_aligned(64) =
800 {
801     .ready_handler = nxt_controller_conn_body_read,
802     .close_handler = nxt_controller_conn_close,
803     .error_handler = nxt_controller_conn_read_error,
804 
805     .timer_handler = nxt_controller_conn_read_timeout,
806     .timer_value = nxt_controller_conn_timeout_value,
807     .timer_data = 60 * 1000,
808     .timer_autoreset = 1,
809 };
810 
811 
812 static void
813 nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
814 {
815     size_t                    read;
816     nxt_buf_t                 *b;
817     nxt_conn_t                *c;
818     nxt_controller_request_t  *r;
819 
820     c = obj;
821     r = data;
822     b = c->read;
823 
824     read = nxt_buf_mem_used_size(&b->mem);
825 
826     nxt_debug(task, "controller conn body read: %uz of %uz",
827               read, r->length);
828 
829     if (read >= r->length) {
830         nxt_controller_process_request(task, r);
831         return;
832     }
833 
834     nxt_conn_read(task->thread->engine, c);
835 }
836 
837 
838 static const nxt_event_conn_state_t  nxt_controller_conn_write_state
839     nxt_aligned(64) =
840 {
841     .ready_handler = nxt_controller_conn_write,
842     .error_handler = nxt_controller_conn_write_error,
843 
844     .timer_handler = nxt_controller_conn_write_timeout,
845     .timer_value = nxt_controller_conn_timeout_value,
846     .timer_data = 60 * 1000,
847     .timer_autoreset = 1,
848 };
849 
850 
851 static void
852 nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
853 {
854     nxt_buf_t   *b;
855     nxt_conn_t  *c;
856 
857     c = obj;
858 
859     nxt_debug(task, "controller conn write");
860 
861     b = c->write;
862 
863     if (b->mem.pos != b->mem.free) {
864         nxt_conn_write(task->thread->engine, c);
865         return;
866     }
867 
868     nxt_debug(task, "controller conn write complete");
869 
870     nxt_controller_conn_close(task, c, data);
871 }
872 
873 
874 static void
875 nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
876 {
877     nxt_conn_t  *c;
878 
879     c = obj;
880 
881     nxt_debug(task, "controller conn write error");
882 
883     nxt_controller_conn_close(task, c, data);
884 }
885 
886 
887 static void
888 nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
889 {
890     nxt_conn_t   *c;
891     nxt_timer_t  *timer;
892 
893     timer = obj;
894 
895     c = nxt_write_timer_conn(timer);
896     c->socket.timedout = 1;
897     c->socket.closed = 1;
898 
899     nxt_debug(task, "controller conn write timeout");
900 
901     nxt_controller_conn_close(task, c, data);
902 }
903 
904 
905 static const nxt_event_conn_state_t  nxt_controller_conn_close_state
906     nxt_aligned(64) =
907 {
908     .ready_handler = nxt_controller_conn_free,
909 };
910 
911 
912 static void
913 nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
914 {
915     nxt_conn_t  *c;
916 
917     c = obj;
918 
919     nxt_debug(task, "controller conn close");
920 
921     nxt_queue_remove(&c->link);
922 
923     c->write_state = &nxt_controller_conn_close_state;
924 
925     nxt_conn_close(task->thread->engine, c);
926 }
927 
928 
929 static void
930 nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
931 {
932     nxt_conn_t  *c;
933 
934     c = obj;
935 
936     nxt_debug(task, "controller conn free");
937 
938     nxt_sockaddr_cache_free(task->thread->engine, c);
939 
940     nxt_conn_free(task, c);
941 }
942 
943 
944 static nxt_int_t
945 nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
946     uintptr_t data)
947 {
948     off_t                     length;
949     nxt_controller_request_t  *r;
950 
951     r = ctx;
952 
953     length = nxt_off_t_parse(field->value, field->value_length);
954 
955     if (nxt_fast_path(length >= 0)) {
956 
957         if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
958             nxt_log_error(NXT_LOG_ERR, &r->conn->log,
959                           "Content-Length is too big");
960             return NXT_ERROR;
961         }
962 
963         r->length = length;
964         return NXT_OK;
965     }
966 
967     nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
968 
969     return NXT_ERROR;
970 }
971 
972 
973 static void
974 nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
975 {
976     uint32_t                   i, count;
977     nxt_str_t                  path;
978     nxt_conn_t                 *c;
979     nxt_conf_value_t           *value;
980     nxt_controller_response_t  resp;
981 #if (NXT_TLS)
982     nxt_conf_value_t           *certs;
983 
984     static nxt_str_t certificates = nxt_string("certificates");
985 #endif
986     static nxt_str_t config = nxt_string("config");
987 
988     c = req->conn;
989     path = req->parser.path;
990 
991     if (path.length > 1 && path.start[path.length - 1] == '/') {
992         path.length--;
993     }
994 
995     if (nxt_str_start(&path, "/config", 7)
996         && (path.length == 7 || path.start[7] == '/'))
997     {
998         if (path.length == 7) {
999             path.length = 1;
1000 
1001         } else {
1002             path.length -= 7;
1003             path.start += 7;
1004         }
1005 
1006         nxt_controller_process_config(task, req, &path);
1007         return;
1008     }
1009 
1010 #if (NXT_TLS)
1011 
1012     if (nxt_str_start(&path, "/certificates", 13)
1013         && (path.length == 13 || path.start[13] == '/'))
1014     {
1015         if (path.length == 13) {
1016             path.length = 1;
1017 
1018         } else {
1019             path.length -= 13;
1020             path.start += 13;
1021         }
1022 
1023         nxt_controller_process_cert(task, req, &path);
1024         return;
1025     }
1026 
1027 #endif
1028 
1029     if (nxt_str_start(&path, "/control/", 9)) {
1030         path.length -= 9;
1031         path.start += 9;
1032 
1033         nxt_controller_process_control(task, req, &path);
1034         return;
1035     }
1036 
1037     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1038 
1039     if (path.length == 1 && path.start[0] == '/') {
1040 
1041         if (!nxt_str_eq(&req->parser.method, "GET", 3)) {
1042             goto invalid_method;
1043         }
1044 
1045         count = 1;
1046 #if (NXT_TLS)
1047         count++;
1048 #endif
1049 
1050         value = nxt_conf_create_object(c->mem_pool, count);
1051         if (nxt_slow_path(value == NULL)) {
1052             goto alloc_fail;
1053         }
1054 
1055         i = 0;
1056 
1057 #if (NXT_TLS)
1058         certs = nxt_cert_info_get_all(c->mem_pool);
1059         if (nxt_slow_path(certs == NULL)) {
1060             goto alloc_fail;
1061         }
1062 
1063         nxt_conf_set_member(value, &certificates, certs, i++);
1064 #endif
1065 
1066         nxt_conf_set_member(value, &config, nxt_controller_conf.root, i);
1067 
1068         resp.status = 200;
1069         resp.conf = value;
1070 
1071         nxt_controller_response(task, req, &resp);
1072         return;
1073     }
1074 
1075     resp.status = 404;
1076     resp.title = (u_char *) "Value doesn't exist.";
1077     resp.offset = -1;
1078 
1079     nxt_controller_response(task, req, &resp);
1080     return;
1081 
1082 invalid_method:
1083 
1084     resp.status = 405;
1085     resp.title = (u_char *) "Invalid method.";
1086     resp.offset = -1;
1087 
1088     nxt_controller_response(task, req, &resp);
1089     return;
1090 
1091 alloc_fail:
1092 
1093     resp.status = 500;
1094     resp.title = (u_char *) "Memory allocation failed.";
1095     resp.offset = -1;
1096 
1097     nxt_controller_response(task, req, &resp);
1098     return;
1099 }
1100 
1101 
1102 static void
1103 nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
1104     nxt_str_t *path)
1105 {
1106     nxt_mp_t                   *mp;
1107     nxt_int_t                  rc;
1108     nxt_conn_t                 *c;
1109     nxt_bool_t                 post;
1110     nxt_buf_mem_t              *mbuf;
1111     nxt_conf_op_t              *ops;
1112     nxt_conf_value_t           *value;
1113     nxt_conf_validation_t      vldt;
1114     nxt_conf_json_error_t      error;
1115     nxt_controller_response_t  resp;
1116 
1117     static const nxt_str_t empty_obj = nxt_string("{}");
1118 
1119     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1120 
1121     c = req->conn;
1122 
1123     if (nxt_str_eq(&req->parser.method, "GET", 3)) {
1124 
1125         value = nxt_conf_get_path(nxt_controller_conf.root, path);
1126 
1127         if (value == NULL) {
1128             goto not_found;
1129         }
1130 
1131         resp.status = 200;
1132         resp.conf = value;
1133 
1134         nxt_controller_response(task, req, &resp);
1135         return;
1136     }
1137 
1138     if (nxt_str_eq(&req->parser.method, "POST", 4)) {
1139         if (path->length == 1) {
1140             goto not_allowed;
1141         }
1142 
1143         post = 1;
1144 
1145     } else {
1146         post = 0;
1147     }
1148 
1149     if (post || nxt_str_eq(&req->parser.method, "PUT", 3)) {
1150 
1151         if (nxt_controller_check_postpone_request(task)) {
1152             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
1153             return;
1154         }
1155 
1156         mp = nxt_mp_create(1024, 128, 256, 32);
1157 
1158         if (nxt_slow_path(mp == NULL)) {
1159             goto alloc_fail;
1160         }
1161 
1162         mbuf = &c->read->mem;
1163 
1164         nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
1165 
1166         /* Skip UTF-8 BOM. */
1167         if (nxt_buf_mem_used_size(mbuf) >= 3
1168             && nxt_memcmp(mbuf->pos, "\xEF\xBB\xBF", 3) == 0)
1169         {
1170             mbuf->pos += 3;
1171         }
1172 
1173         value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
1174 
1175         if (value == NULL) {
1176             nxt_mp_destroy(mp);
1177 
1178             if (error.pos == NULL) {
1179                 goto alloc_fail;
1180             }
1181 
1182             resp.status = 400;
1183             resp.title = (u_char *) "Invalid JSON.";
1184             resp.detail.length = nxt_strlen(error.detail);
1185             resp.detail.start = error.detail;
1186             resp.offset = error.pos - mbuf->pos;
1187 
1188             nxt_conf_json_position(mbuf->pos, error.pos,
1189                                    &resp.line, &resp.column);
1190 
1191             nxt_controller_response(task, req, &resp);
1192             return;
1193         }
1194 
1195         if (path->length != 1) {
1196             rc = nxt_conf_op_compile(c->mem_pool, &ops,
1197                                      nxt_controller_conf.root,
1198                                      path, value, post);
1199 
1200             if (rc != NXT_CONF_OP_OK) {
1201                 nxt_mp_destroy(mp);
1202 
1203                 switch (rc) {
1204                 case NXT_CONF_OP_NOT_FOUND:
1205                     goto not_found;
1206 
1207                 case NXT_CONF_OP_NOT_ALLOWED:
1208                     goto not_allowed;
1209                 }
1210 
1211                 /* rc == NXT_CONF_OP_ERROR */
1212                 goto alloc_fail;
1213             }
1214 
1215             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
1216 
1217             if (nxt_slow_path(value == NULL)) {
1218                 nxt_mp_destroy(mp);
1219                 goto alloc_fail;
1220             }
1221         }
1222 
1223         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
1224 
1225         vldt.conf = value;
1226         vldt.pool = c->mem_pool;
1227 
1228         rc = nxt_conf_validate(&vldt);
1229 
1230         if (nxt_slow_path(rc != NXT_OK)) {
1231             nxt_mp_destroy(mp);
1232 
1233             if (rc == NXT_DECLINED) {
1234                 resp.detail = vldt.error;
1235                 goto invalid_conf;
1236             }
1237 
1238             /* rc == NXT_ERROR */
1239             goto alloc_fail;
1240         }
1241 
1242         rc = nxt_controller_conf_send(task, mp, value,
1243                                       nxt_controller_conf_handler, req);
1244 
1245         if (nxt_slow_path(rc != NXT_OK)) {
1246             nxt_mp_destroy(mp);
1247 
1248             /* rc == NXT_ERROR */
1249             goto alloc_fail;
1250         }
1251 
1252         req->conf.root = value;
1253         req->conf.pool = mp;
1254 
1255         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1256 
1257         return;
1258     }
1259 
1260     if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
1261 
1262         if (nxt_controller_check_postpone_request(task)) {
1263             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
1264             return;
1265         }
1266 
1267         if (path->length == 1) {
1268             mp = nxt_mp_create(1024, 128, 256, 32);
1269 
1270             if (nxt_slow_path(mp == NULL)) {
1271                 goto alloc_fail;
1272             }
1273 
1274             value = nxt_conf_json_parse_str(mp, &empty_obj);
1275 
1276         } else {
1277             rc = nxt_conf_op_compile(c->mem_pool, &ops,
1278                                      nxt_controller_conf.root,
1279                                      path, NULL, 0);
1280 
1281             if (rc != NXT_OK) {
1282                 if (rc == NXT_CONF_OP_NOT_FOUND) {
1283                     goto not_found;
1284                 }
1285 
1286                 /* rc == NXT_CONF_OP_ERROR */
1287                 goto alloc_fail;
1288             }
1289 
1290             mp = nxt_mp_create(1024, 128, 256, 32);
1291 
1292             if (nxt_slow_path(mp == NULL)) {
1293                 goto alloc_fail;
1294             }
1295 
1296             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
1297         }
1298 
1299         if (nxt_slow_path(value == NULL)) {
1300             nxt_mp_destroy(mp);
1301             goto alloc_fail;
1302         }
1303 
1304         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
1305 
1306         vldt.conf = value;
1307         vldt.pool = c->mem_pool;
1308 
1309         rc = nxt_conf_validate(&vldt);
1310 
1311         if (nxt_slow_path(rc != NXT_OK)) {
1312             nxt_mp_destroy(mp);
1313 
1314             if (rc == NXT_DECLINED) {
1315                 resp.detail = vldt.error;
1316                 goto invalid_conf;
1317             }
1318 
1319             /* rc == NXT_ERROR */
1320             goto alloc_fail;
1321         }
1322 
1323         rc = nxt_controller_conf_send(task, mp, value,
1324                                       nxt_controller_conf_handler, req);
1325 
1326         if (nxt_slow_path(rc != NXT_OK)) {
1327             nxt_mp_destroy(mp);
1328 
1329             /* rc == NXT_ERROR */
1330             goto alloc_fail;
1331         }
1332 
1333         req->conf.root = value;
1334         req->conf.pool = mp;
1335 
1336         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1337 
1338         return;
1339     }
1340 
1341 not_allowed:
1342 
1343     resp.status = 405;
1344     resp.title = (u_char *) "Method isn't allowed.";
1345     resp.offset = -1;
1346 
1347     nxt_controller_response(task, req, &resp);
1348     return;
1349 
1350 not_found:
1351 
1352     resp.status = 404;
1353     resp.title = (u_char *) "Value doesn't exist.";
1354     resp.offset = -1;
1355 
1356     nxt_controller_response(task, req, &resp);
1357     return;
1358 
1359 invalid_conf:
1360 
1361     resp.status = 400;
1362     resp.title = (u_char *) "Invalid configuration.";
1363     resp.offset = -1;
1364 
1365     nxt_controller_response(task, req, &resp);
1366     return;
1367 
1368 alloc_fail:
1369 
1370     resp.status = 500;
1371     resp.title = (u_char *) "Memory allocation failed.";
1372     resp.offset = -1;
1373 
1374     nxt_controller_response(task, req, &resp);
1375 }
1376 
1377 
1378 static nxt_bool_t
1379 nxt_controller_check_postpone_request(nxt_task_t *task)
1380 {
1381     nxt_port_t     *router_port;
1382     nxt_runtime_t  *rt;
1383 
1384     if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)
1385         || nxt_controller_waiting_init_conf
1386         || !nxt_controller_router_ready)
1387     {
1388         return 1;
1389     }
1390 
1391     rt = task->thread->runtime;
1392 
1393     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1394 
1395     return (router_port == NULL);
1396 }
1397 
1398 
1399 #if (NXT_TLS)
1400 
1401 static void
1402 nxt_controller_process_cert(nxt_task_t *task,
1403     nxt_controller_request_t *req, nxt_str_t *path)
1404 {
1405     u_char                     *p;
1406     nxt_str_t                  name;
1407     nxt_int_t                  ret;
1408     nxt_conn_t                 *c;
1409     nxt_cert_t                 *cert;
1410     nxt_conf_value_t           *value;
1411     nxt_controller_response_t  resp;
1412 
1413     name.length = path->length - 1;
1414     name.start = path->start + 1;
1415 
1416     p = nxt_memchr(name.start, '/', name.length);
1417 
1418     if (p != NULL) {
1419         name.length = p - name.start;
1420 
1421         path->length -= p - path->start;
1422         path->start = p;
1423 
1424     } else {
1425         path = NULL;
1426     }
1427 
1428     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1429 
1430     c = req->conn;
1431 
1432     if (nxt_str_eq(&req->parser.method, "GET", 3)) {
1433 
1434         if (name.length != 0) {
1435             value = nxt_cert_info_get(&name);
1436             if (value == NULL) {
1437                 goto cert_not_found;
1438             }
1439 
1440             if (path != NULL) {
1441                 value = nxt_conf_get_path(value, path);
1442                 if (value == NULL) {
1443                     goto not_found;
1444                 }
1445             }
1446 
1447         } else {
1448             value = nxt_cert_info_get_all(c->mem_pool);
1449             if (value == NULL) {
1450                 goto alloc_fail;
1451             }
1452         }
1453 
1454         resp.status = 200;
1455         resp.conf = value;
1456 
1457         nxt_controller_response(task, req, &resp);
1458         return;
1459     }
1460 
1461     if (name.length == 0 || path != NULL) {
1462         goto invalid_name;
1463     }
1464 
1465     if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
1466         value = nxt_cert_info_get(&name);
1467         if (value != NULL) {
1468             goto exists_cert;
1469         }
1470 
1471         cert = nxt_cert_mem(task, &c->read->mem);
1472         if (cert == NULL) {
1473             goto invalid_cert;
1474         }
1475 
1476         ret = nxt_cert_info_save(&name, cert);
1477 
1478         nxt_cert_destroy(cert);
1479 
1480         if (nxt_slow_path(ret != NXT_OK)) {
1481             goto alloc_fail;
1482         }
1483 
1484         nxt_cert_store_get(task, &name, c->mem_pool,
1485                            nxt_controller_process_cert_save, req);
1486         return;
1487     }
1488 
1489     if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
1490 
1491         if (nxt_controller_cert_in_use(&name)) {
1492             goto cert_in_use;
1493         }
1494 
1495         if (nxt_cert_info_delete(&name) != NXT_OK) {
1496             goto cert_not_found;
1497         }
1498 
1499         nxt_cert_store_delete(task, &name, c->mem_pool);
1500 
1501         resp.status = 200;
1502         resp.title = (u_char *) "Certificate deleted.";
1503 
1504         nxt_controller_response(task, req, &resp);
1505         return;
1506     }
1507 
1508     resp.status = 405;
1509     resp.title = (u_char *) "Invalid method.";
1510     resp.offset = -1;
1511 
1512     nxt_controller_response(task, req, &resp);
1513     return;
1514 
1515 invalid_name:
1516 
1517     resp.status = 400;
1518     resp.title = (u_char *) "Invalid certificate name.";
1519     resp.offset = -1;
1520 
1521     nxt_controller_response(task, req, &resp);
1522     return;
1523 
1524 invalid_cert:
1525 
1526     resp.status = 400;
1527     resp.title = (u_char *) "Invalid certificate.";
1528     resp.offset = -1;
1529 
1530     nxt_controller_response(task, req, &resp);
1531     return;
1532 
1533 exists_cert:
1534 
1535     resp.status = 400;
1536     resp.title = (u_char *) "Certificate already exists.";
1537     resp.offset = -1;
1538 
1539     nxt_controller_response(task, req, &resp);
1540     return;
1541 
1542 cert_in_use:
1543 
1544     resp.status = 400;
1545     resp.title = (u_char *) "Certificate is used in the configuration.";
1546     resp.offset = -1;
1547 
1548     nxt_controller_response(task, req, &resp);
1549     return;
1550 
1551 cert_not_found:
1552 
1553     resp.status = 404;
1554     resp.title = (u_char *) "Certificate doesn't exist.";
1555     resp.offset = -1;
1556 
1557     nxt_controller_response(task, req, &resp);
1558     return;
1559 
1560 not_found:
1561 
1562     resp.status = 404;
1563     resp.title = (u_char *) "Invalid path.";
1564     resp.offset = -1;
1565 
1566     nxt_controller_response(task, req, &resp);
1567     return;
1568 
1569 alloc_fail:
1570 
1571     resp.status = 500;
1572     resp.title = (u_char *) "Memory allocation failed.";
1573     resp.offset = -1;
1574 
1575     nxt_controller_response(task, req, &resp);
1576     return;
1577 }
1578 
1579 
1580 static void
1581 nxt_controller_process_cert_save(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1582     void *data)
1583 {
1584     nxt_conn_t                *c;
1585     nxt_buf_mem_t             *mbuf;
1586     nxt_controller_request_t  *req;
1587     nxt_controller_response_t  resp;
1588 
1589     req = data;
1590 
1591     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1592 
1593     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
1594         resp.status = 500;
1595         resp.title = (u_char *) "Failed to store certificate.";
1596 
1597         nxt_controller_response(task, req, &resp);
1598         return;
1599     }
1600 
1601     c = req->conn;
1602 
1603     mbuf = &c->read->mem;
1604 
1605     nxt_fd_write(msg->fd[0], mbuf->pos, nxt_buf_mem_used_size(mbuf));
1606 
1607     nxt_fd_close(msg->fd[0]);
1608 
1609     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1610 
1611     resp.status = 200;
1612     resp.title = (u_char *) "Certificate chain uploaded.";
1613 
1614     nxt_controller_response(task, req, &resp);
1615 }
1616 
1617 
1618 static nxt_bool_t
1619 nxt_controller_cert_in_use(nxt_str_t *name)
1620 {
1621     uint32_t          next;
1622     nxt_str_t         str;
1623     nxt_conf_value_t  *listeners, *listener, *value;
1624 
1625     static nxt_str_t  listeners_path = nxt_string("/listeners");
1626     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1627 
1628     listeners = nxt_conf_get_path(nxt_controller_conf.root, &listeners_path);
1629 
1630     if (listeners != NULL) {
1631         next = 0;
1632 
1633         for ( ;; ) {
1634             listener = nxt_conf_next_object_member(listeners, &str, &next);
1635             if (listener == NULL) {
1636                 break;
1637             }
1638 
1639             value = nxt_conf_get_path(listener, &certificate_path);
1640             if (value == NULL) {
1641                 continue;
1642             }
1643 
1644             nxt_conf_get_string(value, &str);
1645 
1646             if (nxt_strstr_eq(&str, name)) {
1647                 return 1;
1648             }
1649         }
1650     }
1651 
1652     return 0;
1653 }
1654 
1655 #endif
1656 
1657 
1658 static void
1659 nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1660     void *data)
1661 {
1662     nxt_controller_request_t   *req;
1663     nxt_controller_response_t  resp;
1664 
1665     req = data;
1666 
1667     nxt_debug(task, "controller conf ready: %*s",
1668               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1669 
1670     nxt_queue_remove(&req->link);
1671 
1672     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1673 
1674     if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1675         nxt_mp_destroy(nxt_controller_conf.pool);
1676 
1677         nxt_controller_conf = req->conf;
1678 
1679         nxt_controller_conf_store(task, req->conf.root);
1680 
1681         resp.status = 200;
1682         resp.title = (u_char *) "Reconfiguration done.";
1683 
1684     } else {
1685         nxt_mp_destroy(req->conf.pool);
1686 
1687         resp.status = 500;
1688         resp.title = (u_char *) "Failed to apply new configuration.";
1689         resp.offset = -1;
1690     }
1691 
1692     nxt_controller_response(task, req, &resp);
1693 
1694     nxt_controller_flush_requests(task);
1695 }
1696 
1697 
1698 static void
1699 nxt_controller_process_control(nxt_task_t *task,
1700     nxt_controller_request_t *req, nxt_str_t *path)
1701 {
1702     uint32_t                   stream;
1703     nxt_buf_t                  *b;
1704     nxt_int_t                  rc;
1705     nxt_port_t                 *router_port, *controller_port;
1706     nxt_runtime_t              *rt;
1707     nxt_conf_value_t           *value;
1708     nxt_controller_response_t  resp;
1709 
1710     static nxt_str_t applications = nxt_string("applications");
1711 
1712     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1713 
1714     if (!nxt_str_eq(&req->parser.method, "GET", 3)) {
1715         goto not_allowed;
1716     }
1717 
1718     if (!nxt_str_start(path, "applications/", 13)
1719         || nxt_memcmp(path->start + path->length - 8, "/restart", 8) != 0)
1720     {
1721         goto not_found;
1722     }
1723 
1724     path->start += 13;
1725     path->length -= 13 + 8;
1726 
1727     if (nxt_controller_check_postpone_request(task)) {
1728         nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
1729         return;
1730     }
1731 
1732     value = nxt_controller_conf.root;
1733     if (value == NULL) {
1734         goto not_found;
1735     }
1736 
1737     value = nxt_conf_get_object_member(value, &applications, NULL);
1738     if (value == NULL) {
1739         goto not_found;
1740     }
1741 
1742     value = nxt_conf_get_object_member(value, path, NULL);
1743     if (value == NULL) {
1744         goto not_found;
1745     }
1746 
1747     b = nxt_buf_mem_alloc(req->conn->mem_pool, path->length, 0);
1748     if (nxt_slow_path(b == NULL)) {
1749         goto alloc_fail;
1750     }
1751 
1752     b->mem.free = nxt_cpymem(b->mem.pos, path->start, path->length);
1753 
1754     rt = task->thread->runtime;
1755 
1756     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
1757     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1758 
1759     stream = nxt_port_rpc_register_handler(task, controller_port,
1760                                            nxt_controller_app_restart_handler,
1761                                            nxt_controller_app_restart_handler,
1762                                            router_port->pid, req);
1763     if (nxt_slow_path(stream == 0)) {
1764         goto alloc_fail;
1765     }
1766 
1767     rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_APP_RESTART,
1768                                -1, stream, 0, b);
1769     if (nxt_slow_path(rc != NXT_OK)) {
1770         nxt_port_rpc_cancel(task, controller_port, stream);
1771 
1772         goto fail;
1773     }
1774 
1775     nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1776 
1777     return;
1778 
1779 not_allowed:
1780 
1781     resp.status = 405;
1782     resp.title = (u_char *) "Method isn't allowed.";
1783     resp.offset = -1;
1784 
1785     nxt_controller_response(task, req, &resp);
1786     return;
1787 
1788 not_found:
1789 
1790     resp.status = 404;
1791     resp.title = (u_char *) "Value doesn't exist.";
1792     resp.offset = -1;
1793 
1794     nxt_controller_response(task, req, &resp);
1795     return;
1796 
1797 alloc_fail:
1798 
1799     resp.status = 500;
1800     resp.title = (u_char *) "Memory allocation failed.";
1801     resp.offset = -1;
1802 
1803     nxt_controller_response(task, req, &resp);
1804     return;
1805 
1806 fail:
1807 
1808     resp.status = 500;
1809     resp.title = (u_char *) "Send restart failed.";
1810     resp.offset = -1;
1811 
1812     nxt_controller_response(task, req, &resp);
1813 }
1814 
1815 
1816 static void
1817 nxt_controller_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1818     void *data)
1819 {
1820     nxt_controller_request_t   *req;
1821     nxt_controller_response_t  resp;
1822 
1823     req = data;
1824 
1825     nxt_debug(task, "controller app restart handler");
1826 
1827     nxt_queue_remove(&req->link);
1828 
1829     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1830 
1831     if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1832         resp.status = 200;
1833         resp.title = (u_char *) "Ok";
1834 
1835     } else {
1836         resp.status = 500;
1837         resp.title = (u_char *) "Failed to restart app.";
1838         resp.offset = -1;
1839     }
1840 
1841     nxt_controller_response(task, req, &resp);
1842 
1843     nxt_controller_flush_requests(task);
1844 }
1845 
1846 
1847 static void
1848 nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1849 {
1850     void           *mem;
1851     u_char         *end;
1852     size_t         size;
1853     nxt_fd_t       fd;
1854     nxt_buf_t      *b;
1855     nxt_port_t     *main_port;
1856     nxt_runtime_t  *rt;
1857 
1858     rt = task->thread->runtime;
1859 
1860     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1861 
1862     size = nxt_conf_json_length(conf, NULL);
1863 
1864     fd = nxt_shm_open(task, size);
1865     if (nxt_slow_path(fd == -1)) {
1866         return;
1867     }
1868 
1869     mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1870     if (nxt_slow_path(mem == MAP_FAILED)) {
1871         goto fail;
1872     }
1873 
1874     end = nxt_conf_json_print(mem, conf, NULL);
1875 
1876     nxt_mem_munmap(mem, size);
1877 
1878     size = end - (u_char *) mem;
1879 
1880     b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, sizeof(size_t), 0);
1881     if (nxt_slow_path(b == NULL)) {
1882         goto fail;
1883     }
1884 
1885     b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
1886 
1887     (void) nxt_port_socket_write(task, main_port,
1888                                 NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_CLOSE_FD,
1889                                  fd, 0, -1, b);
1890 
1891     return;
1892 
1893 fail:
1894 
1895     nxt_fd_close(fd);
1896 }
1897 
1898 
1899 static void
1900 nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1901     nxt_controller_response_t *resp)
1902 {
1903     size_t                  size;
1904     nxt_str_t               status_line, str;
1905     nxt_buf_t               *b, *body;
1906     nxt_conn_t              *c;
1907     nxt_uint_t              n;
1908     nxt_conf_value_t        *value, *location;
1909     nxt_conf_json_pretty_t  pretty;
1910 
1911     static nxt_str_t  success_str = nxt_string("success");
1912     static nxt_str_t  error_str = nxt_string("error");
1913     static nxt_str_t  detail_str = nxt_string("detail");
1914     static nxt_str_t  location_str = nxt_string("location");
1915     static nxt_str_t  offset_str = nxt_string("offset");
1916     static nxt_str_t  line_str = nxt_string("line");
1917     static nxt_str_t  column_str = nxt_string("column");
1918 
1919     static nxt_time_string_t  date_cache = {
1920         (nxt_atomic_uint_t) -1,
1921         nxt_controller_date,
1922         "%s, %02d %s %4d %02d:%02d:%02d GMT",
1923         nxt_length("Wed, 31 Dec 1986 16:40:00 GMT"),
1924         NXT_THREAD_TIME_GMT,
1925         NXT_THREAD_TIME_SEC,
1926     };
1927 
1928     switch (resp->status) {
1929 
1930     case 200:
1931         nxt_str_set(&status_line, "200 OK");
1932         break;
1933 
1934     case 400:
1935         nxt_str_set(&status_line, "400 Bad Request");
1936         break;
1937 
1938     case 404:
1939         nxt_str_set(&status_line, "404 Not Found");
1940         break;
1941 
1942     case 405:
1943         nxt_str_set(&status_line, "405 Method Not Allowed");
1944         break;
1945 
1946     default:
1947         nxt_str_set(&status_line, "500 Internal Server Error");
1948         break;
1949     }
1950 
1951     c = req->conn;
1952     value = resp->conf;
1953 
1954     if (value == NULL) {
1955         n = 1
1956             + (resp->detail.length != 0)
1957             + (resp->status >= 400 && resp->offset != -1);
1958 
1959         value = nxt_conf_create_object(c->mem_pool, n);
1960 
1961         if (nxt_slow_path(value == NULL)) {
1962             nxt_controller_conn_close(task, c, req);
1963             return;
1964         }
1965 
1966         str.length = nxt_strlen(resp->title);
1967         str.start = resp->title;
1968 
1969         if (resp->status < 400) {
1970             nxt_conf_set_member_string(value, &success_str, &str, 0);
1971 
1972         } else {
1973             nxt_conf_set_member_string(value, &error_str, &str, 0);
1974         }
1975 
1976         n = 0;
1977 
1978         if (resp->detail.length != 0) {
1979             n++;
1980 
1981             nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1982         }
1983 
1984         if (resp->status >= 400 && resp->offset != -1) {
1985             n++;
1986 
1987             location = nxt_conf_create_object(c->mem_pool,
1988                                               resp->line != 0 ? 3 : 1);
1989 
1990             nxt_conf_set_member(value, &location_str, location, n);
1991 
1992             nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1993 
1994             if (resp->line != 0) {
1995                 nxt_conf_set_member_integer(location, &line_str,
1996                                             resp->line, 1);
1997 
1998                 nxt_conf_set_member_integer(location, &column_str,
1999                                             resp->column, 2);
2000             }
2001         }
2002     }
2003 
2004     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
2005 
2006     size = nxt_conf_json_length(value, &pretty) + 2;
2007 
2008     body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2009     if (nxt_slow_path(body == NULL)) {
2010         nxt_controller_conn_close(task, c, req);
2011         return;
2012     }
2013 
2014     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
2015 
2016     body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
2017 
2018     body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
2019 
2020     size = nxt_length("HTTP/1.1 " "\r\n") + status_line.length
2021            + nxt_length("Server: " NXT_SERVER "\r\n")
2022            + nxt_length("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n")
2023            + nxt_length("Content-Type: application/json\r\n")
2024            + nxt_length("Content-Length: " "\r\n") + NXT_SIZE_T_LEN
2025            + nxt_length("Connection: close\r\n")
2026            + nxt_length("\r\n");
2027 
2028     b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2029     if (nxt_slow_path(b == NULL)) {
2030         nxt_controller_conn_close(task, c, req);
2031         return;
2032     }
2033 
2034     b->next = body;
2035 
2036     nxt_str_set(&str, "HTTP/1.1 ");
2037 
2038     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2039     b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
2040                              status_line.length);
2041 
2042     nxt_str_set(&str, "\r\n"
2043                       "Server: " NXT_SERVER "\r\n"
2044                       "Date: ");
2045 
2046     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2047 
2048     b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
2049                                          b->mem.free);
2050 
2051     nxt_str_set(&str, "\r\n"
2052                       "Content-Type: application/json\r\n"
2053                       "Content-Length: ");
2054 
2055     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2056 
2057     b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
2058                               nxt_buf_mem_used_size(&body->mem));
2059 
2060     nxt_str_set(&str, "\r\n"
2061                       "Connection: close\r\n"
2062                       "\r\n");
2063 
2064     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
2065 
2066     c->write = b;
2067     c->write_state = &nxt_controller_conn_write_state;
2068 
2069     nxt_conn_write(task->thread->engine, c);
2070 }
2071 
2072 
2073 static u_char *
2074 nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
2075     size_t size, const char *format)
2076 {
2077     static const char  *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
2078                                    "Sat" };
2079 
2080     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
2081                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
2082 
2083     return nxt_sprintf(buf, buf + size, format,
2084                        week[tm->tm_wday], tm->tm_mday,
2085                        month[tm->tm_mon], tm->tm_year + 1900,
2086                        tm->tm_hour, tm->tm_min, tm->tm_sec);
2087 }
2088