nxt_router.c (1552:a363564c527c) nxt_router.c (1555:1d84b9e4b459)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#if (NXT_TLS)
11#include <nxt_cert.h>
12#endif
13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10#if (NXT_TLS)
11#include <nxt_cert.h>
12#endif
13#include <nxt_http.h>
14#include <nxt_port_memory_int.h>
15#include <nxt_unit_request.h>
16#include <nxt_unit_response.h>
17#include <nxt_router_request.h>
18#include <nxt_app_queue.h>
19#include <nxt_port_queue.h>
18
19typedef struct {
20 nxt_str_t type;
21 uint32_t processes;
22 uint32_t max_processes;
23 uint32_t spare_processes;
24 nxt_msec_t timeout;
25 nxt_msec_t res_timeout;

--- 61 unchanged lines hidden (view full) ---

87static void nxt_router_conf_send(nxt_task_t *task,
88 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
89
90static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
91 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
92static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
93 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
94static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
20
21typedef struct {
22 nxt_str_t type;
23 uint32_t processes;
24 uint32_t max_processes;
25 uint32_t spare_processes;
26 nxt_msec_t timeout;
27 nxt_msec_t res_timeout;

--- 61 unchanged lines hidden (view full) ---

89static void nxt_router_conf_send(nxt_task_t *task,
90 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
91
92static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
93 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
94static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
95 nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
96static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
97static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
98 nxt_port_t *port);
99static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
100 nxt_port_t *port);
101static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
102 nxt_port_t *port, nxt_fd_t fd);
95static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
96 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
97static void nxt_router_listen_socket_ready(nxt_task_t *task,
98 nxt_port_recv_msg_t *msg, void *data);
99static void nxt_router_listen_socket_error(nxt_task_t *task,
100 nxt_port_recv_msg_t *msg, void *data);
101#if (NXT_TLS)
102static void nxt_router_tls_rpc_create(nxt_task_t *task,

--- 365 unchanged lines hidden (view full) ---

468
469 nxt_router_app_use(task, app, -1);
470
471 return NXT_ERROR;
472}
473
474
475nxt_inline nxt_bool_t
103static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
104 nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
105static void nxt_router_listen_socket_ready(nxt_task_t *task,
106 nxt_port_recv_msg_t *msg, void *data);
107static void nxt_router_listen_socket_error(nxt_task_t *task,
108 nxt_port_recv_msg_t *msg, void *data);
109#if (NXT_TLS)
110static void nxt_router_tls_rpc_create(nxt_task_t *task,

--- 365 unchanged lines hidden (view full) ---

476
477 nxt_router_app_use(task, app, -1);
478
479 return NXT_ERROR;
480}
481
482
483nxt_inline nxt_bool_t
476nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
477 uint32_t stream)
484nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
478{
485{
479 nxt_buf_t *b, *next;
480 nxt_bool_t cancelled;
486 nxt_buf_t *b, *next;
487 nxt_bool_t cancelled;
488 nxt_msg_info_t *msg_info;
481
489
490 msg_info = &req_rpc_data->msg_info;
491
482 if (msg_info->buf == NULL) {
483 return 0;
484 }
485
492 if (msg_info->buf == NULL) {
493 return 0;
494 }
495
486 cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
487 stream);
496 cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
497 msg_info->tracking_cookie,
498 req_rpc_data->stream);
488
489 if (cancelled) {
499
500 if (cancelled) {
490 nxt_debug(task, "stream #%uD: cancelled by router", stream);
501 nxt_debug(task, "stream #%uD: cancelled by router",
502 req_rpc_data->stream);
491 }
492
493 for (b = msg_info->buf; b != NULL; b = next) {
494 next = b->next;
495 b->next = NULL;
496
497 b->completion_handler = msg_info->completion_handler;
498

--- 25 unchanged lines hidden (view full) ---

524
525
526nxt_inline void
527nxt_request_rpc_data_unlink(nxt_task_t *task,
528 nxt_request_rpc_data_t *req_rpc_data)
529{
530 nxt_http_request_t *r;
531
503 }
504
505 for (b = msg_info->buf; b != NULL; b = next) {
506 next = b->next;
507 b->next = NULL;
508
509 b->completion_handler = msg_info->completion_handler;
510

--- 25 unchanged lines hidden (view full) ---

536
537
538nxt_inline void
539nxt_request_rpc_data_unlink(nxt_task_t *task,
540 nxt_request_rpc_data_t *req_rpc_data)
541{
542 nxt_http_request_t *r;
543
532 nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
544 nxt_router_msg_cancel(task, req_rpc_data);
533
534 if (req_rpc_data->app_port != NULL) {
535 nxt_router_app_port_release(task, req_rpc_data->app_port,
536 req_rpc_data->apr_action);
537
538 req_rpc_data->app_port = NULL;
539 }
540

--- 27 unchanged lines hidden (view full) ---

568 req_rpc_data->stream);
569 }
570}
571
572
573static void
574nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
575{
545
546 if (req_rpc_data->app_port != NULL) {
547 nxt_router_app_port_release(task, req_rpc_data->app_port,
548 req_rpc_data->apr_action);
549
550 req_rpc_data->app_port = NULL;
551 }
552

--- 27 unchanged lines hidden (view full) ---

580 req_rpc_data->stream);
581 }
582}
583
584
585static void
586nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
587{
588 nxt_int_t res;
576 nxt_app_t *app;
577 nxt_port_t *port, *main_app_port;
578 nxt_runtime_t *rt;
579
580 nxt_port_new_port_handler(task, msg);
581
582 port = msg->u.new_port;
583
584 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
585 nxt_router_greet_controller(task, msg->u.new_port);
586 }
587
588 if (port == NULL || port->type != NXT_PROCESS_APP) {
589
590 if (msg->port_msg.stream == 0) {
591 return;
592 }
593
594 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
589 nxt_app_t *app;
590 nxt_port_t *port, *main_app_port;
591 nxt_runtime_t *rt;
592
593 nxt_port_new_port_handler(task, msg);
594
595 port = msg->u.new_port;
596
597 if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
598 nxt_router_greet_controller(task, msg->u.new_port);
599 }
600
601 if (port == NULL || port->type != NXT_PROCESS_APP) {
602
603 if (msg->port_msg.stream == 0) {
604 return;
605 }
606
607 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
608
609 } else {
610 if (msg->fd2 != -1) {
611 res = nxt_router_port_queue_map(task, port, msg->fd2);
612 if (nxt_slow_path(res != NXT_OK)) {
613 return;
614 }
615
616 nxt_fd_close(msg->fd2);
617 msg->fd2 = -1;
618 }
595 }
596
597 if (msg->port_msg.stream != 0) {
598 nxt_port_rpc_handler(task, msg);
599 return;
600 }
601
602 /*

--- 915 unchanged lines hidden (view full) ---

1518 }
1519
1520 ret = nxt_port_socket_init(task, port, 0);
1521 if (nxt_slow_path(ret != NXT_OK)) {
1522 nxt_port_use(task, port, -1);
1523 return NXT_ERROR;
1524 }
1525
619 }
620
621 if (msg->port_msg.stream != 0) {
622 nxt_port_rpc_handler(task, msg);
623 return;
624 }
625
626 /*

--- 915 unchanged lines hidden (view full) ---

1542 }
1543
1544 ret = nxt_port_socket_init(task, port, 0);
1545 if (nxt_slow_path(ret != NXT_OK)) {
1546 nxt_port_use(task, port, -1);
1547 return NXT_ERROR;
1548 }
1549
1550 ret = nxt_router_app_queue_init(task, port);
1551 if (nxt_slow_path(ret != NXT_OK)) {
1552 nxt_port_use(task, port, -1);
1553 return NXT_ERROR;
1554 }
1555
1526 nxt_port_write_enable(task, port);
1527 port->app = app;
1528
1529 app->shared_port = port;
1530
1531 nxt_thread_mutex_create(&app->outgoing.mutex);
1532 }
1533 }

--- 289 unchanged lines hidden (view full) ---

1823 }
1824
1825 } nxt_queue_loop;
1826
1827 return NULL;
1828}
1829
1830
1556 nxt_port_write_enable(task, port);
1557 port->app = app;
1558
1559 app->shared_port = port;
1560
1561 nxt_thread_mutex_create(&app->outgoing.mutex);
1562 }
1563 }

--- 289 unchanged lines hidden (view full) ---

1853 }
1854
1855 } nxt_queue_loop;
1856
1857 return NULL;
1858}
1859
1860
1861static nxt_int_t
1862nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
1863{
1864 void *mem;
1865 nxt_int_t fd;
1866
1867 fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
1868 if (nxt_slow_path(fd == -1)) {
1869 return NXT_ERROR;
1870 }
1871
1872 mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
1873 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1874 if (nxt_slow_path(mem == MAP_FAILED)) {
1875 nxt_fd_close(fd);
1876
1877 return NXT_ERROR;
1878 }
1879
1880 nxt_app_queue_init(mem);
1881
1882 port->queue_fd = fd;
1883 port->queue = mem;
1884
1885 return NXT_OK;
1886}
1887
1888
1889static nxt_int_t
1890nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
1891{
1892 void *mem;
1893 nxt_int_t fd;
1894
1895 fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
1896 if (nxt_slow_path(fd == -1)) {
1897 return NXT_ERROR;
1898 }
1899
1900 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
1901 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1902 if (nxt_slow_path(mem == MAP_FAILED)) {
1903 nxt_fd_close(fd);
1904
1905 return NXT_ERROR;
1906 }
1907
1908 nxt_port_queue_init(mem);
1909
1910 port->queue_fd = fd;
1911 port->queue = mem;
1912
1913 return NXT_OK;
1914}
1915
1916
1917static nxt_int_t
1918nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
1919{
1920 void *mem;
1921
1922 nxt_assert(fd != -1);
1923
1924 mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
1925 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1926 if (nxt_slow_path(mem == MAP_FAILED)) {
1927
1928 return NXT_ERROR;
1929 }
1930
1931 port->queue = mem;
1932
1933 return NXT_OK;
1934}
1935
1936
1831void
1832nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
1833 nxt_http_action_t *action)
1834{
1835 nxt_app_t *app;
1836
1837 app = nxt_router_app_find(&tmcf->apps, name);
1838

--- 904 unchanged lines hidden (view full) ---

2743 }
2744
2745 ret = nxt_port_socket_init(task, port, 0);
2746 if (nxt_slow_path(ret != NXT_OK)) {
2747 nxt_port_use(task, port, -1);
2748 return;
2749 }
2750
1937void
1938nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name,
1939 nxt_http_action_t *action)
1940{
1941 nxt_app_t *app;
1942
1943 app = nxt_router_app_find(&tmcf->apps, name);
1944

--- 904 unchanged lines hidden (view full) ---

2849 }
2850
2851 ret = nxt_port_socket_init(task, port, 0);
2852 if (nxt_slow_path(ret != NXT_OK)) {
2853 nxt_port_use(task, port, -1);
2854 return;
2855 }
2856
2857 ret = nxt_router_port_queue_init(task, port);
2858 if (nxt_slow_path(ret != NXT_OK)) {
2859 nxt_port_use(task, port, -1);
2860 return;
2861 }
2862
2751 engine->port = port;
2752
2753 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2754
2755 work = nxt_zalloc(sizeof(nxt_work_t));
2756 if (nxt_slow_path(work == NULL)) {
2757 return;
2758 }

--- 906 unchanged lines hidden (view full) ---

3665 nxt_request_rpc_data_unlink(task, req_rpc_data);
3666}
3667
3668
3669static void
3670nxt_router_req_headers_ack_handler(nxt_task_t *task,
3671 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
3672{
2863 engine->port = port;
2864
2865 nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2866
2867 work = nxt_zalloc(sizeof(nxt_work_t));
2868 if (nxt_slow_path(work == NULL)) {
2869 return;
2870 }

--- 906 unchanged lines hidden (view full) ---

3777 nxt_request_rpc_data_unlink(task, req_rpc_data);
3778}
3779
3780
3781static void
3782nxt_router_req_headers_ack_handler(nxt_task_t *task,
3783 nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
3784{
3785 int res;
3673 nxt_app_t *app;
3674 nxt_bool_t start_process;
3675 nxt_port_t *app_port, *main_app_port, *idle_port;
3676 nxt_queue_link_t *idle_lnk;
3677 nxt_http_request_t *r;
3678
3679 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
3680 req_rpc_data->stream,

--- 66 unchanged lines hidden (view full) ---

3747 if (start_process) {
3748 nxt_router_start_app_process(task, app);
3749 }
3750
3751 nxt_port_use(task, req_rpc_data->app_port, -1);
3752
3753 req_rpc_data->app_port = app_port;
3754
3786 nxt_app_t *app;
3787 nxt_bool_t start_process;
3788 nxt_port_t *app_port, *main_app_port, *idle_port;
3789 nxt_queue_link_t *idle_lnk;
3790 nxt_http_request_t *r;
3791
3792 nxt_debug(task, "stream #%uD: got ack from %PI:%d",
3793 req_rpc_data->stream,

--- 66 unchanged lines hidden (view full) ---

3860 if (start_process) {
3861 nxt_router_start_app_process(task, app);
3862 }
3863
3864 nxt_port_use(task, req_rpc_data->app_port, -1);
3865
3866 req_rpc_data->app_port = app_port;
3867
3868 if (req_rpc_data->msg_info.body_fd != -1) {
3869 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
3870 req_rpc_data->msg_info.body_fd);
3871
3872 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
3873
3874 res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
3875 req_rpc_data->msg_info.body_fd,
3876 req_rpc_data->stream,
3877 task->thread->engine->port->id, NULL);
3878
3879 if (nxt_slow_path(res != NXT_OK)) {
3880 r = req_rpc_data->request;
3881
3882 nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3883 }
3884 }
3885
3755 if (app->timeout != 0) {
3756 r = req_rpc_data->request;
3757
3758 r->timer.handler = nxt_router_app_timeout;
3759 r->timer_data = req_rpc_data;
3760 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3761 }
3762}

--- 118 unchanged lines hidden (view full) ---

3881 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
3882
3883 msg->id = port->id;
3884 msg->pid = port->pid;
3885 msg->max_size = port->max_size;
3886 msg->max_share = port->max_share;
3887 msg->type = port->type;
3888
3886 if (app->timeout != 0) {
3887 r = req_rpc_data->request;
3888
3889 r->timer.handler = nxt_router_app_timeout;
3890 r->timer_data = req_rpc_data;
3891 nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3892 }
3893}

--- 118 unchanged lines hidden (view full) ---

4012 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4013
4014 msg->id = port->id;
4015 msg->pid = port->pid;
4016 msg->max_size = port->max_size;
4017 msg->max_share = port->max_share;
4018 msg->type = port->type;
4019
3889 return nxt_port_socket_twrite(task, app_port,
4020 return nxt_port_socket_write2(task, app_port,
3890 NXT_PORT_MSG_NEW_PORT,
4021 NXT_PORT_MSG_NEW_PORT,
3891 port->pair[0],
3892 0, 0, b, NULL);
4022 port->pair[0], port->queue_fd,
4023 0, 0, b);
3893}
3894
3895
3896static void
3897nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3898 void *data)
3899{
3900 nxt_app_t *app;

--- 616 unchanged lines hidden (view full) ---

4517nxt_router_app_prepare_request(nxt_task_t *task,
4518 nxt_request_rpc_data_t *req_rpc_data)
4519{
4520 nxt_app_t *app;
4521 nxt_buf_t *buf, *body;
4522 nxt_int_t res;
4523 nxt_port_t *port, *reply_port;
4524
4024}
4025
4026
4027static void
4028nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4029 void *data)
4030{
4031 nxt_app_t *app;

--- 616 unchanged lines hidden (view full) ---

4648nxt_router_app_prepare_request(nxt_task_t *task,
4649 nxt_request_rpc_data_t *req_rpc_data)
4650{
4651 nxt_app_t *app;
4652 nxt_buf_t *buf, *body;
4653 nxt_int_t res;
4654 nxt_port_t *port, *reply_port;
4655
4656 int notify;
4657 struct {
4658 nxt_port_msg_t pm;
4659 nxt_port_mmap_msg_t mm;
4660 } msg;
4661
4662
4525 app = req_rpc_data->app;
4526
4527 nxt_assert(app != NULL);
4528
4529 port = req_rpc_data->app_port;
4530
4531 nxt_assert(port != NULL);
4663 app = req_rpc_data->app;
4664
4665 nxt_assert(app != NULL);
4666
4667 port = req_rpc_data->app_port;
4668
4669 nxt_assert(port != NULL);
4670 nxt_assert(port->queue != NULL);
4532
4533 reply_port = task->thread->engine->port;
4534
4535 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
4536 nxt_app_msg_prefix[app->type]);
4537 if (nxt_slow_path(buf == NULL)) {
4538 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
4539 req_rpc_data->stream, &app->name);

--- 24 unchanged lines hidden (view full) ---

4564 req_rpc_data->msg_info.body_fd = body->file->fd;
4565
4566 body->file->fd = -1;
4567
4568 } else {
4569 req_rpc_data->msg_info.body_fd = -1;
4570 }
4571
4671
4672 reply_port = task->thread->engine->port;
4673
4674 buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
4675 nxt_app_msg_prefix[app->type]);
4676 if (nxt_slow_path(buf == NULL)) {
4677 nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
4678 req_rpc_data->stream, &app->name);

--- 24 unchanged lines hidden (view full) ---

4703 req_rpc_data->msg_info.body_fd = body->file->fd;
4704
4705 body->file->fd = -1;
4706
4707 } else {
4708 req_rpc_data->msg_info.body_fd = -1;
4709 }
4710
4572 if (req_rpc_data->msg_info.body_fd != -1) {
4573 nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4574 req_rpc_data->msg_info.body_fd);
4711 msg.pm.stream = req_rpc_data->stream;
4712 msg.pm.pid = reply_port->pid;
4713 msg.pm.reply_port = reply_port->id;
4714 msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
4715 msg.pm.last = 0;
4716 msg.pm.mmap = 1;
4717 msg.pm.nf = 0;
4718 msg.pm.mf = 0;
4719 msg.pm.tracking = 0;
4575
4720
4576 lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4577 }
4721 nxt_port_mmap_handler_t *mmap_handler = buf->parent;
4722 nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
4578
4723
4579 res = nxt_port_socket_twrite(task, port,
4580 NXT_PORT_MSG_REQ_HEADERS,
4581 req_rpc_data->msg_info.body_fd,
4582 req_rpc_data->stream, reply_port->id, buf,
4583 NULL);
4724 msg.mm.mmap_id = hdr->id;
4725 msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
4726 msg.mm.size = nxt_buf_used_size(buf);
4584
4727
4585 if (nxt_slow_path(res != NXT_OK)) {
4728 res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
4729 req_rpc_data->stream, &notify,
4730 &req_rpc_data->msg_info.tracking_cookie);
4731 if (nxt_fast_path(res == NXT_OK)) {
4732 if (notify != 0) {
4733 (void) nxt_port_socket_write(task, port,
4734 NXT_PORT_MSG_READ_QUEUE,
4735 -1, req_rpc_data->stream,
4736 reply_port->id, NULL);
4737
4738 } else {
4739 nxt_debug(task, "queue is not empty");
4740 }
4741
4742 } else {
4586 nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
4587 req_rpc_data->stream, &app->name);
4588
4589 nxt_http_request_error(task, req_rpc_data->request,
4590 NXT_HTTP_INTERNAL_SERVER_ERROR);
4591 }
4592}
4593

--- 532 unchanged lines hidden ---
4743 nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
4744 req_rpc_data->stream, &app->name);
4745
4746 nxt_http_request_error(task, req_rpc_data->request,
4747 NXT_HTTP_INTERNAL_SERVER_ERROR);
4748 }
4749}
4750

--- 532 unchanged lines hidden ---