xref: /unit/src/nxt_router.c (revision 88)
120Sigor@sysoev.ru 
220Sigor@sysoev.ru /*
320Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
420Sigor@sysoev.ru  * Copyright (C) Valentin V. Bartenev
520Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
620Sigor@sysoev.ru  */
720Sigor@sysoev.ru 
853Sigor@sysoev.ru #include <nxt_router.h>
9*88Smax.romanov@nginx.com #include <nxt_application.h>
1020Sigor@sysoev.ru 
1120Sigor@sysoev.ru 
1253Sigor@sysoev.ru static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
1353Sigor@sysoev.ru     nxt_router_t *router);
1453Sigor@sysoev.ru static void nxt_router_listen_sockets_sort(nxt_router_t *router,
1553Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf);
1653Sigor@sysoev.ru 
1753Sigor@sysoev.ru static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
1853Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf);
1953Sigor@sysoev.ru static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
2053Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf);
2165Sigor@sysoev.ru static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
2265Sigor@sysoev.ru     nxt_sockaddr_t *sa);
2353Sigor@sysoev.ru static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
2465Sigor@sysoev.ru     nxt_mp_t *mp, uint32_t port);
2553Sigor@sysoev.ru 
2653Sigor@sysoev.ru static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
2753Sigor@sysoev.ru     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
2853Sigor@sysoev.ru     const nxt_event_interface_t *interface);
2965Sigor@sysoev.ru static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
3065Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
3165Sigor@sysoev.ru static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
3265Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
3365Sigor@sysoev.ru static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
3465Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
3565Sigor@sysoev.ru static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
3665Sigor@sysoev.ru     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
3765Sigor@sysoev.ru     nxt_work_handler_t handler);
3853Sigor@sysoev.ru static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
3953Sigor@sysoev.ru     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
4053Sigor@sysoev.ru 
4153Sigor@sysoev.ru static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
4253Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf);
4353Sigor@sysoev.ru static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
4453Sigor@sysoev.ru     nxt_event_engine_t *engine);
4553Sigor@sysoev.ru 
4653Sigor@sysoev.ru static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
4753Sigor@sysoev.ru static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
4853Sigor@sysoev.ru 
4953Sigor@sysoev.ru static void nxt_router_thread_start(void *data);
5053Sigor@sysoev.ru static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
5153Sigor@sysoev.ru     void *data);
5253Sigor@sysoev.ru static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
5353Sigor@sysoev.ru     void *data);
5453Sigor@sysoev.ru static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
5553Sigor@sysoev.ru     void *data);
5653Sigor@sysoev.ru static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
5753Sigor@sysoev.ru     void *data);
5853Sigor@sysoev.ru static void nxt_router_listen_socket_release(nxt_task_t *task,
5953Sigor@sysoev.ru     nxt_socket_conf_joint_t *joint);
6053Sigor@sysoev.ru static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
6153Sigor@sysoev.ru     void *data);
6253Sigor@sysoev.ru static void nxt_router_conf_release(nxt_task_t *task,
6353Sigor@sysoev.ru     nxt_socket_conf_joint_t *joint);
6453Sigor@sysoev.ru 
6553Sigor@sysoev.ru static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
6653Sigor@sysoev.ru static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
6753Sigor@sysoev.ru     void *data);
68*88Smax.romanov@nginx.com static void nxt_router_process_http_request(nxt_task_t *task,
69*88Smax.romanov@nginx.com     nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
70*88Smax.romanov@nginx.com static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
7153Sigor@sysoev.ru static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
7253Sigor@sysoev.ru static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
7353Sigor@sysoev.ru static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
7453Sigor@sysoev.ru static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
7562Sigor@sysoev.ru static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
7620Sigor@sysoev.ru 
7720Sigor@sysoev.ru 
7820Sigor@sysoev.ru nxt_int_t
7920Sigor@sysoev.ru nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
8020Sigor@sysoev.ru {
8153Sigor@sysoev.ru     nxt_int_t                    ret;
8253Sigor@sysoev.ru     nxt_router_t                 *router;
8353Sigor@sysoev.ru     nxt_router_temp_conf_t       *tmcf;
8453Sigor@sysoev.ru     const nxt_event_interface_t  *interface;
8553Sigor@sysoev.ru 
86*88Smax.romanov@nginx.com     ret = nxt_app_http_init(task, rt);
87*88Smax.romanov@nginx.com     if (nxt_slow_path(ret != NXT_OK)) {
88*88Smax.romanov@nginx.com         return ret;
89*88Smax.romanov@nginx.com     }
90*88Smax.romanov@nginx.com 
9153Sigor@sysoev.ru     router = nxt_zalloc(sizeof(nxt_router_t));
9253Sigor@sysoev.ru     if (nxt_slow_path(router == NULL)) {
9353Sigor@sysoev.ru         return NXT_ERROR;
9453Sigor@sysoev.ru     }
9553Sigor@sysoev.ru 
9653Sigor@sysoev.ru     nxt_queue_init(&router->engines);
9753Sigor@sysoev.ru     nxt_queue_init(&router->sockets);
9853Sigor@sysoev.ru 
9953Sigor@sysoev.ru     /**/
10053Sigor@sysoev.ru 
10153Sigor@sysoev.ru     tmcf = nxt_router_temp_conf(task, router);
10253Sigor@sysoev.ru     if (nxt_slow_path(tmcf == NULL)) {
10320Sigor@sysoev.ru         return NXT_ERROR;
10420Sigor@sysoev.ru     }
10520Sigor@sysoev.ru 
10653Sigor@sysoev.ru     ret = nxt_router_stub_conf(task, tmcf);
10753Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
10853Sigor@sysoev.ru         return ret;
10953Sigor@sysoev.ru     }
11053Sigor@sysoev.ru 
11153Sigor@sysoev.ru     nxt_router_listen_sockets_sort(router, tmcf);
11253Sigor@sysoev.ru 
11353Sigor@sysoev.ru     ret = nxt_router_listen_sockets_stub_create(task, tmcf);
11453Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
11553Sigor@sysoev.ru         return ret;
11653Sigor@sysoev.ru     }
11753Sigor@sysoev.ru 
11853Sigor@sysoev.ru     interface = nxt_service_get(rt->services, "engine", NULL);
11953Sigor@sysoev.ru 
12053Sigor@sysoev.ru     ret = nxt_router_engines_create(task, router, tmcf, interface);
12153Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
12253Sigor@sysoev.ru         return ret;
12353Sigor@sysoev.ru     }
12453Sigor@sysoev.ru 
12553Sigor@sysoev.ru     ret = nxt_router_threads_create(task, rt, tmcf);
12653Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
12753Sigor@sysoev.ru         return ret;
12853Sigor@sysoev.ru     }
12953Sigor@sysoev.ru 
13053Sigor@sysoev.ru     nxt_router_engines_post(tmcf);
13153Sigor@sysoev.ru 
13253Sigor@sysoev.ru     nxt_queue_add(&router->sockets, &tmcf->updating);
13353Sigor@sysoev.ru     nxt_queue_add(&router->sockets, &tmcf->creating);
13453Sigor@sysoev.ru 
13553Sigor@sysoev.ru     return NXT_OK;
13653Sigor@sysoev.ru }
13753Sigor@sysoev.ru 
13853Sigor@sysoev.ru 
13953Sigor@sysoev.ru static nxt_router_temp_conf_t *
14053Sigor@sysoev.ru nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
14153Sigor@sysoev.ru {
14265Sigor@sysoev.ru     nxt_mp_t                *mp, *tmp;
14353Sigor@sysoev.ru     nxt_router_conf_t       *rtcf;
14453Sigor@sysoev.ru     nxt_router_temp_conf_t  *tmcf;
14553Sigor@sysoev.ru 
14665Sigor@sysoev.ru     mp = nxt_mp_create(1024, 128, 256, 32);
14753Sigor@sysoev.ru     if (nxt_slow_path(mp == NULL)) {
14853Sigor@sysoev.ru         return NULL;
14953Sigor@sysoev.ru     }
15053Sigor@sysoev.ru 
15165Sigor@sysoev.ru     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
15253Sigor@sysoev.ru     if (nxt_slow_path(rtcf == NULL)) {
15353Sigor@sysoev.ru         goto fail;
15453Sigor@sysoev.ru     }
15553Sigor@sysoev.ru 
15653Sigor@sysoev.ru     rtcf->mem_pool = mp;
15753Sigor@sysoev.ru     rtcf->router = router;
15853Sigor@sysoev.ru     rtcf->count = 1;
15953Sigor@sysoev.ru 
16065Sigor@sysoev.ru     tmp = nxt_mp_create(1024, 128, 256, 32);
16153Sigor@sysoev.ru     if (nxt_slow_path(tmp == NULL)) {
16253Sigor@sysoev.ru         goto fail;
16353Sigor@sysoev.ru     }
16453Sigor@sysoev.ru 
16565Sigor@sysoev.ru     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
16653Sigor@sysoev.ru     if (nxt_slow_path(tmcf == NULL)) {
16753Sigor@sysoev.ru         goto temp_fail;
16853Sigor@sysoev.ru     }
16953Sigor@sysoev.ru 
17053Sigor@sysoev.ru     tmcf->mem_pool = tmp;
17153Sigor@sysoev.ru     tmcf->conf = rtcf;
17253Sigor@sysoev.ru 
17353Sigor@sysoev.ru     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
17453Sigor@sysoev.ru                                      sizeof(nxt_router_engine_conf_t));
17553Sigor@sysoev.ru     if (nxt_slow_path(tmcf->engines == NULL)) {
17653Sigor@sysoev.ru         goto temp_fail;
17753Sigor@sysoev.ru     }
17853Sigor@sysoev.ru 
17953Sigor@sysoev.ru     nxt_queue_init(&tmcf->deleting);
18053Sigor@sysoev.ru     nxt_queue_init(&tmcf->keeping);
18153Sigor@sysoev.ru     nxt_queue_init(&tmcf->updating);
18253Sigor@sysoev.ru     nxt_queue_init(&tmcf->pending);
18353Sigor@sysoev.ru     nxt_queue_init(&tmcf->creating);
18453Sigor@sysoev.ru 
18553Sigor@sysoev.ru     return tmcf;
18653Sigor@sysoev.ru 
18753Sigor@sysoev.ru temp_fail:
18853Sigor@sysoev.ru 
18965Sigor@sysoev.ru     nxt_mp_destroy(tmp);
19053Sigor@sysoev.ru 
19153Sigor@sysoev.ru fail:
19253Sigor@sysoev.ru 
19365Sigor@sysoev.ru     nxt_mp_destroy(mp);
19453Sigor@sysoev.ru 
19553Sigor@sysoev.ru     return NULL;
19653Sigor@sysoev.ru }
19753Sigor@sysoev.ru 
19853Sigor@sysoev.ru 
19953Sigor@sysoev.ru static nxt_int_t
20053Sigor@sysoev.ru nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
20153Sigor@sysoev.ru {
20265Sigor@sysoev.ru     nxt_mp_t           *mp;
20353Sigor@sysoev.ru     nxt_sockaddr_t     *sa;
20453Sigor@sysoev.ru     nxt_socket_conf_t  *skcf;
20553Sigor@sysoev.ru 
20653Sigor@sysoev.ru     tmcf->conf->threads = 1;
20753Sigor@sysoev.ru 
20853Sigor@sysoev.ru     mp = tmcf->conf->mem_pool;
20953Sigor@sysoev.ru 
21053Sigor@sysoev.ru     sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
21153Sigor@sysoev.ru     skcf = nxt_router_socket_conf(task, mp, sa);
21253Sigor@sysoev.ru 
21353Sigor@sysoev.ru     skcf->listen.handler = nxt_router_conn_init;
21453Sigor@sysoev.ru     skcf->header_buffer_size = 2048;
21553Sigor@sysoev.ru     skcf->large_header_buffer_size = 8192;
21653Sigor@sysoev.ru     skcf->header_read_timeout = 5000;
21753Sigor@sysoev.ru 
21853Sigor@sysoev.ru     nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
21953Sigor@sysoev.ru 
22053Sigor@sysoev.ru     sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
22153Sigor@sysoev.ru     skcf = nxt_router_socket_conf(task, mp, sa);
22253Sigor@sysoev.ru 
22353Sigor@sysoev.ru     skcf->listen.handler = nxt_stream_connection_init;
22453Sigor@sysoev.ru     skcf->header_read_timeout = 5000;
22553Sigor@sysoev.ru 
22653Sigor@sysoev.ru     nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
22753Sigor@sysoev.ru 
22853Sigor@sysoev.ru     return NXT_OK;
22953Sigor@sysoev.ru }
23053Sigor@sysoev.ru 
23153Sigor@sysoev.ru 
23253Sigor@sysoev.ru static nxt_socket_conf_t *
23365Sigor@sysoev.ru nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
23453Sigor@sysoev.ru {
23553Sigor@sysoev.ru     nxt_socket_conf_t  *conf;
23653Sigor@sysoev.ru 
23765Sigor@sysoev.ru     conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
23853Sigor@sysoev.ru     if (nxt_slow_path(conf == NULL)) {
23953Sigor@sysoev.ru         return NULL;
24053Sigor@sysoev.ru     }
24153Sigor@sysoev.ru 
24253Sigor@sysoev.ru     conf->listen.sockaddr = sa;
24353Sigor@sysoev.ru 
24453Sigor@sysoev.ru     conf->listen.socket = -1;
24553Sigor@sysoev.ru     conf->listen.backlog = NXT_LISTEN_BACKLOG;
24653Sigor@sysoev.ru     conf->listen.flags = NXT_NONBLOCK;
24753Sigor@sysoev.ru     conf->listen.read_after_accept = 1;
24853Sigor@sysoev.ru 
24953Sigor@sysoev.ru     return conf;
25053Sigor@sysoev.ru }
25153Sigor@sysoev.ru 
25253Sigor@sysoev.ru 
25353Sigor@sysoev.ru static nxt_sockaddr_t *
25465Sigor@sysoev.ru nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port)
25553Sigor@sysoev.ru {
25653Sigor@sysoev.ru     nxt_sockaddr_t      *sa;
25753Sigor@sysoev.ru     struct sockaddr_in  sin;
25853Sigor@sysoev.ru 
25953Sigor@sysoev.ru     nxt_memzero(&sin, sizeof(struct sockaddr_in));
26053Sigor@sysoev.ru 
26153Sigor@sysoev.ru     sin.sin_family = AF_INET;
26253Sigor@sysoev.ru     sin.sin_port = htons(port);
26353Sigor@sysoev.ru 
26453Sigor@sysoev.ru     sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
26553Sigor@sysoev.ru                              sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
26653Sigor@sysoev.ru     if (nxt_slow_path(sa == NULL)) {
26753Sigor@sysoev.ru         return NULL;
26853Sigor@sysoev.ru     }
26953Sigor@sysoev.ru 
27053Sigor@sysoev.ru     sa->type = SOCK_STREAM;
27153Sigor@sysoev.ru 
27253Sigor@sysoev.ru     nxt_sockaddr_text(sa);
27353Sigor@sysoev.ru 
27453Sigor@sysoev.ru     return sa;
27553Sigor@sysoev.ru }
27653Sigor@sysoev.ru 
27753Sigor@sysoev.ru 
27853Sigor@sysoev.ru static void
27953Sigor@sysoev.ru nxt_router_listen_sockets_sort(nxt_router_t *router,
28053Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf)
28153Sigor@sysoev.ru {
28253Sigor@sysoev.ru     nxt_queue_link_t   *nqlk, *oqlk, *next;
28353Sigor@sysoev.ru     nxt_socket_conf_t  *nskcf, *oskcf;
28453Sigor@sysoev.ru 
28553Sigor@sysoev.ru     for (nqlk = nxt_queue_first(&tmcf->pending);
28653Sigor@sysoev.ru          nqlk != nxt_queue_tail(&tmcf->pending);
28753Sigor@sysoev.ru          nqlk = next)
28853Sigor@sysoev.ru     {
28953Sigor@sysoev.ru         next = nxt_queue_next(nqlk);
29053Sigor@sysoev.ru         nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
29153Sigor@sysoev.ru 
29253Sigor@sysoev.ru         for (oqlk = nxt_queue_first(&router->sockets);
29353Sigor@sysoev.ru              oqlk != nxt_queue_tail(&router->sockets);
29453Sigor@sysoev.ru              oqlk = nxt_queue_next(oqlk))
29553Sigor@sysoev.ru         {
29653Sigor@sysoev.ru             oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
29753Sigor@sysoev.ru 
29853Sigor@sysoev.ru             if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
29953Sigor@sysoev.ru                                  oskcf->listen.sockaddr))
30053Sigor@sysoev.ru             {
30153Sigor@sysoev.ru                 nxt_queue_remove(oqlk);
30253Sigor@sysoev.ru                 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
30353Sigor@sysoev.ru 
30453Sigor@sysoev.ru                 nxt_queue_remove(nqlk);
30553Sigor@sysoev.ru                 nxt_queue_insert_tail(&tmcf->updating, nqlk);
30653Sigor@sysoev.ru 
30753Sigor@sysoev.ru                 break;
30853Sigor@sysoev.ru             }
30953Sigor@sysoev.ru         }
31053Sigor@sysoev.ru     }
31153Sigor@sysoev.ru 
31253Sigor@sysoev.ru     nxt_queue_add(&tmcf->deleting, &router->sockets);
31353Sigor@sysoev.ru }
31453Sigor@sysoev.ru 
31553Sigor@sysoev.ru 
31653Sigor@sysoev.ru static nxt_int_t
31753Sigor@sysoev.ru nxt_router_listen_sockets_stub_create(nxt_task_t *task,
31853Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf)
31953Sigor@sysoev.ru {
32053Sigor@sysoev.ru     nxt_queue_link_t   *qlk, *nqlk;
32153Sigor@sysoev.ru     nxt_socket_conf_t  *skcf;
32253Sigor@sysoev.ru 
32353Sigor@sysoev.ru     for (qlk = nxt_queue_first(&tmcf->pending);
32453Sigor@sysoev.ru          qlk != nxt_queue_tail(&tmcf->pending);
32553Sigor@sysoev.ru          qlk = nqlk)
32653Sigor@sysoev.ru     {
32753Sigor@sysoev.ru         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
32853Sigor@sysoev.ru 
32953Sigor@sysoev.ru         if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
33053Sigor@sysoev.ru             return NXT_ERROR;
33153Sigor@sysoev.ru         }
33253Sigor@sysoev.ru 
33353Sigor@sysoev.ru         nqlk = nxt_queue_next(qlk);
33453Sigor@sysoev.ru         nxt_queue_remove(qlk);
33553Sigor@sysoev.ru         nxt_queue_insert_tail(&tmcf->creating, qlk);
33653Sigor@sysoev.ru     }
33753Sigor@sysoev.ru 
33853Sigor@sysoev.ru     return NXT_OK;
33953Sigor@sysoev.ru }
34053Sigor@sysoev.ru 
34153Sigor@sysoev.ru 
34253Sigor@sysoev.ru static nxt_int_t
34353Sigor@sysoev.ru nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
34453Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
34553Sigor@sysoev.ru {
34665Sigor@sysoev.ru     nxt_mp_t                  *mp;
34753Sigor@sysoev.ru     nxt_int_t                 ret;
34853Sigor@sysoev.ru     nxt_uint_t                n, threads;
34953Sigor@sysoev.ru     nxt_queue_link_t          *qlk;
35053Sigor@sysoev.ru     nxt_router_engine_conf_t  *recf;
35153Sigor@sysoev.ru 
35253Sigor@sysoev.ru     mp = tmcf->conf->mem_pool;
35353Sigor@sysoev.ru     threads = tmcf->conf->threads;
35453Sigor@sysoev.ru 
35553Sigor@sysoev.ru     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
35653Sigor@sysoev.ru                                      sizeof(nxt_router_engine_conf_t));
35753Sigor@sysoev.ru     if (nxt_slow_path(tmcf->engines == NULL)) {
35853Sigor@sysoev.ru         return NXT_ERROR;
35953Sigor@sysoev.ru     }
36053Sigor@sysoev.ru 
36153Sigor@sysoev.ru     n = 0;
36253Sigor@sysoev.ru 
36353Sigor@sysoev.ru     for (qlk = nxt_queue_first(&router->engines);
36453Sigor@sysoev.ru          qlk != nxt_queue_tail(&router->engines);
36553Sigor@sysoev.ru          qlk = nxt_queue_next(qlk))
36653Sigor@sysoev.ru     {
36753Sigor@sysoev.ru         recf = nxt_array_zero_add(tmcf->engines);
36853Sigor@sysoev.ru         if (nxt_slow_path(recf == NULL)) {
36953Sigor@sysoev.ru             return NXT_ERROR;
37053Sigor@sysoev.ru         }
37153Sigor@sysoev.ru 
37253Sigor@sysoev.ru         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
37353Sigor@sysoev.ru         // STUB
37453Sigor@sysoev.ru         recf->task = recf->engine->task;
37553Sigor@sysoev.ru 
37653Sigor@sysoev.ru         if (n < threads) {
37753Sigor@sysoev.ru             ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
37853Sigor@sysoev.ru 
37953Sigor@sysoev.ru         } else {
38053Sigor@sysoev.ru             ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
38153Sigor@sysoev.ru         }
38253Sigor@sysoev.ru 
38353Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
38453Sigor@sysoev.ru             return ret;
38553Sigor@sysoev.ru         }
38653Sigor@sysoev.ru 
38753Sigor@sysoev.ru         n++;
38853Sigor@sysoev.ru     }
38953Sigor@sysoev.ru 
39053Sigor@sysoev.ru     tmcf->new_threads = n;
39153Sigor@sysoev.ru 
39253Sigor@sysoev.ru     while (n < threads) {
39353Sigor@sysoev.ru         recf = nxt_array_zero_add(tmcf->engines);
39453Sigor@sysoev.ru         if (nxt_slow_path(recf == NULL)) {
39553Sigor@sysoev.ru             return NXT_ERROR;
39653Sigor@sysoev.ru         }
39753Sigor@sysoev.ru 
39853Sigor@sysoev.ru         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
39953Sigor@sysoev.ru         if (nxt_slow_path(recf->engine == NULL)) {
40053Sigor@sysoev.ru             return NXT_ERROR;
40153Sigor@sysoev.ru         }
40253Sigor@sysoev.ru         // STUB
40353Sigor@sysoev.ru         recf->task = recf->engine->task;
40453Sigor@sysoev.ru 
40553Sigor@sysoev.ru         ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
40653Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
40753Sigor@sysoev.ru             return ret;
40853Sigor@sysoev.ru         }
40953Sigor@sysoev.ru 
41053Sigor@sysoev.ru         n++;
41153Sigor@sysoev.ru     }
41253Sigor@sysoev.ru 
41353Sigor@sysoev.ru     return NXT_OK;
41453Sigor@sysoev.ru }
41553Sigor@sysoev.ru 
41653Sigor@sysoev.ru 
41753Sigor@sysoev.ru static nxt_int_t
41865Sigor@sysoev.ru nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
41953Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
42053Sigor@sysoev.ru {
42153Sigor@sysoev.ru     nxt_int_t  ret;
42253Sigor@sysoev.ru 
42353Sigor@sysoev.ru     recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
42453Sigor@sysoev.ru     if (nxt_slow_path(recf->creating == NULL)) {
42553Sigor@sysoev.ru         return NXT_ERROR;
42653Sigor@sysoev.ru     }
42753Sigor@sysoev.ru 
42853Sigor@sysoev.ru     ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
42953Sigor@sysoev.ru                             recf->creating, nxt_router_listen_socket_create);
43053Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
43153Sigor@sysoev.ru         return ret;
43253Sigor@sysoev.ru     }
43353Sigor@sysoev.ru 
43453Sigor@sysoev.ru     return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
43553Sigor@sysoev.ru                             recf->creating, nxt_router_listen_socket_create);
43653Sigor@sysoev.ru }
43753Sigor@sysoev.ru 
43853Sigor@sysoev.ru 
43953Sigor@sysoev.ru static nxt_int_t
44065Sigor@sysoev.ru nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
44153Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
44253Sigor@sysoev.ru {
44353Sigor@sysoev.ru     nxt_int_t  ret;
44453Sigor@sysoev.ru 
44553Sigor@sysoev.ru     recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
44653Sigor@sysoev.ru     if (nxt_slow_path(recf->creating == NULL)) {
44753Sigor@sysoev.ru         return NXT_ERROR;
44853Sigor@sysoev.ru     }
44953Sigor@sysoev.ru 
45053Sigor@sysoev.ru     ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
45153Sigor@sysoev.ru                             recf->creating, nxt_router_listen_socket_create);
45253Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
45353Sigor@sysoev.ru         return ret;
45453Sigor@sysoev.ru     }
45553Sigor@sysoev.ru 
45653Sigor@sysoev.ru     recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
45753Sigor@sysoev.ru     if (nxt_slow_path(recf->updating == NULL)) {
45853Sigor@sysoev.ru         return NXT_ERROR;
45953Sigor@sysoev.ru     }
46053Sigor@sysoev.ru 
46153Sigor@sysoev.ru     ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
46253Sigor@sysoev.ru                             recf->updating, nxt_router_listen_socket_update);
46353Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
46453Sigor@sysoev.ru         return ret;
46553Sigor@sysoev.ru     }
46653Sigor@sysoev.ru 
46753Sigor@sysoev.ru     recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
46853Sigor@sysoev.ru     if (nxt_slow_path(recf->deleting == NULL)) {
46953Sigor@sysoev.ru         return NXT_ERROR;
47053Sigor@sysoev.ru     }
47153Sigor@sysoev.ru 
47253Sigor@sysoev.ru     return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
47353Sigor@sysoev.ru                                            recf->deleting);
47453Sigor@sysoev.ru }
47553Sigor@sysoev.ru 
47653Sigor@sysoev.ru 
47753Sigor@sysoev.ru static nxt_int_t
47865Sigor@sysoev.ru nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
47953Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
48053Sigor@sysoev.ru {
48153Sigor@sysoev.ru     nxt_int_t  ret;
48253Sigor@sysoev.ru 
48353Sigor@sysoev.ru     recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
48453Sigor@sysoev.ru     if (nxt_slow_path(recf->deleting == NULL)) {
48553Sigor@sysoev.ru         return NXT_ERROR;
48653Sigor@sysoev.ru     }
48753Sigor@sysoev.ru 
48853Sigor@sysoev.ru     ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
48953Sigor@sysoev.ru                                           recf->deleting);
49053Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
49153Sigor@sysoev.ru         return ret;
49253Sigor@sysoev.ru     }
49353Sigor@sysoev.ru 
49453Sigor@sysoev.ru     return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
49553Sigor@sysoev.ru                                            recf->deleting);
49653Sigor@sysoev.ru }
49753Sigor@sysoev.ru 
49853Sigor@sysoev.ru 
49953Sigor@sysoev.ru static nxt_int_t
50065Sigor@sysoev.ru nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
50153Sigor@sysoev.ru     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
50253Sigor@sysoev.ru     nxt_work_handler_t handler)
50353Sigor@sysoev.ru {
50453Sigor@sysoev.ru     nxt_work_t               *work;
50553Sigor@sysoev.ru     nxt_queue_link_t         *qlk;
50653Sigor@sysoev.ru     nxt_socket_conf_joint_t  *joint;
50753Sigor@sysoev.ru 
50853Sigor@sysoev.ru     for (qlk = nxt_queue_first(sockets);
50953Sigor@sysoev.ru          qlk != nxt_queue_tail(sockets);
51053Sigor@sysoev.ru          qlk = nxt_queue_next(qlk))
51153Sigor@sysoev.ru     {
51253Sigor@sysoev.ru         work = nxt_array_add(array);
51353Sigor@sysoev.ru         if (nxt_slow_path(work == NULL)) {
51453Sigor@sysoev.ru             return NXT_ERROR;
51553Sigor@sysoev.ru         }
51653Sigor@sysoev.ru 
51753Sigor@sysoev.ru         work->next = NULL;
51853Sigor@sysoev.ru         work->handler = handler;
51953Sigor@sysoev.ru         work->task = &recf->task;
52053Sigor@sysoev.ru         work->obj = recf->engine;
52153Sigor@sysoev.ru 
52265Sigor@sysoev.ru         joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t));
52353Sigor@sysoev.ru         if (nxt_slow_path(joint == NULL)) {
52453Sigor@sysoev.ru             return NXT_ERROR;
52553Sigor@sysoev.ru         }
52653Sigor@sysoev.ru 
52753Sigor@sysoev.ru         work->data = joint;
52853Sigor@sysoev.ru 
52953Sigor@sysoev.ru         joint->count = 1;
53053Sigor@sysoev.ru         joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
531*88Smax.romanov@nginx.com         joint->engine = recf->engine;
53253Sigor@sysoev.ru     }
53353Sigor@sysoev.ru 
53420Sigor@sysoev.ru     return NXT_OK;
53520Sigor@sysoev.ru }
53620Sigor@sysoev.ru 
53720Sigor@sysoev.ru 
53820Sigor@sysoev.ru static nxt_int_t
53953Sigor@sysoev.ru nxt_router_engine_joints_delete(nxt_task_t *task,
54053Sigor@sysoev.ru     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
54120Sigor@sysoev.ru {
54253Sigor@sysoev.ru     nxt_work_t        *work;
54353Sigor@sysoev.ru     nxt_queue_link_t  *qlk;
54420Sigor@sysoev.ru 
54553Sigor@sysoev.ru     for (qlk = nxt_queue_first(sockets);
54653Sigor@sysoev.ru          qlk != nxt_queue_tail(sockets);
54753Sigor@sysoev.ru          qlk = nxt_queue_next(qlk))
54853Sigor@sysoev.ru     {
54953Sigor@sysoev.ru         work = nxt_array_add(array);
55053Sigor@sysoev.ru         if (nxt_slow_path(work == NULL)) {
55153Sigor@sysoev.ru             return NXT_ERROR;
55253Sigor@sysoev.ru         }
55320Sigor@sysoev.ru 
55453Sigor@sysoev.ru         work->next = NULL;
55553Sigor@sysoev.ru         work->handler = nxt_router_listen_socket_delete;
55653Sigor@sysoev.ru         work->task = &recf->task;
55753Sigor@sysoev.ru         work->obj = recf->engine;
55853Sigor@sysoev.ru         work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
55920Sigor@sysoev.ru     }
56020Sigor@sysoev.ru 
56153Sigor@sysoev.ru     return NXT_OK;
56253Sigor@sysoev.ru }
56320Sigor@sysoev.ru 
56420Sigor@sysoev.ru 
56553Sigor@sysoev.ru static nxt_int_t
56653Sigor@sysoev.ru nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
56753Sigor@sysoev.ru     nxt_router_temp_conf_t *tmcf)
56853Sigor@sysoev.ru {
56953Sigor@sysoev.ru     nxt_int_t                 ret;
57053Sigor@sysoev.ru     nxt_uint_t                i, threads;
57153Sigor@sysoev.ru     nxt_router_engine_conf_t  *recf;
57220Sigor@sysoev.ru 
57353Sigor@sysoev.ru     recf = tmcf->engines->elts;
57453Sigor@sysoev.ru     threads = tmcf->conf->threads;
57520Sigor@sysoev.ru 
57653Sigor@sysoev.ru     for (i = tmcf->new_threads; i < threads; i++) {
57753Sigor@sysoev.ru         ret = nxt_router_thread_create(task, rt, recf[i].engine);
57853Sigor@sysoev.ru         if (nxt_slow_path(ret != NXT_OK)) {
57953Sigor@sysoev.ru             return ret;
58053Sigor@sysoev.ru         }
58120Sigor@sysoev.ru     }
58220Sigor@sysoev.ru 
58320Sigor@sysoev.ru     return NXT_OK;
58420Sigor@sysoev.ru }
58553Sigor@sysoev.ru 
58653Sigor@sysoev.ru 
58753Sigor@sysoev.ru static nxt_int_t
58853Sigor@sysoev.ru nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
58953Sigor@sysoev.ru     nxt_event_engine_t *engine)
59053Sigor@sysoev.ru {
591*88Smax.romanov@nginx.com     nxt_mp_t             *mp;
59253Sigor@sysoev.ru     nxt_int_t            ret;
593*88Smax.romanov@nginx.com     nxt_port_t           *port;
594*88Smax.romanov@nginx.com     nxt_process_t        *process;
59553Sigor@sysoev.ru     nxt_thread_link_t    *link;
59653Sigor@sysoev.ru     nxt_thread_handle_t  handle;
59753Sigor@sysoev.ru 
59853Sigor@sysoev.ru     link = nxt_zalloc(sizeof(nxt_thread_link_t));
59953Sigor@sysoev.ru 
60053Sigor@sysoev.ru     if (nxt_slow_path(link == NULL)) {
60153Sigor@sysoev.ru         return NXT_ERROR;
60253Sigor@sysoev.ru     }
60353Sigor@sysoev.ru 
60453Sigor@sysoev.ru     link->start = nxt_router_thread_start;
60553Sigor@sysoev.ru     link->engine = engine;
60653Sigor@sysoev.ru     link->work.handler = nxt_router_thread_exit_handler;
60753Sigor@sysoev.ru     link->work.task = task;
60853Sigor@sysoev.ru     link->work.data = link;
60953Sigor@sysoev.ru 
61053Sigor@sysoev.ru     nxt_queue_insert_tail(&rt->engines, &engine->link);
61153Sigor@sysoev.ru 
612*88Smax.romanov@nginx.com 
613*88Smax.romanov@nginx.com     process = nxt_runtime_process_find(rt, nxt_pid);
614*88Smax.romanov@nginx.com     if (nxt_slow_path(process == NULL)) {
615*88Smax.romanov@nginx.com         return NXT_ERROR;
616*88Smax.romanov@nginx.com     }
617*88Smax.romanov@nginx.com 
618*88Smax.romanov@nginx.com     port = nxt_process_port_new(process);
619*88Smax.romanov@nginx.com     if (nxt_slow_path(port == NULL)) {
620*88Smax.romanov@nginx.com         return NXT_ERROR;
621*88Smax.romanov@nginx.com     }
622*88Smax.romanov@nginx.com 
623*88Smax.romanov@nginx.com     ret = nxt_port_socket_init(task, port, 0);
624*88Smax.romanov@nginx.com     if (nxt_slow_path(ret != NXT_OK)) {
625*88Smax.romanov@nginx.com         return ret;
626*88Smax.romanov@nginx.com     }
627*88Smax.romanov@nginx.com 
628*88Smax.romanov@nginx.com     mp = nxt_mp_create(1024, 128, 256, 32);
629*88Smax.romanov@nginx.com     if (nxt_slow_path(mp == NULL)) {
630*88Smax.romanov@nginx.com         return NXT_ERROR;
631*88Smax.romanov@nginx.com     }
632*88Smax.romanov@nginx.com 
633*88Smax.romanov@nginx.com     port->mem_pool = mp;
634*88Smax.romanov@nginx.com     port->engine = 0;
635*88Smax.romanov@nginx.com     port->type = NXT_PROCESS_ROUTER;
636*88Smax.romanov@nginx.com 
637*88Smax.romanov@nginx.com     engine->port = port;
638*88Smax.romanov@nginx.com 
639*88Smax.romanov@nginx.com     nxt_runtime_port_add(rt, port);
640*88Smax.romanov@nginx.com 
641*88Smax.romanov@nginx.com 
64253Sigor@sysoev.ru     ret = nxt_thread_create(&handle, link);
64353Sigor@sysoev.ru 
64453Sigor@sysoev.ru     if (nxt_slow_path(ret != NXT_OK)) {
64553Sigor@sysoev.ru         nxt_queue_remove(&engine->link);
64653Sigor@sysoev.ru     }
64753Sigor@sysoev.ru 
64853Sigor@sysoev.ru     return ret;
64953Sigor@sysoev.ru }
65053Sigor@sysoev.ru 
65153Sigor@sysoev.ru 
65253Sigor@sysoev.ru static void
65353Sigor@sysoev.ru nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
65453Sigor@sysoev.ru {
65553Sigor@sysoev.ru     nxt_uint_t                n;
65653Sigor@sysoev.ru     nxt_router_engine_conf_t  *recf;
65753Sigor@sysoev.ru 
65853Sigor@sysoev.ru     recf = tmcf->engines->elts;
65953Sigor@sysoev.ru 
66053Sigor@sysoev.ru     for (n = tmcf->engines->nelts; n != 0; n--) {
66153Sigor@sysoev.ru         nxt_router_engine_post(recf);
66253Sigor@sysoev.ru         recf++;
66353Sigor@sysoev.ru     }
66453Sigor@sysoev.ru }
66553Sigor@sysoev.ru 
66653Sigor@sysoev.ru 
66753Sigor@sysoev.ru static void
66853Sigor@sysoev.ru nxt_router_engine_post(nxt_router_engine_conf_t *recf)
66953Sigor@sysoev.ru {
67053Sigor@sysoev.ru     nxt_uint_t  n;
67153Sigor@sysoev.ru     nxt_work_t  *work;
67253Sigor@sysoev.ru 
67353Sigor@sysoev.ru     work = recf->creating->elts;
67453Sigor@sysoev.ru 
67553Sigor@sysoev.ru     for (n = recf->creating->nelts; n != 0; n--) {
67653Sigor@sysoev.ru         nxt_event_engine_post(recf->engine, work);
67753Sigor@sysoev.ru         work++;
67853Sigor@sysoev.ru     }
67953Sigor@sysoev.ru }
68053Sigor@sysoev.ru 
68153Sigor@sysoev.ru 
68253Sigor@sysoev.ru static void
683*88Smax.romanov@nginx.com nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
684*88Smax.romanov@nginx.com 
685*88Smax.romanov@nginx.com nxt_port_handler_t  nxt_router_process_port_handlers[] = {
686*88Smax.romanov@nginx.com     NULL,
687*88Smax.romanov@nginx.com     nxt_port_new_port_handler,
688*88Smax.romanov@nginx.com     nxt_port_change_log_file_handler,
689*88Smax.romanov@nginx.com     nxt_port_mmap_handler,
690*88Smax.romanov@nginx.com     nxt_router_data_handler,
691*88Smax.romanov@nginx.com };
692*88Smax.romanov@nginx.com 
693*88Smax.romanov@nginx.com 
694*88Smax.romanov@nginx.com static void
69553Sigor@sysoev.ru nxt_router_thread_start(void *data)
69653Sigor@sysoev.ru {
697*88Smax.romanov@nginx.com     nxt_task_t          *task;
69853Sigor@sysoev.ru     nxt_thread_t        *thread;
69953Sigor@sysoev.ru     nxt_thread_link_t   *link;
70053Sigor@sysoev.ru     nxt_event_engine_t  *engine;
70153Sigor@sysoev.ru 
70253Sigor@sysoev.ru     link = data;
70353Sigor@sysoev.ru     engine = link->engine;
704*88Smax.romanov@nginx.com     task = &engine->task;
70553Sigor@sysoev.ru 
70653Sigor@sysoev.ru     thread = nxt_thread();
70753Sigor@sysoev.ru 
70853Sigor@sysoev.ru     /* STUB */
70953Sigor@sysoev.ru     thread->runtime = engine->task.thread->runtime;
71053Sigor@sysoev.ru 
71153Sigor@sysoev.ru     engine->task.thread = thread;
71253Sigor@sysoev.ru     engine->task.log = thread->log;
71353Sigor@sysoev.ru     thread->engine = engine;
71463Sigor@sysoev.ru     thread->task = &engine->task;
71553Sigor@sysoev.ru     thread->fiber = &engine->fibers->fiber;
71653Sigor@sysoev.ru 
717*88Smax.romanov@nginx.com     engine->port->socket.task = task;
718*88Smax.romanov@nginx.com     nxt_port_create(task, engine->port, nxt_router_process_port_handlers);
719*88Smax.romanov@nginx.com 
72063Sigor@sysoev.ru     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
72153Sigor@sysoev.ru 
72253Sigor@sysoev.ru     nxt_event_engine_start(engine);
72353Sigor@sysoev.ru }
72453Sigor@sysoev.ru 
72553Sigor@sysoev.ru 
72653Sigor@sysoev.ru static void
72753Sigor@sysoev.ru nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
72853Sigor@sysoev.ru {
72953Sigor@sysoev.ru     nxt_listen_event_t       *listen;
73053Sigor@sysoev.ru     nxt_listen_socket_t      *ls;
73153Sigor@sysoev.ru     nxt_socket_conf_joint_t  *joint;
73253Sigor@sysoev.ru 
73353Sigor@sysoev.ru     joint = data;
73453Sigor@sysoev.ru 
73553Sigor@sysoev.ru     ls = &joint->socket_conf->listen;
73653Sigor@sysoev.ru 
73753Sigor@sysoev.ru     listen = nxt_listen_event(task, ls);
73853Sigor@sysoev.ru     if (nxt_slow_path(listen == NULL)) {
73953Sigor@sysoev.ru         nxt_router_listen_socket_release(task, joint);
74053Sigor@sysoev.ru         return;
74153Sigor@sysoev.ru     }
74253Sigor@sysoev.ru 
74353Sigor@sysoev.ru     listen->socket.data = joint;
74453Sigor@sysoev.ru }
74553Sigor@sysoev.ru 
74653Sigor@sysoev.ru 
74753Sigor@sysoev.ru nxt_inline nxt_listen_event_t *
74853Sigor@sysoev.ru nxt_router_listen_event(nxt_queue_t *listen_connections,
74953Sigor@sysoev.ru     nxt_socket_conf_t *skcf)
75053Sigor@sysoev.ru {
75153Sigor@sysoev.ru     nxt_socket_t        socket;
75253Sigor@sysoev.ru     nxt_queue_link_t    *link;
75353Sigor@sysoev.ru     nxt_listen_event_t  *listen;
75453Sigor@sysoev.ru 
75553Sigor@sysoev.ru     socket = skcf->listen.socket;
75653Sigor@sysoev.ru 
75753Sigor@sysoev.ru     for (link = nxt_queue_first(listen_connections);
75853Sigor@sysoev.ru          link != nxt_queue_tail(listen_connections);
75953Sigor@sysoev.ru          link = nxt_queue_next(link))
76053Sigor@sysoev.ru     {
76153Sigor@sysoev.ru         listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
76253Sigor@sysoev.ru 
76353Sigor@sysoev.ru         if (socket == listen->socket.fd) {
76453Sigor@sysoev.ru             return listen;
76553Sigor@sysoev.ru         }
76653Sigor@sysoev.ru     }
76753Sigor@sysoev.ru 
76853Sigor@sysoev.ru     return NULL;
76953Sigor@sysoev.ru }
77053Sigor@sysoev.ru 
77153Sigor@sysoev.ru 
77253Sigor@sysoev.ru static void
77353Sigor@sysoev.ru nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
77453Sigor@sysoev.ru {
77553Sigor@sysoev.ru     nxt_event_engine_t       *engine;
77653Sigor@sysoev.ru     nxt_listen_event_t       *listen;
77753Sigor@sysoev.ru     nxt_socket_conf_joint_t  *joint, *old;
77853Sigor@sysoev.ru 
77953Sigor@sysoev.ru     engine = obj;
78053Sigor@sysoev.ru     joint = data;
78153Sigor@sysoev.ru 
78253Sigor@sysoev.ru     listen = nxt_router_listen_event(&engine->listen_connections,
78353Sigor@sysoev.ru                                      joint->socket_conf);
78453Sigor@sysoev.ru 
78553Sigor@sysoev.ru     old = listen->socket.data;
78653Sigor@sysoev.ru     listen->socket.data = joint;
78753Sigor@sysoev.ru 
78853Sigor@sysoev.ru     nxt_router_conf_release(task, old);
78953Sigor@sysoev.ru }
79053Sigor@sysoev.ru 
79153Sigor@sysoev.ru 
79253Sigor@sysoev.ru static void
79353Sigor@sysoev.ru nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
79453Sigor@sysoev.ru {
79553Sigor@sysoev.ru     nxt_socket_conf_t   *skcf;
79653Sigor@sysoev.ru     nxt_listen_event_t  *listen;
79753Sigor@sysoev.ru     nxt_event_engine_t  *engine;
79853Sigor@sysoev.ru 
79953Sigor@sysoev.ru     engine = obj;
80053Sigor@sysoev.ru     skcf = data;
80153Sigor@sysoev.ru 
80253Sigor@sysoev.ru     listen = nxt_router_listen_event(&engine->listen_connections, skcf);
80353Sigor@sysoev.ru 
80453Sigor@sysoev.ru     nxt_fd_event_delete(engine, &listen->socket);
80553Sigor@sysoev.ru 
80653Sigor@sysoev.ru     listen->timer.handler = nxt_router_listen_socket_close;
80753Sigor@sysoev.ru     listen->timer.work_queue = &engine->fast_work_queue;
80853Sigor@sysoev.ru 
80953Sigor@sysoev.ru     nxt_timer_add(engine, &listen->timer, 0);
81053Sigor@sysoev.ru }
81153Sigor@sysoev.ru 
81253Sigor@sysoev.ru 
81353Sigor@sysoev.ru static void
81453Sigor@sysoev.ru nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
81553Sigor@sysoev.ru {
81653Sigor@sysoev.ru     nxt_timer_t              *timer;
81753Sigor@sysoev.ru     nxt_listen_event_t       *listen;
81853Sigor@sysoev.ru     nxt_socket_conf_joint_t  *joint;
81953Sigor@sysoev.ru 
82053Sigor@sysoev.ru     timer = obj;
82153Sigor@sysoev.ru     listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
82253Sigor@sysoev.ru     joint = listen->socket.data;
82353Sigor@sysoev.ru 
82453Sigor@sysoev.ru     nxt_queue_remove(&listen->link);
82553Sigor@sysoev.ru     nxt_free(listen);
82653Sigor@sysoev.ru 
82753Sigor@sysoev.ru     nxt_router_listen_socket_release(task, joint);
82853Sigor@sysoev.ru }
82953Sigor@sysoev.ru 
83053Sigor@sysoev.ru 
83153Sigor@sysoev.ru static void
83253Sigor@sysoev.ru nxt_router_listen_socket_release(nxt_task_t *task,
83353Sigor@sysoev.ru     nxt_socket_conf_joint_t *joint)
83453Sigor@sysoev.ru {
83553Sigor@sysoev.ru     nxt_socket_t           s;
83653Sigor@sysoev.ru     nxt_listen_socket_t    *ls;
83753Sigor@sysoev.ru     nxt_thread_spinlock_t  *lock;
83853Sigor@sysoev.ru 
83953Sigor@sysoev.ru     s = -1;
84053Sigor@sysoev.ru     ls = &joint->socket_conf->listen;
84153Sigor@sysoev.ru     lock = &joint->socket_conf->router_conf->router->lock;
84253Sigor@sysoev.ru 
84353Sigor@sysoev.ru     nxt_thread_spin_lock(lock);
84453Sigor@sysoev.ru 
84553Sigor@sysoev.ru     if (--ls->count == 0) {
84653Sigor@sysoev.ru         s = ls->socket;
84753Sigor@sysoev.ru         ls->socket = -1;
84853Sigor@sysoev.ru     }
84953Sigor@sysoev.ru 
85053Sigor@sysoev.ru     nxt_thread_spin_unlock(lock);
85153Sigor@sysoev.ru 
85253Sigor@sysoev.ru     if (s != -1) {
85353Sigor@sysoev.ru         nxt_socket_close(task, s);
85453Sigor@sysoev.ru     }
85553Sigor@sysoev.ru 
85653Sigor@sysoev.ru     nxt_router_conf_release(task, joint);
85753Sigor@sysoev.ru }
85853Sigor@sysoev.ru 
85953Sigor@sysoev.ru 
86053Sigor@sysoev.ru static void
86153Sigor@sysoev.ru nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
86253Sigor@sysoev.ru {
86353Sigor@sysoev.ru     nxt_socket_conf_t      *skcf;
86453Sigor@sysoev.ru     nxt_router_conf_t      *rtcf;
86553Sigor@sysoev.ru     nxt_thread_spinlock_t  *lock;
86653Sigor@sysoev.ru 
86753Sigor@sysoev.ru     nxt_debug(task, "conf joint count: %D", joint->count);
86853Sigor@sysoev.ru 
86953Sigor@sysoev.ru     if (--joint->count != 0) {
87053Sigor@sysoev.ru         return;
87153Sigor@sysoev.ru     }
87253Sigor@sysoev.ru 
87353Sigor@sysoev.ru     nxt_queue_remove(&joint->link);
87453Sigor@sysoev.ru 
87553Sigor@sysoev.ru     skcf = joint->socket_conf;
87653Sigor@sysoev.ru     rtcf = skcf->router_conf;
87753Sigor@sysoev.ru     lock = &rtcf->router->lock;
87853Sigor@sysoev.ru 
87953Sigor@sysoev.ru     nxt_thread_spin_lock(lock);
88053Sigor@sysoev.ru 
88153Sigor@sysoev.ru     if (--skcf->count != 0) {
88253Sigor@sysoev.ru         rtcf = NULL;
88353Sigor@sysoev.ru 
88453Sigor@sysoev.ru     } else {
88553Sigor@sysoev.ru         nxt_queue_remove(&skcf->link);
88653Sigor@sysoev.ru 
88753Sigor@sysoev.ru         if (--rtcf->count != 0) {
88853Sigor@sysoev.ru             rtcf = NULL;
88953Sigor@sysoev.ru         }
89053Sigor@sysoev.ru     }
89153Sigor@sysoev.ru 
89253Sigor@sysoev.ru     nxt_thread_spin_unlock(lock);
89353Sigor@sysoev.ru 
89453Sigor@sysoev.ru     if (rtcf != NULL) {
89565Sigor@sysoev.ru         nxt_mp_destroy(rtcf->mem_pool);
89653Sigor@sysoev.ru     }
89753Sigor@sysoev.ru 
89853Sigor@sysoev.ru     if (nxt_queue_is_empty(&joint->engine->joints)) {
89953Sigor@sysoev.ru         nxt_thread_exit(task->thread);
90053Sigor@sysoev.ru     }
90153Sigor@sysoev.ru }
90253Sigor@sysoev.ru 
90353Sigor@sysoev.ru 
90453Sigor@sysoev.ru static void
90553Sigor@sysoev.ru nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
90653Sigor@sysoev.ru {
90753Sigor@sysoev.ru     nxt_thread_link_t    *link;
90853Sigor@sysoev.ru     nxt_event_engine_t   *engine;
90953Sigor@sysoev.ru     nxt_thread_handle_t  handle;
91053Sigor@sysoev.ru 
91158Svbart@nginx.com     handle = (nxt_thread_handle_t) obj;
91253Sigor@sysoev.ru     link = data;
91353Sigor@sysoev.ru 
91453Sigor@sysoev.ru     nxt_thread_wait(handle);
91553Sigor@sysoev.ru 
91653Sigor@sysoev.ru     engine = link->engine;
91753Sigor@sysoev.ru 
91853Sigor@sysoev.ru     nxt_queue_remove(&engine->link);
91953Sigor@sysoev.ru 
92063Sigor@sysoev.ru     nxt_mp_destroy(engine->mem_pool);
92153Sigor@sysoev.ru 
92253Sigor@sysoev.ru     nxt_event_engine_free(engine);
92353Sigor@sysoev.ru 
92453Sigor@sysoev.ru     nxt_free(link);
92553Sigor@sysoev.ru 
92653Sigor@sysoev.ru     // TODO: free port
92753Sigor@sysoev.ru }
92853Sigor@sysoev.ru 
92953Sigor@sysoev.ru 
93062Sigor@sysoev.ru static const nxt_conn_state_t  nxt_router_conn_read_state
93153Sigor@sysoev.ru     nxt_aligned(64) =
93253Sigor@sysoev.ru {
93353Sigor@sysoev.ru     .ready_handler = nxt_router_conn_http_header_parse,
93453Sigor@sysoev.ru     .close_handler = nxt_router_conn_close,
93553Sigor@sysoev.ru     .error_handler = nxt_router_conn_error,
93653Sigor@sysoev.ru 
93753Sigor@sysoev.ru     .timer_handler = nxt_router_conn_timeout,
93853Sigor@sysoev.ru     .timer_value = nxt_router_conn_timeout_value,
93953Sigor@sysoev.ru     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
94053Sigor@sysoev.ru };
94153Sigor@sysoev.ru 
94253Sigor@sysoev.ru 
94353Sigor@sysoev.ru static void
94453Sigor@sysoev.ru nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
94553Sigor@sysoev.ru {
94653Sigor@sysoev.ru     size_t                   size;
94762Sigor@sysoev.ru     nxt_conn_t               *c;
94853Sigor@sysoev.ru     nxt_event_engine_t       *engine;
94953Sigor@sysoev.ru     nxt_socket_conf_joint_t  *joint;
95053Sigor@sysoev.ru 
95153Sigor@sysoev.ru     c = obj;
95253Sigor@sysoev.ru     joint = data;
95353Sigor@sysoev.ru 
95453Sigor@sysoev.ru     nxt_debug(task, "router conn init");
95553Sigor@sysoev.ru 
95653Sigor@sysoev.ru     joint->count++;
95753Sigor@sysoev.ru 
95853Sigor@sysoev.ru     size = joint->socket_conf->header_buffer_size;
95953Sigor@sysoev.ru     c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
96053Sigor@sysoev.ru 
96153Sigor@sysoev.ru     c->socket.data = NULL;
96253Sigor@sysoev.ru 
96353Sigor@sysoev.ru     engine = task->thread->engine;
96453Sigor@sysoev.ru     c->read_work_queue = &engine->fast_work_queue;
96553Sigor@sysoev.ru     c->write_work_queue = &engine->fast_work_queue;
96653Sigor@sysoev.ru 
96753Sigor@sysoev.ru     c->read_state = &nxt_router_conn_read_state;
96853Sigor@sysoev.ru 
96962Sigor@sysoev.ru     nxt_conn_read(engine, c);
97053Sigor@sysoev.ru }
97153Sigor@sysoev.ru 
97253Sigor@sysoev.ru 
97362Sigor@sysoev.ru static const nxt_conn_state_t  nxt_router_conn_write_state
97453Sigor@sysoev.ru     nxt_aligned(64) =
97553Sigor@sysoev.ru {
976*88Smax.romanov@nginx.com     .ready_handler = nxt_router_conn_ready,
97753Sigor@sysoev.ru     .close_handler = nxt_router_conn_close,
97853Sigor@sysoev.ru     .error_handler = nxt_router_conn_error,
97953Sigor@sysoev.ru };
98053Sigor@sysoev.ru 
98153Sigor@sysoev.ru 
98253Sigor@sysoev.ru static void
983*88Smax.romanov@nginx.com nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
984*88Smax.romanov@nginx.com {
985*88Smax.romanov@nginx.com     size_t               dump_size;
986*88Smax.romanov@nginx.com     nxt_buf_t            *b, *i, *last;
987*88Smax.romanov@nginx.com     nxt_conn_t           *c;
988*88Smax.romanov@nginx.com     nxt_work_queue_t     *wq;
989*88Smax.romanov@nginx.com     nxt_req_conn_link_t  *rc;
990*88Smax.romanov@nginx.com     nxt_event_engine_t   *engine;
991*88Smax.romanov@nginx.com 
992*88Smax.romanov@nginx.com     b = msg->buf;
993*88Smax.romanov@nginx.com     engine = task->thread->engine;
994*88Smax.romanov@nginx.com     wq = &engine->fast_work_queue;
995*88Smax.romanov@nginx.com 
996*88Smax.romanov@nginx.com     rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
997*88Smax.romanov@nginx.com     if (nxt_slow_path(rc == NULL)) {
998*88Smax.romanov@nginx.com 
999*88Smax.romanov@nginx.com         nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
1000*88Smax.romanov@nginx.com 
1001*88Smax.romanov@nginx.com         /* complete buffer(s) */
1002*88Smax.romanov@nginx.com         for (i = b; i != NULL; i = i->next) {
1003*88Smax.romanov@nginx.com             i->mem.pos = i->mem.free;
1004*88Smax.romanov@nginx.com 
1005*88Smax.romanov@nginx.com             nxt_work_queue_add(wq, i->completion_handler, task, i, i->parent);
1006*88Smax.romanov@nginx.com         }
1007*88Smax.romanov@nginx.com 
1008*88Smax.romanov@nginx.com         return;
1009*88Smax.romanov@nginx.com     }
1010*88Smax.romanov@nginx.com 
1011*88Smax.romanov@nginx.com     c = rc->conn;
1012*88Smax.romanov@nginx.com 
1013*88Smax.romanov@nginx.com     dump_size = nxt_buf_used_size(b);
1014*88Smax.romanov@nginx.com 
1015*88Smax.romanov@nginx.com     if (dump_size > 300) {
1016*88Smax.romanov@nginx.com         dump_size = 300;
1017*88Smax.romanov@nginx.com     }
1018*88Smax.romanov@nginx.com 
1019*88Smax.romanov@nginx.com     nxt_debug(task, "%srouter data (%z): %*s",
1020*88Smax.romanov@nginx.com               msg->port_msg.last ? "last " : "", msg->size, dump_size,
1021*88Smax.romanov@nginx.com               b->mem.pos);
1022*88Smax.romanov@nginx.com 
1023*88Smax.romanov@nginx.com     if (msg->size == 0) {
1024*88Smax.romanov@nginx.com         b = NULL;
1025