Deleted
Added
nxt_router.c (317:94010c8bd7bc) | nxt_router.c (318:c2442f5e054d) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10 11 12typedef struct { | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> 9#include <nxt_conf.h> 10 11 12typedef struct { |
13 nxt_str_t type; 14 uint32_t workers; | 13 nxt_str_t type; 14 uint32_t workers; 15 nxt_msec_t timeout; 16 uint32_t requests; 17 nxt_conf_value_t *limits_value; |
15} nxt_router_app_conf_t; 16 17 18typedef struct { 19 nxt_str_t application; 20} nxt_router_listener_conf_t; 21 22 23typedef struct nxt_req_app_link_s nxt_req_app_link_t; 24typedef struct nxt_start_worker_s nxt_start_worker_t; 25 26struct nxt_start_worker_s { 27 nxt_app_t *app; 28 nxt_req_app_link_t *ra; 29 30 nxt_work_t work; 31}; 32 33 | 18} nxt_router_app_conf_t; 19 20 21typedef struct { 22 nxt_str_t application; 23} nxt_router_listener_conf_t; 24 25 26typedef struct nxt_req_app_link_s nxt_req_app_link_t; 27typedef struct nxt_start_worker_s nxt_start_worker_t; 28 29struct nxt_start_worker_s { 30 nxt_app_t *app; 31 nxt_req_app_link_t *ra; 32 33 nxt_work_t work; 34}; 35 36 |
37typedef struct { 38 uint32_t stream; 39 nxt_conn_t *conn; 40 nxt_port_t *app_port; 41 nxt_req_app_link_t *ra; 42 43 nxt_queue_link_t link; /* for nxt_conn_t.requests */ 44} nxt_req_conn_link_t; 45 46 |
|
34struct nxt_req_app_link_s { | 47struct nxt_req_app_link_s { |
35 nxt_req_id_t req_id; | 48 uint32_t stream; |
36 nxt_port_t *app_port; | 49 nxt_port_t *app_port; |
50 nxt_pid_t app_pid; |
|
37 nxt_port_t *reply_port; 38 nxt_app_parse_ctx_t *ap; 39 nxt_req_conn_link_t *rc; 40 41 nxt_queue_link_t link; /* for nxt_app_t.requests */ 42 43 nxt_mp_t *mem_pool; 44 nxt_work_t work; 45}; 46 47 48typedef struct { 49 nxt_socket_conf_t *socket_conf; 50 nxt_router_temp_conf_t *temp_conf; 51} nxt_socket_rpc_t; 52 53 | 51 nxt_port_t *reply_port; 52 nxt_app_parse_ctx_t *ap; 53 nxt_req_conn_link_t *rc; 54 55 nxt_queue_link_t link; /* for nxt_app_t.requests */ 56 57 nxt_mp_t *mem_pool; 58 nxt_work_t work; 59}; 60 61 62typedef struct { 63 nxt_socket_conf_t *socket_conf; 64 nxt_router_temp_conf_t *temp_conf; 65} nxt_socket_rpc_t; 66 67 |
68typedef struct { 69 nxt_mp_t *mem_pool; 70 nxt_port_recv_msg_t msg; 71 nxt_work_t work; 72} nxt_remove_pid_msg_t; 73 74 75static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, 76 void *data); 77static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, 78 void *data); 79 |
|
54static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 55static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 56static void nxt_router_conf_ready(nxt_task_t *task, 57 nxt_router_temp_conf_t *tmcf); 58static void nxt_router_conf_error(nxt_task_t *task, 59 nxt_router_temp_conf_t *tmcf); 60static void nxt_router_conf_send(nxt_task_t *task, 61 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); --- 38 unchanged lines hidden (view full) --- 100 nxt_event_engine_t *engine); 101static void nxt_router_apps_sort(nxt_router_t *router, 102 nxt_router_temp_conf_t *tmcf); 103 104static void nxt_router_engines_post(nxt_router_t *router, 105 nxt_router_temp_conf_t *tmcf); 106static void nxt_router_engine_post(nxt_event_engine_t *engine, 107 nxt_work_t *jobs); | 80static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); 81static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); 82static void nxt_router_conf_ready(nxt_task_t *task, 83 nxt_router_temp_conf_t *tmcf); 84static void nxt_router_conf_error(nxt_task_t *task, 85 nxt_router_temp_conf_t *tmcf); 86static void nxt_router_conf_send(nxt_task_t *task, 87 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type); --- 38 unchanged lines hidden (view full) --- 126 nxt_event_engine_t *engine); 127static void nxt_router_apps_sort(nxt_router_t *router, 128 nxt_router_temp_conf_t *tmcf); 129 130static void nxt_router_engines_post(nxt_router_t *router, 131 nxt_router_temp_conf_t *tmcf); 132static void nxt_router_engine_post(nxt_event_engine_t *engine, 133 nxt_work_t *jobs); |
108static void nxt_router_app_data_handler(nxt_task_t *task, 109 nxt_port_recv_msg_t *msg); | |
110 111static void nxt_router_thread_start(void *data); 112static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 113 void *data); 114static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 115 void *data); 116static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 117 void *data); --- 6 unchanged lines hidden (view full) --- 124static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 125 void *data); 126static void nxt_router_conf_release(nxt_task_t *task, 127 nxt_socket_conf_joint_t *joint); 128 129static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, 130 void *data); 131static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); | 134 135static void nxt_router_thread_start(void *data); 136static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, 137 void *data); 138static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj, 139 void *data); 140static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, 141 void *data); --- 6 unchanged lines hidden (view full) --- 148static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 149 void *data); 150static void nxt_router_conf_release(nxt_task_t *task, 151 nxt_socket_conf_joint_t *joint); 152 153static void nxt_router_send_sw_request(nxt_task_t *task, void *obj, 154 void *data); 155static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app); |
132static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id); | 156static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream); |
133static void nxt_router_app_release_port(nxt_task_t *task, void *obj, 134 void *data); 135 136static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 137static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 138 void *data); 139static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, 140 void *data); --- 7 unchanged lines hidden (view full) --- 148 nxt_app_wmsg_t *wmsg); 149static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 150 nxt_app_wmsg_t *wmsg); 151static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 152static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 153static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 154static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 155static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); | 157static void nxt_router_app_release_port(nxt_task_t *task, void *obj, 158 void *data); 159 160static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); 161static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, 162 void *data); 163static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, 164 void *data); --- 7 unchanged lines hidden (view full) --- 172 nxt_app_wmsg_t *wmsg); 173static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, 174 nxt_app_wmsg_t *wmsg); 175static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); 176static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); 177static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); 178static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data); 179static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); |
180static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data); |
|
156static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 157 158static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 159 const char* fmt, ...); 160 161static nxt_router_t *nxt_router; 162 163 --- 44 unchanged lines hidden (view full) --- 208 209 if (nxt_slow_path(sw == NULL)) { 210 return NULL; 211 } 212 213 sw->app = app; 214 sw->ra = ra; 215 | 181static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); 182 183static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 184 const char* fmt, ...); 185 186static nxt_router_t *nxt_router; 187 188 --- 44 unchanged lines hidden (view full) --- 233 234 if (nxt_slow_path(sw == NULL)) { 235 return NULL; 236 } 237 238 sw->app = app; 239 sw->ra = ra; 240 |
216 nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw, 217 ra->req_id, &app->name, app); | 241 nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw, 242 ra->stream, &app->name, app); |
218 219 rt = task->thread->runtime; 220 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 221 222 sw->work.handler = nxt_router_send_sw_request; 223 sw->work.task = &main_port->engine->task; 224 sw->work.obj = sw; 225 sw->work.data = task->thread->engine; --- 17 unchanged lines hidden (view full) --- 243nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) 244{ 245 nxt_debug(task, "sw %p release", sw); 246 247 nxt_free(sw); 248} 249 250 | 243 244 rt = task->thread->runtime; 245 main_port = rt->port_by_type[NXT_PROCESS_MAIN]; 246 247 sw->work.handler = nxt_router_send_sw_request; 248 sw->work.task = &main_port->engine->task; 249 sw->work.obj = sw; 250 sw->work.data = task->thread->engine; --- 17 unchanged lines hidden (view full) --- 268nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw) 269{ 270 nxt_debug(task, "sw %p release", sw); 271 272 nxt_free(sw); 273} 274 275 |
276nxt_inline void 277nxt_router_rc_unlink(nxt_req_conn_link_t *rc) 278{ 279 nxt_queue_remove(&rc->link); 280 281 if (rc->ra != NULL) { 282 rc->ra->rc = NULL; 283 rc->ra = NULL; 284 } 285 286 rc->conn = NULL; 287} 288 289 |
|
251static nxt_req_app_link_t * 252nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) 253{ 254 nxt_mp_t *mp; | 290static nxt_req_app_link_t * 291nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) 292{ 293 nxt_mp_t *mp; |
294 nxt_event_engine_t *engine; |
|
255 nxt_req_app_link_t *ra; 256 257 mp = rc->conn->mem_pool; | 295 nxt_req_app_link_t *ra; 296 297 mp = rc->conn->mem_pool; |
298 engine = task->thread->engine; |
|
258 259 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); 260 261 if (nxt_slow_path(ra == NULL)) { 262 return NULL; 263 } 264 | 299 300 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); 301 302 if (nxt_slow_path(ra == NULL)) { 303 return NULL; 304 } 305 |
265 nxt_debug(task, "ra #%uxD create", rc->req_id); | 306 nxt_debug(task, "ra stream #%uD create", rc->stream); |
266 267 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 268 | 307 308 nxt_memzero(ra, sizeof(nxt_req_app_link_t)); 309 |
269 ra->req_id = rc->req_id; 270 ra->app_port = NULL; | 310 ra->stream = rc->stream; 311 ra->app_pid = -1; |
271 ra->rc = rc; | 312 ra->rc = rc; |
313 rc->ra = ra; 314 ra->reply_port = engine->port; |
|
272 273 ra->mem_pool = mp; 274 275 ra->work.handler = NULL; | 315 316 ra->mem_pool = mp; 317 318 ra->work.handler = NULL; |
276 ra->work.task = &task->thread->engine->task; | 319 ra->work.task = &engine->task; |
277 ra->work.obj = ra; | 320 ra->work.obj = ra; |
278 ra->work.data = task->thread->engine; | 321 ra->work.data = engine; |
279 280 return ra; 281} 282 283 284static void 285nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) 286{ | 322 323 return ra; 324} 325 326 327static void 328nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) 329{ |
330 nxt_port_t *app_port; |
|
287 nxt_req_app_link_t *ra; 288 nxt_event_engine_t *engine; 289 290 ra = obj; 291 engine = data; 292 | 331 nxt_req_app_link_t *ra; 332 nxt_event_engine_t *engine; 333 334 ra = obj; 335 engine = data; 336 |
337 if (ra->app_port != NULL) { 338 339 app_port = ra->app_port; 340 ra->app_port = NULL; 341 342 if (task->thread->engine != engine) { 343 ra->app_pid = app_port->pid; 344 } 345 346 nxt_router_app_release_port(task, app_port, app_port->app); 347 348#if 0 349 /* Uncomment to hold app port until complete response received. */ 350 if (ra->rc != NULL) { 351 ra->rc->app_port = ra->app_port; 352 353 } else { 354 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); 355 } 356#endif 357 } 358 |
|
293 if (task->thread->engine != engine) { 294 ra->work.handler = nxt_router_ra_release; 295 ra->work.task = &engine->task; 296 ra->work.next = NULL; 297 | 359 if (task->thread->engine != engine) { 360 ra->work.handler = nxt_router_ra_release; 361 ra->work.task = &engine->task; 362 ra->work.next = NULL; 363 |
298 nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine); | 364 nxt_debug(task, "ra stream #%uD post release to %p", 365 ra->stream, engine); |
299 300 nxt_event_engine_post(engine, &ra->work); 301 302 return; 303 } 304 | 366 367 nxt_event_engine_post(engine, &ra->work); 368 369 return; 370 } 371 |
305 nxt_debug(task, "ra #%uxD release", ra->req_id); | 372 if (ra->rc != NULL && ra->app_pid != -1) { 373 nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid); 374 } |
306 | 375 |
307 if (ra->app_port != NULL) { | 376 nxt_debug(task, "ra stream #%uD release", ra->stream); |
308 | 377 |
309 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); | 378 nxt_mp_release(ra->mem_pool, ra); 379} |
310 | 380 |
311#if 0 312 /* Uncomment to hold app port until complete response received. */ 313 if (ra->rc->conn != NULL) { 314 ra->rc->app_port = ra->app_port; | |
315 | 381 |
316 } else { 317 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app); 318 } 319#endif | 382static void 383nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data) 384{ 385 nxt_conn_t *c; 386 nxt_req_app_link_t *ra; 387 nxt_event_engine_t *engine; 388 389 ra = obj; 390 engine = data; 391 392 if (task->thread->engine != engine) { 393 ra->work.handler = nxt_router_ra_abort; 394 ra->work.task = &engine->task; 395 ra->work.next = NULL; 396 397 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine); 398 399 nxt_event_engine_post(engine, &ra->work); 400 401 return; |
320 } 321 | 402 } 403 |
404 nxt_debug(task, "ra stream #%uD abort", ra->stream); 405 406 if (ra->rc != NULL) { 407 c = ra->rc->conn; 408 409 nxt_router_gen_error(task, c, 500, 410 "Failed to start application worker"); 411 } 412 |
|
322 nxt_mp_release(ra->mem_pool, ra); 323} 324 325 326void 327nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 328{ 329 nxt_port_new_port_handler(task, msg); --- 49 unchanged lines hidden (view full) --- 379 nxt_router_conf_error(task, tmcf); 380 } 381} 382 383 384void 385nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 386{ | 413 nxt_mp_release(ra->mem_pool, ra); 414} 415 416 417void 418nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 419{ 420 nxt_port_new_port_handler(task, msg); --- 49 unchanged lines hidden (view full) --- 470 nxt_router_conf_error(task, tmcf); 471 } 472} 473 474 475void 476nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 477{ |
478 nxt_mp_t *mp; 479 nxt_buf_t *buf; 480 nxt_event_engine_t *engine; 481 nxt_remove_pid_msg_t *rp; 482 |
|
387 nxt_port_remove_pid_handler(task, msg); 388 389 if (msg->port_msg.stream == 0) { 390 return; 391 } 392 | 483 nxt_port_remove_pid_handler(task, msg); 484 485 if (msg->port_msg.stream == 0) { 486 return; 487 } 488 |
489 mp = nxt_mp_create(1024, 128, 256, 32); 490 491 buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0); 492 buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos, 493 nxt_buf_used_size(msg->buf)); 494 495 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) 496 { 497 rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t)); 498 499 rp->mem_pool = mp; 500 501 rp->msg.fd = msg->fd; 502 rp->msg.buf = buf; 503 rp->msg.port = engine->port; 504 rp->msg.port_msg = msg->port_msg; 505 rp->msg.size = msg->size; 506 rp->msg.new_port = NULL; 507 508 rp->work.handler = nxt_router_worker_remove_pid_handler; 509 rp->work.task = &engine->task; 510 rp->work.obj = rp; 511 rp->work.data = task->thread->engine; 512 rp->work.next = NULL; 513 514 nxt_event_engine_post(engine, &rp->work); 515 } 516 nxt_queue_loop; 517 |
|
393 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 394 395 nxt_port_rpc_handler(task, msg); 396} 397 398 | 518 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 519 520 nxt_port_rpc_handler(task, msg); 521} 522 523 |
524static void 525nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data) 526{ 527 nxt_event_engine_t *engine; 528 nxt_remove_pid_msg_t *rp; 529 530 rp = obj; 531 532 nxt_port_remove_pid_handler(task, &rp->msg); 533 534 engine = rp->work.data; 535 536 rp->work.handler = nxt_router_worker_remove_pid_done; 537 rp->work.task = &engine->task; 538 rp->work.next = NULL; 539 540 nxt_event_engine_post(engine, &rp->work); 541} 542 543 544static void 545nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data) 546{ 547 nxt_remove_pid_msg_t *rp; 548 549 rp = obj; 550 551 nxt_mp_release(rp->mem_pool, rp); 552} 553 554 |
|
399static nxt_router_temp_conf_t * 400nxt_router_temp_conf(nxt_task_t *task) 401{ 402 nxt_mp_t *mp, *tmp; 403 nxt_router_conf_t *rtcf; 404 nxt_router_temp_conf_t *tmcf; 405 406 mp = nxt_mp_create(1024, 128, 256, 32); --- 195 unchanged lines hidden (view full) --- 602 offsetof(nxt_router_app_conf_t, type), 603 }, 604 605 { 606 nxt_string("workers"), 607 NXT_CONF_MAP_INT32, 608 offsetof(nxt_router_app_conf_t, workers), 609 }, | 555static nxt_router_temp_conf_t * 556nxt_router_temp_conf(nxt_task_t *task) 557{ 558 nxt_mp_t *mp, *tmp; 559 nxt_router_conf_t *rtcf; 560 nxt_router_temp_conf_t *tmcf; 561 562 mp = nxt_mp_create(1024, 128, 256, 32); --- 195 unchanged lines hidden (view full) --- 758 offsetof(nxt_router_app_conf_t, type), 759 }, 760 761 { 762 nxt_string("workers"), 763 NXT_CONF_MAP_INT32, 764 offsetof(nxt_router_app_conf_t, workers), 765 }, |
766 767 { 768 nxt_string("limits"), 769 NXT_CONF_MAP_PTR, 770 offsetof(nxt_router_app_conf_t, limits_value), 771 }, |
|
610}; 611 612 | 772}; 773 774 |
775static nxt_conf_map_t nxt_router_app_limits_conf[] = { 776 { 777 nxt_string("timeout"), 778 NXT_CONF_MAP_MSEC, 779 offsetof(nxt_router_app_conf_t, timeout), 780 }, 781 782 { 783 nxt_string("requests"), 784 NXT_CONF_MAP_INT32, 785 offsetof(nxt_router_app_conf_t, requests), 786 }, 787}; 788 789 |
|
613static nxt_conf_map_t nxt_router_listener_conf[] = { 614 { 615 nxt_string("application"), 616 NXT_CONF_MAP_STR, 617 offsetof(nxt_router_listener_conf_t, application), 618 }, 619}; 620 --- 128 unchanged lines hidden (view full) --- 749 nxt_free(app); 750 751 nxt_queue_remove(&prev->link); 752 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 753 continue; 754 } 755 756 apcf.workers = 1; | 790static nxt_conf_map_t nxt_router_listener_conf[] = { 791 { 792 nxt_string("application"), 793 NXT_CONF_MAP_STR, 794 offsetof(nxt_router_listener_conf_t, application), 795 }, 796}; 797 --- 128 unchanged lines hidden (view full) --- 926 nxt_free(app); 927 928 nxt_queue_remove(&prev->link); 929 nxt_queue_insert_tail(&tmcf->previous, &prev->link); 930 continue; 931 } 932 933 apcf.workers = 1; |
934 apcf.timeout = 0; 935 apcf.requests = 0; 936 apcf.limits_value = NULL; |
|
757 758 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 759 nxt_nitems(nxt_router_app_conf), &apcf); 760 if (ret != NXT_OK) { 761 nxt_log(task, NXT_LOG_CRIT, "application map error"); 762 goto app_fail; 763 } 764 | 937 938 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf, 939 nxt_nitems(nxt_router_app_conf), &apcf); 940 if (ret != NXT_OK) { 941 nxt_log(task, NXT_LOG_CRIT, "application map error"); 942 goto app_fail; 943 } 944 |
945 if (apcf.limits_value != NULL) { 946 947 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) { 948 nxt_log(task, NXT_LOG_CRIT, "application limits is not object"); 949 goto app_fail; 950 } 951 952 ret = nxt_conf_map_object(mp, apcf.limits_value, 953 nxt_router_app_limits_conf, 954 nxt_nitems(nxt_router_app_limits_conf), 955 &apcf); 956 if (ret != NXT_OK) { 957 nxt_log(task, NXT_LOG_CRIT, "application limits map error"); 958 goto app_fail; 959 } 960 } 961 |
|
765 nxt_debug(task, "application type: %V", &apcf.type); 766 nxt_debug(task, "application workers: %D", apcf.workers); | 962 nxt_debug(task, "application type: %V", &apcf.type); 963 nxt_debug(task, "application workers: %D", apcf.workers); |
964 nxt_debug(task, "application timeout: %D", apcf.timeout); 965 nxt_debug(task, "application requests: %D", apcf.requests); |
|
767 768 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 769 770 if (lang == NULL) { 771 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 772 &apcf.type); 773 goto app_fail; 774 } --- 22 unchanged lines hidden (view full) --- 797 nxt_queue_init(&app->ports); 798 nxt_queue_init(&app->requests); 799 800 app->name.length = name.length; 801 nxt_memcpy(app->name.start, name.start, name.length); 802 803 app->type = type; 804 app->max_workers = apcf.workers; | 966 967 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type); 968 969 if (lang == NULL) { 970 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"", 971 &apcf.type); 972 goto app_fail; 973 } --- 22 unchanged lines hidden (view full) --- 996 nxt_queue_init(&app->ports); 997 nxt_queue_init(&app->requests); 998 999 app->name.length = name.length; 1000 nxt_memcpy(app->name.start, name.start, name.length); 1001 1002 app->type = type; 1003 app->max_workers = apcf.workers; |
1004 app->timeout = apcf.timeout; |
|
805 app->live = 1; 806 app->prepare_msg = nxt_app_prepare_msg[type]; 807 808 nxt_queue_insert_tail(&tmcf->apps, &app->link); 809 } 810 811 http = nxt_conf_get_path(conf, &http_path); 812#if 0 --- 771 unchanged lines hidden (view full) --- 1584 1585 1586static nxt_port_handler_t nxt_router_app_port_handlers[] = { 1587 NULL, /* NXT_PORT_MSG_QUIT */ 1588 NULL, /* NXT_PORT_MSG_NEW_PORT */ 1589 NULL, /* NXT_PORT_MSG_CHANGE_FILE */ 1590 /* TODO: remove mmap_handler from app ports */ 1591 nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ | 1005 app->live = 1; 1006 app->prepare_msg = nxt_app_prepare_msg[type]; 1007 1008 nxt_queue_insert_tail(&tmcf->apps, &app->link); 1009 } 1010 1011 http = nxt_conf_get_path(conf, &http_path); 1012#if 0 --- 771 unchanged lines hidden (view full) --- 1784 1785 1786static nxt_port_handler_t nxt_router_app_port_handlers[] = { 1787 NULL, /* NXT_PORT_MSG_QUIT */ 1788 NULL, /* NXT_PORT_MSG_NEW_PORT */ 1789 NULL, /* NXT_PORT_MSG_CHANGE_FILE */ 1790 /* TODO: remove mmap_handler from app ports */ 1791 nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */ |
1592 nxt_router_app_data_handler, | 1792 nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */ |
1593 NULL, /* NXT_PORT_MSG_REMOVE_PID */ 1594 NULL, /* NXT_PORT_MSG_READY */ 1595 NULL, /* NXT_PORT_MSG_START_WORKER */ 1596 NULL, /* NXT_PORT_MSG_SOCKET */ 1597 NULL, /* NXT_PORT_MSG_MODULES */ 1598 NULL, /* NXT_PORT_MSG_CONF_STORE */ 1599 nxt_port_rpc_handler, 1600 nxt_port_rpc_handler, --- 402 unchanged lines hidden (view full) --- 2003{ 2004 .ready_handler = nxt_router_conn_ready, 2005 .close_handler = nxt_router_conn_close, 2006 .error_handler = nxt_router_conn_error, 2007}; 2008 2009 2010static void | 1793 NULL, /* NXT_PORT_MSG_REMOVE_PID */ 1794 NULL, /* NXT_PORT_MSG_READY */ 1795 NULL, /* NXT_PORT_MSG_START_WORKER */ 1796 NULL, /* NXT_PORT_MSG_SOCKET */ 1797 NULL, /* NXT_PORT_MSG_MODULES */ 1798 NULL, /* NXT_PORT_MSG_CONF_STORE */ 1799 nxt_port_rpc_handler, 1800 nxt_port_rpc_handler, --- 402 unchanged lines hidden (view full) --- 2203{ 2204 .ready_handler = nxt_router_conn_ready, 2205 .close_handler = nxt_router_conn_close, 2206 .error_handler = nxt_router_conn_error, 2207}; 2208 2209 2210static void |
2011nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) | 2211nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2212 void *data) |
2012{ 2013 size_t dump_size; 2014 nxt_buf_t *b, *last; 2015 nxt_conn_t *c; 2016 nxt_req_conn_link_t *rc; | 2213{ 2214 size_t dump_size; 2215 nxt_buf_t *b, *last; 2216 nxt_conn_t *c; 2217 nxt_req_conn_link_t *rc; |
2017 nxt_event_engine_t *engine; | |
2018 2019 b = msg->buf; | 2218 2219 b = msg->buf; |
2020 engine = task->thread->engine; | 2220 rc = data; |
2021 | 2221 |
2022 rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); 2023 if (nxt_slow_path(rc == NULL)) { 2024 nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); 2025 2026 return; 2027 } 2028 | |
2029 c = rc->conn; 2030 2031 dump_size = nxt_buf_used_size(b); 2032 2033 if (dump_size > 300) { 2034 dump_size = 300; 2035 } 2036 --- 16 unchanged lines hidden (view full) --- 2053 nxt_buf_chain_add(&b, last); 2054 2055 if (rc->app_port != NULL) { 2056 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); 2057 2058 rc->app_port = NULL; 2059 } 2060 | 2222 c = rc->conn; 2223 2224 dump_size = nxt_buf_used_size(b); 2225 2226 if (dump_size > 300) { 2227 dump_size = 300; 2228 } 2229 --- 16 unchanged lines hidden (view full) --- 2246 nxt_buf_chain_add(&b, last); 2247 2248 if (rc->app_port != NULL) { 2249 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); 2250 2251 rc->app_port = NULL; 2252 } 2253 |
2061 rc->conn = NULL; | 2254 nxt_router_rc_unlink(rc); |
2062 } 2063 2064 if (b == NULL) { 2065 return; 2066 } 2067 2068 if (msg->buf == b) { 2069 /* Disable instant buffer completion/re-using by port. */ --- 9 unchanged lines hidden (view full) --- 2079 } else { 2080 nxt_debug(task, "router data attach out bufs to existing chain"); 2081 2082 nxt_buf_chain_add(&c->write, b); 2083 } 2084} 2085 2086 | 2255 } 2256 2257 if (b == NULL) { 2258 return; 2259 } 2260 2261 if (msg->buf == b) { 2262 /* Disable instant buffer completion/re-using by port. */ --- 9 unchanged lines hidden (view full) --- 2272 } else { 2273 nxt_debug(task, "router data attach out bufs to existing chain"); 2274 2275 nxt_buf_chain_add(&c->write, b); 2276 } 2277} 2278 2279 |
2280static void 2281nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, 2282 void *data) 2283{ 2284 nxt_req_conn_link_t *rc; 2285 2286 rc = data; 2287 2288 nxt_router_gen_error(task, rc->conn, 500, 2289 "Application terminated unexpectedly"); 2290 2291 nxt_router_rc_unlink(rc); 2292} 2293 2294 |
|
2087nxt_inline const char * 2088nxt_router_text_by_code(int code) 2089{ 2090 switch (code) { 2091 case 400: return "Bad request"; 2092 case 404: return "Not found"; 2093 case 403: return "Forbidden"; 2094 case 408: return "Request Timeout"; --- 47 unchanged lines hidden (view full) --- 2142 2143 2144 2145static void 2146nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 2147 const char* fmt, ...) 2148{ 2149 va_list args; | 2295nxt_inline const char * 2296nxt_router_text_by_code(int code) 2297{ 2298 switch (code) { 2299 case 400: return "Bad request"; 2300 case 404: return "Not found"; 2301 case 403: return "Forbidden"; 2302 case 408: return "Request Timeout"; --- 47 unchanged lines hidden (view full) --- 2350 2351 2352 2353static void 2354nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 2355 const char* fmt, ...) 2356{ 2357 va_list args; |
2358 nxt_mp_t *mp; |
|
2150 nxt_buf_t *b; 2151 | 2359 nxt_buf_t *b; 2360 |
2361 /* TODO: fix when called from main thread */ 2362 /* TODO: fix when called in the middle of response */ 2363 2364 mp = nxt_mp_create(1024, 128, 256, 32); 2365 |
|
2152 va_start(args, fmt); | 2366 va_start(args, fmt); |
2153 b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); | 2367 b = nxt_router_get_error_buf(task, mp, code, fmt, args); |
2154 va_end(args); 2155 | 2368 va_end(args); 2369 |
2156 if (c->socket.data != NULL) { 2157 nxt_mp_free(c->mem_pool, c->socket.data); 2158 c->socket.data = NULL; 2159 } 2160 | |
2161 if (c->socket.fd == -1) { | 2370 if (c->socket.fd == -1) { |
2162 nxt_mp_release(c->mem_pool, b->next); 2163 nxt_mp_release(c->mem_pool, b); | 2371 nxt_mp_release(mp, b->next); 2372 nxt_mp_release(mp, b); |
2164 return; 2165 } 2166 2167 if (c->write == NULL) { 2168 c->write = b; 2169 c->write_state = &nxt_router_conn_write_state; 2170 2171 nxt_conn_write(task->thread->engine, c); --- 27 unchanged lines hidden (view full) --- 2199 2200 nxt_router_sw_release(task, sw); 2201} 2202 2203 2204static void 2205nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) 2206{ | 2373 return; 2374 } 2375 2376 if (c->write == NULL) { 2377 c->write = b; 2378 c->write_state = &nxt_router_conn_write_state; 2379 2380 nxt_conn_write(task->thread->engine, c); --- 27 unchanged lines hidden (view full) --- 2408 2409 nxt_router_sw_release(task, sw); 2410} 2411 2412 2413static void 2414nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) 2415{ |
2416 nxt_app_t *app; 2417 nxt_queue_link_t *lnk; 2418 nxt_req_app_link_t *ra; |
|
2207 nxt_start_worker_t *sw; 2208 2209 sw = data; 2210 2211 nxt_assert(sw != NULL); | 2419 nxt_start_worker_t *sw; 2420 2421 sw = data; 2422 2423 nxt_assert(sw != NULL); |
2424 nxt_assert(sw->app != NULL); |
|
2212 nxt_assert(sw->app->pending_workers != 0); 2213 | 2425 nxt_assert(sw->app->pending_workers != 0); 2426 |
2427 app = sw->app; 2428 |
|
2214 sw->app->pending_workers--; 2215 2216 nxt_debug(task, "sw %p error, failed to start app '%V'", | 2429 sw->app->pending_workers--; 2430 2431 nxt_debug(task, "sw %p error, failed to start app '%V'", |
2217 sw, &sw->app->name); | 2432 sw, &app->name); |
2218 | 2433 |
2434 if (!nxt_queue_is_empty(&app->requests)) { 2435 lnk = nxt_queue_last(&app->requests); 2436 nxt_queue_remove(lnk); 2437 2438 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2439 2440 nxt_debug(task, "app '%V' %p abort next stream #%uD", 2441 &app->name, app, ra->stream); 2442 2443 nxt_router_ra_abort(task, ra, ra->work.data); 2444 } 2445 |
|
2219 nxt_router_sw_release(task, sw); 2220} 2221 2222 2223static void 2224nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) 2225{ 2226 size_t size; --- 5 unchanged lines hidden (view full) --- 2232 nxt_start_worker_t *sw; 2233 nxt_req_app_link_t *ra; 2234 2235 sw = obj; 2236 app = sw->app; 2237 2238 if (nxt_queue_is_empty(&app->requests)) { 2239 ra = sw->ra; | 2446 nxt_router_sw_release(task, sw); 2447} 2448 2449 2450static void 2451nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) 2452{ 2453 size_t size; --- 5 unchanged lines hidden (view full) --- 2459 nxt_start_worker_t *sw; 2460 nxt_req_app_link_t *ra; 2461 2462 sw = obj; 2463 app = sw->app; 2464 2465 if (nxt_queue_is_empty(&app->requests)) { 2466 ra = sw->ra; |
2240 app_port = nxt_router_app_get_port(app, ra->req_id); | 2467 app_port = nxt_router_app_get_port(app, ra->stream); |
2241 2242 if (app_port != NULL) { | 2468 2469 if (app_port != NULL) { |
2243 nxt_debug(task, "app '%V' %p process request #%uxD", 2244 &app->name, app, ra->req_id); | 2470 nxt_debug(task, "app '%V' %p process stream #%uD", 2471 &app->name, app, ra->stream); |
2245 2246 ra->app_port = app_port; 2247 2248 nxt_router_process_http_request_mp(task, ra, app_port); 2249 2250 nxt_router_ra_release(task, ra, ra->work.data); 2251 nxt_router_sw_release(task, sw); 2252 --- 72 unchanged lines hidden (view full) --- 2325 nxt_router_sw_create(task, app, ra); 2326 } 2327 2328 return 0; 2329} 2330 2331 2332static nxt_port_t * | 2472 2473 ra->app_port = app_port; 2474 2475 nxt_router_process_http_request_mp(task, ra, app_port); 2476 2477 nxt_router_ra_release(task, ra, ra->work.data); 2478 nxt_router_sw_release(task, sw); 2479 --- 72 unchanged lines hidden (view full) --- 2552 nxt_router_sw_create(task, app, ra); 2553 } 2554 2555 return 0; 2556} 2557 2558 2559static nxt_port_t * |
2333nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id) | 2560nxt_router_app_get_port(nxt_app_t *app, uint32_t stream) |
2334{ 2335 nxt_port_t *port; 2336 nxt_queue_link_t *lnk; 2337 2338 port = NULL; 2339 2340 nxt_thread_mutex_lock(&app->mutex); 2341 2342 if (!nxt_queue_is_empty(&app->ports)) { 2343 lnk = nxt_queue_first(&app->ports); 2344 nxt_queue_remove(lnk); 2345 2346 lnk->next = NULL; 2347 2348 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2349 | 2561{ 2562 nxt_port_t *port; 2563 nxt_queue_link_t *lnk; 2564 2565 port = NULL; 2566 2567 nxt_thread_mutex_lock(&app->mutex); 2568 2569 if (!nxt_queue_is_empty(&app->ports)) { 2570 lnk = nxt_queue_first(&app->ports); 2571 nxt_queue_remove(lnk); 2572 2573 lnk->next = NULL; 2574 2575 port = nxt_queue_link_data(lnk, nxt_port_t, app_link); 2576 |
2350 port->app_req_id = req_id; | 2577 port->app_stream = stream; |
2351 } 2352 2353 nxt_thread_mutex_unlock(&app->mutex); 2354 2355 return port; 2356} 2357 2358 --- 31 unchanged lines hidden (view full) --- 2390 } 2391 2392 if (!nxt_queue_is_empty(&app->requests)) { 2393 lnk = nxt_queue_first(&app->requests); 2394 nxt_queue_remove(lnk); 2395 2396 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2397 | 2578 } 2579 2580 nxt_thread_mutex_unlock(&app->mutex); 2581 2582 return port; 2583} 2584 2585 --- 31 unchanged lines hidden (view full) --- 2617 } 2618 2619 if (!nxt_queue_is_empty(&app->requests)) { 2620 lnk = nxt_queue_first(&app->requests); 2621 nxt_queue_remove(lnk); 2622 2623 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); 2624 |
2398 nxt_debug(task, "app '%V' %p process next request #%uxD", 2399 &app->name, app, ra->req_id); | 2625 nxt_debug(task, "app '%V' %p process next stream #%uD", 2626 &app->name, app, ra->stream); |
2400 2401 ra->app_port = port; | 2627 2628 ra->app_port = port; |
2402 port->app_req_id = ra->req_id; | 2629 port->app_stream = ra->stream; |
2403 2404 nxt_router_process_http_request_mp(task, ra, port); 2405 2406 nxt_router_ra_release(task, ra, ra->work.data); 2407 2408 return; 2409 } 2410 | 2630 2631 nxt_router_process_http_request_mp(task, ra, port); 2632 2633 nxt_router_ra_release(task, ra, ra->work.data); 2634 2635 return; 2636 } 2637 |
2411 port->app_req_id = 0; | 2638 port->app_stream = 0; |
2412 2413 if (port->pair[1] == -1) { 2414 nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", 2415 &app->name, app, port->pid); 2416 2417 app->workers--; 2418 nxt_router_app_free(task, app); 2419 --- 27 unchanged lines hidden (view full) --- 2447 2448nxt_bool_t 2449nxt_router_app_remove_port(nxt_port_t *port) 2450{ 2451 nxt_app_t *app; 2452 nxt_bool_t busy; 2453 2454 app = port->app; | 2639 2640 if (port->pair[1] == -1) { 2641 nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)", 2642 &app->name, app, port->pid); 2643 2644 app->workers--; 2645 nxt_router_app_free(task, app); 2646 --- 27 unchanged lines hidden (view full) --- 2674 2675nxt_bool_t 2676nxt_router_app_remove_port(nxt_port_t *port) 2677{ 2678 nxt_app_t *app; 2679 nxt_bool_t busy; 2680 2681 app = port->app; |
2455 busy = port->app_req_id != 0; | 2682 busy = port->app_stream != 0; |
2456 2457 if (app == NULL) { 2458 nxt_thread_log_debug("port %p app remove, no app", port); 2459 2460 nxt_assert(port->app_link.next == NULL); 2461 2462 return 1; 2463 } --- 14 unchanged lines hidden (view full) --- 2478 &app->name, app); 2479 2480 app->workers--; 2481 nxt_router_app_free(&port->engine->task, app); 2482 2483 return 1; 2484 } 2485 | 2683 2684 if (app == NULL) { 2685 nxt_thread_log_debug("port %p app remove, no app", port); 2686 2687 nxt_assert(port->app_link.next == NULL); 2688 2689 return 1; 2690 } --- 14 unchanged lines hidden (view full) --- 2705 &app->name, app); 2706 2707 app->workers--; 2708 nxt_router_app_free(&port->engine->task, app); 2709 2710 return 1; 2711 } 2712 |
2486 nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD", 2487 port, &app->name, app, port->app_req_id); | 2713 nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, " 2714 "app stream #%uD", port, &app->name, app, 2715 port->app_stream); |
2488 2489 return 0; 2490} 2491 2492 2493static nxt_int_t 2494nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) 2495{ 2496 nxt_app_t *app; 2497 nxt_conn_t *c; 2498 nxt_port_t *port; | 2716 2717 return 0; 2718} 2719 2720 2721static nxt_int_t 2722nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) 2723{ 2724 nxt_app_t *app; 2725 nxt_conn_t *c; 2726 nxt_port_t *port; |
2727 nxt_event_engine_t *engine; |
|
2499 nxt_start_worker_t *sw; 2500 nxt_socket_conf_joint_t *joint; 2501 2502 port = NULL; 2503 c = ra->rc->conn; 2504 2505 joint = c->listen->socket.data; 2506 app = joint->socket_conf->application; 2507 2508 if (app == NULL) { 2509 nxt_router_gen_error(task, c, 500, 2510 "Application is NULL in socket_conf"); 2511 return NXT_ERROR; 2512 } 2513 | 2728 nxt_start_worker_t *sw; 2729 nxt_socket_conf_joint_t *joint; 2730 2731 port = NULL; 2732 c = ra->rc->conn; 2733 2734 joint = c->listen->socket.data; 2735 app = joint->socket_conf->application; 2736 2737 if (app == NULL) { 2738 nxt_router_gen_error(task, c, 500, 2739 "Application is NULL in socket_conf"); 2740 return NXT_ERROR; 2741 } 2742 |
2743 engine = task->thread->engine; |
|
2514 | 2744 |
2515 port = nxt_router_app_get_port(app, ra->req_id); | 2745 nxt_timer_disable(engine, &c->read_timer); |
2516 | 2746 |
2747 if (app->timeout != 0) { 2748 c->read_timer.handler = nxt_router_app_timeout; 2749 nxt_timer_add(engine, &c->read_timer, app->timeout); 2750 } 2751 2752 port = nxt_router_app_get_port(app, ra->stream); 2753 |
|
2517 if (port != NULL) { 2518 nxt_debug(task, "already have port for app '%V'", &app->name); 2519 2520 ra->app_port = port; 2521 return NXT_OK; 2522 } 2523 2524 sw = nxt_router_sw_create(task, app, ra); --- 210 unchanged lines hidden (view full) --- 2735 2736static void 2737nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 2738 nxt_app_parse_ctx_t *ap) 2739{ 2740 nxt_mp_t *port_mp; 2741 nxt_int_t res; 2742 nxt_port_t *port; | 2754 if (port != NULL) { 2755 nxt_debug(task, "already have port for app '%V'", &app->name); 2756 2757 ra->app_port = port; 2758 return NXT_OK; 2759 } 2760 2761 sw = nxt_router_sw_create(task, app, ra); --- 210 unchanged lines hidden (view full) --- 2972 2973static void 2974nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 2975 nxt_app_parse_ctx_t *ap) 2976{ 2977 nxt_mp_t *port_mp; 2978 nxt_int_t res; 2979 nxt_port_t *port; |
2743 nxt_req_id_t req_id; | |
2744 nxt_event_engine_t *engine; 2745 nxt_req_app_link_t *ra; 2746 nxt_req_conn_link_t *rc; 2747 2748 engine = task->thread->engine; 2749 | 2980 nxt_event_engine_t *engine; 2981 nxt_req_app_link_t *ra; 2982 nxt_req_conn_link_t *rc; 2983 2984 engine = task->thread->engine; 2985 |
2750 do { 2751 req_id = nxt_random(&task->thread->random); 2752 } while (nxt_event_engine_request_find(engine, req_id) != NULL); | 2986 rc = nxt_port_rpc_register_handler_ex(task, engine->port, 2987 nxt_router_response_ready_handler, 2988 nxt_router_response_error_handler, 2989 sizeof(nxt_req_conn_link_t)); |
2753 | 2990 |
2754 rc = nxt_conn_request_add(c, req_id); 2755 | |
2756 if (nxt_slow_path(rc == NULL)) { 2757 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2758 "req->conn link"); 2759 2760 return; 2761 } 2762 | 2991 if (nxt_slow_path(rc == NULL)) { 2992 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2993 "req->conn link"); 2994 2995 return; 2996 } 2997 |
2763 nxt_event_engine_request_add(engine, rc); | 2998 rc->stream = nxt_port_rpc_ex_stream(rc); 2999 rc->conn = c; |
2764 | 3000 |
2765 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 2766 req_id, c, engine); | 3001 nxt_queue_insert_tail(&c->requests, &rc->link); |
2767 | 3002 |
3003 nxt_debug(task, "stream #%uD linked to conn %p at engine %p", 3004 rc->stream, c, engine); 3005 |
|
2768 c->socket.data = NULL; 2769 2770 ra = nxt_router_ra_create(task, rc); 2771 2772 ra->ap = ap; | 3006 c->socket.data = NULL; 3007 3008 ra = nxt_router_ra_create(task, rc); 3009 3010 ra->ap = ap; |
2773 ra->reply_port = engine->port; | |
2774 2775 res = nxt_router_app_port(task, ra); 2776 2777 if (res != NXT_OK) { 2778 return; 2779 } 2780 2781 port = ra->app_port; 2782 2783 if (nxt_slow_path(port == NULL)) { | 3011 3012 res = nxt_router_app_port(task, ra); 3013 3014 if (res != NXT_OK) { 3015 return; 3016 } 3017 3018 port = ra->app_port; 3019 3020 if (nxt_slow_path(port == NULL)) { |
2784 nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); | 3021 nxt_router_gen_error(task, c, 500, "Application port not found"); |
2785 return; 2786 } 2787 | 3022 return; 3023 } 3024 |
3025 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); 3026 |
|
2788 port_mp = port->mem_pool; 2789 port->mem_pool = c->mem_pool; 2790 2791 nxt_router_process_http_request_mp(task, ra, port); 2792 2793 port->mem_pool = port_mp; 2794 | 3027 port_mp = port->mem_pool; 3028 port->mem_pool = c->mem_pool; 3029 3030 nxt_router_process_http_request_mp(task, ra, port); 3031 3032 port->mem_pool = port_mp; 3033 |
2795 | |
2796 nxt_router_ra_release(task, ra, ra->work.data); 2797} 2798 2799 2800static void 2801nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, 2802 nxt_port_t *port) 2803{ 2804 nxt_int_t res; 2805 nxt_port_t *c_port, *reply_port; 2806 nxt_conn_t *c; 2807 nxt_app_wmsg_t wmsg; 2808 nxt_app_parse_ctx_t *ap; 2809 | 3034 nxt_router_ra_release(task, ra, ra->work.data); 3035} 3036 3037 3038static void 3039nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra, 3040 nxt_port_t *port) 3041{ 3042 nxt_int_t res; 3043 nxt_port_t *c_port, *reply_port; 3044 nxt_conn_t *c; 3045 nxt_app_wmsg_t wmsg; 3046 nxt_app_parse_ctx_t *ap; 3047 |
3048 /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */ 3049 3050 nxt_assert(ra->rc != NULL); 3051 |
|
2810 reply_port = ra->reply_port; 2811 ap = ra->ap; 2812 c = ra->rc->conn; 2813 2814 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 2815 reply_port->id); 2816 if (nxt_slow_path(c_port != reply_port)) { 2817 res = nxt_port_send_port(task, port, reply_port, 0); --- 5 unchanged lines hidden (view full) --- 2823 } 2824 2825 nxt_process_connected_port_add(port->process, reply_port); 2826 } 2827 2828 wmsg.port = port; 2829 wmsg.write = NULL; 2830 wmsg.buf = &wmsg.write; | 3052 reply_port = ra->reply_port; 3053 ap = ra->ap; 3054 c = ra->rc->conn; 3055 3056 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 3057 reply_port->id); 3058 if (nxt_slow_path(c_port != reply_port)) { 3059 res = nxt_port_send_port(task, port, reply_port, 0); --- 5 unchanged lines hidden (view full) --- 3065 } 3066 3067 nxt_process_connected_port_add(port->process, reply_port); 3068 } 3069 3070 wmsg.port = port; 3071 wmsg.write = NULL; 3072 wmsg.buf = &wmsg.write; |
2831 wmsg.stream = ra->req_id; | 3073 wmsg.stream = ra->stream; |
2832 2833 res = port->app->prepare_msg(task, &ap->r, &wmsg); 2834 2835 if (nxt_slow_path(res != NXT_OK)) { 2836 nxt_router_gen_error(task, c, 500, 2837 "Failed to prepare message for application"); 2838 return; 2839 } 2840 2841 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 2842 nxt_buf_used_size(wmsg.write), 2843 wmsg.port->socket.fd); 2844 2845 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, | 3074 3075 res = port->app->prepare_msg(task, &ap->r, &wmsg); 3076 3077 if (nxt_slow_path(res != NXT_OK)) { 3078 nxt_router_gen_error(task, c, 500, 3079 "Failed to prepare message for application"); 3080 return; 3081 } 3082 3083 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 3084 nxt_buf_used_size(wmsg.write), 3085 wmsg.port->socket.fd); 3086 3087 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, |
2846 -1, ra->req_id, reply_port->id, wmsg.write); | 3088 -1, ra->stream, reply_port->id, wmsg.write); |
2847 2848 if (nxt_slow_path(res != NXT_OK)) { 2849 nxt_router_gen_error(task, c, 500, 2850 "Failed to send message to application"); 2851 return; 2852 } 2853} 2854 --- 357 unchanged lines hidden (view full) --- 3212 nxt_socket_conf_joint_t *joint; 3213 3214 c = obj; 3215 3216 nxt_debug(task, "router conn close done"); 3217 3218 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 3219 | 3089 3090 if (nxt_slow_path(res != NXT_OK)) { 3091 nxt_router_gen_error(task, c, 500, 3092 "Failed to send message to application"); 3093 return; 3094 } 3095} 3096 --- 357 unchanged lines hidden (view full) --- 3454 nxt_socket_conf_joint_t *joint; 3455 3456 c = obj; 3457 3458 nxt_debug(task, "router conn close done"); 3459 3460 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 3461 |
3220 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); | 3462 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream); |
3221 3222 if (rc->app_port != NULL) { 3223 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); 3224 3225 rc->app_port = NULL; 3226 } 3227 | 3463 3464 if (rc->app_port != NULL) { 3465 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app); 3466 3467 rc->app_port = NULL; 3468 } 3469 |
3228 rc->conn = NULL; | 3470 nxt_router_rc_unlink(rc); |
3229 | 3471 |
3230 nxt_event_engine_request_remove(task->thread->engine, rc); | 3472 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream); |
3231 3232 } nxt_queue_loop; 3233 3234 nxt_queue_remove(&c->link); 3235 3236 joint = c->listen->socket.data; 3237 3238 task = &task->thread->engine->task; --- 37 unchanged lines hidden (view full) --- 3276 nxt_router_gen_error(task, c, 408, "Read header timeout"); 3277 3278 } else { 3279 nxt_router_gen_error(task, c, 408, "Read body timeout"); 3280 } 3281} 3282 3283 | 3473 3474 } nxt_queue_loop; 3475 3476 nxt_queue_remove(&c->link); 3477 3478 joint = c->listen->socket.data; 3479 3480 task = &task->thread->engine->task; --- 37 unchanged lines hidden (view full) --- 3518 nxt_router_gen_error(task, c, 408, "Read header timeout"); 3519 3520 } else { 3521 nxt_router_gen_error(task, c, 408, "Read body timeout"); 3522 } 3523} 3524 3525 |
3526static void 3527nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data) 3528{ 3529 nxt_conn_t *c; 3530 nxt_timer_t *timer; 3531 3532 timer = obj; 3533 3534 nxt_debug(task, "router app timeout"); 3535 3536 c = nxt_read_timer_conn(timer); 3537 3538 nxt_router_gen_error(task, c, 408, "Application timeout"); 3539} 3540 3541 |
|
3284static nxt_msec_t 3285nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 3286{ 3287 nxt_socket_conf_joint_t *joint; 3288 3289 joint = c->listen->socket.data; 3290 3291 return nxt_value_at(nxt_msec_t, joint->socket_conf, data); 3292} | 3542static nxt_msec_t 3543nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data) 3544{ 3545 nxt_socket_conf_joint_t *joint; 3546 3547 joint = c->listen->socket.data; 3548 3549 return nxt_value_at(nxt_msec_t, joint->socket_conf, data); 3550} |