xref: /unit/src/nxt_router.c (revision 277:6baa1731cc6f)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 
11 
12 typedef struct {
13     nxt_str_t  type;
14     uint32_t   workers;
15 } nxt_router_app_conf_t;
16 
17 
18 typedef struct {
19     nxt_str_t  application;
20 } nxt_router_listener_conf_t;
21 
22 
23 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
24 typedef struct nxt_start_worker_s nxt_start_worker_t;
25 
26 struct nxt_start_worker_s {
27     nxt_app_t              *app;
28     nxt_req_app_link_t     *ra;
29 
30     nxt_work_t             work;
31 };
32 
33 
34 struct nxt_req_app_link_s {
35     nxt_req_id_t         req_id;
36     nxt_port_t           *app_port;
37     nxt_port_t           *reply_port;
38     nxt_app_parse_ctx_t  *ap;
39     nxt_req_conn_link_t  *rc;
40 
41     nxt_queue_link_t     link; /* for nxt_app_t.requests */
42 
43     nxt_mp_t             *mem_pool;
44     nxt_work_t           work;
45 };
46 
47 
48 typedef struct {
49     nxt_socket_conf_t       *socket_conf;
50     nxt_router_temp_conf_t  *temp_conf;
51 } nxt_socket_rpc_t;
52 
53 
54 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
55 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
56 static void nxt_router_conf_ready(nxt_task_t *task,
57     nxt_router_temp_conf_t *tmcf);
58 static void nxt_router_conf_error(nxt_task_t *task,
59     nxt_router_temp_conf_t *tmcf);
60 static void nxt_router_conf_send(nxt_task_t *task,
61     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
62 static void nxt_router_listen_sockets_sort(nxt_router_t *router,
63     nxt_router_temp_conf_t *tmcf);
64 
65 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
66     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
67 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
68 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
69     nxt_str_t *name);
70 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
71     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
72 static void nxt_router_listen_socket_ready(nxt_task_t *task,
73     nxt_port_recv_msg_t *msg, void *data);
74 static void nxt_router_listen_socket_error(nxt_task_t *task,
75     nxt_port_recv_msg_t *msg, void *data);
76 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
77     nxt_sockaddr_t *sa);
78 
79 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
80     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
81     const nxt_event_interface_t *interface);
82 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
83     nxt_router_engine_conf_t *recf);
84 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
85     nxt_router_engine_conf_t *recf);
86 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
87     nxt_router_engine_conf_t *recf);
88 static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
89 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
90     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
91     nxt_work_handler_t handler);
92 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
93     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
94 
95 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
96     nxt_router_temp_conf_t *tmcf);
97 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
98     nxt_event_engine_t *engine);
99 static void nxt_router_apps_sort(nxt_router_t *router,
100     nxt_router_temp_conf_t *tmcf);
101 
102 static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
103 static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
104 static void nxt_router_app_data_handler(nxt_task_t *task,
105     nxt_port_recv_msg_t *msg);
106 
107 static void nxt_router_thread_start(void *data);
108 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
109     void *data);
110 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
111     void *data);
112 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
113     void *data);
114 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
115     void *data);
116 static void nxt_router_listen_socket_release(nxt_task_t *task,
117     nxt_socket_conf_joint_t *joint);
118 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
119     void *data);
120 static void nxt_router_conf_release(nxt_task_t *task,
121     nxt_socket_conf_joint_t *joint);
122 
123 static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
124     void *data);
125 static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
126 static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id);
127 static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
128     void *data);
129 
130 static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
131 static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
132     void *data);
133 static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
134     void *data);
135 static void nxt_router_process_http_request(nxt_task_t *task,
136     nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
137 static void nxt_router_process_http_request_mp(nxt_task_t *task,
138     nxt_req_app_link_t *ra, nxt_port_t *port);
139 static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
140     nxt_app_wmsg_t *wmsg);
141 static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
142     nxt_app_wmsg_t *wmsg);
143 static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
144     nxt_app_wmsg_t *wmsg);
145 static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
146 static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
147 static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
148 static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
149 static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
150 static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
151 
152 static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
153     const char* fmt, ...);
154 
155 static nxt_router_t  *nxt_router;
156 
157 
158 static nxt_app_prepare_msg_t  nxt_app_prepare_msg[] = {
159     nxt_python_prepare_msg,
160     nxt_php_prepare_msg,
161     nxt_go_prepare_msg,
162 };
163 
164 
165 nxt_int_t
166 nxt_router_start(nxt_task_t *task, void *data)
167 {
168     nxt_int_t      ret;
169     nxt_router_t   *router;
170     nxt_runtime_t  *rt;
171 
172     rt = task->thread->runtime;
173 
174     ret = nxt_app_http_init(task, rt);
175     if (nxt_slow_path(ret != NXT_OK)) {
176         return ret;
177     }
178 
179     router = nxt_zalloc(sizeof(nxt_router_t));
180     if (nxt_slow_path(router == NULL)) {
181         return NXT_ERROR;
182     }
183 
184     nxt_queue_init(&router->engines);
185     nxt_queue_init(&router->sockets);
186     nxt_queue_init(&router->apps);
187 
188     nxt_router = router;
189 
190     return NXT_OK;
191 }
192 
193 
194 static nxt_start_worker_t *
195 nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
196 {
197     nxt_port_t          *main_port;
198     nxt_runtime_t       *rt;
199     nxt_start_worker_t  *sw;
200 
201     sw = nxt_zalloc(sizeof(nxt_start_worker_t));
202 
203     if (nxt_slow_path(sw == NULL)) {
204         return NULL;
205     }
206 
207     sw->app = app;
208     sw->ra = ra;
209 
210     nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
211                     ra->req_id, &app->name, app);
212 
213     rt = task->thread->runtime;
214     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
215 
216     sw->work.handler = nxt_router_send_sw_request;
217     sw->work.task = &main_port->engine->task;
218     sw->work.obj = sw;
219     sw->work.data = task->thread->engine;
220     sw->work.next = NULL;
221 
222     if (task->thread->engine != main_port->engine) {
223         nxt_debug(task, "sw %p post send to main engine %p", sw,
224                   main_port->engine);
225 
226         nxt_event_engine_post(main_port->engine, &sw->work);
227 
228     } else {
229         nxt_router_send_sw_request(task, sw, sw->work.data);
230     }
231 
232     return sw;
233 }
234 
235 
236 nxt_inline void
237 nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
238 {
239     nxt_debug(task, "sw %p release", sw);
240 
241     nxt_free(sw);
242 }
243 
244 
245 static nxt_req_app_link_t *
246 nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
247 {
248     nxt_mp_t            *mp;
249     nxt_req_app_link_t  *ra;
250 
251     mp = rc->conn->mem_pool;
252 
253     ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
254 
255     if (nxt_slow_path(ra == NULL)) {
256         return NULL;
257     }
258 
259     nxt_debug(task, "ra #%uxD create", rc->req_id);
260 
261     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
262 
263     ra->req_id = rc->req_id;
264     ra->app_port = NULL;
265     ra->rc = rc;
266 
267     ra->mem_pool = mp;
268 
269     ra->work.handler = NULL;
270     ra->work.task = &task->thread->engine->task;
271     ra->work.obj = ra;
272     ra->work.data = task->thread->engine;
273 
274     return ra;
275 }
276 
277 
278 static void
279 nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
280 {
281     nxt_req_app_link_t  *ra;
282     nxt_event_engine_t  *engine;
283 
284     ra = obj;
285     engine = data;
286 
287     if (task->thread->engine != engine) {
288         ra->work.handler = nxt_router_ra_release;
289         ra->work.task = &engine->task;
290         ra->work.next = NULL;
291 
292         nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine);
293 
294         nxt_event_engine_post(engine, &ra->work);
295 
296         return;
297     }
298 
299     nxt_debug(task, "ra #%uxD release", ra->req_id);
300 
301     if (ra->app_port != NULL) {
302 
303         nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
304 
305 #if 0
306         /* Uncomment to hold app port until complete response received. */
307         if (ra->rc->conn != NULL) {
308             ra->rc->app_port = ra->app_port;
309 
310         } else {
311             nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
312         }
313 #endif
314     }
315 
316     nxt_mp_release(ra->mem_pool, ra);
317 }
318 
319 
320 void
321 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
322 {
323     nxt_port_new_port_handler(task, msg);
324 
325     if (msg->port_msg.stream == 0) {
326         return;
327     }
328 
329     if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
330         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
331     }
332 
333     nxt_port_rpc_handler(task, msg);
334 }
335 
336 
337 void
338 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
339 {
340     size_t                  dump_size;
341     nxt_int_t               ret;
342     nxt_buf_t               *b;
343     nxt_router_temp_conf_t  *tmcf;
344 
345     b = msg->buf;
346 
347     dump_size = nxt_buf_used_size(b);
348 
349     if (dump_size > 300) {
350         dump_size = 300;
351     }
352 
353     nxt_debug(task, "router conf data (%z): %*s",
354               msg->size, dump_size, b->mem.pos);
355 
356     tmcf = nxt_router_temp_conf(task);
357     if (nxt_slow_path(tmcf == NULL)) {
358         return;
359     }
360 
361     tmcf->conf->router = nxt_router;
362     tmcf->stream = msg->port_msg.stream;
363     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
364                                        msg->port_msg.pid,
365                                        msg->port_msg.reply_port);
366 
367     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
368 
369     if (nxt_fast_path(ret == NXT_OK)) {
370         nxt_router_conf_apply(task, tmcf, NULL);
371 
372     } else {
373         nxt_router_conf_error(task, tmcf);
374     }
375 }
376 
377 
378 void
379 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
380 {
381     nxt_port_remove_pid_handler(task, msg);
382 
383     if (msg->port_msg.stream == 0) {
384         return;
385     }
386 
387     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
388 
389     nxt_port_rpc_handler(task, msg);
390 }
391 
392 
393 static nxt_router_temp_conf_t *
394 nxt_router_temp_conf(nxt_task_t *task)
395 {
396     nxt_mp_t                *mp, *tmp;
397     nxt_router_conf_t       *rtcf;
398     nxt_router_temp_conf_t  *tmcf;
399 
400     mp = nxt_mp_create(1024, 128, 256, 32);
401     if (nxt_slow_path(mp == NULL)) {
402         return NULL;
403     }
404 
405     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
406     if (nxt_slow_path(rtcf == NULL)) {
407         goto fail;
408     }
409 
410     rtcf->mem_pool = mp;
411 
412     tmp = nxt_mp_create(1024, 128, 256, 32);
413     if (nxt_slow_path(tmp == NULL)) {
414         goto fail;
415     }
416 
417     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
418     if (nxt_slow_path(tmcf == NULL)) {
419         goto temp_fail;
420     }
421 
422     tmcf->mem_pool = tmp;
423     tmcf->conf = rtcf;
424     tmcf->count = 1;
425     tmcf->engine = task->thread->engine;
426 
427     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
428                                      sizeof(nxt_router_engine_conf_t));
429     if (nxt_slow_path(tmcf->engines == NULL)) {
430         goto temp_fail;
431     }
432 
433     nxt_queue_init(&tmcf->deleting);
434     nxt_queue_init(&tmcf->keeping);
435     nxt_queue_init(&tmcf->updating);
436     nxt_queue_init(&tmcf->pending);
437     nxt_queue_init(&tmcf->creating);
438     nxt_queue_init(&tmcf->apps);
439     nxt_queue_init(&tmcf->previous);
440 
441     return tmcf;
442 
443 temp_fail:
444 
445     nxt_mp_destroy(tmp);
446 
447 fail:
448 
449     nxt_mp_destroy(mp);
450 
451     return NULL;
452 }
453 
454 
455 static void
456 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
457 {
458     nxt_int_t                    ret;
459     nxt_router_t                 *router;
460     nxt_runtime_t                *rt;
461     nxt_queue_link_t             *qlk;
462     nxt_socket_conf_t            *skcf;
463     nxt_router_temp_conf_t       *tmcf;
464     const nxt_event_interface_t  *interface;
465 
466     tmcf = obj;
467 
468     qlk = nxt_queue_first(&tmcf->pending);
469 
470     if (qlk != nxt_queue_tail(&tmcf->pending)) {
471         nxt_queue_remove(qlk);
472         nxt_queue_insert_tail(&tmcf->creating, qlk);
473 
474         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
475 
476         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
477 
478         return;
479     }
480 
481     rt = task->thread->runtime;
482 
483     interface = nxt_service_get(rt->services, "engine", NULL);
484 
485     router = tmcf->conf->router;
486 
487     ret = nxt_router_engines_create(task, router, tmcf, interface);
488     if (nxt_slow_path(ret != NXT_OK)) {
489         goto fail;
490     }
491 
492     ret = nxt_router_threads_create(task, rt, tmcf);
493     if (nxt_slow_path(ret != NXT_OK)) {
494         goto fail;
495     }
496 
497     nxt_router_apps_sort(router, tmcf);
498 
499     nxt_router_engines_post(tmcf);
500 
501     nxt_queue_add(&router->sockets, &tmcf->updating);
502     nxt_queue_add(&router->sockets, &tmcf->creating);
503 
504     nxt_router_conf_ready(task, tmcf);
505 
506     return;
507 
508 fail:
509 
510     nxt_router_conf_error(task, tmcf);
511 
512     return;
513 }
514 
515 
516 static void
517 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
518 {
519     nxt_joint_job_t  *job;
520 
521     job = obj;
522 
523     nxt_router_conf_ready(task, job->tmcf);
524 }
525 
526 
527 static void
528 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
529 {
530     nxt_debug(task, "temp conf count:%D", tmcf->count);
531 
532     if (--tmcf->count == 0) {
533         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
534     }
535 }
536 
537 
538 static void
539 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
540 {
541     nxt_socket_t       s;
542     nxt_router_t       *router;
543     nxt_queue_link_t   *qlk;
544     nxt_socket_conf_t  *skcf;
545 
546     nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
547 
548     for (qlk = nxt_queue_first(&tmcf->creating);
549          qlk != nxt_queue_tail(&tmcf->creating);
550          qlk = nxt_queue_next(qlk))
551     {
552         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
553         s = skcf->listen.socket;
554 
555         if (s != -1) {
556             nxt_socket_close(task, s);
557         }
558 
559         nxt_free(skcf->socket);
560     }
561 
562     router = tmcf->conf->router;
563 
564     nxt_queue_add(&router->sockets, &tmcf->keeping);
565     nxt_queue_add(&router->sockets, &tmcf->deleting);
566 
567     // TODO: new engines and threads
568 
569     nxt_mp_destroy(tmcf->conf->mem_pool);
570 
571     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
572 }
573 
574 
575 static void
576 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
577     nxt_port_msg_type_t type)
578 {
579     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
580 }
581 
582 
583 static nxt_conf_map_t  nxt_router_conf[] = {
584     {
585         nxt_string("listeners_threads"),
586         NXT_CONF_MAP_INT32,
587         offsetof(nxt_router_conf_t, threads),
588     },
589 };
590 
591 
592 static nxt_conf_map_t  nxt_router_app_conf[] = {
593     {
594         nxt_string("type"),
595         NXT_CONF_MAP_STR,
596         offsetof(nxt_router_app_conf_t, type),
597     },
598 
599     {
600         nxt_string("workers"),
601         NXT_CONF_MAP_INT32,
602         offsetof(nxt_router_app_conf_t, workers),
603     },
604 };
605 
606 
607 static nxt_conf_map_t  nxt_router_listener_conf[] = {
608     {
609         nxt_string("application"),
610         NXT_CONF_MAP_STR,
611         offsetof(nxt_router_listener_conf_t, application),
612     },
613 };
614 
615 
616 static nxt_conf_map_t  nxt_router_http_conf[] = {
617     {
618         nxt_string("header_buffer_size"),
619         NXT_CONF_MAP_SIZE,
620         offsetof(nxt_socket_conf_t, header_buffer_size),
621     },
622 
623     {
624         nxt_string("large_header_buffer_size"),
625         NXT_CONF_MAP_SIZE,
626         offsetof(nxt_socket_conf_t, large_header_buffer_size),
627     },
628 
629     {
630         nxt_string("large_header_buffers"),
631         NXT_CONF_MAP_SIZE,
632         offsetof(nxt_socket_conf_t, large_header_buffers),
633     },
634 
635     {
636         nxt_string("body_buffer_size"),
637         NXT_CONF_MAP_SIZE,
638         offsetof(nxt_socket_conf_t, body_buffer_size),
639     },
640 
641     {
642         nxt_string("max_body_size"),
643         NXT_CONF_MAP_SIZE,
644         offsetof(nxt_socket_conf_t, max_body_size),
645     },
646 
647     {
648         nxt_string("header_read_timeout"),
649         NXT_CONF_MAP_MSEC,
650         offsetof(nxt_socket_conf_t, header_read_timeout),
651     },
652 
653     {
654         nxt_string("body_read_timeout"),
655         NXT_CONF_MAP_MSEC,
656         offsetof(nxt_socket_conf_t, body_read_timeout),
657     },
658 };
659 
660 
661 static nxt_int_t
662 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
663     u_char *start, u_char *end)
664 {
665     u_char                      *p;
666     size_t                      size;
667     nxt_mp_t                    *mp;
668     uint32_t                    next;
669     nxt_int_t                   ret;
670     nxt_str_t                   name;
671     nxt_app_t                   *app, *prev;
672     nxt_app_type_t              type;
673     nxt_sockaddr_t              *sa;
674     nxt_conf_value_t            *conf, *http;
675     nxt_conf_value_t            *applications, *application;
676     nxt_conf_value_t            *listeners, *listener;
677     nxt_socket_conf_t           *skcf;
678     nxt_app_lang_module_t       *lang;
679     nxt_router_app_conf_t       apcf;
680     nxt_router_listener_conf_t  lscf;
681 
682     static nxt_str_t  http_path = nxt_string("/http");
683     static nxt_str_t  applications_path = nxt_string("/applications");
684     static nxt_str_t  listeners_path = nxt_string("/listeners");
685 
686     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
687     if (conf == NULL) {
688         nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
689         return NXT_ERROR;
690     }
691 
692     mp = tmcf->conf->mem_pool;
693 
694     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
695                               nxt_nitems(nxt_router_conf), tmcf->conf);
696     if (ret != NXT_OK) {
697         nxt_log(task, NXT_LOG_CRIT, "root map error");
698         return NXT_ERROR;
699     }
700 
701     if (tmcf->conf->threads == 0) {
702         tmcf->conf->threads = nxt_ncpu;
703     }
704 
705     applications = nxt_conf_get_path(conf, &applications_path);
706     if (applications == NULL) {
707         nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
708         return NXT_ERROR;
709     }
710 
711     next = 0;
712 
713     for ( ;; ) {
714         application = nxt_conf_next_object_member(applications, &name, &next);
715         if (application == NULL) {
716             break;
717         }
718 
719         nxt_debug(task, "application \"%V\"", &name);
720 
721         size = nxt_conf_json_length(application, NULL);
722 
723         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
724         if (app == NULL) {
725             goto fail;
726         }
727 
728         nxt_memzero(app, sizeof(nxt_app_t));
729 
730         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
731         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
732 
733         p = nxt_conf_json_print(app->conf.start, application, NULL);
734         app->conf.length = p - app->conf.start;
735 
736         nxt_assert(app->conf.length <= size);
737 
738         nxt_debug(task, "application conf \"%V\"", &app->conf);
739 
740         prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
741 
742         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
743             nxt_free(app);
744 
745             nxt_queue_remove(&prev->link);
746             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
747             continue;
748         }
749 
750         apcf.workers = 1;
751 
752         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
753                                   nxt_nitems(nxt_router_app_conf), &apcf);
754         if (ret != NXT_OK) {
755             nxt_log(task, NXT_LOG_CRIT, "application map error");
756             goto app_fail;
757         }
758 
759         nxt_debug(task, "application type: %V", &apcf.type);
760         nxt_debug(task, "application workers: %D", apcf.workers);
761 
762         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
763 
764         if (lang == NULL) {
765             nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
766                     &apcf.type);
767             goto app_fail;
768         }
769 
770         nxt_debug(task, "application language module: \"%s\"", lang->file);
771 
772         type = nxt_app_parse_type(&lang->type);
773 
774         if (type == NXT_APP_UNKNOWN) {
775             nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
776                     &lang->type);
777             goto app_fail;
778         }
779 
780         if (nxt_app_prepare_msg[type] == NULL) {
781             nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
782                     &lang->type);
783             goto app_fail;
784         }
785 
786         ret = nxt_thread_mutex_create(&app->mutex);
787         if (ret != NXT_OK) {
788             goto app_fail;
789         }
790 
791         nxt_queue_init(&app->ports);
792         nxt_queue_init(&app->requests);
793 
794         app->name.length = name.length;
795         nxt_memcpy(app->name.start, name.start, name.length);
796 
797         app->type = type;
798         app->max_workers = apcf.workers;
799         app->live = 1;
800         app->prepare_msg = nxt_app_prepare_msg[type];
801 
802         nxt_queue_insert_tail(&tmcf->apps, &app->link);
803     }
804 
805     http = nxt_conf_get_path(conf, &http_path);
806 #if 0
807     if (http == NULL) {
808         nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
809         return NXT_ERROR;
810     }
811 #endif
812 
813     listeners = nxt_conf_get_path(conf, &listeners_path);
814     if (listeners == NULL) {
815         nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
816         return NXT_ERROR;
817     }
818 
819     next = 0;
820 
821     for ( ;; ) {
822         listener = nxt_conf_next_object_member(listeners, &name, &next);
823         if (listener == NULL) {
824             break;
825         }
826 
827         sa = nxt_sockaddr_parse(mp, &name);
828         if (sa == NULL) {
829             nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
830             goto fail;
831         }
832 
833         sa->type = SOCK_STREAM;
834 
835         nxt_debug(task, "router listener: \"%*s\"",
836                   sa->length, nxt_sockaddr_start(sa));
837 
838         skcf = nxt_router_socket_conf(task, mp, sa);
839         if (skcf == NULL) {
840             goto fail;
841         }
842 
843         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
844                                   nxt_nitems(nxt_router_listener_conf), &lscf);
845         if (ret != NXT_OK) {
846             nxt_log(task, NXT_LOG_CRIT, "listener map error");
847             goto fail;
848         }
849 
850         nxt_debug(task, "application: %V", &lscf.application);
851 
852         // STUB, default values if http block is not defined.
853         skcf->header_buffer_size = 2048;
854         skcf->large_header_buffer_size = 8192;
855         skcf->large_header_buffers = 4;
856         skcf->body_buffer_size = 16 * 1024;
857         skcf->max_body_size = 2 * 1024 * 1024;
858         skcf->header_read_timeout = 5000;
859         skcf->body_read_timeout = 5000;
860 
861         if (http != NULL) {
862             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
863                                       nxt_nitems(nxt_router_http_conf), skcf);
864             if (ret != NXT_OK) {
865                 nxt_log(task, NXT_LOG_CRIT, "http map error");
866                 goto fail;
867             }
868         }
869 
870         skcf->listen.handler = nxt_router_conn_init;
871         skcf->router_conf = tmcf->conf;
872         skcf->router_conf->count++;
873         skcf->application = nxt_router_listener_application(tmcf,
874                                                             &lscf.application);
875 
876         nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
877     }
878 
879     nxt_router_listen_sockets_sort(tmcf->conf->router, tmcf);
880 
881     return NXT_OK;
882 
883 app_fail:
884 
885     nxt_free(app);
886 
887 fail:
888 
889     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
890 
891         nxt_queue_remove(&app->link);
892         nxt_thread_mutex_destroy(&app->mutex);
893         nxt_free(app);
894 
895     } nxt_queue_loop;
896 
897     return NXT_ERROR;
898 }
899 
900 
901 static nxt_app_t *
902 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
903 {
904     nxt_app_t  *app;
905 
906     nxt_queue_each(app, queue, nxt_app_t, link) {
907 
908         if (nxt_strstr_eq(name, &app->name)) {
909             return app;
910         }
911 
912     } nxt_queue_loop;
913 
914     return NULL;
915 }
916 
917 
918 static nxt_app_t *
919 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
920 {
921     nxt_app_t  *app;
922 
923     app = nxt_router_app_find(&tmcf->apps, name);
924 
925     if (app == NULL) {
926         app = nxt_router_app_find(&tmcf->previous, name);
927     }
928 
929     return app;
930 }
931 
932 
933 static nxt_socket_conf_t *
934 nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
935 {
936     nxt_socket_conf_t  *skcf;
937 
938     skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
939     if (nxt_slow_path(skcf == NULL)) {
940         return NULL;
941     }
942 
943     skcf->sockaddr = sa;
944 
945     skcf->listen.sockaddr = sa;
946     skcf->listen.socklen = sa->socklen;
947     skcf->listen.address_length = sa->length;
948 
949     skcf->listen.socket = -1;
950     skcf->listen.backlog = NXT_LISTEN_BACKLOG;
951     skcf->listen.flags = NXT_NONBLOCK;
952     skcf->listen.read_after_accept = 1;
953 
954     return skcf;
955 }
956 
957 
958 static void
959 nxt_router_listen_sockets_sort(nxt_router_t *router,
960     nxt_router_temp_conf_t *tmcf)
961 {
962     nxt_queue_link_t   *nqlk, *oqlk, *next;
963     nxt_socket_conf_t  *nskcf, *oskcf;
964 
965     for (nqlk = nxt_queue_first(&tmcf->pending);
966          nqlk != nxt_queue_tail(&tmcf->pending);
967          nqlk = next)
968     {
969         next = nxt_queue_next(nqlk);
970         nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
971 
972         for (oqlk = nxt_queue_first(&router->sockets);
973              oqlk != nxt_queue_tail(&router->sockets);
974              oqlk = nxt_queue_next(oqlk))
975         {
976             oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
977 
978             if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
979                 nskcf->socket = oskcf->socket;
980                 nskcf->listen.socket = oskcf->listen.socket;
981 
982                 nxt_queue_remove(oqlk);
983                 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
984 
985                 nxt_queue_remove(nqlk);
986                 nxt_queue_insert_tail(&tmcf->updating, nqlk);
987 
988                 break;
989             }
990         }
991     }
992 
993     nxt_queue_add(&tmcf->deleting, &router->sockets);
994     nxt_queue_init(&router->sockets);
995 }
996 
997 
998 static void
999 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1000     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1001 {
1002     uint32_t          stream;
1003     nxt_buf_t         *b;
1004     nxt_port_t        *main_port, *router_port;
1005     nxt_runtime_t     *rt;
1006     nxt_socket_rpc_t  *rpc;
1007 
1008     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1009     if (rpc == NULL) {
1010         goto fail;
1011     }
1012 
1013     rpc->socket_conf = skcf;
1014     rpc->temp_conf = tmcf;
1015 
1016     b = nxt_buf_mem_alloc(tmcf->mem_pool, skcf->sockaddr->sockaddr_size, 0);
1017     if (b == NULL) {
1018         goto fail;
1019     }
1020 
1021     b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr,
1022                              skcf->sockaddr->sockaddr_size);
1023 
1024     rt = task->thread->runtime;
1025     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1026     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1027 
1028     stream = nxt_port_rpc_register_handler(task, router_port,
1029                                            nxt_router_listen_socket_ready,
1030                                            nxt_router_listen_socket_error,
1031                                            main_port->pid, rpc);
1032     if (stream == 0) {
1033         goto fail;
1034     }
1035 
1036     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1037                           stream, router_port->id, b);
1038 
1039     return;
1040 
1041 fail:
1042 
1043     nxt_router_conf_error(task, tmcf);
1044 }
1045 
1046 
1047 static void
1048 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1049     void *data)
1050 {
1051     nxt_int_t            ret;
1052     nxt_socket_t         s;
1053     nxt_socket_rpc_t     *rpc;
1054     nxt_router_socket_t  *rtsk;
1055 
1056     rpc = data;
1057 
1058     s = msg->fd;
1059 
1060     ret = nxt_socket_nonblocking(task, s);
1061     if (nxt_slow_path(ret != NXT_OK)) {
1062         goto fail;
1063     }
1064 
1065     nxt_socket_defer_accept(task, s, rpc->socket_conf->sockaddr);
1066 
1067     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1068     if (nxt_slow_path(ret != NXT_OK)) {
1069         goto fail;
1070     }
1071 
1072     rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
1073     if (nxt_slow_path(rtsk == NULL)) {
1074         goto fail;
1075     }
1076 
1077     rtsk->count = 0;
1078     rtsk->fd = s;
1079 
1080     rpc->socket_conf->listen.socket = s;
1081     rpc->socket_conf->socket = rtsk;
1082 
1083     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1084                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1085 
1086     return;
1087 
1088 fail:
1089 
1090     nxt_socket_close(task, s);
1091 
1092     nxt_router_conf_error(task, rpc->temp_conf);
1093 }
1094 
1095 
1096 static void
1097 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1098     void *data)
1099 {
1100     u_char                  *p;
1101     size_t                  size;
1102     uint8_t                 error;
1103     nxt_buf_t               *in, *out;
1104     nxt_sockaddr_t          *sa;
1105     nxt_socket_rpc_t        *rpc;
1106     nxt_router_temp_conf_t  *tmcf;
1107 
1108     static nxt_str_t  socket_errors[] = {
1109         nxt_string("ListenerSystem"),
1110         nxt_string("ListenerNoIPv6"),
1111         nxt_string("ListenerPort"),
1112         nxt_string("ListenerInUse"),
1113         nxt_string("ListenerNoAddress"),
1114         nxt_string("ListenerNoAccess"),
1115         nxt_string("ListenerPath"),
1116     };
1117 
1118     rpc = data;
1119     sa = rpc->socket_conf->sockaddr;
1120 
1121     in = msg->buf;
1122     p = in->mem.pos;
1123 
1124     error = *p++;
1125 
1126     size = sizeof("listen socket error: ") - 1
1127            + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
1128            + sa->length + socket_errors[error].length + (in->mem.free - p);
1129 
1130     tmcf = rpc->temp_conf;
1131 
1132     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1133     if (nxt_slow_path(out == NULL)) {
1134         return;
1135     }
1136 
1137     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1138                         "listen socket error: "
1139                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1140                         sa->length, nxt_sockaddr_start(sa),
1141                         &socket_errors[error], in->mem.free - p, p);
1142 
1143     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1144 
1145     nxt_router_conf_error(task, tmcf);
1146 }
1147 
1148 
1149 static nxt_int_t
1150 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1151     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1152 {
1153     nxt_int_t                 ret;
1154     nxt_uint_t                n, threads;
1155     nxt_queue_link_t          *qlk;
1156     nxt_router_engine_conf_t  *recf;
1157 
1158     threads = tmcf->conf->threads;
1159 
1160     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
1161                                      sizeof(nxt_router_engine_conf_t));
1162     if (nxt_slow_path(tmcf->engines == NULL)) {
1163         return NXT_ERROR;
1164     }
1165 
1166     n = 0;
1167 
1168     for (qlk = nxt_queue_first(&router->engines);
1169          qlk != nxt_queue_tail(&router->engines);
1170          qlk = nxt_queue_next(qlk))
1171     {
1172         recf = nxt_array_zero_add(tmcf->engines);
1173         if (nxt_slow_path(recf == NULL)) {
1174             return NXT_ERROR;
1175         }
1176 
1177         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
1178 
1179         if (n < threads) {
1180             ret = nxt_router_engine_conf_update(tmcf, recf);
1181 
1182         } else {
1183             ret = nxt_router_engine_conf_delete(tmcf, recf);
1184         }
1185 
1186         if (nxt_slow_path(ret != NXT_OK)) {
1187             return ret;
1188         }
1189 
1190         n++;
1191     }
1192 
1193     tmcf->new_threads = n;
1194 
1195     while (n < threads) {
1196         recf = nxt_array_zero_add(tmcf->engines);
1197         if (nxt_slow_path(recf == NULL)) {
1198             return NXT_ERROR;
1199         }
1200 
1201         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
1202         if (nxt_slow_path(recf->engine == NULL)) {
1203             return NXT_ERROR;
1204         }
1205 
1206         ret = nxt_router_engine_conf_create(tmcf, recf);
1207         if (nxt_slow_path(ret != NXT_OK)) {
1208             return ret;
1209         }
1210 
1211         nxt_queue_insert_tail(&router->engines, &recf->engine->link0);
1212 
1213         n++;
1214     }
1215 
1216     return NXT_OK;
1217 }
1218 
1219 
1220 static nxt_int_t
1221 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1222     nxt_router_engine_conf_t *recf)
1223 {
1224     nxt_int_t              ret;
1225     nxt_thread_spinlock_t  *lock;
1226 
1227     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1228                                           nxt_router_listen_socket_create);
1229     if (nxt_slow_path(ret != NXT_OK)) {
1230         return ret;
1231     }
1232 
1233     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1234                                           nxt_router_listen_socket_create);
1235     if (nxt_slow_path(ret != NXT_OK)) {
1236         return ret;
1237     }
1238 
1239     lock = &tmcf->conf->router->lock;
1240 
1241     nxt_thread_spin_lock(lock);
1242 
1243     nxt_router_engine_socket_count(&tmcf->creating);
1244     nxt_router_engine_socket_count(&tmcf->updating);
1245 
1246     nxt_thread_spin_unlock(lock);
1247 
1248     return ret;
1249 }
1250 
1251 
1252 static nxt_int_t
1253 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1254     nxt_router_engine_conf_t *recf)
1255 {
1256     nxt_int_t              ret;
1257     nxt_thread_spinlock_t  *lock;
1258 
1259     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1260                                           nxt_router_listen_socket_create);
1261     if (nxt_slow_path(ret != NXT_OK)) {
1262         return ret;
1263     }
1264 
1265     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1266                                           nxt_router_listen_socket_update);
1267     if (nxt_slow_path(ret != NXT_OK)) {
1268         return ret;
1269     }
1270 
1271     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1272     if (nxt_slow_path(ret != NXT_OK)) {
1273         return ret;
1274     }
1275 
1276     lock = &tmcf->conf->router->lock;
1277 
1278     nxt_thread_spin_lock(lock);
1279 
1280     nxt_router_engine_socket_count(&tmcf->creating);
1281 
1282     nxt_thread_spin_unlock(lock);
1283 
1284     return ret;
1285 }
1286 
1287 
1288 static nxt_int_t
1289 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1290     nxt_router_engine_conf_t *recf)
1291 {
1292     nxt_int_t  ret;
1293 
1294     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
1295     if (nxt_slow_path(ret != NXT_OK)) {
1296         return ret;
1297     }
1298 
1299     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1300 }
1301 
1302 
1303 static nxt_int_t
1304 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
1305     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
1306     nxt_work_handler_t handler)
1307 {
1308     nxt_joint_job_t          *job;
1309     nxt_queue_link_t         *qlk;
1310     nxt_socket_conf_t        *skcf;
1311     nxt_socket_conf_joint_t  *joint;
1312 
1313     for (qlk = nxt_queue_first(sockets);
1314          qlk != nxt_queue_tail(sockets);
1315          qlk = nxt_queue_next(qlk))
1316     {
1317         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1318         if (nxt_slow_path(job == NULL)) {
1319             return NXT_ERROR;
1320         }
1321 
1322         job->work.next = recf->jobs;
1323         recf->jobs = &job->work;
1324 
1325         job->task = tmcf->engine->task;
1326         job->work.handler = handler;
1327         job->work.task = &job->task;
1328         job->work.obj = job;
1329         job->tmcf = tmcf;
1330 
1331         tmcf->count++;
1332 
1333         joint = nxt_mp_alloc(tmcf->conf->mem_pool,
1334                              sizeof(nxt_socket_conf_joint_t));
1335         if (nxt_slow_path(joint == NULL)) {
1336             return NXT_ERROR;
1337         }
1338 
1339         job->work.data = joint;
1340 
1341         joint->count = 1;
1342 
1343         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1344         skcf->count++;
1345         joint->socket_conf = skcf;
1346 
1347         joint->engine = recf->engine;
1348     }
1349 
1350     return NXT_OK;
1351 }
1352 
1353 
1354 static void
1355 nxt_router_engine_socket_count(nxt_queue_t *sockets)
1356 {
1357     nxt_queue_link_t   *qlk;
1358     nxt_socket_conf_t  *skcf;
1359 
1360     for (qlk = nxt_queue_first(sockets);
1361          qlk != nxt_queue_tail(sockets);
1362          qlk = nxt_queue_next(qlk))
1363     {
1364         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1365         skcf->socket->count++;
1366     }
1367 }
1368 
1369 
1370 static nxt_int_t
1371 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
1372     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
1373 {
1374     nxt_joint_job_t   *job;
1375     nxt_queue_link_t  *qlk;
1376 
1377     for (qlk = nxt_queue_first(sockets);
1378          qlk != nxt_queue_tail(sockets);
1379          qlk = nxt_queue_next(qlk))
1380     {
1381         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1382         if (nxt_slow_path(job == NULL)) {
1383             return NXT_ERROR;
1384         }
1385 
1386         job->work.next = recf->jobs;
1387         recf->jobs = &job->work;
1388 
1389         job->task = tmcf->engine->task;
1390         job->work.handler = nxt_router_listen_socket_delete;
1391         job->work.task = &job->task;
1392         job->work.obj = job;
1393         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1394         job->tmcf = tmcf;
1395 
1396         tmcf->count++;
1397     }
1398 
1399     return NXT_OK;
1400 }
1401 
1402 
1403 static nxt_int_t
1404 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
1405     nxt_router_temp_conf_t *tmcf)
1406 {
1407     nxt_int_t                 ret;
1408     nxt_uint_t                i, threads;
1409     nxt_router_engine_conf_t  *recf;
1410 
1411     recf = tmcf->engines->elts;
1412     threads = tmcf->conf->threads;
1413 
1414     for (i = tmcf->new_threads; i < threads; i++) {
1415         ret = nxt_router_thread_create(task, rt, recf[i].engine);
1416         if (nxt_slow_path(ret != NXT_OK)) {
1417             return ret;
1418         }
1419     }
1420 
1421     return NXT_OK;
1422 }
1423 
1424 
1425 static nxt_int_t
1426 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
1427     nxt_event_engine_t *engine)
1428 {
1429     nxt_int_t            ret;
1430     nxt_thread_link_t    *link;
1431     nxt_thread_handle_t  handle;
1432 
1433     link = nxt_zalloc(sizeof(nxt_thread_link_t));
1434 
1435     if (nxt_slow_path(link == NULL)) {
1436         return NXT_ERROR;
1437     }
1438 
1439     link->start = nxt_router_thread_start;
1440     link->engine = engine;
1441     link->work.handler = nxt_router_thread_exit_handler;
1442     link->work.task = task;
1443     link->work.data = link;
1444 
1445     nxt_queue_insert_tail(&rt->engines, &engine->link);
1446 
1447     ret = nxt_thread_create(&handle, link);
1448 
1449     if (nxt_slow_path(ret != NXT_OK)) {
1450         nxt_queue_remove(&engine->link);
1451     }
1452 
1453     return ret;
1454 }
1455 
1456 
1457 static void
1458 nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1459 {
1460     nxt_app_t    *app;
1461     nxt_port_t   *port;
1462 
1463     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1464 
1465         nxt_queue_remove(&app->link);
1466 
1467         nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
1468 
1469         app->live = 0;
1470 
1471         if (nxt_router_app_free(NULL, app) != 0) {
1472             continue;
1473         }
1474 
1475         if (!nxt_queue_is_empty(&app->requests)) {
1476 
1477             nxt_thread_log_debug("app '%V' %p pending requests found",
1478                                  &app->name, app);
1479             continue;
1480         }
1481 
1482         do {
1483             port = nxt_router_app_get_port(app, 0);
1484             if (port == NULL) {
1485                 break;
1486             }
1487 
1488             nxt_thread_log_debug("port %p send quit", port);
1489 
1490             nxt_port_socket_write(&port->engine->task, port,
1491                                   NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
1492         } while (1);
1493 
1494     } nxt_queue_loop;
1495 
1496     nxt_queue_add(&router->apps, &tmcf->previous);
1497     nxt_queue_add(&router->apps, &tmcf->apps);
1498 }
1499 
1500 
1501 static void
1502 nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
1503 {
1504     nxt_uint_t                n;
1505     nxt_router_engine_conf_t  *recf;
1506 
1507     recf = tmcf->engines->elts;
1508 
1509     for (n = tmcf->engines->nelts; n != 0; n--) {
1510         nxt_router_engine_post(recf);
1511         recf++;
1512     }
1513 }
1514 
1515 
1516 static void
1517 nxt_router_engine_post(nxt_router_engine_conf_t *recf)
1518 {
1519     nxt_work_t  *work, *next;
1520 
1521     for (work = recf->jobs; work != NULL; work = next) {
1522         next = work->next;
1523         work->next = NULL;
1524 
1525         nxt_event_engine_post(recf->engine, work);
1526     }
1527 }
1528 
1529 
/*
 * Port message handlers enabled on each router thread's port by
 * nxt_router_thread_start().  The table is positional; the comments on
 * the entries name the message type each slot is meant for.  A NULL
 * slot has no handler.
 */
1530 static nxt_port_handler_t  nxt_router_app_port_handlers[] = {
1531     NULL, /* NXT_PORT_MSG_QUIT         */
1532     NULL, /* NXT_PORT_MSG_NEW_PORT     */
1533     NULL, /* NXT_PORT_MSG_CHANGE_FILE  */
1534     /* TODO: remove mmap_handler from app ports */
1535     nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP         */
1536     nxt_router_app_data_handler, /* presumably the data slot - confirm */
1537     NULL, /* NXT_PORT_MSG_REMOVE_PID   */
1538     NULL, /* NXT_PORT_MSG_READY        */
1539     NULL, /* NXT_PORT_MSG_START_WORKER */
1540     nxt_port_rpc_handler, /* presumably an RPC reply slot - confirm */
1541     nxt_port_rpc_handler, /* presumably an RPC reply slot - confirm */
1542 };
1543 
1544 
1545 static void
1546 nxt_router_thread_start(void *data)
1547 {
1548     nxt_int_t           ret;
1549     nxt_port_t          *port;
1550     nxt_task_t          *task;
1551     nxt_thread_t        *thread;
1552     nxt_thread_link_t   *link;
1553     nxt_event_engine_t  *engine;
1554 
1555     link = data;
1556     engine = link->engine;
1557     task = &engine->task;
1558 
1559     thread = nxt_thread();
1560 
1561     nxt_event_engine_thread_adopt(engine);
1562 
1563     /* STUB */
1564     thread->runtime = engine->task.thread->runtime;
1565 
1566     engine->task.thread = thread;
1567     engine->task.log = thread->log;
1568     thread->engine = engine;
1569     thread->task = &engine->task;
1570     thread->fiber = &engine->fibers->fiber;
1571 
1572     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
1573 
1574     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
1575                         NXT_PROCESS_ROUTER);
1576     if (nxt_slow_path(port == NULL)) {
1577         return;
1578     }
1579 
1580     ret = nxt_port_socket_init(task, port, 0);
1581     if (nxt_slow_path(ret != NXT_OK)) {
1582         nxt_mp_release(port->mem_pool, port);
1583         return;
1584     }
1585 
1586     engine->port = port;
1587 
1588     nxt_port_enable(task, port, nxt_router_app_port_handlers);
1589 
1590     nxt_event_engine_start(engine);
1591 }
1592 
1593 
1594 static void
1595 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1596 {
1597     nxt_joint_job_t          *job;
1598     nxt_listen_event_t       *listen;
1599     nxt_listen_socket_t      *ls;
1600     nxt_socket_conf_joint_t  *joint;
1601 
1602     job = obj;
1603     joint = data;
1604 
1605     ls = &joint->socket_conf->listen;
1606 
1607     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1608 
1609     listen = nxt_listen_event(task, ls);
1610     if (nxt_slow_path(listen == NULL)) {
1611         nxt_router_listen_socket_release(task, joint);
1612         return;
1613     }
1614 
1615     listen->socket.data = joint;
1616 
1617     job->work.next = NULL;
1618     job->work.handler = nxt_router_conf_wait;
1619 
1620     nxt_event_engine_post(job->tmcf->engine, &job->work);
1621 }
1622 
1623 
1624 nxt_inline nxt_listen_event_t *
1625 nxt_router_listen_event(nxt_queue_t *listen_connections,
1626     nxt_socket_conf_t *skcf)
1627 {
1628     nxt_socket_t        fd;
1629     nxt_queue_link_t    *qlk;
1630     nxt_listen_event_t  *listen;
1631 
1632     fd = skcf->socket->fd;
1633 
1634     for (qlk = nxt_queue_first(listen_connections);
1635          qlk != nxt_queue_tail(listen_connections);
1636          qlk = nxt_queue_next(qlk))
1637     {
1638         listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1639 
1640         if (fd == listen->socket.fd) {
1641             return listen;
1642         }
1643     }
1644 
1645     return NULL;
1646 }
1647 
1648 
1649 static void
1650 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
1651 {
1652     nxt_joint_job_t          *job;
1653     nxt_event_engine_t       *engine;
1654     nxt_listen_event_t       *listen;
1655     nxt_socket_conf_joint_t  *joint, *old;
1656 
1657     job = obj;
1658     joint = data;
1659 
1660     engine = task->thread->engine;
1661 
1662     nxt_queue_insert_tail(&engine->joints, &joint->link);
1663 
1664     listen = nxt_router_listen_event(&engine->listen_connections,
1665                                      joint->socket_conf);
1666 
1667     old = listen->socket.data;
1668     listen->socket.data = joint;
1669     listen->listen = &joint->socket_conf->listen;
1670 
1671     job->work.next = NULL;
1672     job->work.handler = nxt_router_conf_wait;
1673 
1674     nxt_event_engine_post(job->tmcf->engine, &job->work);
1675 
1676     /*
1677      * The task is allocated from configuration temporary
1678      * memory pool so it can be freed after engine post operation.
1679      */
1680 
1681     nxt_router_conf_release(&engine->task, old);
1682 }
1683 
1684 
1685 static void
1686 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
1687 {
1688     nxt_joint_job_t     *job;
1689     nxt_socket_conf_t   *skcf;
1690     nxt_listen_event_t  *listen;
1691     nxt_event_engine_t  *engine;
1692 
1693     job = obj;
1694     skcf = data;
1695 
1696     engine = task->thread->engine;
1697 
1698     listen = nxt_router_listen_event(&engine->listen_connections, skcf);
1699 
1700     nxt_fd_event_delete(engine, &listen->socket);
1701 
1702     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
1703               listen->socket.fd);
1704 
1705     listen->timer.handler = nxt_router_listen_socket_close;
1706     listen->timer.work_queue = &engine->fast_work_queue;
1707 
1708     nxt_timer_add(engine, &listen->timer, 0);
1709 
1710     job->work.next = NULL;
1711     job->work.handler = nxt_router_conf_wait;
1712 
1713     nxt_event_engine_post(job->tmcf->engine, &job->work);
1714 }
1715 
1716 
1717 static void
1718 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
1719 {
1720     nxt_timer_t              *timer;
1721     nxt_listen_event_t       *listen;
1722     nxt_socket_conf_joint_t  *joint;
1723 
1724     timer = obj;
1725     listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
1726     joint = listen->socket.data;
1727 
1728     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
1729               listen->socket.fd);
1730 
1731     nxt_queue_remove(&listen->link);
1732 
1733     /* 'task' refers to listen->task and we cannot use after nxt_free() */
1734     task = &task->thread->engine->task;
1735 
1736     nxt_free(listen);
1737 
1738     nxt_router_listen_socket_release(task, joint);
1739 }
1740 
1741 
1742 static void
1743 nxt_router_listen_socket_release(nxt_task_t *task,
1744     nxt_socket_conf_joint_t *joint)
1745 {
1746     nxt_socket_conf_t      *skcf;
1747     nxt_router_socket_t    *rtsk;
1748     nxt_thread_spinlock_t  *lock;
1749 
1750     skcf = joint->socket_conf;
1751     rtsk = skcf->socket;
1752     lock = &skcf->router_conf->router->lock;
1753 
1754     nxt_thread_spin_lock(lock);
1755 
1756     nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
1757               task->thread->engine, rtsk->count);
1758 
1759     if (--rtsk->count != 0) {
1760         rtsk = NULL;
1761     }
1762 
1763     nxt_thread_spin_unlock(lock);
1764 
1765     if (rtsk != NULL) {
1766         nxt_socket_close(task, rtsk->fd);
1767         nxt_free(rtsk);
1768         skcf->socket = NULL;
1769     }
1770 
1771     nxt_router_conf_release(task, joint);
1772 }
1773 
1774 
1775 static void
1776 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
1777 {
1778     nxt_bool_t             exit;
1779     nxt_socket_conf_t      *skcf;
1780     nxt_router_conf_t      *rtcf;
1781     nxt_thread_spinlock_t  *lock;
1782 
1783     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
1784 
1785     if (--joint->count != 0) {
1786         return;
1787     }
1788 
1789     nxt_queue_remove(&joint->link);
1790 
1791     skcf = joint->socket_conf;
1792     rtcf = skcf->router_conf;
1793     lock = &rtcf->router->lock;
1794 
1795     nxt_thread_spin_lock(lock);
1796 
1797     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
1798               rtcf, rtcf->count);
1799 
1800     if (--skcf->count != 0) {
1801         rtcf = NULL;
1802 
1803     } else {
1804         nxt_queue_remove(&skcf->link);
1805 
1806         if (--rtcf->count != 0) {
1807             rtcf = NULL;
1808         }
1809     }
1810 
1811     nxt_thread_spin_unlock(lock);
1812 
1813     /* TODO remove engine->port */
1814     /* TODO excude from connected ports */
1815 
1816     /* The joint content can be used before memory pool destruction. */
1817     exit = nxt_queue_is_empty(&joint->engine->joints);
1818 
1819     if (rtcf != NULL) {
1820         nxt_debug(task, "old router conf is destroyed");
1821 
1822         nxt_mp_thread_adopt(rtcf->mem_pool);
1823 
1824         nxt_mp_destroy(rtcf->mem_pool);
1825     }
1826 
1827     if (exit) {
1828         nxt_thread_exit(task->thread);
1829     }
1830 }
1831 
1832 
1833 static void
1834 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
1835 {
1836     nxt_port_t           *port;
1837     nxt_thread_link_t    *link;
1838     nxt_event_engine_t   *engine;
1839     nxt_thread_handle_t  handle;
1840 
1841     handle = (nxt_thread_handle_t) obj;
1842     link = data;
1843 
1844     nxt_thread_wait(handle);
1845 
1846     engine = link->engine;
1847 
1848     nxt_queue_remove(&engine->link);
1849 
1850     port = engine->port;
1851 
1852     // TODO notify all apps
1853 
1854     nxt_mp_thread_adopt(port->mem_pool);
1855     nxt_port_release(port);
1856 
1857     nxt_mp_thread_adopt(engine->mem_pool);
1858     nxt_mp_destroy(engine->mem_pool);
1859 
1860     nxt_event_engine_free(engine);
1861 
1862     nxt_free(link);
1863 }
1864 
1865 
/*
 * Read state installed by nxt_router_conn_init() on a new connection.
 * Ready data is handed to nxt_router_conn_http_header_parse().
 * timer_data is the offset of header_read_timeout inside
 * nxt_socket_conf_t; presumably nxt_router_conn_timeout_value() uses it
 * to look up the timeout - confirm.
 */
1866 static const nxt_conn_state_t  nxt_router_conn_read_header_state
1867     nxt_aligned(64) =
1868 {
1869     .ready_handler = nxt_router_conn_http_header_parse,
1870     .close_handler = nxt_router_conn_close,
1871     .error_handler = nxt_router_conn_error,
1872 
1873     .timer_handler = nxt_router_conn_timeout,
1874     .timer_value = nxt_router_conn_timeout_value,
1875     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
1876 };
1877 
1878 
/*
 * Read state whose ready data is handed to
 * nxt_router_conn_http_body_read().  timer_data is the offset of
 * body_read_timeout inside nxt_socket_conf_t, and timer_autoreset is
 * enabled.  Where this state is installed is not visible in this part of
 * the file; presumably it is used while the request body is read -
 * confirm.
 */
1879 static const nxt_conn_state_t  nxt_router_conn_read_body_state
1880     nxt_aligned(64) =
1881 {
1882     .ready_handler = nxt_router_conn_http_body_read,
1883     .close_handler = nxt_router_conn_close,
1884     .error_handler = nxt_router_conn_error,
1885 
1886     .timer_handler = nxt_router_conn_timeout,
1887     .timer_value = nxt_router_conn_timeout_value,
1888     .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
1889     .timer_autoreset = 1,
1890 };
1891 
1892 
1893 static void
1894 nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
1895 {
1896     size_t                   size;
1897     nxt_conn_t               *c;
1898     nxt_event_engine_t       *engine;
1899     nxt_socket_conf_joint_t  *joint;
1900 
1901     c = obj;
1902     joint = data;
1903 
1904     nxt_debug(task, "router conn init");
1905 
1906     joint->count++;
1907 
1908     size = joint->socket_conf->header_buffer_size;
1909     c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1910 
1911     c->socket.data = NULL;
1912 
1913     engine = task->thread->engine;
1914     c->read_work_queue = &engine->fast_work_queue;
1915     c->write_work_queue = &engine->fast_work_queue;
1916 
1917     c->read_state = &nxt_router_conn_read_header_state;
1918 
1919     nxt_conn_read(engine, c);
1920 }
1921 
1922 
/*
 * Write state installed by nxt_router_app_data_handler() and
 * nxt_router_gen_error() right before they start a write with
 * nxt_conn_write().  No timer fields are set for this state.
 */
1923 static const nxt_conn_state_t  nxt_router_conn_write_state
1924     nxt_aligned(64) =
1925 {
1926     .ready_handler = nxt_router_conn_ready,
1927     .close_handler = nxt_router_conn_close,
1928     .error_handler = nxt_router_conn_error,
1929 };
1930 
1931 
1932 static void
1933 nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1934 {
1935     size_t               dump_size;
1936     nxt_buf_t            *b, *last;
1937     nxt_conn_t           *c;
1938     nxt_req_conn_link_t  *rc;
1939     nxt_event_engine_t   *engine;
1940 
1941     b = msg->buf;
1942     engine = task->thread->engine;
1943 
1944     rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
1945     if (nxt_slow_path(rc == NULL)) {
1946         nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
1947 
1948         return;
1949     }
1950 
1951     c = rc->conn;
1952 
1953     dump_size = nxt_buf_used_size(b);
1954 
1955     if (dump_size > 300) {
1956         dump_size = 300;
1957     }
1958 
1959     nxt_debug(task, "%srouter app data (%z): %*s",
1960               msg->port_msg.last ? "last " : "", msg->size, dump_size,
1961               b->mem.pos);
1962 
1963     if (msg->size == 0) {
1964         b = NULL;
1965     }
1966 
1967     if (msg->port_msg.last != 0) {
1968         nxt_debug(task, "router data create last buf");
1969 
1970         last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
1971         if (nxt_slow_path(last == NULL)) {
1972             /* TODO pogorevaTb */
1973         }
1974 
1975         nxt_buf_chain_add(&b, last);
1976 
1977         if (rc->app_port != NULL) {
1978             nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
1979 
1980             rc->app_port = NULL;
1981         }
1982 
1983         rc->conn = NULL;
1984     }
1985 
1986     if (b == NULL) {
1987         return;
1988     }
1989 
1990     if (msg->buf == b) {
1991         /* Disable instant buffer completion/re-using by port. */
1992         msg->buf = NULL;
1993     }
1994 
1995     if (c->write == NULL) {
1996         c->write = b;
1997         c->write_state = &nxt_router_conn_write_state;
1998 
1999         nxt_conn_write(task->thread->engine, c);
2000 
2001     } else {
2002         nxt_debug(task, "router data attach out bufs to existing chain");
2003 
2004         nxt_buf_chain_add(&c->write, b);
2005     }
2006 }
2007 
2008 
2009 nxt_inline const char *
2010 nxt_router_text_by_code(int code)
2011 {
2012     switch (code) {
2013     case 400: return "Bad request";
2014     case 404: return "Not found";
2015     case 403: return "Forbidden";
2016     case 408: return "Request Timeout";
2017     case 411: return "Length Required";
2018     case 413: return "Request Entity Too Large";
2019     case 500:
2020     default:  return "Internal server error";
2021     }
2022 }
2023 
2024 
2025 static nxt_buf_t *
2026 nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
2027     const char* fmt, va_list args)
2028 {
2029     nxt_buf_t   *b, *last;
2030     const char  *msg;
2031 
2032     b = nxt_buf_mem_ts_alloc(task, mp, 16384);
2033     if (nxt_slow_path(b == NULL)) {
2034         return NULL;
2035     }
2036 
2037     b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
2038         "HTTP/1.0 %d %s\r\n"
2039         "Content-Type: text/plain\r\n"
2040         "Connection: close\r\n\r\n",
2041         code, nxt_router_text_by_code(code));
2042 
2043     msg = (const char *) b->mem.free;
2044 
2045     b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
2046     b->mem.free[0] = '\0';
2047 
2048     nxt_log_alert(task->log, "error %d: %s", code, msg);
2049 
2050     last = nxt_buf_mem_ts_alloc(task, mp, 0);
2051 
2052     if (nxt_slow_path(last == NULL)) {
2053         nxt_mp_release(mp, b);
2054         return NULL;
2055     }
2056 
2057     nxt_buf_set_sync(last);
2058     nxt_buf_set_last(last);
2059 
2060     nxt_buf_chain_add(&b, last);
2061 
2062     return b;
2063 }
2064 
2065 
2066 
2067 static void
2068 nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2069     const char* fmt, ...)
2070 {
2071     va_list    args;
2072     nxt_buf_t  *b;
2073 
2074     va_start(args, fmt);
2075     b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
2076     va_end(args);
2077 
2078     if (c->socket.data != NULL) {
2079         nxt_mp_free(c->mem_pool, c->socket.data);
2080         c->socket.data = NULL;
2081     }
2082 
2083     if (c->socket.fd == -1) {
2084         nxt_mp_release(c->mem_pool, b->next);
2085         nxt_mp_release(c->mem_pool, b);
2086         return;
2087     }
2088 
2089     if (c->write == NULL) {
2090         c->write = b;
2091         c->write_state = &nxt_router_conn_write_state;
2092 
2093         nxt_conn_write(task->thread->engine, c);
2094 
2095     } else {
2096         nxt_debug(task, "router data attach out bufs to existing chain");
2097 
2098         nxt_buf_chain_add(&c->write, b);
2099     }
2100 }
2101 
2102 
2103 static void
2104 nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2105 {
2106     nxt_start_worker_t  *sw;
2107 
2108     sw = data;
2109 
2110     nxt_assert(sw != NULL);
2111     nxt_assert(sw->app->pending_workers != 0);
2112 
2113     msg->new_port->app = sw->app;
2114 
2115     sw->app->pending_workers--;
2116     sw->app->workers++;
2117 
2118     nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
2119 
2120     nxt_router_app_release_port(task, msg->new_port, sw->app);
2121 
2122     nxt_router_sw_release(task, sw);
2123 }
2124 
2125 
2126 static void
2127 nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2128 {
2129     nxt_start_worker_t  *sw;
2130 
2131     sw = data;
2132 
2133     nxt_assert(sw != NULL);
2134     nxt_assert(sw->app->pending_workers != 0);
2135 
2136     sw->app->pending_workers--;
2137 
2138     nxt_debug(task, "sw %p error, failed to start app '%V'",
2139               sw, &sw->app->name);
2140 
2141     nxt_router_sw_release(task, sw);
2142 }
2143 
2144 
2145 static void
2146 nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2147 {
2148     size_t              size;
2149     uint32_t            stream;
2150     nxt_buf_t           *b;
2151     nxt_app_t           *app;
2152     nxt_port_t          *main_port, *router_port, *app_port;
2153     nxt_runtime_t       *rt;
2154     nxt_start_worker_t  *sw;
2155     nxt_req_app_link_t  *ra;
2156 
2157     sw = obj;
2158     app = sw->app;
2159 
2160     if (nxt_queue_is_empty(&app->requests)) {
2161         ra = sw->ra;
2162         app_port = nxt_router_app_get_port(app, ra->req_id);
2163 
2164         if (app_port != NULL) {
2165             nxt_debug(task, "app '%V' %p process request #%uxD",
2166                       &app->name, app, ra->req_id);
2167 
2168             ra->app_port = app_port;
2169 
2170             nxt_router_process_http_request_mp(task, ra, app_port);
2171 
2172             nxt_router_ra_release(task, ra, ra->work.data);
2173             nxt_router_sw_release(task, sw);
2174 
2175             return;
2176         }
2177     }
2178 
2179     nxt_queue_insert_tail(&app->requests, &sw->ra->link);
2180 
2181     if (app->workers + app->pending_workers >= app->max_workers) {
2182         nxt_debug(task, "app '%V' %p %uD/%uD running/pending workers, "
2183                   "max_workers (%uD) reached", &app->name, app,
2184                    app->workers, app->pending_workers, app->max_workers);
2185 
2186         nxt_router_sw_release(task, sw);
2187 
2188         return;
2189     }
2190 
2191     app->pending_workers++;
2192 
2193     nxt_debug(task, "sw %p send", sw);
2194 
2195     rt = task->thread->runtime;
2196     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2197     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2198 
2199     size = app->name.length + 1 + app->conf.length;
2200 
2201     b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0);
2202 
2203     nxt_buf_cpystr(b, &app->name);
2204     *b->mem.free++ = '\0';
2205     nxt_buf_cpystr(b, &app->conf);
2206 
2207     stream = nxt_port_rpc_register_handler(task, router_port,
2208                                            nxt_router_sw_ready,
2209                                            nxt_router_sw_error,
2210                                            main_port->pid, sw);
2211 
2212     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2213                           stream, router_port->id, b);
2214 }
2215 
2216 
2217 static nxt_bool_t
2218 nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
2219 {
2220     nxt_queue_link_t    *lnk;
2221     nxt_req_app_link_t  *ra;
2222 
2223     nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
2224                          app->live, app->workers, app->pending_workers,
2225                          nxt_queue_is_empty(&app->requests));
2226 
2227     if (app->live == 0
2228         && app->workers == 0
2229         && app->pending_workers == 0
2230         && nxt_queue_is_empty(&app->requests))
2231     {
2232         nxt_thread_mutex_destroy(&app->mutex);
2233         nxt_free(app);
2234 
2235         return 1;
2236     }
2237 
2238     if (app->live == 1
2239         && nxt_queue_is_empty(&app->requests) == 0
2240         && app->workers + app->pending_workers < app->max_workers)
2241     {
2242         lnk = nxt_queue_first(&app->requests);
2243         nxt_queue_remove(lnk);
2244 
2245         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2246 
2247         nxt_router_sw_create(task, app, ra);
2248     }
2249 
2250     return 0;
2251 }
2252 
2253 
2254 static nxt_port_t *
2255 nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id)
2256 {
2257     nxt_port_t        *port;
2258     nxt_queue_link_t  *lnk;
2259 
2260     port = NULL;
2261 
2262     nxt_thread_mutex_lock(&app->mutex);
2263 
2264     if (!nxt_queue_is_empty(&app->ports)) {
2265         lnk = nxt_queue_first(&app->ports);
2266         nxt_queue_remove(lnk);
2267 
2268         lnk->next = NULL;
2269 
2270         port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2271 
2272         port->app_req_id = req_id;
2273     }
2274 
2275     nxt_thread_mutex_unlock(&app->mutex);
2276 
2277     return port;
2278 }
2279 
2280 
2281 static void
2282 nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
2283 {
2284     nxt_app_t            *app;
2285     nxt_port_t           *port;
2286     nxt_work_t           *work;
2287     nxt_queue_link_t     *lnk;
2288     nxt_req_app_link_t   *ra;
2289 
2290     port = obj;
2291     app = data;
2292 
2293     nxt_assert(app != NULL);
2294     nxt_assert(app == port->app);
2295     nxt_assert(port->app_link.next == NULL);
2296 
2297 
2298     if (task->thread->engine != port->engine) {
2299         work = &port->work;
2300 
2301         nxt_debug(task, "post release port to engine %p", port->engine);
2302 
2303         work->next = NULL;
2304         work->handler = nxt_router_app_release_port;
2305         work->task = &port->engine->task;
2306         work->obj = port;
2307         work->data = app;
2308 
2309         nxt_event_engine_post(port->engine, work);
2310 
2311         return;
2312     }
2313 
2314     if (!nxt_queue_is_empty(&app->requests)) {
2315         lnk = nxt_queue_first(&app->requests);
2316         nxt_queue_remove(lnk);
2317 
2318         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2319 
2320         nxt_debug(task, "app '%V' %p process next request #%uxD",
2321                   &app->name, app, ra->req_id);
2322 
2323         ra->app_port = port;
2324         port->app_req_id = ra->req_id;
2325 
2326         nxt_router_process_http_request_mp(task, ra, port);
2327 
2328         nxt_router_ra_release(task, ra, ra->work.data);
2329 
2330         return;
2331     }
2332 
2333     port->app_req_id = 0;
2334 
2335     if (port->pair[1] == -1) {
2336         nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
2337                   &app->name, app, port->pid);
2338 
2339         app->workers--;
2340         nxt_router_app_free(task, app);
2341 
2342         port->app = NULL;
2343 
2344         nxt_port_release(port);
2345 
2346         return;
2347     }
2348 
2349     if (!app->live) {
2350         nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
2351                   &app->name, app);
2352 
2353         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2354                               -1, 0, 0, NULL);
2355 
2356         return;
2357     }
2358 
2359     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2360               &app->name, app);
2361 
2362     nxt_thread_mutex_lock(&app->mutex);
2363 
2364     nxt_queue_insert_head(&app->ports, &port->app_link);
2365 
2366     nxt_thread_mutex_unlock(&app->mutex);
2367 }
2368 
2369 
2370 nxt_bool_t
2371 nxt_router_app_remove_port(nxt_port_t *port)
2372 {
2373     nxt_app_t   *app;
2374     nxt_bool_t  busy;
2375 
2376     app = port->app;
2377     busy = port->app_req_id != 0;
2378 
2379     if (app == NULL) {
2380         nxt_thread_log_debug("port %p app remove, no app", port);
2381 
2382         nxt_assert(port->app_link.next == NULL);
2383 
2384         return 1;
2385     }
2386 
2387     nxt_thread_mutex_lock(&app->mutex);
2388 
2389     if (port->app_link.next != NULL) {
2390 
2391         nxt_queue_remove(&port->app_link);
2392         port->app_link.next = NULL;
2393 
2394     }
2395 
2396     nxt_thread_mutex_unlock(&app->mutex);
2397 
2398     if (busy == 0) {
2399         nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
2400                              &app->name, app);
2401 
2402         app->workers--;
2403         nxt_router_app_free(&port->engine->task, app);
2404 
2405         return 1;
2406     }
2407 
2408     nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD",
2409                          port, &app->name, app, port->app_req_id);
2410 
2411     return 0;
2412 }
2413 
2414 
2415 static nxt_int_t
2416 nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2417 {
2418     nxt_app_t                *app;
2419     nxt_conn_t               *c;
2420     nxt_port_t               *port;
2421     nxt_start_worker_t       *sw;
2422     nxt_socket_conf_joint_t  *joint;
2423 
2424     port = NULL;
2425     c = ra->rc->conn;
2426 
2427     joint = c->listen->socket.data;
2428     app = joint->socket_conf->application;
2429 
2430     if (app == NULL) {
2431         nxt_router_gen_error(task, c, 500,
2432                              "Application is NULL in socket_conf");
2433         return NXT_ERROR;
2434     }
2435 
2436 
2437     port = nxt_router_app_get_port(app, ra->req_id);
2438 
2439     if (port != NULL) {
2440         nxt_debug(task, "already have port for app '%V'", &app->name);
2441 
2442         ra->app_port = port;
2443         return NXT_OK;
2444     }
2445 
2446     sw = nxt_router_sw_create(task, app, ra);
2447 
2448     if (nxt_slow_path(sw == NULL)) {
2449         nxt_router_gen_error(task, c, 500,
2450                              "Failed to allocate start worker struct");
2451         return NXT_ERROR;
2452     }
2453 
2454     return NXT_AGAIN;
2455 }
2456 
2457 
2458 static void
2459 nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
2460 {
2461     size_t                    size;
2462     nxt_int_t                 ret;
2463     nxt_buf_t                 *buf;
2464     nxt_conn_t                *c;
2465     nxt_sockaddr_t            *local;
2466     nxt_app_parse_ctx_t       *ap;
2467     nxt_app_request_body_t    *b;
2468     nxt_socket_conf_joint_t   *joint;
2469     nxt_app_request_header_t  *h;
2470 
2471     c = obj;
2472     ap = data;
2473     buf = c->read;
2474     joint = c->listen->socket.data;
2475 
2476     nxt_debug(task, "router conn http header parse");
2477 
2478     if (ap == NULL) {
2479         ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
2480         if (nxt_slow_path(ap == NULL)) {
2481             nxt_router_conn_close(task, c, data);
2482             return;
2483         }
2484 
2485         ret = nxt_app_http_req_init(task, ap);
2486         if (nxt_slow_path(ret != NXT_OK)) {
2487             nxt_router_conn_close(task, c, data);
2488             return;
2489         }
2490 
2491         c->socket.data = ap;
2492 
2493         ap->r.remote.start = nxt_sockaddr_address(c->remote);
2494         ap->r.remote.length = c->remote->address_length;
2495 
2496         local = joint->socket_conf->sockaddr;
2497         ap->r.local.start = nxt_sockaddr_address(local);
2498         ap->r.local.length = local->address_length;
2499 
2500         ap->r.header.buf = buf;
2501     }
2502 
2503     h = &ap->r.header;
2504     b = &ap->r.body;
2505 
2506     ret = nxt_app_http_req_header_parse(task, ap, buf);
2507 
2508     nxt_debug(task, "http parse request header: %d", ret);
2509 
2510     switch (nxt_expect(NXT_DONE, ret)) {
2511 
2512     case NXT_DONE:
2513         nxt_debug(task, "router request header parsing complete, "
2514                   "content length: %O, preread: %uz",
2515                   h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
2516 
2517         if (b->done) {
2518             nxt_router_process_http_request(task, c, ap);
2519 
2520             return;
2521         }
2522 
2523         if (joint->socket_conf->max_body_size > 0
2524             && (size_t) h->parsed_content_length
2525                > joint->socket_conf->max_body_size)
2526         {
2527             nxt_router_gen_error(task, c, 413, "Content-Length too big");
2528             return;
2529         }
2530 
2531         if (nxt_buf_mem_free_size(&buf->mem) == 0) {
2532             size = nxt_min(joint->socket_conf->body_buffer_size,
2533                            (size_t) h->parsed_content_length);
2534 
2535             buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2536             if (nxt_slow_path(buf->next == NULL)) {
2537                 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2538                                      "buffer for request body");
2539                 return;
2540             }
2541 
2542             c->read = buf->next;
2543 
2544             b->preread_size += nxt_buf_mem_used_size(&buf->mem);
2545         }
2546 
2547         if (b->buf == NULL) {
2548             b->buf = c->read;
2549         }
2550 
2551         c->read_state = &nxt_router_conn_read_body_state;
2552         break;
2553 
2554     case NXT_ERROR:
2555         nxt_router_gen_error(task, c, 400, "Request header parse error");
2556         return;
2557 
2558     default:  /* NXT_AGAIN */
2559 
2560         if (c->read->mem.free == c->read->mem.end) {
2561             size = joint->socket_conf->large_header_buffer_size;
2562 
2563             if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem)
2564                 || ap->r.header.bufs
2565                    >= joint->socket_conf->large_header_buffers)
2566             {
2567                 nxt_router_gen_error(task, c, 413,
2568                                      "Too long request headers");
2569                 return;
2570             }
2571 
2572             buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2573             if (nxt_slow_path(buf->next == NULL)) {
2574                 nxt_router_gen_error(task, c, 500,
2575                                      "Failed to allocate large header "
2576                                      "buffer");
2577                 return;
2578             }
2579 
2580             ap->r.header.bufs++;
2581 
2582             size = c->read->mem.free - c->read->mem.pos;
2583 
2584             c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
2585         }
2586 
2587     }
2588 
2589     nxt_conn_read(task->thread->engine, c);
2590 }
2591 
2592 
2593 static void
2594 nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
2595 {
2596     size_t                    size;
2597     nxt_int_t                 ret;
2598     nxt_buf_t                 *buf;
2599     nxt_conn_t                *c;
2600     nxt_app_parse_ctx_t       *ap;
2601     nxt_app_request_body_t    *b;
2602     nxt_socket_conf_joint_t   *joint;
2603     nxt_app_request_header_t  *h;
2604 
2605     c = obj;
2606     ap = data;
2607     buf = c->read;
2608 
2609     nxt_debug(task, "router conn http body read");
2610 
2611     nxt_assert(ap != NULL);
2612 
2613     b = &ap->r.body;
2614     h = &ap->r.header;
2615 
2616     ret = nxt_app_http_req_body_read(task, ap, buf);
2617 
2618     nxt_debug(task, "http read request body: %d", ret);
2619 
2620     switch (nxt_expect(NXT_DONE, ret)) {
2621 
2622     case NXT_DONE:
2623         nxt_router_process_http_request(task, c, ap);
2624         return;
2625 
2626     case NXT_ERROR:
2627         nxt_router_gen_error(task, c, 500, "Read body error");
2628         return;
2629 
2630     default:  /* NXT_AGAIN */
2631 
2632         if (nxt_buf_mem_free_size(&buf->mem) == 0) {
2633             joint = c->listen->socket.data;
2634 
2635             b->preread_size += nxt_buf_mem_used_size(&buf->mem);
2636 
2637             size = nxt_min(joint->socket_conf->body_buffer_size,
2638                            (size_t) h->parsed_content_length - b->preread_size);
2639 
2640             buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2641             if (nxt_slow_path(buf->next == NULL)) {
2642                 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2643                                      "buffer for request body");
2644                 return;
2645             }
2646 
2647             c->read = buf->next;
2648         }
2649 
2650         nxt_debug(task, "router request body read again, rest: %uz",
2651                   h->parsed_content_length - b->preread_size);
2652     }
2653 
2654     nxt_conn_read(task->thread->engine, c);
2655 }
2656 
2657 
2658 static void
2659 nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
2660     nxt_app_parse_ctx_t *ap)
2661 {
2662     nxt_mp_t             *port_mp;
2663     nxt_int_t            res;
2664     nxt_port_t           *port;
2665     nxt_req_id_t         req_id;
2666     nxt_event_engine_t   *engine;
2667     nxt_req_app_link_t   *ra;
2668     nxt_req_conn_link_t  *rc;
2669 
2670     engine = task->thread->engine;
2671 
2672     do {
2673         req_id = nxt_random(&task->thread->random);
2674     } while (nxt_event_engine_request_find(engine, req_id) != NULL);
2675 
2676     rc = nxt_conn_request_add(c, req_id);
2677 
2678     if (nxt_slow_path(rc == NULL)) {
2679         nxt_router_gen_error(task, c, 500, "Failed to allocate "
2680                              "req->conn link");
2681 
2682         return;
2683     }
2684 
2685     nxt_event_engine_request_add(engine, rc);
2686 
2687     nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
2688               req_id, c, engine);
2689 
2690     c->socket.data = NULL;
2691 
2692     ra = nxt_router_ra_create(task, rc);
2693 
2694     ra->ap = ap;
2695     ra->reply_port = engine->port;
2696 
2697     res = nxt_router_app_port(task, ra);
2698 
2699     if (res != NXT_OK) {
2700         return;
2701     }
2702 
2703     port = ra->app_port;
2704 
2705     if (nxt_slow_path(port == NULL)) {
2706         nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
2707         return;
2708     }
2709 
2710     port_mp = port->mem_pool;
2711     port->mem_pool = c->mem_pool;
2712 
2713     nxt_router_process_http_request_mp(task, ra, port);
2714 
2715     port->mem_pool = port_mp;
2716 
2717 
2718     nxt_router_ra_release(task, ra, ra->work.data);
2719 }
2720 
2721 
2722 static void
2723 nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
2724     nxt_port_t *port)
2725 {
2726     nxt_int_t            res;
2727     nxt_port_t           *c_port, *reply_port;
2728     nxt_conn_t           *c;
2729     nxt_app_wmsg_t       wmsg;
2730     nxt_app_parse_ctx_t  *ap;
2731 
2732     reply_port = ra->reply_port;
2733     ap = ra->ap;
2734     c = ra->rc->conn;
2735 
2736     c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
2737                                              reply_port->id);
2738     if (nxt_slow_path(c_port != reply_port)) {
2739         res = nxt_port_send_port(task, port, reply_port, 0);
2740 
2741         if (nxt_slow_path(res != NXT_OK)) {
2742             nxt_router_gen_error(task, c, 500,
2743                                  "Failed to send reply port to application");
2744             return;
2745         }
2746 
2747         nxt_process_connected_port_add(port->process, reply_port);
2748     }
2749 
2750     wmsg.port = port;
2751     wmsg.write = NULL;
2752     wmsg.buf = &wmsg.write;
2753     wmsg.stream = ra->req_id;
2754 
2755     res = port->app->prepare_msg(task, &ap->r, &wmsg);
2756 
2757     if (nxt_slow_path(res != NXT_OK)) {
2758         nxt_router_gen_error(task, c, 500,
2759                              "Failed to prepare message for application");
2760         return;
2761     }
2762 
2763     nxt_debug(task, "about to send %d bytes buffer to worker port %d",
2764                     nxt_buf_used_size(wmsg.write),
2765                     wmsg.port->socket.fd);
2766 
2767     res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
2768                                  -1, ra->req_id, reply_port->id, wmsg.write);
2769 
2770     if (nxt_slow_path(res != NXT_OK)) {
2771         nxt_router_gen_error(task, c, 500,
2772                              "Failed to send message to application");
2773         return;
2774     }
2775 }
2776 
2777 
2778 static nxt_int_t
2779 nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
2780     nxt_app_wmsg_t *wmsg)
2781 {
2782     nxt_int_t                 rc;
2783     nxt_buf_t                 *b;
2784     nxt_http_field_t          *field;
2785     nxt_app_request_header_t  *h;
2786 
2787     static const nxt_str_t prefix = nxt_string("HTTP_");
2788     static const nxt_str_t eof = nxt_null_string;
2789 
2790     h = &r->header;
2791 
2792 #define RC(S)                                                                 \
2793     do {                                                                      \
2794         rc = (S);                                                             \
2795         if (nxt_slow_path(rc != NXT_OK)) {                                    \
2796             goto fail;                                                        \
2797         }                                                                     \
2798     } while(0)
2799 
2800 #define NXT_WRITE(N)                                                          \
2801     RC(nxt_app_msg_write_str(task, wmsg, N))
2802 
2803     /* TODO error handle, async mmap buffer assignment */
2804 
2805     NXT_WRITE(&h->method);
2806     NXT_WRITE(&h->target);
2807 
2808     if (h->path.start == h->target.start) {
2809         NXT_WRITE(&eof);
2810 
2811     } else {
2812         NXT_WRITE(&h->path);
2813     }
2814 
2815     if (h->query.start != NULL) {
2816         RC(nxt_app_msg_write_size(task, wmsg,
2817                                   h->query.start - h->target.start + 1));
2818     } else {
2819         RC(nxt_app_msg_write_size(task, wmsg, 0));
2820     }
2821 
2822     NXT_WRITE(&h->version);
2823 
2824     NXT_WRITE(&r->remote);
2825     NXT_WRITE(&r->local);
2826 
2827     NXT_WRITE(&h->host);
2828     NXT_WRITE(&h->content_type);
2829     NXT_WRITE(&h->content_length);
2830 
2831     nxt_list_each(field, h->fields) {
2832         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
2833                                              &prefix, &field->name));
2834         NXT_WRITE(&field->value);
2835 
2836     } nxt_list_loop;
2837 
2838     /* end-of-headers mark */
2839     NXT_WRITE(&eof);
2840 
2841     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2842 
2843     for(b = r->body.buf; b != NULL; b = b->next) {
2844         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2845                                  nxt_buf_mem_used_size(&b->mem)));
2846     }
2847 
2848 #undef NXT_WRITE
2849 #undef RC
2850 
2851     return NXT_OK;
2852 
2853 fail:
2854 
2855     return NXT_ERROR;
2856 }
2857 
2858 
2859 static nxt_int_t
2860 nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
2861     nxt_app_wmsg_t *wmsg)
2862 {
2863     nxt_int_t                 rc;
2864     nxt_buf_t                 *b;
2865     nxt_http_field_t          *field;
2866     nxt_app_request_header_t  *h;
2867 
2868     static const nxt_str_t prefix = nxt_string("HTTP_");
2869     static const nxt_str_t eof = nxt_null_string;
2870 
2871     h = &r->header;
2872 
2873 #define RC(S)                                                                 \
2874     do {                                                                      \
2875         rc = (S);                                                             \
2876         if (nxt_slow_path(rc != NXT_OK)) {                                    \
2877             goto fail;                                                        \
2878         }                                                                     \
2879     } while(0)
2880 
2881 #define NXT_WRITE(N)                                                          \
2882     RC(nxt_app_msg_write_str(task, wmsg, N))
2883 
2884     /* TODO error handle, async mmap buffer assignment */
2885 
2886     NXT_WRITE(&h->method);
2887     NXT_WRITE(&h->target);
2888 
2889     if (h->path.start == h->target.start) {
2890         NXT_WRITE(&eof);
2891 
2892     } else {
2893         NXT_WRITE(&h->path);
2894     }
2895 
2896     if (h->query.start != NULL) {
2897         RC(nxt_app_msg_write_size(task, wmsg,
2898                                   h->query.start - h->target.start + 1));
2899     } else {
2900         RC(nxt_app_msg_write_size(task, wmsg, 0));
2901     }
2902 
2903     NXT_WRITE(&h->version);
2904 
2905     // PHP_SELF
2906     // SCRIPT_NAME
2907     // SCRIPT_FILENAME
2908     // DOCUMENT_ROOT
2909 
2910     NXT_WRITE(&r->remote);
2911     NXT_WRITE(&r->local);
2912 
2913     NXT_WRITE(&h->host);
2914     NXT_WRITE(&h->cookie);
2915     NXT_WRITE(&h->content_type);
2916     NXT_WRITE(&h->content_length);
2917 
2918     RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
2919 
2920     nxt_list_each(field, h->fields) {
2921         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
2922                                              &prefix, &field->name));
2923         NXT_WRITE(&field->value);
2924 
2925     } nxt_list_loop;
2926 
2927     /* end-of-headers mark */
2928     NXT_WRITE(&eof);
2929 
2930     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2931 
2932     for(b = r->body.buf; b != NULL; b = b->next) {
2933         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2934                                  nxt_buf_mem_used_size(&b->mem)));
2935     }
2936 
2937 #undef NXT_WRITE
2938 #undef RC
2939 
2940     return NXT_OK;
2941 
2942 fail:
2943 
2944     return NXT_ERROR;
2945 }
2946 
2947 
2948 static nxt_int_t
2949 nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
2950 {
2951     nxt_int_t                 rc;
2952     nxt_buf_t                 *b;
2953     nxt_http_field_t          *field;
2954     nxt_app_request_header_t  *h;
2955 
2956     static const nxt_str_t eof = nxt_null_string;
2957 
2958     h = &r->header;
2959 
2960 #define RC(S)                                                                 \
2961     do {                                                                      \
2962         rc = (S);                                                             \
2963         if (nxt_slow_path(rc != NXT_OK)) {                                    \
2964             goto fail;                                                        \
2965         }                                                                     \
2966     } while(0)
2967 
2968 #define NXT_WRITE(N)                                                          \
2969     RC(nxt_app_msg_write_str(task, wmsg, N))
2970 
2971     /* TODO error handle, async mmap buffer assignment */
2972 
2973     NXT_WRITE(&h->method);
2974     NXT_WRITE(&h->target);
2975 
2976     if (h->path.start == h->target.start) {
2977         NXT_WRITE(&eof);
2978 
2979     } else {
2980         NXT_WRITE(&h->path);
2981     }
2982 
2983     if (h->query.start != NULL) {
2984         RC(nxt_app_msg_write_size(task, wmsg,
2985                                   h->query.start - h->target.start + 1));
2986     } else {
2987         RC(nxt_app_msg_write_size(task, wmsg, 0));
2988     }
2989 
2990     NXT_WRITE(&h->version);
2991     NXT_WRITE(&r->remote);
2992 
2993     NXT_WRITE(&h->host);
2994     NXT_WRITE(&h->cookie);
2995     NXT_WRITE(&h->content_type);
2996     NXT_WRITE(&h->content_length);
2997 
2998     RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
2999 
3000     nxt_list_each(field, h->fields) {
3001         NXT_WRITE(&field->name);
3002         NXT_WRITE(&field->value);
3003 
3004     } nxt_list_loop;
3005 
3006     /* end-of-headers mark */
3007     NXT_WRITE(&eof);
3008 
3009     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
3010 
3011     for(b = r->body.buf; b != NULL; b = b->next) {
3012         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
3013                                  nxt_buf_mem_used_size(&b->mem)));
3014     }
3015 
3016 #undef NXT_WRITE
3017 #undef RC
3018 
3019     return NXT_OK;
3020 
3021 fail:
3022 
3023     return NXT_ERROR;
3024 }
3025 
3026 
3027 static const nxt_conn_state_t  nxt_router_conn_close_state
3028     nxt_aligned(64) =
3029 {
3030     .ready_handler = nxt_router_conn_free,
3031 };
3032 
3033 
3034 static void
3035 nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
3036 {
3037     nxt_buf_t         *b;
3038     nxt_bool_t        last;
3039     nxt_conn_t        *c;
3040     nxt_work_queue_t  *wq;
3041 
3042     nxt_debug(task, "router conn ready %p", obj);
3043 
3044     c = obj;
3045     b = c->write;
3046 
3047     wq = &task->thread->engine->fast_work_queue;
3048 
3049     last = 0;
3050 
3051     while (b != NULL) {
3052         if (!nxt_buf_is_sync(b)) {
3053             if (nxt_buf_used_size(b) > 0) {
3054                 break;
3055             }
3056         }
3057 
3058         if (nxt_buf_is_last(b)) {
3059             last = 1;
3060         }
3061 
3062         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
3063 
3064         b = b->next;
3065     }
3066 
3067     c->write = b;
3068 
3069     if (b != NULL) {
3070         nxt_debug(task, "router conn %p has more data to write", obj);
3071 
3072         nxt_conn_write(task->thread->engine, c);
3073 
3074     } else {
3075         nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
3076                   last);
3077 
3078         if (last != 0) {
3079             nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
3080 
3081             nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
3082                                c->socket.data);
3083         }
3084     }
3085 }
3086 
3087 
3088 static void
3089 nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
3090 {
3091     nxt_conn_t  *c;
3092 
3093     c = obj;
3094 
3095     nxt_debug(task, "router conn close");
3096 
3097     c->write_state = &nxt_router_conn_close_state;
3098 
3099     nxt_conn_close(task->thread->engine, c);
3100 }
3101 
3102 
3103 static void
3104 nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
3105 {
3106     nxt_socket_conf_joint_t  *joint;
3107 
3108     joint = obj;
3109 
3110     nxt_router_conf_release(task, joint);
3111 }
3112 
3113 
3114 static void
3115 nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
3116 {
3117     nxt_conn_t               *c;
3118     nxt_req_conn_link_t      *rc;
3119     nxt_socket_conf_joint_t  *joint;
3120 
3121     c = obj;
3122 
3123     nxt_debug(task, "router conn close done");
3124 
3125     nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3126 
3127         nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
3128 
3129         if (rc->app_port != NULL) {
3130             nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
3131 
3132             rc->app_port = NULL;
3133         }
3134 
3135         rc->conn = NULL;
3136 
3137         nxt_event_engine_request_remove(task->thread->engine, rc);
3138 
3139     } nxt_queue_loop;
3140 
3141     nxt_queue_remove(&c->link);
3142 
3143     joint = c->listen->socket.data;
3144 
3145     task = &task->thread->engine->task;
3146 
3147     nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, task, joint, NULL);
3148 
3149     nxt_mp_release(c->mem_pool, c);
3150 }
3151 
3152 
3153 static void
3154 nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
3155 {
3156     nxt_conn_t  *c;
3157 
3158     c = obj;
3159 
3160     nxt_debug(task, "router conn error");
3161 
3162     if (c->socket.fd != -1) {
3163         c->write_state = &nxt_router_conn_close_state;
3164 
3165         nxt_conn_close(task->thread->engine, c);
3166     }
3167 }
3168 
3169 
3170 static void
3171 nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
3172 {
3173     nxt_conn_t   *c;
3174     nxt_timer_t  *timer;
3175 
3176     timer = obj;
3177 
3178     nxt_debug(task, "router conn timeout");
3179 
3180     c = nxt_read_timer_conn(timer);
3181 
3182     if (c->read_state == &nxt_router_conn_read_header_state) {
3183         nxt_router_gen_error(task, c, 408, "Read header timeout");
3184 
3185     } else {
3186         nxt_router_gen_error(task, c, 408, "Read body timeout");
3187     }
3188 }
3189 
3190 
3191 static nxt_msec_t
3192 nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3193 {
3194     nxt_socket_conf_joint_t  *joint;
3195 
3196     joint = c->listen->socket.data;
3197 
3198     return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3199 }
3200