xref: /unit/src/nxt_router.c (revision 1980:43553aa72111)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68 } nxt_app_rpc_t;
69 
70 
71 typedef struct {
72     nxt_app_joint_t         *app_joint;
73     uint32_t                generation;
74 } nxt_app_joint_rpc_t;
75 
76 
77 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
78     nxt_mp_t *mp);
79 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
80 static void nxt_router_greet_controller(nxt_task_t *task,
81     nxt_port_t *controller_port);
82 
83 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
84 
85 static void nxt_router_new_port_handler(nxt_task_t *task,
86     nxt_port_recv_msg_t *msg);
87 static void nxt_router_conf_data_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_app_restart_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_remove_pid_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 
96 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
97 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
98 static void nxt_router_conf_ready(nxt_task_t *task,
99     nxt_router_temp_conf_t *tmcf);
100 static void nxt_router_conf_error(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_send(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
104 
105 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
106     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
107 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
108     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
109 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
110     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
111     nxt_conf_value_t *conf);
112 
113 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
114 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
115 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
116     nxt_app_t *app);
117 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
118     nxt_str_t *name);
119 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
120     int i);
121 
122 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
123     nxt_port_t *port);
124 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
127     nxt_port_t *port, nxt_fd_t fd);
128 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
129     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
130 static void nxt_router_listen_socket_ready(nxt_task_t *task,
131     nxt_port_recv_msg_t *msg, void *data);
132 static void nxt_router_listen_socket_error(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 #if (NXT_TLS)
135 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
136     nxt_port_recv_msg_t *msg, void *data);
137 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
138     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
139     nxt_bool_t last);
140 #endif
141 static void nxt_router_app_rpc_create(nxt_task_t *task,
142     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
143 static void nxt_router_app_prefork_ready(nxt_task_t *task,
144     nxt_port_recv_msg_t *msg, void *data);
145 static void nxt_router_app_prefork_error(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
148     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
149 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
150     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
151 
152 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
153     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
154     const nxt_event_interface_t *interface);
155 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
156     nxt_router_engine_conf_t *recf);
157 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
163     nxt_work_handler_t handler);
164 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
165     nxt_router_engine_conf_t *recf);
166 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
168 
169 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
170     nxt_router_temp_conf_t *tmcf);
171 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_event_engine_t *engine);
173 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
174     nxt_router_temp_conf_t *tmcf);
175 
176 static void nxt_router_engines_post(nxt_router_t *router,
177     nxt_router_temp_conf_t *tmcf);
178 static void nxt_router_engine_post(nxt_event_engine_t *engine,
179     nxt_work_t *jobs);
180 
181 static void nxt_router_thread_start(void *data);
182 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
183     void *data);
184 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
197     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
198 static void nxt_router_listen_socket_release(nxt_task_t *task,
199     nxt_socket_conf_t *skcf);
200 
201 static void nxt_router_access_log_writer(nxt_task_t *task,
202     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
203 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
204     struct tm *tm, size_t size, const char *format);
205 static void nxt_router_access_log_open(nxt_task_t *task,
206     nxt_router_temp_conf_t *tmcf);
207 static void nxt_router_access_log_ready(nxt_task_t *task,
208     nxt_port_recv_msg_t *msg, void *data);
209 static void nxt_router_access_log_error(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_release(nxt_task_t *task,
212     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
213 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
214     void *data);
215 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
216     nxt_port_recv_msg_t *msg, void *data);
217 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
218     nxt_port_recv_msg_t *msg, void *data);
219 
220 static void nxt_router_app_port_ready(nxt_task_t *task,
221     nxt_port_recv_msg_t *msg, void *data);
222 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
223     nxt_port_t *app_port);
224 static void nxt_router_app_port_error(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 
227 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
228 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
229 
230 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
231     nxt_port_t *port, nxt_apr_action_t action);
232 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
233     nxt_request_rpc_data_t *req_rpc_data);
234 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
235     void *data);
236 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
237     void *data);
238 
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240     nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243 
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246     void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252 
253 static const nxt_http_request_state_t  nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255 
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257     nxt_app_joint_t *app_joint, int i);
258 
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260     nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262     void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265     nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 
269 extern const nxt_http_request_state_t  nxt_http_websocket;
270 
271 static nxt_router_t  *nxt_router;
272 
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275 
276 static const nxt_str_t  *nxt_app_msg_prefix[] = {
277     &empty_prefix,
278     &empty_prefix,
279     &http_prefix,
280     &http_prefix,
281     &http_prefix,
282     &empty_prefix,
283 };
284 
285 
286 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
287     .quit         = nxt_signal_quit_handler,
288     .new_port     = nxt_router_new_port_handler,
289     .get_port     = nxt_router_get_port_handler,
290     .change_file  = nxt_port_change_log_file_handler,
291     .mmap         = nxt_port_mmap_handler,
292     .get_mmap     = nxt_router_get_mmap_handler,
293     .data         = nxt_router_conf_data_handler,
294     .app_restart  = nxt_router_app_restart_handler,
295     .remove_pid   = nxt_router_remove_pid_handler,
296     .access_log   = nxt_router_access_log_reopen_handler,
297     .rpc_ready    = nxt_port_rpc_handler,
298     .rpc_error    = nxt_port_rpc_handler,
299     .oosm         = nxt_router_oosm_handler,
300 };
301 
302 
303 const nxt_process_init_t  nxt_router_process = {
304     .name           = "router",
305     .type           = NXT_PROCESS_ROUTER,
306     .prefork        = nxt_router_prefork,
307     .restart        = 1,
308     .setup          = nxt_process_core_setup,
309     .start          = nxt_router_start,
310     .port_handlers  = &nxt_router_process_port_handlers,
311     .signals        = nxt_process_signals,
312 };
313 
314 
315 /* Queues of nxt_socket_conf_t */
316 nxt_queue_t  creating_sockets;
317 nxt_queue_t  pending_sockets;
318 nxt_queue_t  updating_sockets;
319 nxt_queue_t  keeping_sockets;
320 nxt_queue_t  deleting_sockets;
321 
322 
323 static nxt_int_t
324 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
325 {
326     nxt_runtime_stop_app_processes(task, task->thread->runtime);
327 
328     return NXT_OK;
329 }
330 
331 
332 static nxt_int_t
333 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
334 {
335     nxt_int_t      ret;
336     nxt_port_t     *controller_port;
337     nxt_router_t   *router;
338     nxt_runtime_t  *rt;
339 
340     rt = task->thread->runtime;
341 
342     nxt_log(task, NXT_LOG_INFO, "router started");
343 
344 #if (NXT_TLS)
345     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
346     if (nxt_slow_path(rt->tls == NULL)) {
347         return NXT_ERROR;
348     }
349 
350     ret = rt->tls->library_init(task);
351     if (nxt_slow_path(ret != NXT_OK)) {
352         return ret;
353     }
354 #endif
355 
356     ret = nxt_http_init(task);
357     if (nxt_slow_path(ret != NXT_OK)) {
358         return ret;
359     }
360 
361     router = nxt_zalloc(sizeof(nxt_router_t));
362     if (nxt_slow_path(router == NULL)) {
363         return NXT_ERROR;
364     }
365 
366     nxt_queue_init(&router->engines);
367     nxt_queue_init(&router->sockets);
368     nxt_queue_init(&router->apps);
369 
370     nxt_router = router;
371 
372     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
373     if (controller_port != NULL) {
374         nxt_router_greet_controller(task, controller_port);
375     }
376 
377     return NXT_OK;
378 }
379 
380 
381 static void
382 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
383 {
384     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
385                           -1, 0, 0, NULL);
386 }
387 
388 
389 static void
390 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
391     void *data)
392 {
393     size_t               size;
394     uint32_t             stream;
395     nxt_mp_t             *mp;
396     nxt_int_t            ret;
397     nxt_app_t            *app;
398     nxt_buf_t            *b;
399     nxt_port_t           *main_port;
400     nxt_runtime_t        *rt;
401     nxt_app_joint_rpc_t  *app_joint_rpc;
402 
403     app = data;
404 
405     rt = task->thread->runtime;
406     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
407 
408     nxt_debug(task, "app '%V' %p start process", &app->name, app);
409 
410     size = app->name.length + 1 + app->conf.length;
411 
412     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
413 
414     if (nxt_slow_path(b == NULL)) {
415         goto failed;
416     }
417 
418     nxt_buf_cpystr(b, &app->name);
419     *b->mem.free++ = '\0';
420     nxt_buf_cpystr(b, &app->conf);
421 
422     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
423                                                      nxt_router_app_port_ready,
424                                                      nxt_router_app_port_error,
425                                                    sizeof(nxt_app_joint_rpc_t));
426     if (nxt_slow_path(app_joint_rpc == NULL)) {
427         goto failed;
428     }
429 
430     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
431 
432     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
433                                 -1, stream, port->id, b);
434     if (nxt_slow_path(ret != NXT_OK)) {
435         nxt_port_rpc_cancel(task, port, stream);
436 
437         goto failed;
438     }
439 
440     app_joint_rpc->app_joint = app->joint;
441     app_joint_rpc->generation = app->generation;
442 
443     nxt_router_app_joint_use(task, app->joint, 1);
444 
445     nxt_router_app_use(task, app, -1);
446 
447     return;
448 
449 failed:
450 
451     if (b != NULL) {
452         mp = b->data;
453         nxt_mp_free(mp, b);
454         nxt_mp_release(mp);
455     }
456 
457     nxt_thread_mutex_lock(&app->mutex);
458 
459     app->pending_processes--;
460 
461     nxt_thread_mutex_unlock(&app->mutex);
462 
463     nxt_router_app_use(task, app, -1);
464 }
465 
466 
467 static void
468 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
469 {
470     app_joint->use_count += i;
471 
472     if (app_joint->use_count == 0) {
473         nxt_assert(app_joint->app == NULL);
474 
475         nxt_free(app_joint);
476     }
477 }
478 
479 
480 static nxt_int_t
481 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
482 {
483     nxt_int_t      res;
484     nxt_port_t     *router_port;
485     nxt_runtime_t  *rt;
486 
487     nxt_debug(task, "app '%V' start process", &app->name);
488 
489     rt = task->thread->runtime;
490     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
491 
492     nxt_router_app_use(task, app, 1);
493 
494     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
495                         app);
496 
497     if (res == NXT_OK) {
498         return res;
499     }
500 
501     nxt_thread_mutex_lock(&app->mutex);
502 
503     app->pending_processes--;
504 
505     nxt_thread_mutex_unlock(&app->mutex);
506 
507     nxt_router_app_use(task, app, -1);
508 
509     return NXT_ERROR;
510 }
511 
512 
513 nxt_inline nxt_bool_t
514 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
515 {
516     nxt_buf_t       *b, *next;
517     nxt_bool_t      cancelled;
518     nxt_port_t      *app_port;
519     nxt_msg_info_t  *msg_info;
520 
521     msg_info = &req_rpc_data->msg_info;
522 
523     if (msg_info->buf == NULL) {
524         return 0;
525     }
526 
527     app_port = req_rpc_data->app_port;
528 
529     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
530         cancelled = nxt_app_queue_cancel(app_port->queue,
531                                          msg_info->tracking_cookie,
532                                          req_rpc_data->stream);
533 
534         if (cancelled) {
535             nxt_debug(task, "stream #%uD: cancelled by router",
536                       req_rpc_data->stream);
537         }
538 
539     } else {
540         cancelled = 0;
541     }
542 
543     for (b = msg_info->buf; b != NULL; b = next) {
544         next = b->next;
545         b->next = NULL;
546 
547         if (b->is_port_mmap_sent) {
548             b->is_port_mmap_sent = cancelled == 0;
549         }
550 
551         b->completion_handler(task, b, b->parent);
552     }
553 
554     msg_info->buf = NULL;
555 
556     return cancelled;
557 }
558 
559 
560 nxt_inline nxt_bool_t
561 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
562 {
563     if (lnk->next != NULL) {
564         nxt_queue_remove(lnk);
565 
566         lnk->next = NULL;
567 
568         return 1;
569     }
570 
571     return 0;
572 }
573 
574 
575 nxt_inline void
576 nxt_request_rpc_data_unlink(nxt_task_t *task,
577     nxt_request_rpc_data_t *req_rpc_data)
578 {
579     nxt_app_t           *app;
580     nxt_bool_t          unlinked;
581     nxt_http_request_t  *r;
582 
583     nxt_router_msg_cancel(task, req_rpc_data);
584 
585     app = req_rpc_data->app;
586 
587     if (req_rpc_data->app_port != NULL) {
588         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
589                                     req_rpc_data->apr_action);
590 
591         req_rpc_data->app_port = NULL;
592     }
593 
594     r = req_rpc_data->request;
595 
596     if (r != NULL) {
597         r->timer_data = NULL;
598 
599         nxt_router_http_request_release_post(task, r);
600 
601         r->req_rpc_data = NULL;
602         req_rpc_data->request = NULL;
603 
604         if (app != NULL) {
605             unlinked = 0;
606 
607             nxt_thread_mutex_lock(&app->mutex);
608 
609             if (r->app_link.next != NULL) {
610                 nxt_queue_remove(&r->app_link);
611                 r->app_link.next = NULL;
612 
613                 unlinked = 1;
614             }
615 
616             nxt_thread_mutex_unlock(&app->mutex);
617 
618             if (unlinked) {
619                 nxt_mp_release(r->mem_pool);
620             }
621         }
622     }
623 
624     if (app != NULL) {
625         nxt_router_app_use(task, app, -1);
626 
627         req_rpc_data->app = NULL;
628     }
629 
630     if (req_rpc_data->msg_info.body_fd != -1) {
631         nxt_fd_close(req_rpc_data->msg_info.body_fd);
632 
633         req_rpc_data->msg_info.body_fd = -1;
634     }
635 
636     if (req_rpc_data->rpc_cancel) {
637         req_rpc_data->rpc_cancel = 0;
638 
639         nxt_port_rpc_cancel(task, task->thread->engine->port,
640                             req_rpc_data->stream);
641     }
642 }
643 
644 
645 static void
646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
647 {
648     nxt_int_t      res;
649     nxt_app_t      *app;
650     nxt_port_t     *port, *main_app_port;
651     nxt_runtime_t  *rt;
652 
653     nxt_port_new_port_handler(task, msg);
654 
655     port = msg->u.new_port;
656 
657     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
658         nxt_router_greet_controller(task, msg->u.new_port);
659     }
660 
661     if (port == NULL || port->type != NXT_PROCESS_APP) {
662 
663         if (msg->port_msg.stream == 0) {
664             return;
665         }
666 
667         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
668 
669     } else {
670         if (msg->fd[1] != -1) {
671             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
672             if (nxt_slow_path(res != NXT_OK)) {
673                 return;
674             }
675 
676             nxt_fd_close(msg->fd[1]);
677             msg->fd[1] = -1;
678         }
679     }
680 
681     if (msg->port_msg.stream != 0) {
682         nxt_port_rpc_handler(task, msg);
683         return;
684     }
685 
686     /*
687      * Port with "id == 0" is application 'main' port and it always
688      * should come with non-zero stream.
689      */
690     nxt_assert(port->id != 0);
691 
692     /* Find 'main' app port and get app reference. */
693     rt = task->thread->runtime;
694 
695     /*
696      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
697      * sent to main port (with id == 0) and processed in main thread.
698      */
699     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
700     nxt_assert(main_app_port != NULL);
701 
702     app = main_app_port->app;
703 
704     if (nxt_fast_path(app != NULL)) {
705         nxt_thread_mutex_lock(&app->mutex);
706 
707         /* TODO here should be find-and-add code because there can be
708            port waiters in port_hash */
709         nxt_port_hash_add(&app->port_hash, port);
710         app->port_hash_count++;
711 
712         nxt_thread_mutex_unlock(&app->mutex);
713 
714         port->app = app;
715     }
716 
717     port->main_app_port = main_app_port;
718 
719     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
720 }
721 
722 
723 static void
724 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
725 {
726     void                    *p;
727     size_t                  size;
728     nxt_int_t               ret;
729     nxt_port_t              *port;
730     nxt_router_temp_conf_t  *tmcf;
731 
732     port = nxt_runtime_port_find(task->thread->runtime,
733                                  msg->port_msg.pid,
734                                  msg->port_msg.reply_port);
735     if (nxt_slow_path(port == NULL)) {
736         nxt_alert(task, "conf_data_handler: reply port not found");
737         return;
738     }
739 
740     p = MAP_FAILED;
741 
742     /*
743      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
744      * initialized in 'cleanup' section.
745      */
746     size = 0;
747 
748     tmcf = nxt_router_temp_conf(task);
749     if (nxt_slow_path(tmcf == NULL)) {
750         goto fail;
751     }
752 
753     if (nxt_slow_path(msg->fd[0] == -1)) {
754         nxt_alert(task, "conf_data_handler: invalid shm fd");
755         goto fail;
756     }
757 
758     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
759         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
760                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
761         goto fail;
762     }
763 
764     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
765 
766     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
767 
768     nxt_fd_close(msg->fd[0]);
769     msg->fd[0] = -1;
770 
771     if (nxt_slow_path(p == MAP_FAILED)) {
772         goto fail;
773     }
774 
775     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
776 
777     tmcf->router_conf->router = nxt_router;
778     tmcf->stream = msg->port_msg.stream;
779     tmcf->port = port;
780 
781     nxt_port_use(task, tmcf->port, 1);
782 
783     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
784 
785     if (nxt_fast_path(ret == NXT_OK)) {
786         nxt_router_conf_apply(task, tmcf, NULL);
787 
788     } else {
789         nxt_router_conf_error(task, tmcf);
790     }
791 
792     goto cleanup;
793 
794 fail:
795 
796     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
797                           msg->port_msg.stream, 0, NULL);
798 
799     if (tmcf != NULL) {
800         nxt_mp_release(tmcf->mem_pool);
801     }
802 
803 cleanup:
804 
805     if (p != MAP_FAILED) {
806         nxt_mem_munmap(p, size);
807     }
808 
809     if (msg->fd[0] != -1) {
810         nxt_fd_close(msg->fd[0]);
811         msg->fd[0] = -1;
812     }
813 }
814 
815 
816 static void
817 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
818 {
819     nxt_app_t            *app;
820     nxt_int_t            ret;
821     nxt_str_t            app_name;
822     nxt_port_t           *port, *reply_port, *shared_port, *old_shared_port;
823     nxt_port_msg_type_t  reply;
824 
825     reply_port = nxt_runtime_port_find(task->thread->runtime,
826                                        msg->port_msg.pid,
827                                        msg->port_msg.reply_port);
828     if (nxt_slow_path(reply_port == NULL)) {
829         nxt_alert(task, "app_restart_handler: reply port not found");
830         return;
831     }
832 
833     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
834     app_name.start = msg->buf->mem.pos;
835 
836     nxt_debug(task, "app_restart_handler: %V", &app_name);
837 
838     app = nxt_router_app_find(&nxt_router->apps, &app_name);
839 
840     if (nxt_fast_path(app != NULL)) {
841         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
842                                    NXT_PROCESS_APP);
843         if (nxt_slow_path(shared_port == NULL)) {
844             goto fail;
845         }
846 
847         ret = nxt_port_socket_init(task, shared_port, 0);
848         if (nxt_slow_path(ret != NXT_OK)) {
849             nxt_port_use(task, shared_port, -1);
850             goto fail;
851         }
852 
853         ret = nxt_router_app_queue_init(task, shared_port);
854         if (nxt_slow_path(ret != NXT_OK)) {
855             nxt_port_write_close(shared_port);
856             nxt_port_read_close(shared_port);
857             nxt_port_use(task, shared_port, -1);
858             goto fail;
859         }
860 
861         nxt_port_write_enable(task, shared_port);
862 
863         nxt_thread_mutex_lock(&app->mutex);
864 
865         nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
866 
867             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
868                                          0, 0, NULL);
869 
870         } nxt_queue_loop;
871 
872         app->generation++;
873 
874         shared_port->app = app;
875 
876         old_shared_port = app->shared_port;
877         old_shared_port->app = NULL;
878 
879         app->shared_port = shared_port;
880 
881         nxt_thread_mutex_unlock(&app->mutex);
882 
883         nxt_port_close(task, old_shared_port);
884         nxt_port_use(task, old_shared_port, -1);
885 
886         reply = NXT_PORT_MSG_RPC_READY_LAST;
887 
888     } else {
889 
890 fail:
891 
892         reply = NXT_PORT_MSG_RPC_ERROR;
893     }
894 
895     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
896                           0, NULL);
897 }
898 
899 
900 static void
901 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
902     void *data)
903 {
904     union {
905         nxt_pid_t  removed_pid;
906         void       *data;
907     } u;
908 
909     u.data = data;
910 
911     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
912 }
913 
914 
915 static void
916 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
917 {
918     nxt_event_engine_t  *engine;
919 
920     nxt_port_remove_pid_handler(task, msg);
921 
922     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
923     {
924         if (nxt_fast_path(engine->port != NULL)) {
925             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
926                           msg->u.data);
927         }
928     }
929     nxt_queue_loop;
930 
931     if (msg->port_msg.stream == 0) {
932         return;
933     }
934 
935     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
936 
937     nxt_port_rpc_handler(task, msg);
938 }
939 
940 
941 static nxt_router_temp_conf_t *
942 nxt_router_temp_conf(nxt_task_t *task)
943 {
944     nxt_mp_t                *mp, *tmp;
945     nxt_router_conf_t       *rtcf;
946     nxt_router_temp_conf_t  *tmcf;
947 
948     mp = nxt_mp_create(1024, 128, 256, 32);
949     if (nxt_slow_path(mp == NULL)) {
950         return NULL;
951     }
952 
953     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
954     if (nxt_slow_path(rtcf == NULL)) {
955         goto fail;
956     }
957 
958     rtcf->mem_pool = mp;
959 
960     tmp = nxt_mp_create(1024, 128, 256, 32);
961     if (nxt_slow_path(tmp == NULL)) {
962         goto fail;
963     }
964 
965     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
966     if (nxt_slow_path(tmcf == NULL)) {
967         goto temp_fail;
968     }
969 
970     tmcf->mem_pool = tmp;
971     tmcf->router_conf = rtcf;
972     tmcf->count = 1;
973     tmcf->engine = task->thread->engine;
974 
975     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
976                                      sizeof(nxt_router_engine_conf_t));
977     if (nxt_slow_path(tmcf->engines == NULL)) {
978         goto temp_fail;
979     }
980 
981     nxt_queue_init(&creating_sockets);
982     nxt_queue_init(&pending_sockets);
983     nxt_queue_init(&updating_sockets);
984     nxt_queue_init(&keeping_sockets);
985     nxt_queue_init(&deleting_sockets);
986 
987 #if (NXT_TLS)
988     nxt_queue_init(&tmcf->tls);
989 #endif
990 
991     nxt_queue_init(&tmcf->apps);
992     nxt_queue_init(&tmcf->previous);
993 
994     return tmcf;
995 
996 temp_fail:
997 
998     nxt_mp_destroy(tmp);
999 
1000 fail:
1001 
1002     nxt_mp_destroy(mp);
1003 
1004     return NULL;
1005 }
1006 
1007 
1008 nxt_inline nxt_bool_t
1009 nxt_router_app_can_start(nxt_app_t *app)
1010 {
1011     return app->processes + app->pending_processes < app->max_processes
1012             && app->pending_processes < app->max_pending_processes;
1013 }
1014 
1015 
1016 nxt_inline nxt_bool_t
1017 nxt_router_app_need_start(nxt_app_t *app)
1018 {
1019     return (app->active_requests
1020               > app->port_hash_count + app->pending_processes)
1021            || (app->spare_processes
1022                 > app->idle_processes + app->pending_processes);
1023 }
1024 
1025 
1026 static void
1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1028 {
1029     nxt_int_t                    ret;
1030     nxt_app_t                    *app;
1031     nxt_router_t                 *router;
1032     nxt_runtime_t                *rt;
1033     nxt_queue_link_t             *qlk;
1034     nxt_socket_conf_t            *skcf;
1035     nxt_router_conf_t            *rtcf;
1036     nxt_router_temp_conf_t       *tmcf;
1037     const nxt_event_interface_t  *interface;
1038 #if (NXT_TLS)
1039     nxt_router_tlssock_t         *tls;
1040 #endif
1041 
1042     tmcf = obj;
1043 
1044     qlk = nxt_queue_first(&pending_sockets);
1045 
1046     if (qlk != nxt_queue_tail(&pending_sockets)) {
1047         nxt_queue_remove(qlk);
1048         nxt_queue_insert_tail(&creating_sockets, qlk);
1049 
1050         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1051 
1052         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1053 
1054         return;
1055     }
1056 
1057 #if (NXT_TLS)
1058     qlk = nxt_queue_last(&tmcf->tls);
1059 
1060     if (qlk != nxt_queue_head(&tmcf->tls)) {
1061         nxt_queue_remove(qlk);
1062 
1063         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1064 
1065         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1066                            nxt_router_tls_rpc_handler, tls);
1067         return;
1068     }
1069 #endif
1070 
1071     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1072 
1073         if (nxt_router_app_need_start(app)) {
1074             nxt_router_app_rpc_create(task, tmcf, app);
1075             return;
1076         }
1077 
1078     } nxt_queue_loop;
1079 
1080     rtcf = tmcf->router_conf;
1081 
1082     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1083         nxt_router_access_log_open(task, tmcf);
1084         return;
1085     }
1086 
1087     rt = task->thread->runtime;
1088 
1089     interface = nxt_service_get(rt->services, "engine", NULL);
1090 
1091     router = rtcf->router;
1092 
1093     ret = nxt_router_engines_create(task, router, tmcf, interface);
1094     if (nxt_slow_path(ret != NXT_OK)) {
1095         goto fail;
1096     }
1097 
1098     ret = nxt_router_threads_create(task, rt, tmcf);
1099     if (nxt_slow_path(ret != NXT_OK)) {
1100         goto fail;
1101     }
1102 
1103     nxt_router_apps_sort(task, router, tmcf);
1104 
1105     nxt_router_apps_hash_use(task, rtcf, 1);
1106 
1107     nxt_router_engines_post(router, tmcf);
1108 
1109     nxt_queue_add(&router->sockets, &updating_sockets);
1110     nxt_queue_add(&router->sockets, &creating_sockets);
1111 
1112     router->access_log = rtcf->access_log;
1113 
1114     nxt_router_conf_ready(task, tmcf);
1115 
1116     return;
1117 
1118 fail:
1119 
1120     nxt_router_conf_error(task, tmcf);
1121 
1122     return;
1123 }
1124 
1125 
1126 static void
1127 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1128 {
1129     nxt_joint_job_t  *job;
1130 
1131     job = obj;
1132 
1133     nxt_router_conf_ready(task, job->tmcf);
1134 }
1135 
1136 
1137 static void
1138 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1139 {
1140     uint32_t               count;
1141     nxt_router_conf_t      *rtcf;
1142     nxt_thread_spinlock_t  *lock;
1143 
1144     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1145 
1146     if (--tmcf->count > 0) {
1147         return;
1148     }
1149 
1150     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1151 
1152     rtcf = tmcf->router_conf;
1153 
1154     lock = &rtcf->router->lock;
1155 
1156     nxt_thread_spin_lock(lock);
1157 
1158     count = rtcf->count;
1159 
1160     nxt_thread_spin_unlock(lock);
1161 
1162     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1163 
1164     if (count == 0) {
1165         nxt_router_apps_hash_use(task, rtcf, -1);
1166 
1167         nxt_router_access_log_release(task, lock, rtcf->access_log);
1168 
1169         nxt_mp_destroy(rtcf->mem_pool);
1170     }
1171 
1172     nxt_mp_release(tmcf->mem_pool);
1173 }
1174 
1175 
1176 static void
1177 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1178 {
1179     nxt_app_t          *app;
1180     nxt_queue_t        new_socket_confs;
1181     nxt_socket_t       s;
1182     nxt_router_t       *router;
1183     nxt_queue_link_t   *qlk;
1184     nxt_socket_conf_t  *skcf;
1185     nxt_router_conf_t  *rtcf;
1186 
1187     nxt_alert(task, "failed to apply new conf");
1188 
1189     for (qlk = nxt_queue_first(&creating_sockets);
1190          qlk != nxt_queue_tail(&creating_sockets);
1191          qlk = nxt_queue_next(qlk))
1192     {
1193         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1194         s = skcf->listen->socket;
1195 
1196         if (s != -1) {
1197             nxt_socket_close(task, s);
1198         }
1199 
1200         nxt_free(skcf->listen);
1201     }
1202 
1203     nxt_queue_init(&new_socket_confs);
1204     nxt_queue_add(&new_socket_confs, &updating_sockets);
1205     nxt_queue_add(&new_socket_confs, &pending_sockets);
1206     nxt_queue_add(&new_socket_confs, &creating_sockets);
1207 
1208     rtcf = tmcf->router_conf;
1209 
1210     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1211 
1212         nxt_router_app_unlink(task, app);
1213 
1214     } nxt_queue_loop;
1215 
1216     router = rtcf->router;
1217 
1218     nxt_queue_add(&router->sockets, &keeping_sockets);
1219     nxt_queue_add(&router->sockets, &deleting_sockets);
1220 
1221     nxt_queue_add(&router->apps, &tmcf->previous);
1222 
1223     // TODO: new engines and threads
1224 
1225     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1226 
1227     nxt_mp_destroy(rtcf->mem_pool);
1228 
1229     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1230 
1231     nxt_mp_release(tmcf->mem_pool);
1232 }
1233 
1234 
1235 static void
1236 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1237     nxt_port_msg_type_t type)
1238 {
1239     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1240 
1241     nxt_port_use(task, tmcf->port, -1);
1242 
1243     tmcf->port = NULL;
1244 }
1245 
1246 
1247 static nxt_conf_map_t  nxt_router_conf[] = {
1248     {
1249         nxt_string("listeners_threads"),
1250         NXT_CONF_MAP_INT32,
1251         offsetof(nxt_router_conf_t, threads),
1252     },
1253 };
1254 
1255 
1256 static nxt_conf_map_t  nxt_router_app_conf[] = {
1257     {
1258         nxt_string("type"),
1259         NXT_CONF_MAP_STR,
1260         offsetof(nxt_router_app_conf_t, type),
1261     },
1262 
1263     {
1264         nxt_string("limits"),
1265         NXT_CONF_MAP_PTR,
1266         offsetof(nxt_router_app_conf_t, limits_value),
1267     },
1268 
1269     {
1270         nxt_string("processes"),
1271         NXT_CONF_MAP_INT32,
1272         offsetof(nxt_router_app_conf_t, processes),
1273     },
1274 
1275     {
1276         nxt_string("processes"),
1277         NXT_CONF_MAP_PTR,
1278         offsetof(nxt_router_app_conf_t, processes_value),
1279     },
1280 
1281     {
1282         nxt_string("targets"),
1283         NXT_CONF_MAP_PTR,
1284         offsetof(nxt_router_app_conf_t, targets_value),
1285     },
1286 };
1287 
1288 
1289 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1290     {
1291         nxt_string("timeout"),
1292         NXT_CONF_MAP_MSEC,
1293         offsetof(nxt_router_app_conf_t, timeout),
1294     },
1295 };
1296 
1297 
1298 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1299     {
1300         nxt_string("spare"),
1301         NXT_CONF_MAP_INT32,
1302         offsetof(nxt_router_app_conf_t, spare_processes),
1303     },
1304 
1305     {
1306         nxt_string("max"),
1307         NXT_CONF_MAP_INT32,
1308         offsetof(nxt_router_app_conf_t, max_processes),
1309     },
1310 
1311     {
1312         nxt_string("idle_timeout"),
1313         NXT_CONF_MAP_MSEC,
1314         offsetof(nxt_router_app_conf_t, idle_timeout),
1315     },
1316 };
1317 
1318 
1319 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1320     {
1321         nxt_string("pass"),
1322         NXT_CONF_MAP_STR_COPY,
1323         offsetof(nxt_router_listener_conf_t, pass),
1324     },
1325 
1326     {
1327         nxt_string("application"),
1328         NXT_CONF_MAP_STR_COPY,
1329         offsetof(nxt_router_listener_conf_t, application),
1330     },
1331 };
1332 
1333 
1334 static nxt_conf_map_t  nxt_router_http_conf[] = {
1335     {
1336         nxt_string("header_buffer_size"),
1337         NXT_CONF_MAP_SIZE,
1338         offsetof(nxt_socket_conf_t, header_buffer_size),
1339     },
1340 
1341     {
1342         nxt_string("large_header_buffer_size"),
1343         NXT_CONF_MAP_SIZE,
1344         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1345     },
1346 
1347     {
1348         nxt_string("large_header_buffers"),
1349         NXT_CONF_MAP_SIZE,
1350         offsetof(nxt_socket_conf_t, large_header_buffers),
1351     },
1352 
1353     {
1354         nxt_string("body_buffer_size"),
1355         NXT_CONF_MAP_SIZE,
1356         offsetof(nxt_socket_conf_t, body_buffer_size),
1357     },
1358 
1359     {
1360         nxt_string("max_body_size"),
1361         NXT_CONF_MAP_SIZE,
1362         offsetof(nxt_socket_conf_t, max_body_size),
1363     },
1364 
1365     {
1366         nxt_string("idle_timeout"),
1367         NXT_CONF_MAP_MSEC,
1368         offsetof(nxt_socket_conf_t, idle_timeout),
1369     },
1370 
1371     {
1372         nxt_string("header_read_timeout"),
1373         NXT_CONF_MAP_MSEC,
1374         offsetof(nxt_socket_conf_t, header_read_timeout),
1375     },
1376 
1377     {
1378         nxt_string("body_read_timeout"),
1379         NXT_CONF_MAP_MSEC,
1380         offsetof(nxt_socket_conf_t, body_read_timeout),
1381     },
1382 
1383     {
1384         nxt_string("send_timeout"),
1385         NXT_CONF_MAP_MSEC,
1386         offsetof(nxt_socket_conf_t, send_timeout),
1387     },
1388 
1389     {
1390         nxt_string("body_temp_path"),
1391         NXT_CONF_MAP_STR,
1392         offsetof(nxt_socket_conf_t, body_temp_path),
1393     },
1394 
1395     {
1396         nxt_string("discard_unsafe_fields"),
1397         NXT_CONF_MAP_INT8,
1398         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1399     },
1400 };
1401 
1402 
1403 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1404     {
1405         nxt_string("max_frame_size"),
1406         NXT_CONF_MAP_SIZE,
1407         offsetof(nxt_websocket_conf_t, max_frame_size),
1408     },
1409 
1410     {
1411         nxt_string("read_timeout"),
1412         NXT_CONF_MAP_MSEC,
1413         offsetof(nxt_websocket_conf_t, read_timeout),
1414     },
1415 
1416     {
1417         nxt_string("keepalive_interval"),
1418         NXT_CONF_MAP_MSEC,
1419         offsetof(nxt_websocket_conf_t, keepalive_interval),
1420     },
1421 
1422 };
1423 
1424 
1425 static nxt_int_t
1426 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1427     u_char *start, u_char *end)
1428 {
1429     u_char                      *p;
1430     size_t                      size;
1431     nxt_mp_t                    *mp, *app_mp;
1432     uint32_t                    next, next_target;
1433     nxt_int_t                   ret;
1434     nxt_str_t                   name, path, target;
1435     nxt_app_t                   *app, *prev;
1436     nxt_str_t                   *t, *s, *targets;
1437     nxt_uint_t                  n, i;
1438     nxt_port_t                  *port;
1439     nxt_router_t                *router;
1440     nxt_app_joint_t             *app_joint;
1441 #if (NXT_TLS)
1442     nxt_tls_init_t              *tls_init;
1443     nxt_conf_value_t            *certificate;
1444 #endif
1445     nxt_conf_value_t            *conf, *http, *value, *websocket;
1446     nxt_conf_value_t            *applications, *application;
1447     nxt_conf_value_t            *listeners, *listener;
1448     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1449     nxt_socket_conf_t           *skcf;
1450     nxt_http_routes_t           *routes;
1451     nxt_event_engine_t          *engine;
1452     nxt_app_lang_module_t       *lang;
1453     nxt_router_app_conf_t       apcf;
1454     nxt_router_access_log_t     *access_log;
1455     nxt_router_listener_conf_t  lscf;
1456 
1457     static nxt_str_t  http_path = nxt_string("/settings/http");
1458     static nxt_str_t  applications_path = nxt_string("/applications");
1459     static nxt_str_t  listeners_path = nxt_string("/listeners");
1460     static nxt_str_t  routes_path = nxt_string("/routes");
1461     static nxt_str_t  access_log_path = nxt_string("/access_log");
1462 #if (NXT_TLS)
1463     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1464     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1465     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1466     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1467     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1468 #endif
1469     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1470     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1471     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1472 
1473     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1474     if (conf == NULL) {
1475         nxt_alert(task, "configuration parsing error");
1476         return NXT_ERROR;
1477     }
1478 
1479     mp = tmcf->router_conf->mem_pool;
1480 
1481     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1482                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1483     if (ret != NXT_OK) {
1484         nxt_alert(task, "root map error");
1485         return NXT_ERROR;
1486     }
1487 
1488     if (tmcf->router_conf->threads == 0) {
1489         tmcf->router_conf->threads = nxt_ncpu;
1490     }
1491 
1492     static_conf = nxt_conf_get_path(conf, &static_path);
1493 
1494     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1495     if (nxt_slow_path(ret != NXT_OK)) {
1496         return NXT_ERROR;
1497     }
1498 
1499     router = tmcf->router_conf->router;
1500 
1501     applications = nxt_conf_get_path(conf, &applications_path);
1502 
1503     if (applications != NULL) {
1504         next = 0;
1505 
1506         for ( ;; ) {
1507             application = nxt_conf_next_object_member(applications,
1508                                                       &name, &next);
1509             if (application == NULL) {
1510                 break;
1511             }
1512 
1513             nxt_debug(task, "application \"%V\"", &name);
1514 
1515             size = nxt_conf_json_length(application, NULL);
1516 
1517             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1518             if (nxt_slow_path(app_mp == NULL)) {
1519                 goto fail;
1520             }
1521 
1522             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1523             if (app == NULL) {
1524                 goto app_fail;
1525             }
1526 
1527             nxt_memzero(app, sizeof(nxt_app_t));
1528 
1529             app->mem_pool = app_mp;
1530 
1531             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1532             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1533                                                   + name.length);
1534 
1535             p = nxt_conf_json_print(app->conf.start, application, NULL);
1536             app->conf.length = p - app->conf.start;
1537 
1538             nxt_assert(app->conf.length <= size);
1539 
1540             nxt_debug(task, "application conf \"%V\"", &app->conf);
1541 
1542             prev = nxt_router_app_find(&router->apps, &name);
1543 
1544             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1545                 nxt_mp_destroy(app_mp);
1546 
1547                 nxt_queue_remove(&prev->link);
1548                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1549 
1550                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1551                 if (nxt_slow_path(ret != NXT_OK)) {
1552                     goto fail;
1553                 }
1554 
1555                 continue;
1556             }
1557 
1558             apcf.processes = 1;
1559             apcf.max_processes = 1;
1560             apcf.spare_processes = 0;
1561             apcf.timeout = 0;
1562             apcf.idle_timeout = 15000;
1563             apcf.limits_value = NULL;
1564             apcf.processes_value = NULL;
1565             apcf.targets_value = NULL;
1566 
1567             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1568             if (nxt_slow_path(app_joint == NULL)) {
1569                 goto app_fail;
1570             }
1571 
1572             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1573 
1574             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1575                                       nxt_nitems(nxt_router_app_conf), &apcf);
1576             if (ret != NXT_OK) {
1577                 nxt_alert(task, "application map error");
1578                 goto app_fail;
1579             }
1580 
1581             if (apcf.limits_value != NULL) {
1582 
1583                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1584                     nxt_alert(task, "application limits is not object");
1585                     goto app_fail;
1586                 }
1587 
1588                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1589                                         nxt_router_app_limits_conf,
1590                                         nxt_nitems(nxt_router_app_limits_conf),
1591                                         &apcf);
1592                 if (ret != NXT_OK) {
1593                     nxt_alert(task, "application limits map error");
1594                     goto app_fail;
1595                 }
1596             }
1597 
1598             if (apcf.processes_value != NULL
1599                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1600             {
1601                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1602                                      nxt_router_app_processes_conf,
1603                                      nxt_nitems(nxt_router_app_processes_conf),
1604                                      &apcf);
1605                 if (ret != NXT_OK) {
1606                     nxt_alert(task, "application processes map error");
1607                     goto app_fail;
1608                 }
1609 
1610             } else {
1611                 apcf.max_processes = apcf.processes;
1612                 apcf.spare_processes = apcf.processes;
1613             }
1614 
1615             if (apcf.targets_value != NULL) {
1616                 n = nxt_conf_object_members_count(apcf.targets_value);
1617 
1618                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1619                 if (nxt_slow_path(targets == NULL)) {
1620                     goto app_fail;
1621                 }
1622 
1623                 next_target = 0;
1624 
1625                 for (i = 0; i < n; i++) {
1626                     (void) nxt_conf_next_object_member(apcf.targets_value,
1627                                                        &target, &next_target);
1628 
1629                     s = nxt_str_dup(app_mp, &targets[i], &target);
1630                     if (nxt_slow_path(s == NULL)) {
1631                         goto app_fail;
1632                     }
1633                 }
1634 
1635             } else {
1636                 targets = NULL;
1637             }
1638 
1639             nxt_debug(task, "application type: %V", &apcf.type);
1640             nxt_debug(task, "application processes: %D", apcf.processes);
1641             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1642 
1643             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1644 
1645             if (lang == NULL) {
1646                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1647                 goto app_fail;
1648             }
1649 
1650             nxt_debug(task, "application language module: \"%s\"", lang->file);
1651 
1652             ret = nxt_thread_mutex_create(&app->mutex);
1653             if (ret != NXT_OK) {
1654                 goto app_fail;
1655             }
1656 
1657             nxt_queue_init(&app->ports);
1658             nxt_queue_init(&app->spare_ports);
1659             nxt_queue_init(&app->idle_ports);
1660             nxt_queue_init(&app->ack_waiting_req);
1661 
1662             app->name.length = name.length;
1663             nxt_memcpy(app->name.start, name.start, name.length);
1664 
1665             app->type = lang->type;
1666             app->max_processes = apcf.max_processes;
1667             app->spare_processes = apcf.spare_processes;
1668             app->max_pending_processes = apcf.spare_processes
1669                                          ? apcf.spare_processes : 1;
1670             app->timeout = apcf.timeout;
1671             app->idle_timeout = apcf.idle_timeout;
1672 
1673             app->targets = targets;
1674 
1675             engine = task->thread->engine;
1676 
1677             app->engine = engine;
1678 
1679             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1680             app->adjust_idle_work.task = &engine->task;
1681             app->adjust_idle_work.obj = app;
1682 
1683             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1684 
1685             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1686             if (nxt_slow_path(ret != NXT_OK)) {
1687                 goto app_fail;
1688             }
1689 
1690             nxt_router_app_use(task, app, 1);
1691 
1692             app->joint = app_joint;
1693 
1694             app_joint->use_count = 1;
1695             app_joint->app = app;
1696 
1697             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1698             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1699             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1700             app_joint->idle_timer.task = &engine->task;
1701             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1702 
1703             app_joint->free_app_work.handler = nxt_router_free_app;
1704             app_joint->free_app_work.task = &engine->task;
1705             app_joint->free_app_work.obj = app_joint;
1706 
1707             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1708                                 NXT_PROCESS_APP);
1709             if (nxt_slow_path(port == NULL)) {
1710                 return NXT_ERROR;
1711             }
1712 
1713             ret = nxt_port_socket_init(task, port, 0);
1714             if (nxt_slow_path(ret != NXT_OK)) {
1715                 nxt_port_use(task, port, -1);
1716                 return NXT_ERROR;
1717             }
1718 
1719             ret = nxt_router_app_queue_init(task, port);
1720             if (nxt_slow_path(ret != NXT_OK)) {
1721                 nxt_port_write_close(port);
1722                 nxt_port_read_close(port);
1723                 nxt_port_use(task, port, -1);
1724                 return NXT_ERROR;
1725             }
1726 
1727             nxt_port_write_enable(task, port);
1728             port->app = app;
1729 
1730             app->shared_port = port;
1731 
1732             nxt_thread_mutex_create(&app->outgoing.mutex);
1733         }
1734     }
1735 
1736     routes_conf = nxt_conf_get_path(conf, &routes_path);
1737     if (nxt_fast_path(routes_conf != NULL)) {
1738         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1739         if (nxt_slow_path(routes == NULL)) {
1740             return NXT_ERROR;
1741         }
1742         tmcf->router_conf->routes = routes;
1743     }
1744 
1745     ret = nxt_upstreams_create(task, tmcf, conf);
1746     if (nxt_slow_path(ret != NXT_OK)) {
1747         return ret;
1748     }
1749 
1750     http = nxt_conf_get_path(conf, &http_path);
1751 #if 0
1752     if (http == NULL) {
1753         nxt_alert(task, "no \"http\" block");
1754         return NXT_ERROR;
1755     }
1756 #endif
1757 
1758     websocket = nxt_conf_get_path(conf, &websocket_path);
1759 
1760     listeners = nxt_conf_get_path(conf, &listeners_path);
1761 
1762     if (listeners != NULL) {
1763         next = 0;
1764 
1765         for ( ;; ) {
1766             listener = nxt_conf_next_object_member(listeners, &name, &next);
1767             if (listener == NULL) {
1768                 break;
1769             }
1770 
1771             skcf = nxt_router_socket_conf(task, tmcf, &name);
1772             if (skcf == NULL) {
1773                 goto fail;
1774             }
1775 
1776             nxt_memzero(&lscf, sizeof(lscf));
1777 
1778             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1779                                       nxt_nitems(nxt_router_listener_conf),
1780                                       &lscf);
1781             if (ret != NXT_OK) {
1782                 nxt_alert(task, "listener map error");
1783                 goto fail;
1784             }
1785 
1786             nxt_debug(task, "application: %V", &lscf.application);
1787 
1788             // STUB, default values if http block is not defined.
1789             skcf->header_buffer_size = 2048;
1790             skcf->large_header_buffer_size = 8192;
1791             skcf->large_header_buffers = 4;
1792             skcf->discard_unsafe_fields = 1;
1793             skcf->body_buffer_size = 16 * 1024;
1794             skcf->max_body_size = 8 * 1024 * 1024;
1795             skcf->proxy_header_buffer_size = 64 * 1024;
1796             skcf->proxy_buffer_size = 4096;
1797             skcf->proxy_buffers = 256;
1798             skcf->idle_timeout = 180 * 1000;
1799             skcf->header_read_timeout = 30 * 1000;
1800             skcf->body_read_timeout = 30 * 1000;
1801             skcf->send_timeout = 30 * 1000;
1802             skcf->proxy_timeout = 60 * 1000;
1803             skcf->proxy_send_timeout = 30 * 1000;
1804             skcf->proxy_read_timeout = 30 * 1000;
1805 
1806             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1807             skcf->websocket_conf.read_timeout = 60 * 1000;
1808             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1809 
1810             nxt_str_null(&skcf->body_temp_path);
1811 
1812             if (http != NULL) {
1813                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1814                                           nxt_nitems(nxt_router_http_conf),
1815                                           skcf);
1816                 if (ret != NXT_OK) {
1817                     nxt_alert(task, "http map error");
1818                     goto fail;
1819                 }
1820             }
1821 
1822             if (websocket != NULL) {
1823                 ret = nxt_conf_map_object(mp, websocket,
1824                                           nxt_router_websocket_conf,
1825                                           nxt_nitems(nxt_router_websocket_conf),
1826                                           &skcf->websocket_conf);
1827                 if (ret != NXT_OK) {
1828                     nxt_alert(task, "websocket map error");
1829                     goto fail;
1830                 }
1831             }
1832 
1833             t = &skcf->body_temp_path;
1834 
1835             if (t->length == 0) {
1836                 t->start = (u_char *) task->thread->runtime->tmp;
1837                 t->length = nxt_strlen(t->start);
1838             }
1839 
1840             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1841             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1842                                                     client_ip_conf);
1843             if (nxt_slow_path(ret != NXT_OK)) {
1844                 return NXT_ERROR;
1845             }
1846 
1847 #if (NXT_TLS)
1848             certificate = nxt_conf_get_path(listener, &certificate_path);
1849 
1850             if (certificate != NULL) {
1851                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1852                 if (nxt_slow_path(tls_init == NULL)) {
1853                     return NXT_ERROR;
1854                 }
1855 
1856                 tls_init->cache_size = 0;
1857                 tls_init->timeout = 300;
1858 
1859                 value = nxt_conf_get_path(listener, &conf_cache_path);
1860                 if (value != NULL) {
1861                     tls_init->cache_size = nxt_conf_get_number(value);
1862                 }
1863 
1864                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1865                 if (value != NULL) {
1866                     tls_init->timeout = nxt_conf_get_number(value);
1867                 }
1868 
1869                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1870                                                         &conf_commands_path);
1871 
1872                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1873                                                            &conf_tickets);
1874 
1875                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1876                     n = nxt_conf_array_elements_count(certificate);
1877 
1878                     for (i = 0; i < n; i++) {
1879                         value = nxt_conf_get_array_element(certificate, i);
1880 
1881                         nxt_assert(value != NULL);
1882 
1883                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1884                                                          tls_init, i == 0);
1885                         if (nxt_slow_path(ret != NXT_OK)) {
1886                             goto fail;
1887                         }
1888                     }
1889 
1890                 } else {
1891                     /* NXT_CONF_STRING */
1892                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1893                                                      tls_init, 1);
1894                     if (nxt_slow_path(ret != NXT_OK)) {
1895                         goto fail;
1896                     }
1897                 }
1898             }
1899 #endif
1900 
1901             skcf->listen->handler = nxt_http_conn_init;
1902             skcf->router_conf = tmcf->router_conf;
1903             skcf->router_conf->count++;
1904 
1905             if (lscf.pass.length != 0) {
1906                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1907 
1908             /* COMPATIBILITY: listener application. */
1909             } else if (lscf.application.length > 0) {
1910                 skcf->action = nxt_http_pass_application(task,
1911                                                          tmcf->router_conf,
1912                                                          &lscf.application);
1913             }
1914 
1915             if (nxt_slow_path(skcf->action == NULL)) {
1916                 goto fail;
1917             }
1918         }
1919     }
1920 
1921     ret = nxt_http_routes_resolve(task, tmcf);
1922     if (nxt_slow_path(ret != NXT_OK)) {
1923         goto fail;
1924     }
1925 
1926     value = nxt_conf_get_path(conf, &access_log_path);
1927 
1928     if (value != NULL) {
1929         nxt_conf_get_string(value, &path);
1930 
1931         access_log = router->access_log;
1932 
1933         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1934             nxt_thread_spin_lock(&router->lock);
1935             access_log->count++;
1936             nxt_thread_spin_unlock(&router->lock);
1937 
1938         } else {
1939             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1940                                     + path.length);
1941             if (access_log == NULL) {
1942                 nxt_alert(task, "failed to allocate access log structure");
1943                 goto fail;
1944             }
1945 
1946             access_log->fd = -1;
1947             access_log->handler = &nxt_router_access_log_writer;
1948             access_log->count = 1;
1949 
1950             access_log->path.length = path.length;
1951             access_log->path.start = (u_char *) access_log
1952                                      + sizeof(nxt_router_access_log_t);
1953 
1954             nxt_memcpy(access_log->path.start, path.start, path.length);
1955         }
1956 
1957         tmcf->router_conf->access_log = access_log;
1958     }
1959 
1960     nxt_queue_add(&deleting_sockets, &router->sockets);
1961     nxt_queue_init(&router->sockets);
1962 
1963     return NXT_OK;
1964 
1965 app_fail:
1966 
1967     nxt_mp_destroy(app_mp);
1968 
1969 fail:
1970 
1971     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1972 
1973         nxt_queue_remove(&app->link);
1974         nxt_thread_mutex_destroy(&app->mutex);
1975         nxt_mp_destroy(app->mem_pool);
1976 
1977     } nxt_queue_loop;
1978 
1979     return NXT_ERROR;
1980 }
1981 
1982 
1983 #if (NXT_TLS)
1984 
1985 static nxt_int_t
1986 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1987     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1988     nxt_tls_init_t *tls_init, nxt_bool_t last)
1989 {
1990     nxt_router_tlssock_t  *tls;
1991 
1992     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1993     if (nxt_slow_path(tls == NULL)) {
1994         return NXT_ERROR;
1995     }
1996 
1997     tls->tls_init = tls_init;
1998     tls->socket_conf = skcf;
1999     tls->temp_conf = tmcf;
2000     tls->last = last;
2001     nxt_conf_get_string(value, &tls->name);
2002 
2003     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2004 
2005     return NXT_OK;
2006 }
2007 
2008 #endif
2009 
2010 
2011 static nxt_int_t
2012 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2013     nxt_conf_value_t *conf)
2014 {
2015     uint32_t          next, i;
2016     nxt_mp_t          *mp;
2017     nxt_str_t         *type, exten, str;
2018     nxt_int_t         ret;
2019     nxt_uint_t        exts;
2020     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2021 
2022     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2023 
2024     mp = rtcf->mem_pool;
2025 
2026     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2027     if (nxt_slow_path(ret != NXT_OK)) {
2028         return NXT_ERROR;
2029     }
2030 
2031     if (conf == NULL) {
2032         return NXT_OK;
2033     }
2034 
2035     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2036 
2037     if (mtypes_conf != NULL) {
2038         next = 0;
2039 
2040         for ( ;; ) {
2041             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2042 
2043             if (ext_conf == NULL) {
2044                 break;
2045             }
2046 
2047             type = nxt_str_dup(mp, NULL, &str);
2048             if (nxt_slow_path(type == NULL)) {
2049                 return NXT_ERROR;
2050             }
2051 
2052             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2053                 nxt_conf_get_string(ext_conf, &str);
2054 
2055                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2056                     return NXT_ERROR;
2057                 }
2058 
2059                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2060                                                       &exten, type);
2061                 if (nxt_slow_path(ret != NXT_OK)) {
2062                     return NXT_ERROR;
2063                 }
2064 
2065                 continue;
2066             }
2067 
2068             exts = nxt_conf_array_elements_count(ext_conf);
2069 
2070             for (i = 0; i < exts; i++) {
2071                 value = nxt_conf_get_array_element(ext_conf, i);
2072 
2073                 nxt_conf_get_string(value, &str);
2074 
2075                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2076                     return NXT_ERROR;
2077                 }
2078 
2079                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2080                                                       &exten, type);
2081                 if (nxt_slow_path(ret != NXT_OK)) {
2082                     return NXT_ERROR;
2083                 }
2084             }
2085         }
2086     }
2087 
2088     return NXT_OK;
2089 }
2090 
2091 
2092 static nxt_int_t
2093 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2094     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2095 {
2096     char                        c;
2097     size_t                      i;
2098     nxt_mp_t                    *mp;
2099     uint32_t                    hash;
2100     nxt_str_t                   header;
2101     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2102     nxt_http_client_ip_t        *client_ip;
2103     nxt_http_route_addr_rule_t  *source;
2104 
2105     static nxt_str_t  header_path = nxt_string("/header");
2106     static nxt_str_t  source_path = nxt_string("/source");
2107     static nxt_str_t  recursive_path = nxt_string("/recursive");
2108 
2109     if (conf == NULL) {
2110         skcf->client_ip = NULL;
2111 
2112         return NXT_OK;
2113     }
2114 
2115     mp = tmcf->router_conf->mem_pool;
2116 
2117     source_conf = nxt_conf_get_path(conf, &source_path);
2118     header_conf = nxt_conf_get_path(conf, &header_path);
2119     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2120 
2121     if (source_conf == NULL || header_conf == NULL) {
2122         return NXT_ERROR;
2123     }
2124 
2125     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2126     if (nxt_slow_path(client_ip == NULL)) {
2127         return NXT_ERROR;
2128     }
2129 
2130     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2131     if (nxt_slow_path(source == NULL)) {
2132         return NXT_ERROR;
2133     }
2134 
2135     client_ip->source = source;
2136 
2137     nxt_conf_get_string(header_conf, &header);
2138 
2139     if (recursive_conf != NULL) {
2140         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2141     }
2142 
2143     client_ip->header = nxt_str_dup(mp, NULL, &header);
2144     if (nxt_slow_path(client_ip->header == NULL)) {
2145         return NXT_ERROR;
2146     }
2147 
2148     hash = NXT_HTTP_FIELD_HASH_INIT;
2149 
2150     for (i = 0; i < client_ip->header->length; i++) {
2151         c = client_ip->header->start[i];
2152         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2153     }
2154 
2155     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2156 
2157     client_ip->header_hash = hash;
2158 
2159     skcf->client_ip = client_ip;
2160 
2161     return NXT_OK;
2162 }
2163 
2164 
2165 static nxt_app_t *
2166 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2167 {
2168     nxt_app_t  *app;
2169 
2170     nxt_queue_each(app, queue, nxt_app_t, link) {
2171 
2172         if (nxt_strstr_eq(name, &app->name)) {
2173             return app;
2174         }
2175 
2176     } nxt_queue_loop;
2177 
2178     return NULL;
2179 }
2180 
2181 
2182 static nxt_int_t
2183 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2184 {
2185     void       *mem;
2186     nxt_int_t  fd;
2187 
2188     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2189     if (nxt_slow_path(fd == -1)) {
2190         return NXT_ERROR;
2191     }
2192 
2193     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2194                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2195     if (nxt_slow_path(mem == MAP_FAILED)) {
2196         nxt_fd_close(fd);
2197 
2198         return NXT_ERROR;
2199     }
2200 
2201     nxt_app_queue_init(mem);
2202 
2203     port->queue_fd = fd;
2204     port->queue = mem;
2205 
2206     return NXT_OK;
2207 }
2208 
2209 
2210 static nxt_int_t
2211 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2212 {
2213     void       *mem;
2214     nxt_int_t  fd;
2215 
2216     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2217     if (nxt_slow_path(fd == -1)) {
2218         return NXT_ERROR;
2219     }
2220 
2221     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2222                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2223     if (nxt_slow_path(mem == MAP_FAILED)) {
2224         nxt_fd_close(fd);
2225 
2226         return NXT_ERROR;
2227     }
2228 
2229     nxt_port_queue_init(mem);
2230 
2231     port->queue_fd = fd;
2232     port->queue = mem;
2233 
2234     return NXT_OK;
2235 }
2236 
2237 
2238 static nxt_int_t
2239 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2240 {
2241     void  *mem;
2242 
2243     nxt_assert(fd != -1);
2244 
2245     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2246                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2247     if (nxt_slow_path(mem == MAP_FAILED)) {
2248 
2249         return NXT_ERROR;
2250     }
2251 
2252     port->queue = mem;
2253 
2254     return NXT_OK;
2255 }
2256 
2257 
2258 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2259     NXT_LVLHSH_DEFAULT,
2260     nxt_router_apps_hash_test,
2261     nxt_mp_lvlhsh_alloc,
2262     nxt_mp_lvlhsh_free,
2263 };
2264 
2265 
2266 static nxt_int_t
2267 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2268 {
2269     nxt_app_t  *app;
2270 
2271     app = data;
2272 
2273     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2274 }
2275 
2276 
2277 static nxt_int_t
2278 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2279 {
2280     nxt_lvlhsh_query_t  lhq;
2281 
2282     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2283     lhq.replace = 0;
2284     lhq.key = app->name;
2285     lhq.value = app;
2286     lhq.proto = &nxt_router_apps_hash_proto;
2287     lhq.pool = rtcf->mem_pool;
2288 
2289     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2290 
2291     case NXT_OK:
2292         return NXT_OK;
2293 
2294     case NXT_DECLINED:
2295         nxt_thread_log_alert("router app hash adding failed: "
2296                              "\"%V\" is already in hash", &lhq.key);
2297         /* Fall through. */
2298     default:
2299         return NXT_ERROR;
2300     }
2301 }
2302 
2303 
2304 static nxt_app_t *
2305 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2306 {
2307     nxt_lvlhsh_query_t  lhq;
2308 
2309     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2310     lhq.key = *name;
2311     lhq.proto = &nxt_router_apps_hash_proto;
2312 
2313     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2314         return NULL;
2315     }
2316 
2317     return lhq.value;
2318 }
2319 
2320 
2321 static void
2322 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2323 {
2324     nxt_app_t          *app;
2325     nxt_lvlhsh_each_t  lhe;
2326 
2327     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2328 
2329     for ( ;; ) {
2330         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2331 
2332         if (app == NULL) {
2333             break;
2334         }
2335 
2336         nxt_router_app_use(task, app, i);
2337     }
2338 }
2339 
2340 
2341 typedef struct {
2342     nxt_app_t  *app;
2343     nxt_int_t  target;
2344 } nxt_http_app_conf_t;
2345 
2346 
2347 nxt_int_t
2348 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2349     nxt_str_t *target, nxt_http_action_t *action)
2350 {
2351     nxt_app_t            *app;
2352     nxt_str_t            *targets;
2353     nxt_uint_t           i;
2354     nxt_http_app_conf_t  *conf;
2355 
2356     app = nxt_router_apps_hash_get(rtcf, name);
2357     if (app == NULL) {
2358         return NXT_DECLINED;
2359     }
2360 
2361     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2362     if (nxt_slow_path(conf == NULL)) {
2363         return NXT_ERROR;
2364     }
2365 
2366     action->handler = nxt_http_application_handler;
2367     action->u.conf = conf;
2368 
2369     conf->app = app;
2370 
2371     if (target != NULL && target->length != 0) {
2372         targets = app->targets;
2373 
2374         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2375 
2376         conf->target = i;
2377 
2378     } else {
2379         conf->target = 0;
2380     }
2381 
2382     return NXT_OK;
2383 }
2384 
2385 
2386 static nxt_socket_conf_t *
2387 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2388     nxt_str_t *name)
2389 {
2390     size_t               size;
2391     nxt_int_t            ret;
2392     nxt_bool_t           wildcard;
2393     nxt_sockaddr_t       *sa;
2394     nxt_socket_conf_t    *skcf;
2395     nxt_listen_socket_t  *ls;
2396 
2397     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2398     if (nxt_slow_path(sa == NULL)) {
2399         nxt_alert(task, "invalid listener \"%V\"", name);
2400         return NULL;
2401     }
2402 
2403     sa->type = SOCK_STREAM;
2404 
2405     nxt_debug(task, "router listener: \"%*s\"",
2406               (size_t) sa->length, nxt_sockaddr_start(sa));
2407 
2408     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2409     if (nxt_slow_path(skcf == NULL)) {
2410         return NULL;
2411     }
2412 
2413     size = nxt_sockaddr_size(sa);
2414 
2415     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2416 
2417     if (ret != NXT_OK) {
2418 
2419         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2420         if (nxt_slow_path(ls == NULL)) {
2421             return NULL;
2422         }
2423 
2424         skcf->listen = ls;
2425 
2426         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2427         nxt_memcpy(ls->sockaddr, sa, size);
2428 
2429         nxt_listen_socket_remote_size(ls);
2430 
2431         ls->socket = -1;
2432         ls->backlog = NXT_LISTEN_BACKLOG;
2433         ls->flags = NXT_NONBLOCK;
2434         ls->read_after_accept = 1;
2435     }
2436 
2437     switch (sa->u.sockaddr.sa_family) {
2438 #if (NXT_HAVE_UNIX_DOMAIN)
2439     case AF_UNIX:
2440         wildcard = 0;
2441         break;
2442 #endif
2443 #if (NXT_INET6)
2444     case AF_INET6:
2445         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2446         break;
2447 #endif
2448     case AF_INET:
2449     default:
2450         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2451         break;
2452     }
2453 
2454     if (!wildcard) {
2455         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2456         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2457             return NULL;
2458         }
2459 
2460         nxt_memcpy(skcf->sockaddr, sa, size);
2461     }
2462 
2463     return skcf;
2464 }
2465 
2466 
2467 static nxt_int_t
2468 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2469     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2470 {
2471     nxt_router_t       *router;
2472     nxt_queue_link_t   *qlk;
2473     nxt_socket_conf_t  *skcf;
2474 
2475     router = tmcf->router_conf->router;
2476 
2477     for (qlk = nxt_queue_first(&router->sockets);
2478          qlk != nxt_queue_tail(&router->sockets);
2479          qlk = nxt_queue_next(qlk))
2480     {
2481         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2482 
2483         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2484             nskcf->listen = skcf->listen;
2485 
2486             nxt_queue_remove(qlk);
2487             nxt_queue_insert_tail(&keeping_sockets, qlk);
2488 
2489             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2490 
2491             return NXT_OK;
2492         }
2493     }
2494 
2495     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2496 
2497     return NXT_DECLINED;
2498 }
2499 
2500 
2501 static void
2502 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2503     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2504 {
2505     size_t            size;
2506     uint32_t          stream;
2507     nxt_int_t         ret;
2508     nxt_buf_t         *b;
2509     nxt_port_t        *main_port, *router_port;
2510     nxt_runtime_t     *rt;
2511     nxt_socket_rpc_t  *rpc;
2512 
2513     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2514     if (rpc == NULL) {
2515         goto fail;
2516     }
2517 
2518     rpc->socket_conf = skcf;
2519     rpc->temp_conf = tmcf;
2520 
2521     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2522 
2523     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2524     if (b == NULL) {
2525         goto fail;
2526     }
2527 
2528     b->completion_handler = nxt_buf_dummy_completion;
2529 
2530     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2531 
2532     rt = task->thread->runtime;
2533     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2534     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2535 
2536     stream = nxt_port_rpc_register_handler(task, router_port,
2537                                            nxt_router_listen_socket_ready,
2538                                            nxt_router_listen_socket_error,
2539                                            main_port->pid, rpc);
2540     if (nxt_slow_path(stream == 0)) {
2541         goto fail;
2542     }
2543 
2544     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2545                                 stream, router_port->id, b);
2546 
2547     if (nxt_slow_path(ret != NXT_OK)) {
2548         nxt_port_rpc_cancel(task, router_port, stream);
2549         goto fail;
2550     }
2551 
2552     return;
2553 
2554 fail:
2555 
2556     nxt_router_conf_error(task, tmcf);
2557 }
2558 
2559 
2560 static void
2561 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2562     void *data)
2563 {
2564     nxt_int_t         ret;
2565     nxt_socket_t      s;
2566     nxt_socket_rpc_t  *rpc;
2567 
2568     rpc = data;
2569 
2570     s = msg->fd[0];
2571 
2572     ret = nxt_socket_nonblocking(task, s);
2573     if (nxt_slow_path(ret != NXT_OK)) {
2574         goto fail;
2575     }
2576 
2577     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2578 
2579     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2580     if (nxt_slow_path(ret != NXT_OK)) {
2581         goto fail;
2582     }
2583 
2584     rpc->socket_conf->listen->socket = s;
2585 
2586     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2587                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2588 
2589     return;
2590 
2591 fail:
2592 
2593     nxt_socket_close(task, s);
2594 
2595     nxt_router_conf_error(task, rpc->temp_conf);
2596 }
2597 
2598 
2599 static void
2600 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2601     void *data)
2602 {
2603     nxt_socket_rpc_t        *rpc;
2604     nxt_router_temp_conf_t  *tmcf;
2605 
2606     rpc = data;
2607     tmcf = rpc->temp_conf;
2608 
2609 #if 0
2610     u_char                  *p;
2611     size_t                  size;
2612     uint8_t                 error;
2613     nxt_buf_t               *in, *out;
2614     nxt_sockaddr_t          *sa;
2615 
2616     static nxt_str_t  socket_errors[] = {
2617         nxt_string("ListenerSystem"),
2618         nxt_string("ListenerNoIPv6"),
2619         nxt_string("ListenerPort"),
2620         nxt_string("ListenerInUse"),
2621         nxt_string("ListenerNoAddress"),
2622         nxt_string("ListenerNoAccess"),
2623         nxt_string("ListenerPath"),
2624     };
2625 
2626     sa = rpc->socket_conf->listen->sockaddr;
2627 
2628     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2629 
2630     if (nxt_slow_path(in == NULL)) {
2631         return;
2632     }
2633 
2634     p = in->mem.pos;
2635 
2636     error = *p++;
2637 
2638     size = nxt_length("listen socket error: ")
2639            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2640            + sa->length + socket_errors[error].length + (in->mem.free - p);
2641 
2642     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2643     if (nxt_slow_path(out == NULL)) {
2644         return;
2645     }
2646 
2647     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2648                         "listen socket error: "
2649                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2650                         (size_t) sa->length, nxt_sockaddr_start(sa),
2651                         &socket_errors[error], in->mem.free - p, p);
2652 
2653     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2654 #endif
2655 
2656     nxt_router_conf_error(task, tmcf);
2657 }
2658 
2659 
2660 #if (NXT_TLS)
2661 
2662 static void
2663 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2664     void *data)
2665 {
2666     nxt_mp_t                *mp;
2667     nxt_int_t               ret;
2668     nxt_tls_conf_t          *tlscf;
2669     nxt_router_tlssock_t    *tls;
2670     nxt_tls_bundle_conf_t   *bundle;
2671     nxt_router_temp_conf_t  *tmcf;
2672 
2673     nxt_debug(task, "tls rpc handler");
2674 
2675     tls = data;
2676     tmcf = tls->temp_conf;
2677 
2678     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2679         goto fail;
2680     }
2681 
2682     mp = tmcf->router_conf->mem_pool;
2683 
2684     if (tls->socket_conf->tls == NULL){
2685         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2686         if (nxt_slow_path(tlscf == NULL)) {
2687             goto fail;
2688         }
2689 
2690         tlscf->no_wait_shutdown = 1;
2691         tls->socket_conf->tls = tlscf;
2692 
2693     } else {
2694         tlscf = tls->socket_conf->tls;
2695     }
2696 
2697     tls->tls_init->conf = tlscf;
2698 
2699     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2700     if (nxt_slow_path(bundle == NULL)) {
2701         goto fail;
2702     }
2703 
2704     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2705         goto fail;
2706     }
2707 
2708     bundle->chain_file = msg->fd[0];
2709     bundle->next = tlscf->bundle;
2710     tlscf->bundle = bundle;
2711 
2712     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2713                                                   tls->last);
2714     if (nxt_slow_path(ret != NXT_OK)) {
2715         goto fail;
2716     }
2717 
2718     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2719                        nxt_router_conf_apply, task, tmcf, NULL);
2720     return;
2721 
2722 fail:
2723 
2724     nxt_router_conf_error(task, tmcf);
2725 }
2726 
2727 #endif
2728 
2729 
2730 static void
2731 nxt_router_app_rpc_create(nxt_task_t *task,
2732     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2733 {
2734     size_t         size;
2735     uint32_t       stream;
2736     nxt_int_t      ret;
2737     nxt_buf_t      *b;
2738     nxt_port_t     *main_port, *router_port;
2739     nxt_runtime_t  *rt;
2740     nxt_app_rpc_t  *rpc;
2741 
2742     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2743     if (rpc == NULL) {
2744         goto fail;
2745     }
2746 
2747     rpc->app = app;
2748     rpc->temp_conf = tmcf;
2749 
2750     nxt_debug(task, "app '%V' prefork", &app->name);
2751 
2752     size = app->name.length + 1 + app->conf.length;
2753 
2754     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2755     if (nxt_slow_path(b == NULL)) {
2756         goto fail;
2757     }
2758 
2759     b->completion_handler = nxt_buf_dummy_completion;
2760 
2761     nxt_buf_cpystr(b, &app->name);
2762     *b->mem.free++ = '\0';
2763     nxt_buf_cpystr(b, &app->conf);
2764 
2765     rt = task->thread->runtime;
2766     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2767     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2768 
2769     stream = nxt_port_rpc_register_handler(task, router_port,
2770                                            nxt_router_app_prefork_ready,
2771                                            nxt_router_app_prefork_error,
2772                                            -1, rpc);
2773     if (nxt_slow_path(stream == 0)) {
2774         goto fail;
2775     }
2776 
2777     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2778                                 -1, stream, router_port->id, b);
2779 
2780     if (nxt_slow_path(ret != NXT_OK)) {
2781         nxt_port_rpc_cancel(task, router_port, stream);
2782         goto fail;
2783     }
2784 
2785     app->pending_processes++;
2786 
2787     return;
2788 
2789 fail:
2790 
2791     nxt_router_conf_error(task, tmcf);
2792 }
2793 
2794 
2795 static void
2796 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2797     void *data)
2798 {
2799     nxt_app_t           *app;
2800     nxt_port_t          *port;
2801     nxt_app_rpc_t       *rpc;
2802     nxt_event_engine_t  *engine;
2803 
2804     rpc = data;
2805     app = rpc->app;
2806 
2807     port = msg->u.new_port;
2808 
2809     nxt_assert(port != NULL);
2810     nxt_assert(port->type == NXT_PROCESS_APP);
2811     nxt_assert(port->id == 0);
2812 
2813     port->app = app;
2814     port->main_app_port = port;
2815 
2816     app->pending_processes--;
2817     app->processes++;
2818     app->idle_processes++;
2819 
2820     engine = task->thread->engine;
2821 
2822     nxt_queue_insert_tail(&app->ports, &port->app_link);
2823     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2824 
2825     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2826               &app->name, port->pid, port->id);
2827 
2828     nxt_port_hash_add(&app->port_hash, port);
2829     app->port_hash_count++;
2830 
2831     port->idle_start = 0;
2832 
2833     nxt_port_inc_use(port);
2834 
2835     nxt_router_app_shared_port_send(task, port);
2836 
2837     nxt_work_queue_add(&engine->fast_work_queue,
2838                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2839 }
2840 
2841 
2842 static void
2843 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2844     void *data)
2845 {
2846     nxt_app_t               *app;
2847     nxt_app_rpc_t           *rpc;
2848     nxt_router_temp_conf_t  *tmcf;
2849 
2850     rpc = data;
2851     app = rpc->app;
2852     tmcf = rpc->temp_conf;
2853 
2854     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2855             &app->name);
2856 
2857     app->pending_processes--;
2858 
2859     nxt_router_conf_error(task, tmcf);
2860 }
2861 
2862 
2863 static nxt_int_t
2864 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2865     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2866 {
2867     nxt_int_t                 ret;
2868     nxt_uint_t                n, threads;
2869     nxt_queue_link_t          *qlk;
2870     nxt_router_engine_conf_t  *recf;
2871 
2872     threads = tmcf->router_conf->threads;
2873 
2874     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2875                                      sizeof(nxt_router_engine_conf_t));
2876     if (nxt_slow_path(tmcf->engines == NULL)) {
2877         return NXT_ERROR;
2878     }
2879 
2880     n = 0;
2881 
2882     for (qlk = nxt_queue_first(&router->engines);
2883          qlk != nxt_queue_tail(&router->engines);
2884          qlk = nxt_queue_next(qlk))
2885     {
2886         recf = nxt_array_zero_add(tmcf->engines);
2887         if (nxt_slow_path(recf == NULL)) {
2888             return NXT_ERROR;
2889         }
2890 
2891         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2892 
2893         if (n < threads) {
2894             recf->action = NXT_ROUTER_ENGINE_KEEP;
2895             ret = nxt_router_engine_conf_update(tmcf, recf);
2896 
2897         } else {
2898             recf->action = NXT_ROUTER_ENGINE_DELETE;
2899             ret = nxt_router_engine_conf_delete(tmcf, recf);
2900         }
2901 
2902         if (nxt_slow_path(ret != NXT_OK)) {
2903             return ret;
2904         }
2905 
2906         n++;
2907     }
2908 
2909     tmcf->new_threads = n;
2910 
2911     while (n < threads) {
2912         recf = nxt_array_zero_add(tmcf->engines);
2913         if (nxt_slow_path(recf == NULL)) {
2914             return NXT_ERROR;
2915         }
2916 
2917         recf->action = NXT_ROUTER_ENGINE_ADD;
2918 
2919         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2920         if (nxt_slow_path(recf->engine == NULL)) {
2921             return NXT_ERROR;
2922         }
2923 
2924         ret = nxt_router_engine_conf_create(tmcf, recf);
2925         if (nxt_slow_path(ret != NXT_OK)) {
2926             return ret;
2927         }
2928 
2929         n++;
2930     }
2931 
2932     return NXT_OK;
2933 }
2934 
2935 
2936 static nxt_int_t
2937 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2938     nxt_router_engine_conf_t *recf)
2939 {
2940     nxt_int_t  ret;
2941 
2942     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2943                                           nxt_router_listen_socket_create);
2944     if (nxt_slow_path(ret != NXT_OK)) {
2945         return ret;
2946     }
2947 
2948     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2949                                           nxt_router_listen_socket_create);
2950     if (nxt_slow_path(ret != NXT_OK)) {
2951         return ret;
2952     }
2953 
2954     return ret;
2955 }
2956 
2957 
2958 static nxt_int_t
2959 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2960     nxt_router_engine_conf_t *recf)
2961 {
2962     nxt_int_t  ret;
2963 
2964     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2965                                           nxt_router_listen_socket_create);
2966     if (nxt_slow_path(ret != NXT_OK)) {
2967         return ret;
2968     }
2969 
2970     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2971                                           nxt_router_listen_socket_update);
2972     if (nxt_slow_path(ret != NXT_OK)) {
2973         return ret;
2974     }
2975 
2976     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2977     if (nxt_slow_path(ret != NXT_OK)) {
2978         return ret;
2979     }
2980 
2981     return ret;
2982 }
2983 
2984 
2985 static nxt_int_t
2986 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2987     nxt_router_engine_conf_t *recf)
2988 {
2989     nxt_int_t  ret;
2990 
2991     ret = nxt_router_engine_quit(tmcf, recf);
2992     if (nxt_slow_path(ret != NXT_OK)) {
2993         return ret;
2994     }
2995 
2996     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2997     if (nxt_slow_path(ret != NXT_OK)) {
2998         return ret;
2999     }
3000 
3001     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3002 }
3003 
3004 
3005 static nxt_int_t
3006 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3007     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3008     nxt_work_handler_t handler)
3009 {
3010     nxt_int_t                ret;
3011     nxt_joint_job_t          *job;
3012     nxt_queue_link_t         *qlk;
3013     nxt_socket_conf_t        *skcf;
3014     nxt_socket_conf_joint_t  *joint;
3015 
3016     for (qlk = nxt_queue_first(sockets);
3017          qlk != nxt_queue_tail(sockets);
3018          qlk = nxt_queue_next(qlk))
3019     {
3020         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3021         if (nxt_slow_path(job == NULL)) {
3022             return NXT_ERROR;
3023         }
3024 
3025         job->work.next = recf->jobs;
3026         recf->jobs = &job->work;
3027 
3028         job->task = tmcf->engine->task;
3029         job->work.handler = handler;
3030         job->work.task = &job->task;
3031         job->work.obj = job;
3032         job->tmcf = tmcf;
3033 
3034         tmcf->count++;
3035 
3036         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3037                              sizeof(nxt_socket_conf_joint_t));
3038         if (nxt_slow_path(joint == NULL)) {
3039             return NXT_ERROR;
3040         }
3041 
3042         job->work.data = joint;
3043 
3044         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3045         if (nxt_slow_path(ret != NXT_OK)) {
3046             return ret;
3047         }
3048 
3049         joint->count = 1;
3050 
3051         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3052         skcf->count++;
3053         joint->socket_conf = skcf;
3054 
3055         joint->engine = recf->engine;
3056     }
3057 
3058     return NXT_OK;
3059 }
3060 
3061 
3062 static nxt_int_t
3063 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3064     nxt_router_engine_conf_t *recf)
3065 {
3066     nxt_joint_job_t  *job;
3067 
3068     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3069     if (nxt_slow_path(job == NULL)) {
3070         return NXT_ERROR;
3071     }
3072 
3073     job->work.next = recf->jobs;
3074     recf->jobs = &job->work;
3075 
3076     job->task = tmcf->engine->task;
3077     job->work.handler = nxt_router_worker_thread_quit;
3078     job->work.task = &job->task;
3079     job->work.obj = NULL;
3080     job->work.data = NULL;
3081     job->tmcf = NULL;
3082 
3083     return NXT_OK;
3084 }
3085 
3086 
3087 static nxt_int_t
3088 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3089     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3090 {
3091     nxt_joint_job_t   *job;
3092     nxt_queue_link_t  *qlk;
3093 
3094     for (qlk = nxt_queue_first(sockets);
3095          qlk != nxt_queue_tail(sockets);
3096          qlk = nxt_queue_next(qlk))
3097     {
3098         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3099         if (nxt_slow_path(job == NULL)) {
3100             return NXT_ERROR;
3101         }
3102 
3103         job->work.next = recf->jobs;
3104         recf->jobs = &job->work;
3105 
3106         job->task = tmcf->engine->task;
3107         job->work.handler = nxt_router_listen_socket_delete;
3108         job->work.task = &job->task;
3109         job->work.obj = job;
3110         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3111         job->tmcf = tmcf;
3112 
3113         tmcf->count++;
3114     }
3115 
3116     return NXT_OK;
3117 }
3118 
3119 
3120 static nxt_int_t
3121 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3122     nxt_router_temp_conf_t *tmcf)
3123 {
3124     nxt_int_t                 ret;
3125     nxt_uint_t                i, threads;
3126     nxt_router_engine_conf_t  *recf;
3127 
3128     recf = tmcf->engines->elts;
3129     threads = tmcf->router_conf->threads;
3130 
3131     for (i = tmcf->new_threads; i < threads; i++) {
3132         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3133         if (nxt_slow_path(ret != NXT_OK)) {
3134             return ret;
3135         }
3136     }
3137 
3138     return NXT_OK;
3139 }
3140 
3141 
3142 static nxt_int_t
3143 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3144     nxt_event_engine_t *engine)
3145 {
3146     nxt_int_t            ret;
3147     nxt_thread_link_t    *link;
3148     nxt_thread_handle_t  handle;
3149 
3150     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3151 
3152     if (nxt_slow_path(link == NULL)) {
3153         return NXT_ERROR;
3154     }
3155 
3156     link->start = nxt_router_thread_start;
3157     link->engine = engine;
3158     link->work.handler = nxt_router_thread_exit_handler;
3159     link->work.task = task;
3160     link->work.data = link;
3161 
3162     nxt_queue_insert_tail(&rt->engines, &engine->link);
3163 
3164     ret = nxt_thread_create(&handle, link);
3165 
3166     if (nxt_slow_path(ret != NXT_OK)) {
3167         nxt_queue_remove(&engine->link);
3168     }
3169 
3170     return ret;
3171 }
3172 
3173 
3174 static void
3175 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3176     nxt_router_temp_conf_t *tmcf)
3177 {
3178     nxt_app_t  *app;
3179 
3180     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3181 
3182         nxt_router_app_unlink(task, app);
3183 
3184     } nxt_queue_loop;
3185 
3186     nxt_queue_add(&router->apps, &tmcf->previous);
3187     nxt_queue_add(&router->apps, &tmcf->apps);
3188 }
3189 
3190 
3191 static void
3192 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3193 {
3194     nxt_uint_t                n;
3195     nxt_event_engine_t        *engine;
3196     nxt_router_engine_conf_t  *recf;
3197 
3198     recf = tmcf->engines->elts;
3199 
3200     for (n = tmcf->engines->nelts; n != 0; n--) {
3201         engine = recf->engine;
3202 
3203         switch (recf->action) {
3204 
3205         case NXT_ROUTER_ENGINE_KEEP:
3206             break;
3207 
3208         case NXT_ROUTER_ENGINE_ADD:
3209             nxt_queue_insert_tail(&router->engines, &engine->link0);
3210             break;
3211 
3212         case NXT_ROUTER_ENGINE_DELETE:
3213             nxt_queue_remove(&engine->link0);
3214             break;
3215         }
3216 
3217         nxt_router_engine_post(engine, recf->jobs);
3218 
3219         recf++;
3220     }
3221 }
3222 
3223 
3224 static void
3225 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3226 {
3227     nxt_work_t  *work, *next;
3228 
3229     for (work = jobs; work != NULL; work = next) {
3230         next = work->next;
3231         work->next = NULL;
3232 
3233         nxt_event_engine_post(engine, work);
3234     }
3235 }
3236 
3237 
3238 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3239     .rpc_error       = nxt_port_rpc_handler,
3240     .mmap            = nxt_port_mmap_handler,
3241     .data            = nxt_port_rpc_handler,
3242     .oosm            = nxt_router_oosm_handler,
3243     .req_headers_ack = nxt_port_rpc_handler,
3244 };
3245 
3246 
3247 static void
3248 nxt_router_thread_start(void *data)
3249 {
3250     nxt_int_t           ret;
3251     nxt_port_t          *port;
3252     nxt_task_t          *task;
3253     nxt_work_t          *work;
3254     nxt_thread_t        *thread;
3255     nxt_thread_link_t   *link;
3256     nxt_event_engine_t  *engine;
3257 
3258     link = data;
3259     engine = link->engine;
3260     task = &engine->task;
3261 
3262     thread = nxt_thread();
3263 
3264     nxt_event_engine_thread_adopt(engine);
3265 
3266     /* STUB */
3267     thread->runtime = engine->task.thread->runtime;
3268 
3269     engine->task.thread = thread;
3270     engine->task.log = thread->log;
3271     thread->engine = engine;
3272     thread->task = &engine->task;
3273 #if 0
3274     thread->fiber = &engine->fibers->fiber;
3275 #endif
3276 
3277     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3278     if (nxt_slow_path(engine->mem_pool == NULL)) {
3279         return;
3280     }
3281 
3282     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3283                         NXT_PROCESS_ROUTER);
3284     if (nxt_slow_path(port == NULL)) {
3285         return;
3286     }
3287 
3288     ret = nxt_port_socket_init(task, port, 0);
3289     if (nxt_slow_path(ret != NXT_OK)) {
3290         nxt_port_use(task, port, -1);
3291         return;
3292     }
3293 
3294     ret = nxt_router_port_queue_init(task, port);
3295     if (nxt_slow_path(ret != NXT_OK)) {
3296         nxt_port_use(task, port, -1);
3297         return;
3298     }
3299 
3300     engine->port = port;
3301 
3302     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3303 
3304     work = nxt_zalloc(sizeof(nxt_work_t));
3305     if (nxt_slow_path(work == NULL)) {
3306         return;
3307     }
3308 
3309     work->handler = nxt_router_rt_add_port;
3310     work->task = link->work.task;
3311     work->obj = work;
3312     work->data = port;
3313 
3314     nxt_event_engine_post(link->work.task->thread->engine, work);
3315 
3316     nxt_event_engine_start(engine);
3317 }
3318 
3319 
3320 static void
3321 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3322 {
3323     nxt_int_t      res;
3324     nxt_port_t     *port;
3325     nxt_runtime_t  *rt;
3326 
3327     rt = task->thread->runtime;
3328     port = data;
3329 
3330     nxt_free(obj);
3331 
3332     res = nxt_port_hash_add(&rt->ports, port);
3333 
3334     if (nxt_fast_path(res == NXT_OK)) {
3335         nxt_port_use(task, port, 1);
3336     }
3337 }
3338 
3339 
3340 static void
3341 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3342 {
3343     nxt_joint_job_t          *job;
3344     nxt_socket_conf_t        *skcf;
3345     nxt_listen_event_t       *lev;
3346     nxt_listen_socket_t      *ls;
3347     nxt_thread_spinlock_t    *lock;
3348     nxt_socket_conf_joint_t  *joint;
3349 
3350     job = obj;
3351     joint = data;
3352 
3353     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3354 
3355     skcf = joint->socket_conf;
3356     ls = skcf->listen;
3357 
3358     lev = nxt_listen_event(task, ls);
3359     if (nxt_slow_path(lev == NULL)) {
3360         nxt_router_listen_socket_release(task, skcf);
3361         return;
3362     }
3363 
3364     lev->socket.data = joint;
3365 
3366     lock = &skcf->router_conf->router->lock;
3367 
3368     nxt_thread_spin_lock(lock);
3369     ls->count++;
3370     nxt_thread_spin_unlock(lock);
3371 
3372     job->work.next = NULL;
3373     job->work.handler = nxt_router_conf_wait;
3374 
3375     nxt_event_engine_post(job->tmcf->engine, &job->work);
3376 }
3377 
3378 
3379 nxt_inline nxt_listen_event_t *
3380 nxt_router_listen_event(nxt_queue_t *listen_connections,
3381     nxt_socket_conf_t *skcf)
3382 {
3383     nxt_socket_t        fd;
3384     nxt_queue_link_t    *qlk;
3385     nxt_listen_event_t  *lev;
3386 
3387     fd = skcf->listen->socket;
3388 
3389     for (qlk = nxt_queue_first(listen_connections);
3390          qlk != nxt_queue_tail(listen_connections);
3391          qlk = nxt_queue_next(qlk))
3392     {
3393         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3394 
3395         if (fd == lev->socket.fd) {
3396             return lev;
3397         }
3398     }
3399 
3400     return NULL;
3401 }
3402 
3403 
3404 static void
3405 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3406 {
3407     nxt_joint_job_t          *job;
3408     nxt_event_engine_t       *engine;
3409     nxt_listen_event_t       *lev;
3410     nxt_socket_conf_joint_t  *joint, *old;
3411 
3412     job = obj;
3413     joint = data;
3414 
3415     engine = task->thread->engine;
3416 
3417     nxt_queue_insert_tail(&engine->joints, &joint->link);
3418 
3419     lev = nxt_router_listen_event(&engine->listen_connections,
3420                                   joint->socket_conf);
3421 
3422     old = lev->socket.data;
3423     lev->socket.data = joint;
3424     lev->listen = joint->socket_conf->listen;
3425 
3426     job->work.next = NULL;
3427     job->work.handler = nxt_router_conf_wait;
3428 
3429     nxt_event_engine_post(job->tmcf->engine, &job->work);
3430 
3431     /*
3432      * The task is allocated from configuration temporary
3433      * memory pool so it can be freed after engine post operation.
3434      */
3435 
3436     nxt_router_conf_release(&engine->task, old);
3437 }
3438 
3439 
3440 static void
3441 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3442 {
3443     nxt_socket_conf_t        *skcf;
3444     nxt_listen_event_t       *lev;
3445     nxt_event_engine_t       *engine;
3446     nxt_socket_conf_joint_t  *joint;
3447 
3448     skcf = data;
3449 
3450     engine = task->thread->engine;
3451 
3452     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3453 
3454     nxt_fd_event_delete(engine, &lev->socket);
3455 
3456     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3457               lev->socket.fd);
3458 
3459     joint = lev->socket.data;
3460     joint->close_job = obj;
3461 
3462     lev->timer.handler = nxt_router_listen_socket_close;
3463     lev->timer.work_queue = &engine->fast_work_queue;
3464 
3465     nxt_timer_add(engine, &lev->timer, 0);
3466 }
3467 
3468 
3469 static void
3470 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3471 {
3472     nxt_event_engine_t  *engine;
3473 
3474     nxt_debug(task, "router worker thread quit");
3475 
3476     engine = task->thread->engine;
3477 
3478     engine->shutdown = 1;
3479 
3480     if (nxt_queue_is_empty(&engine->joints)) {
3481         nxt_thread_exit(task->thread);
3482     }
3483 }
3484 
3485 
3486 static void
3487 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3488 {
3489     nxt_timer_t              *timer;
3490     nxt_joint_job_t          *job;
3491     nxt_listen_event_t       *lev;
3492     nxt_socket_conf_joint_t  *joint;
3493 
3494     timer = obj;
3495     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3496 
3497     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3498               lev->socket.fd);
3499 
3500     nxt_queue_remove(&lev->link);
3501 
3502     joint = lev->socket.data;
3503     lev->socket.data = NULL;
3504 
3505     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3506     task = &task->thread->engine->task;
3507 
3508     nxt_router_listen_socket_release(task, joint->socket_conf);
3509 
3510     job = joint->close_job;
3511     job->work.next = NULL;
3512     job->work.handler = nxt_router_conf_wait;
3513 
3514     nxt_event_engine_post(job->tmcf->engine, &job->work);
3515 
3516     nxt_router_listen_event_release(task, lev, joint);
3517 }
3518 
3519 
3520 static void
3521 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3522 {
3523     nxt_listen_socket_t    *ls;
3524     nxt_thread_spinlock_t  *lock;
3525 
3526     ls = skcf->listen;
3527     lock = &skcf->router_conf->router->lock;
3528 
3529     nxt_thread_spin_lock(lock);
3530 
3531     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3532               task->thread->engine, ls->count);
3533 
3534     if (--ls->count != 0) {
3535         ls = NULL;
3536     }
3537 
3538     nxt_thread_spin_unlock(lock);
3539 
3540     if (ls != NULL) {
3541         nxt_socket_close(task, ls->socket);
3542         nxt_free(ls);
3543     }
3544 }
3545 
3546 
3547 void
3548 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3549     nxt_socket_conf_joint_t *joint)
3550 {
3551     nxt_event_engine_t  *engine;
3552 
3553     nxt_debug(task, "listen event count: %D", lev->count);
3554 
3555     engine = task->thread->engine;
3556 
3557     if (--lev->count == 0) {
3558         if (lev->next != NULL) {
3559             nxt_sockaddr_cache_free(engine, lev->next);
3560 
3561             nxt_conn_free(task, lev->next);
3562         }
3563 
3564         nxt_free(lev);
3565     }
3566 
3567     if (joint != NULL) {
3568         nxt_router_conf_release(task, joint);
3569     }
3570 
3571     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3572         nxt_thread_exit(task->thread);
3573     }
3574 }
3575 
3576 
3577 void
3578 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3579 {
3580     nxt_socket_conf_t      *skcf;
3581     nxt_router_conf_t      *rtcf;
3582     nxt_thread_spinlock_t  *lock;
3583 
3584     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3585 
3586     if (--joint->count != 0) {
3587         return;
3588     }
3589 
3590     nxt_queue_remove(&joint->link);
3591 
3592     /*
3593      * The joint content can not be safely used after the critical
3594      * section protected by the spinlock because its memory pool may
3595      * be already destroyed by another thread.
3596      */
3597     skcf = joint->socket_conf;
3598     rtcf = skcf->router_conf;
3599     lock = &rtcf->router->lock;
3600 
3601     nxt_thread_spin_lock(lock);
3602 
3603     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3604               rtcf, rtcf->count);
3605 
3606     if (--skcf->count != 0) {
3607         skcf = NULL;
3608         rtcf = NULL;
3609 
3610     } else {
3611         nxt_queue_remove(&skcf->link);
3612 
3613         if (--rtcf->count != 0) {
3614             rtcf = NULL;
3615         }
3616     }
3617 
3618     nxt_thread_spin_unlock(lock);
3619 
3620 #if (NXT_TLS)
3621     if (skcf != NULL && skcf->tls != NULL) {
3622         task->thread->runtime->tls->server_free(task, skcf->tls);
3623     }
3624 #endif
3625 
3626     /* TODO remove engine->port */
3627 
3628     if (rtcf != NULL) {
3629         nxt_debug(task, "old router conf is destroyed");
3630 
3631         nxt_router_apps_hash_use(task, rtcf, -1);
3632 
3633         nxt_router_access_log_release(task, lock, rtcf->access_log);
3634 
3635         nxt_mp_thread_adopt(rtcf->mem_pool);
3636 
3637         nxt_mp_destroy(rtcf->mem_pool);
3638     }
3639 }
3640 
3641 
3642 static void
3643 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3644     nxt_router_access_log_t *access_log)
3645 {
3646     size_t     size;
3647     u_char     *buf, *p;
3648     nxt_off_t  bytes;
3649 
3650     static nxt_time_string_t  date_cache = {
3651         (nxt_atomic_uint_t) -1,
3652         nxt_router_access_log_date,
3653         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3654         nxt_length("31/Dec/1986:19:40:00 +0300"),
3655         NXT_THREAD_TIME_LOCAL,
3656         NXT_THREAD_TIME_SEC,
3657     };
3658 
3659     size = r->remote->address_length
3660            + 6                  /* ' - - [' */
3661            + date_cache.size
3662            + 3                  /* '] "' */
3663            + r->method->length
3664            + 1                  /* space */
3665            + r->target.length
3666            + 1                  /* space */
3667            + r->version.length
3668            + 2                  /* '" ' */
3669            + 3                  /* status */
3670            + 1                  /* space */
3671            + NXT_OFF_T_LEN
3672            + 2                  /* ' "' */
3673            + (r->referer != NULL ? r->referer->value_length : 1)
3674            + 3                  /* '" "' */
3675            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3676            + 2                  /* '"\n' */
3677     ;
3678 
3679     buf = nxt_mp_nget(r->mem_pool, size);
3680     if (nxt_slow_path(buf == NULL)) {
3681         return;
3682     }
3683 
3684     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3685                    r->remote->address_length);
3686 
3687     p = nxt_cpymem(p, " - - [", 6);
3688 
3689     p = nxt_thread_time_string(task->thread, &date_cache, p);
3690 
3691     p = nxt_cpymem(p, "] \"", 3);
3692 
3693     if (r->method->length != 0) {
3694         p = nxt_cpymem(p, r->method->start, r->method->length);
3695 
3696         if (r->target.length != 0) {
3697             *p++ = ' ';
3698             p = nxt_cpymem(p, r->target.start, r->target.length);
3699 
3700             if (r->version.length != 0) {
3701                 *p++ = ' ';
3702                 p = nxt_cpymem(p, r->version.start, r->version.length);
3703             }
3704         }
3705 
3706     } else {
3707         *p++ = '-';
3708     }
3709 
3710     p = nxt_cpymem(p, "\" ", 2);
3711 
3712     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3713 
3714     *p++ = ' ';
3715 
3716     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3717 
3718     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3719 
3720     p = nxt_cpymem(p, " \"", 2);
3721 
3722     if (r->referer != NULL) {
3723         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3724 
3725     } else {
3726         *p++ = '-';
3727     }
3728 
3729     p = nxt_cpymem(p, "\" \"", 3);
3730 
3731     if (r->user_agent != NULL) {
3732         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3733 
3734     } else {
3735         *p++ = '-';
3736     }
3737 
3738     p = nxt_cpymem(p, "\"\n", 2);
3739 
3740     nxt_fd_write(access_log->fd, buf, p - buf);
3741 }
3742 
3743 
3744 static u_char *
3745 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3746     size_t size, const char *format)
3747 {
3748     u_char  sign;
3749     time_t  gmtoff;
3750 
3751     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3752                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3753 
3754     gmtoff = nxt_timezone(tm) / 60;
3755 
3756     if (gmtoff < 0) {
3757         gmtoff = -gmtoff;
3758         sign = '-';
3759 
3760     } else {
3761         sign = '+';
3762     }
3763 
3764     return nxt_sprintf(buf, buf + size, format,
3765                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3766                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3767                        sign, gmtoff / 60, gmtoff % 60);
3768 }
3769 
3770 
3771 static void
3772 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3773 {
3774     uint32_t                 stream;
3775     nxt_int_t                ret;
3776     nxt_buf_t                *b;
3777     nxt_port_t               *main_port, *router_port;
3778     nxt_runtime_t            *rt;
3779     nxt_router_access_log_t  *access_log;
3780 
3781     access_log = tmcf->router_conf->access_log;
3782 
3783     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3784     if (nxt_slow_path(b == NULL)) {
3785         goto fail;
3786     }
3787 
3788     b->completion_handler = nxt_buf_dummy_completion;
3789 
3790     nxt_buf_cpystr(b, &access_log->path);
3791     *b->mem.free++ = '\0';
3792 
3793     rt = task->thread->runtime;
3794     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3795     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3796 
3797     stream = nxt_port_rpc_register_handler(task, router_port,
3798                                            nxt_router_access_log_ready,
3799                                            nxt_router_access_log_error,
3800                                            -1, tmcf);
3801     if (nxt_slow_path(stream == 0)) {
3802         goto fail;
3803     }
3804 
3805     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3806                                 stream, router_port->id, b);
3807 
3808     if (nxt_slow_path(ret != NXT_OK)) {
3809         nxt_port_rpc_cancel(task, router_port, stream);
3810         goto fail;
3811     }
3812 
3813     return;
3814 
3815 fail:
3816 
3817     nxt_router_conf_error(task, tmcf);
3818 }
3819 
3820 
3821 static void
3822 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3823     void *data)
3824 {
3825     nxt_router_temp_conf_t   *tmcf;
3826     nxt_router_access_log_t  *access_log;
3827 
3828     tmcf = data;
3829 
3830     access_log = tmcf->router_conf->access_log;
3831 
3832     access_log->fd = msg->fd[0];
3833 
3834     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3835                        nxt_router_conf_apply, task, tmcf, NULL);
3836 }
3837 
3838 
3839 static void
3840 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3841     void *data)
3842 {
3843     nxt_router_temp_conf_t  *tmcf;
3844 
3845     tmcf = data;
3846 
3847     nxt_router_conf_error(task, tmcf);
3848 }
3849 
3850 
3851 static void
3852 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3853     nxt_router_access_log_t *access_log)
3854 {
3855     if (access_log == NULL) {
3856         return;
3857     }
3858 
3859     nxt_thread_spin_lock(lock);
3860 
3861     if (--access_log->count != 0) {
3862         access_log = NULL;
3863     }
3864 
3865     nxt_thread_spin_unlock(lock);
3866 
3867     if (access_log != NULL) {
3868 
3869         if (access_log->fd != -1) {
3870             nxt_fd_close(access_log->fd);
3871         }
3872 
3873         nxt_free(access_log);
3874     }
3875 }
3876 
3877 
3878 typedef struct {
3879     nxt_mp_t                 *mem_pool;
3880     nxt_router_access_log_t  *access_log;
3881 } nxt_router_access_log_reopen_t;
3882 
3883 
3884 static void
3885 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3886 {
3887     nxt_mp_t                        *mp;
3888     uint32_t                        stream;
3889     nxt_int_t                       ret;
3890     nxt_buf_t                       *b;
3891     nxt_port_t                      *main_port, *router_port;
3892     nxt_runtime_t                   *rt;
3893     nxt_router_access_log_t         *access_log;
3894     nxt_router_access_log_reopen_t  *reopen;
3895 
3896     access_log = nxt_router->access_log;
3897 
3898     if (access_log == NULL) {
3899         return;
3900     }
3901 
3902     mp = nxt_mp_create(1024, 128, 256, 32);
3903     if (nxt_slow_path(mp == NULL)) {
3904         return;
3905     }
3906 
3907     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3908     if (nxt_slow_path(reopen == NULL)) {
3909         goto fail;
3910     }
3911 
3912     reopen->mem_pool = mp;
3913     reopen->access_log = access_log;
3914 
3915     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3916     if (nxt_slow_path(b == NULL)) {
3917         goto fail;
3918     }
3919 
3920     b->completion_handler = nxt_router_access_log_reopen_completion;
3921 
3922     nxt_buf_cpystr(b, &access_log->path);
3923     *b->mem.free++ = '\0';
3924 
3925     rt = task->thread->runtime;
3926     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3927     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3928 
3929     stream = nxt_port_rpc_register_handler(task, router_port,
3930                                            nxt_router_access_log_reopen_ready,
3931                                            nxt_router_access_log_reopen_error,
3932                                            -1, reopen);
3933     if (nxt_slow_path(stream == 0)) {
3934         goto fail;
3935     }
3936 
3937     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3938                                 stream, router_port->id, b);
3939 
3940     if (nxt_slow_path(ret != NXT_OK)) {
3941         nxt_port_rpc_cancel(task, router_port, stream);
3942         goto fail;
3943     }
3944 
3945     nxt_mp_retain(mp);
3946 
3947     return;
3948 
3949 fail:
3950 
3951     nxt_mp_destroy(mp);
3952 }
3953 
3954 
3955 static void
3956 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3957 {
3958     nxt_mp_t   *mp;
3959     nxt_buf_t  *b;
3960 
3961     b = obj;
3962     mp = b->data;
3963 
3964     nxt_mp_release(mp);
3965 }
3966 
3967 
3968 static void
3969 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3970     void *data)
3971 {
3972     nxt_router_access_log_t         *access_log;
3973     nxt_router_access_log_reopen_t  *reopen;
3974 
3975     reopen = data;
3976 
3977     access_log = reopen->access_log;
3978 
3979     if (access_log == nxt_router->access_log) {
3980 
3981         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3982             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3983                       msg->fd[0], access_log->fd, nxt_errno);
3984         }
3985     }
3986 
3987     nxt_fd_close(msg->fd[0]);
3988     nxt_mp_release(reopen->mem_pool);
3989 }
3990 
3991 
3992 static void
3993 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3994     void *data)
3995 {
3996     nxt_router_access_log_reopen_t  *reopen;
3997 
3998     reopen = data;
3999 
4000     nxt_mp_release(reopen->mem_pool);
4001 }
4002 
4003 
4004 static void
4005 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4006 {
4007     nxt_port_t           *port;
4008     nxt_thread_link_t    *link;
4009     nxt_event_engine_t   *engine;
4010     nxt_thread_handle_t  handle;
4011 
4012     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4013     link = data;
4014 
4015     nxt_thread_wait(handle);
4016 
4017     engine = link->engine;
4018 
4019     nxt_queue_remove(&engine->link);
4020 
4021     port = engine->port;
4022 
4023     // TODO notify all apps
4024 
4025     port->engine = task->thread->engine;
4026     nxt_mp_thread_adopt(port->mem_pool);
4027     nxt_port_use(task, port, -1);
4028 
4029     nxt_mp_thread_adopt(engine->mem_pool);
4030     nxt_mp_destroy(engine->mem_pool);
4031 
4032     nxt_event_engine_free(engine);
4033 
4034     nxt_free(link);
4035 }
4036 
4037 
4038 static void
4039 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4040     void *data)
4041 {
4042     size_t                  b_size, count;
4043     nxt_int_t               ret;
4044     nxt_app_t               *app;
4045     nxt_buf_t               *b, *next;
4046     nxt_port_t              *app_port;
4047     nxt_unit_field_t        *f;
4048     nxt_http_field_t        *field;
4049     nxt_http_request_t      *r;
4050     nxt_unit_response_t     *resp;
4051     nxt_request_rpc_data_t  *req_rpc_data;
4052 
4053     req_rpc_data = data;
4054 
4055     r = req_rpc_data->request;
4056     if (nxt_slow_path(r == NULL)) {
4057         return;
4058     }
4059 
4060     if (r->error) {
4061         nxt_request_rpc_data_unlink(task, req_rpc_data);
4062         return;
4063     }
4064 
4065     app = req_rpc_data->app;
4066     nxt_assert(app != NULL);
4067 
4068     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4069         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4070 
4071         return;
4072     }
4073 
4074     b = (msg->size == 0) ? NULL : msg->buf;
4075 
4076     if (msg->port_msg.last != 0) {
4077         nxt_debug(task, "router data create last buf");
4078 
4079         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4080 
4081         req_rpc_data->rpc_cancel = 0;
4082 
4083         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4084             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4085         }
4086 
4087         nxt_request_rpc_data_unlink(task, req_rpc_data);
4088 
4089     } else {
4090         if (app->timeout != 0) {
4091             r->timer.handler = nxt_router_app_timeout;
4092             r->timer_data = req_rpc_data;
4093             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4094         }
4095     }
4096 
4097     if (b == NULL) {
4098         return;
4099     }
4100 
4101     if (msg->buf == b) {
4102         /* Disable instant buffer completion/re-using by port. */
4103         msg->buf = NULL;
4104     }
4105 
4106     if (r->header_sent) {
4107         nxt_buf_chain_add(&r->out, b);
4108         nxt_http_request_send_body(task, r, NULL);
4109 
4110     } else {
4111         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4112 
4113         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4114             nxt_alert(task, "response buffer too small: %z", b_size);
4115             goto fail;
4116         }
4117 
4118         resp = (void *) b->mem.pos;
4119         count = (b_size - sizeof(nxt_unit_response_t))
4120                     / sizeof(nxt_unit_field_t);
4121 
4122         if (nxt_slow_path(count < resp->fields_count)) {
4123             nxt_alert(task, "response buffer too small for fields count: %D",
4124                       resp->fields_count);
4125             goto fail;
4126         }
4127 
4128         field = NULL;
4129 
4130         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4131             if (f->skip) {
4132                 continue;
4133             }
4134 
4135             field = nxt_list_add(r->resp.fields);
4136 
4137             if (nxt_slow_path(field == NULL)) {
4138                 goto fail;
4139             }
4140 
4141             field->hash = f->hash;
4142             field->skip = 0;
4143             field->hopbyhop = 0;
4144 
4145             field->name_length = f->name_length;
4146             field->value_length = f->value_length;
4147             field->name = nxt_unit_sptr_get(&f->name);
4148             field->value = nxt_unit_sptr_get(&f->value);
4149 
4150             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4151             if (nxt_slow_path(ret != NXT_OK)) {
4152                 goto fail;
4153             }
4154 
4155             nxt_debug(task, "header%s: %*s: %*s",
4156                       (field->skip ? " skipped" : ""),
4157                       (size_t) field->name_length, field->name,
4158                       (size_t) field->value_length, field->value);
4159 
4160             if (field->skip) {
4161                 r->resp.fields->last->nelts--;
4162             }
4163         }
4164 
4165         r->status = resp->status;
4166 
4167         if (resp->piggyback_content_length != 0) {
4168             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4169             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4170 
4171         } else {
4172             b->mem.pos = b->mem.free;
4173         }
4174 
4175         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4176             next = b->next;
4177             b->next = NULL;
4178 
4179             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4180                                b->completion_handler, task, b, b->parent);
4181 
4182             b = next;
4183         }
4184 
4185         if (b != NULL) {
4186             nxt_buf_chain_add(&r->out, b);
4187         }
4188 
4189         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4190 
4191         if (r->websocket_handshake
4192             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4193         {
4194             app_port = req_rpc_data->app_port;
4195             if (nxt_slow_path(app_port == NULL)) {
4196                 goto fail;
4197             }
4198 
4199             nxt_thread_mutex_lock(&app->mutex);
4200 
4201             app_port->main_app_port->active_websockets++;
4202 
4203             nxt_thread_mutex_unlock(&app->mutex);
4204 
4205             nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4206             req_rpc_data->apr_action = NXT_APR_CLOSE;
4207 
4208             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4209 
4210             r->state = &nxt_http_websocket;
4211 
4212         } else {
4213             r->state = &nxt_http_request_send_state;
4214         }
4215     }
4216 
4217     return;
4218 
4219 fail:
4220 
4221     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4222 
4223     nxt_request_rpc_data_unlink(task, req_rpc_data);
4224 }
4225 
4226 
4227 static void
4228 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4229     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4230 {
4231     int                 res;
4232     nxt_app_t           *app;
4233     nxt_buf_t           *b;
4234     nxt_bool_t          start_process, unlinked;
4235     nxt_port_t          *app_port, *main_app_port, *idle_port;
4236     nxt_queue_link_t    *idle_lnk;
4237     nxt_http_request_t  *r;
4238 
4239     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4240               req_rpc_data->stream,
4241               msg->port_msg.pid, msg->port_msg.reply_port);
4242 
4243     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4244                              msg->port_msg.pid);
4245 
4246     app = req_rpc_data->app;
4247     r = req_rpc_data->request;
4248 
4249     start_process = 0;
4250     unlinked = 0;
4251 
4252     nxt_thread_mutex_lock(&app->mutex);
4253 
4254     if (r->app_link.next != NULL) {
4255         nxt_queue_remove(&r->app_link);
4256         r->app_link.next = NULL;
4257 
4258         unlinked = 1;
4259     }
4260 
4261     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4262                                   msg->port_msg.reply_port);
4263     if (nxt_slow_path(app_port == NULL)) {
4264         nxt_thread_mutex_unlock(&app->mutex);
4265 
4266         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4267 
4268         if (unlinked) {
4269             nxt_mp_release(r->mem_pool);
4270         }
4271 
4272         return;
4273     }
4274 
4275     main_app_port = app_port->main_app_port;
4276 
4277     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4278         app->idle_processes--;
4279 
4280         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4281                   &app->name, main_app_port->pid, main_app_port->id,
4282                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4283 
4284         /* Check port was in 'spare_ports' using idle_start field. */
4285         if (main_app_port->idle_start == 0
4286             && app->idle_processes >= app->spare_processes)
4287         {
4288             /*
4289              * If there is a vacant space in spare ports,
4290              * move the last idle to spare_ports.
4291              */
4292             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4293 
4294             idle_lnk = nxt_queue_last(&app->idle_ports);
4295             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4296             nxt_queue_remove(idle_lnk);
4297 
4298             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4299 
4300             idle_port->idle_start = 0;
4301 
4302             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4303                       "to spare_ports",
4304                       &app->name, idle_port->pid, idle_port->id);
4305         }
4306 
4307         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4308             app->pending_processes++;
4309             start_process = 1;
4310         }
4311     }
4312 
4313     main_app_port->active_requests++;
4314 
4315     nxt_port_inc_use(app_port);
4316 
4317     nxt_thread_mutex_unlock(&app->mutex);
4318 
4319     if (unlinked) {
4320         nxt_mp_release(r->mem_pool);
4321     }
4322 
4323     if (start_process) {
4324         nxt_router_start_app_process(task, app);
4325     }
4326 
4327     nxt_port_use(task, req_rpc_data->app_port, -1);
4328 
4329     req_rpc_data->app_port = app_port;
4330 
4331     b = req_rpc_data->msg_info.buf;
4332 
4333     if (b != NULL) {
4334         /* First buffer is already sent.  Start from second. */
4335         b = b->next;
4336 
4337         req_rpc_data->msg_info.buf->next = NULL;
4338     }
4339 
4340     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4341         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4342                   req_rpc_data->msg_info.body_fd);
4343 
4344         if (req_rpc_data->msg_info.body_fd != -1) {
4345             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4346         }
4347 
4348         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4349                                     req_rpc_data->msg_info.body_fd,
4350                                     req_rpc_data->stream,
4351                                     task->thread->engine->port->id, b);
4352 
4353         if (nxt_slow_path(res != NXT_OK)) {
4354             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4355         }
4356     }
4357 
4358     if (app->timeout != 0) {
4359         r->timer.handler = nxt_router_app_timeout;
4360         r->timer_data = req_rpc_data;
4361         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4362     }
4363 }
4364 
4365 
4366 static const nxt_http_request_state_t  nxt_http_request_send_state
4367     nxt_aligned(64) =
4368 {
4369     .error_handler = nxt_http_request_error_handler,
4370 };
4371 
4372 
4373 static void
4374 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4375 {
4376     nxt_buf_t           *out;
4377     nxt_http_request_t  *r;
4378 
4379     r = obj;
4380 
4381     out = r->out;
4382 
4383     if (out != NULL) {
4384         r->out = NULL;
4385         nxt_http_request_send(task, r, out);
4386     }
4387 }
4388 
4389 
4390 static void
4391 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4392     void *data)
4393 {
4394     nxt_request_rpc_data_t  *req_rpc_data;
4395 
4396     req_rpc_data = data;
4397 
4398     req_rpc_data->rpc_cancel = 0;
4399 
4400     /* TODO cancel message and return if cancelled. */
4401     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4402 
4403     if (req_rpc_data->request != NULL) {
4404         nxt_http_request_error(task, req_rpc_data->request,
4405                                NXT_HTTP_SERVICE_UNAVAILABLE);
4406     }
4407 
4408     nxt_request_rpc_data_unlink(task, req_rpc_data);
4409 }
4410 
4411 
4412 static void
4413 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4414     void *data)
4415 {
4416     nxt_app_t            *app;
4417     nxt_bool_t           start_process;
4418     nxt_port_t           *port;
4419     nxt_app_joint_t      *app_joint;
4420     nxt_app_joint_rpc_t  *app_joint_rpc;
4421 
4422     nxt_assert(data != NULL);
4423 
4424     app_joint_rpc = data;
4425     app_joint = app_joint_rpc->app_joint;
4426     port = msg->u.new_port;
4427 
4428     nxt_assert(app_joint != NULL);
4429     nxt_assert(port != NULL);
4430     nxt_assert(port->type == NXT_PROCESS_APP);
4431     nxt_assert(port->id == 0);
4432 
4433     app = app_joint->app;
4434 
4435     nxt_router_app_joint_use(task, app_joint, -1);
4436 
4437     if (nxt_slow_path(app == NULL)) {
4438         nxt_debug(task, "new port ready for released app, send QUIT");
4439 
4440         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4441 
4442         return;
4443     }
4444 
4445     nxt_thread_mutex_lock(&app->mutex);
4446 
4447     nxt_assert(app->pending_processes != 0);
4448 
4449     app->pending_processes--;
4450 
4451     if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
4452         nxt_debug(task, "new port ready for restarted app, send QUIT");
4453 
4454         start_process = !task->thread->engine->shutdown
4455                         && nxt_router_app_can_start(app)
4456                         && nxt_router_app_need_start(app);
4457 
4458         if (start_process) {
4459             app->pending_processes++;
4460         }
4461 
4462         nxt_thread_mutex_unlock(&app->mutex);
4463 
4464         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4465 
4466         if (start_process) {
4467             nxt_router_start_app_process(task, app);
4468         }
4469 
4470         return;
4471     }
4472 
4473     port->app = app;
4474     port->main_app_port = port;
4475 
4476     app->processes++;
4477     nxt_port_hash_add(&app->port_hash, port);
4478     app->port_hash_count++;
4479 
4480     nxt_thread_mutex_unlock(&app->mutex);
4481 
4482     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4483               &app->name, port->pid, app->processes, app->pending_processes);
4484 
4485     nxt_router_app_shared_port_send(task, port);
4486 
4487     nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4488 }
4489 
4490 
4491 static nxt_int_t
4492 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4493 {
4494     nxt_buf_t                *b;
4495     nxt_port_t               *port;
4496     nxt_port_msg_new_port_t  *msg;
4497 
4498     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4499                              sizeof(nxt_port_data_t));
4500     if (nxt_slow_path(b == NULL)) {
4501         return NXT_ERROR;
4502     }
4503 
4504     port = app_port->app->shared_port;
4505 
4506     nxt_debug(task, "send port %FD to process %PI",
4507               port->pair[0], app_port->pid);
4508 
4509     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4510     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4511 
4512     msg->id = port->id;
4513     msg->pid = port->pid;
4514     msg->max_size = port->max_size;
4515     msg->max_share = port->max_share;
4516     msg->type = port->type;
4517 
4518     return nxt_port_socket_write2(task, app_port,
4519                                   NXT_PORT_MSG_NEW_PORT,
4520                                   port->pair[0], port->queue_fd,
4521                                   0, 0, b);
4522 }
4523 
4524 
4525 static void
4526 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4527     void *data)
4528 {
4529     nxt_app_t            *app;
4530     nxt_app_joint_t      *app_joint;
4531     nxt_queue_link_t     *link;
4532     nxt_http_request_t   *r;
4533     nxt_app_joint_rpc_t  *app_joint_rpc;
4534 
4535     nxt_assert(data != NULL);
4536 
4537     app_joint_rpc = data;
4538     app_joint = app_joint_rpc->app_joint;
4539 
4540     nxt_assert(app_joint != NULL);
4541 
4542     app = app_joint->app;
4543 
4544     nxt_router_app_joint_use(task, app_joint, -1);
4545 
4546     if (nxt_slow_path(app == NULL)) {
4547         nxt_debug(task, "start error for released app");
4548 
4549         return;
4550     }
4551 
4552     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4553 
4554     link = NULL;
4555 
4556     nxt_thread_mutex_lock(&app->mutex);
4557 
4558     nxt_assert(app->pending_processes != 0);
4559 
4560     app->pending_processes--;
4561 
4562     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4563         link = nxt_queue_first(&app->ack_waiting_req);
4564 
4565         nxt_queue_remove(link);
4566         link->next = NULL;
4567     }
4568 
4569     nxt_thread_mutex_unlock(&app->mutex);
4570 
4571     while (link != NULL) {
4572         r = nxt_container_of(link, nxt_http_request_t, app_link);
4573 
4574         nxt_event_engine_post(r->engine, &r->err_work);
4575 
4576         link = NULL;
4577 
4578         nxt_thread_mutex_lock(&app->mutex);
4579 
4580         if (app->processes == 0 && app->pending_processes == 0
4581             && !nxt_queue_is_empty(&app->ack_waiting_req))
4582         {
4583             link = nxt_queue_first(&app->ack_waiting_req);
4584 
4585             nxt_queue_remove(link);
4586             link->next = NULL;
4587         }
4588 
4589         nxt_thread_mutex_unlock(&app->mutex);
4590     }
4591 }
4592 
4593 
4594 
4595 nxt_inline nxt_port_t *
4596 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4597 {
4598     nxt_port_t  *port;
4599 
4600     port = NULL;
4601 
4602     nxt_thread_mutex_lock(&app->mutex);
4603 
4604     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4605 
4606         /* Caller is responsible to decrease port use count. */
4607         nxt_queue_chk_remove(&port->app_link);
4608 
4609         if (nxt_queue_chk_remove(&port->idle_link)) {
4610             app->idle_processes--;
4611 
4612             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4613                       &app->name, port->pid, port->id,
4614                       (port->idle_start ? "idle_ports" : "spare_ports"));
4615         }
4616 
4617         nxt_port_hash_remove(&app->port_hash, port);
4618         app->port_hash_count--;
4619 
4620         port->app = NULL;
4621         app->processes--;
4622 
4623         break;
4624 
4625     } nxt_queue_loop;
4626 
4627     nxt_thread_mutex_unlock(&app->mutex);
4628 
4629     return port;
4630 }
4631 
4632 
4633 static void
4634 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4635 {
4636     int  c;
4637 
4638     c = nxt_atomic_fetch_add(&app->use_count, i);
4639 
4640     if (i < 0 && c == -i) {
4641 
4642         if (task->thread->engine != app->engine) {
4643             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4644 
4645         } else {
4646             nxt_router_free_app(task, app->joint, NULL);
4647         }
4648     }
4649 }
4650 
4651 
4652 static void
4653 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4654 {
4655     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4656 
4657     nxt_queue_remove(&app->link);
4658 
4659     nxt_router_app_use(task, app, -1);
4660 }
4661 
4662 
4663 static void
4664 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4665     nxt_apr_action_t action)
4666 {
4667     int         inc_use;
4668     uint32_t    got_response, dec_requests;
4669     nxt_bool_t  adjust_idle_timer;
4670     nxt_port_t  *main_app_port;
4671 
4672     nxt_assert(port != NULL);
4673 
4674     inc_use = 0;
4675     got_response = 0;
4676     dec_requests = 0;
4677 
4678     switch (action) {
4679     case NXT_APR_NEW_PORT:
4680         break;
4681     case NXT_APR_REQUEST_FAILED:
4682         dec_requests = 1;
4683         inc_use = -1;
4684         break;
4685     case NXT_APR_GOT_RESPONSE:
4686         got_response = 1;
4687         inc_use = -1;
4688         break;
4689     case NXT_APR_UPGRADE:
4690         got_response = 1;
4691         break;
4692     case NXT_APR_CLOSE:
4693         inc_use = -1;
4694         break;
4695     }
4696 
4697     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4698               port->pid, port->id,
4699               (int) inc_use, (int) got_response);
4700 
4701     if (port->id == NXT_SHARED_PORT_ID) {
4702         nxt_thread_mutex_lock(&app->mutex);
4703 
4704         app->active_requests -= got_response + dec_requests;
4705 
4706         nxt_thread_mutex_unlock(&app->mutex);
4707 
4708         goto adjust_use;
4709     }
4710 
4711     main_app_port = port->main_app_port;
4712 
4713     nxt_thread_mutex_lock(&app->mutex);
4714 
4715     main_app_port->active_requests -= got_response + dec_requests;
4716     app->active_requests -= got_response + dec_requests;
4717 
4718     if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4719         nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4720 
4721         nxt_port_inc_use(main_app_port);
4722     }
4723 
4724     adjust_idle_timer = 0;
4725 
4726     if (main_app_port->pair[1] != -1
4727         && main_app_port->active_requests == 0
4728         && main_app_port->active_websockets == 0
4729         && main_app_port->idle_link.next == NULL)
4730     {
4731         if (app->idle_processes == app->spare_processes
4732             && app->adjust_idle_work.data == NULL)
4733         {
4734             adjust_idle_timer = 1;
4735             app->adjust_idle_work.data = app;
4736             app->adjust_idle_work.next = NULL;
4737         }
4738 
4739         if (app->idle_processes < app->spare_processes) {
4740             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4741 
4742             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4743                       &app->name, main_app_port->pid, main_app_port->id);
4744         } else {
4745             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4746 
4747             main_app_port->idle_start = task->thread->engine->timers.now;
4748 
4749             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4750                       &app->name, main_app_port->pid, main_app_port->id);
4751         }
4752 
4753         app->idle_processes++;
4754     }
4755 
4756     nxt_thread_mutex_unlock(&app->mutex);
4757 
4758     if (adjust_idle_timer) {
4759         nxt_router_app_use(task, app, 1);
4760         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4761     }
4762 
4763     /* ? */
4764     if (main_app_port->pair[1] == -1) {
4765         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4766                   &app->name, app, main_app_port, main_app_port->pid);
4767 
4768         goto adjust_use;
4769     }
4770 
4771     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4772               &app->name, app);
4773 
4774 adjust_use:
4775 
4776     nxt_port_use(task, port, inc_use);
4777 }
4778 
4779 
4780 void
4781 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4782 {
4783     nxt_app_t         *app;
4784     nxt_bool_t        unchain, start_process;
4785     nxt_port_t        *idle_port;
4786     nxt_queue_link_t  *idle_lnk;
4787 
4788     app = port->app;
4789 
4790     nxt_assert(app != NULL);
4791 
4792     nxt_thread_mutex_lock(&app->mutex);
4793 
4794     nxt_port_hash_remove(&app->port_hash, port);
4795     app->port_hash_count--;
4796 
4797     if (port->id != 0) {
4798         nxt_thread_mutex_unlock(&app->mutex);
4799 
4800         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4801                   port->pid, port->id);
4802 
4803         return;
4804     }
4805 
4806     unchain = nxt_queue_chk_remove(&port->app_link);
4807 
4808     if (nxt_queue_chk_remove(&port->idle_link)) {
4809         app->idle_processes--;
4810 
4811         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4812                   &app->name, port->pid, port->id,
4813                   (port->idle_start ? "idle_ports" : "spare_ports"));
4814 
4815         if (port->idle_start == 0
4816             && app->idle_processes >= app->spare_processes)
4817         {
4818             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4819 
4820             idle_lnk = nxt_queue_last(&app->idle_ports);
4821             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4822             nxt_queue_remove(idle_lnk);
4823 
4824             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4825 
4826             idle_port->idle_start = 0;
4827 
4828             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4829                       "to spare_ports",
4830                       &app->name, idle_port->pid, idle_port->id);
4831         }
4832     }
4833 
4834     app->processes--;
4835 
4836     start_process = !task->thread->engine->shutdown
4837                     && nxt_router_app_can_start(app)
4838                     && nxt_router_app_need_start(app);
4839 
4840     if (start_process) {
4841         app->pending_processes++;
4842     }
4843 
4844     nxt_thread_mutex_unlock(&app->mutex);
4845 
4846     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4847 
4848     if (unchain) {
4849         nxt_port_use(task, port, -1);
4850     }
4851 
4852     if (start_process) {
4853         nxt_router_start_app_process(task, app);
4854     }
4855 }
4856 
4857 
4858 static void
4859 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4860 {
4861     nxt_app_t           *app;
4862     nxt_bool_t          queued;
4863     nxt_port_t          *port;
4864     nxt_msec_t          timeout, threshold;
4865     nxt_queue_link_t    *lnk;
4866     nxt_event_engine_t  *engine;
4867 
4868     app = obj;
4869     queued = (data == app);
4870 
4871     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4872               &app->name, queued);
4873 
4874     engine = task->thread->engine;
4875 
4876     nxt_assert(app->engine == engine);
4877 
4878     threshold = engine->timers.now + app->joint->idle_timer.bias;
4879     timeout = 0;
4880 
4881     nxt_thread_mutex_lock(&app->mutex);
4882 
4883     if (queued) {
4884         app->adjust_idle_work.data = NULL;
4885     }
4886 
4887     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4888               &app->name,
4889               (int) app->idle_processes, (int) app->spare_processes);
4890 
4891     while (app->idle_processes > app->spare_processes) {
4892 
4893         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4894 
4895         lnk = nxt_queue_first(&app->idle_ports);
4896         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4897 
4898         timeout = port->idle_start + app->idle_timeout;
4899 
4900         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4901                   &app->name, port->pid,
4902                   port->idle_start, timeout, threshold);
4903 
4904         if (timeout > threshold) {
4905             break;
4906         }
4907 
4908         nxt_queue_remove(lnk);
4909         lnk->next = NULL;
4910 
4911         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4912                   &app->name, port->pid, port->id);
4913 
4914         nxt_queue_chk_remove(&port->app_link);
4915 
4916         nxt_port_hash_remove(&app->port_hash, port);
4917         app->port_hash_count--;
4918 
4919         app->idle_processes--;
4920         app->processes--;
4921         port->app = NULL;
4922 
4923         nxt_thread_mutex_unlock(&app->mutex);
4924 
4925         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4926                   &app->name, port->pid);
4927 
4928         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4929 
4930         nxt_port_use(task, port, -1);
4931 
4932         nxt_thread_mutex_lock(&app->mutex);
4933     }
4934 
4935     nxt_thread_mutex_unlock(&app->mutex);
4936 
4937     if (timeout > threshold) {
4938         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4939 
4940     } else {
4941         nxt_timer_disable(engine, &app->joint->idle_timer);
4942     }
4943 
4944     if (queued) {
4945         nxt_router_app_use(task, app, -1);
4946     }
4947 }
4948 
4949 
4950 static void
4951 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4952 {
4953     nxt_timer_t      *timer;
4954     nxt_app_joint_t  *app_joint;
4955 
4956     timer = obj;
4957     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4958 
4959     if (nxt_fast_path(app_joint->app != NULL)) {
4960         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4961     }
4962 }
4963 
4964 
4965 static void
4966 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4967 {
4968     nxt_timer_t      *timer;
4969     nxt_app_joint_t  *app_joint;
4970 
4971     timer = obj;
4972     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4973 
4974     nxt_router_app_joint_use(task, app_joint, -1);
4975 }
4976 
4977 
4978 static void
4979 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4980 {
4981     nxt_app_t        *app;
4982     nxt_port_t       *port;
4983     nxt_app_joint_t  *app_joint;
4984 
4985     app_joint = obj;
4986     app = app_joint->app;
4987 
4988     for ( ;; ) {
4989         port = nxt_router_app_get_port_for_quit(task, app);
4990         if (port == NULL) {
4991             break;
4992         }
4993 
4994         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4995 
4996         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4997 
4998         nxt_port_use(task, port, -1);
4999     }
5000 
5001     nxt_thread_mutex_lock(&app->mutex);
5002 
5003     for ( ;; ) {
5004         port = nxt_port_hash_retrieve(&app->port_hash);
5005         if (port == NULL) {
5006             break;
5007         }
5008 
5009         app->port_hash_count--;
5010 
5011         port->app = NULL;
5012 
5013         nxt_port_close(task, port);
5014 
5015         nxt_port_use(task, port, -1);
5016     }
5017 
5018     nxt_thread_mutex_unlock(&app->mutex);
5019 
5020     nxt_assert(app->processes == 0);
5021     nxt_assert(app->active_requests == 0);
5022     nxt_assert(app->port_hash_count == 0);
5023     nxt_assert(app->idle_processes == 0);
5024     nxt_assert(nxt_queue_is_empty(&app->ports));
5025     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5026     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5027 
5028     nxt_port_mmaps_destroy(&app->outgoing, 1);
5029 
5030     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5031 
5032     if (app->shared_port != NULL) {
5033         app->shared_port->app = NULL;
5034         nxt_port_close(task, app->shared_port);
5035         nxt_port_use(task, app->shared_port, -1);
5036 
5037         app->shared_port = NULL;
5038     }
5039 
5040     nxt_thread_mutex_destroy(&app->mutex);
5041     nxt_mp_destroy(app->mem_pool);
5042 
5043     app_joint->app = NULL;
5044 
5045     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5046         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5047         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5048 
5049     } else {
5050         nxt_router_app_joint_use(task, app_joint, -1);
5051     }
5052 }
5053 
5054 
5055 static void
5056 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5057     nxt_request_rpc_data_t *req_rpc_data)
5058 {
5059     nxt_bool_t          start_process;
5060     nxt_port_t          *port;
5061     nxt_http_request_t  *r;
5062 
5063     start_process = 0;
5064 
5065     nxt_thread_mutex_lock(&app->mutex);
5066 
5067     port = app->shared_port;
5068     nxt_port_inc_use(port);
5069 
5070     app->active_requests++;
5071 
5072     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5073         app->pending_processes++;
5074         start_process = 1;
5075     }
5076 
5077     r = req_rpc_data->request;
5078 
5079     /*
5080      * Put request into application-wide list to be able to cancel request
5081      * if something goes wrong with application processes.
5082      */
5083     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5084 
5085     nxt_thread_mutex_unlock(&app->mutex);
5086 
5087     /*
5088      * Retain request memory pool while request is linked in ack_waiting_req
5089      * to guarantee request structure memory is accessble.
5090      */
5091     nxt_mp_retain(r->mem_pool);
5092 
5093     req_rpc_data->app_port = port;
5094     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5095 
5096     if (start_process) {
5097         nxt_router_start_app_process(task, app);
5098     }
5099 }
5100 
5101 
5102 void
5103 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5104     nxt_http_action_t *action)
5105 {
5106     nxt_event_engine_t      *engine;
5107     nxt_http_app_conf_t     *conf;
5108     nxt_request_rpc_data_t  *req_rpc_data;
5109 
5110     conf = action->u.conf;
5111     engine = task->thread->engine;
5112 
5113     r->app_target = conf->target;
5114 
5115     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5116                                           nxt_router_response_ready_handler,
5117                                           nxt_router_response_error_handler,
5118                                           sizeof(nxt_request_rpc_data_t));
5119     if (nxt_slow_path(req_rpc_data == NULL)) {
5120         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5121         return;
5122     }
5123 
5124     /*
5125      * At this point we have request req_rpc_data allocated and registered
5126      * in port handlers.  Need to fixup request memory pool.  Counterpart
5127      * release will be called via following call chain:
5128      *    nxt_request_rpc_data_unlink() ->
5129      *        nxt_router_http_request_release_post() ->
5130      *            nxt_router_http_request_release()
5131      */
5132     nxt_mp_retain(r->mem_pool);
5133 
5134     r->timer.task = &engine->task;
5135     r->timer.work_queue = &engine->fast_work_queue;
5136     r->timer.log = engine->task.log;
5137     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5138 
5139     r->engine = engine;
5140     r->err_work.handler = nxt_router_http_request_error;
5141     r->err_work.task = task;
5142     r->err_work.obj = r;
5143 
5144     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5145     req_rpc_data->app = conf->app;
5146     req_rpc_data->msg_info.body_fd = -1;
5147     req_rpc_data->rpc_cancel = 1;
5148 
5149     nxt_router_app_use(task, conf->app, 1);
5150 
5151     req_rpc_data->request = r;
5152     r->req_rpc_data = req_rpc_data;
5153 
5154     if (r->last != NULL) {
5155         r->last->completion_handler = nxt_router_http_request_done;
5156     }
5157 
5158     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5159     nxt_router_app_prepare_request(task, req_rpc_data);
5160 }
5161 
5162 
5163 static void
5164 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5165 {
5166     nxt_http_request_t  *r;
5167 
5168     r = obj;
5169 
5170     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5171 
5172     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5173 
5174     if (r->req_rpc_data != NULL) {
5175         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5176     }
5177 
5178     nxt_mp_release(r->mem_pool);
5179 }
5180 
5181 
5182 static void
5183 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5184 {
5185     nxt_http_request_t  *r;
5186 
5187     r = data;
5188 
5189     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5190 
5191     if (r->req_rpc_data != NULL) {
5192         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5193     }
5194 
5195     nxt_http_request_close_handler(task, r, r->proto.any);
5196 }
5197 
5198 
5199 static void
5200 nxt_router_app_prepare_request(nxt_task_t *task,
5201     nxt_request_rpc_data_t *req_rpc_data)
5202 {
5203     nxt_app_t         *app;
5204     nxt_buf_t         *buf, *body;
5205     nxt_int_t         res;
5206     nxt_port_t        *port, *reply_port;
5207 
5208     int                   notify;
5209     struct {
5210         nxt_port_msg_t       pm;
5211         nxt_port_mmap_msg_t  mm;
5212     } msg;
5213 
5214 
5215     app = req_rpc_data->app;
5216 
5217     nxt_assert(app != NULL);
5218 
5219     port = req_rpc_data->app_port;
5220 
5221     nxt_assert(port != NULL);
5222     nxt_assert(port->queue != NULL);
5223 
5224     reply_port = task->thread->engine->port;
5225 
5226     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5227                                  nxt_app_msg_prefix[app->type]);
5228     if (nxt_slow_path(buf == NULL)) {
5229         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5230                   req_rpc_data->stream, &app->name);
5231 
5232         nxt_http_request_error(task, req_rpc_data->request,
5233                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5234 
5235         return;
5236     }
5237 
5238     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5239                     nxt_buf_used_size(buf),
5240                     port->socket.fd);
5241 
5242     req_rpc_data->msg_info.buf = buf;
5243 
5244     body = req_rpc_data->request->body;
5245 
5246     if (body != NULL && nxt_buf_is_file(body)) {
5247         req_rpc_data->msg_info.body_fd = body->file->fd;
5248 
5249         body->file->fd = -1;
5250 
5251     } else {
5252         req_rpc_data->msg_info.body_fd = -1;
5253     }
5254 
5255     msg.pm.stream = req_rpc_data->stream;
5256     msg.pm.pid = reply_port->pid;
5257     msg.pm.reply_port = reply_port->id;
5258     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5259     msg.pm.last = 0;
5260     msg.pm.mmap = 1;
5261     msg.pm.nf = 0;
5262     msg.pm.mf = 0;
5263     msg.pm.tracking = 0;
5264 
5265     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5266     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5267 
5268     msg.mm.mmap_id = hdr->id;
5269     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5270     msg.mm.size = nxt_buf_used_size(buf);
5271 
5272     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5273                              req_rpc_data->stream, &notify,
5274                              &req_rpc_data->msg_info.tracking_cookie);
5275     if (nxt_fast_path(res == NXT_OK)) {
5276         if (notify != 0) {
5277             (void) nxt_port_socket_write(task, port,
5278                                          NXT_PORT_MSG_READ_QUEUE,
5279                                          -1, req_rpc_data->stream,
5280                                          reply_port->id, NULL);
5281 
5282         } else {
5283             nxt_debug(task, "queue is not empty");
5284         }
5285 
5286         buf->is_port_mmap_sent = 1;
5287         buf->mem.pos = buf->mem.free;
5288 
5289     } else {
5290         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5291                   req_rpc_data->stream, &app->name);
5292 
5293         nxt_http_request_error(task, req_rpc_data->request,
5294                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5295     }
5296 }
5297 
5298 
5299 struct nxt_fields_iter_s {
5300     nxt_list_part_t   *part;
5301     nxt_http_field_t  *field;
5302 };
5303 
5304 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5305 
5306 
5307 static nxt_http_field_t *
5308 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5309 {
5310     if (part == NULL) {
5311         return NULL;
5312     }
5313 
5314     while (part->nelts == 0) {
5315         part = part->next;
5316         if (part == NULL) {
5317             return NULL;
5318         }
5319     }
5320 
5321     i->part = part;
5322     i->field = nxt_list_data(i->part);
5323 
5324     return i->field;
5325 }
5326 
5327 
5328 static nxt_http_field_t *
5329 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5330 {
5331     return nxt_fields_part_first(nxt_list_part(fields), i);
5332 }
5333 
5334 
5335 static nxt_http_field_t *
5336 nxt_fields_next(nxt_fields_iter_t *i)
5337 {
5338     nxt_http_field_t  *end = nxt_list_data(i->part);
5339 
5340     end += i->part->nelts;
5341     i->field++;
5342 
5343     if (i->field < end) {
5344         return i->field;
5345     }
5346 
5347     return nxt_fields_part_first(i->part->next, i);
5348 }
5349 
5350 
5351 static nxt_buf_t *
5352 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5353     nxt_app_t *app, const nxt_str_t *prefix)
5354 {
5355     void                *target_pos, *query_pos;
5356     u_char              *pos, *end, *p, c;
5357     size_t              fields_count, req_size, size, free_size;
5358     size_t              copy_size;
5359     nxt_off_t           content_length;
5360     nxt_buf_t           *b, *buf, *out, **tail;
5361     nxt_http_field_t    *field, *dup;
5362     nxt_unit_field_t    *dst_field;
5363     nxt_fields_iter_t   iter, dup_iter;
5364     nxt_unit_request_t  *req;
5365 
5366     req_size = sizeof(nxt_unit_request_t)
5367                + r->method->length + 1
5368                + r->version.length + 1
5369                + r->remote->length + 1
5370                + r->local->length + 1
5371                + r->server_name.length + 1
5372                + r->target.length + 1
5373                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5374 
5375     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5376     fields_count = 0;
5377 
5378     nxt_list_each(field, r->fields) {
5379         fields_count++;
5380 
5381         req_size += field->name_length + prefix->length + 1
5382                     + field->value_length + 1;
5383     } nxt_list_loop;
5384 
5385     req_size += fields_count * sizeof(nxt_unit_field_t);
5386 
5387     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5388         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5389                   (int) req_size);
5390 
5391         return NULL;
5392     }
5393 
5394     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5395               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5396     if (nxt_slow_path(out == NULL)) {
5397         return NULL;
5398     }
5399 
5400     req = (nxt_unit_request_t *) out->mem.free;
5401     out->mem.free += req_size;
5402 
5403     req->app_target = r->app_target;
5404 
5405     req->content_length = content_length;
5406 
5407     p = (u_char *) (req->fields + fields_count);
5408 
5409     nxt_debug(task, "fields_count=%d", (int) fields_count);
5410 
5411     req->method_length = r->method->length;
5412     nxt_unit_sptr_set(&req->method, p);
5413     p = nxt_cpymem(p, r->method->start, r->method->length);
5414     *p++ = '\0';
5415 
5416     req->version_length = r->version.length;
5417     nxt_unit_sptr_set(&req->version, p);
5418     p = nxt_cpymem(p, r->version.start, r->version.length);
5419     *p++ = '\0';
5420 
5421     req->remote_length = r->remote->address_length;
5422     nxt_unit_sptr_set(&req->remote, p);
5423     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5424                    r->remote->address_length);
5425     *p++ = '\0';
5426 
5427     req->local_length = r->local->address_length;
5428     nxt_unit_sptr_set(&req->local, p);
5429     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5430     *p++ = '\0';
5431 
5432     req->tls = (r->tls != NULL);
5433     req->websocket_handshake = r->websocket_handshake;
5434 
5435     req->server_name_length = r->server_name.length;
5436     nxt_unit_sptr_set(&req->server_name, p);
5437     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5438     *p++ = '\0';
5439 
5440     target_pos = p;
5441     req->target_length = (uint32_t) r->target.length;
5442     nxt_unit_sptr_set(&req->target, p);
5443     p = nxt_cpymem(p, r->target.start, r->target.length);
5444     *p++ = '\0';
5445 
5446     req->path_length = (uint32_t) r->path->length;
5447     if (r->path->start == r->target.start) {
5448         nxt_unit_sptr_set(&req->path, target_pos);
5449 
5450     } else {
5451         nxt_unit_sptr_set(&req->path, p);
5452         p = nxt_cpymem(p, r->path->start, r->path->length);
5453         *p++ = '\0';
5454     }
5455 
5456     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5457     if (r->args != NULL && r->args->start != NULL) {
5458         query_pos = nxt_pointer_to(target_pos,
5459                                    r->args->start - r->target.start);
5460 
5461         nxt_unit_sptr_set(&req->query, query_pos);
5462 
5463     } else {
5464         req->query.offset = 0;
5465     }
5466 
5467     req->content_length_field = NXT_UNIT_NONE_FIELD;
5468     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5469     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5470     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5471 
5472     dst_field = req->fields;
5473 
5474     for (field = nxt_fields_first(r->fields, &iter);
5475          field != NULL;
5476          field = nxt_fields_next(&iter))
5477     {
5478         if (field->skip) {
5479             continue;
5480         }
5481 
5482         dst_field->hash = field->hash;
5483         dst_field->skip = 0;
5484         dst_field->name_length = field->name_length + prefix->length;
5485         dst_field->value_length = field->value_length;
5486 
5487         if (field == r->content_length) {
5488             req->content_length_field = dst_field - req->fields;
5489 
5490         } else if (field == r->content_type) {
5491             req->content_type_field = dst_field - req->fields;
5492 
5493         } else if (field == r->cookie) {
5494             req->cookie_field = dst_field - req->fields;
5495 
5496         } else if (field == r->authorization) {
5497             req->authorization_field = dst_field - req->fields;
5498         }
5499 
5500         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5501                   (int) field->hash, (int) field->skip,
5502                   (int) field->name_length, field->name,
5503                   (int) field->value_length, field->value);
5504 
5505         if (prefix->length != 0) {
5506             nxt_unit_sptr_set(&dst_field->name, p);
5507             p = nxt_cpymem(p, prefix->start, prefix->length);
5508 
5509             end = field->name + field->name_length;
5510             for (pos = field->name; pos < end; pos++) {
5511                 c = *pos;
5512 
5513                 if (c >= 'a' && c <= 'z') {
5514                     *p++ = (c & ~0x20);
5515                     continue;
5516                 }
5517 
5518                 if (c == '-') {
5519                     *p++ = '_';
5520                     continue;
5521                 }
5522 
5523                 *p++ = c;
5524             }
5525 
5526         } else {
5527             nxt_unit_sptr_set(&dst_field->name, p);
5528             p = nxt_cpymem(p, field->name, field->name_length);
5529         }
5530 
5531         *p++ = '\0';
5532 
5533         nxt_unit_sptr_set(&dst_field->value, p);
5534         p = nxt_cpymem(p, field->value, field->value_length);
5535 
5536         if (prefix->length != 0) {
5537             dup_iter = iter;
5538 
5539             for (dup = nxt_fields_next(&dup_iter);
5540                  dup != NULL;
5541                  dup = nxt_fields_next(&dup_iter))
5542             {
5543                 if (dup->name_length != field->name_length
5544                     || dup->skip
5545                     || dup->hash != field->hash
5546                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5547                 {
5548                     continue;
5549                 }
5550 
5551                 p = nxt_cpymem(p, ", ", 2);
5552                 p = nxt_cpymem(p, dup->value, dup->value_length);
5553 
5554                 dst_field->value_length += 2 + dup->value_length;
5555 
5556                 dup->skip = 1;
5557             }
5558         }
5559 
5560         *p++ = '\0';
5561 
5562         dst_field++;
5563     }
5564 
5565     req->fields_count = (uint32_t) (dst_field - req->fields);
5566 
5567     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5568 
5569     buf = out;
5570     tail = &buf->next;
5571 
5572     for (b = r->body; b != NULL; b = b->next) {
5573         size = nxt_buf_mem_used_size(&b->mem);
5574         pos = b->mem.pos;
5575 
5576         while (size > 0) {
5577             if (buf == NULL) {
5578                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5579 
5580                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5581                 if (nxt_slow_path(buf == NULL)) {
5582                     while (out != NULL) {
5583                         buf = out->next;
5584                         out->next = NULL;
5585                         out->completion_handler(task, out, out->parent);
5586                         out = buf;
5587                     }
5588                     return NULL;
5589                 }
5590 
5591                 *tail = buf;
5592                 tail = &buf->next;
5593 
5594             } else {
5595                 free_size = nxt_buf_mem_free_size(&buf->mem);
5596                 if (free_size < size
5597                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5598                        == NXT_OK)
5599                 {
5600                     free_size = nxt_buf_mem_free_size(&buf->mem);
5601                 }
5602             }
5603 
5604             if (free_size > 0) {
5605                 copy_size = nxt_min(free_size, size);
5606 
5607                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5608 
5609                 size -= copy_size;
5610                 pos += copy_size;
5611 
5612                 if (size == 0) {
5613                     break;
5614                 }
5615             }
5616 
5617             buf = NULL;
5618         }
5619     }
5620 
5621     return out;
5622 }
5623 
5624 
5625 static void
5626 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5627 {
5628     nxt_timer_t              *timer;
5629     nxt_http_request_t       *r;
5630     nxt_request_rpc_data_t   *req_rpc_data;
5631 
5632     timer = obj;
5633 
5634     nxt_debug(task, "router app timeout");
5635 
5636     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5637     req_rpc_data = r->timer_data;
5638 
5639     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5640 
5641     nxt_request_rpc_data_unlink(task, req_rpc_data);
5642 }
5643 
5644 
5645 static void
5646 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5647 {
5648     r->timer.handler = nxt_router_http_request_release;
5649     nxt_timer_add(task->thread->engine, &r->timer, 0);
5650 }
5651 
5652 
5653 static void
5654 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5655 {
5656     nxt_http_request_t  *r;
5657 
5658     nxt_debug(task, "http request pool release");
5659 
5660     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5661 
5662     nxt_mp_release(r->mem_pool);
5663 }
5664 
5665 
5666 static void
5667 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5668 {
5669     size_t                   mi;
5670     uint32_t                 i;
5671     nxt_bool_t               ack;
5672     nxt_process_t            *process;
5673     nxt_free_map_t           *m;
5674     nxt_port_mmap_handler_t  *mmap_handler;
5675 
5676     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5677 
5678     process = nxt_runtime_process_find(task->thread->runtime,
5679                                        msg->port_msg.pid);
5680     if (nxt_slow_path(process == NULL)) {
5681         return;
5682     }
5683 
5684     ack = 0;
5685 
5686     /*
5687      * To mitigate possible racing condition (when OOSM message received
5688      * after some of the memory was already freed), need to try to find
5689      * first free segment in shared memory and send ACK if found.
5690      */
5691 
5692     nxt_thread_mutex_lock(&process->incoming.mutex);
5693 
5694     for (i = 0; i < process->incoming.size; i++) {
5695         mmap_handler = process->incoming.elts[i].mmap_handler;
5696 
5697         if (nxt_slow_path(mmap_handler == NULL)) {
5698             continue;
5699         }
5700 
5701         m = mmap_handler->hdr->free_map;
5702 
5703         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5704             if (m[mi] != 0) {
5705                 ack = 1;
5706 
5707                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5708                           i, mi, m[mi]);
5709 
5710                 break;
5711             }
5712         }
5713     }
5714 
5715     nxt_thread_mutex_unlock(&process->incoming.mutex);
5716 
5717     if (ack) {
5718         nxt_process_broadcast_shm_ack(task, process);
5719     }
5720 }
5721 
5722 
5723 static void
5724 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5725 {
5726     nxt_fd_t                 fd;
5727     nxt_port_t               *port;
5728     nxt_runtime_t            *rt;
5729     nxt_port_mmaps_t         *mmaps;
5730     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5731     nxt_port_mmap_handler_t  *mmap_handler;
5732 
5733     rt = task->thread->runtime;
5734 
5735     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5736                                  msg->port_msg.reply_port);
5737     if (nxt_slow_path(port == NULL)) {
5738         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5739                   msg->port_msg.pid, msg->port_msg.reply_port);
5740 
5741         return;
5742     }
5743 
5744     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5745                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5746     {
5747         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5748                   (int) nxt_buf_used_size(msg->buf));
5749 
5750         return;
5751     }
5752 
5753     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5754 
5755     nxt_assert(port->type == NXT_PROCESS_APP);
5756 
5757     if (nxt_slow_path(port->app == NULL)) {
5758         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5759                   port->pid, port->id);
5760 
5761         // FIXME
5762         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5763                               -1, msg->port_msg.stream, 0, NULL);
5764 
5765         return;
5766     }
5767 
5768     mmaps = &port->app->outgoing;
5769     nxt_thread_mutex_lock(&mmaps->mutex);
5770 
5771     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5772         nxt_thread_mutex_unlock(&mmaps->mutex);
5773 
5774         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5775                   (int) get_mmap_msg->id);
5776 
5777         // FIXME
5778         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5779                               -1, msg->port_msg.stream, 0, NULL);
5780         return;
5781     }
5782 
5783     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5784 
5785     fd = mmap_handler->fd;
5786 
5787     nxt_thread_mutex_unlock(&mmaps->mutex);
5788 
5789     nxt_debug(task, "get mmap %PI:%d found",
5790               msg->port_msg.pid, (int) get_mmap_msg->id);
5791 
5792     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5793 }
5794 
5795 
5796 static void
5797 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5798 {
5799     nxt_port_t               *port, *reply_port;
5800     nxt_runtime_t            *rt;
5801     nxt_port_msg_get_port_t  *get_port_msg;
5802 
5803     rt = task->thread->runtime;
5804 
5805     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5806                                        msg->port_msg.reply_port);
5807     if (nxt_slow_path(reply_port == NULL)) {
5808         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5809                   msg->port_msg.pid, msg->port_msg.reply_port);
5810 
5811         return;
5812     }
5813 
5814     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5815                       < (int) sizeof(nxt_port_msg_get_port_t)))
5816     {
5817         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5818                   (int) nxt_buf_used_size(msg->buf));
5819 
5820         return;
5821     }
5822 
5823     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5824 
5825     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5826     if (nxt_slow_path(port == NULL)) {
5827         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5828                   get_port_msg->pid, get_port_msg->id);
5829 
5830         return;
5831     }
5832 
5833     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5834               get_port_msg->id);
5835 
5836     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5837 }
5838