xref: /unit/src/nxt_router.c (revision 1780:73699f41c956)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 typedef struct {
22     nxt_str_t         type;
23     uint32_t          processes;
24     uint32_t          max_processes;
25     uint32_t          spare_processes;
26     nxt_msec_t        timeout;
27     nxt_msec_t        idle_timeout;
28     uint32_t          requests;
29     nxt_conf_value_t  *limits_value;
30     nxt_conf_value_t  *processes_value;
31     nxt_conf_value_t  *targets_value;
32 } nxt_router_app_conf_t;
33 
34 
35 typedef struct {
36     nxt_str_t         pass;
37     nxt_str_t         application;
38 } nxt_router_listener_conf_t;
39 
40 
41 #if (NXT_TLS)
42 
43 typedef struct {
44     nxt_str_t          name;
45     nxt_socket_conf_t  *conf;
46 
47     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
48 } nxt_router_tlssock_t;
49 
50 #endif
51 
52 
53 typedef struct {
54     nxt_socket_conf_t       *socket_conf;
55     nxt_router_temp_conf_t  *temp_conf;
56 } nxt_socket_rpc_t;
57 
58 
59 typedef struct {
60     nxt_app_t               *app;
61     nxt_router_temp_conf_t  *temp_conf;
62 } nxt_app_rpc_t;
63 
64 
65 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
66     nxt_mp_t *mp);
67 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
68 static void nxt_router_greet_controller(nxt_task_t *task,
69     nxt_port_t *controller_port);
70 
71 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
72 
73 static void nxt_router_new_port_handler(nxt_task_t *task,
74     nxt_port_recv_msg_t *msg);
75 static void nxt_router_conf_data_handler(nxt_task_t *task,
76     nxt_port_recv_msg_t *msg);
77 static void nxt_router_remove_pid_handler(nxt_task_t *task,
78     nxt_port_recv_msg_t *msg);
79 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
80     nxt_port_recv_msg_t *msg);
81 
82 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
83 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
84 static void nxt_router_conf_ready(nxt_task_t *task,
85     nxt_router_temp_conf_t *tmcf);
86 static void nxt_router_conf_error(nxt_task_t *task,
87     nxt_router_temp_conf_t *tmcf);
88 static void nxt_router_conf_send(nxt_task_t *task,
89     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
90 
91 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
92     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
93 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
94     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
95 
96 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
97 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
98 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
99     nxt_app_t *app);
100 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
101     nxt_str_t *name);
102 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
103     int i);
104 
105 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
106     nxt_port_t *port);
107 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
108     nxt_port_t *port);
109 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
110     nxt_port_t *port, nxt_fd_t fd);
111 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
113 static void nxt_router_listen_socket_ready(nxt_task_t *task,
114     nxt_port_recv_msg_t *msg, void *data);
115 static void nxt_router_listen_socket_error(nxt_task_t *task,
116     nxt_port_recv_msg_t *msg, void *data);
117 #if (NXT_TLS)
118 static void nxt_router_tls_rpc_create(nxt_task_t *task,
119     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls);
120 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
121     nxt_port_recv_msg_t *msg, void *data);
122 #endif
123 static void nxt_router_app_rpc_create(nxt_task_t *task,
124     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
125 static void nxt_router_app_prefork_ready(nxt_task_t *task,
126     nxt_port_recv_msg_t *msg, void *data);
127 static void nxt_router_app_prefork_error(nxt_task_t *task,
128     nxt_port_recv_msg_t *msg, void *data);
129 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
130     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
131 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
132     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
133 
134 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
135     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
136     const nxt_event_interface_t *interface);
137 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
138     nxt_router_engine_conf_t *recf);
139 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
140     nxt_router_engine_conf_t *recf);
141 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
142     nxt_router_engine_conf_t *recf);
143 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
144     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
145     nxt_work_handler_t handler);
146 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
147     nxt_router_engine_conf_t *recf);
148 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
149     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
150 
151 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
152     nxt_router_temp_conf_t *tmcf);
153 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
154     nxt_event_engine_t *engine);
155 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
156     nxt_router_temp_conf_t *tmcf);
157 
158 static void nxt_router_engines_post(nxt_router_t *router,
159     nxt_router_temp_conf_t *tmcf);
160 static void nxt_router_engine_post(nxt_event_engine_t *engine,
161     nxt_work_t *jobs);
162 
163 static void nxt_router_thread_start(void *data);
164 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
165     void *data);
166 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
167     void *data);
168 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
169     void *data);
170 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
171     void *data);
172 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
173     void *data);
174 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
175     void *data);
176 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
177     void *data);
178 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
179     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
180 static void nxt_router_listen_socket_release(nxt_task_t *task,
181     nxt_socket_conf_t *skcf);
182 
183 static void nxt_router_access_log_writer(nxt_task_t *task,
184     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
185 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
186     struct tm *tm, size_t size, const char *format);
187 static void nxt_router_access_log_open(nxt_task_t *task,
188     nxt_router_temp_conf_t *tmcf);
189 static void nxt_router_access_log_ready(nxt_task_t *task,
190     nxt_port_recv_msg_t *msg, void *data);
191 static void nxt_router_access_log_error(nxt_task_t *task,
192     nxt_port_recv_msg_t *msg, void *data);
193 static void nxt_router_access_log_release(nxt_task_t *task,
194     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
195 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
196     void *data);
197 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
198     nxt_port_recv_msg_t *msg, void *data);
199 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
200     nxt_port_recv_msg_t *msg, void *data);
201 
202 static void nxt_router_app_port_ready(nxt_task_t *task,
203     nxt_port_recv_msg_t *msg, void *data);
204 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
205     nxt_port_t *app_port);
206 static void nxt_router_app_port_error(nxt_task_t *task,
207     nxt_port_recv_msg_t *msg, void *data);
208 
209 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
210 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
211 
212 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
213     nxt_apr_action_t action);
214 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
215     nxt_request_rpc_data_t *req_rpc_data);
216 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
217     void *data);
218 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
219     void *data);
220 
221 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
222     void *data);
223 static void nxt_router_app_prepare_request(nxt_task_t *task,
224     nxt_request_rpc_data_t *req_rpc_data);
225 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
226     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
227 
228 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
229 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
230     void *data);
231 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
232     void *data);
233 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
234     void *data);
235 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
236 
237 static const nxt_http_request_state_t  nxt_http_request_send_state;
238 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
239 
240 static void nxt_router_app_joint_use(nxt_task_t *task,
241     nxt_app_joint_t *app_joint, int i);
242 
243 static void nxt_router_http_request_release_post(nxt_task_t *task,
244     nxt_http_request_t *r);
245 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
246     void *data);
247 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
248 static void nxt_router_get_port_handler(nxt_task_t *task,
249     nxt_port_recv_msg_t *msg);
250 static void nxt_router_get_mmap_handler(nxt_task_t *task,
251     nxt_port_recv_msg_t *msg);
252 
253 extern const nxt_http_request_state_t  nxt_http_websocket;
254 
255 static nxt_router_t  *nxt_router;
256 
257 static const nxt_str_t http_prefix = nxt_string("HTTP_");
258 static const nxt_str_t empty_prefix = nxt_string("");
259 
260 static const nxt_str_t  *nxt_app_msg_prefix[] = {
261     &empty_prefix,
262     &empty_prefix,
263     &http_prefix,
264     &http_prefix,
265     &http_prefix,
266     &empty_prefix,
267 };
268 
269 
270 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
271     .quit         = nxt_signal_quit_handler,
272     .new_port     = nxt_router_new_port_handler,
273     .get_port     = nxt_router_get_port_handler,
274     .change_file  = nxt_port_change_log_file_handler,
275     .mmap         = nxt_port_mmap_handler,
276     .get_mmap     = nxt_router_get_mmap_handler,
277     .data         = nxt_router_conf_data_handler,
278     .remove_pid   = nxt_router_remove_pid_handler,
279     .access_log   = nxt_router_access_log_reopen_handler,
280     .rpc_ready    = nxt_port_rpc_handler,
281     .rpc_error    = nxt_port_rpc_handler,
282     .oosm         = nxt_router_oosm_handler,
283 };
284 
285 
286 const nxt_process_init_t  nxt_router_process = {
287     .name           = "router",
288     .type           = NXT_PROCESS_ROUTER,
289     .prefork        = nxt_router_prefork,
290     .restart        = 1,
291     .setup          = nxt_process_core_setup,
292     .start          = nxt_router_start,
293     .port_handlers  = &nxt_router_process_port_handlers,
294     .signals        = nxt_process_signals,
295 };
296 
297 
298 /* Queues of nxt_socket_conf_t */
299 nxt_queue_t  creating_sockets;
300 nxt_queue_t  pending_sockets;
301 nxt_queue_t  updating_sockets;
302 nxt_queue_t  keeping_sockets;
303 nxt_queue_t  deleting_sockets;
304 
305 
306 static nxt_int_t
307 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
308 {
309     nxt_runtime_stop_app_processes(task, task->thread->runtime);
310 
311     return NXT_OK;
312 }
313 
314 
315 static nxt_int_t
316 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
317 {
318     nxt_int_t      ret;
319     nxt_port_t     *controller_port;
320     nxt_router_t   *router;
321     nxt_runtime_t  *rt;
322 
323     rt = task->thread->runtime;
324 
325     nxt_log(task, NXT_LOG_INFO, "router started");
326 
327 #if (NXT_TLS)
328     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
329     if (nxt_slow_path(rt->tls == NULL)) {
330         return NXT_ERROR;
331     }
332 
333     ret = rt->tls->library_init(task);
334     if (nxt_slow_path(ret != NXT_OK)) {
335         return ret;
336     }
337 #endif
338 
339     ret = nxt_http_init(task);
340     if (nxt_slow_path(ret != NXT_OK)) {
341         return ret;
342     }
343 
344     router = nxt_zalloc(sizeof(nxt_router_t));
345     if (nxt_slow_path(router == NULL)) {
346         return NXT_ERROR;
347     }
348 
349     nxt_queue_init(&router->engines);
350     nxt_queue_init(&router->sockets);
351     nxt_queue_init(&router->apps);
352 
353     nxt_router = router;
354 
355     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
356     if (controller_port != NULL) {
357         nxt_router_greet_controller(task, controller_port);
358     }
359 
360     return NXT_OK;
361 }
362 
363 
364 static void
365 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
366 {
367     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
368                           -1, 0, 0, NULL);
369 }
370 
371 
372 static void
373 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
374     void *data)
375 {
376     size_t         size;
377     uint32_t       stream;
378     nxt_mp_t       *mp;
379     nxt_int_t      ret;
380     nxt_app_t      *app;
381     nxt_buf_t      *b;
382     nxt_port_t     *main_port;
383     nxt_runtime_t  *rt;
384 
385     app = data;
386 
387     rt = task->thread->runtime;
388     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
389 
390     nxt_debug(task, "app '%V' %p start process", &app->name, app);
391 
392     size = app->name.length + 1 + app->conf.length;
393 
394     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
395 
396     if (nxt_slow_path(b == NULL)) {
397         goto failed;
398     }
399 
400     nxt_buf_cpystr(b, &app->name);
401     *b->mem.free++ = '\0';
402     nxt_buf_cpystr(b, &app->conf);
403 
404     nxt_router_app_joint_use(task, app->joint, 1);
405 
406     stream = nxt_port_rpc_register_handler(task, port,
407                                            nxt_router_app_port_ready,
408                                            nxt_router_app_port_error,
409                                            -1, app->joint);
410 
411     if (nxt_slow_path(stream == 0)) {
412         nxt_router_app_joint_use(task, app->joint, -1);
413 
414         goto failed;
415     }
416 
417     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
418                                 -1, stream, port->id, b);
419 
420     if (nxt_slow_path(ret != NXT_OK)) {
421         nxt_port_rpc_cancel(task, port, stream);
422 
423         nxt_router_app_joint_use(task, app->joint, -1);
424 
425         goto failed;
426     }
427 
428     nxt_router_app_use(task, app, -1);
429 
430     return;
431 
432 failed:
433 
434     if (b != NULL) {
435         mp = b->data;
436         nxt_mp_free(mp, b);
437         nxt_mp_release(mp);
438     }
439 
440     nxt_thread_mutex_lock(&app->mutex);
441 
442     app->pending_processes--;
443 
444     nxt_thread_mutex_unlock(&app->mutex);
445 
446     nxt_router_app_use(task, app, -1);
447 }
448 
449 
450 static void
451 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
452 {
453     app_joint->use_count += i;
454 
455     if (app_joint->use_count == 0) {
456         nxt_assert(app_joint->app == NULL);
457 
458         nxt_free(app_joint);
459     }
460 }
461 
462 
463 static nxt_int_t
464 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
465 {
466     nxt_int_t      res;
467     nxt_port_t     *router_port;
468     nxt_runtime_t  *rt;
469 
470     nxt_debug(task, "app '%V' start process", &app->name);
471 
472     rt = task->thread->runtime;
473     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
474 
475     nxt_router_app_use(task, app, 1);
476 
477     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
478                         app);
479 
480     if (res == NXT_OK) {
481         return res;
482     }
483 
484     nxt_thread_mutex_lock(&app->mutex);
485 
486     app->pending_processes--;
487 
488     nxt_thread_mutex_unlock(&app->mutex);
489 
490     nxt_router_app_use(task, app, -1);
491 
492     return NXT_ERROR;
493 }
494 
495 
496 nxt_inline nxt_bool_t
497 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
498 {
499     nxt_buf_t       *b, *next;
500     nxt_bool_t      cancelled;
501     nxt_msg_info_t  *msg_info;
502 
503     msg_info = &req_rpc_data->msg_info;
504 
505     if (msg_info->buf == NULL) {
506         return 0;
507     }
508 
509     cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
510                                      msg_info->tracking_cookie,
511                                      req_rpc_data->stream);
512 
513     if (cancelled) {
514         nxt_debug(task, "stream #%uD: cancelled by router",
515                   req_rpc_data->stream);
516     }
517 
518     for (b = msg_info->buf; b != NULL; b = next) {
519         next = b->next;
520         b->next = NULL;
521 
522         b->completion_handler = msg_info->completion_handler;
523 
524         if (b->is_port_mmap_sent) {
525             b->is_port_mmap_sent = cancelled == 0;
526             b->completion_handler(task, b, b->parent);
527         }
528     }
529 
530     msg_info->buf = NULL;
531 
532     return cancelled;
533 }
534 
535 
536 nxt_inline nxt_bool_t
537 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
538 {
539     if (lnk->next != NULL) {
540         nxt_queue_remove(lnk);
541 
542         lnk->next = NULL;
543 
544         return 1;
545     }
546 
547     return 0;
548 }
549 
550 
551 nxt_inline void
552 nxt_request_rpc_data_unlink(nxt_task_t *task,
553     nxt_request_rpc_data_t *req_rpc_data)
554 {
555     nxt_app_t           *app;
556     nxt_bool_t          unlinked;
557     nxt_http_request_t  *r;
558 
559     nxt_router_msg_cancel(task, req_rpc_data);
560 
561     if (req_rpc_data->app_port != NULL) {
562         nxt_router_app_port_release(task, req_rpc_data->app_port,
563                                     req_rpc_data->apr_action);
564 
565         req_rpc_data->app_port = NULL;
566     }
567 
568     app = req_rpc_data->app;
569     r = req_rpc_data->request;
570 
571     if (r != NULL) {
572         r->timer_data = NULL;
573 
574         nxt_router_http_request_release_post(task, r);
575 
576         r->req_rpc_data = NULL;
577         req_rpc_data->request = NULL;
578 
579         if (app != NULL) {
580             unlinked = 0;
581 
582             nxt_thread_mutex_lock(&app->mutex);
583 
584             if (r->app_link.next != NULL) {
585                 nxt_queue_remove(&r->app_link);
586                 r->app_link.next = NULL;
587 
588                 unlinked = 1;
589             }
590 
591             nxt_thread_mutex_unlock(&app->mutex);
592 
593             if (unlinked) {
594                 nxt_mp_release(r->mem_pool);
595             }
596         }
597     }
598 
599     if (app != NULL) {
600         nxt_router_app_use(task, app, -1);
601 
602         req_rpc_data->app = NULL;
603     }
604 
605     if (req_rpc_data->msg_info.body_fd != -1) {
606         nxt_fd_close(req_rpc_data->msg_info.body_fd);
607 
608         req_rpc_data->msg_info.body_fd = -1;
609     }
610 
611     if (req_rpc_data->rpc_cancel) {
612         req_rpc_data->rpc_cancel = 0;
613 
614         nxt_port_rpc_cancel(task, task->thread->engine->port,
615                             req_rpc_data->stream);
616     }
617 }
618 
619 
620 static void
621 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
622 {
623     nxt_int_t      res;
624     nxt_app_t      *app;
625     nxt_port_t     *port, *main_app_port;
626     nxt_runtime_t  *rt;
627 
628     nxt_port_new_port_handler(task, msg);
629 
630     port = msg->u.new_port;
631 
632     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
633         nxt_router_greet_controller(task, msg->u.new_port);
634     }
635 
636     if (port == NULL || port->type != NXT_PROCESS_APP) {
637 
638         if (msg->port_msg.stream == 0) {
639             return;
640         }
641 
642         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
643 
644     } else {
645         if (msg->fd[1] != -1) {
646             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
647             if (nxt_slow_path(res != NXT_OK)) {
648                 return;
649             }
650 
651             nxt_fd_close(msg->fd[1]);
652             msg->fd[1] = -1;
653         }
654     }
655 
656     if (msg->port_msg.stream != 0) {
657         nxt_port_rpc_handler(task, msg);
658         return;
659     }
660 
661     /*
662      * Port with "id == 0" is application 'main' port and it always
663      * should come with non-zero stream.
664      */
665     nxt_assert(port->id != 0);
666 
667     /* Find 'main' app port and get app reference. */
668     rt = task->thread->runtime;
669 
670     /*
671      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
672      * sent to main port (with id == 0) and processed in main thread.
673      */
674     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
675     nxt_assert(main_app_port != NULL);
676 
677     app = main_app_port->app;
678     nxt_assert(app != NULL);
679 
680     nxt_thread_mutex_lock(&app->mutex);
681 
682     /* TODO here should be find-and-add code because there can be
683        port waiters in port_hash */
684     nxt_port_hash_add(&app->port_hash, port);
685     app->port_hash_count++;
686 
687     nxt_thread_mutex_unlock(&app->mutex);
688 
689     port->app = app;
690     port->main_app_port = main_app_port;
691 
692     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
693 }
694 
695 
696 static void
697 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
698 {
699     void                    *p;
700     size_t                  size;
701     nxt_int_t               ret;
702     nxt_port_t              *port;
703     nxt_router_temp_conf_t  *tmcf;
704 
705     port = nxt_runtime_port_find(task->thread->runtime,
706                                  msg->port_msg.pid,
707                                  msg->port_msg.reply_port);
708     if (nxt_slow_path(port == NULL)) {
709         nxt_alert(task, "conf_data_handler: reply port not found");
710         return;
711     }
712 
713     p = MAP_FAILED;
714 
715     /*
716      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
717      * initialized in 'cleanup' section.
718      */
719     size = 0;
720 
721     tmcf = nxt_router_temp_conf(task);
722     if (nxt_slow_path(tmcf == NULL)) {
723         goto fail;
724     }
725 
726     if (nxt_slow_path(msg->fd[0] == -1)) {
727         nxt_alert(task, "conf_data_handler: invalid shm fd");
728         goto fail;
729     }
730 
731     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
732         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
733                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
734         goto fail;
735     }
736 
737     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
738 
739     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
740 
741     nxt_fd_close(msg->fd[0]);
742     msg->fd[0] = -1;
743 
744     if (nxt_slow_path(p == MAP_FAILED)) {
745         goto fail;
746     }
747 
748     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
749 
750     tmcf->router_conf->router = nxt_router;
751     tmcf->stream = msg->port_msg.stream;
752     tmcf->port = port;
753 
754     nxt_port_use(task, tmcf->port, 1);
755 
756     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
757 
758     if (nxt_fast_path(ret == NXT_OK)) {
759         nxt_router_conf_apply(task, tmcf, NULL);
760 
761     } else {
762         nxt_router_conf_error(task, tmcf);
763     }
764 
765     goto cleanup;
766 
767 fail:
768 
769     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
770                           msg->port_msg.stream, 0, NULL);
771 
772     if (tmcf != NULL) {
773         nxt_mp_destroy(tmcf->mem_pool);
774     }
775 
776 cleanup:
777 
778     if (p != MAP_FAILED) {
779         nxt_mem_munmap(p, size);
780     }
781 
782     if (msg->fd[0] != -1) {
783         nxt_fd_close(msg->fd[0]);
784         msg->fd[0] = -1;
785     }
786 }
787 
788 
789 static void
790 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
791     void *data)
792 {
793     union {
794         nxt_pid_t  removed_pid;
795         void       *data;
796     } u;
797 
798     u.data = data;
799 
800     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
801 }
802 
803 
804 static void
805 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
806 {
807     nxt_event_engine_t  *engine;
808 
809     nxt_port_remove_pid_handler(task, msg);
810 
811     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
812     {
813         if (nxt_fast_path(engine->port != NULL)) {
814             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
815                           msg->u.data);
816         }
817     }
818     nxt_queue_loop;
819 
820     if (msg->port_msg.stream == 0) {
821         return;
822     }
823 
824     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
825 
826     nxt_port_rpc_handler(task, msg);
827 }
828 
829 
830 static nxt_router_temp_conf_t *
831 nxt_router_temp_conf(nxt_task_t *task)
832 {
833     nxt_mp_t                *mp, *tmp;
834     nxt_router_conf_t       *rtcf;
835     nxt_router_temp_conf_t  *tmcf;
836 
837     mp = nxt_mp_create(1024, 128, 256, 32);
838     if (nxt_slow_path(mp == NULL)) {
839         return NULL;
840     }
841 
842     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
843     if (nxt_slow_path(rtcf == NULL)) {
844         goto fail;
845     }
846 
847     rtcf->mem_pool = mp;
848 
849     tmp = nxt_mp_create(1024, 128, 256, 32);
850     if (nxt_slow_path(tmp == NULL)) {
851         goto fail;
852     }
853 
854     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
855     if (nxt_slow_path(tmcf == NULL)) {
856         goto temp_fail;
857     }
858 
859     tmcf->mem_pool = tmp;
860     tmcf->router_conf = rtcf;
861     tmcf->count = 1;
862     tmcf->engine = task->thread->engine;
863 
864     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
865                                      sizeof(nxt_router_engine_conf_t));
866     if (nxt_slow_path(tmcf->engines == NULL)) {
867         goto temp_fail;
868     }
869 
870     nxt_queue_init(&creating_sockets);
871     nxt_queue_init(&pending_sockets);
872     nxt_queue_init(&updating_sockets);
873     nxt_queue_init(&keeping_sockets);
874     nxt_queue_init(&deleting_sockets);
875 
876 #if (NXT_TLS)
877     nxt_queue_init(&tmcf->tls);
878 #endif
879 
880     nxt_queue_init(&tmcf->apps);
881     nxt_queue_init(&tmcf->previous);
882 
883     return tmcf;
884 
885 temp_fail:
886 
887     nxt_mp_destroy(tmp);
888 
889 fail:
890 
891     nxt_mp_destroy(mp);
892 
893     return NULL;
894 }
895 
896 
897 nxt_inline nxt_bool_t
898 nxt_router_app_can_start(nxt_app_t *app)
899 {
900     return app->processes + app->pending_processes < app->max_processes
901             && app->pending_processes < app->max_pending_processes;
902 }
903 
904 
905 nxt_inline nxt_bool_t
906 nxt_router_app_need_start(nxt_app_t *app)
907 {
908     return (app->active_requests
909               > app->port_hash_count + app->pending_processes)
910            || (app->spare_processes
911                 > app->idle_processes + app->pending_processes);
912 }
913 
914 
915 static void
916 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
917 {
918     nxt_int_t                    ret;
919     nxt_app_t                    *app;
920     nxt_router_t                 *router;
921     nxt_runtime_t                *rt;
922     nxt_queue_link_t             *qlk;
923     nxt_socket_conf_t            *skcf;
924     nxt_router_conf_t            *rtcf;
925     nxt_router_temp_conf_t       *tmcf;
926     const nxt_event_interface_t  *interface;
927 #if (NXT_TLS)
928     nxt_router_tlssock_t         *tls;
929 #endif
930 
931     tmcf = obj;
932 
933     qlk = nxt_queue_first(&pending_sockets);
934 
935     if (qlk != nxt_queue_tail(&pending_sockets)) {
936         nxt_queue_remove(qlk);
937         nxt_queue_insert_tail(&creating_sockets, qlk);
938 
939         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
940 
941         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
942 
943         return;
944     }
945 
946 #if (NXT_TLS)
947     qlk = nxt_queue_first(&tmcf->tls);
948 
949     if (qlk != nxt_queue_tail(&tmcf->tls)) {
950         nxt_queue_remove(qlk);
951 
952         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
953 
954         nxt_router_tls_rpc_create(task, tmcf, tls);
955         return;
956     }
957 #endif
958 
959     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
960 
961         if (nxt_router_app_need_start(app)) {
962             nxt_router_app_rpc_create(task, tmcf, app);
963             return;
964         }
965 
966     } nxt_queue_loop;
967 
968     rtcf = tmcf->router_conf;
969 
970     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
971         nxt_router_access_log_open(task, tmcf);
972         return;
973     }
974 
975     rt = task->thread->runtime;
976 
977     interface = nxt_service_get(rt->services, "engine", NULL);
978 
979     router = rtcf->router;
980 
981     ret = nxt_router_engines_create(task, router, tmcf, interface);
982     if (nxt_slow_path(ret != NXT_OK)) {
983         goto fail;
984     }
985 
986     ret = nxt_router_threads_create(task, rt, tmcf);
987     if (nxt_slow_path(ret != NXT_OK)) {
988         goto fail;
989     }
990 
991     nxt_router_apps_sort(task, router, tmcf);
992 
993     nxt_router_apps_hash_use(task, rtcf, 1);
994 
995     nxt_router_engines_post(router, tmcf);
996 
997     nxt_queue_add(&router->sockets, &updating_sockets);
998     nxt_queue_add(&router->sockets, &creating_sockets);
999 
1000     router->access_log = rtcf->access_log;
1001 
1002     nxt_router_conf_ready(task, tmcf);
1003 
1004     return;
1005 
1006 fail:
1007 
1008     nxt_router_conf_error(task, tmcf);
1009 
1010     return;
1011 }
1012 
1013 
1014 static void
1015 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1016 {
1017     nxt_joint_job_t  *job;
1018 
1019     job = obj;
1020 
1021     nxt_router_conf_ready(task, job->tmcf);
1022 }
1023 
1024 
1025 static void
1026 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1027 {
1028     uint32_t               count;
1029     nxt_router_conf_t      *rtcf;
1030     nxt_thread_spinlock_t  *lock;
1031 
1032     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1033 
1034     if (--tmcf->count > 0) {
1035         return;
1036     }
1037 
1038     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1039 
1040     rtcf = tmcf->router_conf;
1041 
1042     lock = &rtcf->router->lock;
1043 
1044     nxt_thread_spin_lock(lock);
1045 
1046     count = rtcf->count;
1047 
1048     nxt_thread_spin_unlock(lock);
1049 
1050     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1051 
1052     if (count == 0) {
1053         nxt_router_apps_hash_use(task, rtcf, -1);
1054 
1055         nxt_router_access_log_release(task, lock, rtcf->access_log);
1056 
1057         nxt_mp_destroy(rtcf->mem_pool);
1058     }
1059 
1060     nxt_mp_destroy(tmcf->mem_pool);
1061 }
1062 
1063 
1064 static void
1065 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1066 {
1067     nxt_app_t          *app;
1068     nxt_queue_t        new_socket_confs;
1069     nxt_socket_t       s;
1070     nxt_router_t       *router;
1071     nxt_queue_link_t   *qlk;
1072     nxt_socket_conf_t  *skcf;
1073     nxt_router_conf_t  *rtcf;
1074 
1075     nxt_alert(task, "failed to apply new conf");
1076 
1077     for (qlk = nxt_queue_first(&creating_sockets);
1078          qlk != nxt_queue_tail(&creating_sockets);
1079          qlk = nxt_queue_next(qlk))
1080     {
1081         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1082         s = skcf->listen->socket;
1083 
1084         if (s != -1) {
1085             nxt_socket_close(task, s);
1086         }
1087 
1088         nxt_free(skcf->listen);
1089     }
1090 
1091     nxt_queue_init(&new_socket_confs);
1092     nxt_queue_add(&new_socket_confs, &updating_sockets);
1093     nxt_queue_add(&new_socket_confs, &pending_sockets);
1094     nxt_queue_add(&new_socket_confs, &creating_sockets);
1095 
1096     rtcf = tmcf->router_conf;
1097 
1098     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1099 
1100         nxt_router_app_unlink(task, app);
1101 
1102     } nxt_queue_loop;
1103 
1104     router = rtcf->router;
1105 
1106     nxt_queue_add(&router->sockets, &keeping_sockets);
1107     nxt_queue_add(&router->sockets, &deleting_sockets);
1108 
1109     nxt_queue_add(&router->apps, &tmcf->previous);
1110 
1111     // TODO: new engines and threads
1112 
1113     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1114 
1115     nxt_mp_destroy(rtcf->mem_pool);
1116 
1117     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1118 
1119     nxt_mp_destroy(tmcf->mem_pool);
1120 }
1121 
1122 
1123 static void
1124 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1125     nxt_port_msg_type_t type)
1126 {
1127     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1128 
1129     nxt_port_use(task, tmcf->port, -1);
1130 
1131     tmcf->port = NULL;
1132 }
1133 
1134 
1135 static nxt_conf_map_t  nxt_router_conf[] = {
1136     {
1137         nxt_string("listeners_threads"),
1138         NXT_CONF_MAP_INT32,
1139         offsetof(nxt_router_conf_t, threads),
1140     },
1141 };
1142 
1143 
1144 static nxt_conf_map_t  nxt_router_app_conf[] = {
1145     {
1146         nxt_string("type"),
1147         NXT_CONF_MAP_STR,
1148         offsetof(nxt_router_app_conf_t, type),
1149     },
1150 
1151     {
1152         nxt_string("limits"),
1153         NXT_CONF_MAP_PTR,
1154         offsetof(nxt_router_app_conf_t, limits_value),
1155     },
1156 
1157     {
1158         nxt_string("processes"),
1159         NXT_CONF_MAP_INT32,
1160         offsetof(nxt_router_app_conf_t, processes),
1161     },
1162 
1163     {
1164         nxt_string("processes"),
1165         NXT_CONF_MAP_PTR,
1166         offsetof(nxt_router_app_conf_t, processes_value),
1167     },
1168 
1169     {
1170         nxt_string("targets"),
1171         NXT_CONF_MAP_PTR,
1172         offsetof(nxt_router_app_conf_t, targets_value),
1173     },
1174 };
1175 
1176 
1177 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1178     {
1179         nxt_string("timeout"),
1180         NXT_CONF_MAP_MSEC,
1181         offsetof(nxt_router_app_conf_t, timeout),
1182     },
1183 
1184     {
1185         nxt_string("requests"),
1186         NXT_CONF_MAP_INT32,
1187         offsetof(nxt_router_app_conf_t, requests),
1188     },
1189 };
1190 
1191 
1192 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1193     {
1194         nxt_string("spare"),
1195         NXT_CONF_MAP_INT32,
1196         offsetof(nxt_router_app_conf_t, spare_processes),
1197     },
1198 
1199     {
1200         nxt_string("max"),
1201         NXT_CONF_MAP_INT32,
1202         offsetof(nxt_router_app_conf_t, max_processes),
1203     },
1204 
1205     {
1206         nxt_string("idle_timeout"),
1207         NXT_CONF_MAP_MSEC,
1208         offsetof(nxt_router_app_conf_t, idle_timeout),
1209     },
1210 };
1211 
1212 
1213 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1214     {
1215         nxt_string("pass"),
1216         NXT_CONF_MAP_STR_COPY,
1217         offsetof(nxt_router_listener_conf_t, pass),
1218     },
1219 
1220     {
1221         nxt_string("application"),
1222         NXT_CONF_MAP_STR_COPY,
1223         offsetof(nxt_router_listener_conf_t, application),
1224     },
1225 };
1226 
1227 
1228 static nxt_conf_map_t  nxt_router_http_conf[] = {
1229     {
1230         nxt_string("header_buffer_size"),
1231         NXT_CONF_MAP_SIZE,
1232         offsetof(nxt_socket_conf_t, header_buffer_size),
1233     },
1234 
1235     {
1236         nxt_string("large_header_buffer_size"),
1237         NXT_CONF_MAP_SIZE,
1238         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1239     },
1240 
1241     {
1242         nxt_string("large_header_buffers"),
1243         NXT_CONF_MAP_SIZE,
1244         offsetof(nxt_socket_conf_t, large_header_buffers),
1245     },
1246 
1247     {
1248         nxt_string("body_buffer_size"),
1249         NXT_CONF_MAP_SIZE,
1250         offsetof(nxt_socket_conf_t, body_buffer_size),
1251     },
1252 
1253     {
1254         nxt_string("max_body_size"),
1255         NXT_CONF_MAP_SIZE,
1256         offsetof(nxt_socket_conf_t, max_body_size),
1257     },
1258 
1259     {
1260         nxt_string("idle_timeout"),
1261         NXT_CONF_MAP_MSEC,
1262         offsetof(nxt_socket_conf_t, idle_timeout),
1263     },
1264 
1265     {
1266         nxt_string("header_read_timeout"),
1267         NXT_CONF_MAP_MSEC,
1268         offsetof(nxt_socket_conf_t, header_read_timeout),
1269     },
1270 
1271     {
1272         nxt_string("body_read_timeout"),
1273         NXT_CONF_MAP_MSEC,
1274         offsetof(nxt_socket_conf_t, body_read_timeout),
1275     },
1276 
1277     {
1278         nxt_string("send_timeout"),
1279         NXT_CONF_MAP_MSEC,
1280         offsetof(nxt_socket_conf_t, send_timeout),
1281     },
1282 
1283     {
1284         nxt_string("body_temp_path"),
1285         NXT_CONF_MAP_STR,
1286         offsetof(nxt_socket_conf_t, body_temp_path),
1287     },
1288 
1289     {
1290         nxt_string("discard_unsafe_fields"),
1291         NXT_CONF_MAP_INT8,
1292         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1293     },
1294 };
1295 
1296 
1297 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1298     {
1299         nxt_string("max_frame_size"),
1300         NXT_CONF_MAP_SIZE,
1301         offsetof(nxt_websocket_conf_t, max_frame_size),
1302     },
1303 
1304     {
1305         nxt_string("read_timeout"),
1306         NXT_CONF_MAP_MSEC,
1307         offsetof(nxt_websocket_conf_t, read_timeout),
1308     },
1309 
1310     {
1311         nxt_string("keepalive_interval"),
1312         NXT_CONF_MAP_MSEC,
1313         offsetof(nxt_websocket_conf_t, keepalive_interval),
1314     },
1315 
1316 };
1317 
1318 
1319 static nxt_int_t
1320 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1321     u_char *start, u_char *end)
1322 {
1323     u_char                      *p;
1324     size_t                      size;
1325     nxt_mp_t                    *mp, *app_mp;
1326     uint32_t                    next, next_target;
1327     nxt_int_t                   ret;
1328     nxt_str_t                   name, path, target;
1329     nxt_app_t                   *app, *prev;
1330     nxt_str_t                   *t, *s, *targets;
1331     nxt_uint_t                  n, i;
1332     nxt_port_t                  *port;
1333     nxt_router_t                *router;
1334     nxt_app_joint_t             *app_joint;
1335     nxt_conf_value_t            *conf, *http, *value, *websocket;
1336     nxt_conf_value_t            *applications, *application;
1337     nxt_conf_value_t            *listeners, *listener;
1338     nxt_conf_value_t            *routes_conf, *static_conf;
1339     nxt_socket_conf_t           *skcf;
1340     nxt_http_routes_t           *routes;
1341     nxt_event_engine_t          *engine;
1342     nxt_app_lang_module_t       *lang;
1343     nxt_router_app_conf_t       apcf;
1344     nxt_router_access_log_t     *access_log;
1345     nxt_router_listener_conf_t  lscf;
1346 #if (NXT_TLS)
1347     nxt_router_tlssock_t        *tls;
1348 #endif
1349 
1350     static nxt_str_t  http_path = nxt_string("/settings/http");
1351     static nxt_str_t  applications_path = nxt_string("/applications");
1352     static nxt_str_t  listeners_path = nxt_string("/listeners");
1353     static nxt_str_t  routes_path = nxt_string("/routes");
1354     static nxt_str_t  access_log_path = nxt_string("/access_log");
1355 #if (NXT_TLS)
1356     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1357 #endif
1358     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1359     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1360 
1361     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1362     if (conf == NULL) {
1363         nxt_alert(task, "configuration parsing error");
1364         return NXT_ERROR;
1365     }
1366 
1367     mp = tmcf->router_conf->mem_pool;
1368 
1369     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1370                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1371     if (ret != NXT_OK) {
1372         nxt_alert(task, "root map error");
1373         return NXT_ERROR;
1374     }
1375 
1376     if (tmcf->router_conf->threads == 0) {
1377         tmcf->router_conf->threads = nxt_ncpu;
1378     }
1379 
1380     static_conf = nxt_conf_get_path(conf, &static_path);
1381 
1382     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1383     if (nxt_slow_path(ret != NXT_OK)) {
1384         return NXT_ERROR;
1385     }
1386 
1387     router = tmcf->router_conf->router;
1388 
1389     applications = nxt_conf_get_path(conf, &applications_path);
1390 
1391     if (applications != NULL) {
1392         next = 0;
1393 
1394         for ( ;; ) {
1395             application = nxt_conf_next_object_member(applications,
1396                                                       &name, &next);
1397             if (application == NULL) {
1398                 break;
1399             }
1400 
1401             nxt_debug(task, "application \"%V\"", &name);
1402 
1403             size = nxt_conf_json_length(application, NULL);
1404 
1405             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1406             if (nxt_slow_path(app_mp == NULL)) {
1407                 goto fail;
1408             }
1409 
1410             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1411             if (app == NULL) {
1412                 goto app_fail;
1413             }
1414 
1415             nxt_memzero(app, sizeof(nxt_app_t));
1416 
1417             app->mem_pool = app_mp;
1418 
1419             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1420             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1421                                                   + name.length);
1422 
1423             p = nxt_conf_json_print(app->conf.start, application, NULL);
1424             app->conf.length = p - app->conf.start;
1425 
1426             nxt_assert(app->conf.length <= size);
1427 
1428             nxt_debug(task, "application conf \"%V\"", &app->conf);
1429 
1430             prev = nxt_router_app_find(&router->apps, &name);
1431 
1432             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1433                 nxt_mp_destroy(app_mp);
1434 
1435                 nxt_queue_remove(&prev->link);
1436                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1437 
1438                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1439                 if (nxt_slow_path(ret != NXT_OK)) {
1440                     goto fail;
1441                 }
1442 
1443                 continue;
1444             }
1445 
1446             apcf.processes = 1;
1447             apcf.max_processes = 1;
1448             apcf.spare_processes = 0;
1449             apcf.timeout = 0;
1450             apcf.idle_timeout = 15000;
1451             apcf.requests = 0;
1452             apcf.limits_value = NULL;
1453             apcf.processes_value = NULL;
1454             apcf.targets_value = NULL;
1455 
1456             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1457             if (nxt_slow_path(app_joint == NULL)) {
1458                 goto app_fail;
1459             }
1460 
1461             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1462 
1463             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1464                                       nxt_nitems(nxt_router_app_conf), &apcf);
1465             if (ret != NXT_OK) {
1466                 nxt_alert(task, "application map error");
1467                 goto app_fail;
1468             }
1469 
1470             if (apcf.limits_value != NULL) {
1471 
1472                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1473                     nxt_alert(task, "application limits is not object");
1474                     goto app_fail;
1475                 }
1476 
1477                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1478                                         nxt_router_app_limits_conf,
1479                                         nxt_nitems(nxt_router_app_limits_conf),
1480                                         &apcf);
1481                 if (ret != NXT_OK) {
1482                     nxt_alert(task, "application limits map error");
1483                     goto app_fail;
1484                 }
1485             }
1486 
1487             if (apcf.processes_value != NULL
1488                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1489             {
1490                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1491                                      nxt_router_app_processes_conf,
1492                                      nxt_nitems(nxt_router_app_processes_conf),
1493                                      &apcf);
1494                 if (ret != NXT_OK) {
1495                     nxt_alert(task, "application processes map error");
1496                     goto app_fail;
1497                 }
1498 
1499             } else {
1500                 apcf.max_processes = apcf.processes;
1501                 apcf.spare_processes = apcf.processes;
1502             }
1503 
1504             if (apcf.targets_value != NULL) {
1505                 n = nxt_conf_object_members_count(apcf.targets_value);
1506 
1507                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1508                 if (nxt_slow_path(targets == NULL)) {
1509                     goto app_fail;
1510                 }
1511 
1512                 next_target = 0;
1513 
1514                 for (i = 0; i < n; i++) {
1515                     (void) nxt_conf_next_object_member(apcf.targets_value,
1516                                                        &target, &next_target);
1517 
1518                     s = nxt_str_dup(app_mp, &targets[i], &target);
1519                     if (nxt_slow_path(s == NULL)) {
1520                         goto app_fail;
1521                     }
1522                 }
1523 
1524             } else {
1525                 targets = NULL;
1526             }
1527 
1528             nxt_debug(task, "application type: %V", &apcf.type);
1529             nxt_debug(task, "application processes: %D", apcf.processes);
1530             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1531             nxt_debug(task, "application requests: %D", apcf.requests);
1532 
1533             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1534 
1535             if (lang == NULL) {
1536                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1537                 goto app_fail;
1538             }
1539 
1540             nxt_debug(task, "application language module: \"%s\"", lang->file);
1541 
1542             ret = nxt_thread_mutex_create(&app->mutex);
1543             if (ret != NXT_OK) {
1544                 goto app_fail;
1545             }
1546 
1547             nxt_queue_init(&app->ports);
1548             nxt_queue_init(&app->spare_ports);
1549             nxt_queue_init(&app->idle_ports);
1550             nxt_queue_init(&app->ack_waiting_req);
1551 
1552             app->name.length = name.length;
1553             nxt_memcpy(app->name.start, name.start, name.length);
1554 
1555             app->type = lang->type;
1556             app->max_processes = apcf.max_processes;
1557             app->spare_processes = apcf.spare_processes;
1558             app->max_pending_processes = apcf.spare_processes
1559                                          ? apcf.spare_processes : 1;
1560             app->timeout = apcf.timeout;
1561             app->idle_timeout = apcf.idle_timeout;
1562             app->max_requests = apcf.requests;
1563 
1564             app->targets = targets;
1565 
1566             engine = task->thread->engine;
1567 
1568             app->engine = engine;
1569 
1570             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1571             app->adjust_idle_work.task = &engine->task;
1572             app->adjust_idle_work.obj = app;
1573 
1574             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1575 
1576             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1577             if (nxt_slow_path(ret != NXT_OK)) {
1578                 goto app_fail;
1579             }
1580 
1581             nxt_router_app_use(task, app, 1);
1582 
1583             app->joint = app_joint;
1584 
1585             app_joint->use_count = 1;
1586             app_joint->app = app;
1587 
1588             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1589             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1590             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1591             app_joint->idle_timer.task = &engine->task;
1592             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1593 
1594             app_joint->free_app_work.handler = nxt_router_free_app;
1595             app_joint->free_app_work.task = &engine->task;
1596             app_joint->free_app_work.obj = app_joint;
1597 
1598             port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1599                                 NXT_PROCESS_APP);
1600             if (nxt_slow_path(port == NULL)) {
1601                 return NXT_ERROR;
1602             }
1603 
1604             ret = nxt_port_socket_init(task, port, 0);
1605             if (nxt_slow_path(ret != NXT_OK)) {
1606                 nxt_port_use(task, port, -1);
1607                 return NXT_ERROR;
1608             }
1609 
1610             ret = nxt_router_app_queue_init(task, port);
1611             if (nxt_slow_path(ret != NXT_OK)) {
1612                 nxt_port_write_close(port);
1613                 nxt_port_read_close(port);
1614                 nxt_port_use(task, port, -1);
1615                 return NXT_ERROR;
1616             }
1617 
1618             nxt_port_write_enable(task, port);
1619             port->app = app;
1620 
1621             app->shared_port = port;
1622 
1623             nxt_thread_mutex_create(&app->outgoing.mutex);
1624         }
1625     }
1626 
1627     routes_conf = nxt_conf_get_path(conf, &routes_path);
1628     if (nxt_fast_path(routes_conf != NULL)) {
1629         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1630         if (nxt_slow_path(routes == NULL)) {
1631             return NXT_ERROR;
1632         }
1633         tmcf->router_conf->routes = routes;
1634     }
1635 
1636     ret = nxt_upstreams_create(task, tmcf, conf);
1637     if (nxt_slow_path(ret != NXT_OK)) {
1638         return ret;
1639     }
1640 
1641     http = nxt_conf_get_path(conf, &http_path);
1642 #if 0
1643     if (http == NULL) {
1644         nxt_alert(task, "no \"http\" block");
1645         return NXT_ERROR;
1646     }
1647 #endif
1648 
1649     websocket = nxt_conf_get_path(conf, &websocket_path);
1650 
1651     listeners = nxt_conf_get_path(conf, &listeners_path);
1652 
1653     if (listeners != NULL) {
1654         next = 0;
1655 
1656         for ( ;; ) {
1657             listener = nxt_conf_next_object_member(listeners, &name, &next);
1658             if (listener == NULL) {
1659                 break;
1660             }
1661 
1662             skcf = nxt_router_socket_conf(task, tmcf, &name);
1663             if (skcf == NULL) {
1664                 goto fail;
1665             }
1666 
1667             nxt_memzero(&lscf, sizeof(lscf));
1668 
1669             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1670                                       nxt_nitems(nxt_router_listener_conf),
1671                                       &lscf);
1672             if (ret != NXT_OK) {
1673                 nxt_alert(task, "listener map error");
1674                 goto fail;
1675             }
1676 
1677             nxt_debug(task, "application: %V", &lscf.application);
1678 
1679             // STUB, default values if http block is not defined.
1680             skcf->header_buffer_size = 2048;
1681             skcf->large_header_buffer_size = 8192;
1682             skcf->large_header_buffers = 4;
1683             skcf->discard_unsafe_fields = 1;
1684             skcf->body_buffer_size = 16 * 1024;
1685             skcf->max_body_size = 8 * 1024 * 1024;
1686             skcf->proxy_header_buffer_size = 64 * 1024;
1687             skcf->proxy_buffer_size = 4096;
1688             skcf->proxy_buffers = 256;
1689             skcf->idle_timeout = 180 * 1000;
1690             skcf->header_read_timeout = 30 * 1000;
1691             skcf->body_read_timeout = 30 * 1000;
1692             skcf->send_timeout = 30 * 1000;
1693             skcf->proxy_timeout = 60 * 1000;
1694             skcf->proxy_send_timeout = 30 * 1000;
1695             skcf->proxy_read_timeout = 30 * 1000;
1696 
1697             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1698             skcf->websocket_conf.read_timeout = 60 * 1000;
1699             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1700 
1701             nxt_str_null(&skcf->body_temp_path);
1702 
1703             if (http != NULL) {
1704                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1705                                           nxt_nitems(nxt_router_http_conf),
1706                                           skcf);
1707                 if (ret != NXT_OK) {
1708                     nxt_alert(task, "http map error");
1709                     goto fail;
1710                 }
1711             }
1712 
1713             if (websocket != NULL) {
1714                 ret = nxt_conf_map_object(mp, websocket,
1715                                           nxt_router_websocket_conf,
1716                                           nxt_nitems(nxt_router_websocket_conf),
1717                                           &skcf->websocket_conf);
1718                 if (ret != NXT_OK) {
1719                     nxt_alert(task, "websocket map error");
1720                     goto fail;
1721                 }
1722             }
1723 
1724             t = &skcf->body_temp_path;
1725 
1726             if (t->length == 0) {
1727                 t->start = (u_char *) task->thread->runtime->tmp;
1728                 t->length = nxt_strlen(t->start);
1729             }
1730 
1731 #if (NXT_TLS)
1732             value = nxt_conf_get_path(listener, &certificate_path);
1733 
1734             if (value != NULL) {
1735                 nxt_conf_get_string(value, &name);
1736 
1737                 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1738                 if (nxt_slow_path(tls == NULL)) {
1739                     goto fail;
1740                 }
1741 
1742                 tls->name = name;
1743                 tls->conf = skcf;
1744 
1745                 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1746             }
1747 #endif
1748 
1749             skcf->listen->handler = nxt_http_conn_init;
1750             skcf->router_conf = tmcf->router_conf;
1751             skcf->router_conf->count++;
1752 
1753             if (lscf.pass.length != 0) {
1754                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1755 
1756             /* COMPATIBILITY: listener application. */
1757             } else if (lscf.application.length > 0) {
1758                 skcf->action = nxt_http_pass_application(task,
1759                                                          tmcf->router_conf,
1760                                                          &lscf.application);
1761             }
1762 
1763             if (nxt_slow_path(skcf->action == NULL)) {
1764                 goto fail;
1765             }
1766         }
1767     }
1768 
1769     ret = nxt_http_routes_resolve(task, tmcf);
1770     if (nxt_slow_path(ret != NXT_OK)) {
1771         goto fail;
1772     }
1773 
1774     value = nxt_conf_get_path(conf, &access_log_path);
1775 
1776     if (value != NULL) {
1777         nxt_conf_get_string(value, &path);
1778 
1779         access_log = router->access_log;
1780 
1781         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1782             nxt_thread_spin_lock(&router->lock);
1783             access_log->count++;
1784             nxt_thread_spin_unlock(&router->lock);
1785 
1786         } else {
1787             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1788                                     + path.length);
1789             if (access_log == NULL) {
1790                 nxt_alert(task, "failed to allocate access log structure");
1791                 goto fail;
1792             }
1793 
1794             access_log->fd = -1;
1795             access_log->handler = &nxt_router_access_log_writer;
1796             access_log->count = 1;
1797 
1798             access_log->path.length = path.length;
1799             access_log->path.start = (u_char *) access_log
1800                                      + sizeof(nxt_router_access_log_t);
1801 
1802             nxt_memcpy(access_log->path.start, path.start, path.length);
1803         }
1804 
1805         tmcf->router_conf->access_log = access_log;
1806     }
1807 
1808     nxt_queue_add(&deleting_sockets, &router->sockets);
1809     nxt_queue_init(&router->sockets);
1810 
1811     return NXT_OK;
1812 
1813 app_fail:
1814 
1815     nxt_mp_destroy(app_mp);
1816 
1817 fail:
1818 
1819     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1820 
1821         nxt_queue_remove(&app->link);
1822         nxt_thread_mutex_destroy(&app->mutex);
1823         nxt_mp_destroy(app->mem_pool);
1824 
1825     } nxt_queue_loop;
1826 
1827     return NXT_ERROR;
1828 }
1829 
1830 
1831 static nxt_int_t
1832 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
1833     nxt_conf_value_t *conf)
1834 {
1835     uint32_t          next, i;
1836     nxt_mp_t          *mp;
1837     nxt_str_t         *type, extension, str;
1838     nxt_int_t         ret;
1839     nxt_uint_t        exts;
1840     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
1841 
1842     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
1843 
1844     mp = rtcf->mem_pool;
1845 
1846     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
1847     if (nxt_slow_path(ret != NXT_OK)) {
1848         return NXT_ERROR;
1849     }
1850 
1851     if (conf == NULL) {
1852         return NXT_OK;
1853     }
1854 
1855     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
1856 
1857     if (mtypes_conf != NULL) {
1858         next = 0;
1859 
1860         for ( ;; ) {
1861             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
1862 
1863             if (ext_conf == NULL) {
1864                 break;
1865             }
1866 
1867             type = nxt_str_dup(mp, NULL, &str);
1868             if (nxt_slow_path(type == NULL)) {
1869                 return NXT_ERROR;
1870             }
1871 
1872             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
1873                 nxt_conf_get_string(ext_conf, &str);
1874 
1875                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1876                     return NXT_ERROR;
1877                 }
1878 
1879                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1880                                                       &extension, type);
1881                 if (nxt_slow_path(ret != NXT_OK)) {
1882                     return NXT_ERROR;
1883                 }
1884 
1885                 continue;
1886             }
1887 
1888             exts = nxt_conf_array_elements_count(ext_conf);
1889 
1890             for (i = 0; i < exts; i++) {
1891                 value = nxt_conf_get_array_element(ext_conf, i);
1892 
1893                 nxt_conf_get_string(value, &str);
1894 
1895                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1896                     return NXT_ERROR;
1897                 }
1898 
1899                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1900                                                       &extension, type);
1901                 if (nxt_slow_path(ret != NXT_OK)) {
1902                     return NXT_ERROR;
1903                 }
1904             }
1905         }
1906     }
1907 
1908     return NXT_OK;
1909 }
1910 
1911 
1912 static nxt_app_t *
1913 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1914 {
1915     nxt_app_t  *app;
1916 
1917     nxt_queue_each(app, queue, nxt_app_t, link) {
1918 
1919         if (nxt_strstr_eq(name, &app->name)) {
1920             return app;
1921         }
1922 
1923     } nxt_queue_loop;
1924 
1925     return NULL;
1926 }
1927 
1928 
1929 static nxt_int_t
1930 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
1931 {
1932     void       *mem;
1933     nxt_int_t  fd;
1934 
1935     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
1936     if (nxt_slow_path(fd == -1)) {
1937         return NXT_ERROR;
1938     }
1939 
1940     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
1941                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1942     if (nxt_slow_path(mem == MAP_FAILED)) {
1943         nxt_fd_close(fd);
1944 
1945         return NXT_ERROR;
1946     }
1947 
1948     nxt_app_queue_init(mem);
1949 
1950     port->queue_fd = fd;
1951     port->queue = mem;
1952 
1953     return NXT_OK;
1954 }
1955 
1956 
1957 static nxt_int_t
1958 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
1959 {
1960     void       *mem;
1961     nxt_int_t  fd;
1962 
1963     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
1964     if (nxt_slow_path(fd == -1)) {
1965         return NXT_ERROR;
1966     }
1967 
1968     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
1969                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1970     if (nxt_slow_path(mem == MAP_FAILED)) {
1971         nxt_fd_close(fd);
1972 
1973         return NXT_ERROR;
1974     }
1975 
1976     nxt_port_queue_init(mem);
1977 
1978     port->queue_fd = fd;
1979     port->queue = mem;
1980 
1981     return NXT_OK;
1982 }
1983 
1984 
1985 static nxt_int_t
1986 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
1987 {
1988     void  *mem;
1989 
1990     nxt_assert(fd != -1);
1991 
1992     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
1993                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1994     if (nxt_slow_path(mem == MAP_FAILED)) {
1995 
1996         return NXT_ERROR;
1997     }
1998 
1999     port->queue = mem;
2000 
2001     return NXT_OK;
2002 }
2003 
2004 
2005 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2006     NXT_LVLHSH_DEFAULT,
2007     nxt_router_apps_hash_test,
2008     nxt_mp_lvlhsh_alloc,
2009     nxt_mp_lvlhsh_free,
2010 };
2011 
2012 
2013 static nxt_int_t
2014 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2015 {
2016     nxt_app_t  *app;
2017 
2018     app = data;
2019 
2020     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2021 }
2022 
2023 
2024 static nxt_int_t
2025 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2026 {
2027     nxt_lvlhsh_query_t  lhq;
2028 
2029     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2030     lhq.replace = 0;
2031     lhq.key = app->name;
2032     lhq.value = app;
2033     lhq.proto = &nxt_router_apps_hash_proto;
2034     lhq.pool = rtcf->mem_pool;
2035 
2036     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2037 
2038     case NXT_OK:
2039         return NXT_OK;
2040 
2041     case NXT_DECLINED:
2042         nxt_thread_log_alert("router app hash adding failed: "
2043                              "\"%V\" is already in hash", &lhq.key);
2044         /* Fall through. */
2045     default:
2046         return NXT_ERROR;
2047     }
2048 }
2049 
2050 
2051 static nxt_app_t *
2052 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2053 {
2054     nxt_lvlhsh_query_t  lhq;
2055 
2056     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2057     lhq.key = *name;
2058     lhq.proto = &nxt_router_apps_hash_proto;
2059 
2060     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2061         return NULL;
2062     }
2063 
2064     return lhq.value;
2065 }
2066 
2067 
2068 static void
2069 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2070 {
2071     nxt_app_t          *app;
2072     nxt_lvlhsh_each_t  lhe;
2073 
2074     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2075 
2076     for ( ;; ) {
2077         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2078 
2079         if (app == NULL) {
2080             break;
2081         }
2082 
2083         nxt_router_app_use(task, app, i);
2084     }
2085 }
2086 
2087 
2088 
2089 nxt_int_t
2090 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
2091     nxt_http_action_t *action)
2092 {
2093     nxt_app_t  *app;
2094 
2095     app = nxt_router_apps_hash_get(rtcf, name);
2096 
2097     if (app == NULL) {
2098         return NXT_DECLINED;
2099     }
2100 
2101     action->u.application = app;
2102     action->handler = nxt_http_application_handler;
2103 
2104     return NXT_OK;
2105 }
2106 
2107 
2108 static nxt_socket_conf_t *
2109 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2110     nxt_str_t *name)
2111 {
2112     size_t               size;
2113     nxt_int_t            ret;
2114     nxt_bool_t           wildcard;
2115     nxt_sockaddr_t       *sa;
2116     nxt_socket_conf_t    *skcf;
2117     nxt_listen_socket_t  *ls;
2118 
2119     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2120     if (nxt_slow_path(sa == NULL)) {
2121         nxt_alert(task, "invalid listener \"%V\"", name);
2122         return NULL;
2123     }
2124 
2125     sa->type = SOCK_STREAM;
2126 
2127     nxt_debug(task, "router listener: \"%*s\"",
2128               (size_t) sa->length, nxt_sockaddr_start(sa));
2129 
2130     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2131     if (nxt_slow_path(skcf == NULL)) {
2132         return NULL;
2133     }
2134 
2135     size = nxt_sockaddr_size(sa);
2136 
2137     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2138 
2139     if (ret != NXT_OK) {
2140 
2141         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2142         if (nxt_slow_path(ls == NULL)) {
2143             return NULL;
2144         }
2145 
2146         skcf->listen = ls;
2147 
2148         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2149         nxt_memcpy(ls->sockaddr, sa, size);
2150 
2151         nxt_listen_socket_remote_size(ls);
2152 
2153         ls->socket = -1;
2154         ls->backlog = NXT_LISTEN_BACKLOG;
2155         ls->flags = NXT_NONBLOCK;
2156         ls->read_after_accept = 1;
2157     }
2158 
2159     switch (sa->u.sockaddr.sa_family) {
2160 #if (NXT_HAVE_UNIX_DOMAIN)
2161     case AF_UNIX:
2162         wildcard = 0;
2163         break;
2164 #endif
2165 #if (NXT_INET6)
2166     case AF_INET6:
2167         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2168         break;
2169 #endif
2170     case AF_INET:
2171     default:
2172         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2173         break;
2174     }
2175 
2176     if (!wildcard) {
2177         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2178         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2179             return NULL;
2180         }
2181 
2182         nxt_memcpy(skcf->sockaddr, sa, size);
2183     }
2184 
2185     return skcf;
2186 }
2187 
2188 
2189 static nxt_int_t
2190 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2191     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2192 {
2193     nxt_router_t       *router;
2194     nxt_queue_link_t   *qlk;
2195     nxt_socket_conf_t  *skcf;
2196 
2197     router = tmcf->router_conf->router;
2198 
2199     for (qlk = nxt_queue_first(&router->sockets);
2200          qlk != nxt_queue_tail(&router->sockets);
2201          qlk = nxt_queue_next(qlk))
2202     {
2203         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2204 
2205         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2206             nskcf->listen = skcf->listen;
2207 
2208             nxt_queue_remove(qlk);
2209             nxt_queue_insert_tail(&keeping_sockets, qlk);
2210 
2211             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2212 
2213             return NXT_OK;
2214         }
2215     }
2216 
2217     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2218 
2219     return NXT_DECLINED;
2220 }
2221 
2222 
2223 static void
2224 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2225     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2226 {
2227     size_t            size;
2228     uint32_t          stream;
2229     nxt_int_t         ret;
2230     nxt_buf_t         *b;
2231     nxt_port_t        *main_port, *router_port;
2232     nxt_runtime_t     *rt;
2233     nxt_socket_rpc_t  *rpc;
2234 
2235     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2236     if (rpc == NULL) {
2237         goto fail;
2238     }
2239 
2240     rpc->socket_conf = skcf;
2241     rpc->temp_conf = tmcf;
2242 
2243     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2244 
2245     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2246     if (b == NULL) {
2247         goto fail;
2248     }
2249 
2250     b->completion_handler = nxt_router_dummy_buf_completion;
2251 
2252     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2253 
2254     rt = task->thread->runtime;
2255     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2256     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2257 
2258     stream = nxt_port_rpc_register_handler(task, router_port,
2259                                            nxt_router_listen_socket_ready,
2260                                            nxt_router_listen_socket_error,
2261                                            main_port->pid, rpc);
2262     if (nxt_slow_path(stream == 0)) {
2263         goto fail;
2264     }
2265 
2266     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2267                                 stream, router_port->id, b);
2268 
2269     if (nxt_slow_path(ret != NXT_OK)) {
2270         nxt_port_rpc_cancel(task, router_port, stream);
2271         goto fail;
2272     }
2273 
2274     return;
2275 
2276 fail:
2277 
2278     nxt_router_conf_error(task, tmcf);
2279 }
2280 
2281 
2282 static void
2283 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2284     void *data)
2285 {
2286     nxt_int_t         ret;
2287     nxt_socket_t      s;
2288     nxt_socket_rpc_t  *rpc;
2289 
2290     rpc = data;
2291 
2292     s = msg->fd[0];
2293 
2294     ret = nxt_socket_nonblocking(task, s);
2295     if (nxt_slow_path(ret != NXT_OK)) {
2296         goto fail;
2297     }
2298 
2299     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2300 
2301     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2302     if (nxt_slow_path(ret != NXT_OK)) {
2303         goto fail;
2304     }
2305 
2306     rpc->socket_conf->listen->socket = s;
2307 
2308     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2309                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2310 
2311     return;
2312 
2313 fail:
2314 
2315     nxt_socket_close(task, s);
2316 
2317     nxt_router_conf_error(task, rpc->temp_conf);
2318 }
2319 
2320 
2321 static void
2322 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2323     void *data)
2324 {
2325     nxt_socket_rpc_t        *rpc;
2326     nxt_router_temp_conf_t  *tmcf;
2327 
2328     rpc = data;
2329     tmcf = rpc->temp_conf;
2330 
2331 #if 0
2332     u_char                  *p;
2333     size_t                  size;
2334     uint8_t                 error;
2335     nxt_buf_t               *in, *out;
2336     nxt_sockaddr_t          *sa;
2337 
2338     static nxt_str_t  socket_errors[] = {
2339         nxt_string("ListenerSystem"),
2340         nxt_string("ListenerNoIPv6"),
2341         nxt_string("ListenerPort"),
2342         nxt_string("ListenerInUse"),
2343         nxt_string("ListenerNoAddress"),
2344         nxt_string("ListenerNoAccess"),
2345         nxt_string("ListenerPath"),
2346     };
2347 
2348     sa = rpc->socket_conf->listen->sockaddr;
2349 
2350     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2351 
2352     if (nxt_slow_path(in == NULL)) {
2353         return;
2354     }
2355 
2356     p = in->mem.pos;
2357 
2358     error = *p++;
2359 
2360     size = nxt_length("listen socket error: ")
2361            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2362            + sa->length + socket_errors[error].length + (in->mem.free - p);
2363 
2364     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2365     if (nxt_slow_path(out == NULL)) {
2366         return;
2367     }
2368 
2369     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2370                         "listen socket error: "
2371                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2372                         (size_t) sa->length, nxt_sockaddr_start(sa),
2373                         &socket_errors[error], in->mem.free - p, p);
2374 
2375     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2376 #endif
2377 
2378     nxt_router_conf_error(task, tmcf);
2379 }
2380 
2381 
2382 #if (NXT_TLS)
2383 
2384 static void
2385 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2386     nxt_router_tlssock_t *tls)
2387 {
2388     nxt_socket_rpc_t  *rpc;
2389 
2390     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2391     if (rpc == NULL) {
2392         nxt_router_conf_error(task, tmcf);
2393         return;
2394     }
2395 
2396     rpc->socket_conf = tls->conf;
2397     rpc->temp_conf = tmcf;
2398 
2399     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2400                        nxt_router_tls_rpc_handler, rpc);
2401 }
2402 
2403 
2404 static void
2405 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2406     void *data)
2407 {
2408     nxt_mp_t               *mp;
2409     nxt_int_t              ret;
2410     nxt_tls_conf_t         *tlscf;
2411     nxt_socket_rpc_t       *rpc;
2412     nxt_router_temp_conf_t *tmcf;
2413 
2414     nxt_debug(task, "tls rpc handler");
2415 
2416     rpc = data;
2417     tmcf = rpc->temp_conf;
2418 
2419     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2420         goto fail;
2421     }
2422 
2423     mp = tmcf->router_conf->mem_pool;
2424 
2425     tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2426     if (nxt_slow_path(tlscf == NULL)) {
2427         goto fail;
2428     }
2429 
2430     tlscf->chain_file = msg->fd[0];
2431 
2432     ret = task->thread->runtime->tls->server_init(task, tlscf);
2433     if (nxt_slow_path(ret != NXT_OK)) {
2434         goto fail;
2435     }
2436 
2437     rpc->socket_conf->tls = tlscf;
2438 
2439     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2440                        nxt_router_conf_apply, task, tmcf, NULL);
2441     return;
2442 
2443 fail:
2444 
2445     nxt_router_conf_error(task, tmcf);
2446 }
2447 
2448 #endif
2449 
2450 
2451 static void
2452 nxt_router_app_rpc_create(nxt_task_t *task,
2453     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2454 {
2455     size_t         size;
2456     uint32_t       stream;
2457     nxt_int_t      ret;
2458     nxt_buf_t      *b;
2459     nxt_port_t     *main_port, *router_port;
2460     nxt_runtime_t  *rt;
2461     nxt_app_rpc_t  *rpc;
2462 
2463     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2464     if (rpc == NULL) {
2465         goto fail;
2466     }
2467 
2468     rpc->app = app;
2469     rpc->temp_conf = tmcf;
2470 
2471     nxt_debug(task, "app '%V' prefork", &app->name);
2472 
2473     size = app->name.length + 1 + app->conf.length;
2474 
2475     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2476     if (nxt_slow_path(b == NULL)) {
2477         goto fail;
2478     }
2479 
2480     b->completion_handler = nxt_router_dummy_buf_completion;
2481 
2482     nxt_buf_cpystr(b, &app->name);
2483     *b->mem.free++ = '\0';
2484     nxt_buf_cpystr(b, &app->conf);
2485 
2486     rt = task->thread->runtime;
2487     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2488     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2489 
2490     stream = nxt_port_rpc_register_handler(task, router_port,
2491                                            nxt_router_app_prefork_ready,
2492                                            nxt_router_app_prefork_error,
2493                                            -1, rpc);
2494     if (nxt_slow_path(stream == 0)) {
2495         goto fail;
2496     }
2497 
2498     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2499                                 -1, stream, router_port->id, b);
2500 
2501     if (nxt_slow_path(ret != NXT_OK)) {
2502         nxt_port_rpc_cancel(task, router_port, stream);
2503         goto fail;
2504     }
2505 
2506     app->pending_processes++;
2507 
2508     return;
2509 
2510 fail:
2511 
2512     nxt_router_conf_error(task, tmcf);
2513 }
2514 
2515 
2516 static void
2517 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2518     void *data)
2519 {
2520     nxt_app_t           *app;
2521     nxt_port_t          *port;
2522     nxt_app_rpc_t       *rpc;
2523     nxt_event_engine_t  *engine;
2524 
2525     rpc = data;
2526     app = rpc->app;
2527 
2528     port = msg->u.new_port;
2529 
2530     nxt_assert(port != NULL);
2531     nxt_assert(port->type == NXT_PROCESS_APP);
2532     nxt_assert(port->id == 0);
2533 
2534     port->app = app;
2535     port->main_app_port = port;
2536 
2537     app->pending_processes--;
2538     app->processes++;
2539     app->idle_processes++;
2540 
2541     engine = task->thread->engine;
2542 
2543     nxt_queue_insert_tail(&app->ports, &port->app_link);
2544     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2545 
2546     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2547               &app->name, port->pid, port->id);
2548 
2549     nxt_port_hash_add(&app->port_hash, port);
2550     app->port_hash_count++;
2551 
2552     port->idle_start = 0;
2553 
2554     nxt_port_inc_use(port);
2555 
2556     nxt_router_app_shared_port_send(task, port);
2557 
2558     nxt_work_queue_add(&engine->fast_work_queue,
2559                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2560 }
2561 
2562 
2563 static void
2564 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2565     void *data)
2566 {
2567     nxt_app_t               *app;
2568     nxt_app_rpc_t           *rpc;
2569     nxt_router_temp_conf_t  *tmcf;
2570 
2571     rpc = data;
2572     app = rpc->app;
2573     tmcf = rpc->temp_conf;
2574 
2575     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2576             &app->name);
2577 
2578     app->pending_processes--;
2579 
2580     nxt_router_conf_error(task, tmcf);
2581 }
2582 
2583 
2584 static nxt_int_t
2585 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2586     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2587 {
2588     nxt_int_t                 ret;
2589     nxt_uint_t                n, threads;
2590     nxt_queue_link_t          *qlk;
2591     nxt_router_engine_conf_t  *recf;
2592 
2593     threads = tmcf->router_conf->threads;
2594 
2595     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2596                                      sizeof(nxt_router_engine_conf_t));
2597     if (nxt_slow_path(tmcf->engines == NULL)) {
2598         return NXT_ERROR;
2599     }
2600 
2601     n = 0;
2602 
2603     for (qlk = nxt_queue_first(&router->engines);
2604          qlk != nxt_queue_tail(&router->engines);
2605          qlk = nxt_queue_next(qlk))
2606     {
2607         recf = nxt_array_zero_add(tmcf->engines);
2608         if (nxt_slow_path(recf == NULL)) {
2609             return NXT_ERROR;
2610         }
2611 
2612         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2613 
2614         if (n < threads) {
2615             recf->action = NXT_ROUTER_ENGINE_KEEP;
2616             ret = nxt_router_engine_conf_update(tmcf, recf);
2617 
2618         } else {
2619             recf->action = NXT_ROUTER_ENGINE_DELETE;
2620             ret = nxt_router_engine_conf_delete(tmcf, recf);
2621         }
2622 
2623         if (nxt_slow_path(ret != NXT_OK)) {
2624             return ret;
2625         }
2626 
2627         n++;
2628     }
2629 
2630     tmcf->new_threads = n;
2631 
2632     while (n < threads) {
2633         recf = nxt_array_zero_add(tmcf->engines);
2634         if (nxt_slow_path(recf == NULL)) {
2635             return NXT_ERROR;
2636         }
2637 
2638         recf->action = NXT_ROUTER_ENGINE_ADD;
2639 
2640         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2641         if (nxt_slow_path(recf->engine == NULL)) {
2642             return NXT_ERROR;
2643         }
2644 
2645         ret = nxt_router_engine_conf_create(tmcf, recf);
2646         if (nxt_slow_path(ret != NXT_OK)) {
2647             return ret;
2648         }
2649 
2650         n++;
2651     }
2652 
2653     return NXT_OK;
2654 }
2655 
2656 
2657 static nxt_int_t
2658 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2659     nxt_router_engine_conf_t *recf)
2660 {
2661     nxt_int_t  ret;
2662 
2663     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2664                                           nxt_router_listen_socket_create);
2665     if (nxt_slow_path(ret != NXT_OK)) {
2666         return ret;
2667     }
2668 
2669     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2670                                           nxt_router_listen_socket_create);
2671     if (nxt_slow_path(ret != NXT_OK)) {
2672         return ret;
2673     }
2674 
2675     return ret;
2676 }
2677 
2678 
2679 static nxt_int_t
2680 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2681     nxt_router_engine_conf_t *recf)
2682 {
2683     nxt_int_t  ret;
2684 
2685     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2686                                           nxt_router_listen_socket_create);
2687     if (nxt_slow_path(ret != NXT_OK)) {
2688         return ret;
2689     }
2690 
2691     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2692                                           nxt_router_listen_socket_update);
2693     if (nxt_slow_path(ret != NXT_OK)) {
2694         return ret;
2695     }
2696 
2697     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2698     if (nxt_slow_path(ret != NXT_OK)) {
2699         return ret;
2700     }
2701 
2702     return ret;
2703 }
2704 
2705 
2706 static nxt_int_t
2707 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2708     nxt_router_engine_conf_t *recf)
2709 {
2710     nxt_int_t  ret;
2711 
2712     ret = nxt_router_engine_quit(tmcf, recf);
2713     if (nxt_slow_path(ret != NXT_OK)) {
2714         return ret;
2715     }
2716 
2717     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2718     if (nxt_slow_path(ret != NXT_OK)) {
2719         return ret;
2720     }
2721 
2722     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2723 }
2724 
2725 
2726 static nxt_int_t
2727 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2728     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2729     nxt_work_handler_t handler)
2730 {
2731     nxt_int_t                ret;
2732     nxt_joint_job_t          *job;
2733     nxt_queue_link_t         *qlk;
2734     nxt_socket_conf_t        *skcf;
2735     nxt_socket_conf_joint_t  *joint;
2736 
2737     for (qlk = nxt_queue_first(sockets);
2738          qlk != nxt_queue_tail(sockets);
2739          qlk = nxt_queue_next(qlk))
2740     {
2741         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2742         if (nxt_slow_path(job == NULL)) {
2743             return NXT_ERROR;
2744         }
2745 
2746         job->work.next = recf->jobs;
2747         recf->jobs = &job->work;
2748 
2749         job->task = tmcf->engine->task;
2750         job->work.handler = handler;
2751         job->work.task = &job->task;
2752         job->work.obj = job;
2753         job->tmcf = tmcf;
2754 
2755         tmcf->count++;
2756 
2757         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2758                              sizeof(nxt_socket_conf_joint_t));
2759         if (nxt_slow_path(joint == NULL)) {
2760             return NXT_ERROR;
2761         }
2762 
2763         job->work.data = joint;
2764 
2765         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
2766         if (nxt_slow_path(ret != NXT_OK)) {
2767             return ret;
2768         }
2769 
2770         joint->count = 1;
2771 
2772         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2773         skcf->count++;
2774         joint->socket_conf = skcf;
2775 
2776         joint->engine = recf->engine;
2777     }
2778 
2779     return NXT_OK;
2780 }
2781 
2782 
2783 static nxt_int_t
2784 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2785     nxt_router_engine_conf_t *recf)
2786 {
2787     nxt_joint_job_t  *job;
2788 
2789     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2790     if (nxt_slow_path(job == NULL)) {
2791         return NXT_ERROR;
2792     }
2793 
2794     job->work.next = recf->jobs;
2795     recf->jobs = &job->work;
2796 
2797     job->task = tmcf->engine->task;
2798     job->work.handler = nxt_router_worker_thread_quit;
2799     job->work.task = &job->task;
2800     job->work.obj = NULL;
2801     job->work.data = NULL;
2802     job->tmcf = NULL;
2803 
2804     return NXT_OK;
2805 }
2806 
2807 
2808 static nxt_int_t
2809 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2810     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2811 {
2812     nxt_joint_job_t   *job;
2813     nxt_queue_link_t  *qlk;
2814 
2815     for (qlk = nxt_queue_first(sockets);
2816          qlk != nxt_queue_tail(sockets);
2817          qlk = nxt_queue_next(qlk))
2818     {
2819         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2820         if (nxt_slow_path(job == NULL)) {
2821             return NXT_ERROR;
2822         }
2823 
2824         job->work.next = recf->jobs;
2825         recf->jobs = &job->work;
2826 
2827         job->task = tmcf->engine->task;
2828         job->work.handler = nxt_router_listen_socket_delete;
2829         job->work.task = &job->task;
2830         job->work.obj = job;
2831         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2832         job->tmcf = tmcf;
2833 
2834         tmcf->count++;
2835     }
2836 
2837     return NXT_OK;
2838 }
2839 
2840 
2841 static nxt_int_t
2842 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2843     nxt_router_temp_conf_t *tmcf)
2844 {
2845     nxt_int_t                 ret;
2846     nxt_uint_t                i, threads;
2847     nxt_router_engine_conf_t  *recf;
2848 
2849     recf = tmcf->engines->elts;
2850     threads = tmcf->router_conf->threads;
2851 
2852     for (i = tmcf->new_threads; i < threads; i++) {
2853         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2854         if (nxt_slow_path(ret != NXT_OK)) {
2855             return ret;
2856         }
2857     }
2858 
2859     return NXT_OK;
2860 }
2861 
2862 
2863 static nxt_int_t
2864 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2865     nxt_event_engine_t *engine)
2866 {
2867     nxt_int_t            ret;
2868     nxt_thread_link_t    *link;
2869     nxt_thread_handle_t  handle;
2870 
2871     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2872 
2873     if (nxt_slow_path(link == NULL)) {
2874         return NXT_ERROR;
2875     }
2876 
2877     link->start = nxt_router_thread_start;
2878     link->engine = engine;
2879     link->work.handler = nxt_router_thread_exit_handler;
2880     link->work.task = task;
2881     link->work.data = link;
2882 
2883     nxt_queue_insert_tail(&rt->engines, &engine->link);
2884 
2885     ret = nxt_thread_create(&handle, link);
2886 
2887     if (nxt_slow_path(ret != NXT_OK)) {
2888         nxt_queue_remove(&engine->link);
2889     }
2890 
2891     return ret;
2892 }
2893 
2894 
2895 static void
2896 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2897     nxt_router_temp_conf_t *tmcf)
2898 {
2899     nxt_app_t  *app;
2900 
2901     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2902 
2903         nxt_router_app_unlink(task, app);
2904 
2905     } nxt_queue_loop;
2906 
2907     nxt_queue_add(&router->apps, &tmcf->previous);
2908     nxt_queue_add(&router->apps, &tmcf->apps);
2909 }
2910 
2911 
2912 static void
2913 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2914 {
2915     nxt_uint_t                n;
2916     nxt_event_engine_t        *engine;
2917     nxt_router_engine_conf_t  *recf;
2918 
2919     recf = tmcf->engines->elts;
2920 
2921     for (n = tmcf->engines->nelts; n != 0; n--) {
2922         engine = recf->engine;
2923 
2924         switch (recf->action) {
2925 
2926         case NXT_ROUTER_ENGINE_KEEP:
2927             break;
2928 
2929         case NXT_ROUTER_ENGINE_ADD:
2930             nxt_queue_insert_tail(&router->engines, &engine->link0);
2931             break;
2932 
2933         case NXT_ROUTER_ENGINE_DELETE:
2934             nxt_queue_remove(&engine->link0);
2935             break;
2936         }
2937 
2938         nxt_router_engine_post(engine, recf->jobs);
2939 
2940         recf++;
2941     }
2942 }
2943 
2944 
2945 static void
2946 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2947 {
2948     nxt_work_t  *work, *next;
2949 
2950     for (work = jobs; work != NULL; work = next) {
2951         next = work->next;
2952         work->next = NULL;
2953 
2954         nxt_event_engine_post(engine, work);
2955     }
2956 }
2957 
2958 
2959 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2960     .rpc_error       = nxt_port_rpc_handler,
2961     .mmap            = nxt_port_mmap_handler,
2962     .data            = nxt_port_rpc_handler,
2963     .oosm            = nxt_router_oosm_handler,
2964     .req_headers_ack = nxt_port_rpc_handler,
2965 };
2966 
2967 
2968 static void
2969 nxt_router_thread_start(void *data)
2970 {
2971     nxt_int_t           ret;
2972     nxt_port_t          *port;
2973     nxt_task_t          *task;
2974     nxt_work_t          *work;
2975     nxt_thread_t        *thread;
2976     nxt_thread_link_t   *link;
2977     nxt_event_engine_t  *engine;
2978 
2979     link = data;
2980     engine = link->engine;
2981     task = &engine->task;
2982 
2983     thread = nxt_thread();
2984 
2985     nxt_event_engine_thread_adopt(engine);
2986 
2987     /* STUB */
2988     thread->runtime = engine->task.thread->runtime;
2989 
2990     engine->task.thread = thread;
2991     engine->task.log = thread->log;
2992     thread->engine = engine;
2993     thread->task = &engine->task;
2994 #if 0
2995     thread->fiber = &engine->fibers->fiber;
2996 #endif
2997 
2998     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2999     if (nxt_slow_path(engine->mem_pool == NULL)) {
3000         return;
3001     }
3002 
3003     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3004                         NXT_PROCESS_ROUTER);
3005     if (nxt_slow_path(port == NULL)) {
3006         return;
3007     }
3008 
3009     ret = nxt_port_socket_init(task, port, 0);
3010     if (nxt_slow_path(ret != NXT_OK)) {
3011         nxt_port_use(task, port, -1);
3012         return;
3013     }
3014 
3015     ret = nxt_router_port_queue_init(task, port);
3016     if (nxt_slow_path(ret != NXT_OK)) {
3017         nxt_port_use(task, port, -1);
3018         return;
3019     }
3020 
3021     engine->port = port;
3022 
3023     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3024 
3025     work = nxt_zalloc(sizeof(nxt_work_t));
3026     if (nxt_slow_path(work == NULL)) {
3027         return;
3028     }
3029 
3030     work->handler = nxt_router_rt_add_port;
3031     work->task = link->work.task;
3032     work->obj = work;
3033     work->data = port;
3034 
3035     nxt_event_engine_post(link->work.task->thread->engine, work);
3036 
3037     nxt_event_engine_start(engine);
3038 }
3039 
3040 
3041 static void
3042 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3043 {
3044     nxt_int_t      res;
3045     nxt_port_t     *port;
3046     nxt_runtime_t  *rt;
3047 
3048     rt = task->thread->runtime;
3049     port = data;
3050 
3051     nxt_free(obj);
3052 
3053     res = nxt_port_hash_add(&rt->ports, port);
3054 
3055     if (nxt_fast_path(res == NXT_OK)) {
3056         nxt_port_use(task, port, 1);
3057     }
3058 }
3059 
3060 
3061 static void
3062 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3063 {
3064     nxt_joint_job_t          *job;
3065     nxt_socket_conf_t        *skcf;
3066     nxt_listen_event_t       *lev;
3067     nxt_listen_socket_t      *ls;
3068     nxt_thread_spinlock_t    *lock;
3069     nxt_socket_conf_joint_t  *joint;
3070 
3071     job = obj;
3072     joint = data;
3073 
3074     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3075 
3076     skcf = joint->socket_conf;
3077     ls = skcf->listen;
3078 
3079     lev = nxt_listen_event(task, ls);
3080     if (nxt_slow_path(lev == NULL)) {
3081         nxt_router_listen_socket_release(task, skcf);
3082         return;
3083     }
3084 
3085     lev->socket.data = joint;
3086 
3087     lock = &skcf->router_conf->router->lock;
3088 
3089     nxt_thread_spin_lock(lock);
3090     ls->count++;
3091     nxt_thread_spin_unlock(lock);
3092 
3093     job->work.next = NULL;
3094     job->work.handler = nxt_router_conf_wait;
3095 
3096     nxt_event_engine_post(job->tmcf->engine, &job->work);
3097 }
3098 
3099 
3100 nxt_inline nxt_listen_event_t *
3101 nxt_router_listen_event(nxt_queue_t *listen_connections,
3102     nxt_socket_conf_t *skcf)
3103 {
3104     nxt_socket_t        fd;
3105     nxt_queue_link_t    *qlk;
3106     nxt_listen_event_t  *lev;
3107 
3108     fd = skcf->listen->socket;
3109 
3110     for (qlk = nxt_queue_first(listen_connections);
3111          qlk != nxt_queue_tail(listen_connections);
3112          qlk = nxt_queue_next(qlk))
3113     {
3114         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3115 
3116         if (fd == lev->socket.fd) {
3117             return lev;
3118         }
3119     }
3120 
3121     return NULL;
3122 }
3123 
3124 
3125 static void
3126 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3127 {
3128     nxt_joint_job_t          *job;
3129     nxt_event_engine_t       *engine;
3130     nxt_listen_event_t       *lev;
3131     nxt_socket_conf_joint_t  *joint, *old;
3132 
3133     job = obj;
3134     joint = data;
3135 
3136     engine = task->thread->engine;
3137 
3138     nxt_queue_insert_tail(&engine->joints, &joint->link);
3139 
3140     lev = nxt_router_listen_event(&engine->listen_connections,
3141                                   joint->socket_conf);
3142 
3143     old = lev->socket.data;
3144     lev->socket.data = joint;
3145     lev->listen = joint->socket_conf->listen;
3146 
3147     job->work.next = NULL;
3148     job->work.handler = nxt_router_conf_wait;
3149 
3150     nxt_event_engine_post(job->tmcf->engine, &job->work);
3151 
3152     /*
3153      * The task is allocated from configuration temporary
3154      * memory pool so it can be freed after engine post operation.
3155      */
3156 
3157     nxt_router_conf_release(&engine->task, old);
3158 }
3159 
3160 
3161 static void
3162 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3163 {
3164     nxt_joint_job_t     *job;
3165     nxt_socket_conf_t   *skcf;
3166     nxt_listen_event_t  *lev;
3167     nxt_event_engine_t  *engine;
3168 
3169     job = obj;
3170     skcf = data;
3171 
3172     engine = task->thread->engine;
3173 
3174     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3175 
3176     nxt_fd_event_delete(engine, &lev->socket);
3177 
3178     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3179               lev->socket.fd);
3180 
3181     lev->timer.handler = nxt_router_listen_socket_close;
3182     lev->timer.work_queue = &engine->fast_work_queue;
3183 
3184     nxt_timer_add(engine, &lev->timer, 0);
3185 
3186     job->work.next = NULL;
3187     job->work.handler = nxt_router_conf_wait;
3188 
3189     nxt_event_engine_post(job->tmcf->engine, &job->work);
3190 }
3191 
3192 
3193 static void
3194 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3195 {
3196     nxt_event_engine_t  *engine;
3197 
3198     nxt_debug(task, "router worker thread quit");
3199 
3200     engine = task->thread->engine;
3201 
3202     engine->shutdown = 1;
3203 
3204     if (nxt_queue_is_empty(&engine->joints)) {
3205         nxt_thread_exit(task->thread);
3206     }
3207 }
3208 
3209 
3210 static void
3211 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3212 {
3213     nxt_timer_t              *timer;
3214     nxt_listen_event_t       *lev;
3215     nxt_socket_conf_joint_t  *joint;
3216 
3217     timer = obj;
3218     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3219 
3220     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3221               lev->socket.fd);
3222 
3223     nxt_queue_remove(&lev->link);
3224 
3225     joint = lev->socket.data;
3226     lev->socket.data = NULL;
3227 
3228     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3229     task = &task->thread->engine->task;
3230 
3231     nxt_router_listen_socket_release(task, joint->socket_conf);
3232 
3233     nxt_router_listen_event_release(task, lev, joint);
3234 }
3235 
3236 
3237 static void
3238 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3239 {
3240     nxt_listen_socket_t    *ls;
3241     nxt_thread_spinlock_t  *lock;
3242 
3243     ls = skcf->listen;
3244     lock = &skcf->router_conf->router->lock;
3245 
3246     nxt_thread_spin_lock(lock);
3247 
3248     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3249               task->thread->engine, ls->count);
3250 
3251     if (--ls->count != 0) {
3252         ls = NULL;
3253     }
3254 
3255     nxt_thread_spin_unlock(lock);
3256 
3257     if (ls != NULL) {
3258         nxt_socket_close(task, ls->socket);
3259         nxt_free(ls);
3260     }
3261 }
3262 
3263 
3264 void
3265 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3266     nxt_socket_conf_joint_t *joint)
3267 {
3268     nxt_event_engine_t  *engine;
3269 
3270     nxt_debug(task, "listen event count: %D", lev->count);
3271 
3272     engine = task->thread->engine;
3273 
3274     if (--lev->count == 0) {
3275         if (lev->next != NULL) {
3276             nxt_sockaddr_cache_free(engine, lev->next);
3277 
3278             nxt_conn_free(task, lev->next);
3279         }
3280 
3281         nxt_free(lev);
3282     }
3283 
3284     if (joint != NULL) {
3285         nxt_router_conf_release(task, joint);
3286     }
3287 
3288     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3289         nxt_thread_exit(task->thread);
3290     }
3291 }
3292 
3293 
3294 void
3295 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3296 {
3297     nxt_socket_conf_t      *skcf;
3298     nxt_router_conf_t      *rtcf;
3299     nxt_thread_spinlock_t  *lock;
3300 
3301     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3302 
3303     if (--joint->count != 0) {
3304         return;
3305     }
3306 
3307     nxt_queue_remove(&joint->link);
3308 
3309     /*
3310      * The joint content can not be safely used after the critical
3311      * section protected by the spinlock because its memory pool may
3312      * be already destroyed by another thread.
3313      */
3314     skcf = joint->socket_conf;
3315     rtcf = skcf->router_conf;
3316     lock = &rtcf->router->lock;
3317 
3318     nxt_thread_spin_lock(lock);
3319 
3320     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3321               rtcf, rtcf->count);
3322 
3323     if (--skcf->count != 0) {
3324         skcf = NULL;
3325         rtcf = NULL;
3326 
3327     } else {
3328         nxt_queue_remove(&skcf->link);
3329 
3330         if (--rtcf->count != 0) {
3331             rtcf = NULL;
3332         }
3333     }
3334 
3335     nxt_thread_spin_unlock(lock);
3336 
3337 #if (NXT_TLS)
3338     if (skcf != NULL && skcf->tls != NULL) {
3339         task->thread->runtime->tls->server_free(task, skcf->tls);
3340     }
3341 #endif
3342 
3343     /* TODO remove engine->port */
3344 
3345     if (rtcf != NULL) {
3346         nxt_debug(task, "old router conf is destroyed");
3347 
3348         nxt_router_apps_hash_use(task, rtcf, -1);
3349 
3350         nxt_router_access_log_release(task, lock, rtcf->access_log);
3351 
3352         nxt_mp_thread_adopt(rtcf->mem_pool);
3353 
3354         nxt_mp_destroy(rtcf->mem_pool);
3355     }
3356 }
3357 
3358 
3359 static void
3360 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3361     nxt_router_access_log_t *access_log)
3362 {
3363     size_t     size;
3364     u_char     *buf, *p;
3365     nxt_off_t  bytes;
3366 
3367     static nxt_time_string_t  date_cache = {
3368         (nxt_atomic_uint_t) -1,
3369         nxt_router_access_log_date,
3370         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3371         nxt_length("31/Dec/1986:19:40:00 +0300"),
3372         NXT_THREAD_TIME_LOCAL,
3373         NXT_THREAD_TIME_SEC,
3374     };
3375 
3376     size = r->remote->address_length
3377            + 6                  /* ' - - [' */
3378            + date_cache.size
3379            + 3                  /* '] "' */
3380            + r->method->length
3381            + 1                  /* space */
3382            + r->target.length
3383            + 1                  /* space */
3384            + r->version.length
3385            + 2                  /* '" ' */
3386            + 3                  /* status */
3387            + 1                  /* space */
3388            + NXT_OFF_T_LEN
3389            + 2                  /* ' "' */
3390            + (r->referer != NULL ? r->referer->value_length : 1)
3391            + 3                  /* '" "' */
3392            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3393            + 2                  /* '"\n' */
3394     ;
3395 
3396     buf = nxt_mp_nget(r->mem_pool, size);
3397     if (nxt_slow_path(buf == NULL)) {
3398         return;
3399     }
3400 
3401     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3402                    r->remote->address_length);
3403 
3404     p = nxt_cpymem(p, " - - [", 6);
3405 
3406     p = nxt_thread_time_string(task->thread, &date_cache, p);
3407 
3408     p = nxt_cpymem(p, "] \"", 3);
3409 
3410     if (r->method->length != 0) {
3411         p = nxt_cpymem(p, r->method->start, r->method->length);
3412 
3413         if (r->target.length != 0) {
3414             *p++ = ' ';
3415             p = nxt_cpymem(p, r->target.start, r->target.length);
3416 
3417             if (r->version.length != 0) {
3418                 *p++ = ' ';
3419                 p = nxt_cpymem(p, r->version.start, r->version.length);
3420             }
3421         }
3422 
3423     } else {
3424         *p++ = '-';
3425     }
3426 
3427     p = nxt_cpymem(p, "\" ", 2);
3428 
3429     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3430 
3431     *p++ = ' ';
3432 
3433     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3434 
3435     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3436 
3437     p = nxt_cpymem(p, " \"", 2);
3438 
3439     if (r->referer != NULL) {
3440         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3441 
3442     } else {
3443         *p++ = '-';
3444     }
3445 
3446     p = nxt_cpymem(p, "\" \"", 3);
3447 
3448     if (r->user_agent != NULL) {
3449         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3450 
3451     } else {
3452         *p++ = '-';
3453     }
3454 
3455     p = nxt_cpymem(p, "\"\n", 2);
3456 
3457     nxt_fd_write(access_log->fd, buf, p - buf);
3458 }
3459 
3460 
3461 static u_char *
3462 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3463     size_t size, const char *format)
3464 {
3465     u_char  sign;
3466     time_t  gmtoff;
3467 
3468     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3469                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3470 
3471     gmtoff = nxt_timezone(tm) / 60;
3472 
3473     if (gmtoff < 0) {
3474         gmtoff = -gmtoff;
3475         sign = '-';
3476 
3477     } else {
3478         sign = '+';
3479     }
3480 
3481     return nxt_sprintf(buf, buf + size, format,
3482                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3483                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3484                        sign, gmtoff / 60, gmtoff % 60);
3485 }
3486 
3487 
3488 static void
3489 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3490 {
3491     uint32_t                 stream;
3492     nxt_int_t                ret;
3493     nxt_buf_t                *b;
3494     nxt_port_t               *main_port, *router_port;
3495     nxt_runtime_t            *rt;
3496     nxt_router_access_log_t  *access_log;
3497 
3498     access_log = tmcf->router_conf->access_log;
3499 
3500     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3501     if (nxt_slow_path(b == NULL)) {
3502         goto fail;
3503     }
3504 
3505     b->completion_handler = nxt_router_dummy_buf_completion;
3506 
3507     nxt_buf_cpystr(b, &access_log->path);
3508     *b->mem.free++ = '\0';
3509 
3510     rt = task->thread->runtime;
3511     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3512     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3513 
3514     stream = nxt_port_rpc_register_handler(task, router_port,
3515                                            nxt_router_access_log_ready,
3516                                            nxt_router_access_log_error,
3517                                            -1, tmcf);
3518     if (nxt_slow_path(stream == 0)) {
3519         goto fail;
3520     }
3521 
3522     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3523                                 stream, router_port->id, b);
3524 
3525     if (nxt_slow_path(ret != NXT_OK)) {
3526         nxt_port_rpc_cancel(task, router_port, stream);
3527         goto fail;
3528     }
3529 
3530     return;
3531 
3532 fail:
3533 
3534     nxt_router_conf_error(task, tmcf);
3535 }
3536 
3537 
3538 static void
3539 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3540     void *data)
3541 {
3542     nxt_router_temp_conf_t   *tmcf;
3543     nxt_router_access_log_t  *access_log;
3544 
3545     tmcf = data;
3546 
3547     access_log = tmcf->router_conf->access_log;
3548 
3549     access_log->fd = msg->fd[0];
3550 
3551     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3552                        nxt_router_conf_apply, task, tmcf, NULL);
3553 }
3554 
3555 
3556 static void
3557 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3558     void *data)
3559 {
3560     nxt_router_temp_conf_t  *tmcf;
3561 
3562     tmcf = data;
3563 
3564     nxt_router_conf_error(task, tmcf);
3565 }
3566 
3567 
3568 static void
3569 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3570     nxt_router_access_log_t *access_log)
3571 {
3572     if (access_log == NULL) {
3573         return;
3574     }
3575 
3576     nxt_thread_spin_lock(lock);
3577 
3578     if (--access_log->count != 0) {
3579         access_log = NULL;
3580     }
3581 
3582     nxt_thread_spin_unlock(lock);
3583 
3584     if (access_log != NULL) {
3585 
3586         if (access_log->fd != -1) {
3587             nxt_fd_close(access_log->fd);
3588         }
3589 
3590         nxt_free(access_log);
3591     }
3592 }
3593 
3594 
3595 typedef struct {
3596     nxt_mp_t                 *mem_pool;
3597     nxt_router_access_log_t  *access_log;
3598 } nxt_router_access_log_reopen_t;
3599 
3600 
3601 static void
3602 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3603 {
3604     nxt_mp_t                        *mp;
3605     uint32_t                        stream;
3606     nxt_int_t                       ret;
3607     nxt_buf_t                       *b;
3608     nxt_port_t                      *main_port, *router_port;
3609     nxt_runtime_t                   *rt;
3610     nxt_router_access_log_t         *access_log;
3611     nxt_router_access_log_reopen_t  *reopen;
3612 
3613     access_log = nxt_router->access_log;
3614 
3615     if (access_log == NULL) {
3616         return;
3617     }
3618 
3619     mp = nxt_mp_create(1024, 128, 256, 32);
3620     if (nxt_slow_path(mp == NULL)) {
3621         return;
3622     }
3623 
3624     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3625     if (nxt_slow_path(reopen == NULL)) {
3626         goto fail;
3627     }
3628 
3629     reopen->mem_pool = mp;
3630     reopen->access_log = access_log;
3631 
3632     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3633     if (nxt_slow_path(b == NULL)) {
3634         goto fail;
3635     }
3636 
3637     b->completion_handler = nxt_router_access_log_reopen_completion;
3638 
3639     nxt_buf_cpystr(b, &access_log->path);
3640     *b->mem.free++ = '\0';
3641 
3642     rt = task->thread->runtime;
3643     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3644     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3645 
3646     stream = nxt_port_rpc_register_handler(task, router_port,
3647                                            nxt_router_access_log_reopen_ready,
3648                                            nxt_router_access_log_reopen_error,
3649                                            -1, reopen);
3650     if (nxt_slow_path(stream == 0)) {
3651         goto fail;
3652     }
3653 
3654     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3655                                 stream, router_port->id, b);
3656 
3657     if (nxt_slow_path(ret != NXT_OK)) {
3658         nxt_port_rpc_cancel(task, router_port, stream);
3659         goto fail;
3660     }
3661 
3662     nxt_mp_retain(mp);
3663 
3664     return;
3665 
3666 fail:
3667 
3668     nxt_mp_destroy(mp);
3669 }
3670 
3671 
3672 static void
3673 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3674 {
3675     nxt_mp_t   *mp;
3676     nxt_buf_t  *b;
3677 
3678     b = obj;
3679     mp = b->data;
3680 
3681     nxt_mp_release(mp);
3682 }
3683 
3684 
3685 static void
3686 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3687     void *data)
3688 {
3689     nxt_router_access_log_t         *access_log;
3690     nxt_router_access_log_reopen_t  *reopen;
3691 
3692     reopen = data;
3693 
3694     access_log = reopen->access_log;
3695 
3696     if (access_log == nxt_router->access_log) {
3697 
3698         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3699             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3700                       msg->fd[0], access_log->fd, nxt_errno);
3701         }
3702     }
3703 
3704     nxt_fd_close(msg->fd[0]);
3705     nxt_mp_release(reopen->mem_pool);
3706 }
3707 
3708 
3709 static void
3710 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3711     void *data)
3712 {
3713     nxt_router_access_log_reopen_t  *reopen;
3714 
3715     reopen = data;
3716 
3717     nxt_mp_release(reopen->mem_pool);
3718 }
3719 
3720 
3721 static void
3722 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3723 {
3724     nxt_port_t           *port;
3725     nxt_thread_link_t    *link;
3726     nxt_event_engine_t   *engine;
3727     nxt_thread_handle_t  handle;
3728 
3729     handle = (nxt_thread_handle_t) obj;
3730     link = data;
3731 
3732     nxt_thread_wait(handle);
3733 
3734     engine = link->engine;
3735 
3736     nxt_queue_remove(&engine->link);
3737 
3738     port = engine->port;
3739 
3740     // TODO notify all apps
3741 
3742     port->engine = task->thread->engine;
3743     nxt_mp_thread_adopt(port->mem_pool);
3744     nxt_port_use(task, port, -1);
3745 
3746     nxt_mp_thread_adopt(engine->mem_pool);
3747     nxt_mp_destroy(engine->mem_pool);
3748 
3749     nxt_event_engine_free(engine);
3750 
3751     nxt_free(link);
3752 }
3753 
3754 
3755 static void
3756 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3757     void *data)
3758 {
3759     size_t                  b_size, count;
3760     nxt_int_t               ret;
3761     nxt_app_t               *app;
3762     nxt_buf_t               *b, *next;
3763     nxt_port_t              *app_port;
3764     nxt_unit_field_t        *f;
3765     nxt_http_field_t        *field;
3766     nxt_http_request_t      *r;
3767     nxt_unit_response_t     *resp;
3768     nxt_request_rpc_data_t  *req_rpc_data;
3769 
3770     req_rpc_data = data;
3771 
3772     r = req_rpc_data->request;
3773     if (nxt_slow_path(r == NULL)) {
3774         return;
3775     }
3776 
3777     if (r->error) {
3778         nxt_request_rpc_data_unlink(task, req_rpc_data);
3779         return;
3780     }
3781 
3782     app = req_rpc_data->app;
3783     nxt_assert(app != NULL);
3784 
3785     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3786         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3787 
3788         return;
3789     }
3790 
3791     b = (msg->size == 0) ? NULL : msg->buf;
3792 
3793     if (msg->port_msg.last != 0) {
3794         nxt_debug(task, "router data create last buf");
3795 
3796         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3797 
3798         req_rpc_data->rpc_cancel = 0;
3799 
3800         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
3801             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
3802         }
3803 
3804         nxt_request_rpc_data_unlink(task, req_rpc_data);
3805 
3806     } else {
3807         if (app->timeout != 0) {
3808             r->timer.handler = nxt_router_app_timeout;
3809             r->timer_data = req_rpc_data;
3810             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3811         }
3812     }
3813 
3814     if (b == NULL) {
3815         return;
3816     }
3817 
3818     if (msg->buf == b) {
3819         /* Disable instant buffer completion/re-using by port. */
3820         msg->buf = NULL;
3821     }
3822 
3823     if (r->header_sent) {
3824         nxt_buf_chain_add(&r->out, b);
3825         nxt_http_request_send_body(task, r, NULL);
3826 
3827     } else {
3828         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
3829 
3830         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
3831             nxt_alert(task, "response buffer too small: %z", b_size);
3832             goto fail;
3833         }
3834 
3835         resp = (void *) b->mem.pos;
3836         count = (b_size - sizeof(nxt_unit_response_t))
3837                     / sizeof(nxt_unit_field_t);
3838 
3839         if (nxt_slow_path(count < resp->fields_count)) {
3840             nxt_alert(task, "response buffer too small for fields count: %D",
3841                       resp->fields_count);
3842             goto fail;
3843         }
3844 
3845         field = NULL;
3846 
3847         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3848             if (f->skip) {
3849                 continue;
3850             }
3851 
3852             field = nxt_list_add(r->resp.fields);
3853 
3854             if (nxt_slow_path(field == NULL)) {
3855                 goto fail;
3856             }
3857 
3858             field->hash = f->hash;
3859             field->skip = 0;
3860             field->hopbyhop = 0;
3861 
3862             field->name_length = f->name_length;
3863             field->value_length = f->value_length;
3864             field->name = nxt_unit_sptr_get(&f->name);
3865             field->value = nxt_unit_sptr_get(&f->value);
3866 
3867             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
3868             if (nxt_slow_path(ret != NXT_OK)) {
3869                 goto fail;
3870             }
3871 
3872             nxt_debug(task, "header%s: %*s: %*s",
3873                       (field->skip ? " skipped" : ""),
3874                       (size_t) field->name_length, field->name,
3875                       (size_t) field->value_length, field->value);
3876 
3877             if (field->skip) {
3878                 r->resp.fields->last->nelts--;
3879             }
3880         }
3881 
3882         r->status = resp->status;
3883 
3884         if (resp->piggyback_content_length != 0) {
3885             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3886             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3887 
3888         } else {
3889             b->mem.pos = b->mem.free;
3890         }
3891 
3892         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3893             next = b->next;
3894             b->next = NULL;
3895 
3896             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3897                                b->completion_handler, task, b, b->parent);
3898 
3899             b = next;
3900         }
3901 
3902         if (b != NULL) {
3903             nxt_buf_chain_add(&r->out, b);
3904         }
3905 
3906         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3907 
3908         if (r->websocket_handshake
3909             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3910         {
3911             app_port = req_rpc_data->app_port;
3912             if (nxt_slow_path(app_port == NULL)) {
3913                 goto fail;
3914             }
3915 
3916             nxt_thread_mutex_lock(&app->mutex);
3917 
3918             app_port->main_app_port->active_websockets++;
3919 
3920             nxt_thread_mutex_unlock(&app->mutex);
3921 
3922             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3923             req_rpc_data->apr_action = NXT_APR_CLOSE;
3924 
3925             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
3926 
3927             r->state = &nxt_http_websocket;
3928 
3929         } else {
3930             r->state = &nxt_http_request_send_state;
3931         }
3932     }
3933 
3934     return;
3935 
3936 fail:
3937 
3938     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
3939 
3940     nxt_request_rpc_data_unlink(task, req_rpc_data);
3941 }
3942 
3943 
3944 static void
3945 nxt_router_req_headers_ack_handler(nxt_task_t *task,
3946     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
3947 {
3948     int                 res;
3949     nxt_app_t           *app;
3950     nxt_buf_t           *b;
3951     nxt_bool_t          start_process, unlinked;
3952     nxt_port_t          *app_port, *main_app_port, *idle_port;
3953     nxt_queue_link_t    *idle_lnk;
3954     nxt_http_request_t  *r;
3955 
3956     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
3957               req_rpc_data->stream,
3958               msg->port_msg.pid, msg->port_msg.reply_port);
3959 
3960     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
3961                              msg->port_msg.pid);
3962 
3963     app = req_rpc_data->app;
3964     r = req_rpc_data->request;
3965 
3966     start_process = 0;
3967     unlinked = 0;
3968 
3969     nxt_thread_mutex_lock(&app->mutex);
3970 
3971     if (r->app_link.next != NULL) {
3972         nxt_queue_remove(&r->app_link);
3973         r->app_link.next = NULL;
3974 
3975         unlinked = 1;
3976     }
3977 
3978     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
3979                                   msg->port_msg.reply_port);
3980     if (nxt_slow_path(app_port == NULL)) {
3981         nxt_thread_mutex_unlock(&app->mutex);
3982 
3983         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
3984 
3985         if (unlinked) {
3986             nxt_mp_release(r->mem_pool);
3987         }
3988 
3989         return;
3990     }
3991 
3992     main_app_port = app_port->main_app_port;
3993 
3994     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
3995         app->idle_processes--;
3996 
3997         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
3998                   &app->name, main_app_port->pid, main_app_port->id,
3999                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4000 
4001         /* Check port was in 'spare_ports' using idle_start field. */
4002         if (main_app_port->idle_start == 0
4003             && app->idle_processes >= app->spare_processes)
4004         {
4005             /*
4006              * If there is a vacant space in spare ports,
4007              * move the last idle to spare_ports.
4008              */
4009             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4010 
4011             idle_lnk = nxt_queue_last(&app->idle_ports);
4012             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4013             nxt_queue_remove(idle_lnk);
4014 
4015             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4016 
4017             idle_port->idle_start = 0;
4018 
4019             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4020                       "to spare_ports",
4021                       &app->name, idle_port->pid, idle_port->id);
4022         }
4023 
4024         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4025             app->pending_processes++;
4026             start_process = 1;
4027         }
4028     }
4029 
4030     main_app_port->active_requests++;
4031 
4032     nxt_port_inc_use(app_port);
4033 
4034     nxt_thread_mutex_unlock(&app->mutex);
4035 
4036     if (unlinked) {
4037         nxt_mp_release(r->mem_pool);
4038     }
4039 
4040     if (start_process) {
4041         nxt_router_start_app_process(task, app);
4042     }
4043 
4044     nxt_port_use(task, req_rpc_data->app_port, -1);
4045 
4046     req_rpc_data->app_port = app_port;
4047 
4048     b = req_rpc_data->msg_info.buf;
4049 
4050     if (b != NULL) {
4051         /* First buffer is already sent.  Start from second. */
4052         b = b->next;
4053     }
4054 
4055     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4056         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4057                   req_rpc_data->msg_info.body_fd);
4058 
4059         if (req_rpc_data->msg_info.body_fd != -1) {
4060             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4061         }
4062 
4063         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4064                                     req_rpc_data->msg_info.body_fd,
4065                                     req_rpc_data->stream,
4066                                     task->thread->engine->port->id, b);
4067 
4068         if (nxt_slow_path(res != NXT_OK)) {
4069             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4070         }
4071     }
4072 
4073     if (app->timeout != 0) {
4074         r->timer.handler = nxt_router_app_timeout;
4075         r->timer_data = req_rpc_data;
4076         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4077     }
4078 }
4079 
4080 
4081 static const nxt_http_request_state_t  nxt_http_request_send_state
4082     nxt_aligned(64) =
4083 {
4084     .error_handler = nxt_http_request_error_handler,
4085 };
4086 
4087 
4088 static void
4089 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4090 {
4091     nxt_buf_t           *out;
4092     nxt_http_request_t  *r;
4093 
4094     r = obj;
4095 
4096     out = r->out;
4097 
4098     if (out != NULL) {
4099         r->out = NULL;
4100         nxt_http_request_send(task, r, out);
4101     }
4102 }
4103 
4104 
4105 static void
4106 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4107     void *data)
4108 {
4109     nxt_request_rpc_data_t  *req_rpc_data;
4110 
4111     req_rpc_data = data;
4112 
4113     req_rpc_data->rpc_cancel = 0;
4114 
4115     /* TODO cancel message and return if cancelled. */
4116     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4117 
4118     if (req_rpc_data->request != NULL) {
4119         nxt_http_request_error(task, req_rpc_data->request,
4120                                NXT_HTTP_SERVICE_UNAVAILABLE);
4121     }
4122 
4123     nxt_request_rpc_data_unlink(task, req_rpc_data);
4124 }
4125 
4126 
4127 static void
4128 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4129     void *data)
4130 {
4131     nxt_app_t        *app;
4132     nxt_port_t       *port;
4133     nxt_app_joint_t  *app_joint;
4134 
4135     app_joint = data;
4136     port = msg->u.new_port;
4137 
4138     nxt_assert(app_joint != NULL);
4139     nxt_assert(port != NULL);
4140     nxt_assert(port->type == NXT_PROCESS_APP);
4141     nxt_assert(port->id == 0);
4142 
4143     app = app_joint->app;
4144 
4145     nxt_router_app_joint_use(task, app_joint, -1);
4146 
4147     if (nxt_slow_path(app == NULL)) {
4148         nxt_debug(task, "new port ready for released app, send QUIT");
4149 
4150         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4151 
4152         return;
4153     }
4154 
4155     port->app = app;
4156     port->main_app_port = port;
4157 
4158     nxt_thread_mutex_lock(&app->mutex);
4159 
4160     nxt_assert(app->pending_processes != 0);
4161 
4162     app->pending_processes--;
4163     app->processes++;
4164     nxt_port_hash_add(&app->port_hash, port);
4165     app->port_hash_count++;
4166 
4167     nxt_thread_mutex_unlock(&app->mutex);
4168 
4169     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4170               &app->name, port->pid, app->processes, app->pending_processes);
4171 
4172     nxt_router_app_shared_port_send(task, port);
4173 
4174     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4175 }
4176 
4177 
4178 static nxt_int_t
4179 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4180 {
4181     nxt_buf_t                *b;
4182     nxt_port_t               *port;
4183     nxt_port_msg_new_port_t  *msg;
4184 
4185     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4186                              sizeof(nxt_port_data_t));
4187     if (nxt_slow_path(b == NULL)) {
4188         return NXT_ERROR;
4189     }
4190 
4191     port = app_port->app->shared_port;
4192 
4193     nxt_debug(task, "send port %FD to process %PI",
4194               port->pair[0], app_port->pid);
4195 
4196     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4197     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4198 
4199     msg->id = port->id;
4200     msg->pid = port->pid;
4201     msg->max_size = port->max_size;
4202     msg->max_share = port->max_share;
4203     msg->type = port->type;
4204 
4205     return nxt_port_socket_write2(task, app_port,
4206                                   NXT_PORT_MSG_NEW_PORT,
4207                                   port->pair[0], port->queue_fd,
4208                                   0, 0, b);
4209 }
4210 
4211 
4212 static void
4213 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4214     void *data)
4215 {
4216     nxt_app_t           *app;
4217     nxt_app_joint_t     *app_joint;
4218     nxt_queue_link_t    *link;
4219     nxt_http_request_t  *r;
4220 
4221     app_joint = data;
4222 
4223     nxt_assert(app_joint != NULL);
4224 
4225     app = app_joint->app;
4226 
4227     nxt_router_app_joint_use(task, app_joint, -1);
4228 
4229     if (nxt_slow_path(app == NULL)) {
4230         nxt_debug(task, "start error for released app");
4231 
4232         return;
4233     }
4234 
4235     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4236 
4237     link = NULL;
4238 
4239     nxt_thread_mutex_lock(&app->mutex);
4240 
4241     nxt_assert(app->pending_processes != 0);
4242 
4243     app->pending_processes--;
4244 
4245     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4246         link = nxt_queue_first(&app->ack_waiting_req);
4247 
4248         nxt_queue_remove(link);
4249         link->next = NULL;
4250     }
4251 
4252     nxt_thread_mutex_unlock(&app->mutex);
4253 
4254     while (link != NULL) {
4255         r = nxt_container_of(link, nxt_http_request_t, app_link);
4256 
4257         nxt_event_engine_post(r->engine, &r->err_work);
4258 
4259         link = NULL;
4260 
4261         nxt_thread_mutex_lock(&app->mutex);
4262 
4263         if (app->processes == 0 && app->pending_processes == 0
4264             && !nxt_queue_is_empty(&app->ack_waiting_req))
4265         {
4266             link = nxt_queue_first(&app->ack_waiting_req);
4267 
4268             nxt_queue_remove(link);
4269             link->next = NULL;
4270         }
4271 
4272         nxt_thread_mutex_unlock(&app->mutex);
4273     }
4274 }
4275 
4276 
4277 
4278 nxt_inline nxt_port_t *
4279 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4280 {
4281     nxt_port_t  *port;
4282 
4283     port = NULL;
4284 
4285     nxt_thread_mutex_lock(&app->mutex);
4286 
4287     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4288 
4289         /* Caller is responsible to decrease port use count. */
4290         nxt_queue_chk_remove(&port->app_link);
4291 
4292         if (nxt_queue_chk_remove(&port->idle_link)) {
4293             app->idle_processes--;
4294 
4295             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4296                       &app->name, port->pid, port->id,
4297                       (port->idle_start ? "idle_ports" : "spare_ports"));
4298         }
4299 
4300         nxt_port_hash_remove(&app->port_hash, port);
4301         app->port_hash_count--;
4302 
4303         port->app = NULL;
4304         app->processes--;
4305 
4306         break;
4307 
4308     } nxt_queue_loop;
4309 
4310     nxt_thread_mutex_unlock(&app->mutex);
4311 
4312     return port;
4313 }
4314 
4315 
4316 static void
4317 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4318 {
4319     int  c;
4320 
4321     c = nxt_atomic_fetch_add(&app->use_count, i);
4322 
4323     if (i < 0 && c == -i) {
4324 
4325         if (task->thread->engine != app->engine) {
4326             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4327 
4328         } else {
4329             nxt_router_free_app(task, app->joint, NULL);
4330         }
4331     }
4332 }
4333 
4334 
4335 static void
4336 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4337 {
4338     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4339 
4340     nxt_queue_remove(&app->link);
4341 
4342     nxt_router_app_use(task, app, -1);
4343 }
4344 
4345 
4346 static void
4347 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4348     nxt_apr_action_t action)
4349 {
4350     int         inc_use;
4351     uint32_t    got_response, dec_requests;
4352     nxt_app_t   *app;
4353     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4354     nxt_port_t  *main_app_port;
4355 
4356     nxt_assert(port != NULL);
4357     nxt_assert(port->app != NULL);
4358 
4359     app = port->app;
4360 
4361     inc_use = 0;
4362     got_response = 0;
4363     dec_requests = 0;
4364 
4365     switch (action) {
4366     case NXT_APR_NEW_PORT:
4367         break;
4368     case NXT_APR_REQUEST_FAILED:
4369         dec_requests = 1;
4370         inc_use = -1;
4371         break;
4372     case NXT_APR_GOT_RESPONSE:
4373         got_response = 1;
4374         inc_use = -1;
4375         break;
4376     case NXT_APR_UPGRADE:
4377         got_response = 1;
4378         break;
4379     case NXT_APR_CLOSE:
4380         inc_use = -1;
4381         break;
4382     }
4383 
4384     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4385               port->pid, port->id,
4386               (int) inc_use, (int) got_response);
4387 
4388     if (port == app->shared_port) {
4389         nxt_thread_mutex_lock(&app->mutex);
4390 
4391         app->active_requests -= got_response + dec_requests;
4392 
4393         nxt_thread_mutex_unlock(&app->mutex);
4394 
4395         goto adjust_use;
4396     }
4397 
4398     main_app_port = port->main_app_port;
4399 
4400     nxt_thread_mutex_lock(&app->mutex);
4401 
4402     main_app_port->app_responses += got_response;
4403     main_app_port->active_requests -= got_response + dec_requests;
4404     app->active_requests -= got_response + dec_requests;
4405 
4406     if (main_app_port->pair[1] != -1
4407         && (app->max_requests == 0
4408             || main_app_port->app_responses < app->max_requests))
4409     {
4410         if (main_app_port->app_link.next == NULL) {
4411             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4412 
4413             nxt_port_inc_use(main_app_port);
4414         }
4415     }
4416 
4417     send_quit = (app->max_requests > 0
4418                  && main_app_port->app_responses >= app->max_requests);
4419 
4420     if (send_quit) {
4421         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4422 
4423         nxt_port_hash_remove(&app->port_hash, main_app_port);
4424         app->port_hash_count--;
4425 
4426         main_app_port->app = NULL;
4427         app->processes--;
4428 
4429     } else {
4430         port_unchained = 0;
4431     }
4432 
4433     adjust_idle_timer = 0;
4434 
4435     if (main_app_port->pair[1] != -1 && !send_quit
4436         && main_app_port->active_requests == 0
4437         && main_app_port->active_websockets == 0
4438         && main_app_port->idle_link.next == NULL)
4439     {
4440         if (app->idle_processes == app->spare_processes
4441             && app->adjust_idle_work.data == NULL)
4442         {
4443             adjust_idle_timer = 1;
4444             app->adjust_idle_work.data = app;
4445             app->adjust_idle_work.next = NULL;
4446         }
4447 
4448         if (app->idle_processes < app->spare_processes) {
4449             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4450 
4451             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4452                       &app->name, main_app_port->pid, main_app_port->id);
4453         } else {
4454             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4455 
4456             main_app_port->idle_start = task->thread->engine->timers.now;
4457 
4458             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4459                       &app->name, main_app_port->pid, main_app_port->id);
4460         }
4461 
4462         app->idle_processes++;
4463     }
4464 
4465     nxt_thread_mutex_unlock(&app->mutex);
4466 
4467     if (adjust_idle_timer) {
4468         nxt_router_app_use(task, app, 1);
4469         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4470     }
4471 
4472     /* ? */
4473     if (main_app_port->pair[1] == -1) {
4474         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4475                   &app->name, app, main_app_port, main_app_port->pid);
4476 
4477         goto adjust_use;
4478     }
4479 
4480     if (send_quit) {
4481         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4482 
4483         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4484                               NULL);
4485 
4486         if (port_unchained) {
4487             nxt_port_use(task, main_app_port, -1);
4488         }
4489 
4490         goto adjust_use;
4491     }
4492 
4493     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4494               &app->name, app);
4495 
4496 adjust_use:
4497 
4498     nxt_port_use(task, port, inc_use);
4499 }
4500 
4501 
4502 void
4503 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4504 {
4505     nxt_app_t         *app;
4506     nxt_bool_t        unchain, start_process;
4507     nxt_port_t        *idle_port;
4508     nxt_queue_link_t  *idle_lnk;
4509 
4510     app = port->app;
4511 
4512     nxt_assert(app != NULL);
4513 
4514     nxt_thread_mutex_lock(&app->mutex);
4515 
4516     nxt_port_hash_remove(&app->port_hash, port);
4517     app->port_hash_count--;
4518 
4519     if (port->id != 0) {
4520         nxt_thread_mutex_unlock(&app->mutex);
4521 
4522         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4523                   port->pid, port->id);
4524 
4525         return;
4526     }
4527 
4528     unchain = nxt_queue_chk_remove(&port->app_link);
4529 
4530     if (nxt_queue_chk_remove(&port->idle_link)) {
4531         app->idle_processes--;
4532 
4533         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4534                   &app->name, port->pid, port->id,
4535                   (port->idle_start ? "idle_ports" : "spare_ports"));
4536 
4537         if (port->idle_start == 0
4538             && app->idle_processes >= app->spare_processes)
4539         {
4540             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4541 
4542             idle_lnk = nxt_queue_last(&app->idle_ports);
4543             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4544             nxt_queue_remove(idle_lnk);
4545 
4546             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4547 
4548             idle_port->idle_start = 0;
4549 
4550             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4551                       "to spare_ports",
4552                       &app->name, idle_port->pid, idle_port->id);
4553         }
4554     }
4555 
4556     app->processes--;
4557 
4558     start_process = !task->thread->engine->shutdown
4559                     && nxt_router_app_can_start(app)
4560                     && nxt_router_app_need_start(app);
4561 
4562     if (start_process) {
4563         app->pending_processes++;
4564     }
4565 
4566     nxt_thread_mutex_unlock(&app->mutex);
4567 
4568     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4569 
4570     if (unchain) {
4571         nxt_port_use(task, port, -1);
4572     }
4573 
4574     if (start_process) {
4575         nxt_router_start_app_process(task, app);
4576     }
4577 }
4578 
4579 
4580 static void
4581 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4582 {
4583     nxt_app_t           *app;
4584     nxt_bool_t          queued;
4585     nxt_port_t          *port;
4586     nxt_msec_t          timeout, threshold;
4587     nxt_queue_link_t    *lnk;
4588     nxt_event_engine_t  *engine;
4589 
4590     app = obj;
4591     queued = (data == app);
4592 
4593     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4594               &app->name, queued);
4595 
4596     engine = task->thread->engine;
4597 
4598     nxt_assert(app->engine == engine);
4599 
4600     threshold = engine->timers.now + app->joint->idle_timer.bias;
4601     timeout = 0;
4602 
4603     nxt_thread_mutex_lock(&app->mutex);
4604 
4605     if (queued) {
4606         app->adjust_idle_work.data = NULL;
4607     }
4608 
4609     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4610               &app->name,
4611               (int) app->idle_processes, (int) app->spare_processes);
4612 
4613     while (app->idle_processes > app->spare_processes) {
4614 
4615         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4616 
4617         lnk = nxt_queue_first(&app->idle_ports);
4618         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4619 
4620         timeout = port->idle_start + app->idle_timeout;
4621 
4622         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4623                   &app->name, port->pid,
4624                   port->idle_start, timeout, threshold);
4625 
4626         if (timeout > threshold) {
4627             break;
4628         }
4629 
4630         nxt_queue_remove(lnk);
4631         lnk->next = NULL;
4632 
4633         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4634                   &app->name, port->pid, port->id);
4635 
4636         nxt_queue_chk_remove(&port->app_link);
4637 
4638         nxt_port_hash_remove(&app->port_hash, port);
4639         app->port_hash_count--;
4640 
4641         app->idle_processes--;
4642         app->processes--;
4643         port->app = NULL;
4644 
4645         nxt_thread_mutex_unlock(&app->mutex);
4646 
4647         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4648                   &app->name, port->pid);
4649 
4650         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4651 
4652         nxt_port_use(task, port, -1);
4653 
4654         nxt_thread_mutex_lock(&app->mutex);
4655     }
4656 
4657     nxt_thread_mutex_unlock(&app->mutex);
4658 
4659     if (timeout > threshold) {
4660         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4661 
4662     } else {
4663         nxt_timer_disable(engine, &app->joint->idle_timer);
4664     }
4665 
4666     if (queued) {
4667         nxt_router_app_use(task, app, -1);
4668     }
4669 }
4670 
4671 
4672 static void
4673 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4674 {
4675     nxt_timer_t      *timer;
4676     nxt_app_joint_t  *app_joint;
4677 
4678     timer = obj;
4679     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4680 
4681     if (nxt_fast_path(app_joint->app != NULL)) {
4682         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4683     }
4684 }
4685 
4686 
4687 static void
4688 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4689 {
4690     nxt_timer_t      *timer;
4691     nxt_app_joint_t  *app_joint;
4692 
4693     timer = obj;
4694     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4695 
4696     nxt_router_app_joint_use(task, app_joint, -1);
4697 }
4698 
4699 
4700 static void
4701 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4702 {
4703     nxt_app_t        *app;
4704     nxt_port_t       *port;
4705     nxt_app_joint_t  *app_joint;
4706 
4707     app_joint = obj;
4708     app = app_joint->app;
4709 
4710     for ( ;; ) {
4711         port = nxt_router_app_get_port_for_quit(task, app);
4712         if (port == NULL) {
4713             break;
4714         }
4715 
4716         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4717 
4718         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4719 
4720         nxt_port_use(task, port, -1);
4721     }
4722 
4723     nxt_thread_mutex_lock(&app->mutex);
4724 
4725     for ( ;; ) {
4726         port = nxt_port_hash_retrieve(&app->port_hash);
4727         if (port == NULL) {
4728             break;
4729         }
4730 
4731         app->port_hash_count--;
4732 
4733         port->app = NULL;
4734 
4735         nxt_port_close(task, port);
4736 
4737         nxt_port_use(task, port, -1);
4738     }
4739 
4740     nxt_thread_mutex_unlock(&app->mutex);
4741 
4742     nxt_assert(app->processes == 0);
4743     nxt_assert(app->active_requests == 0);
4744     nxt_assert(app->port_hash_count == 0);
4745     nxt_assert(app->idle_processes == 0);
4746     nxt_assert(nxt_queue_is_empty(&app->ports));
4747     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4748     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4749 
4750     nxt_port_mmaps_destroy(&app->outgoing, 1);
4751 
4752     nxt_thread_mutex_destroy(&app->outgoing.mutex);
4753 
4754     if (app->shared_port != NULL) {
4755         app->shared_port->app = NULL;
4756         nxt_port_close(task, app->shared_port);
4757         nxt_port_use(task, app->shared_port, -1);
4758     }
4759 
4760     nxt_thread_mutex_destroy(&app->mutex);
4761     nxt_mp_destroy(app->mem_pool);
4762 
4763     app_joint->app = NULL;
4764 
4765     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4766         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4767         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4768 
4769     } else {
4770         nxt_router_app_joint_use(task, app_joint, -1);
4771     }
4772 }
4773 
4774 
4775 static void
4776 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4777     nxt_request_rpc_data_t *req_rpc_data)
4778 {
4779     nxt_bool_t          start_process;
4780     nxt_port_t          *port;
4781     nxt_http_request_t  *r;
4782 
4783     start_process = 0;
4784 
4785     nxt_thread_mutex_lock(&app->mutex);
4786 
4787     port = app->shared_port;
4788     nxt_port_inc_use(port);
4789 
4790     app->active_requests++;
4791 
4792     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4793         app->pending_processes++;
4794         start_process = 1;
4795     }
4796 
4797     r = req_rpc_data->request;
4798 
4799     /*
4800      * Put request into application-wide list to be able to cancel request
4801      * if something goes wrong with application processes.
4802      */
4803     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
4804 
4805     nxt_thread_mutex_unlock(&app->mutex);
4806 
4807     /*
4808      * Retain request memory pool while request is linked in ack_waiting_req
4809      * to guarantee request structure memory is accessble.
4810      */
4811     nxt_mp_retain(r->mem_pool);
4812 
4813     req_rpc_data->app_port = port;
4814     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4815 
4816     if (start_process) {
4817         nxt_router_start_app_process(task, app);
4818     }
4819 }
4820 
4821 
4822 void
4823 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4824     nxt_app_t *app)
4825 {
4826     nxt_event_engine_t      *engine;
4827     nxt_request_rpc_data_t  *req_rpc_data;
4828 
4829     engine = task->thread->engine;
4830 
4831     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4832                                           nxt_router_response_ready_handler,
4833                                           nxt_router_response_error_handler,
4834                                           sizeof(nxt_request_rpc_data_t));
4835     if (nxt_slow_path(req_rpc_data == NULL)) {
4836         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4837         return;
4838     }
4839 
4840     /*
4841      * At this point we have request req_rpc_data allocated and registered
4842      * in port handlers.  Need to fixup request memory pool.  Counterpart
4843      * release will be called via following call chain:
4844      *    nxt_request_rpc_data_unlink() ->
4845      *        nxt_router_http_request_release_post() ->
4846      *            nxt_router_http_request_release()
4847      */
4848     nxt_mp_retain(r->mem_pool);
4849 
4850     r->timer.task = &engine->task;
4851     r->timer.work_queue = &engine->fast_work_queue;
4852     r->timer.log = engine->task.log;
4853     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4854 
4855     r->engine = engine;
4856     r->err_work.handler = nxt_router_http_request_error;
4857     r->err_work.task = task;
4858     r->err_work.obj = r;
4859 
4860     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4861     req_rpc_data->app = app;
4862     req_rpc_data->msg_info.body_fd = -1;
4863     req_rpc_data->rpc_cancel = 1;
4864 
4865     nxt_router_app_use(task, app, 1);
4866 
4867     req_rpc_data->request = r;
4868     r->req_rpc_data = req_rpc_data;
4869 
4870     if (r->last != NULL) {
4871         r->last->completion_handler = nxt_router_http_request_done;
4872     }
4873 
4874     nxt_router_app_port_get(task, app, req_rpc_data);
4875     nxt_router_app_prepare_request(task, req_rpc_data);
4876 }
4877 
4878 
4879 static void
4880 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
4881 {
4882     nxt_http_request_t  *r;
4883 
4884     r = obj;
4885 
4886     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
4887 
4888     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4889 
4890     if (r->req_rpc_data != NULL) {
4891         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4892     }
4893 
4894     nxt_mp_release(r->mem_pool);
4895 }
4896 
4897 
4898 static void
4899 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4900 {
4901     nxt_http_request_t  *r;
4902 
4903     r = data;
4904 
4905     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4906 
4907     if (r->req_rpc_data != NULL) {
4908         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4909     }
4910 
4911     nxt_http_request_close_handler(task, r, r->proto.any);
4912 }
4913 
4914 
4915 static void
4916 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4917 {
4918 }
4919 
4920 
4921 static void
4922 nxt_router_app_prepare_request(nxt_task_t *task,
4923     nxt_request_rpc_data_t *req_rpc_data)
4924 {
4925     nxt_app_t         *app;
4926     nxt_buf_t         *buf, *body;
4927     nxt_int_t         res;
4928     nxt_port_t        *port, *reply_port;
4929 
4930     int                   notify;
4931     struct {
4932         nxt_port_msg_t       pm;
4933         nxt_port_mmap_msg_t  mm;
4934     } msg;
4935 
4936 
4937     app = req_rpc_data->app;
4938 
4939     nxt_assert(app != NULL);
4940 
4941     port = req_rpc_data->app_port;
4942 
4943     nxt_assert(port != NULL);
4944     nxt_assert(port->queue != NULL);
4945 
4946     reply_port = task->thread->engine->port;
4947 
4948     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
4949                                  nxt_app_msg_prefix[app->type]);
4950     if (nxt_slow_path(buf == NULL)) {
4951         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
4952                   req_rpc_data->stream, &app->name);
4953 
4954         nxt_http_request_error(task, req_rpc_data->request,
4955                                NXT_HTTP_INTERNAL_SERVER_ERROR);
4956 
4957         return;
4958     }
4959 
4960     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
4961                     nxt_buf_used_size(buf),
4962                     port->socket.fd);
4963 
4964     req_rpc_data->msg_info.buf = buf;
4965     req_rpc_data->msg_info.completion_handler = buf->completion_handler;
4966 
4967     do {
4968         buf->completion_handler = nxt_router_dummy_buf_completion;
4969         buf = buf->next;
4970     } while (buf != NULL);
4971 
4972     buf = req_rpc_data->msg_info.buf;
4973 
4974     body = req_rpc_data->request->body;
4975 
4976     if (body != NULL && nxt_buf_is_file(body)) {
4977         req_rpc_data->msg_info.body_fd = body->file->fd;
4978 
4979         body->file->fd = -1;
4980 
4981     } else {
4982         req_rpc_data->msg_info.body_fd = -1;
4983     }
4984 
4985     msg.pm.stream = req_rpc_data->stream;
4986     msg.pm.pid = reply_port->pid;
4987     msg.pm.reply_port = reply_port->id;
4988     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
4989     msg.pm.last = 0;
4990     msg.pm.mmap = 1;
4991     msg.pm.nf = 0;
4992     msg.pm.mf = 0;
4993     msg.pm.tracking = 0;
4994 
4995     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
4996     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
4997 
4998     msg.mm.mmap_id = hdr->id;
4999     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5000     msg.mm.size = nxt_buf_used_size(buf);
5001 
5002     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5003                              req_rpc_data->stream, &notify,
5004                              &req_rpc_data->msg_info.tracking_cookie);
5005     if (nxt_fast_path(res == NXT_OK)) {
5006         if (notify != 0) {
5007             (void) nxt_port_socket_write(task, port,
5008                                          NXT_PORT_MSG_READ_QUEUE,
5009                                          -1, req_rpc_data->stream,
5010                                          reply_port->id, NULL);
5011 
5012         } else {
5013             nxt_debug(task, "queue is not empty");
5014         }
5015 
5016         buf->is_port_mmap_sent = 1;
5017         buf->mem.pos = buf->mem.free;
5018 
5019     } else {
5020         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5021                   req_rpc_data->stream, &app->name);
5022 
5023         nxt_http_request_error(task, req_rpc_data->request,
5024                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5025     }
5026 }
5027 
5028 
5029 struct nxt_fields_iter_s {
5030     nxt_list_part_t   *part;
5031     nxt_http_field_t  *field;
5032 };
5033 
5034 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5035 
5036 
5037 static nxt_http_field_t *
5038 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5039 {
5040     if (part == NULL) {
5041         return NULL;
5042     }
5043 
5044     while (part->nelts == 0) {
5045         part = part->next;
5046         if (part == NULL) {
5047             return NULL;
5048         }
5049     }
5050 
5051     i->part = part;
5052     i->field = nxt_list_data(i->part);
5053 
5054     return i->field;
5055 }
5056 
5057 
5058 static nxt_http_field_t *
5059 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5060 {
5061     return nxt_fields_part_first(nxt_list_part(fields), i);
5062 }
5063 
5064 
5065 static nxt_http_field_t *
5066 nxt_fields_next(nxt_fields_iter_t *i)
5067 {
5068     nxt_http_field_t  *end = nxt_list_data(i->part);
5069 
5070     end += i->part->nelts;
5071     i->field++;
5072 
5073     if (i->field < end) {
5074         return i->field;
5075     }
5076 
5077     return nxt_fields_part_first(i->part->next, i);
5078 }
5079 
5080 
5081 static nxt_buf_t *
5082 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5083     nxt_app_t *app, const nxt_str_t *prefix)
5084 {
5085     void                *target_pos, *query_pos;
5086     u_char              *pos, *end, *p, c;
5087     size_t              fields_count, req_size, size, free_size;
5088     size_t              copy_size;
5089     nxt_off_t           content_length;
5090     nxt_buf_t           *b, *buf, *out, **tail;
5091     nxt_http_field_t    *field, *dup;
5092     nxt_unit_field_t    *dst_field;
5093     nxt_fields_iter_t   iter, dup_iter;
5094     nxt_unit_request_t  *req;
5095 
5096     req_size = sizeof(nxt_unit_request_t)
5097                + r->method->length + 1
5098                + r->version.length + 1
5099                + r->remote->length + 1
5100                + r->local->length + 1
5101                + r->server_name.length + 1
5102                + r->target.length + 1
5103                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5104 
5105     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5106     fields_count = 0;
5107 
5108     nxt_list_each(field, r->fields) {
5109         fields_count++;
5110 
5111         req_size += field->name_length + prefix->length + 1
5112                     + field->value_length + 1;
5113     } nxt_list_loop;
5114 
5115     req_size += fields_count * sizeof(nxt_unit_field_t);
5116 
5117     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5118         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5119                   (int) req_size);
5120 
5121         return NULL;
5122     }
5123 
5124     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5125               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5126     if (nxt_slow_path(out == NULL)) {
5127         return NULL;
5128     }
5129 
5130     req = (nxt_unit_request_t *) out->mem.free;
5131     out->mem.free += req_size;
5132 
5133     req->app_target = r->app_target;
5134 
5135     req->content_length = content_length;
5136 
5137     p = (u_char *) (req->fields + fields_count);
5138 
5139     nxt_debug(task, "fields_count=%d", (int) fields_count);
5140 
5141     req->method_length = r->method->length;
5142     nxt_unit_sptr_set(&req->method, p);
5143     p = nxt_cpymem(p, r->method->start, r->method->length);
5144     *p++ = '\0';
5145 
5146     req->version_length = r->version.length;
5147     nxt_unit_sptr_set(&req->version, p);
5148     p = nxt_cpymem(p, r->version.start, r->version.length);
5149     *p++ = '\0';
5150 
5151     req->remote_length = r->remote->address_length;
5152     nxt_unit_sptr_set(&req->remote, p);
5153     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5154                    r->remote->address_length);
5155     *p++ = '\0';
5156 
5157     req->local_length = r->local->address_length;
5158     nxt_unit_sptr_set(&req->local, p);
5159     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5160     *p++ = '\0';
5161 
5162     req->tls = (r->tls != NULL);
5163     req->websocket_handshake = r->websocket_handshake;
5164 
5165     req->server_name_length = r->server_name.length;
5166     nxt_unit_sptr_set(&req->server_name, p);
5167     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5168     *p++ = '\0';
5169 
5170     target_pos = p;
5171     req->target_length = (uint32_t) r->target.length;
5172     nxt_unit_sptr_set(&req->target, p);
5173     p = nxt_cpymem(p, r->target.start, r->target.length);
5174     *p++ = '\0';
5175 
5176     req->path_length = (uint32_t) r->path->length;
5177     if (r->path->start == r->target.start) {
5178         nxt_unit_sptr_set(&req->path, target_pos);
5179 
5180     } else {
5181         nxt_unit_sptr_set(&req->path, p);
5182         p = nxt_cpymem(p, r->path->start, r->path->length);
5183         *p++ = '\0';
5184     }
5185 
5186     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5187     if (r->args != NULL && r->args->start != NULL) {
5188         query_pos = nxt_pointer_to(target_pos,
5189                                    r->args->start - r->target.start);
5190 
5191         nxt_unit_sptr_set(&req->query, query_pos);
5192 
5193     } else {
5194         req->query.offset = 0;
5195     }
5196 
5197     req->content_length_field = NXT_UNIT_NONE_FIELD;
5198     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5199     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5200     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5201 
5202     dst_field = req->fields;
5203 
5204     for (field = nxt_fields_first(r->fields, &iter);
5205          field != NULL;
5206          field = nxt_fields_next(&iter))
5207     {
5208         if (field->skip) {
5209             continue;
5210         }
5211 
5212         dst_field->hash = field->hash;
5213         dst_field->skip = 0;
5214         dst_field->name_length = field->name_length + prefix->length;
5215         dst_field->value_length = field->value_length;
5216 
5217         if (field == r->content_length) {
5218             req->content_length_field = dst_field - req->fields;
5219 
5220         } else if (field == r->content_type) {
5221             req->content_type_field = dst_field - req->fields;
5222 
5223         } else if (field == r->cookie) {
5224             req->cookie_field = dst_field - req->fields;
5225 
5226         } else if (field == r->authorization) {
5227             req->authorization_field = dst_field - req->fields;
5228         }
5229 
5230         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5231                   (int) field->hash, (int) field->skip,
5232                   (int) field->name_length, field->name,
5233                   (int) field->value_length, field->value);
5234 
5235         if (prefix->length != 0) {
5236             nxt_unit_sptr_set(&dst_field->name, p);
5237             p = nxt_cpymem(p, prefix->start, prefix->length);
5238 
5239             end = field->name + field->name_length;
5240             for (pos = field->name; pos < end; pos++) {
5241                 c = *pos;
5242 
5243                 if (c >= 'a' && c <= 'z') {
5244                     *p++ = (c & ~0x20);
5245                     continue;
5246                 }
5247 
5248                 if (c == '-') {
5249                     *p++ = '_';
5250                     continue;
5251                 }
5252 
5253                 *p++ = c;
5254             }
5255 
5256         } else {
5257             nxt_unit_sptr_set(&dst_field->name, p);
5258             p = nxt_cpymem(p, field->name, field->name_length);
5259         }
5260 
5261         *p++ = '\0';
5262 
5263         nxt_unit_sptr_set(&dst_field->value, p);
5264         p = nxt_cpymem(p, field->value, field->value_length);
5265 
5266         if (prefix->length != 0) {
5267             dup_iter = iter;
5268 
5269             for (dup = nxt_fields_next(&dup_iter);
5270                  dup != NULL;
5271                  dup = nxt_fields_next(&dup_iter))
5272             {
5273                 if (dup->name_length != field->name_length
5274                     || dup->skip
5275                     || dup->hash != field->hash
5276                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5277                 {
5278                     continue;
5279                 }
5280 
5281                 p = nxt_cpymem(p, ", ", 2);
5282                 p = nxt_cpymem(p, dup->value, dup->value_length);
5283 
5284                 dst_field->value_length += 2 + dup->value_length;
5285 
5286                 dup->skip = 1;
5287             }
5288         }
5289 
5290         *p++ = '\0';
5291 
5292         dst_field++;
5293     }
5294 
5295     req->fields_count = (uint32_t) (dst_field - req->fields);
5296 
5297     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5298 
5299     buf = out;
5300     tail = &buf->next;
5301 
5302     for (b = r->body; b != NULL; b = b->next) {
5303         size = nxt_buf_mem_used_size(&b->mem);
5304         pos = b->mem.pos;
5305 
5306         while (size > 0) {
5307             if (buf == NULL) {
5308                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5309 
5310                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5311                 if (nxt_slow_path(buf == NULL)) {
5312                     while (out != NULL) {
5313                         buf = out->next;
5314                         out->next = NULL;
5315                         out->completion_handler(task, out, out->parent);
5316                         out = buf;
5317                     }
5318                     return NULL;
5319                 }
5320 
5321                 *tail = buf;
5322                 tail = &buf->next;
5323 
5324             } else {
5325                 free_size = nxt_buf_mem_free_size(&buf->mem);
5326                 if (free_size < size
5327                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5328                        == NXT_OK)
5329                 {
5330                     free_size = nxt_buf_mem_free_size(&buf->mem);
5331                 }
5332             }
5333 
5334             if (free_size > 0) {
5335                 copy_size = nxt_min(free_size, size);
5336 
5337                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5338 
5339                 size -= copy_size;
5340                 pos += copy_size;
5341 
5342                 if (size == 0) {
5343                     break;
5344                 }
5345             }
5346 
5347             buf = NULL;
5348         }
5349     }
5350 
5351     return out;
5352 }
5353 
5354 
5355 static void
5356 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5357 {
5358     nxt_timer_t              *timer;
5359     nxt_http_request_t       *r;
5360     nxt_request_rpc_data_t   *req_rpc_data;
5361 
5362     timer = obj;
5363 
5364     nxt_debug(task, "router app timeout");
5365 
5366     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5367     req_rpc_data = r->timer_data;
5368 
5369     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5370 
5371     nxt_request_rpc_data_unlink(task, req_rpc_data);
5372 }
5373 
5374 
5375 static void
5376 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5377 {
5378     r->timer.handler = nxt_router_http_request_release;
5379     nxt_timer_add(task->thread->engine, &r->timer, 0);
5380 }
5381 
5382 
5383 static void
5384 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5385 {
5386     nxt_http_request_t  *r;
5387 
5388     nxt_debug(task, "http request pool release");
5389 
5390     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5391 
5392     nxt_mp_release(r->mem_pool);
5393 }
5394 
5395 
5396 static void
5397 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5398 {
5399     size_t                   mi;
5400     uint32_t                 i;
5401     nxt_bool_t               ack;
5402     nxt_process_t            *process;
5403     nxt_free_map_t           *m;
5404     nxt_port_mmap_handler_t  *mmap_handler;
5405 
5406     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5407 
5408     process = nxt_runtime_process_find(task->thread->runtime,
5409                                        msg->port_msg.pid);
5410     if (nxt_slow_path(process == NULL)) {
5411         return;
5412     }
5413 
5414     ack = 0;
5415 
5416     /*
5417      * To mitigate possible racing condition (when OOSM message received
5418      * after some of the memory was already freed), need to try to find
5419      * first free segment in shared memory and send ACK if found.
5420      */
5421 
5422     nxt_thread_mutex_lock(&process->incoming.mutex);
5423 
5424     for (i = 0; i < process->incoming.size; i++) {
5425         mmap_handler = process->incoming.elts[i].mmap_handler;
5426 
5427         if (nxt_slow_path(mmap_handler == NULL)) {
5428             continue;
5429         }
5430 
5431         m = mmap_handler->hdr->free_map;
5432 
5433         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5434             if (m[mi] != 0) {
5435                 ack = 1;
5436 
5437                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5438                           i, mi, m[mi]);
5439 
5440                 break;
5441             }
5442         }
5443     }
5444 
5445     nxt_thread_mutex_unlock(&process->incoming.mutex);
5446 
5447     if (ack) {
5448         nxt_process_broadcast_shm_ack(task, process);
5449     }
5450 }
5451 
5452 
5453 static void
5454 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5455 {
5456     nxt_fd_t                 fd;
5457     nxt_port_t               *port;
5458     nxt_runtime_t            *rt;
5459     nxt_port_mmaps_t         *mmaps;
5460     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5461     nxt_port_mmap_handler_t  *mmap_handler;
5462 
5463     rt = task->thread->runtime;
5464 
5465     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5466                                  msg->port_msg.reply_port);
5467     if (nxt_slow_path(port == NULL)) {
5468         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5469                   msg->port_msg.pid, msg->port_msg.reply_port);
5470 
5471         return;
5472     }
5473 
5474     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5475                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5476     {
5477         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5478                   (int) nxt_buf_used_size(msg->buf));
5479 
5480         return;
5481     }
5482 
5483     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5484 
5485     nxt_assert(port->type == NXT_PROCESS_APP);
5486 
5487     if (nxt_slow_path(port->app == NULL)) {
5488         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5489                   port->pid, port->id);
5490 
5491         // FIXME
5492         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5493                               -1, msg->port_msg.stream, 0, NULL);
5494 
5495         return;
5496     }
5497 
5498     mmaps = &port->app->outgoing;
5499     nxt_thread_mutex_lock(&mmaps->mutex);
5500 
5501     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5502         nxt_thread_mutex_unlock(&mmaps->mutex);
5503 
5504         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5505                   (int) get_mmap_msg->id);
5506 
5507         // FIXME
5508         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5509                               -1, msg->port_msg.stream, 0, NULL);
5510         return;
5511     }
5512 
5513     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5514 
5515     fd = mmap_handler->fd;
5516 
5517     nxt_thread_mutex_unlock(&mmaps->mutex);
5518 
5519     nxt_debug(task, "get mmap %PI:%d found",
5520               msg->port_msg.pid, (int) get_mmap_msg->id);
5521 
5522     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5523 }
5524 
5525 
5526 static void
5527 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5528 {
5529     nxt_port_t               *port, *reply_port;
5530     nxt_runtime_t            *rt;
5531     nxt_port_msg_get_port_t  *get_port_msg;
5532 
5533     rt = task->thread->runtime;
5534 
5535     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5536                                        msg->port_msg.reply_port);
5537     if (nxt_slow_path(reply_port == NULL)) {
5538         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5539                   msg->port_msg.pid, msg->port_msg.reply_port);
5540 
5541         return;
5542     }
5543 
5544     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5545                       < (int) sizeof(nxt_port_msg_get_port_t)))
5546     {
5547         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5548                   (int) nxt_buf_used_size(msg->buf));
5549 
5550         return;
5551     }
5552 
5553     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5554 
5555     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5556     if (nxt_slow_path(port == NULL)) {
5557         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5558                   get_port_msg->pid, get_port_msg->id);
5559 
5560         return;
5561     }
5562 
5563     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5564               get_port_msg->id);
5565 
5566     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5567 }
5568