nxt_router.c (103:88fc973fd7a2) nxt_router.c (113:b0148ec28c4d)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_application.h>
10
11
12static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
13 nxt_router_t *router);
14static void nxt_router_listen_sockets_sort(nxt_router_t *router,
15 nxt_router_temp_conf_t *tmcf);
16
17static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
18 nxt_router_temp_conf_t *tmcf);
19static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
20 nxt_router_temp_conf_t *tmcf);
21static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
22 nxt_sockaddr_t *sa);
23static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
24 nxt_mp_t *mp, uint32_t port);
25
26static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
27 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
28 const nxt_event_interface_t *interface);
29static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
30 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
31static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
32 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
33static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
34 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
35static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
36 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
37 nxt_work_handler_t handler);
38static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
39 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
40
41static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
42 nxt_router_temp_conf_t *tmcf);
43static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
44 nxt_event_engine_t *engine);
45
46static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
47static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
48
49static void nxt_router_thread_start(void *data);
50static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
51 void *data);
52static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
53 void *data);
54static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
55 void *data);
56static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
57 void *data);
58static void nxt_router_listen_socket_release(nxt_task_t *task,
59 nxt_socket_conf_joint_t *joint);
60static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
61 void *data);
62static void nxt_router_conf_release(nxt_task_t *task,
63 nxt_socket_conf_joint_t *joint);
64
65static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
66static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
67 void *data);
68static void nxt_router_process_http_request(nxt_task_t *task,
69 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
70static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
71static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
73static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
74static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
75static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
76
77
78nxt_int_t
79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
80{
81 nxt_int_t ret;
82 nxt_router_t *router;
83 nxt_router_temp_conf_t *tmcf;
84 const nxt_event_interface_t *interface;
85
86 ret = nxt_app_http_init(task, rt);
87 if (nxt_slow_path(ret != NXT_OK)) {
88 return ret;
89 }
90
91 router = nxt_zalloc(sizeof(nxt_router_t));
92 if (nxt_slow_path(router == NULL)) {
93 return NXT_ERROR;
94 }
95
96 nxt_queue_init(&router->engines);
97 nxt_queue_init(&router->sockets);
98
99 /**/
100
101 tmcf = nxt_router_temp_conf(task, router);
102 if (nxt_slow_path(tmcf == NULL)) {
103 return NXT_ERROR;
104 }
105
106 ret = nxt_router_stub_conf(task, tmcf);
107 if (nxt_slow_path(ret != NXT_OK)) {
108 return ret;
109 }
110
111 nxt_router_listen_sockets_sort(router, tmcf);
112
113 ret = nxt_router_listen_sockets_stub_create(task, tmcf);
114 if (nxt_slow_path(ret != NXT_OK)) {
115 return ret;
116 }
117
118 interface = nxt_service_get(rt->services, "engine", NULL);
119
120 ret = nxt_router_engines_create(task, router, tmcf, interface);
121 if (nxt_slow_path(ret != NXT_OK)) {
122 return ret;
123 }
124
125 ret = nxt_router_threads_create(task, rt, tmcf);
126 if (nxt_slow_path(ret != NXT_OK)) {
127 return ret;
128 }
129
130 nxt_router_engines_post(tmcf);
131
132 nxt_queue_add(&router->sockets, &tmcf->updating);
133 nxt_queue_add(&router->sockets, &tmcf->creating);
134
135 return NXT_OK;
136}
137
138
139static nxt_router_temp_conf_t *
140nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
141{
142 nxt_mp_t *mp, *tmp;
143 nxt_router_conf_t *rtcf;
144 nxt_router_temp_conf_t *tmcf;
145
146 mp = nxt_mp_create(1024, 128, 256, 32);
147 if (nxt_slow_path(mp == NULL)) {
148 return NULL;
149 }
150
151 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
152 if (nxt_slow_path(rtcf == NULL)) {
153 goto fail;
154 }
155
156 rtcf->mem_pool = mp;
157 rtcf->router = router;
158 rtcf->count = 1;
159
160 tmp = nxt_mp_create(1024, 128, 256, 32);
161 if (nxt_slow_path(tmp == NULL)) {
162 goto fail;
163 }
164
165 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
166 if (nxt_slow_path(tmcf == NULL)) {
167 goto temp_fail;
168 }
169
170 tmcf->mem_pool = tmp;
171 tmcf->conf = rtcf;
172
173 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
174 sizeof(nxt_router_engine_conf_t));
175 if (nxt_slow_path(tmcf->engines == NULL)) {
176 goto temp_fail;
177 }
178
179 nxt_queue_init(&tmcf->deleting);
180 nxt_queue_init(&tmcf->keeping);
181 nxt_queue_init(&tmcf->updating);
182 nxt_queue_init(&tmcf->pending);
183 nxt_queue_init(&tmcf->creating);
184
185 return tmcf;
186
187temp_fail:
188
189 nxt_mp_destroy(tmp);
190
191fail:
192
193 nxt_mp_destroy(mp);
194
195 return NULL;
196}
197
198
199static nxt_int_t
200nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
201{
202 nxt_mp_t *mp;
203 nxt_sockaddr_t *sa;
204 nxt_socket_conf_t *skcf;
205
206 tmcf->conf->threads = 1;
207
208 mp = tmcf->conf->mem_pool;
209
210 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
211 skcf = nxt_router_socket_conf(task, mp, sa);
212
213 skcf->listen.handler = nxt_router_conn_init;
214 skcf->header_buffer_size = 2048;
215 skcf->large_header_buffer_size = 8192;
216 skcf->header_read_timeout = 5000;
217
218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
219
220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
221 skcf = nxt_router_socket_conf(task, mp, sa);
222
223 skcf->listen.handler = nxt_stream_connection_init;
224 skcf->header_read_timeout = 5000;
225
226 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
227
228 return NXT_OK;
229}
230
231
232static nxt_socket_conf_t *
233nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
234{
235 nxt_socket_conf_t *conf;
236
237 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
238 if (nxt_slow_path(conf == NULL)) {
239 return NULL;
240 }
241
242 conf->listen.sockaddr = sa;
243 conf->listen.socklen = sa->socklen;
244 conf->listen.address_length = sa->length;
245
246 conf->listen.socket = -1;
247 conf->listen.backlog = NXT_LISTEN_BACKLOG;
248 conf->listen.flags = NXT_NONBLOCK;
249 conf->listen.read_after_accept = 1;
250
251 return conf;
252}
253
254
255static nxt_sockaddr_t *
256nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port)
257{
258 nxt_sockaddr_t *sa;
259 struct sockaddr_in sin;
260
261 nxt_memzero(&sin, sizeof(struct sockaddr_in));
262
263 sin.sin_family = AF_INET;
264 sin.sin_port = htons(port);
265
266 sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
267 sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
268 if (nxt_slow_path(sa == NULL)) {
269 return NULL;
270 }
271
272 sa->type = SOCK_STREAM;
273
274 nxt_sockaddr_text(sa);
275
276 return sa;
277}
278
279
280static void
281nxt_router_listen_sockets_sort(nxt_router_t *router,
282 nxt_router_temp_conf_t *tmcf)
283{
284 nxt_queue_link_t *nqlk, *oqlk, *next;
285 nxt_socket_conf_t *nskcf, *oskcf;
286
287 for (nqlk = nxt_queue_first(&tmcf->pending);
288 nqlk != nxt_queue_tail(&tmcf->pending);
289 nqlk = next)
290 {
291 next = nxt_queue_next(nqlk);
292 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
293
294 for (oqlk = nxt_queue_first(&router->sockets);
295 oqlk != nxt_queue_tail(&router->sockets);
296 oqlk = nxt_queue_next(oqlk))
297 {
298 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
299
300 if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
301 oskcf->listen.sockaddr))
302 {
303 nxt_queue_remove(oqlk);
304 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
305
306 nxt_queue_remove(nqlk);
307 nxt_queue_insert_tail(&tmcf->updating, nqlk);
308
309 break;
310 }
311 }
312 }
313
314 nxt_queue_add(&tmcf->deleting, &router->sockets);
315}
316
317
318static nxt_int_t
319nxt_router_listen_sockets_stub_create(nxt_task_t *task,
320 nxt_router_temp_conf_t *tmcf)
321{
322 nxt_queue_link_t *qlk, *nqlk;
323 nxt_socket_conf_t *skcf;
324
325 for (qlk = nxt_queue_first(&tmcf->pending);
326 qlk != nxt_queue_tail(&tmcf->pending);
327 qlk = nqlk)
328 {
329 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
330
331 if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
332 return NXT_ERROR;
333 }
334
335 nqlk = nxt_queue_next(qlk);
336 nxt_queue_remove(qlk);
337 nxt_queue_insert_tail(&tmcf->creating, qlk);
338 }
339
340 return NXT_OK;
341}
342
343
344static nxt_int_t
345nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
346 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
347{
348 nxt_mp_t *mp;
349 nxt_int_t ret;
350 nxt_uint_t n, threads;
351 nxt_queue_link_t *qlk;
352 nxt_router_engine_conf_t *recf;
353
354 mp = tmcf->conf->mem_pool;
355 threads = tmcf->conf->threads;
356
357 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
358 sizeof(nxt_router_engine_conf_t));
359 if (nxt_slow_path(tmcf->engines == NULL)) {
360 return NXT_ERROR;
361 }
362
363 n = 0;
364
365 for (qlk = nxt_queue_first(&router->engines);
366 qlk != nxt_queue_tail(&router->engines);
367 qlk = nxt_queue_next(qlk))
368 {
369 recf = nxt_array_zero_add(tmcf->engines);
370 if (nxt_slow_path(recf == NULL)) {
371 return NXT_ERROR;
372 }
373
374 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
375 // STUB
376 recf->task = recf->engine->task;
377
378 if (n < threads) {
379 ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
380
381 } else {
382 ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
383 }
384
385 if (nxt_slow_path(ret != NXT_OK)) {
386 return ret;
387 }
388
389 n++;
390 }
391
392 tmcf->new_threads = n;
393
394 while (n < threads) {
395 recf = nxt_array_zero_add(tmcf->engines);
396 if (nxt_slow_path(recf == NULL)) {
397 return NXT_ERROR;
398 }
399
400 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
401 if (nxt_slow_path(recf->engine == NULL)) {
402 return NXT_ERROR;
403 }
404 // STUB
405 recf->task = recf->engine->task;
406
407 ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
408 if (nxt_slow_path(ret != NXT_OK)) {
409 return ret;
410 }
411
412 n++;
413 }
414
415 return NXT_OK;
416}
417
418
419static nxt_int_t
420nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
421 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
422{
423 nxt_int_t ret;
424
425 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
426 if (nxt_slow_path(recf->creating == NULL)) {
427 return NXT_ERROR;
428 }
429
430 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
431 recf->creating, nxt_router_listen_socket_create);
432 if (nxt_slow_path(ret != NXT_OK)) {
433 return ret;
434 }
435
436 return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
437 recf->creating, nxt_router_listen_socket_create);
438}
439
440
441static nxt_int_t
442nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
443 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
444{
445 nxt_int_t ret;
446
447 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
448 if (nxt_slow_path(recf->creating == NULL)) {
449 return NXT_ERROR;
450 }
451
452 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
453 recf->creating, nxt_router_listen_socket_create);
454 if (nxt_slow_path(ret != NXT_OK)) {
455 return ret;
456 }
457
458 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
459 if (nxt_slow_path(recf->updating == NULL)) {
460 return NXT_ERROR;
461 }
462
463 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
464 recf->updating, nxt_router_listen_socket_update);
465 if (nxt_slow_path(ret != NXT_OK)) {
466 return ret;
467 }
468
469 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
470 if (nxt_slow_path(recf->deleting == NULL)) {
471 return NXT_ERROR;
472 }
473
474 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
475 recf->deleting);
476}
477
478
479static nxt_int_t
480nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
481 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
482{
483 nxt_int_t ret;
484
485 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
486 if (nxt_slow_path(recf->deleting == NULL)) {
487 return NXT_ERROR;
488 }
489
490 ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
491 recf->deleting);
492 if (nxt_slow_path(ret != NXT_OK)) {
493 return ret;
494 }
495
496 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
497 recf->deleting);
498}
499
500
501static nxt_int_t
502nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
503 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
504 nxt_work_handler_t handler)
505{
506 nxt_work_t *work;
507 nxt_queue_link_t *qlk;
508 nxt_socket_conf_joint_t *joint;
509
510 for (qlk = nxt_queue_first(sockets);
511 qlk != nxt_queue_tail(sockets);
512 qlk = nxt_queue_next(qlk))
513 {
514 work = nxt_array_add(array);
515 if (nxt_slow_path(work == NULL)) {
516 return NXT_ERROR;
517 }
518
519 work->next = NULL;
520 work->handler = handler;
521 work->task = &recf->task;
522 work->obj = recf->engine;
523
524 joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t));
525 if (nxt_slow_path(joint == NULL)) {
526 return NXT_ERROR;
527 }
528
529 work->data = joint;
530
531 joint->count = 1;
532 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
533 joint->engine = recf->engine;
534 }
535
536 return NXT_OK;
537}
538
539
540static nxt_int_t
541nxt_router_engine_joints_delete(nxt_task_t *task,
542 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
543{
544 nxt_work_t *work;
545 nxt_queue_link_t *qlk;
546
547 for (qlk = nxt_queue_first(sockets);
548 qlk != nxt_queue_tail(sockets);
549 qlk = nxt_queue_next(qlk))
550 {
551 work = nxt_array_add(array);
552 if (nxt_slow_path(work == NULL)) {
553 return NXT_ERROR;
554 }
555
556 work->next = NULL;
557 work->handler = nxt_router_listen_socket_delete;
558 work->task = &recf->task;
559 work->obj = recf->engine;
560 work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
561 }
562
563 return NXT_OK;
564}
565
566
567static nxt_int_t
568nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
569 nxt_router_temp_conf_t *tmcf)
570{
571 nxt_int_t ret;
572 nxt_uint_t i, threads;
573 nxt_router_engine_conf_t *recf;
574
575 recf = tmcf->engines->elts;
576 threads = tmcf->conf->threads;
577
578 for (i = tmcf->new_threads; i < threads; i++) {
579 ret = nxt_router_thread_create(task, rt, recf[i].engine);
580 if (nxt_slow_path(ret != NXT_OK)) {
581 return ret;
582 }
583 }
584
585 return NXT_OK;
586}
587
588
589static nxt_int_t
590nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
591 nxt_event_engine_t *engine)
592{
593 nxt_mp_t *mp;
594 nxt_int_t ret;
595 nxt_port_t *port;
596 nxt_process_t *process;
597 nxt_thread_link_t *link;
598 nxt_thread_handle_t handle;
599
600 link = nxt_zalloc(sizeof(nxt_thread_link_t));
601
602 if (nxt_slow_path(link == NULL)) {
603 return NXT_ERROR;
604 }
605
606 link->start = nxt_router_thread_start;
607 link->engine = engine;
608 link->work.handler = nxt_router_thread_exit_handler;
609 link->work.task = task;
610 link->work.data = link;
611
612 nxt_queue_insert_tail(&rt->engines, &engine->link);
613
614
615 process = nxt_runtime_process_find(rt, nxt_pid);
616 if (nxt_slow_path(process == NULL)) {
617 return NXT_ERROR;
618 }
619
620 port = nxt_process_port_new(process);
621 if (nxt_slow_path(port == NULL)) {
622 return NXT_ERROR;
623 }
624
625 ret = nxt_port_socket_init(task, port, 0);
626 if (nxt_slow_path(ret != NXT_OK)) {
627 return ret;
628 }
629
630 mp = nxt_mp_create(1024, 128, 256, 32);
631 if (nxt_slow_path(mp == NULL)) {
632 return NXT_ERROR;
633 }
634
635 port->mem_pool = mp;
636 port->engine = 0;
637 port->type = NXT_PROCESS_ROUTER;
638
639 engine->port = port;
640
641 nxt_runtime_port_add(rt, port);
642
643
644 ret = nxt_thread_create(&handle, link);
645
646 if (nxt_slow_path(ret != NXT_OK)) {
647 nxt_queue_remove(&engine->link);
648 }
649
650 return ret;
651}
652
653
654static void
655nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
656{
657 nxt_uint_t n;
658 nxt_router_engine_conf_t *recf;
659
660 recf = tmcf->engines->elts;
661
662 for (n = tmcf->engines->nelts; n != 0; n--) {
663 nxt_router_engine_post(recf);
664 recf++;
665 }
666}
667
668
669static void
670nxt_router_engine_post(nxt_router_engine_conf_t *recf)
671{
672 nxt_uint_t n;
673 nxt_work_t *work;
674
675 work = recf->creating->elts;
676
677 for (n = recf->creating->nelts; n != 0; n--) {
678 nxt_event_engine_post(recf->engine, work);
679 work++;
680 }
681}
682
683
684static void
685nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
686
687nxt_port_handler_t nxt_router_process_port_handlers[] = {
688 NULL,
689 nxt_port_new_port_handler,
690 nxt_port_change_log_file_handler,
691 nxt_port_mmap_handler,
692 nxt_router_data_handler,
693};
694
695
696static void
697nxt_router_thread_start(void *data)
698{
699 nxt_task_t *task;
700 nxt_thread_t *thread;
701 nxt_thread_link_t *link;
702 nxt_event_engine_t *engine;
703
704 link = data;
705 engine = link->engine;
706 task = &engine->task;
707
708 thread = nxt_thread();
709
710 /* STUB */
711 thread->runtime = engine->task.thread->runtime;
712
713 engine->task.thread = thread;
714 engine->task.log = thread->log;
715 thread->engine = engine;
716 thread->task = &engine->task;
717 thread->fiber = &engine->fibers->fiber;
718
719 engine->port->socket.task = task;
720 nxt_port_create(task, engine->port, nxt_router_process_port_handlers);
721
722 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
723
724 nxt_event_engine_start(engine);
725}
726
727
728static void
729nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
730{
731 nxt_listen_event_t *listen;
732 nxt_listen_socket_t *ls;
733 nxt_socket_conf_joint_t *joint;
734
735 joint = data;
736
737 ls = &joint->socket_conf->listen;
738
739 listen = nxt_listen_event(task, ls);
740 if (nxt_slow_path(listen == NULL)) {
741 nxt_router_listen_socket_release(task, joint);
742 return;
743 }
744
745 listen->socket.data = joint;
746}
747
748
749nxt_inline nxt_listen_event_t *
750nxt_router_listen_event(nxt_queue_t *listen_connections,
751 nxt_socket_conf_t *skcf)
752{
753 nxt_socket_t socket;
754 nxt_queue_link_t *link;
755 nxt_listen_event_t *listen;
756
757 socket = skcf->listen.socket;
758
759 for (link = nxt_queue_first(listen_connections);
760 link != nxt_queue_tail(listen_connections);
761 link = nxt_queue_next(link))
762 {
763 listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
764
765 if (socket == listen->socket.fd) {
766 return listen;
767 }
768 }
769
770 return NULL;
771}
772
773
774static void
775nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
776{
777 nxt_event_engine_t *engine;
778 nxt_listen_event_t *listen;
779 nxt_socket_conf_joint_t *joint, *old;
780
781 engine = obj;
782 joint = data;
783
784 listen = nxt_router_listen_event(&engine->listen_connections,
785 joint->socket_conf);
786
787 old = listen->socket.data;
788 listen->socket.data = joint;
789
790 nxt_router_conf_release(task, old);
791}
792
793
794static void
795nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
796{
797 nxt_socket_conf_t *skcf;
798 nxt_listen_event_t *listen;
799 nxt_event_engine_t *engine;
800
801 engine = obj;
802 skcf = data;
803
804 listen = nxt_router_listen_event(&engine->listen_connections, skcf);
805
806 nxt_fd_event_delete(engine, &listen->socket);
807
808 listen->timer.handler = nxt_router_listen_socket_close;
809 listen->timer.work_queue = &engine->fast_work_queue;
810
811 nxt_timer_add(engine, &listen->timer, 0);
812}
813
814
815static void
816nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
817{
818 nxt_timer_t *timer;
819 nxt_listen_event_t *listen;
820 nxt_socket_conf_joint_t *joint;
821
822 timer = obj;
823 listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
824 joint = listen->socket.data;
825
826 nxt_queue_remove(&listen->link);
827 nxt_free(listen);
828
829 nxt_router_listen_socket_release(task, joint);
830}
831
832
833static void
834nxt_router_listen_socket_release(nxt_task_t *task,
835 nxt_socket_conf_joint_t *joint)
836{
837 nxt_socket_t s;
838 nxt_listen_socket_t *ls;
839 nxt_thread_spinlock_t *lock;
840
841 s = -1;
842 ls = &joint->socket_conf->listen;
843 lock = &joint->socket_conf->router_conf->router->lock;
844
845 nxt_thread_spin_lock(lock);
846
847 if (--ls->count == 0) {
848 s = ls->socket;
849 ls->socket = -1;
850 }
851
852 nxt_thread_spin_unlock(lock);
853
854 if (s != -1) {
855 nxt_socket_close(task, s);
856 }
857
858 nxt_router_conf_release(task, joint);
859}
860
861
862static void
863nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
864{
865 nxt_socket_conf_t *skcf;
866 nxt_router_conf_t *rtcf;
867 nxt_thread_spinlock_t *lock;
868
869 nxt_debug(task, "conf joint count: %D", joint->count);
870
871 if (--joint->count != 0) {
872 return;
873 }
874
875 nxt_queue_remove(&joint->link);
876
877 skcf = joint->socket_conf;
878 rtcf = skcf->router_conf;
879 lock = &rtcf->router->lock;
880
881 nxt_thread_spin_lock(lock);
882
883 if (--skcf->count != 0) {
884 rtcf = NULL;
885
886 } else {
887 nxt_queue_remove(&skcf->link);
888
889 if (--rtcf->count != 0) {
890 rtcf = NULL;
891 }
892 }
893
894 nxt_thread_spin_unlock(lock);
895
896 if (rtcf != NULL) {
897 nxt_mp_destroy(rtcf->mem_pool);
898 }
899
900 if (nxt_queue_is_empty(&joint->engine->joints)) {
901 nxt_thread_exit(task->thread);
902 }
903}
904
905
906static void
907nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
908{
909 nxt_thread_link_t *link;
910 nxt_event_engine_t *engine;
911 nxt_thread_handle_t handle;
912
913 handle = (nxt_thread_handle_t) obj;
914 link = data;
915
916 nxt_thread_wait(handle);
917
918 engine = link->engine;
919
920 nxt_queue_remove(&engine->link);
921
922 nxt_mp_destroy(engine->mem_pool);
923
924 nxt_event_engine_free(engine);
925
926 nxt_free(link);
927
928 // TODO: free port
929}
930
931
932static const nxt_conn_state_t nxt_router_conn_read_state
933 nxt_aligned(64) =
934{
935 .ready_handler = nxt_router_conn_http_header_parse,
936 .close_handler = nxt_router_conn_close,
937 .error_handler = nxt_router_conn_error,
938
939 .timer_handler = nxt_router_conn_timeout,
940 .timer_value = nxt_router_conn_timeout_value,
941 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
942};
943
944
945static void
946nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
947{
948 size_t size;
949 nxt_conn_t *c;
950 nxt_event_engine_t *engine;
951 nxt_socket_conf_joint_t *joint;
952
953 c = obj;
954 joint = data;
955
956 nxt_debug(task, "router conn init");
957
958 joint->count++;
959
960 size = joint->socket_conf->header_buffer_size;
961 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
962
963 c->socket.data = NULL;
964
965 engine = task->thread->engine;
966 c->read_work_queue = &engine->fast_work_queue;
967 c->write_work_queue = &engine->fast_work_queue;
968
969 c->read_state = &nxt_router_conn_read_state;
970
971 nxt_conn_read(engine, c);
972}
973
974
975static const nxt_conn_state_t nxt_router_conn_write_state
976 nxt_aligned(64) =
977{
978 .ready_handler = nxt_router_conn_ready,
979 .close_handler = nxt_router_conn_close,
980 .error_handler = nxt_router_conn_error,
981};
982
983
984static void
985nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
986{
987 size_t dump_size;
988 nxt_buf_t *b, *i, *last;
989 nxt_conn_t *c;
990 nxt_req_conn_link_t *rc;
991 nxt_event_engine_t *engine;
992
993 b = msg->buf;
994 engine = task->thread->engine;
995
996 rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
997 if (nxt_slow_path(rc == NULL)) {
998
999 nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
1000
1001 /* Mark buffers as read. */
1002 for (i = b; i != NULL; i = i->next) {
1003 i->mem.pos = i->mem.free;
1004 }
1005
1006 return;
1007 }
1008
1009 c = rc->conn;
1010
1011 dump_size = nxt_buf_used_size(b);
1012
1013 if (dump_size > 300) {
1014 dump_size = 300;
1015 }
1016
1017 nxt_debug(task, "%srouter data (%z): %*s",
1018 msg->port_msg.last ? "last " : "", msg->size, dump_size,
1019 b->mem.pos);
1020
1021 if (msg->size == 0) {
1022 b = NULL;
1023 }
1024
1025 if (msg->port_msg.last != 0) {
1026 nxt_debug(task, "router data create last buf");
1027
1028 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
1029 if (nxt_slow_path(last == NULL)) {
1030 /* TODO pogorevaTb */
1031 }
1032
1033 nxt_buf_chain_add(&b, last);
1034 }
1035
1036 if (b == NULL) {
1037 return;
1038 }
1039
1040 if (c->write == NULL) {
1041 c->write = b;
1042 c->write_state = &nxt_router_conn_write_state;
1043
1044 nxt_conn_write(task->thread->engine, c);
1045 } else {
1046 nxt_debug(task, "router data attach out bufs to existing chain");
1047
1048 nxt_buf_chain_add(&c->write, b);
1049 }
1050}
1051
1052
1053nxt_inline nxt_port_t *
1054nxt_router_app_port(nxt_task_t *task)
1055{
1056 nxt_port_t *port;
1057 nxt_runtime_t *rt;
1058
1059 rt = task->thread->runtime;
1060
1061 nxt_runtime_port_each(rt, port) {
1062
1063 if (nxt_pid == port->pid) {
1064 continue;
1065 }
1066
1067 if (port->type == NXT_PROCESS_WORKER) {
1068 return port;
1069 }
1070
1071 } nxt_runtime_port_loop;
1072
1073 return NULL;
1074}
1075
1076
1077static void
1078nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
1079{
1080 size_t size, preread;
1081 nxt_int_t ret;
1082 nxt_buf_t *b;
1083 nxt_conn_t *c;
1084 nxt_app_parse_ctx_t *ap;
1085 nxt_socket_conf_joint_t *joint;
1086 nxt_app_request_header_t *h;
1087
1088 c = obj;
1089 ap = data;
1090 b = c->read;
1091
1092 nxt_debug(task, "router conn http header parse");
1093
1094 if (ap == NULL) {
1095 ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
1096 if (nxt_slow_path(ap == NULL)) {
1097 nxt_router_conn_close(task, c, data);
1098 return;
1099 }
1100
1101 ret = nxt_app_http_req_init(task, ap);
1102 if (nxt_slow_path(ret != NXT_OK)) {
1103 nxt_router_conn_close(task, c, data);
1104 return;
1105 }
1106
1107 c->socket.data = ap;
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_application.h>
10
11
12static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
13 nxt_router_t *router);
14static void nxt_router_listen_sockets_sort(nxt_router_t *router,
15 nxt_router_temp_conf_t *tmcf);
16
17static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
18 nxt_router_temp_conf_t *tmcf);
19static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
20 nxt_router_temp_conf_t *tmcf);
21static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
22 nxt_sockaddr_t *sa);
23static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
24 nxt_mp_t *mp, uint32_t port);
25
26static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
27 nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
28 const nxt_event_interface_t *interface);
29static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
30 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
31static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
32 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
33static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
34 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
35static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
36 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
37 nxt_work_handler_t handler);
38static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
39 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
40
41static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
42 nxt_router_temp_conf_t *tmcf);
43static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
44 nxt_event_engine_t *engine);
45
46static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
47static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
48
49static void nxt_router_thread_start(void *data);
50static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
51 void *data);
52static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
53 void *data);
54static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
55 void *data);
56static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
57 void *data);
58static void nxt_router_listen_socket_release(nxt_task_t *task,
59 nxt_socket_conf_joint_t *joint);
60static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
61 void *data);
62static void nxt_router_conf_release(nxt_task_t *task,
63 nxt_socket_conf_joint_t *joint);
64
65static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
66static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
67 void *data);
68static void nxt_router_process_http_request(nxt_task_t *task,
69 nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
70static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
71static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
73static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
74static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
75static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
76
77
78nxt_int_t
79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
80{
81 nxt_int_t ret;
82 nxt_router_t *router;
83 nxt_router_temp_conf_t *tmcf;
84 const nxt_event_interface_t *interface;
85
86 ret = nxt_app_http_init(task, rt);
87 if (nxt_slow_path(ret != NXT_OK)) {
88 return ret;
89 }
90
91 router = nxt_zalloc(sizeof(nxt_router_t));
92 if (nxt_slow_path(router == NULL)) {
93 return NXT_ERROR;
94 }
95
96 nxt_queue_init(&router->engines);
97 nxt_queue_init(&router->sockets);
98
99 /**/
100
101 tmcf = nxt_router_temp_conf(task, router);
102 if (nxt_slow_path(tmcf == NULL)) {
103 return NXT_ERROR;
104 }
105
106 ret = nxt_router_stub_conf(task, tmcf);
107 if (nxt_slow_path(ret != NXT_OK)) {
108 return ret;
109 }
110
111 nxt_router_listen_sockets_sort(router, tmcf);
112
113 ret = nxt_router_listen_sockets_stub_create(task, tmcf);
114 if (nxt_slow_path(ret != NXT_OK)) {
115 return ret;
116 }
117
118 interface = nxt_service_get(rt->services, "engine", NULL);
119
120 ret = nxt_router_engines_create(task, router, tmcf, interface);
121 if (nxt_slow_path(ret != NXT_OK)) {
122 return ret;
123 }
124
125 ret = nxt_router_threads_create(task, rt, tmcf);
126 if (nxt_slow_path(ret != NXT_OK)) {
127 return ret;
128 }
129
130 nxt_router_engines_post(tmcf);
131
132 nxt_queue_add(&router->sockets, &tmcf->updating);
133 nxt_queue_add(&router->sockets, &tmcf->creating);
134
135 return NXT_OK;
136}
137
138
139static nxt_router_temp_conf_t *
140nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
141{
142 nxt_mp_t *mp, *tmp;
143 nxt_router_conf_t *rtcf;
144 nxt_router_temp_conf_t *tmcf;
145
146 mp = nxt_mp_create(1024, 128, 256, 32);
147 if (nxt_slow_path(mp == NULL)) {
148 return NULL;
149 }
150
151 rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
152 if (nxt_slow_path(rtcf == NULL)) {
153 goto fail;
154 }
155
156 rtcf->mem_pool = mp;
157 rtcf->router = router;
158 rtcf->count = 1;
159
160 tmp = nxt_mp_create(1024, 128, 256, 32);
161 if (nxt_slow_path(tmp == NULL)) {
162 goto fail;
163 }
164
165 tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
166 if (nxt_slow_path(tmcf == NULL)) {
167 goto temp_fail;
168 }
169
170 tmcf->mem_pool = tmp;
171 tmcf->conf = rtcf;
172
173 tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
174 sizeof(nxt_router_engine_conf_t));
175 if (nxt_slow_path(tmcf->engines == NULL)) {
176 goto temp_fail;
177 }
178
179 nxt_queue_init(&tmcf->deleting);
180 nxt_queue_init(&tmcf->keeping);
181 nxt_queue_init(&tmcf->updating);
182 nxt_queue_init(&tmcf->pending);
183 nxt_queue_init(&tmcf->creating);
184
185 return tmcf;
186
187temp_fail:
188
189 nxt_mp_destroy(tmp);
190
191fail:
192
193 nxt_mp_destroy(mp);
194
195 return NULL;
196}
197
198
199static nxt_int_t
200nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
201{
202 nxt_mp_t *mp;
203 nxt_sockaddr_t *sa;
204 nxt_socket_conf_t *skcf;
205
206 tmcf->conf->threads = 1;
207
208 mp = tmcf->conf->mem_pool;
209
210 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
211 skcf = nxt_router_socket_conf(task, mp, sa);
212
213 skcf->listen.handler = nxt_router_conn_init;
214 skcf->header_buffer_size = 2048;
215 skcf->large_header_buffer_size = 8192;
216 skcf->header_read_timeout = 5000;
217
218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
219
220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
221 skcf = nxt_router_socket_conf(task, mp, sa);
222
223 skcf->listen.handler = nxt_stream_connection_init;
224 skcf->header_read_timeout = 5000;
225
226 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
227
228 return NXT_OK;
229}
230
231
232static nxt_socket_conf_t *
233nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
234{
235 nxt_socket_conf_t *conf;
236
237 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
238 if (nxt_slow_path(conf == NULL)) {
239 return NULL;
240 }
241
242 conf->listen.sockaddr = sa;
243 conf->listen.socklen = sa->socklen;
244 conf->listen.address_length = sa->length;
245
246 conf->listen.socket = -1;
247 conf->listen.backlog = NXT_LISTEN_BACKLOG;
248 conf->listen.flags = NXT_NONBLOCK;
249 conf->listen.read_after_accept = 1;
250
251 return conf;
252}
253
254
255static nxt_sockaddr_t *
256nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port)
257{
258 nxt_sockaddr_t *sa;
259 struct sockaddr_in sin;
260
261 nxt_memzero(&sin, sizeof(struct sockaddr_in));
262
263 sin.sin_family = AF_INET;
264 sin.sin_port = htons(port);
265
266 sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
267 sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
268 if (nxt_slow_path(sa == NULL)) {
269 return NULL;
270 }
271
272 sa->type = SOCK_STREAM;
273
274 nxt_sockaddr_text(sa);
275
276 return sa;
277}
278
279
280static void
281nxt_router_listen_sockets_sort(nxt_router_t *router,
282 nxt_router_temp_conf_t *tmcf)
283{
284 nxt_queue_link_t *nqlk, *oqlk, *next;
285 nxt_socket_conf_t *nskcf, *oskcf;
286
287 for (nqlk = nxt_queue_first(&tmcf->pending);
288 nqlk != nxt_queue_tail(&tmcf->pending);
289 nqlk = next)
290 {
291 next = nxt_queue_next(nqlk);
292 nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
293
294 for (oqlk = nxt_queue_first(&router->sockets);
295 oqlk != nxt_queue_tail(&router->sockets);
296 oqlk = nxt_queue_next(oqlk))
297 {
298 oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
299
300 if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
301 oskcf->listen.sockaddr))
302 {
303 nxt_queue_remove(oqlk);
304 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
305
306 nxt_queue_remove(nqlk);
307 nxt_queue_insert_tail(&tmcf->updating, nqlk);
308
309 break;
310 }
311 }
312 }
313
314 nxt_queue_add(&tmcf->deleting, &router->sockets);
315}
316
317
318static nxt_int_t
319nxt_router_listen_sockets_stub_create(nxt_task_t *task,
320 nxt_router_temp_conf_t *tmcf)
321{
322 nxt_queue_link_t *qlk, *nqlk;
323 nxt_socket_conf_t *skcf;
324
325 for (qlk = nxt_queue_first(&tmcf->pending);
326 qlk != nxt_queue_tail(&tmcf->pending);
327 qlk = nqlk)
328 {
329 skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
330
331 if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
332 return NXT_ERROR;
333 }
334
335 nqlk = nxt_queue_next(qlk);
336 nxt_queue_remove(qlk);
337 nxt_queue_insert_tail(&tmcf->creating, qlk);
338 }
339
340 return NXT_OK;
341}
342
343
344static nxt_int_t
345nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
346 nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
347{
348 nxt_mp_t *mp;
349 nxt_int_t ret;
350 nxt_uint_t n, threads;
351 nxt_queue_link_t *qlk;
352 nxt_router_engine_conf_t *recf;
353
354 mp = tmcf->conf->mem_pool;
355 threads = tmcf->conf->threads;
356
357 tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
358 sizeof(nxt_router_engine_conf_t));
359 if (nxt_slow_path(tmcf->engines == NULL)) {
360 return NXT_ERROR;
361 }
362
363 n = 0;
364
365 for (qlk = nxt_queue_first(&router->engines);
366 qlk != nxt_queue_tail(&router->engines);
367 qlk = nxt_queue_next(qlk))
368 {
369 recf = nxt_array_zero_add(tmcf->engines);
370 if (nxt_slow_path(recf == NULL)) {
371 return NXT_ERROR;
372 }
373
374 recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
375 // STUB
376 recf->task = recf->engine->task;
377
378 if (n < threads) {
379 ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
380
381 } else {
382 ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
383 }
384
385 if (nxt_slow_path(ret != NXT_OK)) {
386 return ret;
387 }
388
389 n++;
390 }
391
392 tmcf->new_threads = n;
393
394 while (n < threads) {
395 recf = nxt_array_zero_add(tmcf->engines);
396 if (nxt_slow_path(recf == NULL)) {
397 return NXT_ERROR;
398 }
399
400 recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
401 if (nxt_slow_path(recf->engine == NULL)) {
402 return NXT_ERROR;
403 }
404 // STUB
405 recf->task = recf->engine->task;
406
407 ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
408 if (nxt_slow_path(ret != NXT_OK)) {
409 return ret;
410 }
411
412 n++;
413 }
414
415 return NXT_OK;
416}
417
418
419static nxt_int_t
420nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
421 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
422{
423 nxt_int_t ret;
424
425 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
426 if (nxt_slow_path(recf->creating == NULL)) {
427 return NXT_ERROR;
428 }
429
430 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
431 recf->creating, nxt_router_listen_socket_create);
432 if (nxt_slow_path(ret != NXT_OK)) {
433 return ret;
434 }
435
436 return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
437 recf->creating, nxt_router_listen_socket_create);
438}
439
440
441static nxt_int_t
442nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
443 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
444{
445 nxt_int_t ret;
446
447 recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
448 if (nxt_slow_path(recf->creating == NULL)) {
449 return NXT_ERROR;
450 }
451
452 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
453 recf->creating, nxt_router_listen_socket_create);
454 if (nxt_slow_path(ret != NXT_OK)) {
455 return ret;
456 }
457
458 recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
459 if (nxt_slow_path(recf->updating == NULL)) {
460 return NXT_ERROR;
461 }
462
463 ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
464 recf->updating, nxt_router_listen_socket_update);
465 if (nxt_slow_path(ret != NXT_OK)) {
466 return ret;
467 }
468
469 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
470 if (nxt_slow_path(recf->deleting == NULL)) {
471 return NXT_ERROR;
472 }
473
474 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
475 recf->deleting);
476}
477
478
479static nxt_int_t
480nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
481 nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
482{
483 nxt_int_t ret;
484
485 recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
486 if (nxt_slow_path(recf->deleting == NULL)) {
487 return NXT_ERROR;
488 }
489
490 ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
491 recf->deleting);
492 if (nxt_slow_path(ret != NXT_OK)) {
493 return ret;
494 }
495
496 return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
497 recf->deleting);
498}
499
500
501static nxt_int_t
502nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
503 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
504 nxt_work_handler_t handler)
505{
506 nxt_work_t *work;
507 nxt_queue_link_t *qlk;
508 nxt_socket_conf_joint_t *joint;
509
510 for (qlk = nxt_queue_first(sockets);
511 qlk != nxt_queue_tail(sockets);
512 qlk = nxt_queue_next(qlk))
513 {
514 work = nxt_array_add(array);
515 if (nxt_slow_path(work == NULL)) {
516 return NXT_ERROR;
517 }
518
519 work->next = NULL;
520 work->handler = handler;
521 work->task = &recf->task;
522 work->obj = recf->engine;
523
524 joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t));
525 if (nxt_slow_path(joint == NULL)) {
526 return NXT_ERROR;
527 }
528
529 work->data = joint;
530
531 joint->count = 1;
532 joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
533 joint->engine = recf->engine;
534 }
535
536 return NXT_OK;
537}
538
539
540static nxt_int_t
541nxt_router_engine_joints_delete(nxt_task_t *task,
542 nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
543{
544 nxt_work_t *work;
545 nxt_queue_link_t *qlk;
546
547 for (qlk = nxt_queue_first(sockets);
548 qlk != nxt_queue_tail(sockets);
549 qlk = nxt_queue_next(qlk))
550 {
551 work = nxt_array_add(array);
552 if (nxt_slow_path(work == NULL)) {
553 return NXT_ERROR;
554 }
555
556 work->next = NULL;
557 work->handler = nxt_router_listen_socket_delete;
558 work->task = &recf->task;
559 work->obj = recf->engine;
560 work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
561 }
562
563 return NXT_OK;
564}
565
566
567static nxt_int_t
568nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
569 nxt_router_temp_conf_t *tmcf)
570{
571 nxt_int_t ret;
572 nxt_uint_t i, threads;
573 nxt_router_engine_conf_t *recf;
574
575 recf = tmcf->engines->elts;
576 threads = tmcf->conf->threads;
577
578 for (i = tmcf->new_threads; i < threads; i++) {
579 ret = nxt_router_thread_create(task, rt, recf[i].engine);
580 if (nxt_slow_path(ret != NXT_OK)) {
581 return ret;
582 }
583 }
584
585 return NXT_OK;
586}
587
588
589static nxt_int_t
590nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
591 nxt_event_engine_t *engine)
592{
593 nxt_mp_t *mp;
594 nxt_int_t ret;
595 nxt_port_t *port;
596 nxt_process_t *process;
597 nxt_thread_link_t *link;
598 nxt_thread_handle_t handle;
599
600 link = nxt_zalloc(sizeof(nxt_thread_link_t));
601
602 if (nxt_slow_path(link == NULL)) {
603 return NXT_ERROR;
604 }
605
606 link->start = nxt_router_thread_start;
607 link->engine = engine;
608 link->work.handler = nxt_router_thread_exit_handler;
609 link->work.task = task;
610 link->work.data = link;
611
612 nxt_queue_insert_tail(&rt->engines, &engine->link);
613
614
615 process = nxt_runtime_process_find(rt, nxt_pid);
616 if (nxt_slow_path(process == NULL)) {
617 return NXT_ERROR;
618 }
619
620 port = nxt_process_port_new(process);
621 if (nxt_slow_path(port == NULL)) {
622 return NXT_ERROR;
623 }
624
625 ret = nxt_port_socket_init(task, port, 0);
626 if (nxt_slow_path(ret != NXT_OK)) {
627 return ret;
628 }
629
630 mp = nxt_mp_create(1024, 128, 256, 32);
631 if (nxt_slow_path(mp == NULL)) {
632 return NXT_ERROR;
633 }
634
635 port->mem_pool = mp;
636 port->engine = 0;
637 port->type = NXT_PROCESS_ROUTER;
638
639 engine->port = port;
640
641 nxt_runtime_port_add(rt, port);
642
643
644 ret = nxt_thread_create(&handle, link);
645
646 if (nxt_slow_path(ret != NXT_OK)) {
647 nxt_queue_remove(&engine->link);
648 }
649
650 return ret;
651}
652
653
654static void
655nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
656{
657 nxt_uint_t n;
658 nxt_router_engine_conf_t *recf;
659
660 recf = tmcf->engines->elts;
661
662 for (n = tmcf->engines->nelts; n != 0; n--) {
663 nxt_router_engine_post(recf);
664 recf++;
665 }
666}
667
668
669static void
670nxt_router_engine_post(nxt_router_engine_conf_t *recf)
671{
672 nxt_uint_t n;
673 nxt_work_t *work;
674
675 work = recf->creating->elts;
676
677 for (n = recf->creating->nelts; n != 0; n--) {
678 nxt_event_engine_post(recf->engine, work);
679 work++;
680 }
681}
682
683
684static void
685nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
686
687nxt_port_handler_t nxt_router_process_port_handlers[] = {
688 NULL,
689 nxt_port_new_port_handler,
690 nxt_port_change_log_file_handler,
691 nxt_port_mmap_handler,
692 nxt_router_data_handler,
693};
694
695
696static void
697nxt_router_thread_start(void *data)
698{
699 nxt_task_t *task;
700 nxt_thread_t *thread;
701 nxt_thread_link_t *link;
702 nxt_event_engine_t *engine;
703
704 link = data;
705 engine = link->engine;
706 task = &engine->task;
707
708 thread = nxt_thread();
709
710 /* STUB */
711 thread->runtime = engine->task.thread->runtime;
712
713 engine->task.thread = thread;
714 engine->task.log = thread->log;
715 thread->engine = engine;
716 thread->task = &engine->task;
717 thread->fiber = &engine->fibers->fiber;
718
719 engine->port->socket.task = task;
720 nxt_port_create(task, engine->port, nxt_router_process_port_handlers);
721
722 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
723
724 nxt_event_engine_start(engine);
725}
726
727
728static void
729nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
730{
731 nxt_listen_event_t *listen;
732 nxt_listen_socket_t *ls;
733 nxt_socket_conf_joint_t *joint;
734
735 joint = data;
736
737 ls = &joint->socket_conf->listen;
738
739 listen = nxt_listen_event(task, ls);
740 if (nxt_slow_path(listen == NULL)) {
741 nxt_router_listen_socket_release(task, joint);
742 return;
743 }
744
745 listen->socket.data = joint;
746}
747
748
749nxt_inline nxt_listen_event_t *
750nxt_router_listen_event(nxt_queue_t *listen_connections,
751 nxt_socket_conf_t *skcf)
752{
753 nxt_socket_t socket;
754 nxt_queue_link_t *link;
755 nxt_listen_event_t *listen;
756
757 socket = skcf->listen.socket;
758
759 for (link = nxt_queue_first(listen_connections);
760 link != nxt_queue_tail(listen_connections);
761 link = nxt_queue_next(link))
762 {
763 listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
764
765 if (socket == listen->socket.fd) {
766 return listen;
767 }
768 }
769
770 return NULL;
771}
772
773
774static void
775nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
776{
777 nxt_event_engine_t *engine;
778 nxt_listen_event_t *listen;
779 nxt_socket_conf_joint_t *joint, *old;
780
781 engine = obj;
782 joint = data;
783
784 listen = nxt_router_listen_event(&engine->listen_connections,
785 joint->socket_conf);
786
787 old = listen->socket.data;
788 listen->socket.data = joint;
789
790 nxt_router_conf_release(task, old);
791}
792
793
794static void
795nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
796{
797 nxt_socket_conf_t *skcf;
798 nxt_listen_event_t *listen;
799 nxt_event_engine_t *engine;
800
801 engine = obj;
802 skcf = data;
803
804 listen = nxt_router_listen_event(&engine->listen_connections, skcf);
805
806 nxt_fd_event_delete(engine, &listen->socket);
807
808 listen->timer.handler = nxt_router_listen_socket_close;
809 listen->timer.work_queue = &engine->fast_work_queue;
810
811 nxt_timer_add(engine, &listen->timer, 0);
812}
813
814
815static void
816nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
817{
818 nxt_timer_t *timer;
819 nxt_listen_event_t *listen;
820 nxt_socket_conf_joint_t *joint;
821
822 timer = obj;
823 listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
824 joint = listen->socket.data;
825
826 nxt_queue_remove(&listen->link);
827 nxt_free(listen);
828
829 nxt_router_listen_socket_release(task, joint);
830}
831
832
833static void
834nxt_router_listen_socket_release(nxt_task_t *task,
835 nxt_socket_conf_joint_t *joint)
836{
837 nxt_socket_t s;
838 nxt_listen_socket_t *ls;
839 nxt_thread_spinlock_t *lock;
840
841 s = -1;
842 ls = &joint->socket_conf->listen;
843 lock = &joint->socket_conf->router_conf->router->lock;
844
845 nxt_thread_spin_lock(lock);
846
847 if (--ls->count == 0) {
848 s = ls->socket;
849 ls->socket = -1;
850 }
851
852 nxt_thread_spin_unlock(lock);
853
854 if (s != -1) {
855 nxt_socket_close(task, s);
856 }
857
858 nxt_router_conf_release(task, joint);
859}
860
861
862static void
863nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
864{
865 nxt_socket_conf_t *skcf;
866 nxt_router_conf_t *rtcf;
867 nxt_thread_spinlock_t *lock;
868
869 nxt_debug(task, "conf joint count: %D", joint->count);
870
871 if (--joint->count != 0) {
872 return;
873 }
874
875 nxt_queue_remove(&joint->link);
876
877 skcf = joint->socket_conf;
878 rtcf = skcf->router_conf;
879 lock = &rtcf->router->lock;
880
881 nxt_thread_spin_lock(lock);
882
883 if (--skcf->count != 0) {
884 rtcf = NULL;
885
886 } else {
887 nxt_queue_remove(&skcf->link);
888
889 if (--rtcf->count != 0) {
890 rtcf = NULL;
891 }
892 }
893
894 nxt_thread_spin_unlock(lock);
895
896 if (rtcf != NULL) {
897 nxt_mp_destroy(rtcf->mem_pool);
898 }
899
900 if (nxt_queue_is_empty(&joint->engine->joints)) {
901 nxt_thread_exit(task->thread);
902 }
903}
904
905
906static void
907nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
908{
909 nxt_thread_link_t *link;
910 nxt_event_engine_t *engine;
911 nxt_thread_handle_t handle;
912
913 handle = (nxt_thread_handle_t) obj;
914 link = data;
915
916 nxt_thread_wait(handle);
917
918 engine = link->engine;
919
920 nxt_queue_remove(&engine->link);
921
922 nxt_mp_destroy(engine->mem_pool);
923
924 nxt_event_engine_free(engine);
925
926 nxt_free(link);
927
928 // TODO: free port
929}
930
931
932static const nxt_conn_state_t nxt_router_conn_read_state
933 nxt_aligned(64) =
934{
935 .ready_handler = nxt_router_conn_http_header_parse,
936 .close_handler = nxt_router_conn_close,
937 .error_handler = nxt_router_conn_error,
938
939 .timer_handler = nxt_router_conn_timeout,
940 .timer_value = nxt_router_conn_timeout_value,
941 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
942};
943
944
945static void
946nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
947{
948 size_t size;
949 nxt_conn_t *c;
950 nxt_event_engine_t *engine;
951 nxt_socket_conf_joint_t *joint;
952
953 c = obj;
954 joint = data;
955
956 nxt_debug(task, "router conn init");
957
958 joint->count++;
959
960 size = joint->socket_conf->header_buffer_size;
961 c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
962
963 c->socket.data = NULL;
964
965 engine = task->thread->engine;
966 c->read_work_queue = &engine->fast_work_queue;
967 c->write_work_queue = &engine->fast_work_queue;
968
969 c->read_state = &nxt_router_conn_read_state;
970
971 nxt_conn_read(engine, c);
972}
973
974
975static const nxt_conn_state_t nxt_router_conn_write_state
976 nxt_aligned(64) =
977{
978 .ready_handler = nxt_router_conn_ready,
979 .close_handler = nxt_router_conn_close,
980 .error_handler = nxt_router_conn_error,
981};
982
983
984static void
985nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
986{
987 size_t dump_size;
988 nxt_buf_t *b, *i, *last;
989 nxt_conn_t *c;
990 nxt_req_conn_link_t *rc;
991 nxt_event_engine_t *engine;
992
993 b = msg->buf;
994 engine = task->thread->engine;
995
996 rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
997 if (nxt_slow_path(rc == NULL)) {
998
999 nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
1000
1001 /* Mark buffers as read. */
1002 for (i = b; i != NULL; i = i->next) {
1003 i->mem.pos = i->mem.free;
1004 }
1005
1006 return;
1007 }
1008
1009 c = rc->conn;
1010
1011 dump_size = nxt_buf_used_size(b);
1012
1013 if (dump_size > 300) {
1014 dump_size = 300;
1015 }
1016
1017 nxt_debug(task, "%srouter data (%z): %*s",
1018 msg->port_msg.last ? "last " : "", msg->size, dump_size,
1019 b->mem.pos);
1020
1021 if (msg->size == 0) {
1022 b = NULL;
1023 }
1024
1025 if (msg->port_msg.last != 0) {
1026 nxt_debug(task, "router data create last buf");
1027
1028 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
1029 if (nxt_slow_path(last == NULL)) {
1030 /* TODO pogorevaTb */
1031 }
1032
1033 nxt_buf_chain_add(&b, last);
1034 }
1035
1036 if (b == NULL) {
1037 return;
1038 }
1039
1040 if (c->write == NULL) {
1041 c->write = b;
1042 c->write_state = &nxt_router_conn_write_state;
1043
1044 nxt_conn_write(task->thread->engine, c);
1045 } else {
1046 nxt_debug(task, "router data attach out bufs to existing chain");
1047
1048 nxt_buf_chain_add(&c->write, b);
1049 }
1050}
1051
1052
1053nxt_inline nxt_port_t *
1054nxt_router_app_port(nxt_task_t *task)
1055{
1056 nxt_port_t *port;
1057 nxt_runtime_t *rt;
1058
1059 rt = task->thread->runtime;
1060
1061 nxt_runtime_port_each(rt, port) {
1062
1063 if (nxt_pid == port->pid) {
1064 continue;
1065 }
1066
1067 if (port->type == NXT_PROCESS_WORKER) {
1068 return port;
1069 }
1070
1071 } nxt_runtime_port_loop;
1072
1073 return NULL;
1074}
1075
1076
1077static void
1078nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
1079{
1080 size_t size, preread;
1081 nxt_int_t ret;
1082 nxt_buf_t *b;
1083 nxt_conn_t *c;
1084 nxt_app_parse_ctx_t *ap;
1085 nxt_socket_conf_joint_t *joint;
1086 nxt_app_request_header_t *h;
1087
1088 c = obj;
1089 ap = data;
1090 b = c->read;
1091
1092 nxt_debug(task, "router conn http header parse");
1093
1094 if (ap == NULL) {
1095 ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
1096 if (nxt_slow_path(ap == NULL)) {
1097 nxt_router_conn_close(task, c, data);
1098 return;
1099 }
1100
1101 ret = nxt_app_http_req_init(task, ap);
1102 if (nxt_slow_path(ret != NXT_OK)) {
1103 nxt_router_conn_close(task, c, data);
1104 return;
1105 }
1106
1107 c->socket.data = ap;
1108
1109 ap->r.remote.start = nxt_sockaddr_address(c->remote);
1110 ap->r.remote.length = c->remote->address_length;
1108 }
1109
1110 h = &ap->r.header;
1111
1112 ret = nxt_app_http_req_parse(task, ap, b);
1113
1114 nxt_debug(task, "http parse request: %d", ret);
1115
1116 switch (nxt_expect(NXT_DONE, ret)) {
1117
1118 case NXT_DONE:
1119 preread = nxt_buf_mem_used_size(&b->mem);
1120
1121 nxt_debug(task, "router request header parsing complete, "
1122 "content length: %O, preread: %uz",
1123 h->parsed_content_length, preread);
1124
1125 nxt_router_process_http_request(task, c, ap);
1126 return;
1127
1128 case NXT_ERROR:
1129 nxt_router_conn_close(task, c, data);
1130 return;
1131
1132 default: /* NXT_AGAIN */
1133
1134 if (h->done == 0) {
1135
1136 if (c->read->mem.free == c->read->mem.end) {
1137 joint = c->listen->socket.data;
1138 size = joint->socket_conf->large_header_buffer_size;
1139
1140 if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
1141 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1142 if (nxt_slow_path(b == NULL)) {
1143 nxt_router_conn_close(task, c, data);
1144 return;
1145 }
1146
1147 size = c->read->mem.free - c->read->mem.pos;
1148 nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
1149
1150 b->mem.free += size;
1151 c->read = b;
1152 } else {
1153 // TODO 500 Too long request headers
1154 nxt_log_alert(task->log, "Too long request headers");
1155 }
1156 }
1157 }
1158
1159 if (ap->r.body.done == 0) {
1160
1161 preread = nxt_buf_mem_used_size(&b->mem);
1162
1163 if (h->parsed_content_length - preread >
1164 (size_t) nxt_buf_mem_free_size(&b->mem)) {
1165
1166 b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
1167 if (nxt_slow_path(b == NULL)) {
1168 // TODO 500 Failed to allocate buffer for request body
1169 nxt_log_alert(task->log, "Failed to allocate buffer for "
1170 "request body");
1171 }
1172
1173 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos,
1174 preread);
1175
1176 c->read = b;
1177 }
1178
1179 nxt_debug(task, "router request body read again, rest: %uz",
1180 h->parsed_content_length - preread);
1181
1182 }
1183
1184 }
1185
1186 nxt_conn_read(task->thread->engine, c);
1187}
1188
1189
1190static void
1191nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
1192 nxt_app_parse_ctx_t *ap)
1193{
1194 nxt_port_t *port, *c_port;
1195 nxt_req_id_t req_id;
1196 nxt_app_wmsg_t wmsg;
1197 nxt_event_engine_t *engine;
1198 nxt_req_conn_link_t *rc;
1199
1200 if (nxt_slow_path(nxt_app == NULL)) {
1201 // 500 Application not found
1202 nxt_log_alert(task->log, "application is NULL");
1203 }
1204
1205 port = nxt_router_app_port(task);
1206
1207 if (nxt_slow_path(port == NULL)) {
1208 // 500 Application port not found
1209 nxt_log_alert(task->log, "application port not found");
1210 }
1211
1212 engine = task->thread->engine;
1213
1214 do {
1215 req_id = nxt_random(&nxt_random_data);
1216 } while (nxt_event_engine_request_find(engine, req_id) != NULL);
1217
1218 rc = nxt_conn_request_add(c, req_id);
1219 if (nxt_slow_path(rc == NULL)) {
1220 // 500 Failed to allocate req->conn link
1221 nxt_log_alert(task->log, "failed to allocate req->conn link");
1222 }
1223
1224 nxt_event_engine_request_add(engine, rc);
1225
1226 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
1227 req_id, c, engine);
1228
1229 c_port = nxt_process_connected_port_find(port->process,
1230 engine->port->pid,
1231 engine->port->id);
1232 if (nxt_slow_path(c_port != engine->port)) {
1233 (void) nxt_port_send_port(task, port, engine->port);
1234 nxt_process_connected_port_add(port->process, engine->port);
1235 }
1236
1237 wmsg.port = port;
1238 wmsg.write = NULL;
1239 wmsg.buf = &wmsg.write;
1240 wmsg.stream = req_id;
1241
1242 (void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
1243
1244 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
1245 nxt_buf_used_size(wmsg.write),
1246 wmsg.port->socket.fd);
1247
1248 (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
1249 -1, req_id, engine->port->id, wmsg.write);
1250}
1251
1252
1253static const nxt_conn_state_t nxt_router_conn_close_state
1254 nxt_aligned(64) =
1255{
1256 .ready_handler = nxt_router_conn_free,
1257};
1258
1259
1260static void
1261nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
1262{
1263 nxt_buf_t *b;
1264 nxt_bool_t last;
1265 nxt_conn_t *c;
1266 nxt_work_queue_t *wq;
1267
1268 nxt_debug(task, "router conn ready %p", obj);
1269
1270 c = obj;
1271 b = c->write;
1272
1273 wq = &task->thread->engine->fast_work_queue;
1274
1275 last = 0;
1276
1277 while (b != NULL) {
1278 if (!nxt_buf_is_sync(b)) {
1279 if (nxt_buf_used_size(b) > 0) {
1280 break;
1281 }
1282 }
1283
1284 if (nxt_buf_is_last(b)) {
1285 last = 1;
1286 }
1287
1288 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1289
1290 b = b->next;
1291 }
1292
1293 c->write = b;
1294
1295 if (b != NULL) {
1296 nxt_debug(task, "router conn %p has more data to write", obj);
1297
1298 nxt_conn_write(task->thread->engine, c);
1299 } else {
1300 nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
1301 last);
1302
1303 if (last != 0) {
1304 nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
1305
1306 nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
1307 c->socket.data);
1308 }
1309 }
1310}
1311
1312
1313static void
1314nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
1315{
1316 nxt_conn_t *c;
1317
1318 c = obj;
1319
1320 nxt_debug(task, "router conn close");
1321
1322 c->write_state = &nxt_router_conn_close_state;
1323
1324 nxt_conn_close(task->thread->engine, c);
1325}
1326
1327
1328static void
1329nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
1330{
1331 nxt_conn_t *c;
1332 nxt_req_conn_link_t *rc;
1333 nxt_socket_conf_joint_t *joint;
1334
1335 c = obj;
1336
1337 nxt_debug(task, "router conn close done");
1338
1339 joint = c->listen->socket.data;
1340 nxt_router_conf_release(task, joint);
1341
1342 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
1343
1344 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
1345
1346 nxt_event_engine_request_remove(task->thread->engine, rc);
1347
1348 } nxt_queue_loop;
1349
1350 nxt_mp_destroy(c->mem_pool);
1351}
1352
1353
1354static void
1355nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
1356{
1357 nxt_conn_t *c;
1358
1359 c = obj;
1360
1361 nxt_debug(task, "router conn error");
1362
1363 c->write_state = &nxt_router_conn_close_state;
1364
1365 nxt_conn_close(task->thread->engine, c);
1366}
1367
1368
1369static void
1370nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
1371{
1372 nxt_conn_t *c;
1373 nxt_timer_t *timer;
1374
1375 timer = obj;
1376
1377 nxt_debug(task, "router conn timeout");
1378
1379 c = nxt_read_timer_conn(timer);
1380
1381 c->write_state = &nxt_router_conn_close_state;
1382
1383 nxt_conn_close(task->thread->engine, c);
1384}
1385
1386
1387static nxt_msec_t
1388nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
1389{
1390 nxt_socket_conf_joint_t *joint;
1391
1392 joint = c->listen->socket.data;
1393
1394 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1395}
1111 }
1112
1113 h = &ap->r.header;
1114
1115 ret = nxt_app_http_req_parse(task, ap, b);
1116
1117 nxt_debug(task, "http parse request: %d", ret);
1118
1119 switch (nxt_expect(NXT_DONE, ret)) {
1120
1121 case NXT_DONE:
1122 preread = nxt_buf_mem_used_size(&b->mem);
1123
1124 nxt_debug(task, "router request header parsing complete, "
1125 "content length: %O, preread: %uz",
1126 h->parsed_content_length, preread);
1127
1128 nxt_router_process_http_request(task, c, ap);
1129 return;
1130
1131 case NXT_ERROR:
1132 nxt_router_conn_close(task, c, data);
1133 return;
1134
1135 default: /* NXT_AGAIN */
1136
1137 if (h->done == 0) {
1138
1139 if (c->read->mem.free == c->read->mem.end) {
1140 joint = c->listen->socket.data;
1141 size = joint->socket_conf->large_header_buffer_size;
1142
1143 if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
1144 b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1145 if (nxt_slow_path(b == NULL)) {
1146 nxt_router_conn_close(task, c, data);
1147 return;
1148 }
1149
1150 size = c->read->mem.free - c->read->mem.pos;
1151 nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
1152
1153 b->mem.free += size;
1154 c->read = b;
1155 } else {
1156 // TODO 500 Too long request headers
1157 nxt_log_alert(task->log, "Too long request headers");
1158 }
1159 }
1160 }
1161
1162 if (ap->r.body.done == 0) {
1163
1164 preread = nxt_buf_mem_used_size(&b->mem);
1165
1166 if (h->parsed_content_length - preread >
1167 (size_t) nxt_buf_mem_free_size(&b->mem)) {
1168
1169 b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
1170 if (nxt_slow_path(b == NULL)) {
1171 // TODO 500 Failed to allocate buffer for request body
1172 nxt_log_alert(task->log, "Failed to allocate buffer for "
1173 "request body");
1174 }
1175
1176 b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos,
1177 preread);
1178
1179 c->read = b;
1180 }
1181
1182 nxt_debug(task, "router request body read again, rest: %uz",
1183 h->parsed_content_length - preread);
1184
1185 }
1186
1187 }
1188
1189 nxt_conn_read(task->thread->engine, c);
1190}
1191
1192
1193static void
1194nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
1195 nxt_app_parse_ctx_t *ap)
1196{
1197 nxt_port_t *port, *c_port;
1198 nxt_req_id_t req_id;
1199 nxt_app_wmsg_t wmsg;
1200 nxt_event_engine_t *engine;
1201 nxt_req_conn_link_t *rc;
1202
1203 if (nxt_slow_path(nxt_app == NULL)) {
1204 // 500 Application not found
1205 nxt_log_alert(task->log, "application is NULL");
1206 }
1207
1208 port = nxt_router_app_port(task);
1209
1210 if (nxt_slow_path(port == NULL)) {
1211 // 500 Application port not found
1212 nxt_log_alert(task->log, "application port not found");
1213 }
1214
1215 engine = task->thread->engine;
1216
1217 do {
1218 req_id = nxt_random(&nxt_random_data);
1219 } while (nxt_event_engine_request_find(engine, req_id) != NULL);
1220
1221 rc = nxt_conn_request_add(c, req_id);
1222 if (nxt_slow_path(rc == NULL)) {
1223 // 500 Failed to allocate req->conn link
1224 nxt_log_alert(task->log, "failed to allocate req->conn link");
1225 }
1226
1227 nxt_event_engine_request_add(engine, rc);
1228
1229 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
1230 req_id, c, engine);
1231
1232 c_port = nxt_process_connected_port_find(port->process,
1233 engine->port->pid,
1234 engine->port->id);
1235 if (nxt_slow_path(c_port != engine->port)) {
1236 (void) nxt_port_send_port(task, port, engine->port);
1237 nxt_process_connected_port_add(port->process, engine->port);
1238 }
1239
1240 wmsg.port = port;
1241 wmsg.write = NULL;
1242 wmsg.buf = &wmsg.write;
1243 wmsg.stream = req_id;
1244
1245 (void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
1246
1247 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
1248 nxt_buf_used_size(wmsg.write),
1249 wmsg.port->socket.fd);
1250
1251 (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
1252 -1, req_id, engine->port->id, wmsg.write);
1253}
1254
1255
1256static const nxt_conn_state_t nxt_router_conn_close_state
1257 nxt_aligned(64) =
1258{
1259 .ready_handler = nxt_router_conn_free,
1260};
1261
1262
1263static void
1264nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
1265{
1266 nxt_buf_t *b;
1267 nxt_bool_t last;
1268 nxt_conn_t *c;
1269 nxt_work_queue_t *wq;
1270
1271 nxt_debug(task, "router conn ready %p", obj);
1272
1273 c = obj;
1274 b = c->write;
1275
1276 wq = &task->thread->engine->fast_work_queue;
1277
1278 last = 0;
1279
1280 while (b != NULL) {
1281 if (!nxt_buf_is_sync(b)) {
1282 if (nxt_buf_used_size(b) > 0) {
1283 break;
1284 }
1285 }
1286
1287 if (nxt_buf_is_last(b)) {
1288 last = 1;
1289 }
1290
1291 nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
1292
1293 b = b->next;
1294 }
1295
1296 c->write = b;
1297
1298 if (b != NULL) {
1299 nxt_debug(task, "router conn %p has more data to write", obj);
1300
1301 nxt_conn_write(task->thread->engine, c);
1302 } else {
1303 nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
1304 last);
1305
1306 if (last != 0) {
1307 nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
1308
1309 nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
1310 c->socket.data);
1311 }
1312 }
1313}
1314
1315
1316static void
1317nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
1318{
1319 nxt_conn_t *c;
1320
1321 c = obj;
1322
1323 nxt_debug(task, "router conn close");
1324
1325 c->write_state = &nxt_router_conn_close_state;
1326
1327 nxt_conn_close(task->thread->engine, c);
1328}
1329
1330
1331static void
1332nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
1333{
1334 nxt_conn_t *c;
1335 nxt_req_conn_link_t *rc;
1336 nxt_socket_conf_joint_t *joint;
1337
1338 c = obj;
1339
1340 nxt_debug(task, "router conn close done");
1341
1342 joint = c->listen->socket.data;
1343 nxt_router_conf_release(task, joint);
1344
1345 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
1346
1347 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
1348
1349 nxt_event_engine_request_remove(task->thread->engine, rc);
1350
1351 } nxt_queue_loop;
1352
1353 nxt_mp_destroy(c->mem_pool);
1354}
1355
1356
1357static void
1358nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
1359{
1360 nxt_conn_t *c;
1361
1362 c = obj;
1363
1364 nxt_debug(task, "router conn error");
1365
1366 c->write_state = &nxt_router_conn_close_state;
1367
1368 nxt_conn_close(task->thread->engine, c);
1369}
1370
1371
1372static void
1373nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
1374{
1375 nxt_conn_t *c;
1376 nxt_timer_t *timer;
1377
1378 timer = obj;
1379
1380 nxt_debug(task, "router conn timeout");
1381
1382 c = nxt_read_timer_conn(timer);
1383
1384 c->write_state = &nxt_router_conn_close_state;
1385
1386 nxt_conn_close(task->thread->engine, c);
1387}
1388
1389
1390static nxt_msec_t
1391nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
1392{
1393 nxt_socket_conf_joint_t *joint;
1394
1395 joint = c->listen->socket.data;
1396
1397 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1398}