64,77d63 < struct nxt_port_select_state_s { < nxt_app_t *app; < nxt_request_app_link_t *req_app_link; < < nxt_port_t *failed_port; < int failed_port_use_delta; < < uint8_t start_process; /* 1 bit */ < nxt_request_app_link_t *shared_ra; < nxt_port_t *port; < }; < < typedef struct nxt_port_select_state_s nxt_port_select_state_t; < 84,89d69 < static void nxt_router_port_select(nxt_task_t *task, < nxt_port_select_state_t *state); < < static nxt_int_t nxt_router_port_post_select(nxt_task_t *task, < nxt_port_select_state_t *state); < 91,92d70 < static void nxt_request_app_link_update_peer(nxt_task_t *task, < nxt_request_app_link_t *req_app_link); 94,117d71 < < nxt_inline void < nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link) < { < nxt_atomic_fetch_add(&req_app_link->use_count, 1); < } < < nxt_inline void < nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i) < { < #if (NXT_DEBUG) < int c; < < c = nxt_atomic_fetch_add(&req_app_link->use_count, i); < < nxt_assert((c + i) > 0); < #else < (void) nxt_atomic_fetch_add(&req_app_link->use_count, i); < #endif < } < < static void nxt_request_app_link_use(nxt_task_t *task, < nxt_request_app_link_t *req_app_link, int i); < 198a153,154 > static void nxt_router_req_headers_ack_handler(nxt_task_t *task, > nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data); 222a179,180 > static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task, > nxt_port_t *app_port); 230,231c188,191 < static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, < nxt_request_app_link_t *req_app_link); --- > static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, > nxt_request_rpc_data_t *req_rpc_data); > static void nxt_router_http_request_done(nxt_task_t *task, void *obj, > void *data); 234c194 < nxt_request_app_link_t *req_app_link); --- > nxt_request_rpc_data_t *req_rpc_data); 236c196 < nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix); --- > nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix); 253c213 < static nxt_int_t nxt_router_http_request_done(nxt_task_t *task, --- > static void nxt_router_http_request_release_post(nxt_task_t *task, 504,580d463 < nxt_inline void < nxt_request_app_link_init(nxt_task_t *task, < nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data) < { < nxt_buf_t *body; < nxt_event_engine_t *engine; < < engine = task->thread->engine; < < nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t)); < < req_app_link->stream = req_rpc_data->stream; < req_app_link->use_count = 1; < req_app_link->req_rpc_data = req_rpc_data; < req_rpc_data->req_app_link = req_app_link; < req_app_link->reply_port = engine->port; < req_app_link->request = req_rpc_data->request; < req_app_link->apr_action = NXT_APR_GOT_RESPONSE; < < req_app_link->work.handler = NULL; < req_app_link->work.task = &engine->task; < req_app_link->work.obj = req_app_link; < req_app_link->work.data = engine; < < body = req_rpc_data->request->body; < < if (body != NULL && nxt_buf_is_file(body)) { < req_app_link->body_fd = body->file->fd; < < body->file->fd = -1; < < } else { < req_app_link->body_fd = -1; < } < } < < < nxt_inline nxt_request_app_link_t * < nxt_request_app_link_alloc(nxt_task_t *task, < nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data) < { < nxt_mp_t *mp; < nxt_request_app_link_t *req_app_link; < < if (ra_src != NULL && ra_src->mem_pool != NULL) { < return ra_src; < } < < mp = req_rpc_data->request->mem_pool; < < req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t)); < < if (nxt_slow_path(req_app_link == NULL)) { < < req_rpc_data->req_app_link = NULL; < < if (ra_src != NULL) { < ra_src->req_rpc_data = NULL; < } < < return NULL; < } < < nxt_mp_retain(mp); < < nxt_request_app_link_init(task, req_app_link, req_rpc_data); < < if (ra_src != NULL) { < req_app_link->body_fd = ra_src->body_fd; < } < < req_app_link->mem_pool = mp; < < return req_app_link; < } < < 617,808d499 < static void < nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj, < void *data) < { < nxt_request_app_link_t *req_app_link; < < req_app_link = obj; < < nxt_request_app_link_update_peer(task, req_app_link); < < nxt_request_app_link_use(task, req_app_link, -1); < } < < < static void < nxt_request_app_link_update_peer(nxt_task_t *task, < nxt_request_app_link_t *req_app_link) < { < nxt_event_engine_t *engine; < nxt_request_rpc_data_t *req_rpc_data; < < engine = req_app_link->work.data; < < if (task->thread->engine != engine) { < nxt_request_app_link_inc_use(req_app_link); < < req_app_link->work.handler = nxt_request_app_link_update_peer_handler; < req_app_link->work.task = &engine->task; < req_app_link->work.next = NULL; < < nxt_debug(task, "req_app_link stream #%uD post update peer to %p", < req_app_link->stream, engine); < < nxt_event_engine_post(engine, &req_app_link->work); < < return; < } < < nxt_debug(task, "req_app_link stream #%uD update peer", < req_app_link->stream); < < req_rpc_data = req_app_link->req_rpc_data; < < if (req_rpc_data != NULL && req_app_link->app_port != NULL) { < nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, < req_app_link->app_port->pid); < } < } < < < static void < nxt_request_app_link_release(nxt_task_t *task, < nxt_request_app_link_t *req_app_link) < { < nxt_mp_t *mp; < nxt_http_request_t *r; < nxt_request_rpc_data_t *req_rpc_data; < < nxt_assert(task->thread->engine == req_app_link->work.data); < nxt_assert(req_app_link->use_count == 0); < < nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream); < < req_rpc_data = req_app_link->req_rpc_data; < < if (req_rpc_data != NULL) { < if (nxt_slow_path(req_app_link->err_code != 0)) { < nxt_http_request_error(task, req_rpc_data->request, < req_app_link->err_code); < < } else { < req_rpc_data->app_port = req_app_link->app_port; < req_rpc_data->apr_action = req_app_link->apr_action; < req_rpc_data->msg_info = req_app_link->msg_info; < < if (req_rpc_data->app->timeout != 0) { < r = req_rpc_data->request; < < r->timer.handler = nxt_router_app_timeout; < r->timer_data = req_rpc_data; < nxt_timer_add(task->thread->engine, &r->timer, < req_rpc_data->app->timeout); < } < < req_app_link->app_port = NULL; < req_app_link->msg_info.buf = NULL; < } < < req_rpc_data->req_app_link = NULL; < req_app_link->req_rpc_data = NULL; < } < < if (req_app_link->app_port != NULL) { < nxt_router_app_port_release(task, req_app_link->app_port, < req_app_link->apr_action); < < req_app_link->app_port = NULL; < } < < if (req_app_link->body_fd != -1) { < nxt_fd_close(req_app_link->body_fd); < < req_app_link->body_fd = -1; < } < < nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream); < < mp = req_app_link->mem_pool; < < if (mp != NULL) { < nxt_mp_free(mp, req_app_link); < nxt_mp_release(mp); < } < } < < < static void < nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data) < { < nxt_request_app_link_t *req_app_link; < < req_app_link = obj; < < nxt_assert(req_app_link->work.data == data); < < nxt_request_app_link_use(task, req_app_link, -1); < } < < < static void < nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link, < int i) < { < int c; < nxt_event_engine_t *engine; < < c = nxt_atomic_fetch_add(&req_app_link->use_count, i); < < if (i < 0 && c == -i) { < engine = req_app_link->work.data; < < if (task->thread->engine == engine) { < nxt_request_app_link_release(task, req_app_link); < < return; < } < < nxt_request_app_link_inc_use(req_app_link); < < req_app_link->work.handler = nxt_request_app_link_release_handler; < req_app_link->work.task = &engine->task; < req_app_link->work.next = NULL; < < nxt_debug(task, "req_app_link stream #%uD post release to %p", < req_app_link->stream, engine); < < nxt_event_engine_post(engine, &req_app_link->work); < } < } < < < nxt_inline void < nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app, < nxt_request_app_link_t *req_app_link, const char *str) < { < req_app_link->app_port = NULL; < req_app_link->err_code = 500; < req_app_link->err_str = str; < < nxt_alert(task, "app \"%V\" internal error: %s on #%uD", < &app->name, str, req_app_link->stream); < } < < < nxt_inline void < nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app, < nxt_request_app_link_t *req_app_link) < { < nxt_queue_insert_tail(&req_app_link->app_port->pending_requests, < &req_app_link->link_port_pending); < nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending); < < nxt_request_app_link_inc_use(req_app_link); < < req_app_link->res_time = nxt_thread_monotonic_time(task->thread) < + app->res_timeout; < < nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests", < req_app_link->stream); < } < < 828,829c519 < int ra_use_delta; < nxt_request_app_link_t *req_app_link; --- > nxt_http_request_t *r; 830a521,522 > nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); > 838c530,531 < nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); --- > if (req_rpc_data->app != NULL) { > nxt_router_app_use(task, req_rpc_data->app, -1); 840,843c533,534 < req_app_link = req_rpc_data->req_app_link; < if (req_app_link != NULL) { < req_rpc_data->req_app_link = NULL; < req_app_link->req_rpc_data = NULL; --- > req_rpc_data->app = NULL; > } 845c536 < ra_use_delta = 0; --- > r = req_rpc_data->request; 847c538,539 < nxt_thread_mutex_lock(&req_rpc_data->app->mutex); --- > if (r != NULL) { > r->timer_data = NULL; 849,854c541 < if (req_app_link->link_app_requests.next == NULL < && req_app_link->link_port_pending.next == NULL < && req_app_link->link_app_pending.next == NULL < && req_app_link->link_port_websockets.next == NULL) < { < req_app_link = NULL; --- > nxt_router_http_request_release_post(task, r); 856,869c543,544 < } else { < ra_use_delta -= < nxt_queue_chk_remove(&req_app_link->link_app_requests) < + nxt_queue_chk_remove(&req_app_link->link_port_pending) < + nxt_queue_chk_remove(&req_app_link->link_port_websockets); < < nxt_queue_chk_remove(&req_app_link->link_app_pending); < } < < nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); < < if (req_app_link != NULL) { < nxt_request_app_link_use(task, req_app_link, ra_use_delta); < } --- > r->req_rpc_data = NULL; > req_rpc_data->request = NULL; 872,873c547,548 < if (req_rpc_data->app != NULL) { < nxt_router_app_use(task, req_rpc_data->app, -1); --- > if (req_rpc_data->msg_info.body_fd != -1) { > nxt_fd_close(req_rpc_data->msg_info.body_fd); 875c550 < req_rpc_data->app = NULL; --- > req_rpc_data->msg_info.body_fd = -1; 878,879c553,554 < if (req_rpc_data->request != NULL) { < req_rpc_data->request->timer_data = NULL; --- > if (req_rpc_data->rpc_cancel) { > req_rpc_data->rpc_cancel = 0; 881,884c556,557 < nxt_router_http_request_done(task, req_rpc_data->request); < < req_rpc_data->request->req_rpc_data = NULL; < req_rpc_data->request = NULL; --- > nxt_port_rpc_cancel(task, task->thread->engine->port, > req_rpc_data->stream); 891a565,568 > nxt_app_t *app; > nxt_port_t *port, *main_app_port; > nxt_runtime_t *rt; > 894,896c571,573 < if (msg->u.new_port != NULL < && msg->u.new_port->type == NXT_PROCESS_CONTROLLER) < { --- > port = msg->u.new_port; > > if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) { 900,902c577 < if (msg->port_msg.stream == 0) { < return; < } --- > if (port == NULL || port->type != NXT_PROCESS_APP) { 904,906c579,582 < if (msg->u.new_port == NULL < || msg->u.new_port->type != NXT_PROCESS_APP) < { --- > if (msg->port_msg.stream == 0) { > return; > } > 910c586,620 < nxt_port_rpc_handler(task, msg); --- > if (msg->port_msg.stream != 0) { > nxt_port_rpc_handler(task, msg); > return; > } > > /* > * Port with "id == 0" is application 'main' port and it always > * should come with non-zero stream. > */ > nxt_assert(port->id != 0); > > /* Find 'main' app port and get app reference. */ > rt = task->thread->runtime; > > /* > * It is safe to access 'runtime->ports' hash because 'NEW_PORT' > * sent to main port (with id == 0) and processed in main thread. > */ > main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0); > nxt_assert(main_app_port != NULL); > > app = main_app_port->app; > nxt_assert(app != NULL); > > nxt_thread_mutex_lock(&app->mutex); > > /* TODO here should be find-and-add code because there can be > port waiters in port_hash */ > nxt_port_hash_add(&app->port_hash, port); > app->port_hash_count++; > > nxt_thread_mutex_unlock(&app->mutex); > > port->app = app; > port->main_app_port = main_app_port; 1103,1104c813,816 < return app->idle_processes + app->pending_processes < < app->spare_processes; --- > return (app->active_requests > > app->port_hash_count + app->pending_processes) > || (app->spare_processes > > app->idle_processes + app->pending_processes); 1532a1245 > nxt_port_t *port; 1747,1748d1459 < nxt_queue_init(&app->requests); < nxt_queue_init(&app->pending); 1761d1471 < app->max_pending_responses = 2; 1791a1502,1520 > > port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid, > NXT_PROCESS_APP); > if (nxt_slow_path(port == NULL)) { > return NXT_ERROR; > } > > ret = nxt_port_socket_init(task, port, 0); > if (nxt_slow_path(ret != NXT_OK)) { > nxt_port_use(task, port, -1); > return NXT_ERROR; > } > > nxt_port_write_enable(task, port); > port->app = app; > > app->shared_port = port; > > nxt_thread_mutex_create(&app->outgoing.mutex); 2524a2254,2258 > > nxt_assert(port != NULL); > nxt_assert(port->type == NXT_PROCESS_APP); > nxt_assert(port->id == 0); > 2525a2260 > port->main_app_port = port; 2534a2270,2271 > nxt_port_hash_add(&app->port_hash, port); > app->port_hash_count++; 2539a2277,2278 > nxt_router_app_shared_port_send(task, port); > 2942,2945c2681,2685 < .rpc_error = nxt_port_rpc_handler, < .mmap = nxt_port_mmap_handler, < .data = nxt_port_rpc_handler, < .oosm = nxt_router_oosm_handler, --- > .rpc_error = nxt_port_rpc_handler, > .mmap = nxt_port_mmap_handler, > .data = nxt_port_rpc_handler, > .oosm = nxt_router_oosm_handler, > .req_headers_ack = nxt_port_rpc_handler, 3738a3479 > nxt_app_t *app; 3745d3485 < nxt_request_app_link_t *req_app_link; 3748d3487 < b = msg->buf; 3751,3754d3489 < if (msg->size == 0) { < b = NULL; < } < 3764a3500,3510 > app = req_rpc_data->app; > nxt_assert(app != NULL); > > if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) { > nxt_router_req_headers_ack_handler(task, msg, req_rpc_data); > > return; > } > > b = (msg->size == 0) ? NULL : msg->buf; > 3769a3516,3518 > req_rpc_data->rpc_cancel = 0; > req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE; > 3773c3522 < if (req_rpc_data->app != NULL && req_rpc_data->app->timeout != 0) { --- > if (app->timeout != 0) { 3776,3777c3525 < nxt_timer_add(task->thread->engine, &r->timer, < req_rpc_data->app->timeout); --- > nxt_timer_add(task->thread->engine, &r->timer, app->timeout); 3873,3889c3621 < req_app_link = nxt_request_app_link_alloc(task, < req_rpc_data->req_app_link, < req_rpc_data); < if (nxt_slow_path(req_app_link == NULL)) { < goto fail; < } < < app_port = req_app_link->app_port; < < if (app_port == NULL && req_rpc_data->app_port != NULL) { < req_app_link->app_port = req_rpc_data->app_port; < app_port = req_app_link->app_port; < req_app_link->apr_action = req_rpc_data->apr_action; < < req_rpc_data->app_port = NULL; < } < --- > app_port = req_rpc_data->app_port; 3894c3626 < nxt_thread_mutex_lock(&req_rpc_data->app->mutex); --- > nxt_thread_mutex_lock(&app->mutex); 3896,3897c3628 < nxt_queue_insert_tail(&app_port->active_websockets, < &req_app_link->link_port_websockets); --- > app_port->main_app_port->active_websockets++; 3899c3630 < nxt_thread_mutex_unlock(&req_rpc_data->app->mutex); --- > nxt_thread_mutex_unlock(&app->mutex); 3902c3633 < req_app_link->apr_action = NXT_APR_CLOSE; --- > req_rpc_data->apr_action = NXT_APR_CLOSE; 3904,3905c3635 < nxt_debug(task, "req_app_link stream #%uD upgrade", < req_app_link->stream); --- > nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream); 3923a3654,3741 > static void > nxt_router_req_headers_ack_handler(nxt_task_t *task, > nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data) > { > nxt_app_t *app; > nxt_bool_t start_process; > nxt_port_t *app_port, *main_app_port, *idle_port; > nxt_queue_link_t *idle_lnk; > nxt_http_request_t *r; > > nxt_debug(task, "stream #%uD: got ack from %PI:%d", > req_rpc_data->stream, > msg->port_msg.pid, msg->port_msg.reply_port); > > nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data, > msg->port_msg.pid); > > app = req_rpc_data->app; > > start_process = 0; > > nxt_thread_mutex_lock(&app->mutex); > > app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid, > msg->port_msg.reply_port); > if (nxt_slow_path(app_port == NULL)) { > nxt_thread_mutex_unlock(&app->mutex); > > r = req_rpc_data->request; > nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR); > > return; > } > > main_app_port = app_port->main_app_port; > > if (nxt_queue_chk_remove(&main_app_port->idle_link)) { > app->idle_processes--; > > /* Check port was in 'spare_ports' using idle_start field. */ > if (main_app_port->idle_start == 0 > && app->idle_processes >= app->spare_processes) > { > /* > * If there is a vacant space in spare ports, > * move the last idle to spare_ports. > */ > nxt_assert(!nxt_queue_is_empty(&app->idle_ports)); > > idle_lnk = nxt_queue_last(&app->idle_ports); > idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link); > nxt_queue_remove(idle_lnk); > > nxt_queue_insert_tail(&app->spare_ports, idle_lnk); > > idle_port->idle_start = 0; > } > > if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { > app->pending_processes++; > start_process = 1; > } > } > > main_app_port->active_requests++; > > nxt_port_inc_use(app_port); > > nxt_thread_mutex_unlock(&app->mutex); > > if (start_process) { > nxt_router_start_app_process(task, app); > } > > nxt_port_use(task, req_rpc_data->app_port, -1); > > req_rpc_data->app_port = app_port; > > if (app->timeout != 0) { > r = req_rpc_data->request; > > r->timer.handler = nxt_router_app_timeout; > r->timer_data = req_rpc_data; > nxt_timer_add(task->thread->engine, &r->timer, app->timeout); > } > } > > 3952,3955d3769 < nxt_int_t res; < nxt_port_t *port; < nxt_bool_t cancelled; < nxt_request_app_link_t *req_app_link; 3960c3774 < req_app_link = req_rpc_data->req_app_link; --- > req_rpc_data->rpc_cancel = 0; 3962,3966c3776,3777 < if (req_app_link != NULL) { < cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, < req_app_link->stream); < if (cancelled) { < res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); --- > /* TODO cancel message and return if cancelled. */ > // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream); 3968,3988d3778 < if (res == NXT_OK) { < port = req_app_link->app_port; < < if (nxt_slow_path(port == NULL)) { < nxt_log(task, NXT_LOG_ERR, < "port is NULL in cancelled req_app_link"); < return; < } < < nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, < req_rpc_data, port->pid); < < nxt_router_app_prepare_request(task, req_app_link); < } < < msg->port_msg.last = 0; < < return; < } < } < 4010a3801,3802 > nxt_assert(port->type == NXT_PROCESS_APP); > nxt_assert(port->id == 0); 4024a3817 > port->main_app_port = port; 4031a3825,3826 > nxt_port_hash_add(&app->port_hash, port); > app->port_hash_count++; 4037a3833,3834 > nxt_router_app_shared_port_send(task, port); > 4041a3839,3872 > static nxt_int_t > nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port) > { > nxt_buf_t *b; > nxt_port_t *port; > nxt_port_msg_new_port_t *msg; > > b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, > sizeof(nxt_port_data_t)); > if (nxt_slow_path(b == NULL)) { > return NXT_ERROR; > } > > port = app_port->app->shared_port; > > nxt_debug(task, "send port %FD to process %PI", > port->pair[0], app_port->pid); > > b->mem.free += sizeof(nxt_port_msg_new_port_t); > msg = (nxt_port_msg_new_port_t *) b->mem.pos; > > msg->id = port->id; > msg->pid = port->pid; > msg->max_size = port->max_size; > msg->max_share = port->max_share; > msg->type = port->type; > > return nxt_port_socket_twrite(task, app_port, > NXT_PORT_MSG_NEW_PORT, > port->pair[0], > 0, 0, b, NULL); > } > > 4046,4049c3877,3878 < nxt_app_t *app; < nxt_app_joint_t *app_joint; < nxt_queue_link_t *lnk; < nxt_request_app_link_t *req_app_link; --- > nxt_app_t *app; > nxt_app_joint_t *app_joint; 4073,4084d3901 < if (!nxt_queue_is_empty(&app->requests)) { < lnk = nxt_queue_last(&app->requests); < nxt_queue_remove(lnk); < lnk->next = NULL; < < req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, < link_app_requests); < < } else { < req_app_link = NULL; < } < 4087,4094c3904 < if (req_app_link != NULL) { < nxt_debug(task, "app '%V' %p abort next stream #%uD", < &app->name, app, req_app_link->stream); < < nxt_request_app_link_error(task, app, req_app_link, < "Failed to start application process"); < nxt_request_app_link_use(task, req_app_link, -1); < } --- > /* TODO req_app_link to cancel first pending message */ 4097,4098d3906 < nxt_inline nxt_port_t * < nxt_router_app_get_port_for_quit(nxt_app_t *app); 4119,4131d3926 < nxt_inline nxt_bool_t < nxt_router_app_first_port_busy(nxt_app_t *app) < { < nxt_port_t *port; < nxt_queue_link_t *lnk; < < lnk = nxt_queue_first(&app->ports); < port = nxt_queue_link_data(lnk, nxt_port_t, app_link); < < return port->app_pending_responses > 0; < } < < 4133,4176d3927 < nxt_router_pop_first_port(nxt_app_t *app) < { < nxt_port_t *port; < nxt_queue_link_t *lnk; < < lnk = nxt_queue_first(&app->ports); < nxt_queue_remove(lnk); < < port = nxt_queue_link_data(lnk, nxt_port_t, app_link); < < port->app_pending_responses++; < < if (nxt_queue_chk_remove(&port->idle_link)) { < app->idle_processes--; < < if (port->idle_start == 0) { < nxt_assert(app->idle_processes < app->spare_processes); < < } else { < nxt_assert(app->idle_processes >= app->spare_processes); < < port->idle_start = 0; < } < } < < if ((app->max_pending_responses == 0 < || port->app_pending_responses < app->max_pending_responses) < && (app->max_requests == 0 < || port->app_responses + port->app_pending_responses < < app->max_requests)) < { < nxt_queue_insert_tail(&app->ports, lnk); < < nxt_port_inc_use(port); < < } else { < lnk->next = NULL; < } < < return port; < } < < < nxt_inline nxt_port_t * 4187,4192d3937 < if (port->app_pending_responses > 0) { < port = NULL; < < continue; < } < 4199a3945,3947 > nxt_port_hash_remove(&app->port_hash, port); > app->port_hash_count--; > 4225,4252d3972 < nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data) < { < nxt_request_app_link_t *req_app_link; < < req_app_link = data; < < #if (NXT_DEBUG) < { < nxt_app_t *app; < < app = obj; < < nxt_assert(app != NULL); < nxt_assert(req_app_link != NULL); < nxt_assert(req_app_link->app_port != NULL); < < nxt_debug(task, "app '%V' %p process next stream #%uD", < &app->name, app, req_app_link->stream); < } < #endif < < nxt_router_app_prepare_request(task, req_app_link); < < nxt_request_app_link_use(task, req_app_link, -1); < } < < < static void 4256,4263c3976,3980 < int inc_use; < uint32_t dec_pending, got_response; < nxt_app_t *app; < nxt_bool_t port_unchained; < nxt_bool_t send_quit, cancelled, adjust_idle_timer; < nxt_queue_link_t *lnk; < nxt_request_app_link_t *req_app_link, *pending_ra, *re_ra; < nxt_port_select_state_t state; --- > int inc_use; > uint32_t got_response, dec_requests; > nxt_app_t *app; > nxt_bool_t port_unchained, send_quit, adjust_idle_timer; > nxt_port_t *main_app_port; 4268,4269d3984 < req_app_link = NULL; < 4273d3987 < dec_pending = 0; 4274a3989 > dec_requests = 0; 4280c3995 < dec_pending = 1; --- > dec_requests = 1; 4284d3998 < dec_pending = 1; 4289d4002 < dec_pending = 1; 4297c4010,4012 < nxt_thread_mutex_lock(&app->mutex); --- > nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name, > port->pid, port->id, > (int) inc_use, (int) got_response); 4299,4300c4014,4015 < port->app_pending_responses -= dec_pending; < port->app_responses += got_response; --- > if (port == app->shared_port) { > nxt_thread_mutex_lock(&app->mutex); 4302,4311c4017 < if (port->pair[1] != -1 < && (app->max_pending_responses == 0 < || port->app_pending_responses < app->max_pending_responses) < && (app->max_requests == 0 < || port->app_responses + port->app_pending_responses < < app->max_requests)) < { < if (port->app_link.next == NULL) { < if (port->app_pending_responses > 0) { < nxt_queue_insert_tail(&app->ports, &port->app_link); --- > app->active_requests -= got_response + dec_requests; 4313,4315c4019 < } else { < nxt_queue_insert_head(&app->ports, &port->app_link); < } --- > nxt_thread_mutex_unlock(&app->mutex); 4317,4326c4021 < nxt_port_inc_use(port); < < } else { < if (port->app_pending_responses == 0 < && nxt_queue_first(&app->ports) != &port->app_link) < { < nxt_queue_remove(&port->app_link); < nxt_queue_insert_head(&app->ports, &port->app_link); < } < } --- > goto adjust_use; 4329,4334c4024 < if (!nxt_queue_is_empty(&app->ports) < && !nxt_queue_is_empty(&app->requests)) < { < lnk = nxt_queue_first(&app->requests); < nxt_queue_remove(lnk); < lnk->next = NULL; --- > main_app_port = port->main_app_port; 4336,4337c4026 < req_app_link = nxt_queue_link_data(lnk, nxt_request_app_link_t, < link_app_requests); --- > nxt_thread_mutex_lock(&app->mutex); 4339c4028,4030 < req_app_link->app_port = nxt_router_pop_first_port(app); --- > main_app_port->app_responses += got_response; > main_app_port->active_requests -= got_response + dec_requests; > app->active_requests -= got_response + dec_requests; 4341,4348c4032,4034 < if (req_app_link->app_port->app_pending_responses > 1) { < nxt_request_app_link_pending(task, app, req_app_link); < } < } < < /* Pop first pending request for this port. */ < if (dec_pending > 0 < && !nxt_queue_is_empty(&port->pending_requests)) --- > if (main_app_port->pair[1] != -1 > && (app->max_requests == 0 > || main_app_port->app_responses < app->max_requests)) 4350,4352c4036,4037 < lnk = nxt_queue_first(&port->pending_requests); < nxt_queue_remove(lnk); < lnk->next = NULL; --- > if (main_app_port->app_link.next == NULL) { > nxt_queue_insert_tail(&app->ports, &main_app_port->app_link); 4354,4395c4039 < pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, < link_port_pending); < < nxt_assert(pending_ra->link_app_pending.next != NULL); < < nxt_queue_remove(&pending_ra->link_app_pending); < pending_ra->link_app_pending.next = NULL; < < } else { < pending_ra = NULL; < } < < /* Try to cancel and re-schedule first stalled request for this app. */ < if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) { < lnk = nxt_queue_first(&app->pending); < < re_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, < link_app_pending); < < if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) { < < nxt_debug(task, "app '%V' stalled request #%uD detected", < &app->name, re_ra->stream); < < cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info, < re_ra->stream); < < if (cancelled) { < state.req_app_link = re_ra; < state.app = app; < < /* < * Need to increment use count "in advance" because < * nxt_router_port_select() will remove re_ra from lists < * and decrement use count. < */ < nxt_request_app_link_inc_use(re_ra); < < nxt_router_port_select(task, &state); < < goto re_ra_cancelled; < } --- > nxt_port_inc_use(main_app_port); 4399,4402d4042 < re_ra = NULL; < < re_ra_cancelled: < 4404,4405c4044 < && port->app_pending_responses == 0 < && port->app_responses >= app->max_requests); --- > && main_app_port->app_responses >= app->max_requests); 4408c4047 < port_unchained = nxt_queue_chk_remove(&port->app_link); --- > port_unchained = nxt_queue_chk_remove(&main_app_port->app_link); 4410c4049,4052 < port->app = NULL; --- > nxt_port_hash_remove(&app->port_hash, main_app_port); > app->port_hash_count--; > > main_app_port->app = NULL; 4419,4421c4061,4064 < if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0 < && nxt_queue_is_empty(&port->active_websockets) < && port->idle_link.next == NULL) --- > if (main_app_port->pair[1] != -1 && !send_quit > && main_app_port->active_requests == 0 > && main_app_port->active_websockets == 0 > && main_app_port->idle_link.next == NULL) 4432c4075 < nxt_queue_insert_tail(&app->spare_ports, &port->idle_link); --- > nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link); 4435c4078 < nxt_queue_insert_tail(&app->idle_ports, &port->idle_link); --- > nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link); 4437c4080 < port->idle_start = task->thread->engine->timers.now; --- > main_app_port->idle_start = task->thread->engine->timers.now; 4450,4486d4092 < if (pending_ra != NULL) { < nxt_request_app_link_use(task, pending_ra, -1); < } < < if (re_ra != NULL) { < if (nxt_router_port_post_select(task, &state) == NXT_OK) { < /* < * Reference counter already incremented above, this will < * keep re_ra while nxt_router_app_process_request() < * task is in queue. Reference counter decreased in < * nxt_router_app_process_request() after processing. < */ < < nxt_work_queue_add(&task->thread->engine->fast_work_queue, < nxt_router_app_process_request, < &task->thread->engine->task, app, re_ra); < < } else { < nxt_request_app_link_use(task, re_ra, -1); < } < } < < if (req_app_link != NULL) { < /* < * There should be call nxt_request_app_link_inc_use(req_app_link), < * because of one more link in the queue. But one link was < * recently removed from app->requests linked list. < * Corresponding decrement is in nxt_router_app_process_request(). < */ < < nxt_work_queue_add(&task->thread->engine->fast_work_queue, < nxt_router_app_process_request, < &task->thread->engine->task, app, req_app_link); < < goto adjust_use; < } < 4488c4094 < if (port->pair[1] == -1) { --- > if (main_app_port->pair[1] == -1) { 4490c4096 < &app->name, app, port, port->pid); --- > &app->name, app, main_app_port, main_app_port->pid); 4496,4497c4102 < nxt_debug(task, "app '%V' %p send QUIT to port", < &app->name, app); --- > nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app); 4499,4500c4104,4105 < nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, < -1, 0, 0, NULL); --- > nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0, > NULL); 4503c4108 < nxt_port_use(task, port, -1); --- > nxt_port_use(task, main_app_port, -1); 4531a4137,4148 > nxt_port_hash_remove(&app->port_hash, port); > app->port_hash_count--; > > if (port->id != 0) { > nxt_thread_mutex_unlock(&app->mutex); > > nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name, > port->pid, port->id); > > return; > } > 4556,4557c4173 < && (!nxt_queue_is_empty(&app->requests) < || nxt_router_app_need_start(app)); --- > && nxt_router_app_need_start(app); 4605a4222,4225 > nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d", > &app->name, > (int) app->idle_processes, (int) app->spare_processes); > 4614a4235,4238 > nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M", > &app->name, port->pid, > port->idle_start, timeout, threshold); > 4623a4248,4250 > nxt_port_hash_remove(&app->port_hash, port); > app->port_hash_count--; > 4706a4334,4335 > nxt_assert(app->active_requests == 0); > nxt_assert(app->port_hash_count == 0); 4708d4336 < nxt_assert(nxt_queue_is_empty(&app->requests)); 4712a4341,4350 > nxt_port_mmaps_destroy(&app->outgoing, 1); > > nxt_thread_mutex_destroy(&app->outgoing.mutex); > > if (app->shared_port != NULL) { > app->shared_port->app = NULL; > nxt_port_close(task, app->shared_port); > nxt_port_use(task, app->shared_port, -1); > } > 4729c4367,4368 < nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) --- > nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app, > nxt_request_rpc_data_t *req_rpc_data) 4731,4734c4370,4371 < int ra_use_delta; < nxt_app_t *app; < nxt_bool_t can_start_process; < nxt_request_app_link_t *req_app_link; --- > nxt_bool_t start_process; > nxt_port_t *port; 4736,4737c4373 < req_app_link = state->req_app_link; < app = state->app; --- > start_process = 0; 4739,4740c4375 < state->failed_port_use_delta = 0; < ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests); --- > nxt_thread_mutex_lock(&app->mutex); 4742,4744c4377,4378 < if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) < { < nxt_assert(req_app_link->link_app_pending.next != NULL); --- > port = app->shared_port; > nxt_port_inc_use(port); 4746,4747c4380 < nxt_queue_remove(&req_app_link->link_app_pending); < req_app_link->link_app_pending.next = NULL; --- > app->active_requests++; 4749c4382,4384 < ra_use_delta--; --- > if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) { > app->pending_processes++; > start_process = 1; 4752c4387 < state->failed_port = req_app_link->app_port; --- > nxt_thread_mutex_unlock(&app->mutex); 4754,4755c4389,4390 < if (req_app_link->app_port != NULL) { < state->failed_port_use_delta--; --- > req_rpc_data->app_port = port; > req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED; 4757,4763c4392,4393 < state->failed_port->app_pending_responses--; < < if (nxt_queue_chk_remove(&state->failed_port->app_link)) { < state->failed_port_use_delta--; < } < < req_app_link->app_port = NULL; --- > if (start_process) { > nxt_router_start_app_process(task, app); 4765,4824d4394 < < can_start_process = nxt_router_app_can_start(app); < < state->port = NULL; < state->start_process = 0; < < if (nxt_queue_is_empty(&app->ports) < || (can_start_process && nxt_router_app_first_port_busy(app)) ) < { < req_app_link = nxt_request_app_link_alloc(task, req_app_link, < req_app_link->req_rpc_data); < if (nxt_slow_path(req_app_link == NULL)) { < goto fail; < } < < if (nxt_slow_path(state->failed_port != NULL)) { < nxt_queue_insert_head(&app->requests, < &req_app_link->link_app_requests); < < } else { < nxt_queue_insert_tail(&app->requests, < &req_app_link->link_app_requests); < } < < nxt_request_app_link_inc_use(req_app_link); < < nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", < req_app_link->stream); < < if (can_start_process) { < app->pending_processes++; < state->start_process = 1; < } < < } else { < state->port = nxt_router_pop_first_port(app); < < if (state->port->app_pending_responses > 1) { < req_app_link = nxt_request_app_link_alloc(task, req_app_link, < req_app_link->req_rpc_data); < if (nxt_slow_path(req_app_link == NULL)) { < goto fail; < } < < req_app_link->app_port = state->port; < < nxt_request_app_link_pending(task, app, req_app_link); < } < < if (can_start_process && nxt_router_app_need_start(app)) { < app->pending_processes++; < state->start_process = 1; < } < } < < nxt_request_app_link_chk_use(req_app_link, ra_use_delta); < < fail: < < state->shared_ra = req_app_link; 4828,4903d4397 < static nxt_int_t < nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state) < { < nxt_int_t res; < nxt_app_t *app; < nxt_request_app_link_t *req_app_link; < < req_app_link = state->shared_ra; < app = state->app; < < if (state->failed_port_use_delta != 0) { < nxt_port_use(task, state->failed_port, state->failed_port_use_delta); < } < < if (nxt_slow_path(req_app_link == NULL)) { < if (state->port != NULL) { < nxt_port_use(task, state->port, -1); < } < < nxt_request_app_link_error(task, app, state->req_app_link, < "Failed to allocate shared req<->app link"); < < return NXT_ERROR; < } < < if (state->port != NULL) { < nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); < < req_app_link->app_port = state->port; < < if (state->start_process) { < nxt_router_start_app_process(task, app); < } < < return NXT_OK; < } < < if (!state->start_process) { < nxt_debug(task, "app '%V' %p too many running or pending processes", < &app->name, app); < < return NXT_AGAIN; < } < < res = nxt_router_start_app_process(task, app); < < if (nxt_slow_path(res != NXT_OK)) { < nxt_request_app_link_error(task, app, req_app_link, < "Failed to start app process"); < < return NXT_ERROR; < } < < return NXT_AGAIN; < } < < < static nxt_int_t < nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, < nxt_request_app_link_t *req_app_link) < { < nxt_port_select_state_t state; < < state.req_app_link = req_app_link; < state.app = app; < < nxt_thread_mutex_lock(&app->mutex); < < nxt_router_port_select(task, &state); < < nxt_thread_mutex_unlock(&app->mutex); < < return nxt_router_port_post_select(task, &state); < } < < 4908,4909d4401 < nxt_int_t res; < nxt_port_t *port; 4911d4402 < nxt_request_app_link_t ra_local, *req_app_link; 4930c4421 < * nxt_router_http_request_done() -> --- > * nxt_router_http_request_release_post() -> 4941a4433,4434 > req_rpc_data->msg_info.body_fd = -1; > req_rpc_data->rpc_cancel = 1; 4948,4949c4441,4443 < req_app_link = &ra_local; < nxt_request_app_link_init(task, req_app_link, req_rpc_data); --- > if (r->last != NULL) { > r->last->completion_handler = nxt_router_http_request_done; > } 4951,4952c4445,4447 < res = nxt_router_app_port(task, app, req_app_link); < req_app_link = req_rpc_data->req_app_link; --- > nxt_router_app_port_get(task, app, req_rpc_data); > nxt_router_app_prepare_request(task, req_rpc_data); > } 4954,4955d4448 < if (res == NXT_OK) { < port = req_app_link->app_port; 4957c4450,4453 < nxt_assert(port != NULL); --- > static void > nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data) > { > nxt_http_request_t *r; 4959c4455 < nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); --- > r = data; 4961c4457,4460 < nxt_router_app_prepare_request(task, req_app_link); --- > nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data); > > if (r->req_rpc_data) { > nxt_request_rpc_data_unlink(task, r->req_rpc_data); 4964c4463 < nxt_request_app_link_use(task, req_app_link, -1); --- > nxt_http_request_close_handler(task, r, r->proto.any); 4976c4475 < nxt_request_app_link_t *req_app_link) --- > nxt_request_rpc_data_t *req_rpc_data) 4978c4477,4478 < nxt_buf_t *buf; --- > nxt_app_t *app; > nxt_buf_t *buf, *body; 4981d4480 < nxt_apr_action_t apr_action; 4983c4482 < nxt_assert(req_app_link->app_port != NULL); --- > app = req_rpc_data->app; 4985,4986c4484 < port = req_app_link->app_port; < reply_port = req_app_link->reply_port; --- > nxt_assert(app != NULL); 4988c4486 < apr_action = NXT_APR_REQUEST_FAILED; --- > port = req_rpc_data->app_port; 4990,4991c4488 < buf = nxt_router_prepare_msg(task, req_app_link->request, port, < nxt_app_msg_prefix[port->app->type]); --- > nxt_assert(port != NULL); 4992a4490,4493 > reply_port = task->thread->engine->port; > > buf = nxt_router_prepare_msg(task, req_rpc_data->request, app, > nxt_app_msg_prefix[app->type]); 4994,4996c4495,4501 < nxt_request_app_link_error(task, port->app, req_app_link, < "Failed to prepare message for application"); < goto release_port; --- > nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message", > req_rpc_data->stream, &app->name); > > nxt_http_request_error(task, req_rpc_data->request, > NXT_HTTP_INTERNAL_SERVER_ERROR); > > return; 5003c4508,4509 < apr_action = NXT_APR_NEW_PORT; --- > req_rpc_data->msg_info.buf = buf; > req_rpc_data->msg_info.completion_handler = buf->completion_handler; 5005,5008c4511 < req_app_link->msg_info.buf = buf; < req_app_link->msg_info.completion_handler = buf->completion_handler; < < for (; buf; buf = buf->next) { --- > do { 5010c4513,4514 < } --- > buf = buf->next; > } while (buf != NULL); 5012c4516 < buf = req_app_link->msg_info.buf; --- > buf = req_rpc_data->msg_info.buf; 5014,5021c4518 < res = nxt_port_mmap_get_tracking(task, &port->process->outgoing, < &req_app_link->msg_info.tracking, < req_app_link->stream); < if (nxt_slow_path(res != NXT_OK)) { < nxt_request_app_link_error(task, port->app, req_app_link, < "Failed to get tracking area"); < goto release_port; < } --- > body = req_rpc_data->request->body; 5023,5025c4520,4521 < if (req_app_link->body_fd != -1) { < nxt_debug(task, "stream #%uD: send body fd %d", req_app_link->stream, < req_app_link->body_fd); --- > if (body != NULL && nxt_buf_is_file(body)) { > req_rpc_data->msg_info.body_fd = body->file->fd; 5027c4523,4526 < lseek(req_app_link->body_fd, 0, SEEK_SET); --- > body->file->fd = -1; > > } else { > req_rpc_data->msg_info.body_fd = -1; 5030,5033c4529,4531 < res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS, < req_app_link->body_fd, < req_app_link->stream, reply_port->id, buf, < &req_app_link->msg_info.tracking); --- > if (req_rpc_data->msg_info.body_fd != -1) { > nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream, > req_rpc_data->msg_info.body_fd); 5035,5038c4533 < if (nxt_slow_path(res != NXT_OK)) { < nxt_request_app_link_error(task, port->app, req_app_link, < "Failed to send message to application"); < goto release_port; --- > lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET); 5041c4536,4540 < release_port: --- > res = nxt_port_socket_twrite(task, port, > NXT_PORT_MSG_REQ_HEADERS, > req_rpc_data->msg_info.body_fd, > req_rpc_data->stream, reply_port->id, buf, > NULL); 5043c4542,4544 < nxt_router_app_port_release(task, port, apr_action); --- > if (nxt_slow_path(res != NXT_OK)) { > nxt_alert(task, "stream #%uD, app '%V': failed to send app message", > req_rpc_data->stream, &app->name); 5045c4546,4548 < nxt_request_app_link_update_peer(task, req_app_link); --- > nxt_http_request_error(task, req_rpc_data->request, > NXT_HTTP_INTERNAL_SERVER_ERROR); > } 5103c4606 < nxt_port_t *port, const nxt_str_t *prefix) --- > nxt_app_t *app, const nxt_str_t *prefix) 5144c4647 < out = nxt_port_mmap_get_buf(task, &port->process->outgoing, --- > out = nxt_port_mmap_get_buf(task, &app->outgoing, 5326,5327c4829 < buf = nxt_port_mmap_get_buf(task, &port->process->outgoing, < free_size); --- > buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size); 5375,5377d4876 < nxt_app_t *app; < nxt_bool_t cancelled, unlinked; < nxt_port_t *port; 5379d4877 < nxt_queue_link_t *lnk; 5381d4878 < nxt_request_app_link_t *pending_ra; 5383d4879 < nxt_port_select_state_t state; 5391d4886 < app = req_rpc_data->app; 5393,5479d4887 < if (app == NULL) { < goto generate_error; < } < < port = NULL; < pending_ra = NULL; < < if (req_rpc_data->app_port != NULL) { < port = req_rpc_data->app_port; < req_rpc_data->app_port = NULL; < } < < if (port == NULL && req_rpc_data->req_app_link != NULL < && req_rpc_data->req_app_link->app_port != NULL) < { < port = req_rpc_data->req_app_link->app_port; < req_rpc_data->req_app_link->app_port = NULL; < } < < if (port == NULL) { < goto generate_error; < } < < nxt_thread_mutex_lock(&app->mutex); < < unlinked = nxt_queue_chk_remove(&port->app_link); < < if (!nxt_queue_is_empty(&port->pending_requests)) { < lnk = nxt_queue_first(&port->pending_requests); < < pending_ra = nxt_queue_link_data(lnk, nxt_request_app_link_t, < link_port_pending); < < nxt_assert(pending_ra->link_app_pending.next != NULL); < < nxt_debug(task, "app '%V' pending request #%uD found", < &app->name, pending_ra->stream); < < cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info, < pending_ra->stream); < < if (cancelled) { < state.req_app_link = pending_ra; < state.app = app; < < /* < * Need to increment use count "in advance" because < * nxt_router_port_select() will remove pending_ra from lists < * and decrement use count. < */ < nxt_request_app_link_inc_use(pending_ra); < < nxt_router_port_select(task, &state); < < } else { < pending_ra = NULL; < } < } < < nxt_thread_mutex_unlock(&app->mutex); < < if (pending_ra != NULL) { < if (nxt_router_port_post_select(task, &state) == NXT_OK) { < /* < * Reference counter already incremented above, this will < * keep pending_ra while nxt_router_app_process_request() < * task is in queue. Reference counter decreased in < * nxt_router_app_process_request() after processing. < */ < < nxt_work_queue_add(&task->thread->engine->fast_work_queue, < nxt_router_app_process_request, < &task->thread->engine->task, app, pending_ra); < < } else { < nxt_request_app_link_use(task, pending_ra, -1); < } < } < < nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); < < nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); < < nxt_port_use(task, port, unlinked ? -2 : -1); < < generate_error: < 5486,5487c4894,4895 < static nxt_int_t < nxt_router_http_request_done(nxt_task_t *task, nxt_http_request_t *r) --- > static void > nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r) 5491,5492d4898 < < return NXT_OK; 5501c4907 < nxt_debug(task, "http app release"); --- > nxt_debug(task, "http request pool release"); 5596c5002,5013 < mmaps = &port->process->outgoing; --- > if (nxt_slow_path(port->app == NULL)) { > nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d", > port->pid, port->id); > > // FIXME > nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, > -1, msg->port_msg.stream, 0, NULL); > > return; > } > > mmaps = &port->app->outgoing; 5604a5022,5024 > // FIXME > nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, > -1, msg->port_msg.stream, 0, NULL);