nxt_router.c (1546:06017e6e3a5f) nxt_router.c (1547:cbcd76704c90)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 47 unchanged lines hidden (view full) ---

56
57
58typedef struct {
59 nxt_app_t *app;
60 nxt_router_temp_conf_t *temp_conf;
61} nxt_app_rpc_t;
62
63
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 47 unchanged lines hidden (view full) ---

56
57
58typedef struct {
59 nxt_app_t *app;
60 nxt_router_temp_conf_t *temp_conf;
61} nxt_app_rpc_t;
62
63
64struct nxt_port_select_state_s {
65 nxt_app_t *app;
66 nxt_request_app_link_t *req_app_link;
67
68 nxt_port_t *failed_port;
69 int failed_port_use_delta;
70
71 uint8_t start_process; /* 1 bit */
72 nxt_request_app_link_t *shared_ra;
73 nxt_port_t *port;
74};
75
76typedef struct nxt_port_select_state_s nxt_port_select_state_t;
77
78static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
79 nxt_mp_t *mp);
80static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
81static void nxt_router_greet_controller(nxt_task_t *task,
82 nxt_port_t *controller_port);
83
64static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
65 nxt_mp_t *mp);
66static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
67static void nxt_router_greet_controller(nxt_task_t *task,
68 nxt_port_t *controller_port);
69
84static void nxt_router_port_select(nxt_task_t *task,
85 nxt_port_select_state_t *state);
86
87static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
88 nxt_port_select_state_t *state);
89
90static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
70static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
91static void nxt_request_app_link_update_peer(nxt_task_t *task,
92 nxt_request_app_link_t *req_app_link);
93
71
94
95nxt_inline void
96nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
97{
98 nxt_atomic_fetch_add(&req_app_link->use_count, 1);
99}
100
101nxt_inline void
102nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
103{
104#if (NXT_DEBUG)
105 int c;
106
107 c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
108
109 nxt_assert((c + i) > 0);
110#else
111 (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
112#endif
113}
114
115static void nxt_request_app_link_use(nxt_task_t *task,
116 nxt_request_app_link_t *req_app_link, int i);
117
118static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
119static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
120static void nxt_router_conf_ready(nxt_task_t *task,
121 nxt_router_temp_conf_t *tmcf);
122static void nxt_router_conf_error(nxt_task_t *task,
123 nxt_router_temp_conf_t *tmcf);
124static void nxt_router_conf_send(nxt_task_t *task,
125 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 65 unchanged lines hidden (view full) ---

191static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
192 void *data);
193static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
194 void *data);
195static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
196 void *data);
197static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
198 void *data);
72static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
73static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
74static void nxt_router_conf_ready(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_error(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf);
78static void nxt_router_conf_send(nxt_task_t *task,
79 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 65 unchanged lines hidden (view full) ---

145static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
146 void *data);
147static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
148 void *data);
149static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
150 void *data);
151static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
152 void *data);
153static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
154 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
199static void nxt_router_listen_socket_release(nxt_task_t *task,
200 nxt_socket_conf_t *skcf);
201
202static void nxt_router_access_log_writer(nxt_task_t *task,
203 nxt_http_request_t *r, nxt_router_access_log_t *access_log);
204static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
205 struct tm *tm, size_t size, const char *format);
206static void nxt_router_access_log_open(nxt_task_t *task,

--- 8 unchanged lines hidden (view full) ---

215 void *data);
216static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
217 nxt_port_recv_msg_t *msg, void *data);
218static void nxt_router_access_log_reopen_error(nxt_task_t *task,
219 nxt_port_recv_msg_t *msg, void *data);
220
221static void nxt_router_app_port_ready(nxt_task_t *task,
222 nxt_port_recv_msg_t *msg, void *data);
155static void nxt_router_listen_socket_release(nxt_task_t *task,
156 nxt_socket_conf_t *skcf);
157
158static void nxt_router_access_log_writer(nxt_task_t *task,
159 nxt_http_request_t *r, nxt_router_access_log_t *access_log);
160static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
161 struct tm *tm, size_t size, const char *format);
162static void nxt_router_access_log_open(nxt_task_t *task,

--- 8 unchanged lines hidden (view full) ---

171 void *data);
172static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
173 nxt_port_recv_msg_t *msg, void *data);
174static void nxt_router_access_log_reopen_error(nxt_task_t *task,
175 nxt_port_recv_msg_t *msg, void *data);
176
177static void nxt_router_app_port_ready(nxt_task_t *task,
178 nxt_port_recv_msg_t *msg, void *data);
179static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
180 nxt_port_t *app_port);
223static void nxt_router_app_port_error(nxt_task_t *task,
224 nxt_port_recv_msg_t *msg, void *data);
225
226static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
227
228static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
229 nxt_apr_action_t action);
181static void nxt_router_app_port_error(nxt_task_t *task,
182 nxt_port_recv_msg_t *msg, void *data);
183
184static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
185
186static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
187 nxt_apr_action_t action);
230static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
231 nxt_request_app_link_t *req_app_link);
188static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
189 nxt_request_rpc_data_t *req_rpc_data);
190static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
191 void *data);
232
233static void nxt_router_app_prepare_request(nxt_task_t *task,
192
193static void nxt_router_app_prepare_request(nxt_task_t *task,
234 nxt_request_app_link_t *req_app_link);
194 nxt_request_rpc_data_t *req_rpc_data);
235static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
195static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
236 nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
196 nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
237
238static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
239static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
240 void *data);
241static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
242 void *data);
243static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
244 void *data);
245static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
246
247static const nxt_http_request_state_t nxt_http_request_send_state;
248static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
249
250static void nxt_router_app_joint_use(nxt_task_t *task,
251 nxt_app_joint_t *app_joint, int i);
252
197
198static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
199static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
200 void *data);
201static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
202 void *data);
203static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
204 void *data);
205static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
206
207static const nxt_http_request_state_t nxt_http_request_send_state;
208static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
209
210static void nxt_router_app_joint_use(nxt_task_t *task,
211 nxt_app_joint_t *app_joint, int i);
212
253static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
213static void nxt_router_http_request_release_post(nxt_task_t *task,
254 nxt_http_request_t *r);
255static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
256 void *data);
257static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
258static void nxt_router_get_port_handler(nxt_task_t *task,
259 nxt_port_recv_msg_t *msg);
260static void nxt_router_get_mmap_handler(nxt_task_t *task,
261 nxt_port_recv_msg_t *msg);

--- 234 unchanged lines hidden (view full) ---

496 nxt_thread_mutex_unlock(&app->mutex);
497
498 nxt_router_app_use(task, app, -1);
499
500 return NXT_ERROR;
501}
502
503
214 nxt_http_request_t *r);
215static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
216 void *data);
217static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
218static void nxt_router_get_port_handler(nxt_task_t *task,
219 nxt_port_recv_msg_t *msg);
220static void nxt_router_get_mmap_handler(nxt_task_t *task,
221 nxt_port_recv_msg_t *msg);

--- 234 unchanged lines hidden (view full) ---

456 nxt_thread_mutex_unlock(&app->mutex);
457
458 nxt_router_app_use(task, app, -1);
459
460 return NXT_ERROR;
461}
462
463
504nxt_inline void
505nxt_request_app_link_init(nxt_task_t *task,
506 nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
507{
508 nxt_buf_t *body;
509 nxt_event_engine_t *engine;
510
511 engine = task->thread->engine;
512
513 nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
514
515 req_app_link->stream = req_rpc_data->stream;
516 req_app_link->use_count = 1;
517 req_app_link->req_rpc_data = req_rpc_data;
518 req_rpc_data->req_app_link = req_app_link;
519 req_app_link->reply_port = engine->port;
520 req_app_link->request = req_rpc_data->request;
521 req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
522
523 req_app_link->work.handler = NULL;
524 req_app_link->work.task = &engine->task;
525 req_app_link->work.obj = req_app_link;
526 req_app_link->work.data = engine;
527
528 body = req_rpc_data->request->body;
529
530 if (body != NULL && nxt_buf_is_file(body)) {
531 req_app_link->body_fd = body->file->fd;
532
533 body->file->fd = -1;
534
535 } else {
536 req_app_link->body_fd = -1;
537 }
538}
539
540
541nxt_inline nxt_request_app_link_t *
542nxt_request_app_link_alloc(nxt_task_t *task,
543 nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
544{
545 nxt_mp_t *mp;
546 nxt_request_app_link_t *req_app_link;
547
548 if (ra_src != NULL && ra_src->mem_pool != NULL) {
549 return ra_src;
550 }
551
552 mp = req_rpc_data->request->mem_pool;
553
554 req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
555
556 if (nxt_slow_path(req_app_link == NULL)) {
557
558 req_rpc_data->req_app_link = NULL;
559
560 if (ra_src != NULL) {
561 ra_src->req_rpc_data = NULL;
562 }
563
564 return NULL;
565 }
566
567 nxt_mp_retain(mp);
568
569 nxt_request_app_link_init(task, req_app_link, req_rpc_data);
570
571 if (ra_src != NULL) {
572 req_app_link->body_fd = ra_src->body_fd;
573 }
574
575 req_app_link->mem_pool = mp;
576
577 return req_app_link;
578}
579
580
581nxt_inline nxt_bool_t
582nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
583 uint32_t stream)
584{
585 nxt_buf_t *b, *next;
586 nxt_bool_t cancelled;
587
588 if (msg_info->buf == NULL) {

--- 20 unchanged lines hidden (view full) ---

609 }
610
611 msg_info->buf = NULL;
612
613 return cancelled;
614}
615
616
464nxt_inline nxt_bool_t
465nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
466 uint32_t stream)
467{
468 nxt_buf_t *b, *next;
469 nxt_bool_t cancelled;
470
471 if (msg_info->buf == NULL) {

--- 20 unchanged lines hidden (view full) ---

492 }
493
494 msg_info->buf = NULL;
495
496 return cancelled;
497}
498
499
617static void
618nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
619 void *data)
620{
621 nxt_request_app_link_t *req_app_link;
622
623 req_app_link = obj;
624
625 nxt_request_app_link_update_peer(task, req_app_link);
626
627 nxt_request_app_link_use(task, req_app_link, -1);
628}
629
630
631static void
632nxt_request_app_link_update_peer(nxt_task_t *task,
633 nxt_request_app_link_t *req_app_link)
634{
635 nxt_event_engine_t *engine;
636 nxt_request_rpc_data_t *req_rpc_data;
637
638 engine = req_app_link->work.data;
639
640 if (task->thread->engine != engine) {
641 nxt_request_app_link_inc_use(req_app_link);
642
643 req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
644 req_app_link->work.task = &engine->task;
645 req_app_link->work.next = NULL;
646
647 nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
648 req_app_link->stream, engine);
649
650 nxt_event_engine_post(engine, &req_app_link->work);
651
652 return;
653 }
654
655 nxt_debug(task, "req_app_link stream #%uD update peer",
656 req_app_link->stream);
657
658 req_rpc_data = req_app_link->req_rpc_data;
659
660 if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
661 nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
662 req_app_link->app_port->pid);
663 }
664}
665
666
667static void
668nxt_request_app_link_release(nxt_task_t *task,
669 nxt_request_app_link_t *req_app_link)
670{
671 nxt_mp_t *mp;
672 nxt_http_request_t *r;
673 nxt_request_rpc_data_t *req_rpc_data;
674
675 nxt_assert(task->thread->engine == req_app_link->work.data);
676 nxt_assert(req_app_link->use_count == 0);
677
678 nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
679
680 req_rpc_data = req_app_link->req_rpc_data;
681
682 if (req_rpc_data != NULL) {
683 if (nxt_slow_path(req_app_link->err_code != 0)) {
684 nxt_http_request_error(task, req_rpc_data->request,
685 req_app_link->err_code);
686
687 } else {
688 req_rpc_data->app_port = req_app_link->app_port;
689 req_rpc_data->apr_action = req_app_link->apr_action;
690 req_rpc_data->msg_info = req_app_link->msg_info;
691
692 if (req_rpc_data->app->timeout != 0) {
693 r = req_rpc_data->request;
694
695 r->timer.handler = nxt_router_app_timeout;
696 r->timer_data = req_rpc_data;
697 nxt_timer_add(task->thread->engine, &r->timer,
698 req_rpc_data->app->timeout);
699 }
700
701 req_app_link->app_port = NULL;
702 req_app_link->msg_info.buf = NULL;
703 }
704
705 req_rpc_data->req_app_link = NULL;
706 req_app_link->req_rpc_data = NULL;
707 }
708
709 if (req_app_link->app_port != NULL) {
710 nxt_router_app_port_release(task, req_app_link->app_port,
711 req_app_link->apr_action);
712
713 req_app_link->app_port = NULL;
714 }
715
716 if (req_app_link->body_fd != -1) {
717 nxt_fd_close(req_app_link->body_fd);
718
719 req_app_link->body_fd = -1;
720 }
721
722 nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
723
724 mp = req_app_link->mem_pool;
725
726 if (mp != NULL) {
727 nxt_mp_free(mp, req_app_link);
728 nxt_mp_release(mp);
729 }
730}
731
732
733static void
734nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
735{
736 nxt_request_app_link_t *req_app_link;
737
738 req_app_link = obj;
739
740 nxt_assert(req_app_link->work.data == data);
741
742 nxt_request_app_link_use(task, req_app_link, -1);
743}
744
745
746static void
747nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
748 int i)
749{
750 int c;
751 nxt_event_engine_t *engine;
752
753 c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
754
755 if (i < 0 && c == -i) {
756 engine = req_app_link->work.data;
757
758 if (task->thread->engine == engine) {
759 nxt_request_app_link_release(task, req_app_link);
760
761 return;
762 }
763
764 nxt_request_app_link_inc_use(req_app_link);
765
766 req_app_link->work.handler = nxt_request_app_link_release_handler;
767 req_app_link->work.task = &engine->task;
768 req_app_link->work.next = NULL;
769
770 nxt_debug(task, "req_app_link stream #%uD post release to %p",
771 req_app_link->stream, engine);
772
773 nxt_event_engine_post(engine, &req_app_link->work);
774 }
775}
776
777
778nxt_inline void
779nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
780 nxt_request_app_link_t *req_app_link, const char *str)
781{
782 req_app_link->app_port = NULL;
783 req_app_link->err_code = 500;
784 req_app_link->err_str = str;
785
786 nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
787 &app->name, str, req_app_link->stream);
788}
789
790
791nxt_inline void
792nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
793 nxt_request_app_link_t *req_app_link)
794{
795 nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
796 &req_app_link->link_port_pending);
797 nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
798
799 nxt_request_app_link_inc_use(req_app_link);
800
801 req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
802 + app->res_timeout;
803
804 nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
805 req_app_link->stream);
806}
807
808
809nxt_inline nxt_bool_t
810nxt_queue_chk_remove(nxt_queue_link_t *lnk)
811{
812 if (lnk->next != NULL) {
813 nxt_queue_remove(lnk);
814
815 lnk->next = NULL;
816
817 return 1;
818 }
819
820 return 0;
821}
822
823
824nxt_inline void
825nxt_request_rpc_data_unlink(nxt_task_t *task,
826 nxt_request_rpc_data_t *req_rpc_data)
827{
500nxt_inline nxt_bool_t
501nxt_queue_chk_remove(nxt_queue_link_t *lnk)
502{
503 if (lnk->next != NULL) {
504 nxt_queue_remove(lnk);
505
506 lnk->next = NULL;
507
508 return 1;
509 }
510
511 return 0;
512}
513
514
515nxt_inline void
516nxt_request_rpc_data_unlink(nxt_task_t *task,
517 nxt_request_rpc_data_t *req_rpc_data)
518{
828 int ra_use_delta;
829 nxt_request_app_link_t *req_app_link;
519 nxt_http_request_t *r;
830
520
521 nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
522
831 if (req_rpc_data->app_port != NULL) {
832 nxt_router_app_port_release(task, req_rpc_data->app_port,
833 req_rpc_data->apr_action);
834
835 req_rpc_data->app_port = NULL;
836 }
837
523 if (req_rpc_data->app_port != NULL) {
524 nxt_router_app_port_release(task, req_rpc_data->app_port,
525 req_rpc_data->apr_action);
526
527 req_rpc_data->app_port = NULL;
528 }
529
838 nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
530 if (req_rpc_data->app != NULL) {
531 nxt_router_app_use(task, req_rpc_data->app, -1);
839
532
840 req_app_link = req_rpc_data->req_app_link;
841 if (req_app_link != NULL) {
842 req_rpc_data->req_app_link = NULL;
843 req_app_link->req_rpc_data = NULL;
533 req_rpc_data->app = NULL;
534 }
844
535
845 ra_use_delta = 0;
536 r = req_rpc_data->request;
846
537
847 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
538 if (r != NULL) {
539 r->timer_data = NULL;
848
540
849 if (req_app_link->link_app_requests.next == NULL
850 && req_app_link->link_port_pending.next == NULL
851 && req_app_link->link_app_pending.next == NULL
852 && req_app_link->link_port_websockets.next == NULL)
853 {
854 req_app_link = NULL;
541 nxt_router_http_request_release_post(task, r);
855
542
856 } else {
857 ra_use_delta -=
858 nxt_queue_chk_remove(&req_app_link->link_app_requests)
859 + nxt_queue_chk_remove(&req_app_link->link_port_pending)
860 + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
861
862 nxt_queue_chk_remove(&req_app_link->link_app_pending);
863 }
864
865 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
866
867 if (req_app_link != NULL) {
868 nxt_request_app_link_use(task, req_app_link, ra_use_delta);
869 }
543 r->req_rpc_data = NULL;
544 req_rpc_data->request = NULL;
870 }
871
545 }
546
872 if (req_rpc_data->app != NULL) {
873 nxt_router_app_use(task, req_rpc_data->app, -1);
547 if (req_rpc_data->msg_info.body_fd != -1) {
548 nxt_fd_close(req_rpc_data->msg_info.body_fd);
874
549
875 req_rpc_data->app = NULL;
550 req_rpc_data->msg_info.body_fd = -1;
876 }
877
551 }
552
878 if (req_rpc_data->request != NULL) {
879 req_rpc_data->request->timer_data = NULL;
553 if (req_rpc_data->rpc_cancel) {
554 req_rpc_data->rpc_cancel = 0;
880
555
881 nxt_router_http_request_done(task, req_rpc_data->request);
882
883 req_rpc_data->request->req_rpc_data = NULL;
884 req_rpc_data->request = NULL;
556 nxt_port_rpc_cancel(task, task->thread->engine->port,
557 req_rpc_data->stream);
885 }
886}
887
888
889void
890nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
891{
558 }
559}
560
561
562void
563nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
564{
565 nxt_app_t *app;
566 nxt_port_t *port, *main_app_port;
567 nxt_runtime_t *rt;
568
892 nxt_port_new_port_handler(task, msg);
893
569 nxt_port_new_port_handler(task, msg);
570
894 if (msg->u.new_port != NULL
895 && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
896 {
571 port = msg->u.new_port;
572
573 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
897 nxt_router_greet_controller(task, msg->u.new_port);
898 }
899
574 nxt_router_greet_controller(task, msg->u.new_port);
575 }
576
900 if (msg->port_msg.stream == 0) {
901 return;
902 }
577 if (port == NULL || port->type != NXT_PROCESS_APP) {
903
578
904 if (msg->u.new_port == NULL
905 || msg->u.new_port->type != NXT_PROCESS_APP)
906 {
579 if (msg->port_msg.stream == 0) {
580 return;
581 }
582
907 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
908 }
909
583 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
584 }
585
910 nxt_port_rpc_handler(task, msg);
586 if (msg->port_msg.stream != 0) {
587 nxt_port_rpc_handler(task, msg);
588 return;
589 }
590
591 /*
592 * Port with "id == 0" is application 'main' port and it always
593 * should come with non-zero stream.
594 */
595 nxt_assert(port->id != 0);
596
597 /* Find 'main' app port and get app reference. */
598 rt = task->thread->runtime;
599
600 /*
601 * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
602 * sent to main port (with id == 0) and processed in main thread.
603 */
604 main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
605 nxt_assert(main_app_port != NULL);
606
607 app = main_app_port->app;
608 nxt_assert(app != NULL);
609
610 nxt_thread_mutex_lock(&app->mutex);
611
612 /* TODO here should be find-and-add code because there can be
613 port waiters in port_hash */
614 nxt_port_hash_add(&app->port_hash, port);
615 app->port_hash_count++;
616
617 nxt_thread_mutex_unlock(&app->mutex);
618
619 port->app = app;
620 port->main_app_port = main_app_port;
911}
912
913
914void
915nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
916{
917 void *p;
918 size_t size;

--- 176 unchanged lines hidden (view full) ---

1095 return app->processes + app->pending_processes < app->max_processes
1096 && app->pending_processes < app->max_pending_processes;
1097}
1098
1099
1100nxt_inline nxt_bool_t
1101nxt_router_app_need_start(nxt_app_t *app)
1102{
621}
622
623
624void
625nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
626{
627 void *p;
628 size_t size;

--- 176 unchanged lines hidden (view full) ---

805 return app->processes + app->pending_processes < app->max_processes
806 && app->pending_processes < app->max_pending_processes;
807}
808
809
810nxt_inline nxt_bool_t
811nxt_router_app_need_start(nxt_app_t *app)
812{
1103 return app->idle_processes + app->pending_processes
1104 < app->spare_processes;
813 return (app->active_requests
814 > app->port_hash_count + app->pending_processes)
815 || (app->spare_processes
816 > app->idle_processes + app->pending_processes);
1105}
1106
1107
1108static void
1109nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1110{
1111 nxt_int_t ret;
1112 nxt_app_t *app;

--- 412 unchanged lines hidden (view full) ---

1525 size_t size;
1526 nxt_mp_t *mp, *app_mp;
1527 uint32_t next, next_target;
1528 nxt_int_t ret;
1529 nxt_str_t name, path, target;
1530 nxt_app_t *app, *prev;
1531 nxt_str_t *t, *s, *targets;
1532 nxt_uint_t n, i;
817}
818
819
820static void
821nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
822{
823 nxt_int_t ret;
824 nxt_app_t *app;

--- 412 unchanged lines hidden (view full) ---

1237 size_t size;
1238 nxt_mp_t *mp, *app_mp;
1239 uint32_t next, next_target;
1240 nxt_int_t ret;
1241 nxt_str_t name, path, target;
1242 nxt_app_t *app, *prev;
1243 nxt_str_t *t, *s, *targets;
1244 nxt_uint_t n, i;
1245 nxt_port_t *port;
1533 nxt_router_t *router;
1534 nxt_app_joint_t *app_joint;
1535 nxt_conf_value_t *conf, *http, *value, *websocket;
1536 nxt_conf_value_t *applications, *application;
1537 nxt_conf_value_t *listeners, *listener;
1538 nxt_conf_value_t *routes_conf, *static_conf;
1539 nxt_socket_conf_t *skcf;
1540 nxt_http_routes_t *routes;

--- 198 unchanged lines hidden (view full) ---

1739 ret = nxt_thread_mutex_create(&app->mutex);
1740 if (ret != NXT_OK) {
1741 goto app_fail;
1742 }
1743
1744 nxt_queue_init(&app->ports);
1745 nxt_queue_init(&app->spare_ports);
1746 nxt_queue_init(&app->idle_ports);
1246 nxt_router_t *router;
1247 nxt_app_joint_t *app_joint;
1248 nxt_conf_value_t *conf, *http, *value, *websocket;
1249 nxt_conf_value_t *applications, *application;
1250 nxt_conf_value_t *listeners, *listener;
1251 nxt_conf_value_t *routes_conf, *static_conf;
1252 nxt_socket_conf_t *skcf;
1253 nxt_http_routes_t *routes;

--- 198 unchanged lines hidden (view full) ---

1452 ret = nxt_thread_mutex_create(&app->mutex);
1453 if (ret != NXT_OK) {
1454 goto app_fail;
1455 }
1456
1457 nxt_queue_init(&app->ports);
1458 nxt_queue_init(&app->spare_ports);
1459 nxt_queue_init(&app->idle_ports);
1747 nxt_queue_init(&app->requests);
1748 nxt_queue_init(&app->pending);
1749
1750 app->name.length = name.length;
1751 nxt_memcpy(app->name.start, name.start, name.length);
1752
1753 app->type = lang->type;
1754 app->max_processes = apcf.max_processes;
1755 app->spare_processes = apcf.spare_processes;
1756 app->max_pending_processes = apcf.spare_processes
1757 ? apcf.spare_processes : 1;
1758 app->timeout = apcf.timeout;
1759 app->res_timeout = apcf.res_timeout * 1000000;
1760 app->idle_timeout = apcf.idle_timeout;
1460
1461 app->name.length = name.length;
1462 nxt_memcpy(app->name.start, name.start, name.length);
1463
1464 app->type = lang->type;
1465 app->max_processes = apcf.max_processes;
1466 app->spare_processes = apcf.spare_processes;
1467 app->max_pending_processes = apcf.spare_processes
1468 ? apcf.spare_processes : 1;
1469 app->timeout = apcf.timeout;
1470 app->res_timeout = apcf.res_timeout * 1000000;
1471 app->idle_timeout = apcf.idle_timeout;
1761 app->max_pending_responses = 2;
1762 app->max_requests = apcf.requests;
1763
1764 app->targets = targets;
1765
1766 engine = task->thread->engine;
1767
1768 app->engine = engine;
1769

--- 14 unchanged lines hidden (view full) ---

1784 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1785 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1786 app_joint->idle_timer.task = &engine->task;
1787 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1788
1789 app_joint->free_app_work.handler = nxt_router_free_app;
1790 app_joint->free_app_work.task = &engine->task;
1791 app_joint->free_app_work.obj = app_joint;
1472 app->max_requests = apcf.requests;
1473
1474 app->targets = targets;
1475
1476 engine = task->thread->engine;
1477
1478 app->engine = engine;
1479

--- 14 unchanged lines hidden (view full) ---

1494 app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1495 app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1496 app_joint->idle_timer.task = &engine->task;
1497 app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1498
1499 app_joint->free_app_work.handler = nxt_router_free_app;
1500 app_joint->free_app_work.task = &engine->task;
1501 app_joint->free_app_work.obj = app_joint;
1502
1503 port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1504 NXT_PROCESS_APP);
1505 if (nxt_slow_path(port == NULL)) {
1506 return NXT_ERROR;
1507 }
1508
1509 ret = nxt_port_socket_init(task, port, 0);
1510 if (nxt_slow_path(ret != NXT_OK)) {
1511 nxt_port_use(task, port, -1);
1512 return NXT_ERROR;
1513 }
1514
1515 nxt_port_write_enable(task, port);
1516 port->app = app;
1517
1518 app->shared_port = port;
1519
1520 nxt_thread_mutex_create(&app->outgoing.mutex);
1792 }
1793 }
1794
1795 routes_conf = nxt_conf_get_path(conf, &routes_path);
1796 if (nxt_fast_path(routes_conf != NULL)) {
1797 routes = nxt_http_routes_create(task, tmcf, routes_conf);
1798 if (nxt_slow_path(routes == NULL)) {
1799 return NXT_ERROR;

--- 717 unchanged lines hidden (view full) ---

2517 nxt_port_t *port;
2518 nxt_app_rpc_t *rpc;
2519 nxt_event_engine_t *engine;
2520
2521 rpc = data;
2522 app = rpc->app;
2523
2524 port = msg->u.new_port;
1521 }
1522 }
1523
1524 routes_conf = nxt_conf_get_path(conf, &routes_path);
1525 if (nxt_fast_path(routes_conf != NULL)) {
1526 routes = nxt_http_routes_create(task, tmcf, routes_conf);
1527 if (nxt_slow_path(routes == NULL)) {
1528 return NXT_ERROR;

--- 717 unchanged lines hidden (view full) ---

2246 nxt_port_t *port;
2247 nxt_app_rpc_t *rpc;
2248 nxt_event_engine_t *engine;
2249
2250 rpc = data;
2251 app = rpc->app;
2252
2253 port = msg->u.new_port;
2254
2255 nxt_assert(port != NULL);
2256 nxt_assert(port->type == NXT_PROCESS_APP);
2257 nxt_assert(port->id == 0);
2258
2525 port->app = app;
2259 port->app = app;
2260 port->main_app_port = port;
2526
2527 app->pending_processes--;
2528 app->processes++;
2529 app->idle_processes++;
2530
2531 engine = task->thread->engine;
2532
2533 nxt_queue_insert_tail(&app->ports, &port->app_link);
2534 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2261
2262 app->pending_processes--;
2263 app->processes++;
2264 app->idle_processes++;
2265
2266 engine = task->thread->engine;
2267
2268 nxt_queue_insert_tail(&app->ports, &port->app_link);
2269 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2270 nxt_port_hash_add(&app->port_hash, port);
2271 app->port_hash_count++;
2535
2536 port->idle_start = 0;
2537
2538 nxt_port_inc_use(port);
2539
2272
2273 port->idle_start = 0;
2274
2275 nxt_port_inc_use(port);
2276
2277 nxt_router_app_shared_port_send(task, port);
2278
2540 nxt_work_queue_add(&engine->fast_work_queue,
2541 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2542}
2543
2544
2545static void
2546nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2547 void *data)

--- 386 unchanged lines hidden (view full) ---

2934 work->next = NULL;
2935
2936 nxt_event_engine_post(engine, work);
2937 }
2938}
2939
2940
2941static nxt_port_handlers_t nxt_router_app_port_handlers = {
2279 nxt_work_queue_add(&engine->fast_work_queue,
2280 nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2281}
2282
2283
2284static void
2285nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2286 void *data)

--- 386 unchanged lines hidden (view full) ---

2673 work->next = NULL;
2674
2675 nxt_event_engine_post(engine, work);
2676 }
2677}
2678
2679
2680static nxt_port_handlers_t nxt_router_app_port_handlers = {
2942 .rpc_error = nxt_port_rpc_handler,
2943 .mmap = nxt_port_mmap_handler,
2944 .data = nxt_port_rpc_handler,
2945 .oosm = nxt_router_oosm_handler,
2681 .rpc_error = nxt_port_rpc_handler,
2682 .mmap = nxt_port_mmap_handler,
2683 .data = nxt_port_rpc_handler,
2684 .oosm = nxt_router_oosm_handler,
2685 .req_headers_ack = nxt_port_rpc_handler,
2946};
2947
2948
2949static void
2950nxt_router_thread_start(void *data)
2951{
2952 nxt_int_t ret;
2953 nxt_port_t *port;

--- 777 unchanged lines hidden (view full) ---

3731}
3732
3733
3734static void
3735nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3736 void *data)
3737{
3738 nxt_int_t ret;
2686};
2687
2688
2689static void
2690nxt_router_thread_start(void *data)
2691{
2692 nxt_int_t ret;
2693 nxt_port_t *port;

--- 777 unchanged lines hidden (view full) ---

3471}
3472
3473
3474static void
3475nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3476 void *data)
3477{
3478 nxt_int_t ret;
3479 nxt_app_t *app;
3739 nxt_buf_t *b, *next;
3740 nxt_port_t *app_port;
3741 nxt_unit_field_t *f;
3742 nxt_http_field_t *field;
3743 nxt_http_request_t *r;
3744 nxt_unit_response_t *resp;
3480 nxt_buf_t *b, *next;
3481 nxt_port_t *app_port;
3482 nxt_unit_field_t *f;
3483 nxt_http_field_t *field;
3484 nxt_http_request_t *r;
3485 nxt_unit_response_t *resp;
3745 nxt_request_app_link_t *req_app_link;
3746 nxt_request_rpc_data_t *req_rpc_data;
3747
3486 nxt_request_rpc_data_t *req_rpc_data;
3487
3748 b = msg->buf;
3749 req_rpc_data = data;
3750
3488 req_rpc_data = data;
3489
3751 if (msg->size == 0) {
3752 b = NULL;
3753 }
3754
3755 r = req_rpc_data->request;
3756 if (nxt_slow_path(r == NULL)) {
3757 return;
3758 }
3759
3760 if (r->error) {
3761 nxt_request_rpc_data_unlink(task, req_rpc_data);
3762 return;
3763 }
3764
3490 r = req_rpc_data->request;
3491 if (nxt_slow_path(r == NULL)) {
3492 return;
3493 }
3494
3495 if (r->error) {
3496 nxt_request_rpc_data_unlink(task, req_rpc_data);
3497 return;
3498 }
3499
3500 app = req_rpc_data->app;
3501 nxt_assert(app != NULL);
3502
3503 if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3504 nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3505
3506 return;
3507 }
3508
3509 b = (msg->size == 0) ? NULL : msg->buf;
3510
3765 if (msg->port_msg.last != 0) {
3766 nxt_debug(task, "router data create last buf");
3767
3768 nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3769
3511 if (msg->port_msg.last != 0) {
3512 nxt_debug(task, "router data create last buf");
3513
3514 nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3515
3516 req_rpc_data->rpc_cancel = 0;
3517 req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
3518
3770 nxt_request_rpc_data_unlink(task, req_rpc_data);
3771
3772 } else {
3519 nxt_request_rpc_data_unlink(task, req_rpc_data);
3520
3521 } else {
3773 if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) {
3522 if (app->timeout != 0) {
3774 r->timer.handler = nxt_router_app_timeout;
3775 r->timer_data = req_rpc_data;
3523 r->timer.handler = nxt_router_app_timeout;
3524 r->timer_data = req_rpc_data;
3776 nxt_timer_add(task->thread->engine, &r->timer,
3777 req_rpc_data->app->timeout);
3525 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3778 }
3779 }
3780
3781 if (b == NULL) {
3782 return;
3783 }
3784
3785 if (msg->buf == b) {

--- 79 unchanged lines hidden (view full) ---

3865 nxt_buf_chain_add(&r->out, b);
3866 }
3867
3868 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3869
3870 if (r->websocket_handshake
3871 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3872 {
3526 }
3527 }
3528
3529 if (b == NULL) {
3530 return;
3531 }
3532
3533 if (msg->buf == b) {

--- 79 unchanged lines hidden (view full) ---

3613 nxt_buf_chain_add(&r->out, b);
3614 }
3615
3616 nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3617
3618 if (r->websocket_handshake
3619 && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3620 {
3873 req_app_link = nxt_request_app_link_alloc(task,
3874 req_rpc_data->req_app_link,
3875 req_rpc_data);
3876 if (nxt_slow_path(req_app_link == NULL)) {
3877 goto fail;
3878 }
3879
3880 app_port = req_app_link->app_port;
3881
3882 if (app_port == NULL && req_rpc_data->app_port != NULL) {
3883 req_app_link->app_port = req_rpc_data->app_port;
3884 app_port = req_app_link->app_port;
3885 req_app_link->apr_action = req_rpc_data->apr_action;
3886
3887 req_rpc_data->app_port = NULL;
3888 }
3889
3621 app_port = req_rpc_data->app_port;
3890 if (nxt_slow_path(app_port == NULL)) {
3891 goto fail;
3892 }
3893
3622 if (nxt_slow_path(app_port == NULL)) {
3623 goto fail;
3624 }
3625
3894 nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
3626 nxt_thread_mutex_lock(&app->mutex);
3895
3627
3896 nxt_queue_insert_tail(&app_port->active_websockets,
3897 &req_app_link->link_port_websockets);
3628 app_port->main_app_port->active_websockets++;
3898
3629
3899 nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
3630 nxt_thread_mutex_unlock(&app->mutex);
3900
3901 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3631
3632 nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3902 req_app_link->apr_action = NXT_APR_CLOSE;
3633 req_rpc_data->apr_action = NXT_APR_CLOSE;
3903
3634
3904 nxt_debug(task, "req_app_link stream #%uD upgrade",
3905 req_app_link->stream);
3635 nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
3906
3907 r->state = &nxt_http_websocket;
3908
3909 } else {
3910 r->state = &nxt_http_request_send_state;
3911 }
3912 }
3913
3914 return;
3915
3916fail:
3917
3918 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
3919
3920 nxt_request_rpc_data_unlink(task, req_rpc_data);
3921}
3922
3923
3636
3637 r->state = &nxt_http_websocket;
3638
3639 } else {
3640 r->state = &nxt_http_request_send_state;
3641 }
3642 }
3643
3644 return;
3645
3646fail:
3647
3648 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
3649
3650 nxt_request_rpc_data_unlink(task, req_rpc_data);
3651}
3652
3653
3654static void
3655nxt_router_req_headers_ack_handler(nxt_task_t *task,
3656 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
3657{
3658 nxt_app_t *app;
3659 nxt_bool_t start_process;
3660 nxt_port_t *app_port, *main_app_port, *idle_port;
3661 nxt_queue_link_t *idle_lnk;
3662 nxt_http_request_t *r;
3663
3664 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
3665 req_rpc_data->stream,
3666 msg->port_msg.pid, msg->port_msg.reply_port);
3667
3668 nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
3669 msg->port_msg.pid);
3670
3671 app = req_rpc_data->app;
3672
3673 start_process = 0;
3674
3675 nxt_thread_mutex_lock(&app->mutex);
3676
3677 app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
3678 msg->port_msg.reply_port);
3679 if (nxt_slow_path(app_port == NULL)) {
3680 nxt_thread_mutex_unlock(&app->mutex);
3681
3682 r = req_rpc_data->request;
3683 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3684
3685 return;
3686 }
3687
3688 main_app_port = app_port->main_app_port;
3689
3690 if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
3691 app->idle_processes--;
3692
3693 /* Check port was in 'spare_ports' using idle_start field. */
3694 if (main_app_port->idle_start == 0
3695 && app->idle_processes >= app->spare_processes)
3696 {
3697 /*
3698 * If there is a vacant space in spare ports,
3699 * move the last idle to spare_ports.
3700 */
3701 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
3702
3703 idle_lnk = nxt_queue_last(&app->idle_ports);
3704 idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
3705 nxt_queue_remove(idle_lnk);
3706
3707 nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
3708
3709 idle_port->idle_start = 0;
3710 }
3711
3712 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
3713 app->pending_processes++;
3714 start_process = 1;
3715 }
3716 }
3717
3718 main_app_port->active_requests++;
3719
3720 nxt_port_inc_use(app_port);
3721
3722 nxt_thread_mutex_unlock(&app->mutex);
3723
3724 if (start_process) {
3725 nxt_router_start_app_process(task, app);
3726 }
3727
3728 nxt_port_use(task, req_rpc_data->app_port, -1);
3729
3730 req_rpc_data->app_port = app_port;
3731
3732 if (app->timeout != 0) {
3733 r = req_rpc_data->request;
3734
3735 r->timer.handler = nxt_router_app_timeout;
3736 r->timer_data = req_rpc_data;
3737 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3738 }
3739}
3740
3741
3924static const nxt_http_request_state_t nxt_http_request_send_state
3925 nxt_aligned(64) =
3926{
3927 .error_handler = nxt_http_request_error_handler,
3928};
3929
3930
3931static void

--- 12 unchanged lines hidden (view full) ---

3944 }
3945}
3946
3947
3948static void
3949nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3950 void *data)
3951{
3742static const nxt_http_request_state_t nxt_http_request_send_state
3743 nxt_aligned(64) =
3744{
3745 .error_handler = nxt_http_request_error_handler,
3746};
3747
3748
3749static void

--- 12 unchanged lines hidden (view full) ---

3762 }
3763}
3764
3765
3766static void
3767nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3768 void *data)
3769{
3952 nxt_int_t res;
3953 nxt_port_t *port;
3954 nxt_bool_t cancelled;
3955 nxt_request_app_link_t *req_app_link;
3956 nxt_request_rpc_data_t *req_rpc_data;
3957
3958 req_rpc_data = data;
3959
3770 nxt_request_rpc_data_t *req_rpc_data;
3771
3772 req_rpc_data = data;
3773
3960 req_app_link = req_rpc_data->req_app_link;
3774 req_rpc_data->rpc_cancel = 0;
3961
3775
3962 if (req_app_link != NULL) {
3963 cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
3964 req_app_link->stream);
3965 if (cancelled) {
3966 res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
3776 /* TODO cancel message and return if cancelled. */
3777 // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
3967
3778
3968 if (res == NXT_OK) {
3969 port = req_app_link->app_port;
3970
3971 if (nxt_slow_path(port == NULL)) {
3972 nxt_log(task, NXT_LOG_ERR,
3973 "port is NULL in cancelled req_app_link");
3974 return;
3975 }
3976
3977 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port,
3978 req_rpc_data, port->pid);
3979
3980 nxt_router_app_prepare_request(task, req_app_link);
3981 }
3982
3983 msg->port_msg.last = 0;
3984
3985 return;
3986 }
3987 }
3988
3989 if (req_rpc_data->request != NULL) {
3990 nxt_http_request_error(task, req_rpc_data->request,
3991 NXT_HTTP_SERVICE_UNAVAILABLE);
3992 }
3993
3994 nxt_request_rpc_data_unlink(task, req_rpc_data);
3995}
3996

--- 6 unchanged lines hidden (view full) ---

4003 nxt_port_t *port;
4004 nxt_app_joint_t *app_joint;
4005
4006 app_joint = data;
4007 port = msg->u.new_port;
4008
4009 nxt_assert(app_joint != NULL);
4010 nxt_assert(port != NULL);
3779 if (req_rpc_data->request != NULL) {
3780 nxt_http_request_error(task, req_rpc_data->request,
3781 NXT_HTTP_SERVICE_UNAVAILABLE);
3782 }
3783
3784 nxt_request_rpc_data_unlink(task, req_rpc_data);
3785}
3786

--- 6 unchanged lines hidden (view full) ---

3793 nxt_port_t *port;
3794 nxt_app_joint_t *app_joint;
3795
3796 app_joint = data;
3797 port = msg->u.new_port;
3798
3799 nxt_assert(app_joint != NULL);
3800 nxt_assert(port != NULL);
3801 nxt_assert(port->type == NXT_PROCESS_APP);
3802 nxt_assert(port->id == 0);
4011
4012 app = app_joint->app;
4013
4014 nxt_router_app_joint_use(task, app_joint, -1);
4015
4016 if (nxt_slow_path(app == NULL)) {
4017 nxt_debug(task, "new port ready for released app, send QUIT");
4018
4019 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4020
4021 return;
4022 }
4023
4024 port->app = app;
3803
3804 app = app_joint->app;
3805
3806 nxt_router_app_joint_use(task, app_joint, -1);
3807
3808 if (nxt_slow_path(app == NULL)) {
3809 nxt_debug(task, "new port ready for released app, send QUIT");
3810
3811 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
3812
3813 return;
3814 }
3815
3816 port->app = app;
3817 port->main_app_port = port;
4025
4026 nxt_thread_mutex_lock(&app->mutex);
4027
4028 nxt_assert(app->pending_processes != 0);
4029
4030 app->pending_processes--;
4031 app->processes++;
3818
3819 nxt_thread_mutex_lock(&app->mutex);
3820
3821 nxt_assert(app->pending_processes != 0);
3822
3823 app->pending_processes--;
3824 app->processes++;
3825 nxt_port_hash_add(&app->port_hash, port);
3826 app->port_hash_count++;
4032
4033 nxt_thread_mutex_unlock(&app->mutex);
4034
4035 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4036 &app->name, port->pid, app->processes, app->pending_processes);
4037
3827
3828 nxt_thread_mutex_unlock(&app->mutex);
3829
3830 nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
3831 &app->name, port->pid, app->processes, app->pending_processes);
3832
3833 nxt_router_app_shared_port_send(task, port);
3834
4038 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4039}
4040
4041
3835 nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
3836}
3837
3838
3839static nxt_int_t
3840nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
3841{
3842 nxt_buf_t *b;
3843 nxt_port_t *port;
3844 nxt_port_msg_new_port_t *msg;
3845
3846 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
3847 sizeof(nxt_port_data_t));
3848 if (nxt_slow_path(b == NULL)) {
3849 return NXT_ERROR;
3850 }
3851
3852 port = app_port->app->shared_port;
3853
3854 nxt_debug(task, "send port %FD to process %PI",
3855 port->pair[0], app_port->pid);
3856
3857 b->mem.free += sizeof(nxt_port_msg_new_port_t);
3858 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
3859
3860 msg->id = port->id;
3861 msg->pid = port->pid;
3862 msg->max_size = port->max_size;
3863 msg->max_share = port->max_share;
3864 msg->type = port->type;
3865
3866 return nxt_port_socket_twrite(task, app_port,
3867 NXT_PORT_MSG_NEW_PORT,
3868 port->pair[0],
3869 0, 0, b, NULL);
3870}
3871
3872
4042static void
4043nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4044 void *data)
4045{
3873static void
3874nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3875 void *data)
3876{
4046 nxt_app_t *app;
4047 nxt_app_joint_t *app_joint;
4048 nxt_queue_link_t *lnk;
4049 nxt_request_app_link_t *req_app_link;
3877 nxt_app_t *app;
3878 nxt_app_joint_t *app_joint;
4050
4051 app_joint = data;
4052
4053 nxt_assert(app_joint != NULL);
4054
4055 app = app_joint->app;
4056
4057 nxt_router_app_joint_use(task, app_joint, -1);

--- 7 unchanged lines hidden (view full) ---

4065 nxt_debug(task, "app '%V' %p start error", &app->name, app);
4066
4067 nxt_thread_mutex_lock(&app->mutex);
4068
4069 nxt_assert(app->pending_processes != 0);
4070
4071 app->pending_processes--;
4072
3879
3880 app_joint = data;
3881
3882 nxt_assert(app_joint != NULL);
3883
3884 app = app_joint->app;
3885
3886 nxt_router_app_joint_use(task, app_joint, -1);

--- 7 unchanged lines hidden (view full) ---

3894 nxt_debug(task, "app '%V' %p start error", &app->name, app);
3895
3896 nxt_thread_mutex_lock(&app->mutex);
3897
3898 nxt_assert(app->pending_processes != 0);
3899
3900 app->pending_processes--;
3901
4073 if (!nxt_queue_is_empty(&app->requests)) {
4074 lnk = nxt_queue_last(&app->requests);
4075 nxt_queue_remove(lnk);
4076 lnk->next = NULL;
4077
4078 req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
4079 link_app_requests);
4080
4081 } else {
4082 req_app_link = NULL;
4083 }
4084
4085 nxt_thread_mutex_unlock(&app->mutex);
4086
3902 nxt_thread_mutex_unlock(&app->mutex);
3903
4087 if (req_app_link != NULL) {
4088 nxt_debug(task, "app '%V' %p abort next stream #%uD",
4089 &app->name, app, req_app_link->stream);
4090
4091 nxt_request_app_link_error(task, app, req_app_link,
4092 "Failed to start application process");
4093 nxt_request_app_link_use(task, req_app_link, -1);
4094 }
3904 /* TODO req_app_link to cancel first pending message */
4095}
4096
3905}
3906
4097nxt_inline nxt_port_t *
4098nxt_router_app_get_port_for_quit(nxt_app_t *app);
4099
4100void
4101nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4102{
4103 int c;
4104
4105 c = nxt_atomic_fetch_add(&app->use_count, i);
4106

--- 4 unchanged lines hidden (view full) ---

4111
4112 } else {
4113 nxt_router_free_app(task, app->joint, NULL);
4114 }
4115 }
4116}
4117
4118
3907
3908void
3909nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
3910{
3911 int c;
3912
3913 c = nxt_atomic_fetch_add(&app->use_count, i);
3914

--- 4 unchanged lines hidden (view full) ---

3919
3920 } else {
3921 nxt_router_free_app(task, app->joint, NULL);
3922 }
3923 }
3924}
3925
3926
4119nxt_inline nxt_bool_t
4120nxt_router_app_first_port_busy(nxt_app_t *app)
4121{
4122 nxt_port_t *port;
4123 nxt_queue_link_t *lnk;
4124
4125 lnk = nxt_queue_first(&app->ports);
4126 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
4127
4128 return port->app_pending_responses > 0;
4129}
4130
4131
4132nxt_inline nxt_port_t *
3927nxt_inline nxt_port_t *
4133nxt_router_pop_first_port(nxt_app_t *app)
4134{
4135 nxt_port_t *port;
4136 nxt_queue_link_t *lnk;
4137
4138 lnk = nxt_queue_first(&app->ports);
4139 nxt_queue_remove(lnk);
4140
4141 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
4142
4143 port->app_pending_responses++;
4144
4145 if (nxt_queue_chk_remove(&port->idle_link)) {
4146 app->idle_processes--;
4147
4148 if (port->idle_start == 0) {
4149 nxt_assert(app->idle_processes < app->spare_processes);
4150
4151 } else {
4152 nxt_assert(app->idle_processes >= app->spare_processes);
4153
4154 port->idle_start = 0;
4155 }
4156 }
4157
4158 if ((app->max_pending_responses == 0
4159 || port->app_pending_responses < app->max_pending_responses)
4160 && (app->max_requests == 0
4161 || port->app_responses + port->app_pending_responses
4162 < app->max_requests))
4163 {
4164 nxt_queue_insert_tail(&app->ports, lnk);
4165
4166 nxt_port_inc_use(port);
4167
4168 } else {
4169 lnk->next = NULL;
4170 }
4171
4172 return port;
4173}
4174
4175
4176nxt_inline nxt_port_t *
4177nxt_router_app_get_port_for_quit(nxt_app_t *app)
4178{
4179 nxt_port_t *port;
4180
4181 port = NULL;
4182
4183 nxt_thread_mutex_lock(&app->mutex);
4184
4185 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4186
3928nxt_router_app_get_port_for_quit(nxt_app_t *app)
3929{
3930 nxt_port_t *port;
3931
3932 port = NULL;
3933
3934 nxt_thread_mutex_lock(&app->mutex);
3935
3936 nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
3937
4187 if (port->app_pending_responses > 0) {
4188 port = NULL;
4189
4190 continue;
4191 }
4192
4193 /* Caller is responsible to decrease port use count. */
4194 nxt_queue_chk_remove(&port->app_link);
4195
4196 if (nxt_queue_chk_remove(&port->idle_link)) {
4197 app->idle_processes--;
4198 }
4199
3938 /* Caller is responsible to decrease port use count. */
3939 nxt_queue_chk_remove(&port->app_link);
3940
3941 if (nxt_queue_chk_remove(&port->idle_link)) {
3942 app->idle_processes--;
3943 }
3944
3945 nxt_port_hash_remove(&app->port_hash, port);
3946 app->port_hash_count--;
3947
4200 port->app = NULL;
4201 app->processes--;
4202
4203 break;
4204
4205 } nxt_queue_loop;
4206
4207 nxt_thread_mutex_unlock(&app->mutex);

--- 9 unchanged lines hidden (view full) ---

4217
4218 nxt_queue_remove(&app->link);
4219
4220 nxt_router_app_use(task, app, -1);
4221}
4222
4223
4224static void
3948 port->app = NULL;
3949 app->processes--;
3950
3951 break;
3952
3953 } nxt_queue_loop;
3954
3955 nxt_thread_mutex_unlock(&app->mutex);

--- 9 unchanged lines hidden (view full) ---

3965
3966 nxt_queue_remove(&app->link);
3967
3968 nxt_router_app_use(task, app, -1);
3969}
3970
3971
3972static void
4225nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
4226{
4227 nxt_request_app_link_t *req_app_link;
4228
4229 req_app_link = data;
4230
4231#if (NXT_DEBUG)
4232 {
4233 nxt_app_t *app;
4234
4235 app = obj;
4236
4237 nxt_assert(app != NULL);
4238 nxt_assert(req_app_link != NULL);
4239 nxt_assert(req_app_link->app_port != NULL);
4240
4241 nxt_debug(task, "app '%V' %p process next stream #%uD",
4242 &app->name, app, req_app_link->stream);
4243 }
4244#endif
4245
4246 nxt_router_app_prepare_request(task, req_app_link);
4247
4248 nxt_request_app_link_use(task, req_app_link, -1);
4249}
4250
4251
4252static void
4253nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4254 nxt_apr_action_t action)
4255{
3973nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
3974 nxt_apr_action_t action)
3975{
4256 int inc_use;
4257 uint32_t dec_pending, got_response;
4258 nxt_app_t *app;
4259 nxt_bool_t port_unchained;
4260 nxt_bool_t send_quit, cancelled, adjust_idle_timer;
4261 nxt_queue_link_t *lnk;
4262 nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra;
4263 nxt_port_select_state_t state;
3976 int inc_use;
3977 uint32_t got_response, dec_requests;
3978 nxt_app_t *app;
3979 nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
3980 nxt_port_t *main_app_port;
4264
4265 nxt_assert(port != NULL);
4266 nxt_assert(port->app != NULL);
4267
3981
3982 nxt_assert(port != NULL);
3983 nxt_assert(port->app != NULL);
3984
4268 req_app_link = NULL;
4269
4270 app = port->app;
4271
4272 inc_use = 0;
3985 app = port->app;
3986
3987 inc_use = 0;
4273 dec_pending = 0;
4274 got_response = 0;
3988 got_response = 0;
3989 dec_requests = 0;
4275
4276 switch (action) {
4277 case NXT_APR_NEW_PORT:
4278 break;
4279 case NXT_APR_REQUEST_FAILED:
3990
3991 switch (action) {
3992 case NXT_APR_NEW_PORT:
3993 break;
3994 case NXT_APR_REQUEST_FAILED:
4280 dec_pending = 1;
3995 dec_requests = 1;
4281 inc_use = -1;
4282 break;
4283 case NXT_APR_GOT_RESPONSE:
3996 inc_use = -1;
3997 break;
3998 case NXT_APR_GOT_RESPONSE:
4284 dec_pending = 1;
4285 got_response = 1;
4286 inc_use = -1;
4287 break;
4288 case NXT_APR_UPGRADE:
3999 got_response = 1;
4000 inc_use = -1;
4001 break;
4002 case NXT_APR_UPGRADE:
4289 dec_pending = 1;
4290 got_response = 1;
4291 break;
4292 case NXT_APR_CLOSE:
4293 inc_use = -1;
4294 break;
4295 }
4296
4003 got_response = 1;
4004 break;
4005 case NXT_APR_CLOSE:
4006 inc_use = -1;
4007 break;
4008 }
4009
4297 nxt_thread_mutex_lock(&app->mutex);
4010 nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4011 port->pid, port->id,
4012 (int) inc_use, (int) got_response);
4298
4013
4299 port->app_pending_responses -= dec_pending;
4300 port->app_responses += got_response;
4014 if (port == app->shared_port) {
4015 nxt_thread_mutex_lock(&app->mutex);
4301
4016
4302 if (port->pair[1] != -1
4303 && (app->max_pending_responses == 0
4304 || port->app_pending_responses < app->max_pending_responses)
4305 && (app->max_requests == 0
4306 || port->app_responses + port->app_pending_responses
4307 < app->max_requests))
4308 {
4309 if (port->app_link.next == NULL) {
4310 if (port->app_pending_responses > 0) {
4311 nxt_queue_insert_tail(&app->ports, &port->app_link);
4017 app->active_requests -= got_response + dec_requests;
4312
4018
4313 } else {
4314 nxt_queue_insert_head(&app->ports, &port->app_link);
4315 }
4019 nxt_thread_mutex_unlock(&app->mutex);
4316
4020
4317 nxt_port_inc_use(port);
4318
4319 } else {
4320 if (port->app_pending_responses == 0
4321 && nxt_queue_first(&app->ports) != &port->app_link)
4322 {
4323 nxt_queue_remove(&port->app_link);
4324 nxt_queue_insert_head(&app->ports, &port->app_link);
4325 }
4326 }
4021 goto adjust_use;
4327 }
4328
4022 }
4023
4329 if (!nxt_queue_is_empty(&app->ports)
4330 && !nxt_queue_is_empty(&app->requests))
4331 {
4332 lnk = nxt_queue_first(&app->requests);
4333 nxt_queue_remove(lnk);
4334 lnk->next = NULL;
4024 main_app_port = port->main_app_port;
4335
4025
4336 req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t,
4337 link_app_requests);
4026 nxt_thread_mutex_lock(&app->mutex);
4338
4027
4339 req_app_link->app_port = nxt_router_pop_first_port(app);
4028 main_app_port->app_responses += got_response;
4029 main_app_port->active_requests -= got_response + dec_requests;
4030 app->active_requests -= got_response + dec_requests;
4340
4031
4341 if (req_app_link->app_port->app_pending_responses > 1) {
4342 nxt_request_app_link_pending(task, app, req_app_link);
4343 }
4344 }
4345
4346 /* Pop first pending request for this port. */
4347 if (dec_pending > 0
4348 && !nxt_queue_is_empty(&port->pending_requests))
4032 if (main_app_port->pair[1] != -1
4033 && (app->max_requests == 0
4034 || main_app_port->app_responses < app->max_requests))
4349 {
4035 {
4350 lnk = nxt_queue_first(&port->pending_requests);
4351 nxt_queue_remove(lnk);
4352 lnk->next = NULL;
4036 if (main_app_port->app_link.next == NULL) {
4037 nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4353
4038
4354 pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
4355 link_port_pending);
4356
4357 nxt_assert(pending_ra->link_app_pending.next != NULL);
4358
4359 nxt_queue_remove(&pending_ra->link_app_pending);
4360 pending_ra->link_app_pending.next = NULL;
4361
4362 } else {
4363 pending_ra = NULL;
4364 }
4365
4366 /* Try to cancel and re-schedule first stalled request for this app. */
4367 if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
4368 lnk = nxt_queue_first(&app->pending);
4369
4370 re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
4371 link_app_pending);
4372
4373 if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
4374
4375 nxt_debug(task, "app '%V' stalled request #%uD detected",
4376 &app->name, re_ra->stream);
4377
4378 cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
4379 re_ra->stream);
4380
4381 if (cancelled) {
4382 state.req_app_link = re_ra;
4383 state.app = app;
4384
4385 /*
4386 * Need to increment use count "in advance" because
4387 * nxt_router_port_select() will remove re_ra from lists
4388 * and decrement use count.
4389 */
4390 nxt_request_app_link_inc_use(re_ra);
4391
4392 nxt_router_port_select(task, &state);
4393
4394 goto re_ra_cancelled;
4395 }
4039 nxt_port_inc_use(main_app_port);
4396 }
4397 }
4398
4040 }
4041 }
4042
4399 re_ra = NULL;
4400
4401re_ra_cancelled:
4402
4403 send_quit = (app->max_requests > 0
4043 send_quit = (app->max_requests > 0
4404 && port->app_pending_responses == 0
4405 && port->app_responses >= app->max_requests);
4044 && main_app_port->app_responses >= app->max_requests);
4406
4407 if (send_quit) {
4045
4046 if (send_quit) {
4408 port_unchained = nxt_queue_chk_remove(&port->app_link);
4047 port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4409
4048
4410 port->app = NULL;
4049 nxt_port_hash_remove(&app->port_hash, main_app_port);
4050 app->port_hash_count--;
4051
4052 main_app_port->app = NULL;
4411 app->processes--;
4412
4413 } else {
4414 port_unchained = 0;
4415 }
4416
4417 adjust_idle_timer = 0;
4418
4053 app->processes--;
4054
4055 } else {
4056 port_unchained = 0;
4057 }
4058
4059 adjust_idle_timer = 0;
4060
4419 if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
4420 && nxt_queue_is_empty(&port->active_websockets)
4421 && port->idle_link.next == NULL)
4061 if (main_app_port->pair[1] != -1 && !send_quit
4062 && main_app_port->active_requests == 0
4063 && main_app_port->active_websockets == 0
4064 && main_app_port->idle_link.next == NULL)
4422 {
4423 if (app->idle_processes == app->spare_processes
4424 && app->adjust_idle_work.data == NULL)
4425 {
4426 adjust_idle_timer = 1;
4427 app->adjust_idle_work.data = app;
4428 app->adjust_idle_work.next = NULL;
4429 }
4430
4431 if (app->idle_processes < app->spare_processes) {
4065 {
4066 if (app->idle_processes == app->spare_processes
4067 && app->adjust_idle_work.data == NULL)
4068 {
4069 adjust_idle_timer = 1;
4070 app->adjust_idle_work.data = app;
4071 app->adjust_idle_work.next = NULL;
4072 }
4073
4074 if (app->idle_processes < app->spare_processes) {
4432 nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
4075 nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4433
4434 } else {
4076
4077 } else {
4435 nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
4078 nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4436
4079
4437 port->idle_start = task->thread->engine->timers.now;
4080 main_app_port->idle_start = task->thread->engine->timers.now;
4438 }
4439
4440 app->idle_processes++;
4441 }
4442
4443 nxt_thread_mutex_unlock(&app->mutex);
4444
4445 if (adjust_idle_timer) {
4446 nxt_router_app_use(task, app, 1);
4447 nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4448 }
4449
4081 }
4082
4083 app->idle_processes++;
4084 }
4085
4086 nxt_thread_mutex_unlock(&app->mutex);
4087
4088 if (adjust_idle_timer) {
4089 nxt_router_app_use(task, app, 1);
4090 nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4091 }
4092
4450 if (pending_ra != NULL) {
4451 nxt_request_app_link_use(task, pending_ra, -1);
4452 }
4453
4454 if (re_ra != NULL) {
4455 if (nxt_router_port_post_select(task, &state) == NXT_OK) {
4456 /*
4457 * Reference counter already incremented above, this will
4458 * keep re_ra while nxt_router_app_process_request()
4459 * task is in queue. Reference counter decreased in
4460 * nxt_router_app_process_request() after processing.
4461 */
4462
4463 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4464 nxt_router_app_process_request,
4465 &task->thread->engine->task, app, re_ra);
4466
4467 } else {
4468 nxt_request_app_link_use(task, re_ra, -1);
4469 }
4470 }
4471
4472 if (req_app_link != NULL) {
4473 /*
4474 * There should be call nxt_request_app_link_inc_use(req_app_link),
4475 * because of one more link in the queue. But one link was
4476 * recently removed from app->requests linked list.
4477 * Corresponding decrement is in nxt_router_app_process_request().
4478 */
4479
4480 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4481 nxt_router_app_process_request,
4482 &task->thread->engine->task, app, req_app_link);
4483
4484 goto adjust_use;
4485 }
4486
4487 /* ? */
4093 /* ? */
4488 if (port->pair[1] == -1) {
4094 if (main_app_port->pair[1] == -1) {
4489 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4095 nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4490 &app->name, app, port, port->pid);
4096 &app->name, app, main_app_port, main_app_port->pid);
4491
4492 goto adjust_use;
4493 }
4494
4495 if (send_quit) {
4097
4098 goto adjust_use;
4099 }
4100
4101 if (send_quit) {
4496 nxt_debug(task, "app '%V' %p send QUIT to port",
4497 &app->name, app);
4102 nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4498
4103
4499 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
4500 -1, 0, 0, NULL);
4104 nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4105 NULL);
4501
4502 if (port_unchained) {
4106
4107 if (port_unchained) {
4503 nxt_port_use(task, port, -1);
4108 nxt_port_use(task, main_app_port, -1);
4504 }
4505
4506 goto adjust_use;
4507 }
4508
4509 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4510 &app->name, app);
4511

--- 12 unchanged lines hidden (view full) ---

4524 nxt_queue_link_t *idle_lnk;
4525
4526 app = port->app;
4527
4528 nxt_assert(app != NULL);
4529
4530 nxt_thread_mutex_lock(&app->mutex);
4531
4109 }
4110
4111 goto adjust_use;
4112 }
4113
4114 nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4115 &app->name, app);
4116

--- 12 unchanged lines hidden (view full) ---

4129 nxt_queue_link_t *idle_lnk;
4130
4131 app = port->app;
4132
4133 nxt_assert(app != NULL);
4134
4135 nxt_thread_mutex_lock(&app->mutex);
4136
4137 nxt_port_hash_remove(&app->port_hash, port);
4138 app->port_hash_count--;
4139
4140 if (port->id != 0) {
4141 nxt_thread_mutex_unlock(&app->mutex);
4142
4143 nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4144 port->pid, port->id);
4145
4146 return;
4147 }
4148
4532 unchain = nxt_queue_chk_remove(&port->app_link);
4533
4534 if (nxt_queue_chk_remove(&port->idle_link)) {
4535 app->idle_processes--;
4536
4537 if (port->idle_start == 0
4538 && app->idle_processes >= app->spare_processes)
4539 {

--- 8 unchanged lines hidden (view full) ---

4548 idle_port->idle_start = 0;
4549 }
4550 }
4551
4552 app->processes--;
4553
4554 start_process = !task->thread->engine->shutdown
4555 && nxt_router_app_can_start(app)
4149 unchain = nxt_queue_chk_remove(&port->app_link);
4150
4151 if (nxt_queue_chk_remove(&port->idle_link)) {
4152 app->idle_processes--;
4153
4154 if (port->idle_start == 0
4155 && app->idle_processes >= app->spare_processes)
4156 {

--- 8 unchanged lines hidden (view full) ---

4165 idle_port->idle_start = 0;
4166 }
4167 }
4168
4169 app->processes--;
4170
4171 start_process = !task->thread->engine->shutdown
4172 && nxt_router_app_can_start(app)
4556 && (!nxt_queue_is_empty(&app->requests)
4557 || nxt_router_app_need_start(app));
4173 && nxt_router_app_need_start(app);
4558
4559 if (start_process) {
4560 app->pending_processes++;
4561 }
4562
4563 nxt_thread_mutex_unlock(&app->mutex);
4564
4565 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

--- 32 unchanged lines hidden (view full) ---

4598 timeout = 0;
4599
4600 nxt_thread_mutex_lock(&app->mutex);
4601
4602 if (queued) {
4603 app->adjust_idle_work.data = NULL;
4604 }
4605
4174
4175 if (start_process) {
4176 app->pending_processes++;
4177 }
4178
4179 nxt_thread_mutex_unlock(&app->mutex);
4180
4181 nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

--- 32 unchanged lines hidden (view full) ---

4214 timeout = 0;
4215
4216 nxt_thread_mutex_lock(&app->mutex);
4217
4218 if (queued) {
4219 app->adjust_idle_work.data = NULL;
4220 }
4221
4222 nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4223 &app->name,
4224 (int) app->idle_processes, (int) app->spare_processes);
4225
4606 while (app->idle_processes > app->spare_processes) {
4607
4608 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4609
4610 lnk = nxt_queue_first(&app->idle_ports);
4611 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4612
4613 timeout = port->idle_start + app->idle_timeout;
4614
4226 while (app->idle_processes > app->spare_processes) {
4227
4228 nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4229
4230 lnk = nxt_queue_first(&app->idle_ports);
4231 port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4232
4233 timeout = port->idle_start + app->idle_timeout;
4234
4235 nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4236 &app->name, port->pid,
4237 port->idle_start, timeout, threshold);
4238
4615 if (timeout > threshold) {
4616 break;
4617 }
4618
4619 nxt_queue_remove(lnk);
4620 lnk->next = NULL;
4621
4622 nxt_queue_chk_remove(&port->app_link);
4623
4239 if (timeout > threshold) {
4240 break;
4241 }
4242
4243 nxt_queue_remove(lnk);
4244 lnk->next = NULL;
4245
4246 nxt_queue_chk_remove(&port->app_link);
4247
4248 nxt_port_hash_remove(&app->port_hash, port);
4249 app->port_hash_count--;
4250
4624 app->idle_processes--;
4625 app->processes--;
4626 port->app = NULL;
4627
4628 nxt_thread_mutex_unlock(&app->mutex);
4629
4630 nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4631 &app->name, port->pid);

--- 67 unchanged lines hidden (view full) ---

4699 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4700
4701 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4702
4703 nxt_port_use(task, port, -1);
4704 }
4705
4706 nxt_assert(app->processes == 0);
4251 app->idle_processes--;
4252 app->processes--;
4253 port->app = NULL;
4254
4255 nxt_thread_mutex_unlock(&app->mutex);
4256
4257 nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4258 &app->name, port->pid);

--- 67 unchanged lines hidden (view full) ---

4326 nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4327
4328 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4329
4330 nxt_port_use(task, port, -1);
4331 }
4332
4333 nxt_assert(app->processes == 0);
4334 nxt_assert(app->active_requests == 0);
4335 nxt_assert(app->port_hash_count == 0);
4707 nxt_assert(app->idle_processes == 0);
4336 nxt_assert(app->idle_processes == 0);
4708 nxt_assert(nxt_queue_is_empty(&app->requests));
4709 nxt_assert(nxt_queue_is_empty(&app->ports));
4710 nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4711 nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4712
4337 nxt_assert(nxt_queue_is_empty(&app->ports));
4338 nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4339 nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4340
4341 nxt_port_mmaps_destroy(&app->outgoing, 1);
4342
4343 nxt_thread_mutex_destroy(&app->outgoing.mutex);
4344
4345 if (app->shared_port != NULL) {
4346 app->shared_port->app = NULL;
4347 nxt_port_close(task, app->shared_port);
4348 nxt_port_use(task, app->shared_port, -1);
4349 }
4350
4713 nxt_thread_mutex_destroy(&app->mutex);
4714 nxt_mp_destroy(app->mem_pool);
4715
4716 app_joint->app = NULL;
4717
4718 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4719 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4720 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4721
4722 } else {
4723 nxt_router_app_joint_use(task, app_joint, -1);
4724 }
4725}
4726
4727
4728static void
4351 nxt_thread_mutex_destroy(&app->mutex);
4352 nxt_mp_destroy(app->mem_pool);
4353
4354 app_joint->app = NULL;
4355
4356 if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4357 app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4358 nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4359
4360 } else {
4361 nxt_router_app_joint_use(task, app_joint, -1);
4362 }
4363}
4364
4365
4366static void
4729nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
4367nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4368 nxt_request_rpc_data_t *req_rpc_data)
4730{
4369{
4731 int ra_use_delta;
4732 nxt_app_t *app;
4733 nxt_bool_t can_start_process;
4734 nxt_request_app_link_t *req_app_link;
4370 nxt_bool_t start_process;
4371 nxt_port_t *port;
4735
4372
4736 req_app_link = state->req_app_link;
4737 app = state->app;
4373 start_process = 0;
4738
4374
4739 state->failed_port_use_delta = 0;
4740 ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
4375 nxt_thread_mutex_lock(&app->mutex);
4741
4376
4742 if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
4743 {
4744 nxt_assert(req_app_link->link_app_pending.next != NULL);
4377 port = app->shared_port;
4378 nxt_port_inc_use(port);
4745
4379
4746 nxt_queue_remove(&req_app_link->link_app_pending);
4747 req_app_link->link_app_pending.next = NULL;
4380 app->active_requests++;
4748
4381
4749 ra_use_delta--;
4382 if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4383 app->pending_processes++;
4384 start_process = 1;
4750 }
4751
4385 }
4386
4752 state->failed_port = req_app_link->app_port;
4387 nxt_thread_mutex_unlock(&app->mutex);
4753
4388
4754 if (req_app_link->app_port != NULL) {
4755 state->failed_port_use_delta--;
4389 req_rpc_data->app_port = port;
4390 req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4756
4391
4757 state->failed_port->app_pending_responses--;
4758
4759 if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
4760 state->failed_port_use_delta--;
4761 }
4762
4763 req_app_link->app_port = NULL;
4392 if (start_process) {
4393 nxt_router_start_app_process(task, app);
4764 }
4394 }
4765
4766 can_start_process = nxt_router_app_can_start(app);
4767
4768 state->port = NULL;
4769 state->start_process = 0;
4770
4771 if (nxt_queue_is_empty(&app->ports)
4772 || (can_start_process && nxt_router_app_first_port_busy(app)) )
4773 {
4774 req_app_link = nxt_request_app_link_alloc(task, req_app_link,
4775 req_app_link->req_rpc_data);
4776 if (nxt_slow_path(req_app_link == NULL)) {
4777 goto fail;
4778 }
4779
4780 if (nxt_slow_path(state->failed_port != NULL)) {
4781 nxt_queue_insert_head(&app->requests,
4782 &req_app_link->link_app_requests);
4783
4784 } else {
4785 nxt_queue_insert_tail(&app->requests,
4786 &req_app_link->link_app_requests);
4787 }
4788
4789 nxt_request_app_link_inc_use(req_app_link);
4790
4791 nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
4792 req_app_link->stream);
4793
4794 if (can_start_process) {
4795 app->pending_processes++;
4796 state->start_process = 1;
4797 }
4798
4799 } else {
4800 state->port = nxt_router_pop_first_port(app);
4801
4802 if (state->port->app_pending_responses > 1) {
4803 req_app_link = nxt_request_app_link_alloc(task, req_app_link,
4804 req_app_link->req_rpc_data);
4805 if (nxt_slow_path(req_app_link == NULL)) {
4806 goto fail;
4807 }
4808
4809 req_app_link->app_port = state->port;
4810
4811 nxt_request_app_link_pending(task, app, req_app_link);
4812 }
4813
4814 if (can_start_process && nxt_router_app_need_start(app)) {
4815 app->pending_processes++;
4816 state->start_process = 1;
4817 }
4818 }
4819
4820 nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
4821
4822fail:
4823
4824 state->shared_ra = req_app_link;
4825}
4826
4827
4395}
4396
4397
4828static nxt_int_t
4829nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
4830{
4831 nxt_int_t res;
4832 nxt_app_t *app;
4833 nxt_request_app_link_t *req_app_link;
4834
4835 req_app_link = state->shared_ra;
4836 app = state->app;
4837
4838 if (state->failed_port_use_delta != 0) {
4839 nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
4840 }
4841
4842 if (nxt_slow_path(req_app_link == NULL)) {
4843 if (state->port != NULL) {
4844 nxt_port_use(task, state->port, -1);
4845 }
4846
4847 nxt_request_app_link_error(task, app, state->req_app_link,
4848 "Failed to allocate shared req<->app link");
4849
4850 return NXT_ERROR;
4851 }
4852
4853 if (state->port != NULL) {
4854 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
4855
4856 req_app_link->app_port = state->port;
4857
4858 if (state->start_process) {
4859 nxt_router_start_app_process(task, app);
4860 }
4861
4862 return NXT_OK;
4863 }
4864
4865 if (!state->start_process) {
4866 nxt_debug(task, "app '%V' %p too many running or pending processes",
4867 &app->name, app);
4868
4869 return NXT_AGAIN;
4870 }
4871
4872 res = nxt_router_start_app_process(task, app);
4873
4874 if (nxt_slow_path(res != NXT_OK)) {
4875 nxt_request_app_link_error(task, app, req_app_link,
4876 "Failed to start app process");
4877
4878 return NXT_ERROR;
4879 }
4880
4881 return NXT_AGAIN;
4882}
4883
4884
4885static nxt_int_t
4886nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
4887 nxt_request_app_link_t *req_app_link)
4888{
4889 nxt_port_select_state_t state;
4890
4891 state.req_app_link = req_app_link;
4892 state.app = app;
4893
4894 nxt_thread_mutex_lock(&app->mutex);
4895
4896 nxt_router_port_select(task, &state);
4897
4898 nxt_thread_mutex_unlock(&app->mutex);
4899
4900 return nxt_router_port_post_select(task, &state);
4901}
4902
4903
4904void
4905nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4906 nxt_app_t *app)
4907{
4398void
4399nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4400 nxt_app_t *app)
4401{
4908 nxt_int_t res;
4909 nxt_port_t *port;
4910 nxt_event_engine_t *engine;
4402 nxt_event_engine_t *engine;
4911 nxt_request_app_link_t ra_local, *req_app_link;
4912 nxt_request_rpc_data_t *req_rpc_data;
4913
4914 engine = task->thread->engine;
4915
4916 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4917 nxt_router_response_ready_handler,
4918 nxt_router_response_error_handler,
4919 sizeof(nxt_request_rpc_data_t));
4920 if (nxt_slow_path(req_rpc_data == NULL)) {
4921 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4922 return;
4923 }
4924
4925 /*
4926 * At this point we have request req_rpc_data allocated and registered
4927 * in port handlers. Need to fixup request memory pool. Counterpart
4928 * release will be called via following call chain:
4929 * nxt_request_rpc_data_unlink() ->
4403 nxt_request_rpc_data_t *req_rpc_data;
4404
4405 engine = task->thread->engine;
4406
4407 req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4408 nxt_router_response_ready_handler,
4409 nxt_router_response_error_handler,
4410 sizeof(nxt_request_rpc_data_t));
4411 if (nxt_slow_path(req_rpc_data == NULL)) {
4412 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4413 return;
4414 }
4415
4416 /*
4417 * At this point we have request req_rpc_data allocated and registered
4418 * in port handlers. Need to fixup request memory pool. Counterpart
4419 * release will be called via following call chain:
4420 * nxt_request_rpc_data_unlink() ->
4930 * nxt_router_http_request_done() ->
4421 * nxt_router_http_request_release_post() ->
4931 * nxt_router_http_request_release()
4932 */
4933 nxt_mp_retain(r->mem_pool);
4934
4935 r->timer.task = &engine->task;
4936 r->timer.work_queue = &engine->fast_work_queue;
4937 r->timer.log = engine->task.log;
4938 r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4939
4940 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4941 req_rpc_data->app = app;
4422 * nxt_router_http_request_release()
4423 */
4424 nxt_mp_retain(r->mem_pool);
4425
4426 r->timer.task = &engine->task;
4427 r->timer.work_queue = &engine->fast_work_queue;
4428 r->timer.log = engine->task.log;
4429 r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4430
4431 req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4432 req_rpc_data->app = app;
4433 req_rpc_data->msg_info.body_fd = -1;
4434 req_rpc_data->rpc_cancel = 1;
4942
4943 nxt_router_app_use(task, app, 1);
4944
4945 req_rpc_data->request = r;
4946 r->req_rpc_data = req_rpc_data;
4947
4435
4436 nxt_router_app_use(task, app, 1);
4437
4438 req_rpc_data->request = r;
4439 r->req_rpc_data = req_rpc_data;
4440
4948 req_app_link = &ra_local;
4949 nxt_request_app_link_init(task, req_app_link, req_rpc_data);
4441 if (r->last != NULL) {
4442 r->last->completion_handler = nxt_router_http_request_done;
4443 }
4950
4444
4951 res = nxt_router_app_port(task, app, req_app_link);
4952 req_app_link = req_rpc_data->req_app_link;
4445 nxt_router_app_port_get(task, app, req_rpc_data);
4446 nxt_router_app_prepare_request(task, req_rpc_data);
4447}
4953
4448
4954 if (res == NXT_OK) {
4955 port = req_app_link->app_port;
4956
4449
4957 nxt_assert(port != NULL);
4450static void
4451nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4452{
4453 nxt_http_request_t *r;
4958
4454
4959 nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
4455 r = data;
4960
4456
4961 nxt_router_app_prepare_request(task, req_app_link);
4457 nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4458
4459 if (r->req_rpc_data) {
4460 nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4962 }
4963
4461 }
4462
4964 nxt_request_app_link_use(task, req_app_link, -1);
4463 nxt_http_request_close_handler(task, r, r->proto.any);
4965}
4966
4967
4968static void
4969nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4970{
4971}
4972
4973
4974static void
4975nxt_router_app_prepare_request(nxt_task_t *task,
4464}
4465
4466
4467static void
4468nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4469{
4470}
4471
4472
4473static void
4474nxt_router_app_prepare_request(nxt_task_t *task,
4976 nxt_request_app_link_t *req_app_link)
4475 nxt_request_rpc_data_t *req_rpc_data)
4977{
4476{
4978 nxt_buf_t *buf;
4477 nxt_app_t *app;
4478 nxt_buf_t *buf, *body;
4979 nxt_int_t res;
4980 nxt_port_t *port, *reply_port;
4479 nxt_int_t res;
4480 nxt_port_t *port, *reply_port;
4981 nxt_apr_action_t apr_action;
4982
4481
4983 nxt_assert(req_app_link->app_port != NULL);
4482 app = req_rpc_data->app;
4984
4483
4985 port = req_app_link->app_port;
4986 reply_port = req_app_link->reply_port;
4484 nxt_assert(app != NULL);
4987
4485
4988 apr_action = NXT_APR_REQUEST_FAILED;
4486 port = req_rpc_data->app_port;
4989
4487
4990 buf = nxt_router_prepare_msg(task, req_app_link->request, port,
4991 nxt_app_msg_prefix[port->app->type]);
4488 nxt_assert(port != NULL);
4992
4489
4490 reply_port = task->thread->engine->port;
4491
4492 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
4493 nxt_app_msg_prefix[app->type]);
4993 if (nxt_slow_path(buf == NULL)) {
4494 if (nxt_slow_path(buf == NULL)) {
4994 nxt_request_app_link_error(task, port->app, req_app_link,
4995 "Failed to prepare message for application");
4996 goto release_port;
4495 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
4496 req_rpc_data->stream, &app->name);
4497
4498 nxt_http_request_error(task, req_rpc_data->request,
4499 NXT_HTTP_INTERNAL_SERVER_ERROR);
4500
4501 return;
4997 }
4998
4999 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5000 nxt_buf_used_size(buf),
5001 port->socket.fd);
5002
4502 }
4503
4504 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
4505 nxt_buf_used_size(buf),
4506 port->socket.fd);
4507
5003 apr_action = NXT_APR_NEW_PORT;
4508 req_rpc_data->msg_info.buf = buf;
4509 req_rpc_data->msg_info.completion_handler = buf->completion_handler;
5004
4510
5005 req_app_link->msg_info.buf = buf;
5006 req_app_link->msg_info.completion_handler = buf->completion_handler;
5007
5008 for (; buf; buf = buf->next) {
4511 do {
5009 buf->completion_handler = nxt_router_dummy_buf_completion;
4512 buf->completion_handler = nxt_router_dummy_buf_completion;
5010 }
4513 buf = buf->next;
4514 } while (buf != NULL);
5011
4515
5012 buf = req_app_link->msg_info.buf;
4516 buf = req_rpc_data->msg_info.buf;
5013
4517
5014 res = nxt_port_mmap_get_tracking(task, &port->process->outgoing,
5015 &req_app_link->msg_info.tracking,
5016 req_app_link->stream);
5017 if (nxt_slow_path(res != NXT_OK)) {
5018 nxt_request_app_link_error(task, port->app, req_app_link,
5019 "Failed to get tracking area");
5020 goto release_port;
5021 }
4518 body = req_rpc_data->request->body;
5022
4519
5023 if (req_app_link->body_fd != -1) {
5024 nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
5025 req_app_link->body_fd);
4520 if (body != NULL && nxt_buf_is_file(body)) {
4521 req_rpc_data->msg_info.body_fd = body->file->fd;
5026
4522
5027 lseek(req_app_link->body_fd, 0, SEEK_SET);
4523 body->file->fd = -1;
4524
4525 } else {
4526 req_rpc_data->msg_info.body_fd = -1;
5028 }
5029
4527 }
4528
5030 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
5031 req_app_link->body_fd,
5032 req_app_link->stream, reply_port->id, buf,
5033 &req_app_link->msg_info.tracking);
4529 if (req_rpc_data->msg_info.body_fd != -1) {
4530 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4531 req_rpc_data->msg_info.body_fd);
5034
4532
5035 if (nxt_slow_path(res != NXT_OK)) {
5036 nxt_request_app_link_error(task, port->app, req_app_link,
5037 "Failed to send message to application");
5038 goto release_port;
4533 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
5039 }
5040
4534 }
4535
5041release_port:
4536 res = nxt_port_socket_twrite(task, port,
4537 NXT_PORT_MSG_REQ_HEADERS,
4538 req_rpc_data->msg_info.body_fd,
4539 req_rpc_data->stream, reply_port->id, buf,
4540 NULL);
5042
4541
5043 nxt_router_app_port_release(task, port, apr_action);
4542 if (nxt_slow_path(res != NXT_OK)) {
4543 nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
4544 req_rpc_data->stream, &app->name);
5044
4545
5045 nxt_request_app_link_update_peer(task, req_app_link);
4546 nxt_http_request_error(task, req_rpc_data->request,
4547 NXT_HTTP_INTERNAL_SERVER_ERROR);
4548 }
5046}
5047
5048
5049struct nxt_fields_iter_s {
5050 nxt_list_part_t *part;
5051 nxt_http_field_t *field;
5052};
5053

--- 41 unchanged lines hidden (view full) ---

5095 }
5096
5097 return nxt_fields_part_first(i->part->next, i);
5098}
5099
5100
5101static nxt_buf_t *
5102nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
4549}
4550
4551
4552struct nxt_fields_iter_s {
4553 nxt_list_part_t *part;
4554 nxt_http_field_t *field;
4555};
4556

--- 41 unchanged lines hidden (view full) ---

4598 }
4599
4600 return nxt_fields_part_first(i->part->next, i);
4601}
4602
4603
4604static nxt_buf_t *
4605nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5103 nxt_port_t *port, const nxt_str_t *prefix)
4606 nxt_app_t *app, const nxt_str_t *prefix)
5104{
5105 void *target_pos, *query_pos;
5106 u_char *pos, *end, *p, c;
5107 size_t fields_count, req_size, size, free_size;
5108 size_t copy_size;
5109 nxt_off_t content_length;
5110 nxt_buf_t *b, *buf, *out, **tail;
5111 nxt_http_field_t *field, *dup;

--- 24 unchanged lines hidden (view full) ---

5136
5137 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5138 nxt_alert(task, "headers to big to fit in shared memory (%d)",
5139 (int) req_size);
5140
5141 return NULL;
5142 }
5143
4607{
4608 void *target_pos, *query_pos;
4609 u_char *pos, *end, *p, c;
4610 size_t fields_count, req_size, size, free_size;
4611 size_t copy_size;
4612 nxt_off_t content_length;
4613 nxt_buf_t *b, *buf, *out, **tail;
4614 nxt_http_field_t *field, *dup;

--- 24 unchanged lines hidden (view full) ---

4639
4640 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
4641 nxt_alert(task, "headers to big to fit in shared memory (%d)",
4642 (int) req_size);
4643
4644 return NULL;
4645 }
4646
5144 out = nxt_port_mmap_get_buf(task, &port->process->outgoing,
4647 out = nxt_port_mmap_get_buf(task, &app->outgoing,
5145 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5146 if (nxt_slow_path(out == NULL)) {
5147 return NULL;
5148 }
5149
5150 req = (nxt_unit_request_t *) out->mem.free;
5151 out->mem.free += req_size;
5152

--- 165 unchanged lines hidden (view full) ---

5318 for (b = r->body; b != NULL; b = b->next) {
5319 size = nxt_buf_mem_used_size(&b->mem);
5320 pos = b->mem.pos;
5321
5322 while (size > 0) {
5323 if (buf == NULL) {
5324 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5325
4648 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
4649 if (nxt_slow_path(out == NULL)) {
4650 return NULL;
4651 }
4652
4653 req = (nxt_unit_request_t *) out->mem.free;
4654 out->mem.free += req_size;
4655

--- 165 unchanged lines hidden (view full) ---

4821 for (b = r->body; b != NULL; b = b->next) {
4822 size = nxt_buf_mem_used_size(&b->mem);
4823 pos = b->mem.pos;
4824
4825 while (size > 0) {
4826 if (buf == NULL) {
4827 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
4828
5326 buf = nxt_port_mmap_get_buf(task, &port->process->outgoing,
5327 free_size);
4829 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5328 if (nxt_slow_path(buf == NULL)) {
5329 while (out != NULL) {
5330 buf = out->next;
5331 out->next = NULL;
5332 out->completion_handler(task, out, out->parent);
5333 out = buf;
5334 }
5335 return NULL;

--- 31 unchanged lines hidden (view full) ---

5367
5368 return out;
5369}
5370
5371
5372static void
5373nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5374{
4830 if (nxt_slow_path(buf == NULL)) {
4831 while (out != NULL) {
4832 buf = out->next;
4833 out->next = NULL;
4834 out->completion_handler(task, out, out->parent);
4835 out = buf;
4836 }
4837 return NULL;

--- 31 unchanged lines hidden (view full) ---

4869
4870 return out;
4871}
4872
4873
4874static void
4875nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
4876{
5375 nxt_app_t *app;
5376 nxt_bool_t cancelled, unlinked;
5377 nxt_port_t *port;
5378 nxt_timer_t *timer;
4877 nxt_timer_t *timer;
5379 nxt_queue_link_t *lnk;
5380 nxt_http_request_t *r;
4878 nxt_http_request_t *r;
5381 nxt_request_app_link_t *pending_ra;
5382 nxt_request_rpc_data_t *req_rpc_data;
4879 nxt_request_rpc_data_t *req_rpc_data;
5383 nxt_port_select_state_t state;
5384
5385 timer = obj;
5386
5387 nxt_debug(task, "router app timeout");
5388
5389 r = nxt_timer_data(timer, nxt_http_request_t, timer);
5390 req_rpc_data = r->timer_data;
4880
4881 timer = obj;
4882
4883 nxt_debug(task, "router app timeout");
4884
4885 r = nxt_timer_data(timer, nxt_http_request_t, timer);
4886 req_rpc_data = r->timer_data;
5391 app = req_rpc_data->app;
5392
4887
5393 if (app == NULL) {
5394 goto generate_error;
5395 }
5396
5397 port = NULL;
5398 pending_ra = NULL;
5399
5400 if (req_rpc_data->app_port != NULL) {
5401 port = req_rpc_data->app_port;
5402 req_rpc_data->app_port = NULL;
5403 }
5404
5405 if (port == NULL && req_rpc_data->req_app_link != NULL
5406 && req_rpc_data->req_app_link->app_port != NULL)
5407 {
5408 port = req_rpc_data->req_app_link->app_port;
5409 req_rpc_data->req_app_link->app_port = NULL;
5410 }
5411
5412 if (port == NULL) {
5413 goto generate_error;
5414 }
5415
5416 nxt_thread_mutex_lock(&app->mutex);
5417
5418 unlinked = nxt_queue_chk_remove(&port->app_link);
5419
5420 if (!nxt_queue_is_empty(&port->pending_requests)) {
5421 lnk = nxt_queue_first(&port->pending_requests);
5422
5423 pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t,
5424 link_port_pending);
5425
5426 nxt_assert(pending_ra->link_app_pending.next != NULL);
5427
5428 nxt_debug(task, "app '%V' pending request #%uD found",
5429 &app->name, pending_ra->stream);
5430
5431 cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
5432 pending_ra->stream);
5433
5434 if (cancelled) {
5435 state.req_app_link = pending_ra;
5436 state.app = app;
5437
5438 /*
5439 * Need to increment use count "in advance" because
5440 * nxt_router_port_select() will remove pending_ra from lists
5441 * and decrement use count.
5442 */
5443 nxt_request_app_link_inc_use(pending_ra);
5444
5445 nxt_router_port_select(task, &state);
5446
5447 } else {
5448 pending_ra = NULL;
5449 }
5450 }
5451
5452 nxt_thread_mutex_unlock(&app->mutex);
5453
5454 if (pending_ra != NULL) {
5455 if (nxt_router_port_post_select(task, &state) == NXT_OK) {
5456 /*
5457 * Reference counter already incremented above, this will
5458 * keep pending_ra while nxt_router_app_process_request()
5459 * task is in queue. Reference counter decreased in
5460 * nxt_router_app_process_request() after processing.
5461 */
5462
5463 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
5464 nxt_router_app_process_request,
5465 &task->thread->engine->task, app, pending_ra);
5466
5467 } else {
5468 nxt_request_app_link_use(task, pending_ra, -1);
5469 }
5470 }
5471
5472 nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
5473
5474 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5475
5476 nxt_port_use(task, port, unlinked ? -2 : -1);
5477
5478generate_error:
5479
5480 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5481
5482 nxt_request_rpc_data_unlink(task, req_rpc_data);
5483}
5484
5485
4888 nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4889
4890 nxt_request_rpc_data_unlink(task, req_rpc_data);
4891}
4892
4893
5486static nxt_int_t
5487nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r)
4894static void
4895nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5488{
5489 r->timer.handler = nxt_router_http_request_release;
5490 nxt_timer_add(task->thread->engine, &r->timer, 0);
4896{
4897 r->timer.handler = nxt_router_http_request_release;
4898 nxt_timer_add(task->thread->engine, &r->timer, 0);
5491
5492 return NXT_OK;
5493}
5494
5495
5496static void
5497nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5498{
5499 nxt_http_request_t *r;
5500
4899}
4900
4901
4902static void
4903nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
4904{
4905 nxt_http_request_t *r;
4906
5501 nxt_debug(task, "http app release");
4907 nxt_debug(task, "http request pool release");
5502
5503 r = nxt_timer_data(obj, nxt_http_request_t, timer);
5504
5505 nxt_mp_release(r->mem_pool);
5506}
5507
5508
5509static void

--- 78 unchanged lines hidden (view full) ---

5588
5589 return;
5590 }
5591
5592 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5593
5594 nxt_assert(port->type == NXT_PROCESS_APP);
5595
4908
4909 r = nxt_timer_data(obj, nxt_http_request_t, timer);
4910
4911 nxt_mp_release(r->mem_pool);
4912}
4913
4914
4915static void

--- 78 unchanged lines hidden (view full) ---

4994
4995 return;
4996 }
4997
4998 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
4999
5000 nxt_assert(port->type == NXT_PROCESS_APP);
5001
5596 mmaps = &port->process->outgoing;
5002 if (nxt_slow_path(port->app == NULL)) {
5003 nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5004 port->pid, port->id);
5005
5006 // FIXME
5007 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5008 -1, msg->port_msg.stream, 0, NULL);
5009
5010 return;
5011 }
5012
5013 mmaps = &port->app->outgoing;
5597 nxt_thread_mutex_lock(&mmaps->mutex);
5598
5599 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5600 nxt_thread_mutex_unlock(&mmaps->mutex);
5601
5602 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5603 (int) get_mmap_msg->id);
5604
5014 nxt_thread_mutex_lock(&mmaps->mutex);
5015
5016 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5017 nxt_thread_mutex_unlock(&mmaps->mutex);
5018
5019 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5020 (int) get_mmap_msg->id);
5021
5022 // FIXME
5023 nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5024 -1, msg->port_msg.stream, 0, NULL);
5605 return;
5606 }
5607
5608 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5609
5610 fd = mmap_handler->fd;
5611
5612 nxt_thread_mutex_unlock(&mmaps->mutex);

--- 50 unchanged lines hidden ---
5025 return;
5026 }
5027
5028 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5029
5030 fd = mmap_handler->fd;
5031
5032 nxt_thread_mutex_unlock(&mmaps->mutex);

--- 50 unchanged lines hidden ---