nxt_controller.c (238:63a42123d971) nxt_controller.c (240:36bafba970b5)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_master_process.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
32
33 u_char *title;
34 u_char *detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
38} nxt_controller_response_t;
39
40
41static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
42static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
43static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
44 uintptr_t data);
45static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
46 void *data);
47static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
48 void *data);
49static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
50 void *data);
51static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
52static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
53 void *data);
54static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
55 void *data);
56static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
57static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
58
59static nxt_int_t nxt_controller_request_content_length(void *ctx,
60 nxt_http_field_t *field, nxt_log_t *log);
61
62static void nxt_controller_process_request(nxt_task_t *task,
63 nxt_controller_request_t *req);
64static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
65 nxt_controller_request_t *req);
66static void nxt_controller_conf_handler(nxt_task_t *task,
67 nxt_port_recv_msg_t *msg, void *data);
68static void nxt_controller_response(nxt_task_t *task,
69 nxt_controller_request_t *req, nxt_controller_response_t *resp);
70static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
71 struct tm *tm, size_t size, const char *format);
72
73
74static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
75 { nxt_string("Content-Length"),
76 &nxt_controller_request_content_length, 0 },
77
78 { nxt_null_string, NULL, 0 }
79};
80
81static nxt_http_fields_hash_t *nxt_controller_fields_hash;
82
83static nxt_controller_conf_t nxt_controller_conf;
84static nxt_queue_t nxt_controller_waiting_requests;
85
86
87static const nxt_event_conn_state_t nxt_controller_conn_read_state;
88static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
89static const nxt_event_conn_state_t nxt_controller_conn_write_state;
90static const nxt_event_conn_state_t nxt_controller_conn_close_state;
91
92
93nxt_int_t
94nxt_controller_start(nxt_task_t *task, void *data)
95{
96 nxt_mp_t *mp;
97 nxt_runtime_t *rt;
98 nxt_conf_value_t *conf;
99 nxt_http_fields_hash_t *hash;
100
101 static const nxt_str_t json
102 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
103
104 rt = task->thread->runtime;
105
106 hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
107 rt->mem_pool);
108 if (nxt_slow_path(hash == NULL)) {
109 return NXT_ERROR;
110 }
111
112 nxt_controller_fields_hash = hash;
113
114 if (nxt_listen_event(task, rt->controller_socket) == NULL) {
115 return NXT_ERROR;
116 }
117
118 mp = nxt_mp_create(1024, 128, 256, 32);
119
120 if (nxt_slow_path(mp == NULL)) {
121 return NXT_ERROR;
122 }
123
124 conf = nxt_conf_json_parse_str(mp, &json);
125
126 if (conf == NULL) {
127 return NXT_ERROR;
128 }
129
130 nxt_controller_conf.root = conf;
131 nxt_controller_conf.pool = mp;
132
133 nxt_queue_init(&nxt_controller_waiting_requests);
134
135 return NXT_OK;
136}
137
138
139nxt_int_t
140nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
141{
142 nxt_sockaddr_t *sa;
143 nxt_listen_socket_t *ls;
144
145 sa = rt->controller_listen;
146
147 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
148 if (ls == NULL) {
149 return NXT_ERROR;
150 }
151
152 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
153 sa->socklen, sa->length);
154 if (ls->sockaddr == NULL) {
155 return NXT_ERROR;
156 }
157
158 ls->sockaddr->type = sa->type;
159 ls->socklen = sa->socklen;
160 ls->address_length = sa->length;
161
162 nxt_sockaddr_text(ls->sockaddr);
163
164 ls->socket = -1;
165 ls->backlog = NXT_LISTEN_BACKLOG;
166 ls->read_after_accept = 1;
167 ls->flags = NXT_NONBLOCK;
168
169#if 0
170 /* STUB */
171 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
172 if (wq == NULL) {
173 return NXT_ERROR;
174 }
175 nxt_work_queue_name(wq, "listen");
176 /**/
177
178 ls->work_queue = wq;
179#endif
180 ls->handler = nxt_controller_conn_init;
181
182 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
183 return NXT_ERROR;
184 }
185
186 rt->controller_socket = ls;
187
188 return NXT_OK;
189}
190
191
192static void
193nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
194{
195 nxt_buf_t *b;
196 nxt_conn_t *c;
197 nxt_event_engine_t *engine;
198 nxt_controller_request_t *r;
199
200 c = obj;
201
202 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
203
204 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
205 if (nxt_slow_path(r == NULL)) {
206 nxt_controller_conn_free(task, c, NULL);
207 return;
208 }
209
210 r->conn = c;
211
212 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
213 != NXT_OK))
214 {
215 nxt_controller_conn_free(task, c, NULL);
216 return;
217 }
218
219 r->parser.fields_hash = nxt_controller_fields_hash;
220
221 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
222 if (nxt_slow_path(b == NULL)) {
223 nxt_controller_conn_free(task, c, NULL);
224 return;
225 }
226
227 c->read = b;
228 c->socket.data = r;
229 c->socket.read_ready = 1;
230 c->read_state = &nxt_controller_conn_read_state;
231
232 engine = task->thread->engine;
233 c->read_work_queue = &engine->read_work_queue;
234 c->write_work_queue = &engine->write_work_queue;
235
236 nxt_conn_read(engine, c);
237}
238
239
240static const nxt_event_conn_state_t nxt_controller_conn_read_state
241 nxt_aligned(64) =
242{
243 .ready_handler = nxt_controller_conn_read,
244 .close_handler = nxt_controller_conn_close,
245 .error_handler = nxt_controller_conn_read_error,
246
247 .timer_handler = nxt_controller_conn_read_timeout,
248 .timer_value = nxt_controller_conn_timeout_value,
249 .timer_data = 60 * 1000,
250};
251
252
253static void
254nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
255{
256 size_t preread;
257 nxt_buf_t *b;
258 nxt_int_t rc;
259 nxt_conn_t *c;
260 nxt_controller_request_t *r;
261
262 c = obj;
263 r = data;
264
265 nxt_debug(task, "controller conn read");
266
267 nxt_queue_remove(&c->link);
268 nxt_queue_self(&c->link);
269
270 b = c->read;
271
272 rc = nxt_http_parse_request(&r->parser, &b->mem);
273
274 if (nxt_slow_path(rc != NXT_DONE)) {
275
276 if (rc == NXT_AGAIN) {
277 if (nxt_buf_mem_free_size(&b->mem) == 0) {
278 nxt_log(task, NXT_LOG_ERR, "too long request headers");
279 nxt_controller_conn_close(task, c, r);
280 return;
281 }
282
283 nxt_conn_read(task->thread->engine, c);
284 return;
285 }
286
287 /* rc == NXT_ERROR */
288
289 nxt_log(task, NXT_LOG_ERR, "parsing error");
290
291 nxt_controller_conn_close(task, c, r);
292 return;
293 }
294
295 rc = nxt_http_fields_process(r->parser.fields, r, task->log);
296
297 if (nxt_slow_path(rc != NXT_OK)) {
298 nxt_controller_conn_close(task, c, r);
299 return;
300 }
301
302 preread = nxt_buf_mem_used_size(&b->mem);
303
304 nxt_debug(task, "controller request header parsing complete, "
305 "body length: %uz, preread: %uz",
306 r->length, preread);
307
308 if (preread >= r->length) {
309 nxt_controller_process_request(task, r);
310 return;
311 }
312
313 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
314 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
315 if (nxt_slow_path(b == NULL)) {
316 nxt_controller_conn_free(task, c, NULL);
317 return;
318 }
319
320 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
321
322 c->read = b;
323 }
324
325 c->read_state = &nxt_controller_conn_body_read_state;
326
327 nxt_conn_read(task->thread->engine, c);
328}
329
330
331static nxt_msec_t
332nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
333{
334 return (nxt_msec_t) data;
335}
336
337
338static void
339nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
340{
341 nxt_conn_t *c;
342
343 c = obj;
344
345 nxt_debug(task, "controller conn read error");
346
347 nxt_controller_conn_close(task, c, data);
348}
349
350
351static void
352nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
353{
354 nxt_timer_t *timer;
355 nxt_conn_t *c;
356
357 timer = obj;
358
359 c = nxt_read_timer_conn(timer);
360 c->socket.timedout = 1;
361 c->socket.closed = 1;
362
363 nxt_debug(task, "controller conn read timeout");
364
365 nxt_controller_conn_close(task, c, data);
366}
367
368
369static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
370 nxt_aligned(64) =
371{
372 .ready_handler = nxt_controller_conn_body_read,
373 .close_handler = nxt_controller_conn_close,
374 .error_handler = nxt_controller_conn_read_error,
375
376 .timer_handler = nxt_controller_conn_read_timeout,
377 .timer_value = nxt_controller_conn_timeout_value,
378 .timer_data = 60 * 1000,
379 .timer_autoreset = 1,
380};
381
382
383static void
384nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
385{
386 size_t read;
387 nxt_buf_t *b;
388 nxt_conn_t *c;
389 nxt_controller_request_t *r;
390
391 c = obj;
392 r = data;
393 b = c->read;
394
395 read = nxt_buf_mem_used_size(&b->mem);
396
397 nxt_debug(task, "controller conn body read: %uz of %uz",
398 read, r->length);
399
400 if (read >= r->length) {
401 nxt_controller_process_request(task, r);
402 return;
403 }
404
405 nxt_conn_read(task->thread->engine, c);
406}
407
408
409static const nxt_event_conn_state_t nxt_controller_conn_write_state
410 nxt_aligned(64) =
411{
412 .ready_handler = nxt_controller_conn_write,
413 .error_handler = nxt_controller_conn_write_error,
414
415 .timer_handler = nxt_controller_conn_write_timeout,
416 .timer_value = nxt_controller_conn_timeout_value,
417 .timer_data = 60 * 1000,
418 .timer_autoreset = 1,
419};
420
421
422static void
423nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
424{
425 nxt_buf_t *b;
426 nxt_conn_t *c;
427
428 c = obj;
429
430 nxt_debug(task, "controller conn write");
431
432 b = c->write;
433
434 if (b->mem.pos != b->mem.free) {
435 nxt_conn_write(task->thread->engine, c);
436 return;
437 }
438
439 nxt_debug(task, "controller conn write complete");
440
441 nxt_controller_conn_close(task, c, data);
442}
443
444
445static void
446nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
447{
448 nxt_conn_t *c;
449
450 c = obj;
451
452 nxt_debug(task, "controller conn write error");
453
454 nxt_controller_conn_close(task, c, data);
455}
456
457
458static void
459nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
460{
461 nxt_conn_t *c;
462 nxt_timer_t *timer;
463
464 timer = obj;
465
466 c = nxt_write_timer_conn(timer);
467 c->socket.timedout = 1;
468 c->socket.closed = 1;
469
470 nxt_debug(task, "controller conn write timeout");
471
472 nxt_controller_conn_close(task, c, data);
473}
474
475
476static const nxt_event_conn_state_t nxt_controller_conn_close_state
477 nxt_aligned(64) =
478{
479 .ready_handler = nxt_controller_conn_free,
480};
481
482
483static void
484nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
485{
486 nxt_conn_t *c;
487
488 c = obj;
489
490 nxt_debug(task, "controller conn close");
491
492 nxt_queue_remove(&c->link);
493
494 c->write_state = &nxt_controller_conn_close_state;
495
496 nxt_conn_close(task->thread->engine, c);
497}
498
499
500static void
501nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
502{
503 nxt_conn_t *c;
504
505 c = obj;
506
507 nxt_debug(task, "controller conn free");
508
509 nxt_mp_destroy(c->mem_pool);
510
511 //nxt_free(c);
512}
513
514
515static nxt_int_t
516nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
517 nxt_log_t *log)
518{
519 off_t length;
520 nxt_controller_request_t *r;
521
522 r = ctx;
523
524 length = nxt_off_t_parse(field->value.start, field->value.length);
525
526 if (nxt_fast_path(length > 0)) {
527
528 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
529 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big");
530 return NXT_ERROR;
531 }
532
533 r->length = length;
534 return NXT_OK;
535 }
536
537 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
538
539 return NXT_ERROR;
540}
541
542
543static void
544nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
545{
546 nxt_mp_t *mp;
547 nxt_int_t rc;
548 nxt_str_t path;
549 nxt_conn_t *c;
550 nxt_buf_mem_t *mbuf;
551 nxt_conf_op_t *ops;
552 nxt_conf_value_t *value;
553 nxt_conf_json_error_t error;
554 nxt_controller_response_t resp;
555
556 static const nxt_str_t empty_obj = nxt_string("{}");
557
558 c = req->conn;
559 path = req->parser.path;
560
561 if (path.length > 1 && path.start[path.length - 1] == '/') {
562 path.length--;
563 }
564
565 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
566
567 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
568
569 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
570
571 if (value == NULL) {
572 goto not_found;
573 }
574
575 resp.status = 200;
576 resp.conf = value;
577
578 nxt_controller_response(task, req, &resp);
579 return;
580 }
581
582 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
583
584 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
585 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
586 return;
587 }
588
589 mp = nxt_mp_create(1024, 128, 256, 32);
590
591 if (nxt_slow_path(mp == NULL)) {
592 goto alloc_fail;
593 }
594
595 mbuf = &c->read->mem;
596
597 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
598
599 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
600
601 if (value == NULL) {
602 nxt_mp_destroy(mp);
603
604 if (error.pos == NULL) {
605 goto alloc_fail;
606 }
607
608 resp.status = 400;
609 resp.title = (u_char *) "Invalid JSON.";
610 resp.detail = error.detail;
611 resp.offset = error.pos - mbuf->pos;
612
613 nxt_conf_json_position(mbuf->pos, error.pos,
614 &resp.line, &resp.column);
615
616 nxt_controller_response(task, req, &resp);
617 return;
618 }
619
620 if (path.length != 1) {
621 rc = nxt_conf_op_compile(c->mem_pool, &ops,
622 nxt_controller_conf.root,
623 &path, value);
624
625 if (rc != NXT_OK) {
626 if (rc == NXT_DECLINED) {
627 goto not_found;
628 }
629
630 goto alloc_fail;
631 }
632
633 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
634
635 if (nxt_slow_path(value == NULL)) {
636 nxt_mp_destroy(mp);
637 goto alloc_fail;
638 }
639 }
640
641 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
642 nxt_mp_destroy(mp);
643 goto invalid_conf;
644 }
645
646 req->conf.root = value;
647 req->conf.pool = mp;
648
649 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
650 nxt_mp_destroy(mp);
651 goto alloc_fail;
652 }
653
654 return;
655 }
656
657 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
658
659 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
660 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
661 return;
662 }
663
664 if (path.length == 1) {
665 mp = nxt_mp_create(1024, 128, 256, 32);
666
667 if (nxt_slow_path(mp == NULL)) {
668 goto alloc_fail;
669 }
670
671 value = nxt_conf_json_parse_str(mp, &empty_obj);
672
673 } else {
674 rc = nxt_conf_op_compile(c->mem_pool, &ops,
675 nxt_controller_conf.root,
676 &path, NULL);
677
678 if (rc != NXT_OK) {
679 if (rc == NXT_DECLINED) {
680 goto not_found;
681 }
682
683 goto alloc_fail;
684 }
685
686 mp = nxt_mp_create(1024, 128, 256, 32);
687
688 if (nxt_slow_path(mp == NULL)) {
689 goto alloc_fail;
690 }
691
692 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
693 }
694
695 if (nxt_slow_path(value == NULL)) {
696 nxt_mp_destroy(mp);
697 goto alloc_fail;
698 }
699
700 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
701 nxt_mp_destroy(mp);
702 goto invalid_conf;
703 }
704
705 req->conf.root = value;
706 req->conf.pool = mp;
707
708 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
709 nxt_mp_destroy(mp);
710 goto alloc_fail;
711 }
712
713 return;
714 }
715
716 resp.status = 405;
717 resp.title = (u_char *) "Invalid method.";
718 resp.offset = -1;
719
720 nxt_controller_response(task, req, &resp);
721 return;
722
723alloc_fail:
724
725 resp.status = 500;
726 resp.title = (u_char *) "Memory allocation failed.";
727 resp.offset = -1;
728
729 nxt_controller_response(task, req, &resp);
730 return;
731
732not_found:
733
734 resp.status = 404;
735 resp.title = (u_char *) "Value doesn't exist.";
736 resp.offset = -1;
737
738 nxt_controller_response(task, req, &resp);
739 return;
740
741invalid_conf:
742
743 resp.status = 400;
744 resp.title = (u_char *) "Invalid configuration.";
745 resp.offset = -1;
746
747 nxt_controller_response(task, req, &resp);
748 return;
749}
750
751
752static nxt_int_t
753nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
754{
755 size_t size;
756 uint32_t stream;
757 nxt_int_t rc;
758 nxt_buf_t *b;
759 nxt_port_t *router_port, *controller_port;
760 nxt_runtime_t *rt;
761
762 rt = task->thread->runtime;
763
764 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
765 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
766
767 size = nxt_conf_json_length(req->conf.root, NULL);
768
769 b = nxt_port_mmap_get_buf(task, router_port, size);
770
771 b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
772
773 stream = nxt_port_rpc_register_handler(task, controller_port,
774 nxt_controller_conf_handler,
775 nxt_controller_conf_handler,
776 router_port->pid, req);
777
778 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
779 stream, controller_port->id, b);
780
781 if (nxt_slow_path(rc != NXT_OK)) {
782 nxt_port_rpc_cancel(task, controller_port, stream);
783 return NXT_ERROR;
784 }
785
786 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
787
788 return NXT_OK;
789}
790
791
792static void
793nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
794 void *data)
795{
796 nxt_queue_t queue;
797 nxt_controller_request_t *req;
798 nxt_controller_response_t resp;
799
800 req = data;
801
802 nxt_debug(task, "controller conf ready: %*s",
803 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
804
805 nxt_queue_remove(&req->link);
806
807 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
808
809 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
810 nxt_mp_destroy(nxt_controller_conf.pool);
811
812 nxt_controller_conf = req->conf;
813
814 resp.status = 200;
815 resp.title = (u_char *) "Reconfiguration done.";
816
817 } else {
818 nxt_mp_destroy(req->conf.pool);
819
820 resp.status = 500;
821 resp.title = (u_char *) "Failed to apply new configuration.";
822 resp.offset = -1;
823 }
824
825 nxt_controller_response(task, req, &resp);
826
827 nxt_queue_init(&queue);
828 nxt_queue_add(&queue, &nxt_controller_waiting_requests);
829
830 nxt_queue_init(&nxt_controller_waiting_requests);
831
832 nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
833 nxt_controller_process_request(task, req);
834 } nxt_queue_loop;
835}
836
837
838static void
839nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
840 nxt_controller_response_t *resp)
841{
842 size_t size;
843 nxt_str_t status_line, str;
844 nxt_buf_t *b, *body;
845 nxt_conn_t *c;
846 nxt_uint_t n;
847 nxt_conf_value_t *value, *location;
848 nxt_conf_json_pretty_t pretty;
849
850 static nxt_str_t success_str = nxt_string("success");
851 static nxt_str_t error_str = nxt_string("error");
852 static nxt_str_t detail_str = nxt_string("detail");
853 static nxt_str_t location_str = nxt_string("location");
854 static nxt_str_t offset_str = nxt_string("offset");
855 static nxt_str_t line_str = nxt_string("line");
856 static nxt_str_t column_str = nxt_string("column");
857
858 static nxt_time_string_t date_cache = {
859 (nxt_atomic_uint_t) -1,
860 nxt_controller_date,
861 "%s, %02d %s %4d %02d:%02d:%02d GMT",
862 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
863 NXT_THREAD_TIME_GMT,
864 NXT_THREAD_TIME_SEC,
865 };
866
867 switch (resp->status) {
868
869 case 200:
870 nxt_str_set(&status_line, "200 OK");
871 break;
872
873 case 400:
874 nxt_str_set(&status_line, "400 Bad Request");
875 break;
876
877 case 404:
878 nxt_str_set(&status_line, "404 Not Found");
879 break;
880
881 case 405:
882 nxt_str_set(&status_line, "405 Method Not Allowed");
883 break;
884
885 default:
886 nxt_str_set(&status_line, "500 Internal Server Error");
887 break;
888 }
889
890 c = req->conn;
891 value = resp->conf;
892
893 if (value == NULL) {
894 n = 1
895 + (resp->detail != NULL)
896 + (resp->status >= 400 && resp->offset != -1);
897
898 value = nxt_conf_create_object(c->mem_pool, n);
899
900 if (nxt_slow_path(value == NULL)) {
901 nxt_controller_conn_close(task, c, req);
902 return;
903 }
904
905 str.length = nxt_strlen(resp->title);
906 str.start = resp->title;
907
908 if (resp->status < 400) {
909 nxt_conf_set_member_string(value, &success_str, &str, 0);
910
911 } else {
912 nxt_conf_set_member_string(value, &error_str, &str, 0);
913 }
914
915 n = 0;
916
917 if (resp->detail != NULL) {
918 str.length = nxt_strlen(resp->detail);
919 str.start = resp->detail;
920
921 n++;
922
923 nxt_conf_set_member_string(value, &detail_str, &str, n);
924 }
925
926 if (resp->status >= 400 && resp->offset != -1) {
927 n++;
928
929 location = nxt_conf_create_object(c->mem_pool,
930 resp->line != 0 ? 3 : 1);
931
932 nxt_conf_set_member(value, &location_str, location, n);
933
934 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
935
936 if (resp->line != 0) {
937 nxt_conf_set_member_integer(location, &line_str,
938 resp->line, 1);
939
940 nxt_conf_set_member_integer(location, &column_str,
941 resp->column, 2);
942 }
943 }
944 }
945
946 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
947
948 size = nxt_conf_json_length(value, &pretty) + 2;
949
950 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
951 if (nxt_slow_path(body == NULL)) {
952 nxt_controller_conn_close(task, c, req);
953 return;
954 }
955
956 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
957
958 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
959
960 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
961
962 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
963 + sizeof("Server: nginext/0.1\r\n") - 1
964 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
965 + sizeof("Content-Type: application/json\r\n") - 1
966 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
967 + sizeof("Connection: close\r\n") - 1
968 + sizeof("\r\n") - 1;
969
970 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
971 if (nxt_slow_path(b == NULL)) {
972 nxt_controller_conn_close(task, c, req);
973 return;
974 }
975
976 b->next = body;
977
978 nxt_str_set(&str, "HTTP/1.1 ");
979
980 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
981 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
982 status_line.length);
983
984 nxt_str_set(&str, "\r\n"
985 "Server: nginext/0.1\r\n"
986 "Date: ");
987
988 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
989
990 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
991 b->mem.free);
992
993 nxt_str_set(&str, "\r\n"
994 "Content-Type: application/json\r\n"
995 "Content-Length: ");
996
997 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
998
999 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1000 nxt_buf_mem_used_size(&body->mem));
1001
1002 nxt_str_set(&str, "\r\n"
1003 "Connection: close\r\n"
1004 "\r\n");
1005
1006 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1007
1008 c->write = b;
1009 c->write_state = &nxt_controller_conn_write_state;
1010
1011 nxt_conn_write(task->thread->engine, c);
1012}
1013
1014
1015static u_char *
1016nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1017 size_t size, const char *format)
1018{
1019 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1020 "Sat" };
1021
1022 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1023 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1024
1025 return nxt_sprintf(buf, buf + size, format,
1026 week[tm->tm_wday], tm->tm_mday,
1027 month[tm->tm_mon], tm->tm_year + 1900,
1028 tm->tm_hour, tm->tm_min, tm->tm_sec);
1029}
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
32
33 u_char *title;
34 u_char *detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
38} nxt_controller_response_t;
39
40
41static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
42static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
43static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
44 uintptr_t data);
45static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
46 void *data);
47static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
48 void *data);
49static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
50 void *data);
51static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
52static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
53 void *data);
54static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
55 void *data);
56static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
57static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
58
59static nxt_int_t nxt_controller_request_content_length(void *ctx,
60 nxt_http_field_t *field, nxt_log_t *log);
61
62static void nxt_controller_process_request(nxt_task_t *task,
63 nxt_controller_request_t *req);
64static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
65 nxt_controller_request_t *req);
66static void nxt_controller_conf_handler(nxt_task_t *task,
67 nxt_port_recv_msg_t *msg, void *data);
68static void nxt_controller_response(nxt_task_t *task,
69 nxt_controller_request_t *req, nxt_controller_response_t *resp);
70static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
71 struct tm *tm, size_t size, const char *format);
72
73
74static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
75 { nxt_string("Content-Length"),
76 &nxt_controller_request_content_length, 0 },
77
78 { nxt_null_string, NULL, 0 }
79};
80
81static nxt_http_fields_hash_t *nxt_controller_fields_hash;
82
83static nxt_controller_conf_t nxt_controller_conf;
84static nxt_queue_t nxt_controller_waiting_requests;
85
86
87static const nxt_event_conn_state_t nxt_controller_conn_read_state;
88static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
89static const nxt_event_conn_state_t nxt_controller_conn_write_state;
90static const nxt_event_conn_state_t nxt_controller_conn_close_state;
91
92
93nxt_int_t
94nxt_controller_start(nxt_task_t *task, void *data)
95{
96 nxt_mp_t *mp;
97 nxt_runtime_t *rt;
98 nxt_conf_value_t *conf;
99 nxt_http_fields_hash_t *hash;
100
101 static const nxt_str_t json
102 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
103
104 rt = task->thread->runtime;
105
106 hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
107 rt->mem_pool);
108 if (nxt_slow_path(hash == NULL)) {
109 return NXT_ERROR;
110 }
111
112 nxt_controller_fields_hash = hash;
113
114 if (nxt_listen_event(task, rt->controller_socket) == NULL) {
115 return NXT_ERROR;
116 }
117
118 mp = nxt_mp_create(1024, 128, 256, 32);
119
120 if (nxt_slow_path(mp == NULL)) {
121 return NXT_ERROR;
122 }
123
124 conf = nxt_conf_json_parse_str(mp, &json);
125
126 if (conf == NULL) {
127 return NXT_ERROR;
128 }
129
130 nxt_controller_conf.root = conf;
131 nxt_controller_conf.pool = mp;
132
133 nxt_queue_init(&nxt_controller_waiting_requests);
134
135 return NXT_OK;
136}
137
138
139nxt_int_t
140nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
141{
142 nxt_sockaddr_t *sa;
143 nxt_listen_socket_t *ls;
144
145 sa = rt->controller_listen;
146
147 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
148 if (ls == NULL) {
149 return NXT_ERROR;
150 }
151
152 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
153 sa->socklen, sa->length);
154 if (ls->sockaddr == NULL) {
155 return NXT_ERROR;
156 }
157
158 ls->sockaddr->type = sa->type;
159 ls->socklen = sa->socklen;
160 ls->address_length = sa->length;
161
162 nxt_sockaddr_text(ls->sockaddr);
163
164 ls->socket = -1;
165 ls->backlog = NXT_LISTEN_BACKLOG;
166 ls->read_after_accept = 1;
167 ls->flags = NXT_NONBLOCK;
168
169#if 0
170 /* STUB */
171 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
172 if (wq == NULL) {
173 return NXT_ERROR;
174 }
175 nxt_work_queue_name(wq, "listen");
176 /**/
177
178 ls->work_queue = wq;
179#endif
180 ls->handler = nxt_controller_conn_init;
181
182 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
183 return NXT_ERROR;
184 }
185
186 rt->controller_socket = ls;
187
188 return NXT_OK;
189}
190
191
192static void
193nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
194{
195 nxt_buf_t *b;
196 nxt_conn_t *c;
197 nxt_event_engine_t *engine;
198 nxt_controller_request_t *r;
199
200 c = obj;
201
202 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
203
204 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
205 if (nxt_slow_path(r == NULL)) {
206 nxt_controller_conn_free(task, c, NULL);
207 return;
208 }
209
210 r->conn = c;
211
212 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
213 != NXT_OK))
214 {
215 nxt_controller_conn_free(task, c, NULL);
216 return;
217 }
218
219 r->parser.fields_hash = nxt_controller_fields_hash;
220
221 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
222 if (nxt_slow_path(b == NULL)) {
223 nxt_controller_conn_free(task, c, NULL);
224 return;
225 }
226
227 c->read = b;
228 c->socket.data = r;
229 c->socket.read_ready = 1;
230 c->read_state = &nxt_controller_conn_read_state;
231
232 engine = task->thread->engine;
233 c->read_work_queue = &engine->read_work_queue;
234 c->write_work_queue = &engine->write_work_queue;
235
236 nxt_conn_read(engine, c);
237}
238
239
240static const nxt_event_conn_state_t nxt_controller_conn_read_state
241 nxt_aligned(64) =
242{
243 .ready_handler = nxt_controller_conn_read,
244 .close_handler = nxt_controller_conn_close,
245 .error_handler = nxt_controller_conn_read_error,
246
247 .timer_handler = nxt_controller_conn_read_timeout,
248 .timer_value = nxt_controller_conn_timeout_value,
249 .timer_data = 60 * 1000,
250};
251
252
253static void
254nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
255{
256 size_t preread;
257 nxt_buf_t *b;
258 nxt_int_t rc;
259 nxt_conn_t *c;
260 nxt_controller_request_t *r;
261
262 c = obj;
263 r = data;
264
265 nxt_debug(task, "controller conn read");
266
267 nxt_queue_remove(&c->link);
268 nxt_queue_self(&c->link);
269
270 b = c->read;
271
272 rc = nxt_http_parse_request(&r->parser, &b->mem);
273
274 if (nxt_slow_path(rc != NXT_DONE)) {
275
276 if (rc == NXT_AGAIN) {
277 if (nxt_buf_mem_free_size(&b->mem) == 0) {
278 nxt_log(task, NXT_LOG_ERR, "too long request headers");
279 nxt_controller_conn_close(task, c, r);
280 return;
281 }
282
283 nxt_conn_read(task->thread->engine, c);
284 return;
285 }
286
287 /* rc == NXT_ERROR */
288
289 nxt_log(task, NXT_LOG_ERR, "parsing error");
290
291 nxt_controller_conn_close(task, c, r);
292 return;
293 }
294
295 rc = nxt_http_fields_process(r->parser.fields, r, task->log);
296
297 if (nxt_slow_path(rc != NXT_OK)) {
298 nxt_controller_conn_close(task, c, r);
299 return;
300 }
301
302 preread = nxt_buf_mem_used_size(&b->mem);
303
304 nxt_debug(task, "controller request header parsing complete, "
305 "body length: %uz, preread: %uz",
306 r->length, preread);
307
308 if (preread >= r->length) {
309 nxt_controller_process_request(task, r);
310 return;
311 }
312
313 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
314 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
315 if (nxt_slow_path(b == NULL)) {
316 nxt_controller_conn_free(task, c, NULL);
317 return;
318 }
319
320 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
321
322 c->read = b;
323 }
324
325 c->read_state = &nxt_controller_conn_body_read_state;
326
327 nxt_conn_read(task->thread->engine, c);
328}
329
330
331static nxt_msec_t
332nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
333{
334 return (nxt_msec_t) data;
335}
336
337
338static void
339nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
340{
341 nxt_conn_t *c;
342
343 c = obj;
344
345 nxt_debug(task, "controller conn read error");
346
347 nxt_controller_conn_close(task, c, data);
348}
349
350
351static void
352nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
353{
354 nxt_timer_t *timer;
355 nxt_conn_t *c;
356
357 timer = obj;
358
359 c = nxt_read_timer_conn(timer);
360 c->socket.timedout = 1;
361 c->socket.closed = 1;
362
363 nxt_debug(task, "controller conn read timeout");
364
365 nxt_controller_conn_close(task, c, data);
366}
367
368
369static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
370 nxt_aligned(64) =
371{
372 .ready_handler = nxt_controller_conn_body_read,
373 .close_handler = nxt_controller_conn_close,
374 .error_handler = nxt_controller_conn_read_error,
375
376 .timer_handler = nxt_controller_conn_read_timeout,
377 .timer_value = nxt_controller_conn_timeout_value,
378 .timer_data = 60 * 1000,
379 .timer_autoreset = 1,
380};
381
382
383static void
384nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
385{
386 size_t read;
387 nxt_buf_t *b;
388 nxt_conn_t *c;
389 nxt_controller_request_t *r;
390
391 c = obj;
392 r = data;
393 b = c->read;
394
395 read = nxt_buf_mem_used_size(&b->mem);
396
397 nxt_debug(task, "controller conn body read: %uz of %uz",
398 read, r->length);
399
400 if (read >= r->length) {
401 nxt_controller_process_request(task, r);
402 return;
403 }
404
405 nxt_conn_read(task->thread->engine, c);
406}
407
408
409static const nxt_event_conn_state_t nxt_controller_conn_write_state
410 nxt_aligned(64) =
411{
412 .ready_handler = nxt_controller_conn_write,
413 .error_handler = nxt_controller_conn_write_error,
414
415 .timer_handler = nxt_controller_conn_write_timeout,
416 .timer_value = nxt_controller_conn_timeout_value,
417 .timer_data = 60 * 1000,
418 .timer_autoreset = 1,
419};
420
421
422static void
423nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
424{
425 nxt_buf_t *b;
426 nxt_conn_t *c;
427
428 c = obj;
429
430 nxt_debug(task, "controller conn write");
431
432 b = c->write;
433
434 if (b->mem.pos != b->mem.free) {
435 nxt_conn_write(task->thread->engine, c);
436 return;
437 }
438
439 nxt_debug(task, "controller conn write complete");
440
441 nxt_controller_conn_close(task, c, data);
442}
443
444
445static void
446nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
447{
448 nxt_conn_t *c;
449
450 c = obj;
451
452 nxt_debug(task, "controller conn write error");
453
454 nxt_controller_conn_close(task, c, data);
455}
456
457
458static void
459nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
460{
461 nxt_conn_t *c;
462 nxt_timer_t *timer;
463
464 timer = obj;
465
466 c = nxt_write_timer_conn(timer);
467 c->socket.timedout = 1;
468 c->socket.closed = 1;
469
470 nxt_debug(task, "controller conn write timeout");
471
472 nxt_controller_conn_close(task, c, data);
473}
474
475
476static const nxt_event_conn_state_t nxt_controller_conn_close_state
477 nxt_aligned(64) =
478{
479 .ready_handler = nxt_controller_conn_free,
480};
481
482
483static void
484nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
485{
486 nxt_conn_t *c;
487
488 c = obj;
489
490 nxt_debug(task, "controller conn close");
491
492 nxt_queue_remove(&c->link);
493
494 c->write_state = &nxt_controller_conn_close_state;
495
496 nxt_conn_close(task->thread->engine, c);
497}
498
499
500static void
501nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
502{
503 nxt_conn_t *c;
504
505 c = obj;
506
507 nxt_debug(task, "controller conn free");
508
509 nxt_mp_destroy(c->mem_pool);
510
511 //nxt_free(c);
512}
513
514
515static nxt_int_t
516nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
517 nxt_log_t *log)
518{
519 off_t length;
520 nxt_controller_request_t *r;
521
522 r = ctx;
523
524 length = nxt_off_t_parse(field->value.start, field->value.length);
525
526 if (nxt_fast_path(length > 0)) {
527
528 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
529 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big");
530 return NXT_ERROR;
531 }
532
533 r->length = length;
534 return NXT_OK;
535 }
536
537 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
538
539 return NXT_ERROR;
540}
541
542
543static void
544nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
545{
546 nxt_mp_t *mp;
547 nxt_int_t rc;
548 nxt_str_t path;
549 nxt_conn_t *c;
550 nxt_buf_mem_t *mbuf;
551 nxt_conf_op_t *ops;
552 nxt_conf_value_t *value;
553 nxt_conf_json_error_t error;
554 nxt_controller_response_t resp;
555
556 static const nxt_str_t empty_obj = nxt_string("{}");
557
558 c = req->conn;
559 path = req->parser.path;
560
561 if (path.length > 1 && path.start[path.length - 1] == '/') {
562 path.length--;
563 }
564
565 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
566
567 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
568
569 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
570
571 if (value == NULL) {
572 goto not_found;
573 }
574
575 resp.status = 200;
576 resp.conf = value;
577
578 nxt_controller_response(task, req, &resp);
579 return;
580 }
581
582 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
583
584 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
585 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
586 return;
587 }
588
589 mp = nxt_mp_create(1024, 128, 256, 32);
590
591 if (nxt_slow_path(mp == NULL)) {
592 goto alloc_fail;
593 }
594
595 mbuf = &c->read->mem;
596
597 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
598
599 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
600
601 if (value == NULL) {
602 nxt_mp_destroy(mp);
603
604 if (error.pos == NULL) {
605 goto alloc_fail;
606 }
607
608 resp.status = 400;
609 resp.title = (u_char *) "Invalid JSON.";
610 resp.detail = error.detail;
611 resp.offset = error.pos - mbuf->pos;
612
613 nxt_conf_json_position(mbuf->pos, error.pos,
614 &resp.line, &resp.column);
615
616 nxt_controller_response(task, req, &resp);
617 return;
618 }
619
620 if (path.length != 1) {
621 rc = nxt_conf_op_compile(c->mem_pool, &ops,
622 nxt_controller_conf.root,
623 &path, value);
624
625 if (rc != NXT_OK) {
626 if (rc == NXT_DECLINED) {
627 goto not_found;
628 }
629
630 goto alloc_fail;
631 }
632
633 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
634
635 if (nxt_slow_path(value == NULL)) {
636 nxt_mp_destroy(mp);
637 goto alloc_fail;
638 }
639 }
640
641 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
642 nxt_mp_destroy(mp);
643 goto invalid_conf;
644 }
645
646 req->conf.root = value;
647 req->conf.pool = mp;
648
649 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
650 nxt_mp_destroy(mp);
651 goto alloc_fail;
652 }
653
654 return;
655 }
656
657 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
658
659 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
660 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
661 return;
662 }
663
664 if (path.length == 1) {
665 mp = nxt_mp_create(1024, 128, 256, 32);
666
667 if (nxt_slow_path(mp == NULL)) {
668 goto alloc_fail;
669 }
670
671 value = nxt_conf_json_parse_str(mp, &empty_obj);
672
673 } else {
674 rc = nxt_conf_op_compile(c->mem_pool, &ops,
675 nxt_controller_conf.root,
676 &path, NULL);
677
678 if (rc != NXT_OK) {
679 if (rc == NXT_DECLINED) {
680 goto not_found;
681 }
682
683 goto alloc_fail;
684 }
685
686 mp = nxt_mp_create(1024, 128, 256, 32);
687
688 if (nxt_slow_path(mp == NULL)) {
689 goto alloc_fail;
690 }
691
692 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
693 }
694
695 if (nxt_slow_path(value == NULL)) {
696 nxt_mp_destroy(mp);
697 goto alloc_fail;
698 }
699
700 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
701 nxt_mp_destroy(mp);
702 goto invalid_conf;
703 }
704
705 req->conf.root = value;
706 req->conf.pool = mp;
707
708 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
709 nxt_mp_destroy(mp);
710 goto alloc_fail;
711 }
712
713 return;
714 }
715
716 resp.status = 405;
717 resp.title = (u_char *) "Invalid method.";
718 resp.offset = -1;
719
720 nxt_controller_response(task, req, &resp);
721 return;
722
723alloc_fail:
724
725 resp.status = 500;
726 resp.title = (u_char *) "Memory allocation failed.";
727 resp.offset = -1;
728
729 nxt_controller_response(task, req, &resp);
730 return;
731
732not_found:
733
734 resp.status = 404;
735 resp.title = (u_char *) "Value doesn't exist.";
736 resp.offset = -1;
737
738 nxt_controller_response(task, req, &resp);
739 return;
740
741invalid_conf:
742
743 resp.status = 400;
744 resp.title = (u_char *) "Invalid configuration.";
745 resp.offset = -1;
746
747 nxt_controller_response(task, req, &resp);
748 return;
749}
750
751
752static nxt_int_t
753nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
754{
755 size_t size;
756 uint32_t stream;
757 nxt_int_t rc;
758 nxt_buf_t *b;
759 nxt_port_t *router_port, *controller_port;
760 nxt_runtime_t *rt;
761
762 rt = task->thread->runtime;
763
764 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
765 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
766
767 size = nxt_conf_json_length(req->conf.root, NULL);
768
769 b = nxt_port_mmap_get_buf(task, router_port, size);
770
771 b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
772
773 stream = nxt_port_rpc_register_handler(task, controller_port,
774 nxt_controller_conf_handler,
775 nxt_controller_conf_handler,
776 router_port->pid, req);
777
778 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
779 stream, controller_port->id, b);
780
781 if (nxt_slow_path(rc != NXT_OK)) {
782 nxt_port_rpc_cancel(task, controller_port, stream);
783 return NXT_ERROR;
784 }
785
786 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
787
788 return NXT_OK;
789}
790
791
792static void
793nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
794 void *data)
795{
796 nxt_queue_t queue;
797 nxt_controller_request_t *req;
798 nxt_controller_response_t resp;
799
800 req = data;
801
802 nxt_debug(task, "controller conf ready: %*s",
803 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
804
805 nxt_queue_remove(&req->link);
806
807 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
808
809 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
810 nxt_mp_destroy(nxt_controller_conf.pool);
811
812 nxt_controller_conf = req->conf;
813
814 resp.status = 200;
815 resp.title = (u_char *) "Reconfiguration done.";
816
817 } else {
818 nxt_mp_destroy(req->conf.pool);
819
820 resp.status = 500;
821 resp.title = (u_char *) "Failed to apply new configuration.";
822 resp.offset = -1;
823 }
824
825 nxt_controller_response(task, req, &resp);
826
827 nxt_queue_init(&queue);
828 nxt_queue_add(&queue, &nxt_controller_waiting_requests);
829
830 nxt_queue_init(&nxt_controller_waiting_requests);
831
832 nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
833 nxt_controller_process_request(task, req);
834 } nxt_queue_loop;
835}
836
837
838static void
839nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
840 nxt_controller_response_t *resp)
841{
842 size_t size;
843 nxt_str_t status_line, str;
844 nxt_buf_t *b, *body;
845 nxt_conn_t *c;
846 nxt_uint_t n;
847 nxt_conf_value_t *value, *location;
848 nxt_conf_json_pretty_t pretty;
849
850 static nxt_str_t success_str = nxt_string("success");
851 static nxt_str_t error_str = nxt_string("error");
852 static nxt_str_t detail_str = nxt_string("detail");
853 static nxt_str_t location_str = nxt_string("location");
854 static nxt_str_t offset_str = nxt_string("offset");
855 static nxt_str_t line_str = nxt_string("line");
856 static nxt_str_t column_str = nxt_string("column");
857
858 static nxt_time_string_t date_cache = {
859 (nxt_atomic_uint_t) -1,
860 nxt_controller_date,
861 "%s, %02d %s %4d %02d:%02d:%02d GMT",
862 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
863 NXT_THREAD_TIME_GMT,
864 NXT_THREAD_TIME_SEC,
865 };
866
867 switch (resp->status) {
868
869 case 200:
870 nxt_str_set(&status_line, "200 OK");
871 break;
872
873 case 400:
874 nxt_str_set(&status_line, "400 Bad Request");
875 break;
876
877 case 404:
878 nxt_str_set(&status_line, "404 Not Found");
879 break;
880
881 case 405:
882 nxt_str_set(&status_line, "405 Method Not Allowed");
883 break;
884
885 default:
886 nxt_str_set(&status_line, "500 Internal Server Error");
887 break;
888 }
889
890 c = req->conn;
891 value = resp->conf;
892
893 if (value == NULL) {
894 n = 1
895 + (resp->detail != NULL)
896 + (resp->status >= 400 && resp->offset != -1);
897
898 value = nxt_conf_create_object(c->mem_pool, n);
899
900 if (nxt_slow_path(value == NULL)) {
901 nxt_controller_conn_close(task, c, req);
902 return;
903 }
904
905 str.length = nxt_strlen(resp->title);
906 str.start = resp->title;
907
908 if (resp->status < 400) {
909 nxt_conf_set_member_string(value, &success_str, &str, 0);
910
911 } else {
912 nxt_conf_set_member_string(value, &error_str, &str, 0);
913 }
914
915 n = 0;
916
917 if (resp->detail != NULL) {
918 str.length = nxt_strlen(resp->detail);
919 str.start = resp->detail;
920
921 n++;
922
923 nxt_conf_set_member_string(value, &detail_str, &str, n);
924 }
925
926 if (resp->status >= 400 && resp->offset != -1) {
927 n++;
928
929 location = nxt_conf_create_object(c->mem_pool,
930 resp->line != 0 ? 3 : 1);
931
932 nxt_conf_set_member(value, &location_str, location, n);
933
934 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
935
936 if (resp->line != 0) {
937 nxt_conf_set_member_integer(location, &line_str,
938 resp->line, 1);
939
940 nxt_conf_set_member_integer(location, &column_str,
941 resp->column, 2);
942 }
943 }
944 }
945
946 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
947
948 size = nxt_conf_json_length(value, &pretty) + 2;
949
950 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
951 if (nxt_slow_path(body == NULL)) {
952 nxt_controller_conn_close(task, c, req);
953 return;
954 }
955
956 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
957
958 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
959
960 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
961
962 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
963 + sizeof("Server: nginext/0.1\r\n") - 1
964 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
965 + sizeof("Content-Type: application/json\r\n") - 1
966 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
967 + sizeof("Connection: close\r\n") - 1
968 + sizeof("\r\n") - 1;
969
970 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
971 if (nxt_slow_path(b == NULL)) {
972 nxt_controller_conn_close(task, c, req);
973 return;
974 }
975
976 b->next = body;
977
978 nxt_str_set(&str, "HTTP/1.1 ");
979
980 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
981 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
982 status_line.length);
983
984 nxt_str_set(&str, "\r\n"
985 "Server: nginext/0.1\r\n"
986 "Date: ");
987
988 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
989
990 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
991 b->mem.free);
992
993 nxt_str_set(&str, "\r\n"
994 "Content-Type: application/json\r\n"
995 "Content-Length: ");
996
997 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
998
999 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1000 nxt_buf_mem_used_size(&body->mem));
1001
1002 nxt_str_set(&str, "\r\n"
1003 "Connection: close\r\n"
1004 "\r\n");
1005
1006 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1007
1008 c->write = b;
1009 c->write_state = &nxt_controller_conn_write_state;
1010
1011 nxt_conn_write(task->thread->engine, c);
1012}
1013
1014
1015static u_char *
1016nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1017 size_t size, const char *format)
1018{
1019 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1020 "Sat" };
1021
1022 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1023 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1024
1025 return nxt_sprintf(buf, buf + size, format,
1026 week[tm->tm_wday], tm->tm_mday,
1027 month[tm->tm_mon], tm->tm_year + 1900,
1028 tm->tm_hour, tm->tm_min, tm->tm_sec);
1029}