xref: /unit/src/nxt_router.c (revision 1926:6e85d6c0b8bb)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     uint32_t          requests;
31     nxt_conf_value_t  *limits_value;
32     nxt_conf_value_t  *processes_value;
33     nxt_conf_value_t  *targets_value;
34 } nxt_router_app_conf_t;
35 
36 
37 typedef struct {
38     nxt_str_t         pass;
39     nxt_str_t         application;
40 } nxt_router_listener_conf_t;
41 
42 
43 #if (NXT_TLS)
44 
45 typedef struct {
46     nxt_str_t               name;
47     nxt_socket_conf_t       *socket_conf;
48     nxt_router_temp_conf_t  *temp_conf;
49     nxt_tls_init_t          *tls_init;
50     nxt_bool_t              last;
51 
52     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
53 } nxt_router_tlssock_t;
54 
55 #endif
56 
57 
58 typedef struct {
59     nxt_str_t               *name;
60     nxt_socket_conf_t       *socket_conf;
61     nxt_router_temp_conf_t  *temp_conf;
62     nxt_bool_t              last;
63 } nxt_socket_rpc_t;
64 
65 
66 typedef struct {
67     nxt_app_t               *app;
68     nxt_router_temp_conf_t  *temp_conf;
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75 } nxt_app_joint_rpc_t;
76 
77 
78 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
79     nxt_mp_t *mp);
80 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
81 static void nxt_router_greet_controller(nxt_task_t *task,
82     nxt_port_t *controller_port);
83 
84 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
85 
86 static void nxt_router_new_port_handler(nxt_task_t *task,
87     nxt_port_recv_msg_t *msg);
88 static void nxt_router_conf_data_handler(nxt_task_t *task,
89     nxt_port_recv_msg_t *msg);
90 static void nxt_router_app_restart_handler(nxt_task_t *task,
91     nxt_port_recv_msg_t *msg);
92 static void nxt_router_remove_pid_handler(nxt_task_t *task,
93     nxt_port_recv_msg_t *msg);
94 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
95     nxt_port_recv_msg_t *msg);
96 
97 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
98 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
99 static void nxt_router_conf_ready(nxt_task_t *task,
100     nxt_router_temp_conf_t *tmcf);
101 static void nxt_router_conf_error(nxt_task_t *task,
102     nxt_router_temp_conf_t *tmcf);
103 static void nxt_router_conf_send(nxt_task_t *task,
104     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
105 
106 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
107     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
108 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
109     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
110 
111 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
112 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
113 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
114     nxt_app_t *app);
115 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
116     nxt_str_t *name);
117 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
118     int i);
119 
120 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
121     nxt_port_t *port);
122 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
123     nxt_port_t *port);
124 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
125     nxt_port_t *port, nxt_fd_t fd);
126 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
127     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
128 static void nxt_router_listen_socket_ready(nxt_task_t *task,
129     nxt_port_recv_msg_t *msg, void *data);
130 static void nxt_router_listen_socket_error(nxt_task_t *task,
131     nxt_port_recv_msg_t *msg, void *data);
132 #if (NXT_TLS)
133 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
136     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
137     nxt_bool_t last);
138 #endif
139 static void nxt_router_app_rpc_create(nxt_task_t *task,
140     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
141 static void nxt_router_app_prefork_ready(nxt_task_t *task,
142     nxt_port_recv_msg_t *msg, void *data);
143 static void nxt_router_app_prefork_error(nxt_task_t *task,
144     nxt_port_recv_msg_t *msg, void *data);
145 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
146     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
147 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
148     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
149 
150 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
151     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
152     const nxt_event_interface_t *interface);
153 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
154     nxt_router_engine_conf_t *recf);
155 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
156     nxt_router_engine_conf_t *recf);
157 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
161     nxt_work_handler_t handler);
162 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
163     nxt_router_engine_conf_t *recf);
164 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
165     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
166 
167 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
168     nxt_router_temp_conf_t *tmcf);
169 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
170     nxt_event_engine_t *engine);
171 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
172     nxt_router_temp_conf_t *tmcf);
173 
174 static void nxt_router_engines_post(nxt_router_t *router,
175     nxt_router_temp_conf_t *tmcf);
176 static void nxt_router_engine_post(nxt_event_engine_t *engine,
177     nxt_work_t *jobs);
178 
179 static void nxt_router_thread_start(void *data);
180 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
181     void *data);
182 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
183     void *data);
184 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
195     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
196 static void nxt_router_listen_socket_release(nxt_task_t *task,
197     nxt_socket_conf_t *skcf);
198 
199 static void nxt_router_access_log_writer(nxt_task_t *task,
200     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
201 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
202     struct tm *tm, size_t size, const char *format);
203 static void nxt_router_access_log_open(nxt_task_t *task,
204     nxt_router_temp_conf_t *tmcf);
205 static void nxt_router_access_log_ready(nxt_task_t *task,
206     nxt_port_recv_msg_t *msg, void *data);
207 static void nxt_router_access_log_error(nxt_task_t *task,
208     nxt_port_recv_msg_t *msg, void *data);
209 static void nxt_router_access_log_release(nxt_task_t *task,
210     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
211 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
212     void *data);
213 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
214     nxt_port_recv_msg_t *msg, void *data);
215 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
216     nxt_port_recv_msg_t *msg, void *data);
217 
218 static void nxt_router_app_port_ready(nxt_task_t *task,
219     nxt_port_recv_msg_t *msg, void *data);
220 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
221     nxt_port_t *app_port);
222 static void nxt_router_app_port_error(nxt_task_t *task,
223     nxt_port_recv_msg_t *msg, void *data);
224 
225 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
226 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
227 
228 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
229     nxt_apr_action_t action);
230 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
231     nxt_request_rpc_data_t *req_rpc_data);
232 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
233     void *data);
234 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
235     void *data);
236 
237 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
238     void *data);
239 static void nxt_router_app_prepare_request(nxt_task_t *task,
240     nxt_request_rpc_data_t *req_rpc_data);
241 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
242     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
243 
244 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
245 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
246     void *data);
247 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
252 
253 static const nxt_http_request_state_t  nxt_http_request_send_state;
254 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
255 
256 static void nxt_router_app_joint_use(nxt_task_t *task,
257     nxt_app_joint_t *app_joint, int i);
258 
259 static void nxt_router_http_request_release_post(nxt_task_t *task,
260     nxt_http_request_t *r);
261 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
262     void *data);
263 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
264 static void nxt_router_get_port_handler(nxt_task_t *task,
265     nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_mmap_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 
269 extern const nxt_http_request_state_t  nxt_http_websocket;
270 
271 static nxt_router_t  *nxt_router;
272 
273 static const nxt_str_t http_prefix = nxt_string("HTTP_");
274 static const nxt_str_t empty_prefix = nxt_string("");
275 
276 static const nxt_str_t  *nxt_app_msg_prefix[] = {
277     &empty_prefix,
278     &empty_prefix,
279     &http_prefix,
280     &http_prefix,
281     &http_prefix,
282     &empty_prefix,
283 };
284 
285 
286 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
287     .quit         = nxt_signal_quit_handler,
288     .new_port     = nxt_router_new_port_handler,
289     .get_port     = nxt_router_get_port_handler,
290     .change_file  = nxt_port_change_log_file_handler,
291     .mmap         = nxt_port_mmap_handler,
292     .get_mmap     = nxt_router_get_mmap_handler,
293     .data         = nxt_router_conf_data_handler,
294     .app_restart  = nxt_router_app_restart_handler,
295     .remove_pid   = nxt_router_remove_pid_handler,
296     .access_log   = nxt_router_access_log_reopen_handler,
297     .rpc_ready    = nxt_port_rpc_handler,
298     .rpc_error    = nxt_port_rpc_handler,
299     .oosm         = nxt_router_oosm_handler,
300 };
301 
302 
303 const nxt_process_init_t  nxt_router_process = {
304     .name           = "router",
305     .type           = NXT_PROCESS_ROUTER,
306     .prefork        = nxt_router_prefork,
307     .restart        = 1,
308     .setup          = nxt_process_core_setup,
309     .start          = nxt_router_start,
310     .port_handlers  = &nxt_router_process_port_handlers,
311     .signals        = nxt_process_signals,
312 };
313 
314 
315 /* Queues of nxt_socket_conf_t */
316 nxt_queue_t  creating_sockets;
317 nxt_queue_t  pending_sockets;
318 nxt_queue_t  updating_sockets;
319 nxt_queue_t  keeping_sockets;
320 nxt_queue_t  deleting_sockets;
321 
322 
323 static nxt_int_t
324 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
325 {
326     nxt_runtime_stop_app_processes(task, task->thread->runtime);
327 
328     return NXT_OK;
329 }
330 
331 
332 static nxt_int_t
333 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
334 {
335     nxt_int_t      ret;
336     nxt_port_t     *controller_port;
337     nxt_router_t   *router;
338     nxt_runtime_t  *rt;
339 
340     rt = task->thread->runtime;
341 
342     nxt_log(task, NXT_LOG_INFO, "router started");
343 
344 #if (NXT_TLS)
345     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
346     if (nxt_slow_path(rt->tls == NULL)) {
347         return NXT_ERROR;
348     }
349 
350     ret = rt->tls->library_init(task);
351     if (nxt_slow_path(ret != NXT_OK)) {
352         return ret;
353     }
354 #endif
355 
356     ret = nxt_http_init(task);
357     if (nxt_slow_path(ret != NXT_OK)) {
358         return ret;
359     }
360 
361     router = nxt_zalloc(sizeof(nxt_router_t));
362     if (nxt_slow_path(router == NULL)) {
363         return NXT_ERROR;
364     }
365 
366     nxt_queue_init(&router->engines);
367     nxt_queue_init(&router->sockets);
368     nxt_queue_init(&router->apps);
369 
370     nxt_router = router;
371 
372     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
373     if (controller_port != NULL) {
374         nxt_router_greet_controller(task, controller_port);
375     }
376 
377     return NXT_OK;
378 }
379 
380 
381 static void
382 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
383 {
384     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
385                           -1, 0, 0, NULL);
386 }
387 
388 
389 static void
390 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
391     void *data)
392 {
393     size_t               size;
394     uint32_t             stream;
395     nxt_mp_t             *mp;
396     nxt_int_t            ret;
397     nxt_app_t            *app;
398     nxt_buf_t            *b;
399     nxt_port_t           *main_port;
400     nxt_runtime_t        *rt;
401     nxt_app_joint_rpc_t  *app_joint_rpc;
402 
403     app = data;
404 
405     rt = task->thread->runtime;
406     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
407 
408     nxt_debug(task, "app '%V' %p start process", &app->name, app);
409 
410     size = app->name.length + 1 + app->conf.length;
411 
412     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
413 
414     if (nxt_slow_path(b == NULL)) {
415         goto failed;
416     }
417 
418     nxt_buf_cpystr(b, &app->name);
419     *b->mem.free++ = '\0';
420     nxt_buf_cpystr(b, &app->conf);
421 
422     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
423                                                      nxt_router_app_port_ready,
424                                                      nxt_router_app_port_error,
425                                                    sizeof(nxt_app_joint_rpc_t));
426     if (nxt_slow_path(app_joint_rpc == NULL)) {
427         goto failed;
428     }
429 
430     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
431 
432     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
433                                 -1, stream, port->id, b);
434     if (nxt_slow_path(ret != NXT_OK)) {
435         nxt_port_rpc_cancel(task, port, stream);
436 
437         goto failed;
438     }
439 
440     app_joint_rpc->app_joint = app->joint;
441     app_joint_rpc->generation = app->generation;
442 
443     nxt_router_app_joint_use(task, app->joint, 1);
444 
445     nxt_router_app_use(task, app, -1);
446 
447     return;
448 
449 failed:
450 
451     if (b != NULL) {
452         mp = b->data;
453         nxt_mp_free(mp, b);
454         nxt_mp_release(mp);
455     }
456 
457     nxt_thread_mutex_lock(&app->mutex);
458 
459     app->pending_processes--;
460 
461     nxt_thread_mutex_unlock(&app->mutex);
462 
463     nxt_router_app_use(task, app, -1);
464 }
465 
466 
467 static void
468 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
469 {
470     app_joint->use_count += i;
471 
472     if (app_joint->use_count == 0) {
473         nxt_assert(app_joint->app == NULL);
474 
475         nxt_free(app_joint);
476     }
477 }
478 
479 
480 static nxt_int_t
481 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
482 {
483     nxt_int_t      res;
484     nxt_port_t     *router_port;
485     nxt_runtime_t  *rt;
486 
487     nxt_debug(task, "app '%V' start process", &app->name);
488 
489     rt = task->thread->runtime;
490     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
491 
492     nxt_router_app_use(task, app, 1);
493 
494     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
495                         app);
496 
497     if (res == NXT_OK) {
498         return res;
499     }
500 
501     nxt_thread_mutex_lock(&app->mutex);
502 
503     app->pending_processes--;
504 
505     nxt_thread_mutex_unlock(&app->mutex);
506 
507     nxt_router_app_use(task, app, -1);
508 
509     return NXT_ERROR;
510 }
511 
512 
513 nxt_inline nxt_bool_t
514 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
515 {
516     nxt_buf_t       *b, *next;
517     nxt_bool_t      cancelled;
518     nxt_port_t      *app_port;
519     nxt_msg_info_t  *msg_info;
520 
521     msg_info = &req_rpc_data->msg_info;
522 
523     if (msg_info->buf == NULL) {
524         return 0;
525     }
526 
527     app_port = req_rpc_data->app_port;
528 
529     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
530         cancelled = nxt_app_queue_cancel(app_port->queue,
531                                          msg_info->tracking_cookie,
532                                          req_rpc_data->stream);
533 
534         if (cancelled) {
535             nxt_debug(task, "stream #%uD: cancelled by router",
536                       req_rpc_data->stream);
537         }
538 
539     } else {
540         cancelled = 0;
541     }
542 
543     for (b = msg_info->buf; b != NULL; b = next) {
544         next = b->next;
545         b->next = NULL;
546 
547         if (b->is_port_mmap_sent) {
548             b->is_port_mmap_sent = cancelled == 0;
549         }
550 
551         b->completion_handler(task, b, b->parent);
552     }
553 
554     msg_info->buf = NULL;
555 
556     return cancelled;
557 }
558 
559 
560 nxt_inline nxt_bool_t
561 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
562 {
563     if (lnk->next != NULL) {
564         nxt_queue_remove(lnk);
565 
566         lnk->next = NULL;
567 
568         return 1;
569     }
570 
571     return 0;
572 }
573 
574 
575 nxt_inline void
576 nxt_request_rpc_data_unlink(nxt_task_t *task,
577     nxt_request_rpc_data_t *req_rpc_data)
578 {
579     nxt_app_t           *app;
580     nxt_bool_t          unlinked;
581     nxt_http_request_t  *r;
582 
583     nxt_router_msg_cancel(task, req_rpc_data);
584 
585     if (req_rpc_data->app_port != NULL) {
586         nxt_router_app_port_release(task, req_rpc_data->app_port,
587                                     req_rpc_data->apr_action);
588 
589         req_rpc_data->app_port = NULL;
590     }
591 
592     app = req_rpc_data->app;
593     r = req_rpc_data->request;
594 
595     if (r != NULL) {
596         r->timer_data = NULL;
597 
598         nxt_router_http_request_release_post(task, r);
599 
600         r->req_rpc_data = NULL;
601         req_rpc_data->request = NULL;
602 
603         if (app != NULL) {
604             unlinked = 0;
605 
606             nxt_thread_mutex_lock(&app->mutex);
607 
608             if (r->app_link.next != NULL) {
609                 nxt_queue_remove(&r->app_link);
610                 r->app_link.next = NULL;
611 
612                 unlinked = 1;
613             }
614 
615             nxt_thread_mutex_unlock(&app->mutex);
616 
617             if (unlinked) {
618                 nxt_mp_release(r->mem_pool);
619             }
620         }
621     }
622 
623     if (app != NULL) {
624         nxt_router_app_use(task, app, -1);
625 
626         req_rpc_data->app = NULL;
627     }
628 
629     if (req_rpc_data->msg_info.body_fd != -1) {
630         nxt_fd_close(req_rpc_data->msg_info.body_fd);
631 
632         req_rpc_data->msg_info.body_fd = -1;
633     }
634 
635     if (req_rpc_data->rpc_cancel) {
636         req_rpc_data->rpc_cancel = 0;
637 
638         nxt_port_rpc_cancel(task, task->thread->engine->port,
639                             req_rpc_data->stream);
640     }
641 }
642 
643 
644 static void
645 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
646 {
647     nxt_int_t      res;
648     nxt_app_t      *app;
649     nxt_port_t     *port, *main_app_port;
650     nxt_runtime_t  *rt;
651 
652     nxt_port_new_port_handler(task, msg);
653 
654     port = msg->u.new_port;
655 
656     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
657         nxt_router_greet_controller(task, msg->u.new_port);
658     }
659 
660     if (port == NULL || port->type != NXT_PROCESS_APP) {
661 
662         if (msg->port_msg.stream == 0) {
663             return;
664         }
665 
666         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
667 
668     } else {
669         if (msg->fd[1] != -1) {
670             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
671             if (nxt_slow_path(res != NXT_OK)) {
672                 return;
673             }
674 
675             nxt_fd_close(msg->fd[1]);
676             msg->fd[1] = -1;
677         }
678     }
679 
680     if (msg->port_msg.stream != 0) {
681         nxt_port_rpc_handler(task, msg);
682         return;
683     }
684 
685     /*
686      * Port with "id == 0" is application 'main' port and it always
687      * should come with non-zero stream.
688      */
689     nxt_assert(port->id != 0);
690 
691     /* Find 'main' app port and get app reference. */
692     rt = task->thread->runtime;
693 
694     /*
695      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
696      * sent to main port (with id == 0) and processed in main thread.
697      */
698     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
699     nxt_assert(main_app_port != NULL);
700 
701     app = main_app_port->app;
702 
703     if (nxt_fast_path(app != NULL)) {
704         nxt_thread_mutex_lock(&app->mutex);
705 
706         /* TODO here should be find-and-add code because there can be
707            port waiters in port_hash */
708         nxt_port_hash_add(&app->port_hash, port);
709         app->port_hash_count++;
710 
711         nxt_thread_mutex_unlock(&app->mutex);
712 
713         port->app = app;
714     }
715 
716     port->main_app_port = main_app_port;
717 
718     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
719 }
720 
721 
722 static void
723 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
724 {
725     void                    *p;
726     size_t                  size;
727     nxt_int_t               ret;
728     nxt_port_t              *port;
729     nxt_router_temp_conf_t  *tmcf;
730 
731     port = nxt_runtime_port_find(task->thread->runtime,
732                                  msg->port_msg.pid,
733                                  msg->port_msg.reply_port);
734     if (nxt_slow_path(port == NULL)) {
735         nxt_alert(task, "conf_data_handler: reply port not found");
736         return;
737     }
738 
739     p = MAP_FAILED;
740 
741     /*
742      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
743      * initialized in 'cleanup' section.
744      */
745     size = 0;
746 
747     tmcf = nxt_router_temp_conf(task);
748     if (nxt_slow_path(tmcf == NULL)) {
749         goto fail;
750     }
751 
752     if (nxt_slow_path(msg->fd[0] == -1)) {
753         nxt_alert(task, "conf_data_handler: invalid shm fd");
754         goto fail;
755     }
756 
757     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
758         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
759                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
760         goto fail;
761     }
762 
763     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
764 
765     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
766 
767     nxt_fd_close(msg->fd[0]);
768     msg->fd[0] = -1;
769 
770     if (nxt_slow_path(p == MAP_FAILED)) {
771         goto fail;
772     }
773 
774     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
775 
776     tmcf->router_conf->router = nxt_router;
777     tmcf->stream = msg->port_msg.stream;
778     tmcf->port = port;
779 
780     nxt_port_use(task, tmcf->port, 1);
781 
782     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
783 
784     if (nxt_fast_path(ret == NXT_OK)) {
785         nxt_router_conf_apply(task, tmcf, NULL);
786 
787     } else {
788         nxt_router_conf_error(task, tmcf);
789     }
790 
791     goto cleanup;
792 
793 fail:
794 
795     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
796                           msg->port_msg.stream, 0, NULL);
797 
798     if (tmcf != NULL) {
799         nxt_mp_release(tmcf->mem_pool);
800     }
801 
802 cleanup:
803 
804     if (p != MAP_FAILED) {
805         nxt_mem_munmap(p, size);
806     }
807 
808     if (msg->fd[0] != -1) {
809         nxt_fd_close(msg->fd[0]);
810         msg->fd[0] = -1;
811     }
812 }
813 
814 
815 static void
816 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
817 {
818     nxt_app_t            *app;
819     nxt_int_t            ret;
820     nxt_str_t            app_name;
821     nxt_port_t           *port, *reply_port, *shared_port, *old_shared_port;
822     nxt_port_msg_type_t  reply;
823 
824     reply_port = nxt_runtime_port_find(task->thread->runtime,
825                                        msg->port_msg.pid,
826                                        msg->port_msg.reply_port);
827     if (nxt_slow_path(reply_port == NULL)) {
828         nxt_alert(task, "app_restart_handler: reply port not found");
829         return;
830     }
831 
832     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
833     app_name.start = msg->buf->mem.pos;
834 
835     nxt_debug(task, "app_restart_handler: %V", &app_name);
836 
837     app = nxt_router_app_find(&nxt_router->apps, &app_name);
838 
839     if (nxt_fast_path(app != NULL)) {
840         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
841                                    NXT_PROCESS_APP);
842         if (nxt_slow_path(shared_port == NULL)) {
843             goto fail;
844         }
845 
846         ret = nxt_port_socket_init(task, shared_port, 0);
847         if (nxt_slow_path(ret != NXT_OK)) {
848             nxt_port_use(task, shared_port, -1);
849             goto fail;
850         }
851 
852         ret = nxt_router_app_queue_init(task, shared_port);
853         if (nxt_slow_path(ret != NXT_OK)) {
854             nxt_port_write_close(shared_port);
855             nxt_port_read_close(shared_port);
856             nxt_port_use(task, shared_port, -1);
857             goto fail;
858         }
859 
860         nxt_port_write_enable(task, shared_port);
861 
862         nxt_thread_mutex_lock(&app->mutex);
863 
864         nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
865 
866             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
867                                          0, 0, NULL);
868 
869         } nxt_queue_loop;
870 
871         app->generation++;
872 
873         shared_port->app = app;
874 
875         old_shared_port = app->shared_port;
876         old_shared_port->app = NULL;
877 
878         app->shared_port = shared_port;
879 
880         nxt_thread_mutex_unlock(&app->mutex);
881 
882         nxt_port_close(task, old_shared_port);
883         nxt_port_use(task, old_shared_port, -1);
884 
885         reply = NXT_PORT_MSG_RPC_READY_LAST;
886 
887     } else {
888 
889 fail:
890 
891         reply = NXT_PORT_MSG_RPC_ERROR;
892     }
893 
894     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
895                           0, NULL);
896 }
897 
898 
899 static void
900 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
901     void *data)
902 {
903     union {
904         nxt_pid_t  removed_pid;
905         void       *data;
906     } u;
907 
908     u.data = data;
909 
910     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
911 }
912 
913 
914 static void
915 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
916 {
917     nxt_event_engine_t  *engine;
918 
919     nxt_port_remove_pid_handler(task, msg);
920 
921     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
922     {
923         if (nxt_fast_path(engine->port != NULL)) {
924             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
925                           msg->u.data);
926         }
927     }
928     nxt_queue_loop;
929 
930     if (msg->port_msg.stream == 0) {
931         return;
932     }
933 
934     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
935 
936     nxt_port_rpc_handler(task, msg);
937 }
938 
939 
940 static nxt_router_temp_conf_t *
941 nxt_router_temp_conf(nxt_task_t *task)
942 {
943     nxt_mp_t                *mp, *tmp;
944     nxt_router_conf_t       *rtcf;
945     nxt_router_temp_conf_t  *tmcf;
946 
947     mp = nxt_mp_create(1024, 128, 256, 32);
948     if (nxt_slow_path(mp == NULL)) {
949         return NULL;
950     }
951 
952     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
953     if (nxt_slow_path(rtcf == NULL)) {
954         goto fail;
955     }
956 
957     rtcf->mem_pool = mp;
958 
959     tmp = nxt_mp_create(1024, 128, 256, 32);
960     if (nxt_slow_path(tmp == NULL)) {
961         goto fail;
962     }
963 
964     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
965     if (nxt_slow_path(tmcf == NULL)) {
966         goto temp_fail;
967     }
968 
969     tmcf->mem_pool = tmp;
970     tmcf->router_conf = rtcf;
971     tmcf->count = 1;
972     tmcf->engine = task->thread->engine;
973 
974     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
975                                      sizeof(nxt_router_engine_conf_t));
976     if (nxt_slow_path(tmcf->engines == NULL)) {
977         goto temp_fail;
978     }
979 
980     nxt_queue_init(&creating_sockets);
981     nxt_queue_init(&pending_sockets);
982     nxt_queue_init(&updating_sockets);
983     nxt_queue_init(&keeping_sockets);
984     nxt_queue_init(&deleting_sockets);
985 
986 #if (NXT_TLS)
987     nxt_queue_init(&tmcf->tls);
988 #endif
989 
990     nxt_queue_init(&tmcf->apps);
991     nxt_queue_init(&tmcf->previous);
992 
993     return tmcf;
994 
995 temp_fail:
996 
997     nxt_mp_destroy(tmp);
998 
999 fail:
1000 
1001     nxt_mp_destroy(mp);
1002 
1003     return NULL;
1004 }
1005 
1006 
1007 nxt_inline nxt_bool_t
1008 nxt_router_app_can_start(nxt_app_t *app)
1009 {
1010     return app->processes + app->pending_processes < app->max_processes
1011             && app->pending_processes < app->max_pending_processes;
1012 }
1013 
1014 
1015 nxt_inline nxt_bool_t
1016 nxt_router_app_need_start(nxt_app_t *app)
1017 {
1018     return (app->active_requests
1019               > app->port_hash_count + app->pending_processes)
1020            || (app->spare_processes
1021                 > app->idle_processes + app->pending_processes);
1022 }
1023 
1024 
1025 static void
1026 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1027 {
1028     nxt_int_t                    ret;
1029     nxt_app_t                    *app;
1030     nxt_router_t                 *router;
1031     nxt_runtime_t                *rt;
1032     nxt_queue_link_t             *qlk;
1033     nxt_socket_conf_t            *skcf;
1034     nxt_router_conf_t            *rtcf;
1035     nxt_router_temp_conf_t       *tmcf;
1036     const nxt_event_interface_t  *interface;
1037 #if (NXT_TLS)
1038     nxt_router_tlssock_t         *tls;
1039 #endif
1040 
1041     tmcf = obj;
1042 
1043     qlk = nxt_queue_first(&pending_sockets);
1044 
1045     if (qlk != nxt_queue_tail(&pending_sockets)) {
1046         nxt_queue_remove(qlk);
1047         nxt_queue_insert_tail(&creating_sockets, qlk);
1048 
1049         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1050 
1051         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1052 
1053         return;
1054     }
1055 
1056 #if (NXT_TLS)
1057     qlk = nxt_queue_last(&tmcf->tls);
1058 
1059     if (qlk != nxt_queue_head(&tmcf->tls)) {
1060         nxt_queue_remove(qlk);
1061 
1062         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1063 
1064         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1065                            nxt_router_tls_rpc_handler, tls);
1066         return;
1067     }
1068 #endif
1069 
1070     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1071 
1072         if (nxt_router_app_need_start(app)) {
1073             nxt_router_app_rpc_create(task, tmcf, app);
1074             return;
1075         }
1076 
1077     } nxt_queue_loop;
1078 
1079     rtcf = tmcf->router_conf;
1080 
1081     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1082         nxt_router_access_log_open(task, tmcf);
1083         return;
1084     }
1085 
1086     rt = task->thread->runtime;
1087 
1088     interface = nxt_service_get(rt->services, "engine", NULL);
1089 
1090     router = rtcf->router;
1091 
1092     ret = nxt_router_engines_create(task, router, tmcf, interface);
1093     if (nxt_slow_path(ret != NXT_OK)) {
1094         goto fail;
1095     }
1096 
1097     ret = nxt_router_threads_create(task, rt, tmcf);
1098     if (nxt_slow_path(ret != NXT_OK)) {
1099         goto fail;
1100     }
1101 
1102     nxt_router_apps_sort(task, router, tmcf);
1103 
1104     nxt_router_apps_hash_use(task, rtcf, 1);
1105 
1106     nxt_router_engines_post(router, tmcf);
1107 
1108     nxt_queue_add(&router->sockets, &updating_sockets);
1109     nxt_queue_add(&router->sockets, &creating_sockets);
1110 
1111     router->access_log = rtcf->access_log;
1112 
1113     nxt_router_conf_ready(task, tmcf);
1114 
1115     return;
1116 
1117 fail:
1118 
1119     nxt_router_conf_error(task, tmcf);
1120 
1121     return;
1122 }
1123 
1124 
1125 static void
1126 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1127 {
1128     nxt_joint_job_t  *job;
1129 
1130     job = obj;
1131 
1132     nxt_router_conf_ready(task, job->tmcf);
1133 }
1134 
1135 
1136 static void
1137 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1138 {
1139     uint32_t               count;
1140     nxt_router_conf_t      *rtcf;
1141     nxt_thread_spinlock_t  *lock;
1142 
1143     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1144 
1145     if (--tmcf->count > 0) {
1146         return;
1147     }
1148 
1149     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1150 
1151     rtcf = tmcf->router_conf;
1152 
1153     lock = &rtcf->router->lock;
1154 
1155     nxt_thread_spin_lock(lock);
1156 
1157     count = rtcf->count;
1158 
1159     nxt_thread_spin_unlock(lock);
1160 
1161     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1162 
1163     if (count == 0) {
1164         nxt_router_apps_hash_use(task, rtcf, -1);
1165 
1166         nxt_router_access_log_release(task, lock, rtcf->access_log);
1167 
1168         nxt_mp_destroy(rtcf->mem_pool);
1169     }
1170 
1171     nxt_mp_release(tmcf->mem_pool);
1172 }
1173 
1174 
1175 static void
1176 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1177 {
1178     nxt_app_t          *app;
1179     nxt_queue_t        new_socket_confs;
1180     nxt_socket_t       s;
1181     nxt_router_t       *router;
1182     nxt_queue_link_t   *qlk;
1183     nxt_socket_conf_t  *skcf;
1184     nxt_router_conf_t  *rtcf;
1185 
1186     nxt_alert(task, "failed to apply new conf");
1187 
1188     for (qlk = nxt_queue_first(&creating_sockets);
1189          qlk != nxt_queue_tail(&creating_sockets);
1190          qlk = nxt_queue_next(qlk))
1191     {
1192         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1193         s = skcf->listen->socket;
1194 
1195         if (s != -1) {
1196             nxt_socket_close(task, s);
1197         }
1198 
1199         nxt_free(skcf->listen);
1200     }
1201 
1202     nxt_queue_init(&new_socket_confs);
1203     nxt_queue_add(&new_socket_confs, &updating_sockets);
1204     nxt_queue_add(&new_socket_confs, &pending_sockets);
1205     nxt_queue_add(&new_socket_confs, &creating_sockets);
1206 
1207     rtcf = tmcf->router_conf;
1208 
1209     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1210 
1211         nxt_router_app_unlink(task, app);
1212 
1213     } nxt_queue_loop;
1214 
1215     router = rtcf->router;
1216 
1217     nxt_queue_add(&router->sockets, &keeping_sockets);
1218     nxt_queue_add(&router->sockets, &deleting_sockets);
1219 
1220     nxt_queue_add(&router->apps, &tmcf->previous);
1221 
1222     // TODO: new engines and threads
1223 
1224     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1225 
1226     nxt_mp_destroy(rtcf->mem_pool);
1227 
1228     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1229 
1230     nxt_mp_release(tmcf->mem_pool);
1231 }
1232 
1233 
1234 static void
1235 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1236     nxt_port_msg_type_t type)
1237 {
1238     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1239 
1240     nxt_port_use(task, tmcf->port, -1);
1241 
1242     tmcf->port = NULL;
1243 }
1244 
1245 
1246 static nxt_conf_map_t  nxt_router_conf[] = {
1247     {
1248         nxt_string("listeners_threads"),
1249         NXT_CONF_MAP_INT32,
1250         offsetof(nxt_router_conf_t, threads),
1251     },
1252 };
1253 
1254 
1255 static nxt_conf_map_t  nxt_router_app_conf[] = {
1256     {
1257         nxt_string("type"),
1258         NXT_CONF_MAP_STR,
1259         offsetof(nxt_router_app_conf_t, type),
1260     },
1261 
1262     {
1263         nxt_string("limits"),
1264         NXT_CONF_MAP_PTR,
1265         offsetof(nxt_router_app_conf_t, limits_value),
1266     },
1267 
1268     {
1269         nxt_string("processes"),
1270         NXT_CONF_MAP_INT32,
1271         offsetof(nxt_router_app_conf_t, processes),
1272     },
1273 
1274     {
1275         nxt_string("processes"),
1276         NXT_CONF_MAP_PTR,
1277         offsetof(nxt_router_app_conf_t, processes_value),
1278     },
1279 
1280     {
1281         nxt_string("targets"),
1282         NXT_CONF_MAP_PTR,
1283         offsetof(nxt_router_app_conf_t, targets_value),
1284     },
1285 };
1286 
1287 
1288 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1289     {
1290         nxt_string("timeout"),
1291         NXT_CONF_MAP_MSEC,
1292         offsetof(nxt_router_app_conf_t, timeout),
1293     },
1294 
1295     {
1296         nxt_string("requests"),
1297         NXT_CONF_MAP_INT32,
1298         offsetof(nxt_router_app_conf_t, requests),
1299     },
1300 };
1301 
1302 
1303 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1304     {
1305         nxt_string("spare"),
1306         NXT_CONF_MAP_INT32,
1307         offsetof(nxt_router_app_conf_t, spare_processes),
1308     },
1309 
1310     {
1311         nxt_string("max"),
1312         NXT_CONF_MAP_INT32,
1313         offsetof(nxt_router_app_conf_t, max_processes),
1314     },
1315 
1316     {
1317         nxt_string("idle_timeout"),
1318         NXT_CONF_MAP_MSEC,
1319         offsetof(nxt_router_app_conf_t, idle_timeout),
1320     },
1321 };
1322 
1323 
1324 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1325     {
1326         nxt_string("pass"),
1327         NXT_CONF_MAP_STR_COPY,
1328         offsetof(nxt_router_listener_conf_t, pass),
1329     },
1330 
1331     {
1332         nxt_string("application"),
1333         NXT_CONF_MAP_STR_COPY,
1334         offsetof(nxt_router_listener_conf_t, application),
1335     },
1336 };
1337 
1338 
1339 static nxt_conf_map_t  nxt_router_http_conf[] = {
1340     {
1341         nxt_string("header_buffer_size"),
1342         NXT_CONF_MAP_SIZE,
1343         offsetof(nxt_socket_conf_t, header_buffer_size),
1344     },
1345 
1346     {
1347         nxt_string("large_header_buffer_size"),
1348         NXT_CONF_MAP_SIZE,
1349         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1350     },
1351 
1352     {
1353         nxt_string("large_header_buffers"),
1354         NXT_CONF_MAP_SIZE,
1355         offsetof(nxt_socket_conf_t, large_header_buffers),
1356     },
1357 
1358     {
1359         nxt_string("body_buffer_size"),
1360         NXT_CONF_MAP_SIZE,
1361         offsetof(nxt_socket_conf_t, body_buffer_size),
1362     },
1363 
1364     {
1365         nxt_string("max_body_size"),
1366         NXT_CONF_MAP_SIZE,
1367         offsetof(nxt_socket_conf_t, max_body_size),
1368     },
1369 
1370     {
1371         nxt_string("idle_timeout"),
1372         NXT_CONF_MAP_MSEC,
1373         offsetof(nxt_socket_conf_t, idle_timeout),
1374     },
1375 
1376     {
1377         nxt_string("header_read_timeout"),
1378         NXT_CONF_MAP_MSEC,
1379         offsetof(nxt_socket_conf_t, header_read_timeout),
1380     },
1381 
1382     {
1383         nxt_string("body_read_timeout"),
1384         NXT_CONF_MAP_MSEC,
1385         offsetof(nxt_socket_conf_t, body_read_timeout),
1386     },
1387 
1388     {
1389         nxt_string("send_timeout"),
1390         NXT_CONF_MAP_MSEC,
1391         offsetof(nxt_socket_conf_t, send_timeout),
1392     },
1393 
1394     {
1395         nxt_string("body_temp_path"),
1396         NXT_CONF_MAP_STR,
1397         offsetof(nxt_socket_conf_t, body_temp_path),
1398     },
1399 
1400     {
1401         nxt_string("discard_unsafe_fields"),
1402         NXT_CONF_MAP_INT8,
1403         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1404     },
1405 };
1406 
1407 
1408 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1409     {
1410         nxt_string("max_frame_size"),
1411         NXT_CONF_MAP_SIZE,
1412         offsetof(nxt_websocket_conf_t, max_frame_size),
1413     },
1414 
1415     {
1416         nxt_string("read_timeout"),
1417         NXT_CONF_MAP_MSEC,
1418         offsetof(nxt_websocket_conf_t, read_timeout),
1419     },
1420 
1421     {
1422         nxt_string("keepalive_interval"),
1423         NXT_CONF_MAP_MSEC,
1424         offsetof(nxt_websocket_conf_t, keepalive_interval),
1425     },
1426 
1427 };
1428 
1429 
1430 static nxt_int_t
1431 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1432     u_char *start, u_char *end)
1433 {
1434     u_char                      *p;
1435     size_t                      size;
1436     nxt_mp_t                    *mp, *app_mp;
1437     uint32_t                    next, next_target;
1438     nxt_int_t                   ret;
1439     nxt_str_t                   name, path, target;
1440     nxt_app_t                   *app, *prev;
1441     nxt_str_t                   *t, *s, *targets;
1442     nxt_uint_t                  n, i;
1443     nxt_port_t                  *port;
1444     nxt_router_t                *router;
1445     nxt_app_joint_t             *app_joint;
1446 #if (NXT_TLS)
1447     nxt_tls_init_t              *tls_init;
1448     nxt_conf_value_t            *certificate;
1449 #endif
1450     nxt_conf_value_t            *conf, *http, *value, *websocket;
1451     nxt_conf_value_t            *applications, *application;
1452     nxt_conf_value_t            *listeners, *listener;
1453     nxt_conf_value_t            *routes_conf, *static_conf;
1454     nxt_socket_conf_t           *skcf;
1455     nxt_http_routes_t           *routes;
1456     nxt_event_engine_t          *engine;
1457     nxt_app_lang_module_t       *lang;
1458     nxt_router_app_conf_t       apcf;
1459     nxt_router_access_log_t     *access_log;
1460     nxt_router_listener_conf_t  lscf;
1461 
1462     static nxt_str_t  http_path = nxt_string("/settings/http");
1463     static nxt_str_t  applications_path = nxt_string("/applications");
1464     static nxt_str_t  listeners_path = nxt_string("/listeners");
1465     static nxt_str_t  routes_path = nxt_string("/routes");
1466     static nxt_str_t  access_log_path = nxt_string("/access_log");
1467 #if (NXT_TLS)
1468     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1469     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1470     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1471     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1472 #endif
1473     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1474     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1475 
1476     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1477     if (conf == NULL) {
1478         nxt_alert(task, "configuration parsing error");
1479         return NXT_ERROR;
1480     }
1481 
1482     mp = tmcf->router_conf->mem_pool;
1483 
1484     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1485                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1486     if (ret != NXT_OK) {
1487         nxt_alert(task, "root map error");
1488         return NXT_ERROR;
1489     }
1490 
1491     if (tmcf->router_conf->threads == 0) {
1492         tmcf->router_conf->threads = nxt_ncpu;
1493     }
1494 
1495     static_conf = nxt_conf_get_path(conf, &static_path);
1496 
1497     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1498     if (nxt_slow_path(ret != NXT_OK)) {
1499         return NXT_ERROR;
1500     }
1501 
1502     router = tmcf->router_conf->router;
1503 
1504     applications = nxt_conf_get_path(conf, &applications_path);
1505 
1506     if (applications != NULL) {
1507         next = 0;
1508 
1509         for ( ;; ) {
1510             application = nxt_conf_next_object_member(applications,
1511                                                       &name, &next);
1512             if (application == NULL) {
1513                 break;
1514             }
1515 
1516             nxt_debug(task, "application \"%V\"", &name);
1517 
1518             size = nxt_conf_json_length(application, NULL);
1519 
1520             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1521             if (nxt_slow_path(app_mp == NULL)) {
1522                 goto fail;
1523             }
1524 
1525             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1526             if (app == NULL) {
1527                 goto app_fail;
1528             }
1529 
1530             nxt_memzero(app, sizeof(nxt_app_t));
1531 
1532             app->mem_pool = app_mp;
1533 
1534             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1535             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1536                                                   + name.length);
1537 
1538             p = nxt_conf_json_print(app->conf.start, application, NULL);
1539             app->conf.length = p - app->conf.start;
1540 
1541             nxt_assert(app->conf.length <= size);
1542 
1543             nxt_debug(task, "application conf \"%V\"", &app->conf);
1544 
1545             prev = nxt_router_app_find(&router->apps, &name);
1546 
1547             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1548                 nxt_mp_destroy(app_mp);
1549 
1550                 nxt_queue_remove(&prev->link);
1551                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1552 
1553                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1554                 if (nxt_slow_path(ret != NXT_OK)) {
1555                     goto fail;
1556                 }
1557 
1558                 continue;
1559             }
1560 
1561             apcf.processes = 1;
1562             apcf.max_processes = 1;
1563             apcf.spare_processes = 0;
1564             apcf.timeout = 0;
1565             apcf.idle_timeout = 15000;
1566             apcf.requests = 0;
1567             apcf.limits_value = NULL;
1568             apcf.processes_value = NULL;
1569             apcf.targets_value = NULL;
1570 
1571             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1572             if (nxt_slow_path(app_joint == NULL)) {
1573                 goto app_fail;
1574             }
1575 
1576             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1577 
1578             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1579                                       nxt_nitems(nxt_router_app_conf), &apcf);
1580             if (ret != NXT_OK) {
1581                 nxt_alert(task, "application map error");
1582                 goto app_fail;
1583             }
1584 
1585             if (apcf.limits_value != NULL) {
1586 
1587                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1588                     nxt_alert(task, "application limits is not object");
1589                     goto app_fail;
1590                 }
1591 
1592                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1593                                         nxt_router_app_limits_conf,
1594                                         nxt_nitems(nxt_router_app_limits_conf),
1595                                         &apcf);
1596                 if (ret != NXT_OK) {
1597                     nxt_alert(task, "application limits map error");
1598                     goto app_fail;
1599                 }
1600             }
1601 
1602             if (apcf.processes_value != NULL
1603                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1604             {
1605                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1606                                      nxt_router_app_processes_conf,
1607                                      nxt_nitems(nxt_router_app_processes_conf),
1608                                      &apcf);
1609                 if (ret != NXT_OK) {
1610                     nxt_alert(task, "application processes map error");
1611                     goto app_fail;
1612                 }
1613 
1614             } else {
1615                 apcf.max_processes = apcf.processes;
1616                 apcf.spare_processes = apcf.processes;
1617             }
1618 
1619             if (apcf.targets_value != NULL) {
1620                 n = nxt_conf_object_members_count(apcf.targets_value);
1621 
1622                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1623                 if (nxt_slow_path(targets == NULL)) {
1624                     goto app_fail;
1625                 }
1626 
1627                 next_target = 0;
1628 
1629                 for (i = 0; i < n; i++) {
1630                     (void) nxt_conf_next_object_member(apcf.targets_value,
1631                                                        &target, &next_target);
1632 
1633                     s = nxt_str_dup(app_mp, &targets[i], &target);
1634                     if (nxt_slow_path(s == NULL)) {
1635                         goto app_fail;
1636                     }
1637                 }
1638 
1639             } else {
1640                 targets = NULL;
1641             }
1642 
1643             nxt_debug(task, "application type: %V", &apcf.type);
1644             nxt_debug(task, "application processes: %D", apcf.processes);
1645             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1646             nxt_debug(task, "application requests: %D", apcf.requests);
1647 
1648             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1649 
1650             if (lang == NULL) {
1651                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1652                 goto app_fail;
1653             }
1654 
1655             nxt_debug(task, "application language module: \"%s\"", lang->file);
1656 
1657             ret = nxt_thread_mutex_create(&app->mutex);
1658             if (ret != NXT_OK) {
1659                 goto app_fail;
1660             }
1661 
1662             nxt_queue_init(&app->ports);
1663             nxt_queue_init(&app->spare_ports);
1664             nxt_queue_init(&app->idle_ports);
1665             nxt_queue_init(&app->ack_waiting_req);
1666 
1667             app->name.length = name.length;
1668             nxt_memcpy(app->name.start, name.start, name.length);
1669 
1670             app->type = lang->type;
1671             app->max_processes = apcf.max_processes;
1672             app->spare_processes = apcf.spare_processes;
1673             app->max_pending_processes = apcf.spare_processes
1674                                          ? apcf.spare_processes : 1;
1675             app->timeout = apcf.timeout;
1676             app->idle_timeout = apcf.idle_timeout;
1677             app->max_requests = apcf.requests;
1678 
1679             app->targets = targets;
1680 
1681             engine = task->thread->engine;
1682 
1683             app->engine = engine;
1684 
1685             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1686             app->adjust_idle_work.task = &engine->task;
1687             app->adjust_idle_work.obj = app;
1688 
1689             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1690 
1691             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1692             if (nxt_slow_path(ret != NXT_OK)) {
1693                 goto app_fail;
1694             }
1695 
1696             nxt_router_app_use(task, app, 1);
1697 
1698             app->joint = app_joint;
1699 
1700             app_joint->use_count = 1;
1701             app_joint->app = app;
1702 
1703             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1704             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1705             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1706             app_joint->idle_timer.task = &engine->task;
1707             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1708 
1709             app_joint->free_app_work.handler = nxt_router_free_app;
1710             app_joint->free_app_work.task = &engine->task;
1711             app_joint->free_app_work.obj = app_joint;
1712 
1713             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1714                                 NXT_PROCESS_APP);
1715             if (nxt_slow_path(port == NULL)) {
1716                 return NXT_ERROR;
1717             }
1718 
1719             ret = nxt_port_socket_init(task, port, 0);
1720             if (nxt_slow_path(ret != NXT_OK)) {
1721                 nxt_port_use(task, port, -1);
1722                 return NXT_ERROR;
1723             }
1724 
1725             ret = nxt_router_app_queue_init(task, port);
1726             if (nxt_slow_path(ret != NXT_OK)) {
1727                 nxt_port_write_close(port);
1728                 nxt_port_read_close(port);
1729                 nxt_port_use(task, port, -1);
1730                 return NXT_ERROR;
1731             }
1732 
1733             nxt_port_write_enable(task, port);
1734             port->app = app;
1735 
1736             app->shared_port = port;
1737 
1738             nxt_thread_mutex_create(&app->outgoing.mutex);
1739         }
1740     }
1741 
1742     routes_conf = nxt_conf_get_path(conf, &routes_path);
1743     if (nxt_fast_path(routes_conf != NULL)) {
1744         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1745         if (nxt_slow_path(routes == NULL)) {
1746             return NXT_ERROR;
1747         }
1748         tmcf->router_conf->routes = routes;
1749     }
1750 
1751     ret = nxt_upstreams_create(task, tmcf, conf);
1752     if (nxt_slow_path(ret != NXT_OK)) {
1753         return ret;
1754     }
1755 
1756     http = nxt_conf_get_path(conf, &http_path);
1757 #if 0
1758     if (http == NULL) {
1759         nxt_alert(task, "no \"http\" block");
1760         return NXT_ERROR;
1761     }
1762 #endif
1763 
1764     websocket = nxt_conf_get_path(conf, &websocket_path);
1765 
1766     listeners = nxt_conf_get_path(conf, &listeners_path);
1767 
1768     if (listeners != NULL) {
1769         next = 0;
1770 
1771         for ( ;; ) {
1772             listener = nxt_conf_next_object_member(listeners, &name, &next);
1773             if (listener == NULL) {
1774                 break;
1775             }
1776 
1777             skcf = nxt_router_socket_conf(task, tmcf, &name);
1778             if (skcf == NULL) {
1779                 goto fail;
1780             }
1781 
1782             nxt_memzero(&lscf, sizeof(lscf));
1783 
1784             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1785                                       nxt_nitems(nxt_router_listener_conf),
1786                                       &lscf);
1787             if (ret != NXT_OK) {
1788                 nxt_alert(task, "listener map error");
1789                 goto fail;
1790             }
1791 
1792             nxt_debug(task, "application: %V", &lscf.application);
1793 
1794             // STUB, default values if http block is not defined.
1795             skcf->header_buffer_size = 2048;
1796             skcf->large_header_buffer_size = 8192;
1797             skcf->large_header_buffers = 4;
1798             skcf->discard_unsafe_fields = 1;
1799             skcf->body_buffer_size = 16 * 1024;
1800             skcf->max_body_size = 8 * 1024 * 1024;
1801             skcf->proxy_header_buffer_size = 64 * 1024;
1802             skcf->proxy_buffer_size = 4096;
1803             skcf->proxy_buffers = 256;
1804             skcf->idle_timeout = 180 * 1000;
1805             skcf->header_read_timeout = 30 * 1000;
1806             skcf->body_read_timeout = 30 * 1000;
1807             skcf->send_timeout = 30 * 1000;
1808             skcf->proxy_timeout = 60 * 1000;
1809             skcf->proxy_send_timeout = 30 * 1000;
1810             skcf->proxy_read_timeout = 30 * 1000;
1811 
1812             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1813             skcf->websocket_conf.read_timeout = 60 * 1000;
1814             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1815 
1816             nxt_str_null(&skcf->body_temp_path);
1817 
1818             if (http != NULL) {
1819                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1820                                           nxt_nitems(nxt_router_http_conf),
1821                                           skcf);
1822                 if (ret != NXT_OK) {
1823                     nxt_alert(task, "http map error");
1824                     goto fail;
1825                 }
1826             }
1827 
1828             if (websocket != NULL) {
1829                 ret = nxt_conf_map_object(mp, websocket,
1830                                           nxt_router_websocket_conf,
1831                                           nxt_nitems(nxt_router_websocket_conf),
1832                                           &skcf->websocket_conf);
1833                 if (ret != NXT_OK) {
1834                     nxt_alert(task, "websocket map error");
1835                     goto fail;
1836                 }
1837             }
1838 
1839             t = &skcf->body_temp_path;
1840 
1841             if (t->length == 0) {
1842                 t->start = (u_char *) task->thread->runtime->tmp;
1843                 t->length = nxt_strlen(t->start);
1844             }
1845 
1846 #if (NXT_TLS)
1847             certificate = nxt_conf_get_path(listener, &certificate_path);
1848 
1849             if (certificate != NULL) {
1850                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1851                 if (nxt_slow_path(tls_init == NULL)) {
1852                     return NXT_ERROR;
1853                 }
1854 
1855                 tls_init->cache_size = 0;
1856                 tls_init->timeout = 300;
1857 
1858                 value = nxt_conf_get_path(listener, &conf_cache_path);
1859                 if (value != NULL) {
1860                     tls_init->cache_size = nxt_conf_get_number(value);
1861                 }
1862 
1863                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1864                 if (value != NULL) {
1865                     tls_init->timeout = nxt_conf_get_number(value);
1866                 }
1867 
1868                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1869                                                         &conf_commands_path);
1870 
1871                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1872                     n = nxt_conf_array_elements_count(certificate);
1873 
1874                     for (i = 0; i < n; i++) {
1875                         value = nxt_conf_get_array_element(certificate, i);
1876 
1877                         nxt_assert(value != NULL);
1878 
1879                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1880                                                          tls_init, i == 0);
1881                         if (nxt_slow_path(ret != NXT_OK)) {
1882                             goto fail;
1883                         }
1884                     }
1885 
1886                 } else {
1887                     /* NXT_CONF_STRING */
1888                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1889                                                      tls_init, 1);
1890                     if (nxt_slow_path(ret != NXT_OK)) {
1891                         goto fail;
1892                     }
1893                 }
1894             }
1895 #endif
1896 
1897             skcf->listen->handler = nxt_http_conn_init;
1898             skcf->router_conf = tmcf->router_conf;
1899             skcf->router_conf->count++;
1900 
1901             if (lscf.pass.length != 0) {
1902                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1903 
1904             /* COMPATIBILITY: listener application. */
1905             } else if (lscf.application.length > 0) {
1906                 skcf->action = nxt_http_pass_application(task,
1907                                                          tmcf->router_conf,
1908                                                          &lscf.application);
1909             }
1910 
1911             if (nxt_slow_path(skcf->action == NULL)) {
1912                 goto fail;
1913             }
1914         }
1915     }
1916 
1917     ret = nxt_http_routes_resolve(task, tmcf);
1918     if (nxt_slow_path(ret != NXT_OK)) {
1919         goto fail;
1920     }
1921 
1922     value = nxt_conf_get_path(conf, &access_log_path);
1923 
1924     if (value != NULL) {
1925         nxt_conf_get_string(value, &path);
1926 
1927         access_log = router->access_log;
1928 
1929         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1930             nxt_thread_spin_lock(&router->lock);
1931             access_log->count++;
1932             nxt_thread_spin_unlock(&router->lock);
1933 
1934         } else {
1935             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1936                                     + path.length);
1937             if (access_log == NULL) {
1938                 nxt_alert(task, "failed to allocate access log structure");
1939                 goto fail;
1940             }
1941 
1942             access_log->fd = -1;
1943             access_log->handler = &nxt_router_access_log_writer;
1944             access_log->count = 1;
1945 
1946             access_log->path.length = path.length;
1947             access_log->path.start = (u_char *) access_log
1948                                      + sizeof(nxt_router_access_log_t);
1949 
1950             nxt_memcpy(access_log->path.start, path.start, path.length);
1951         }
1952 
1953         tmcf->router_conf->access_log = access_log;
1954     }
1955 
1956     nxt_queue_add(&deleting_sockets, &router->sockets);
1957     nxt_queue_init(&router->sockets);
1958 
1959     return NXT_OK;
1960 
1961 app_fail:
1962 
1963     nxt_mp_destroy(app_mp);
1964 
1965 fail:
1966 
1967     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1968 
1969         nxt_queue_remove(&app->link);
1970         nxt_thread_mutex_destroy(&app->mutex);
1971         nxt_mp_destroy(app->mem_pool);
1972 
1973     } nxt_queue_loop;
1974 
1975     return NXT_ERROR;
1976 }
1977 
1978 
1979 #if (NXT_TLS)
1980 
1981 static nxt_int_t
1982 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1983     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1984     nxt_tls_init_t *tls_init, nxt_bool_t last)
1985 {
1986     nxt_router_tlssock_t  *tls;
1987 
1988     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1989     if (nxt_slow_path(tls == NULL)) {
1990         return NXT_ERROR;
1991     }
1992 
1993     tls->tls_init = tls_init;
1994     tls->socket_conf = skcf;
1995     tls->temp_conf = tmcf;
1996     tls->last = last;
1997     nxt_conf_get_string(value, &tls->name);
1998 
1999     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2000 
2001     return NXT_OK;
2002 }
2003 
2004 #endif
2005 
2006 
2007 static nxt_int_t
2008 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2009     nxt_conf_value_t *conf)
2010 {
2011     uint32_t          next, i;
2012     nxt_mp_t          *mp;
2013     nxt_str_t         *type, exten, str;
2014     nxt_int_t         ret;
2015     nxt_uint_t        exts;
2016     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2017 
2018     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2019 
2020     mp = rtcf->mem_pool;
2021 
2022     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2023     if (nxt_slow_path(ret != NXT_OK)) {
2024         return NXT_ERROR;
2025     }
2026 
2027     if (conf == NULL) {
2028         return NXT_OK;
2029     }
2030 
2031     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2032 
2033     if (mtypes_conf != NULL) {
2034         next = 0;
2035 
2036         for ( ;; ) {
2037             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2038 
2039             if (ext_conf == NULL) {
2040                 break;
2041             }
2042 
2043             type = nxt_str_dup(mp, NULL, &str);
2044             if (nxt_slow_path(type == NULL)) {
2045                 return NXT_ERROR;
2046             }
2047 
2048             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2049                 nxt_conf_get_string(ext_conf, &str);
2050 
2051                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2052                     return NXT_ERROR;
2053                 }
2054 
2055                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2056                                                       &exten, type);
2057                 if (nxt_slow_path(ret != NXT_OK)) {
2058                     return NXT_ERROR;
2059                 }
2060 
2061                 continue;
2062             }
2063 
2064             exts = nxt_conf_array_elements_count(ext_conf);
2065 
2066             for (i = 0; i < exts; i++) {
2067                 value = nxt_conf_get_array_element(ext_conf, i);
2068 
2069                 nxt_conf_get_string(value, &str);
2070 
2071                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2072                     return NXT_ERROR;
2073                 }
2074 
2075                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2076                                                       &exten, type);
2077                 if (nxt_slow_path(ret != NXT_OK)) {
2078                     return NXT_ERROR;
2079                 }
2080             }
2081         }
2082     }
2083 
2084     return NXT_OK;
2085 }
2086 
2087 
2088 static nxt_app_t *
2089 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2090 {
2091     nxt_app_t  *app;
2092 
2093     nxt_queue_each(app, queue, nxt_app_t, link) {
2094 
2095         if (nxt_strstr_eq(name, &app->name)) {
2096             return app;
2097         }
2098 
2099     } nxt_queue_loop;
2100 
2101     return NULL;
2102 }
2103 
2104 
2105 static nxt_int_t
2106 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2107 {
2108     void       *mem;
2109     nxt_int_t  fd;
2110 
2111     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2112     if (nxt_slow_path(fd == -1)) {
2113         return NXT_ERROR;
2114     }
2115 
2116     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2117                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2118     if (nxt_slow_path(mem == MAP_FAILED)) {
2119         nxt_fd_close(fd);
2120 
2121         return NXT_ERROR;
2122     }
2123 
2124     nxt_app_queue_init(mem);
2125 
2126     port->queue_fd = fd;
2127     port->queue = mem;
2128 
2129     return NXT_OK;
2130 }
2131 
2132 
2133 static nxt_int_t
2134 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2135 {
2136     void       *mem;
2137     nxt_int_t  fd;
2138 
2139     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2140     if (nxt_slow_path(fd == -1)) {
2141         return NXT_ERROR;
2142     }
2143 
2144     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2145                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2146     if (nxt_slow_path(mem == MAP_FAILED)) {
2147         nxt_fd_close(fd);
2148 
2149         return NXT_ERROR;
2150     }
2151 
2152     nxt_port_queue_init(mem);
2153 
2154     port->queue_fd = fd;
2155     port->queue = mem;
2156 
2157     return NXT_OK;
2158 }
2159 
2160 
2161 static nxt_int_t
2162 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2163 {
2164     void  *mem;
2165 
2166     nxt_assert(fd != -1);
2167 
2168     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2169                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2170     if (nxt_slow_path(mem == MAP_FAILED)) {
2171 
2172         return NXT_ERROR;
2173     }
2174 
2175     port->queue = mem;
2176 
2177     return NXT_OK;
2178 }
2179 
2180 
2181 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2182     NXT_LVLHSH_DEFAULT,
2183     nxt_router_apps_hash_test,
2184     nxt_mp_lvlhsh_alloc,
2185     nxt_mp_lvlhsh_free,
2186 };
2187 
2188 
2189 static nxt_int_t
2190 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2191 {
2192     nxt_app_t  *app;
2193 
2194     app = data;
2195 
2196     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2197 }
2198 
2199 
2200 static nxt_int_t
2201 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2202 {
2203     nxt_lvlhsh_query_t  lhq;
2204 
2205     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2206     lhq.replace = 0;
2207     lhq.key = app->name;
2208     lhq.value = app;
2209     lhq.proto = &nxt_router_apps_hash_proto;
2210     lhq.pool = rtcf->mem_pool;
2211 
2212     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2213 
2214     case NXT_OK:
2215         return NXT_OK;
2216 
2217     case NXT_DECLINED:
2218         nxt_thread_log_alert("router app hash adding failed: "
2219                              "\"%V\" is already in hash", &lhq.key);
2220         /* Fall through. */
2221     default:
2222         return NXT_ERROR;
2223     }
2224 }
2225 
2226 
2227 static nxt_app_t *
2228 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2229 {
2230     nxt_lvlhsh_query_t  lhq;
2231 
2232     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2233     lhq.key = *name;
2234     lhq.proto = &nxt_router_apps_hash_proto;
2235 
2236     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2237         return NULL;
2238     }
2239 
2240     return lhq.value;
2241 }
2242 
2243 
2244 static void
2245 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2246 {
2247     nxt_app_t          *app;
2248     nxt_lvlhsh_each_t  lhe;
2249 
2250     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2251 
2252     for ( ;; ) {
2253         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2254 
2255         if (app == NULL) {
2256             break;
2257         }
2258 
2259         nxt_router_app_use(task, app, i);
2260     }
2261 }
2262 
2263 
2264 typedef struct {
2265     nxt_app_t  *app;
2266     nxt_int_t  target;
2267 } nxt_http_app_conf_t;
2268 
2269 
2270 nxt_int_t
2271 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2272     nxt_str_t *target, nxt_http_action_t *action)
2273 {
2274     nxt_app_t            *app;
2275     nxt_str_t            *targets;
2276     nxt_uint_t           i;
2277     nxt_http_app_conf_t  *conf;
2278 
2279     app = nxt_router_apps_hash_get(rtcf, name);
2280     if (app == NULL) {
2281         return NXT_DECLINED;
2282     }
2283 
2284     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2285     if (nxt_slow_path(conf == NULL)) {
2286         return NXT_ERROR;
2287     }
2288 
2289     action->handler = nxt_http_application_handler;
2290     action->u.conf = conf;
2291 
2292     conf->app = app;
2293 
2294     if (target != NULL && target->length != 0) {
2295         targets = app->targets;
2296 
2297         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2298 
2299         conf->target = i;
2300 
2301     } else {
2302         conf->target = 0;
2303     }
2304 
2305     return NXT_OK;
2306 }
2307 
2308 
2309 static nxt_socket_conf_t *
2310 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2311     nxt_str_t *name)
2312 {
2313     size_t               size;
2314     nxt_int_t            ret;
2315     nxt_bool_t           wildcard;
2316     nxt_sockaddr_t       *sa;
2317     nxt_socket_conf_t    *skcf;
2318     nxt_listen_socket_t  *ls;
2319 
2320     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2321     if (nxt_slow_path(sa == NULL)) {
2322         nxt_alert(task, "invalid listener \"%V\"", name);
2323         return NULL;
2324     }
2325 
2326     sa->type = SOCK_STREAM;
2327 
2328     nxt_debug(task, "router listener: \"%*s\"",
2329               (size_t) sa->length, nxt_sockaddr_start(sa));
2330 
2331     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2332     if (nxt_slow_path(skcf == NULL)) {
2333         return NULL;
2334     }
2335 
2336     size = nxt_sockaddr_size(sa);
2337 
2338     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2339 
2340     if (ret != NXT_OK) {
2341 
2342         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2343         if (nxt_slow_path(ls == NULL)) {
2344             return NULL;
2345         }
2346 
2347         skcf->listen = ls;
2348 
2349         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2350         nxt_memcpy(ls->sockaddr, sa, size);
2351 
2352         nxt_listen_socket_remote_size(ls);
2353 
2354         ls->socket = -1;
2355         ls->backlog = NXT_LISTEN_BACKLOG;
2356         ls->flags = NXT_NONBLOCK;
2357         ls->read_after_accept = 1;
2358     }
2359 
2360     switch (sa->u.sockaddr.sa_family) {
2361 #if (NXT_HAVE_UNIX_DOMAIN)
2362     case AF_UNIX:
2363         wildcard = 0;
2364         break;
2365 #endif
2366 #if (NXT_INET6)
2367     case AF_INET6:
2368         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2369         break;
2370 #endif
2371     case AF_INET:
2372     default:
2373         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2374         break;
2375     }
2376 
2377     if (!wildcard) {
2378         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2379         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2380             return NULL;
2381         }
2382 
2383         nxt_memcpy(skcf->sockaddr, sa, size);
2384     }
2385 
2386     return skcf;
2387 }
2388 
2389 
2390 static nxt_int_t
2391 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2392     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2393 {
2394     nxt_router_t       *router;
2395     nxt_queue_link_t   *qlk;
2396     nxt_socket_conf_t  *skcf;
2397 
2398     router = tmcf->router_conf->router;
2399 
2400     for (qlk = nxt_queue_first(&router->sockets);
2401          qlk != nxt_queue_tail(&router->sockets);
2402          qlk = nxt_queue_next(qlk))
2403     {
2404         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2405 
2406         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2407             nskcf->listen = skcf->listen;
2408 
2409             nxt_queue_remove(qlk);
2410             nxt_queue_insert_tail(&keeping_sockets, qlk);
2411 
2412             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2413 
2414             return NXT_OK;
2415         }
2416     }
2417 
2418     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2419 
2420     return NXT_DECLINED;
2421 }
2422 
2423 
2424 static void
2425 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2426     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2427 {
2428     size_t            size;
2429     uint32_t          stream;
2430     nxt_int_t         ret;
2431     nxt_buf_t         *b;
2432     nxt_port_t        *main_port, *router_port;
2433     nxt_runtime_t     *rt;
2434     nxt_socket_rpc_t  *rpc;
2435 
2436     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2437     if (rpc == NULL) {
2438         goto fail;
2439     }
2440 
2441     rpc->socket_conf = skcf;
2442     rpc->temp_conf = tmcf;
2443 
2444     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2445 
2446     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2447     if (b == NULL) {
2448         goto fail;
2449     }
2450 
2451     b->completion_handler = nxt_router_dummy_buf_completion;
2452 
2453     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2454 
2455     rt = task->thread->runtime;
2456     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2457     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2458 
2459     stream = nxt_port_rpc_register_handler(task, router_port,
2460                                            nxt_router_listen_socket_ready,
2461                                            nxt_router_listen_socket_error,
2462                                            main_port->pid, rpc);
2463     if (nxt_slow_path(stream == 0)) {
2464         goto fail;
2465     }
2466 
2467     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2468                                 stream, router_port->id, b);
2469 
2470     if (nxt_slow_path(ret != NXT_OK)) {
2471         nxt_port_rpc_cancel(task, router_port, stream);
2472         goto fail;
2473     }
2474 
2475     return;
2476 
2477 fail:
2478 
2479     nxt_router_conf_error(task, tmcf);
2480 }
2481 
2482 
2483 static void
2484 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2485     void *data)
2486 {
2487     nxt_int_t         ret;
2488     nxt_socket_t      s;
2489     nxt_socket_rpc_t  *rpc;
2490 
2491     rpc = data;
2492 
2493     s = msg->fd[0];
2494 
2495     ret = nxt_socket_nonblocking(task, s);
2496     if (nxt_slow_path(ret != NXT_OK)) {
2497         goto fail;
2498     }
2499 
2500     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2501 
2502     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2503     if (nxt_slow_path(ret != NXT_OK)) {
2504         goto fail;
2505     }
2506 
2507     rpc->socket_conf->listen->socket = s;
2508 
2509     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2510                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2511 
2512     return;
2513 
2514 fail:
2515 
2516     nxt_socket_close(task, s);
2517 
2518     nxt_router_conf_error(task, rpc->temp_conf);
2519 }
2520 
2521 
2522 static void
2523 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2524     void *data)
2525 {
2526     nxt_socket_rpc_t        *rpc;
2527     nxt_router_temp_conf_t  *tmcf;
2528 
2529     rpc = data;
2530     tmcf = rpc->temp_conf;
2531 
2532 #if 0
2533     u_char                  *p;
2534     size_t                  size;
2535     uint8_t                 error;
2536     nxt_buf_t               *in, *out;
2537     nxt_sockaddr_t          *sa;
2538 
2539     static nxt_str_t  socket_errors[] = {
2540         nxt_string("ListenerSystem"),
2541         nxt_string("ListenerNoIPv6"),
2542         nxt_string("ListenerPort"),
2543         nxt_string("ListenerInUse"),
2544         nxt_string("ListenerNoAddress"),
2545         nxt_string("ListenerNoAccess"),
2546         nxt_string("ListenerPath"),
2547     };
2548 
2549     sa = rpc->socket_conf->listen->sockaddr;
2550 
2551     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2552 
2553     if (nxt_slow_path(in == NULL)) {
2554         return;
2555     }
2556 
2557     p = in->mem.pos;
2558 
2559     error = *p++;
2560 
2561     size = nxt_length("listen socket error: ")
2562            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2563            + sa->length + socket_errors[error].length + (in->mem.free - p);
2564 
2565     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2566     if (nxt_slow_path(out == NULL)) {
2567         return;
2568     }
2569 
2570     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2571                         "listen socket error: "
2572                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2573                         (size_t) sa->length, nxt_sockaddr_start(sa),
2574                         &socket_errors[error], in->mem.free - p, p);
2575 
2576     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2577 #endif
2578 
2579     nxt_router_conf_error(task, tmcf);
2580 }
2581 
2582 
2583 #if (NXT_TLS)
2584 
2585 static void
2586 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2587     void *data)
2588 {
2589     nxt_mp_t                *mp;
2590     nxt_int_t               ret;
2591     nxt_tls_conf_t          *tlscf;
2592     nxt_router_tlssock_t    *tls;
2593     nxt_tls_bundle_conf_t   *bundle;
2594     nxt_router_temp_conf_t  *tmcf;
2595 
2596     nxt_debug(task, "tls rpc handler");
2597 
2598     tls = data;
2599     tmcf = tls->temp_conf;
2600 
2601     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2602         goto fail;
2603     }
2604 
2605     mp = tmcf->router_conf->mem_pool;
2606 
2607     if (tls->socket_conf->tls == NULL){
2608         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2609         if (nxt_slow_path(tlscf == NULL)) {
2610             goto fail;
2611         }
2612 
2613         tlscf->no_wait_shutdown = 1;
2614         tls->socket_conf->tls = tlscf;
2615 
2616     } else {
2617         tlscf = tls->socket_conf->tls;
2618     }
2619 
2620     tls->tls_init->conf = tlscf;
2621 
2622     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2623     if (nxt_slow_path(bundle == NULL)) {
2624         goto fail;
2625     }
2626 
2627     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2628         goto fail;
2629     }
2630 
2631     bundle->chain_file = msg->fd[0];
2632     bundle->next = tlscf->bundle;
2633     tlscf->bundle = bundle;
2634 
2635     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2636                                                   tls->last);
2637     if (nxt_slow_path(ret != NXT_OK)) {
2638         goto fail;
2639     }
2640 
2641     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2642                        nxt_router_conf_apply, task, tmcf, NULL);
2643     return;
2644 
2645 fail:
2646 
2647     nxt_router_conf_error(task, tmcf);
2648 }
2649 
2650 #endif
2651 
2652 
2653 static void
2654 nxt_router_app_rpc_create(nxt_task_t *task,
2655     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2656 {
2657     size_t         size;
2658     uint32_t       stream;
2659     nxt_int_t      ret;
2660     nxt_buf_t      *b;
2661     nxt_port_t     *main_port, *router_port;
2662     nxt_runtime_t  *rt;
2663     nxt_app_rpc_t  *rpc;
2664 
2665     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2666     if (rpc == NULL) {
2667         goto fail;
2668     }
2669 
2670     rpc->app = app;
2671     rpc->temp_conf = tmcf;
2672 
2673     nxt_debug(task, "app '%V' prefork", &app->name);
2674 
2675     size = app->name.length + 1 + app->conf.length;
2676 
2677     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2678     if (nxt_slow_path(b == NULL)) {
2679         goto fail;
2680     }
2681 
2682     b->completion_handler = nxt_router_dummy_buf_completion;
2683 
2684     nxt_buf_cpystr(b, &app->name);
2685     *b->mem.free++ = '\0';
2686     nxt_buf_cpystr(b, &app->conf);
2687 
2688     rt = task->thread->runtime;
2689     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2690     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2691 
2692     stream = nxt_port_rpc_register_handler(task, router_port,
2693                                            nxt_router_app_prefork_ready,
2694                                            nxt_router_app_prefork_error,
2695                                            -1, rpc);
2696     if (nxt_slow_path(stream == 0)) {
2697         goto fail;
2698     }
2699 
2700     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2701                                 -1, stream, router_port->id, b);
2702 
2703     if (nxt_slow_path(ret != NXT_OK)) {
2704         nxt_port_rpc_cancel(task, router_port, stream);
2705         goto fail;
2706     }
2707 
2708     app->pending_processes++;
2709 
2710     return;
2711 
2712 fail:
2713 
2714     nxt_router_conf_error(task, tmcf);
2715 }
2716 
2717 
2718 static void
2719 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2720     void *data)
2721 {
2722     nxt_app_t           *app;
2723     nxt_port_t          *port;
2724     nxt_app_rpc_t       *rpc;
2725     nxt_event_engine_t  *engine;
2726 
2727     rpc = data;
2728     app = rpc->app;
2729 
2730     port = msg->u.new_port;
2731 
2732     nxt_assert(port != NULL);
2733     nxt_assert(port->type == NXT_PROCESS_APP);
2734     nxt_assert(port->id == 0);
2735 
2736     port->app = app;
2737     port->main_app_port = port;
2738 
2739     app->pending_processes--;
2740     app->processes++;
2741     app->idle_processes++;
2742 
2743     engine = task->thread->engine;
2744 
2745     nxt_queue_insert_tail(&app->ports, &port->app_link);
2746     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2747 
2748     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2749               &app->name, port->pid, port->id);
2750 
2751     nxt_port_hash_add(&app->port_hash, port);
2752     app->port_hash_count++;
2753 
2754     port->idle_start = 0;
2755 
2756     nxt_port_inc_use(port);
2757 
2758     nxt_router_app_shared_port_send(task, port);
2759 
2760     nxt_work_queue_add(&engine->fast_work_queue,
2761                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2762 }
2763 
2764 
2765 static void
2766 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2767     void *data)
2768 {
2769     nxt_app_t               *app;
2770     nxt_app_rpc_t           *rpc;
2771     nxt_router_temp_conf_t  *tmcf;
2772 
2773     rpc = data;
2774     app = rpc->app;
2775     tmcf = rpc->temp_conf;
2776 
2777     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2778             &app->name);
2779 
2780     app->pending_processes--;
2781 
2782     nxt_router_conf_error(task, tmcf);
2783 }
2784 
2785 
2786 static nxt_int_t
2787 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2788     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2789 {
2790     nxt_int_t                 ret;
2791     nxt_uint_t                n, threads;
2792     nxt_queue_link_t          *qlk;
2793     nxt_router_engine_conf_t  *recf;
2794 
2795     threads = tmcf->router_conf->threads;
2796 
2797     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2798                                      sizeof(nxt_router_engine_conf_t));
2799     if (nxt_slow_path(tmcf->engines == NULL)) {
2800         return NXT_ERROR;
2801     }
2802 
2803     n = 0;
2804 
2805     for (qlk = nxt_queue_first(&router->engines);
2806          qlk != nxt_queue_tail(&router->engines);
2807          qlk = nxt_queue_next(qlk))
2808     {
2809         recf = nxt_array_zero_add(tmcf->engines);
2810         if (nxt_slow_path(recf == NULL)) {
2811             return NXT_ERROR;
2812         }
2813 
2814         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2815 
2816         if (n < threads) {
2817             recf->action = NXT_ROUTER_ENGINE_KEEP;
2818             ret = nxt_router_engine_conf_update(tmcf, recf);
2819 
2820         } else {
2821             recf->action = NXT_ROUTER_ENGINE_DELETE;
2822             ret = nxt_router_engine_conf_delete(tmcf, recf);
2823         }
2824 
2825         if (nxt_slow_path(ret != NXT_OK)) {
2826             return ret;
2827         }
2828 
2829         n++;
2830     }
2831 
2832     tmcf->new_threads = n;
2833 
2834     while (n < threads) {
2835         recf = nxt_array_zero_add(tmcf->engines);
2836         if (nxt_slow_path(recf == NULL)) {
2837             return NXT_ERROR;
2838         }
2839 
2840         recf->action = NXT_ROUTER_ENGINE_ADD;
2841 
2842         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2843         if (nxt_slow_path(recf->engine == NULL)) {
2844             return NXT_ERROR;
2845         }
2846 
2847         ret = nxt_router_engine_conf_create(tmcf, recf);
2848         if (nxt_slow_path(ret != NXT_OK)) {
2849             return ret;
2850         }
2851 
2852         n++;
2853     }
2854 
2855     return NXT_OK;
2856 }
2857 
2858 
2859 static nxt_int_t
2860 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2861     nxt_router_engine_conf_t *recf)
2862 {
2863     nxt_int_t  ret;
2864 
2865     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2866                                           nxt_router_listen_socket_create);
2867     if (nxt_slow_path(ret != NXT_OK)) {
2868         return ret;
2869     }
2870 
2871     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2872                                           nxt_router_listen_socket_create);
2873     if (nxt_slow_path(ret != NXT_OK)) {
2874         return ret;
2875     }
2876 
2877     return ret;
2878 }
2879 
2880 
2881 static nxt_int_t
2882 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2883     nxt_router_engine_conf_t *recf)
2884 {
2885     nxt_int_t  ret;
2886 
2887     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2888                                           nxt_router_listen_socket_create);
2889     if (nxt_slow_path(ret != NXT_OK)) {
2890         return ret;
2891     }
2892 
2893     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2894                                           nxt_router_listen_socket_update);
2895     if (nxt_slow_path(ret != NXT_OK)) {
2896         return ret;
2897     }
2898 
2899     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2900     if (nxt_slow_path(ret != NXT_OK)) {
2901         return ret;
2902     }
2903 
2904     return ret;
2905 }
2906 
2907 
2908 static nxt_int_t
2909 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2910     nxt_router_engine_conf_t *recf)
2911 {
2912     nxt_int_t  ret;
2913 
2914     ret = nxt_router_engine_quit(tmcf, recf);
2915     if (nxt_slow_path(ret != NXT_OK)) {
2916         return ret;
2917     }
2918 
2919     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2920     if (nxt_slow_path(ret != NXT_OK)) {
2921         return ret;
2922     }
2923 
2924     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2925 }
2926 
2927 
2928 static nxt_int_t
2929 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2930     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2931     nxt_work_handler_t handler)
2932 {
2933     nxt_int_t                ret;
2934     nxt_joint_job_t          *job;
2935     nxt_queue_link_t         *qlk;
2936     nxt_socket_conf_t        *skcf;
2937     nxt_socket_conf_joint_t  *joint;
2938 
2939     for (qlk = nxt_queue_first(sockets);
2940          qlk != nxt_queue_tail(sockets);
2941          qlk = nxt_queue_next(qlk))
2942     {
2943         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2944         if (nxt_slow_path(job == NULL)) {
2945             return NXT_ERROR;
2946         }
2947 
2948         job->work.next = recf->jobs;
2949         recf->jobs = &job->work;
2950 
2951         job->task = tmcf->engine->task;
2952         job->work.handler = handler;
2953         job->work.task = &job->task;
2954         job->work.obj = job;
2955         job->tmcf = tmcf;
2956 
2957         tmcf->count++;
2958 
2959         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2960                              sizeof(nxt_socket_conf_joint_t));
2961         if (nxt_slow_path(joint == NULL)) {
2962             return NXT_ERROR;
2963         }
2964 
2965         job->work.data = joint;
2966 
2967         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
2968         if (nxt_slow_path(ret != NXT_OK)) {
2969             return ret;
2970         }
2971 
2972         joint->count = 1;
2973 
2974         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2975         skcf->count++;
2976         joint->socket_conf = skcf;
2977 
2978         joint->engine = recf->engine;
2979     }
2980 
2981     return NXT_OK;
2982 }
2983 
2984 
2985 static nxt_int_t
2986 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2987     nxt_router_engine_conf_t *recf)
2988 {
2989     nxt_joint_job_t  *job;
2990 
2991     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2992     if (nxt_slow_path(job == NULL)) {
2993         return NXT_ERROR;
2994     }
2995 
2996     job->work.next = recf->jobs;
2997     recf->jobs = &job->work;
2998 
2999     job->task = tmcf->engine->task;
3000     job->work.handler = nxt_router_worker_thread_quit;
3001     job->work.task = &job->task;
3002     job->work.obj = NULL;
3003     job->work.data = NULL;
3004     job->tmcf = NULL;
3005 
3006     return NXT_OK;
3007 }
3008 
3009 
3010 static nxt_int_t
3011 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3012     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3013 {
3014     nxt_joint_job_t   *job;
3015     nxt_queue_link_t  *qlk;
3016 
3017     for (qlk = nxt_queue_first(sockets);
3018          qlk != nxt_queue_tail(sockets);
3019          qlk = nxt_queue_next(qlk))
3020     {
3021         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3022         if (nxt_slow_path(job == NULL)) {
3023             return NXT_ERROR;
3024         }
3025 
3026         job->work.next = recf->jobs;
3027         recf->jobs = &job->work;
3028 
3029         job->task = tmcf->engine->task;
3030         job->work.handler = nxt_router_listen_socket_delete;
3031         job->work.task = &job->task;
3032         job->work.obj = job;
3033         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3034         job->tmcf = tmcf;
3035 
3036         tmcf->count++;
3037     }
3038 
3039     return NXT_OK;
3040 }
3041 
3042 
3043 static nxt_int_t
3044 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3045     nxt_router_temp_conf_t *tmcf)
3046 {
3047     nxt_int_t                 ret;
3048     nxt_uint_t                i, threads;
3049     nxt_router_engine_conf_t  *recf;
3050 
3051     recf = tmcf->engines->elts;
3052     threads = tmcf->router_conf->threads;
3053 
3054     for (i = tmcf->new_threads; i < threads; i++) {
3055         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3056         if (nxt_slow_path(ret != NXT_OK)) {
3057             return ret;
3058         }
3059     }
3060 
3061     return NXT_OK;
3062 }
3063 
3064 
3065 static nxt_int_t
3066 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3067     nxt_event_engine_t *engine)
3068 {
3069     nxt_int_t            ret;
3070     nxt_thread_link_t    *link;
3071     nxt_thread_handle_t  handle;
3072 
3073     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3074 
3075     if (nxt_slow_path(link == NULL)) {
3076         return NXT_ERROR;
3077     }
3078 
3079     link->start = nxt_router_thread_start;
3080     link->engine = engine;
3081     link->work.handler = nxt_router_thread_exit_handler;
3082     link->work.task = task;
3083     link->work.data = link;
3084 
3085     nxt_queue_insert_tail(&rt->engines, &engine->link);
3086 
3087     ret = nxt_thread_create(&handle, link);
3088 
3089     if (nxt_slow_path(ret != NXT_OK)) {
3090         nxt_queue_remove(&engine->link);
3091     }
3092 
3093     return ret;
3094 }
3095 
3096 
3097 static void
3098 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3099     nxt_router_temp_conf_t *tmcf)
3100 {
3101     nxt_app_t  *app;
3102 
3103     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3104 
3105         nxt_router_app_unlink(task, app);
3106 
3107     } nxt_queue_loop;
3108 
3109     nxt_queue_add(&router->apps, &tmcf->previous);
3110     nxt_queue_add(&router->apps, &tmcf->apps);
3111 }
3112 
3113 
3114 static void
3115 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3116 {
3117     nxt_uint_t                n;
3118     nxt_event_engine_t        *engine;
3119     nxt_router_engine_conf_t  *recf;
3120 
3121     recf = tmcf->engines->elts;
3122 
3123     for (n = tmcf->engines->nelts; n != 0; n--) {
3124         engine = recf->engine;
3125 
3126         switch (recf->action) {
3127 
3128         case NXT_ROUTER_ENGINE_KEEP:
3129             break;
3130 
3131         case NXT_ROUTER_ENGINE_ADD:
3132             nxt_queue_insert_tail(&router->engines, &engine->link0);
3133             break;
3134 
3135         case NXT_ROUTER_ENGINE_DELETE:
3136             nxt_queue_remove(&engine->link0);
3137             break;
3138         }
3139 
3140         nxt_router_engine_post(engine, recf->jobs);
3141 
3142         recf++;
3143     }
3144 }
3145 
3146 
3147 static void
3148 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3149 {
3150     nxt_work_t  *work, *next;
3151 
3152     for (work = jobs; work != NULL; work = next) {
3153         next = work->next;
3154         work->next = NULL;
3155 
3156         nxt_event_engine_post(engine, work);
3157     }
3158 }
3159 
3160 
3161 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3162     .rpc_error       = nxt_port_rpc_handler,
3163     .mmap            = nxt_port_mmap_handler,
3164     .data            = nxt_port_rpc_handler,
3165     .oosm            = nxt_router_oosm_handler,
3166     .req_headers_ack = nxt_port_rpc_handler,
3167 };
3168 
3169 
3170 static void
3171 nxt_router_thread_start(void *data)
3172 {
3173     nxt_int_t           ret;
3174     nxt_port_t          *port;
3175     nxt_task_t          *task;
3176     nxt_work_t          *work;
3177     nxt_thread_t        *thread;
3178     nxt_thread_link_t   *link;
3179     nxt_event_engine_t  *engine;
3180 
3181     link = data;
3182     engine = link->engine;
3183     task = &engine->task;
3184 
3185     thread = nxt_thread();
3186 
3187     nxt_event_engine_thread_adopt(engine);
3188 
3189     /* STUB */
3190     thread->runtime = engine->task.thread->runtime;
3191 
3192     engine->task.thread = thread;
3193     engine->task.log = thread->log;
3194     thread->engine = engine;
3195     thread->task = &engine->task;
3196 #if 0
3197     thread->fiber = &engine->fibers->fiber;
3198 #endif
3199 
3200     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3201     if (nxt_slow_path(engine->mem_pool == NULL)) {
3202         return;
3203     }
3204 
3205     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3206                         NXT_PROCESS_ROUTER);
3207     if (nxt_slow_path(port == NULL)) {
3208         return;
3209     }
3210 
3211     ret = nxt_port_socket_init(task, port, 0);
3212     if (nxt_slow_path(ret != NXT_OK)) {
3213         nxt_port_use(task, port, -1);
3214         return;
3215     }
3216 
3217     ret = nxt_router_port_queue_init(task, port);
3218     if (nxt_slow_path(ret != NXT_OK)) {
3219         nxt_port_use(task, port, -1);
3220         return;
3221     }
3222 
3223     engine->port = port;
3224 
3225     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3226 
3227     work = nxt_zalloc(sizeof(nxt_work_t));
3228     if (nxt_slow_path(work == NULL)) {
3229         return;
3230     }
3231 
3232     work->handler = nxt_router_rt_add_port;
3233     work->task = link->work.task;
3234     work->obj = work;
3235     work->data = port;
3236 
3237     nxt_event_engine_post(link->work.task->thread->engine, work);
3238 
3239     nxt_event_engine_start(engine);
3240 }
3241 
3242 
3243 static void
3244 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3245 {
3246     nxt_int_t      res;
3247     nxt_port_t     *port;
3248     nxt_runtime_t  *rt;
3249 
3250     rt = task->thread->runtime;
3251     port = data;
3252 
3253     nxt_free(obj);
3254 
3255     res = nxt_port_hash_add(&rt->ports, port);
3256 
3257     if (nxt_fast_path(res == NXT_OK)) {
3258         nxt_port_use(task, port, 1);
3259     }
3260 }
3261 
3262 
3263 static void
3264 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3265 {
3266     nxt_joint_job_t          *job;
3267     nxt_socket_conf_t        *skcf;
3268     nxt_listen_event_t       *lev;
3269     nxt_listen_socket_t      *ls;
3270     nxt_thread_spinlock_t    *lock;
3271     nxt_socket_conf_joint_t  *joint;
3272 
3273     job = obj;
3274     joint = data;
3275 
3276     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3277 
3278     skcf = joint->socket_conf;
3279     ls = skcf->listen;
3280 
3281     lev = nxt_listen_event(task, ls);
3282     if (nxt_slow_path(lev == NULL)) {
3283         nxt_router_listen_socket_release(task, skcf);
3284         return;
3285     }
3286 
3287     lev->socket.data = joint;
3288 
3289     lock = &skcf->router_conf->router->lock;
3290 
3291     nxt_thread_spin_lock(lock);
3292     ls->count++;
3293     nxt_thread_spin_unlock(lock);
3294 
3295     job->work.next = NULL;
3296     job->work.handler = nxt_router_conf_wait;
3297 
3298     nxt_event_engine_post(job->tmcf->engine, &job->work);
3299 }
3300 
3301 
3302 nxt_inline nxt_listen_event_t *
3303 nxt_router_listen_event(nxt_queue_t *listen_connections,
3304     nxt_socket_conf_t *skcf)
3305 {
3306     nxt_socket_t        fd;
3307     nxt_queue_link_t    *qlk;
3308     nxt_listen_event_t  *lev;
3309 
3310     fd = skcf->listen->socket;
3311 
3312     for (qlk = nxt_queue_first(listen_connections);
3313          qlk != nxt_queue_tail(listen_connections);
3314          qlk = nxt_queue_next(qlk))
3315     {
3316         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3317 
3318         if (fd == lev->socket.fd) {
3319             return lev;
3320         }
3321     }
3322 
3323     return NULL;
3324 }
3325 
3326 
3327 static void
3328 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3329 {
3330     nxt_joint_job_t          *job;
3331     nxt_event_engine_t       *engine;
3332     nxt_listen_event_t       *lev;
3333     nxt_socket_conf_joint_t  *joint, *old;
3334 
3335     job = obj;
3336     joint = data;
3337 
3338     engine = task->thread->engine;
3339 
3340     nxt_queue_insert_tail(&engine->joints, &joint->link);
3341 
3342     lev = nxt_router_listen_event(&engine->listen_connections,
3343                                   joint->socket_conf);
3344 
3345     old = lev->socket.data;
3346     lev->socket.data = joint;
3347     lev->listen = joint->socket_conf->listen;
3348 
3349     job->work.next = NULL;
3350     job->work.handler = nxt_router_conf_wait;
3351 
3352     nxt_event_engine_post(job->tmcf->engine, &job->work);
3353 
3354     /*
3355      * The task is allocated from configuration temporary
3356      * memory pool so it can be freed after engine post operation.
3357      */
3358 
3359     nxt_router_conf_release(&engine->task, old);
3360 }
3361 
3362 
3363 static void
3364 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3365 {
3366     nxt_socket_conf_t        *skcf;
3367     nxt_listen_event_t       *lev;
3368     nxt_event_engine_t       *engine;
3369     nxt_socket_conf_joint_t  *joint;
3370 
3371     skcf = data;
3372 
3373     engine = task->thread->engine;
3374 
3375     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3376 
3377     nxt_fd_event_delete(engine, &lev->socket);
3378 
3379     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3380               lev->socket.fd);
3381 
3382     joint = lev->socket.data;
3383     joint->close_job = obj;
3384 
3385     lev->timer.handler = nxt_router_listen_socket_close;
3386     lev->timer.work_queue = &engine->fast_work_queue;
3387 
3388     nxt_timer_add(engine, &lev->timer, 0);
3389 }
3390 
3391 
3392 static void
3393 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3394 {
3395     nxt_event_engine_t  *engine;
3396 
3397     nxt_debug(task, "router worker thread quit");
3398 
3399     engine = task->thread->engine;
3400 
3401     engine->shutdown = 1;
3402 
3403     if (nxt_queue_is_empty(&engine->joints)) {
3404         nxt_thread_exit(task->thread);
3405     }
3406 }
3407 
3408 
3409 static void
3410 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3411 {
3412     nxt_timer_t              *timer;
3413     nxt_joint_job_t          *job;
3414     nxt_listen_event_t       *lev;
3415     nxt_socket_conf_joint_t  *joint;
3416 
3417     timer = obj;
3418     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3419 
3420     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3421               lev->socket.fd);
3422 
3423     nxt_queue_remove(&lev->link);
3424 
3425     joint = lev->socket.data;
3426     lev->socket.data = NULL;
3427 
3428     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3429     task = &task->thread->engine->task;
3430 
3431     nxt_router_listen_socket_release(task, joint->socket_conf);
3432 
3433     job = joint->close_job;
3434     job->work.next = NULL;
3435     job->work.handler = nxt_router_conf_wait;
3436 
3437     nxt_event_engine_post(job->tmcf->engine, &job->work);
3438 
3439     nxt_router_listen_event_release(task, lev, joint);
3440 }
3441 
3442 
3443 static void
3444 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3445 {
3446     nxt_listen_socket_t    *ls;
3447     nxt_thread_spinlock_t  *lock;
3448 
3449     ls = skcf->listen;
3450     lock = &skcf->router_conf->router->lock;
3451 
3452     nxt_thread_spin_lock(lock);
3453 
3454     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3455               task->thread->engine, ls->count);
3456 
3457     if (--ls->count != 0) {
3458         ls = NULL;
3459     }
3460 
3461     nxt_thread_spin_unlock(lock);
3462 
3463     if (ls != NULL) {
3464         nxt_socket_close(task, ls->socket);
3465         nxt_free(ls);
3466     }
3467 }
3468 
3469 
3470 void
3471 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3472     nxt_socket_conf_joint_t *joint)
3473 {
3474     nxt_event_engine_t  *engine;
3475 
3476     nxt_debug(task, "listen event count: %D", lev->count);
3477 
3478     engine = task->thread->engine;
3479 
3480     if (--lev->count == 0) {
3481         if (lev->next != NULL) {
3482             nxt_sockaddr_cache_free(engine, lev->next);
3483 
3484             nxt_conn_free(task, lev->next);
3485         }
3486 
3487         nxt_free(lev);
3488     }
3489 
3490     if (joint != NULL) {
3491         nxt_router_conf_release(task, joint);
3492     }
3493 
3494     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3495         nxt_thread_exit(task->thread);
3496     }
3497 }
3498 
3499 
3500 void
3501 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3502 {
3503     nxt_socket_conf_t      *skcf;
3504     nxt_router_conf_t      *rtcf;
3505     nxt_thread_spinlock_t  *lock;
3506 
3507     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3508 
3509     if (--joint->count != 0) {
3510         return;
3511     }
3512 
3513     nxt_queue_remove(&joint->link);
3514 
3515     /*
3516      * The joint content can not be safely used after the critical
3517      * section protected by the spinlock because its memory pool may
3518      * be already destroyed by another thread.
3519      */
3520     skcf = joint->socket_conf;
3521     rtcf = skcf->router_conf;
3522     lock = &rtcf->router->lock;
3523 
3524     nxt_thread_spin_lock(lock);
3525 
3526     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3527               rtcf, rtcf->count);
3528 
3529     if (--skcf->count != 0) {
3530         skcf = NULL;
3531         rtcf = NULL;
3532 
3533     } else {
3534         nxt_queue_remove(&skcf->link);
3535 
3536         if (--rtcf->count != 0) {
3537             rtcf = NULL;
3538         }
3539     }
3540 
3541     nxt_thread_spin_unlock(lock);
3542 
3543 #if (NXT_TLS)
3544     if (skcf != NULL && skcf->tls != NULL) {
3545         task->thread->runtime->tls->server_free(task, skcf->tls);
3546     }
3547 #endif
3548 
3549     /* TODO remove engine->port */
3550 
3551     if (rtcf != NULL) {
3552         nxt_debug(task, "old router conf is destroyed");
3553 
3554         nxt_router_apps_hash_use(task, rtcf, -1);
3555 
3556         nxt_router_access_log_release(task, lock, rtcf->access_log);
3557 
3558         nxt_mp_thread_adopt(rtcf->mem_pool);
3559 
3560         nxt_mp_destroy(rtcf->mem_pool);
3561     }
3562 }
3563 
3564 
3565 static void
3566 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3567     nxt_router_access_log_t *access_log)
3568 {
3569     size_t     size;
3570     u_char     *buf, *p;
3571     nxt_off_t  bytes;
3572 
3573     static nxt_time_string_t  date_cache = {
3574         (nxt_atomic_uint_t) -1,
3575         nxt_router_access_log_date,
3576         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3577         nxt_length("31/Dec/1986:19:40:00 +0300"),
3578         NXT_THREAD_TIME_LOCAL,
3579         NXT_THREAD_TIME_SEC,
3580     };
3581 
3582     size = r->remote->address_length
3583            + 6                  /* ' - - [' */
3584            + date_cache.size
3585            + 3                  /* '] "' */
3586            + r->method->length
3587            + 1                  /* space */
3588            + r->target.length
3589            + 1                  /* space */
3590            + r->version.length
3591            + 2                  /* '" ' */
3592            + 3                  /* status */
3593            + 1                  /* space */
3594            + NXT_OFF_T_LEN
3595            + 2                  /* ' "' */
3596            + (r->referer != NULL ? r->referer->value_length : 1)
3597            + 3                  /* '" "' */
3598            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3599            + 2                  /* '"\n' */
3600     ;
3601 
3602     buf = nxt_mp_nget(r->mem_pool, size);
3603     if (nxt_slow_path(buf == NULL)) {
3604         return;
3605     }
3606 
3607     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3608                    r->remote->address_length);
3609 
3610     p = nxt_cpymem(p, " - - [", 6);
3611 
3612     p = nxt_thread_time_string(task->thread, &date_cache, p);
3613 
3614     p = nxt_cpymem(p, "] \"", 3);
3615 
3616     if (r->method->length != 0) {
3617         p = nxt_cpymem(p, r->method->start, r->method->length);
3618 
3619         if (r->target.length != 0) {
3620             *p++ = ' ';
3621             p = nxt_cpymem(p, r->target.start, r->target.length);
3622 
3623             if (r->version.length != 0) {
3624                 *p++ = ' ';
3625                 p = nxt_cpymem(p, r->version.start, r->version.length);
3626             }
3627         }
3628 
3629     } else {
3630         *p++ = '-';
3631     }
3632 
3633     p = nxt_cpymem(p, "\" ", 2);
3634 
3635     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3636 
3637     *p++ = ' ';
3638 
3639     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3640 
3641     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3642 
3643     p = nxt_cpymem(p, " \"", 2);
3644 
3645     if (r->referer != NULL) {
3646         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3647 
3648     } else {
3649         *p++ = '-';
3650     }
3651 
3652     p = nxt_cpymem(p, "\" \"", 3);
3653 
3654     if (r->user_agent != NULL) {
3655         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3656 
3657     } else {
3658         *p++ = '-';
3659     }
3660 
3661     p = nxt_cpymem(p, "\"\n", 2);
3662 
3663     nxt_fd_write(access_log->fd, buf, p - buf);
3664 }
3665 
3666 
3667 static u_char *
3668 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3669     size_t size, const char *format)
3670 {
3671     u_char  sign;
3672     time_t  gmtoff;
3673 
3674     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3675                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3676 
3677     gmtoff = nxt_timezone(tm) / 60;
3678 
3679     if (gmtoff < 0) {
3680         gmtoff = -gmtoff;
3681         sign = '-';
3682 
3683     } else {
3684         sign = '+';
3685     }
3686 
3687     return nxt_sprintf(buf, buf + size, format,
3688                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3689                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3690                        sign, gmtoff / 60, gmtoff % 60);
3691 }
3692 
3693 
3694 static void
3695 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3696 {
3697     uint32_t                 stream;
3698     nxt_int_t                ret;
3699     nxt_buf_t                *b;
3700     nxt_port_t               *main_port, *router_port;
3701     nxt_runtime_t            *rt;
3702     nxt_router_access_log_t  *access_log;
3703 
3704     access_log = tmcf->router_conf->access_log;
3705 
3706     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3707     if (nxt_slow_path(b == NULL)) {
3708         goto fail;
3709     }
3710 
3711     b->completion_handler = nxt_router_dummy_buf_completion;
3712 
3713     nxt_buf_cpystr(b, &access_log->path);
3714     *b->mem.free++ = '\0';
3715 
3716     rt = task->thread->runtime;
3717     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3718     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3719 
3720     stream = nxt_port_rpc_register_handler(task, router_port,
3721                                            nxt_router_access_log_ready,
3722                                            nxt_router_access_log_error,
3723                                            -1, tmcf);
3724     if (nxt_slow_path(stream == 0)) {
3725         goto fail;
3726     }
3727 
3728     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3729                                 stream, router_port->id, b);
3730 
3731     if (nxt_slow_path(ret != NXT_OK)) {
3732         nxt_port_rpc_cancel(task, router_port, stream);
3733         goto fail;
3734     }
3735 
3736     return;
3737 
3738 fail:
3739 
3740     nxt_router_conf_error(task, tmcf);
3741 }
3742 
3743 
3744 static void
3745 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3746     void *data)
3747 {
3748     nxt_router_temp_conf_t   *tmcf;
3749     nxt_router_access_log_t  *access_log;
3750 
3751     tmcf = data;
3752 
3753     access_log = tmcf->router_conf->access_log;
3754 
3755     access_log->fd = msg->fd[0];
3756 
3757     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3758                        nxt_router_conf_apply, task, tmcf, NULL);
3759 }
3760 
3761 
3762 static void
3763 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3764     void *data)
3765 {
3766     nxt_router_temp_conf_t  *tmcf;
3767 
3768     tmcf = data;
3769 
3770     nxt_router_conf_error(task, tmcf);
3771 }
3772 
3773 
3774 static void
3775 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3776     nxt_router_access_log_t *access_log)
3777 {
3778     if (access_log == NULL) {
3779         return;
3780     }
3781 
3782     nxt_thread_spin_lock(lock);
3783 
3784     if (--access_log->count != 0) {
3785         access_log = NULL;
3786     }
3787 
3788     nxt_thread_spin_unlock(lock);
3789 
3790     if (access_log != NULL) {
3791 
3792         if (access_log->fd != -1) {
3793             nxt_fd_close(access_log->fd);
3794         }
3795 
3796         nxt_free(access_log);
3797     }
3798 }
3799 
3800 
3801 typedef struct {
3802     nxt_mp_t                 *mem_pool;
3803     nxt_router_access_log_t  *access_log;
3804 } nxt_router_access_log_reopen_t;
3805 
3806 
3807 static void
3808 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3809 {
3810     nxt_mp_t                        *mp;
3811     uint32_t                        stream;
3812     nxt_int_t                       ret;
3813     nxt_buf_t                       *b;
3814     nxt_port_t                      *main_port, *router_port;
3815     nxt_runtime_t                   *rt;
3816     nxt_router_access_log_t         *access_log;
3817     nxt_router_access_log_reopen_t  *reopen;
3818 
3819     access_log = nxt_router->access_log;
3820 
3821     if (access_log == NULL) {
3822         return;
3823     }
3824 
3825     mp = nxt_mp_create(1024, 128, 256, 32);
3826     if (nxt_slow_path(mp == NULL)) {
3827         return;
3828     }
3829 
3830     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3831     if (nxt_slow_path(reopen == NULL)) {
3832         goto fail;
3833     }
3834 
3835     reopen->mem_pool = mp;
3836     reopen->access_log = access_log;
3837 
3838     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3839     if (nxt_slow_path(b == NULL)) {
3840         goto fail;
3841     }
3842 
3843     b->completion_handler = nxt_router_access_log_reopen_completion;
3844 
3845     nxt_buf_cpystr(b, &access_log->path);
3846     *b->mem.free++ = '\0';
3847 
3848     rt = task->thread->runtime;
3849     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3850     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3851 
3852     stream = nxt_port_rpc_register_handler(task, router_port,
3853                                            nxt_router_access_log_reopen_ready,
3854                                            nxt_router_access_log_reopen_error,
3855                                            -1, reopen);
3856     if (nxt_slow_path(stream == 0)) {
3857         goto fail;
3858     }
3859 
3860     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3861                                 stream, router_port->id, b);
3862 
3863     if (nxt_slow_path(ret != NXT_OK)) {
3864         nxt_port_rpc_cancel(task, router_port, stream);
3865         goto fail;
3866     }
3867 
3868     nxt_mp_retain(mp);
3869 
3870     return;
3871 
3872 fail:
3873 
3874     nxt_mp_destroy(mp);
3875 }
3876 
3877 
3878 static void
3879 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3880 {
3881     nxt_mp_t   *mp;
3882     nxt_buf_t  *b;
3883 
3884     b = obj;
3885     mp = b->data;
3886 
3887     nxt_mp_release(mp);
3888 }
3889 
3890 
3891 static void
3892 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3893     void *data)
3894 {
3895     nxt_router_access_log_t         *access_log;
3896     nxt_router_access_log_reopen_t  *reopen;
3897 
3898     reopen = data;
3899 
3900     access_log = reopen->access_log;
3901 
3902     if (access_log == nxt_router->access_log) {
3903 
3904         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3905             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3906                       msg->fd[0], access_log->fd, nxt_errno);
3907         }
3908     }
3909 
3910     nxt_fd_close(msg->fd[0]);
3911     nxt_mp_release(reopen->mem_pool);
3912 }
3913 
3914 
3915 static void
3916 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3917     void *data)
3918 {
3919     nxt_router_access_log_reopen_t  *reopen;
3920 
3921     reopen = data;
3922 
3923     nxt_mp_release(reopen->mem_pool);
3924 }
3925 
3926 
3927 static void
3928 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3929 {
3930     nxt_port_t           *port;
3931     nxt_thread_link_t    *link;
3932     nxt_event_engine_t   *engine;
3933     nxt_thread_handle_t  handle;
3934 
3935     handle = (nxt_thread_handle_t) (uintptr_t) obj;
3936     link = data;
3937 
3938     nxt_thread_wait(handle);
3939 
3940     engine = link->engine;
3941 
3942     nxt_queue_remove(&engine->link);
3943 
3944     port = engine->port;
3945 
3946     // TODO notify all apps
3947 
3948     port->engine = task->thread->engine;
3949     nxt_mp_thread_adopt(port->mem_pool);
3950     nxt_port_use(task, port, -1);
3951 
3952     nxt_mp_thread_adopt(engine->mem_pool);
3953     nxt_mp_destroy(engine->mem_pool);
3954 
3955     nxt_event_engine_free(engine);
3956 
3957     nxt_free(link);
3958 }
3959 
3960 
3961 static void
3962 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3963     void *data)
3964 {
3965     size_t                  b_size, count;
3966     nxt_int_t               ret;
3967     nxt_app_t               *app;
3968     nxt_buf_t               *b, *next;
3969     nxt_port_t              *app_port;
3970     nxt_unit_field_t        *f;
3971     nxt_http_field_t        *field;
3972     nxt_http_request_t      *r;
3973     nxt_unit_response_t     *resp;
3974     nxt_request_rpc_data_t  *req_rpc_data;
3975 
3976     req_rpc_data = data;
3977 
3978     r = req_rpc_data->request;
3979     if (nxt_slow_path(r == NULL)) {
3980         return;
3981     }
3982 
3983     if (r->error) {
3984         nxt_request_rpc_data_unlink(task, req_rpc_data);
3985         return;
3986     }
3987 
3988     app = req_rpc_data->app;
3989     nxt_assert(app != NULL);
3990 
3991     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3992         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3993 
3994         return;
3995     }
3996 
3997     b = (msg->size == 0) ? NULL : msg->buf;
3998 
3999     if (msg->port_msg.last != 0) {
4000         nxt_debug(task, "router data create last buf");
4001 
4002         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4003 
4004         req_rpc_data->rpc_cancel = 0;
4005 
4006         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4007             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4008         }
4009 
4010         nxt_request_rpc_data_unlink(task, req_rpc_data);
4011 
4012     } else {
4013         if (app->timeout != 0) {
4014             r->timer.handler = nxt_router_app_timeout;
4015             r->timer_data = req_rpc_data;
4016             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4017         }
4018     }
4019 
4020     if (b == NULL) {
4021         return;
4022     }
4023 
4024     if (msg->buf == b) {
4025         /* Disable instant buffer completion/re-using by port. */
4026         msg->buf = NULL;
4027     }
4028 
4029     if (r->header_sent) {
4030         nxt_buf_chain_add(&r->out, b);
4031         nxt_http_request_send_body(task, r, NULL);
4032 
4033     } else {
4034         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4035 
4036         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4037             nxt_alert(task, "response buffer too small: %z", b_size);
4038             goto fail;
4039         }
4040 
4041         resp = (void *) b->mem.pos;
4042         count = (b_size - sizeof(nxt_unit_response_t))
4043                     / sizeof(nxt_unit_field_t);
4044 
4045         if (nxt_slow_path(count < resp->fields_count)) {
4046             nxt_alert(task, "response buffer too small for fields count: %D",
4047                       resp->fields_count);
4048             goto fail;
4049         }
4050 
4051         field = NULL;
4052 
4053         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4054             if (f->skip) {
4055                 continue;
4056             }
4057 
4058             field = nxt_list_add(r->resp.fields);
4059 
4060             if (nxt_slow_path(field == NULL)) {
4061                 goto fail;
4062             }
4063 
4064             field->hash = f->hash;
4065             field->skip = 0;
4066             field->hopbyhop = 0;
4067 
4068             field->name_length = f->name_length;
4069             field->value_length = f->value_length;
4070             field->name = nxt_unit_sptr_get(&f->name);
4071             field->value = nxt_unit_sptr_get(&f->value);
4072 
4073             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4074             if (nxt_slow_path(ret != NXT_OK)) {
4075                 goto fail;
4076             }
4077 
4078             nxt_debug(task, "header%s: %*s: %*s",
4079                       (field->skip ? " skipped" : ""),
4080                       (size_t) field->name_length, field->name,
4081                       (size_t) field->value_length, field->value);
4082 
4083             if (field->skip) {
4084                 r->resp.fields->last->nelts--;
4085             }
4086         }
4087 
4088         r->status = resp->status;
4089 
4090         if (resp->piggyback_content_length != 0) {
4091             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4092             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4093 
4094         } else {
4095             b->mem.pos = b->mem.free;
4096         }
4097 
4098         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4099             next = b->next;
4100             b->next = NULL;
4101 
4102             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4103                                b->completion_handler, task, b, b->parent);
4104 
4105             b = next;
4106         }
4107 
4108         if (b != NULL) {
4109             nxt_buf_chain_add(&r->out, b);
4110         }
4111 
4112         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4113 
4114         if (r->websocket_handshake
4115             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4116         {
4117             app_port = req_rpc_data->app_port;
4118             if (nxt_slow_path(app_port == NULL)) {
4119                 goto fail;
4120             }
4121 
4122             nxt_thread_mutex_lock(&app->mutex);
4123 
4124             app_port->main_app_port->active_websockets++;
4125 
4126             nxt_thread_mutex_unlock(&app->mutex);
4127 
4128             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
4129             req_rpc_data->apr_action = NXT_APR_CLOSE;
4130 
4131             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4132 
4133             r->state = &nxt_http_websocket;
4134 
4135         } else {
4136             r->state = &nxt_http_request_send_state;
4137         }
4138     }
4139 
4140     return;
4141 
4142 fail:
4143 
4144     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4145 
4146     nxt_request_rpc_data_unlink(task, req_rpc_data);
4147 }
4148 
4149 
4150 static void
4151 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4152     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4153 {
4154     int                 res;
4155     nxt_app_t           *app;
4156     nxt_buf_t           *b;
4157     nxt_bool_t          start_process, unlinked;
4158     nxt_port_t          *app_port, *main_app_port, *idle_port;
4159     nxt_queue_link_t    *idle_lnk;
4160     nxt_http_request_t  *r;
4161 
4162     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4163               req_rpc_data->stream,
4164               msg->port_msg.pid, msg->port_msg.reply_port);
4165 
4166     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4167                              msg->port_msg.pid);
4168 
4169     app = req_rpc_data->app;
4170     r = req_rpc_data->request;
4171 
4172     start_process = 0;
4173     unlinked = 0;
4174 
4175     nxt_thread_mutex_lock(&app->mutex);
4176 
4177     if (r->app_link.next != NULL) {
4178         nxt_queue_remove(&r->app_link);
4179         r->app_link.next = NULL;
4180 
4181         unlinked = 1;
4182     }
4183 
4184     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4185                                   msg->port_msg.reply_port);
4186     if (nxt_slow_path(app_port == NULL)) {
4187         nxt_thread_mutex_unlock(&app->mutex);
4188 
4189         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4190 
4191         if (unlinked) {
4192             nxt_mp_release(r->mem_pool);
4193         }
4194 
4195         return;
4196     }
4197 
4198     main_app_port = app_port->main_app_port;
4199 
4200     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4201         app->idle_processes--;
4202 
4203         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4204                   &app->name, main_app_port->pid, main_app_port->id,
4205                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4206 
4207         /* Check port was in 'spare_ports' using idle_start field. */
4208         if (main_app_port->idle_start == 0
4209             && app->idle_processes >= app->spare_processes)
4210         {
4211             /*
4212              * If there is a vacant space in spare ports,
4213              * move the last idle to spare_ports.
4214              */
4215             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4216 
4217             idle_lnk = nxt_queue_last(&app->idle_ports);
4218             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4219             nxt_queue_remove(idle_lnk);
4220 
4221             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4222 
4223             idle_port->idle_start = 0;
4224 
4225             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4226                       "to spare_ports",
4227                       &app->name, idle_port->pid, idle_port->id);
4228         }
4229 
4230         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4231             app->pending_processes++;
4232             start_process = 1;
4233         }
4234     }
4235 
4236     main_app_port->active_requests++;
4237 
4238     nxt_port_inc_use(app_port);
4239 
4240     nxt_thread_mutex_unlock(&app->mutex);
4241 
4242     if (unlinked) {
4243         nxt_mp_release(r->mem_pool);
4244     }
4245 
4246     if (start_process) {
4247         nxt_router_start_app_process(task, app);
4248     }
4249 
4250     nxt_port_use(task, req_rpc_data->app_port, -1);
4251 
4252     req_rpc_data->app_port = app_port;
4253 
4254     b = req_rpc_data->msg_info.buf;
4255 
4256     if (b != NULL) {
4257         /* First buffer is already sent.  Start from second. */
4258         b = b->next;
4259 
4260         req_rpc_data->msg_info.buf->next = NULL;
4261     }
4262 
4263     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4264         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4265                   req_rpc_data->msg_info.body_fd);
4266 
4267         if (req_rpc_data->msg_info.body_fd != -1) {
4268             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4269         }
4270 
4271         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4272                                     req_rpc_data->msg_info.body_fd,
4273                                     req_rpc_data->stream,
4274                                     task->thread->engine->port->id, b);
4275 
4276         if (nxt_slow_path(res != NXT_OK)) {
4277             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4278         }
4279     }
4280 
4281     if (app->timeout != 0) {
4282         r->timer.handler = nxt_router_app_timeout;
4283         r->timer_data = req_rpc_data;
4284         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4285     }
4286 }
4287 
4288 
4289 static const nxt_http_request_state_t  nxt_http_request_send_state
4290     nxt_aligned(64) =
4291 {
4292     .error_handler = nxt_http_request_error_handler,
4293 };
4294 
4295 
4296 static void
4297 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4298 {
4299     nxt_buf_t           *out;
4300     nxt_http_request_t  *r;
4301 
4302     r = obj;
4303 
4304     out = r->out;
4305 
4306     if (out != NULL) {
4307         r->out = NULL;
4308         nxt_http_request_send(task, r, out);
4309     }
4310 }
4311 
4312 
4313 static void
4314 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4315     void *data)
4316 {
4317     nxt_request_rpc_data_t  *req_rpc_data;
4318 
4319     req_rpc_data = data;
4320 
4321     req_rpc_data->rpc_cancel = 0;
4322 
4323     /* TODO cancel message and return if cancelled. */
4324     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4325 
4326     if (req_rpc_data->request != NULL) {
4327         nxt_http_request_error(task, req_rpc_data->request,
4328                                NXT_HTTP_SERVICE_UNAVAILABLE);
4329     }
4330 
4331     nxt_request_rpc_data_unlink(task, req_rpc_data);
4332 }
4333 
4334 
4335 static void
4336 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4337     void *data)
4338 {
4339     nxt_app_t            *app;
4340     nxt_bool_t           start_process;
4341     nxt_port_t           *port;
4342     nxt_app_joint_t      *app_joint;
4343     nxt_app_joint_rpc_t  *app_joint_rpc;
4344 
4345     nxt_assert(data != NULL);
4346 
4347     app_joint_rpc = data;
4348     app_joint = app_joint_rpc->app_joint;
4349     port = msg->u.new_port;
4350 
4351     nxt_assert(app_joint != NULL);
4352     nxt_assert(port != NULL);
4353     nxt_assert(port->type == NXT_PROCESS_APP);
4354     nxt_assert(port->id == 0);
4355 
4356     app = app_joint->app;
4357 
4358     nxt_router_app_joint_use(task, app_joint, -1);
4359 
4360     if (nxt_slow_path(app == NULL)) {
4361         nxt_debug(task, "new port ready for released app, send QUIT");
4362 
4363         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4364 
4365         return;
4366     }
4367 
4368     nxt_thread_mutex_lock(&app->mutex);
4369 
4370     nxt_assert(app->pending_processes != 0);
4371 
4372     app->pending_processes--;
4373 
4374     if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
4375         nxt_debug(task, "new port ready for restarted app, send QUIT");
4376 
4377         start_process = !task->thread->engine->shutdown
4378                         && nxt_router_app_can_start(app)
4379                         && nxt_router_app_need_start(app);
4380 
4381         if (start_process) {
4382             app->pending_processes++;
4383         }
4384 
4385         nxt_thread_mutex_unlock(&app->mutex);
4386 
4387         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4388 
4389         if (start_process) {
4390             nxt_router_start_app_process(task, app);
4391         }
4392 
4393         return;
4394     }
4395 
4396     port->app = app;
4397     port->main_app_port = port;
4398 
4399     app->processes++;
4400     nxt_port_hash_add(&app->port_hash, port);
4401     app->port_hash_count++;
4402 
4403     nxt_thread_mutex_unlock(&app->mutex);
4404 
4405     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4406               &app->name, port->pid, app->processes, app->pending_processes);
4407 
4408     nxt_router_app_shared_port_send(task, port);
4409 
4410     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4411 }
4412 
4413 
4414 static nxt_int_t
4415 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4416 {
4417     nxt_buf_t                *b;
4418     nxt_port_t               *port;
4419     nxt_port_msg_new_port_t  *msg;
4420 
4421     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4422                              sizeof(nxt_port_data_t));
4423     if (nxt_slow_path(b == NULL)) {
4424         return NXT_ERROR;
4425     }
4426 
4427     port = app_port->app->shared_port;
4428 
4429     nxt_debug(task, "send port %FD to process %PI",
4430               port->pair[0], app_port->pid);
4431 
4432     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4433     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4434 
4435     msg->id = port->id;
4436     msg->pid = port->pid;
4437     msg->max_size = port->max_size;
4438     msg->max_share = port->max_share;
4439     msg->type = port->type;
4440 
4441     return nxt_port_socket_write2(task, app_port,
4442                                   NXT_PORT_MSG_NEW_PORT,
4443                                   port->pair[0], port->queue_fd,
4444                                   0, 0, b);
4445 }
4446 
4447 
4448 static void
4449 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4450     void *data)
4451 {
4452     nxt_app_t            *app;
4453     nxt_app_joint_t      *app_joint;
4454     nxt_queue_link_t     *link;
4455     nxt_http_request_t   *r;
4456     nxt_app_joint_rpc_t  *app_joint_rpc;
4457 
4458     nxt_assert(data != NULL);
4459 
4460     app_joint_rpc = data;
4461     app_joint = app_joint_rpc->app_joint;
4462 
4463     nxt_assert(app_joint != NULL);
4464 
4465     app = app_joint->app;
4466 
4467     nxt_router_app_joint_use(task, app_joint, -1);
4468 
4469     if (nxt_slow_path(app == NULL)) {
4470         nxt_debug(task, "start error for released app");
4471 
4472         return;
4473     }
4474 
4475     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4476 
4477     link = NULL;
4478 
4479     nxt_thread_mutex_lock(&app->mutex);
4480 
4481     nxt_assert(app->pending_processes != 0);
4482 
4483     app->pending_processes--;
4484 
4485     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4486         link = nxt_queue_first(&app->ack_waiting_req);
4487 
4488         nxt_queue_remove(link);
4489         link->next = NULL;
4490     }
4491 
4492     nxt_thread_mutex_unlock(&app->mutex);
4493 
4494     while (link != NULL) {
4495         r = nxt_container_of(link, nxt_http_request_t, app_link);
4496 
4497         nxt_event_engine_post(r->engine, &r->err_work);
4498 
4499         link = NULL;
4500 
4501         nxt_thread_mutex_lock(&app->mutex);
4502 
4503         if (app->processes == 0 && app->pending_processes == 0
4504             && !nxt_queue_is_empty(&app->ack_waiting_req))
4505         {
4506             link = nxt_queue_first(&app->ack_waiting_req);
4507 
4508             nxt_queue_remove(link);
4509             link->next = NULL;
4510         }
4511 
4512         nxt_thread_mutex_unlock(&app->mutex);
4513     }
4514 }
4515 
4516 
4517 
4518 nxt_inline nxt_port_t *
4519 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4520 {
4521     nxt_port_t  *port;
4522 
4523     port = NULL;
4524 
4525     nxt_thread_mutex_lock(&app->mutex);
4526 
4527     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4528 
4529         /* Caller is responsible to decrease port use count. */
4530         nxt_queue_chk_remove(&port->app_link);
4531 
4532         if (nxt_queue_chk_remove(&port->idle_link)) {
4533             app->idle_processes--;
4534 
4535             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4536                       &app->name, port->pid, port->id,
4537                       (port->idle_start ? "idle_ports" : "spare_ports"));
4538         }
4539 
4540         nxt_port_hash_remove(&app->port_hash, port);
4541         app->port_hash_count--;
4542 
4543         port->app = NULL;
4544         app->processes--;
4545 
4546         break;
4547 
4548     } nxt_queue_loop;
4549 
4550     nxt_thread_mutex_unlock(&app->mutex);
4551 
4552     return port;
4553 }
4554 
4555 
4556 static void
4557 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4558 {
4559     int  c;
4560 
4561     c = nxt_atomic_fetch_add(&app->use_count, i);
4562 
4563     if (i < 0 && c == -i) {
4564 
4565         if (task->thread->engine != app->engine) {
4566             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4567 
4568         } else {
4569             nxt_router_free_app(task, app->joint, NULL);
4570         }
4571     }
4572 }
4573 
4574 
4575 static void
4576 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4577 {
4578     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4579 
4580     nxt_queue_remove(&app->link);
4581 
4582     nxt_router_app_use(task, app, -1);
4583 }
4584 
4585 
4586 static void
4587 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4588     nxt_apr_action_t action)
4589 {
4590     int         inc_use;
4591     uint32_t    got_response, dec_requests;
4592     nxt_app_t   *app;
4593     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4594     nxt_port_t  *main_app_port;
4595 
4596     nxt_assert(port != NULL);
4597     nxt_assert(port->app != NULL);
4598 
4599     app = port->app;
4600 
4601     inc_use = 0;
4602     got_response = 0;
4603     dec_requests = 0;
4604 
4605     switch (action) {
4606     case NXT_APR_NEW_PORT:
4607         break;
4608     case NXT_APR_REQUEST_FAILED:
4609         dec_requests = 1;
4610         inc_use = -1;
4611         break;
4612     case NXT_APR_GOT_RESPONSE:
4613         got_response = 1;
4614         inc_use = -1;
4615         break;
4616     case NXT_APR_UPGRADE:
4617         got_response = 1;
4618         break;
4619     case NXT_APR_CLOSE:
4620         inc_use = -1;
4621         break;
4622     }
4623 
4624     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4625               port->pid, port->id,
4626               (int) inc_use, (int) got_response);
4627 
4628     if (port->id == NXT_SHARED_PORT_ID) {
4629         nxt_thread_mutex_lock(&app->mutex);
4630 
4631         app->active_requests -= got_response + dec_requests;
4632 
4633         nxt_thread_mutex_unlock(&app->mutex);
4634 
4635         goto adjust_use;
4636     }
4637 
4638     main_app_port = port->main_app_port;
4639 
4640     nxt_thread_mutex_lock(&app->mutex);
4641 
4642     main_app_port->app_responses += got_response;
4643     main_app_port->active_requests -= got_response + dec_requests;
4644     app->active_requests -= got_response + dec_requests;
4645 
4646     if (main_app_port->pair[1] != -1
4647         && (app->max_requests == 0
4648             || main_app_port->app_responses < app->max_requests))
4649     {
4650         if (main_app_port->app_link.next == NULL) {
4651             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4652 
4653             nxt_port_inc_use(main_app_port);
4654         }
4655     }
4656 
4657     send_quit = (app->max_requests > 0
4658                  && main_app_port->app_responses >= app->max_requests);
4659 
4660     if (send_quit) {
4661         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4662 
4663         nxt_port_hash_remove(&app->port_hash, main_app_port);
4664         app->port_hash_count--;
4665 
4666         main_app_port->app = NULL;
4667         app->processes--;
4668 
4669     } else {
4670         port_unchained = 0;
4671     }
4672 
4673     adjust_idle_timer = 0;
4674 
4675     if (main_app_port->pair[1] != -1 && !send_quit
4676         && main_app_port->active_requests == 0
4677         && main_app_port->active_websockets == 0
4678         && main_app_port->idle_link.next == NULL)
4679     {
4680         if (app->idle_processes == app->spare_processes
4681             && app->adjust_idle_work.data == NULL)
4682         {
4683             adjust_idle_timer = 1;
4684             app->adjust_idle_work.data = app;
4685             app->adjust_idle_work.next = NULL;
4686         }
4687 
4688         if (app->idle_processes < app->spare_processes) {
4689             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4690 
4691             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4692                       &app->name, main_app_port->pid, main_app_port->id);
4693         } else {
4694             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4695 
4696             main_app_port->idle_start = task->thread->engine->timers.now;
4697 
4698             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4699                       &app->name, main_app_port->pid, main_app_port->id);
4700         }
4701 
4702         app->idle_processes++;
4703     }
4704 
4705     nxt_thread_mutex_unlock(&app->mutex);
4706 
4707     if (adjust_idle_timer) {
4708         nxt_router_app_use(task, app, 1);
4709         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4710     }
4711 
4712     /* ? */
4713     if (main_app_port->pair[1] == -1) {
4714         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4715                   &app->name, app, main_app_port, main_app_port->pid);
4716 
4717         goto adjust_use;
4718     }
4719 
4720     if (send_quit) {
4721         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4722 
4723         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4724                               NULL);
4725 
4726         if (port_unchained) {
4727             nxt_port_use(task, main_app_port, -1);
4728         }
4729 
4730         goto adjust_use;
4731     }
4732 
4733     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4734               &app->name, app);
4735 
4736 adjust_use:
4737 
4738     nxt_port_use(task, port, inc_use);
4739 }
4740 
4741 
4742 void
4743 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4744 {
4745     nxt_app_t         *app;
4746     nxt_bool_t        unchain, start_process;
4747     nxt_port_t        *idle_port;
4748     nxt_queue_link_t  *idle_lnk;
4749 
4750     app = port->app;
4751 
4752     nxt_assert(app != NULL);
4753 
4754     nxt_thread_mutex_lock(&app->mutex);
4755 
4756     nxt_port_hash_remove(&app->port_hash, port);
4757     app->port_hash_count--;
4758 
4759     if (port->id != 0) {
4760         nxt_thread_mutex_unlock(&app->mutex);
4761 
4762         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4763                   port->pid, port->id);
4764 
4765         return;
4766     }
4767 
4768     unchain = nxt_queue_chk_remove(&port->app_link);
4769 
4770     if (nxt_queue_chk_remove(&port->idle_link)) {
4771         app->idle_processes--;
4772 
4773         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4774                   &app->name, port->pid, port->id,
4775                   (port->idle_start ? "idle_ports" : "spare_ports"));
4776 
4777         if (port->idle_start == 0
4778             && app->idle_processes >= app->spare_processes)
4779         {
4780             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4781 
4782             idle_lnk = nxt_queue_last(&app->idle_ports);
4783             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4784             nxt_queue_remove(idle_lnk);
4785 
4786             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4787 
4788             idle_port->idle_start = 0;
4789 
4790             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4791                       "to spare_ports",
4792                       &app->name, idle_port->pid, idle_port->id);
4793         }
4794     }
4795 
4796     app->processes--;
4797 
4798     start_process = !task->thread->engine->shutdown
4799                     && nxt_router_app_can_start(app)
4800                     && nxt_router_app_need_start(app);
4801 
4802     if (start_process) {
4803         app->pending_processes++;
4804     }
4805 
4806     nxt_thread_mutex_unlock(&app->mutex);
4807 
4808     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4809 
4810     if (unchain) {
4811         nxt_port_use(task, port, -1);
4812     }
4813 
4814     if (start_process) {
4815         nxt_router_start_app_process(task, app);
4816     }
4817 }
4818 
4819 
4820 static void
4821 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4822 {
4823     nxt_app_t           *app;
4824     nxt_bool_t          queued;
4825     nxt_port_t          *port;
4826     nxt_msec_t          timeout, threshold;
4827     nxt_queue_link_t    *lnk;
4828     nxt_event_engine_t  *engine;
4829 
4830     app = obj;
4831     queued = (data == app);
4832 
4833     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4834               &app->name, queued);
4835 
4836     engine = task->thread->engine;
4837 
4838     nxt_assert(app->engine == engine);
4839 
4840     threshold = engine->timers.now + app->joint->idle_timer.bias;
4841     timeout = 0;
4842 
4843     nxt_thread_mutex_lock(&app->mutex);
4844 
4845     if (queued) {
4846         app->adjust_idle_work.data = NULL;
4847     }
4848 
4849     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4850               &app->name,
4851               (int) app->idle_processes, (int) app->spare_processes);
4852 
4853     while (app->idle_processes > app->spare_processes) {
4854 
4855         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4856 
4857         lnk = nxt_queue_first(&app->idle_ports);
4858         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4859 
4860         timeout = port->idle_start + app->idle_timeout;
4861 
4862         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4863                   &app->name, port->pid,
4864                   port->idle_start, timeout, threshold);
4865 
4866         if (timeout > threshold) {
4867             break;
4868         }
4869 
4870         nxt_queue_remove(lnk);
4871         lnk->next = NULL;
4872 
4873         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4874                   &app->name, port->pid, port->id);
4875 
4876         nxt_queue_chk_remove(&port->app_link);
4877 
4878         nxt_port_hash_remove(&app->port_hash, port);
4879         app->port_hash_count--;
4880 
4881         app->idle_processes--;
4882         app->processes--;
4883         port->app = NULL;
4884 
4885         nxt_thread_mutex_unlock(&app->mutex);
4886 
4887         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4888                   &app->name, port->pid);
4889 
4890         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4891 
4892         nxt_port_use(task, port, -1);
4893 
4894         nxt_thread_mutex_lock(&app->mutex);
4895     }
4896 
4897     nxt_thread_mutex_unlock(&app->mutex);
4898 
4899     if (timeout > threshold) {
4900         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4901 
4902     } else {
4903         nxt_timer_disable(engine, &app->joint->idle_timer);
4904     }
4905 
4906     if (queued) {
4907         nxt_router_app_use(task, app, -1);
4908     }
4909 }
4910 
4911 
4912 static void
4913 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4914 {
4915     nxt_timer_t      *timer;
4916     nxt_app_joint_t  *app_joint;
4917 
4918     timer = obj;
4919     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4920 
4921     if (nxt_fast_path(app_joint->app != NULL)) {
4922         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4923     }
4924 }
4925 
4926 
4927 static void
4928 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4929 {
4930     nxt_timer_t      *timer;
4931     nxt_app_joint_t  *app_joint;
4932 
4933     timer = obj;
4934     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4935 
4936     nxt_router_app_joint_use(task, app_joint, -1);
4937 }
4938 
4939 
4940 static void
4941 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4942 {
4943     nxt_app_t        *app;
4944     nxt_port_t       *port;
4945     nxt_app_joint_t  *app_joint;
4946 
4947     app_joint = obj;
4948     app = app_joint->app;
4949 
4950     for ( ;; ) {
4951         port = nxt_router_app_get_port_for_quit(task, app);
4952         if (port == NULL) {
4953             break;
4954         }
4955 
4956         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4957 
4958         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4959 
4960         nxt_port_use(task, port, -1);
4961     }
4962 
4963     nxt_thread_mutex_lock(&app->mutex);
4964 
4965     for ( ;; ) {
4966         port = nxt_port_hash_retrieve(&app->port_hash);
4967         if (port == NULL) {
4968             break;
4969         }
4970 
4971         app->port_hash_count--;
4972 
4973         port->app = NULL;
4974 
4975         nxt_port_close(task, port);
4976 
4977         nxt_port_use(task, port, -1);
4978     }
4979 
4980     nxt_thread_mutex_unlock(&app->mutex);
4981 
4982     nxt_assert(app->processes == 0);
4983     nxt_assert(app->active_requests == 0);
4984     nxt_assert(app->port_hash_count == 0);
4985     nxt_assert(app->idle_processes == 0);
4986     nxt_assert(nxt_queue_is_empty(&app->ports));
4987     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4988     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4989 
4990     nxt_port_mmaps_destroy(&app->outgoing, 1);
4991 
4992     nxt_thread_mutex_destroy(&app->outgoing.mutex);
4993 
4994     if (app->shared_port != NULL) {
4995         app->shared_port->app = NULL;
4996         nxt_port_close(task, app->shared_port);
4997         nxt_port_use(task, app->shared_port, -1);
4998 
4999         app->shared_port = NULL;
5000     }
5001 
5002     nxt_thread_mutex_destroy(&app->mutex);
5003     nxt_mp_destroy(app->mem_pool);
5004 
5005     app_joint->app = NULL;
5006 
5007     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5008         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5009         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5010 
5011     } else {
5012         nxt_router_app_joint_use(task, app_joint, -1);
5013     }
5014 }
5015 
5016 
5017 static void
5018 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5019     nxt_request_rpc_data_t *req_rpc_data)
5020 {
5021     nxt_bool_t          start_process;
5022     nxt_port_t          *port;
5023     nxt_http_request_t  *r;
5024 
5025     start_process = 0;
5026 
5027     nxt_thread_mutex_lock(&app->mutex);
5028 
5029     port = app->shared_port;
5030     nxt_port_inc_use(port);
5031 
5032     app->active_requests++;
5033 
5034     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5035         app->pending_processes++;
5036         start_process = 1;
5037     }
5038 
5039     r = req_rpc_data->request;
5040 
5041     /*
5042      * Put request into application-wide list to be able to cancel request
5043      * if something goes wrong with application processes.
5044      */
5045     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5046 
5047     nxt_thread_mutex_unlock(&app->mutex);
5048 
5049     /*
5050      * Retain request memory pool while request is linked in ack_waiting_req
5051      * to guarantee request structure memory is accessble.
5052      */
5053     nxt_mp_retain(r->mem_pool);
5054 
5055     req_rpc_data->app_port = port;
5056     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5057 
5058     if (start_process) {
5059         nxt_router_start_app_process(task, app);
5060     }
5061 }
5062 
5063 
5064 void
5065 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5066     nxt_http_action_t *action)
5067 {
5068     nxt_event_engine_t      *engine;
5069     nxt_http_app_conf_t     *conf;
5070     nxt_request_rpc_data_t  *req_rpc_data;
5071 
5072     conf = action->u.conf;
5073     engine = task->thread->engine;
5074 
5075     r->app_target = conf->target;
5076 
5077     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5078                                           nxt_router_response_ready_handler,
5079                                           nxt_router_response_error_handler,
5080                                           sizeof(nxt_request_rpc_data_t));
5081     if (nxt_slow_path(req_rpc_data == NULL)) {
5082         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5083         return;
5084     }
5085 
5086     /*
5087      * At this point we have request req_rpc_data allocated and registered
5088      * in port handlers.  Need to fixup request memory pool.  Counterpart
5089      * release will be called via following call chain:
5090      *    nxt_request_rpc_data_unlink() ->
5091      *        nxt_router_http_request_release_post() ->
5092      *            nxt_router_http_request_release()
5093      */
5094     nxt_mp_retain(r->mem_pool);
5095 
5096     r->timer.task = &engine->task;
5097     r->timer.work_queue = &engine->fast_work_queue;
5098     r->timer.log = engine->task.log;
5099     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5100 
5101     r->engine = engine;
5102     r->err_work.handler = nxt_router_http_request_error;
5103     r->err_work.task = task;
5104     r->err_work.obj = r;
5105 
5106     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5107     req_rpc_data->app = conf->app;
5108     req_rpc_data->msg_info.body_fd = -1;
5109     req_rpc_data->rpc_cancel = 1;
5110 
5111     nxt_router_app_use(task, conf->app, 1);
5112 
5113     req_rpc_data->request = r;
5114     r->req_rpc_data = req_rpc_data;
5115 
5116     if (r->last != NULL) {
5117         r->last->completion_handler = nxt_router_http_request_done;
5118     }
5119 
5120     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5121     nxt_router_app_prepare_request(task, req_rpc_data);
5122 }
5123 
5124 
5125 static void
5126 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5127 {
5128     nxt_http_request_t  *r;
5129 
5130     r = obj;
5131 
5132     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5133 
5134     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5135 
5136     if (r->req_rpc_data != NULL) {
5137         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5138     }
5139 
5140     nxt_mp_release(r->mem_pool);
5141 }
5142 
5143 
5144 static void
5145 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5146 {
5147     nxt_http_request_t  *r;
5148 
5149     r = data;
5150 
5151     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5152 
5153     if (r->req_rpc_data != NULL) {
5154         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5155     }
5156 
5157     nxt_http_request_close_handler(task, r, r->proto.any);
5158 }
5159 
5160 
5161 static void
5162 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
5163 {
5164 }
5165 
5166 
5167 static void
5168 nxt_router_app_prepare_request(nxt_task_t *task,
5169     nxt_request_rpc_data_t *req_rpc_data)
5170 {
5171     nxt_app_t         *app;
5172     nxt_buf_t         *buf, *body;
5173     nxt_int_t         res;
5174     nxt_port_t        *port, *reply_port;
5175 
5176     int                   notify;
5177     struct {
5178         nxt_port_msg_t       pm;
5179         nxt_port_mmap_msg_t  mm;
5180     } msg;
5181 
5182 
5183     app = req_rpc_data->app;
5184 
5185     nxt_assert(app != NULL);
5186 
5187     port = req_rpc_data->app_port;
5188 
5189     nxt_assert(port != NULL);
5190     nxt_assert(port->queue != NULL);
5191 
5192     reply_port = task->thread->engine->port;
5193 
5194     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5195                                  nxt_app_msg_prefix[app->type]);
5196     if (nxt_slow_path(buf == NULL)) {
5197         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5198                   req_rpc_data->stream, &app->name);
5199 
5200         nxt_http_request_error(task, req_rpc_data->request,
5201                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5202 
5203         return;
5204     }
5205 
5206     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5207                     nxt_buf_used_size(buf),
5208                     port->socket.fd);
5209 
5210     req_rpc_data->msg_info.buf = buf;
5211 
5212     body = req_rpc_data->request->body;
5213 
5214     if (body != NULL && nxt_buf_is_file(body)) {
5215         req_rpc_data->msg_info.body_fd = body->file->fd;
5216 
5217         body->file->fd = -1;
5218 
5219     } else {
5220         req_rpc_data->msg_info.body_fd = -1;
5221     }
5222 
5223     msg.pm.stream = req_rpc_data->stream;
5224     msg.pm.pid = reply_port->pid;
5225     msg.pm.reply_port = reply_port->id;
5226     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5227     msg.pm.last = 0;
5228     msg.pm.mmap = 1;
5229     msg.pm.nf = 0;
5230     msg.pm.mf = 0;
5231     msg.pm.tracking = 0;
5232 
5233     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5234     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5235 
5236     msg.mm.mmap_id = hdr->id;
5237     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5238     msg.mm.size = nxt_buf_used_size(buf);
5239 
5240     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5241                              req_rpc_data->stream, &notify,
5242                              &req_rpc_data->msg_info.tracking_cookie);
5243     if (nxt_fast_path(res == NXT_OK)) {
5244         if (notify != 0) {
5245             (void) nxt_port_socket_write(task, port,
5246                                          NXT_PORT_MSG_READ_QUEUE,
5247                                          -1, req_rpc_data->stream,
5248                                          reply_port->id, NULL);
5249 
5250         } else {
5251             nxt_debug(task, "queue is not empty");
5252         }
5253 
5254         buf->is_port_mmap_sent = 1;
5255         buf->mem.pos = buf->mem.free;
5256 
5257     } else {
5258         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5259                   req_rpc_data->stream, &app->name);
5260 
5261         nxt_http_request_error(task, req_rpc_data->request,
5262                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5263     }
5264 }
5265 
5266 
5267 struct nxt_fields_iter_s {
5268     nxt_list_part_t   *part;
5269     nxt_http_field_t  *field;
5270 };
5271 
5272 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5273 
5274 
5275 static nxt_http_field_t *
5276 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5277 {
5278     if (part == NULL) {
5279         return NULL;
5280     }
5281 
5282     while (part->nelts == 0) {
5283         part = part->next;
5284         if (part == NULL) {
5285             return NULL;
5286         }
5287     }
5288 
5289     i->part = part;
5290     i->field = nxt_list_data(i->part);
5291 
5292     return i->field;
5293 }
5294 
5295 
5296 static nxt_http_field_t *
5297 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5298 {
5299     return nxt_fields_part_first(nxt_list_part(fields), i);
5300 }
5301 
5302 
5303 static nxt_http_field_t *
5304 nxt_fields_next(nxt_fields_iter_t *i)
5305 {
5306     nxt_http_field_t  *end = nxt_list_data(i->part);
5307 
5308     end += i->part->nelts;
5309     i->field++;
5310 
5311     if (i->field < end) {
5312         return i->field;
5313     }
5314 
5315     return nxt_fields_part_first(i->part->next, i);
5316 }
5317 
5318 
5319 static nxt_buf_t *
5320 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5321     nxt_app_t *app, const nxt_str_t *prefix)
5322 {
5323     void                *target_pos, *query_pos;
5324     u_char              *pos, *end, *p, c;
5325     size_t              fields_count, req_size, size, free_size;
5326     size_t              copy_size;
5327     nxt_off_t           content_length;
5328     nxt_buf_t           *b, *buf, *out, **tail;
5329     nxt_http_field_t    *field, *dup;
5330     nxt_unit_field_t    *dst_field;
5331     nxt_fields_iter_t   iter, dup_iter;
5332     nxt_unit_request_t  *req;
5333 
5334     req_size = sizeof(nxt_unit_request_t)
5335                + r->method->length + 1
5336                + r->version.length + 1
5337                + r->remote->length + 1
5338                + r->local->length + 1
5339                + r->server_name.length + 1
5340                + r->target.length + 1
5341                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5342 
5343     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5344     fields_count = 0;
5345 
5346     nxt_list_each(field, r->fields) {
5347         fields_count++;
5348 
5349         req_size += field->name_length + prefix->length + 1
5350                     + field->value_length + 1;
5351     } nxt_list_loop;
5352 
5353     req_size += fields_count * sizeof(nxt_unit_field_t);
5354 
5355     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5356         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5357                   (int) req_size);
5358 
5359         return NULL;
5360     }
5361 
5362     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5363               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5364     if (nxt_slow_path(out == NULL)) {
5365         return NULL;
5366     }
5367 
5368     req = (nxt_unit_request_t *) out->mem.free;
5369     out->mem.free += req_size;
5370 
5371     req->app_target = r->app_target;
5372 
5373     req->content_length = content_length;
5374 
5375     p = (u_char *) (req->fields + fields_count);
5376 
5377     nxt_debug(task, "fields_count=%d", (int) fields_count);
5378 
5379     req->method_length = r->method->length;
5380     nxt_unit_sptr_set(&req->method, p);
5381     p = nxt_cpymem(p, r->method->start, r->method->length);
5382     *p++ = '\0';
5383 
5384     req->version_length = r->version.length;
5385     nxt_unit_sptr_set(&req->version, p);
5386     p = nxt_cpymem(p, r->version.start, r->version.length);
5387     *p++ = '\0';
5388 
5389     req->remote_length = r->remote->address_length;
5390     nxt_unit_sptr_set(&req->remote, p);
5391     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5392                    r->remote->address_length);
5393     *p++ = '\0';
5394 
5395     req->local_length = r->local->address_length;
5396     nxt_unit_sptr_set(&req->local, p);
5397     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5398     *p++ = '\0';
5399 
5400     req->tls = (r->tls != NULL);
5401     req->websocket_handshake = r->websocket_handshake;
5402 
5403     req->server_name_length = r->server_name.length;
5404     nxt_unit_sptr_set(&req->server_name, p);
5405     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5406     *p++ = '\0';
5407 
5408     target_pos = p;
5409     req->target_length = (uint32_t) r->target.length;
5410     nxt_unit_sptr_set(&req->target, p);
5411     p = nxt_cpymem(p, r->target.start, r->target.length);
5412     *p++ = '\0';
5413 
5414     req->path_length = (uint32_t) r->path->length;
5415     if (r->path->start == r->target.start) {
5416         nxt_unit_sptr_set(&req->path, target_pos);
5417 
5418     } else {
5419         nxt_unit_sptr_set(&req->path, p);
5420         p = nxt_cpymem(p, r->path->start, r->path->length);
5421         *p++ = '\0';
5422     }
5423 
5424     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5425     if (r->args != NULL && r->args->start != NULL) {
5426         query_pos = nxt_pointer_to(target_pos,
5427                                    r->args->start - r->target.start);
5428 
5429         nxt_unit_sptr_set(&req->query, query_pos);
5430 
5431     } else {
5432         req->query.offset = 0;
5433     }
5434 
5435     req->content_length_field = NXT_UNIT_NONE_FIELD;
5436     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5437     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5438     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5439 
5440     dst_field = req->fields;
5441 
5442     for (field = nxt_fields_first(r->fields, &iter);
5443          field != NULL;
5444          field = nxt_fields_next(&iter))
5445     {
5446         if (field->skip) {
5447             continue;
5448         }
5449 
5450         dst_field->hash = field->hash;
5451         dst_field->skip = 0;
5452         dst_field->name_length = field->name_length + prefix->length;
5453         dst_field->value_length = field->value_length;
5454 
5455         if (field == r->content_length) {
5456             req->content_length_field = dst_field - req->fields;
5457 
5458         } else if (field == r->content_type) {
5459             req->content_type_field = dst_field - req->fields;
5460 
5461         } else if (field == r->cookie) {
5462             req->cookie_field = dst_field - req->fields;
5463 
5464         } else if (field == r->authorization) {
5465             req->authorization_field = dst_field - req->fields;
5466         }
5467 
5468         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5469                   (int) field->hash, (int) field->skip,
5470                   (int) field->name_length, field->name,
5471                   (int) field->value_length, field->value);
5472 
5473         if (prefix->length != 0) {
5474             nxt_unit_sptr_set(&dst_field->name, p);
5475             p = nxt_cpymem(p, prefix->start, prefix->length);
5476 
5477             end = field->name + field->name_length;
5478             for (pos = field->name; pos < end; pos++) {
5479                 c = *pos;
5480 
5481                 if (c >= 'a' && c <= 'z') {
5482                     *p++ = (c & ~0x20);
5483                     continue;
5484                 }
5485 
5486                 if (c == '-') {
5487                     *p++ = '_';
5488                     continue;
5489                 }
5490 
5491                 *p++ = c;
5492             }
5493 
5494         } else {
5495             nxt_unit_sptr_set(&dst_field->name, p);
5496             p = nxt_cpymem(p, field->name, field->name_length);
5497         }
5498 
5499         *p++ = '\0';
5500 
5501         nxt_unit_sptr_set(&dst_field->value, p);
5502         p = nxt_cpymem(p, field->value, field->value_length);
5503 
5504         if (prefix->length != 0) {
5505             dup_iter = iter;
5506 
5507             for (dup = nxt_fields_next(&dup_iter);
5508                  dup != NULL;
5509                  dup = nxt_fields_next(&dup_iter))
5510             {
5511                 if (dup->name_length != field->name_length
5512                     || dup->skip
5513                     || dup->hash != field->hash
5514                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5515                 {
5516                     continue;
5517                 }
5518 
5519                 p = nxt_cpymem(p, ", ", 2);
5520                 p = nxt_cpymem(p, dup->value, dup->value_length);
5521 
5522                 dst_field->value_length += 2 + dup->value_length;
5523 
5524                 dup->skip = 1;
5525             }
5526         }
5527 
5528         *p++ = '\0';
5529 
5530         dst_field++;
5531     }
5532 
5533     req->fields_count = (uint32_t) (dst_field - req->fields);
5534 
5535     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5536 
5537     buf = out;
5538     tail = &buf->next;
5539 
5540     for (b = r->body; b != NULL; b = b->next) {
5541         size = nxt_buf_mem_used_size(&b->mem);
5542         pos = b->mem.pos;
5543 
5544         while (size > 0) {
5545             if (buf == NULL) {
5546                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5547 
5548                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5549                 if (nxt_slow_path(buf == NULL)) {
5550                     while (out != NULL) {
5551                         buf = out->next;
5552                         out->next = NULL;
5553                         out->completion_handler(task, out, out->parent);
5554                         out = buf;
5555                     }
5556                     return NULL;
5557                 }
5558 
5559                 *tail = buf;
5560                 tail = &buf->next;
5561 
5562             } else {
5563                 free_size = nxt_buf_mem_free_size(&buf->mem);
5564                 if (free_size < size
5565                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5566                        == NXT_OK)
5567                 {
5568                     free_size = nxt_buf_mem_free_size(&buf->mem);
5569                 }
5570             }
5571 
5572             if (free_size > 0) {
5573                 copy_size = nxt_min(free_size, size);
5574 
5575                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5576 
5577                 size -= copy_size;
5578                 pos += copy_size;
5579 
5580                 if (size == 0) {
5581                     break;
5582                 }
5583             }
5584 
5585             buf = NULL;
5586         }
5587     }
5588 
5589     return out;
5590 }
5591 
5592 
5593 static void
5594 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5595 {
5596     nxt_timer_t              *timer;
5597     nxt_http_request_t       *r;
5598     nxt_request_rpc_data_t   *req_rpc_data;
5599 
5600     timer = obj;
5601 
5602     nxt_debug(task, "router app timeout");
5603 
5604     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5605     req_rpc_data = r->timer_data;
5606 
5607     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5608 
5609     nxt_request_rpc_data_unlink(task, req_rpc_data);
5610 }
5611 
5612 
5613 static void
5614 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5615 {
5616     r->timer.handler = nxt_router_http_request_release;
5617     nxt_timer_add(task->thread->engine, &r->timer, 0);
5618 }
5619 
5620 
5621 static void
5622 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5623 {
5624     nxt_http_request_t  *r;
5625 
5626     nxt_debug(task, "http request pool release");
5627 
5628     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5629 
5630     nxt_mp_release(r->mem_pool);
5631 }
5632 
5633 
5634 static void
5635 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5636 {
5637     size_t                   mi;
5638     uint32_t                 i;
5639     nxt_bool_t               ack;
5640     nxt_process_t            *process;
5641     nxt_free_map_t           *m;
5642     nxt_port_mmap_handler_t  *mmap_handler;
5643 
5644     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5645 
5646     process = nxt_runtime_process_find(task->thread->runtime,
5647                                        msg->port_msg.pid);
5648     if (nxt_slow_path(process == NULL)) {
5649         return;
5650     }
5651 
5652     ack = 0;
5653 
5654     /*
5655      * To mitigate possible racing condition (when OOSM message received
5656      * after some of the memory was already freed), need to try to find
5657      * first free segment in shared memory and send ACK if found.
5658      */
5659 
5660     nxt_thread_mutex_lock(&process->incoming.mutex);
5661 
5662     for (i = 0; i < process->incoming.size; i++) {
5663         mmap_handler = process->incoming.elts[i].mmap_handler;
5664 
5665         if (nxt_slow_path(mmap_handler == NULL)) {
5666             continue;
5667         }
5668 
5669         m = mmap_handler->hdr->free_map;
5670 
5671         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5672             if (m[mi] != 0) {
5673                 ack = 1;
5674 
5675                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5676                           i, mi, m[mi]);
5677 
5678                 break;
5679             }
5680         }
5681     }
5682 
5683     nxt_thread_mutex_unlock(&process->incoming.mutex);
5684 
5685     if (ack) {
5686         nxt_process_broadcast_shm_ack(task, process);
5687     }
5688 }
5689 
5690 
5691 static void
5692 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5693 {
5694     nxt_fd_t                 fd;
5695     nxt_port_t               *port;
5696     nxt_runtime_t            *rt;
5697     nxt_port_mmaps_t         *mmaps;
5698     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5699     nxt_port_mmap_handler_t  *mmap_handler;
5700 
5701     rt = task->thread->runtime;
5702 
5703     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5704                                  msg->port_msg.reply_port);
5705     if (nxt_slow_path(port == NULL)) {
5706         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5707                   msg->port_msg.pid, msg->port_msg.reply_port);
5708 
5709         return;
5710     }
5711 
5712     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5713                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5714     {
5715         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5716                   (int) nxt_buf_used_size(msg->buf));
5717 
5718         return;
5719     }
5720 
5721     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5722 
5723     nxt_assert(port->type == NXT_PROCESS_APP);
5724 
5725     if (nxt_slow_path(port->app == NULL)) {
5726         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5727                   port->pid, port->id);
5728 
5729         // FIXME
5730         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5731                               -1, msg->port_msg.stream, 0, NULL);
5732 
5733         return;
5734     }
5735 
5736     mmaps = &port->app->outgoing;
5737     nxt_thread_mutex_lock(&mmaps->mutex);
5738 
5739     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5740         nxt_thread_mutex_unlock(&mmaps->mutex);
5741 
5742         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5743                   (int) get_mmap_msg->id);
5744 
5745         // FIXME
5746         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5747                               -1, msg->port_msg.stream, 0, NULL);
5748         return;
5749     }
5750 
5751     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5752 
5753     fd = mmap_handler->fd;
5754 
5755     nxt_thread_mutex_unlock(&mmaps->mutex);
5756 
5757     nxt_debug(task, "get mmap %PI:%d found",
5758               msg->port_msg.pid, (int) get_mmap_msg->id);
5759 
5760     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5761 }
5762 
5763 
5764 static void
5765 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5766 {
5767     nxt_port_t               *port, *reply_port;
5768     nxt_runtime_t            *rt;
5769     nxt_port_msg_get_port_t  *get_port_msg;
5770 
5771     rt = task->thread->runtime;
5772 
5773     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5774                                        msg->port_msg.reply_port);
5775     if (nxt_slow_path(reply_port == NULL)) {
5776         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5777                   msg->port_msg.pid, msg->port_msg.reply_port);
5778 
5779         return;
5780     }
5781 
5782     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5783                       < (int) sizeof(nxt_port_msg_get_port_t)))
5784     {
5785         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5786                   (int) nxt_buf_used_size(msg->buf));
5787 
5788         return;
5789     }
5790 
5791     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5792 
5793     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5794     if (nxt_slow_path(port == NULL)) {
5795         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5796                   get_port_msg->pid, get_port_msg->id);
5797 
5798         return;
5799     }
5800 
5801     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5802               get_port_msg->id);
5803 
5804     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5805 }
5806