nxt_controller.c (386:d9e23ae1617d) nxt_controller.c (417:47366bb40f2c)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
32
33 u_char *title;
34 nxt_str_t detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
38} nxt_controller_response_t;
39
40
41static void nxt_controller_process_new_port_handler(nxt_task_t *task,
42 nxt_port_recv_msg_t *msg);
43static nxt_int_t nxt_controller_conf_default(void);
44static void nxt_controller_conf_init_handler(nxt_task_t *task,
45 nxt_port_recv_msg_t *msg, void *data);
46static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
47 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
48
49static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
50static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
51static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
52 uintptr_t data);
53static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
54 void *data);
55static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
56 void *data);
57static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
58 void *data);
59static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
60static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
61 void *data);
62static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
63 void *data);
64static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
65static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
66
67static nxt_int_t nxt_controller_request_content_length(void *ctx,
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_main.h>
9#include <nxt_runtime.h>
10#include <nxt_main_process.h>
11#include <nxt_conf.h>
12
13
14typedef struct {
15 nxt_conf_value_t *root;
16 nxt_mp_t *pool;
17} nxt_controller_conf_t;
18
19
20typedef struct {
21 nxt_http_request_parse_t parser;
22 size_t length;
23 nxt_controller_conf_t conf;
24 nxt_conn_t *conn;
25 nxt_queue_link_t link;
26} nxt_controller_request_t;
27
28
29typedef struct {
30 nxt_uint_t status;
31 nxt_conf_value_t *conf;
32
33 u_char *title;
34 nxt_str_t detail;
35 ssize_t offset;
36 nxt_uint_t line;
37 nxt_uint_t column;
38} nxt_controller_response_t;
39
40
41static void nxt_controller_process_new_port_handler(nxt_task_t *task,
42 nxt_port_recv_msg_t *msg);
43static nxt_int_t nxt_controller_conf_default(void);
44static void nxt_controller_conf_init_handler(nxt_task_t *task,
45 nxt_port_recv_msg_t *msg, void *data);
46static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
47 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
48
49static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
50static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
51static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
52 uintptr_t data);
53static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
54 void *data);
55static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
56 void *data);
57static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
58 void *data);
59static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
60static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
61 void *data);
62static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
63 void *data);
64static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
65static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
66
67static nxt_int_t nxt_controller_request_content_length(void *ctx,
68 nxt_http_field_t *field, nxt_log_t *log);
68 nxt_http_field_t *field, uintptr_t data);
69
70static void nxt_controller_process_request(nxt_task_t *task,
71 nxt_controller_request_t *req);
72static void nxt_controller_conf_handler(nxt_task_t *task,
73 nxt_port_recv_msg_t *msg, void *data);
74static void nxt_controller_conf_store(nxt_task_t *task,
75 nxt_conf_value_t *conf);
76static void nxt_controller_response(nxt_task_t *task,
77 nxt_controller_request_t *req, nxt_controller_response_t *resp);
78static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
79 struct tm *tm, size_t size, const char *format);
80
81
69
70static void nxt_controller_process_request(nxt_task_t *task,
71 nxt_controller_request_t *req);
72static void nxt_controller_conf_handler(nxt_task_t *task,
73 nxt_port_recv_msg_t *msg, void *data);
74static void nxt_controller_conf_store(nxt_task_t *task,
75 nxt_conf_value_t *conf);
76static void nxt_controller_response(nxt_task_t *task,
77 nxt_controller_request_t *req, nxt_controller_response_t *resp);
78static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
79 struct tm *tm, size_t size, const char *format);
80
81
82static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
82static nxt_http_field_proc_t nxt_controller_request_fields[] = {
83 { nxt_string("Content-Length"),
84 &nxt_controller_request_content_length, 0 },
83 { nxt_string("Content-Length"),
84 &nxt_controller_request_content_length, 0 },
85
86 { nxt_null_string, NULL, 0 }
87};
88
85};
86
89static nxt_http_fields_hash_t *nxt_controller_fields_hash;
87static nxt_lvlhsh_t nxt_controller_fields_hash;
90
91static nxt_uint_t nxt_controller_listening;
92static nxt_controller_conf_t nxt_controller_conf;
93static nxt_queue_t nxt_controller_waiting_requests;
94
95
96static const nxt_event_conn_state_t nxt_controller_conn_read_state;
97static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
98static const nxt_event_conn_state_t nxt_controller_conn_write_state;
99static const nxt_event_conn_state_t nxt_controller_conn_close_state;
100
101
102nxt_port_handlers_t nxt_controller_process_port_handlers = {
103 .quit = nxt_worker_process_quit_handler,
104 .new_port = nxt_controller_process_new_port_handler,
105 .change_file = nxt_port_change_log_file_handler,
106 .mmap = nxt_port_mmap_handler,
107 .data = nxt_port_data_handler,
108 .remove_pid = nxt_port_remove_pid_handler,
109 .rpc_ready = nxt_port_rpc_handler,
110 .rpc_error = nxt_port_rpc_handler,
111};
112
113
114nxt_int_t
115nxt_controller_start(nxt_task_t *task, void *data)
116{
88
89static nxt_uint_t nxt_controller_listening;
90static nxt_controller_conf_t nxt_controller_conf;
91static nxt_queue_t nxt_controller_waiting_requests;
92
93
94static const nxt_event_conn_state_t nxt_controller_conn_read_state;
95static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
96static const nxt_event_conn_state_t nxt_controller_conn_write_state;
97static const nxt_event_conn_state_t nxt_controller_conn_close_state;
98
99
100nxt_port_handlers_t nxt_controller_process_port_handlers = {
101 .quit = nxt_worker_process_quit_handler,
102 .new_port = nxt_controller_process_new_port_handler,
103 .change_file = nxt_port_change_log_file_handler,
104 .mmap = nxt_port_mmap_handler,
105 .data = nxt_port_data_handler,
106 .remove_pid = nxt_port_remove_pid_handler,
107 .rpc_ready = nxt_port_rpc_handler,
108 .rpc_error = nxt_port_rpc_handler,
109};
110
111
112nxt_int_t
113nxt_controller_start(nxt_task_t *task, void *data)
114{
117 nxt_mp_t *mp;
118 nxt_int_t ret;
119 nxt_str_t *json;
120 nxt_runtime_t *rt;
121 nxt_conf_value_t *conf;
122 nxt_event_engine_t *engine;
123 nxt_conf_validation_t vldt;
124 nxt_http_fields_hash_t *hash;
115 nxt_mp_t *mp;
116 nxt_int_t ret;
117 nxt_str_t *json;
118 nxt_runtime_t *rt;
119 nxt_conf_value_t *conf;
120 nxt_event_engine_t *engine;
121 nxt_conf_validation_t vldt;
125
126 rt = task->thread->runtime;
127
128 engine = task->thread->engine;
129
130 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
131 if (nxt_slow_path(engine->mem_pool == NULL)) {
132 return NXT_ERROR;
133 }
134
122
123 rt = task->thread->runtime;
124
125 engine = task->thread->engine;
126
127 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
128 if (nxt_slow_path(engine->mem_pool == NULL)) {
129 return NXT_ERROR;
130 }
131
135 hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
136 rt->mem_pool);
137 if (nxt_slow_path(hash == NULL)) {
132 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool,
133 nxt_controller_request_fields,
134 nxt_nitems(nxt_controller_request_fields));
135
136 if (nxt_slow_path(ret != NXT_OK)) {
138 return NXT_ERROR;
139 }
140
137 return NXT_ERROR;
138 }
139
141 nxt_controller_fields_hash = hash;
142 nxt_queue_init(&nxt_controller_waiting_requests);
143
144 json = data;
145
146 if (json->length == 0) {
147 return NXT_OK;
148 }
149
150 mp = nxt_mp_create(1024, 128, 256, 32);
151 if (nxt_slow_path(mp == NULL)) {
152 return NXT_ERROR;
153 }
154
155 conf = nxt_conf_json_parse_str(mp, json);
156 nxt_free(json->start);
157
158 if (nxt_slow_path(conf == NULL)) {
159 nxt_log(task, NXT_LOG_ALERT,
160 "failed to restore previous configuration: "
161 "file is corrupted or not enough memory");
162
163 nxt_mp_destroy(mp);
164 return NXT_OK;
165 }
166
167 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
168
169 vldt.pool = nxt_mp_create(1024, 128, 256, 32);
170 if (nxt_slow_path(vldt.pool == NULL)) {
171 return NXT_ERROR;
172 }
173
174 vldt.conf = conf;
175
176 ret = nxt_conf_validate(&vldt);
177
178 if (nxt_slow_path(ret != NXT_OK)) {
179
180 if (ret == NXT_DECLINED) {
181 nxt_log(task, NXT_LOG_ALERT,
182 "the previous configuration is invalid: %V", &vldt.error);
183
184 nxt_mp_destroy(vldt.pool);
185 nxt_mp_destroy(mp);
186
187 return NXT_OK;
188 }
189
190 /* ret == NXT_ERROR */
191
192 return NXT_ERROR;
193 }
194
195 nxt_mp_destroy(vldt.pool);
196
197 nxt_controller_conf.root = conf;
198 nxt_controller_conf.pool = mp;
199
200 return NXT_OK;
201}
202
203
204static void
205nxt_controller_process_new_port_handler(nxt_task_t *task,
206 nxt_port_recv_msg_t *msg)
207{
208 nxt_int_t rc;
209 nxt_runtime_t *rt;
210 nxt_conf_value_t *conf;
211
212 nxt_port_new_port_handler(task, msg);
213
214 if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
215 return;
216 }
217
218 conf = nxt_controller_conf.root;
219
220 if (conf != NULL) {
221 rc = nxt_controller_conf_send(task, conf,
222 nxt_controller_conf_init_handler, NULL);
223
224 if (nxt_fast_path(rc == NXT_OK)) {
225 return;
226 }
227
228 nxt_mp_destroy(nxt_controller_conf.pool);
229
230 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
231 nxt_abort();
232 }
233 }
234
235 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
236 nxt_abort();
237 }
238
239 rt = task->thread->runtime;
240
241 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
242 nxt_abort();
243 }
244
245 nxt_controller_listening = 1;
246}
247
248
249static nxt_int_t
250nxt_controller_conf_default(void)
251{
252 nxt_mp_t *mp;
253 nxt_conf_value_t *conf;
254
255 static const nxt_str_t json
256 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
257
258 mp = nxt_mp_create(1024, 128, 256, 32);
259
260 if (nxt_slow_path(mp == NULL)) {
261 return NXT_ERROR;
262 }
263
264 conf = nxt_conf_json_parse_str(mp, &json);
265
266 if (nxt_slow_path(conf == NULL)) {
267 return NXT_ERROR;
268 }
269
270 nxt_controller_conf.root = conf;
271 nxt_controller_conf.pool = mp;
272
273 return NXT_OK;
274}
275
276
277static void
278nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
279 void *data)
280{
281 nxt_runtime_t *rt;
282
283 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
284 nxt_log(task, NXT_LOG_ALERT, "failed to apply previous configuration");
285
286 nxt_mp_destroy(nxt_controller_conf.pool);
287
288 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
289 nxt_abort();
290 }
291 }
292
293 if (nxt_controller_listening == 0) {
294 rt = task->thread->runtime;
295
296 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
297 == NULL))
298 {
299 nxt_abort();
300 }
301
302 nxt_controller_listening = 1;
303 }
304}
305
306
307static nxt_int_t
308nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
309 nxt_port_rpc_handler_t handler, void *data)
310{
311 size_t size;
312 uint32_t stream;
313 nxt_int_t rc;
314 nxt_buf_t *b;
315 nxt_port_t *router_port, *controller_port;
316 nxt_runtime_t *rt;
317
318 rt = task->thread->runtime;
319
320 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
321
322 if (nxt_slow_path(router_port == NULL)) {
323 return NXT_DECLINED;
324 }
325
326 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
327
328 size = nxt_conf_json_length(conf, NULL);
329
330 b = nxt_port_mmap_get_buf(task, router_port, size);
331 if (nxt_slow_path(b == NULL)) {
332 return NXT_ERROR;
333 }
334
335 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
336
337 stream = nxt_port_rpc_register_handler(task, controller_port,
338 handler, handler,
339 router_port->pid, data);
340
341 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
342 stream, controller_port->id, b);
343
344 if (nxt_slow_path(rc != NXT_OK)) {
345 nxt_port_rpc_cancel(task, controller_port, stream);
346 return NXT_ERROR;
347 }
348
349 return NXT_OK;
350}
351
352
353nxt_int_t
354nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
355{
356 nxt_sockaddr_t *sa;
357 nxt_listen_socket_t *ls;
358
359 sa = rt->controller_listen;
360
361 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
362 if (ls == NULL) {
363 return NXT_ERROR;
364 }
365
366 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
367 sa->socklen, sa->length);
368 if (ls->sockaddr == NULL) {
369 return NXT_ERROR;
370 }
371
372 ls->sockaddr->type = sa->type;
373 nxt_sockaddr_text(ls->sockaddr);
374
375 nxt_listen_socket_remote_size(ls);
376
377 ls->socket = -1;
378 ls->backlog = NXT_LISTEN_BACKLOG;
379 ls->read_after_accept = 1;
380 ls->flags = NXT_NONBLOCK;
381
382#if 0
383 /* STUB */
384 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
385 if (wq == NULL) {
386 return NXT_ERROR;
387 }
388 nxt_work_queue_name(wq, "listen");
389 /**/
390
391 ls->work_queue = wq;
392#endif
393 ls->handler = nxt_controller_conn_init;
394
395 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
396 return NXT_ERROR;
397 }
398
399 rt->controller_socket = ls;
400
401 return NXT_OK;
402}
403
404
405static void
406nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
407{
408 nxt_buf_t *b;
409 nxt_conn_t *c;
410 nxt_event_engine_t *engine;
411 nxt_controller_request_t *r;
412
413 c = obj;
414
415 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
416
417 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
418 if (nxt_slow_path(r == NULL)) {
419 nxt_controller_conn_free(task, c, NULL);
420 return;
421 }
422
423 r->conn = c;
424
425 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
426 != NXT_OK))
427 {
428 nxt_controller_conn_free(task, c, NULL);
429 return;
430 }
431
140 nxt_queue_init(&nxt_controller_waiting_requests);
141
142 json = data;
143
144 if (json->length == 0) {
145 return NXT_OK;
146 }
147
148 mp = nxt_mp_create(1024, 128, 256, 32);
149 if (nxt_slow_path(mp == NULL)) {
150 return NXT_ERROR;
151 }
152
153 conf = nxt_conf_json_parse_str(mp, json);
154 nxt_free(json->start);
155
156 if (nxt_slow_path(conf == NULL)) {
157 nxt_log(task, NXT_LOG_ALERT,
158 "failed to restore previous configuration: "
159 "file is corrupted or not enough memory");
160
161 nxt_mp_destroy(mp);
162 return NXT_OK;
163 }
164
165 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
166
167 vldt.pool = nxt_mp_create(1024, 128, 256, 32);
168 if (nxt_slow_path(vldt.pool == NULL)) {
169 return NXT_ERROR;
170 }
171
172 vldt.conf = conf;
173
174 ret = nxt_conf_validate(&vldt);
175
176 if (nxt_slow_path(ret != NXT_OK)) {
177
178 if (ret == NXT_DECLINED) {
179 nxt_log(task, NXT_LOG_ALERT,
180 "the previous configuration is invalid: %V", &vldt.error);
181
182 nxt_mp_destroy(vldt.pool);
183 nxt_mp_destroy(mp);
184
185 return NXT_OK;
186 }
187
188 /* ret == NXT_ERROR */
189
190 return NXT_ERROR;
191 }
192
193 nxt_mp_destroy(vldt.pool);
194
195 nxt_controller_conf.root = conf;
196 nxt_controller_conf.pool = mp;
197
198 return NXT_OK;
199}
200
201
202static void
203nxt_controller_process_new_port_handler(nxt_task_t *task,
204 nxt_port_recv_msg_t *msg)
205{
206 nxt_int_t rc;
207 nxt_runtime_t *rt;
208 nxt_conf_value_t *conf;
209
210 nxt_port_new_port_handler(task, msg);
211
212 if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
213 return;
214 }
215
216 conf = nxt_controller_conf.root;
217
218 if (conf != NULL) {
219 rc = nxt_controller_conf_send(task, conf,
220 nxt_controller_conf_init_handler, NULL);
221
222 if (nxt_fast_path(rc == NXT_OK)) {
223 return;
224 }
225
226 nxt_mp_destroy(nxt_controller_conf.pool);
227
228 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
229 nxt_abort();
230 }
231 }
232
233 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
234 nxt_abort();
235 }
236
237 rt = task->thread->runtime;
238
239 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
240 nxt_abort();
241 }
242
243 nxt_controller_listening = 1;
244}
245
246
247static nxt_int_t
248nxt_controller_conf_default(void)
249{
250 nxt_mp_t *mp;
251 nxt_conf_value_t *conf;
252
253 static const nxt_str_t json
254 = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
255
256 mp = nxt_mp_create(1024, 128, 256, 32);
257
258 if (nxt_slow_path(mp == NULL)) {
259 return NXT_ERROR;
260 }
261
262 conf = nxt_conf_json_parse_str(mp, &json);
263
264 if (nxt_slow_path(conf == NULL)) {
265 return NXT_ERROR;
266 }
267
268 nxt_controller_conf.root = conf;
269 nxt_controller_conf.pool = mp;
270
271 return NXT_OK;
272}
273
274
275static void
276nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
277 void *data)
278{
279 nxt_runtime_t *rt;
280
281 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
282 nxt_log(task, NXT_LOG_ALERT, "failed to apply previous configuration");
283
284 nxt_mp_destroy(nxt_controller_conf.pool);
285
286 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
287 nxt_abort();
288 }
289 }
290
291 if (nxt_controller_listening == 0) {
292 rt = task->thread->runtime;
293
294 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket)
295 == NULL))
296 {
297 nxt_abort();
298 }
299
300 nxt_controller_listening = 1;
301 }
302}
303
304
305static nxt_int_t
306nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
307 nxt_port_rpc_handler_t handler, void *data)
308{
309 size_t size;
310 uint32_t stream;
311 nxt_int_t rc;
312 nxt_buf_t *b;
313 nxt_port_t *router_port, *controller_port;
314 nxt_runtime_t *rt;
315
316 rt = task->thread->runtime;
317
318 router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
319
320 if (nxt_slow_path(router_port == NULL)) {
321 return NXT_DECLINED;
322 }
323
324 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
325
326 size = nxt_conf_json_length(conf, NULL);
327
328 b = nxt_port_mmap_get_buf(task, router_port, size);
329 if (nxt_slow_path(b == NULL)) {
330 return NXT_ERROR;
331 }
332
333 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
334
335 stream = nxt_port_rpc_register_handler(task, controller_port,
336 handler, handler,
337 router_port->pid, data);
338
339 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
340 stream, controller_port->id, b);
341
342 if (nxt_slow_path(rc != NXT_OK)) {
343 nxt_port_rpc_cancel(task, controller_port, stream);
344 return NXT_ERROR;
345 }
346
347 return NXT_OK;
348}
349
350
351nxt_int_t
352nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
353{
354 nxt_sockaddr_t *sa;
355 nxt_listen_socket_t *ls;
356
357 sa = rt->controller_listen;
358
359 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
360 if (ls == NULL) {
361 return NXT_ERROR;
362 }
363
364 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
365 sa->socklen, sa->length);
366 if (ls->sockaddr == NULL) {
367 return NXT_ERROR;
368 }
369
370 ls->sockaddr->type = sa->type;
371 nxt_sockaddr_text(ls->sockaddr);
372
373 nxt_listen_socket_remote_size(ls);
374
375 ls->socket = -1;
376 ls->backlog = NXT_LISTEN_BACKLOG;
377 ls->read_after_accept = 1;
378 ls->flags = NXT_NONBLOCK;
379
380#if 0
381 /* STUB */
382 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t));
383 if (wq == NULL) {
384 return NXT_ERROR;
385 }
386 nxt_work_queue_name(wq, "listen");
387 /**/
388
389 ls->work_queue = wq;
390#endif
391 ls->handler = nxt_controller_conn_init;
392
393 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
394 return NXT_ERROR;
395 }
396
397 rt->controller_socket = ls;
398
399 return NXT_OK;
400}
401
402
403static void
404nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
405{
406 nxt_buf_t *b;
407 nxt_conn_t *c;
408 nxt_event_engine_t *engine;
409 nxt_controller_request_t *r;
410
411 c = obj;
412
413 nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
414
415 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t));
416 if (nxt_slow_path(r == NULL)) {
417 nxt_controller_conn_free(task, c, NULL);
418 return;
419 }
420
421 r->conn = c;
422
423 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool)
424 != NXT_OK))
425 {
426 nxt_controller_conn_free(task, c, NULL);
427 return;
428 }
429
432 r->parser.fields_hash = nxt_controller_fields_hash;
433
434 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
435 if (nxt_slow_path(b == NULL)) {
436 nxt_controller_conn_free(task, c, NULL);
437 return;
438 }
439
440 c->read = b;
441 c->socket.data = r;
442 c->socket.read_ready = 1;
443 c->read_state = &nxt_controller_conn_read_state;
444
445 engine = task->thread->engine;
446 c->read_work_queue = &engine->read_work_queue;
447 c->write_work_queue = &engine->write_work_queue;
448
449 nxt_conn_read(engine, c);
450}
451
452
453static const nxt_event_conn_state_t nxt_controller_conn_read_state
454 nxt_aligned(64) =
455{
456 .ready_handler = nxt_controller_conn_read,
457 .close_handler = nxt_controller_conn_close,
458 .error_handler = nxt_controller_conn_read_error,
459
460 .timer_handler = nxt_controller_conn_read_timeout,
461 .timer_value = nxt_controller_conn_timeout_value,
462 .timer_data = 60 * 1000,
463};
464
465
466static void
467nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
468{
469 size_t preread;
470 nxt_buf_t *b;
471 nxt_int_t rc;
472 nxt_conn_t *c;
473 nxt_controller_request_t *r;
474
475 c = obj;
476 r = data;
477
478 nxt_debug(task, "controller conn read");
479
480 nxt_queue_remove(&c->link);
481 nxt_queue_self(&c->link);
482
483 b = c->read;
484
485 rc = nxt_http_parse_request(&r->parser, &b->mem);
486
487 if (nxt_slow_path(rc != NXT_DONE)) {
488
489 if (rc == NXT_AGAIN) {
490 if (nxt_buf_mem_free_size(&b->mem) == 0) {
491 nxt_log(task, NXT_LOG_ERR, "too long request headers");
492 nxt_controller_conn_close(task, c, r);
493 return;
494 }
495
496 nxt_conn_read(task->thread->engine, c);
497 return;
498 }
499
500 /* rc == NXT_ERROR */
501
502 nxt_log(task, NXT_LOG_ERR, "parsing error");
503
504 nxt_controller_conn_close(task, c, r);
505 return;
506 }
507
430 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
431 if (nxt_slow_path(b == NULL)) {
432 nxt_controller_conn_free(task, c, NULL);
433 return;
434 }
435
436 c->read = b;
437 c->socket.data = r;
438 c->socket.read_ready = 1;
439 c->read_state = &nxt_controller_conn_read_state;
440
441 engine = task->thread->engine;
442 c->read_work_queue = &engine->read_work_queue;
443 c->write_work_queue = &engine->write_work_queue;
444
445 nxt_conn_read(engine, c);
446}
447
448
449static const nxt_event_conn_state_t nxt_controller_conn_read_state
450 nxt_aligned(64) =
451{
452 .ready_handler = nxt_controller_conn_read,
453 .close_handler = nxt_controller_conn_close,
454 .error_handler = nxt_controller_conn_read_error,
455
456 .timer_handler = nxt_controller_conn_read_timeout,
457 .timer_value = nxt_controller_conn_timeout_value,
458 .timer_data = 60 * 1000,
459};
460
461
462static void
463nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
464{
465 size_t preread;
466 nxt_buf_t *b;
467 nxt_int_t rc;
468 nxt_conn_t *c;
469 nxt_controller_request_t *r;
470
471 c = obj;
472 r = data;
473
474 nxt_debug(task, "controller conn read");
475
476 nxt_queue_remove(&c->link);
477 nxt_queue_self(&c->link);
478
479 b = c->read;
480
481 rc = nxt_http_parse_request(&r->parser, &b->mem);
482
483 if (nxt_slow_path(rc != NXT_DONE)) {
484
485 if (rc == NXT_AGAIN) {
486 if (nxt_buf_mem_free_size(&b->mem) == 0) {
487 nxt_log(task, NXT_LOG_ERR, "too long request headers");
488 nxt_controller_conn_close(task, c, r);
489 return;
490 }
491
492 nxt_conn_read(task->thread->engine, c);
493 return;
494 }
495
496 /* rc == NXT_ERROR */
497
498 nxt_log(task, NXT_LOG_ERR, "parsing error");
499
500 nxt_controller_conn_close(task, c, r);
501 return;
502 }
503
508 rc = nxt_http_fields_process(r->parser.fields, r, task->log);
504 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash,
505 r);
509
510 if (nxt_slow_path(rc != NXT_OK)) {
511 nxt_controller_conn_close(task, c, r);
512 return;
513 }
514
515 preread = nxt_buf_mem_used_size(&b->mem);
516
517 nxt_debug(task, "controller request header parsing complete, "
518 "body length: %uz, preread: %uz",
519 r->length, preread);
520
521 if (preread >= r->length) {
522 nxt_controller_process_request(task, r);
523 return;
524 }
525
526 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
527 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
528 if (nxt_slow_path(b == NULL)) {
529 nxt_controller_conn_free(task, c, NULL);
530 return;
531 }
532
533 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
534
535 c->read = b;
536 }
537
538 c->read_state = &nxt_controller_conn_body_read_state;
539
540 nxt_conn_read(task->thread->engine, c);
541}
542
543
544static nxt_msec_t
545nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
546{
547 return (nxt_msec_t) data;
548}
549
550
551static void
552nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
553{
554 nxt_conn_t *c;
555
556 c = obj;
557
558 nxt_debug(task, "controller conn read error");
559
560 nxt_controller_conn_close(task, c, data);
561}
562
563
564static void
565nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
566{
567 nxt_timer_t *timer;
568 nxt_conn_t *c;
569
570 timer = obj;
571
572 c = nxt_read_timer_conn(timer);
573 c->socket.timedout = 1;
574 c->socket.closed = 1;
575
576 nxt_debug(task, "controller conn read timeout");
577
578 nxt_controller_conn_close(task, c, data);
579}
580
581
582static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
583 nxt_aligned(64) =
584{
585 .ready_handler = nxt_controller_conn_body_read,
586 .close_handler = nxt_controller_conn_close,
587 .error_handler = nxt_controller_conn_read_error,
588
589 .timer_handler = nxt_controller_conn_read_timeout,
590 .timer_value = nxt_controller_conn_timeout_value,
591 .timer_data = 60 * 1000,
592 .timer_autoreset = 1,
593};
594
595
596static void
597nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
598{
599 size_t read;
600 nxt_buf_t *b;
601 nxt_conn_t *c;
602 nxt_controller_request_t *r;
603
604 c = obj;
605 r = data;
606 b = c->read;
607
608 read = nxt_buf_mem_used_size(&b->mem);
609
610 nxt_debug(task, "controller conn body read: %uz of %uz",
611 read, r->length);
612
613 if (read >= r->length) {
614 nxt_controller_process_request(task, r);
615 return;
616 }
617
618 nxt_conn_read(task->thread->engine, c);
619}
620
621
622static const nxt_event_conn_state_t nxt_controller_conn_write_state
623 nxt_aligned(64) =
624{
625 .ready_handler = nxt_controller_conn_write,
626 .error_handler = nxt_controller_conn_write_error,
627
628 .timer_handler = nxt_controller_conn_write_timeout,
629 .timer_value = nxt_controller_conn_timeout_value,
630 .timer_data = 60 * 1000,
631 .timer_autoreset = 1,
632};
633
634
635static void
636nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
637{
638 nxt_buf_t *b;
639 nxt_conn_t *c;
640
641 c = obj;
642
643 nxt_debug(task, "controller conn write");
644
645 b = c->write;
646
647 if (b->mem.pos != b->mem.free) {
648 nxt_conn_write(task->thread->engine, c);
649 return;
650 }
651
652 nxt_debug(task, "controller conn write complete");
653
654 nxt_controller_conn_close(task, c, data);
655}
656
657
658static void
659nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
660{
661 nxt_conn_t *c;
662
663 c = obj;
664
665 nxt_debug(task, "controller conn write error");
666
667 nxt_controller_conn_close(task, c, data);
668}
669
670
671static void
672nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
673{
674 nxt_conn_t *c;
675 nxt_timer_t *timer;
676
677 timer = obj;
678
679 c = nxt_write_timer_conn(timer);
680 c->socket.timedout = 1;
681 c->socket.closed = 1;
682
683 nxt_debug(task, "controller conn write timeout");
684
685 nxt_controller_conn_close(task, c, data);
686}
687
688
689static const nxt_event_conn_state_t nxt_controller_conn_close_state
690 nxt_aligned(64) =
691{
692 .ready_handler = nxt_controller_conn_free,
693};
694
695
696static void
697nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
698{
699 nxt_conn_t *c;
700
701 c = obj;
702
703 nxt_debug(task, "controller conn close");
704
705 nxt_queue_remove(&c->link);
706
707 c->write_state = &nxt_controller_conn_close_state;
708
709 nxt_conn_close(task->thread->engine, c);
710}
711
712
713static void
714nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
715{
716 nxt_conn_t *c;
717
718 c = obj;
719
720 nxt_debug(task, "controller conn free");
721
722 nxt_sockaddr_cache_free(task->thread->engine, c);
723
724 nxt_conn_free(task, c);
725}
726
727
728static nxt_int_t
729nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
506
507 if (nxt_slow_path(rc != NXT_OK)) {
508 nxt_controller_conn_close(task, c, r);
509 return;
510 }
511
512 preread = nxt_buf_mem_used_size(&b->mem);
513
514 nxt_debug(task, "controller request header parsing complete, "
515 "body length: %uz, preread: %uz",
516 r->length, preread);
517
518 if (preread >= r->length) {
519 nxt_controller_process_request(task, r);
520 return;
521 }
522
523 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
524 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
525 if (nxt_slow_path(b == NULL)) {
526 nxt_controller_conn_free(task, c, NULL);
527 return;
528 }
529
530 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
531
532 c->read = b;
533 }
534
535 c->read_state = &nxt_controller_conn_body_read_state;
536
537 nxt_conn_read(task->thread->engine, c);
538}
539
540
541static nxt_msec_t
542nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
543{
544 return (nxt_msec_t) data;
545}
546
547
548static void
549nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
550{
551 nxt_conn_t *c;
552
553 c = obj;
554
555 nxt_debug(task, "controller conn read error");
556
557 nxt_controller_conn_close(task, c, data);
558}
559
560
561static void
562nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
563{
564 nxt_timer_t *timer;
565 nxt_conn_t *c;
566
567 timer = obj;
568
569 c = nxt_read_timer_conn(timer);
570 c->socket.timedout = 1;
571 c->socket.closed = 1;
572
573 nxt_debug(task, "controller conn read timeout");
574
575 nxt_controller_conn_close(task, c, data);
576}
577
578
579static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
580 nxt_aligned(64) =
581{
582 .ready_handler = nxt_controller_conn_body_read,
583 .close_handler = nxt_controller_conn_close,
584 .error_handler = nxt_controller_conn_read_error,
585
586 .timer_handler = nxt_controller_conn_read_timeout,
587 .timer_value = nxt_controller_conn_timeout_value,
588 .timer_data = 60 * 1000,
589 .timer_autoreset = 1,
590};
591
592
593static void
594nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
595{
596 size_t read;
597 nxt_buf_t *b;
598 nxt_conn_t *c;
599 nxt_controller_request_t *r;
600
601 c = obj;
602 r = data;
603 b = c->read;
604
605 read = nxt_buf_mem_used_size(&b->mem);
606
607 nxt_debug(task, "controller conn body read: %uz of %uz",
608 read, r->length);
609
610 if (read >= r->length) {
611 nxt_controller_process_request(task, r);
612 return;
613 }
614
615 nxt_conn_read(task->thread->engine, c);
616}
617
618
619static const nxt_event_conn_state_t nxt_controller_conn_write_state
620 nxt_aligned(64) =
621{
622 .ready_handler = nxt_controller_conn_write,
623 .error_handler = nxt_controller_conn_write_error,
624
625 .timer_handler = nxt_controller_conn_write_timeout,
626 .timer_value = nxt_controller_conn_timeout_value,
627 .timer_data = 60 * 1000,
628 .timer_autoreset = 1,
629};
630
631
632static void
633nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
634{
635 nxt_buf_t *b;
636 nxt_conn_t *c;
637
638 c = obj;
639
640 nxt_debug(task, "controller conn write");
641
642 b = c->write;
643
644 if (b->mem.pos != b->mem.free) {
645 nxt_conn_write(task->thread->engine, c);
646 return;
647 }
648
649 nxt_debug(task, "controller conn write complete");
650
651 nxt_controller_conn_close(task, c, data);
652}
653
654
655static void
656nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
657{
658 nxt_conn_t *c;
659
660 c = obj;
661
662 nxt_debug(task, "controller conn write error");
663
664 nxt_controller_conn_close(task, c, data);
665}
666
667
668static void
669nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
670{
671 nxt_conn_t *c;
672 nxt_timer_t *timer;
673
674 timer = obj;
675
676 c = nxt_write_timer_conn(timer);
677 c->socket.timedout = 1;
678 c->socket.closed = 1;
679
680 nxt_debug(task, "controller conn write timeout");
681
682 nxt_controller_conn_close(task, c, data);
683}
684
685
686static const nxt_event_conn_state_t nxt_controller_conn_close_state
687 nxt_aligned(64) =
688{
689 .ready_handler = nxt_controller_conn_free,
690};
691
692
693static void
694nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
695{
696 nxt_conn_t *c;
697
698 c = obj;
699
700 nxt_debug(task, "controller conn close");
701
702 nxt_queue_remove(&c->link);
703
704 c->write_state = &nxt_controller_conn_close_state;
705
706 nxt_conn_close(task->thread->engine, c);
707}
708
709
710static void
711nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
712{
713 nxt_conn_t *c;
714
715 c = obj;
716
717 nxt_debug(task, "controller conn free");
718
719 nxt_sockaddr_cache_free(task->thread->engine, c);
720
721 nxt_conn_free(task, c);
722}
723
724
725static nxt_int_t
726nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
730 nxt_log_t *log)
727 uintptr_t data)
731{
732 off_t length;
733 nxt_controller_request_t *r;
734
735 r = ctx;
736
728{
729 off_t length;
730 nxt_controller_request_t *r;
731
732 r = ctx;
733
737 length = nxt_off_t_parse(field->value.start, field->value.length);
734 length = nxt_off_t_parse(field->value, field->value_length);
738
739 if (nxt_fast_path(length > 0)) {
740
741 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
735
736 if (nxt_fast_path(length > 0)) {
737
738 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
742 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big");
739 nxt_log_error(NXT_LOG_ERR, &r->conn->log,
740 "Content-Length is too big");
743 return NXT_ERROR;
744 }
745
746 r->length = length;
747 return NXT_OK;
748 }
749
741 return NXT_ERROR;
742 }
743
744 r->length = length;
745 return NXT_OK;
746 }
747
750 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
748 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
751
752 return NXT_ERROR;
753}
754
755
756static void
757nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
758{
759 nxt_mp_t *mp;
760 nxt_int_t rc;
761 nxt_str_t path;
762 nxt_conn_t *c;
763 nxt_buf_mem_t *mbuf;
764 nxt_conf_op_t *ops;
765 nxt_conf_value_t *value;
766 nxt_conf_validation_t vldt;
767 nxt_conf_json_error_t error;
768 nxt_controller_response_t resp;
769
770 static const nxt_str_t empty_obj = nxt_string("{}");
771
772 c = req->conn;
773 path = req->parser.path;
774
775 if (path.length > 1 && path.start[path.length - 1] == '/') {
776 path.length--;
777 }
778
779 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
780
781 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
782
783 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
784
785 if (value == NULL) {
786 goto not_found;
787 }
788
789 resp.status = 200;
790 resp.conf = value;
791
792 nxt_controller_response(task, req, &resp);
793 return;
794 }
795
796 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
797
798 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
799 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
800 return;
801 }
802
803 mp = nxt_mp_create(1024, 128, 256, 32);
804
805 if (nxt_slow_path(mp == NULL)) {
806 goto alloc_fail;
807 }
808
809 mbuf = &c->read->mem;
810
811 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
812
813 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
814
815 if (value == NULL) {
816 nxt_mp_destroy(mp);
817
818 if (error.pos == NULL) {
819 goto alloc_fail;
820 }
821
822 resp.status = 400;
823 resp.title = (u_char *) "Invalid JSON.";
824 resp.detail.length = nxt_strlen(error.detail);
825 resp.detail.start = error.detail;
826 resp.offset = error.pos - mbuf->pos;
827
828 nxt_conf_json_position(mbuf->pos, error.pos,
829 &resp.line, &resp.column);
830
831 nxt_controller_response(task, req, &resp);
832 return;
833 }
834
835 if (path.length != 1) {
836 rc = nxt_conf_op_compile(c->mem_pool, &ops,
837 nxt_controller_conf.root,
838 &path, value);
839
840 if (rc != NXT_OK) {
841 if (rc == NXT_DECLINED) {
842 goto not_found;
843 }
844
845 goto alloc_fail;
846 }
847
848 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
849
850 if (nxt_slow_path(value == NULL)) {
851 nxt_mp_destroy(mp);
852 goto alloc_fail;
853 }
854 }
855
856 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
857
858 vldt.conf = value;
859 vldt.pool = c->mem_pool;
860
861 rc = nxt_conf_validate(&vldt);
862
863 if (nxt_slow_path(rc != NXT_OK)) {
864 nxt_mp_destroy(mp);
865
866 if (rc == NXT_DECLINED) {
867 resp.detail = vldt.error;
868 goto invalid_conf;
869 }
870
871 /* rc == NXT_ERROR */
872 goto alloc_fail;
873 }
874
875 rc = nxt_controller_conf_send(task, value,
876 nxt_controller_conf_handler, req);
877
878 if (nxt_slow_path(rc != NXT_OK)) {
879 nxt_mp_destroy(mp);
880
881 if (rc == NXT_DECLINED) {
882 goto no_router;
883 }
884
885 /* rc == NXT_ERROR */
886 goto alloc_fail;
887 }
888
889 req->conf.root = value;
890 req->conf.pool = mp;
891
892 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
893
894 return;
895 }
896
897 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
898
899 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
900 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
901 return;
902 }
903
904 if (path.length == 1) {
905 mp = nxt_mp_create(1024, 128, 256, 32);
906
907 if (nxt_slow_path(mp == NULL)) {
908 goto alloc_fail;
909 }
910
911 value = nxt_conf_json_parse_str(mp, &empty_obj);
912
913 } else {
914 rc = nxt_conf_op_compile(c->mem_pool, &ops,
915 nxt_controller_conf.root,
916 &path, NULL);
917
918 if (rc != NXT_OK) {
919 if (rc == NXT_DECLINED) {
920 goto not_found;
921 }
922
923 goto alloc_fail;
924 }
925
926 mp = nxt_mp_create(1024, 128, 256, 32);
927
928 if (nxt_slow_path(mp == NULL)) {
929 goto alloc_fail;
930 }
931
932 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
933 }
934
935 if (nxt_slow_path(value == NULL)) {
936 nxt_mp_destroy(mp);
937 goto alloc_fail;
938 }
939
940 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
941
942 vldt.conf = value;
943 vldt.pool = c->mem_pool;
944
945 rc = nxt_conf_validate(&vldt);
946
947 if (nxt_slow_path(rc != NXT_OK)) {
948 nxt_mp_destroy(mp);
949
950 if (rc == NXT_DECLINED) {
951 resp.detail = vldt.error;
952 goto invalid_conf;
953 }
954
955 /* rc == NXT_ERROR */
956 goto alloc_fail;
957 }
958
959 rc = nxt_controller_conf_send(task, value,
960 nxt_controller_conf_handler, req);
961
962 if (nxt_slow_path(rc != NXT_OK)) {
963 nxt_mp_destroy(mp);
964
965 if (rc == NXT_DECLINED) {
966 goto no_router;
967 }
968
969 /* rc == NXT_ERROR */
970 goto alloc_fail;
971 }
972
973 req->conf.root = value;
974 req->conf.pool = mp;
975
976 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
977
978 return;
979 }
980
981 resp.status = 405;
982 resp.title = (u_char *) "Invalid method.";
983 resp.offset = -1;
984
985 nxt_controller_response(task, req, &resp);
986 return;
987
988not_found:
989
990 resp.status = 404;
991 resp.title = (u_char *) "Value doesn't exist.";
992 resp.offset = -1;
993
994 nxt_controller_response(task, req, &resp);
995 return;
996
997invalid_conf:
998
999 resp.status = 400;
1000 resp.title = (u_char *) "Invalid configuration.";
1001 resp.offset = -1;
1002
1003 nxt_controller_response(task, req, &resp);
1004 return;
1005
1006alloc_fail:
1007
1008 resp.status = 500;
1009 resp.title = (u_char *) "Memory allocation failed.";
1010 resp.offset = -1;
1011
1012 nxt_controller_response(task, req, &resp);
1013 return;
1014
1015no_router:
1016
1017 resp.status = 500;
1018 resp.title = (u_char *) "Router process isn't available.";
1019 resp.offset = -1;
1020
1021 nxt_controller_response(task, req, &resp);
1022 return;
1023}
1024
1025
1026static void
1027nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1028 void *data)
1029{
1030 nxt_queue_t queue;
1031 nxt_controller_request_t *req;
1032 nxt_controller_response_t resp;
1033
1034 req = data;
1035
1036 nxt_debug(task, "controller conf ready: %*s",
1037 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1038
1039 nxt_queue_remove(&req->link);
1040
1041 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1042
1043 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1044 nxt_mp_destroy(nxt_controller_conf.pool);
1045
1046 nxt_controller_conf = req->conf;
1047
1048 nxt_controller_conf_store(task, req->conf.root);
1049
1050 resp.status = 200;
1051 resp.title = (u_char *) "Reconfiguration done.";
1052
1053 } else {
1054 nxt_mp_destroy(req->conf.pool);
1055
1056 resp.status = 500;
1057 resp.title = (u_char *) "Failed to apply new configuration.";
1058 resp.offset = -1;
1059 }
1060
1061 nxt_controller_response(task, req, &resp);
1062
1063 nxt_queue_init(&queue);
1064 nxt_queue_add(&queue, &nxt_controller_waiting_requests);
1065
1066 nxt_queue_init(&nxt_controller_waiting_requests);
1067
1068 nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
1069 nxt_controller_process_request(task, req);
1070 } nxt_queue_loop;
1071}
1072
1073
1074static void
1075nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1076{
1077 size_t size;
1078 nxt_buf_t *b;
1079 nxt_port_t *main_port;
1080 nxt_runtime_t *rt;
1081
1082 rt = task->thread->runtime;
1083
1084 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1085
1086 size = nxt_conf_json_length(conf, NULL);
1087
1088 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
1089
1090 if (nxt_fast_path(b != NULL)) {
1091 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
1092
1093 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
1094 -1, 0, -1, b);
1095 }
1096}
1097
1098
1099static void
1100nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1101 nxt_controller_response_t *resp)
1102{
1103 size_t size;
1104 nxt_str_t status_line, str;
1105 nxt_buf_t *b, *body;
1106 nxt_conn_t *c;
1107 nxt_uint_t n;
1108 nxt_conf_value_t *value, *location;
1109 nxt_conf_json_pretty_t pretty;
1110
1111 static nxt_str_t success_str = nxt_string("success");
1112 static nxt_str_t error_str = nxt_string("error");
1113 static nxt_str_t detail_str = nxt_string("detail");
1114 static nxt_str_t location_str = nxt_string("location");
1115 static nxt_str_t offset_str = nxt_string("offset");
1116 static nxt_str_t line_str = nxt_string("line");
1117 static nxt_str_t column_str = nxt_string("column");
1118
1119 static nxt_time_string_t date_cache = {
1120 (nxt_atomic_uint_t) -1,
1121 nxt_controller_date,
1122 "%s, %02d %s %4d %02d:%02d:%02d GMT",
1123 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
1124 NXT_THREAD_TIME_GMT,
1125 NXT_THREAD_TIME_SEC,
1126 };
1127
1128 switch (resp->status) {
1129
1130 case 200:
1131 nxt_str_set(&status_line, "200 OK");
1132 break;
1133
1134 case 400:
1135 nxt_str_set(&status_line, "400 Bad Request");
1136 break;
1137
1138 case 404:
1139 nxt_str_set(&status_line, "404 Not Found");
1140 break;
1141
1142 case 405:
1143 nxt_str_set(&status_line, "405 Method Not Allowed");
1144 break;
1145
1146 default:
1147 nxt_str_set(&status_line, "500 Internal Server Error");
1148 break;
1149 }
1150
1151 c = req->conn;
1152 value = resp->conf;
1153
1154 if (value == NULL) {
1155 n = 1
1156 + (resp->detail.length != 0)
1157 + (resp->status >= 400 && resp->offset != -1);
1158
1159 value = nxt_conf_create_object(c->mem_pool, n);
1160
1161 if (nxt_slow_path(value == NULL)) {
1162 nxt_controller_conn_close(task, c, req);
1163 return;
1164 }
1165
1166 str.length = nxt_strlen(resp->title);
1167 str.start = resp->title;
1168
1169 if (resp->status < 400) {
1170 nxt_conf_set_member_string(value, &success_str, &str, 0);
1171
1172 } else {
1173 nxt_conf_set_member_string(value, &error_str, &str, 0);
1174 }
1175
1176 n = 0;
1177
1178 if (resp->detail.length != 0) {
1179 n++;
1180
1181 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1182 }
1183
1184 if (resp->status >= 400 && resp->offset != -1) {
1185 n++;
1186
1187 location = nxt_conf_create_object(c->mem_pool,
1188 resp->line != 0 ? 3 : 1);
1189
1190 nxt_conf_set_member(value, &location_str, location, n);
1191
1192 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1193
1194 if (resp->line != 0) {
1195 nxt_conf_set_member_integer(location, &line_str,
1196 resp->line, 1);
1197
1198 nxt_conf_set_member_integer(location, &column_str,
1199 resp->column, 2);
1200 }
1201 }
1202 }
1203
1204 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1205
1206 size = nxt_conf_json_length(value, &pretty) + 2;
1207
1208 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1209 if (nxt_slow_path(body == NULL)) {
1210 nxt_controller_conn_close(task, c, req);
1211 return;
1212 }
1213
1214 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1215
1216 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
1217
1218 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
1219
1220 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
1221 + sizeof("Server: unit/" NXT_VERSION "\r\n") - 1
1222 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
1223 + sizeof("Content-Type: application/json\r\n") - 1
1224 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
1225 + sizeof("Connection: close\r\n") - 1
1226 + sizeof("\r\n") - 1;
1227
1228 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1229 if (nxt_slow_path(b == NULL)) {
1230 nxt_controller_conn_close(task, c, req);
1231 return;
1232 }
1233
1234 b->next = body;
1235
1236 nxt_str_set(&str, "HTTP/1.1 ");
1237
1238 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1239 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1240 status_line.length);
1241
1242 nxt_str_set(&str, "\r\n"
1243 "Server: unit/" NXT_VERSION "\r\n"
1244 "Date: ");
1245
1246 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1247
1248 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1249 b->mem.free);
1250
1251 nxt_str_set(&str, "\r\n"
1252 "Content-Type: application/json\r\n"
1253 "Content-Length: ");
1254
1255 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1256
1257 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1258 nxt_buf_mem_used_size(&body->mem));
1259
1260 nxt_str_set(&str, "\r\n"
1261 "Connection: close\r\n"
1262 "\r\n");
1263
1264 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1265
1266 c->write = b;
1267 c->write_state = &nxt_controller_conn_write_state;
1268
1269 nxt_conn_write(task->thread->engine, c);
1270}
1271
1272
1273static u_char *
1274nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1275 size_t size, const char *format)
1276{
1277 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1278 "Sat" };
1279
1280 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1281 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1282
1283 return nxt_sprintf(buf, buf + size, format,
1284 week[tm->tm_wday], tm->tm_mday,
1285 month[tm->tm_mon], tm->tm_year + 1900,
1286 tm->tm_hour, tm->tm_min, tm->tm_sec);
1287}
749
750 return NXT_ERROR;
751}
752
753
754static void
755nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
756{
757 nxt_mp_t *mp;
758 nxt_int_t rc;
759 nxt_str_t path;
760 nxt_conn_t *c;
761 nxt_buf_mem_t *mbuf;
762 nxt_conf_op_t *ops;
763 nxt_conf_value_t *value;
764 nxt_conf_validation_t vldt;
765 nxt_conf_json_error_t error;
766 nxt_controller_response_t resp;
767
768 static const nxt_str_t empty_obj = nxt_string("{}");
769
770 c = req->conn;
771 path = req->parser.path;
772
773 if (path.length > 1 && path.start[path.length - 1] == '/') {
774 path.length--;
775 }
776
777 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
778
779 if (nxt_str_eq(&req->parser.method, "GET", 3)) {
780
781 value = nxt_conf_get_path(nxt_controller_conf.root, &path);
782
783 if (value == NULL) {
784 goto not_found;
785 }
786
787 resp.status = 200;
788 resp.conf = value;
789
790 nxt_controller_response(task, req, &resp);
791 return;
792 }
793
794 if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
795
796 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
797 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
798 return;
799 }
800
801 mp = nxt_mp_create(1024, 128, 256, 32);
802
803 if (nxt_slow_path(mp == NULL)) {
804 goto alloc_fail;
805 }
806
807 mbuf = &c->read->mem;
808
809 nxt_memzero(&error, sizeof(nxt_conf_json_error_t));
810
811 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error);
812
813 if (value == NULL) {
814 nxt_mp_destroy(mp);
815
816 if (error.pos == NULL) {
817 goto alloc_fail;
818 }
819
820 resp.status = 400;
821 resp.title = (u_char *) "Invalid JSON.";
822 resp.detail.length = nxt_strlen(error.detail);
823 resp.detail.start = error.detail;
824 resp.offset = error.pos - mbuf->pos;
825
826 nxt_conf_json_position(mbuf->pos, error.pos,
827 &resp.line, &resp.column);
828
829 nxt_controller_response(task, req, &resp);
830 return;
831 }
832
833 if (path.length != 1) {
834 rc = nxt_conf_op_compile(c->mem_pool, &ops,
835 nxt_controller_conf.root,
836 &path, value);
837
838 if (rc != NXT_OK) {
839 if (rc == NXT_DECLINED) {
840 goto not_found;
841 }
842
843 goto alloc_fail;
844 }
845
846 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
847
848 if (nxt_slow_path(value == NULL)) {
849 nxt_mp_destroy(mp);
850 goto alloc_fail;
851 }
852 }
853
854 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
855
856 vldt.conf = value;
857 vldt.pool = c->mem_pool;
858
859 rc = nxt_conf_validate(&vldt);
860
861 if (nxt_slow_path(rc != NXT_OK)) {
862 nxt_mp_destroy(mp);
863
864 if (rc == NXT_DECLINED) {
865 resp.detail = vldt.error;
866 goto invalid_conf;
867 }
868
869 /* rc == NXT_ERROR */
870 goto alloc_fail;
871 }
872
873 rc = nxt_controller_conf_send(task, value,
874 nxt_controller_conf_handler, req);
875
876 if (nxt_slow_path(rc != NXT_OK)) {
877 nxt_mp_destroy(mp);
878
879 if (rc == NXT_DECLINED) {
880 goto no_router;
881 }
882
883 /* rc == NXT_ERROR */
884 goto alloc_fail;
885 }
886
887 req->conf.root = value;
888 req->conf.pool = mp;
889
890 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
891
892 return;
893 }
894
895 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
896
897 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
898 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
899 return;
900 }
901
902 if (path.length == 1) {
903 mp = nxt_mp_create(1024, 128, 256, 32);
904
905 if (nxt_slow_path(mp == NULL)) {
906 goto alloc_fail;
907 }
908
909 value = nxt_conf_json_parse_str(mp, &empty_obj);
910
911 } else {
912 rc = nxt_conf_op_compile(c->mem_pool, &ops,
913 nxt_controller_conf.root,
914 &path, NULL);
915
916 if (rc != NXT_OK) {
917 if (rc == NXT_DECLINED) {
918 goto not_found;
919 }
920
921 goto alloc_fail;
922 }
923
924 mp = nxt_mp_create(1024, 128, 256, 32);
925
926 if (nxt_slow_path(mp == NULL)) {
927 goto alloc_fail;
928 }
929
930 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root);
931 }
932
933 if (nxt_slow_path(value == NULL)) {
934 nxt_mp_destroy(mp);
935 goto alloc_fail;
936 }
937
938 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t));
939
940 vldt.conf = value;
941 vldt.pool = c->mem_pool;
942
943 rc = nxt_conf_validate(&vldt);
944
945 if (nxt_slow_path(rc != NXT_OK)) {
946 nxt_mp_destroy(mp);
947
948 if (rc == NXT_DECLINED) {
949 resp.detail = vldt.error;
950 goto invalid_conf;
951 }
952
953 /* rc == NXT_ERROR */
954 goto alloc_fail;
955 }
956
957 rc = nxt_controller_conf_send(task, value,
958 nxt_controller_conf_handler, req);
959
960 if (nxt_slow_path(rc != NXT_OK)) {
961 nxt_mp_destroy(mp);
962
963 if (rc == NXT_DECLINED) {
964 goto no_router;
965 }
966
967 /* rc == NXT_ERROR */
968 goto alloc_fail;
969 }
970
971 req->conf.root = value;
972 req->conf.pool = mp;
973
974 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
975
976 return;
977 }
978
979 resp.status = 405;
980 resp.title = (u_char *) "Invalid method.";
981 resp.offset = -1;
982
983 nxt_controller_response(task, req, &resp);
984 return;
985
986not_found:
987
988 resp.status = 404;
989 resp.title = (u_char *) "Value doesn't exist.";
990 resp.offset = -1;
991
992 nxt_controller_response(task, req, &resp);
993 return;
994
995invalid_conf:
996
997 resp.status = 400;
998 resp.title = (u_char *) "Invalid configuration.";
999 resp.offset = -1;
1000
1001 nxt_controller_response(task, req, &resp);
1002 return;
1003
1004alloc_fail:
1005
1006 resp.status = 500;
1007 resp.title = (u_char *) "Memory allocation failed.";
1008 resp.offset = -1;
1009
1010 nxt_controller_response(task, req, &resp);
1011 return;
1012
1013no_router:
1014
1015 resp.status = 500;
1016 resp.title = (u_char *) "Router process isn't available.";
1017 resp.offset = -1;
1018
1019 nxt_controller_response(task, req, &resp);
1020 return;
1021}
1022
1023
1024static void
1025nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1026 void *data)
1027{
1028 nxt_queue_t queue;
1029 nxt_controller_request_t *req;
1030 nxt_controller_response_t resp;
1031
1032 req = data;
1033
1034 nxt_debug(task, "controller conf ready: %*s",
1035 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
1036
1037 nxt_queue_remove(&req->link);
1038
1039 nxt_memzero(&resp, sizeof(nxt_controller_response_t));
1040
1041 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
1042 nxt_mp_destroy(nxt_controller_conf.pool);
1043
1044 nxt_controller_conf = req->conf;
1045
1046 nxt_controller_conf_store(task, req->conf.root);
1047
1048 resp.status = 200;
1049 resp.title = (u_char *) "Reconfiguration done.";
1050
1051 } else {
1052 nxt_mp_destroy(req->conf.pool);
1053
1054 resp.status = 500;
1055 resp.title = (u_char *) "Failed to apply new configuration.";
1056 resp.offset = -1;
1057 }
1058
1059 nxt_controller_response(task, req, &resp);
1060
1061 nxt_queue_init(&queue);
1062 nxt_queue_add(&queue, &nxt_controller_waiting_requests);
1063
1064 nxt_queue_init(&nxt_controller_waiting_requests);
1065
1066 nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
1067 nxt_controller_process_request(task, req);
1068 } nxt_queue_loop;
1069}
1070
1071
1072static void
1073nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
1074{
1075 size_t size;
1076 nxt_buf_t *b;
1077 nxt_port_t *main_port;
1078 nxt_runtime_t *rt;
1079
1080 rt = task->thread->runtime;
1081
1082 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1083
1084 size = nxt_conf_json_length(conf, NULL);
1085
1086 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
1087
1088 if (nxt_fast_path(b != NULL)) {
1089 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
1090
1091 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE,
1092 -1, 0, -1, b);
1093 }
1094}
1095
1096
1097static void
1098nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
1099 nxt_controller_response_t *resp)
1100{
1101 size_t size;
1102 nxt_str_t status_line, str;
1103 nxt_buf_t *b, *body;
1104 nxt_conn_t *c;
1105 nxt_uint_t n;
1106 nxt_conf_value_t *value, *location;
1107 nxt_conf_json_pretty_t pretty;
1108
1109 static nxt_str_t success_str = nxt_string("success");
1110 static nxt_str_t error_str = nxt_string("error");
1111 static nxt_str_t detail_str = nxt_string("detail");
1112 static nxt_str_t location_str = nxt_string("location");
1113 static nxt_str_t offset_str = nxt_string("offset");
1114 static nxt_str_t line_str = nxt_string("line");
1115 static nxt_str_t column_str = nxt_string("column");
1116
1117 static nxt_time_string_t date_cache = {
1118 (nxt_atomic_uint_t) -1,
1119 nxt_controller_date,
1120 "%s, %02d %s %4d %02d:%02d:%02d GMT",
1121 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1,
1122 NXT_THREAD_TIME_GMT,
1123 NXT_THREAD_TIME_SEC,
1124 };
1125
1126 switch (resp->status) {
1127
1128 case 200:
1129 nxt_str_set(&status_line, "200 OK");
1130 break;
1131
1132 case 400:
1133 nxt_str_set(&status_line, "400 Bad Request");
1134 break;
1135
1136 case 404:
1137 nxt_str_set(&status_line, "404 Not Found");
1138 break;
1139
1140 case 405:
1141 nxt_str_set(&status_line, "405 Method Not Allowed");
1142 break;
1143
1144 default:
1145 nxt_str_set(&status_line, "500 Internal Server Error");
1146 break;
1147 }
1148
1149 c = req->conn;
1150 value = resp->conf;
1151
1152 if (value == NULL) {
1153 n = 1
1154 + (resp->detail.length != 0)
1155 + (resp->status >= 400 && resp->offset != -1);
1156
1157 value = nxt_conf_create_object(c->mem_pool, n);
1158
1159 if (nxt_slow_path(value == NULL)) {
1160 nxt_controller_conn_close(task, c, req);
1161 return;
1162 }
1163
1164 str.length = nxt_strlen(resp->title);
1165 str.start = resp->title;
1166
1167 if (resp->status < 400) {
1168 nxt_conf_set_member_string(value, &success_str, &str, 0);
1169
1170 } else {
1171 nxt_conf_set_member_string(value, &error_str, &str, 0);
1172 }
1173
1174 n = 0;
1175
1176 if (resp->detail.length != 0) {
1177 n++;
1178
1179 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n);
1180 }
1181
1182 if (resp->status >= 400 && resp->offset != -1) {
1183 n++;
1184
1185 location = nxt_conf_create_object(c->mem_pool,
1186 resp->line != 0 ? 3 : 1);
1187
1188 nxt_conf_set_member(value, &location_str, location, n);
1189
1190 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0);
1191
1192 if (resp->line != 0) {
1193 nxt_conf_set_member_integer(location, &line_str,
1194 resp->line, 1);
1195
1196 nxt_conf_set_member_integer(location, &column_str,
1197 resp->column, 2);
1198 }
1199 }
1200 }
1201
1202 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1203
1204 size = nxt_conf_json_length(value, &pretty) + 2;
1205
1206 body = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1207 if (nxt_slow_path(body == NULL)) {
1208 nxt_controller_conn_close(task, c, req);
1209 return;
1210 }
1211
1212 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t));
1213
1214 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty);
1215
1216 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2);
1217
1218 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length
1219 + sizeof("Server: unit/" NXT_VERSION "\r\n") - 1
1220 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1
1221 + sizeof("Content-Type: application/json\r\n") - 1
1222 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN
1223 + sizeof("Connection: close\r\n") - 1
1224 + sizeof("\r\n") - 1;
1225
1226 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1227 if (nxt_slow_path(b == NULL)) {
1228 nxt_controller_conn_close(task, c, req);
1229 return;
1230 }
1231
1232 b->next = body;
1233
1234 nxt_str_set(&str, "HTTP/1.1 ");
1235
1236 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1237 b->mem.free = nxt_cpymem(b->mem.free, status_line.start,
1238 status_line.length);
1239
1240 nxt_str_set(&str, "\r\n"
1241 "Server: unit/" NXT_VERSION "\r\n"
1242 "Date: ");
1243
1244 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1245
1246 b->mem.free = nxt_thread_time_string(task->thread, &date_cache,
1247 b->mem.free);
1248
1249 nxt_str_set(&str, "\r\n"
1250 "Content-Type: application/json\r\n"
1251 "Content-Length: ");
1252
1253 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1254
1255 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz",
1256 nxt_buf_mem_used_size(&body->mem));
1257
1258 nxt_str_set(&str, "\r\n"
1259 "Connection: close\r\n"
1260 "\r\n");
1261
1262 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length);
1263
1264 c->write = b;
1265 c->write_state = &nxt_controller_conn_write_state;
1266
1267 nxt_conn_write(task->thread->engine, c);
1268}
1269
1270
1271static u_char *
1272nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
1273 size_t size, const char *format)
1274{
1275 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri",
1276 "Sat" };
1277
1278 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
1279 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
1280
1281 return nxt_sprintf(buf, buf + size, format,
1282 week[tm->tm_wday], tm->tm_mday,
1283 month[tm->tm_mon], tm->tm_year + 1900,
1284 tm->tm_hour, tm->tm_min, tm->tm_sec);
1285}