xref: /unit/src/nxt_controller.c (revision 632:a92721a99956)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 
13 
/*
 * A configuration snapshot: the root of a parsed configuration tree and the
 * memory pool it was parsed or cloned in.  The pool is destroyed when the
 * snapshot is replaced or discarded.
 */
14 typedef struct {
15     nxt_conf_value_t  *root;
16     nxt_mp_t          *pool;
17 } nxt_controller_conf_t;
18 
19 
/*
 * State of one control request, allocated from the connection memory pool
 * in nxt_controller_conn_init().
 */
20 typedef struct {
    /* HTTP request parser state (method, path, header fields). */
21     nxt_http_request_parse_t  parser;
    /* Request body length taken from the Content-Length header. */
22     size_t                    length;
    /* Candidate configuration that was sent to the router for this request. */
23     nxt_controller_conf_t     conf;
    /* Connection the request was read from. */
24     nxt_conn_t                *conn;
    /* Link in nxt_controller_waiting_requests. */
25     nxt_queue_link_t          link;
26 } nxt_controller_request_t;
27 
28 
/*
 * Description of a response passed to nxt_controller_response().
 */
29 typedef struct {
    /* HTTP status code; codes without a dedicated status line map to 500. */
30     nxt_uint_t        status;
    /* Configuration value to return; if NULL, an object is built instead. */
31     nxt_conf_value_t  *conf;
32 
    /* Short message: "Reconfiguration done." or the name of the error. */
33     u_char            *title;
    /* Optional error detail; omitted from the response when empty. */
34     nxt_str_t         detail;
    /* Byte offset of a JSON error in the request body; -1 if not applicable. */
35     ssize_t           offset;
    /* Position of a JSON error, filled in by nxt_conf_json_position(). */
36     nxt_uint_t        line;
37     nxt_uint_t        column;
38 } nxt_controller_response_t;
39 
40 
/* Startup: restoring the stored configuration and sending it to the router. */
41 static void nxt_controller_process_new_port_handler(nxt_task_t *task,
42     nxt_port_recv_msg_t *msg);
43 static nxt_int_t nxt_controller_conf_default(void);
44 static void nxt_controller_conf_init_handler(nxt_task_t *task,
45     nxt_port_recv_msg_t *msg, void *data);
46 static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
47     nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
48 
/* Control connection life cycle: read, write, timeouts, close and free. */
49 static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
50 static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
51 static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
52     uintptr_t data);
53 static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
54     void *data);
55 static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
56     void *data);
57 static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
58     void *data);
59 static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
60 static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
61     void *data);
62 static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
63     void *data);
64 static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
65 static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
66 
/* Request header field processing. */
67 static nxt_int_t nxt_controller_request_content_length(void *ctx,
68     nxt_http_field_t *field, uintptr_t data);
69 
/* Request processing, reconfiguration and response generation. */
70 static void nxt_controller_process_request(nxt_task_t *task,
71     nxt_controller_request_t *req);
72 static void nxt_controller_conf_handler(nxt_task_t *task,
73     nxt_port_recv_msg_t *msg, void *data);
74 static void nxt_controller_conf_store(nxt_task_t *task,
75     nxt_conf_value_t *conf);
76 static void nxt_controller_response(nxt_task_t *task,
77     nxt_controller_request_t *req, nxt_controller_response_t *resp);
78 static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
79     struct tm *tm, size_t size, const char *format);
80 
81 
/*
 * Request header fields the controller handles; the table is turned into
 * nxt_controller_fields_hash in nxt_controller_start().
 */
82 static nxt_http_field_proc_t  nxt_controller_request_fields[] = {
83     { nxt_string("Content-Length"),
84       &nxt_controller_request_content_length, 0 },
85 };
86 
/* Hash built from nxt_controller_request_fields in nxt_controller_start(). */
87 static nxt_lvlhsh_t            nxt_controller_fields_hash;
88 
/* Set to 1 once the listen event on the controller socket is enabled. */
89 static nxt_uint_t              nxt_controller_listening;
/* The current configuration. */
90 static nxt_controller_conf_t   nxt_controller_conf;
/*
 * PUT/DELETE requests: the one whose configuration has been sent to the
 * router, followed by requests that arrived while it was in progress.
 */
91 static nxt_queue_t             nxt_controller_waiting_requests;
92 
93 
/* Tentative definitions; the state tables are initialized further below. */
94 static const nxt_event_conn_state_t  nxt_controller_conn_read_state;
95 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state;
96 static const nxt_event_conn_state_t  nxt_controller_conn_write_state;
97 static const nxt_event_conn_state_t  nxt_controller_conn_close_state;
98 
99 
/*
 * Port message handlers of the controller process.  Only .new_port is
 * implemented in this file; the other handlers are defined elsewhere.
 * Both RPC outcomes (ready and error) are routed to nxt_port_rpc_handler.
 */
100 nxt_port_handlers_t  nxt_controller_process_port_handlers = {
101     .quit         = nxt_worker_process_quit_handler,
102     .new_port     = nxt_controller_process_new_port_handler,
103     .change_file  = nxt_port_change_log_file_handler,
104     .mmap         = nxt_port_mmap_handler,
105     .data         = nxt_port_data_handler,
106     .remove_pid   = nxt_port_remove_pid_handler,
107     .rpc_ready    = nxt_port_rpc_handler,
108     .rpc_error    = nxt_port_rpc_handler,
109 };
110 
111 
112 nxt_int_t
113 nxt_controller_start(nxt_task_t *task, void *data)
114 {
115     nxt_mp_t               *mp;
116     nxt_int_t              ret;
117     nxt_str_t              *json;
118     nxt_runtime_t          *rt;
119     nxt_conf_value_t       *conf;
120     nxt_event_engine_t     *engine;
121     nxt_conf_validation_t  vldt;
122 
123     rt = task->thread->runtime;
124 
125     engine = task->thread->engine;
126 
127     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
128     if (nxt_slow_path(engine->mem_pool == NULL)) {
129         return NXT_ERROR;
130     }
131 
132     ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool,
133                                nxt_controller_request_fields,
134                                nxt_nitems(nxt_controller_request_fields));
135 
136     if (nxt_slow_path(ret != NXT_OK)) {
137         return NXT_ERROR;
138     }
139 
140     nxt_queue_init(&nxt_controller_waiting_requests);
141 
142     json = data;
143 
144     if (json->length == 0) {
145         return NXT_OK;
146     }
147 
148     mp = nxt_mp_create(1024, 128, 256, 32);
149     if (nxt_slow_path(mp == NULL)) {
150         return NXT_ERROR;
151     }
152 
153     conf = nxt_conf_json_parse_str(mp, json);
154     nxt_free(json->start);
155 
156     if (nxt_slow_path(conf == NULL)) {
157         nxt_alert(task, "failed to restore previous configuration: "
158                   "file is corrupted or not enough memory");
159 
160         nxt_mp_destroy(mp);
161         return NXT_OK;
162     }
163 
164     nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
165 
166     vldt.pool = nxt_mp_create(1024, 128, 256, 32);
167     if (nxt_slow_path(vldt.pool == NULL)) {
168         return NXT_ERROR;
169     }
170 
171     vldt.conf = conf;
172 
173     ret = nxt_conf_validate(&vldt);
174 
175     if (nxt_slow_path(ret != NXT_OK)) {
176 
177         if (ret == NXT_DECLINED) {
178             nxt_alert(task, "the previous configuration is invalid: %V",
179                       &vldt.error);
180 
181             nxt_mp_destroy(vldt.pool);
182             nxt_mp_destroy(mp);
183 
184             return NXT_OK;
185         }
186 
187         /* ret == NXT_ERROR */
188 
189         return NXT_ERROR;
190     }
191 
192     nxt_mp_destroy(vldt.pool);
193 
194     nxt_controller_conf.root = conf;
195     nxt_controller_conf.pool = mp;
196 
197     return NXT_OK;
198 }
199 
200 
201 static void
202 nxt_controller_process_new_port_handler(nxt_task_t *task,
203     nxt_port_recv_msg_t *msg)
204 {
205     nxt_int_t         rc;
206     nxt_runtime_t     *rt;
207     nxt_conf_value_t  *conf;
208 
209     nxt_port_new_port_handler(task, msg);
210 
211     if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
212         return;
213     }
214 
215     conf = nxt_controller_conf.root;
216 
217     if (conf != NULL) {
218         rc = nxt_controller_conf_send(task, conf,
219                                       nxt_controller_conf_init_handler, NULL);
220 
221         if (nxt_fast_path(rc == NXT_OK)) {
222             return;
223         }
224 
225         nxt_mp_destroy(nxt_controller_conf.pool);
226 
227         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
228             nxt_abort();
229         }
230     }
231 
232     if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
233         nxt_abort();
234     }
235 
236     rt = task->thread->runtime;
237 
238     if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
239         nxt_abort();
240     }
241 
242     nxt_controller_listening = 1;
243 }
244 
245 
246 static nxt_int_t
247 nxt_controller_conf_default(void)
248 {
249     nxt_mp_t          *mp;
250     nxt_conf_value_t  *conf;
251 
252     static const nxt_str_t json
253         = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
254 
255     mp = nxt_mp_create(1024, 128, 256, 32);
256 
257     if (nxt_slow_path(mp == NULL)) {
258         return NXT_ERROR;
259     }
260 
261     conf = nxt_conf_json_parse_str(mp, &json);
262 
263     if (nxt_slow_path(conf == NULL)) {
264         return NXT_ERROR;
265     }
266 
267     nxt_controller_conf.root = conf;
268     nxt_controller_conf.pool = mp;
269 
270     return NXT_OK;
271 }
272 
273 
274 static void
275 nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
276     void *data)
277 {
278     nxt_runtime_t  *rt;
279 
280     if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
281         nxt_alert(task, "failed to apply previous configuration");
282 
283         nxt_mp_destroy(nxt_controller_conf.pool);
284 
285         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
286             nxt_abort();
287         }
288     }
289 
290     if (nxt_controller_listening == 0) {
291         rt = task->thread->runtime;
292 
293         if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
294                           == NULL))
295         {
296             nxt_abort();
297         }
298 
299         nxt_controller_listening = 1;
300     }
301 }
302 
303 
304 static nxt_int_t
305 nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
306     nxt_port_rpc_handler_t handler, void *data)
307 {
308     size_t         size;
309     uint32_t       stream;
310     nxt_int_t      rc;
311     nxt_buf_t      *b;
312     nxt_port_t     *router_port, *controller_port;
313     nxt_runtime_t  *rt;
314 
315     rt = task->thread->runtime;
316 
317     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
318 
319     if (nxt_slow_path(router_port == NULL)) {
320         return NXT_DECLINED;
321     }
322 
323     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
324 
325     size = nxt_conf_json_length(conf, NULL);
326 
327     b = nxt_port_mmap_get_buf(task, router_port, size);
328     if (nxt_slow_path(b == NULL)) {
329         return NXT_ERROR;
330     }
331 
332     b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
333 
334     stream = nxt_port_rpc_register_handler(task, controller_port,
335                                            handler, handler,
336                                            router_port->pid, data);
337 
338     rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
339                                stream, controller_port->id, b);
340 
341     if (nxt_slow_path(rc != NXT_OK)) {
342         nxt_port_rpc_cancel(task, controller_port, stream);
343         return NXT_ERROR;
344     }
345 
346     return NXT_OK;
347 }
348 
349 
350 nxt_int_t
351 nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
352 {
353     nxt_sockaddr_t       *sa;
354     nxt_listen_socket_t  *ls;
355 
356     sa = rt->controller_listen;
357 
358     ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
359     if (ls == NULL) {
360         return NXT_ERROR;
361     }
362 
363     ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
364                                        sa->socklen, sa->length);
365     if (ls->sockaddr == NULL) {
366         return NXT_ERROR;
367     }
368 
369     ls->sockaddr->type = sa->type;
370     nxt_sockaddr_text(ls->sockaddr);
371 
372     nxt_listen_socket_remote_size(ls);
373 
374     ls->socket = -1;
375     ls->backlog = NXT_LISTEN_BACKLOG;
376     ls->read_after_accept = 1;
377     ls->flags = NXT_NONBLOCK;
378 
379 #if 0
380     /* STUB */
381     wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
382     if (wq == NULL) {
383         return NXT_ERROR;
384     }
385     nxt_work_queue_name(wq, "listen");
386     /**/
387 
388     ls->work_queue = wq;
389 #endif
390     ls->handler = nxt_controller_conn_init;
391 
392     if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
393         return NXT_ERROR;
394     }
395 
396     rt->controller_socket = ls;
397 
398     return NXT_OK;
399 }
400 
401 
402 static void
403 nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
404 {
405     nxt_buf_t                 *b;
406     nxt_conn_t                *c;
407     nxt_event_engine_t        *engine;
408     nxt_controller_request_t  *r;
409 
410     c = obj;
411 
412     nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
413 
414     r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
415     if (nxt_slow_path(r == NULL)) {
416         nxt_controller_conn_free(task, c, NULL);
417         return;
418     }
419 
420     r->conn = c;
421 
422     if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
423                       != NXT_OK))
424     {
425         nxt_controller_conn_free(task, c, NULL);
426         return;
427     }
428 
429     b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
430     if (nxt_slow_path(b == NULL)) {
431         nxt_controller_conn_free(task, c, NULL);
432         return;
433     }
434 
435     c->read = b;
436     c->socket.data = r;
437     c->socket.read_ready = 1;
438     c->read_state = &nxt_controller_conn_read_state;
439 
440     engine = task->thread->engine;
441     c->read_work_queue = &engine->read_work_queue;
442     c->write_work_queue = &engine->write_work_queue;
443 
444     nxt_conn_read(engine, c);
445 }
446 
447 
/*
 * Connection state used while the request headers are being read.
 * .timer_data is returned as the timer value by
 * nxt_controller_conn_timeout_value(); 60 * 1000 is presumably 60 seconds
 * in milliseconds (nxt_msec_t) -- TODO confirm.
 */
448 static const nxt_event_conn_state_t  nxt_controller_conn_read_state
449     nxt_aligned(64) =
450 {
451     .ready_handler = nxt_controller_conn_read,
452     .close_handler = nxt_controller_conn_close,
453     .error_handler = nxt_controller_conn_read_error,
454 
455     .timer_handler = nxt_controller_conn_read_timeout,
456     .timer_value = nxt_controller_conn_timeout_value,
457     .timer_data = 60 * 1000,
458 };
459 
460 
461 static void
462 nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
463 {
464     size_t                    preread;
465     nxt_buf_t                 *b;
466     nxt_int_t                 rc;
467     nxt_conn_t                *c;
468     nxt_controller_request_t  *r;
469 
470     c = obj;
471     r = data;
472 
473     nxt_debug(task, "controller conn read");
474 
475     nxt_queue_remove(&c->link);
476     nxt_queue_self(&c->link);
477 
478     b = c->read;
479 
480     rc = nxt_http_parse_request(&r->parser, &b->mem);
481 
482     if (nxt_slow_path(rc != NXT_DONE)) {
483 
484         if (rc == NXT_AGAIN) {
485             if (nxt_buf_mem_free_size(&b->mem) == 0) {
486                 nxt_log(task, NXT_LOG_ERR, "too long request headers");
487                 nxt_controller_conn_close(task, c, r);
488                 return;
489             }
490 
491             nxt_conn_read(task->thread->engine, c);
492             return;
493         }
494 
495         /* rc == NXT_ERROR */
496 
497         nxt_log(task, NXT_LOG_ERR, "parsing error");
498 
499         nxt_controller_conn_close(task, c, r);
500         return;
501     }
502 
503     rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
504                                  r);
505 
506     if (nxt_slow_path(rc != NXT_OK)) {
507         nxt_controller_conn_close(task, c, r);
508         return;
509     }
510 
511     preread = nxt_buf_mem_used_size(&b->mem);
512 
513     nxt_debug(task, "controller request header parsing complete, "
514                     "body length: %uz, preread: %uz",
515                     r->length, preread);
516 
517     if (preread >= r->length) {
518         nxt_controller_process_request(task, r);
519         return;
520     }
521 
522     if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
523         b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
524         if (nxt_slow_path(b == NULL)) {
525             nxt_controller_conn_free(task, c, NULL);
526             return;
527         }
528 
529         b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
530 
531         c->read = b;
532     }
533 
534     c->read_state = &nxt_controller_conn_body_read_state;
535 
536     nxt_conn_read(task->thread->engine, c);
537 }
538 
539 
540 static nxt_msec_t
541 nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
542 {
543     return (nxt_msec_t) data;
544 }
545 
546 
547 static void
548 nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
549 {
550     nxt_conn_t  *c;
551 
552     c = obj;
553 
554     nxt_debug(task, "controller conn read error");
555 
556     nxt_controller_conn_close(task, c, data);
557 }
558 
559 
560 static void
561 nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
562 {
563     nxt_timer_t  *timer;
564     nxt_conn_t   *c;
565 
566     timer = obj;
567 
568     c = nxt_read_timer_conn(timer);
569     c->socket.timedout = 1;
570     c->socket.closed = 1;
571 
572     nxt_debug(task, "controller conn read timeout");
573 
574     nxt_controller_conn_close(task, c, data);
575 }
576 
577 
/*
 * Connection state used while the request body is being read.  Unlike the
 * header reading state, it sets .timer_autoreset.
 */
578 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state
579     nxt_aligned(64) =
580 {
581     .ready_handler = nxt_controller_conn_body_read,
582     .close_handler = nxt_controller_conn_close,
583     .error_handler = nxt_controller_conn_read_error,
584 
585     .timer_handler = nxt_controller_conn_read_timeout,
586     .timer_value = nxt_controller_conn_timeout_value,
587     .timer_data = 60 * 1000,
588     .timer_autoreset = 1,
589 };
590 
591 
592 static void
593 nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
594 {
595     size_t                    read;
596     nxt_buf_t                 *b;
597     nxt_conn_t                *c;
598     nxt_controller_request_t  *r;
599 
600     c = obj;
601     r = data;
602     b = c->read;
603 
604     read = nxt_buf_mem_used_size(&b->mem);
605 
606     nxt_debug(task, "controller conn body read: %uz of %uz",
607               read, r->length);
608 
609     if (read >= r->length) {
610         nxt_controller_process_request(task, r);
611         return;
612     }
613 
614     nxt_conn_read(task->thread->engine, c);
615 }
616 
617 
/*
 * Connection state used while the response is being written.  It has no
 * .close_handler, unlike the reading states.
 */
618 static const nxt_event_conn_state_t  nxt_controller_conn_write_state
619     nxt_aligned(64) =
620 {
621     .ready_handler = nxt_controller_conn_write,
622     .error_handler = nxt_controller_conn_write_error,
623 
624     .timer_handler = nxt_controller_conn_write_timeout,
625     .timer_value = nxt_controller_conn_timeout_value,
626     .timer_data = 60 * 1000,
627     .timer_autoreset = 1,
628 };
629 
630 
631 static void
632 nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
633 {
634     nxt_buf_t   *b;
635     nxt_conn_t  *c;
636 
637     c = obj;
638 
639     nxt_debug(task, "controller conn write");
640 
641     b = c->write;
642 
643     if (b->mem.pos != b->mem.free) {
644         nxt_conn_write(task->thread->engine, c);
645         return;
646     }
647 
648     nxt_debug(task, "controller conn write complete");
649 
650     nxt_controller_conn_close(task, c, data);
651 }
652 
653 
654 static void
655 nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
656 {
657     nxt_conn_t  *c;
658 
659     c = obj;
660 
661     nxt_debug(task, "controller conn write error");
662 
663     nxt_controller_conn_close(task, c, data);
664 }
665 
666 
667 static void
668 nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
669 {
670     nxt_conn_t   *c;
671     nxt_timer_t  *timer;
672 
673     timer = obj;
674 
675     c = nxt_write_timer_conn(timer);
676     c->socket.timedout = 1;
677     c->socket.closed = 1;
678 
679     nxt_debug(task, "controller conn write timeout");
680 
681     nxt_controller_conn_close(task, c, data);
682 }
683 
684 
/*
 * Connection state installed as the write state by
 * nxt_controller_conn_close(); its only handler frees the connection.
 */
685 static const nxt_event_conn_state_t  nxt_controller_conn_close_state
686     nxt_aligned(64) =
687 {
688     .ready_handler = nxt_controller_conn_free,
689 };
690 
691 
692 static void
693 nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
694 {
695     nxt_conn_t  *c;
696 
697     c = obj;
698 
699     nxt_debug(task, "controller conn close");
700 
701     nxt_queue_remove(&c->link);
702 
703     c->write_state = &nxt_controller_conn_close_state;
704 
705     nxt_conn_close(task->thread->engine, c);
706 }
707 
708 
709 static void
710 nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
711 {
712     nxt_conn_t  *c;
713 
714     c = obj;
715 
716     nxt_debug(task, "controller conn free");
717 
718     nxt_sockaddr_cache_free(task->thread->engine, c);
719 
720     nxt_conn_free(task, c);
721 }
722 
723 
724 static nxt_int_t
725 nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
726     uintptr_t data)
727 {
728     off_t                     length;
729     nxt_controller_request_t  *r;
730 
731     r = ctx;
732 
733     length = nxt_off_t_parse(field->value, field->value_length);
734 
735     if (nxt_fast_path(length > 0)) {
736 
737         if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
738             nxt_log_error(NXT_LOG_ERR, &r->conn->log,
739                           "Content-Length is too big");
740             return NXT_ERROR;
741         }
742 
743         r->length = length;
744         return NXT_OK;
745     }
746 
747     nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
748 
749     return NXT_ERROR;
750 }
751 
752 
/*
 * Processes a completely read request, dispatching on the HTTP method.
 *
 *  GET     responds at once with the configuration value found at the
 *          request path, or 404 if there is none.
 *
 *  PUT     parses the request body as JSON and uses it either as the whole
 *          new configuration (root path) or as a replacement of the value
 *          at the path.
 *
 *  DELETE  replaces the whole configuration with "{}" (root path) or
 *          removes the value at the path.
 *
 * For PUT and DELETE the new configuration is built in its own memory
 * pool, validated and sent to the router; the request is then put at the
 * head of nxt_controller_waiting_requests and the response is produced
 * later by nxt_controller_conf_handler().  If that queue is not empty, a
 * PUT/DELETE request is only appended to it and returns without response.
 *
 * Any other method gets a 405 response.  On every error path taken after
 * the new pool has been created, the pool is destroyed before the jump to
 * the error label.
 */
753 static void
754 nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
755 {
756     nxt_mp_t                   *mp;
757     nxt_int_t                  rc;
758     nxt_str_t                  path;
759     nxt_conn_t                 *c;
760     nxt_buf_mem_t              *mbuf;
761     nxt_conf_op_t              *ops;
762     nxt_conf_value_t           *value;
763     nxt_conf_validation_t      vldt;
764     nxt_conf_json_error_t      error;
765     nxt_controller_response_t  resp;
766 
767     static const nxt_str_t empty_obj = nxt_string("{}");
768 
769     c = req->conn;
770     path = req->parser.path;
771 
    /*
     * Strip the "/config" prefix: "/config" alone becomes "/" and
     * "/config/..." becomes "/...".  Other paths are left unchanged.
     */
772     if (nxt_str_start(&path, "/config", 7)) {
773 
774         if (path.length == 7) {
775             path.length = 1;
776 
777         } else if (path.start[7] == '/') {
778             path.length -= 7;
779             path.start += 7;
780         }
781     }
782 
    /* Ignore a single trailing slash, except for the root path itself. */
783     if (path.length > 1 && path.start[path.length - 1] == '/') {
784         path.length--;
785     }
786 
787     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
788 
789     if (nxt_str_eq(&req->parser.method, "GET", 3)) {
790 
791         value = nxt_conf_get_path(nxt_controller_conf.root, &path);
792 
793         if (value == NULL) {
794             goto not_found;
795         }
796 
797         resp.status = 200;
798         resp.conf = value;
799 
800         nxt_controller_response(task, req, &resp);
801         return;
802     }
803 
804     if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
805 
        /* Wait behind the requests that are already queued. */
806         if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
807             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
808             return;
809         }
810 
        /* The new configuration lives in its own pool. */
811         mp = nxt_mp_create(1024, 128, 256, 32);
812 
813         if (nxt_slow_path(mp == NULL)) {
814             goto alloc_fail;
815         }
816 
        /* The request body is the unparsed part of the read buffer. */
817         mbuf = &c->read->mem;
818 
819         nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
820 
821         value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
822 
823         if (value == NULL) {
824             nxt_mp_destroy(mp);
825 
            /* A failure with no error position is reported as out of memory. */
826             if (error.pos == NULL) {
827                 goto alloc_fail;
828             }
829 
830             resp.status = 400;
831             resp.title = (u_char *) "Invalid JSON.";
832             resp.detail.length = nxt_strlen(error.detail);
833             resp.detail.start = error.detail;
834             resp.offset = error.pos - mbuf->pos;
835 
836             nxt_conf_json_position(mbuf->pos, error.pos,
837                                    &resp.line, &resp.column);
838 
839             nxt_controller_response(task, req, &resp);
840             return;
841         }
842 
        /*
         * Not the root path: build the new configuration by cloning the
         * current one with the parsed value placed at the path.
         */
843         if (path.length != 1) {
844             rc = nxt_conf_op_compile(c->mem_pool, &ops,
845                                      nxt_controller_conf.root,
846                                      &path, value);
847 
848             if (rc != NXT_OK) {
849                 nxt_mp_destroy(mp);
850 
851                 if (rc == NXT_DECLINED) {
852                     goto not_found;
853                 }
854 
855                 goto alloc_fail;
856             }
857 
858             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
859 
860             if (nxt_slow_path(value == NULL)) {
861                 nxt_mp_destroy(mp);
862                 goto alloc_fail;
863             }
864         }
865 
866         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
867 
        /* Validation uses the connection pool, not the new conf pool. */
868         vldt.conf = value;
869         vldt.pool = c->mem_pool;
870 
871         rc = nxt_conf_validate(&vldt);
872 
873         if (nxt_slow_path(rc != NXT_OK)) {
874             nxt_mp_destroy(mp);
875 
876             if (rc == NXT_DECLINED) {
877                 resp.detail = vldt.error;
878                 goto invalid_conf;
879             }
880 
881             /* rc == NXT_ERROR */
882             goto alloc_fail;
883         }
884 
885         rc = nxt_controller_conf_send(task, value,
886                                       nxt_controller_conf_handler, req);
887 
888         if (nxt_slow_path(rc != NXT_OK)) {
889             nxt_mp_destroy(mp);
890 
891             if (rc == NXT_DECLINED) {
892                 goto no_router;
893             }
894 
895             /* rc == NXT_ERROR */
896             goto alloc_fail;
897         }
898 
        /* Keep the candidate until the router replies. */
899         req->conf.root = value;
900         req->conf.pool = mp;
901 
902         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
903 
904         return;
905     }
906 
907     if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
908 
        /* Wait behind the requests that are already queued. */
909         if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
910             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
911             return;
912         }
913 
        /* Deleting the root replaces the configuration with "{}". */
914         if (path.length == 1) {
915             mp = nxt_mp_create(1024, 128, 256, 32);
916 
917             if (nxt_slow_path(mp == NULL)) {
918                 goto alloc_fail;
919             }
920 
921             value = nxt_conf_json_parse_str(mp, &empty_obj);
922 
923         } else {
            /* A NULL value compiles an operation that removes the path. */
924             rc = nxt_conf_op_compile(c->mem_pool, &ops,
925                                      nxt_controller_conf.root,
926                                      &path, NULL);
927 
            /* No pool has been created yet, so there is nothing to destroy. */
928             if (rc != NXT_OK) {
929                 if (rc == NXT_DECLINED) {
930                     goto not_found;
931                 }
932 
933                 goto alloc_fail;
934             }
935 
936             mp = nxt_mp_create(1024, 128, 256, 32);
937 
938             if (nxt_slow_path(mp == NULL)) {
939                 goto alloc_fail;
940             }
941 
942             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
943         }
944 
945         if (nxt_slow_path(value == NULL)) {
946             nxt_mp_destroy(mp);
947             goto alloc_fail;
948         }
949 
950         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
951 
952         vldt.conf = value;
953         vldt.pool = c->mem_pool;
954 
955         rc = nxt_conf_validate(&vldt);
956 
957         if (nxt_slow_path(rc != NXT_OK)) {
958             nxt_mp_destroy(mp);
959 
960             if (rc == NXT_DECLINED) {
961                 resp.detail = vldt.error;
962                 goto invalid_conf;
963             }
964 
965             /* rc == NXT_ERROR */
966             goto alloc_fail;
967         }
968 
969         rc = nxt_controller_conf_send(task, value,
970                                       nxt_controller_conf_handler, req);
971 
972         if (nxt_slow_path(rc != NXT_OK)) {
973             nxt_mp_destroy(mp);
974 
975             if (rc == NXT_DECLINED) {
976                 goto no_router;
977             }
978 
979             /* rc == NXT_ERROR */
980             goto alloc_fail;
981         }
982 
        /* Keep the candidate until the router replies. */
983         req->conf.root = value;
984         req->conf.pool = mp;
985 
986         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
987 
988         return;
989     }
990 
    /* Neither GET, PUT nor DELETE. */
991     resp.status = 405;
992     resp.title = (u_char *) "Invalid method.";
993     resp.offset = -1;
994 
995     nxt_controller_response(task, req, &resp);
996     return;
997 
998 not_found:
999 
1000     resp.status = 404;
1001     resp.title = (u_char *) "Value doesn't exist.";
1002     resp.offset = -1;
1003 
1004     nxt_controller_response(task, req, &resp);
1005     return;
1006 
1007 invalid_conf:
1008 
    /* resp.detail has been set from vldt.error before the jump. */
1009     resp.status = 400;
1010     resp.title = (u_char *) "Invalid configuration.";
1011     resp.offset = -1;
1012 
1013     nxt_controller_response(task, req, &resp);
1014     return;
1015 
1016 alloc_fail:
1017 
1018     resp.status = 500;
1019     resp.title = (u_char *) "Memory allocation failed.";
1020     resp.offset = -1;
1021 
1022     nxt_controller_response(task, req, &resp);
1023     return;
1024 
1025 no_router:
1026 
1027     resp.status = 500;
1028     resp.title = (u_char *) "Router process isn't available.";
1029     resp.offset = -1;
1030 
1031     nxt_controller_response(task, req, &resp);
1032     return;
1033 }
1034 
1035 
1036 static void
1037 nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1038     void *data)
1039 {
1040     nxt_queue_t                queue;
1041     nxt_controller_request_t   *req;
1042     nxt_controller_response_t  resp;
1043 
1044     req = data;
1045 
1046     nxt_debug(task, "controller conf ready: %*s",
1047               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1048 
1049     nxt_queue_remove(&req->link);
1050 
1051     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1052 
1053     if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1054         nxt_mp_destroy(nxt_controller_conf.pool);
1055 
1056         nxt_controller_conf = req->conf;
1057 
1058         nxt_controller_conf_store(task, req->conf.root);
1059 
1060         resp.status = 200;
1061         resp.title = (u_char *) "Reconfiguration done.";
1062 
1063     } else {
1064         nxt_mp_destroy(req->conf.pool);
1065 
1066         resp.status = 500;
1067         resp.title = (u_char *) "Failed to apply new configuration.";
1068         resp.offset = -1;
1069     }
1070 
1071     nxt_controller_response(task, req, &resp);
1072 
1073     nxt_queue_init(&queue);
1074     nxt_queue_add(&queue, &nxt_controller_waiting_requests);
1075 
1076     nxt_queue_init(&nxt_controller_waiting_requests);
1077 
1078     nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
1079         nxt_controller_process_request(task, req);
1080     } nxt_queue_loop;
1081 }
1082 
1083 
1084 static void
1085 nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1086 {
1087     size_t         size;
1088     nxt_buf_t      *b;
1089     nxt_port_t     *main_port;
1090     nxt_runtime_t  *rt;
1091 
1092     rt = task->thread->runtime;
1093 
1094     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1095 
1096     size = nxt_conf_json_length(conf, NULL);
1097 
1098     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
1099 
1100     if (nxt_fast_path(b != NULL)) {
1101         b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
1102 
1103         (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
1104                                      -1, 0, -1, b);
1105     }
1106 }
1107 
1108 
1109 static void
1110 nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1111     nxt_controller_response_t *resp)
1112 {
1113     size_t                  size;
1114     nxt_str_t               status_line, str;
1115     nxt_buf_t               *b, *body;
1116     nxt_conn_t              *c;
1117     nxt_uint_t              n;
1118     nxt_conf_value_t        *value, *location;
1119     nxt_conf_json_pretty_t  pretty;
1120 
1121     static nxt_str_t  success_str = nxt_string("success");
1122     static nxt_str_t  error_str = nxt_string("error");
1123     static nxt_str_t  detail_str = nxt_string("detail");
1124     static nxt_str_t  location_str = nxt_string("location");
1125     static nxt_str_t  offset_str = nxt_string("offset");
1126     static nxt_str_t  line_str = nxt_string("line");
1127     static nxt_str_t  column_str = nxt_string("column");
1128 
1129     static nxt_time_string_t  date_cache = {
1130         (nxt_atomic_uint_t) -1,
1131         nxt_controller_date,
1132         "%s, %02d %s %4d %02d:%02d:%02d GMT",
1133         sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
1134         NXT_THREAD_TIME_GMT,
1135         NXT_THREAD_TIME_SEC,
1136     };
1137 
1138     switch (resp->status) {
1139 
1140     case 200:
1141         nxt_str_set(&status_line, "200 OK");
1142         break;
1143 
1144     case 400:
1145         nxt_str_set(&status_line, "400 Bad Request");
1146         break;
1147 
1148     case 404:
1149         nxt_str_set(&status_line, "404 Not Found");
1150         break;
1151 
1152     case 405:
1153         nxt_str_set(&status_line, "405 Method Not Allowed");
1154         break;
1155 
1156     default:
1157         nxt_str_set(&status_line, "500 Internal Server Error");
1158         break;
1159     }
1160 
1161     c = req->conn;
1162     value = resp->conf;
1163 
1164     if (value == NULL) {
1165         n = 1
1166             + (resp->detail.length != 0)
1167             + (resp->status >= 400 && resp->offset != -1);
1168 
1169         value = nxt_conf_create_object(c->mem_pool, n);
1170 
1171         if (nxt_slow_path(value == NULL)) {
1172             nxt_controller_conn_close(task, c, req);
1173             return;
1174         }
1175 
1176         str.length = nxt_strlen(resp->title);
1177         str.start = resp->title;
1178 
1179         if (resp->status < 400) {
1180             nxt_conf_set_member_string(value, &success_str, &str, 0);
1181 
1182         } else {
1183             nxt_conf_set_member_string(value, &error_str, &str, 0);
1184         }
1185 
1186         n = 0;
1187 
1188         if (resp->detail.length != 0) {
1189             n++;
1190 
1191             nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1192         }
1193 
1194         if (resp->status >= 400 && resp->offset != -1) {
1195             n++;
1196 
1197             location = nxt_conf_create_object(c->mem_pool,
1198                                               resp->line != 0 ? 3 : 1);
1199 
1200             nxt_conf_set_member(value, &location_str, location, n);
1201 
1202             nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1203 
1204             if (resp->line != 0) {
1205                 nxt_conf_set_member_integer(location, &line_str,
1206                                             resp->line, 1);
1207 
1208                 nxt_conf_set_member_integer(location, &column_str,
1209                                             resp->column, 2);
1210             }
1211         }
1212     }
1213 
1214     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1215 
1216     size = nxt_conf_json_length(value, &pretty) + 2;
1217 
1218     body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1219     if (nxt_slow_path(body == NULL)) {
1220         nxt_controller_conn_close(task, c, req);
1221         return;
1222     }
1223 
1224     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1225 
1226     body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
1227 
1228     body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
1229 
1230     size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
1231            + sizeof("Server: Unit/" NXT_VERSION "\r\n") - 1
1232            + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
1233            + sizeof("Content-Type: application/json\r\n") - 1
1234            + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
1235            + sizeof("Connection: close\r\n") - 1
1236            + sizeof("\r\n") - 1;
1237 
1238     b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1239     if (nxt_slow_path(b == NULL)) {
1240         nxt_controller_conn_close(task, c, req);
1241         return;
1242     }
1243 
1244     b->next = body;
1245 
1246     nxt_str_set(&str, "HTTP/1.1 ");
1247 
1248     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1249     b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1250                              status_line.length);
1251 
1252     nxt_str_set(&str, "\r\n"
1253                       "Server: Unit/" NXT_VERSION "\r\n"
1254                       "Date: ");
1255 
1256     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1257 
1258     b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1259                                          b->mem.free);
1260 
1261     nxt_str_set(&str, "\r\n"
1262                       "Content-Type: application/json\r\n"
1263                       "Content-Length: ");
1264 
1265     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1266 
1267     b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1268                               nxt_buf_mem_used_size(&body->mem));
1269 
1270     nxt_str_set(&str, "\r\n"
1271                       "Connection: close\r\n"
1272                       "\r\n");
1273 
1274     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1275 
1276     c->write = b;
1277     c->write_state = &nxt_controller_conn_write_state;
1278 
1279     nxt_conn_write(task->thread->engine, c);
1280 }
1281 
1282 
1283 static u_char *
1284 nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1285     size_t size, const char *format)
1286 {
1287     static const char  *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1288                                    "Sat" };
1289 
1290     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1291                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1292 
1293     return nxt_sprintf(buf, buf + size, format,
1294                        week[tm->tm_wday], tm->tm_mday,
1295                        month[tm->tm_mon], tm->tm_year + 1900,
1296                        tm->tm_hour, tm->tm_min, tm->tm_sec);
1297 }
1298