nxt_controller.c (201:ca6edbf8bf49) nxt_controller.c (208:7d1017bd0f6c)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_master_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_master_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_str_t status_line;
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
31 nxt_conf_value_t *conf;
32 nxt_str_t json;
32
33 u_char *title;
34 u_char *detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
33} nxt_controller_response_t;
34
35
36static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
37static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
38static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
39 uintptr_t data);
40static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
41 void *data);
42static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
43 void *data);
44static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
45 void *data);
46static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
47static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
48 void *data);
49static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
50 void *data);
51static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
52static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
53
54static nxt_int_t nxt_controller_request_content_length(void *ctx,
55 nxt_http_field_t *field, nxt_log_t *log);
56
57static void nxt_controller_process_request(nxt_task_t *task,
58 nxt_controller_request_t *req);
59static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
60 nxt_controller_request_t *req);
61static void nxt_controller_process_waiting(nxt_task_t *task);
62static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task,
63 nxt_conf_value_t *conf);
64static void nxt_controller_response(nxt_task_t *task,
65 nxt_controller_request_t *req, nxt_controller_response_t *resp);
38} nxt_controller_response_t;
39
40
41static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
42static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
43static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
44 uintptr_t data);
45static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
46 void *data);
47static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
48 void *data);
49static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
50 void *data);
51static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
52static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
53 void *data);
54static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
55 void *data);
56static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
57static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
58
59static nxt_int_t nxt_controller_request_content_length(void *ctx,
60 nxt_http_field_t *field, nxt_log_t *log);
61
62static void nxt_controller_process_request(nxt_task_t *task,
63 nxt_controller_request_t *req);
64static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
65 nxt_controller_request_t *req);
66static void nxt_controller_process_waiting(nxt_task_t *task);
67static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task,
68 nxt_conf_value_t *conf);
69static void nxt_controller_response(nxt_task_t *task,
70 nxt_controller_request_t *req, nxt_controller_response_t *resp);
66static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp,
67 nxt_mp_t *pool);
71static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
72 struct tm *tm, size_t size, const char *format);
68
69
70static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
71 { nxt_string("Content-Length"),
72 &nxt_controller_request_content_length, 0 },
73
74 { nxt_null_string, NULL, 0 }
75};
76
77static nxt_http_fields_hash_t *nxt_controller_fields_hash;
78
79
80static nxt_controller_conf_t nxt_controller_conf;
81static nxt_queue_t nxt_controller_waiting_requests;
82static nxt_controller_request_t *nxt_controller_current_request;
83
84
85static const nxt_event_conn_state_t nxt_controller_conn_read_state;
86static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
87static const nxt_event_conn_state_t nxt_controller_conn_write_state;
88static const nxt_event_conn_state_t nxt_controller_conn_close_state;
89
90
91nxt_int_t
92nxt_controller_start(nxt_task_t *task, void *data)
93{
94 nxt_mp_t *mp;
95 nxt_runtime_t *rt;
96 nxt_conf_value_t *conf;
97 nxt_http_fields_hash_t *hash;
98
99 static const nxt_str_t json
100 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
101
102 rt = task->thread->runtime;
103
104 hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
105 rt->mem_pool);
106 if (nxt_slow_path(hash == NULL)) {
107 return NXT_ERROR;
108 }
109
110 nxt_controller_fields_hash = hash;
111
112 if (nxt_listen_event(task, rt->controller_socket) == NULL) {
113 return NXT_ERROR;
114 }
115
116 mp = nxt_mp_create(1024, 128, 256, 32);
117
118 if (nxt_slow_path(mp == NULL)) {
119 return NXT_ERROR;
120 }
121
122 conf = nxt_conf_json_parse_str(mp, &json);
123
124 if (conf == NULL) {
125 return NXT_ERROR;
126 }
127
128 nxt_controller_conf.root = conf;
129 nxt_controller_conf.pool = mp;
130
131 nxt_queue_init(&nxt_controller_waiting_requests);
132
133 return NXT_OK;
134}
135
136
137nxt_int_t
138nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
139{
140 nxt_sockaddr_t *sa;
141 nxt_listen_socket_t *ls;
142
143 sa = rt->controller_listen;
144
145 if (rt->controller_listen == NULL) {
146 sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
147 NXT_INET_ADDR_STR_LEN);
148 if (sa == NULL) {
149 return NXT_ERROR;
150 }
151
152 sa->type = SOCK_STREAM;
153 sa->u.sockaddr_in.sin_family = AF_INET;
154 sa->u.sockaddr_in.sin_port = htons(8443);
155
156 nxt_sockaddr_text(sa);
157
158 rt->controller_listen = sa;
159 }
160
161 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
162 if (ls == NULL) {
163 return NXT_ERROR;
164 }
165
166 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
167 sa->socklen, sa->length);
168 if (ls->sockaddr == NULL) {
169 return NXT_ERROR;
170 }
171
172 ls->sockaddr->type = sa->type;
173 ls->socklen = sa->socklen;
174 ls->address_length = sa->length;
175
176 nxt_sockaddr_text(ls->sockaddr);
177
178 ls->socket = -1;
179 ls->backlog = NXT_LISTEN_BACKLOG;
180 ls->read_after_accept = 1;
181 ls->flags = NXT_NONBLOCK;
182
183#if 0
184 /* STUB */
185 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
186 if (wq == NULL) {
187 return NXT_ERROR;
188 }
189 nxt_work_queue_name(wq, "listen");
190 /**/
191
192 ls->work_queue = wq;
193#endif
194 ls->handler = nxt_controller_conn_init;
195
196 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
197 return NXT_ERROR;
198 }
199
200 rt->controller_socket = ls;
201
202 return NXT_OK;
203}
204
205
206static void
207nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
208{
209 nxt_buf_t *b;
210 nxt_conn_t *c;
211 nxt_event_engine_t *engine;
212 nxt_controller_request_t *r;
213
214 c = obj;
215
216 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
217
218 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
219 if (nxt_slow_path(r == NULL)) {
220 nxt_controller_conn_free(task, c, NULL);
221 return;
222 }
223
224 r->conn = c;
225
226 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
227 != NXT_OK))
228 {
229 nxt_controller_conn_free(task, c, NULL);
230 return;
231 }
232
233 r->parser.fields_hash = nxt_controller_fields_hash;
234
235 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
236 if (nxt_slow_path(b == NULL)) {
237 nxt_controller_conn_free(task, c, NULL);
238 return;
239 }
240
241 c->read = b;
242 c->socket.data = r;
243 c->socket.read_ready = 1;
244 c->read_state = &nxt_controller_conn_read_state;
245
246 engine = task->thread->engine;
247 c->read_work_queue = &engine->read_work_queue;
248 c->write_work_queue = &engine->write_work_queue;
249
250 nxt_conn_read(engine, c);
251}
252
253
254static const nxt_event_conn_state_t nxt_controller_conn_read_state
255 nxt_aligned(64) =
256{
257 .ready_handler = nxt_controller_conn_read,
258 .close_handler = nxt_controller_conn_close,
259 .error_handler = nxt_controller_conn_read_error,
260
261 .timer_handler = nxt_controller_conn_read_timeout,
262 .timer_value = nxt_controller_conn_timeout_value,
263 .timer_data = 60 * 1000,
264};
265
266
267static void
268nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
269{
270 size_t preread;
271 nxt_buf_t *b;
272 nxt_int_t rc;
273 nxt_conn_t *c;
274 nxt_controller_request_t *r;
275
276 c = obj;
277 r = data;
278
279 nxt_debug(task, "controller conn read");
280
281 nxt_queue_remove(&c->link);
282 nxt_queue_self(&c->link);
283
284 b = c->read;
285
286 rc = nxt_http_parse_request(&r->parser, &b->mem);
287
288 if (nxt_slow_path(rc != NXT_DONE)) {
289
290 if (rc == NXT_AGAIN) {
291 if (nxt_buf_mem_free_size(&b->mem) == 0) {
292 nxt_log(task, NXT_LOG_ERR, "too long request headers");
293 nxt_controller_conn_close(task, c, r);
294 return;
295 }
296
297 nxt_conn_read(task->thread->engine, c);
298 return;
299 }
300
301 /* rc == NXT_ERROR */
302
303 nxt_log(task, NXT_LOG_ERR, "parsing error");
304
305 nxt_controller_conn_close(task, c, r);
306 return;
307 }
308
309 rc = nxt_http_fields_process(r->parser.fields, r, task->log);
310
311 if (nxt_slow_path(rc != NXT_OK)) {
312 nxt_controller_conn_close(task, c, r);
313 return;
314 }
315
316 preread = nxt_buf_mem_used_size(&b->mem);
317
318 nxt_debug(task, "controller request header parsing complete, "
319 "body length: %uz, preread: %uz",
320 r->length, preread);
321
322 if (preread >= r->length) {
323 nxt_controller_process_request(task, r);
324 return;
325 }
326
327 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
328 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
329 if (nxt_slow_path(b == NULL)) {
330 nxt_controller_conn_free(task, c, NULL);
331 return;
332 }
333
334 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
335
336 c->read = b;
337 }
338
339 c->read_state = &nxt_controller_conn_body_read_state;
340
341 nxt_conn_read(task->thread->engine, c);
342}
343
344
345static nxt_msec_t
346nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
347{
348 return (nxt_msec_t) data;
349}
350
351
352static void
353nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
354{
355 nxt_conn_t *c;
356
357 c = obj;
358
359 nxt_debug(task, "controller conn read error");
360
361 nxt_controller_conn_close(task, c, data);
362}
363
364
365static void
366nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
367{
368 nxt_timer_t *timer;
369 nxt_conn_t *c;
370
371 timer = obj;
372
373 c = nxt_read_timer_conn(timer);
374 c->socket.timedout = 1;
375 c->socket.closed = 1;
376
377 nxt_debug(task, "controller conn read timeout");
378
379 nxt_controller_conn_close(task, c, data);
380}
381
382
383static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
384 nxt_aligned(64) =
385{
386 .ready_handler = nxt_controller_conn_body_read,
387 .close_handler = nxt_controller_conn_close,
388 .error_handler = nxt_controller_conn_read_error,
389
390 .timer_handler = nxt_controller_conn_read_timeout,
391 .timer_value = nxt_controller_conn_timeout_value,
392 .timer_data = 60 * 1000,
393 .timer_autoreset = 1,
394};
395
396
397static void
398nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
399{
400 size_t read;
401 nxt_buf_t *b;
402 nxt_conn_t *c;
403 nxt_controller_request_t *r;
404
405 c = obj;
406 r = data;
407 b = c->read;
408
409 read = nxt_buf_mem_used_size(&b->mem);
410
411 nxt_debug(task, "controller conn body read: %uz of %uz",
412 read, r->length);
413
414 if (read >= r->length) {
415 nxt_controller_process_request(task, r);
416 return;
417 }
418
419 nxt_conn_read(task->thread->engine, c);
420}
421
422
423static const nxt_event_conn_state_t nxt_controller_conn_write_state
424 nxt_aligned(64) =
425{
426 .ready_handler = nxt_controller_conn_write,
427 .error_handler = nxt_controller_conn_write_error,
428
429 .timer_handler = nxt_controller_conn_write_timeout,
430 .timer_value = nxt_controller_conn_timeout_value,
431 .timer_data = 60 * 1000,
432 .timer_autoreset = 1,
433};
434
435
436static void
437nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
438{
439 nxt_buf_t *b;
440 nxt_conn_t *c;
441
442 c = obj;
443
444 nxt_debug(task, "controller conn write");
445
446 b = c->write;
447
448 if (b->mem.pos != b->mem.free) {
449 nxt_conn_write(task->thread->engine, c);
450 return;
451 }
452
453 nxt_debug(task, "controller conn write complete");
454
455 nxt_controller_conn_close(task, c, data);
456}
457
458
459static void
460nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
461{
462 nxt_conn_t *c;
463
464 c = obj;
465
466 nxt_debug(task, "controller conn write error");
467
468 nxt_controller_conn_close(task, c, data);
469}
470
471
472static void
473nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
474{
475 nxt_conn_t *c;
476 nxt_timer_t *timer;
477
478 timer = obj;
479
480 c = nxt_write_timer_conn(timer);
481 c->socket.timedout = 1;
482 c->socket.closed = 1;
483
484 nxt_debug(task, "controller conn write timeout");
485
486 nxt_controller_conn_close(task, c, data);
487}
488
489
490static const nxt_event_conn_state_t nxt_controller_conn_close_state
491 nxt_aligned(64) =
492{
493 .ready_handler = nxt_controller_conn_free,
494};
495
496
497static void
498nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
499{
500 nxt_conn_t *c;
501
502 c = obj;
503
504 nxt_debug(task, "controller conn close");
505
506 nxt_queue_remove(&c->link);
507
508 c->write_state = &nxt_controller_conn_close_state;
509
510 nxt_conn_close(task->thread->engine, c);
511}
512
513
514static void
515nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
516{
517 nxt_conn_t *c;
518
519 c = obj;
520
521 nxt_debug(task, "controller conn free");
522
523 nxt_mp_destroy(c->mem_pool);
524
525 //nxt_free(c);
526}
527
528
529static nxt_int_t
530nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
531 nxt_log_t *log)
532{
533 off_t length;
534 nxt_controller_request_t *r;
535
536 r = ctx;
537
538 length = nxt_off_t_parse(field->value.start, field->value.length);
539
540 if (nxt_fast_path(length > 0)) {
541
542 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
543 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big");
544 return NXT_ERROR;
545 }
546
547 r->length = length;
548 return NXT_OK;
549 }
550
551 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
552
553 return NXT_ERROR;
554}
555
556
557static void
558nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
559{
560 nxt_mp_t *mp;
561 nxt_int_t rc;
562 nxt_str_t path;
563 nxt_conn_t *c;
73
74
75static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
76 { nxt_string("Content-Length"),
77 &nxt_controller_request_content_length, 0 },
78
79 { nxt_null_string, NULL, 0 }
80};
81
82static nxt_http_fields_hash_t *nxt_controller_fields_hash;
83
84
85static nxt_controller_conf_t nxt_controller_conf;
86static nxt_queue_t nxt_controller_waiting_requests;
87static nxt_controller_request_t *nxt_controller_current_request;
88
89
90static const nxt_event_conn_state_t nxt_controller_conn_read_state;
91static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
92static const nxt_event_conn_state_t nxt_controller_conn_write_state;
93static const nxt_event_conn_state_t nxt_controller_conn_close_state;
94
95
96nxt_int_t
97nxt_controller_start(nxt_task_t *task, void *data)
98{
99 nxt_mp_t *mp;
100 nxt_runtime_t *rt;
101 nxt_conf_value_t *conf;
102 nxt_http_fields_hash_t *hash;
103
104 static const nxt_str_t json
105 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
106
107 rt = task->thread->runtime;
108
109 hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
110 rt->mem_pool);
111 if (nxt_slow_path(hash == NULL)) {
112 return NXT_ERROR;
113 }
114
115 nxt_controller_fields_hash = hash;
116
117 if (nxt_listen_event(task, rt->controller_socket) == NULL) {
118 return NXT_ERROR;
119 }
120
121 mp = nxt_mp_create(1024, 128, 256, 32);
122
123 if (nxt_slow_path(mp == NULL)) {
124 return NXT_ERROR;
125 }
126
127 conf = nxt_conf_json_parse_str(mp, &json);
128
129 if (conf == NULL) {
130 return NXT_ERROR;
131 }
132
133 nxt_controller_conf.root = conf;
134 nxt_controller_conf.pool = mp;
135
136 nxt_queue_init(&nxt_controller_waiting_requests);
137
138 return NXT_OK;
139}
140
141
142nxt_int_t
143nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
144{
145 nxt_sockaddr_t *sa;
146 nxt_listen_socket_t *ls;
147
148 sa = rt->controller_listen;
149
150 if (rt->controller_listen == NULL) {
151 sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
152 NXT_INET_ADDR_STR_LEN);
153 if (sa == NULL) {
154 return NXT_ERROR;
155 }
156
157 sa->type = SOCK_STREAM;
158 sa->u.sockaddr_in.sin_family = AF_INET;
159 sa->u.sockaddr_in.sin_port = htons(8443);
160
161 nxt_sockaddr_text(sa);
162
163 rt->controller_listen = sa;
164 }
165
166 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
167 if (ls == NULL) {
168 return NXT_ERROR;
169 }
170
171 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
172 sa->socklen, sa->length);
173 if (ls->sockaddr == NULL) {
174 return NXT_ERROR;
175 }
176
177 ls->sockaddr->type = sa->type;
178 ls->socklen = sa->socklen;
179 ls->address_length = sa->length;
180
181 nxt_sockaddr_text(ls->sockaddr);
182
183 ls->socket = -1;
184 ls->backlog = NXT_LISTEN_BACKLOG;
185 ls->read_after_accept = 1;
186 ls->flags = NXT_NONBLOCK;
187
188#if 0
189 /* STUB */
190 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
191 if (wq == NULL) {
192 return NXT_ERROR;
193 }
194 nxt_work_queue_name(wq, "listen");
195 /**/
196
197 ls->work_queue = wq;
198#endif
199 ls->handler = nxt_controller_conn_init;
200
201 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
202 return NXT_ERROR;
203 }
204
205 rt->controller_socket = ls;
206
207 return NXT_OK;
208}
209
210
211static void
212nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
213{
214 nxt_buf_t *b;
215 nxt_conn_t *c;
216 nxt_event_engine_t *engine;
217 nxt_controller_request_t *r;
218
219 c = obj;
220
221 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
222
223 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
224 if (nxt_slow_path(r == NULL)) {
225 nxt_controller_conn_free(task, c, NULL);
226 return;
227 }
228
229 r->conn = c;
230
231 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
232 != NXT_OK))
233 {
234 nxt_controller_conn_free(task, c, NULL);
235 return;
236 }
237
238 r->parser.fields_hash = nxt_controller_fields_hash;
239
240 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
241 if (nxt_slow_path(b == NULL)) {
242 nxt_controller_conn_free(task, c, NULL);
243 return;
244 }
245
246 c->read = b;
247 c->socket.data = r;
248 c->socket.read_ready = 1;
249 c->read_state = &nxt_controller_conn_read_state;
250
251 engine = task->thread->engine;
252 c->read_work_queue = &engine->read_work_queue;
253 c->write_work_queue = &engine->write_work_queue;
254
255 nxt_conn_read(engine, c);
256}
257
258
259static const nxt_event_conn_state_t nxt_controller_conn_read_state
260 nxt_aligned(64) =
261{
262 .ready_handler = nxt_controller_conn_read,
263 .close_handler = nxt_controller_conn_close,
264 .error_handler = nxt_controller_conn_read_error,
265
266 .timer_handler = nxt_controller_conn_read_timeout,
267 .timer_value = nxt_controller_conn_timeout_value,
268 .timer_data = 60 * 1000,
269};
270
271
272static void
273nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
274{
275 size_t preread;
276 nxt_buf_t *b;
277 nxt_int_t rc;
278 nxt_conn_t *c;
279 nxt_controller_request_t *r;
280
281 c = obj;
282 r = data;
283
284 nxt_debug(task, "controller conn read");
285
286 nxt_queue_remove(&c->link);
287 nxt_queue_self(&c->link);
288
289 b = c->read;
290
291 rc = nxt_http_parse_request(&r->parser, &b->mem);
292
293 if (nxt_slow_path(rc != NXT_DONE)) {
294
295 if (rc == NXT_AGAIN) {
296 if (nxt_buf_mem_free_size(&b->mem) == 0) {
297 nxt_log(task, NXT_LOG_ERR, "too long request headers");
298 nxt_controller_conn_close(task, c, r);
299 return;
300 }
301
302 nxt_conn_read(task->thread->engine, c);
303 return;
304 }
305
306 /* rc == NXT_ERROR */
307
308 nxt_log(task, NXT_LOG_ERR, "parsing error");
309
310 nxt_controller_conn_close(task, c, r);
311 return;
312 }
313
314 rc = nxt_http_fields_process(r->parser.fields, r, task->log);
315
316 if (nxt_slow_path(rc != NXT_OK)) {
317 nxt_controller_conn_close(task, c, r);
318 return;
319 }
320
321 preread = nxt_buf_mem_used_size(&b->mem);
322
323 nxt_debug(task, "controller request header parsing complete, "
324 "body length: %uz, preread: %uz",
325 r->length, preread);
326
327 if (preread >= r->length) {
328 nxt_controller_process_request(task, r);
329 return;
330 }
331
332 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
333 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
334 if (nxt_slow_path(b == NULL)) {
335 nxt_controller_conn_free(task, c, NULL);
336 return;
337 }
338
339 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
340
341 c->read = b;
342 }
343
344 c->read_state = &nxt_controller_conn_body_read_state;
345
346 nxt_conn_read(task->thread->engine, c);
347}
348
349
350static nxt_msec_t
351nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
352{
353 return (nxt_msec_t) data;
354}
355
356
357static void
358nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
359{
360 nxt_conn_t *c;
361
362 c = obj;
363
364 nxt_debug(task, "controller conn read error");
365
366 nxt_controller_conn_close(task, c, data);
367}
368
369
370static void
371nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
372{
373 nxt_timer_t *timer;
374 nxt_conn_t *c;
375
376 timer = obj;
377
378 c = nxt_read_timer_conn(timer);
379 c->socket.timedout = 1;
380 c->socket.closed = 1;
381
382 nxt_debug(task, "controller conn read timeout");
383
384 nxt_controller_conn_close(task, c, data);
385}
386
387
388static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
389 nxt_aligned(64) =
390{
391 .ready_handler = nxt_controller_conn_body_read,
392 .close_handler = nxt_controller_conn_close,
393 .error_handler = nxt_controller_conn_read_error,
394
395 .timer_handler = nxt_controller_conn_read_timeout,
396 .timer_value = nxt_controller_conn_timeout_value,
397 .timer_data = 60 * 1000,
398 .timer_autoreset = 1,
399};
400
401
402static void
403nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
404{
405 size_t read;
406 nxt_buf_t *b;
407 nxt_conn_t *c;
408 nxt_controller_request_t *r;
409
410 c = obj;
411 r = data;
412 b = c->read;
413
414 read = nxt_buf_mem_used_size(&b->mem);
415
416 nxt_debug(task, "controller conn body read: %uz of %uz",
417 read, r->length);
418
419 if (read >= r->length) {
420 nxt_controller_process_request(task, r);
421 return;
422 }
423
424 nxt_conn_read(task->thread->engine, c);
425}
426
427
428static const nxt_event_conn_state_t nxt_controller_conn_write_state
429 nxt_aligned(64) =
430{
431 .ready_handler = nxt_controller_conn_write,
432 .error_handler = nxt_controller_conn_write_error,
433
434 .timer_handler = nxt_controller_conn_write_timeout,
435 .timer_value = nxt_controller_conn_timeout_value,
436 .timer_data = 60 * 1000,
437 .timer_autoreset = 1,
438};
439
440
441static void
442nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
443{
444 nxt_buf_t *b;
445 nxt_conn_t *c;
446
447 c = obj;
448
449 nxt_debug(task, "controller conn write");
450
451 b = c->write;
452
453 if (b->mem.pos != b->mem.free) {
454 nxt_conn_write(task->thread->engine, c);
455 return;
456 }
457
458 nxt_debug(task, "controller conn write complete");
459
460 nxt_controller_conn_close(task, c, data);
461}
462
463
464static void
465nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
466{
467 nxt_conn_t *c;
468
469 c = obj;
470
471 nxt_debug(task, "controller conn write error");
472
473 nxt_controller_conn_close(task, c, data);
474}
475
476
477static void
478nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
479{
480 nxt_conn_t *c;
481 nxt_timer_t *timer;
482
483 timer = obj;
484
485 c = nxt_write_timer_conn(timer);
486 c->socket.timedout = 1;
487 c->socket.closed = 1;
488
489 nxt_debug(task, "controller conn write timeout");
490
491 nxt_controller_conn_close(task, c, data);
492}
493
494
495static const nxt_event_conn_state_t nxt_controller_conn_close_state
496 nxt_aligned(64) =
497{
498 .ready_handler = nxt_controller_conn_free,
499};
500
501
502static void
503nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
504{
505 nxt_conn_t *c;
506
507 c = obj;
508
509 nxt_debug(task, "controller conn close");
510
511 nxt_queue_remove(&c->link);
512
513 c->write_state = &nxt_controller_conn_close_state;
514
515 nxt_conn_close(task->thread->engine, c);
516}
517
518
519static void
520nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
521{
522 nxt_conn_t *c;
523
524 c = obj;
525
526 nxt_debug(task, "controller conn free");
527
528 nxt_mp_destroy(c->mem_pool);
529
530 //nxt_free(c);
531}
532
533
534static nxt_int_t
535nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
536 nxt_log_t *log)
537{
538 off_t length;
539 nxt_controller_request_t *r;
540
541 r = ctx;
542
543 length = nxt_off_t_parse(field->value.start, field->value.length);
544
545 if (nxt_fast_path(length > 0)) {
546
547 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
548 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big");
549 return NXT_ERROR;
550 }
551
552 r->length = length;
553 return NXT_OK;
554 }
555
556 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
557
558 return NXT_ERROR;
559}
560
561
562static void
563nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
564{
565 nxt_mp_t *mp;
566 nxt_int_t rc;
567 nxt_str_t path;
568 nxt_conn_t *c;
564 nxt_uint_t status;
565 nxt_buf_mem_t *mbuf;
566 nxt_conf_op_t *ops;
567 nxt_conf_value_t *value;
569 nxt_buf_mem_t *mbuf;
570 nxt_conf_op_t *ops;
571 nxt_conf_value_t *value;
572 nxt_conf_json_error_t error;
568 nxt_controller_response_t resp;
569
570 static const nxt_str_t empty_obj = nxt_string("{}");
571
572 c = req->conn;
573 path = req->parser.path;
574
575 if (path.length > 1 && path.start[path.length - 1] == '/') {
576 path.length--;
577 }
578
579 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
580
581 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
582
583 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
584
585 if (value == NULL) {
573 nxt_controller_response_t resp;
574
575 static const nxt_str_t empty_obj = nxt_string("{}");
576
577 c = req->conn;
578 path = req->parser.path;
579
580 if (path.length > 1 && path.start[path.length - 1] == '/') {
581 path.length--;
582 }
583
584 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
585
586 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
587
588 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
589
590 if (value == NULL) {
586 status = 404;
587 goto done;
591 goto not_found;
588 }
589
592 }
593
594 resp.status = 200;
590 resp.conf = value;
591
595 resp.conf = value;
596
592 status = 200;
593 goto done;
597 nxt_controller_response(task, req, &resp);
598 return;
594 }
595
596 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
597
598 mp = nxt_mp_create(1024, 128, 256, 32);
599
600 if (nxt_slow_path(mp == NULL)) {
599 }
600
601 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
602
603 mp = nxt_mp_create(1024, 128, 256, 32);
604
605 if (nxt_slow_path(mp == NULL)) {
601 status = 500;
602 goto done;
606 goto alloc_fail;
603 }
604
605 mbuf = &c->read->mem;
606
607 }
608
609 mbuf = &c->read->mem;
610
607 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free);
611 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
608
612
613 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
614
609 if (value == NULL) {
610 nxt_mp_destroy(mp);
615 if (value == NULL) {
616 nxt_mp_destroy(mp);
611 status = 400;
612 nxt_str_set(&resp.json, "{ \"error\": \"Invalid JSON.\" }");
613 goto done;
617
618 if (error.pos == NULL) {
619 goto alloc_fail;
620 }
621
622 resp.status = 400;
623 resp.title = (u_char *) "Invalid JSON.";
624 resp.detail = error.detail;
625 resp.offset = error.pos - mbuf->pos;
626
627 nxt_conf_json_position(mbuf->pos, error.pos,
628 &resp.line, &resp.column);
629
630 nxt_controller_response(task, req, &resp);
631 return;
614 }
615
616 if (path.length != 1) {
617 rc = nxt_conf_op_compile(c->mem_pool, &ops,
618 nxt_controller_conf.root,
619 &path, value);
620
621 if (rc != NXT_OK) {
622 if (rc == NXT_DECLINED) {
632 }
633
634 if (path.length != 1) {
635 rc = nxt_conf_op_compile(c->mem_pool, &ops,
636 nxt_controller_conf.root,
637 &path, value);
638
639 if (rc != NXT_OK) {
640 if (rc == NXT_DECLINED) {
623 status = 404;
624 goto done;
641 goto not_found;
625 }
626
642 }
643
627 status = 500;
628 goto done;
644 goto alloc_fail;
629 }
630
631 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
632
633 if (nxt_slow_path(value == NULL)) {
634 nxt_mp_destroy(mp);
645 }
646
647 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
648
649 if (nxt_slow_path(value == NULL)) {
650 nxt_mp_destroy(mp);
635 status = 500;
636 goto done;
651 goto alloc_fail;
637 }
638 }
639
640 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
641 nxt_mp_destroy(mp);
652 }
653 }
654
655 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
656 nxt_mp_destroy(mp);
642 status = 400;
643 nxt_str_set(&resp.json,
644 "{ \"error\": \"Invalid configuration.\" }");
645 goto done;
657 goto invalid_conf;
646 }
647
648 req->conf.root = value;
649 req->conf.pool = mp;
650
651 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
652 nxt_mp_destroy(mp);
658 }
659
660 req->conf.root = value;
661 req->conf.pool = mp;
662
663 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
664 nxt_mp_destroy(mp);
653 status = 500;
654 goto done;
665 goto alloc_fail;
655 }
656
657 return;
658 }
659
660 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
661
662 if (path.length == 1) {
663 mp = nxt_mp_create(1024, 128, 256, 32);
664
665 if (nxt_slow_path(mp == NULL)) {
666 }
667
668 return;
669 }
670
671 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
672
673 if (path.length == 1) {
674 mp = nxt_mp_create(1024, 128, 256, 32);
675
676 if (nxt_slow_path(mp == NULL)) {
666 status = 500;
667 goto done;
677 goto alloc_fail;
668 }
669
670 value = nxt_conf_json_parse_str(mp, &empty_obj);
671
672 } else {
673 rc = nxt_conf_op_compile(c->mem_pool, &ops,
674 nxt_controller_conf.root,
675 &path, NULL);
676
677 if (rc != NXT_OK) {
678 if (rc == NXT_DECLINED) {
678 }
679
680 value = nxt_conf_json_parse_str(mp, &empty_obj);
681
682 } else {
683 rc = nxt_conf_op_compile(c->mem_pool, &ops,
684 nxt_controller_conf.root,
685 &path, NULL);
686
687 if (rc != NXT_OK) {
688 if (rc == NXT_DECLINED) {
679 status = 404;
680 goto done;
689 goto not_found;
681 }
682
690 }
691
683 status = 500;
684 goto done;
692 goto alloc_fail;
685 }
686
687 mp = nxt_mp_create(1024, 128, 256, 32);
688
689 if (nxt_slow_path(mp == NULL)) {
693 }
694
695 mp = nxt_mp_create(1024, 128, 256, 32);
696
697 if (nxt_slow_path(mp == NULL)) {
690 status = 500;
691 goto done;
698 goto alloc_fail;
692 }
693
694 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
695 }
696
697 if (nxt_slow_path(value == NULL)) {
698 nxt_mp_destroy(mp);
699 }
700
701 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
702 }
703
704 if (nxt_slow_path(value == NULL)) {
705 nxt_mp_destroy(mp);
699 status = 500;
700 goto done;
706 goto alloc_fail;
701 }
702
703 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
704 nxt_mp_destroy(mp);
707 }
708
709 if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
710 nxt_mp_destroy(mp);
705 status = 400;
706 nxt_str_set(&resp.json,
707 "{ \"error\": \"Invalid configuration.\" }");
708 goto done;
711 goto invalid_conf;
709 }
710
711 req->conf.root = value;
712 req->conf.pool = mp;
713
714 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
715 nxt_mp_destroy(mp);
712 }
713
714 req->conf.root = value;
715 req->conf.pool = mp;
716
717 if (nxt_controller_conf_apply(task, req) != NXT_OK) {
718 nxt_mp_destroy(mp);
716 status = 500;
717 goto done;
719 goto alloc_fail;
718 }
719
720 return;
721 }
722
720 }
721
722 return;
723 }
724
723 status = 405;
725 resp.status = 405;
726 resp.title = (u_char *) "Invalid method.";
727 resp.offset = -1;
724
728
725done:
729 nxt_controller_response(task, req, &resp);
730 return;
726
731
727 switch (status) {
732alloc_fail:
728
733
729 case 200:
730 nxt_str_set(&resp.status_line, "200 OK");
731 break;
734 resp.status = 500;
735 resp.title = (u_char *) "Memory allocation failed.";
736 resp.offset = -1;
732
737
733 case 400:
734 nxt_str_set(&resp.status_line, "400 Bad Request");
735 break;
738 nxt_controller_response(task, req, &resp);
739 return;
736
740
737 case 404:
738 nxt_str_set(&resp.status_line, "404 Not Found");
739 nxt_str_set(&resp.json, "{ \"error\": \"Value doesn't exist.\" }");
740 break;
741not_found:
741
742
742 case 405:
743 nxt_str_set(&resp.status_line, "405 Method Not Allowed");
744 nxt_str_set(&resp.json, "{ \"error\": \"Invalid method.\" }");
745 break;
743 resp.status = 404;
744 resp.title = (u_char *) "Value doesn't exist.";
745 resp.offset = -1;
746
746
747 case 500:
748 nxt_str_set(&resp.status_line, "500 Internal Server Error");
749 nxt_str_set(&resp.json, "{ \"error\": \"Memory allocation failed.\" }");
750 break;
751 }
747 nxt_controller_response(task, req, &resp);
748 return;
752
749
750invalid_conf:
751
752 resp.status = 400;
753 resp.title = (u_char *) "Invalid configuration.";
754 resp.offset = -1;
755
753 nxt_controller_response(task, req, &resp);
756 nxt_controller_response(task, req, &resp);
757 return;
754}
755
756
757static nxt_int_t
758nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
759{
760 nxt_int_t rc;
761
762 if (nxt_controller_current_request != NULL) {
763 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
764 return NXT_OK;
765 }
766
767 rc = nxt_controller_conf_pass(task, req->conf.root);
768
769 if (nxt_slow_path(rc != NXT_OK)) {
770 return NXT_ERROR;
771 }
772
773 nxt_controller_current_request = req;
774
775 return NXT_OK;
776}
777
778
779static void
780nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
781 void *data)
782{
783 nxt_controller_request_t *req;
784 nxt_controller_response_t resp;
785
786 nxt_debug(task, "controller conf ready: %*s",
787 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
788
789 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
790
791 req = nxt_controller_current_request;
792 nxt_controller_current_request = NULL;
793
794 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
795 nxt_mp_destroy(nxt_controller_conf.pool);
796
797 nxt_controller_conf = req->conf;
798
758}
759
760
761static nxt_int_t
762nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
763{
764 nxt_int_t rc;
765
766 if (nxt_controller_current_request != NULL) {
767 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
768 return NXT_OK;
769 }
770
771 rc = nxt_controller_conf_pass(task, req->conf.root);
772
773 if (nxt_slow_path(rc != NXT_OK)) {
774 return NXT_ERROR;
775 }
776
777 nxt_controller_current_request = req;
778
779 return NXT_OK;
780}
781
782
783static void
784nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
785 void *data)
786{
787 nxt_controller_request_t *req;
788 nxt_controller_response_t resp;
789
790 nxt_debug(task, "controller conf ready: %*s",
791 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
792
793 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
794
795 req = nxt_controller_current_request;
796 nxt_controller_current_request = NULL;
797
798 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
799 nxt_mp_destroy(nxt_controller_conf.pool);
800
801 nxt_controller_conf = req->conf;
802
799 nxt_str_set(&resp.status_line, "200 OK");
800 nxt_str_set(&resp.json, "{ \"success\": \"Reconfiguration done.\" }");
803 resp.status = 200;
804 resp.title = (u_char *) "Reconfiguration done.";
801
802 } else {
803 nxt_mp_destroy(req->conf.pool);
804
805
806 } else {
807 nxt_mp_destroy(req->conf.pool);
808
805 nxt_str_set(&resp.status_line, "500 Internal Server Error");
806 nxt_str_set(&resp.json,
807 "{ \"error\": \"Failed to apply new configuration.\" }");
809 resp.status = 500;
810 resp.title = (u_char *) "Failed to apply new configuration.";
811 resp.offset = -1;
808 }
809
810 nxt_controller_response(task, req, &resp);
811
812 nxt_controller_process_waiting(task);
813}
814
815
816static void
817nxt_controller_process_waiting(nxt_task_t *task)
818{
819 nxt_controller_request_t *req;
820 nxt_controller_response_t resp;
821
822 nxt_queue_each(req, &nxt_controller_waiting_requests,
823 nxt_controller_request_t, link)
824 {
825 nxt_queue_remove(&req->link);
826
827 if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) {
828 return;
829 }
830
831 nxt_mp_destroy(req->conf.pool);
832
812 }
813
814 nxt_controller_response(task, req, &resp);
815
816 nxt_controller_process_waiting(task);
817}
818
819
820static void
821nxt_controller_process_waiting(nxt_task_t *task)
822{
823 nxt_controller_request_t *req;
824 nxt_controller_response_t resp;
825
826 nxt_queue_each(req, &nxt_controller_waiting_requests,
827 nxt_controller_request_t, link)
828 {
829 nxt_queue_remove(&req->link);
830
831 if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) {
832 return;
833 }
834
835 nxt_mp_destroy(req->conf.pool);
836
833 nxt_str_set(&resp.status_line, "500 Internal Server Error");
834 nxt_str_set(&resp.json,
835 "{ \"error\": \"Memory allocation failed.\" }");
837 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
836
838
839 resp.status = 500;
840 resp.title = (u_char *) "Memory allocation failed.";
841 resp.offset = -1;
842
837 nxt_controller_response(task, req, &resp);
838
839 } nxt_queue_loop;
840}
841
842
843static nxt_int_t
844nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
845{
846 size_t size;
847 uint32_t stream;
848 nxt_int_t rc;
849 nxt_buf_t *b;
850 nxt_port_t *router_port, *controller_port;
851 nxt_runtime_t *rt;
852
853 rt = task->thread->runtime;
854
855 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
856 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
857
858 size = nxt_conf_json_length(conf, NULL);
859
860 b = nxt_port_mmap_get_buf(task, router_port, size);
861
862 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
863
864 stream = nxt_port_rpc_register_handler(task, controller_port,
865 nxt_controller_conf_handler,
866 nxt_controller_conf_handler,
867 router_port->pid, NULL);
868
869 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
870 stream, controller_port->id, b);
871
872 if (nxt_slow_path(rc != NXT_OK)) {
873 nxt_port_rpc_cancel(task, controller_port, stream);
874 }
875
876 return rc;
877}
878
879
843 nxt_controller_response(task, req, &resp);
844
845 } nxt_queue_loop;
846}
847
848
849static nxt_int_t
850nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
851{
852 size_t size;
853 uint32_t stream;
854 nxt_int_t rc;
855 nxt_buf_t *b;
856 nxt_port_t *router_port, *controller_port;
857 nxt_runtime_t *rt;
858
859 rt = task->thread->runtime;
860
861 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
862 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
863
864 size = nxt_conf_json_length(conf, NULL);
865
866 b = nxt_port_mmap_get_buf(task, router_port, size);
867
868 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
869
870 stream = nxt_port_rpc_register_handler(task, controller_port,
871 nxt_controller_conf_handler,
872 nxt_controller_conf_handler,
873 router_port->pid, NULL);
874
875 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
876 stream, controller_port->id, b);
877
878 if (nxt_slow_path(rc != NXT_OK)) {
879 nxt_port_rpc_cancel(task, controller_port, stream);
880 }
881
882 return rc;
883}
884
885
880
881static void
882nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
883 nxt_controller_response_t *resp)
884{
886static void
887nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
888 nxt_controller_response_t *resp)
889{
885 size_t size;
886 nxt_buf_t *b;
887 nxt_conn_t *c;
890 size_t size;
891 nxt_str_t status_line, str;
892 nxt_buf_t *b, *body;
893 nxt_conn_t *c;
894 nxt_uint_t n;
895 nxt_conf_value_t *value, *location;
896 nxt_conf_json_pretty_t pretty;
888
897
889 c = req->conn;
898 static nxt_str_t success_str = nxt_string("success");
899 static nxt_str_t error_str = nxt_string("error");
900 static nxt_str_t detail_str = nxt_string("detail");
901 static nxt_str_t location_str = nxt_string("location");
902 static nxt_str_t offset_str = nxt_string("offset");
903 static nxt_str_t line_str = nxt_string("line");
904 static nxt_str_t column_str = nxt_string("column");
890
905
891 size = sizeof("HTTP/1.0 " "\r\n\r\n") - 1 + resp->status_line.length;
906 static nxt_time_string_t date_cache = {
907 (nxt_atomic_uint_t) -1,
908 nxt_controller_date,
909 "%s, %02d %s %4d %02d:%02d:%02d GMT",
910 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
911 NXT_THREAD_TIME_GMT,
912 NXT_THREAD_TIME_SEC,
913 };
892
914
893 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
894 if (nxt_slow_path(b == NULL)) {
895 nxt_controller_conn_close(task, c, req);
896 return;
897 }
915 switch (resp->status) {
898
916
899 b->mem.free = nxt_cpymem(b->mem.free, "HTTP/1.0 ", sizeof("HTTP/1.0 ") - 1);
900 b->mem.free = nxt_cpymem(b->mem.free, resp->status_line.start,
901 resp->status_line.length);
917 case 200:
918 nxt_str_set(&status_line, "200 OK");
919 break;
902
920
903 b->mem.free = nxt_cpymem(b->mem.free, "\r\n\r\n", sizeof("\r\n\r\n") - 1);
921 case 400:
922 nxt_str_set(&status_line, "400 Bad Request");
923 break;
904
924
905 b->next = nxt_controller_response_body(resp, c->mem_pool);
925 case 404:
926 nxt_str_set(&status_line, "404 Not Found");
927 break;
906
928
907 if (nxt_slow_path(b->next == NULL)) {
908 nxt_controller_conn_close(task, c, req);
909 return;
929 case 405:
930 nxt_str_set(&status_line, "405 Method Not Allowed");
931 break;
932
933 case 500:
934 nxt_str_set(&status_line, "500 Internal Server Error");
935 break;
910 }
911
936 }
937
912 c->write = b;
913 c->write_state = &nxt_controller_conn_write_state;
938 c = req->conn;
939 value = resp->conf;
914
940
915 nxt_conn_write(task->thread->engine, c);
916}
941 if (value == NULL) {
942 n = 1
943 + (resp->detail != NULL)
944 + (resp->status >= 400 && resp->offset != -1);
917
945
946 value = nxt_conf_create_object(c->mem_pool, n);
918
947
919static nxt_buf_t *
920nxt_controller_response_body(nxt_controller_response_t *resp, nxt_mp_t *pool)
921{
922 size_t size;
923 nxt_buf_t *b;
924 nxt_conf_value_t *value;
925 nxt_conf_json_pretty_t pretty;
948 if (nxt_slow_path(value == NULL)) {
949 nxt_controller_conn_close(task, c, req);
950 return;
951 }
926
952
927 if (resp->conf) {
928 value = resp->conf;
953 str.length = nxt_strlen(resp->title);
954 str.start = resp->title;
929
955
930 } else {
931 value = nxt_conf_json_parse_str(pool, &resp->json);
956 if (resp->status < 400) {
957 nxt_conf_set_member_string(value, &success_str, &str, 0);
932
958
933 if (nxt_slow_path(value == NULL)) {
934 return NULL;
959 } else {
960 nxt_conf_set_member_string(value, &error_str, &str, 0);
935 }
961 }
962
963 n = 0;
964
965 if (resp->detail != NULL) {
966 str.length = nxt_strlen(resp->detail);
967 str.start = resp->detail;
968
969 n++;
970
971 nxt_conf_set_member_string(value, &detail_str, &str, n);
972 }
973
974 if (resp->status >= 400 && resp->offset != -1) {
975 n++;
976
977 location = nxt_conf_create_object(c->mem_pool,
978 resp->line != 0 ? 3 : 1);
979
980 nxt_conf_set_member(value, &location_str, location, n);
981
982 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
983
984 if (resp->line != 0) {
985 nxt_conf_set_member_integer(location, &line_str,
986 resp->line, 1);
987
988 nxt_conf_set_member_integer(location, &column_str,
989 resp->column, 2);
990 }
991 }
936 }
937
938 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
939
940 size = nxt_conf_json_length(value, &pretty) + 2;
941
992 }
993
994 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
995
996 size = nxt_conf_json_length(value, &pretty) + 2;
997
942 b = nxt_buf_mem_alloc(pool, size, 0);
943 if (nxt_slow_path(b == NULL)) {
944 return NULL;
998 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
999 if (nxt_slow_path(body == NULL)) {
1000 nxt_controller_conn_close(task, c, req);
1001 return;
945 }
946
947 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
948
1002 }
1003
1004 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1005
949 b->mem.free = nxt_conf_json_print(b->mem.free, value, &pretty);
1006 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
950
1007
951 *b->mem.free++ = '\r';
952 *b->mem.free++ = '\n';
1008 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
953
1009
954 return b;
1010 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
1011 + sizeof("Server: nginext/0.1\r\n") - 1
1012 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
1013 + sizeof("Content-Type: application/json\r\n") - 1
1014 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
1015 + sizeof("Connection: close\r\n") - 1
1016 + sizeof("\r\n") - 1;
1017
1018 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1019 if (nxt_slow_path(b == NULL)) {
1020 nxt_controller_conn_close(task, c, req);
1021 return;
1022 }
1023
1024 b->next = body;
1025
1026 nxt_str_set(&str, "HTTP/1.1 ");
1027
1028 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1029 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1030 status_line.length);
1031
1032 nxt_str_set(&str, "\r\n"
1033 "Server: nginext/0.1\r\n"
1034 "Date: ");
1035
1036 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1037
1038 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1039 b->mem.free);
1040
1041 nxt_str_set(&str, "\r\n"
1042 "Content-Type: application/json\r\n"
1043 "Content-Length: ");
1044
1045 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1046
1047 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1048 nxt_buf_mem_used_size(&body->mem));
1049
1050 nxt_str_set(&str, "\r\n"
1051 "Connection: close\r\n"
1052 "\r\n");
1053
1054 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1055
1056 c->write = b;
1057 c->write_state = &nxt_controller_conn_write_state;
1058
1059 nxt_conn_write(task->thread->engine, c);
955}
1060}
1061
1062
1063static u_char *
1064nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1065 size_t size, const char *format)
1066{
1067 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1068 "Sat" };
1069
1070 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1071 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1072
1073 return nxt_sprintf(buf, buf + size, format,
1074 week[tm->tm_wday], tm->tm_mday,
1075 month[tm->tm_mon], tm->tm_year + 1900,
1076 tm->tm_hour, tm->tm_min, tm->tm_sec);
1077}