nxt_router.c (1414:5bf805f38a40) nxt_router.c (1446:ad6265786871)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 721 unchanged lines hidden (view full) ---

730 req_app_link->stream, engine);
731
732 nxt_event_engine_post(engine, &req_app_link->work);
733 }
734}
735
736
737nxt_inline void
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 721 unchanged lines hidden (view full) ---

730 req_app_link->stream, engine);
731
732 nxt_event_engine_post(engine, &req_app_link->work);
733 }
734}
735
736
737nxt_inline void
738nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code,
739 const char *str)
738nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
739 nxt_request_app_link_t *req_app_link, const char *str)
740{
741 req_app_link->app_port = NULL;
740{
741 req_app_link->app_port = NULL;
742 req_app_link->err_code = code;
742 req_app_link->err_code = 500;
743 req_app_link->err_str = str;
743 req_app_link->err_str = str;
744
745 nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
746 &app->name, str, req_app_link->stream);
744}
745
746
747nxt_inline void
748nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
749 nxt_request_app_link_t *req_app_link)
750{
751 nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,

--- 3152 unchanged lines hidden (view full) ---

3904 }
3905
3906 nxt_thread_mutex_unlock(&app->mutex);
3907
3908 if (req_app_link != NULL) {
3909 nxt_debug(task, "app '%V' %p abort next stream #%uD",
3910 &app->name, app, req_app_link->stream);
3911
747}
748
749
750nxt_inline void
751nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
752 nxt_request_app_link_t *req_app_link)
753{
754 nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,

--- 3152 unchanged lines hidden (view full) ---

3907 }
3908
3909 nxt_thread_mutex_unlock(&app->mutex);
3910
3911 if (req_app_link != NULL) {
3912 nxt_debug(task, "app '%V' %p abort next stream #%uD",
3913 &app->name, app, req_app_link->stream);
3914
3912 nxt_request_app_link_error(req_app_link, 500,
3915 nxt_request_app_link_error(task, app, req_app_link,
3913 "Failed to start application process");
3914 nxt_request_app_link_use(task, req_app_link, -1);
3915 }
3916}
3917
3918nxt_inline nxt_port_t *
3919nxt_router_app_get_port_for_quit(nxt_app_t *app);
3920

--- 739 unchanged lines hidden (view full) ---

4660 nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
4661 }
4662
4663 if (nxt_slow_path(req_app_link == NULL)) {
4664 if (state->port != NULL) {
4665 nxt_port_use(task, state->port, -1);
4666 }
4667
3916 "Failed to start application process");
3917 nxt_request_app_link_use(task, req_app_link, -1);
3918 }
3919}
3920
3921nxt_inline nxt_port_t *
3922nxt_router_app_get_port_for_quit(nxt_app_t *app);
3923

--- 739 unchanged lines hidden (view full) ---

4663 nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
4664 }
4665
4666 if (nxt_slow_path(req_app_link == NULL)) {
4667 if (state->port != NULL) {
4668 nxt_port_use(task, state->port, -1);
4669 }
4670
4668 nxt_request_app_link_error(state->req_app_link, 500,
4671 nxt_request_app_link_error(task, app, state->req_app_link,
4669 "Failed to allocate shared req<->app link");
4670
4671 return NXT_ERROR;
4672 }
4673
4674 if (state->port != NULL) {
4675 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
4676

--- 11 unchanged lines hidden (view full) ---

4688 &app->name, app);
4689
4690 return NXT_AGAIN;
4691 }
4692
4693 res = nxt_router_start_app_process(task, app);
4694
4695 if (nxt_slow_path(res != NXT_OK)) {
4672 "Failed to allocate shared req<->app link");
4673
4674 return NXT_ERROR;
4675 }
4676
4677 if (state->port != NULL) {
4678 nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
4679

--- 11 unchanged lines hidden (view full) ---

4691 &app->name, app);
4692
4693 return NXT_AGAIN;
4694 }
4695
4696 res = nxt_router_start_app_process(task, app);
4697
4698 if (nxt_slow_path(res != NXT_OK)) {
4696 nxt_request_app_link_error(req_app_link, 500,
4699 nxt_request_app_link_error(task, app, req_app_link,
4697 "Failed to start app process");
4698
4699 return NXT_ERROR;
4700 }
4701
4702 return NXT_AGAIN;
4703}
4704

--- 98 unchanged lines hidden (view full) ---

4803
4804 nxt_assert(req_app_link->app_port != NULL);
4805
4806 port = req_app_link->app_port;
4807 reply_port = req_app_link->reply_port;
4808
4809 apr_action = NXT_APR_REQUEST_FAILED;
4810
4700 "Failed to start app process");
4701
4702 return NXT_ERROR;
4703 }
4704
4705 return NXT_AGAIN;
4706}
4707

--- 98 unchanged lines hidden (view full) ---

4806
4807 nxt_assert(req_app_link->app_port != NULL);
4808
4809 port = req_app_link->app_port;
4810 reply_port = req_app_link->reply_port;
4811
4812 apr_action = NXT_APR_REQUEST_FAILED;
4813
4811 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
4812 reply_port->id);
4814 c_port = nxt_process_connected_port_find_add(port->process, reply_port);
4815
4813 if (nxt_slow_path(c_port != reply_port)) {
4814 res = nxt_port_send_port(task, port, reply_port, 0);
4815
4816 if (nxt_slow_path(res != NXT_OK)) {
4816 if (nxt_slow_path(c_port != reply_port)) {
4817 res = nxt_port_send_port(task, port, reply_port, 0);
4818
4819 if (nxt_slow_path(res != NXT_OK)) {
4817 nxt_request_app_link_error(req_app_link, 500,
4820 nxt_request_app_link_error(task, port->app, req_app_link,
4818 "Failed to send reply port to application");
4821 "Failed to send reply port to application");
4822
4823 nxt_process_connected_port_remove(port->process, reply_port);
4824
4819 goto release_port;
4820 }
4825 goto release_port;
4826 }
4821
4822 nxt_process_connected_port_add(port->process, reply_port);
4823 }
4824
4825 buf = nxt_router_prepare_msg(task, req_app_link->request, port,
4826 nxt_app_msg_prefix[port->app->type]);
4827
4828 if (nxt_slow_path(buf == NULL)) {
4827 }
4828
4829 buf = nxt_router_prepare_msg(task, req_app_link->request, port,
4830 nxt_app_msg_prefix[port->app->type]);
4831
4832 if (nxt_slow_path(buf == NULL)) {
4829 nxt_request_app_link_error(req_app_link, 500,
4833 nxt_request_app_link_error(task, port->app, req_app_link,
4830 "Failed to prepare message for application");
4831 goto release_port;
4832 }
4833
4834 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
4835 nxt_buf_used_size(buf),
4836 port->socket.fd);
4837

--- 7 unchanged lines hidden (view full) ---

4845 }
4846
4847 buf = req_app_link->msg_info.buf;
4848
4849 res = nxt_port_mmap_get_tracking(task, port,
4850 &req_app_link->msg_info.tracking,
4851 req_app_link->stream);
4852 if (nxt_slow_path(res != NXT_OK)) {
4834 "Failed to prepare message for application");
4835 goto release_port;
4836 }
4837
4838 nxt_debug(task, "about to send %O bytes buffer to app process port %d",
4839 nxt_buf_used_size(buf),
4840 port->socket.fd);
4841

--- 7 unchanged lines hidden (view full) ---

4849 }
4850
4851 buf = req_app_link->msg_info.buf;
4852
4853 res = nxt_port_mmap_get_tracking(task, port,
4854 &req_app_link->msg_info.tracking,
4855 req_app_link->stream);
4856 if (nxt_slow_path(res != NXT_OK)) {
4853 nxt_request_app_link_error(req_app_link, 500,
4857 nxt_request_app_link_error(task, port->app, req_app_link,
4854 "Failed to get tracking area");
4855 goto release_port;
4856 }
4857
4858 if (req_app_link->body_fd != -1) {
4859 nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
4860 req_app_link->body_fd);
4861
4862 lseek(req_app_link->body_fd, 0, SEEK_SET);
4863 }
4864
4865 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
4866 req_app_link->body_fd,
4867 req_app_link->stream, reply_port->id, buf,
4868 &req_app_link->msg_info.tracking);
4869
4870 if (nxt_slow_path(res != NXT_OK)) {
4858 "Failed to get tracking area");
4859 goto release_port;
4860 }
4861
4862 if (req_app_link->body_fd != -1) {
4863 nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream,
4864 req_app_link->body_fd);
4865
4866 lseek(req_app_link->body_fd, 0, SEEK_SET);
4867 }
4868
4869 res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
4870 req_app_link->body_fd,
4871 req_app_link->stream, reply_port->id, buf,
4872 &req_app_link->msg_info.tracking);
4873
4874 if (nxt_slow_path(res != NXT_OK)) {
4871 nxt_request_app_link_error(req_app_link, 500,
4875 nxt_request_app_link_error(task, port->app, req_app_link,
4872 "Failed to send message to application");
4873 goto release_port;
4874 }
4875
4876release_port:
4877
4878 nxt_router_app_port_release(task, port, apr_action);
4879

--- 512 unchanged lines hidden ---
4876 "Failed to send message to application");
4877 goto release_port;
4878 }
4879
4880release_port:
4881
4882 nxt_router_app_port_release(task, port, apr_action);
4883

--- 512 unchanged lines hidden ---