1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_main_process.h> 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 nxt_str_t detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38} nxt_controller_response_t; 39 40 41static void nxt_controller_process_new_port_handler(nxt_task_t *task, 42 nxt_port_recv_msg_t *msg); 43static nxt_int_t nxt_controller_conf_default(void); 44static void nxt_controller_conf_init_handler(nxt_task_t *task, 45 nxt_port_recv_msg_t *msg, void *data); 46static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, 47 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); 48 49static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 50static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 51static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 52 uintptr_t data); 53static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 54 void *data); 55static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 56 void *data); 57static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 58 void *data); 59static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 60static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 61 void *data); 62static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 63 void *data); 64static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 65static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 66 67static nxt_int_t nxt_controller_request_content_length(void *ctx,
| 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_main.h> 9#include <nxt_runtime.h> 10#include <nxt_main_process.h> 11#include <nxt_conf.h> 12 13 14typedef struct { 15 nxt_conf_value_t *root; 16 nxt_mp_t *pool; 17} nxt_controller_conf_t; 18 19 20typedef struct { 21 nxt_http_request_parse_t parser; 22 size_t length; 23 nxt_controller_conf_t conf; 24 nxt_conn_t *conn; 25 nxt_queue_link_t link; 26} nxt_controller_request_t; 27 28 29typedef struct { 30 nxt_uint_t status; 31 nxt_conf_value_t *conf; 32 33 u_char *title; 34 nxt_str_t detail; 35 ssize_t offset; 36 nxt_uint_t line; 37 nxt_uint_t column; 38} nxt_controller_response_t; 39 40 41static void nxt_controller_process_new_port_handler(nxt_task_t *task, 42 nxt_port_recv_msg_t *msg); 43static nxt_int_t nxt_controller_conf_default(void); 44static void nxt_controller_conf_init_handler(nxt_task_t *task, 45 nxt_port_recv_msg_t *msg, void *data); 46static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, 47 nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); 48 49static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); 50static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); 51static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c, 52 uintptr_t data); 53static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj, 54 void *data); 55static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, 56 void *data); 57static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj, 58 void *data); 59static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data); 60static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj, 61 void *data); 62static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, 63 void *data); 64static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); 65static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); 66 67static nxt_int_t nxt_controller_request_content_length(void *ctx,
|
68 nxt_http_field_t *field, nxt_log_t *log);
| 68 nxt_http_field_t *field, uintptr_t data);
|
69 70static void nxt_controller_process_request(nxt_task_t *task, 71 nxt_controller_request_t *req); 72static void nxt_controller_conf_handler(nxt_task_t *task, 73 nxt_port_recv_msg_t *msg, void *data); 74static void nxt_controller_conf_store(nxt_task_t *task, 75 nxt_conf_value_t *conf); 76static void nxt_controller_response(nxt_task_t *task, 77 nxt_controller_request_t *req, nxt_controller_response_t *resp); 78static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 79 struct tm *tm, size_t size, const char *format); 80 81
| 69 70static void nxt_controller_process_request(nxt_task_t *task, 71 nxt_controller_request_t *req); 72static void nxt_controller_conf_handler(nxt_task_t *task, 73 nxt_port_recv_msg_t *msg, void *data); 74static void nxt_controller_conf_store(nxt_task_t *task, 75 nxt_conf_value_t *conf); 76static void nxt_controller_response(nxt_task_t *task, 77 nxt_controller_request_t *req, nxt_controller_response_t *resp); 78static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now, 79 struct tm *tm, size_t size, const char *format); 80 81
|
82static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
| 82static nxt_http_field_proc_t nxt_controller_request_fields[] = {
|
83 { nxt_string("Content-Length"), 84 &nxt_controller_request_content_length, 0 },
| 83 { nxt_string("Content-Length"), 84 &nxt_controller_request_content_length, 0 },
|
85 86 { nxt_null_string, NULL, 0 }
| |
87}; 88
| 85}; 86
|
89static nxt_http_fields_hash_t *nxt_controller_fields_hash;
| 87static nxt_lvlhsh_t nxt_controller_fields_hash;
|
90 91static nxt_uint_t nxt_controller_listening; 92static nxt_controller_conf_t nxt_controller_conf; 93static nxt_queue_t nxt_controller_waiting_requests; 94 95 96static const nxt_event_conn_state_t nxt_controller_conn_read_state; 97static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 98static const nxt_event_conn_state_t nxt_controller_conn_write_state; 99static const nxt_event_conn_state_t nxt_controller_conn_close_state; 100 101 102nxt_port_handlers_t nxt_controller_process_port_handlers = { 103 .quit = nxt_worker_process_quit_handler, 104 .new_port = nxt_controller_process_new_port_handler, 105 .change_file = nxt_port_change_log_file_handler, 106 .mmap = nxt_port_mmap_handler, 107 .data = nxt_port_data_handler, 108 .remove_pid = nxt_port_remove_pid_handler, 109 .rpc_ready = nxt_port_rpc_handler, 110 .rpc_error = nxt_port_rpc_handler, 111}; 112 113 114nxt_int_t 115nxt_controller_start(nxt_task_t *task, void *data) 116{
| 88 89static nxt_uint_t nxt_controller_listening; 90static nxt_controller_conf_t nxt_controller_conf; 91static nxt_queue_t nxt_controller_waiting_requests; 92 93 94static const nxt_event_conn_state_t nxt_controller_conn_read_state; 95static const nxt_event_conn_state_t nxt_controller_conn_body_read_state; 96static const nxt_event_conn_state_t nxt_controller_conn_write_state; 97static const nxt_event_conn_state_t nxt_controller_conn_close_state; 98 99 100nxt_port_handlers_t nxt_controller_process_port_handlers = { 101 .quit = nxt_worker_process_quit_handler, 102 .new_port = nxt_controller_process_new_port_handler, 103 .change_file = nxt_port_change_log_file_handler, 104 .mmap = nxt_port_mmap_handler, 105 .data = nxt_port_data_handler, 106 .remove_pid = nxt_port_remove_pid_handler, 107 .rpc_ready = nxt_port_rpc_handler, 108 .rpc_error = nxt_port_rpc_handler, 109}; 110 111 112nxt_int_t 113nxt_controller_start(nxt_task_t *task, void *data) 114{
|
117 nxt_mp_t *mp; 118 nxt_int_t ret; 119 nxt_str_t *json; 120 nxt_runtime_t *rt; 121 nxt_conf_value_t *conf; 122 nxt_event_engine_t *engine; 123 nxt_conf_validation_t vldt; 124 nxt_http_fields_hash_t *hash;
| 115 nxt_mp_t *mp; 116 nxt_int_t ret; 117 nxt_str_t *json; 118 nxt_runtime_t *rt; 119 nxt_conf_value_t *conf; 120 nxt_event_engine_t *engine; 121 nxt_conf_validation_t vldt;
|
125 126 rt = task->thread->runtime; 127 128 engine = task->thread->engine; 129 130 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 131 if (nxt_slow_path(engine->mem_pool == NULL)) { 132 return NXT_ERROR; 133 } 134
| 122 123 rt = task->thread->runtime; 124 125 engine = task->thread->engine; 126 127 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 128 if (nxt_slow_path(engine->mem_pool == NULL)) { 129 return NXT_ERROR; 130 } 131
|
135 hash = nxt_http_fields_hash_create(nxt_controller_request_fields, 136 rt->mem_pool); 137 if (nxt_slow_path(hash == NULL)) {
| 132 ret = nxt_http_fields_hash(&nxt_controller_fields_hash, rt->mem_pool, 133 nxt_controller_request_fields, 134 nxt_nitems(nxt_controller_request_fields)); 135 136 if (nxt_slow_path(ret != NXT_OK)) {
|
138 return NXT_ERROR; 139 } 140
| 137 return NXT_ERROR; 138 } 139
|
141 nxt_controller_fields_hash = hash;
| |
142 nxt_queue_init(&nxt_controller_waiting_requests); 143 144 json = data; 145 146 if (json->length == 0) { 147 return NXT_OK; 148 } 149 150 mp = nxt_mp_create(1024, 128, 256, 32); 151 if (nxt_slow_path(mp == NULL)) { 152 return NXT_ERROR; 153 } 154 155 conf = nxt_conf_json_parse_str(mp, json); 156 nxt_free(json->start); 157 158 if (nxt_slow_path(conf == NULL)) { 159 nxt_log(task, NXT_LOG_ALERT, 160 "failed to restore previous configuration: " 161 "file is corrupted or not enough memory"); 162 163 nxt_mp_destroy(mp); 164 return NXT_OK; 165 } 166 167 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 168 169 vldt.pool = nxt_mp_create(1024, 128, 256, 32); 170 if (nxt_slow_path(vldt.pool == NULL)) { 171 return NXT_ERROR; 172 } 173 174 vldt.conf = conf; 175 176 ret = nxt_conf_validate(&vldt); 177 178 if (nxt_slow_path(ret != NXT_OK)) { 179 180 if (ret == NXT_DECLINED) { 181 nxt_log(task, NXT_LOG_ALERT, 182 "the previous configuration is invalid: %V", &vldt.error); 183 184 nxt_mp_destroy(vldt.pool); 185 nxt_mp_destroy(mp); 186 187 return NXT_OK; 188 } 189 190 /* ret == NXT_ERROR */ 191 192 return NXT_ERROR; 193 } 194 195 nxt_mp_destroy(vldt.pool); 196 197 nxt_controller_conf.root = conf; 198 nxt_controller_conf.pool = mp; 199 200 return NXT_OK; 201} 202 203 204static void 205nxt_controller_process_new_port_handler(nxt_task_t *task, 206 nxt_port_recv_msg_t *msg) 207{ 208 nxt_int_t rc; 209 nxt_runtime_t *rt; 210 nxt_conf_value_t *conf; 211 212 nxt_port_new_port_handler(task, msg); 213 214 if (msg->u.new_port->type != NXT_PROCESS_ROUTER) { 215 return; 216 } 217 218 conf = nxt_controller_conf.root; 219 220 if (conf != NULL) { 221 rc = nxt_controller_conf_send(task, conf, 222 nxt_controller_conf_init_handler, NULL); 223 224 if (nxt_fast_path(rc == NXT_OK)) { 225 return; 226 } 227 228 nxt_mp_destroy(nxt_controller_conf.pool); 229 230 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 231 nxt_abort(); 232 } 233 } 234 235 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 236 nxt_abort(); 237 } 238 239 rt = task->thread->runtime; 240 241 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { 242 nxt_abort(); 243 } 244 245 nxt_controller_listening = 1; 246} 247 248 249static nxt_int_t 250nxt_controller_conf_default(void) 251{ 252 nxt_mp_t *mp; 253 nxt_conf_value_t *conf; 254 255 static const nxt_str_t json 256 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 257 258 mp = nxt_mp_create(1024, 128, 256, 32); 259 260 if (nxt_slow_path(mp == NULL)) { 261 return NXT_ERROR; 262 } 263 264 conf = nxt_conf_json_parse_str(mp, &json); 265 266 if (nxt_slow_path(conf == NULL)) { 267 return NXT_ERROR; 268 } 269 270 nxt_controller_conf.root = conf; 271 nxt_controller_conf.pool = mp; 272 273 return NXT_OK; 274} 275 276 277static void 278nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 279 void *data) 280{ 281 nxt_runtime_t *rt; 282 283 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { 284 nxt_log(task, NXT_LOG_ALERT, "failed to apply previous configuration"); 285 286 nxt_mp_destroy(nxt_controller_conf.pool); 287 288 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 289 nxt_abort(); 290 } 291 } 292 293 if (nxt_controller_listening == 0) { 294 rt = task->thread->runtime; 295 296 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) 297 == NULL)) 298 { 299 nxt_abort(); 300 } 301 302 nxt_controller_listening = 1; 303 } 304} 305 306 307static nxt_int_t 308nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, 309 nxt_port_rpc_handler_t handler, void *data) 310{ 311 size_t size; 312 uint32_t stream; 313 nxt_int_t rc; 314 nxt_buf_t *b; 315 nxt_port_t *router_port, *controller_port; 316 nxt_runtime_t *rt; 317 318 rt = task->thread->runtime; 319 320 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 321 322 if (nxt_slow_path(router_port == NULL)) { 323 return NXT_DECLINED; 324 } 325 326 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 327 328 size = nxt_conf_json_length(conf, NULL); 329 330 b = nxt_port_mmap_get_buf(task, router_port, size); 331 if (nxt_slow_path(b == NULL)) { 332 return NXT_ERROR; 333 } 334 335 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 336 337 stream = nxt_port_rpc_register_handler(task, controller_port, 338 handler, handler, 339 router_port->pid, data); 340 341 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 342 stream, controller_port->id, b); 343 344 if (nxt_slow_path(rc != NXT_OK)) { 345 nxt_port_rpc_cancel(task, controller_port, stream); 346 return NXT_ERROR; 347 } 348 349 return NXT_OK; 350} 351 352 353nxt_int_t 354nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 355{ 356 nxt_sockaddr_t *sa; 357 nxt_listen_socket_t *ls; 358 359 sa = rt->controller_listen; 360 361 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 362 if (ls == NULL) { 363 return NXT_ERROR; 364 } 365 366 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 367 sa->socklen, sa->length); 368 if (ls->sockaddr == NULL) { 369 return NXT_ERROR; 370 } 371 372 ls->sockaddr->type = sa->type; 373 nxt_sockaddr_text(ls->sockaddr); 374 375 nxt_listen_socket_remote_size(ls); 376 377 ls->socket = -1; 378 ls->backlog = NXT_LISTEN_BACKLOG; 379 ls->read_after_accept = 1; 380 ls->flags = NXT_NONBLOCK; 381 382#if 0 383 /* STUB */ 384 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 385 if (wq == NULL) { 386 return NXT_ERROR; 387 } 388 nxt_work_queue_name(wq, "listen"); 389 /**/ 390 391 ls->work_queue = wq; 392#endif 393 ls->handler = nxt_controller_conn_init; 394 395 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 396 return NXT_ERROR; 397 } 398 399 rt->controller_socket = ls; 400 401 return NXT_OK; 402} 403 404 405static void 406nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 407{ 408 nxt_buf_t *b; 409 nxt_conn_t *c; 410 nxt_event_engine_t *engine; 411 nxt_controller_request_t *r; 412 413 c = obj; 414 415 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 416 417 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 418 if (nxt_slow_path(r == NULL)) { 419 nxt_controller_conn_free(task, c, NULL); 420 return; 421 } 422 423 r->conn = c; 424 425 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 426 != NXT_OK)) 427 { 428 nxt_controller_conn_free(task, c, NULL); 429 return; 430 } 431
| 140 nxt_queue_init(&nxt_controller_waiting_requests); 141 142 json = data; 143 144 if (json->length == 0) { 145 return NXT_OK; 146 } 147 148 mp = nxt_mp_create(1024, 128, 256, 32); 149 if (nxt_slow_path(mp == NULL)) { 150 return NXT_ERROR; 151 } 152 153 conf = nxt_conf_json_parse_str(mp, json); 154 nxt_free(json->start); 155 156 if (nxt_slow_path(conf == NULL)) { 157 nxt_log(task, NXT_LOG_ALERT, 158 "failed to restore previous configuration: " 159 "file is corrupted or not enough memory"); 160 161 nxt_mp_destroy(mp); 162 return NXT_OK; 163 } 164 165 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 166 167 vldt.pool = nxt_mp_create(1024, 128, 256, 32); 168 if (nxt_slow_path(vldt.pool == NULL)) { 169 return NXT_ERROR; 170 } 171 172 vldt.conf = conf; 173 174 ret = nxt_conf_validate(&vldt); 175 176 if (nxt_slow_path(ret != NXT_OK)) { 177 178 if (ret == NXT_DECLINED) { 179 nxt_log(task, NXT_LOG_ALERT, 180 "the previous configuration is invalid: %V", &vldt.error); 181 182 nxt_mp_destroy(vldt.pool); 183 nxt_mp_destroy(mp); 184 185 return NXT_OK; 186 } 187 188 /* ret == NXT_ERROR */ 189 190 return NXT_ERROR; 191 } 192 193 nxt_mp_destroy(vldt.pool); 194 195 nxt_controller_conf.root = conf; 196 nxt_controller_conf.pool = mp; 197 198 return NXT_OK; 199} 200 201 202static void 203nxt_controller_process_new_port_handler(nxt_task_t *task, 204 nxt_port_recv_msg_t *msg) 205{ 206 nxt_int_t rc; 207 nxt_runtime_t *rt; 208 nxt_conf_value_t *conf; 209 210 nxt_port_new_port_handler(task, msg); 211 212 if (msg->u.new_port->type != NXT_PROCESS_ROUTER) { 213 return; 214 } 215 216 conf = nxt_controller_conf.root; 217 218 if (conf != NULL) { 219 rc = nxt_controller_conf_send(task, conf, 220 nxt_controller_conf_init_handler, NULL); 221 222 if (nxt_fast_path(rc == NXT_OK)) { 223 return; 224 } 225 226 nxt_mp_destroy(nxt_controller_conf.pool); 227 228 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 229 nxt_abort(); 230 } 231 } 232 233 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 234 nxt_abort(); 235 } 236 237 rt = task->thread->runtime; 238 239 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) { 240 nxt_abort(); 241 } 242 243 nxt_controller_listening = 1; 244} 245 246 247static nxt_int_t 248nxt_controller_conf_default(void) 249{ 250 nxt_mp_t *mp; 251 nxt_conf_value_t *conf; 252 253 static const nxt_str_t json 254 = nxt_string("{ \"listeners\": {}, \"applications\": {} }"); 255 256 mp = nxt_mp_create(1024, 128, 256, 32); 257 258 if (nxt_slow_path(mp == NULL)) { 259 return NXT_ERROR; 260 } 261 262 conf = nxt_conf_json_parse_str(mp, &json); 263 264 if (nxt_slow_path(conf == NULL)) { 265 return NXT_ERROR; 266 } 267 268 nxt_controller_conf.root = conf; 269 nxt_controller_conf.pool = mp; 270 271 return NXT_OK; 272} 273 274 275static void 276nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 277 void *data) 278{ 279 nxt_runtime_t *rt; 280 281 if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) { 282 nxt_log(task, NXT_LOG_ALERT, "failed to apply previous configuration"); 283 284 nxt_mp_destroy(nxt_controller_conf.pool); 285 286 if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) { 287 nxt_abort(); 288 } 289 } 290 291 if (nxt_controller_listening == 0) { 292 rt = task->thread->runtime; 293 294 if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) 295 == NULL)) 296 { 297 nxt_abort(); 298 } 299 300 nxt_controller_listening = 1; 301 } 302} 303 304 305static nxt_int_t 306nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, 307 nxt_port_rpc_handler_t handler, void *data) 308{ 309 size_t size; 310 uint32_t stream; 311 nxt_int_t rc; 312 nxt_buf_t *b; 313 nxt_port_t *router_port, *controller_port; 314 nxt_runtime_t *rt; 315 316 rt = task->thread->runtime; 317 318 router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; 319 320 if (nxt_slow_path(router_port == NULL)) { 321 return NXT_DECLINED; 322 } 323 324 controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; 325 326 size = nxt_conf_json_length(conf, NULL); 327 328 b = nxt_port_mmap_get_buf(task, router_port, size); 329 if (nxt_slow_path(b == NULL)) { 330 return NXT_ERROR; 331 } 332 333 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 334 335 stream = nxt_port_rpc_register_handler(task, controller_port, 336 handler, handler, 337 router_port->pid, data); 338 339 rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, 340 stream, controller_port->id, b); 341 342 if (nxt_slow_path(rc != NXT_OK)) { 343 nxt_port_rpc_cancel(task, controller_port, stream); 344 return NXT_ERROR; 345 } 346 347 return NXT_OK; 348} 349 350 351nxt_int_t 352nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt) 353{ 354 nxt_sockaddr_t *sa; 355 nxt_listen_socket_t *ls; 356 357 sa = rt->controller_listen; 358 359 ls = nxt_mp_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t)); 360 if (ls == NULL) { 361 return NXT_ERROR; 362 } 363 364 ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr, 365 sa->socklen, sa->length); 366 if (ls->sockaddr == NULL) { 367 return NXT_ERROR; 368 } 369 370 ls->sockaddr->type = sa->type; 371 nxt_sockaddr_text(ls->sockaddr); 372 373 nxt_listen_socket_remote_size(ls); 374 375 ls->socket = -1; 376 ls->backlog = NXT_LISTEN_BACKLOG; 377 ls->read_after_accept = 1; 378 ls->flags = NXT_NONBLOCK; 379 380#if 0 381 /* STUB */ 382 wq = nxt_mp_zget(cf->mem_pool, sizeof(nxt_work_queue_t)); 383 if (wq == NULL) { 384 return NXT_ERROR; 385 } 386 nxt_work_queue_name(wq, "listen"); 387 /**/ 388 389 ls->work_queue = wq; 390#endif 391 ls->handler = nxt_controller_conn_init; 392 393 if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) { 394 return NXT_ERROR; 395 } 396 397 rt->controller_socket = ls; 398 399 return NXT_OK; 400} 401 402 403static void 404nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) 405{ 406 nxt_buf_t *b; 407 nxt_conn_t *c; 408 nxt_event_engine_t *engine; 409 nxt_controller_request_t *r; 410 411 c = obj; 412 413 nxt_debug(task, "controller conn init fd:%d", c->socket.fd); 414 415 r = nxt_mp_zget(c->mem_pool, sizeof(nxt_controller_request_t)); 416 if (nxt_slow_path(r == NULL)) { 417 nxt_controller_conn_free(task, c, NULL); 418 return; 419 } 420 421 r->conn = c; 422 423 if (nxt_slow_path(nxt_http_parse_request_init(&r->parser, c->mem_pool) 424 != NXT_OK)) 425 { 426 nxt_controller_conn_free(task, c, NULL); 427 return; 428 } 429
|
432 r->parser.fields_hash = nxt_controller_fields_hash; 433
| |
434 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 435 if (nxt_slow_path(b == NULL)) { 436 nxt_controller_conn_free(task, c, NULL); 437 return; 438 } 439 440 c->read = b; 441 c->socket.data = r; 442 c->socket.read_ready = 1; 443 c->read_state = &nxt_controller_conn_read_state; 444 445 engine = task->thread->engine; 446 c->read_work_queue = &engine->read_work_queue; 447 c->write_work_queue = &engine->write_work_queue; 448 449 nxt_conn_read(engine, c); 450} 451 452 453static const nxt_event_conn_state_t nxt_controller_conn_read_state 454 nxt_aligned(64) = 455{ 456 .ready_handler = nxt_controller_conn_read, 457 .close_handler = nxt_controller_conn_close, 458 .error_handler = nxt_controller_conn_read_error, 459 460 .timer_handler = nxt_controller_conn_read_timeout, 461 .timer_value = nxt_controller_conn_timeout_value, 462 .timer_data = 60 * 1000, 463}; 464 465 466static void 467nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 468{ 469 size_t preread; 470 nxt_buf_t *b; 471 nxt_int_t rc; 472 nxt_conn_t *c; 473 nxt_controller_request_t *r; 474 475 c = obj; 476 r = data; 477 478 nxt_debug(task, "controller conn read"); 479 480 nxt_queue_remove(&c->link); 481 nxt_queue_self(&c->link); 482 483 b = c->read; 484 485 rc = nxt_http_parse_request(&r->parser, &b->mem); 486 487 if (nxt_slow_path(rc != NXT_DONE)) { 488 489 if (rc == NXT_AGAIN) { 490 if (nxt_buf_mem_free_size(&b->mem) == 0) { 491 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 492 nxt_controller_conn_close(task, c, r); 493 return; 494 } 495 496 nxt_conn_read(task->thread->engine, c); 497 return; 498 } 499 500 /* rc == NXT_ERROR */ 501 502 nxt_log(task, NXT_LOG_ERR, "parsing error"); 503 504 nxt_controller_conn_close(task, c, r); 505 return; 506 } 507
| 430 b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); 431 if (nxt_slow_path(b == NULL)) { 432 nxt_controller_conn_free(task, c, NULL); 433 return; 434 } 435 436 c->read = b; 437 c->socket.data = r; 438 c->socket.read_ready = 1; 439 c->read_state = &nxt_controller_conn_read_state; 440 441 engine = task->thread->engine; 442 c->read_work_queue = &engine->read_work_queue; 443 c->write_work_queue = &engine->write_work_queue; 444 445 nxt_conn_read(engine, c); 446} 447 448 449static const nxt_event_conn_state_t nxt_controller_conn_read_state 450 nxt_aligned(64) = 451{ 452 .ready_handler = nxt_controller_conn_read, 453 .close_handler = nxt_controller_conn_close, 454 .error_handler = nxt_controller_conn_read_error, 455 456 .timer_handler = nxt_controller_conn_read_timeout, 457 .timer_value = nxt_controller_conn_timeout_value, 458 .timer_data = 60 * 1000, 459}; 460 461 462static void 463nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) 464{ 465 size_t preread; 466 nxt_buf_t *b; 467 nxt_int_t rc; 468 nxt_conn_t *c; 469 nxt_controller_request_t *r; 470 471 c = obj; 472 r = data; 473 474 nxt_debug(task, "controller conn read"); 475 476 nxt_queue_remove(&c->link); 477 nxt_queue_self(&c->link); 478 479 b = c->read; 480 481 rc = nxt_http_parse_request(&r->parser, &b->mem); 482 483 if (nxt_slow_path(rc != NXT_DONE)) { 484 485 if (rc == NXT_AGAIN) { 486 if (nxt_buf_mem_free_size(&b->mem) == 0) { 487 nxt_log(task, NXT_LOG_ERR, "too long request headers"); 488 nxt_controller_conn_close(task, c, r); 489 return; 490 } 491 492 nxt_conn_read(task->thread->engine, c); 493 return; 494 } 495 496 /* rc == NXT_ERROR */ 497 498 nxt_log(task, NXT_LOG_ERR, "parsing error"); 499 500 nxt_controller_conn_close(task, c, r); 501 return; 502 } 503
|
508 rc = nxt_http_fields_process(r->parser.fields, r, task->log);
| 504 rc = nxt_http_fields_process(r->parser.fields, &nxt_controller_fields_hash, 505 r);
|
509 510 if (nxt_slow_path(rc != NXT_OK)) { 511 nxt_controller_conn_close(task, c, r); 512 return; 513 } 514 515 preread = nxt_buf_mem_used_size(&b->mem); 516 517 nxt_debug(task, "controller request header parsing complete, " 518 "body length: %uz, preread: %uz", 519 r->length, preread); 520 521 if (preread >= r->length) { 522 nxt_controller_process_request(task, r); 523 return; 524 } 525 526 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 527 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 528 if (nxt_slow_path(b == NULL)) { 529 nxt_controller_conn_free(task, c, NULL); 530 return; 531 } 532 533 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 534 535 c->read = b; 536 } 537 538 c->read_state = &nxt_controller_conn_body_read_state; 539 540 nxt_conn_read(task->thread->engine, c); 541} 542 543 544static nxt_msec_t 545nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 546{ 547 return (nxt_msec_t) data; 548} 549 550 551static void 552nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 553{ 554 nxt_conn_t *c; 555 556 c = obj; 557 558 nxt_debug(task, "controller conn read error"); 559 560 nxt_controller_conn_close(task, c, data); 561} 562 563 564static void 565nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 566{ 567 nxt_timer_t *timer; 568 nxt_conn_t *c; 569 570 timer = obj; 571 572 c = nxt_read_timer_conn(timer); 573 c->socket.timedout = 1; 574 c->socket.closed = 1; 575 576 nxt_debug(task, "controller conn read timeout"); 577 578 nxt_controller_conn_close(task, c, data); 579} 580 581 582static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 583 nxt_aligned(64) = 584{ 585 .ready_handler = nxt_controller_conn_body_read, 586 .close_handler = nxt_controller_conn_close, 587 .error_handler = nxt_controller_conn_read_error, 588 589 .timer_handler = nxt_controller_conn_read_timeout, 590 .timer_value = nxt_controller_conn_timeout_value, 591 .timer_data = 60 * 1000, 592 .timer_autoreset = 1, 593}; 594 595 596static void 597nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 598{ 599 size_t read; 600 nxt_buf_t *b; 601 nxt_conn_t *c; 602 nxt_controller_request_t *r; 603 604 c = obj; 605 r = data; 606 b = c->read; 607 608 read = nxt_buf_mem_used_size(&b->mem); 609 610 nxt_debug(task, "controller conn body read: %uz of %uz", 611 read, r->length); 612 613 if (read >= r->length) { 614 nxt_controller_process_request(task, r); 615 return; 616 } 617 618 nxt_conn_read(task->thread->engine, c); 619} 620 621 622static const nxt_event_conn_state_t nxt_controller_conn_write_state 623 nxt_aligned(64) = 624{ 625 .ready_handler = nxt_controller_conn_write, 626 .error_handler = nxt_controller_conn_write_error, 627 628 .timer_handler = nxt_controller_conn_write_timeout, 629 .timer_value = nxt_controller_conn_timeout_value, 630 .timer_data = 60 * 1000, 631 .timer_autoreset = 1, 632}; 633 634 635static void 636nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 637{ 638 nxt_buf_t *b; 639 nxt_conn_t *c; 640 641 c = obj; 642 643 nxt_debug(task, "controller conn write"); 644 645 b = c->write; 646 647 if (b->mem.pos != b->mem.free) { 648 nxt_conn_write(task->thread->engine, c); 649 return; 650 } 651 652 nxt_debug(task, "controller conn write complete"); 653 654 nxt_controller_conn_close(task, c, data); 655} 656 657 658static void 659nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 660{ 661 nxt_conn_t *c; 662 663 c = obj; 664 665 nxt_debug(task, "controller conn write error"); 666 667 nxt_controller_conn_close(task, c, data); 668} 669 670 671static void 672nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 673{ 674 nxt_conn_t *c; 675 nxt_timer_t *timer; 676 677 timer = obj; 678 679 c = nxt_write_timer_conn(timer); 680 c->socket.timedout = 1; 681 c->socket.closed = 1; 682 683 nxt_debug(task, "controller conn write timeout"); 684 685 nxt_controller_conn_close(task, c, data); 686} 687 688 689static const nxt_event_conn_state_t nxt_controller_conn_close_state 690 nxt_aligned(64) = 691{ 692 .ready_handler = nxt_controller_conn_free, 693}; 694 695 696static void 697nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 698{ 699 nxt_conn_t *c; 700 701 c = obj; 702 703 nxt_debug(task, "controller conn close"); 704 705 nxt_queue_remove(&c->link); 706 707 c->write_state = &nxt_controller_conn_close_state; 708 709 nxt_conn_close(task->thread->engine, c); 710} 711 712 713static void 714nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 715{ 716 nxt_conn_t *c; 717 718 c = obj; 719 720 nxt_debug(task, "controller conn free"); 721 722 nxt_sockaddr_cache_free(task->thread->engine, c); 723 724 nxt_conn_free(task, c); 725} 726 727 728static nxt_int_t 729nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
| 506 507 if (nxt_slow_path(rc != NXT_OK)) { 508 nxt_controller_conn_close(task, c, r); 509 return; 510 } 511 512 preread = nxt_buf_mem_used_size(&b->mem); 513 514 nxt_debug(task, "controller request header parsing complete, " 515 "body length: %uz, preread: %uz", 516 r->length, preread); 517 518 if (preread >= r->length) { 519 nxt_controller_process_request(task, r); 520 return; 521 } 522 523 if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) { 524 b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0); 525 if (nxt_slow_path(b == NULL)) { 526 nxt_controller_conn_free(task, c, NULL); 527 return; 528 } 529 530 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread); 531 532 c->read = b; 533 } 534 535 c->read_state = &nxt_controller_conn_body_read_state; 536 537 nxt_conn_read(task->thread->engine, c); 538} 539 540 541static nxt_msec_t 542nxt_controller_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 543{ 544 return (nxt_msec_t) data; 545} 546 547 548static void 549nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data) 550{ 551 nxt_conn_t *c; 552 553 c = obj; 554 555 nxt_debug(task, "controller conn read error"); 556 557 nxt_controller_conn_close(task, c, data); 558} 559 560 561static void 562nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data) 563{ 564 nxt_timer_t *timer; 565 nxt_conn_t *c; 566 567 timer = obj; 568 569 c = nxt_read_timer_conn(timer); 570 c->socket.timedout = 1; 571 c->socket.closed = 1; 572 573 nxt_debug(task, "controller conn read timeout"); 574 575 nxt_controller_conn_close(task, c, data); 576} 577 578 579static const nxt_event_conn_state_t nxt_controller_conn_body_read_state 580 nxt_aligned(64) = 581{ 582 .ready_handler = nxt_controller_conn_body_read, 583 .close_handler = nxt_controller_conn_close, 584 .error_handler = nxt_controller_conn_read_error, 585 586 .timer_handler = nxt_controller_conn_read_timeout, 587 .timer_value = nxt_controller_conn_timeout_value, 588 .timer_data = 60 * 1000, 589 .timer_autoreset = 1, 590}; 591 592 593static void 594nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data) 595{ 596 size_t read; 597 nxt_buf_t *b; 598 nxt_conn_t *c; 599 nxt_controller_request_t *r; 600 601 c = obj; 602 r = data; 603 b = c->read; 604 605 read = nxt_buf_mem_used_size(&b->mem); 606 607 nxt_debug(task, "controller conn body read: %uz of %uz", 608 read, r->length); 609 610 if (read >= r->length) { 611 nxt_controller_process_request(task, r); 612 return; 613 } 614 615 nxt_conn_read(task->thread->engine, c); 616} 617 618 619static const nxt_event_conn_state_t nxt_controller_conn_write_state 620 nxt_aligned(64) = 621{ 622 .ready_handler = nxt_controller_conn_write, 623 .error_handler = nxt_controller_conn_write_error, 624 625 .timer_handler = nxt_controller_conn_write_timeout, 626 .timer_value = nxt_controller_conn_timeout_value, 627 .timer_data = 60 * 1000, 628 .timer_autoreset = 1, 629}; 630 631 632static void 633nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data) 634{ 635 nxt_buf_t *b; 636 nxt_conn_t *c; 637 638 c = obj; 639 640 nxt_debug(task, "controller conn write"); 641 642 b = c->write; 643 644 if (b->mem.pos != b->mem.free) { 645 nxt_conn_write(task->thread->engine, c); 646 return; 647 } 648 649 nxt_debug(task, "controller conn write complete"); 650 651 nxt_controller_conn_close(task, c, data); 652} 653 654 655static void 656nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data) 657{ 658 nxt_conn_t *c; 659 660 c = obj; 661 662 nxt_debug(task, "controller conn write error"); 663 664 nxt_controller_conn_close(task, c, data); 665} 666 667 668static void 669nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data) 670{ 671 nxt_conn_t *c; 672 nxt_timer_t *timer; 673 674 timer = obj; 675 676 c = nxt_write_timer_conn(timer); 677 c->socket.timedout = 1; 678 c->socket.closed = 1; 679 680 nxt_debug(task, "controller conn write timeout"); 681 682 nxt_controller_conn_close(task, c, data); 683} 684 685 686static const nxt_event_conn_state_t nxt_controller_conn_close_state 687 nxt_aligned(64) = 688{ 689 .ready_handler = nxt_controller_conn_free, 690}; 691 692 693static void 694nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data) 695{ 696 nxt_conn_t *c; 697 698 c = obj; 699 700 nxt_debug(task, "controller conn close"); 701 702 nxt_queue_remove(&c->link); 703 704 c->write_state = &nxt_controller_conn_close_state; 705 706 nxt_conn_close(task->thread->engine, c); 707} 708 709 710static void 711nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data) 712{ 713 nxt_conn_t *c; 714 715 c = obj; 716 717 nxt_debug(task, "controller conn free"); 718 719 nxt_sockaddr_cache_free(task->thread->engine, c); 720 721 nxt_conn_free(task, c); 722} 723 724 725static nxt_int_t 726nxt_controller_request_content_length(void *ctx, nxt_http_field_t *field,
|
730 nxt_log_t *log)
| 727 uintptr_t data)
|
731{ 732 off_t length; 733 nxt_controller_request_t *r; 734 735 r = ctx; 736
| 728{ 729 off_t length; 730 nxt_controller_request_t *r; 731 732 r = ctx; 733
|
737 length = nxt_off_t_parse(field->value.start, field->value.length);
| 734 length = nxt_off_t_parse(field->value, field->value_length);
|
738 739 if (nxt_fast_path(length > 0)) { 740 741 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
| 735 736 if (nxt_fast_path(length > 0)) { 737 738 if (nxt_slow_path(length > NXT_SIZE_T_MAX)) {
|
742 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is too big");
| 739 nxt_log_error(NXT_LOG_ERR, &r->conn->log, 740 "Content-Length is too big");
|
743 return NXT_ERROR; 744 } 745 746 r->length = length; 747 return NXT_OK; 748 } 749
| 741 return NXT_ERROR; 742 } 743 744 r->length = length; 745 return NXT_OK; 746 } 747
|
750 nxt_log_error(NXT_LOG_ERR, log, "Content-Length is invalid");
| 748 nxt_log_error(NXT_LOG_ERR, &r->conn->log, "Content-Length is invalid");
|
751 752 return NXT_ERROR; 753} 754 755 756static void 757nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 758{ 759 nxt_mp_t *mp; 760 nxt_int_t rc; 761 nxt_str_t path; 762 nxt_conn_t *c; 763 nxt_buf_mem_t *mbuf; 764 nxt_conf_op_t *ops; 765 nxt_conf_value_t *value; 766 nxt_conf_validation_t vldt; 767 nxt_conf_json_error_t error; 768 nxt_controller_response_t resp; 769 770 static const nxt_str_t empty_obj = nxt_string("{}"); 771 772 c = req->conn; 773 path = req->parser.path; 774 775 if (path.length > 1 && path.start[path.length - 1] == '/') { 776 path.length--; 777 } 778 779 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 780 781 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 782 783 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 784 785 if (value == NULL) { 786 goto not_found; 787 } 788 789 resp.status = 200; 790 resp.conf = value; 791 792 nxt_controller_response(task, req, &resp); 793 return; 794 } 795 796 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 797 798 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 799 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 800 return; 801 } 802 803 mp = nxt_mp_create(1024, 128, 256, 32); 804 805 if (nxt_slow_path(mp == NULL)) { 806 goto alloc_fail; 807 } 808 809 mbuf = &c->read->mem; 810 811 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 812 813 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 814 815 if (value == NULL) { 816 nxt_mp_destroy(mp); 817 818 if (error.pos == NULL) { 819 goto alloc_fail; 820 } 821 822 resp.status = 400; 823 resp.title = (u_char *) "Invalid JSON."; 824 resp.detail.length = nxt_strlen(error.detail); 825 resp.detail.start = error.detail; 826 resp.offset = error.pos - mbuf->pos; 827 828 nxt_conf_json_position(mbuf->pos, error.pos, 829 &resp.line, &resp.column); 830 831 nxt_controller_response(task, req, &resp); 832 return; 833 } 834 835 if (path.length != 1) { 836 rc = nxt_conf_op_compile(c->mem_pool, &ops, 837 nxt_controller_conf.root, 838 &path, value); 839 840 if (rc != NXT_OK) { 841 if (rc == NXT_DECLINED) { 842 goto not_found; 843 } 844 845 goto alloc_fail; 846 } 847 848 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 849 850 if (nxt_slow_path(value == NULL)) { 851 nxt_mp_destroy(mp); 852 goto alloc_fail; 853 } 854 } 855 856 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 857 858 vldt.conf = value; 859 vldt.pool = c->mem_pool; 860 861 rc = nxt_conf_validate(&vldt); 862 863 if (nxt_slow_path(rc != NXT_OK)) { 864 nxt_mp_destroy(mp); 865 866 if (rc == NXT_DECLINED) { 867 resp.detail = vldt.error; 868 goto invalid_conf; 869 } 870 871 /* rc == NXT_ERROR */ 872 goto alloc_fail; 873 } 874 875 rc = nxt_controller_conf_send(task, value, 876 nxt_controller_conf_handler, req); 877 878 if (nxt_slow_path(rc != NXT_OK)) { 879 nxt_mp_destroy(mp); 880 881 if (rc == NXT_DECLINED) { 882 goto no_router; 883 } 884 885 /* rc == NXT_ERROR */ 886 goto alloc_fail; 887 } 888 889 req->conf.root = value; 890 req->conf.pool = mp; 891 892 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 893 894 return; 895 } 896 897 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 898 899 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 900 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 901 return; 902 } 903 904 if (path.length == 1) { 905 mp = nxt_mp_create(1024, 128, 256, 32); 906 907 if (nxt_slow_path(mp == NULL)) { 908 goto alloc_fail; 909 } 910 911 value = nxt_conf_json_parse_str(mp, &empty_obj); 912 913 } else { 914 rc = nxt_conf_op_compile(c->mem_pool, &ops, 915 nxt_controller_conf.root, 916 &path, NULL); 917 918 if (rc != NXT_OK) { 919 if (rc == NXT_DECLINED) { 920 goto not_found; 921 } 922 923 goto alloc_fail; 924 } 925 926 mp = nxt_mp_create(1024, 128, 256, 32); 927 928 if (nxt_slow_path(mp == NULL)) { 929 goto alloc_fail; 930 } 931 932 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 933 } 934 935 if (nxt_slow_path(value == NULL)) { 936 nxt_mp_destroy(mp); 937 goto alloc_fail; 938 } 939 940 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 941 942 vldt.conf = value; 943 vldt.pool = c->mem_pool; 944 945 rc = nxt_conf_validate(&vldt); 946 947 if (nxt_slow_path(rc != NXT_OK)) { 948 nxt_mp_destroy(mp); 949 950 if (rc == NXT_DECLINED) { 951 resp.detail = vldt.error; 952 goto invalid_conf; 953 } 954 955 /* rc == NXT_ERROR */ 956 goto alloc_fail; 957 } 958 959 rc = nxt_controller_conf_send(task, value, 960 nxt_controller_conf_handler, req); 961 962 if (nxt_slow_path(rc != NXT_OK)) { 963 nxt_mp_destroy(mp); 964 965 if (rc == NXT_DECLINED) { 966 goto no_router; 967 } 968 969 /* rc == NXT_ERROR */ 970 goto alloc_fail; 971 } 972 973 req->conf.root = value; 974 req->conf.pool = mp; 975 976 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 977 978 return; 979 } 980 981 resp.status = 405; 982 resp.title = (u_char *) "Invalid method."; 983 resp.offset = -1; 984 985 nxt_controller_response(task, req, &resp); 986 return; 987 988not_found: 989 990 resp.status = 404; 991 resp.title = (u_char *) "Value doesn't exist."; 992 resp.offset = -1; 993 994 nxt_controller_response(task, req, &resp); 995 return; 996 997invalid_conf: 998 999 resp.status = 400; 1000 resp.title = (u_char *) "Invalid configuration."; 1001 resp.offset = -1; 1002 1003 nxt_controller_response(task, req, &resp); 1004 return; 1005 1006alloc_fail: 1007 1008 resp.status = 500; 1009 resp.title = (u_char *) "Memory allocation failed."; 1010 resp.offset = -1; 1011 1012 nxt_controller_response(task, req, &resp); 1013 return; 1014 1015no_router: 1016 1017 resp.status = 500; 1018 resp.title = (u_char *) "Router process isn't available."; 1019 resp.offset = -1; 1020 1021 nxt_controller_response(task, req, &resp); 1022 return; 1023} 1024 1025 1026static void 1027nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1028 void *data) 1029{ 1030 nxt_queue_t queue; 1031 nxt_controller_request_t *req; 1032 nxt_controller_response_t resp; 1033 1034 req = data; 1035 1036 nxt_debug(task, "controller conf ready: %*s", 1037 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 1038 1039 nxt_queue_remove(&req->link); 1040 1041 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1042 1043 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1044 nxt_mp_destroy(nxt_controller_conf.pool); 1045 1046 nxt_controller_conf = req->conf; 1047 1048 nxt_controller_conf_store(task, req->conf.root); 1049 1050 resp.status = 200; 1051 resp.title = (u_char *) "Reconfiguration done."; 1052 1053 } else { 1054 nxt_mp_destroy(req->conf.pool); 1055 1056 resp.status = 500; 1057 resp.title = (u_char *) "Failed to apply new configuration."; 1058 resp.offset = -1; 1059 } 1060 1061 nxt_controller_response(task, req, &resp); 1062 1063 nxt_queue_init(&queue); 1064 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 1065 1066 nxt_queue_init(&nxt_controller_waiting_requests); 1067 1068 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 1069 nxt_controller_process_request(task, req); 1070 } nxt_queue_loop; 1071} 1072 1073 1074static void 1075nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) 1076{ 1077 size_t size; 1078 nxt_buf_t *b; 1079 nxt_port_t *main_port; 1080 nxt_runtime_t *rt; 1081 1082 rt = task->thread->runtime; 1083 1084 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1085 1086 size = nxt_conf_json_length(conf, NULL); 1087 1088 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 1089 1090 if (nxt_fast_path(b != NULL)) { 1091 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 1092 1093 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, 1094 -1, 0, -1, b); 1095 } 1096} 1097 1098 1099static void 1100nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 1101 nxt_controller_response_t *resp) 1102{ 1103 size_t size; 1104 nxt_str_t status_line, str; 1105 nxt_buf_t *b, *body; 1106 nxt_conn_t *c; 1107 nxt_uint_t n; 1108 nxt_conf_value_t *value, *location; 1109 nxt_conf_json_pretty_t pretty; 1110 1111 static nxt_str_t success_str = nxt_string("success"); 1112 static nxt_str_t error_str = nxt_string("error"); 1113 static nxt_str_t detail_str = nxt_string("detail"); 1114 static nxt_str_t location_str = nxt_string("location"); 1115 static nxt_str_t offset_str = nxt_string("offset"); 1116 static nxt_str_t line_str = nxt_string("line"); 1117 static nxt_str_t column_str = nxt_string("column"); 1118 1119 static nxt_time_string_t date_cache = { 1120 (nxt_atomic_uint_t) -1, 1121 nxt_controller_date, 1122 "%s, %02d %s %4d %02d:%02d:%02d GMT", 1123 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1, 1124 NXT_THREAD_TIME_GMT, 1125 NXT_THREAD_TIME_SEC, 1126 }; 1127 1128 switch (resp->status) { 1129 1130 case 200: 1131 nxt_str_set(&status_line, "200 OK"); 1132 break; 1133 1134 case 400: 1135 nxt_str_set(&status_line, "400 Bad Request"); 1136 break; 1137 1138 case 404: 1139 nxt_str_set(&status_line, "404 Not Found"); 1140 break; 1141 1142 case 405: 1143 nxt_str_set(&status_line, "405 Method Not Allowed"); 1144 break; 1145 1146 default: 1147 nxt_str_set(&status_line, "500 Internal Server Error"); 1148 break; 1149 } 1150 1151 c = req->conn; 1152 value = resp->conf; 1153 1154 if (value == NULL) { 1155 n = 1 1156 + (resp->detail.length != 0) 1157 + (resp->status >= 400 && resp->offset != -1); 1158 1159 value = nxt_conf_create_object(c->mem_pool, n); 1160 1161 if (nxt_slow_path(value == NULL)) { 1162 nxt_controller_conn_close(task, c, req); 1163 return; 1164 } 1165 1166 str.length = nxt_strlen(resp->title); 1167 str.start = resp->title; 1168 1169 if (resp->status < 400) { 1170 nxt_conf_set_member_string(value, &success_str, &str, 0); 1171 1172 } else { 1173 nxt_conf_set_member_string(value, &error_str, &str, 0); 1174 } 1175 1176 n = 0; 1177 1178 if (resp->detail.length != 0) { 1179 n++; 1180 1181 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n); 1182 } 1183 1184 if (resp->status >= 400 && resp->offset != -1) { 1185 n++; 1186 1187 location = nxt_conf_create_object(c->mem_pool, 1188 resp->line != 0 ? 3 : 1); 1189 1190 nxt_conf_set_member(value, &location_str, location, n); 1191 1192 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 1193 1194 if (resp->line != 0) { 1195 nxt_conf_set_member_integer(location, &line_str, 1196 resp->line, 1); 1197 1198 nxt_conf_set_member_integer(location, &column_str, 1199 resp->column, 2); 1200 } 1201 } 1202 } 1203 1204 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1205 1206 size = nxt_conf_json_length(value, &pretty) + 2; 1207 1208 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1209 if (nxt_slow_path(body == NULL)) { 1210 nxt_controller_conn_close(task, c, req); 1211 return; 1212 } 1213 1214 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1215 1216 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty); 1217 1218 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); 1219 1220 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length 1221 + sizeof("Server: unit/" NXT_VERSION "\r\n") - 1 1222 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 1223 + sizeof("Content-Type: application/json\r\n") - 1 1224 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN 1225 + sizeof("Connection: close\r\n") - 1 1226 + sizeof("\r\n") - 1; 1227 1228 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1229 if (nxt_slow_path(b == NULL)) { 1230 nxt_controller_conn_close(task, c, req); 1231 return; 1232 } 1233 1234 b->next = body; 1235 1236 nxt_str_set(&str, "HTTP/1.1 "); 1237 1238 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1239 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 1240 status_line.length); 1241 1242 nxt_str_set(&str, "\r\n" 1243 "Server: unit/" NXT_VERSION "\r\n" 1244 "Date: "); 1245 1246 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1247 1248 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 1249 b->mem.free); 1250 1251 nxt_str_set(&str, "\r\n" 1252 "Content-Type: application/json\r\n" 1253 "Content-Length: "); 1254 1255 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1256 1257 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 1258 nxt_buf_mem_used_size(&body->mem)); 1259 1260 nxt_str_set(&str, "\r\n" 1261 "Connection: close\r\n" 1262 "\r\n"); 1263 1264 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1265 1266 c->write = b; 1267 c->write_state = &nxt_controller_conn_write_state; 1268 1269 nxt_conn_write(task->thread->engine, c); 1270} 1271 1272 1273static u_char * 1274nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 1275 size_t size, const char *format) 1276{ 1277 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 1278 "Sat" }; 1279 1280 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1281 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1282 1283 return nxt_sprintf(buf, buf + size, format, 1284 week[tm->tm_wday], tm->tm_mday, 1285 month[tm->tm_mon], tm->tm_year + 1900, 1286 tm->tm_hour, tm->tm_min, tm->tm_sec); 1287}
| 749 750 return NXT_ERROR; 751} 752 753 754static void 755nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req) 756{ 757 nxt_mp_t *mp; 758 nxt_int_t rc; 759 nxt_str_t path; 760 nxt_conn_t *c; 761 nxt_buf_mem_t *mbuf; 762 nxt_conf_op_t *ops; 763 nxt_conf_value_t *value; 764 nxt_conf_validation_t vldt; 765 nxt_conf_json_error_t error; 766 nxt_controller_response_t resp; 767 768 static const nxt_str_t empty_obj = nxt_string("{}"); 769 770 c = req->conn; 771 path = req->parser.path; 772 773 if (path.length > 1 && path.start[path.length - 1] == '/') { 774 path.length--; 775 } 776 777 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 778 779 if (nxt_str_eq(&req->parser.method, "GET", 3)) { 780 781 value = nxt_conf_get_path(nxt_controller_conf.root, &path); 782 783 if (value == NULL) { 784 goto not_found; 785 } 786 787 resp.status = 200; 788 resp.conf = value; 789 790 nxt_controller_response(task, req, &resp); 791 return; 792 } 793 794 if (nxt_str_eq(&req->parser.method, "PUT", 3)) { 795 796 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 797 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 798 return; 799 } 800 801 mp = nxt_mp_create(1024, 128, 256, 32); 802 803 if (nxt_slow_path(mp == NULL)) { 804 goto alloc_fail; 805 } 806 807 mbuf = &c->read->mem; 808 809 nxt_memzero(&error, sizeof(nxt_conf_json_error_t)); 810 811 value = nxt_conf_json_parse(mp, mbuf->pos, mbuf->free, &error); 812 813 if (value == NULL) { 814 nxt_mp_destroy(mp); 815 816 if (error.pos == NULL) { 817 goto alloc_fail; 818 } 819 820 resp.status = 400; 821 resp.title = (u_char *) "Invalid JSON."; 822 resp.detail.length = nxt_strlen(error.detail); 823 resp.detail.start = error.detail; 824 resp.offset = error.pos - mbuf->pos; 825 826 nxt_conf_json_position(mbuf->pos, error.pos, 827 &resp.line, &resp.column); 828 829 nxt_controller_response(task, req, &resp); 830 return; 831 } 832 833 if (path.length != 1) { 834 rc = nxt_conf_op_compile(c->mem_pool, &ops, 835 nxt_controller_conf.root, 836 &path, value); 837 838 if (rc != NXT_OK) { 839 if (rc == NXT_DECLINED) { 840 goto not_found; 841 } 842 843 goto alloc_fail; 844 } 845 846 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 847 848 if (nxt_slow_path(value == NULL)) { 849 nxt_mp_destroy(mp); 850 goto alloc_fail; 851 } 852 } 853 854 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 855 856 vldt.conf = value; 857 vldt.pool = c->mem_pool; 858 859 rc = nxt_conf_validate(&vldt); 860 861 if (nxt_slow_path(rc != NXT_OK)) { 862 nxt_mp_destroy(mp); 863 864 if (rc == NXT_DECLINED) { 865 resp.detail = vldt.error; 866 goto invalid_conf; 867 } 868 869 /* rc == NXT_ERROR */ 870 goto alloc_fail; 871 } 872 873 rc = nxt_controller_conf_send(task, value, 874 nxt_controller_conf_handler, req); 875 876 if (nxt_slow_path(rc != NXT_OK)) { 877 nxt_mp_destroy(mp); 878 879 if (rc == NXT_DECLINED) { 880 goto no_router; 881 } 882 883 /* rc == NXT_ERROR */ 884 goto alloc_fail; 885 } 886 887 req->conf.root = value; 888 req->conf.pool = mp; 889 890 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 891 892 return; 893 } 894 895 if (nxt_str_eq(&req->parser.method, "DELETE", 6)) { 896 897 if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) { 898 nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link); 899 return; 900 } 901 902 if (path.length == 1) { 903 mp = nxt_mp_create(1024, 128, 256, 32); 904 905 if (nxt_slow_path(mp == NULL)) { 906 goto alloc_fail; 907 } 908 909 value = nxt_conf_json_parse_str(mp, &empty_obj); 910 911 } else { 912 rc = nxt_conf_op_compile(c->mem_pool, &ops, 913 nxt_controller_conf.root, 914 &path, NULL); 915 916 if (rc != NXT_OK) { 917 if (rc == NXT_DECLINED) { 918 goto not_found; 919 } 920 921 goto alloc_fail; 922 } 923 924 mp = nxt_mp_create(1024, 128, 256, 32); 925 926 if (nxt_slow_path(mp == NULL)) { 927 goto alloc_fail; 928 } 929 930 value = nxt_conf_clone(mp, ops, nxt_controller_conf.root); 931 } 932 933 if (nxt_slow_path(value == NULL)) { 934 nxt_mp_destroy(mp); 935 goto alloc_fail; 936 } 937 938 nxt_memzero(&vldt, sizeof(nxt_conf_validation_t)); 939 940 vldt.conf = value; 941 vldt.pool = c->mem_pool; 942 943 rc = nxt_conf_validate(&vldt); 944 945 if (nxt_slow_path(rc != NXT_OK)) { 946 nxt_mp_destroy(mp); 947 948 if (rc == NXT_DECLINED) { 949 resp.detail = vldt.error; 950 goto invalid_conf; 951 } 952 953 /* rc == NXT_ERROR */ 954 goto alloc_fail; 955 } 956 957 rc = nxt_controller_conf_send(task, value, 958 nxt_controller_conf_handler, req); 959 960 if (nxt_slow_path(rc != NXT_OK)) { 961 nxt_mp_destroy(mp); 962 963 if (rc == NXT_DECLINED) { 964 goto no_router; 965 } 966 967 /* rc == NXT_ERROR */ 968 goto alloc_fail; 969 } 970 971 req->conf.root = value; 972 req->conf.pool = mp; 973 974 nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link); 975 976 return; 977 } 978 979 resp.status = 405; 980 resp.title = (u_char *) "Invalid method."; 981 resp.offset = -1; 982 983 nxt_controller_response(task, req, &resp); 984 return; 985 986not_found: 987 988 resp.status = 404; 989 resp.title = (u_char *) "Value doesn't exist."; 990 resp.offset = -1; 991 992 nxt_controller_response(task, req, &resp); 993 return; 994 995invalid_conf: 996 997 resp.status = 400; 998 resp.title = (u_char *) "Invalid configuration."; 999 resp.offset = -1; 1000 1001 nxt_controller_response(task, req, &resp); 1002 return; 1003 1004alloc_fail: 1005 1006 resp.status = 500; 1007 resp.title = (u_char *) "Memory allocation failed."; 1008 resp.offset = -1; 1009 1010 nxt_controller_response(task, req, &resp); 1011 return; 1012 1013no_router: 1014 1015 resp.status = 500; 1016 resp.title = (u_char *) "Router process isn't available."; 1017 resp.offset = -1; 1018 1019 nxt_controller_response(task, req, &resp); 1020 return; 1021} 1022 1023 1024static void 1025nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 1026 void *data) 1027{ 1028 nxt_queue_t queue; 1029 nxt_controller_request_t *req; 1030 nxt_controller_response_t resp; 1031 1032 req = data; 1033 1034 nxt_debug(task, "controller conf ready: %*s", 1035 nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos); 1036 1037 nxt_queue_remove(&req->link); 1038 1039 nxt_memzero(&resp, sizeof(nxt_controller_response_t)); 1040 1041 if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) { 1042 nxt_mp_destroy(nxt_controller_conf.pool); 1043 1044 nxt_controller_conf = req->conf; 1045 1046 nxt_controller_conf_store(task, req->conf.root); 1047 1048 resp.status = 200; 1049 resp.title = (u_char *) "Reconfiguration done."; 1050 1051 } else { 1052 nxt_mp_destroy(req->conf.pool); 1053 1054 resp.status = 500; 1055 resp.title = (u_char *) "Failed to apply new configuration."; 1056 resp.offset = -1; 1057 } 1058 1059 nxt_controller_response(task, req, &resp); 1060 1061 nxt_queue_init(&queue); 1062 nxt_queue_add(&queue, &nxt_controller_waiting_requests); 1063 1064 nxt_queue_init(&nxt_controller_waiting_requests); 1065 1066 nxt_queue_each(req, &queue, nxt_controller_request_t, link) { 1067 nxt_controller_process_request(task, req); 1068 } nxt_queue_loop; 1069} 1070 1071 1072static void 1073nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) 1074{ 1075 size_t size; 1076 nxt_buf_t *b; 1077 nxt_port_t *main_port; 1078 nxt_runtime_t *rt; 1079 1080 rt = task->thread->runtime; 1081 1082 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 1083 1084 size = nxt_conf_json_length(conf, NULL); 1085 1086 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); 1087 1088 if (nxt_fast_path(b != NULL)) { 1089 b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); 1090 1091 (void) nxt_port_socket_write(task, main_port, NXT_PORT_MSG_CONF_STORE, 1092 -1, 0, -1, b); 1093 } 1094} 1095 1096 1097static void 1098nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req, 1099 nxt_controller_response_t *resp) 1100{ 1101 size_t size; 1102 nxt_str_t status_line, str; 1103 nxt_buf_t *b, *body; 1104 nxt_conn_t *c; 1105 nxt_uint_t n; 1106 nxt_conf_value_t *value, *location; 1107 nxt_conf_json_pretty_t pretty; 1108 1109 static nxt_str_t success_str = nxt_string("success"); 1110 static nxt_str_t error_str = nxt_string("error"); 1111 static nxt_str_t detail_str = nxt_string("detail"); 1112 static nxt_str_t location_str = nxt_string("location"); 1113 static nxt_str_t offset_str = nxt_string("offset"); 1114 static nxt_str_t line_str = nxt_string("line"); 1115 static nxt_str_t column_str = nxt_string("column"); 1116 1117 static nxt_time_string_t date_cache = { 1118 (nxt_atomic_uint_t) -1, 1119 nxt_controller_date, 1120 "%s, %02d %s %4d %02d:%02d:%02d GMT", 1121 sizeof("Wed, 31 Dec 1986 16:40:00 GMT") - 1, 1122 NXT_THREAD_TIME_GMT, 1123 NXT_THREAD_TIME_SEC, 1124 }; 1125 1126 switch (resp->status) { 1127 1128 case 200: 1129 nxt_str_set(&status_line, "200 OK"); 1130 break; 1131 1132 case 400: 1133 nxt_str_set(&status_line, "400 Bad Request"); 1134 break; 1135 1136 case 404: 1137 nxt_str_set(&status_line, "404 Not Found"); 1138 break; 1139 1140 case 405: 1141 nxt_str_set(&status_line, "405 Method Not Allowed"); 1142 break; 1143 1144 default: 1145 nxt_str_set(&status_line, "500 Internal Server Error"); 1146 break; 1147 } 1148 1149 c = req->conn; 1150 value = resp->conf; 1151 1152 if (value == NULL) { 1153 n = 1 1154 + (resp->detail.length != 0) 1155 + (resp->status >= 400 && resp->offset != -1); 1156 1157 value = nxt_conf_create_object(c->mem_pool, n); 1158 1159 if (nxt_slow_path(value == NULL)) { 1160 nxt_controller_conn_close(task, c, req); 1161 return; 1162 } 1163 1164 str.length = nxt_strlen(resp->title); 1165 str.start = resp->title; 1166 1167 if (resp->status < 400) { 1168 nxt_conf_set_member_string(value, &success_str, &str, 0); 1169 1170 } else { 1171 nxt_conf_set_member_string(value, &error_str, &str, 0); 1172 } 1173 1174 n = 0; 1175 1176 if (resp->detail.length != 0) { 1177 n++; 1178 1179 nxt_conf_set_member_string(value, &detail_str, &resp->detail, n); 1180 } 1181 1182 if (resp->status >= 400 && resp->offset != -1) { 1183 n++; 1184 1185 location = nxt_conf_create_object(c->mem_pool, 1186 resp->line != 0 ? 3 : 1); 1187 1188 nxt_conf_set_member(value, &location_str, location, n); 1189 1190 nxt_conf_set_member_integer(location, &offset_str, resp->offset, 0); 1191 1192 if (resp->line != 0) { 1193 nxt_conf_set_member_integer(location, &line_str, 1194 resp->line, 1); 1195 1196 nxt_conf_set_member_integer(location, &column_str, 1197 resp->column, 2); 1198 } 1199 } 1200 } 1201 1202 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1203 1204 size = nxt_conf_json_length(value, &pretty) + 2; 1205 1206 body = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1207 if (nxt_slow_path(body == NULL)) { 1208 nxt_controller_conn_close(task, c, req); 1209 return; 1210 } 1211 1212 nxt_memzero(&pretty, sizeof(nxt_conf_json_pretty_t)); 1213 1214 body->mem.free = nxt_conf_json_print(body->mem.free, value, &pretty); 1215 1216 body->mem.free = nxt_cpymem(body->mem.free, "\r\n", 2); 1217 1218 size = sizeof("HTTP/1.1 " "\r\n") - 1 + status_line.length 1219 + sizeof("Server: unit/" NXT_VERSION "\r\n") - 1 1220 + sizeof("Date: Wed, 31 Dec 1986 16:40:00 GMT\r\n") - 1 1221 + sizeof("Content-Type: application/json\r\n") - 1 1222 + sizeof("Content-Length: " "\r\n") - 1 + NXT_SIZE_T_LEN 1223 + sizeof("Connection: close\r\n") - 1 1224 + sizeof("\r\n") - 1; 1225 1226 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 1227 if (nxt_slow_path(b == NULL)) { 1228 nxt_controller_conn_close(task, c, req); 1229 return; 1230 } 1231 1232 b->next = body; 1233 1234 nxt_str_set(&str, "HTTP/1.1 "); 1235 1236 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1237 b->mem.free = nxt_cpymem(b->mem.free, status_line.start, 1238 status_line.length); 1239 1240 nxt_str_set(&str, "\r\n" 1241 "Server: unit/" NXT_VERSION "\r\n" 1242 "Date: "); 1243 1244 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1245 1246 b->mem.free = nxt_thread_time_string(task->thread, &date_cache, 1247 b->mem.free); 1248 1249 nxt_str_set(&str, "\r\n" 1250 "Content-Type: application/json\r\n" 1251 "Content-Length: "); 1252 1253 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1254 1255 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, "%uz", 1256 nxt_buf_mem_used_size(&body->mem)); 1257 1258 nxt_str_set(&str, "\r\n" 1259 "Connection: close\r\n" 1260 "\r\n"); 1261 1262 b->mem.free = nxt_cpymem(b->mem.free, str.start, str.length); 1263 1264 c->write = b; 1265 c->write_state = &nxt_controller_conn_write_state; 1266 1267 nxt_conn_write(task->thread->engine, c); 1268} 1269 1270 1271static u_char * 1272nxt_controller_date(u_char *buf, nxt_realtime_t *now, struct tm *tm, 1273 size_t size, const char *format) 1274{ 1275 static const char *week[] = { "Sun", "Mon", "Tue", "Wed", "Thu", "Fri", 1276 "Sat" }; 1277 1278 static const char *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun", 1279 "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" }; 1280 1281 return nxt_sprintf(buf, buf + size, format, 1282 week[tm->tm_wday], tm->tm_mday, 1283 month[tm->tm_mon], tm->tm_year + 1900, 1284 tm->tm_hour, tm->tm_min, tm->tm_sec); 1285}
|