nxt_router.c (1558:026e4b909b61) nxt_router.c (1561:4dae2fd8e02a)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 190 unchanged lines hidden (view full) ---

199 nxt_port_recv_msg_t *msg, void *data);
200
201static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
202
203static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
204 nxt_apr_action_t action);
205static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
206 nxt_request_rpc_data_t *req_rpc_data);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 190 unchanged lines hidden (view full) ---

199 nxt_port_recv_msg_t *msg, void *data);
200
201static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
202
203static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
204 nxt_apr_action_t action);
205static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
206 nxt_request_rpc_data_t *req_rpc_data);
207static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
208 void *data);
207static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
208 void *data);
209
210static void nxt_router_app_prepare_request(nxt_task_t *task,
211 nxt_request_rpc_data_t *req_rpc_data);
212static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
213 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
214

--- 319 unchanged lines hidden (view full) ---

534 return 0;
535}
536
537
538nxt_inline void
539nxt_request_rpc_data_unlink(nxt_task_t *task,
540 nxt_request_rpc_data_t *req_rpc_data)
541{
209static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
210 void *data);
211
212static void nxt_router_app_prepare_request(nxt_task_t *task,
213 nxt_request_rpc_data_t *req_rpc_data);
214static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
215 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
216

--- 319 unchanged lines hidden (view full) ---

536 return 0;
537}
538
539
540nxt_inline void
541nxt_request_rpc_data_unlink(nxt_task_t *task,
542 nxt_request_rpc_data_t *req_rpc_data)
543{
544 nxt_app_t *app;
545 nxt_bool_t unlinked;
542 nxt_http_request_t *r;
543
544 nxt_router_msg_cancel(task, req_rpc_data);
545
546 if (req_rpc_data->app_port != NULL) {
547 nxt_router_app_port_release(task, req_rpc_data->app_port,
548 req_rpc_data->apr_action);
549
550 req_rpc_data->app_port = NULL;
551 }
552
546 nxt_http_request_t *r;
547
548 nxt_router_msg_cancel(task, req_rpc_data);
549
550 if (req_rpc_data->app_port != NULL) {
551 nxt_router_app_port_release(task, req_rpc_data->app_port,
552 req_rpc_data->apr_action);
553
554 req_rpc_data->app_port = NULL;
555 }
556
553 if (req_rpc_data->app != NULL) {
554 nxt_router_app_use(task, req_rpc_data->app, -1);
555
556 req_rpc_data->app = NULL;
557 }
558
557 app = req_rpc_data->app;
559 r = req_rpc_data->request;
560
561 if (r != NULL) {
562 r->timer_data = NULL;
563
564 nxt_router_http_request_release_post(task, r);
565
566 r->req_rpc_data = NULL;
567 req_rpc_data->request = NULL;
558 r = req_rpc_data->request;
559
560 if (r != NULL) {
561 r->timer_data = NULL;
562
563 nxt_router_http_request_release_post(task, r);
564
565 r->req_rpc_data = NULL;
566 req_rpc_data->request = NULL;
567
568 if (app != NULL) {
569 unlinked = 0;
570
571 nxt_thread_mutex_lock(&app->mutex);
572
573 if (r->app_link.next != NULL) {
574 nxt_queue_remove(&r->app_link);
575 r->app_link.next = NULL;
576
577 unlinked = 1;
578 }
579
580 nxt_thread_mutex_unlock(&app->mutex);
581
582 if (unlinked) {
583 nxt_mp_release(r->mem_pool);
584 }
585 }
568 }
569
586 }
587
588 if (app != NULL) {
589 nxt_router_app_use(task, app, -1);
590
591 req_rpc_data->app = NULL;
592 }
593
570 if (req_rpc_data->msg_info.body_fd != -1) {
571 nxt_fd_close(req_rpc_data->msg_info.body_fd);
572
573 req_rpc_data->msg_info.body_fd = -1;
574 }
575
576 if (req_rpc_data->rpc_cancel) {
577 req_rpc_data->rpc_cancel = 0;

--- 909 unchanged lines hidden (view full) ---

1487 ret = nxt_thread_mutex_create(&app->mutex);
1488 if (ret != NXT_OK) {
1489 goto app_fail;
1490 }
1491
1492 nxt_queue_init(&app->ports);
1493 nxt_queue_init(&app->spare_ports);
1494 nxt_queue_init(&app->idle_ports);
594 if (req_rpc_data->msg_info.body_fd != -1) {
595 nxt_fd_close(req_rpc_data->msg_info.body_fd);
596
597 req_rpc_data->msg_info.body_fd = -1;
598 }
599
600 if (req_rpc_data->rpc_cancel) {
601 req_rpc_data->rpc_cancel = 0;

--- 909 unchanged lines hidden (view full) ---

1511 ret = nxt_thread_mutex_create(&app->mutex);
1512 if (ret != NXT_OK) {
1513 goto app_fail;
1514 }
1515
1516 nxt_queue_init(&app->ports);
1517 nxt_queue_init(&app->spare_ports);
1518 nxt_queue_init(&app->idle_ports);
1519 nxt_queue_init(&app->ack_waiting_req);
1495
1496 app->name.length = name.length;
1497 nxt_memcpy(app->name.start, name.start, name.length);
1498
1499 app->type = lang->type;
1500 app->max_processes = apcf.max_processes;
1501 app->spare_processes = apcf.spare_processes;
1502 app->max_pending_processes = apcf.spare_processes

--- 2276 unchanged lines hidden (view full) ---

3779
3780
3781static void
3782nxt_router_req_headers_ack_handler(nxt_task_t *task,
3783 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
3784{
3785 int res;
3786 nxt_app_t *app;
1520
1521 app->name.length = name.length;
1522 nxt_memcpy(app->name.start, name.start, name.length);
1523
1524 app->type = lang->type;
1525 app->max_processes = apcf.max_processes;
1526 app->spare_processes = apcf.spare_processes;
1527 app->max_pending_processes = apcf.spare_processes

--- 2276 unchanged lines hidden (view full) ---

3804
3805
3806static void
3807nxt_router_req_headers_ack_handler(nxt_task_t *task,
3808 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
3809{
3810 int res;
3811 nxt_app_t *app;
3787 nxt_bool_t start_process;
3812 nxt_bool_t start_process, unlinked;
3788 nxt_port_t *app_port, *main_app_port, *idle_port;
3789 nxt_queue_link_t *idle_lnk;
3790 nxt_http_request_t *r;
3791
3792 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
3793 req_rpc_data->stream,
3794 msg->port_msg.pid, msg->port_msg.reply_port);
3795
3796 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
3797 msg->port_msg.pid);
3798
3799 app = req_rpc_data->app;
3813 nxt_port_t *app_port, *main_app_port, *idle_port;
3814 nxt_queue_link_t *idle_lnk;
3815 nxt_http_request_t *r;
3816
3817 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
3818 req_rpc_data->stream,
3819 msg->port_msg.pid, msg->port_msg.reply_port);
3820
3821 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
3822 msg->port_msg.pid);
3823
3824 app = req_rpc_data->app;
3825 r = req_rpc_data->request;
3800
3801 start_process = 0;
3826
3827 start_process = 0;
3828 unlinked = 0;
3802
3803 nxt_thread_mutex_lock(&app->mutex);
3804
3829
3830 nxt_thread_mutex_lock(&app->mutex);
3831
3832 if (r->app_link.next != NULL) {
3833 nxt_queue_remove(&r->app_link);
3834 r->app_link.next = NULL;
3835
3836 unlinked = 1;
3837 }
3838
3805 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
3806 msg->port_msg.reply_port);
3807 if (nxt_slow_path(app_port == NULL)) {
3808 nxt_thread_mutex_unlock(&app->mutex);
3809
3839 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
3840 msg->port_msg.reply_port);
3841 if (nxt_slow_path(app_port == NULL)) {
3842 nxt_thread_mutex_unlock(&app->mutex);
3843
3810 r = req_rpc_data->request;
3811 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3812
3844 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3845
3846 if (unlinked) {
3847 nxt_mp_release(r->mem_pool);
3848 }
3849
3813 return;
3814 }
3815
3816 main_app_port = app_port->main_app_port;
3817
3818 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
3819 app->idle_processes--;
3820

--- 31 unchanged lines hidden (view full) ---

3852 }
3853
3854 main_app_port->active_requests++;
3855
3856 nxt_port_inc_use(app_port);
3857
3858 nxt_thread_mutex_unlock(&app->mutex);
3859
3850 return;
3851 }
3852
3853 main_app_port = app_port->main_app_port;
3854
3855 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
3856 app->idle_processes--;
3857

--- 31 unchanged lines hidden (view full) ---

3889 }
3890
3891 main_app_port->active_requests++;
3892
3893 nxt_port_inc_use(app_port);
3894
3895 nxt_thread_mutex_unlock(&app->mutex);
3896
3897 if (unlinked) {
3898 nxt_mp_release(r->mem_pool);
3899 }
3900
3860 if (start_process) {
3861 nxt_router_start_app_process(task, app);
3862 }
3863
3864 nxt_port_use(task, req_rpc_data->app_port, -1);
3865
3866 req_rpc_data->app_port = app_port;
3867

--- 4 unchanged lines hidden (view full) ---

3872 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
3873
3874 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
3875 req_rpc_data->msg_info.body_fd,
3876 req_rpc_data->stream,
3877 task->thread->engine->port->id, NULL);
3878
3879 if (nxt_slow_path(res != NXT_OK)) {
3901 if (start_process) {
3902 nxt_router_start_app_process(task, app);
3903 }
3904
3905 nxt_port_use(task, req_rpc_data->app_port, -1);
3906
3907 req_rpc_data->app_port = app_port;
3908

--- 4 unchanged lines hidden (view full) ---

3913 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
3914
3915 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
3916 req_rpc_data->msg_info.body_fd,
3917 req_rpc_data->stream,
3918 task->thread->engine->port->id, NULL);
3919
3920 if (nxt_slow_path(res != NXT_OK)) {
3880 r = req_rpc_data->request;
3881
3882 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3883 }
3884 }
3885
3886 if (app->timeout != 0) {
3921 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3922 }
3923 }
3924
3925 if (app->timeout != 0) {
3887 r = req_rpc_data->request;
3888
3889 r->timer.handler = nxt_router_app_timeout;
3890 r->timer_data = req_rpc_data;
3891 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3892 }
3893}
3894
3895
3896static const nxt_http_request_state_t nxt_http_request_send_state

--- 126 unchanged lines hidden (view full) ---

4023 0, 0, b);
4024}
4025
4026
4027static void
4028nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4029 void *data)
4030{
3926 r->timer.handler = nxt_router_app_timeout;
3927 r->timer_data = req_rpc_data;
3928 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3929 }
3930}
3931
3932
3933static const nxt_http_request_state_t nxt_http_request_send_state

--- 126 unchanged lines hidden (view full) ---

4060 0, 0, b);
4061}
4062
4063
4064static void
4065nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4066 void *data)
4067{
4031 nxt_app_t *app;
4032 nxt_app_joint_t *app_joint;
4068 nxt_app_t *app;
4069 nxt_app_joint_t *app_joint;
4070 nxt_queue_link_t *link;
4071 nxt_http_request_t *r;
4033
4034 app_joint = data;
4035
4036 nxt_assert(app_joint != NULL);
4037
4038 app = app_joint->app;
4039
4040 nxt_router_app_joint_use(task, app_joint, -1);
4041
4042 if (nxt_slow_path(app == NULL)) {
4043 nxt_debug(task, "start error for released app");
4044
4045 return;
4046 }
4047
4048 nxt_debug(task, "app '%V' %p start error", &app->name, app);
4049
4072
4073 app_joint = data;
4074
4075 nxt_assert(app_joint != NULL);
4076
4077 app = app_joint->app;
4078
4079 nxt_router_app_joint_use(task, app_joint, -1);
4080
4081 if (nxt_slow_path(app == NULL)) {
4082 nxt_debug(task, "start error for released app");
4083
4084 return;
4085 }
4086
4087 nxt_debug(task, "app '%V' %p start error", &app->name, app);
4088
4089 link = NULL;
4090
4050 nxt_thread_mutex_lock(&app->mutex);
4051
4052 nxt_assert(app->pending_processes != 0);
4053
4054 app->pending_processes--;
4055
4091 nxt_thread_mutex_lock(&app->mutex);
4092
4093 nxt_assert(app->pending_processes != 0);
4094
4095 app->pending_processes--;
4096
4097 if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4098 link = nxt_queue_first(&app->ack_waiting_req);
4099
4100 nxt_queue_remove(link);
4101 link->next = NULL;
4102 }
4103
4056 nxt_thread_mutex_unlock(&app->mutex);
4057
4104 nxt_thread_mutex_unlock(&app->mutex);
4105
4058 /* TODO req_app_link to cancel first pending message */
4106 while (link != NULL) {
4107 r = nxt_container_of(link, nxt_http_request_t, app_link);
4108
4109 nxt_event_engine_post(r->engine, &r->err_work);
4110
4111 link = NULL;
4112
4113 nxt_thread_mutex_lock(&app->mutex);
4114
4115 if (app->processes == 0 && app->pending_processes == 0
4116 && !nxt_queue_is_empty(&app->ack_waiting_req))
4117 {
4118 link = nxt_queue_first(&app->ack_waiting_req);
4119
4120 nxt_queue_remove(link);
4121 link->next = NULL;
4122 }
4123
4124 nxt_thread_mutex_unlock(&app->mutex);
4125 }
4059}
4060
4061
4062void
4063nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4064{
4065 int c;
4066

--- 469 unchanged lines hidden (view full) ---

4536 }
4537}
4538
4539
4540static void
4541nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4542 nxt_request_rpc_data_t *req_rpc_data)
4543{
4126}
4127
4128
4129void
4130nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4131{
4132 int c;
4133

--- 469 unchanged lines hidden (view full) ---

4603 }
4604}
4605
4606
4607static void
4608nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4609 nxt_request_rpc_data_t *req_rpc_data)
4610{
4544 nxt_bool_t start_process;
4545 nxt_port_t *port;
4611 nxt_bool_t start_process;
4612 nxt_port_t *port;
4613 nxt_http_request_t *r;
4546
4547 start_process = 0;
4548
4549 nxt_thread_mutex_lock(&app->mutex);
4550
4551 port = app->shared_port;
4552 nxt_port_inc_use(port);
4553
4554 app->active_requests++;
4555
4556 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4557 app->pending_processes++;
4558 start_process = 1;
4559 }
4560
4614
4615 start_process = 0;
4616
4617 nxt_thread_mutex_lock(&app->mutex);
4618
4619 port = app->shared_port;
4620 nxt_port_inc_use(port);
4621
4622 app->active_requests++;
4623
4624 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4625 app->pending_processes++;
4626 start_process = 1;
4627 }
4628
4629 r = req_rpc_data->request;
4630
4631 /*
4632 * Put request into application-wide list to be able to cancel request
4633 * if something goes wrong with application processes.
4634 */
4635 nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
4636
4561 nxt_thread_mutex_unlock(&app->mutex);
4562
4637 nxt_thread_mutex_unlock(&app->mutex);
4638
4639 /*
4640 * Retain request memory pool while request is linked in ack_waiting_req
4641 * to guarantee request structure memory is accessble.
4642 */
4643 nxt_mp_retain(r->mem_pool);
4644
4563 req_rpc_data->app_port = port;
4564 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4565
4566 if (start_process) {
4567 nxt_router_start_app_process(task, app);
4568 }
4569}
4570

--- 26 unchanged lines hidden (view full) ---

4597 */
4598 nxt_mp_retain(r->mem_pool);
4599
4600 r->timer.task = &engine->task;
4601 r->timer.work_queue = &engine->fast_work_queue;
4602 r->timer.log = engine->task.log;
4603 r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4604
4645 req_rpc_data->app_port = port;
4646 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4647
4648 if (start_process) {
4649 nxt_router_start_app_process(task, app);
4650 }
4651}
4652

--- 26 unchanged lines hidden (view full) ---

4679 */
4680 nxt_mp_retain(r->mem_pool);
4681
4682 r->timer.task = &engine->task;
4683 r->timer.work_queue = &engine->fast_work_queue;
4684 r->timer.log = engine->task.log;
4685 r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4686
4687 r->engine = engine;
4688 r->err_work.handler = nxt_router_http_request_error;
4689 r->err_work.task = task;
4690 r->err_work.obj = r;
4691
4605 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4606 req_rpc_data->app = app;
4607 req_rpc_data->msg_info.body_fd = -1;
4608 req_rpc_data->rpc_cancel = 1;
4609
4610 nxt_router_app_use(task, app, 1);
4611
4612 req_rpc_data->request = r;

--- 4 unchanged lines hidden (view full) ---

4617 }
4618
4619 nxt_router_app_port_get(task, app, req_rpc_data);
4620 nxt_router_app_prepare_request(task, req_rpc_data);
4621}
4622
4623
4624static void
4692 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4693 req_rpc_data->app = app;
4694 req_rpc_data->msg_info.body_fd = -1;
4695 req_rpc_data->rpc_cancel = 1;
4696
4697 nxt_router_app_use(task, app, 1);
4698
4699 req_rpc_data->request = r;

--- 4 unchanged lines hidden (view full) ---

4704 }
4705
4706 nxt_router_app_port_get(task, app, req_rpc_data);
4707 nxt_router_app_prepare_request(task, req_rpc_data);
4708}
4709
4710
4711static void
4712nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
4713{
4714 nxt_http_request_t *r;
4715
4716 r = obj;
4717
4718 nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
4719
4720 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4721
4722 if (r->req_rpc_data != NULL) {
4723 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4724 }
4725
4726 nxt_mp_release(r->mem_pool);
4727}
4728
4729
4730static void
4625nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4626{
4627 nxt_http_request_t *r;
4628
4629 r = data;
4630
4631 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4632
4731nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4732{
4733 nxt_http_request_t *r;
4734
4735 r = data;
4736
4737 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4738
4633 if (r->req_rpc_data) {
4739 if (r->req_rpc_data != NULL) {
4634 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4635 }
4636
4637 nxt_http_request_close_handler(task, r, r->proto.any);
4638}
4639
4640
4641static void

--- 641 unchanged lines hidden ---
4740 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4741 }
4742
4743 nxt_http_request_close_handler(task, r, r->proto.any);
4744}
4745
4746
4747static void

--- 641 unchanged lines hidden ---