xref: /unit/src/nxt_controller.c (revision 662:053984ce0c4e)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_main.h>
9 #include <nxt_runtime.h>
10 #include <nxt_main_process.h>
11 #include <nxt_conf.h>
12 
13 
14 typedef struct {
15     nxt_conf_value_t  *root;
16     nxt_mp_t          *pool;
17 } nxt_controller_conf_t;
18 
19 
20 typedef struct {
21     nxt_http_request_parse_t  parser;
22     size_t                    length;
23     nxt_controller_conf_t     conf;
24     nxt_conn_t                *conn;
25     nxt_queue_link_t          link;
26 } nxt_controller_request_t;
27 
28 
29 typedef struct {
30     nxt_uint_t        status;
31     nxt_conf_value_t  *conf;
32 
33     u_char            *title;
34     nxt_str_t         detail;
35     ssize_t           offset;
36     nxt_uint_t        line;
37     nxt_uint_t        column;
38 } nxt_controller_response_t;
39 
40 
41 static void nxt_controller_process_new_port_handler(nxt_task_t *task,
42     nxt_port_recv_msg_t *msg);
43 static void nxt_controller_send_current_conf(nxt_task_t *task);
44 static void nxt_controller_router_ready_handler(nxt_task_t *task,
45     nxt_port_recv_msg_t *msg);
46 static nxt_int_t nxt_controller_conf_default(void);
47 static void nxt_controller_conf_init_handler(nxt_task_t *task,
48     nxt_port_recv_msg_t *msg, void *data);
49 static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
50     nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
51 
52 static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
53 static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
54 static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
55     uintptr_t data);
56 static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
57     void *data);
58 static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
59     void *data);
60 static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
61     void *data);
62 static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
63 static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
64     void *data);
65 static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
66     void *data);
67 static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
68 static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
69 
70 static nxt_int_t nxt_controller_request_content_length(void *ctx,
71     nxt_http_field_t *field, uintptr_t data);
72 
73 static void nxt_controller_process_request(nxt_task_t *task,
74     nxt_controller_request_t *req);
75 static void nxt_controller_conf_handler(nxt_task_t *task,
76     nxt_port_recv_msg_t *msg, void *data);
77 static void nxt_controller_conf_store(nxt_task_t *task,
78     nxt_conf_value_t *conf);
79 static void nxt_controller_response(nxt_task_t *task,
80     nxt_controller_request_t *req, nxt_controller_response_t *resp);
81 static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
82     struct tm *tm, size_t size, const char *format);
83 
84 
85 static nxt_http_field_proc_t  nxt_controller_request_fields[] = {
86     { nxt_string("Content-Length"),
87       &nxt_controller_request_content_length, 0 },
88 };
89 
90 static nxt_lvlhsh_t            nxt_controller_fields_hash;
91 
92 static nxt_uint_t              nxt_controller_listening;
93 static nxt_uint_t              nxt_controller_router_ready;
94 static nxt_controller_conf_t   nxt_controller_conf;
95 static nxt_queue_t             nxt_controller_waiting_requests;
96 
97 
98 static const nxt_event_conn_state_t  nxt_controller_conn_read_state;
99 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state;
100 static const nxt_event_conn_state_t  nxt_controller_conn_write_state;
101 static const nxt_event_conn_state_t  nxt_controller_conn_close_state;
102 
103 
104 nxt_port_handlers_t  nxt_controller_process_port_handlers = {
105     .quit           = nxt_worker_process_quit_handler,
106     .new_port       = nxt_controller_process_new_port_handler,
107     .change_file    = nxt_port_change_log_file_handler,
108     .mmap           = nxt_port_mmap_handler,
109     .process_ready  = nxt_controller_router_ready_handler,
110     .data           = nxt_port_data_handler,
111     .remove_pid     = nxt_port_remove_pid_handler,
112     .rpc_ready      = nxt_port_rpc_handler,
113     .rpc_error      = nxt_port_rpc_handler,
114 };
115 
116 
117 nxt_int_t
118 nxt_controller_start(nxt_task_t *task, void *data)
119 {
120     nxt_mp_t               *mp;
121     nxt_int_t              ret;
122     nxt_str_t              *json;
123     nxt_runtime_t          *rt;
124     nxt_conf_value_t       *conf;
125     nxt_event_engine_t     *engine;
126     nxt_conf_validation_t  vldt;
127 
128     rt = task->thread->runtime;
129 
130     engine = task->thread->engine;
131 
132     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
133     if (nxt_slow_path(engine->mem_pool == NULL)) {
134         return NXT_ERROR;
135     }
136 
137     ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool,
138                                nxt_controller_request_fields,
139                                nxt_nitems(nxt_controller_request_fields));
140 
141     if (nxt_slow_path(ret != NXT_OK)) {
142         return NXT_ERROR;
143     }
144 
145     nxt_queue_init(&nxt_controller_waiting_requests);
146 
147     json = data;
148 
149     if (json->length == 0) {
150         return NXT_OK;
151     }
152 
153     mp = nxt_mp_create(1024, 128, 256, 32);
154     if (nxt_slow_path(mp == NULL)) {
155         return NXT_ERROR;
156     }
157 
158     conf = nxt_conf_json_parse_str(mp, json);
159     nxt_free(json->start);
160 
161     if (nxt_slow_path(conf == NULL)) {
162         nxt_alert(task, "failed to restore previous configuration: "
163                   "file is corrupted or not enough memory");
164 
165         nxt_mp_destroy(mp);
166         return NXT_OK;
167     }
168 
169     nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
170 
171     vldt.pool = nxt_mp_create(1024, 128, 256, 32);
172     if (nxt_slow_path(vldt.pool == NULL)) {
173         return NXT_ERROR;
174     }
175 
176     vldt.conf = conf;
177 
178     ret = nxt_conf_validate(&vldt);
179 
180     if (nxt_slow_path(ret != NXT_OK)) {
181 
182         if (ret == NXT_DECLINED) {
183             nxt_alert(task, "the previous configuration is invalid: %V",
184                       &vldt.error);
185 
186             nxt_mp_destroy(vldt.pool);
187             nxt_mp_destroy(mp);
188 
189             return NXT_OK;
190         }
191 
192         /* ret == NXT_ERROR */
193 
194         return NXT_ERROR;
195     }
196 
197     nxt_mp_destroy(vldt.pool);
198 
199     nxt_controller_conf.root = conf;
200     nxt_controller_conf.pool = mp;
201 
202     return NXT_OK;
203 }
204 
205 
206 static void
207 nxt_controller_process_new_port_handler(nxt_task_t *task,
208     nxt_port_recv_msg_t *msg)
209 {
210     nxt_port_new_port_handler(task, msg);
211 
212     if (msg->u.new_port->type != NXT_PROCESS_ROUTER
213         || !nxt_controller_router_ready)
214     {
215         return;
216     }
217 
218     nxt_controller_send_current_conf(task);
219 }
220 
221 
222 static void
223 nxt_controller_send_current_conf(nxt_task_t *task)
224 {
225     nxt_int_t         rc;
226     nxt_runtime_t     *rt;
227     nxt_conf_value_t  *conf;
228 
229     conf = nxt_controller_conf.root;
230 
231     if (conf != NULL) {
232         rc = nxt_controller_conf_send(task, conf,
233                                       nxt_controller_conf_init_handler, NULL);
234 
235         if (nxt_fast_path(rc == NXT_OK)) {
236             return;
237         }
238 
239         nxt_mp_destroy(nxt_controller_conf.pool);
240 
241         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
242             nxt_abort();
243         }
244     }
245 
246     if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
247         nxt_abort();
248     }
249 
250     rt = task->thread->runtime;
251 
252     if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
253         nxt_abort();
254     }
255 
256     nxt_controller_listening = 1;
257 }
258 
259 
260 static void
261 nxt_controller_router_ready_handler(nxt_task_t *task,
262     nxt_port_recv_msg_t *msg)
263 {
264     nxt_port_t     *router_port;
265     nxt_runtime_t  *rt;
266 
267     rt = task->thread->runtime;
268 
269     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
270 
271     nxt_controller_router_ready = 1;
272 
273     if (router_port != NULL) {
274         nxt_controller_send_current_conf(task);
275     }
276 }
277 
278 
279 static nxt_int_t
280 nxt_controller_conf_default(void)
281 {
282     nxt_mp_t          *mp;
283     nxt_conf_value_t  *conf;
284 
285     static const nxt_str_t json
286         = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
287 
288     mp = nxt_mp_create(1024, 128, 256, 32);
289 
290     if (nxt_slow_path(mp == NULL)) {
291         return NXT_ERROR;
292     }
293 
294     conf = nxt_conf_json_parse_str(mp, &json);
295 
296     if (nxt_slow_path(conf == NULL)) {
297         return NXT_ERROR;
298     }
299 
300     nxt_controller_conf.root = conf;
301     nxt_controller_conf.pool = mp;
302 
303     return NXT_OK;
304 }
305 
306 
307 static void
308 nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
309     void *data)
310 {
311     nxt_runtime_t  *rt;
312 
313     if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
314         nxt_alert(task, "failed to apply previous configuration");
315 
316         nxt_mp_destroy(nxt_controller_conf.pool);
317 
318         if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
319             nxt_abort();
320         }
321     }
322 
323     if (nxt_controller_listening == 0) {
324         rt = task->thread->runtime;
325 
326         if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
327                           == NULL))
328         {
329             nxt_abort();
330         }
331 
332         nxt_controller_listening = 1;
333     }
334 }
335 
336 
337 static nxt_int_t
338 nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
339     nxt_port_rpc_handler_t handler, void *data)
340 {
341     size_t         size;
342     uint32_t       stream;
343     nxt_int_t      rc;
344     nxt_buf_t      *b;
345     nxt_port_t     *router_port, *controller_port;
346     nxt_runtime_t  *rt;
347 
348     rt = task->thread->runtime;
349 
350     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
351 
352     if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) {
353         return NXT_DECLINED;
354     }
355 
356     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
357 
358     size = nxt_conf_json_length(conf, NULL);
359 
360     b = nxt_port_mmap_get_buf(task, router_port, size);
361     if (nxt_slow_path(b == NULL)) {
362         return NXT_ERROR;
363     }
364 
365     b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
366 
367     stream = nxt_port_rpc_register_handler(task, controller_port,
368                                            handler, handler,
369                                            router_port->pid, data);
370 
371     if (nxt_slow_path(stream == 0)) {
372         return NXT_ERROR;
373     }
374 
375     rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
376                                stream, controller_port->id, b);
377 
378     if (nxt_slow_path(rc != NXT_OK)) {
379         nxt_port_rpc_cancel(task, controller_port, stream);
380         return NXT_ERROR;
381     }
382 
383     return NXT_OK;
384 }
385 
386 
387 nxt_int_t
388 nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
389 {
390     nxt_sockaddr_t       *sa;
391     nxt_listen_socket_t  *ls;
392 
393     sa = rt->controller_listen;
394 
395     ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
396     if (ls == NULL) {
397         return NXT_ERROR;
398     }
399 
400     ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
401                                        sa->socklen, sa->length);
402     if (ls->sockaddr == NULL) {
403         return NXT_ERROR;
404     }
405 
406     ls->sockaddr->type = sa->type;
407     nxt_sockaddr_text(ls->sockaddr);
408 
409     nxt_listen_socket_remote_size(ls);
410 
411     ls->socket = -1;
412     ls->backlog = NXT_LISTEN_BACKLOG;
413     ls->read_after_accept = 1;
414     ls->flags = NXT_NONBLOCK;
415 
416 #if 0
417     /* STUB */
418     wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
419     if (wq == NULL) {
420         return NXT_ERROR;
421     }
422     nxt_work_queue_name(wq, "listen");
423     /**/
424 
425     ls->work_queue = wq;
426 #endif
427     ls->handler = nxt_controller_conn_init;
428 
429     if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
430         return NXT_ERROR;
431     }
432 
433     rt->controller_socket = ls;
434 
435     return NXT_OK;
436 }
437 
438 
439 static void
440 nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
441 {
442     nxt_buf_t                 *b;
443     nxt_conn_t                *c;
444     nxt_event_engine_t        *engine;
445     nxt_controller_request_t  *r;
446 
447     c = obj;
448 
449     nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
450 
451     r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
452     if (nxt_slow_path(r == NULL)) {
453         nxt_controller_conn_free(task, c, NULL);
454         return;
455     }
456 
457     r->conn = c;
458 
459     if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
460                       != NXT_OK))
461     {
462         nxt_controller_conn_free(task, c, NULL);
463         return;
464     }
465 
466     b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
467     if (nxt_slow_path(b == NULL)) {
468         nxt_controller_conn_free(task, c, NULL);
469         return;
470     }
471 
472     c->read = b;
473     c->socket.data = r;
474     c->socket.read_ready = 1;
475     c->read_state = &nxt_controller_conn_read_state;
476 
477     engine = task->thread->engine;
478     c->read_work_queue = &engine->read_work_queue;
479     c->write_work_queue = &engine->write_work_queue;
480 
481     nxt_conn_read(engine, c);
482 }
483 
484 
485 static const nxt_event_conn_state_t  nxt_controller_conn_read_state
486     nxt_aligned(64) =
487 {
488     .ready_handler = nxt_controller_conn_read,
489     .close_handler = nxt_controller_conn_close,
490     .error_handler = nxt_controller_conn_read_error,
491 
492     .timer_handler = nxt_controller_conn_read_timeout,
493     .timer_value = nxt_controller_conn_timeout_value,
494     .timer_data = 60 * 1000,
495 };
496 
497 
498 static void
499 nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
500 {
501     size_t                    preread;
502     nxt_buf_t                 *b;
503     nxt_int_t                 rc;
504     nxt_conn_t                *c;
505     nxt_controller_request_t  *r;
506 
507     c = obj;
508     r = data;
509 
510     nxt_debug(task, "controller conn read");
511 
512     nxt_queue_remove(&c->link);
513     nxt_queue_self(&c->link);
514 
515     b = c->read;
516 
517     rc = nxt_http_parse_request(&r->parser, &b->mem);
518 
519     if (nxt_slow_path(rc != NXT_DONE)) {
520 
521         if (rc == NXT_AGAIN) {
522             if (nxt_buf_mem_free_size(&b->mem) == 0) {
523                 nxt_log(task, NXT_LOG_ERR, "too long request headers");
524                 nxt_controller_conn_close(task, c, r);
525                 return;
526             }
527 
528             nxt_conn_read(task->thread->engine, c);
529             return;
530         }
531 
532         /* rc == NXT_ERROR */
533 
534         nxt_log(task, NXT_LOG_ERR, "parsing error");
535 
536         nxt_controller_conn_close(task, c, r);
537         return;
538     }
539 
540     rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
541                                  r);
542 
543     if (nxt_slow_path(rc != NXT_OK)) {
544         nxt_controller_conn_close(task, c, r);
545         return;
546     }
547 
548     preread = nxt_buf_mem_used_size(&b->mem);
549 
550     nxt_debug(task, "controller request header parsing complete, "
551                     "body length: %uz, preread: %uz",
552                     r->length, preread);
553 
554     if (preread >= r->length) {
555         nxt_controller_process_request(task, r);
556         return;
557     }
558 
559     if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
560         b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
561         if (nxt_slow_path(b == NULL)) {
562             nxt_controller_conn_free(task, c, NULL);
563             return;
564         }
565 
566         b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
567 
568         c->read = b;
569     }
570 
571     c->read_state = &nxt_controller_conn_body_read_state;
572 
573     nxt_conn_read(task->thread->engine, c);
574 }
575 
576 
577 static nxt_msec_t
578 nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
579 {
580     return (nxt_msec_t) data;
581 }
582 
583 
584 static void
585 nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
586 {
587     nxt_conn_t  *c;
588 
589     c = obj;
590 
591     nxt_debug(task, "controller conn read error");
592 
593     nxt_controller_conn_close(task, c, data);
594 }
595 
596 
597 static void
598 nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
599 {
600     nxt_timer_t  *timer;
601     nxt_conn_t   *c;
602 
603     timer = obj;
604 
605     c = nxt_read_timer_conn(timer);
606     c->socket.timedout = 1;
607     c->socket.closed = 1;
608 
609     nxt_debug(task, "controller conn read timeout");
610 
611     nxt_controller_conn_close(task, c, data);
612 }
613 
614 
615 static const nxt_event_conn_state_t  nxt_controller_conn_body_read_state
616     nxt_aligned(64) =
617 {
618     .ready_handler = nxt_controller_conn_body_read,
619     .close_handler = nxt_controller_conn_close,
620     .error_handler = nxt_controller_conn_read_error,
621 
622     .timer_handler = nxt_controller_conn_read_timeout,
623     .timer_value = nxt_controller_conn_timeout_value,
624     .timer_data = 60 * 1000,
625     .timer_autoreset = 1,
626 };
627 
628 
629 static void
630 nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
631 {
632     size_t                    read;
633     nxt_buf_t                 *b;
634     nxt_conn_t                *c;
635     nxt_controller_request_t  *r;
636 
637     c = obj;
638     r = data;
639     b = c->read;
640 
641     read = nxt_buf_mem_used_size(&b->mem);
642 
643     nxt_debug(task, "controller conn body read: %uz of %uz",
644               read, r->length);
645 
646     if (read >= r->length) {
647         nxt_controller_process_request(task, r);
648         return;
649     }
650 
651     nxt_conn_read(task->thread->engine, c);
652 }
653 
654 
655 static const nxt_event_conn_state_t  nxt_controller_conn_write_state
656     nxt_aligned(64) =
657 {
658     .ready_handler = nxt_controller_conn_write,
659     .error_handler = nxt_controller_conn_write_error,
660 
661     .timer_handler = nxt_controller_conn_write_timeout,
662     .timer_value = nxt_controller_conn_timeout_value,
663     .timer_data = 60 * 1000,
664     .timer_autoreset = 1,
665 };
666 
667 
668 static void
669 nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
670 {
671     nxt_buf_t   *b;
672     nxt_conn_t  *c;
673 
674     c = obj;
675 
676     nxt_debug(task, "controller conn write");
677 
678     b = c->write;
679 
680     if (b->mem.pos != b->mem.free) {
681         nxt_conn_write(task->thread->engine, c);
682         return;
683     }
684 
685     nxt_debug(task, "controller conn write complete");
686 
687     nxt_controller_conn_close(task, c, data);
688 }
689 
690 
691 static void
692 nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
693 {
694     nxt_conn_t  *c;
695 
696     c = obj;
697 
698     nxt_debug(task, "controller conn write error");
699 
700     nxt_controller_conn_close(task, c, data);
701 }
702 
703 
704 static void
705 nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
706 {
707     nxt_conn_t   *c;
708     nxt_timer_t  *timer;
709 
710     timer = obj;
711 
712     c = nxt_write_timer_conn(timer);
713     c->socket.timedout = 1;
714     c->socket.closed = 1;
715 
716     nxt_debug(task, "controller conn write timeout");
717 
718     nxt_controller_conn_close(task, c, data);
719 }
720 
721 
722 static const nxt_event_conn_state_t  nxt_controller_conn_close_state
723     nxt_aligned(64) =
724 {
725     .ready_handler = nxt_controller_conn_free,
726 };
727 
728 
729 static void
730 nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
731 {
732     nxt_conn_t  *c;
733 
734     c = obj;
735 
736     nxt_debug(task, "controller conn close");
737 
738     nxt_queue_remove(&c->link);
739 
740     c->write_state = &nxt_controller_conn_close_state;
741 
742     nxt_conn_close(task->thread->engine, c);
743 }
744 
745 
746 static void
747 nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
748 {
749     nxt_conn_t  *c;
750 
751     c = obj;
752 
753     nxt_debug(task, "controller conn free");
754 
755     nxt_sockaddr_cache_free(task->thread->engine, c);
756 
757     nxt_conn_free(task, c);
758 }
759 
760 
761 static nxt_int_t
762 nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
763     uintptr_t data)
764 {
765     off_t                     length;
766     nxt_controller_request_t  *r;
767 
768     r = ctx;
769 
770     length = nxt_off_t_parse(field->value, field->value_length);
771 
772     if (nxt_fast_path(length > 0)) {
773 
774         if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
775             nxt_log_error(NXT_LOG_ERR, &r->conn->log,
776                           "Content-Length is too big");
777             return NXT_ERROR;
778         }
779 
780         r->length = length;
781         return NXT_OK;
782     }
783 
784     nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
785 
786     return NXT_ERROR;
787 }
788 
789 
790 static void
791 nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
792 {
793     nxt_mp_t                   *mp;
794     nxt_int_t                  rc;
795     nxt_str_t                  path;
796     nxt_conn_t                 *c;
797     nxt_buf_mem_t              *mbuf;
798     nxt_conf_op_t              *ops;
799     nxt_conf_value_t           *value;
800     nxt_conf_validation_t      vldt;
801     nxt_conf_json_error_t      error;
802     nxt_controller_response_t  resp;
803 
804     static const nxt_str_t empty_obj = nxt_string("{}");
805 
806     c = req->conn;
807     path = req->parser.path;
808 
809     if (nxt_str_start(&path, "/config", 7)) {
810 
811         if (path.length == 7) {
812             path.length = 1;
813 
814         } else if (path.start[7] == '/') {
815             path.length -= 7;
816             path.start += 7;
817         }
818     }
819 
820     if (path.length > 1 && path.start[path.length - 1] == '/') {
821         path.length--;
822     }
823 
824     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
825 
826     if (nxt_str_eq(&req->parser.method, "GET", 3)) {
827 
828         value = nxt_conf_get_path(nxt_controller_conf.root, &path);
829 
830         if (value == NULL) {
831             goto not_found;
832         }
833 
834         resp.status = 200;
835         resp.conf = value;
836 
837         nxt_controller_response(task, req, &resp);
838         return;
839     }
840 
841     if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
842 
843         if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
844             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
845             return;
846         }
847 
848         mp = nxt_mp_create(1024, 128, 256, 32);
849 
850         if (nxt_slow_path(mp == NULL)) {
851             goto alloc_fail;
852         }
853 
854         mbuf = &c->read->mem;
855 
856         nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
857 
858         value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
859 
860         if (value == NULL) {
861             nxt_mp_destroy(mp);
862 
863             if (error.pos == NULL) {
864                 goto alloc_fail;
865             }
866 
867             resp.status = 400;
868             resp.title = (u_char *) "Invalid JSON.";
869             resp.detail.length = nxt_strlen(error.detail);
870             resp.detail.start = error.detail;
871             resp.offset = error.pos - mbuf->pos;
872 
873             nxt_conf_json_position(mbuf->pos, error.pos,
874                                    &resp.line, &resp.column);
875 
876             nxt_controller_response(task, req, &resp);
877             return;
878         }
879 
880         if (path.length != 1) {
881             rc = nxt_conf_op_compile(c->mem_pool, &ops,
882                                      nxt_controller_conf.root,
883                                      &path, value);
884 
885             if (rc != NXT_OK) {
886                 nxt_mp_destroy(mp);
887 
888                 if (rc == NXT_DECLINED) {
889                     goto not_found;
890                 }
891 
892                 goto alloc_fail;
893             }
894 
895             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
896 
897             if (nxt_slow_path(value == NULL)) {
898                 nxt_mp_destroy(mp);
899                 goto alloc_fail;
900             }
901         }
902 
903         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
904 
905         vldt.conf = value;
906         vldt.pool = c->mem_pool;
907 
908         rc = nxt_conf_validate(&vldt);
909 
910         if (nxt_slow_path(rc != NXT_OK)) {
911             nxt_mp_destroy(mp);
912 
913             if (rc == NXT_DECLINED) {
914                 resp.detail = vldt.error;
915                 goto invalid_conf;
916             }
917 
918             /* rc == NXT_ERROR */
919             goto alloc_fail;
920         }
921 
922         rc = nxt_controller_conf_send(task, value,
923                                       nxt_controller_conf_handler, req);
924 
925         if (nxt_slow_path(rc != NXT_OK)) {
926             nxt_mp_destroy(mp);
927 
928             if (rc == NXT_DECLINED) {
929                 goto no_router;
930             }
931 
932             /* rc == NXT_ERROR */
933             goto alloc_fail;
934         }
935 
936         req->conf.root = value;
937         req->conf.pool = mp;
938 
939         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
940 
941         return;
942     }
943 
944     if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
945 
946         if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
947             nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
948             return;
949         }
950 
951         if (path.length == 1) {
952             mp = nxt_mp_create(1024, 128, 256, 32);
953 
954             if (nxt_slow_path(mp == NULL)) {
955                 goto alloc_fail;
956             }
957 
958             value = nxt_conf_json_parse_str(mp, &empty_obj);
959 
960         } else {
961             rc = nxt_conf_op_compile(c->mem_pool, &ops,
962                                      nxt_controller_conf.root,
963                                      &path, NULL);
964 
965             if (rc != NXT_OK) {
966                 if (rc == NXT_DECLINED) {
967                     goto not_found;
968                 }
969 
970                 goto alloc_fail;
971             }
972 
973             mp = nxt_mp_create(1024, 128, 256, 32);
974 
975             if (nxt_slow_path(mp == NULL)) {
976                 goto alloc_fail;
977             }
978 
979             value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
980         }
981 
982         if (nxt_slow_path(value == NULL)) {
983             nxt_mp_destroy(mp);
984             goto alloc_fail;
985         }
986 
987         nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
988 
989         vldt.conf = value;
990         vldt.pool = c->mem_pool;
991 
992         rc = nxt_conf_validate(&vldt);
993 
994         if (nxt_slow_path(rc != NXT_OK)) {
995             nxt_mp_destroy(mp);
996 
997             if (rc == NXT_DECLINED) {
998                 resp.detail = vldt.error;
999                 goto invalid_conf;
1000             }
1001 
1002             /* rc == NXT_ERROR */
1003             goto alloc_fail;
1004         }
1005 
1006         rc = nxt_controller_conf_send(task, value,
1007                                       nxt_controller_conf_handler, req);
1008 
1009         if (nxt_slow_path(rc != NXT_OK)) {
1010             nxt_mp_destroy(mp);
1011 
1012             if (rc == NXT_DECLINED) {
1013                 goto no_router;
1014             }
1015 
1016             /* rc == NXT_ERROR */
1017             goto alloc_fail;
1018         }
1019 
1020         req->conf.root = value;
1021         req->conf.pool = mp;
1022 
1023         nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1024 
1025         return;
1026     }
1027 
1028     resp.status = 405;
1029     resp.title = (u_char *) "Invalid method.";
1030     resp.offset = -1;
1031 
1032     nxt_controller_response(task, req, &resp);
1033     return;
1034 
1035 not_found:
1036 
1037     resp.status = 404;
1038     resp.title = (u_char *) "Value doesn't exist.";
1039     resp.offset = -1;
1040 
1041     nxt_controller_response(task, req, &resp);
1042     return;
1043 
1044 invalid_conf:
1045 
1046     resp.status = 400;
1047     resp.title = (u_char *) "Invalid configuration.";
1048     resp.offset = -1;
1049 
1050     nxt_controller_response(task, req, &resp);
1051     return;
1052 
1053 alloc_fail:
1054 
1055     resp.status = 500;
1056     resp.title = (u_char *) "Memory allocation failed.";
1057     resp.offset = -1;
1058 
1059     nxt_controller_response(task, req, &resp);
1060     return;
1061 
1062 no_router:
1063 
1064     resp.status = 500;
1065     resp.title = (u_char *) "Router process isn't available.";
1066     resp.offset = -1;
1067 
1068     nxt_controller_response(task, req, &resp);
1069     return;
1070 }
1071 
1072 
1073 static void
1074 nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1075     void *data)
1076 {
1077     nxt_queue_t                queue;
1078     nxt_controller_request_t   *req;
1079     nxt_controller_response_t  resp;
1080 
1081     req = data;
1082 
1083     nxt_debug(task, "controller conf ready: %*s",
1084               nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1085 
1086     nxt_queue_remove(&req->link);
1087 
1088     nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1089 
1090     if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1091         nxt_mp_destroy(nxt_controller_conf.pool);
1092 
1093         nxt_controller_conf = req->conf;
1094 
1095         nxt_controller_conf_store(task, req->conf.root);
1096 
1097         resp.status = 200;
1098         resp.title = (u_char *) "Reconfiguration done.";
1099 
1100     } else {
1101         nxt_mp_destroy(req->conf.pool);
1102 
1103         resp.status = 500;
1104         resp.title = (u_char *) "Failed to apply new configuration.";
1105         resp.offset = -1;
1106     }
1107 
1108     nxt_controller_response(task, req, &resp);
1109 
1110     nxt_queue_init(&queue);
1111     nxt_queue_add(&queue, &nxt_controller_waiting_requests);
1112 
1113     nxt_queue_init(&nxt_controller_waiting_requests);
1114 
1115     nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
1116         nxt_controller_process_request(task, req);
1117     } nxt_queue_loop;
1118 }
1119 
1120 
1121 static void
1122 nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1123 {
1124     size_t         size;
1125     nxt_buf_t      *b;
1126     nxt_port_t     *main_port;
1127     nxt_runtime_t  *rt;
1128 
1129     rt = task->thread->runtime;
1130 
1131     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1132 
1133     size = nxt_conf_json_length(conf, NULL);
1134 
1135     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
1136 
1137     if (nxt_fast_path(b != NULL)) {
1138         b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
1139 
1140         (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
1141                                      -1, 0, -1, b);
1142     }
1143 }
1144 
1145 
1146 static void
1147 nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1148     nxt_controller_response_t *resp)
1149 {
1150     size_t                  size;
1151     nxt_str_t               status_line, str;
1152     nxt_buf_t               *b, *body;
1153     nxt_conn_t              *c;
1154     nxt_uint_t              n;
1155     nxt_conf_value_t        *value, *location;
1156     nxt_conf_json_pretty_t  pretty;
1157 
1158     static nxt_str_t  success_str = nxt_string("success");
1159     static nxt_str_t  error_str = nxt_string("error");
1160     static nxt_str_t  detail_str = nxt_string("detail");
1161     static nxt_str_t  location_str = nxt_string("location");
1162     static nxt_str_t  offset_str = nxt_string("offset");
1163     static nxt_str_t  line_str = nxt_string("line");
1164     static nxt_str_t  column_str = nxt_string("column");
1165 
1166     static nxt_time_string_t  date_cache = {
1167         (nxt_atomic_uint_t) -1,
1168         nxt_controller_date,
1169         "%s, %02d %s %4d %02d:%02d:%02d GMT",
1170         sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
1171         NXT_THREAD_TIME_GMT,
1172         NXT_THREAD_TIME_SEC,
1173     };
1174 
1175     switch (resp->status) {
1176 
1177     case 200:
1178         nxt_str_set(&status_line, "200 OK");
1179         break;
1180 
1181     case 400:
1182         nxt_str_set(&status_line, "400 Bad Request");
1183         break;
1184 
1185     case 404:
1186         nxt_str_set(&status_line, "404 Not Found");
1187         break;
1188 
1189     case 405:
1190         nxt_str_set(&status_line, "405 Method Not Allowed");
1191         break;
1192 
1193     default:
1194         nxt_str_set(&status_line, "500 Internal Server Error");
1195         break;
1196     }
1197 
1198     c = req->conn;
1199     value = resp->conf;
1200 
1201     if (value == NULL) {
1202         n = 1
1203             + (resp->detail.length != 0)
1204             + (resp->status >= 400 && resp->offset != -1);
1205 
1206         value = nxt_conf_create_object(c->mem_pool, n);
1207 
1208         if (nxt_slow_path(value == NULL)) {
1209             nxt_controller_conn_close(task, c, req);
1210             return;
1211         }
1212 
1213         str.length = nxt_strlen(resp->title);
1214         str.start = resp->title;
1215 
1216         if (resp->status < 400) {
1217             nxt_conf_set_member_string(value, &success_str, &str, 0);
1218 
1219         } else {
1220             nxt_conf_set_member_string(value, &error_str, &str, 0);
1221         }
1222 
1223         n = 0;
1224 
1225         if (resp->detail.length != 0) {
1226             n++;
1227 
1228             nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1229         }
1230 
1231         if (resp->status >= 400 && resp->offset != -1) {
1232             n++;
1233 
1234             location = nxt_conf_create_object(c->mem_pool,
1235                                               resp->line != 0 ? 3 : 1);
1236 
1237             nxt_conf_set_member(value, &location_str, location, n);
1238 
1239             nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1240 
1241             if (resp->line != 0) {
1242                 nxt_conf_set_member_integer(location, &line_str,
1243                                             resp->line, 1);
1244 
1245                 nxt_conf_set_member_integer(location, &column_str,
1246                                             resp->column, 2);
1247             }
1248         }
1249     }
1250 
1251     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1252 
1253     size = nxt_conf_json_length(value, &pretty) + 2;
1254 
1255     body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1256     if (nxt_slow_path(body == NULL)) {
1257         nxt_controller_conn_close(task, c, req);
1258         return;
1259     }
1260 
1261     nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1262 
1263     body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
1264 
1265     body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
1266 
1267     size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
1268            + sizeof("Server: Unit/" NXT_VERSION "\r\n") - 1
1269            + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
1270            + sizeof("Content-Type: application/json\r\n") - 1
1271            + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
1272            + sizeof("Connection: close\r\n") - 1
1273            + sizeof("\r\n") - 1;
1274 
1275     b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1276     if (nxt_slow_path(b == NULL)) {
1277         nxt_controller_conn_close(task, c, req);
1278         return;
1279     }
1280 
1281     b->next = body;
1282 
1283     nxt_str_set(&str, "HTTP/1.1 ");
1284 
1285     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1286     b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1287                              status_line.length);
1288 
1289     nxt_str_set(&str, "\r\n"
1290                       "Server: Unit/" NXT_VERSION "\r\n"
1291                       "Date: ");
1292 
1293     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1294 
1295     b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1296                                          b->mem.free);
1297 
1298     nxt_str_set(&str, "\r\n"
1299                       "Content-Type: application/json\r\n"
1300                       "Content-Length: ");
1301 
1302     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1303 
1304     b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1305                               nxt_buf_mem_used_size(&body->mem));
1306 
1307     nxt_str_set(&str, "\r\n"
1308                       "Connection: close\r\n"
1309                       "\r\n");
1310 
1311     b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1312 
1313     c->write = b;
1314     c->write_state = &nxt_controller_conn_write_state;
1315 
1316     nxt_conn_write(task->thread->engine, c);
1317 }
1318 
1319 
1320 static u_char *
1321 nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1322     size_t size, const char *format)
1323 {
1324     static const char  *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1325                                    "Sat" };
1326 
1327     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1328                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1329 
1330     return nxt_sprintf(buf, buf + size, format,
1331                        week[tm->tm_wday], tm->tm_mday,
1332                        month[tm->tm_mon], tm->tm_year + 1900,
1333                        tm->tm_hour, tm->tm_min, tm->tm_sec);
1334 }
1335