nxt_router.c (113:b0148ec28c4d) nxt_router.c (115:bef7c075837b)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
9#include <nxt_application.h>
10
11
10#include <nxt_application.h>
11
12
13typedef struct {
14 nxt_str_t application_type;
15 uint32_t application_workers;
16} nxt_router_listener_conf_t;
17
18
12static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
13 nxt_router_t *router);
14static void nxt_router_listen_sockets_sort(nxt_router_t *router,
15 nxt_router_temp_conf_t *tmcf);
16
19static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
20 nxt_router_t *router);
21static void nxt_router_listen_sockets_sort(nxt_router_t *router,
22 nxt_router_temp_conf_t *tmcf);
23
17static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
18 nxt_router_temp_conf_t *tmcf);
24static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
25 nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
19static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
20 nxt_router_temp_conf_t *tmcf);
21static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
22 nxt_sockaddr_t *sa);
26static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
27 nxt_router_temp_conf_t *tmcf);
28static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
29 nxt_sockaddr_t *sa);
23static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
24 nxt_mp_t *mp, uint32_t port);
25
26static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
27 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
28 const nxt_event_interface_t *interface);
30
31static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
32 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
33 const nxt_event_interface_t *interface);
29static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
30 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
31static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
32 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
33static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
34 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
35static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
34static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
35 nxt_router_engine_conf_t *recf);
36static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
37 nxt_router_engine_conf_t *recf);
38static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
39 nxt_router_engine_conf_t *recf);
40static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
41static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp,
36 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
37 nxt_work_handler_t handler);
42 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
43 nxt_work_handler_t handler);
38static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
39 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
44static nxt_int_t nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf,
45 nxt_queue_t *sockets);
40
41static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
42 nxt_router_temp_conf_t *tmcf);
43static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
44 nxt_event_engine_t *engine);
45
46static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
47static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);

--- 25 unchanged lines hidden (view full) ---

73static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
74static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
75static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
76
77
78nxt_int_t
79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
80{
46
47static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
48 nxt_router_temp_conf_t *tmcf);
49static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
50 nxt_event_engine_t *engine);
51
52static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
53static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);

--- 25 unchanged lines hidden (view full) ---

79static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
80static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
81static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
82
83
84nxt_int_t
85nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
86{
81 nxt_int_t ret;
82 nxt_router_t *router;
83 nxt_router_temp_conf_t *tmcf;
84 const nxt_event_interface_t *interface;
87 nxt_int_t ret;
88 nxt_router_t *router;
85
86 ret = nxt_app_http_init(task, rt);
87 if (nxt_slow_path(ret != NXT_OK)) {
88 return ret;
89 }
90
91 router = nxt_zalloc(sizeof(nxt_router_t));
92 if (nxt_slow_path(router == NULL)) {
93 return NXT_ERROR;
94 }
95
96 nxt_queue_init(&router->engines);
97 nxt_queue_init(&router->sockets);
98
89
90 ret = nxt_app_http_init(task, rt);
91 if (nxt_slow_path(ret != NXT_OK)) {
92 return ret;
93 }
94
95 router = nxt_zalloc(sizeof(nxt_router_t));
96 if (nxt_slow_path(router == NULL)) {
97 return NXT_ERROR;
98 }
99
100 nxt_queue_init(&router->engines);
101 nxt_queue_init(&router->sockets);
102
99 /**/
103 return NXT_OK;
104}
100
105
106
107nxt_int_t
108nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router,
109 u_char *start, u_char *end)
110{
111 nxt_int_t ret;
112 nxt_router_temp_conf_t *tmcf;
113 const nxt_event_interface_t *interface;
114
101 tmcf = nxt_router_temp_conf(task, router);
102 if (nxt_slow_path(tmcf == NULL)) {
103 return NXT_ERROR;
104 }
105
115 tmcf = nxt_router_temp_conf(task, router);
116 if (nxt_slow_path(tmcf == NULL)) {
117 return NXT_ERROR;
118 }
119
106 ret = nxt_router_stub_conf(task, tmcf);
120 ret = nxt_router_conf_create(task, tmcf, start, end);
107 if (nxt_slow_path(ret != NXT_OK)) {
108 return ret;
109 }
110
111 nxt_router_listen_sockets_sort(router, tmcf);
112
113 ret = nxt_router_listen_sockets_stub_create(task, tmcf);
114 if (nxt_slow_path(ret != NXT_OK)) {

--- 12 unchanged lines hidden (view full) ---

127 return ret;
128 }
129
130 nxt_router_engines_post(tmcf);
131
132 nxt_queue_add(&router->sockets, &tmcf->updating);
133 nxt_queue_add(&router->sockets, &tmcf->creating);
134
121 if (nxt_slow_path(ret != NXT_OK)) {
122 return ret;
123 }
124
125 nxt_router_listen_sockets_sort(router, tmcf);
126
127 ret = nxt_router_listen_sockets_stub_create(task, tmcf);
128 if (nxt_slow_path(ret != NXT_OK)) {

--- 12 unchanged lines hidden (view full) ---

141 return ret;
142 }
143
144 nxt_router_engines_post(tmcf);
145
146 nxt_queue_add(&router->sockets, &tmcf->updating);
147 nxt_queue_add(&router->sockets, &tmcf->creating);
148
149// nxt_mp_destroy(tmcf->mem_pool);
150
135 return NXT_OK;
136}
137
138
139static nxt_router_temp_conf_t *
140nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
141{
142 nxt_mp_t *mp, *tmp;

--- 48 unchanged lines hidden (view full) ---

191fail:
192
193 nxt_mp_destroy(mp);
194
195 return NULL;
196}
197
198
151 return NXT_OK;
152}
153
154
155static nxt_router_temp_conf_t *
156nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
157{
158 nxt_mp_t *mp, *tmp;

--- 48 unchanged lines hidden (view full) ---

207fail:
208
209 nxt_mp_destroy(mp);
210
211 return NULL;
212}
213
214
215static nxt_conf_map_t nxt_router_conf[] = {
216 {
217 nxt_string("threads"),
218 NXT_CONF_MAP_INT32,
219 offsetof(nxt_router_conf_t, threads),
220 },
221
222 {
223 nxt_null_string, 0, 0,
224 },
225};
226
227
228static nxt_conf_map_t nxt_router_listener_conf[] = {
229 {
230 nxt_string("_application_type"),
231 NXT_CONF_MAP_STR,
232 offsetof(nxt_router_listener_conf_t, application_type),
233 },
234
235 {
236 nxt_string("_application_workers"),
237 NXT_CONF_MAP_INT32,
238 offsetof(nxt_router_listener_conf_t, application_workers),
239 },
240
241 {
242 nxt_null_string, 0, 0,
243 },
244};
245
246
247static nxt_conf_map_t nxt_router_http_conf[] = {
248 {
249 nxt_string("header_buffer_size"),
250 NXT_CONF_MAP_SIZE,
251 offsetof(nxt_socket_conf_t, header_buffer_size),
252 },
253
254 {
255 nxt_string("large_header_buffer_size"),
256 NXT_CONF_MAP_SIZE,
257 offsetof(nxt_socket_conf_t, large_header_buffer_size),
258 },
259
260 {
261 nxt_string("header_read_timeout"),
262 NXT_CONF_MAP_MSEC,
263 offsetof(nxt_socket_conf_t, header_read_timeout),
264 },
265
266 {
267 nxt_null_string, 0, 0,
268 },
269};
270
271
199static nxt_int_t
272static nxt_int_t
200nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
273nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
274 u_char *start, u_char *end)
201{
275{
202 nxt_mp_t *mp;
203 nxt_sockaddr_t *sa;
204 nxt_socket_conf_t *skcf;
276 nxt_mp_t *mp;
277 uint32_t next;
278 nxt_int_t ret;
279 nxt_str_t name;
280 nxt_sockaddr_t *sa;
281 nxt_conf_value_t *conf, *listeners, *router, *http, *listener;
282 nxt_socket_conf_t *skcf;
283 nxt_router_listener_conf_t lscf;
205
284
206 tmcf->conf->threads = 1;
285 static nxt_str_t router_path = nxt_string("/router");
286 static nxt_str_t http_path = nxt_string("/http");
287 static nxt_str_t listeners_path = nxt_string("/listeners");
207
288
289 conf = nxt_conf_json_parse(tmcf->mem_pool, start, end);
290 if (conf == NULL) {
291 nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
292 return NXT_ERROR;
293 }
294
295 router = nxt_conf_get_path(conf, &router_path);
296
297 if (router == NULL) {
298 nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block");
299 return NXT_ERROR;
300 }
301
302 ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf);
303 if (ret != NXT_OK) {
304 nxt_log(task, NXT_LOG_CRIT, "router map error");
305 return NXT_ERROR;
306 }
307
308 http = nxt_conf_get_path(conf, &http_path);
309
310 if (http == NULL) {
311 nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block");
312 return NXT_ERROR;
313 }
314
315 listeners = nxt_conf_get_path(conf, &listeners_path);
316
317 if (listeners == NULL) {
318 nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block");
319 return NXT_ERROR;
320 }
321
208 mp = tmcf->conf->mem_pool;
209
322 mp = tmcf->conf->mem_pool;
323
210 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
211 skcf = nxt_router_socket_conf(task, mp, sa);
324 next = 0;
212
325
213 skcf->listen.handler = nxt_router_conn_init;
214 skcf->header_buffer_size = 2048;
215 skcf->large_header_buffer_size = 8192;
216 skcf->header_read_timeout = 5000;
326 for ( ;; ) {
327 listener = nxt_conf_next_object_member(listeners, &name, &next);
328 if (listener == NULL) {
329 break;
330 }
217
331
218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
332 sa = nxt_sockaddr_parse(mp, &name);
333 if (sa == NULL) {
334 nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
335 return NXT_ERROR;
336 }
219
337
220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
221 skcf = nxt_router_socket_conf(task, mp, sa);
338 sa->type = SOCK_STREAM;
222
339
223 skcf->listen.handler = nxt_stream_connection_init;
224 skcf->header_read_timeout = 5000;
340 nxt_debug(task, "router listener: \"%*s\"",
341 sa->length, nxt_sockaddr_start(sa));
225
342
226 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
343 skcf = nxt_router_socket_conf(task, mp, sa);
344 if (skcf == NULL) {
345 return NXT_ERROR;
346 }
227
347
348 ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf);
349 if (ret != NXT_OK) {
350 nxt_log(task, NXT_LOG_CRIT, "listener map error");
351 return NXT_ERROR;
352 }
353
354 nxt_debug(task, "router type: %V", &lscf.application_type);
355 nxt_debug(task, "router workers: %D", lscf.application_workers);
356
357 ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf);
358 if (ret != NXT_OK) {
359 nxt_log(task, NXT_LOG_CRIT, "http map error");
360 return NXT_ERROR;
361 }
362
363 skcf->listen.handler = nxt_router_conn_init;
364 skcf->router_conf = tmcf->conf;
365
366 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
367 }
368
228 return NXT_OK;
229}
230
231
232static nxt_socket_conf_t *
233nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
234{
235 nxt_socket_conf_t *conf;
236
237 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
238 if (nxt_slow_path(conf == NULL)) {
239 return NULL;
240 }
241
369 return NXT_OK;
370}
371
372
373static nxt_socket_conf_t *
374nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
375{
376 nxt_socket_conf_t *conf;
377
378 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
379 if (nxt_slow_path(conf == NULL)) {
380 return NULL;
381 }
382
383 conf->sockaddr = sa;
384
242 conf->listen.sockaddr = sa;
243 conf->listen.socklen = sa->socklen;
244 conf->listen.address_length = sa->length;
245
246 conf->listen.socket = -1;
247 conf->listen.backlog = NXT_LISTEN_BACKLOG;
248 conf->listen.flags = NXT_NONBLOCK;
249 conf->listen.read_after_accept = 1;
250
251 return conf;
252}
253
254
385 conf->listen.sockaddr = sa;
386 conf->listen.socklen = sa->socklen;
387 conf->listen.address_length = sa->length;
388
389 conf->listen.socket = -1;
390 conf->listen.backlog = NXT_LISTEN_BACKLOG;
391 conf->listen.flags = NXT_NONBLOCK;
392 conf->listen.read_after_accept = 1;
393
394 return conf;
395}
396
397
255static nxt_sockaddr_t *
256nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port)
257{
258 nxt_sockaddr_t *sa;
259 struct sockaddr_in sin;
260
261 nxt_memzero(&sin, sizeof(struct sockaddr_in));
262
263 sin.sin_family = AF_INET;
264 sin.sin_port = htons(port);
265
266 sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
267 sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
268 if (nxt_slow_path(sa == NULL)) {
269 return NULL;
270 }
271
272 sa->type = SOCK_STREAM;
273
274 nxt_sockaddr_text(sa);
275
276 return sa;
277}
278
279
280static void
281nxt_router_listen_sockets_sort(nxt_router_t *router,
282 nxt_router_temp_conf_t *tmcf)
283{
284 nxt_queue_link_t *nqlk, *oqlk, *next;
285 nxt_socket_conf_t *nskcf, *oskcf;
286
287 for (nqlk = nxt_queue_first(&tmcf->pending);

--- 4 unchanged lines hidden (view full) ---

292 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
293
294 for (oqlk = nxt_queue_first(&router->sockets);
295 oqlk != nxt_queue_tail(&router->sockets);
296 oqlk = nxt_queue_next(oqlk))
297 {
298 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
299
398static void
399nxt_router_listen_sockets_sort(nxt_router_t *router,
400 nxt_router_temp_conf_t *tmcf)
401{
402 nxt_queue_link_t *nqlk, *oqlk, *next;
403 nxt_socket_conf_t *nskcf, *oskcf;
404
405 for (nqlk = nxt_queue_first(&tmcf->pending);

--- 4 unchanged lines hidden (view full) ---

410 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
411
412 for (oqlk = nxt_queue_first(&router->sockets);
413 oqlk != nxt_queue_tail(&router->sockets);
414 oqlk = nxt_queue_next(oqlk))
415 {
416 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
417
300 if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
301 oskcf->listen.sockaddr))
302 {
418 if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
419 nskcf->socket = oskcf->socket;
420 nskcf->listen.socket = oskcf->listen.socket;
421
303 nxt_queue_remove(oqlk);
304 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
305
306 nxt_queue_remove(nqlk);
307 nxt_queue_insert_tail(&tmcf->updating, nqlk);
308
309 break;
310 }
311 }
312 }
313
314 nxt_queue_add(&tmcf->deleting, &router->sockets);
422 nxt_queue_remove(oqlk);
423 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
424
425 nxt_queue_remove(nqlk);
426 nxt_queue_insert_tail(&tmcf->updating, nqlk);
427
428 break;
429 }
430 }
431 }
432
433 nxt_queue_add(&tmcf->deleting, &router->sockets);
434 nxt_queue_init(&router->sockets);
315}
316
317
318static nxt_int_t
319nxt_router_listen_sockets_stub_create(nxt_task_t *task,
320 nxt_router_temp_conf_t *tmcf)
321{
435}
436
437
438static nxt_int_t
439nxt_router_listen_sockets_stub_create(nxt_task_t *task,
440 nxt_router_temp_conf_t *tmcf)
441{
322 nxt_queue_link_t *qlk, *nqlk;
323 nxt_socket_conf_t *skcf;
442 nxt_int_t ret;
443 nxt_socket_t s;
444 nxt_queue_link_t *qlk, *nqlk;
445 nxt_socket_conf_t *skcf;
446 nxt_router_socket_t *rtsk;
324
325 for (qlk = nxt_queue_first(&tmcf->pending);
326 qlk != nxt_queue_tail(&tmcf->pending);
327 qlk = nqlk)
328 {
447
448 for (qlk = nxt_queue_first(&tmcf->pending);
449 qlk != nxt_queue_tail(&tmcf->pending);
450 qlk = nqlk)
451 {
452 rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
453 if (nxt_slow_path(rtsk == NULL)) {
454 return NXT_ERROR;
455 }
456
457 rtsk->count = 0;
458
329 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
459 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
460 skcf->socket = rtsk;
330
461
331 if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
462 s = nxt_listen_socket_create0(task, skcf->sockaddr, NXT_NONBLOCK);
463 if (nxt_slow_path(s == -1)) {
332 return NXT_ERROR;
333 }
334
464 return NXT_ERROR;
465 }
466
467 ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
468 if (nxt_slow_path(ret != NXT_OK)) {
469 return NXT_ERROR;
470 }
471
472 skcf->listen.socket = s;
473
474 rtsk->fd = s;
475
335 nqlk = nxt_queue_next(qlk);
336 nxt_queue_remove(qlk);
337 nxt_queue_insert_tail(&tmcf->creating, qlk);
338 }
339
340 return NXT_OK;
341}
342
343
344static nxt_int_t
345nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
346 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
347{
476 nqlk = nxt_queue_next(qlk);
477 nxt_queue_remove(qlk);
478 nxt_queue_insert_tail(&tmcf->creating, qlk);
479 }
480
481 return NXT_OK;
482}
483
484
485static nxt_int_t
486nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
487 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
488{
348 nxt_mp_t *mp;
349 nxt_int_t ret;
350 nxt_uint_t n, threads;
351 nxt_queue_link_t *qlk;
352 nxt_router_engine_conf_t *recf;
353
489 nxt_int_t ret;
490 nxt_uint_t n, threads;
491 nxt_queue_link_t *qlk;
492 nxt_router_engine_conf_t *recf;
493
354 mp = tmcf->conf->mem_pool;
355 threads = tmcf->conf->threads;
356
357 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
358 sizeof(nxt_router_engine_conf_t));
359 if (nxt_slow_path(tmcf->engines == NULL)) {
360 return NXT_ERROR;
361 }
362
363 n = 0;
364
365 for (qlk = nxt_queue_first(&router->engines);
366 qlk != nxt_queue_tail(&router->engines);
367 qlk = nxt_queue_next(qlk))
368 {
369 recf = nxt_array_zero_add(tmcf->engines);
370 if (nxt_slow_path(recf == NULL)) {
371 return NXT_ERROR;
372 }
373
494 threads = tmcf->conf->threads;
495
496 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
497 sizeof(nxt_router_engine_conf_t));
498 if (nxt_slow_path(tmcf->engines == NULL)) {
499 return NXT_ERROR;
500 }
501
502 n = 0;
503
504 for (qlk = nxt_queue_first(&router->engines);
505 qlk != nxt_queue_tail(&router->engines);
506 qlk = nxt_queue_next(qlk))
507 {
508 recf = nxt_array_zero_add(tmcf->engines);
509 if (nxt_slow_path(recf == NULL)) {
510 return NXT_ERROR;
511 }
512
374 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
513 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
375 // STUB
376 recf->task = recf->engine->task;
377
378 if (n < threads) {
514 // STUB
515 recf->task = recf->engine->task;
516
517 if (n < threads) {
379 ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
518 ret = nxt_router_engine_conf_update(tmcf, recf);
380
381 } else {
519
520 } else {
382 ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
521 ret = nxt_router_engine_conf_delete(tmcf, recf);
383 }
384
385 if (nxt_slow_path(ret != NXT_OK)) {
386 return ret;
387 }
388
389 n++;
390 }

--- 8 unchanged lines hidden (view full) ---

399
400 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
401 if (nxt_slow_path(recf->engine == NULL)) {
402 return NXT_ERROR;
403 }
404 // STUB
405 recf->task = recf->engine->task;
406
522 }
523
524 if (nxt_slow_path(ret != NXT_OK)) {
525 return ret;
526 }
527
528 n++;
529 }

--- 8 unchanged lines hidden (view full) ---

538
539 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
540 if (nxt_slow_path(recf->engine == NULL)) {
541 return NXT_ERROR;
542 }
543 // STUB
544 recf->task = recf->engine->task;
545
407 ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
546 ret = nxt_router_engine_conf_create(tmcf, recf);
408 if (nxt_slow_path(ret != NXT_OK)) {
409 return ret;
410 }
411
547 if (nxt_slow_path(ret != NXT_OK)) {
548 return ret;
549 }
550
551 nxt_queue_insert_tail(&router->engines, &recf->engine->link0);
552
412 n++;
413 }
414
415 return NXT_OK;
416}
417
418
419static nxt_int_t
553 n++;
554 }
555
556 return NXT_OK;
557}
558
559
560static nxt_int_t
420nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
421 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
561nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
562 nxt_router_engine_conf_t *recf)
422{
563{
423 nxt_int_t ret;
564 nxt_mp_t *mp;
565 nxt_int_t ret;
566 nxt_thread_spinlock_t *lock;
424
425 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
426 if (nxt_slow_path(recf->creating == NULL)) {
427 return NXT_ERROR;
428 }
429
567
568 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
569 if (nxt_slow_path(recf->creating == NULL)) {
570 return NXT_ERROR;
571 }
572
430 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
573 mp = tmcf->conf->mem_pool;
574
575 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating,
431 recf->creating, nxt_router_listen_socket_create);
432 if (nxt_slow_path(ret != NXT_OK)) {
433 return ret;
434 }
435
576 recf->creating, nxt_router_listen_socket_create);
577 if (nxt_slow_path(ret != NXT_OK)) {
578 return ret;
579 }
580
436 return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
581 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating,
437 recf->creating, nxt_router_listen_socket_create);
582 recf->creating, nxt_router_listen_socket_create);
583 if (nxt_slow_path(ret != NXT_OK)) {
584 return ret;
585 }
586
587 lock = &tmcf->conf->router->lock;
588
589 nxt_thread_spin_lock(lock);
590
591 nxt_router_engine_socket_count(&tmcf->creating);
592 nxt_router_engine_socket_count(&tmcf->updating);
593
594 nxt_thread_spin_unlock(lock);
595
596 return ret;
438}
439
440
441static nxt_int_t
597}
598
599
600static nxt_int_t
442nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
443 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
601nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
602 nxt_router_engine_conf_t *recf)
444{
603{
445 nxt_int_t ret;
604 nxt_mp_t *mp;
605 nxt_int_t ret;
606 nxt_thread_spinlock_t *lock;
446
447 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
448 if (nxt_slow_path(recf->creating == NULL)) {
449 return NXT_ERROR;
450 }
451
607
608 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
609 if (nxt_slow_path(recf->creating == NULL)) {
610 return NXT_ERROR;
611 }
612
452 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
613 mp = tmcf->conf->mem_pool;
614
615 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating,
453 recf->creating, nxt_router_listen_socket_create);
454 if (nxt_slow_path(ret != NXT_OK)) {
455 return ret;
456 }
457
458 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
459 if (nxt_slow_path(recf->updating == NULL)) {
460 return NXT_ERROR;
461 }
462
616 recf->creating, nxt_router_listen_socket_create);
617 if (nxt_slow_path(ret != NXT_OK)) {
618 return ret;
619 }
620
621 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
622 if (nxt_slow_path(recf->updating == NULL)) {
623 return NXT_ERROR;
624 }
625
463 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
626 ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating,
464 recf->updating, nxt_router_listen_socket_update);
465 if (nxt_slow_path(ret != NXT_OK)) {
466 return ret;
467 }
468
469 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
470 if (nxt_slow_path(recf->deleting == NULL)) {
471 return NXT_ERROR;
472 }
473
627 recf->updating, nxt_router_listen_socket_update);
628 if (nxt_slow_path(ret != NXT_OK)) {
629 return ret;
630 }
631
632 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
633 if (nxt_slow_path(recf->deleting == NULL)) {
634 return NXT_ERROR;
635 }
636
474 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
475 recf->deleting);
637 ret = nxt_router_engine_joints_delete(recf, &tmcf->deleting);
638 if (nxt_slow_path(ret != NXT_OK)) {
639 return ret;
640 }
641
642 lock = &tmcf->conf->router->lock;
643
644 nxt_thread_spin_lock(lock);
645
646 nxt_router_engine_socket_count(&tmcf->creating);
647
648 nxt_thread_spin_unlock(lock);
649
650 return ret;
476}
477
478
479static nxt_int_t
651}
652
653
654static nxt_int_t
480nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
481 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
655nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
656 nxt_router_engine_conf_t *recf)
482{
483 nxt_int_t ret;
484
485 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
486 if (nxt_slow_path(recf->deleting == NULL)) {
487 return NXT_ERROR;
488 }
489
657{
658 nxt_int_t ret;
659
660 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
661 if (nxt_slow_path(recf->deleting == NULL)) {
662 return NXT_ERROR;
663 }
664
490 ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
491 recf->deleting);
665 ret = nxt_router_engine_joints_delete(recf, &tmcf->updating);
492 if (nxt_slow_path(ret != NXT_OK)) {
493 return ret;
494 }
495
666 if (nxt_slow_path(ret != NXT_OK)) {
667 return ret;
668 }
669
496 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
497 recf->deleting);
670 return nxt_router_engine_joints_delete(recf, &tmcf->deleting);
498}
499
500
501static nxt_int_t
671}
672
673
674static nxt_int_t
502nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
503 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
675nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf,
676 nxt_queue_t *sockets, nxt_array_t *array,
504 nxt_work_handler_t handler)
505{
506 nxt_work_t *work;
507 nxt_queue_link_t *qlk;
508 nxt_socket_conf_joint_t *joint;
509
510 for (qlk = nxt_queue_first(sockets);
511 qlk != nxt_queue_tail(sockets);

--- 14 unchanged lines hidden (view full) ---

526 return NXT_ERROR;
527 }
528
529 work->data = joint;
530
531 joint->count = 1;
532 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
533 joint->engine = recf->engine;
677 nxt_work_handler_t handler)
678{
679 nxt_work_t *work;
680 nxt_queue_link_t *qlk;
681 nxt_socket_conf_joint_t *joint;
682
683 for (qlk = nxt_queue_first(sockets);
684 qlk != nxt_queue_tail(sockets);

--- 14 unchanged lines hidden (view full) ---

699 return NXT_ERROR;
700 }
701
702 work->data = joint;
703
704 joint->count = 1;
705 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
706 joint->engine = recf->engine;
707
708 nxt_queue_insert_tail(&joint->engine->joints, &joint->link);
534 }
535
536 return NXT_OK;
537}
538
539
709 }
710
711 return NXT_OK;
712}
713
714
715static void
716nxt_router_engine_socket_count(nxt_queue_t *sockets)
717{
718 nxt_queue_link_t *qlk;
719 nxt_socket_conf_t *skcf;
720
721 for (qlk = nxt_queue_first(sockets);
722 qlk != nxt_queue_tail(sockets);
723 qlk = nxt_queue_next(qlk))
724 {
725 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
726 skcf->socket->count++;
727 }
728}
729
730
540static nxt_int_t
731static nxt_int_t
541nxt_router_engine_joints_delete(nxt_task_t *task,
542 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
732nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf,
733 nxt_queue_t *sockets)
543{
544 nxt_work_t *work;
545 nxt_queue_link_t *qlk;
546
547 for (qlk = nxt_queue_first(sockets);
548 qlk != nxt_queue_tail(sockets);
549 qlk = nxt_queue_next(qlk))
550 {
734{
735 nxt_work_t *work;
736 nxt_queue_link_t *qlk;
737
738 for (qlk = nxt_queue_first(sockets);
739 qlk != nxt_queue_tail(sockets);
740 qlk = nxt_queue_next(qlk))
741 {
551 work = nxt_array_add(array);
742 work = nxt_array_add(recf->deleting);
552 if (nxt_slow_path(work == NULL)) {
553 return NXT_ERROR;
554 }
555
556 work->next = NULL;
557 work->handler = nxt_router_listen_socket_delete;
558 work->task = &recf->task;
559 work->obj = recf->engine;

--- 46 unchanged lines hidden (view full) ---

606 link->start = nxt_router_thread_start;
607 link->engine = engine;
608 link->work.handler = nxt_router_thread_exit_handler;
609 link->work.task = task;
610 link->work.data = link;
611
612 nxt_queue_insert_tail(&rt->engines, &engine->link);
613
743 if (nxt_slow_path(work == NULL)) {
744 return NXT_ERROR;
745 }
746
747 work->next = NULL;
748 work->handler = nxt_router_listen_socket_delete;
749 work->task = &recf->task;
750 work->obj = recf->engine;

--- 46 unchanged lines hidden (view full) ---

797 link->start = nxt_router_thread_start;
798 link->engine = engine;
799 link->work.handler = nxt_router_thread_exit_handler;
800 link->work.task = task;
801 link->work.data = link;
802
803 nxt_queue_insert_tail(&rt->engines, &engine->link);
804
614
615 process = nxt_runtime_process_find(rt, nxt_pid);
616 if (nxt_slow_path(process == NULL)) {
617 return NXT_ERROR;
618 }
619
620 port = nxt_process_port_new(process);
621 if (nxt_slow_path(port == NULL)) {
622 return NXT_ERROR;

--- 12 unchanged lines hidden (view full) ---

635 port->mem_pool = mp;
636 port->engine = 0;
637 port->type = NXT_PROCESS_ROUTER;
638
639 engine->port = port;
640
641 nxt_runtime_port_add(rt, port);
642
805 process = nxt_runtime_process_find(rt, nxt_pid);
806 if (nxt_slow_path(process == NULL)) {
807 return NXT_ERROR;
808 }
809
810 port = nxt_process_port_new(process);
811 if (nxt_slow_path(port == NULL)) {
812 return NXT_ERROR;

--- 12 unchanged lines hidden (view full) ---

825 port->mem_pool = mp;
826 port->engine = 0;
827 port->type = NXT_PROCESS_ROUTER;
828
829 engine->port = port;
830
831 nxt_runtime_port_add(rt, port);
832
643
644 ret = nxt_thread_create(&handle, link);
645
646 if (nxt_slow_path(ret != NXT_OK)) {
647 nxt_queue_remove(&engine->link);
648 }
649
650 return ret;
651}

--- 15 unchanged lines hidden (view full) ---

667
668
669static void
670nxt_router_engine_post(nxt_router_engine_conf_t *recf)
671{
672 nxt_uint_t n;
673 nxt_work_t *work;
674
833 ret = nxt_thread_create(&handle, link);
834
835 if (nxt_slow_path(ret != NXT_OK)) {
836 nxt_queue_remove(&engine->link);
837 }
838
839 return ret;
840}

--- 15 unchanged lines hidden (view full) ---

856
857
858static void
859nxt_router_engine_post(nxt_router_engine_conf_t *recf)
860{
861 nxt_uint_t n;
862 nxt_work_t *work;
863
675 work = recf->creating->elts;
864 if (recf->creating != NULL) {
865 work = recf->creating->elts;
676
866
677 for (n = recf->creating->nelts; n != 0; n--) {
678 nxt_event_engine_post(recf->engine, work);
679 work++;
867 for (n = recf->creating->nelts; n != 0; n--) {
868 nxt_event_engine_post(recf->engine, work);
869 work++;
870 }
680 }
871 }
872
873 if (recf->updating != NULL) {
874 work = recf->updating->elts;
875
876 for (n = recf->updating->nelts; n != 0; n--) {
877 nxt_event_engine_post(recf->engine, work);
878 work++;
879 }
880 }
881
882 if (recf->deleting != NULL) {
883 work = recf->deleting->elts;
884
885 for (n = recf->deleting->nelts; n != 0; n--) {
886 nxt_event_engine_post(recf->engine, work);
887 work++;
888 }
889 }
681}
682
683
684static void
685nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
686
687nxt_port_handler_t nxt_router_process_port_handlers[] = {
688 NULL,

--- 56 unchanged lines hidden (view full) ---

745 listen->socket.data = joint;
746}
747
748
749nxt_inline nxt_listen_event_t *
750nxt_router_listen_event(nxt_queue_t *listen_connections,
751 nxt_socket_conf_t *skcf)
752{
890}
891
892
893static void
894nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
895
896nxt_port_handler_t nxt_router_process_port_handlers[] = {
897 NULL,

--- 56 unchanged lines hidden (view full) ---

954 listen->socket.data = joint;
955}
956
957
958nxt_inline nxt_listen_event_t *
959nxt_router_listen_event(nxt_queue_t *listen_connections,
960 nxt_socket_conf_t *skcf)
961{
753 nxt_socket_t socket;
754 nxt_queue_link_t *link;
962 nxt_socket_t fd;
963 nxt_queue_link_t *qlk;
755 nxt_listen_event_t *listen;
756
964 nxt_listen_event_t *listen;
965
757 socket = skcf->listen.socket;
966 fd = skcf->socket->fd;
758
967
759 for (link = nxt_queue_first(listen_connections);
760 link != nxt_queue_tail(listen_connections);
761 link = nxt_queue_next(link))
968 for (qlk = nxt_queue_first(listen_connections);
969 qlk != nxt_queue_tail(listen_connections);
970 qlk = nxt_queue_next(qlk))
762 {
971 {
763 listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
972 listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
764
973
765 if (socket == listen->socket.fd) {
974 if (fd == listen->socket.fd) {
766 return listen;
767 }
768 }
769
770 return NULL;
771}
772
773

--- 55 unchanged lines hidden (view full) ---

829 nxt_router_listen_socket_release(task, joint);
830}
831
832
833static void
834nxt_router_listen_socket_release(nxt_task_t *task,
835 nxt_socket_conf_joint_t *joint)
836{
975 return listen;
976 }
977 }
978
979 return NULL;
980}
981
982

--- 55 unchanged lines hidden (view full) ---

1038 nxt_router_listen_socket_release(task, joint);
1039}
1040
1041
1042static void
1043nxt_router_listen_socket_release(nxt_task_t *task,
1044 nxt_socket_conf_joint_t *joint)
1045{
837 nxt_socket_t s;
838 nxt_listen_socket_t *ls;
1046 nxt_router_socket_t *rtsk;
839 nxt_thread_spinlock_t *lock;
840
1047 nxt_thread_spinlock_t *lock;
1048
841 s = -1;
842 ls = &joint->socket_conf->listen;
1049 rtsk = joint->socket_conf->socket;
843 lock = &joint->socket_conf->router_conf->router->lock;
844
845 nxt_thread_spin_lock(lock);
846
1050 lock = &joint->socket_conf->router_conf->router->lock;
1051
1052 nxt_thread_spin_lock(lock);
1053
847 if (--ls->count == 0) {
848 s = ls->socket;
849 ls->socket = -1;
1054 if (--rtsk->count != 0) {
1055 rtsk = NULL;
850 }
851
852 nxt_thread_spin_unlock(lock);
853
1056 }
1057
1058 nxt_thread_spin_unlock(lock);
1059
854 if (s != -1) {
855 nxt_socket_close(task, s);
1060 if (rtsk != NULL) {
1061 nxt_socket_close(task, rtsk->fd);
1062 nxt_free(rtsk);
856 }
857
858 nxt_router_conf_release(task, joint);
859}
860
861
862static void
863nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)

--- 25 unchanged lines hidden (view full) ---

889 if (--rtcf->count != 0) {
890 rtcf = NULL;
891 }
892 }
893
894 nxt_thread_spin_unlock(lock);
895
896 if (rtcf != NULL) {
1063 }
1064
1065 nxt_router_conf_release(task, joint);
1066}
1067
1068
1069static void
1070nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)

--- 25 unchanged lines hidden (view full) ---

1096 if (--rtcf->count != 0) {
1097 rtcf = NULL;
1098 }
1099 }
1100
1101 nxt_thread_spin_unlock(lock);
1102
1103 if (rtcf != NULL) {
1104 nxt_debug(task, "old router conf is destroyed");
897 nxt_mp_destroy(rtcf->mem_pool);
898 }
899
900 if (nxt_queue_is_empty(&joint->engine->joints)) {
901 nxt_thread_exit(task->thread);
902 }
903}
904

--- 494 unchanged lines hidden ---
1105 nxt_mp_destroy(rtcf->mem_pool);
1106 }
1107
1108 if (nxt_queue_is_empty(&joint->engine->joints)) {
1109 nxt_thread_exit(task->thread);
1110 }
1111}
1112

--- 494 unchanged lines hidden ---