nxt_controller.c (673:9fa79c719a17) nxt_controller.c (703:2d536dde84d2)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
32
33 u_char *title;
34 nxt_str_t detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
38} nxt_controller_response_t;
39
40
41static void nxt_controller_process_new_port_handler(nxt_task_t *task,
42 nxt_port_recv_msg_t *msg);
43static void nxt_controller_send_current_conf(nxt_task_t *task);
44static void nxt_controller_router_ready_handler(nxt_task_t *task,
45 nxt_port_recv_msg_t *msg);
46static nxt_int_t nxt_controller_conf_default(void);
47static void nxt_controller_conf_init_handler(nxt_task_t *task,
48 nxt_port_recv_msg_t *msg, void *data);
49static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
50 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
51
52static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
53static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
54static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
55 uintptr_t data);
56static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
57 void *data);
58static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
59 void *data);
60static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
61 void *data);
62static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
63static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
64 void *data);
65static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
66 void *data);
67static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
68static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
69
70static nxt_int_t nxt_controller_request_content_length(void *ctx,
71 nxt_http_field_t *field, uintptr_t data);
72
73static void nxt_controller_process_request(nxt_task_t *task,
74 nxt_controller_request_t *req);
75static void nxt_controller_conf_handler(nxt_task_t *task,
76 nxt_port_recv_msg_t *msg, void *data);
77static void nxt_controller_conf_store(nxt_task_t *task,
78 nxt_conf_value_t *conf);
79static void nxt_controller_response(nxt_task_t *task,
80 nxt_controller_request_t *req, nxt_controller_response_t *resp);
81static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
82 struct tm *tm, size_t size, const char *format);
83
84
85static nxt_http_field_proc_t nxt_controller_request_fields[] = {
86 { nxt_string("Content-Length"),
87 &nxt_controller_request_content_length, 0 },
88};
89
90static nxt_lvlhsh_t nxt_controller_fields_hash;
91
92static nxt_uint_t nxt_controller_listening;
93static nxt_uint_t nxt_controller_router_ready;
94static nxt_controller_conf_t nxt_controller_conf;
95static nxt_queue_t nxt_controller_waiting_requests;
96
97
98static const nxt_event_conn_state_t nxt_controller_conn_read_state;
99static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
100static const nxt_event_conn_state_t nxt_controller_conn_write_state;
101static const nxt_event_conn_state_t nxt_controller_conn_close_state;
102
103
104nxt_port_handlers_t nxt_controller_process_port_handlers = {
105 .quit = nxt_worker_process_quit_handler,
106 .new_port = nxt_controller_process_new_port_handler,
107 .change_file = nxt_port_change_log_file_handler,
108 .mmap = nxt_port_mmap_handler,
109 .process_ready = nxt_controller_router_ready_handler,
110 .data = nxt_port_data_handler,
111 .remove_pid = nxt_port_remove_pid_handler,
112 .rpc_ready = nxt_port_rpc_handler,
113 .rpc_error = nxt_port_rpc_handler,
114};
115
116
117nxt_int_t
118nxt_controller_start(nxt_task_t *task, void *data)
119{
120 nxt_mp_t *mp;
121 nxt_int_t ret;
122 nxt_str_t *json;
123 nxt_runtime_t *rt;
124 nxt_conf_value_t *conf;
125 nxt_event_engine_t *engine;
126 nxt_conf_validation_t vldt;
127
128 rt = task->thread->runtime;
129
130 engine = task->thread->engine;
131
132 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
133 if (nxt_slow_path(engine->mem_pool == NULL)) {
134 return NXT_ERROR;
135 }
136
137 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool,
138 nxt_controller_request_fields,
139 nxt_nitems(nxt_controller_request_fields));
140
141 if (nxt_slow_path(ret != NXT_OK)) {
142 return NXT_ERROR;
143 }
144
145 nxt_queue_init(&nxt_controller_waiting_requests);
146
147 json = data;
148
149 if (json->length == 0) {
150 return NXT_OK;
151 }
152
153 mp = nxt_mp_create(1024, 128, 256, 32);
154 if (nxt_slow_path(mp == NULL)) {
155 return NXT_ERROR;
156 }
157
158 conf = nxt_conf_json_parse_str(mp, json);
159 nxt_free(json->start);
160
161 if (nxt_slow_path(conf == NULL)) {
162 nxt_alert(task, "failed to restore previous configuration: "
163 "file is corrupted or not enough memory");
164
165 nxt_mp_destroy(mp);
166 return NXT_OK;
167 }
168
169 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
170
171 vldt.pool = nxt_mp_create(1024, 128, 256, 32);
172 if (nxt_slow_path(vldt.pool == NULL)) {
173 return NXT_ERROR;
174 }
175
176 vldt.conf = conf;
177
178 ret = nxt_conf_validate(&vldt);
179
180 if (nxt_slow_path(ret != NXT_OK)) {
181
182 if (ret == NXT_DECLINED) {
183 nxt_alert(task, "the previous configuration is invalid: %V",
184 &vldt.error);
185
186 nxt_mp_destroy(vldt.pool);
187 nxt_mp_destroy(mp);
188
189 return NXT_OK;
190 }
191
192 /* ret == NXT_ERROR */
193
194 return NXT_ERROR;
195 }
196
197 nxt_mp_destroy(vldt.pool);
198
199 nxt_controller_conf.root = conf;
200 nxt_controller_conf.pool = mp;
201
202 return NXT_OK;
203}
204
205
206static void
207nxt_controller_process_new_port_handler(nxt_task_t *task,
208 nxt_port_recv_msg_t *msg)
209{
210 nxt_port_new_port_handler(task, msg);
211
212 if (msg->u.new_port->type != NXT_PROCESS_ROUTER
213 || !nxt_controller_router_ready)
214 {
215 return;
216 }
217
218 nxt_controller_send_current_conf(task);
219}
220
221
222static void
223nxt_controller_send_current_conf(nxt_task_t *task)
224{
225 nxt_int_t rc;
226 nxt_runtime_t *rt;
227 nxt_conf_value_t *conf;
228
229 conf = nxt_controller_conf.root;
230
231 if (conf != NULL) {
232 rc = nxt_controller_conf_send(task, conf,
233 nxt_controller_conf_init_handler, NULL);
234
235 if (nxt_fast_path(rc == NXT_OK)) {
236 return;
237 }
238
239 nxt_mp_destroy(nxt_controller_conf.pool);
240
241 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
242 nxt_abort();
243 }
244 }
245
246 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
247 nxt_abort();
248 }
249
250 rt = task->thread->runtime;
251
252 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
253 nxt_abort();
254 }
255
256 nxt_controller_listening = 1;
257}
258
259
260static void
261nxt_controller_router_ready_handler(nxt_task_t *task,
262 nxt_port_recv_msg_t *msg)
263{
264 nxt_port_t *router_port;
265 nxt_runtime_t *rt;
266
267 rt = task->thread->runtime;
268
269 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
270
271 nxt_controller_router_ready = 1;
272
273 if (router_port != NULL) {
274 nxt_controller_send_current_conf(task);
275 }
276}
277
278
279static nxt_int_t
280nxt_controller_conf_default(void)
281{
282 nxt_mp_t *mp;
283 nxt_conf_value_t *conf;
284
285 static const nxt_str_t json
286 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
287
288 mp = nxt_mp_create(1024, 128, 256, 32);
289
290 if (nxt_slow_path(mp == NULL)) {
291 return NXT_ERROR;
292 }
293
294 conf = nxt_conf_json_parse_str(mp, &json);
295
296 if (nxt_slow_path(conf == NULL)) {
297 return NXT_ERROR;
298 }
299
300 nxt_controller_conf.root = conf;
301 nxt_controller_conf.pool = mp;
302
303 return NXT_OK;
304}
305
306
307static void
308nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
309 void *data)
310{
311 nxt_runtime_t *rt;
312
313 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
314 nxt_alert(task, "failed to apply previous configuration");
315
316 nxt_mp_destroy(nxt_controller_conf.pool);
317
318 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
319 nxt_abort();
320 }
321 }
322
323 if (nxt_controller_listening == 0) {
324 rt = task->thread->runtime;
325
326 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
327 == NULL))
328 {
329 nxt_abort();
330 }
331
332 nxt_controller_listening = 1;
333 }
334}
335
336
337static nxt_int_t
338nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
339 nxt_port_rpc_handler_t handler, void *data)
340{
341 size_t size;
342 uint32_t stream;
343 nxt_int_t rc;
344 nxt_buf_t *b;
345 nxt_port_t *router_port, *controller_port;
346 nxt_runtime_t *rt;
347
348 rt = task->thread->runtime;
349
350 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
351
352 if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) {
353 return NXT_DECLINED;
354 }
355
356 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
357
358 size = nxt_conf_json_length(conf, NULL);
359
360 b = nxt_port_mmap_get_buf(task, router_port, size);
361 if (nxt_slow_path(b == NULL)) {
362 return NXT_ERROR;
363 }
364
365 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
366
367 stream = nxt_port_rpc_register_handler(task, controller_port,
368 handler, handler,
369 router_port->pid, data);
370
371 if (nxt_slow_path(stream == 0)) {
372 return NXT_ERROR;
373 }
374
375 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
376 stream, controller_port->id, b);
377
378 if (nxt_slow_path(rc != NXT_OK)) {
379 nxt_port_rpc_cancel(task, controller_port, stream);
380 return NXT_ERROR;
381 }
382
383 return NXT_OK;
384}
385
386
387nxt_int_t
388nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
389{
390 nxt_sockaddr_t *sa;
391 nxt_listen_socket_t *ls;
392
393 sa = rt->controller_listen;
394
395 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
396 if (ls == NULL) {
397 return NXT_ERROR;
398 }
399
400 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
401 sa->socklen, sa->length);
402 if (ls->sockaddr == NULL) {
403 return NXT_ERROR;
404 }
405
406 ls->sockaddr->type = sa->type;
407 nxt_sockaddr_text(ls->sockaddr);
408
409 nxt_listen_socket_remote_size(ls);
410
411 ls->socket = -1;
412 ls->backlog = NXT_LISTEN_BACKLOG;
413 ls->read_after_accept = 1;
414 ls->flags = NXT_NONBLOCK;
415
416#if 0
417 /* STUB */
418 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
419 if (wq == NULL) {
420 return NXT_ERROR;
421 }
422 nxt_work_queue_name(wq, "listen");
423 /**/
424
425 ls->work_queue = wq;
426#endif
427 ls->handler = nxt_controller_conn_init;
428
429 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
430 return NXT_ERROR;
431 }
432
433 rt->controller_socket = ls;
434
435 return NXT_OK;
436}
437
438
439static void
440nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
441{
442 nxt_buf_t *b;
443 nxt_conn_t *c;
444 nxt_event_engine_t *engine;
445 nxt_controller_request_t *r;
446
447 c = obj;
448
449 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
450
451 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
452 if (nxt_slow_path(r == NULL)) {
453 nxt_controller_conn_free(task, c, NULL);
454 return;
455 }
456
457 r->conn = c;
458
459 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
460 != NXT_OK))
461 {
462 nxt_controller_conn_free(task, c, NULL);
463 return;
464 }
465
466 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
467 if (nxt_slow_path(b == NULL)) {
468 nxt_controller_conn_free(task, c, NULL);
469 return;
470 }
471
472 c->read = b;
473 c->socket.data = r;
474 c->socket.read_ready = 1;
475 c->read_state = &nxt_controller_conn_read_state;
476
477 engine = task->thread->engine;
478 c->read_work_queue = &engine->read_work_queue;
479 c->write_work_queue = &engine->write_work_queue;
480
481 nxt_conn_read(engine, c);
482}
483
484
485static const nxt_event_conn_state_t nxt_controller_conn_read_state
486 nxt_aligned(64) =
487{
488 .ready_handler = nxt_controller_conn_read,
489 .close_handler = nxt_controller_conn_close,
490 .error_handler = nxt_controller_conn_read_error,
491
492 .timer_handler = nxt_controller_conn_read_timeout,
493 .timer_value = nxt_controller_conn_timeout_value,
494 .timer_data = 60 * 1000,
495};
496
497
498static void
499nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
500{
501 size_t preread;
502 nxt_buf_t *b;
503 nxt_int_t rc;
504 nxt_conn_t *c;
505 nxt_controller_request_t *r;
506
507 c = obj;
508 r = data;
509
510 nxt_debug(task, "controller conn read");
511
512 nxt_queue_remove(&c->link);
513 nxt_queue_self(&c->link);
514
515 b = c->read;
516
517 rc = nxt_http_parse_request(&r->parser, &b->mem);
518
519 if (nxt_slow_path(rc != NXT_DONE)) {
520
521 if (rc == NXT_AGAIN) {
522 if (nxt_buf_mem_free_size(&b->mem) == 0) {
523 nxt_log(task, NXT_LOG_ERR, "too long request headers");
524 nxt_controller_conn_close(task, c, r);
525 return;
526 }
527
528 nxt_conn_read(task->thread->engine, c);
529 return;
530 }
531
532 /* rc == NXT_ERROR */
533
534 nxt_log(task, NXT_LOG_ERR, "parsing error");
535
536 nxt_controller_conn_close(task, c, r);
537 return;
538 }
539
540 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
541 r);
542
543 if (nxt_slow_path(rc != NXT_OK)) {
544 nxt_controller_conn_close(task, c, r);
545 return;
546 }
547
548 preread = nxt_buf_mem_used_size(&b->mem);
549
550 nxt_debug(task, "controller request header parsing complete, "
551 "body length: %uz, preread: %uz",
552 r->length, preread);
553
554 if (preread >= r->length) {
555 nxt_controller_process_request(task, r);
556 return;
557 }
558
559 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
560 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
561 if (nxt_slow_path(b == NULL)) {
562 nxt_controller_conn_free(task, c, NULL);
563 return;
564 }
565
566 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
567
568 c->read = b;
569 }
570
571 c->read_state = &nxt_controller_conn_body_read_state;
572
573 nxt_conn_read(task->thread->engine, c);
574}
575
576
577static nxt_msec_t
578nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
579{
580 return (nxt_msec_t) data;
581}
582
583
584static void
585nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
586{
587 nxt_conn_t *c;
588
589 c = obj;
590
591 nxt_debug(task, "controller conn read error");
592
593 nxt_controller_conn_close(task, c, data);
594}
595
596
597static void
598nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
599{
600 nxt_timer_t *timer;
601 nxt_conn_t *c;
602
603 timer = obj;
604
605 c = nxt_read_timer_conn(timer);
606 c->socket.timedout = 1;
607 c->socket.closed = 1;
608
609 nxt_debug(task, "controller conn read timeout");
610
611 nxt_controller_conn_close(task, c, data);
612}
613
614
615static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
616 nxt_aligned(64) =
617{
618 .ready_handler = nxt_controller_conn_body_read,
619 .close_handler = nxt_controller_conn_close,
620 .error_handler = nxt_controller_conn_read_error,
621
622 .timer_handler = nxt_controller_conn_read_timeout,
623 .timer_value = nxt_controller_conn_timeout_value,
624 .timer_data = 60 * 1000,
625 .timer_autoreset = 1,
626};
627
628
629static void
630nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
631{
632 size_t read;
633 nxt_buf_t *b;
634 nxt_conn_t *c;
635 nxt_controller_request_t *r;
636
637 c = obj;
638 r = data;
639 b = c->read;
640
641 read = nxt_buf_mem_used_size(&b->mem);
642
643 nxt_debug(task, "controller conn body read: %uz of %uz",
644 read, r->length);
645
646 if (read >= r->length) {
647 nxt_controller_process_request(task, r);
648 return;
649 }
650
651 nxt_conn_read(task->thread->engine, c);
652}
653
654
655static const nxt_event_conn_state_t nxt_controller_conn_write_state
656 nxt_aligned(64) =
657{
658 .ready_handler = nxt_controller_conn_write,
659 .error_handler = nxt_controller_conn_write_error,
660
661 .timer_handler = nxt_controller_conn_write_timeout,
662 .timer_value = nxt_controller_conn_timeout_value,
663 .timer_data = 60 * 1000,
664 .timer_autoreset = 1,
665};
666
667
668static void
669nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
670{
671 nxt_buf_t *b;
672 nxt_conn_t *c;
673
674 c = obj;
675
676 nxt_debug(task, "controller conn write");
677
678 b = c->write;
679
680 if (b->mem.pos != b->mem.free) {
681 nxt_conn_write(task->thread->engine, c);
682 return;
683 }
684
685 nxt_debug(task, "controller conn write complete");
686
687 nxt_controller_conn_close(task, c, data);
688}
689
690
691static void
692nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
693{
694 nxt_conn_t *c;
695
696 c = obj;
697
698 nxt_debug(task, "controller conn write error");
699
700 nxt_controller_conn_close(task, c, data);
701}
702
703
704static void
705nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
706{
707 nxt_conn_t *c;
708 nxt_timer_t *timer;
709
710 timer = obj;
711
712 c = nxt_write_timer_conn(timer);
713 c->socket.timedout = 1;
714 c->socket.closed = 1;
715
716 nxt_debug(task, "controller conn write timeout");
717
718 nxt_controller_conn_close(task, c, data);
719}
720
721
722static const nxt_event_conn_state_t nxt_controller_conn_close_state
723 nxt_aligned(64) =
724{
725 .ready_handler = nxt_controller_conn_free,
726};
727
728
729static void
730nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
731{
732 nxt_conn_t *c;
733
734 c = obj;
735
736 nxt_debug(task, "controller conn close");
737
738 nxt_queue_remove(&c->link);
739
740 c->write_state = &nxt_controller_conn_close_state;
741
742 nxt_conn_close(task->thread->engine, c);
743}
744
745
746static void
747nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
748{
749 nxt_conn_t *c;
750
751 c = obj;
752
753 nxt_debug(task, "controller conn free");
754
755 nxt_sockaddr_cache_free(task->thread->engine, c);
756
757 nxt_conn_free(task, c);
758}
759
760
761static nxt_int_t
762nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
763 uintptr_t data)
764{
765 off_t length;
766 nxt_controller_request_t *r;
767
768 r = ctx;
769
770 length = nxt_off_t_parse(field->value, field->value_length);
771
772 if (nxt_fast_path(length > 0)) {
773
774 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
775 nxt_log_error(NXT_LOG_ERR, &r->conn->log,
776 "Content-Length is too big");
777 return NXT_ERROR;
778 }
779
780 r->length = length;
781 return NXT_OK;
782 }
783
784 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
785
786 return NXT_ERROR;
787}
788
789
790static void
791nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
792{
793 nxt_mp_t *mp;
794 nxt_int_t rc;
795 nxt_str_t path;
796 nxt_conn_t *c;
797 nxt_buf_mem_t *mbuf;
798 nxt_conf_op_t *ops;
799 nxt_conf_value_t *value;
800 nxt_conf_validation_t vldt;
801 nxt_conf_json_error_t error;
802 nxt_controller_response_t resp;
803
804 static const nxt_str_t empty_obj = nxt_string("{}");
805
806 c = req->conn;
807 path = req->parser.path;
808
809 if (nxt_str_start(&path, "/config", 7)) {
810
811 if (path.length == 7) {
812 path.length = 1;
813
814 } else if (path.start[7] == '/') {
815 path.length -= 7;
816 path.start += 7;
817 }
818 }
819
820 if (path.length > 1 && path.start[path.length - 1] == '/') {
821 path.length--;
822 }
823
824 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
825
826 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
827
828 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
829
830 if (value == NULL) {
831 goto not_found;
832 }
833
834 resp.status = 200;
835 resp.conf = value;
836
837 nxt_controller_response(task, req, &resp);
838 return;
839 }
840
841 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
842
843 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
844 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
845 return;
846 }
847
848 mp = nxt_mp_create(1024, 128, 256, 32);
849
850 if (nxt_slow_path(mp == NULL)) {
851 goto alloc_fail;
852 }
853
854 mbuf = &c->read->mem;
855
856 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
857
858 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
859
860 if (value == NULL) {
861 nxt_mp_destroy(mp);
862
863 if (error.pos == NULL) {
864 goto alloc_fail;
865 }
866
867 resp.status = 400;
868 resp.title = (u_char *) "Invalid JSON.";
869 resp.detail.length = nxt_strlen(error.detail);
870 resp.detail.start = error.detail;
871 resp.offset = error.pos - mbuf->pos;
872
873 nxt_conf_json_position(mbuf->pos, error.pos,
874 &resp.line, &resp.column);
875
876 nxt_controller_response(task, req, &resp);
877 return;
878 }
879
880 if (path.length != 1) {
881 rc = nxt_conf_op_compile(c->mem_pool, &ops,
882 nxt_controller_conf.root,
883 &path, value);
884
885 if (rc != NXT_OK) {
886 nxt_mp_destroy(mp);
887
888 if (rc == NXT_DECLINED) {
889 goto not_found;
890 }
891
892 goto alloc_fail;
893 }
894
895 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
896
897 if (nxt_slow_path(value == NULL)) {
898 nxt_mp_destroy(mp);
899 goto alloc_fail;
900 }
901 }
902
903 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
904
905 vldt.conf = value;
906 vldt.pool = c->mem_pool;
907
908 rc = nxt_conf_validate(&vldt);
909
910 if (nxt_slow_path(rc != NXT_OK)) {
911 nxt_mp_destroy(mp);
912
913 if (rc == NXT_DECLINED) {
914 resp.detail = vldt.error;
915 goto invalid_conf;
916 }
917
918 /* rc == NXT_ERROR */
919 goto alloc_fail;
920 }
921
922 rc = nxt_controller_conf_send(task, value,
923 nxt_controller_conf_handler, req);
924
925 if (nxt_slow_path(rc != NXT_OK)) {
926 nxt_mp_destroy(mp);
927
928 if (rc == NXT_DECLINED) {
929 goto no_router;
930 }
931
932 /* rc == NXT_ERROR */
933 goto alloc_fail;
934 }
935
936 req->conf.root = value;
937 req->conf.pool = mp;
938
939 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
940
941 return;
942 }
943
944 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
945
946 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
947 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
948 return;
949 }
950
951 if (path.length == 1) {
952 mp = nxt_mp_create(1024, 128, 256, 32);
953
954 if (nxt_slow_path(mp == NULL)) {
955 goto alloc_fail;
956 }
957
958 value = nxt_conf_json_parse_str(mp, &empty_obj);
959
960 } else {
961 rc = nxt_conf_op_compile(c->mem_pool, &ops,
962 nxt_controller_conf.root,
963 &path, NULL);
964
965 if (rc != NXT_OK) {
966 if (rc == NXT_DECLINED) {
967 goto not_found;
968 }
969
970 goto alloc_fail;
971 }
972
973 mp = nxt_mp_create(1024, 128, 256, 32);
974
975 if (nxt_slow_path(mp == NULL)) {
976 goto alloc_fail;
977 }
978
979 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
980 }
981
982 if (nxt_slow_path(value == NULL)) {
983 nxt_mp_destroy(mp);
984 goto alloc_fail;
985 }
986
987 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
988
989 vldt.conf = value;
990 vldt.pool = c->mem_pool;
991
992 rc = nxt_conf_validate(&vldt);
993
994 if (nxt_slow_path(rc != NXT_OK)) {
995 nxt_mp_destroy(mp);
996
997 if (rc == NXT_DECLINED) {
998 resp.detail = vldt.error;
999 goto invalid_conf;
1000 }
1001
1002 /* rc == NXT_ERROR */
1003 goto alloc_fail;
1004 }
1005
1006 rc = nxt_controller_conf_send(task, value,
1007 nxt_controller_conf_handler, req);
1008
1009 if (nxt_slow_path(rc != NXT_OK)) {
1010 nxt_mp_destroy(mp);
1011
1012 if (rc == NXT_DECLINED) {
1013 goto no_router;
1014 }
1015
1016 /* rc == NXT_ERROR */
1017 goto alloc_fail;
1018 }
1019
1020 req->conf.root = value;
1021 req->conf.pool = mp;
1022
1023 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1024
1025 return;
1026 }
1027
1028 resp.status = 405;
1029 resp.title = (u_char *) "Invalid method.";
1030 resp.offset = -1;
1031
1032 nxt_controller_response(task, req, &resp);
1033 return;
1034
1035not_found:
1036
1037 resp.status = 404;
1038 resp.title = (u_char *) "Value doesn't exist.";
1039 resp.offset = -1;
1040
1041 nxt_controller_response(task, req, &resp);
1042 return;
1043
1044invalid_conf:
1045
1046 resp.status = 400;
1047 resp.title = (u_char *) "Invalid configuration.";
1048 resp.offset = -1;
1049
1050 nxt_controller_response(task, req, &resp);
1051 return;
1052
1053alloc_fail:
1054
1055 resp.status = 500;
1056 resp.title = (u_char *) "Memory allocation failed.";
1057 resp.offset = -1;
1058
1059 nxt_controller_response(task, req, &resp);
1060 return;
1061
1062no_router:
1063
1064 resp.status = 500;
1065 resp.title = (u_char *) "Router process isn't available.";
1066 resp.offset = -1;
1067
1068 nxt_controller_response(task, req, &resp);
1069 return;
1070}
1071
1072
1073static void
1074nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1075 void *data)
1076{
1077 nxt_queue_t queue;
1078 nxt_controller_request_t *req;
1079 nxt_controller_response_t resp;
1080
1081 req = data;
1082
1083 nxt_debug(task, "controller conf ready: %*s",
1084 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1085
1086 nxt_queue_remove(&req->link);
1087
1088 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1089
1090 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1091 nxt_mp_destroy(nxt_controller_conf.pool);
1092
1093 nxt_controller_conf = req->conf;
1094
1095 nxt_controller_conf_store(task, req->conf.root);
1096
1097 resp.status = 200;
1098 resp.title = (u_char *) "Reconfiguration done.";
1099
1100 } else {
1101 nxt_mp_destroy(req->conf.pool);
1102
1103 resp.status = 500;
1104 resp.title = (u_char *) "Failed to apply new configuration.";
1105 resp.offset = -1;
1106 }
1107
1108 nxt_controller_response(task, req, &resp);
1109
1110 nxt_queue_init(&queue);
1111 nxt_queue_add(&queue, &nxt_controller_waiting_requests);
1112
1113 nxt_queue_init(&nxt_controller_waiting_requests);
1114
1115 nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
1116 nxt_controller_process_request(task, req);
1117 } nxt_queue_loop;
1118}
1119
1120
1121static void
1122nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1123{
1124 size_t size;
1125 nxt_buf_t *b;
1126 nxt_port_t *main_port;
1127 nxt_runtime_t *rt;
1128
1129 rt = task->thread->runtime;
1130
1131 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1132
1133 size = nxt_conf_json_length(conf, NULL);
1134
1135 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
1136
1137 if (nxt_fast_path(b != NULL)) {
1138 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
1139
1140 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
1141 -1, 0, -1, b);
1142 }
1143}
1144
1145
1146static void
1147nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1148 nxt_controller_response_t *resp)
1149{
1150 size_t size;
1151 nxt_str_t status_line, str;
1152 nxt_buf_t *b, *body;
1153 nxt_conn_t *c;
1154 nxt_uint_t n;
1155 nxt_conf_value_t *value, *location;
1156 nxt_conf_json_pretty_t pretty;
1157
1158 static nxt_str_t success_str = nxt_string("success");
1159 static nxt_str_t error_str = nxt_string("error");
1160 static nxt_str_t detail_str = nxt_string("detail");
1161 static nxt_str_t location_str = nxt_string("location");
1162 static nxt_str_t offset_str = nxt_string("offset");
1163 static nxt_str_t line_str = nxt_string("line");
1164 static nxt_str_t column_str = nxt_string("column");
1165
1166 static nxt_time_string_t date_cache = {
1167 (nxt_atomic_uint_t) -1,
1168 nxt_controller_date,
1169 "%s, %02d %s %4d %02d:%02d:%02d GMT",
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
32
33 u_char *title;
34 nxt_str_t detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
38} nxt_controller_response_t;
39
40
41static void nxt_controller_process_new_port_handler(nxt_task_t *task,
42 nxt_port_recv_msg_t *msg);
43static void nxt_controller_send_current_conf(nxt_task_t *task);
44static void nxt_controller_router_ready_handler(nxt_task_t *task,
45 nxt_port_recv_msg_t *msg);
46static nxt_int_t nxt_controller_conf_default(void);
47static void nxt_controller_conf_init_handler(nxt_task_t *task,
48 nxt_port_recv_msg_t *msg, void *data);
49static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
50 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
51
52static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
53static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
54static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
55 uintptr_t data);
56static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
57 void *data);
58static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
59 void *data);
60static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
61 void *data);
62static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
63static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
64 void *data);
65static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
66 void *data);
67static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
68static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
69
70static nxt_int_t nxt_controller_request_content_length(void *ctx,
71 nxt_http_field_t *field, uintptr_t data);
72
73static void nxt_controller_process_request(nxt_task_t *task,
74 nxt_controller_request_t *req);
75static void nxt_controller_conf_handler(nxt_task_t *task,
76 nxt_port_recv_msg_t *msg, void *data);
77static void nxt_controller_conf_store(nxt_task_t *task,
78 nxt_conf_value_t *conf);
79static void nxt_controller_response(nxt_task_t *task,
80 nxt_controller_request_t *req, nxt_controller_response_t *resp);
81static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
82 struct tm *tm, size_t size, const char *format);
83
84
85static nxt_http_field_proc_t nxt_controller_request_fields[] = {
86 { nxt_string("Content-Length"),
87 &nxt_controller_request_content_length, 0 },
88};
89
90static nxt_lvlhsh_t nxt_controller_fields_hash;
91
92static nxt_uint_t nxt_controller_listening;
93static nxt_uint_t nxt_controller_router_ready;
94static nxt_controller_conf_t nxt_controller_conf;
95static nxt_queue_t nxt_controller_waiting_requests;
96
97
98static const nxt_event_conn_state_t nxt_controller_conn_read_state;
99static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
100static const nxt_event_conn_state_t nxt_controller_conn_write_state;
101static const nxt_event_conn_state_t nxt_controller_conn_close_state;
102
103
104nxt_port_handlers_t nxt_controller_process_port_handlers = {
105 .quit = nxt_worker_process_quit_handler,
106 .new_port = nxt_controller_process_new_port_handler,
107 .change_file = nxt_port_change_log_file_handler,
108 .mmap = nxt_port_mmap_handler,
109 .process_ready = nxt_controller_router_ready_handler,
110 .data = nxt_port_data_handler,
111 .remove_pid = nxt_port_remove_pid_handler,
112 .rpc_ready = nxt_port_rpc_handler,
113 .rpc_error = nxt_port_rpc_handler,
114};
115
116
117nxt_int_t
118nxt_controller_start(nxt_task_t *task, void *data)
119{
120 nxt_mp_t *mp;
121 nxt_int_t ret;
122 nxt_str_t *json;
123 nxt_runtime_t *rt;
124 nxt_conf_value_t *conf;
125 nxt_event_engine_t *engine;
126 nxt_conf_validation_t vldt;
127
128 rt = task->thread->runtime;
129
130 engine = task->thread->engine;
131
132 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
133 if (nxt_slow_path(engine->mem_pool == NULL)) {
134 return NXT_ERROR;
135 }
136
137 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool,
138 nxt_controller_request_fields,
139 nxt_nitems(nxt_controller_request_fields));
140
141 if (nxt_slow_path(ret != NXT_OK)) {
142 return NXT_ERROR;
143 }
144
145 nxt_queue_init(&nxt_controller_waiting_requests);
146
147 json = data;
148
149 if (json->length == 0) {
150 return NXT_OK;
151 }
152
153 mp = nxt_mp_create(1024, 128, 256, 32);
154 if (nxt_slow_path(mp == NULL)) {
155 return NXT_ERROR;
156 }
157
158 conf = nxt_conf_json_parse_str(mp, json);
159 nxt_free(json->start);
160
161 if (nxt_slow_path(conf == NULL)) {
162 nxt_alert(task, "failed to restore previous configuration: "
163 "file is corrupted or not enough memory");
164
165 nxt_mp_destroy(mp);
166 return NXT_OK;
167 }
168
169 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
170
171 vldt.pool = nxt_mp_create(1024, 128, 256, 32);
172 if (nxt_slow_path(vldt.pool == NULL)) {
173 return NXT_ERROR;
174 }
175
176 vldt.conf = conf;
177
178 ret = nxt_conf_validate(&vldt);
179
180 if (nxt_slow_path(ret != NXT_OK)) {
181
182 if (ret == NXT_DECLINED) {
183 nxt_alert(task, "the previous configuration is invalid: %V",
184 &vldt.error);
185
186 nxt_mp_destroy(vldt.pool);
187 nxt_mp_destroy(mp);
188
189 return NXT_OK;
190 }
191
192 /* ret == NXT_ERROR */
193
194 return NXT_ERROR;
195 }
196
197 nxt_mp_destroy(vldt.pool);
198
199 nxt_controller_conf.root = conf;
200 nxt_controller_conf.pool = mp;
201
202 return NXT_OK;
203}
204
205
206static void
207nxt_controller_process_new_port_handler(nxt_task_t *task,
208 nxt_port_recv_msg_t *msg)
209{
210 nxt_port_new_port_handler(task, msg);
211
212 if (msg->u.new_port->type != NXT_PROCESS_ROUTER
213 || !nxt_controller_router_ready)
214 {
215 return;
216 }
217
218 nxt_controller_send_current_conf(task);
219}
220
221
222static void
223nxt_controller_send_current_conf(nxt_task_t *task)
224{
225 nxt_int_t rc;
226 nxt_runtime_t *rt;
227 nxt_conf_value_t *conf;
228
229 conf = nxt_controller_conf.root;
230
231 if (conf != NULL) {
232 rc = nxt_controller_conf_send(task, conf,
233 nxt_controller_conf_init_handler, NULL);
234
235 if (nxt_fast_path(rc == NXT_OK)) {
236 return;
237 }
238
239 nxt_mp_destroy(nxt_controller_conf.pool);
240
241 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
242 nxt_abort();
243 }
244 }
245
246 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
247 nxt_abort();
248 }
249
250 rt = task->thread->runtime;
251
252 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
253 nxt_abort();
254 }
255
256 nxt_controller_listening = 1;
257}
258
259
260static void
261nxt_controller_router_ready_handler(nxt_task_t *task,
262 nxt_port_recv_msg_t *msg)
263{
264 nxt_port_t *router_port;
265 nxt_runtime_t *rt;
266
267 rt = task->thread->runtime;
268
269 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
270
271 nxt_controller_router_ready = 1;
272
273 if (router_port != NULL) {
274 nxt_controller_send_current_conf(task);
275 }
276}
277
278
279static nxt_int_t
280nxt_controller_conf_default(void)
281{
282 nxt_mp_t *mp;
283 nxt_conf_value_t *conf;
284
285 static const nxt_str_t json
286 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
287
288 mp = nxt_mp_create(1024, 128, 256, 32);
289
290 if (nxt_slow_path(mp == NULL)) {
291 return NXT_ERROR;
292 }
293
294 conf = nxt_conf_json_parse_str(mp, &json);
295
296 if (nxt_slow_path(conf == NULL)) {
297 return NXT_ERROR;
298 }
299
300 nxt_controller_conf.root = conf;
301 nxt_controller_conf.pool = mp;
302
303 return NXT_OK;
304}
305
306
307static void
308nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
309 void *data)
310{
311 nxt_runtime_t *rt;
312
313 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
314 nxt_alert(task, "failed to apply previous configuration");
315
316 nxt_mp_destroy(nxt_controller_conf.pool);
317
318 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
319 nxt_abort();
320 }
321 }
322
323 if (nxt_controller_listening == 0) {
324 rt = task->thread->runtime;
325
326 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
327 == NULL))
328 {
329 nxt_abort();
330 }
331
332 nxt_controller_listening = 1;
333 }
334}
335
336
337static nxt_int_t
338nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
339 nxt_port_rpc_handler_t handler, void *data)
340{
341 size_t size;
342 uint32_t stream;
343 nxt_int_t rc;
344 nxt_buf_t *b;
345 nxt_port_t *router_port, *controller_port;
346 nxt_runtime_t *rt;
347
348 rt = task->thread->runtime;
349
350 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
351
352 if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) {
353 return NXT_DECLINED;
354 }
355
356 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
357
358 size = nxt_conf_json_length(conf, NULL);
359
360 b = nxt_port_mmap_get_buf(task, router_port, size);
361 if (nxt_slow_path(b == NULL)) {
362 return NXT_ERROR;
363 }
364
365 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
366
367 stream = nxt_port_rpc_register_handler(task, controller_port,
368 handler, handler,
369 router_port->pid, data);
370
371 if (nxt_slow_path(stream == 0)) {
372 return NXT_ERROR;
373 }
374
375 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
376 stream, controller_port->id, b);
377
378 if (nxt_slow_path(rc != NXT_OK)) {
379 nxt_port_rpc_cancel(task, controller_port, stream);
380 return NXT_ERROR;
381 }
382
383 return NXT_OK;
384}
385
386
387nxt_int_t
388nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
389{
390 nxt_sockaddr_t *sa;
391 nxt_listen_socket_t *ls;
392
393 sa = rt->controller_listen;
394
395 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
396 if (ls == NULL) {
397 return NXT_ERROR;
398 }
399
400 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
401 sa->socklen, sa->length);
402 if (ls->sockaddr == NULL) {
403 return NXT_ERROR;
404 }
405
406 ls->sockaddr->type = sa->type;
407 nxt_sockaddr_text(ls->sockaddr);
408
409 nxt_listen_socket_remote_size(ls);
410
411 ls->socket = -1;
412 ls->backlog = NXT_LISTEN_BACKLOG;
413 ls->read_after_accept = 1;
414 ls->flags = NXT_NONBLOCK;
415
416#if 0
417 /* STUB */
418 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
419 if (wq == NULL) {
420 return NXT_ERROR;
421 }
422 nxt_work_queue_name(wq, "listen");
423 /**/
424
425 ls->work_queue = wq;
426#endif
427 ls->handler = nxt_controller_conn_init;
428
429 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
430 return NXT_ERROR;
431 }
432
433 rt->controller_socket = ls;
434
435 return NXT_OK;
436}
437
438
439static void
440nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
441{
442 nxt_buf_t *b;
443 nxt_conn_t *c;
444 nxt_event_engine_t *engine;
445 nxt_controller_request_t *r;
446
447 c = obj;
448
449 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
450
451 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
452 if (nxt_slow_path(r == NULL)) {
453 nxt_controller_conn_free(task, c, NULL);
454 return;
455 }
456
457 r->conn = c;
458
459 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
460 != NXT_OK))
461 {
462 nxt_controller_conn_free(task, c, NULL);
463 return;
464 }
465
466 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
467 if (nxt_slow_path(b == NULL)) {
468 nxt_controller_conn_free(task, c, NULL);
469 return;
470 }
471
472 c->read = b;
473 c->socket.data = r;
474 c->socket.read_ready = 1;
475 c->read_state = &nxt_controller_conn_read_state;
476
477 engine = task->thread->engine;
478 c->read_work_queue = &engine->read_work_queue;
479 c->write_work_queue = &engine->write_work_queue;
480
481 nxt_conn_read(engine, c);
482}
483
484
485static const nxt_event_conn_state_t nxt_controller_conn_read_state
486 nxt_aligned(64) =
487{
488 .ready_handler = nxt_controller_conn_read,
489 .close_handler = nxt_controller_conn_close,
490 .error_handler = nxt_controller_conn_read_error,
491
492 .timer_handler = nxt_controller_conn_read_timeout,
493 .timer_value = nxt_controller_conn_timeout_value,
494 .timer_data = 60 * 1000,
495};
496
497
498static void
499nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
500{
501 size_t preread;
502 nxt_buf_t *b;
503 nxt_int_t rc;
504 nxt_conn_t *c;
505 nxt_controller_request_t *r;
506
507 c = obj;
508 r = data;
509
510 nxt_debug(task, "controller conn read");
511
512 nxt_queue_remove(&c->link);
513 nxt_queue_self(&c->link);
514
515 b = c->read;
516
517 rc = nxt_http_parse_request(&r->parser, &b->mem);
518
519 if (nxt_slow_path(rc != NXT_DONE)) {
520
521 if (rc == NXT_AGAIN) {
522 if (nxt_buf_mem_free_size(&b->mem) == 0) {
523 nxt_log(task, NXT_LOG_ERR, "too long request headers");
524 nxt_controller_conn_close(task, c, r);
525 return;
526 }
527
528 nxt_conn_read(task->thread->engine, c);
529 return;
530 }
531
532 /* rc == NXT_ERROR */
533
534 nxt_log(task, NXT_LOG_ERR, "parsing error");
535
536 nxt_controller_conn_close(task, c, r);
537 return;
538 }
539
540 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
541 r);
542
543 if (nxt_slow_path(rc != NXT_OK)) {
544 nxt_controller_conn_close(task, c, r);
545 return;
546 }
547
548 preread = nxt_buf_mem_used_size(&b->mem);
549
550 nxt_debug(task, "controller request header parsing complete, "
551 "body length: %uz, preread: %uz",
552 r->length, preread);
553
554 if (preread >= r->length) {
555 nxt_controller_process_request(task, r);
556 return;
557 }
558
559 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
560 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
561 if (nxt_slow_path(b == NULL)) {
562 nxt_controller_conn_free(task, c, NULL);
563 return;
564 }
565
566 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
567
568 c->read = b;
569 }
570
571 c->read_state = &nxt_controller_conn_body_read_state;
572
573 nxt_conn_read(task->thread->engine, c);
574}
575
576
577static nxt_msec_t
578nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
579{
580 return (nxt_msec_t) data;
581}
582
583
584static void
585nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
586{
587 nxt_conn_t *c;
588
589 c = obj;
590
591 nxt_debug(task, "controller conn read error");
592
593 nxt_controller_conn_close(task, c, data);
594}
595
596
597static void
598nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
599{
600 nxt_timer_t *timer;
601 nxt_conn_t *c;
602
603 timer = obj;
604
605 c = nxt_read_timer_conn(timer);
606 c->socket.timedout = 1;
607 c->socket.closed = 1;
608
609 nxt_debug(task, "controller conn read timeout");
610
611 nxt_controller_conn_close(task, c, data);
612}
613
614
615static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
616 nxt_aligned(64) =
617{
618 .ready_handler = nxt_controller_conn_body_read,
619 .close_handler = nxt_controller_conn_close,
620 .error_handler = nxt_controller_conn_read_error,
621
622 .timer_handler = nxt_controller_conn_read_timeout,
623 .timer_value = nxt_controller_conn_timeout_value,
624 .timer_data = 60 * 1000,
625 .timer_autoreset = 1,
626};
627
628
629static void
630nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
631{
632 size_t read;
633 nxt_buf_t *b;
634 nxt_conn_t *c;
635 nxt_controller_request_t *r;
636
637 c = obj;
638 r = data;
639 b = c->read;
640
641 read = nxt_buf_mem_used_size(&b->mem);
642
643 nxt_debug(task, "controller conn body read: %uz of %uz",
644 read, r->length);
645
646 if (read >= r->length) {
647 nxt_controller_process_request(task, r);
648 return;
649 }
650
651 nxt_conn_read(task->thread->engine, c);
652}
653
654
655static const nxt_event_conn_state_t nxt_controller_conn_write_state
656 nxt_aligned(64) =
657{
658 .ready_handler = nxt_controller_conn_write,
659 .error_handler = nxt_controller_conn_write_error,
660
661 .timer_handler = nxt_controller_conn_write_timeout,
662 .timer_value = nxt_controller_conn_timeout_value,
663 .timer_data = 60 * 1000,
664 .timer_autoreset = 1,
665};
666
667
668static void
669nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
670{
671 nxt_buf_t *b;
672 nxt_conn_t *c;
673
674 c = obj;
675
676 nxt_debug(task, "controller conn write");
677
678 b = c->write;
679
680 if (b->mem.pos != b->mem.free) {
681 nxt_conn_write(task->thread->engine, c);
682 return;
683 }
684
685 nxt_debug(task, "controller conn write complete");
686
687 nxt_controller_conn_close(task, c, data);
688}
689
690
691static void
692nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
693{
694 nxt_conn_t *c;
695
696 c = obj;
697
698 nxt_debug(task, "controller conn write error");
699
700 nxt_controller_conn_close(task, c, data);
701}
702
703
704static void
705nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
706{
707 nxt_conn_t *c;
708 nxt_timer_t *timer;
709
710 timer = obj;
711
712 c = nxt_write_timer_conn(timer);
713 c->socket.timedout = 1;
714 c->socket.closed = 1;
715
716 nxt_debug(task, "controller conn write timeout");
717
718 nxt_controller_conn_close(task, c, data);
719}
720
721
722static const nxt_event_conn_state_t nxt_controller_conn_close_state
723 nxt_aligned(64) =
724{
725 .ready_handler = nxt_controller_conn_free,
726};
727
728
729static void
730nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
731{
732 nxt_conn_t *c;
733
734 c = obj;
735
736 nxt_debug(task, "controller conn close");
737
738 nxt_queue_remove(&c->link);
739
740 c->write_state = &nxt_controller_conn_close_state;
741
742 nxt_conn_close(task->thread->engine, c);
743}
744
745
746static void
747nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
748{
749 nxt_conn_t *c;
750
751 c = obj;
752
753 nxt_debug(task, "controller conn free");
754
755 nxt_sockaddr_cache_free(task->thread->engine, c);
756
757 nxt_conn_free(task, c);
758}
759
760
761static nxt_int_t
762nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
763 uintptr_t data)
764{
765 off_t length;
766 nxt_controller_request_t *r;
767
768 r = ctx;
769
770 length = nxt_off_t_parse(field->value, field->value_length);
771
772 if (nxt_fast_path(length > 0)) {
773
774 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
775 nxt_log_error(NXT_LOG_ERR, &r->conn->log,
776 "Content-Length is too big");
777 return NXT_ERROR;
778 }
779
780 r->length = length;
781 return NXT_OK;
782 }
783
784 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
785
786 return NXT_ERROR;
787}
788
789
790static void
791nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
792{
793 nxt_mp_t *mp;
794 nxt_int_t rc;
795 nxt_str_t path;
796 nxt_conn_t *c;
797 nxt_buf_mem_t *mbuf;
798 nxt_conf_op_t *ops;
799 nxt_conf_value_t *value;
800 nxt_conf_validation_t vldt;
801 nxt_conf_json_error_t error;
802 nxt_controller_response_t resp;
803
804 static const nxt_str_t empty_obj = nxt_string("{}");
805
806 c = req->conn;
807 path = req->parser.path;
808
809 if (nxt_str_start(&path, "/config", 7)) {
810
811 if (path.length == 7) {
812 path.length = 1;
813
814 } else if (path.start[7] == '/') {
815 path.length -= 7;
816 path.start += 7;
817 }
818 }
819
820 if (path.length > 1 && path.start[path.length - 1] == '/') {
821 path.length--;
822 }
823
824 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
825
826 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
827
828 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
829
830 if (value == NULL) {
831 goto not_found;
832 }
833
834 resp.status = 200;
835 resp.conf = value;
836
837 nxt_controller_response(task, req, &resp);
838 return;
839 }
840
841 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
842
843 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
844 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
845 return;
846 }
847
848 mp = nxt_mp_create(1024, 128, 256, 32);
849
850 if (nxt_slow_path(mp == NULL)) {
851 goto alloc_fail;
852 }
853
854 mbuf = &c->read->mem;
855
856 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
857
858 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
859
860 if (value == NULL) {
861 nxt_mp_destroy(mp);
862
863 if (error.pos == NULL) {
864 goto alloc_fail;
865 }
866
867 resp.status = 400;
868 resp.title = (u_char *) "Invalid JSON.";
869 resp.detail.length = nxt_strlen(error.detail);
870 resp.detail.start = error.detail;
871 resp.offset = error.pos - mbuf->pos;
872
873 nxt_conf_json_position(mbuf->pos, error.pos,
874 &resp.line, &resp.column);
875
876 nxt_controller_response(task, req, &resp);
877 return;
878 }
879
880 if (path.length != 1) {
881 rc = nxt_conf_op_compile(c->mem_pool, &ops,
882 nxt_controller_conf.root,
883 &path, value);
884
885 if (rc != NXT_OK) {
886 nxt_mp_destroy(mp);
887
888 if (rc == NXT_DECLINED) {
889 goto not_found;
890 }
891
892 goto alloc_fail;
893 }
894
895 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
896
897 if (nxt_slow_path(value == NULL)) {
898 nxt_mp_destroy(mp);
899 goto alloc_fail;
900 }
901 }
902
903 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
904
905 vldt.conf = value;
906 vldt.pool = c->mem_pool;
907
908 rc = nxt_conf_validate(&vldt);
909
910 if (nxt_slow_path(rc != NXT_OK)) {
911 nxt_mp_destroy(mp);
912
913 if (rc == NXT_DECLINED) {
914 resp.detail = vldt.error;
915 goto invalid_conf;
916 }
917
918 /* rc == NXT_ERROR */
919 goto alloc_fail;
920 }
921
922 rc = nxt_controller_conf_send(task, value,
923 nxt_controller_conf_handler, req);
924
925 if (nxt_slow_path(rc != NXT_OK)) {
926 nxt_mp_destroy(mp);
927
928 if (rc == NXT_DECLINED) {
929 goto no_router;
930 }
931
932 /* rc == NXT_ERROR */
933 goto alloc_fail;
934 }
935
936 req->conf.root = value;
937 req->conf.pool = mp;
938
939 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
940
941 return;
942 }
943
944 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
945
946 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
947 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
948 return;
949 }
950
951 if (path.length == 1) {
952 mp = nxt_mp_create(1024, 128, 256, 32);
953
954 if (nxt_slow_path(mp == NULL)) {
955 goto alloc_fail;
956 }
957
958 value = nxt_conf_json_parse_str(mp, &empty_obj);
959
960 } else {
961 rc = nxt_conf_op_compile(c->mem_pool, &ops,
962 nxt_controller_conf.root,
963 &path, NULL);
964
965 if (rc != NXT_OK) {
966 if (rc == NXT_DECLINED) {
967 goto not_found;
968 }
969
970 goto alloc_fail;
971 }
972
973 mp = nxt_mp_create(1024, 128, 256, 32);
974
975 if (nxt_slow_path(mp == NULL)) {
976 goto alloc_fail;
977 }
978
979 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
980 }
981
982 if (nxt_slow_path(value == NULL)) {
983 nxt_mp_destroy(mp);
984 goto alloc_fail;
985 }
986
987 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
988
989 vldt.conf = value;
990 vldt.pool = c->mem_pool;
991
992 rc = nxt_conf_validate(&vldt);
993
994 if (nxt_slow_path(rc != NXT_OK)) {
995 nxt_mp_destroy(mp);
996
997 if (rc == NXT_DECLINED) {
998 resp.detail = vldt.error;
999 goto invalid_conf;
1000 }
1001
1002 /* rc == NXT_ERROR */
1003 goto alloc_fail;
1004 }
1005
1006 rc = nxt_controller_conf_send(task, value,
1007 nxt_controller_conf_handler, req);
1008
1009 if (nxt_slow_path(rc != NXT_OK)) {
1010 nxt_mp_destroy(mp);
1011
1012 if (rc == NXT_DECLINED) {
1013 goto no_router;
1014 }
1015
1016 /* rc == NXT_ERROR */
1017 goto alloc_fail;
1018 }
1019
1020 req->conf.root = value;
1021 req->conf.pool = mp;
1022
1023 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
1024
1025 return;
1026 }
1027
1028 resp.status = 405;
1029 resp.title = (u_char *) "Invalid method.";
1030 resp.offset = -1;
1031
1032 nxt_controller_response(task, req, &resp);
1033 return;
1034
1035not_found:
1036
1037 resp.status = 404;
1038 resp.title = (u_char *) "Value doesn't exist.";
1039 resp.offset = -1;
1040
1041 nxt_controller_response(task, req, &resp);
1042 return;
1043
1044invalid_conf:
1045
1046 resp.status = 400;
1047 resp.title = (u_char *) "Invalid configuration.";
1048 resp.offset = -1;
1049
1050 nxt_controller_response(task, req, &resp);
1051 return;
1052
1053alloc_fail:
1054
1055 resp.status = 500;
1056 resp.title = (u_char *) "Memory allocation failed.";
1057 resp.offset = -1;
1058
1059 nxt_controller_response(task, req, &resp);
1060 return;
1061
1062no_router:
1063
1064 resp.status = 500;
1065 resp.title = (u_char *) "Router process isn't available.";
1066 resp.offset = -1;
1067
1068 nxt_controller_response(task, req, &resp);
1069 return;
1070}
1071
1072
1073static void
1074nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1075 void *data)
1076{
1077 nxt_queue_t queue;
1078 nxt_controller_request_t *req;
1079 nxt_controller_response_t resp;
1080
1081 req = data;
1082
1083 nxt_debug(task, "controller conf ready: %*s",
1084 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1085
1086 nxt_queue_remove(&req->link);
1087
1088 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1089
1090 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1091 nxt_mp_destroy(nxt_controller_conf.pool);
1092
1093 nxt_controller_conf = req->conf;
1094
1095 nxt_controller_conf_store(task, req->conf.root);
1096
1097 resp.status = 200;
1098 resp.title = (u_char *) "Reconfiguration done.";
1099
1100 } else {
1101 nxt_mp_destroy(req->conf.pool);
1102
1103 resp.status = 500;
1104 resp.title = (u_char *) "Failed to apply new configuration.";
1105 resp.offset = -1;
1106 }
1107
1108 nxt_controller_response(task, req, &resp);
1109
1110 nxt_queue_init(&queue);
1111 nxt_queue_add(&queue, &nxt_controller_waiting_requests);
1112
1113 nxt_queue_init(&nxt_controller_waiting_requests);
1114
1115 nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
1116 nxt_controller_process_request(task, req);
1117 } nxt_queue_loop;
1118}
1119
1120
1121static void
1122nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1123{
1124 size_t size;
1125 nxt_buf_t *b;
1126 nxt_port_t *main_port;
1127 nxt_runtime_t *rt;
1128
1129 rt = task->thread->runtime;
1130
1131 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1132
1133 size = nxt_conf_json_length(conf, NULL);
1134
1135 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
1136
1137 if (nxt_fast_path(b != NULL)) {
1138 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
1139
1140 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
1141 -1, 0, -1, b);
1142 }
1143}
1144
1145
1146static void
1147nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1148 nxt_controller_response_t *resp)
1149{
1150 size_t size;
1151 nxt_str_t status_line, str;
1152 nxt_buf_t *b, *body;
1153 nxt_conn_t *c;
1154 nxt_uint_t n;
1155 nxt_conf_value_t *value, *location;
1156 nxt_conf_json_pretty_t pretty;
1157
1158 static nxt_str_t success_str = nxt_string("success");
1159 static nxt_str_t error_str = nxt_string("error");
1160 static nxt_str_t detail_str = nxt_string("detail");
1161 static nxt_str_t location_str = nxt_string("location");
1162 static nxt_str_t offset_str = nxt_string("offset");
1163 static nxt_str_t line_str = nxt_string("line");
1164 static nxt_str_t column_str = nxt_string("column");
1165
1166 static nxt_time_string_t date_cache = {
1167 (nxt_atomic_uint_t) -1,
1168 nxt_controller_date,
1169 "%s, %02d %s %4d %02d:%02d:%02d GMT",
1170 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
1170 nxt_length("Wed, 31 Dec 1986 16:40:00 GMT"),
1171 NXT_THREAD_TIME_GMT,
1172 NXT_THREAD_TIME_SEC,
1173 };
1174
1175 switch (resp->status) {
1176
1177 case 200:
1178 nxt_str_set(&status_line, "200 OK");
1179 break;
1180
1181 case 400:
1182 nxt_str_set(&status_line, "400 Bad Request");
1183 break;
1184
1185 case 404:
1186 nxt_str_set(&status_line, "404 Not Found");
1187 break;
1188
1189 case 405:
1190 nxt_str_set(&status_line, "405 Method Not Allowed");
1191 break;
1192
1193 default:
1194 nxt_str_set(&status_line, "500 Internal Server Error");
1195 break;
1196 }
1197
1198 c = req->conn;
1199 value = resp->conf;
1200
1201 if (value == NULL) {
1202 n = 1
1203 + (resp->detail.length != 0)
1204 + (resp->status >= 400 && resp->offset != -1);
1205
1206 value = nxt_conf_create_object(c->mem_pool, n);
1207
1208 if (nxt_slow_path(value == NULL)) {
1209 nxt_controller_conn_close(task, c, req);
1210 return;
1211 }
1212
1213 str.length = nxt_strlen(resp->title);
1214 str.start = resp->title;
1215
1216 if (resp->status < 400) {
1217 nxt_conf_set_member_string(value, &success_str, &str, 0);
1218
1219 } else {
1220 nxt_conf_set_member_string(value, &error_str, &str, 0);
1221 }
1222
1223 n = 0;
1224
1225 if (resp->detail.length != 0) {
1226 n++;
1227
1228 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1229 }
1230
1231 if (resp->status >= 400 && resp->offset != -1) {
1232 n++;
1233
1234 location = nxt_conf_create_object(c->mem_pool,
1235 resp->line != 0 ? 3 : 1);
1236
1237 nxt_conf_set_member(value, &location_str, location, n);
1238
1239 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1240
1241 if (resp->line != 0) {
1242 nxt_conf_set_member_integer(location, &line_str,
1243 resp->line, 1);
1244
1245 nxt_conf_set_member_integer(location, &column_str,
1246 resp->column, 2);
1247 }
1248 }
1249 }
1250
1251 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1252
1253 size = nxt_conf_json_length(value, &pretty) + 2;
1254
1255 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1256 if (nxt_slow_path(body == NULL)) {
1257 nxt_controller_conn_close(task, c, req);
1258 return;
1259 }
1260
1261 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1262
1263 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
1264
1265 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
1266
1171 NXT_THREAD_TIME_GMT,
1172 NXT_THREAD_TIME_SEC,
1173 };
1174
1175 switch (resp->status) {
1176
1177 case 200:
1178 nxt_str_set(&status_line, "200 OK");
1179 break;
1180
1181 case 400:
1182 nxt_str_set(&status_line, "400 Bad Request");
1183 break;
1184
1185 case 404:
1186 nxt_str_set(&status_line, "404 Not Found");
1187 break;
1188
1189 case 405:
1190 nxt_str_set(&status_line, "405 Method Not Allowed");
1191 break;
1192
1193 default:
1194 nxt_str_set(&status_line, "500 Internal Server Error");
1195 break;
1196 }
1197
1198 c = req->conn;
1199 value = resp->conf;
1200
1201 if (value == NULL) {
1202 n = 1
1203 + (resp->detail.length != 0)
1204 + (resp->status >= 400 && resp->offset != -1);
1205
1206 value = nxt_conf_create_object(c->mem_pool, n);
1207
1208 if (nxt_slow_path(value == NULL)) {
1209 nxt_controller_conn_close(task, c, req);
1210 return;
1211 }
1212
1213 str.length = nxt_strlen(resp->title);
1214 str.start = resp->title;
1215
1216 if (resp->status < 400) {
1217 nxt_conf_set_member_string(value, &success_str, &str, 0);
1218
1219 } else {
1220 nxt_conf_set_member_string(value, &error_str, &str, 0);
1221 }
1222
1223 n = 0;
1224
1225 if (resp->detail.length != 0) {
1226 n++;
1227
1228 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1229 }
1230
1231 if (resp->status >= 400 && resp->offset != -1) {
1232 n++;
1233
1234 location = nxt_conf_create_object(c->mem_pool,
1235 resp->line != 0 ? 3 : 1);
1236
1237 nxt_conf_set_member(value, &location_str, location, n);
1238
1239 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1240
1241 if (resp->line != 0) {
1242 nxt_conf_set_member_integer(location, &line_str,
1243 resp->line, 1);
1244
1245 nxt_conf_set_member_integer(location, &column_str,
1246 resp->column, 2);
1247 }
1248 }
1249 }
1250
1251 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1252
1253 size = nxt_conf_json_length(value, &pretty) + 2;
1254
1255 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1256 if (nxt_slow_path(body == NULL)) {
1257 nxt_controller_conn_close(task, c, req);
1258 return;
1259 }
1260
1261 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1262
1263 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
1264
1265 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
1266
1267 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
1268 + sizeof("Server: " NXT_SERVER "\r\n") - 1
1269 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
1270 + sizeof("Content-Type: application/json\r\n") - 1
1271 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
1272 + sizeof("Connection: close\r\n") - 1
1273 + sizeof("\r\n") - 1;
1267 size = nxt_length("HTTP/1.1 " "\r\n") + status_line.length
1268 + nxt_length("Server: " NXT_SERVER "\r\n")
1269 + nxt_length("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n")
1270 + nxt_length("Content-Type: application/json\r\n")
1271 + nxt_length("Content-Length: " "\r\n") + NXT_SIZE_T_LEN
1272 + nxt_length("Connection: close\r\n")
1273 + nxt_length("\r\n");
1274
1275 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1276 if (nxt_slow_path(b == NULL)) {
1277 nxt_controller_conn_close(task, c, req);
1278 return;
1279 }
1280
1281 b->next = body;
1282
1283 nxt_str_set(&str, "HTTP/1.1 ");
1284
1285 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1286 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1287 status_line.length);
1288
1289 nxt_str_set(&str, "\r\n"
1290 "Server: " NXT_SERVER "\r\n"
1291 "Date: ");
1292
1293 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1294
1295 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1296 b->mem.free);
1297
1298 nxt_str_set(&str, "\r\n"
1299 "Content-Type: application/json\r\n"
1300 "Content-Length: ");
1301
1302 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1303
1304 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1305 nxt_buf_mem_used_size(&body->mem));
1306
1307 nxt_str_set(&str, "\r\n"
1308 "Connection: close\r\n"
1309 "\r\n");
1310
1311 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1312
1313 c->write = b;
1314 c->write_state = &nxt_controller_conn_write_state;
1315
1316 nxt_conn_write(task->thread->engine, c);
1317}
1318
1319
1320static u_char *
1321nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1322 size_t size, const char *format)
1323{
1324 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1325 "Sat" };
1326
1327 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1328 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1329
1330 return nxt_sprintf(buf, buf + size, format,
1331 week[tm->tm_wday], tm->tm_mday,
1332 month[tm->tm_mon], tm->tm_year + 1900,
1333 tm->tm_hour, tm->tm_min, tm->tm_sec);
1334}
1274
1275 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1276 if (nxt_slow_path(b == NULL)) {
1277 nxt_controller_conn_close(task, c, req);
1278 return;
1279 }
1280
1281 b->next = body;
1282
1283 nxt_str_set(&str, "HTTP/1.1 ");
1284
1285 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1286 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1287 status_line.length);
1288
1289 nxt_str_set(&str, "\r\n"
1290 "Server: " NXT_SERVER "\r\n"
1291 "Date: ");
1292
1293 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1294
1295 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1296 b->mem.free);
1297
1298 nxt_str_set(&str, "\r\n"
1299 "Content-Type: application/json\r\n"
1300 "Content-Length: ");
1301
1302 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1303
1304 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1305 nxt_buf_mem_used_size(&body->mem));
1306
1307 nxt_str_set(&str, "\r\n"
1308 "Connection: close\r\n"
1309 "\r\n");
1310
1311 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1312
1313 c->write = b;
1314 c->write_state = &nxt_controller_conn_write_state;
1315
1316 nxt_conn_write(task->thread->engine, c);
1317}
1318
1319
1320static u_char *
1321nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1322 size_t size, const char *format)
1323{
1324 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1325 "Sat" };
1326
1327 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1328 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1329
1330 return nxt_sprintf(buf, buf + size, format,
1331 week[tm->tm_wday], tm->tm_mday,
1332 month[tm->tm_mon], tm->tm_year + 1900,
1333 tm->tm_hour, tm->tm_min, tm->tm_sec);
1334}