Deleted
Added
nxt_router.c (1558:026e4b909b61) | nxt_router.c (1561:4dae2fd8e02a) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 190 unchanged lines hidden (view full) --- 199 nxt_port_recv_msg_t *msg, void *data); 200 201static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 202 203static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 204 nxt_apr_action_t action); 205static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 206 nxt_request_rpc_data_t *req_rpc_data); | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 190 unchanged lines hidden (view full) --- 199 nxt_port_recv_msg_t *msg, void *data); 200 201static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app); 202 203static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, 204 nxt_apr_action_t action); 205static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 206 nxt_request_rpc_data_t *req_rpc_data); |
207static void nxt_router_http_request_error(nxt_task_t *task, void *obj, 208 void *data); |
|
207static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 208 void *data); 209 210static void nxt_router_app_prepare_request(nxt_task_t *task, 211 nxt_request_rpc_data_t *req_rpc_data); 212static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 213 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 214 --- 319 unchanged lines hidden (view full) --- 534 return 0; 535} 536 537 538nxt_inline void 539nxt_request_rpc_data_unlink(nxt_task_t *task, 540 nxt_request_rpc_data_t *req_rpc_data) 541{ | 209static void nxt_router_http_request_done(nxt_task_t *task, void *obj, 210 void *data); 211 212static void nxt_router_app_prepare_request(nxt_task_t *task, 213 nxt_request_rpc_data_t *req_rpc_data); 214static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, 215 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 216 --- 319 unchanged lines hidden (view full) --- 536 return 0; 537} 538 539 540nxt_inline void 541nxt_request_rpc_data_unlink(nxt_task_t *task, 542 nxt_request_rpc_data_t *req_rpc_data) 543{ |
544 nxt_app_t *app; 545 nxt_bool_t unlinked; |
|
542 nxt_http_request_t *r; 543 544 nxt_router_msg_cancel(task, req_rpc_data); 545 546 if (req_rpc_data->app_port != NULL) { 547 nxt_router_app_port_release(task, req_rpc_data->app_port, 548 req_rpc_data->apr_action); 549 550 req_rpc_data->app_port = NULL; 551 } 552 | 546 nxt_http_request_t *r; 547 548 nxt_router_msg_cancel(task, req_rpc_data); 549 550 if (req_rpc_data->app_port != NULL) { 551 nxt_router_app_port_release(task, req_rpc_data->app_port, 552 req_rpc_data->apr_action); 553 554 req_rpc_data->app_port = NULL; 555 } 556 |
553 if (req_rpc_data->app != NULL) { 554 nxt_router_app_use(task, req_rpc_data->app, -1); 555 556 req_rpc_data->app = NULL; 557 } 558 | 557 app = req_rpc_data->app; |
559 r = req_rpc_data->request; 560 561 if (r != NULL) { 562 r->timer_data = NULL; 563 564 nxt_router_http_request_release_post(task, r); 565 566 r->req_rpc_data = NULL; 567 req_rpc_data->request = NULL; | 558 r = req_rpc_data->request; 559 560 if (r != NULL) { 561 r->timer_data = NULL; 562 563 nxt_router_http_request_release_post(task, r); 564 565 r->req_rpc_data = NULL; 566 req_rpc_data->request = NULL; |
567 568 if (app != NULL) { 569 unlinked = 0; 570 571 nxt_thread_mutex_lock(&app->mutex); 572 573 if (r->app_link.next != NULL) { 574 nxt_queue_remove(&r->app_link); 575 r->app_link.next = NULL; 576 577 unlinked = 1; 578 } 579 580 nxt_thread_mutex_unlock(&app->mutex); 581 582 if (unlinked) { 583 nxt_mp_release(r->mem_pool); 584 } 585 } |
|
568 } 569 | 586 } 587 |
588 if (app != NULL) { 589 nxt_router_app_use(task, app, -1); 590 591 req_rpc_data->app = NULL; 592 } 593 |
|
570 if (req_rpc_data->msg_info.body_fd != -1) { 571 nxt_fd_close(req_rpc_data->msg_info.body_fd); 572 573 req_rpc_data->msg_info.body_fd = -1; 574 } 575 576 if (req_rpc_data->rpc_cancel) { 577 req_rpc_data->rpc_cancel = 0; --- 909 unchanged lines hidden (view full) --- 1487 ret = nxt_thread_mutex_create(&app->mutex); 1488 if (ret != NXT_OK) { 1489 goto app_fail; 1490 } 1491 1492 nxt_queue_init(&app->ports); 1493 nxt_queue_init(&app->spare_ports); 1494 nxt_queue_init(&app->idle_ports); | 594 if (req_rpc_data->msg_info.body_fd != -1) { 595 nxt_fd_close(req_rpc_data->msg_info.body_fd); 596 597 req_rpc_data->msg_info.body_fd = -1; 598 } 599 600 if (req_rpc_data->rpc_cancel) { 601 req_rpc_data->rpc_cancel = 0; --- 909 unchanged lines hidden (view full) --- 1511 ret = nxt_thread_mutex_create(&app->mutex); 1512 if (ret != NXT_OK) { 1513 goto app_fail; 1514 } 1515 1516 nxt_queue_init(&app->ports); 1517 nxt_queue_init(&app->spare_ports); 1518 nxt_queue_init(&app->idle_ports); |
1519 nxt_queue_init(&app->ack_waiting_req); |
|
1495 1496 app->name.length = name.length; 1497 nxt_memcpy(app->name.start, name.start, name.length); 1498 1499 app->type = lang->type; 1500 app->max_processes = apcf.max_processes; 1501 app->spare_processes = apcf.spare_processes; 1502 app->max_pending_processes = apcf.spare_processes --- 2276 unchanged lines hidden (view full) --- 3779 3780 3781static void 3782nxt_router_req_headers_ack_handler(nxt_task_t *task, 3783 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 3784{ 3785 int res; 3786 nxt_app_t *app; | 1520 1521 app->name.length = name.length; 1522 nxt_memcpy(app->name.start, name.start, name.length); 1523 1524 app->type = lang->type; 1525 app->max_processes = apcf.max_processes; 1526 app->spare_processes = apcf.spare_processes; 1527 app->max_pending_processes = apcf.spare_processes --- 2276 unchanged lines hidden (view full) --- 3804 3805 3806static void 3807nxt_router_req_headers_ack_handler(nxt_task_t *task, 3808 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) 3809{ 3810 int res; 3811 nxt_app_t *app; |
3787 nxt_bool_t start_process; | 3812 nxt_bool_t start_process, unlinked; |
3788 nxt_port_t *app_port, *main_app_port, *idle_port; 3789 nxt_queue_link_t *idle_lnk; 3790 nxt_http_request_t *r; 3791 3792 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 3793 req_rpc_data->stream, 3794 msg->port_msg.pid, msg->port_msg.reply_port); 3795 3796 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 3797 msg->port_msg.pid); 3798 3799 app = req_rpc_data->app; | 3813 nxt_port_t *app_port, *main_app_port, *idle_port; 3814 nxt_queue_link_t *idle_lnk; 3815 nxt_http_request_t *r; 3816 3817 nxt_debug(task, "stream #%uD: got ack from %PI:%d", 3818 req_rpc_data->stream, 3819 msg->port_msg.pid, msg->port_msg.reply_port); 3820 3821 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, 3822 msg->port_msg.pid); 3823 3824 app = req_rpc_data->app; |
3825 r = req_rpc_data->request; |
|
3800 3801 start_process = 0; | 3826 3827 start_process = 0; |
3828 unlinked = 0; |
|
3802 3803 nxt_thread_mutex_lock(&app->mutex); 3804 | 3829 3830 nxt_thread_mutex_lock(&app->mutex); 3831 |
3832 if (r->app_link.next != NULL) { 3833 nxt_queue_remove(&r->app_link); 3834 r->app_link.next = NULL; 3835 3836 unlinked = 1; 3837 } 3838 |
|
3805 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 3806 msg->port_msg.reply_port); 3807 if (nxt_slow_path(app_port == NULL)) { 3808 nxt_thread_mutex_unlock(&app->mutex); 3809 | 3839 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, 3840 msg->port_msg.reply_port); 3841 if (nxt_slow_path(app_port == NULL)) { 3842 nxt_thread_mutex_unlock(&app->mutex); 3843 |
3810 r = req_rpc_data->request; | |
3811 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3812 | 3844 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3845 |
3846 if (unlinked) { 3847 nxt_mp_release(r->mem_pool); 3848 } 3849 |
|
3813 return; 3814 } 3815 3816 main_app_port = app_port->main_app_port; 3817 3818 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 3819 app->idle_processes--; 3820 --- 31 unchanged lines hidden (view full) --- 3852 } 3853 3854 main_app_port->active_requests++; 3855 3856 nxt_port_inc_use(app_port); 3857 3858 nxt_thread_mutex_unlock(&app->mutex); 3859 | 3850 return; 3851 } 3852 3853 main_app_port = app_port->main_app_port; 3854 3855 if (nxt_queue_chk_remove(&main_app_port->idle_link)) { 3856 app->idle_processes--; 3857 --- 31 unchanged lines hidden (view full) --- 3889 } 3890 3891 main_app_port->active_requests++; 3892 3893 nxt_port_inc_use(app_port); 3894 3895 nxt_thread_mutex_unlock(&app->mutex); 3896 |
3897 if (unlinked) { 3898 nxt_mp_release(r->mem_pool); 3899 } 3900 |
|
3860 if (start_process) { 3861 nxt_router_start_app_process(task, app); 3862 } 3863 3864 nxt_port_use(task, req_rpc_data->app_port, -1); 3865 3866 req_rpc_data->app_port = app_port; 3867 --- 4 unchanged lines hidden (view full) --- 3872 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 3873 3874 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 3875 req_rpc_data->msg_info.body_fd, 3876 req_rpc_data->stream, 3877 task->thread->engine->port->id, NULL); 3878 3879 if (nxt_slow_path(res != NXT_OK)) { | 3901 if (start_process) { 3902 nxt_router_start_app_process(task, app); 3903 } 3904 3905 nxt_port_use(task, req_rpc_data->app_port, -1); 3906 3907 req_rpc_data->app_port = app_port; 3908 --- 4 unchanged lines hidden (view full) --- 3913 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 3914 3915 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY, 3916 req_rpc_data->msg_info.body_fd, 3917 req_rpc_data->stream, 3918 task->thread->engine->port->id, NULL); 3919 3920 if (nxt_slow_path(res != NXT_OK)) { |
3880 r = req_rpc_data->request; 3881 | |
3882 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3883 } 3884 } 3885 3886 if (app->timeout != 0) { | 3921 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); 3922 } 3923 } 3924 3925 if (app->timeout != 0) { |
3887 r = req_rpc_data->request; 3888 | |
3889 r->timer.handler = nxt_router_app_timeout; 3890 r->timer_data = req_rpc_data; 3891 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 3892 } 3893} 3894 3895 3896static const nxt_http_request_state_t nxt_http_request_send_state --- 126 unchanged lines hidden (view full) --- 4023 0, 0, b); 4024} 4025 4026 4027static void 4028nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4029 void *data) 4030{ | 3926 r->timer.handler = nxt_router_app_timeout; 3927 r->timer_data = req_rpc_data; 3928 nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 3929 } 3930} 3931 3932 3933static const nxt_http_request_state_t nxt_http_request_send_state --- 126 unchanged lines hidden (view full) --- 4060 0, 0, b); 4061} 4062 4063 4064static void 4065nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, 4066 void *data) 4067{ |
4031 nxt_app_t *app; 4032 nxt_app_joint_t *app_joint; | 4068 nxt_app_t *app; 4069 nxt_app_joint_t *app_joint; 4070 nxt_queue_link_t *link; 4071 nxt_http_request_t *r; |
4033 4034 app_joint = data; 4035 4036 nxt_assert(app_joint != NULL); 4037 4038 app = app_joint->app; 4039 4040 nxt_router_app_joint_use(task, app_joint, -1); 4041 4042 if (nxt_slow_path(app == NULL)) { 4043 nxt_debug(task, "start error for released app"); 4044 4045 return; 4046 } 4047 4048 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4049 | 4072 4073 app_joint = data; 4074 4075 nxt_assert(app_joint != NULL); 4076 4077 app = app_joint->app; 4078 4079 nxt_router_app_joint_use(task, app_joint, -1); 4080 4081 if (nxt_slow_path(app == NULL)) { 4082 nxt_debug(task, "start error for released app"); 4083 4084 return; 4085 } 4086 4087 nxt_debug(task, "app '%V' %p start error", &app->name, app); 4088 |
4089 link = NULL; 4090 |
|
4050 nxt_thread_mutex_lock(&app->mutex); 4051 4052 nxt_assert(app->pending_processes != 0); 4053 4054 app->pending_processes--; 4055 | 4091 nxt_thread_mutex_lock(&app->mutex); 4092 4093 nxt_assert(app->pending_processes != 0); 4094 4095 app->pending_processes--; 4096 |
4097 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) { 4098 link = nxt_queue_first(&app->ack_waiting_req); 4099 4100 nxt_queue_remove(link); 4101 link->next = NULL; 4102 } 4103 |
|
4056 nxt_thread_mutex_unlock(&app->mutex); 4057 | 4104 nxt_thread_mutex_unlock(&app->mutex); 4105 |
4058 /* TODO req_app_link to cancel first pending message */ | 4106 while (link != NULL) { 4107 r = nxt_container_of(link, nxt_http_request_t, app_link); 4108 4109 nxt_event_engine_post(r->engine, &r->err_work); 4110 4111 link = NULL; 4112 4113 nxt_thread_mutex_lock(&app->mutex); 4114 4115 if (app->processes == 0 && app->pending_processes == 0 4116 && !nxt_queue_is_empty(&app->ack_waiting_req)) 4117 { 4118 link = nxt_queue_first(&app->ack_waiting_req); 4119 4120 nxt_queue_remove(link); 4121 link->next = NULL; 4122 } 4123 4124 nxt_thread_mutex_unlock(&app->mutex); 4125 } |
4059} 4060 4061 4062void 4063nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4064{ 4065 int c; 4066 --- 469 unchanged lines hidden (view full) --- 4536 } 4537} 4538 4539 4540static void 4541nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 4542 nxt_request_rpc_data_t *req_rpc_data) 4543{ | 4126} 4127 4128 4129void 4130nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i) 4131{ 4132 int c; 4133 --- 469 unchanged lines hidden (view full) --- 4603 } 4604} 4605 4606 4607static void 4608nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, 4609 nxt_request_rpc_data_t *req_rpc_data) 4610{ |
4544 nxt_bool_t start_process; 4545 nxt_port_t *port; | 4611 nxt_bool_t start_process; 4612 nxt_port_t *port; 4613 nxt_http_request_t *r; |
4546 4547 start_process = 0; 4548 4549 nxt_thread_mutex_lock(&app->mutex); 4550 4551 port = app->shared_port; 4552 nxt_port_inc_use(port); 4553 4554 app->active_requests++; 4555 4556 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4557 app->pending_processes++; 4558 start_process = 1; 4559 } 4560 | 4614 4615 start_process = 0; 4616 4617 nxt_thread_mutex_lock(&app->mutex); 4618 4619 port = app->shared_port; 4620 nxt_port_inc_use(port); 4621 4622 app->active_requests++; 4623 4624 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { 4625 app->pending_processes++; 4626 start_process = 1; 4627 } 4628 |
4629 r = req_rpc_data->request; 4630 4631 /* 4632 * Put request into application-wide list to be able to cancel request 4633 * if something goes wrong with application processes. 4634 */ 4635 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link); 4636 |
|
4561 nxt_thread_mutex_unlock(&app->mutex); 4562 | 4637 nxt_thread_mutex_unlock(&app->mutex); 4638 |
4639 /* 4640 * Retain request memory pool while request is linked in ack_waiting_req 4641 * to guarantee request structure memory is accessble. 4642 */ 4643 nxt_mp_retain(r->mem_pool); 4644 |
|
4563 req_rpc_data->app_port = port; 4564 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4565 4566 if (start_process) { 4567 nxt_router_start_app_process(task, app); 4568 } 4569} 4570 --- 26 unchanged lines hidden (view full) --- 4597 */ 4598 nxt_mp_retain(r->mem_pool); 4599 4600 r->timer.task = &engine->task; 4601 r->timer.work_queue = &engine->fast_work_queue; 4602 r->timer.log = engine->task.log; 4603 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 4604 | 4645 req_rpc_data->app_port = port; 4646 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4647 4648 if (start_process) { 4649 nxt_router_start_app_process(task, app); 4650 } 4651} 4652 --- 26 unchanged lines hidden (view full) --- 4679 */ 4680 nxt_mp_retain(r->mem_pool); 4681 4682 r->timer.task = &engine->task; 4683 r->timer.work_queue = &engine->fast_work_queue; 4684 r->timer.log = engine->task.log; 4685 r->timer.bias = NXT_TIMER_DEFAULT_BIAS; 4686 |
4687 r->engine = engine; 4688 r->err_work.handler = nxt_router_http_request_error; 4689 r->err_work.task = task; 4690 r->err_work.obj = r; 4691 |
|
4605 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 4606 req_rpc_data->app = app; 4607 req_rpc_data->msg_info.body_fd = -1; 4608 req_rpc_data->rpc_cancel = 1; 4609 4610 nxt_router_app_use(task, app, 1); 4611 4612 req_rpc_data->request = r; --- 4 unchanged lines hidden (view full) --- 4617 } 4618 4619 nxt_router_app_port_get(task, app, req_rpc_data); 4620 nxt_router_app_prepare_request(task, req_rpc_data); 4621} 4622 4623 4624static void | 4692 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data); 4693 req_rpc_data->app = app; 4694 req_rpc_data->msg_info.body_fd = -1; 4695 req_rpc_data->rpc_cancel = 1; 4696 4697 nxt_router_app_use(task, app, 1); 4698 4699 req_rpc_data->request = r; --- 4 unchanged lines hidden (view full) --- 4704 } 4705 4706 nxt_router_app_port_get(task, app, req_rpc_data); 4707 nxt_router_app_prepare_request(task, req_rpc_data); 4708} 4709 4710 4711static void |
4712nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data) 4713{ 4714 nxt_http_request_t *r; 4715 4716 r = obj; 4717 4718 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data); 4719 4720 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE); 4721 4722 if (r->req_rpc_data != NULL) { 4723 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4724 } 4725 4726 nxt_mp_release(r->mem_pool); 4727} 4728 4729 4730static void |
|
4625nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 4626{ 4627 nxt_http_request_t *r; 4628 4629 r = data; 4630 4631 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 4632 | 4731nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) 4732{ 4733 nxt_http_request_t *r; 4734 4735 r = data; 4736 4737 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); 4738 |
4633 if (r->req_rpc_data) { | 4739 if (r->req_rpc_data != NULL) { |
4634 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4635 } 4636 4637 nxt_http_request_close_handler(task, r, r->proto.any); 4638} 4639 4640 4641static void --- 641 unchanged lines hidden --- | 4740 nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4741 } 4742 4743 nxt_http_request_close_handler(task, r, r->proto.any); 4744} 4745 4746 4747static void --- 641 unchanged lines hidden --- |