xref: /unit/src/nxt_router.c (revision 216:07257705cd64)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 
11 
12 typedef struct {
13     nxt_str_t  type;
14     uint32_t   workers;
15 } nxt_router_app_conf_t;
16 
17 
18 typedef struct {
19     nxt_str_t  application;
20 } nxt_router_listener_conf_t;
21 
22 
23 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
24 typedef struct nxt_start_worker_s nxt_start_worker_t;
25 
26 struct nxt_start_worker_s {
27     nxt_app_t              *app;
28     nxt_req_app_link_t     *ra;
29 
30     nxt_work_t             work;
31 };
32 
33 
34 struct nxt_req_app_link_s {
35     nxt_req_id_t         req_id;
36     nxt_port_t           *app_port;
37     nxt_port_t           *reply_port;
38     nxt_app_parse_ctx_t  *ap;
39     nxt_req_conn_link_t  *rc;
40 
41     nxt_queue_link_t     link; /* for nxt_app_t.requests */
42 
43     nxt_mp_t             *mem_pool;
44     nxt_work_t           work;
45 };
46 
47 
48 typedef struct {
49     nxt_socket_conf_t       *socket_conf;
50     nxt_router_temp_conf_t  *temp_conf;
51 } nxt_socket_rpc_t;
52 
53 
54 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
55 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
56 static void nxt_router_conf_ready(nxt_task_t *task,
57     nxt_router_temp_conf_t *tmcf);
58 static void nxt_router_conf_error(nxt_task_t *task,
59     nxt_router_temp_conf_t *tmcf);
60 static void nxt_router_conf_send(nxt_task_t *task,
61     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
62 static void nxt_router_listen_sockets_sort(nxt_router_t *router,
63     nxt_router_temp_conf_t *tmcf);
64 
65 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
66     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
67 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
68 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
69     nxt_str_t *name);
70 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
71     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
72 static void nxt_router_listen_socket_ready(nxt_task_t *task,
73     nxt_port_recv_msg_t *msg, void *data);
74 static void nxt_router_listen_socket_error(nxt_task_t *task,
75     nxt_port_recv_msg_t *msg, void *data);
76 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
77     nxt_sockaddr_t *sa);
78 
79 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
80     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
81     const nxt_event_interface_t *interface);
82 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
83     nxt_router_engine_conf_t *recf);
84 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
85     nxt_router_engine_conf_t *recf);
86 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
87     nxt_router_engine_conf_t *recf);
88 static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
89 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
90     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
91     nxt_work_handler_t handler);
92 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
93     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
94 
95 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
96     nxt_router_temp_conf_t *tmcf);
97 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
98     nxt_event_engine_t *engine);
99 static void nxt_router_apps_sort(nxt_router_t *router,
100     nxt_router_temp_conf_t *tmcf);
101 
102 static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
103 static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
104 
105 static void nxt_router_thread_start(void *data);
106 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
107     void *data);
108 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
109     void *data);
110 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
111     void *data);
112 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
113     void *data);
114 static void nxt_router_listen_socket_release(nxt_task_t *task,
115     nxt_socket_conf_joint_t *joint);
116 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
117     void *data);
118 static void nxt_router_conf_release(nxt_task_t *task,
119     nxt_socket_conf_joint_t *joint);
120 
121 static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
122     void *data);
123 static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
124 static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id);
125 static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
126     void *data);
127 
128 static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
129 static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
130     void *data);
131 static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
132     void *data);
133 static void nxt_router_process_http_request(nxt_task_t *task,
134     nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
135 static void nxt_router_process_http_request_mp(nxt_task_t *task,
136     nxt_req_app_link_t *ra, nxt_port_t *port);
137 static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
138     nxt_app_wmsg_t *wmsg);
139 static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
140     nxt_app_wmsg_t *wmsg);
141 static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
142     nxt_app_wmsg_t *wmsg);
143 static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
144 static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
145 static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
146 static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
147 static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
148 static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
149 
150 static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
151     const char* fmt, ...);
152 
153 static nxt_router_t  *nxt_router;
154 
155 
156 static nxt_app_prepare_msg_t  nxt_app_prepare_msg[] = {
157     nxt_python_prepare_msg,
158     nxt_php_prepare_msg,
159     nxt_go_prepare_msg,
160 };
161 
162 
163 nxt_int_t
164 nxt_router_start(nxt_task_t *task, void *data)
165 {
166     nxt_int_t      ret;
167     nxt_router_t   *router;
168     nxt_runtime_t  *rt;
169 
170     rt = task->thread->runtime;
171 
172     ret = nxt_app_http_init(task, rt);
173     if (nxt_slow_path(ret != NXT_OK)) {
174         return ret;
175     }
176 
177     router = nxt_zalloc(sizeof(nxt_router_t));
178     if (nxt_slow_path(router == NULL)) {
179         return NXT_ERROR;
180     }
181 
182     nxt_queue_init(&router->engines);
183     nxt_queue_init(&router->sockets);
184     nxt_queue_init(&router->apps);
185 
186     nxt_router = router;
187 
188     return NXT_OK;
189 }
190 
191 
192 static nxt_start_worker_t *
193 nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
194 {
195     nxt_port_t          *master_port;
196     nxt_runtime_t       *rt;
197     nxt_start_worker_t  *sw;
198 
199     sw = nxt_zalloc(sizeof(nxt_start_worker_t));
200 
201     if (nxt_slow_path(sw == NULL)) {
202         return NULL;
203     }
204 
205     sw->app = app;
206     sw->ra = ra;
207 
208     nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
209                     ra->req_id, &app->name, app);
210 
211     rt = task->thread->runtime;
212     master_port = rt->port_by_type[NXT_PROCESS_MASTER];
213 
214     sw->work.handler = nxt_router_send_sw_request;
215     sw->work.task = &master_port->engine->task;
216     sw->work.obj = sw;
217     sw->work.data = task->thread->engine;
218     sw->work.next = NULL;
219 
220     if (task->thread->engine != master_port->engine) {
221         nxt_debug(task, "sw %p post send to master engine %p", sw,
222                   master_port->engine);
223 
224         nxt_event_engine_post(master_port->engine, &sw->work);
225 
226     } else {
227         nxt_router_send_sw_request(task, sw, sw->work.data);
228     }
229 
230     return sw;
231 }
232 
233 
234 nxt_inline void
235 nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
236 {
237     nxt_debug(task, "sw %p release", sw);
238 
239     nxt_free(sw);
240 }
241 
242 
243 static nxt_req_app_link_t *
244 nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
245 {
246     nxt_mp_t            *mp;
247     nxt_req_app_link_t  *ra;
248 
249     mp = rc->conn->mem_pool;
250 
251     ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
252 
253     if (nxt_slow_path(ra == NULL)) {
254         return NULL;
255     }
256 
257     nxt_debug(task, "ra #%uxD create", ra->req_id);
258 
259     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
260 
261     ra->req_id = rc->req_id;
262     ra->app_port = NULL;
263     ra->rc = rc;
264 
265     ra->mem_pool = mp;
266 
267     ra->work.handler = NULL;
268     ra->work.task = &task->thread->engine->task;
269     ra->work.obj = ra;
270     ra->work.data = task->thread->engine;
271 
272     return ra;
273 }
274 
275 
276 static void
277 nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
278 {
279     nxt_req_app_link_t  *ra;
280     nxt_event_engine_t  *engine;
281 
282     ra = obj;
283     engine = data;
284 
285     if (task->thread->engine != engine) {
286         ra->work.handler = nxt_router_ra_release;
287         ra->work.task = &engine->task;
288         ra->work.next = NULL;
289 
290         nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine);
291 
292         nxt_event_engine_post(engine, &ra->work);
293 
294         return;
295     }
296 
297     nxt_debug(task, "ra #%uxD release", ra->req_id);
298 
299     if (ra->app_port != NULL) {
300 
301         if (ra->rc->conn != NULL) {
302             ra->rc->app_port = ra->app_port;
303 
304         } else {
305             nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
306         }
307     }
308 
309     nxt_mp_release(ra->mem_pool, ra);
310 }
311 
312 
313 void
314 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
315 {
316     nxt_port_new_port_handler(task, msg);
317 
318     if (msg->port_msg.stream == 0) {
319         return;
320     }
321 
322     if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
323         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
324     }
325 
326     nxt_port_rpc_handler(task, msg);
327 }
328 
329 
330 void
331 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
332 {
333     size_t                  dump_size;
334     nxt_int_t               ret;
335     nxt_buf_t               *b;
336     nxt_router_temp_conf_t  *tmcf;
337 
338     b = msg->buf;
339 
340     dump_size = nxt_buf_used_size(b);
341 
342     if (dump_size > 300) {
343         dump_size = 300;
344     }
345 
346     nxt_debug(task, "router conf data (%z): %*s",
347               msg->size, dump_size, b->mem.pos);
348 
349     tmcf = nxt_router_temp_conf(task);
350     if (nxt_slow_path(tmcf == NULL)) {
351         return;
352     }
353 
354     tmcf->conf->router = nxt_router;
355     tmcf->stream = msg->port_msg.stream;
356     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
357                                        msg->port_msg.pid,
358                                        msg->port_msg.reply_port);
359 
360     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
361 
362     if (nxt_fast_path(ret == NXT_OK)) {
363         nxt_router_conf_apply(task, tmcf, NULL);
364 
365     } else {
366         nxt_router_conf_error(task, tmcf);
367     }
368 }
369 
370 
371 void
372 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
373 {
374     nxt_port_remove_pid_handler(task, msg);
375 
376     if (msg->port_msg.stream == 0) {
377         return;
378     }
379 
380     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
381 
382     nxt_port_rpc_handler(task, msg);
383 }
384 
385 
386 static nxt_router_temp_conf_t *
387 nxt_router_temp_conf(nxt_task_t *task)
388 {
389     nxt_mp_t                *mp, *tmp;
390     nxt_router_conf_t       *rtcf;
391     nxt_router_temp_conf_t  *tmcf;
392 
393     mp = nxt_mp_create(1024, 128, 256, 32);
394     if (nxt_slow_path(mp == NULL)) {
395         return NULL;
396     }
397 
398     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
399     if (nxt_slow_path(rtcf == NULL)) {
400         goto fail;
401     }
402 
403     rtcf->mem_pool = mp;
404 
405     tmp = nxt_mp_create(1024, 128, 256, 32);
406     if (nxt_slow_path(tmp == NULL)) {
407         goto fail;
408     }
409 
410     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
411     if (nxt_slow_path(tmcf == NULL)) {
412         goto temp_fail;
413     }
414 
415     tmcf->mem_pool = tmp;
416     tmcf->conf = rtcf;
417     tmcf->count = 1;
418     tmcf->engine = task->thread->engine;
419 
420     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
421                                      sizeof(nxt_router_engine_conf_t));
422     if (nxt_slow_path(tmcf->engines == NULL)) {
423         goto temp_fail;
424     }
425 
426     nxt_queue_init(&tmcf->deleting);
427     nxt_queue_init(&tmcf->keeping);
428     nxt_queue_init(&tmcf->updating);
429     nxt_queue_init(&tmcf->pending);
430     nxt_queue_init(&tmcf->creating);
431     nxt_queue_init(&tmcf->apps);
432     nxt_queue_init(&tmcf->previous);
433 
434     return tmcf;
435 
436 temp_fail:
437 
438     nxt_mp_destroy(tmp);
439 
440 fail:
441 
442     nxt_mp_destroy(mp);
443 
444     return NULL;
445 }
446 
447 
448 static void
449 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
450 {
451     nxt_int_t                    ret;
452     nxt_router_t                 *router;
453     nxt_runtime_t                *rt;
454     nxt_queue_link_t             *qlk;
455     nxt_socket_conf_t            *skcf;
456     nxt_router_temp_conf_t       *tmcf;
457     const nxt_event_interface_t  *interface;
458 
459     tmcf = obj;
460 
461     qlk = nxt_queue_first(&tmcf->pending);
462 
463     if (qlk != nxt_queue_tail(&tmcf->pending)) {
464         nxt_queue_remove(qlk);
465         nxt_queue_insert_tail(&tmcf->creating, qlk);
466 
467         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
468 
469         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
470 
471         return;
472     }
473 
474     rt = task->thread->runtime;
475 
476     interface = nxt_service_get(rt->services, "engine", NULL);
477 
478     router = tmcf->conf->router;
479 
480     ret = nxt_router_engines_create(task, router, tmcf, interface);
481     if (nxt_slow_path(ret != NXT_OK)) {
482         goto fail;
483     }
484 
485     ret = nxt_router_threads_create(task, rt, tmcf);
486     if (nxt_slow_path(ret != NXT_OK)) {
487         goto fail;
488     }
489 
490     nxt_router_apps_sort(router, tmcf);
491 
492     nxt_router_engines_post(tmcf);
493 
494     nxt_queue_add(&router->sockets, &tmcf->updating);
495     nxt_queue_add(&router->sockets, &tmcf->creating);
496 
497     nxt_router_conf_ready(task, tmcf);
498 
499     return;
500 
501 fail:
502 
503     nxt_router_conf_error(task, tmcf);
504 
505     return;
506 }
507 
508 
509 static void
510 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
511 {
512     nxt_joint_job_t  *job;
513 
514     job = obj;
515 
516     nxt_router_conf_ready(task, job->tmcf);
517 }
518 
519 
520 static void
521 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
522 {
523     nxt_debug(task, "temp conf count:%D", tmcf->count);
524 
525     if (--tmcf->count == 0) {
526         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
527     }
528 }
529 
530 
531 static void
532 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
533 {
534     nxt_socket_t       s;
535     nxt_router_t       *router;
536     nxt_queue_link_t   *qlk;
537     nxt_socket_conf_t  *skcf;
538 
539     nxt_log(task, NXT_LOG_CRIT, "failed to apply new conf");
540 
541     for (qlk = nxt_queue_first(&tmcf->creating);
542          qlk != nxt_queue_tail(&tmcf->creating);
543          qlk = nxt_queue_next(qlk))
544     {
545         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
546         s = skcf->listen.socket;
547 
548         if (s != -1) {
549             nxt_socket_close(task, s);
550         }
551 
552         nxt_free(skcf->socket);
553     }
554 
555     router = tmcf->conf->router;
556 
557     nxt_queue_add(&router->sockets, &tmcf->keeping);
558     nxt_queue_add(&router->sockets, &tmcf->deleting);
559 
560     // TODO: new engines and threads
561 
562     nxt_mp_destroy(tmcf->conf->mem_pool);
563 
564     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
565 }
566 
567 
568 static void
569 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
570     nxt_port_msg_type_t type)
571 {
572     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
573 }
574 
575 
576 static nxt_conf_map_t  nxt_router_conf[] = {
577     {
578         nxt_string("listeners_threads"),
579         NXT_CONF_MAP_INT32,
580         offsetof(nxt_router_conf_t, threads),
581     },
582 };
583 
584 
585 static nxt_conf_map_t  nxt_router_app_conf[] = {
586     {
587         nxt_string("type"),
588         NXT_CONF_MAP_STR,
589         offsetof(nxt_router_app_conf_t, type),
590     },
591 
592     {
593         nxt_string("workers"),
594         NXT_CONF_MAP_INT32,
595         offsetof(nxt_router_app_conf_t, workers),
596     },
597 };
598 
599 
600 static nxt_conf_map_t  nxt_router_listener_conf[] = {
601     {
602         nxt_string("application"),
603         NXT_CONF_MAP_STR,
604         offsetof(nxt_router_listener_conf_t, application),
605     },
606 };
607 
608 
609 static nxt_conf_map_t  nxt_router_http_conf[] = {
610     {
611         nxt_string("header_buffer_size"),
612         NXT_CONF_MAP_SIZE,
613         offsetof(nxt_socket_conf_t, header_buffer_size),
614     },
615 
616     {
617         nxt_string("large_header_buffer_size"),
618         NXT_CONF_MAP_SIZE,
619         offsetof(nxt_socket_conf_t, large_header_buffer_size),
620     },
621 
622     {
623         nxt_string("large_header_buffers"),
624         NXT_CONF_MAP_SIZE,
625         offsetof(nxt_socket_conf_t, large_header_buffers),
626     },
627 
628     {
629         nxt_string("body_buffer_size"),
630         NXT_CONF_MAP_SIZE,
631         offsetof(nxt_socket_conf_t, body_buffer_size),
632     },
633 
634     {
635         nxt_string("max_body_size"),
636         NXT_CONF_MAP_SIZE,
637         offsetof(nxt_socket_conf_t, max_body_size),
638     },
639 
640     {
641         nxt_string("header_read_timeout"),
642         NXT_CONF_MAP_MSEC,
643         offsetof(nxt_socket_conf_t, header_read_timeout),
644     },
645 
646     {
647         nxt_string("body_read_timeout"),
648         NXT_CONF_MAP_MSEC,
649         offsetof(nxt_socket_conf_t, body_read_timeout),
650     },
651 };
652 
653 
654 static nxt_int_t
655 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
656     u_char *start, u_char *end)
657 {
658     u_char                      *p;
659     size_t                      size;
660     nxt_mp_t                    *mp;
661     uint32_t                    next;
662     nxt_int_t                   ret;
663     nxt_str_t                   name;
664     nxt_app_t                   *app, *prev;
665     nxt_app_type_t              type;
666     nxt_sockaddr_t              *sa;
667     nxt_conf_value_t            *conf, *http;
668     nxt_conf_value_t            *applications, *application;
669     nxt_conf_value_t            *listeners, *listener;
670     nxt_socket_conf_t           *skcf;
671     nxt_app_lang_module_t       *lang;
672     nxt_router_app_conf_t       apcf;
673     nxt_router_listener_conf_t  lscf;
674 
675     static nxt_str_t  http_path = nxt_string("/http");
676     static nxt_str_t  applications_path = nxt_string("/applications");
677     static nxt_str_t  listeners_path = nxt_string("/listeners");
678 
679     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
680     if (conf == NULL) {
681         nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
682         return NXT_ERROR;
683     }
684 
685     mp = tmcf->conf->mem_pool;
686 
687     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
688                               nxt_nitems(nxt_router_conf), tmcf->conf);
689     if (ret != NXT_OK) {
690         nxt_log(task, NXT_LOG_CRIT, "root map error");
691         return NXT_ERROR;
692     }
693 
694     if (tmcf->conf->threads == 0) {
695         tmcf->conf->threads = nxt_ncpu;
696     }
697 
698     applications = nxt_conf_get_path(conf, &applications_path);
699     if (applications == NULL) {
700         nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
701         return NXT_ERROR;
702     }
703 
704     next = 0;
705 
706     for ( ;; ) {
707         application = nxt_conf_next_object_member(applications, &name, &next);
708         if (application == NULL) {
709             break;
710         }
711 
712         nxt_debug(task, "application \"%V\"", &name);
713 
714         size = nxt_conf_json_length(application, NULL);
715 
716         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
717         if (app == NULL) {
718             goto fail;
719         }
720 
721         nxt_memzero(app, sizeof(nxt_app_t));
722 
723         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
724         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
725 
726         p = nxt_conf_json_print(app->conf.start, application, NULL);
727         app->conf.length = p - app->conf.start;
728 
729         nxt_assert(app->conf.length <= size);
730 
731         nxt_debug(task, "application conf \"%V\"", &app->conf);
732 
733         prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
734 
735         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
736             nxt_free(app);
737 
738             nxt_queue_remove(&prev->link);
739             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
740             continue;
741         }
742 
743         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
744                                   nxt_nitems(nxt_router_app_conf), &apcf);
745         if (ret != NXT_OK) {
746             nxt_log(task, NXT_LOG_CRIT, "application map error");
747             goto app_fail;
748         }
749 
750         nxt_debug(task, "application type: %V", &apcf.type);
751         nxt_debug(task, "application workers: %D", apcf.workers);
752 
753         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
754 
755         if (lang == NULL) {
756             nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
757                     &apcf.type);
758             goto app_fail;
759         }
760 
761         nxt_debug(task, "application language module: \"%s\"", lang->file);
762 
763         type = nxt_app_parse_type(&lang->type);
764 
765         if (type == NXT_APP_UNKNOWN) {
766             nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
767                     &lang->type);
768             goto app_fail;
769         }
770 
771         if (nxt_app_prepare_msg[type] == NULL) {
772             nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
773                     &lang->type);
774             goto app_fail;
775         }
776 
777         ret = nxt_thread_mutex_create(&app->mutex);
778         if (ret != NXT_OK) {
779             goto app_fail;
780         }
781 
782         nxt_queue_init(&app->ports);
783         nxt_queue_init(&app->requests);
784 
785         app->name.length = name.length;
786         nxt_memcpy(app->name.start, name.start, name.length);
787 
788         app->type = type;
789         app->max_workers = apcf.workers;
790         app->live = 1;
791         app->prepare_msg = nxt_app_prepare_msg[type];
792 
793         nxt_queue_insert_tail(&tmcf->apps, &app->link);
794     }
795 
796     http = nxt_conf_get_path(conf, &http_path);
797 #if 0
798     if (http == NULL) {
799         nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
800         return NXT_ERROR;
801     }
802 #endif
803 
804     listeners = nxt_conf_get_path(conf, &listeners_path);
805     if (listeners == NULL) {
806         nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
807         return NXT_ERROR;
808     }
809 
810     next = 0;
811 
812     for ( ;; ) {
813         listener = nxt_conf_next_object_member(listeners, &name, &next);
814         if (listener == NULL) {
815             break;
816         }
817 
818         sa = nxt_sockaddr_parse(mp, &name);
819         if (sa == NULL) {
820             nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
821             goto fail;
822         }
823 
824         sa->type = SOCK_STREAM;
825 
826         nxt_debug(task, "router listener: \"%*s\"",
827                   sa->length, nxt_sockaddr_start(sa));
828 
829         skcf = nxt_router_socket_conf(task, mp, sa);
830         if (skcf == NULL) {
831             goto fail;
832         }
833 
834         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
835                                   nxt_nitems(nxt_router_listener_conf), &lscf);
836         if (ret != NXT_OK) {
837             nxt_log(task, NXT_LOG_CRIT, "listener map error");
838             goto fail;
839         }
840 
841         nxt_debug(task, "application: %V", &lscf.application);
842 
843         // STUB, default values if http block is not defined.
844         skcf->header_buffer_size = 2048;
845         skcf->large_header_buffer_size = 8192;
846         skcf->large_header_buffers = 4;
847         skcf->body_buffer_size = 16 * 1024;
848         skcf->max_body_size = 2 * 1024 * 1024;
849         skcf->header_read_timeout = 5000;
850         skcf->body_read_timeout = 5000;
851 
852         if (http != NULL) {
853             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
854                                       nxt_nitems(nxt_router_http_conf), skcf);
855             if (ret != NXT_OK) {
856                 nxt_log(task, NXT_LOG_CRIT, "http map error");
857                 goto fail;
858             }
859         }
860 
861         skcf->listen.handler = nxt_router_conn_init;
862         skcf->router_conf = tmcf->conf;
863         skcf->router_conf->count++;
864         skcf->application = nxt_router_listener_application(tmcf,
865                                                             &lscf.application);
866 
867         nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
868     }
869 
870     nxt_router_listen_sockets_sort(tmcf->conf->router, tmcf);
871 
872     return NXT_OK;
873 
874 app_fail:
875 
876     nxt_free(app);
877 
878 fail:
879 
880     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
881 
882         nxt_queue_remove(&app->link);
883         nxt_thread_mutex_destroy(&app->mutex);
884         nxt_free(app);
885 
886     } nxt_queue_loop;
887 
888     return NXT_ERROR;
889 }
890 
891 
892 static nxt_app_t *
893 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
894 {
895     nxt_app_t  *app;
896 
897     nxt_queue_each(app, queue, nxt_app_t, link) {
898 
899         if (nxt_strstr_eq(name, &app->name)) {
900             return app;
901         }
902 
903     } nxt_queue_loop;
904 
905     return NULL;
906 }
907 
908 
909 static nxt_app_t *
910 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
911 {
912     nxt_app_t  *app;
913 
914     app = nxt_router_app_find(&tmcf->apps, name);
915 
916     if (app == NULL) {
917         app = nxt_router_app_find(&tmcf->previous, name);
918     }
919 
920     return app;
921 }
922 
923 
924 static nxt_socket_conf_t *
925 nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
926 {
927     nxt_socket_conf_t  *skcf;
928 
929     skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
930     if (nxt_slow_path(skcf == NULL)) {
931         return NULL;
932     }
933 
934     skcf->sockaddr = sa;
935 
936     skcf->listen.sockaddr = sa;
937     skcf->listen.socklen = sa->socklen;
938     skcf->listen.address_length = sa->length;
939 
940     skcf->listen.socket = -1;
941     skcf->listen.backlog = NXT_LISTEN_BACKLOG;
942     skcf->listen.flags = NXT_NONBLOCK;
943     skcf->listen.read_after_accept = 1;
944 
945     return skcf;
946 }
947 
948 
949 static void
950 nxt_router_listen_sockets_sort(nxt_router_t *router,
951     nxt_router_temp_conf_t *tmcf)
952 {
953     nxt_queue_link_t   *nqlk, *oqlk, *next;
954     nxt_socket_conf_t  *nskcf, *oskcf;
955 
956     for (nqlk = nxt_queue_first(&tmcf->pending);
957          nqlk != nxt_queue_tail(&tmcf->pending);
958          nqlk = next)
959     {
960         next = nxt_queue_next(nqlk);
961         nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
962 
963         for (oqlk = nxt_queue_first(&router->sockets);
964              oqlk != nxt_queue_tail(&router->sockets);
965              oqlk = nxt_queue_next(oqlk))
966         {
967             oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
968 
969             if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
970                 nskcf->socket = oskcf->socket;
971                 nskcf->listen.socket = oskcf->listen.socket;
972 
973                 nxt_queue_remove(oqlk);
974                 nxt_queue_insert_tail(&tmcf->keeping, oqlk);
975 
976                 nxt_queue_remove(nqlk);
977                 nxt_queue_insert_tail(&tmcf->updating, nqlk);
978 
979                 break;
980             }
981         }
982     }
983 
984     nxt_queue_add(&tmcf->deleting, &router->sockets);
985     nxt_queue_init(&router->sockets);
986 }
987 
988 
989 static void
990 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
991     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
992 {
993     uint32_t          stream;
994     nxt_buf_t         *b;
995     nxt_port_t        *main_port, *router_port;
996     nxt_runtime_t     *rt;
997     nxt_socket_rpc_t  *rpc;
998 
999     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1000     if (rpc == NULL) {
1001         goto fail;
1002     }
1003 
1004     rpc->socket_conf = skcf;
1005     rpc->temp_conf = tmcf;
1006 
1007     b = nxt_buf_mem_alloc(tmcf->mem_pool, skcf->sockaddr->sockaddr_size, 0);
1008     if (b == NULL) {
1009         goto fail;
1010     }
1011 
1012     b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr,
1013                              skcf->sockaddr->sockaddr_size);
1014 
1015     rt = task->thread->runtime;
1016     main_port = rt->port_by_type[NXT_PROCESS_MASTER];
1017     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1018 
1019     stream = nxt_port_rpc_register_handler(task, router_port,
1020                                            nxt_router_listen_socket_ready,
1021                                            nxt_router_listen_socket_error,
1022                                            main_port->pid, rpc);
1023     if (stream == 0) {
1024         goto fail;
1025     }
1026 
1027     nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1028                           stream, router_port->id, b);
1029 
1030     return;
1031 
1032 fail:
1033 
1034     nxt_router_conf_error(task, tmcf);
1035 }
1036 
1037 
1038 static void
1039 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1040     void *data)
1041 {
1042     nxt_int_t            ret;
1043     nxt_socket_t         s;
1044     nxt_socket_rpc_t     *rpc;
1045     nxt_router_socket_t  *rtsk;
1046 
1047     rpc = data;
1048 
1049     s = msg->fd;
1050 
1051     ret = nxt_socket_nonblocking(task, s);
1052     if (nxt_slow_path(ret != NXT_OK)) {
1053         goto fail;
1054     }
1055 
1056 #ifdef TCP_DEFER_ACCEPT
1057 
1058     /* Defer Linux accept() up to for 1 second. */
1059     (void) nxt_socket_setsockopt(task, s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 1);
1060 
1061 #endif
1062 
1063     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1064     if (nxt_slow_path(ret != NXT_OK)) {
1065         goto fail;
1066     }
1067 
1068     rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
1069     if (nxt_slow_path(rtsk == NULL)) {
1070         goto fail;
1071     }
1072 
1073     rtsk->count = 0;
1074     rtsk->fd = s;
1075 
1076     rpc->socket_conf->listen.socket = s;
1077     rpc->socket_conf->socket = rtsk;
1078 
1079     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1080                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1081 
1082     return;
1083 
1084 fail:
1085 
1086     nxt_socket_close(task, s);
1087 
1088     nxt_router_conf_error(task, rpc->temp_conf);
1089 }
1090 
1091 
1092 static void
1093 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1094     void *data)
1095 {
1096     u_char                  *p;
1097     size_t                  size;
1098     uint8_t                 error;
1099     nxt_buf_t               *in, *out;
1100     nxt_sockaddr_t          *sa;
1101     nxt_socket_rpc_t        *rpc;
1102     nxt_router_temp_conf_t  *tmcf;
1103 
1104     static nxt_str_t  socket_errors[] = {
1105         nxt_string("ListenerSystem"),
1106         nxt_string("ListenerNoIPv6"),
1107         nxt_string("ListenerPort"),
1108         nxt_string("ListenerInUse"),
1109         nxt_string("ListenerNoAddress"),
1110         nxt_string("ListenerNoAccess"),
1111         nxt_string("ListenerPath"),
1112     };
1113 
1114     rpc = data;
1115     sa = rpc->socket_conf->sockaddr;
1116 
1117     in = msg->buf;
1118     p = in->mem.pos;
1119 
1120     error = *p++;
1121 
1122     size = sizeof("listen socket error: ") - 1
1123            + sizeof("{listener: \"\", code:\"\", message: \"\"}") - 1
1124            + sa->length + socket_errors[error].length + (in->mem.free - p);
1125 
1126     tmcf = rpc->temp_conf;
1127 
1128     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1129     if (nxt_slow_path(out == NULL)) {
1130         return;
1131     }
1132 
1133     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1134                         "listen socket error: "
1135                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1136                         sa->length, nxt_sockaddr_start(sa),
1137                         &socket_errors[error], in->mem.free - p, p);
1138 
1139     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1140 
1141     nxt_router_conf_error(task, tmcf);
1142 }
1143 
1144 
1145 static nxt_int_t
1146 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
1147     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
1148 {
1149     nxt_int_t                 ret;
1150     nxt_uint_t                n, threads;
1151     nxt_queue_link_t          *qlk;
1152     nxt_router_engine_conf_t  *recf;
1153 
1154     threads = tmcf->conf->threads;
1155 
1156     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
1157                                      sizeof(nxt_router_engine_conf_t));
1158     if (nxt_slow_path(tmcf->engines == NULL)) {
1159         return NXT_ERROR;
1160     }
1161 
1162     n = 0;
1163 
1164     for (qlk = nxt_queue_first(&router->engines);
1165          qlk != nxt_queue_tail(&router->engines);
1166          qlk = nxt_queue_next(qlk))
1167     {
1168         recf = nxt_array_zero_add(tmcf->engines);
1169         if (nxt_slow_path(recf == NULL)) {
1170             return NXT_ERROR;
1171         }
1172 
1173         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
1174 
1175         if (n < threads) {
1176             ret = nxt_router_engine_conf_update(tmcf, recf);
1177 
1178         } else {
1179             ret = nxt_router_engine_conf_delete(tmcf, recf);
1180         }
1181 
1182         if (nxt_slow_path(ret != NXT_OK)) {
1183             return ret;
1184         }
1185 
1186         n++;
1187     }
1188 
1189     tmcf->new_threads = n;
1190 
1191     while (n < threads) {
1192         recf = nxt_array_zero_add(tmcf->engines);
1193         if (nxt_slow_path(recf == NULL)) {
1194             return NXT_ERROR;
1195         }
1196 
1197         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
1198         if (nxt_slow_path(recf->engine == NULL)) {
1199             return NXT_ERROR;
1200         }
1201 
1202         ret = nxt_router_engine_conf_create(tmcf, recf);
1203         if (nxt_slow_path(ret != NXT_OK)) {
1204             return ret;
1205         }
1206 
1207         nxt_queue_insert_tail(&router->engines, &recf->engine->link0);
1208 
1209         n++;
1210     }
1211 
1212     return NXT_OK;
1213 }
1214 
1215 
1216 static nxt_int_t
1217 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
1218     nxt_router_engine_conf_t *recf)
1219 {
1220     nxt_int_t              ret;
1221     nxt_thread_spinlock_t  *lock;
1222 
1223     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1224                                           nxt_router_listen_socket_create);
1225     if (nxt_slow_path(ret != NXT_OK)) {
1226         return ret;
1227     }
1228 
1229     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1230                                           nxt_router_listen_socket_create);
1231     if (nxt_slow_path(ret != NXT_OK)) {
1232         return ret;
1233     }
1234 
1235     lock = &tmcf->conf->router->lock;
1236 
1237     nxt_thread_spin_lock(lock);
1238 
1239     nxt_router_engine_socket_count(&tmcf->creating);
1240     nxt_router_engine_socket_count(&tmcf->updating);
1241 
1242     nxt_thread_spin_unlock(lock);
1243 
1244     return ret;
1245 }
1246 
1247 
1248 static nxt_int_t
1249 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
1250     nxt_router_engine_conf_t *recf)
1251 {
1252     nxt_int_t              ret;
1253     nxt_thread_spinlock_t  *lock;
1254 
1255     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
1256                                           nxt_router_listen_socket_create);
1257     if (nxt_slow_path(ret != NXT_OK)) {
1258         return ret;
1259     }
1260 
1261     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
1262                                           nxt_router_listen_socket_update);
1263     if (nxt_slow_path(ret != NXT_OK)) {
1264         return ret;
1265     }
1266 
1267     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1268     if (nxt_slow_path(ret != NXT_OK)) {
1269         return ret;
1270     }
1271 
1272     lock = &tmcf->conf->router->lock;
1273 
1274     nxt_thread_spin_lock(lock);
1275 
1276     nxt_router_engine_socket_count(&tmcf->creating);
1277 
1278     nxt_thread_spin_unlock(lock);
1279 
1280     return ret;
1281 }
1282 
1283 
1284 static nxt_int_t
1285 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
1286     nxt_router_engine_conf_t *recf)
1287 {
1288     nxt_int_t  ret;
1289 
1290     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
1291     if (nxt_slow_path(ret != NXT_OK)) {
1292         return ret;
1293     }
1294 
1295     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
1296 }
1297 
1298 
1299 static nxt_int_t
1300 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
1301     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
1302     nxt_work_handler_t handler)
1303 {
1304     nxt_joint_job_t          *job;
1305     nxt_queue_link_t         *qlk;
1306     nxt_socket_conf_t        *skcf;
1307     nxt_socket_conf_joint_t  *joint;
1308 
1309     for (qlk = nxt_queue_first(sockets);
1310          qlk != nxt_queue_tail(sockets);
1311          qlk = nxt_queue_next(qlk))
1312     {
1313         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1314         if (nxt_slow_path(job == NULL)) {
1315             return NXT_ERROR;
1316         }
1317 
1318         job->work.next = recf->jobs;
1319         recf->jobs = &job->work;
1320 
1321         job->task = tmcf->engine->task;
1322         job->work.handler = handler;
1323         job->work.task = &job->task;
1324         job->work.obj = job;
1325         job->tmcf = tmcf;
1326 
1327         tmcf->count++;
1328 
1329         joint = nxt_mp_alloc(tmcf->conf->mem_pool,
1330                              sizeof(nxt_socket_conf_joint_t));
1331         if (nxt_slow_path(joint == NULL)) {
1332             return NXT_ERROR;
1333         }
1334 
1335         job->work.data = joint;
1336 
1337         joint->count = 1;
1338 
1339         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1340         skcf->count++;
1341         joint->socket_conf = skcf;
1342 
1343         joint->engine = recf->engine;
1344     }
1345 
1346     return NXT_OK;
1347 }
1348 
1349 
1350 static void
1351 nxt_router_engine_socket_count(nxt_queue_t *sockets)
1352 {
1353     nxt_queue_link_t   *qlk;
1354     nxt_socket_conf_t  *skcf;
1355 
1356     for (qlk = nxt_queue_first(sockets);
1357          qlk != nxt_queue_tail(sockets);
1358          qlk = nxt_queue_next(qlk))
1359     {
1360         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1361         skcf->socket->count++;
1362     }
1363 }
1364 
1365 
1366 static nxt_int_t
1367 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
1368     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
1369 {
1370     nxt_joint_job_t   *job;
1371     nxt_queue_link_t  *qlk;
1372 
1373     for (qlk = nxt_queue_first(sockets);
1374          qlk != nxt_queue_tail(sockets);
1375          qlk = nxt_queue_next(qlk))
1376     {
1377         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
1378         if (nxt_slow_path(job == NULL)) {
1379             return NXT_ERROR;
1380         }
1381 
1382         job->work.next = recf->jobs;
1383         recf->jobs = &job->work;
1384 
1385         job->task = tmcf->engine->task;
1386         job->work.handler = nxt_router_listen_socket_delete;
1387         job->work.task = &job->task;
1388         job->work.obj = job;
1389         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1390         job->tmcf = tmcf;
1391 
1392         tmcf->count++;
1393     }
1394 
1395     return NXT_OK;
1396 }
1397 
1398 
1399 static nxt_int_t
1400 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
1401     nxt_router_temp_conf_t *tmcf)
1402 {
1403     nxt_int_t                 ret;
1404     nxt_uint_t                i, threads;
1405     nxt_router_engine_conf_t  *recf;
1406 
1407     recf = tmcf->engines->elts;
1408     threads = tmcf->conf->threads;
1409 
1410     for (i = tmcf->new_threads; i < threads; i++) {
1411         ret = nxt_router_thread_create(task, rt, recf[i].engine);
1412         if (nxt_slow_path(ret != NXT_OK)) {
1413             return ret;
1414         }
1415     }
1416 
1417     return NXT_OK;
1418 }
1419 
1420 
1421 static nxt_int_t
1422 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
1423     nxt_event_engine_t *engine)
1424 {
1425     nxt_int_t            ret;
1426     nxt_thread_link_t    *link;
1427     nxt_thread_handle_t  handle;
1428 
1429     link = nxt_zalloc(sizeof(nxt_thread_link_t));
1430 
1431     if (nxt_slow_path(link == NULL)) {
1432         return NXT_ERROR;
1433     }
1434 
1435     link->start = nxt_router_thread_start;
1436     link->engine = engine;
1437     link->work.handler = nxt_router_thread_exit_handler;
1438     link->work.task = task;
1439     link->work.data = link;
1440 
1441     nxt_queue_insert_tail(&rt->engines, &engine->link);
1442 
1443     ret = nxt_thread_create(&handle, link);
1444 
1445     if (nxt_slow_path(ret != NXT_OK)) {
1446         nxt_queue_remove(&engine->link);
1447     }
1448 
1449     return ret;
1450 }
1451 
1452 
1453 static void
1454 nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1455 {
1456     nxt_app_t    *app;
1457     nxt_port_t   *port;
1458 
1459     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1460 
1461         nxt_queue_remove(&app->link);
1462 
1463         nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
1464 
1465         app->live = 0;
1466 
1467         if (nxt_router_app_free(NULL, app) != 0) {
1468             continue;
1469         }
1470 
1471         if (!nxt_queue_is_empty(&app->requests)) {
1472 
1473             nxt_thread_log_debug("app '%V' %p pending requests found",
1474                                  &app->name, app);
1475             continue;
1476         }
1477 
1478         do {
1479             port = nxt_router_app_get_port(app, 0);
1480             if (port == NULL) {
1481                 break;
1482             }
1483 
1484             nxt_thread_log_debug("port %p send quit", port);
1485 
1486             nxt_port_socket_write(&port->engine->task, port,
1487                                   NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
1488         } while (1);
1489 
1490     } nxt_queue_loop;
1491 
1492     nxt_queue_add(&router->apps, &tmcf->previous);
1493     nxt_queue_add(&router->apps, &tmcf->apps);
1494 }
1495 
1496 
1497 static void
1498 nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
1499 {
1500     nxt_uint_t                n;
1501     nxt_router_engine_conf_t  *recf;
1502 
1503     recf = tmcf->engines->elts;
1504 
1505     for (n = tmcf->engines->nelts; n != 0; n--) {
1506         nxt_router_engine_post(recf);
1507         recf++;
1508     }
1509 }
1510 
1511 
1512 static void
1513 nxt_router_engine_post(nxt_router_engine_conf_t *recf)
1514 {
1515     nxt_work_t  *work, *next;
1516 
1517     for (work = recf->jobs; work != NULL; work = next) {
1518         next = work->next;
1519         work->next = NULL;
1520 
1521         nxt_event_engine_post(recf->engine, work);
1522     }
1523 }
1524 
1525 
1526 static void
1527 nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
1528 
1529 static nxt_port_handler_t  nxt_router_app_port_handlers[] = {
1530     NULL, /* NXT_PORT_MSG_QUIT         */
1531     NULL, /* NXT_PORT_MSG_NEW_PORT     */
1532     NULL, /* NXT_PORT_MSG_CHANGE_FILE  */
1533     /* TODO: remove mmap_handler from app ports */
1534     nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP         */
1535     nxt_router_app_data_handler,
1536     NULL, /* NXT_PORT_MSG_REMOVE_PID   */
1537     NULL, /* NXT_PORT_MSG_READY        */
1538     NULL, /* NXT_PORT_MSG_START_WORKER */
1539     nxt_port_rpc_handler,
1540     nxt_port_rpc_handler,
1541 };
1542 
1543 
1544 static void
1545 nxt_router_thread_start(void *data)
1546 {
1547     nxt_int_t           ret;
1548     nxt_port_t          *port;
1549     nxt_task_t          *task;
1550     nxt_thread_t        *thread;
1551     nxt_thread_link_t   *link;
1552     nxt_event_engine_t  *engine;
1553 
1554     link = data;
1555     engine = link->engine;
1556     task = &engine->task;
1557 
1558     thread = nxt_thread();
1559 
1560     nxt_event_engine_thread_adopt(engine);
1561 
1562     /* STUB */
1563     thread->runtime = engine->task.thread->runtime;
1564 
1565     engine->task.thread = thread;
1566     engine->task.log = thread->log;
1567     thread->engine = engine;
1568     thread->task = &engine->task;
1569     thread->fiber = &engine->fibers->fiber;
1570 
1571     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
1572 
1573     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
1574                         NXT_PROCESS_ROUTER);
1575     if (nxt_slow_path(port == NULL)) {
1576         return;
1577     }
1578 
1579     ret = nxt_port_socket_init(task, port, 0);
1580     if (nxt_slow_path(ret != NXT_OK)) {
1581         nxt_mp_release(port->mem_pool, port);
1582         return;
1583     }
1584 
1585     engine->port = port;
1586 
1587     nxt_port_enable(task, port, nxt_router_app_port_handlers);
1588 
1589     nxt_event_engine_start(engine);
1590 }
1591 
1592 
1593 static void
1594 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
1595 {
1596     nxt_joint_job_t          *job;
1597     nxt_listen_event_t       *listen;
1598     nxt_listen_socket_t      *ls;
1599     nxt_socket_conf_joint_t  *joint;
1600 
1601     job = obj;
1602     joint = data;
1603 
1604     ls = &joint->socket_conf->listen;
1605 
1606     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
1607 
1608     listen = nxt_listen_event(task, ls);
1609     if (nxt_slow_path(listen == NULL)) {
1610         nxt_router_listen_socket_release(task, joint);
1611         return;
1612     }
1613 
1614     listen->socket.data = joint;
1615 
1616     job->work.next = NULL;
1617     job->work.handler = nxt_router_conf_wait;
1618 
1619     nxt_event_engine_post(job->tmcf->engine, &job->work);
1620 }
1621 
1622 
1623 nxt_inline nxt_listen_event_t *
1624 nxt_router_listen_event(nxt_queue_t *listen_connections,
1625     nxt_socket_conf_t *skcf)
1626 {
1627     nxt_socket_t        fd;
1628     nxt_queue_link_t    *qlk;
1629     nxt_listen_event_t  *listen;
1630 
1631     fd = skcf->socket->fd;
1632 
1633     for (qlk = nxt_queue_first(listen_connections);
1634          qlk != nxt_queue_tail(listen_connections);
1635          qlk = nxt_queue_next(qlk))
1636     {
1637         listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
1638 
1639         if (fd == listen->socket.fd) {
1640             return listen;
1641         }
1642     }
1643 
1644     return NULL;
1645 }
1646 
1647 
1648 static void
1649 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
1650 {
1651     nxt_joint_job_t          *job;
1652     nxt_event_engine_t       *engine;
1653     nxt_listen_event_t       *listen;
1654     nxt_socket_conf_joint_t  *joint, *old;
1655 
1656     job = obj;
1657     joint = data;
1658 
1659     engine = task->thread->engine;
1660 
1661     nxt_queue_insert_tail(&engine->joints, &joint->link);
1662 
1663     listen = nxt_router_listen_event(&engine->listen_connections,
1664                                      joint->socket_conf);
1665 
1666     old = listen->socket.data;
1667     listen->socket.data = joint;
1668     listen->listen = &joint->socket_conf->listen;
1669 
1670     job->work.next = NULL;
1671     job->work.handler = nxt_router_conf_wait;
1672 
1673     nxt_event_engine_post(job->tmcf->engine, &job->work);
1674 
1675     /*
1676      * The task is allocated from configuration temporary
1677      * memory pool so it can be freed after engine post operation.
1678      */
1679 
1680     nxt_router_conf_release(&engine->task, old);
1681 }
1682 
1683 
1684 static void
1685 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
1686 {
1687     nxt_joint_job_t     *job;
1688     nxt_socket_conf_t   *skcf;
1689     nxt_listen_event_t  *listen;
1690     nxt_event_engine_t  *engine;
1691 
1692     job = obj;
1693     skcf = data;
1694 
1695     engine = task->thread->engine;
1696 
1697     listen = nxt_router_listen_event(&engine->listen_connections, skcf);
1698 
1699     nxt_fd_event_delete(engine, &listen->socket);
1700 
1701     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
1702               listen->socket.fd);
1703 
1704     listen->timer.handler = nxt_router_listen_socket_close;
1705     listen->timer.work_queue = &engine->fast_work_queue;
1706 
1707     nxt_timer_add(engine, &listen->timer, 0);
1708 
1709     job->work.next = NULL;
1710     job->work.handler = nxt_router_conf_wait;
1711 
1712     nxt_event_engine_post(job->tmcf->engine, &job->work);
1713 }
1714 
1715 
1716 static void
1717 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
1718 {
1719     nxt_timer_t              *timer;
1720     nxt_listen_event_t       *listen;
1721     nxt_socket_conf_joint_t  *joint;
1722 
1723     timer = obj;
1724     listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
1725     joint = listen->socket.data;
1726 
1727     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
1728               listen->socket.fd);
1729 
1730     nxt_queue_remove(&listen->link);
1731 
1732     /* 'task' refers to listen->task and we cannot use after nxt_free() */
1733     task = &task->thread->engine->task;
1734 
1735     nxt_free(listen);
1736 
1737     nxt_router_listen_socket_release(task, joint);
1738 }
1739 
1740 
1741 static void
1742 nxt_router_listen_socket_release(nxt_task_t *task,
1743     nxt_socket_conf_joint_t *joint)
1744 {
1745     nxt_socket_conf_t      *skcf;
1746     nxt_router_socket_t    *rtsk;
1747     nxt_thread_spinlock_t  *lock;
1748 
1749     skcf = joint->socket_conf;
1750     rtsk = skcf->socket;
1751     lock = &skcf->router_conf->router->lock;
1752 
1753     nxt_thread_spin_lock(lock);
1754 
1755     nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
1756               task->thread->engine, rtsk->count);
1757 
1758     if (--rtsk->count != 0) {
1759         rtsk = NULL;
1760     }
1761 
1762     nxt_thread_spin_unlock(lock);
1763 
1764     if (rtsk != NULL) {
1765         nxt_socket_close(task, rtsk->fd);
1766         nxt_free(rtsk);
1767         skcf->socket = NULL;
1768     }
1769 
1770     nxt_router_conf_release(task, joint);
1771 }
1772 
1773 
1774 static void
1775 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
1776 {
1777     nxt_bool_t             exit;
1778     nxt_socket_conf_t      *skcf;
1779     nxt_router_conf_t      *rtcf;
1780     nxt_thread_spinlock_t  *lock;
1781 
1782     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
1783 
1784     if (--joint->count != 0) {
1785         return;
1786     }
1787 
1788     nxt_queue_remove(&joint->link);
1789 
1790     skcf = joint->socket_conf;
1791     rtcf = skcf->router_conf;
1792     lock = &rtcf->router->lock;
1793 
1794     nxt_thread_spin_lock(lock);
1795 
1796     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
1797               rtcf, rtcf->count);
1798 
1799     if (--skcf->count != 0) {
1800         rtcf = NULL;
1801 
1802     } else {
1803         nxt_queue_remove(&skcf->link);
1804 
1805         if (--rtcf->count != 0) {
1806             rtcf = NULL;
1807         }
1808     }
1809 
1810     nxt_thread_spin_unlock(lock);
1811 
1812     /* TODO remove engine->port */
1813     /* TODO excude from connected ports */
1814 
1815     /* The joint content can be used before memory pool destruction. */
1816     exit = nxt_queue_is_empty(&joint->engine->joints);
1817 
1818     if (rtcf != NULL) {
1819         nxt_debug(task, "old router conf is destroyed");
1820 
1821         nxt_mp_thread_adopt(rtcf->mem_pool);
1822 
1823         nxt_mp_destroy(rtcf->mem_pool);
1824     }
1825 
1826     if (exit) {
1827         nxt_thread_exit(task->thread);
1828     }
1829 }
1830 
1831 
1832 static void
1833 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
1834 {
1835     nxt_port_t           *port;
1836     nxt_thread_link_t    *link;
1837     nxt_event_engine_t   *engine;
1838     nxt_thread_handle_t  handle;
1839 
1840     handle = (nxt_thread_handle_t) obj;
1841     link = data;
1842 
1843     nxt_thread_wait(handle);
1844 
1845     engine = link->engine;
1846 
1847     nxt_queue_remove(&engine->link);
1848 
1849     port = engine->port;
1850 
1851     // TODO notify all apps
1852 
1853     nxt_mp_thread_adopt(port->mem_pool);
1854     nxt_port_release(port);
1855 
1856     nxt_mp_thread_adopt(engine->mem_pool);
1857     nxt_mp_destroy(engine->mem_pool);
1858 
1859     nxt_event_engine_free(engine);
1860 
1861     nxt_free(link);
1862 }
1863 
1864 
1865 static const nxt_conn_state_t  nxt_router_conn_read_header_state
1866     nxt_aligned(64) =
1867 {
1868     .ready_handler = nxt_router_conn_http_header_parse,
1869     .close_handler = nxt_router_conn_close,
1870     .error_handler = nxt_router_conn_error,
1871 
1872     .timer_handler = nxt_router_conn_timeout,
1873     .timer_value = nxt_router_conn_timeout_value,
1874     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
1875 };
1876 
1877 
1878 static const nxt_conn_state_t  nxt_router_conn_read_body_state
1879     nxt_aligned(64) =
1880 {
1881     .ready_handler = nxt_router_conn_http_body_read,
1882     .close_handler = nxt_router_conn_close,
1883     .error_handler = nxt_router_conn_error,
1884 
1885     .timer_handler = nxt_router_conn_timeout,
1886     .timer_value = nxt_router_conn_timeout_value,
1887     .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
1888     .timer_autoreset = 1,
1889 };
1890 
1891 
1892 static void
1893 nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
1894 {
1895     size_t                   size;
1896     nxt_conn_t               *c;
1897     nxt_event_engine_t       *engine;
1898     nxt_socket_conf_joint_t  *joint;
1899 
1900     c = obj;
1901     joint = data;
1902 
1903     nxt_debug(task, "router conn init");
1904 
1905     joint->count++;
1906 
1907     size = joint->socket_conf->header_buffer_size;
1908     c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1909 
1910     c->socket.data = NULL;
1911 
1912     engine = task->thread->engine;
1913     c->read_work_queue = &engine->fast_work_queue;
1914     c->write_work_queue = &engine->fast_work_queue;
1915 
1916     c->read_state = &nxt_router_conn_read_header_state;
1917 
1918     nxt_conn_read(engine, c);
1919 }
1920 
1921 
1922 static const nxt_conn_state_t  nxt_router_conn_write_state
1923     nxt_aligned(64) =
1924 {
1925     .ready_handler = nxt_router_conn_ready,
1926     .close_handler = nxt_router_conn_close,
1927     .error_handler = nxt_router_conn_error,
1928 };
1929 
1930 
1931 static void
1932 nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1933 {
1934     size_t               dump_size;
1935     nxt_buf_t            *b, *last;
1936     nxt_conn_t           *c;
1937     nxt_req_conn_link_t  *rc;
1938     nxt_event_engine_t   *engine;
1939 
1940     b = msg->buf;
1941     engine = task->thread->engine;
1942 
1943     rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
1944     if (nxt_slow_path(rc == NULL)) {
1945         nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
1946 
1947         return;
1948     }
1949 
1950     c = rc->conn;
1951 
1952     dump_size = nxt_buf_used_size(b);
1953 
1954     if (dump_size > 300) {
1955         dump_size = 300;
1956     }
1957 
1958     nxt_debug(task, "%srouter app data (%z): %*s",
1959               msg->port_msg.last ? "last " : "", msg->size, dump_size,
1960               b->mem.pos);
1961 
1962     if (msg->size == 0) {
1963         b = NULL;
1964     }
1965 
1966     if (msg->port_msg.last != 0) {
1967         nxt_debug(task, "router data create last buf");
1968 
1969         last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
1970         if (nxt_slow_path(last == NULL)) {
1971             /* TODO pogorevaTb */
1972         }
1973 
1974         nxt_buf_chain_add(&b, last);
1975 
1976         if (rc->app_port != NULL) {
1977             nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
1978 
1979             rc->app_port = NULL;
1980         }
1981 
1982         rc->conn = NULL;
1983     }
1984 
1985     if (b == NULL) {
1986         return;
1987     }
1988 
1989     if (msg->buf == b) {
1990         /* Disable instant buffer completion/re-using by port. */
1991         msg->buf = NULL;
1992     }
1993 
1994     if (c->write == NULL) {
1995         c->write = b;
1996         c->write_state = &nxt_router_conn_write_state;
1997 
1998         nxt_conn_write(task->thread->engine, c);
1999     } else {
2000         nxt_debug(task, "router data attach out bufs to existing chain");
2001 
2002         nxt_buf_chain_add(&c->write, b);
2003     }
2004 }
2005 
2006 nxt_inline const char *
2007 nxt_router_text_by_code(int code)
2008 {
2009     switch (code) {
2010     case 400: return "Bad request";
2011     case 404: return "Not found";
2012     case 403: return "Forbidden";
2013     case 408: return "Request Timeout";
2014     case 411: return "Length Required";
2015     case 413: return "Request Entity Too Large";
2016     case 500:
2017     default:  return "Internal server error";
2018     }
2019 }
2020 
2021 
2022 static nxt_buf_t *
2023 nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
2024     const char* fmt, va_list args)
2025 {
2026     nxt_buf_t   *b, *last;
2027     const char  *msg;
2028 
2029     b = nxt_buf_mem_ts_alloc(task, mp, 16384);
2030     if (nxt_slow_path(b == NULL)) {
2031         return NULL;
2032     }
2033 
2034     b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
2035         "HTTP/1.0 %d %s\r\n"
2036         "Content-Type: text/plain\r\n"
2037         "Connection: close\r\n\r\n",
2038         code, nxt_router_text_by_code(code));
2039 
2040     msg = (const char *) b->mem.free;
2041 
2042     b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
2043     b->mem.free[0] = '\0';
2044 
2045     nxt_log_alert(task->log, "error %d: %s", code, msg);
2046 
2047     last = nxt_buf_mem_ts_alloc(task, mp, 0);
2048 
2049     if (nxt_slow_path(last == NULL)) {
2050         nxt_mp_release(mp, b);
2051         return NULL;
2052     }
2053 
2054     nxt_buf_set_sync(last);
2055     nxt_buf_set_last(last);
2056 
2057     nxt_buf_chain_add(&b, last);
2058 
2059     return b;
2060 }
2061 
2062 
2063 
2064 static void
2065 nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2066     const char* fmt, ...)
2067 {
2068     va_list    args;
2069     nxt_buf_t  *b;
2070 
2071     va_start(args, fmt);
2072     b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
2073     va_end(args);
2074 
2075     if (c->socket.data != NULL) {
2076         nxt_mp_free(c->mem_pool, c->socket.data);
2077         c->socket.data = NULL;
2078     }
2079 
2080     if (c->write == NULL) {
2081         c->write = b;
2082         c->write_state = &nxt_router_conn_write_state;
2083 
2084         nxt_conn_write(task->thread->engine, c);
2085     } else {
2086         nxt_debug(task, "router data attach out bufs to existing chain");
2087 
2088         nxt_buf_chain_add(&c->write, b);
2089     }
2090 }
2091 
2092 
2093 static void
2094 nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2095 {
2096     nxt_start_worker_t  *sw;
2097 
2098     sw = data;
2099 
2100     nxt_assert(sw != NULL);
2101     nxt_assert(sw->app->pending_workers != 0);
2102 
2103     msg->new_port->app = sw->app;
2104 
2105     sw->app->pending_workers--;
2106     sw->app->workers++;
2107 
2108     nxt_debug(task, "sw %p got port %p", sw, msg->new_port);
2109 
2110     nxt_router_app_release_port(task, msg->new_port, sw->app);
2111 
2112     nxt_router_sw_release(task, sw);
2113 }
2114 
2115 
2116 static void
2117 nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2118 {
2119     nxt_start_worker_t  *sw;
2120 
2121     sw = data;
2122 
2123     nxt_assert(sw != NULL);
2124     nxt_assert(sw->app->pending_workers != 0);
2125 
2126     sw->app->pending_workers--;
2127 
2128     nxt_debug(task, "sw %p error, failed to start app '%V'", sw, &sw->app->name);
2129 
2130     nxt_router_sw_release(task, sw);
2131 }
2132 
2133 
2134 static void
2135 nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2136 {
2137     size_t              size;
2138     uint32_t            stream;
2139     nxt_buf_t           *b;
2140     nxt_app_t           *app;
2141     nxt_port_t          *master_port, *router_port;
2142     nxt_runtime_t       *rt;
2143     nxt_start_worker_t  *sw;
2144 
2145     sw = obj;
2146     app = sw->app;
2147 
2148     nxt_queue_insert_tail(&app->requests, &sw->ra->link);
2149 
2150     if (app->workers + app->pending_workers >= app->max_workers) {
2151         nxt_debug(task, "app '%V' %p %uD/%uD running/penging workers, "
2152                   "post sw #%uxD release to %p", &app->name, app,
2153                   "sw %p release", &app->name, app,
2154                    app->workers, app->pending_workers, sw);
2155 
2156         nxt_router_sw_release(task, sw);
2157 
2158         return;
2159     }
2160 
2161     app->pending_workers++;
2162 
2163     nxt_debug(task, "sw %p send", sw);
2164 
2165     rt = task->thread->runtime;
2166     master_port = rt->port_by_type[NXT_PROCESS_MASTER];
2167     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2168 
2169     size = app->name.length + 1 + app->conf.length;
2170 
2171     b = nxt_buf_mem_alloc(master_port->mem_pool, size, 0);
2172 
2173     nxt_buf_cpystr(b, &app->name);
2174     *b->mem.free++ = '\0';
2175     nxt_buf_cpystr(b, &app->conf);
2176 
2177     stream = nxt_port_rpc_register_handler(task, router_port,
2178                                            nxt_router_sw_ready,
2179                                            nxt_router_sw_error,
2180                                            master_port->pid, sw);
2181 
2182     nxt_port_socket_write(task, master_port, NXT_PORT_MSG_START_WORKER, -1,
2183                           stream, router_port->id, b);
2184 }
2185 
2186 
2187 static nxt_bool_t
2188 nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
2189 {
2190     nxt_queue_link_t    *lnk;
2191     nxt_req_app_link_t  *ra;
2192 
2193     nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
2194                          app->live, app->workers, app->pending_workers,
2195                          nxt_queue_is_empty(&app->requests));
2196 
2197     if (app->live == 0 && app->workers == 0 &&
2198         app->pending_workers == 0 &&
2199         nxt_queue_is_empty(&app->requests)) {
2200 
2201         nxt_thread_mutex_destroy(&app->mutex);
2202         nxt_free(app);
2203 
2204         return 1;
2205     }
2206 
2207     if (app->live == 1 && nxt_queue_is_empty(&app->requests) == 0 &&
2208        (app->workers + app->pending_workers < app->max_workers)) {
2209 
2210         lnk = nxt_queue_first(&app->requests);
2211         nxt_queue_remove(lnk);
2212 
2213         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2214 
2215         nxt_router_sw_create(task, app, ra);
2216     }
2217 
2218     return 0;
2219 }
2220 
2221 
2222 static nxt_port_t *
2223 nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id)
2224 {
2225     nxt_port_t        *port;
2226     nxt_queue_link_t  *lnk;
2227 
2228     port = NULL;
2229 
2230     nxt_thread_mutex_lock(&app->mutex);
2231 
2232     if (!nxt_queue_is_empty(&app->ports)) {
2233         lnk = nxt_queue_first(&app->ports);
2234         nxt_queue_remove(lnk);
2235 
2236         lnk->next = NULL;
2237 
2238         port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2239 
2240         port->app_req_id = req_id;
2241     }
2242 
2243     nxt_thread_mutex_unlock(&app->mutex);
2244 
2245     return port;
2246 }
2247 
2248 
2249 static void
2250 nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
2251 {
2252     nxt_app_t            *app;
2253     nxt_port_t           *port;
2254     nxt_work_t           *work;
2255     nxt_queue_link_t     *lnk;
2256     nxt_req_app_link_t   *ra;
2257 
2258     port = obj;
2259     app = data;
2260 
2261     nxt_assert(app != NULL);
2262     nxt_assert(app == port->app);
2263     nxt_assert(port->app_link.next == NULL);
2264 
2265 
2266     if (task->thread->engine != port->engine) {
2267         work = &port->work;
2268 
2269         nxt_debug(task, "post release port to engine %p", port->engine);
2270 
2271         work->next = NULL;
2272         work->handler = nxt_router_app_release_port;
2273         work->task = &port->engine->task;
2274         work->obj = port;
2275         work->data = app;
2276 
2277         nxt_event_engine_post(port->engine, work);
2278 
2279         return;
2280     }
2281 
2282     if (!nxt_queue_is_empty(&app->requests)) {
2283         lnk = nxt_queue_first(&app->requests);
2284         nxt_queue_remove(lnk);
2285 
2286         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2287 
2288         nxt_debug(task, "app '%V' %p process next request #%uxD",
2289                   &app->name, app, ra->req_id);
2290 
2291         ra->app_port = port;
2292         port->app_req_id = ra->req_id;
2293 
2294         nxt_router_process_http_request_mp(task, ra, port);
2295 
2296         nxt_router_ra_release(task, ra, ra->work.data);
2297 
2298         return;
2299     }
2300 
2301     port->app_req_id = 0;
2302 
2303     if (port->pair[1] == -1) {
2304         nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
2305                   &app->name, app, port->pid);
2306 
2307         app->workers--;
2308         nxt_router_app_free(task, app);
2309 
2310         port->app = NULL;
2311 
2312         nxt_port_release(port);
2313 
2314         return;
2315     }
2316 
2317     if (!app->live) {
2318         nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
2319                   &app->name, app);
2320 
2321         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
2322                               -1, 0, 0, NULL);
2323 
2324         return;
2325     }
2326 
2327     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
2328               &app->name, app);
2329 
2330     nxt_thread_mutex_lock(&app->mutex);
2331 
2332     nxt_queue_insert_head(&app->ports, &port->app_link);
2333 
2334     nxt_thread_mutex_unlock(&app->mutex);
2335 }
2336 
2337 
2338 nxt_bool_t
2339 nxt_router_app_remove_port(nxt_port_t *port)
2340 {
2341     nxt_app_t   *app;
2342     nxt_bool_t  busy;
2343 
2344     app = port->app;
2345     busy = port->app_req_id != 0;
2346 
2347     if (app == NULL) {
2348         nxt_thread_log_debug("port %p app remove, no app", port);
2349 
2350         nxt_assert(port->app_link.next == NULL);
2351 
2352         return 1;
2353     }
2354 
2355     nxt_thread_mutex_lock(&app->mutex);
2356 
2357     if (port->app_link.next != NULL) {
2358 
2359         nxt_queue_remove(&port->app_link);
2360         port->app_link.next = NULL;
2361 
2362     }
2363 
2364     nxt_thread_mutex_unlock(&app->mutex);
2365 
2366     if (busy == 0) {
2367         nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
2368                              &app->name, app);
2369 
2370         app->workers--;
2371         nxt_router_app_free(&port->engine->task, app);
2372 
2373         return 1;
2374     }
2375 
2376     nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD",
2377                          port, &app->name, app, port->app_req_id);
2378 
2379     return 0;
2380 }
2381 
2382 
2383 static nxt_int_t
2384 nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2385 {
2386     nxt_app_t                *app;
2387     nxt_conn_t               *c;
2388     nxt_port_t               *port;
2389     nxt_start_worker_t       *sw;
2390     nxt_socket_conf_joint_t  *joint;
2391 
2392     port = NULL;
2393     c = ra->rc->conn;
2394 
2395     joint = c->listen->socket.data;
2396     app = joint->socket_conf->application;
2397 
2398     if (app == NULL) {
2399         nxt_router_gen_error(task, c, 500,
2400                              "Application is NULL in socket_conf");
2401         return NXT_ERROR;
2402     }
2403 
2404 
2405     port = nxt_router_app_get_port(app, ra->req_id);
2406 
2407     if (port != NULL) {
2408         nxt_debug(task, "already have port for app '%V'", &app->name);
2409 
2410         ra->app_port = port;
2411         return NXT_OK;
2412     }
2413 
2414     sw = nxt_router_sw_create(task, app, ra);
2415 
2416     if (nxt_slow_path(sw == NULL)) {
2417         nxt_router_gen_error(task, c, 500,
2418                              "Failed to allocate start worker struct");
2419         return NXT_ERROR;
2420     }
2421 
2422     return NXT_AGAIN;
2423 }
2424 
2425 
2426 static void
2427 nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
2428 {
2429     size_t                    size;
2430     nxt_int_t                 ret;
2431     nxt_buf_t                 *buf;
2432     nxt_conn_t                *c;
2433     nxt_app_parse_ctx_t       *ap;
2434     nxt_app_request_body_t    *b;
2435     nxt_socket_conf_joint_t   *joint;
2436     nxt_app_request_header_t  *h;
2437 
2438     c = obj;
2439     ap = data;
2440     buf = c->read;
2441     joint = c->listen->socket.data;
2442 
2443     nxt_debug(task, "router conn http header parse");
2444 
2445     if (ap == NULL) {
2446         ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
2447         if (nxt_slow_path(ap == NULL)) {
2448             nxt_router_conn_close(task, c, data);
2449             return;
2450         }
2451 
2452         ret = nxt_app_http_req_init(task, ap);
2453         if (nxt_slow_path(ret != NXT_OK)) {
2454             nxt_router_conn_close(task, c, data);
2455             return;
2456         }
2457 
2458         c->socket.data = ap;
2459 
2460         ap->r.remote.start = nxt_sockaddr_address(c->remote);
2461         ap->r.remote.length = c->remote->address_length;
2462 
2463         ap->r.header.buf = buf;
2464     }
2465 
2466     h = &ap->r.header;
2467     b = &ap->r.body;
2468 
2469     ret = nxt_app_http_req_header_parse(task, ap, buf);
2470 
2471     nxt_debug(task, "http parse request header: %d", ret);
2472 
2473     switch (nxt_expect(NXT_DONE, ret)) {
2474 
2475     case NXT_DONE:
2476         nxt_debug(task, "router request header parsing complete, "
2477                   "content length: %O, preread: %uz",
2478                   h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
2479 
2480         if (b->done) {
2481             nxt_router_process_http_request(task, c, ap);
2482 
2483             return;
2484         }
2485 
2486         if (joint->socket_conf->max_body_size > 0 &&
2487             (size_t) h->parsed_content_length >
2488             joint->socket_conf->max_body_size) {
2489 
2490             nxt_router_gen_error(task, c, 413, "Content-Length too big");
2491             return;
2492         }
2493 
2494         if (nxt_buf_mem_free_size(&buf->mem) == 0) {
2495             size = nxt_min(joint->socket_conf->body_buffer_size,
2496                            (size_t) h->parsed_content_length);
2497 
2498             buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2499             if (nxt_slow_path(buf->next == NULL)) {
2500                 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2501                                      "buffer for request body");
2502                 return;
2503             }
2504 
2505             c->read = buf->next;
2506 
2507             b->preread_size += nxt_buf_mem_used_size(&buf->mem);
2508         }
2509 
2510         if (b->buf == NULL) {
2511             b->buf = c->read;
2512         }
2513 
2514         c->read_state = &nxt_router_conn_read_body_state;
2515         break;
2516 
2517     case NXT_ERROR:
2518         nxt_router_gen_error(task, c, 400, "Request header parse error");
2519         return;
2520 
2521     default:  /* NXT_AGAIN */
2522 
2523         if (c->read->mem.free == c->read->mem.end) {
2524             size = joint->socket_conf->large_header_buffer_size;
2525 
2526             if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) ||
2527                 ap->r.header.bufs >= joint->socket_conf->large_header_buffers) {
2528                 nxt_router_gen_error(task, c, 413,
2529                                      "Too long request headers");
2530                 return;
2531             }
2532 
2533             buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2534             if (nxt_slow_path(buf->next == NULL)) {
2535                 nxt_router_gen_error(task, c, 500,
2536                                      "Failed to allocate large header "
2537                                      "buffer");
2538                 return;
2539             }
2540 
2541             ap->r.header.bufs++;
2542 
2543             size = c->read->mem.free - c->read->mem.pos;
2544 
2545             c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
2546         }
2547 
2548     }
2549 
2550     nxt_conn_read(task->thread->engine, c);
2551 }
2552 
2553 
2554 static void
2555 nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
2556 {
2557     size_t                    size;
2558     nxt_int_t                 ret;
2559     nxt_buf_t                 *buf;
2560     nxt_conn_t                *c;
2561     nxt_app_parse_ctx_t       *ap;
2562     nxt_app_request_body_t    *b;
2563     nxt_socket_conf_joint_t   *joint;
2564     nxt_app_request_header_t  *h;
2565 
2566     c = obj;
2567     ap = data;
2568     buf = c->read;
2569 
2570     nxt_debug(task, "router conn http body read");
2571 
2572     nxt_assert(ap != NULL);
2573 
2574     b = &ap->r.body;
2575     h = &ap->r.header;
2576 
2577     ret = nxt_app_http_req_body_read(task, ap, buf);
2578 
2579     nxt_debug(task, "http read request body: %d", ret);
2580 
2581     switch (nxt_expect(NXT_DONE, ret)) {
2582 
2583     case NXT_DONE:
2584         nxt_router_process_http_request(task, c, ap);
2585         return;
2586 
2587     case NXT_ERROR:
2588         nxt_router_gen_error(task, c, 500, "Read body error");
2589         return;
2590 
2591     default:  /* NXT_AGAIN */
2592 
2593         if (nxt_buf_mem_free_size(&buf->mem) == 0) {
2594             joint = c->listen->socket.data;
2595 
2596             b->preread_size += nxt_buf_mem_used_size(&buf->mem);
2597 
2598             size = nxt_min(joint->socket_conf->body_buffer_size,
2599                            (size_t) h->parsed_content_length - b->preread_size);
2600 
2601             buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
2602             if (nxt_slow_path(buf->next == NULL)) {
2603                 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2604                                      "buffer for request body");
2605                 return;
2606             }
2607 
2608             c->read = buf->next;
2609         }
2610 
2611         nxt_debug(task, "router request body read again, rest: %uz",
2612                   h->parsed_content_length - b->preread_size);
2613     }
2614 
2615     nxt_conn_read(task->thread->engine, c);
2616 }
2617 
2618 
2619 static void
2620 nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
2621     nxt_app_parse_ctx_t *ap)
2622 {
2623     nxt_mp_t             *port_mp;
2624     nxt_int_t            res;
2625     nxt_port_t           *port;
2626     nxt_req_id_t         req_id;
2627     nxt_event_engine_t   *engine;
2628     nxt_req_app_link_t   *ra;
2629     nxt_req_conn_link_t  *rc;
2630 
2631     engine = task->thread->engine;
2632 
2633     do {
2634         req_id = nxt_random(&task->thread->random);
2635     } while (nxt_event_engine_request_find(engine, req_id) != NULL);
2636 
2637     rc = nxt_conn_request_add(c, req_id);
2638 
2639     if (nxt_slow_path(rc == NULL)) {
2640         nxt_router_gen_error(task, c, 500, "Failed to allocate "
2641                              "req->conn link");
2642 
2643         return;
2644     }
2645 
2646     nxt_event_engine_request_add(engine, rc);
2647 
2648     nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
2649               req_id, c, engine);
2650 
2651 
2652     ra = nxt_router_ra_create(task, rc);
2653 
2654     ra->ap = ap;
2655     ra->reply_port = engine->port;
2656 
2657     res = nxt_router_app_port(task, ra);
2658 
2659     if (res != NXT_OK) {
2660         return;
2661     }
2662 
2663     port = ra->app_port;
2664 
2665     if (nxt_slow_path(port == NULL)) {
2666         nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
2667         return;
2668     }
2669 
2670     port_mp = port->mem_pool;
2671     port->mem_pool = c->mem_pool;
2672 
2673     nxt_router_process_http_request_mp(task, ra, port);
2674 
2675     port->mem_pool = port_mp;
2676 
2677 
2678     nxt_router_ra_release(task, ra, ra->work.data);
2679 }
2680 
2681 
2682 static void
2683 nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
2684     nxt_port_t *port)
2685 {
2686     nxt_int_t            res;
2687     nxt_port_t           *c_port, *reply_port;
2688     nxt_conn_t           *c;
2689     nxt_app_wmsg_t       wmsg;
2690     nxt_app_parse_ctx_t  *ap;
2691 
2692     reply_port = ra->reply_port;
2693     ap = ra->ap;
2694     c = ra->rc->conn;
2695 
2696     c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
2697                                              reply_port->id);
2698     if (nxt_slow_path(c_port != reply_port)) {
2699         res = nxt_port_send_port(task, port, reply_port, 0);
2700 
2701         if (nxt_slow_path(res != NXT_OK)) {
2702             nxt_router_gen_error(task, c, 500,
2703                                  "Failed to send reply port to application");
2704             return;
2705         }
2706 
2707         nxt_process_connected_port_add(port->process, reply_port);
2708     }
2709 
2710     wmsg.port = port;
2711     wmsg.write = NULL;
2712     wmsg.buf = &wmsg.write;
2713     wmsg.stream = ra->req_id;
2714 
2715     res = port->app->prepare_msg(task, &ap->r, &wmsg);
2716 
2717     if (nxt_slow_path(res != NXT_OK)) {
2718         nxt_router_gen_error(task, c, 500,
2719                              "Failed to prepare message for application");
2720         return;
2721     }
2722 
2723     nxt_debug(task, "about to send %d bytes buffer to worker port %d",
2724                     nxt_buf_used_size(wmsg.write),
2725                     wmsg.port->socket.fd);
2726 
2727     res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
2728                                  -1, ra->req_id, reply_port->id, wmsg.write);
2729 
2730     if (nxt_slow_path(res != NXT_OK)) {
2731         nxt_router_gen_error(task, c, 500,
2732                              "Failed to send message to application");
2733         return;
2734     }
2735 }
2736 
2737 
2738 static nxt_int_t
2739 nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
2740     nxt_app_wmsg_t *wmsg)
2741 {
2742     nxt_int_t                 rc;
2743     nxt_buf_t                 *b;
2744     nxt_http_field_t          *field;
2745     nxt_app_request_header_t  *h;
2746 
2747     static const nxt_str_t prefix = nxt_string("HTTP_");
2748     static const nxt_str_t eof = nxt_null_string;
2749 
2750     h = &r->header;
2751 
2752 #define RC(S)                                                                 \
2753     do {                                                                      \
2754         rc = (S);                                                             \
2755         if (nxt_slow_path(rc != NXT_OK)) {                                    \
2756             goto fail;                                                        \
2757         }                                                                     \
2758     } while(0)
2759 
2760 #define NXT_WRITE(N)                                                          \
2761     RC(nxt_app_msg_write_str(task, wmsg, N))
2762 
2763     /* TODO error handle, async mmap buffer assignment */
2764 
2765     NXT_WRITE(&h->method);
2766     NXT_WRITE(&h->target);
2767     if (h->path.start == h->target.start) {
2768         NXT_WRITE(&eof);
2769     } else {
2770         NXT_WRITE(&h->path);
2771     }
2772 
2773     if (h->query.start != NULL) {
2774         RC(nxt_app_msg_write_size(task, wmsg,
2775                                   h->query.start - h->target.start + 1));
2776     } else {
2777         RC(nxt_app_msg_write_size(task, wmsg, 0));
2778     }
2779 
2780     NXT_WRITE(&h->version);
2781 
2782     NXT_WRITE(&r->remote);
2783 
2784     NXT_WRITE(&h->host);
2785     NXT_WRITE(&h->content_type);
2786     NXT_WRITE(&h->content_length);
2787 
2788     nxt_list_each(field, h->fields) {
2789         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
2790                                              &prefix, &field->name));
2791         NXT_WRITE(&field->value);
2792 
2793     } nxt_list_loop;
2794 
2795     /* end-of-headers mark */
2796     NXT_WRITE(&eof);
2797 
2798     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2799 
2800     for(b = r->body.buf; b != NULL; b = b->next) {
2801         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2802                                  nxt_buf_mem_used_size(&b->mem)));
2803     }
2804 
2805 #undef NXT_WRITE
2806 #undef RC
2807 
2808     return NXT_OK;
2809 
2810 fail:
2811 
2812     return NXT_ERROR;
2813 }
2814 
2815 
2816 static nxt_int_t
2817 nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
2818     nxt_app_wmsg_t *wmsg)
2819 {
2820     nxt_int_t                 rc;
2821     nxt_buf_t                 *b;
2822     nxt_http_field_t          *field;
2823     nxt_app_request_header_t  *h;
2824 
2825     static const nxt_str_t prefix = nxt_string("HTTP_");
2826     static const nxt_str_t eof = nxt_null_string;
2827 
2828     h = &r->header;
2829 
2830 #define RC(S)                                                                 \
2831     do {                                                                      \
2832         rc = (S);                                                             \
2833         if (nxt_slow_path(rc != NXT_OK)) {                                    \
2834             goto fail;                                                        \
2835         }                                                                     \
2836     } while(0)
2837 
2838 #define NXT_WRITE(N)                                                          \
2839     RC(nxt_app_msg_write_str(task, wmsg, N))
2840 
2841     /* TODO error handle, async mmap buffer assignment */
2842 
2843     NXT_WRITE(&h->method);
2844     NXT_WRITE(&h->target);
2845     if (h->path.start == h->target.start) {
2846         NXT_WRITE(&eof);
2847     } else {
2848         NXT_WRITE(&h->path);
2849     }
2850 
2851     if (h->query.start != NULL) {
2852         RC(nxt_app_msg_write_size(task, wmsg,
2853                                   h->query.start - h->target.start + 1));
2854     } else {
2855         RC(nxt_app_msg_write_size(task, wmsg, 0));
2856     }
2857 
2858     NXT_WRITE(&h->version);
2859 
2860     // PHP_SELF
2861     // SCRIPT_NAME
2862     // SCRIPT_FILENAME
2863     // DOCUMENT_ROOT
2864 
2865     NXT_WRITE(&r->remote);
2866 
2867     NXT_WRITE(&h->host);
2868     NXT_WRITE(&h->cookie);
2869     NXT_WRITE(&h->content_type);
2870     NXT_WRITE(&h->content_length);
2871 
2872     RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
2873 
2874     nxt_list_each(field, h->fields) {
2875         RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
2876                                              &prefix, &field->name));
2877         NXT_WRITE(&field->value);
2878 
2879     } nxt_list_loop;
2880 
2881     /* end-of-headers mark */
2882     NXT_WRITE(&eof);
2883 
2884     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2885 
2886     for(b = r->body.buf; b != NULL; b = b->next) {
2887         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2888                                  nxt_buf_mem_used_size(&b->mem)));
2889     }
2890 
2891 #undef NXT_WRITE
2892 #undef RC
2893 
2894     return NXT_OK;
2895 
2896 fail:
2897 
2898     return NXT_ERROR;
2899 }
2900 
2901 
2902 static nxt_int_t
2903 nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
2904 {
2905     nxt_int_t                 rc;
2906     nxt_buf_t                 *b;
2907     nxt_http_field_t          *field;
2908     nxt_app_request_header_t  *h;
2909 
2910     static const nxt_str_t eof = nxt_null_string;
2911 
2912     h = &r->header;
2913 
2914 #define RC(S)                                                                 \
2915     do {                                                                      \
2916         rc = (S);                                                             \
2917         if (nxt_slow_path(rc != NXT_OK)) {                                    \
2918             goto fail;                                                        \
2919         }                                                                     \
2920     } while(0)
2921 
2922 #define NXT_WRITE(N)                                                          \
2923     RC(nxt_app_msg_write_str(task, wmsg, N))
2924 
2925     /* TODO error handle, async mmap buffer assignment */
2926 
2927     NXT_WRITE(&h->method);
2928     NXT_WRITE(&h->target);
2929     if (h->path.start == h->target.start) {
2930         NXT_WRITE(&eof);
2931     } else {
2932         NXT_WRITE(&h->path);
2933     }
2934 
2935     if (h->query.start != NULL) {
2936         RC(nxt_app_msg_write_size(task, wmsg,
2937                                   h->query.start - h->target.start + 1));
2938     } else {
2939         RC(nxt_app_msg_write_size(task, wmsg, 0));
2940     }
2941 
2942     NXT_WRITE(&h->version);
2943 
2944     NXT_WRITE(&h->host);
2945     NXT_WRITE(&h->cookie);
2946     NXT_WRITE(&h->content_type);
2947     NXT_WRITE(&h->content_length);
2948 
2949     RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
2950 
2951     nxt_list_each(field, h->fields) {
2952         NXT_WRITE(&field->name);
2953         NXT_WRITE(&field->value);
2954 
2955     } nxt_list_loop;
2956 
2957     /* end-of-headers mark */
2958     NXT_WRITE(&eof);
2959 
2960     RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
2961 
2962     for(b = r->body.buf; b != NULL; b = b->next) {
2963         RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
2964                                  nxt_buf_mem_used_size(&b->mem)));
2965     }
2966 
2967 #undef NXT_WRITE
2968 #undef RC
2969 
2970     return NXT_OK;
2971 
2972 fail:
2973 
2974     return NXT_ERROR;
2975 }
2976 
2977 
2978 static const nxt_conn_state_t  nxt_router_conn_close_state
2979     nxt_aligned(64) =
2980 {
2981     .ready_handler = nxt_router_conn_free,
2982 };
2983 
2984 
2985 static void
2986 nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
2987 {
2988     nxt_buf_t         *b;
2989     nxt_bool_t        last;
2990     nxt_conn_t        *c;
2991     nxt_work_queue_t  *wq;
2992 
2993     nxt_debug(task, "router conn ready %p", obj);
2994 
2995     c = obj;
2996     b = c->write;
2997 
2998     wq = &task->thread->engine->fast_work_queue;
2999 
3000     last = 0;
3001 
3002     while (b != NULL) {
3003         if (!nxt_buf_is_sync(b)) {
3004             if (nxt_buf_used_size(b) > 0) {
3005                 break;
3006             }
3007         }
3008 
3009         if (nxt_buf_is_last(b)) {
3010             last = 1;
3011         }
3012 
3013         nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
3014 
3015         b = b->next;
3016     }
3017 
3018     c->write = b;
3019 
3020     if (b != NULL) {
3021         nxt_debug(task, "router conn %p has more data to write", obj);
3022 
3023         nxt_conn_write(task->thread->engine, c);
3024     } else {
3025         nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
3026                   last);
3027 
3028         if (last != 0) {
3029             nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
3030 
3031             nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
3032                                c->socket.data);
3033         }
3034     }
3035 }
3036 
3037 
3038 static void
3039 nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
3040 {
3041     nxt_conn_t  *c;
3042 
3043     c = obj;
3044 
3045     nxt_debug(task, "router conn close");
3046 
3047     c->write_state = &nxt_router_conn_close_state;
3048 
3049     nxt_conn_close(task->thread->engine, c);
3050 }
3051 
3052 
3053 static void
3054 nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
3055 {
3056     nxt_socket_conf_joint_t  *joint;
3057 
3058     joint = obj;
3059 
3060     nxt_router_conf_release(task, joint);
3061 }
3062 
3063 
3064 static void
3065 nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
3066 {
3067     nxt_conn_t               *c;
3068     nxt_req_conn_link_t      *rc;
3069     nxt_socket_conf_joint_t  *joint;
3070 
3071     c = obj;
3072 
3073     nxt_debug(task, "router conn close done");
3074 
3075     nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3076 
3077         nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
3078 
3079         if (rc->app_port != NULL) {
3080             nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
3081 
3082             rc->app_port = NULL;
3083         }
3084 
3085         rc->conn = NULL;
3086 
3087         nxt_event_engine_request_remove(task->thread->engine, rc);
3088 
3089     } nxt_queue_loop;
3090 
3091     nxt_queue_remove(&c->link);
3092 
3093     joint = c->listen->socket.data;
3094 
3095     task = &task->thread->engine->task;
3096 
3097     nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, task, joint, NULL);
3098 
3099     nxt_mp_release(c->mem_pool, c);
3100 }
3101 
3102 
3103 static void
3104 nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
3105 {
3106     nxt_conn_t  *c;
3107 
3108     c = obj;
3109 
3110     nxt_debug(task, "router conn error");
3111 
3112     c->write_state = &nxt_router_conn_close_state;
3113 
3114     nxt_conn_close(task->thread->engine, c);
3115 }
3116 
3117 
3118 static void
3119 nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
3120 {
3121     nxt_conn_t   *c;
3122     nxt_timer_t  *timer;
3123 
3124     timer = obj;
3125 
3126     nxt_debug(task, "router conn timeout");
3127 
3128     c = nxt_read_timer_conn(timer);
3129 
3130     if (c->read_state == &nxt_router_conn_read_header_state) {
3131         nxt_router_gen_error(task, c, 408, "Read header timeout");
3132 
3133     } else {
3134         nxt_router_gen_error(task, c, 408, "Read body timeout");
3135     }
3136 }
3137 
3138 
3139 static nxt_msec_t
3140 nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3141 {
3142     nxt_socket_conf_joint_t  *joint;
3143 
3144     joint = c->listen->socket.data;
3145 
3146     return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3147 }
3148