xref: /unit/src/nxt_router.c (revision 1936:953434450ea9)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
/*
 * Port id used for an application's shared port: it is passed to
 * nxt_port_new() in nxt_router_app_restart_handler() and compared against
 * app_port->id in nxt_router_msg_cancel().
 */
#define NXT_SHARED_PORT_ID  0xFFFFu
22 
/*
 * Per-application settings.
 * NOTE(review): presumably filled while parsing the "applications" part of
 * the configuration; the parser is outside this view -- confirm.
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    uint32_t          requests;
    /* Unparsed configuration nodes kept for later processing. */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
35 
36 
/*
 * Per-listener settings.
 * NOTE(review): presumably the "pass" and "application" options of a
 * listener object; the code filling it is outside this view -- confirm.
 */
typedef struct {
    nxt_str_t         pass;
    nxt_str_t         application;
} nxt_router_listener_conf_t;
41 
42 
43 #if (NXT_TLS)
44 
/*
 * Context of one pending TLS certificate request (only built with NXT_TLS).
 * NOTE(review): presumably consumed by nxt_router_tls_rpc_handler(); the
 * handler body is outside this view -- confirm.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
} nxt_router_tlssock_t;
54 
55 #endif
56 
57 
/*
 * RPC context tying a socket configuration to the temporary configuration
 * being applied.
 * NOTE(review): presumably used by the listen socket ready/error handlers
 * declared below -- confirm against their bodies.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
64 
65 
/*
 * RPC context tying an application to the temporary configuration being
 * applied.
 * NOTE(review): presumably used by the app prefork ready/error handlers
 * declared below -- confirm against their bodies.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
} nxt_app_rpc_t;
70 
71 
/*
 * RPC context registered by nxt_router_start_app_process_handler() when it
 * asks the main process to start an application process.  It records the
 * application joint and the application generation at the time of request.
 */
typedef struct {
    nxt_app_joint_t         *app_joint;
    uint32_t                generation;
} nxt_app_joint_rpc_t;
76 
77 
/* Router process entry points (referenced from nxt_router_process). */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

/* Port message handlers (referenced from nxt_router_process_port_handlers). */
static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_app_restart_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);

/* Temporary configuration life cycle. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
    nxt_conf_value_t *conf);

/* Application lookup. */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);

/* Port queues, listen socket / TLS / application RPC helpers. */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);

/* Per-engine configuration. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Worker threads. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);

/* Work handlers (task, obj, data signature) and related helpers. */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);

/* Access log. */
static void nxt_router_access_log_writer(nxt_task_t *task,
    nxt_http_request_t *r, nxt_router_access_log_t *access_log);
static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
    struct tm *tm, size_t size, const char *format);
static void nxt_router_access_log_open(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_access_log_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_release(nxt_task_t *task,
    nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_reopen_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application ports: RPC handlers for the START_PROCESS request. */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
    nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application reference counting. */
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

/* Request processing. */
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

/* Timers and deferred release. */
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
271 
/* Defined outside this file. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/* The single router instance; allocated and set by nxt_router_start(). */
static nxt_router_t  *nxt_router;

/* Prefix strings referenced by the nxt_app_msg_prefix[] table. */
static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");
278 
/*
 * Table of prefix strings, each entry being either "" or "HTTP_".
 * NOTE(review): presumably indexed by application type and passed as the
 * "prefix" argument of nxt_router_prepare_msg(); the indexing code is outside
 * this view -- confirm, and keep the entry order in sync with that enum.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
287 
288 
/*
 * Port message handlers of the router process; installed through
 * nxt_router_process.port_handlers.  Both rpc_ready and rpc_error are routed
 * to the generic nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
304 
305 
/*
 * Process descriptor of the router: names the process and wires in its
 * prefork/start callbacks and port handlers.
 * NOTE(review): the symbol is not static, so it is presumably referenced by
 * the process-management code outside this file -- confirm.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
316 
317 
/*
 * Queues of nxt_socket_conf_t.
 *
 * These are file-scope objects shared by all configuration updates; they are
 * re-initialized by nxt_router_temp_conf() each time a new temporary
 * configuration is created.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
324 
325 
326 static nxt_int_t
327 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
328 {
329     nxt_runtime_stop_app_processes(task, task->thread->runtime);
330 
331     return NXT_OK;
332 }
333 
334 
335 static nxt_int_t
336 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
337 {
338     nxt_int_t      ret;
339     nxt_port_t     *controller_port;
340     nxt_router_t   *router;
341     nxt_runtime_t  *rt;
342 
343     rt = task->thread->runtime;
344 
345     nxt_log(task, NXT_LOG_INFO, "router started");
346 
347 #if (NXT_TLS)
348     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
349     if (nxt_slow_path(rt->tls == NULL)) {
350         return NXT_ERROR;
351     }
352 
353     ret = rt->tls->library_init(task);
354     if (nxt_slow_path(ret != NXT_OK)) {
355         return ret;
356     }
357 #endif
358 
359     ret = nxt_http_init(task);
360     if (nxt_slow_path(ret != NXT_OK)) {
361         return ret;
362     }
363 
364     router = nxt_zalloc(sizeof(nxt_router_t));
365     if (nxt_slow_path(router == NULL)) {
366         return NXT_ERROR;
367     }
368 
369     nxt_queue_init(&router->engines);
370     nxt_queue_init(&router->sockets);
371     nxt_queue_init(&router->apps);
372 
373     nxt_router = router;
374 
375     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
376     if (controller_port != NULL) {
377         nxt_router_greet_controller(task, controller_port);
378     }
379 
380     return NXT_OK;
381 }
382 
383 
384 static void
385 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
386 {
387     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
388                           -1, 0, 0, NULL);
389 }
390 
391 
392 static void
393 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
394     void *data)
395 {
396     size_t               size;
397     uint32_t             stream;
398     nxt_mp_t             *mp;
399     nxt_int_t            ret;
400     nxt_app_t            *app;
401     nxt_buf_t            *b;
402     nxt_port_t           *main_port;
403     nxt_runtime_t        *rt;
404     nxt_app_joint_rpc_t  *app_joint_rpc;
405 
406     app = data;
407 
408     rt = task->thread->runtime;
409     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
410 
411     nxt_debug(task, "app '%V' %p start process", &app->name, app);
412 
413     size = app->name.length + 1 + app->conf.length;
414 
415     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
416 
417     if (nxt_slow_path(b == NULL)) {
418         goto failed;
419     }
420 
421     nxt_buf_cpystr(b, &app->name);
422     *b->mem.free++ = '\0';
423     nxt_buf_cpystr(b, &app->conf);
424 
425     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
426                                                      nxt_router_app_port_ready,
427                                                      nxt_router_app_port_error,
428                                                    sizeof(nxt_app_joint_rpc_t));
429     if (nxt_slow_path(app_joint_rpc == NULL)) {
430         goto failed;
431     }
432 
433     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
434 
435     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
436                                 -1, stream, port->id, b);
437     if (nxt_slow_path(ret != NXT_OK)) {
438         nxt_port_rpc_cancel(task, port, stream);
439 
440         goto failed;
441     }
442 
443     app_joint_rpc->app_joint = app->joint;
444     app_joint_rpc->generation = app->generation;
445 
446     nxt_router_app_joint_use(task, app->joint, 1);
447 
448     nxt_router_app_use(task, app, -1);
449 
450     return;
451 
452 failed:
453 
454     if (b != NULL) {
455         mp = b->data;
456         nxt_mp_free(mp, b);
457         nxt_mp_release(mp);
458     }
459 
460     nxt_thread_mutex_lock(&app->mutex);
461 
462     app->pending_processes--;
463 
464     nxt_thread_mutex_unlock(&app->mutex);
465 
466     nxt_router_app_use(task, app, -1);
467 }
468 
469 
470 static void
471 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
472 {
473     app_joint->use_count += i;
474 
475     if (app_joint->use_count == 0) {
476         nxt_assert(app_joint->app == NULL);
477 
478         nxt_free(app_joint);
479     }
480 }
481 
482 
483 static nxt_int_t
484 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
485 {
486     nxt_int_t      res;
487     nxt_port_t     *router_port;
488     nxt_runtime_t  *rt;
489 
490     nxt_debug(task, "app '%V' start process", &app->name);
491 
492     rt = task->thread->runtime;
493     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
494 
495     nxt_router_app_use(task, app, 1);
496 
497     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
498                         app);
499 
500     if (res == NXT_OK) {
501         return res;
502     }
503 
504     nxt_thread_mutex_lock(&app->mutex);
505 
506     app->pending_processes--;
507 
508     nxt_thread_mutex_unlock(&app->mutex);
509 
510     nxt_router_app_use(task, app, -1);
511 
512     return NXT_ERROR;
513 }
514 
515 
516 nxt_inline nxt_bool_t
517 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
518 {
519     nxt_buf_t       *b, *next;
520     nxt_bool_t      cancelled;
521     nxt_port_t      *app_port;
522     nxt_msg_info_t  *msg_info;
523 
524     msg_info = &req_rpc_data->msg_info;
525 
526     if (msg_info->buf == NULL) {
527         return 0;
528     }
529 
530     app_port = req_rpc_data->app_port;
531 
532     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
533         cancelled = nxt_app_queue_cancel(app_port->queue,
534                                          msg_info->tracking_cookie,
535                                          req_rpc_data->stream);
536 
537         if (cancelled) {
538             nxt_debug(task, "stream #%uD: cancelled by router",
539                       req_rpc_data->stream);
540         }
541 
542     } else {
543         cancelled = 0;
544     }
545 
546     for (b = msg_info->buf; b != NULL; b = next) {
547         next = b->next;
548         b->next = NULL;
549 
550         if (b->is_port_mmap_sent) {
551             b->is_port_mmap_sent = cancelled == 0;
552         }
553 
554         b->completion_handler(task, b, b->parent);
555     }
556 
557     msg_info->buf = NULL;
558 
559     return cancelled;
560 }
561 
562 
563 nxt_inline nxt_bool_t
564 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
565 {
566     if (lnk->next != NULL) {
567         nxt_queue_remove(lnk);
568 
569         lnk->next = NULL;
570 
571         return 1;
572     }
573 
574     return 0;
575 }
576 
577 
578 nxt_inline void
579 nxt_request_rpc_data_unlink(nxt_task_t *task,
580     nxt_request_rpc_data_t *req_rpc_data)
581 {
582     nxt_app_t           *app;
583     nxt_bool_t          unlinked;
584     nxt_http_request_t  *r;
585 
586     nxt_router_msg_cancel(task, req_rpc_data);
587 
588     if (req_rpc_data->app_port != NULL) {
589         nxt_router_app_port_release(task, req_rpc_data->app_port,
590                                     req_rpc_data->apr_action);
591 
592         req_rpc_data->app_port = NULL;
593     }
594 
595     app = req_rpc_data->app;
596     r = req_rpc_data->request;
597 
598     if (r != NULL) {
599         r->timer_data = NULL;
600 
601         nxt_router_http_request_release_post(task, r);
602 
603         r->req_rpc_data = NULL;
604         req_rpc_data->request = NULL;
605 
606         if (app != NULL) {
607             unlinked = 0;
608 
609             nxt_thread_mutex_lock(&app->mutex);
610 
611             if (r->app_link.next != NULL) {
612                 nxt_queue_remove(&r->app_link);
613                 r->app_link.next = NULL;
614 
615                 unlinked = 1;
616             }
617 
618             nxt_thread_mutex_unlock(&app->mutex);
619 
620             if (unlinked) {
621                 nxt_mp_release(r->mem_pool);
622             }
623         }
624     }
625 
626     if (app != NULL) {
627         nxt_router_app_use(task, app, -1);
628 
629         req_rpc_data->app = NULL;
630     }
631 
632     if (req_rpc_data->msg_info.body_fd != -1) {
633         nxt_fd_close(req_rpc_data->msg_info.body_fd);
634 
635         req_rpc_data->msg_info.body_fd = -1;
636     }
637 
638     if (req_rpc_data->rpc_cancel) {
639         req_rpc_data->rpc_cancel = 0;
640 
641         nxt_port_rpc_cancel(task, task->thread->engine->port,
642                             req_rpc_data->stream);
643     }
644 }
645 
646 
647 static void
648 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
649 {
650     nxt_int_t      res;
651     nxt_app_t      *app;
652     nxt_port_t     *port, *main_app_port;
653     nxt_runtime_t  *rt;
654 
655     nxt_port_new_port_handler(task, msg);
656 
657     port = msg->u.new_port;
658 
659     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
660         nxt_router_greet_controller(task, msg->u.new_port);
661     }
662 
663     if (port == NULL || port->type != NXT_PROCESS_APP) {
664 
665         if (msg->port_msg.stream == 0) {
666             return;
667         }
668 
669         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
670 
671     } else {
672         if (msg->fd[1] != -1) {
673             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
674             if (nxt_slow_path(res != NXT_OK)) {
675                 return;
676             }
677 
678             nxt_fd_close(msg->fd[1]);
679             msg->fd[1] = -1;
680         }
681     }
682 
683     if (msg->port_msg.stream != 0) {
684         nxt_port_rpc_handler(task, msg);
685         return;
686     }
687 
688     /*
689      * Port with "id == 0" is application 'main' port and it always
690      * should come with non-zero stream.
691      */
692     nxt_assert(port->id != 0);
693 
694     /* Find 'main' app port and get app reference. */
695     rt = task->thread->runtime;
696 
697     /*
698      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
699      * sent to main port (with id == 0) and processed in main thread.
700      */
701     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
702     nxt_assert(main_app_port != NULL);
703 
704     app = main_app_port->app;
705 
706     if (nxt_fast_path(app != NULL)) {
707         nxt_thread_mutex_lock(&app->mutex);
708 
709         /* TODO here should be find-and-add code because there can be
710            port waiters in port_hash */
711         nxt_port_hash_add(&app->port_hash, port);
712         app->port_hash_count++;
713 
714         nxt_thread_mutex_unlock(&app->mutex);
715 
716         port->app = app;
717     }
718 
719     port->main_app_port = main_app_port;
720 
721     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
722 }
723 
724 
725 static void
726 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
727 {
728     void                    *p;
729     size_t                  size;
730     nxt_int_t               ret;
731     nxt_port_t              *port;
732     nxt_router_temp_conf_t  *tmcf;
733 
734     port = nxt_runtime_port_find(task->thread->runtime,
735                                  msg->port_msg.pid,
736                                  msg->port_msg.reply_port);
737     if (nxt_slow_path(port == NULL)) {
738         nxt_alert(task, "conf_data_handler: reply port not found");
739         return;
740     }
741 
742     p = MAP_FAILED;
743 
744     /*
745      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
746      * initialized in 'cleanup' section.
747      */
748     size = 0;
749 
750     tmcf = nxt_router_temp_conf(task);
751     if (nxt_slow_path(tmcf == NULL)) {
752         goto fail;
753     }
754 
755     if (nxt_slow_path(msg->fd[0] == -1)) {
756         nxt_alert(task, "conf_data_handler: invalid shm fd");
757         goto fail;
758     }
759 
760     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
761         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
762                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
763         goto fail;
764     }
765 
766     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
767 
768     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
769 
770     nxt_fd_close(msg->fd[0]);
771     msg->fd[0] = -1;
772 
773     if (nxt_slow_path(p == MAP_FAILED)) {
774         goto fail;
775     }
776 
777     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
778 
779     tmcf->router_conf->router = nxt_router;
780     tmcf->stream = msg->port_msg.stream;
781     tmcf->port = port;
782 
783     nxt_port_use(task, tmcf->port, 1);
784 
785     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
786 
787     if (nxt_fast_path(ret == NXT_OK)) {
788         nxt_router_conf_apply(task, tmcf, NULL);
789 
790     } else {
791         nxt_router_conf_error(task, tmcf);
792     }
793 
794     goto cleanup;
795 
796 fail:
797 
798     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
799                           msg->port_msg.stream, 0, NULL);
800 
801     if (tmcf != NULL) {
802         nxt_mp_release(tmcf->mem_pool);
803     }
804 
805 cleanup:
806 
807     if (p != MAP_FAILED) {
808         nxt_mem_munmap(p, size);
809     }
810 
811     if (msg->fd[0] != -1) {
812         nxt_fd_close(msg->fd[0]);
813         msg->fd[0] = -1;
814     }
815 }
816 
817 
818 static void
819 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
820 {
821     nxt_app_t            *app;
822     nxt_int_t            ret;
823     nxt_str_t            app_name;
824     nxt_port_t           *port, *reply_port, *shared_port, *old_shared_port;
825     nxt_port_msg_type_t  reply;
826 
827     reply_port = nxt_runtime_port_find(task->thread->runtime,
828                                        msg->port_msg.pid,
829                                        msg->port_msg.reply_port);
830     if (nxt_slow_path(reply_port == NULL)) {
831         nxt_alert(task, "app_restart_handler: reply port not found");
832         return;
833     }
834 
835     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
836     app_name.start = msg->buf->mem.pos;
837 
838     nxt_debug(task, "app_restart_handler: %V", &app_name);
839 
840     app = nxt_router_app_find(&nxt_router->apps, &app_name);
841 
842     if (nxt_fast_path(app != NULL)) {
843         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
844                                    NXT_PROCESS_APP);
845         if (nxt_slow_path(shared_port == NULL)) {
846             goto fail;
847         }
848 
849         ret = nxt_port_socket_init(task, shared_port, 0);
850         if (nxt_slow_path(ret != NXT_OK)) {
851             nxt_port_use(task, shared_port, -1);
852             goto fail;
853         }
854 
855         ret = nxt_router_app_queue_init(task, shared_port);
856         if (nxt_slow_path(ret != NXT_OK)) {
857             nxt_port_write_close(shared_port);
858             nxt_port_read_close(shared_port);
859             nxt_port_use(task, shared_port, -1);
860             goto fail;
861         }
862 
863         nxt_port_write_enable(task, shared_port);
864 
865         nxt_thread_mutex_lock(&app->mutex);
866 
867         nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
868 
869             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
870                                          0, 0, NULL);
871 
872         } nxt_queue_loop;
873 
874         app->generation++;
875 
876         shared_port->app = app;
877 
878         old_shared_port = app->shared_port;
879         old_shared_port->app = NULL;
880 
881         app->shared_port = shared_port;
882 
883         nxt_thread_mutex_unlock(&app->mutex);
884 
885         nxt_port_close(task, old_shared_port);
886         nxt_port_use(task, old_shared_port, -1);
887 
888         reply = NXT_PORT_MSG_RPC_READY_LAST;
889 
890     } else {
891 
892 fail:
893 
894         reply = NXT_PORT_MSG_RPC_ERROR;
895     }
896 
897     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
898                           0, NULL);
899 }
900 
901 
902 static void
903 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
904     void *data)
905 {
906     union {
907         nxt_pid_t  removed_pid;
908         void       *data;
909     } u;
910 
911     u.data = data;
912 
913     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
914 }
915 
916 
917 static void
918 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
919 {
920     nxt_event_engine_t  *engine;
921 
922     nxt_port_remove_pid_handler(task, msg);
923 
924     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
925     {
926         if (nxt_fast_path(engine->port != NULL)) {
927             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
928                           msg->u.data);
929         }
930     }
931     nxt_queue_loop;
932 
933     if (msg->port_msg.stream == 0) {
934         return;
935     }
936 
937     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
938 
939     nxt_port_rpc_handler(task, msg);
940 }
941 
942 
943 static nxt_router_temp_conf_t *
944 nxt_router_temp_conf(nxt_task_t *task)
945 {
946     nxt_mp_t                *mp, *tmp;
947     nxt_router_conf_t       *rtcf;
948     nxt_router_temp_conf_t  *tmcf;
949 
950     mp = nxt_mp_create(1024, 128, 256, 32);
951     if (nxt_slow_path(mp == NULL)) {
952         return NULL;
953     }
954 
955     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
956     if (nxt_slow_path(rtcf == NULL)) {
957         goto fail;
958     }
959 
960     rtcf->mem_pool = mp;
961 
962     tmp = nxt_mp_create(1024, 128, 256, 32);
963     if (nxt_slow_path(tmp == NULL)) {
964         goto fail;
965     }
966 
967     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
968     if (nxt_slow_path(tmcf == NULL)) {
969         goto temp_fail;
970     }
971 
972     tmcf->mem_pool = tmp;
973     tmcf->router_conf = rtcf;
974     tmcf->count = 1;
975     tmcf->engine = task->thread->engine;
976 
977     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
978                                      sizeof(nxt_router_engine_conf_t));
979     if (nxt_slow_path(tmcf->engines == NULL)) {
980         goto temp_fail;
981     }
982 
983     nxt_queue_init(&creating_sockets);
984     nxt_queue_init(&pending_sockets);
985     nxt_queue_init(&updating_sockets);
986     nxt_queue_init(&keeping_sockets);
987     nxt_queue_init(&deleting_sockets);
988 
989 #if (NXT_TLS)
990     nxt_queue_init(&tmcf->tls);
991 #endif
992 
993     nxt_queue_init(&tmcf->apps);
994     nxt_queue_init(&tmcf->previous);
995 
996     return tmcf;
997 
998 temp_fail:
999 
1000     nxt_mp_destroy(tmp);
1001 
1002 fail:
1003 
1004     nxt_mp_destroy(mp);
1005 
1006     return NULL;
1007 }
1008 
1009 
1010 nxt_inline nxt_bool_t
1011 nxt_router_app_can_start(nxt_app_t *app)
1012 {
1013     return app->processes + app->pending_processes < app->max_processes
1014             && app->pending_processes < app->max_pending_processes;
1015 }
1016 
1017 
1018 nxt_inline nxt_bool_t
1019 nxt_router_app_need_start(nxt_app_t *app)
1020 {
1021     return (app->active_requests
1022               > app->port_hash_count + app->pending_processes)
1023            || (app->spare_processes
1024                 > app->idle_processes + app->pending_processes);
1025 }
1026 
1027 
1028 static void
1029 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1030 {
1031     nxt_int_t                    ret;
1032     nxt_app_t                    *app;
1033     nxt_router_t                 *router;
1034     nxt_runtime_t                *rt;
1035     nxt_queue_link_t             *qlk;
1036     nxt_socket_conf_t            *skcf;
1037     nxt_router_conf_t            *rtcf;
1038     nxt_router_temp_conf_t       *tmcf;
1039     const nxt_event_interface_t  *interface;
1040 #if (NXT_TLS)
1041     nxt_router_tlssock_t         *tls;
1042 #endif
1043 
1044     tmcf = obj;
1045 
1046     qlk = nxt_queue_first(&pending_sockets);
1047 
1048     if (qlk != nxt_queue_tail(&pending_sockets)) {
1049         nxt_queue_remove(qlk);
1050         nxt_queue_insert_tail(&creating_sockets, qlk);
1051 
1052         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1053 
1054         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1055 
1056         return;
1057     }
1058 
1059 #if (NXT_TLS)
1060     qlk = nxt_queue_last(&tmcf->tls);
1061 
1062     if (qlk != nxt_queue_head(&tmcf->tls)) {
1063         nxt_queue_remove(qlk);
1064 
1065         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1066 
1067         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1068                            nxt_router_tls_rpc_handler, tls);
1069         return;
1070     }
1071 #endif
1072 
1073     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1074 
1075         if (nxt_router_app_need_start(app)) {
1076             nxt_router_app_rpc_create(task, tmcf, app);
1077             return;
1078         }
1079 
1080     } nxt_queue_loop;
1081 
1082     rtcf = tmcf->router_conf;
1083 
1084     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1085         nxt_router_access_log_open(task, tmcf);
1086         return;
1087     }
1088 
1089     rt = task->thread->runtime;
1090 
1091     interface = nxt_service_get(rt->services, "engine", NULL);
1092 
1093     router = rtcf->router;
1094 
1095     ret = nxt_router_engines_create(task, router, tmcf, interface);
1096     if (nxt_slow_path(ret != NXT_OK)) {
1097         goto fail;
1098     }
1099 
1100     ret = nxt_router_threads_create(task, rt, tmcf);
1101     if (nxt_slow_path(ret != NXT_OK)) {
1102         goto fail;
1103     }
1104 
1105     nxt_router_apps_sort(task, router, tmcf);
1106 
1107     nxt_router_apps_hash_use(task, rtcf, 1);
1108 
1109     nxt_router_engines_post(router, tmcf);
1110 
1111     nxt_queue_add(&router->sockets, &updating_sockets);
1112     nxt_queue_add(&router->sockets, &creating_sockets);
1113 
1114     router->access_log = rtcf->access_log;
1115 
1116     nxt_router_conf_ready(task, tmcf);
1117 
1118     return;
1119 
1120 fail:
1121 
1122     nxt_router_conf_error(task, tmcf);
1123 
1124     return;
1125 }
1126 
1127 
1128 static void
1129 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1130 {
1131     nxt_joint_job_t  *job;
1132 
1133     job = obj;
1134 
1135     nxt_router_conf_ready(task, job->tmcf);
1136 }
1137 
1138 
1139 static void
1140 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1141 {
1142     uint32_t               count;
1143     nxt_router_conf_t      *rtcf;
1144     nxt_thread_spinlock_t  *lock;
1145 
1146     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1147 
1148     if (--tmcf->count > 0) {
1149         return;
1150     }
1151 
1152     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1153 
1154     rtcf = tmcf->router_conf;
1155 
1156     lock = &rtcf->router->lock;
1157 
1158     nxt_thread_spin_lock(lock);
1159 
1160     count = rtcf->count;
1161 
1162     nxt_thread_spin_unlock(lock);
1163 
1164     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1165 
1166     if (count == 0) {
1167         nxt_router_apps_hash_use(task, rtcf, -1);
1168 
1169         nxt_router_access_log_release(task, lock, rtcf->access_log);
1170 
1171         nxt_mp_destroy(rtcf->mem_pool);
1172     }
1173 
1174     nxt_mp_release(tmcf->mem_pool);
1175 }
1176 
1177 
1178 static void
1179 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1180 {
1181     nxt_app_t          *app;
1182     nxt_queue_t        new_socket_confs;
1183     nxt_socket_t       s;
1184     nxt_router_t       *router;
1185     nxt_queue_link_t   *qlk;
1186     nxt_socket_conf_t  *skcf;
1187     nxt_router_conf_t  *rtcf;
1188 
1189     nxt_alert(task, "failed to apply new conf");
1190 
1191     for (qlk = nxt_queue_first(&creating_sockets);
1192          qlk != nxt_queue_tail(&creating_sockets);
1193          qlk = nxt_queue_next(qlk))
1194     {
1195         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1196         s = skcf->listen->socket;
1197 
1198         if (s != -1) {
1199             nxt_socket_close(task, s);
1200         }
1201 
1202         nxt_free(skcf->listen);
1203     }
1204 
1205     nxt_queue_init(&new_socket_confs);
1206     nxt_queue_add(&new_socket_confs, &updating_sockets);
1207     nxt_queue_add(&new_socket_confs, &pending_sockets);
1208     nxt_queue_add(&new_socket_confs, &creating_sockets);
1209 
1210     rtcf = tmcf->router_conf;
1211 
1212     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1213 
1214         nxt_router_app_unlink(task, app);
1215 
1216     } nxt_queue_loop;
1217 
1218     router = rtcf->router;
1219 
1220     nxt_queue_add(&router->sockets, &keeping_sockets);
1221     nxt_queue_add(&router->sockets, &deleting_sockets);
1222 
1223     nxt_queue_add(&router->apps, &tmcf->previous);
1224 
1225     // TODO: new engines and threads
1226 
1227     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1228 
1229     nxt_mp_destroy(rtcf->mem_pool);
1230 
1231     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1232 
1233     nxt_mp_release(tmcf->mem_pool);
1234 }
1235 
1236 
1237 static void
1238 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1239     nxt_port_msg_type_t type)
1240 {
1241     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1242 
1243     nxt_port_use(task, tmcf->port, -1);
1244 
1245     tmcf->port = NULL;
1246 }
1247 
1248 
1249 static nxt_conf_map_t  nxt_router_conf[] = {
1250     {
1251         nxt_string("listeners_threads"),
1252         NXT_CONF_MAP_INT32,
1253         offsetof(nxt_router_conf_t, threads),
1254     },
1255 };
1256 
1257 
1258 static nxt_conf_map_t  nxt_router_app_conf[] = {
1259     {
1260         nxt_string("type"),
1261         NXT_CONF_MAP_STR,
1262         offsetof(nxt_router_app_conf_t, type),
1263     },
1264 
1265     {
1266         nxt_string("limits"),
1267         NXT_CONF_MAP_PTR,
1268         offsetof(nxt_router_app_conf_t, limits_value),
1269     },
1270 
1271     {
1272         nxt_string("processes"),
1273         NXT_CONF_MAP_INT32,
1274         offsetof(nxt_router_app_conf_t, processes),
1275     },
1276 
1277     {
1278         nxt_string("processes"),
1279         NXT_CONF_MAP_PTR,
1280         offsetof(nxt_router_app_conf_t, processes_value),
1281     },
1282 
1283     {
1284         nxt_string("targets"),
1285         NXT_CONF_MAP_PTR,
1286         offsetof(nxt_router_app_conf_t, targets_value),
1287     },
1288 };
1289 
1290 
1291 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1292     {
1293         nxt_string("timeout"),
1294         NXT_CONF_MAP_MSEC,
1295         offsetof(nxt_router_app_conf_t, timeout),
1296     },
1297 
1298     {
1299         nxt_string("requests"),
1300         NXT_CONF_MAP_INT32,
1301         offsetof(nxt_router_app_conf_t, requests),
1302     },
1303 };
1304 
1305 
1306 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1307     {
1308         nxt_string("spare"),
1309         NXT_CONF_MAP_INT32,
1310         offsetof(nxt_router_app_conf_t, spare_processes),
1311     },
1312 
1313     {
1314         nxt_string("max"),
1315         NXT_CONF_MAP_INT32,
1316         offsetof(nxt_router_app_conf_t, max_processes),
1317     },
1318 
1319     {
1320         nxt_string("idle_timeout"),
1321         NXT_CONF_MAP_MSEC,
1322         offsetof(nxt_router_app_conf_t, idle_timeout),
1323     },
1324 };
1325 
1326 
1327 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1328     {
1329         nxt_string("pass"),
1330         NXT_CONF_MAP_STR_COPY,
1331         offsetof(nxt_router_listener_conf_t, pass),
1332     },
1333 
1334     {
1335         nxt_string("application"),
1336         NXT_CONF_MAP_STR_COPY,
1337         offsetof(nxt_router_listener_conf_t, application),
1338     },
1339 };
1340 
1341 
1342 static nxt_conf_map_t  nxt_router_http_conf[] = {
1343     {
1344         nxt_string("header_buffer_size"),
1345         NXT_CONF_MAP_SIZE,
1346         offsetof(nxt_socket_conf_t, header_buffer_size),
1347     },
1348 
1349     {
1350         nxt_string("large_header_buffer_size"),
1351         NXT_CONF_MAP_SIZE,
1352         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1353     },
1354 
1355     {
1356         nxt_string("large_header_buffers"),
1357         NXT_CONF_MAP_SIZE,
1358         offsetof(nxt_socket_conf_t, large_header_buffers),
1359     },
1360 
1361     {
1362         nxt_string("body_buffer_size"),
1363         NXT_CONF_MAP_SIZE,
1364         offsetof(nxt_socket_conf_t, body_buffer_size),
1365     },
1366 
1367     {
1368         nxt_string("max_body_size"),
1369         NXT_CONF_MAP_SIZE,
1370         offsetof(nxt_socket_conf_t, max_body_size),
1371     },
1372 
1373     {
1374         nxt_string("idle_timeout"),
1375         NXT_CONF_MAP_MSEC,
1376         offsetof(nxt_socket_conf_t, idle_timeout),
1377     },
1378 
1379     {
1380         nxt_string("header_read_timeout"),
1381         NXT_CONF_MAP_MSEC,
1382         offsetof(nxt_socket_conf_t, header_read_timeout),
1383     },
1384 
1385     {
1386         nxt_string("body_read_timeout"),
1387         NXT_CONF_MAP_MSEC,
1388         offsetof(nxt_socket_conf_t, body_read_timeout),
1389     },
1390 
1391     {
1392         nxt_string("send_timeout"),
1393         NXT_CONF_MAP_MSEC,
1394         offsetof(nxt_socket_conf_t, send_timeout),
1395     },
1396 
1397     {
1398         nxt_string("body_temp_path"),
1399         NXT_CONF_MAP_STR,
1400         offsetof(nxt_socket_conf_t, body_temp_path),
1401     },
1402 
1403     {
1404         nxt_string("discard_unsafe_fields"),
1405         NXT_CONF_MAP_INT8,
1406         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1407     },
1408 };
1409 
1410 
1411 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1412     {
1413         nxt_string("max_frame_size"),
1414         NXT_CONF_MAP_SIZE,
1415         offsetof(nxt_websocket_conf_t, max_frame_size),
1416     },
1417 
1418     {
1419         nxt_string("read_timeout"),
1420         NXT_CONF_MAP_MSEC,
1421         offsetof(nxt_websocket_conf_t, read_timeout),
1422     },
1423 
1424     {
1425         nxt_string("keepalive_interval"),
1426         NXT_CONF_MAP_MSEC,
1427         offsetof(nxt_websocket_conf_t, keepalive_interval),
1428     },
1429 
1430 };
1431 
1432 
1433 static nxt_int_t
1434 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1435     u_char *start, u_char *end)
1436 {
1437     u_char                      *p;
1438     size_t                      size;
1439     nxt_mp_t                    *mp, *app_mp;
1440     uint32_t                    next, next_target;
1441     nxt_int_t                   ret;
1442     nxt_str_t                   name, path, target;
1443     nxt_app_t                   *app, *prev;
1444     nxt_str_t                   *t, *s, *targets;
1445     nxt_uint_t                  n, i;
1446     nxt_port_t                  *port;
1447     nxt_router_t                *router;
1448     nxt_app_joint_t             *app_joint;
1449 #if (NXT_TLS)
1450     nxt_tls_init_t              *tls_init;
1451     nxt_conf_value_t            *certificate;
1452 #endif
1453     nxt_conf_value_t            *conf, *http, *value, *websocket;
1454     nxt_conf_value_t            *applications, *application;
1455     nxt_conf_value_t            *listeners, *listener;
1456     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1457     nxt_socket_conf_t           *skcf;
1458     nxt_http_routes_t           *routes;
1459     nxt_event_engine_t          *engine;
1460     nxt_app_lang_module_t       *lang;
1461     nxt_router_app_conf_t       apcf;
1462     nxt_router_access_log_t     *access_log;
1463     nxt_router_listener_conf_t  lscf;
1464 
1465     static nxt_str_t  http_path = nxt_string("/settings/http");
1466     static nxt_str_t  applications_path = nxt_string("/applications");
1467     static nxt_str_t  listeners_path = nxt_string("/listeners");
1468     static nxt_str_t  routes_path = nxt_string("/routes");
1469     static nxt_str_t  access_log_path = nxt_string("/access_log");
1470 #if (NXT_TLS)
1471     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1472     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1473     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1474     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1475 #endif
1476     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1477     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1478     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1479 
1480     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1481     if (conf == NULL) {
1482         nxt_alert(task, "configuration parsing error");
1483         return NXT_ERROR;
1484     }
1485 
1486     mp = tmcf->router_conf->mem_pool;
1487 
1488     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1489                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1490     if (ret != NXT_OK) {
1491         nxt_alert(task, "root map error");
1492         return NXT_ERROR;
1493     }
1494 
1495     if (tmcf->router_conf->threads == 0) {
1496         tmcf->router_conf->threads = nxt_ncpu;
1497     }
1498 
1499     static_conf = nxt_conf_get_path(conf, &static_path);
1500 
1501     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1502     if (nxt_slow_path(ret != NXT_OK)) {
1503         return NXT_ERROR;
1504     }
1505 
1506     router = tmcf->router_conf->router;
1507 
1508     applications = nxt_conf_get_path(conf, &applications_path);
1509 
1510     if (applications != NULL) {
1511         next = 0;
1512 
1513         for ( ;; ) {
1514             application = nxt_conf_next_object_member(applications,
1515                                                       &name, &next);
1516             if (application == NULL) {
1517                 break;
1518             }
1519 
1520             nxt_debug(task, "application \"%V\"", &name);
1521 
1522             size = nxt_conf_json_length(application, NULL);
1523 
1524             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1525             if (nxt_slow_path(app_mp == NULL)) {
1526                 goto fail;
1527             }
1528 
1529             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1530             if (app == NULL) {
1531                 goto app_fail;
1532             }
1533 
1534             nxt_memzero(app, sizeof(nxt_app_t));
1535 
1536             app->mem_pool = app_mp;
1537 
1538             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1539             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1540                                                   + name.length);
1541 
1542             p = nxt_conf_json_print(app->conf.start, application, NULL);
1543             app->conf.length = p - app->conf.start;
1544 
1545             nxt_assert(app->conf.length <= size);
1546 
1547             nxt_debug(task, "application conf \"%V\"", &app->conf);
1548 
1549             prev = nxt_router_app_find(&router->apps, &name);
1550 
1551             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1552                 nxt_mp_destroy(app_mp);
1553 
1554                 nxt_queue_remove(&prev->link);
1555                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1556 
1557                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1558                 if (nxt_slow_path(ret != NXT_OK)) {
1559                     goto fail;
1560                 }
1561 
1562                 continue;
1563             }
1564 
1565             apcf.processes = 1;
1566             apcf.max_processes = 1;
1567             apcf.spare_processes = 0;
1568             apcf.timeout = 0;
1569             apcf.idle_timeout = 15000;
1570             apcf.requests = 0;
1571             apcf.limits_value = NULL;
1572             apcf.processes_value = NULL;
1573             apcf.targets_value = NULL;
1574 
1575             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1576             if (nxt_slow_path(app_joint == NULL)) {
1577                 goto app_fail;
1578             }
1579 
1580             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1581 
1582             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1583                                       nxt_nitems(nxt_router_app_conf), &apcf);
1584             if (ret != NXT_OK) {
1585                 nxt_alert(task, "application map error");
1586                 goto app_fail;
1587             }
1588 
1589             if (apcf.limits_value != NULL) {
1590 
1591                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1592                     nxt_alert(task, "application limits is not object");
1593                     goto app_fail;
1594                 }
1595 
1596                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1597                                         nxt_router_app_limits_conf,
1598                                         nxt_nitems(nxt_router_app_limits_conf),
1599                                         &apcf);
1600                 if (ret != NXT_OK) {
1601                     nxt_alert(task, "application limits map error");
1602                     goto app_fail;
1603                 }
1604             }
1605 
1606             if (apcf.processes_value != NULL
1607                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1608             {
1609                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1610                                      nxt_router_app_processes_conf,
1611                                      nxt_nitems(nxt_router_app_processes_conf),
1612                                      &apcf);
1613                 if (ret != NXT_OK) {
1614                     nxt_alert(task, "application processes map error");
1615                     goto app_fail;
1616                 }
1617 
1618             } else {
1619                 apcf.max_processes = apcf.processes;
1620                 apcf.spare_processes = apcf.processes;
1621             }
1622 
1623             if (apcf.targets_value != NULL) {
1624                 n = nxt_conf_object_members_count(apcf.targets_value);
1625 
1626                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1627                 if (nxt_slow_path(targets == NULL)) {
1628                     goto app_fail;
1629                 }
1630 
1631                 next_target = 0;
1632 
1633                 for (i = 0; i < n; i++) {
1634                     (void) nxt_conf_next_object_member(apcf.targets_value,
1635                                                        &target, &next_target);
1636 
1637                     s = nxt_str_dup(app_mp, &targets[i], &target);
1638                     if (nxt_slow_path(s == NULL)) {
1639                         goto app_fail;
1640                     }
1641                 }
1642 
1643             } else {
1644                 targets = NULL;
1645             }
1646 
1647             nxt_debug(task, "application type: %V", &apcf.type);
1648             nxt_debug(task, "application processes: %D", apcf.processes);
1649             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1650             nxt_debug(task, "application requests: %D", apcf.requests);
1651 
1652             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1653 
1654             if (lang == NULL) {
1655                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1656                 goto app_fail;
1657             }
1658 
1659             nxt_debug(task, "application language module: \"%s\"", lang->file);
1660 
1661             ret = nxt_thread_mutex_create(&app->mutex);
1662             if (ret != NXT_OK) {
1663                 goto app_fail;
1664             }
1665 
1666             nxt_queue_init(&app->ports);
1667             nxt_queue_init(&app->spare_ports);
1668             nxt_queue_init(&app->idle_ports);
1669             nxt_queue_init(&app->ack_waiting_req);
1670 
1671             app->name.length = name.length;
1672             nxt_memcpy(app->name.start, name.start, name.length);
1673 
1674             app->type = lang->type;
1675             app->max_processes = apcf.max_processes;
1676             app->spare_processes = apcf.spare_processes;
1677             app->max_pending_processes = apcf.spare_processes
1678                                          ? apcf.spare_processes : 1;
1679             app->timeout = apcf.timeout;
1680             app->idle_timeout = apcf.idle_timeout;
1681             app->max_requests = apcf.requests;
1682 
1683             app->targets = targets;
1684 
1685             engine = task->thread->engine;
1686 
1687             app->engine = engine;
1688 
1689             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1690             app->adjust_idle_work.task = &engine->task;
1691             app->adjust_idle_work.obj = app;
1692 
1693             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1694 
1695             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1696             if (nxt_slow_path(ret != NXT_OK)) {
1697                 goto app_fail;
1698             }
1699 
1700             nxt_router_app_use(task, app, 1);
1701 
1702             app->joint = app_joint;
1703 
1704             app_joint->use_count = 1;
1705             app_joint->app = app;
1706 
1707             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1708             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1709             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1710             app_joint->idle_timer.task = &engine->task;
1711             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1712 
1713             app_joint->free_app_work.handler = nxt_router_free_app;
1714             app_joint->free_app_work.task = &engine->task;
1715             app_joint->free_app_work.obj = app_joint;
1716 
1717             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1718                                 NXT_PROCESS_APP);
1719             if (nxt_slow_path(port == NULL)) {
1720                 return NXT_ERROR;
1721             }
1722 
1723             ret = nxt_port_socket_init(task, port, 0);
1724             if (nxt_slow_path(ret != NXT_OK)) {
1725                 nxt_port_use(task, port, -1);
1726                 return NXT_ERROR;
1727             }
1728 
1729             ret = nxt_router_app_queue_init(task, port);
1730             if (nxt_slow_path(ret != NXT_OK)) {
1731                 nxt_port_write_close(port);
1732                 nxt_port_read_close(port);
1733                 nxt_port_use(task, port, -1);
1734                 return NXT_ERROR;
1735             }
1736 
1737             nxt_port_write_enable(task, port);
1738             port->app = app;
1739 
1740             app->shared_port = port;
1741 
1742             nxt_thread_mutex_create(&app->outgoing.mutex);
1743         }
1744     }
1745 
1746     routes_conf = nxt_conf_get_path(conf, &routes_path);
1747     if (nxt_fast_path(routes_conf != NULL)) {
1748         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1749         if (nxt_slow_path(routes == NULL)) {
1750             return NXT_ERROR;
1751         }
1752         tmcf->router_conf->routes = routes;
1753     }
1754 
1755     ret = nxt_upstreams_create(task, tmcf, conf);
1756     if (nxt_slow_path(ret != NXT_OK)) {
1757         return ret;
1758     }
1759 
1760     http = nxt_conf_get_path(conf, &http_path);
1761 #if 0
1762     if (http == NULL) {
1763         nxt_alert(task, "no \"http\" block");
1764         return NXT_ERROR;
1765     }
1766 #endif
1767 
1768     websocket = nxt_conf_get_path(conf, &websocket_path);
1769 
1770     listeners = nxt_conf_get_path(conf, &listeners_path);
1771 
1772     if (listeners != NULL) {
1773         next = 0;
1774 
1775         for ( ;; ) {
1776             listener = nxt_conf_next_object_member(listeners, &name, &next);
1777             if (listener == NULL) {
1778                 break;
1779             }
1780 
1781             skcf = nxt_router_socket_conf(task, tmcf, &name);
1782             if (skcf == NULL) {
1783                 goto fail;
1784             }
1785 
1786             nxt_memzero(&lscf, sizeof(lscf));
1787 
1788             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1789                                       nxt_nitems(nxt_router_listener_conf),
1790                                       &lscf);
1791             if (ret != NXT_OK) {
1792                 nxt_alert(task, "listener map error");
1793                 goto fail;
1794             }
1795 
1796             nxt_debug(task, "application: %V", &lscf.application);
1797 
1798             // STUB, default values if http block is not defined.
1799             skcf->header_buffer_size = 2048;
1800             skcf->large_header_buffer_size = 8192;
1801             skcf->large_header_buffers = 4;
1802             skcf->discard_unsafe_fields = 1;
1803             skcf->body_buffer_size = 16 * 1024;
1804             skcf->max_body_size = 8 * 1024 * 1024;
1805             skcf->proxy_header_buffer_size = 64 * 1024;
1806             skcf->proxy_buffer_size = 4096;
1807             skcf->proxy_buffers = 256;
1808             skcf->idle_timeout = 180 * 1000;
1809             skcf->header_read_timeout = 30 * 1000;
1810             skcf->body_read_timeout = 30 * 1000;
1811             skcf->send_timeout = 30 * 1000;
1812             skcf->proxy_timeout = 60 * 1000;
1813             skcf->proxy_send_timeout = 30 * 1000;
1814             skcf->proxy_read_timeout = 30 * 1000;
1815 
1816             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1817             skcf->websocket_conf.read_timeout = 60 * 1000;
1818             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1819 
1820             nxt_str_null(&skcf->body_temp_path);
1821 
1822             if (http != NULL) {
1823                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1824                                           nxt_nitems(nxt_router_http_conf),
1825                                           skcf);
1826                 if (ret != NXT_OK) {
1827                     nxt_alert(task, "http map error");
1828                     goto fail;
1829                 }
1830             }
1831 
1832             if (websocket != NULL) {
1833                 ret = nxt_conf_map_object(mp, websocket,
1834                                           nxt_router_websocket_conf,
1835                                           nxt_nitems(nxt_router_websocket_conf),
1836                                           &skcf->websocket_conf);
1837                 if (ret != NXT_OK) {
1838                     nxt_alert(task, "websocket map error");
1839                     goto fail;
1840                 }
1841             }
1842 
1843             t = &skcf->body_temp_path;
1844 
1845             if (t->length == 0) {
1846                 t->start = (u_char *) task->thread->runtime->tmp;
1847                 t->length = nxt_strlen(t->start);
1848             }
1849 
1850             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1851             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1852                                                     client_ip_conf);
1853             if (nxt_slow_path(ret != NXT_OK)) {
1854                 return NXT_ERROR;
1855             }
1856 
1857 #if (NXT_TLS)
1858             certificate = nxt_conf_get_path(listener, &certificate_path);
1859 
1860             if (certificate != NULL) {
1861                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1862                 if (nxt_slow_path(tls_init == NULL)) {
1863                     return NXT_ERROR;
1864                 }
1865 
1866                 tls_init->cache_size = 0;
1867                 tls_init->timeout = 300;
1868 
1869                 value = nxt_conf_get_path(listener, &conf_cache_path);
1870                 if (value != NULL) {
1871                     tls_init->cache_size = nxt_conf_get_number(value);
1872                 }
1873 
1874                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1875                 if (value != NULL) {
1876                     tls_init->timeout = nxt_conf_get_number(value);
1877                 }
1878 
1879                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1880                                                         &conf_commands_path);
1881 
1882                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1883                     n = nxt_conf_array_elements_count(certificate);
1884 
1885                     for (i = 0; i < n; i++) {
1886                         value = nxt_conf_get_array_element(certificate, i);
1887 
1888                         nxt_assert(value != NULL);
1889 
1890                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1891                                                          tls_init, i == 0);
1892                         if (nxt_slow_path(ret != NXT_OK)) {
1893                             goto fail;
1894                         }
1895                     }
1896 
1897                 } else {
1898                     /* NXT_CONF_STRING */
1899                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1900                                                      tls_init, 1);
1901                     if (nxt_slow_path(ret != NXT_OK)) {
1902                         goto fail;
1903                     }
1904                 }
1905             }
1906 #endif
1907 
1908             skcf->listen->handler = nxt_http_conn_init;
1909             skcf->router_conf = tmcf->router_conf;
1910             skcf->router_conf->count++;
1911 
1912             if (lscf.pass.length != 0) {
1913                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1914 
1915             /* COMPATIBILITY: listener application. */
1916             } else if (lscf.application.length > 0) {
1917                 skcf->action = nxt_http_pass_application(task,
1918                                                          tmcf->router_conf,
1919                                                          &lscf.application);
1920             }
1921 
1922             if (nxt_slow_path(skcf->action == NULL)) {
1923                 goto fail;
1924             }
1925         }
1926     }
1927 
1928     ret = nxt_http_routes_resolve(task, tmcf);
1929     if (nxt_slow_path(ret != NXT_OK)) {
1930         goto fail;
1931     }
1932 
1933     value = nxt_conf_get_path(conf, &access_log_path);
1934 
1935     if (value != NULL) {
1936         nxt_conf_get_string(value, &path);
1937 
1938         access_log = router->access_log;
1939 
1940         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1941             nxt_thread_spin_lock(&router->lock);
1942             access_log->count++;
1943             nxt_thread_spin_unlock(&router->lock);
1944 
1945         } else {
1946             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1947                                     + path.length);
1948             if (access_log == NULL) {
1949                 nxt_alert(task, "failed to allocate access log structure");
1950                 goto fail;
1951             }
1952 
1953             access_log->fd = -1;
1954             access_log->handler = &nxt_router_access_log_writer;
1955             access_log->count = 1;
1956 
1957             access_log->path.length = path.length;
1958             access_log->path.start = (u_char *) access_log
1959                                      + sizeof(nxt_router_access_log_t);
1960 
1961             nxt_memcpy(access_log->path.start, path.start, path.length);
1962         }
1963 
1964         tmcf->router_conf->access_log = access_log;
1965     }
1966 
1967     nxt_queue_add(&deleting_sockets, &router->sockets);
1968     nxt_queue_init(&router->sockets);
1969 
1970     return NXT_OK;
1971 
1972 app_fail:
1973 
1974     nxt_mp_destroy(app_mp);
1975 
1976 fail:
1977 
1978     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1979 
1980         nxt_queue_remove(&app->link);
1981         nxt_thread_mutex_destroy(&app->mutex);
1982         nxt_mp_destroy(app->mem_pool);
1983 
1984     } nxt_queue_loop;
1985 
1986     return NXT_ERROR;
1987 }
1988 
1989 
1990 #if (NXT_TLS)
1991 
1992 static nxt_int_t
1993 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1994     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1995     nxt_tls_init_t *tls_init, nxt_bool_t last)
1996 {
1997     nxt_router_tlssock_t  *tls;
1998 
1999     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2000     if (nxt_slow_path(tls == NULL)) {
2001         return NXT_ERROR;
2002     }
2003 
2004     tls->tls_init = tls_init;
2005     tls->socket_conf = skcf;
2006     tls->temp_conf = tmcf;
2007     tls->last = last;
2008     nxt_conf_get_string(value, &tls->name);
2009 
2010     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2011 
2012     return NXT_OK;
2013 }
2014 
2015 #endif
2016 
2017 
2018 static nxt_int_t
2019 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2020     nxt_conf_value_t *conf)
2021 {
2022     uint32_t          next, i;
2023     nxt_mp_t          *mp;
2024     nxt_str_t         *type, exten, str;
2025     nxt_int_t         ret;
2026     nxt_uint_t        exts;
2027     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2028 
2029     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2030 
2031     mp = rtcf->mem_pool;
2032 
2033     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2034     if (nxt_slow_path(ret != NXT_OK)) {
2035         return NXT_ERROR;
2036     }
2037 
2038     if (conf == NULL) {
2039         return NXT_OK;
2040     }
2041 
2042     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2043 
2044     if (mtypes_conf != NULL) {
2045         next = 0;
2046 
2047         for ( ;; ) {
2048             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2049 
2050             if (ext_conf == NULL) {
2051                 break;
2052             }
2053 
2054             type = nxt_str_dup(mp, NULL, &str);
2055             if (nxt_slow_path(type == NULL)) {
2056                 return NXT_ERROR;
2057             }
2058 
2059             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2060                 nxt_conf_get_string(ext_conf, &str);
2061 
2062                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2063                     return NXT_ERROR;
2064                 }
2065 
2066                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2067                                                       &exten, type);
2068                 if (nxt_slow_path(ret != NXT_OK)) {
2069                     return NXT_ERROR;
2070                 }
2071 
2072                 continue;
2073             }
2074 
2075             exts = nxt_conf_array_elements_count(ext_conf);
2076 
2077             for (i = 0; i < exts; i++) {
2078                 value = nxt_conf_get_array_element(ext_conf, i);
2079 
2080                 nxt_conf_get_string(value, &str);
2081 
2082                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2083                     return NXT_ERROR;
2084                 }
2085 
2086                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2087                                                       &exten, type);
2088                 if (nxt_slow_path(ret != NXT_OK)) {
2089                     return NXT_ERROR;
2090                 }
2091             }
2092         }
2093     }
2094 
2095     return NXT_OK;
2096 }
2097 
2098 
2099 static nxt_int_t
2100 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2101     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2102 {
2103     char                        c;
2104     size_t                      i;
2105     nxt_mp_t                    *mp;
2106     uint32_t                    hash;
2107     nxt_str_t                   header;
2108     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2109     nxt_http_client_ip_t        *client_ip;
2110     nxt_http_route_addr_rule_t  *source;
2111 
2112     static nxt_str_t  header_path = nxt_string("/header");
2113     static nxt_str_t  source_path = nxt_string("/source");
2114     static nxt_str_t  recursive_path = nxt_string("/recursive");
2115 
2116     if (conf == NULL) {
2117         skcf->client_ip = NULL;
2118 
2119         return NXT_OK;
2120     }
2121 
2122     mp = tmcf->router_conf->mem_pool;
2123 
2124     source_conf = nxt_conf_get_path(conf, &source_path);
2125     header_conf = nxt_conf_get_path(conf, &header_path);
2126     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2127 
2128     if (source_conf == NULL || header_conf == NULL) {
2129         return NXT_ERROR;
2130     }
2131 
2132     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2133     if (nxt_slow_path(client_ip == NULL)) {
2134         return NXT_ERROR;
2135     }
2136 
2137     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2138     if (nxt_slow_path(source == NULL)) {
2139         return NXT_ERROR;
2140     }
2141 
2142     client_ip->source = source;
2143 
2144     nxt_conf_get_string(header_conf, &header);
2145 
2146     if (recursive_conf != NULL) {
2147         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2148     }
2149 
2150     client_ip->header = nxt_str_dup(mp, NULL, &header);
2151     if (nxt_slow_path(client_ip->header == NULL)) {
2152         return NXT_ERROR;
2153     }
2154 
2155     hash = NXT_HTTP_FIELD_HASH_INIT;
2156 
2157     for (i = 0; i < client_ip->header->length; i++) {
2158         c = client_ip->header->start[i];
2159         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2160     }
2161 
2162     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2163 
2164     client_ip->header_hash = hash;
2165 
2166     skcf->client_ip = client_ip;
2167 
2168     return NXT_OK;
2169 }
2170 
2171 
2172 static nxt_app_t *
2173 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2174 {
2175     nxt_app_t  *app;
2176 
2177     nxt_queue_each(app, queue, nxt_app_t, link) {
2178 
2179         if (nxt_strstr_eq(name, &app->name)) {
2180             return app;
2181         }
2182 
2183     } nxt_queue_loop;
2184 
2185     return NULL;
2186 }
2187 
2188 
2189 static nxt_int_t
2190 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2191 {
2192     void       *mem;
2193     nxt_int_t  fd;
2194 
2195     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2196     if (nxt_slow_path(fd == -1)) {
2197         return NXT_ERROR;
2198     }
2199 
2200     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2201                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2202     if (nxt_slow_path(mem == MAP_FAILED)) {
2203         nxt_fd_close(fd);
2204 
2205         return NXT_ERROR;
2206     }
2207 
2208     nxt_app_queue_init(mem);
2209 
2210     port->queue_fd = fd;
2211     port->queue = mem;
2212 
2213     return NXT_OK;
2214 }
2215 
2216 
2217 static nxt_int_t
2218 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2219 {
2220     void       *mem;
2221     nxt_int_t  fd;
2222 
2223     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2224     if (nxt_slow_path(fd == -1)) {
2225         return NXT_ERROR;
2226     }
2227 
2228     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2229                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2230     if (nxt_slow_path(mem == MAP_FAILED)) {
2231         nxt_fd_close(fd);
2232 
2233         return NXT_ERROR;
2234     }
2235 
2236     nxt_port_queue_init(mem);
2237 
2238     port->queue_fd = fd;
2239     port->queue = mem;
2240 
2241     return NXT_OK;
2242 }
2243 
2244 
2245 static nxt_int_t
2246 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2247 {
2248     void  *mem;
2249 
2250     nxt_assert(fd != -1);
2251 
2252     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2253                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2254     if (nxt_slow_path(mem == MAP_FAILED)) {
2255 
2256         return NXT_ERROR;
2257     }
2258 
2259     port->queue = mem;
2260 
2261     return NXT_OK;
2262 }
2263 
2264 
/*
 * Level hash protocol used for the per-configuration applications hash
 * (rtcf->apps_hash).  Keys are compared by nxt_router_apps_hash_test()
 * and hash memory is obtained and released through the
 * nxt_mp_lvlhsh_alloc()/nxt_mp_lvlhsh_free() callbacks.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2271 
2272 
2273 static nxt_int_t
2274 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2275 {
2276     nxt_app_t  *app;
2277 
2278     app = data;
2279 
2280     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2281 }
2282 
2283 
2284 static nxt_int_t
2285 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2286 {
2287     nxt_lvlhsh_query_t  lhq;
2288 
2289     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2290     lhq.replace = 0;
2291     lhq.key = app->name;
2292     lhq.value = app;
2293     lhq.proto = &nxt_router_apps_hash_proto;
2294     lhq.pool = rtcf->mem_pool;
2295 
2296     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2297 
2298     case NXT_OK:
2299         return NXT_OK;
2300 
2301     case NXT_DECLINED:
2302         nxt_thread_log_alert("router app hash adding failed: "
2303                              "\"%V\" is already in hash", &lhq.key);
2304         /* Fall through. */
2305     default:
2306         return NXT_ERROR;
2307     }
2308 }
2309 
2310 
2311 static nxt_app_t *
2312 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2313 {
2314     nxt_lvlhsh_query_t  lhq;
2315 
2316     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2317     lhq.key = *name;
2318     lhq.proto = &nxt_router_apps_hash_proto;
2319 
2320     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2321         return NULL;
2322     }
2323 
2324     return lhq.value;
2325 }
2326 
2327 
2328 static void
2329 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2330 {
2331     nxt_app_t          *app;
2332     nxt_lvlhsh_each_t  lhe;
2333 
2334     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2335 
2336     for ( ;; ) {
2337         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2338 
2339         if (app == NULL) {
2340             break;
2341         }
2342 
2343         nxt_router_app_use(task, app, i);
2344     }
2345 }
2346 
2347 
/*
 * Application binding of an HTTP action.  It is allocated and filled by
 * nxt_router_application_init() and stored in the action's u.conf.
 */
typedef struct {
    nxt_app_t  *app;     /* application found in the configuration hash */
    nxt_int_t  target;   /* index into app->targets; 0 if none was given */
} nxt_http_app_conf_t;
2352 
2353 
2354 nxt_int_t
2355 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2356     nxt_str_t *target, nxt_http_action_t *action)
2357 {
2358     nxt_app_t            *app;
2359     nxt_str_t            *targets;
2360     nxt_uint_t           i;
2361     nxt_http_app_conf_t  *conf;
2362 
2363     app = nxt_router_apps_hash_get(rtcf, name);
2364     if (app == NULL) {
2365         return NXT_DECLINED;
2366     }
2367 
2368     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2369     if (nxt_slow_path(conf == NULL)) {
2370         return NXT_ERROR;
2371     }
2372 
2373     action->handler = nxt_http_application_handler;
2374     action->u.conf = conf;
2375 
2376     conf->app = app;
2377 
2378     if (target != NULL && target->length != 0) {
2379         targets = app->targets;
2380 
2381         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2382 
2383         conf->target = i;
2384 
2385     } else {
2386         conf->target = 0;
2387     }
2388 
2389     return NXT_OK;
2390 }
2391 
2392 
2393 static nxt_socket_conf_t *
2394 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2395     nxt_str_t *name)
2396 {
2397     size_t               size;
2398     nxt_int_t            ret;
2399     nxt_bool_t           wildcard;
2400     nxt_sockaddr_t       *sa;
2401     nxt_socket_conf_t    *skcf;
2402     nxt_listen_socket_t  *ls;
2403 
2404     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2405     if (nxt_slow_path(sa == NULL)) {
2406         nxt_alert(task, "invalid listener \"%V\"", name);
2407         return NULL;
2408     }
2409 
2410     sa->type = SOCK_STREAM;
2411 
2412     nxt_debug(task, "router listener: \"%*s\"",
2413               (size_t) sa->length, nxt_sockaddr_start(sa));
2414 
2415     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2416     if (nxt_slow_path(skcf == NULL)) {
2417         return NULL;
2418     }
2419 
2420     size = nxt_sockaddr_size(sa);
2421 
2422     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2423 
2424     if (ret != NXT_OK) {
2425 
2426         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2427         if (nxt_slow_path(ls == NULL)) {
2428             return NULL;
2429         }
2430 
2431         skcf->listen = ls;
2432 
2433         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2434         nxt_memcpy(ls->sockaddr, sa, size);
2435 
2436         nxt_listen_socket_remote_size(ls);
2437 
2438         ls->socket = -1;
2439         ls->backlog = NXT_LISTEN_BACKLOG;
2440         ls->flags = NXT_NONBLOCK;
2441         ls->read_after_accept = 1;
2442     }
2443 
2444     switch (sa->u.sockaddr.sa_family) {
2445 #if (NXT_HAVE_UNIX_DOMAIN)
2446     case AF_UNIX:
2447         wildcard = 0;
2448         break;
2449 #endif
2450 #if (NXT_INET6)
2451     case AF_INET6:
2452         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2453         break;
2454 #endif
2455     case AF_INET:
2456     default:
2457         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2458         break;
2459     }
2460 
2461     if (!wildcard) {
2462         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2463         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2464             return NULL;
2465         }
2466 
2467         nxt_memcpy(skcf->sockaddr, sa, size);
2468     }
2469 
2470     return skcf;
2471 }
2472 
2473 
2474 static nxt_int_t
2475 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2476     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2477 {
2478     nxt_router_t       *router;
2479     nxt_queue_link_t   *qlk;
2480     nxt_socket_conf_t  *skcf;
2481 
2482     router = tmcf->router_conf->router;
2483 
2484     for (qlk = nxt_queue_first(&router->sockets);
2485          qlk != nxt_queue_tail(&router->sockets);
2486          qlk = nxt_queue_next(qlk))
2487     {
2488         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2489 
2490         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2491             nskcf->listen = skcf->listen;
2492 
2493             nxt_queue_remove(qlk);
2494             nxt_queue_insert_tail(&keeping_sockets, qlk);
2495 
2496             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2497 
2498             return NXT_OK;
2499         }
2500     }
2501 
2502     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2503 
2504     return NXT_DECLINED;
2505 }
2506 
2507 
2508 static void
2509 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2510     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2511 {
2512     size_t            size;
2513     uint32_t          stream;
2514     nxt_int_t         ret;
2515     nxt_buf_t         *b;
2516     nxt_port_t        *main_port, *router_port;
2517     nxt_runtime_t     *rt;
2518     nxt_socket_rpc_t  *rpc;
2519 
2520     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2521     if (rpc == NULL) {
2522         goto fail;
2523     }
2524 
2525     rpc->socket_conf = skcf;
2526     rpc->temp_conf = tmcf;
2527 
2528     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2529 
2530     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2531     if (b == NULL) {
2532         goto fail;
2533     }
2534 
2535     b->completion_handler = nxt_router_dummy_buf_completion;
2536 
2537     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2538 
2539     rt = task->thread->runtime;
2540     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2541     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2542 
2543     stream = nxt_port_rpc_register_handler(task, router_port,
2544                                            nxt_router_listen_socket_ready,
2545                                            nxt_router_listen_socket_error,
2546                                            main_port->pid, rpc);
2547     if (nxt_slow_path(stream == 0)) {
2548         goto fail;
2549     }
2550 
2551     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2552                                 stream, router_port->id, b);
2553 
2554     if (nxt_slow_path(ret != NXT_OK)) {
2555         nxt_port_rpc_cancel(task, router_port, stream);
2556         goto fail;
2557     }
2558 
2559     return;
2560 
2561 fail:
2562 
2563     nxt_router_conf_error(task, tmcf);
2564 }
2565 
2566 
2567 static void
2568 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2569     void *data)
2570 {
2571     nxt_int_t         ret;
2572     nxt_socket_t      s;
2573     nxt_socket_rpc_t  *rpc;
2574 
2575     rpc = data;
2576 
2577     s = msg->fd[0];
2578 
2579     ret = nxt_socket_nonblocking(task, s);
2580     if (nxt_slow_path(ret != NXT_OK)) {
2581         goto fail;
2582     }
2583 
2584     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2585 
2586     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2587     if (nxt_slow_path(ret != NXT_OK)) {
2588         goto fail;
2589     }
2590 
2591     rpc->socket_conf->listen->socket = s;
2592 
2593     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2594                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2595 
2596     return;
2597 
2598 fail:
2599 
2600     nxt_socket_close(task, s);
2601 
2602     nxt_router_conf_error(task, rpc->temp_conf);
2603 }
2604 
2605 
2606 static void
2607 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2608     void *data)
2609 {
2610     nxt_socket_rpc_t        *rpc;
2611     nxt_router_temp_conf_t  *tmcf;
2612 
2613     rpc = data;
2614     tmcf = rpc->temp_conf;
2615 
2616 #if 0
2617     u_char                  *p;
2618     size_t                  size;
2619     uint8_t                 error;
2620     nxt_buf_t               *in, *out;
2621     nxt_sockaddr_t          *sa;
2622 
2623     static nxt_str_t  socket_errors[] = {
2624         nxt_string("ListenerSystem"),
2625         nxt_string("ListenerNoIPv6"),
2626         nxt_string("ListenerPort"),
2627         nxt_string("ListenerInUse"),
2628         nxt_string("ListenerNoAddress"),
2629         nxt_string("ListenerNoAccess"),
2630         nxt_string("ListenerPath"),
2631     };
2632 
2633     sa = rpc->socket_conf->listen->sockaddr;
2634 
2635     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2636 
2637     if (nxt_slow_path(in == NULL)) {
2638         return;
2639     }
2640 
2641     p = in->mem.pos;
2642 
2643     error = *p++;
2644 
2645     size = nxt_length("listen socket error: ")
2646            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2647            + sa->length + socket_errors[error].length + (in->mem.free - p);
2648 
2649     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2650     if (nxt_slow_path(out == NULL)) {
2651         return;
2652     }
2653 
2654     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2655                         "listen socket error: "
2656                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2657                         (size_t) sa->length, nxt_sockaddr_start(sa),
2658                         &socket_errors[error], in->mem.free - p, p);
2659 
2660     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2661 #endif
2662 
2663     nxt_router_conf_error(task, tmcf);
2664 }
2665 
2666 
2667 #if (NXT_TLS)
2668 
2669 static void
2670 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2671     void *data)
2672 {
2673     nxt_mp_t                *mp;
2674     nxt_int_t               ret;
2675     nxt_tls_conf_t          *tlscf;
2676     nxt_router_tlssock_t    *tls;
2677     nxt_tls_bundle_conf_t   *bundle;
2678     nxt_router_temp_conf_t  *tmcf;
2679 
2680     nxt_debug(task, "tls rpc handler");
2681 
2682     tls = data;
2683     tmcf = tls->temp_conf;
2684 
2685     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2686         goto fail;
2687     }
2688 
2689     mp = tmcf->router_conf->mem_pool;
2690 
2691     if (tls->socket_conf->tls == NULL){
2692         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2693         if (nxt_slow_path(tlscf == NULL)) {
2694             goto fail;
2695         }
2696 
2697         tlscf->no_wait_shutdown = 1;
2698         tls->socket_conf->tls = tlscf;
2699 
2700     } else {
2701         tlscf = tls->socket_conf->tls;
2702     }
2703 
2704     tls->tls_init->conf = tlscf;
2705 
2706     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2707     if (nxt_slow_path(bundle == NULL)) {
2708         goto fail;
2709     }
2710 
2711     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2712         goto fail;
2713     }
2714 
2715     bundle->chain_file = msg->fd[0];
2716     bundle->next = tlscf->bundle;
2717     tlscf->bundle = bundle;
2718 
2719     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2720                                                   tls->last);
2721     if (nxt_slow_path(ret != NXT_OK)) {
2722         goto fail;
2723     }
2724 
2725     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2726                        nxt_router_conf_apply, task, tmcf, NULL);
2727     return;
2728 
2729 fail:
2730 
2731     nxt_router_conf_error(task, tmcf);
2732 }
2733 
2734 #endif
2735 
2736 
2737 static void
2738 nxt_router_app_rpc_create(nxt_task_t *task,
2739     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2740 {
2741     size_t         size;
2742     uint32_t       stream;
2743     nxt_int_t      ret;
2744     nxt_buf_t      *b;
2745     nxt_port_t     *main_port, *router_port;
2746     nxt_runtime_t  *rt;
2747     nxt_app_rpc_t  *rpc;
2748 
2749     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2750     if (rpc == NULL) {
2751         goto fail;
2752     }
2753 
2754     rpc->app = app;
2755     rpc->temp_conf = tmcf;
2756 
2757     nxt_debug(task, "app '%V' prefork", &app->name);
2758 
2759     size = app->name.length + 1 + app->conf.length;
2760 
2761     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2762     if (nxt_slow_path(b == NULL)) {
2763         goto fail;
2764     }
2765 
2766     b->completion_handler = nxt_router_dummy_buf_completion;
2767 
2768     nxt_buf_cpystr(b, &app->name);
2769     *b->mem.free++ = '\0';
2770     nxt_buf_cpystr(b, &app->conf);
2771 
2772     rt = task->thread->runtime;
2773     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2774     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2775 
2776     stream = nxt_port_rpc_register_handler(task, router_port,
2777                                            nxt_router_app_prefork_ready,
2778                                            nxt_router_app_prefork_error,
2779                                            -1, rpc);
2780     if (nxt_slow_path(stream == 0)) {
2781         goto fail;
2782     }
2783 
2784     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2785                                 -1, stream, router_port->id, b);
2786 
2787     if (nxt_slow_path(ret != NXT_OK)) {
2788         nxt_port_rpc_cancel(task, router_port, stream);
2789         goto fail;
2790     }
2791 
2792     app->pending_processes++;
2793 
2794     return;
2795 
2796 fail:
2797 
2798     nxt_router_conf_error(task, tmcf);
2799 }
2800 
2801 
2802 static void
2803 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2804     void *data)
2805 {
2806     nxt_app_t           *app;
2807     nxt_port_t          *port;
2808     nxt_app_rpc_t       *rpc;
2809     nxt_event_engine_t  *engine;
2810 
2811     rpc = data;
2812     app = rpc->app;
2813 
2814     port = msg->u.new_port;
2815 
2816     nxt_assert(port != NULL);
2817     nxt_assert(port->type == NXT_PROCESS_APP);
2818     nxt_assert(port->id == 0);
2819 
2820     port->app = app;
2821     port->main_app_port = port;
2822 
2823     app->pending_processes--;
2824     app->processes++;
2825     app->idle_processes++;
2826 
2827     engine = task->thread->engine;
2828 
2829     nxt_queue_insert_tail(&app->ports, &port->app_link);
2830     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2831 
2832     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2833               &app->name, port->pid, port->id);
2834 
2835     nxt_port_hash_add(&app->port_hash, port);
2836     app->port_hash_count++;
2837 
2838     port->idle_start = 0;
2839 
2840     nxt_port_inc_use(port);
2841 
2842     nxt_router_app_shared_port_send(task, port);
2843 
2844     nxt_work_queue_add(&engine->fast_work_queue,
2845                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2846 }
2847 
2848 
2849 static void
2850 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2851     void *data)
2852 {
2853     nxt_app_t               *app;
2854     nxt_app_rpc_t           *rpc;
2855     nxt_router_temp_conf_t  *tmcf;
2856 
2857     rpc = data;
2858     app = rpc->app;
2859     tmcf = rpc->temp_conf;
2860 
2861     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2862             &app->name);
2863 
2864     app->pending_processes--;
2865 
2866     nxt_router_conf_error(task, tmcf);
2867 }
2868 
2869 
2870 static nxt_int_t
2871 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2872     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2873 {
2874     nxt_int_t                 ret;
2875     nxt_uint_t                n, threads;
2876     nxt_queue_link_t          *qlk;
2877     nxt_router_engine_conf_t  *recf;
2878 
2879     threads = tmcf->router_conf->threads;
2880 
2881     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2882                                      sizeof(nxt_router_engine_conf_t));
2883     if (nxt_slow_path(tmcf->engines == NULL)) {
2884         return NXT_ERROR;
2885     }
2886 
2887     n = 0;
2888 
2889     for (qlk = nxt_queue_first(&router->engines);
2890          qlk != nxt_queue_tail(&router->engines);
2891          qlk = nxt_queue_next(qlk))
2892     {
2893         recf = nxt_array_zero_add(tmcf->engines);
2894         if (nxt_slow_path(recf == NULL)) {
2895             return NXT_ERROR;
2896         }
2897 
2898         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2899 
2900         if (n < threads) {
2901             recf->action = NXT_ROUTER_ENGINE_KEEP;
2902             ret = nxt_router_engine_conf_update(tmcf, recf);
2903 
2904         } else {
2905             recf->action = NXT_ROUTER_ENGINE_DELETE;
2906             ret = nxt_router_engine_conf_delete(tmcf, recf);
2907         }
2908 
2909         if (nxt_slow_path(ret != NXT_OK)) {
2910             return ret;
2911         }
2912 
2913         n++;
2914     }
2915 
2916     tmcf->new_threads = n;
2917 
2918     while (n < threads) {
2919         recf = nxt_array_zero_add(tmcf->engines);
2920         if (nxt_slow_path(recf == NULL)) {
2921             return NXT_ERROR;
2922         }
2923 
2924         recf->action = NXT_ROUTER_ENGINE_ADD;
2925 
2926         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2927         if (nxt_slow_path(recf->engine == NULL)) {
2928             return NXT_ERROR;
2929         }
2930 
2931         ret = nxt_router_engine_conf_create(tmcf, recf);
2932         if (nxt_slow_path(ret != NXT_OK)) {
2933             return ret;
2934         }
2935 
2936         n++;
2937     }
2938 
2939     return NXT_OK;
2940 }
2941 
2942 
2943 static nxt_int_t
2944 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2945     nxt_router_engine_conf_t *recf)
2946 {
2947     nxt_int_t  ret;
2948 
2949     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2950                                           nxt_router_listen_socket_create);
2951     if (nxt_slow_path(ret != NXT_OK)) {
2952         return ret;
2953     }
2954 
2955     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2956                                           nxt_router_listen_socket_create);
2957     if (nxt_slow_path(ret != NXT_OK)) {
2958         return ret;
2959     }
2960 
2961     return ret;
2962 }
2963 
2964 
2965 static nxt_int_t
2966 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2967     nxt_router_engine_conf_t *recf)
2968 {
2969     nxt_int_t  ret;
2970 
2971     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2972                                           nxt_router_listen_socket_create);
2973     if (nxt_slow_path(ret != NXT_OK)) {
2974         return ret;
2975     }
2976 
2977     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2978                                           nxt_router_listen_socket_update);
2979     if (nxt_slow_path(ret != NXT_OK)) {
2980         return ret;
2981     }
2982 
2983     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2984     if (nxt_slow_path(ret != NXT_OK)) {
2985         return ret;
2986     }
2987 
2988     return ret;
2989 }
2990 
2991 
2992 static nxt_int_t
2993 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2994     nxt_router_engine_conf_t *recf)
2995 {
2996     nxt_int_t  ret;
2997 
2998     ret = nxt_router_engine_quit(tmcf, recf);
2999     if (nxt_slow_path(ret != NXT_OK)) {
3000         return ret;
3001     }
3002 
3003     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3004     if (nxt_slow_path(ret != NXT_OK)) {
3005         return ret;
3006     }
3007 
3008     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3009 }
3010 
3011 
3012 static nxt_int_t
3013 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3014     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3015     nxt_work_handler_t handler)
3016 {
3017     nxt_int_t                ret;
3018     nxt_joint_job_t          *job;
3019     nxt_queue_link_t         *qlk;
3020     nxt_socket_conf_t        *skcf;
3021     nxt_socket_conf_joint_t  *joint;
3022 
3023     for (qlk = nxt_queue_first(sockets);
3024          qlk != nxt_queue_tail(sockets);
3025          qlk = nxt_queue_next(qlk))
3026     {
3027         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3028         if (nxt_slow_path(job == NULL)) {
3029             return NXT_ERROR;
3030         }
3031 
3032         job->work.next = recf->jobs;
3033         recf->jobs = &job->work;
3034 
3035         job->task = tmcf->engine->task;
3036         job->work.handler = handler;
3037         job->work.task = &job->task;
3038         job->work.obj = job;
3039         job->tmcf = tmcf;
3040 
3041         tmcf->count++;
3042 
3043         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3044                              sizeof(nxt_socket_conf_joint_t));
3045         if (nxt_slow_path(joint == NULL)) {
3046             return NXT_ERROR;
3047         }
3048 
3049         job->work.data = joint;
3050 
3051         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3052         if (nxt_slow_path(ret != NXT_OK)) {
3053             return ret;
3054         }
3055 
3056         joint->count = 1;
3057 
3058         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3059         skcf->count++;
3060         joint->socket_conf = skcf;
3061 
3062         joint->engine = recf->engine;
3063     }
3064 
3065     return NXT_OK;
3066 }
3067 
3068 
3069 static nxt_int_t
3070 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3071     nxt_router_engine_conf_t *recf)
3072 {
3073     nxt_joint_job_t  *job;
3074 
3075     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3076     if (nxt_slow_path(job == NULL)) {
3077         return NXT_ERROR;
3078     }
3079 
3080     job->work.next = recf->jobs;
3081     recf->jobs = &job->work;
3082 
3083     job->task = tmcf->engine->task;
3084     job->work.handler = nxt_router_worker_thread_quit;
3085     job->work.task = &job->task;
3086     job->work.obj = NULL;
3087     job->work.data = NULL;
3088     job->tmcf = NULL;
3089 
3090     return NXT_OK;
3091 }
3092 
3093 
3094 static nxt_int_t
3095 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3096     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3097 {
3098     nxt_joint_job_t   *job;
3099     nxt_queue_link_t  *qlk;
3100 
3101     for (qlk = nxt_queue_first(sockets);
3102          qlk != nxt_queue_tail(sockets);
3103          qlk = nxt_queue_next(qlk))
3104     {
3105         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3106         if (nxt_slow_path(job == NULL)) {
3107             return NXT_ERROR;
3108         }
3109 
3110         job->work.next = recf->jobs;
3111         recf->jobs = &job->work;
3112 
3113         job->task = tmcf->engine->task;
3114         job->work.handler = nxt_router_listen_socket_delete;
3115         job->work.task = &job->task;
3116         job->work.obj = job;
3117         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3118         job->tmcf = tmcf;
3119 
3120         tmcf->count++;
3121     }
3122 
3123     return NXT_OK;
3124 }
3125 
3126 
3127 static nxt_int_t
3128 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3129     nxt_router_temp_conf_t *tmcf)
3130 {
3131     nxt_int_t                 ret;
3132     nxt_uint_t                i, threads;
3133     nxt_router_engine_conf_t  *recf;
3134 
3135     recf = tmcf->engines->elts;
3136     threads = tmcf->router_conf->threads;
3137 
3138     for (i = tmcf->new_threads; i < threads; i++) {
3139         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3140         if (nxt_slow_path(ret != NXT_OK)) {
3141             return ret;
3142         }
3143     }
3144 
3145     return NXT_OK;
3146 }
3147 
3148 
3149 static nxt_int_t
3150 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3151     nxt_event_engine_t *engine)
3152 {
3153     nxt_int_t            ret;
3154     nxt_thread_link_t    *link;
3155     nxt_thread_handle_t  handle;
3156 
3157     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3158 
3159     if (nxt_slow_path(link == NULL)) {
3160         return NXT_ERROR;
3161     }
3162 
3163     link->start = nxt_router_thread_start;
3164     link->engine = engine;
3165     link->work.handler = nxt_router_thread_exit_handler;
3166     link->work.task = task;
3167     link->work.data = link;
3168 
3169     nxt_queue_insert_tail(&rt->engines, &engine->link);
3170 
3171     ret = nxt_thread_create(&handle, link);
3172 
3173     if (nxt_slow_path(ret != NXT_OK)) {
3174         nxt_queue_remove(&engine->link);
3175     }
3176 
3177     return ret;
3178 }
3179 
3180 
3181 static void
3182 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3183     nxt_router_temp_conf_t *tmcf)
3184 {
3185     nxt_app_t  *app;
3186 
3187     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3188 
3189         nxt_router_app_unlink(task, app);
3190 
3191     } nxt_queue_loop;
3192 
3193     nxt_queue_add(&router->apps, &tmcf->previous);
3194     nxt_queue_add(&router->apps, &tmcf->apps);
3195 }
3196 
3197 
3198 static void
3199 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3200 {
3201     nxt_uint_t                n;
3202     nxt_event_engine_t        *engine;
3203     nxt_router_engine_conf_t  *recf;
3204 
3205     recf = tmcf->engines->elts;
3206 
3207     for (n = tmcf->engines->nelts; n != 0; n--) {
3208         engine = recf->engine;
3209 
3210         switch (recf->action) {
3211 
3212         case NXT_ROUTER_ENGINE_KEEP:
3213             break;
3214 
3215         case NXT_ROUTER_ENGINE_ADD:
3216             nxt_queue_insert_tail(&router->engines, &engine->link0);
3217             break;
3218 
3219         case NXT_ROUTER_ENGINE_DELETE:
3220             nxt_queue_remove(&engine->link0);
3221             break;
3222         }
3223 
3224         nxt_router_engine_post(engine, recf->jobs);
3225 
3226         recf++;
3227     }
3228 }
3229 
3230 
3231 static void
3232 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3233 {
3234     nxt_work_t  *work, *next;
3235 
3236     for (work = jobs; work != NULL; work = next) {
3237         next = work->next;
3238         work->next = NULL;
3239 
3240         nxt_event_engine_post(engine, work);
3241     }
3242 }
3243 
3244 
3245 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3246     .rpc_error       = nxt_port_rpc_handler,
3247     .mmap            = nxt_port_mmap_handler,
3248     .data            = nxt_port_rpc_handler,
3249     .oosm            = nxt_router_oosm_handler,
3250     .req_headers_ack = nxt_port_rpc_handler,
3251 };
3252 
3253 
3254 static void
3255 nxt_router_thread_start(void *data)
3256 {
3257     nxt_int_t           ret;
3258     nxt_port_t          *port;
3259     nxt_task_t          *task;
3260     nxt_work_t          *work;
3261     nxt_thread_t        *thread;
3262     nxt_thread_link_t   *link;
3263     nxt_event_engine_t  *engine;
3264 
3265     link = data;
3266     engine = link->engine;
3267     task = &engine->task;
3268 
3269     thread = nxt_thread();
3270 
3271     nxt_event_engine_thread_adopt(engine);
3272 
3273     /* STUB */
3274     thread->runtime = engine->task.thread->runtime;
3275 
3276     engine->task.thread = thread;
3277     engine->task.log = thread->log;
3278     thread->engine = engine;
3279     thread->task = &engine->task;
3280 #if 0
3281     thread->fiber = &engine->fibers->fiber;
3282 #endif
3283 
3284     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3285     if (nxt_slow_path(engine->mem_pool == NULL)) {
3286         return;
3287     }
3288 
3289     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3290                         NXT_PROCESS_ROUTER);
3291     if (nxt_slow_path(port == NULL)) {
3292         return;
3293     }
3294 
3295     ret = nxt_port_socket_init(task, port, 0);
3296     if (nxt_slow_path(ret != NXT_OK)) {
3297         nxt_port_use(task, port, -1);
3298         return;
3299     }
3300 
3301     ret = nxt_router_port_queue_init(task, port);
3302     if (nxt_slow_path(ret != NXT_OK)) {
3303         nxt_port_use(task, port, -1);
3304         return;
3305     }
3306 
3307     engine->port = port;
3308 
3309     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3310 
3311     work = nxt_zalloc(sizeof(nxt_work_t));
3312     if (nxt_slow_path(work == NULL)) {
3313         return;
3314     }
3315 
3316     work->handler = nxt_router_rt_add_port;
3317     work->task = link->work.task;
3318     work->obj = work;
3319     work->data = port;
3320 
3321     nxt_event_engine_post(link->work.task->thread->engine, work);
3322 
3323     nxt_event_engine_start(engine);
3324 }
3325 
3326 
3327 static void
3328 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3329 {
3330     nxt_int_t      res;
3331     nxt_port_t     *port;
3332     nxt_runtime_t  *rt;
3333 
3334     rt = task->thread->runtime;
3335     port = data;
3336 
3337     nxt_free(obj);
3338 
3339     res = nxt_port_hash_add(&rt->ports, port);
3340 
3341     if (nxt_fast_path(res == NXT_OK)) {
3342         nxt_port_use(task, port, 1);
3343     }
3344 }
3345 
3346 
3347 static void
3348 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3349 {
3350     nxt_joint_job_t          *job;
3351     nxt_socket_conf_t        *skcf;
3352     nxt_listen_event_t       *lev;
3353     nxt_listen_socket_t      *ls;
3354     nxt_thread_spinlock_t    *lock;
3355     nxt_socket_conf_joint_t  *joint;
3356 
3357     job = obj;
3358     joint = data;
3359 
3360     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3361 
3362     skcf = joint->socket_conf;
3363     ls = skcf->listen;
3364 
3365     lev = nxt_listen_event(task, ls);
3366     if (nxt_slow_path(lev == NULL)) {
3367         nxt_router_listen_socket_release(task, skcf);
3368         return;
3369     }
3370 
3371     lev->socket.data = joint;
3372 
3373     lock = &skcf->router_conf->router->lock;
3374 
3375     nxt_thread_spin_lock(lock);
3376     ls->count++;
3377     nxt_thread_spin_unlock(lock);
3378 
3379     job->work.next = NULL;
3380     job->work.handler = nxt_router_conf_wait;
3381 
3382     nxt_event_engine_post(job->tmcf->engine, &job->work);
3383 }
3384 
3385 
3386 nxt_inline nxt_listen_event_t *
3387 nxt_router_listen_event(nxt_queue_t *listen_connections,
3388     nxt_socket_conf_t *skcf)
3389 {
3390     nxt_socket_t        fd;
3391     nxt_queue_link_t    *qlk;
3392     nxt_listen_event_t  *lev;
3393 
3394     fd = skcf->listen->socket;
3395 
3396     for (qlk = nxt_queue_first(listen_connections);
3397          qlk != nxt_queue_tail(listen_connections);
3398          qlk = nxt_queue_next(qlk))
3399     {
3400         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3401 
3402         if (fd == lev->socket.fd) {
3403             return lev;
3404         }
3405     }
3406 
3407     return NULL;
3408 }
3409 
3410 
3411 static void
3412 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3413 {
3414     nxt_joint_job_t          *job;
3415     nxt_event_engine_t       *engine;
3416     nxt_listen_event_t       *lev;
3417     nxt_socket_conf_joint_t  *joint, *old;
3418 
3419     job = obj;
3420     joint = data;
3421 
3422     engine = task->thread->engine;
3423 
3424     nxt_queue_insert_tail(&engine->joints, &joint->link);
3425 
3426     lev = nxt_router_listen_event(&engine->listen_connections,
3427                                   joint->socket_conf);
3428 
3429     old = lev->socket.data;
3430     lev->socket.data = joint;
3431     lev->listen = joint->socket_conf->listen;
3432 
3433     job->work.next = NULL;
3434     job->work.handler = nxt_router_conf_wait;
3435 
3436     nxt_event_engine_post(job->tmcf->engine, &job->work);
3437 
3438     /*
3439      * The task is allocated from configuration temporary
3440      * memory pool so it can be freed after engine post operation.
3441      */
3442 
3443     nxt_router_conf_release(&engine->task, old);
3444 }
3445 
3446 
3447 static void
3448 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3449 {
3450     nxt_socket_conf_t        *skcf;
3451     nxt_listen_event_t       *lev;
3452     nxt_event_engine_t       *engine;
3453     nxt_socket_conf_joint_t  *joint;
3454 
3455     skcf = data;
3456 
3457     engine = task->thread->engine;
3458 
3459     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3460 
3461     nxt_fd_event_delete(engine, &lev->socket);
3462 
3463     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3464               lev->socket.fd);
3465 
3466     joint = lev->socket.data;
3467     joint->close_job = obj;
3468 
3469     lev->timer.handler = nxt_router_listen_socket_close;
3470     lev->timer.work_queue = &engine->fast_work_queue;
3471 
3472     nxt_timer_add(engine, &lev->timer, 0);
3473 }
3474 
3475 
3476 static void
3477 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3478 {
3479     nxt_event_engine_t  *engine;
3480 
3481     nxt_debug(task, "router worker thread quit");
3482 
3483     engine = task->thread->engine;
3484 
3485     engine->shutdown = 1;
3486 
3487     if (nxt_queue_is_empty(&engine->joints)) {
3488         nxt_thread_exit(task->thread);
3489     }
3490 }
3491 
3492 
3493 static void
3494 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3495 {
3496     nxt_timer_t              *timer;
3497     nxt_joint_job_t          *job;
3498     nxt_listen_event_t       *lev;
3499     nxt_socket_conf_joint_t  *joint;
3500 
3501     timer = obj;
3502     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3503 
3504     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3505               lev->socket.fd);
3506 
3507     nxt_queue_remove(&lev->link);
3508 
3509     joint = lev->socket.data;
3510     lev->socket.data = NULL;
3511 
3512     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3513     task = &task->thread->engine->task;
3514 
3515     nxt_router_listen_socket_release(task, joint->socket_conf);
3516 
3517     job = joint->close_job;
3518     job->work.next = NULL;
3519     job->work.handler = nxt_router_conf_wait;
3520 
3521     nxt_event_engine_post(job->tmcf->engine, &job->work);
3522 
3523     nxt_router_listen_event_release(task, lev, joint);
3524 }
3525 
3526 
3527 static void
3528 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3529 {
3530     nxt_listen_socket_t    *ls;
3531     nxt_thread_spinlock_t  *lock;
3532 
3533     ls = skcf->listen;
3534     lock = &skcf->router_conf->router->lock;
3535 
3536     nxt_thread_spin_lock(lock);
3537 
3538     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3539               task->thread->engine, ls->count);
3540 
3541     if (--ls->count != 0) {
3542         ls = NULL;
3543     }
3544 
3545     nxt_thread_spin_unlock(lock);
3546 
3547     if (ls != NULL) {
3548         nxt_socket_close(task, ls->socket);
3549         nxt_free(ls);
3550     }
3551 }
3552 
3553 
3554 void
3555 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3556     nxt_socket_conf_joint_t *joint)
3557 {
3558     nxt_event_engine_t  *engine;
3559 
3560     nxt_debug(task, "listen event count: %D", lev->count);
3561 
3562     engine = task->thread->engine;
3563 
3564     if (--lev->count == 0) {
3565         if (lev->next != NULL) {
3566             nxt_sockaddr_cache_free(engine, lev->next);
3567 
3568             nxt_conn_free(task, lev->next);
3569         }
3570 
3571         nxt_free(lev);
3572     }
3573 
3574     if (joint != NULL) {
3575         nxt_router_conf_release(task, joint);
3576     }
3577 
3578     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3579         nxt_thread_exit(task->thread);
3580     }
3581 }
3582 
3583 
3584 void
3585 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3586 {
3587     nxt_socket_conf_t      *skcf;
3588     nxt_router_conf_t      *rtcf;
3589     nxt_thread_spinlock_t  *lock;
3590 
3591     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3592 
3593     if (--joint->count != 0) {
3594         return;
3595     }
3596 
3597     nxt_queue_remove(&joint->link);
3598 
3599     /*
3600      * The joint content can not be safely used after the critical
3601      * section protected by the spinlock because its memory pool may
3602      * be already destroyed by another thread.
3603      */
3604     skcf = joint->socket_conf;
3605     rtcf = skcf->router_conf;
3606     lock = &rtcf->router->lock;
3607 
3608     nxt_thread_spin_lock(lock);
3609 
3610     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3611               rtcf, rtcf->count);
3612 
3613     if (--skcf->count != 0) {
3614         skcf = NULL;
3615         rtcf = NULL;
3616 
3617     } else {
3618         nxt_queue_remove(&skcf->link);
3619 
3620         if (--rtcf->count != 0) {
3621             rtcf = NULL;
3622         }
3623     }
3624 
3625     nxt_thread_spin_unlock(lock);
3626 
3627 #if (NXT_TLS)
3628     if (skcf != NULL && skcf->tls != NULL) {
3629         task->thread->runtime->tls->server_free(task, skcf->tls);
3630     }
3631 #endif
3632 
3633     /* TODO remove engine->port */
3634 
3635     if (rtcf != NULL) {
3636         nxt_debug(task, "old router conf is destroyed");
3637 
3638         nxt_router_apps_hash_use(task, rtcf, -1);
3639 
3640         nxt_router_access_log_release(task, lock, rtcf->access_log);
3641 
3642         nxt_mp_thread_adopt(rtcf->mem_pool);
3643 
3644         nxt_mp_destroy(rtcf->mem_pool);
3645     }
3646 }
3647 
3648 
3649 static void
3650 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3651     nxt_router_access_log_t *access_log)
3652 {
3653     size_t     size;
3654     u_char     *buf, *p;
3655     nxt_off_t  bytes;
3656 
3657     static nxt_time_string_t  date_cache = {
3658         (nxt_atomic_uint_t) -1,
3659         nxt_router_access_log_date,
3660         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3661         nxt_length("31/Dec/1986:19:40:00 +0300"),
3662         NXT_THREAD_TIME_LOCAL,
3663         NXT_THREAD_TIME_SEC,
3664     };
3665 
3666     size = r->remote->address_length
3667            + 6                  /* ' - - [' */
3668            + date_cache.size
3669            + 3                  /* '] "' */
3670            + r->method->length
3671            + 1                  /* space */
3672            + r->target.length
3673            + 1                  /* space */
3674            + r->version.length
3675            + 2                  /* '" ' */
3676            + 3                  /* status */
3677            + 1                  /* space */
3678            + NXT_OFF_T_LEN
3679            + 2                  /* ' "' */
3680            + (r->referer != NULL ? r->referer->value_length : 1)
3681            + 3                  /* '" "' */
3682            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3683            + 2                  /* '"\n' */
3684     ;
3685 
3686     buf = nxt_mp_nget(r->mem_pool, size);
3687     if (nxt_slow_path(buf == NULL)) {
3688         return;
3689     }
3690 
3691     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3692                    r->remote->address_length);
3693 
3694     p = nxt_cpymem(p, " - - [", 6);
3695 
3696     p = nxt_thread_time_string(task->thread, &date_cache, p);
3697 
3698     p = nxt_cpymem(p, "] \"", 3);
3699 
3700     if (r->method->length != 0) {
3701         p = nxt_cpymem(p, r->method->start, r->method->length);
3702 
3703         if (r->target.length != 0) {
3704             *p++ = ' ';
3705             p = nxt_cpymem(p, r->target.start, r->target.length);
3706 
3707             if (r->version.length != 0) {
3708                 *p++ = ' ';
3709                 p = nxt_cpymem(p, r->version.start, r->version.length);
3710             }
3711         }
3712 
3713     } else {
3714         *p++ = '-';
3715     }
3716 
3717     p = nxt_cpymem(p, "\" ", 2);
3718 
3719     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3720 
3721     *p++ = ' ';
3722 
3723     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3724 
3725     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3726 
3727     p = nxt_cpymem(p, " \"", 2);
3728 
3729     if (r->referer != NULL) {
3730         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3731 
3732     } else {
3733         *p++ = '-';
3734     }
3735 
3736     p = nxt_cpymem(p, "\" \"", 3);
3737 
3738     if (r->user_agent != NULL) {
3739         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3740 
3741     } else {
3742         *p++ = '-';
3743     }
3744 
3745     p = nxt_cpymem(p, "\"\n", 2);
3746 
3747     nxt_fd_write(access_log->fd, buf, p - buf);
3748 }
3749 
3750 
3751 static u_char *
3752 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3753     size_t size, const char *format)
3754 {
3755     u_char  sign;
3756     time_t  gmtoff;
3757 
3758     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3759                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3760 
3761     gmtoff = nxt_timezone(tm) / 60;
3762 
3763     if (gmtoff < 0) {
3764         gmtoff = -gmtoff;
3765         sign = '-';
3766 
3767     } else {
3768         sign = '+';
3769     }
3770 
3771     return nxt_sprintf(buf, buf + size, format,
3772                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3773                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3774                        sign, gmtoff / 60, gmtoff % 60);
3775 }
3776 
3777 
3778 static void
3779 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3780 {
3781     uint32_t                 stream;
3782     nxt_int_t                ret;
3783     nxt_buf_t                *b;
3784     nxt_port_t               *main_port, *router_port;
3785     nxt_runtime_t            *rt;
3786     nxt_router_access_log_t  *access_log;
3787 
3788     access_log = tmcf->router_conf->access_log;
3789 
3790     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3791     if (nxt_slow_path(b == NULL)) {
3792         goto fail;
3793     }
3794 
3795     b->completion_handler = nxt_router_dummy_buf_completion;
3796 
3797     nxt_buf_cpystr(b, &access_log->path);
3798     *b->mem.free++ = '\0';
3799 
3800     rt = task->thread->runtime;
3801     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3802     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3803 
3804     stream = nxt_port_rpc_register_handler(task, router_port,
3805                                            nxt_router_access_log_ready,
3806                                            nxt_router_access_log_error,
3807                                            -1, tmcf);
3808     if (nxt_slow_path(stream == 0)) {
3809         goto fail;
3810     }
3811 
3812     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3813                                 stream, router_port->id, b);
3814 
3815     if (nxt_slow_path(ret != NXT_OK)) {
3816         nxt_port_rpc_cancel(task, router_port, stream);
3817         goto fail;
3818     }
3819 
3820     return;
3821 
3822 fail:
3823 
3824     nxt_router_conf_error(task, tmcf);
3825 }
3826 
3827 
3828 static void
3829 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3830     void *data)
3831 {
3832     nxt_router_temp_conf_t   *tmcf;
3833     nxt_router_access_log_t  *access_log;
3834 
3835     tmcf = data;
3836 
3837     access_log = tmcf->router_conf->access_log;
3838 
3839     access_log->fd = msg->fd[0];
3840 
3841     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3842                        nxt_router_conf_apply, task, tmcf, NULL);
3843 }
3844 
3845 
3846 static void
3847 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3848     void *data)
3849 {
3850     nxt_router_temp_conf_t  *tmcf;
3851 
3852     tmcf = data;
3853 
3854     nxt_router_conf_error(task, tmcf);
3855 }
3856 
3857 
3858 static void
3859 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3860     nxt_router_access_log_t *access_log)
3861 {
3862     if (access_log == NULL) {
3863         return;
3864     }
3865 
3866     nxt_thread_spin_lock(lock);
3867 
3868     if (--access_log->count != 0) {
3869         access_log = NULL;
3870     }
3871 
3872     nxt_thread_spin_unlock(lock);
3873 
3874     if (access_log != NULL) {
3875 
3876         if (access_log->fd != -1) {
3877             nxt_fd_close(access_log->fd);
3878         }
3879 
3880         nxt_free(access_log);
3881     }
3882 }
3883 
3884 
/*
 * State of one access log reopen request.  It is allocated from
 * mem_pool in nxt_router_access_log_reopen_handler() and passed as the
 * RPC data to the reopen ready/error handlers, which release the pool.
 */
typedef struct {
    /* Pool that holds this structure and the request buffer. */
    nxt_mp_t                 *mem_pool;
    /* Value of nxt_router->access_log when the request was made. */
    nxt_router_access_log_t  *access_log;
} nxt_router_access_log_reopen_t;
3889 
3890 
3891 static void
3892 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3893 {
3894     nxt_mp_t                        *mp;
3895     uint32_t                        stream;
3896     nxt_int_t                       ret;
3897     nxt_buf_t                       *b;
3898     nxt_port_t                      *main_port, *router_port;
3899     nxt_runtime_t                   *rt;
3900     nxt_router_access_log_t         *access_log;
3901     nxt_router_access_log_reopen_t  *reopen;
3902 
3903     access_log = nxt_router->access_log;
3904 
3905     if (access_log == NULL) {
3906         return;
3907     }
3908 
3909     mp = nxt_mp_create(1024, 128, 256, 32);
3910     if (nxt_slow_path(mp == NULL)) {
3911         return;
3912     }
3913 
3914     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3915     if (nxt_slow_path(reopen == NULL)) {
3916         goto fail;
3917     }
3918 
3919     reopen->mem_pool = mp;
3920     reopen->access_log = access_log;
3921 
3922     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3923     if (nxt_slow_path(b == NULL)) {
3924         goto fail;
3925     }
3926 
3927     b->completion_handler = nxt_router_access_log_reopen_completion;
3928 
3929     nxt_buf_cpystr(b, &access_log->path);
3930     *b->mem.free++ = '\0';
3931 
3932     rt = task->thread->runtime;
3933     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3934     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3935 
3936     stream = nxt_port_rpc_register_handler(task, router_port,
3937                                            nxt_router_access_log_reopen_ready,
3938                                            nxt_router_access_log_reopen_error,
3939                                            -1, reopen);
3940     if (nxt_slow_path(stream == 0)) {
3941         goto fail;
3942     }
3943 
3944     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3945                                 stream, router_port->id, b);
3946 
3947     if (nxt_slow_path(ret != NXT_OK)) {
3948         nxt_port_rpc_cancel(task, router_port, stream);
3949         goto fail;
3950     }
3951 
3952     nxt_mp_retain(mp);
3953 
3954     return;
3955 
3956 fail:
3957 
3958     nxt_mp_destroy(mp);
3959 }
3960 
3961 
3962 static void
3963 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3964 {
3965     nxt_mp_t   *mp;
3966     nxt_buf_t  *b;
3967 
3968     b = obj;
3969     mp = b->data;
3970 
3971     nxt_mp_release(mp);
3972 }
3973 
3974 
3975 static void
3976 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3977     void *data)
3978 {
3979     nxt_router_access_log_t         *access_log;
3980     nxt_router_access_log_reopen_t  *reopen;
3981 
3982     reopen = data;
3983 
3984     access_log = reopen->access_log;
3985 
3986     if (access_log == nxt_router->access_log) {
3987 
3988         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3989             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3990                       msg->fd[0], access_log->fd, nxt_errno);
3991         }
3992     }
3993 
3994     nxt_fd_close(msg->fd[0]);
3995     nxt_mp_release(reopen->mem_pool);
3996 }
3997 
3998 
3999 static void
4000 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4001     void *data)
4002 {
4003     nxt_router_access_log_reopen_t  *reopen;
4004 
4005     reopen = data;
4006 
4007     nxt_mp_release(reopen->mem_pool);
4008 }
4009 
4010 
4011 static void
4012 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4013 {
4014     nxt_port_t           *port;
4015     nxt_thread_link_t    *link;
4016     nxt_event_engine_t   *engine;
4017     nxt_thread_handle_t  handle;
4018 
4019     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4020     link = data;
4021 
4022     nxt_thread_wait(handle);
4023 
4024     engine = link->engine;
4025 
4026     nxt_queue_remove(&engine->link);
4027 
4028     port = engine->port;
4029 
4030     // TODO notify all apps
4031 
4032     port->engine = task->thread->engine;
4033     nxt_mp_thread_adopt(port->mem_pool);
4034     nxt_port_use(task, port, -1);
4035 
4036     nxt_mp_thread_adopt(engine->mem_pool);
4037     nxt_mp_destroy(engine->mem_pool);
4038 
4039     nxt_event_engine_free(engine);
4040 
4041     nxt_free(link);
4042 }
4043 
4044 
/*
 * RPC handler for messages an application sends in reply to a request.
 *
 * "data" is the nxt_request_rpc_data_t of the stream.  A
 * _NXT_PORT_MSG_REQ_HEADERS_ACK message is delegated to
 * nxt_router_req_headers_ack_handler().  Any other message is treated as
 * response data: if the response header has already been sent the buffer
 * is appended to the body, otherwise it is parsed as an nxt_unit_response_t
 * whose fields are copied into r->resp.fields, whose status becomes
 * r->status and whose piggybacked content (if any) starts the body.
 *
 * A message flagged "last" unlinks the RPC data; a non-last message
 * re-arms the application timeout when one is configured.  On failure the
 * request is finished with NXT_HTTP_SERVICE_UNAVAILABLE.
 */
4045 static void
4046 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4047     void *data)
4048 {
4049     size_t                  b_size, count;
4050     nxt_int_t               ret;
4051     nxt_app_t               *app;
4052     nxt_buf_t               *b, *next;
4053     nxt_port_t              *app_port;
4054     nxt_unit_field_t        *f;
4055     nxt_http_field_t        *field;
4056     nxt_http_request_t      *r;
4057     nxt_unit_response_t     *resp;
4058     nxt_request_rpc_data_t  *req_rpc_data;
4059 
4060     req_rpc_data = data;
4061 
         /* No request attached to the RPC data any more: ignore the message. */
4062     r = req_rpc_data->request;
4063     if (nxt_slow_path(r == NULL)) {
4064         return;
4065     }
4066 
         /* The request has already failed: drop the message, unlink the stream. */
4067     if (r->error) {
4068         nxt_request_rpc_data_unlink(task, req_rpc_data);
4069         return;
4070     }
4071 
4072     app = req_rpc_data->app;
4073     nxt_assert(app != NULL);
4074 
4075     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4076         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4077 
4078         return;
4079     }
4080 
         /* A message of size 0 contributes no buffer to the response. */
4081     b = (msg->size == 0) ? NULL : msg->buf;
4082 
4083     if (msg->port_msg.last != 0) {
4084         nxt_debug(task, "router data create last buf");
4085 
             /* Terminate the chain with the buffer from nxt_http_buf_last(). */
4086         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4087 
4088         req_rpc_data->rpc_cancel = 0;
4089 
             /*
              * The action is still at the value nxt_router_app_port_get() sets;
              * switch it to "got response" before unlinking.
              * NOTE(review): presumably nxt_request_rpc_data_unlink() passes
              * apr_action on to nxt_router_app_port_release() -- confirm.
              */
4090         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4091             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4092         }
4093 
4094         nxt_request_rpc_data_unlink(task, req_rpc_data);
4095 
4096     } else {
             /* More messages are expected: restart the application timeout. */
4097         if (app->timeout != 0) {
4098             r->timer.handler = nxt_router_app_timeout;
4099             r->timer_data = req_rpc_data;
4100             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4101         }
4102     }
4103 
4104     if (b == NULL) {
4105         return;
4106     }
4107 
4108     if (msg->buf == b) {
4109         /* Disable instant buffer completion/re-using by port. */
4110         msg->buf = NULL;
4111     }
4112 
4113     if (r->header_sent) {
             /* Header already went out: this message is pure body data. */
4114         nxt_buf_chain_add(&r->out, b);
4115         nxt_http_request_send_body(task, r, NULL);
4116 
4117     } else {
             /* First data message: it must start with an nxt_unit_response_t. */
4118         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4119 
4120         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4121             nxt_alert(task, "response buffer too small: %z", b_size);
4122             goto fail;
4123         }
4124 
4125         resp = (void *) b->mem.pos;
             /* How many nxt_unit_field_t entries fit behind the fixed header. */
4126         count = (b_size - sizeof(nxt_unit_response_t))
4127                     / sizeof(nxt_unit_field_t);
4128 
4129         if (nxt_slow_path(count < resp->fields_count)) {
4130             nxt_alert(task, "response buffer too small for fields count: %D",
4131                       resp->fields_count);
4132             goto fail;
4133         }
4134 
4135         field = NULL;
4136 
             /* Copy every field not marked "skip" into the response field list. */
4137         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4138             if (f->skip) {
4139                 continue;
4140             }
4141 
4142             field = nxt_list_add(r->resp.fields);
4143 
4144             if (nxt_slow_path(field == NULL)) {
4145                 goto fail;
4146             }
4147 
4148             field->hash = f->hash;
4149             field->skip = 0;
4150             field->hopbyhop = 0;
4151 
4152             field->name_length = f->name_length;
4153             field->value_length = f->value_length;
4154             field->name = nxt_unit_sptr_get(&f->name);
4155             field->value = nxt_unit_sptr_get(&f->value);
4156 
4157             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4158             if (nxt_slow_path(ret != NXT_OK)) {
4159                 goto fail;
4160             }
4161 
4162             nxt_debug(task, "header%s: %*s: %*s",
4163                       (field->skip ? " skipped" : ""),
4164                       (size_t) field->name_length, field->name,
4165                       (size_t) field->value_length, field->value);
4166 
                 /*
                  * Field processing flagged the field as skipped: take the
                  * entry just added back out of the list.
                  */
4167             if (field->skip) {
4168                 r->resp.fields->last->nelts--;
4169             }
4170         }
4171 
4172         r->status = resp->status;
4173 
             /*
              * Narrow the buffer to the piggybacked content, or mark it as
              * fully consumed when there is none.
              */
4174         if (resp->piggyback_content_length != 0) {
4175             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4176             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4177 
4178         } else {
4179             b->mem.pos = b->mem.free;
4180         }
4181 
             /*
              * Nothing left in the first buffer: detach it, schedule its
              * completion handler and continue with the rest of the chain.
              */
4182         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4183             next = b->next;
4184             b->next = NULL;
4185 
4186             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4187                                b->completion_handler, task, b, b->parent);
4188 
4189             b = next;
4190         }
4191 
4192         if (b != NULL) {
4193             nxt_buf_chain_add(&r->out, b);
4194         }
4195 
4196         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4197 
             /*
              * A websocket handshake answered with "switching protocols":
              * count the websocket on the main port of the process and
              * release the port with the UPGRADE action.
              */
4198         if (r->websocket_handshake
4199             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4200         {
4201             app_port = req_rpc_data->app_port;
4202             if (nxt_slow_path(app_port == NULL)) {
4203                 goto fail;
4204             }
4205 
4206             nxt_thread_mutex_lock(&app->mutex);
4207 
4208             app_port->main_app_port->active_websockets++;
4209 
4210             nxt_thread_mutex_unlock(&app->mutex);
4211 
4212             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
                 /* The action recorded for the later release becomes CLOSE. */
4213             req_rpc_data->apr_action = NXT_APR_CLOSE;
4214 
4215             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4216 
4217             r->state = &nxt_http_websocket;
4218 
4219         } else {
4220             r->state = &nxt_http_request_send_state;
4221         }
4222     }
4223 
4224     return;
4225 
4226 fail:
4227 
         /* Fail the request and unlink the RPC data. */
4228     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4229 
4230     nxt_request_rpc_data_unlink(task, req_rpc_data);
4231 }
4232 
4233 
/*
 * Handles a _NXT_PORT_MSG_REQ_HEADERS_ACK message (dispatched from
 * nxt_router_response_ready_handler()).
 *
 * The request is unlinked from the application request list it is still
 * on, and the acknowledging port is looked up in app->port_hash by the pid
 * and reply port of the message.  That port replaces the one previously
 * stored in req_rpc_data->app_port.  The main port of the process is taken
 * off the idle/spare lists, which may require starting another process.
 * Finally the rest of the request body (buffers after the first one and/or
 * the body fd) is written to the port and the application timeout, if
 * configured, is armed.
 *
 * If the port cannot be found the request is finished with
 * NXT_HTTP_INTERNAL_SERVER_ERROR.
 */
4234 static void
4235 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4236     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4237 {
4238     int                 res;
4239     nxt_app_t           *app;
4240     nxt_buf_t           *b;
4241     nxt_bool_t          start_process, unlinked;
4242     nxt_port_t          *app_port, *main_app_port, *idle_port;
4243     nxt_queue_link_t    *idle_lnk;
4244     nxt_http_request_t  *r;
4245 
4246     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4247               req_rpc_data->stream,
4248               msg->port_msg.pid, msg->port_msg.reply_port);
4249 
4250     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4251                              msg->port_msg.pid);
4252 
4253     app = req_rpc_data->app;
4254     r = req_rpc_data->request;
4255 
4256     start_process = 0;
4257     unlinked = 0;
4258 
4259     nxt_thread_mutex_lock(&app->mutex);
4260 
         /*
          * The request is still linked through app_link: unlink it and
          * remember to release the memory pool once the mutex is dropped
          * (nxt_router_app_port_get() retains the pool when it links the
          * request into app->ack_waiting_req).
          */
4261     if (r->app_link.next != NULL) {
4262         nxt_queue_remove(&r->app_link);
4263         r->app_link.next = NULL;
4264 
4265         unlinked = 1;
4266     }
4267 
4268     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4269                                   msg->port_msg.reply_port);
4270     if (nxt_slow_path(app_port == NULL)) {
             /* The acknowledging port is not known to the application. */
4271         nxt_thread_mutex_unlock(&app->mutex);
4272 
4273         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4274 
4275         if (unlinked) {
4276             nxt_mp_release(r->mem_pool);
4277         }
4278 
4279         return;
4280     }
4281 
4282     main_app_port = app_port->main_app_port;
4283 
         /* The process is busy now: take its main port off the idle/spare list. */
4284     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4285         app->idle_processes--;
4286 
4287         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4288                   &app->name, main_app_port->pid, main_app_port->id,
4289                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4290 
4291         /* Check port was in 'spare_ports' using idle_start field. */
4292         if (main_app_port->idle_start == 0
4293             && app->idle_processes >= app->spare_processes)
4294         {
4295             /*
4296              * If there is a vacant space in spare ports,
4297              * move the last idle to spare_ports.
4298              */
4299             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4300 
4301             idle_lnk = nxt_queue_last(&app->idle_ports);
4302             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4303             nxt_queue_remove(idle_lnk);
4304 
4305             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4306 
4307             idle_port->idle_start = 0;
4308 
4309             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4310                       "to spare_ports",
4311                       &app->name, idle_port->pid, idle_port->id);
4312         }
4313 
             /* Account for the new process here, start it after unlocking. */
4314         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4315             app->pending_processes++;
4316             start_process = 1;
4317         }
4318     }
4319 
4320     main_app_port->active_requests++;
4321 
         /* Reference for storing the port in req_rpc_data->app_port below. */
4322     nxt_port_inc_use(app_port);
4323 
4324     nxt_thread_mutex_unlock(&app->mutex);
4325 
4326     if (unlinked) {
4327         nxt_mp_release(r->mem_pool);
4328     }
4329 
4330     if (start_process) {
4331         nxt_router_start_app_process(task, app);
4332     }
4333 
         /* Drop the reference to the previously assigned port and switch over. */
4334     nxt_port_use(task, req_rpc_data->app_port, -1);
4335 
4336     req_rpc_data->app_port = app_port;
4337 
4338     b = req_rpc_data->msg_info.buf;
4339 
4340     if (b != NULL) {
4341         /* First buffer is already sent.  Start from second. */
4342         b = b->next;
4343 
4344         req_rpc_data->msg_info.buf->next = NULL;
4345     }
4346 
         /* Send whatever is left of the request body: extra buffers and/or fd. */
4347     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4348         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4349                   req_rpc_data->msg_info.body_fd);
4350 
4351         if (req_rpc_data->msg_info.body_fd != -1) {
                 /* Rewind the body file; the result of lseek() is not checked. */
4352             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4353         }
4354 
4355         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4356                                     req_rpc_data->msg_info.body_fd,
4357                                     req_rpc_data->stream,
4358                                     task->thread->engine->port->id, b);
4359 
4360         if (nxt_slow_path(res != NXT_OK)) {
4361             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4362         }
4363     }
4364 
         /* Note: the timer is armed even when the body write above failed. */
4365     if (app->timeout != 0) {
4366         r->timer.handler = nxt_router_app_timeout;
4367         r->timer_data = req_rpc_data;
4368         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4369     }
4370 }
4371 
4372 
4373 static const nxt_http_request_state_t  nxt_http_request_send_state
4374     nxt_aligned(64) =
4375 {
4376     .error_handler = nxt_http_request_error_handler,
4377 };
4378 
4379 
4380 static void
4381 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4382 {
4383     nxt_buf_t           *out;
4384     nxt_http_request_t  *r;
4385 
4386     r = obj;
4387 
4388     out = r->out;
4389 
4390     if (out != NULL) {
4391         r->out = NULL;
4392         nxt_http_request_send(task, r, out);
4393     }
4394 }
4395 
4396 
4397 static void
4398 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4399     void *data)
4400 {
4401     nxt_request_rpc_data_t  *req_rpc_data;
4402 
4403     req_rpc_data = data;
4404 
4405     req_rpc_data->rpc_cancel = 0;
4406 
4407     /* TODO cancel message and return if cancelled. */
4408     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4409 
4410     if (req_rpc_data->request != NULL) {
4411         nxt_http_request_error(task, req_rpc_data->request,
4412                                NXT_HTTP_SERVICE_UNAVAILABLE);
4413     }
4414 
4415     nxt_request_rpc_data_unlink(task, req_rpc_data);
4416 }
4417 
4418 
4419 static void
4420 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4421     void *data)
4422 {
4423     nxt_app_t            *app;
4424     nxt_bool_t           start_process;
4425     nxt_port_t           *port;
4426     nxt_app_joint_t      *app_joint;
4427     nxt_app_joint_rpc_t  *app_joint_rpc;
4428 
4429     nxt_assert(data != NULL);
4430 
4431     app_joint_rpc = data;
4432     app_joint = app_joint_rpc->app_joint;
4433     port = msg->u.new_port;
4434 
4435     nxt_assert(app_joint != NULL);
4436     nxt_assert(port != NULL);
4437     nxt_assert(port->type == NXT_PROCESS_APP);
4438     nxt_assert(port->id == 0);
4439 
4440     app = app_joint->app;
4441 
4442     nxt_router_app_joint_use(task, app_joint, -1);
4443 
4444     if (nxt_slow_path(app == NULL)) {
4445         nxt_debug(task, "new port ready for released app, send QUIT");
4446 
4447         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4448 
4449         return;
4450     }
4451 
4452     nxt_thread_mutex_lock(&app->mutex);
4453 
4454     nxt_assert(app->pending_processes != 0);
4455 
4456     app->pending_processes--;
4457 
4458     if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
4459         nxt_debug(task, "new port ready for restarted app, send QUIT");
4460 
4461         start_process = !task->thread->engine->shutdown
4462                         && nxt_router_app_can_start(app)
4463                         && nxt_router_app_need_start(app);
4464 
4465         if (start_process) {
4466             app->pending_processes++;
4467         }
4468 
4469         nxt_thread_mutex_unlock(&app->mutex);
4470 
4471         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4472 
4473         if (start_process) {
4474             nxt_router_start_app_process(task, app);
4475         }
4476 
4477         return;
4478     }
4479 
4480     port->app = app;
4481     port->main_app_port = port;
4482 
4483     app->processes++;
4484     nxt_port_hash_add(&app->port_hash, port);
4485     app->port_hash_count++;
4486 
4487     nxt_thread_mutex_unlock(&app->mutex);
4488 
4489     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4490               &app->name, port->pid, app->processes, app->pending_processes);
4491 
4492     nxt_router_app_shared_port_send(task, port);
4493 
4494     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4495 }
4496 
4497 
4498 static nxt_int_t
4499 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4500 {
4501     nxt_buf_t                *b;
4502     nxt_port_t               *port;
4503     nxt_port_msg_new_port_t  *msg;
4504 
4505     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4506                              sizeof(nxt_port_data_t));
4507     if (nxt_slow_path(b == NULL)) {
4508         return NXT_ERROR;
4509     }
4510 
4511     port = app_port->app->shared_port;
4512 
4513     nxt_debug(task, "send port %FD to process %PI",
4514               port->pair[0], app_port->pid);
4515 
4516     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4517     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4518 
4519     msg->id = port->id;
4520     msg->pid = port->pid;
4521     msg->max_size = port->max_size;
4522     msg->max_share = port->max_share;
4523     msg->type = port->type;
4524 
4525     return nxt_port_socket_write2(task, app_port,
4526                                   NXT_PORT_MSG_NEW_PORT,
4527                                   port->pair[0], port->queue_fd,
4528                                   0, 0, b);
4529 }
4530 
4531 
4532 static void
4533 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4534     void *data)
4535 {
4536     nxt_app_t            *app;
4537     nxt_app_joint_t      *app_joint;
4538     nxt_queue_link_t     *link;
4539     nxt_http_request_t   *r;
4540     nxt_app_joint_rpc_t  *app_joint_rpc;
4541 
4542     nxt_assert(data != NULL);
4543 
4544     app_joint_rpc = data;
4545     app_joint = app_joint_rpc->app_joint;
4546 
4547     nxt_assert(app_joint != NULL);
4548 
4549     app = app_joint->app;
4550 
4551     nxt_router_app_joint_use(task, app_joint, -1);
4552 
4553     if (nxt_slow_path(app == NULL)) {
4554         nxt_debug(task, "start error for released app");
4555 
4556         return;
4557     }
4558 
4559     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4560 
4561     link = NULL;
4562 
4563     nxt_thread_mutex_lock(&app->mutex);
4564 
4565     nxt_assert(app->pending_processes != 0);
4566 
4567     app->pending_processes--;
4568 
4569     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4570         link = nxt_queue_first(&app->ack_waiting_req);
4571 
4572         nxt_queue_remove(link);
4573         link->next = NULL;
4574     }
4575 
4576     nxt_thread_mutex_unlock(&app->mutex);
4577 
4578     while (link != NULL) {
4579         r = nxt_container_of(link, nxt_http_request_t, app_link);
4580 
4581         nxt_event_engine_post(r->engine, &r->err_work);
4582 
4583         link = NULL;
4584 
4585         nxt_thread_mutex_lock(&app->mutex);
4586 
4587         if (app->processes == 0 && app->pending_processes == 0
4588             && !nxt_queue_is_empty(&app->ack_waiting_req))
4589         {
4590             link = nxt_queue_first(&app->ack_waiting_req);
4591 
4592             nxt_queue_remove(link);
4593             link->next = NULL;
4594         }
4595 
4596         nxt_thread_mutex_unlock(&app->mutex);
4597     }
4598 }
4599 
4600 
4601 
4602 nxt_inline nxt_port_t *
4603 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4604 {
4605     nxt_port_t  *port;
4606 
4607     port = NULL;
4608 
4609     nxt_thread_mutex_lock(&app->mutex);
4610 
4611     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4612 
4613         /* Caller is responsible to decrease port use count. */
4614         nxt_queue_chk_remove(&port->app_link);
4615 
4616         if (nxt_queue_chk_remove(&port->idle_link)) {
4617             app->idle_processes--;
4618 
4619             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4620                       &app->name, port->pid, port->id,
4621                       (port->idle_start ? "idle_ports" : "spare_ports"));
4622         }
4623 
4624         nxt_port_hash_remove(&app->port_hash, port);
4625         app->port_hash_count--;
4626 
4627         port->app = NULL;
4628         app->processes--;
4629 
4630         break;
4631 
4632     } nxt_queue_loop;
4633 
4634     nxt_thread_mutex_unlock(&app->mutex);
4635 
4636     return port;
4637 }
4638 
4639 
4640 static void
4641 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4642 {
4643     int  c;
4644 
4645     c = nxt_atomic_fetch_add(&app->use_count, i);
4646 
4647     if (i < 0 && c == -i) {
4648 
4649         if (task->thread->engine != app->engine) {
4650             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4651 
4652         } else {
4653             nxt_router_free_app(task, app->joint, NULL);
4654         }
4655     }
4656 }
4657 
4658 
4659 static void
4660 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4661 {
4662     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4663 
4664     nxt_queue_remove(&app->link);
4665 
4666     nxt_router_app_use(task, app, -1);
4667 }
4668 
4669 
/*
 * Releases an application port after "action" and updates the bookkeeping
 * of the application the port belongs to.
 *
 * Adjustments selected by the action:
 *   NXT_APR_NEW_PORT        none;
 *   NXT_APR_REQUEST_FAILED  one active request less, one port use less;
 *   NXT_APR_GOT_RESPONSE    one response more, one active request less,
 *                           one port use less;
 *   NXT_APR_UPGRADE         one response more, one active request less;
 *   NXT_APR_CLOSE           one port use less.
 *
 * For the shared port (id NXT_SHARED_PORT_ID) only app->active_requests is
 * adjusted.  For any other port the counters of its main port are updated
 * too; the main port is linked into app->ports if it is not there yet, is
 * detached and told to QUIT once it has served app->max_requests
 * responses, or is put on the spare/idle list when it has no active
 * requests and no active websockets.
 */
4670 static void
4671 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4672     nxt_apr_action_t action)
4673 {
4674     int         inc_use;
4675     uint32_t    got_response, dec_requests;
4676     nxt_app_t   *app;
4677     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4678     nxt_port_t  *main_app_port;
4679 
4680     nxt_assert(port != NULL);
4681     nxt_assert(port->app != NULL);
4682 
4683     app = port->app;
4684 
4685     inc_use = 0;
4686     got_response = 0;
4687     dec_requests = 0;
4688 
         /* Translate the action into counter adjustments. */
4689     switch (action) {
4690     case NXT_APR_NEW_PORT:
4691         break;
4692     case NXT_APR_REQUEST_FAILED:
4693         dec_requests = 1;
4694         inc_use = -1;
4695         break;
4696     case NXT_APR_GOT_RESPONSE:
4697         got_response = 1;
4698         inc_use = -1;
4699         break;
4700     case NXT_APR_UPGRADE:
4701         got_response = 1;
4702         break;
4703     case NXT_APR_CLOSE:
4704         inc_use = -1;
4705         break;
4706     }
4707 
4708     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4709               port->pid, port->id,
4710               (int) inc_use, (int) got_response);
4711 
         /* The shared port takes part in request accounting only. */
4712     if (port->id == NXT_SHARED_PORT_ID) {
4713         nxt_thread_mutex_lock(&app->mutex);
4714 
4715         app->active_requests -= got_response + dec_requests;
4716 
4717         nxt_thread_mutex_unlock(&app->mutex);
4718 
4719         goto adjust_use;
4720     }
4721 
4722     main_app_port = port->main_app_port;
4723 
4724     nxt_thread_mutex_lock(&app->mutex);
4725 
4726     main_app_port->app_responses += got_response;
4727     main_app_port->active_requests -= got_response + dec_requests;
4728     app->active_requests -= got_response + dec_requests;
4729 
         /*
          * The port is open (pair[1] != -1) and below the request limit:
          * make sure it is linked into app->ports; the list holds its own
          * use of the port.
          */
4730     if (main_app_port->pair[1] != -1
4731         && (app->max_requests == 0
4732             || main_app_port->app_responses < app->max_requests))
4733     {
4734         if (main_app_port->app_link.next == NULL) {
4735             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4736 
4737             nxt_port_inc_use(main_app_port);
4738         }
4739     }
4740 
         /* A request limit is configured and the process has reached it. */
4741     send_quit = (app->max_requests > 0
4742                  && main_app_port->app_responses >= app->max_requests);
4743 
4744     if (send_quit) {
             /* Detach the port from the application before telling it to quit. */
4745         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4746 
4747         nxt_port_hash_remove(&app->port_hash, main_app_port);
4748         app->port_hash_count--;
4749 
4750         main_app_port->app = NULL;
4751         app->processes--;
4752 
4753     } else {
4754         port_unchained = 0;
4755     }
4756 
4757     adjust_idle_timer = 0;
4758 
         /*
          * The process has become idle and is on neither the spare nor the
          * idle list yet: put it on one of them.
          */
4759     if (main_app_port->pair[1] != -1 && !send_quit
4760         && main_app_port->active_requests == 0
4761         && main_app_port->active_websockets == 0
4762         && main_app_port->idle_link.next == NULL)
4763     {
             /*
              * The spare list is exactly full, so this port goes to
              * idle_ports: queue adjust_idle_work unless it is queued
              * already (its data field is non-NULL while queued).
              */
4764         if (app->idle_processes == app->spare_processes
4765             && app->adjust_idle_work.data == NULL)
4766         {
4767             adjust_idle_timer = 1;
4768             app->adjust_idle_work.data = app;
4769             app->adjust_idle_work.next = NULL;
4770         }
4771 
4772         if (app->idle_processes < app->spare_processes) {
4773             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4774 
4775             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4776                       &app->name, main_app_port->pid, main_app_port->id);
4777         } else {
4778             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4779 
                 /* A non-zero idle_start marks a port that is on idle_ports. */
4780             main_app_port->idle_start = task->thread->engine->timers.now;
4781 
4782             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4783                       &app->name, main_app_port->pid, main_app_port->id);
4784         }
4785 
4786         app->idle_processes++;
4787     }
4788 
4789     nxt_thread_mutex_unlock(&app->mutex);
4790 
4791     if (adjust_idle_timer) {
             /*
              * The application use taken here is dropped by
              * nxt_router_adjust_idle_timer() when it runs as queued work.
              */
4792         nxt_router_app_use(task, app, 1);
4793         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4794     }
4795 
4796     /* ? */
4797     if (main_app_port->pair[1] == -1) {
4798         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4799                   &app->name, app, main_app_port, main_app_port->pid);
4800 
4801         goto adjust_use;
4802     }
4803 
4804     if (send_quit) {
4805         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4806 
4807         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4808                               NULL);
4809 
             /* Drop the use that was held on behalf of the app->ports list. */
4810         if (port_unchained) {
4811             nxt_port_use(task, main_app_port, -1);
4812         }
4813 
4814         goto adjust_use;
4815     }
4816 
4817     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4818               &app->name, app);
4819 
4820 adjust_use:
4821 
         /* Apply the use count change selected by the action (0 or -1). */
4822     nxt_port_use(task, port, inc_use);
4823 }
4824 
4825 
/*
 * Removes a closed application port from the bookkeeping of its
 * application (port->app, which must not be NULL).
 *
 * The port is always removed from app->port_hash.  For a port with id 0
 * the function additionally unlinks it from app->ports and from the
 * idle/spare list, refills the spare list from idle_ports if needed,
 * decrements app->processes and, unless the engine is shutting down,
 * starts a replacement process when nxt_router_app_can_start() and
 * nxt_router_app_need_start() both allow it.
 */
4826 void
4827 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4828 {
4829     nxt_app_t         *app;
4830     nxt_bool_t        unchain, start_process;
4831     nxt_port_t        *idle_port;
4832     nxt_queue_link_t  *idle_lnk;
4833 
4834     app = port->app;
4835 
4836     nxt_assert(app != NULL);
4837 
4838     nxt_thread_mutex_lock(&app->mutex);
4839 
4840     nxt_port_hash_remove(&app->port_hash, port);
4841     app->port_hash_count--;
4842 
         /* Ports with a non-zero id need no further process accounting. */
4843     if (port->id != 0) {
4844         nxt_thread_mutex_unlock(&app->mutex);
4845 
4846         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4847                   port->pid, port->id);
4848 
4849         return;
4850     }
4851 
         /* Remember whether the port was on app->ports; its use is dropped below. */
4852     unchain = nxt_queue_chk_remove(&port->app_link);
4853 
4854     if (nxt_queue_chk_remove(&port->idle_link)) {
4855         app->idle_processes--;
4856 
4857         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4858                   &app->name, port->pid, port->id,
4859                   (port->idle_start ? "idle_ports" : "spare_ports"));
4860 
             /*
              * The port was a spare one (idle_start == 0) and idle ports
              * remain: move the last idle port over to spare_ports.
              */
4861         if (port->idle_start == 0
4862             && app->idle_processes >= app->spare_processes)
4863         {
4864             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4865 
4866             idle_lnk = nxt_queue_last(&app->idle_ports);
4867             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4868             nxt_queue_remove(idle_lnk);
4869 
4870             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4871 
4872             idle_port->idle_start = 0;
4873 
4874             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4875                       "to spare_ports",
4876                       &app->name, idle_port->pid, idle_port->id);
4877         }
4878     }
4879 
4880     app->processes--;
4881 
         /* Decide under the mutex whether a replacement process is needed. */
4882     start_process = !task->thread->engine->shutdown
4883                     && nxt_router_app_can_start(app)
4884                     && nxt_router_app_need_start(app);
4885 
4886     if (start_process) {
4887         app->pending_processes++;
4888     }
4889 
4890     nxt_thread_mutex_unlock(&app->mutex);
4891 
4892     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4893 
         /* Drop the use that was held on behalf of the app->ports list. */
4894     if (unchain) {
4895         nxt_port_use(task, port, -1);
4896     }
4897 
4898     if (start_process) {
4899         nxt_router_start_app_process(task, app);
4900     }
4901 }
4902 
4903 
/*
 * Tells idle application processes whose idle timeout has expired to QUIT
 * and re-arms (or disables) app->joint->idle_timer.
 *
 * "obj" is the application.  When "data" equals the application, the call
 * comes from the queued app->adjust_idle_work item: the work item is
 * marked as no longer queued and the application use taken when it was
 * posted is dropped at the end.  The function must run on app->engine
 * (asserted).
 */
4904 static void
4905 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4906 {
4907     nxt_app_t           *app;
4908     nxt_bool_t          queued;
4909     nxt_port_t          *port;
4910     nxt_msec_t          timeout, threshold;
4911     nxt_queue_link_t    *lnk;
4912     nxt_event_engine_t  *engine;
4913 
4914     app = obj;
4915     queued = (data == app);
4916 
4917     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4918               &app->name, queued);
4919 
4920     engine = task->thread->engine;
4921 
4922     nxt_assert(app->engine == engine);
4923 
         /* Ports whose deadline is not later than the threshold are expired. */
4924     threshold = engine->timers.now + app->joint->idle_timer.bias;
4925     timeout = 0;
4926 
4927     nxt_thread_mutex_lock(&app->mutex);
4928 
4929     if (queued) {
             /* Allow the work item to be posted again. */
4930         app->adjust_idle_work.data = NULL;
4931     }
4932 
4933     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4934               &app->name,
4935               (int) app->idle_processes, (int) app->spare_processes);
4936 
         /* Walk idle_ports from the head while there are surplus idle processes. */
4937     while (app->idle_processes > app->spare_processes) {
4938 
4939         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4940 
4941         lnk = nxt_queue_first(&app->idle_ports);
4942         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4943 
4944         timeout = port->idle_start + app->idle_timeout;
4945 
4946         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4947                   &app->name, port->pid,
4948                   port->idle_start, timeout, threshold);
4949 
             /* The first port has not expired yet: stop, the timer is re-armed below. */
4950         if (timeout > threshold) {
4951             break;
4952         }
4953 
4954         nxt_queue_remove(lnk);
4955         lnk->next = NULL;
4956 
4957         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4958                   &app->name, port->pid, port->id);
4959 
4960         nxt_queue_chk_remove(&port->app_link);
4961 
4962         nxt_port_hash_remove(&app->port_hash, port);
4963         app->port_hash_count--;
4964 
4965         app->idle_processes--;
4966         app->processes--;
4967         port->app = NULL;
4968 
             /* The mutex is not held while the QUIT message is written. */
4969         nxt_thread_mutex_unlock(&app->mutex);
4970 
4971         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4972                   &app->name, port->pid);
4973 
4974         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4975 
4976         nxt_port_use(task, port, -1);
4977 
4978         nxt_thread_mutex_lock(&app->mutex);
4979     }
4980 
4981     nxt_thread_mutex_unlock(&app->mutex);
4982 
         /*
          * "timeout" holds the deadline of the last port examined (0 if none
          * was): arm the timer for the remaining time if that deadline is
          * still ahead, otherwise disable it.
          */
4983     if (timeout > threshold) {
4984         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4985 
4986     } else {
4987         nxt_timer_disable(engine, &app->joint->idle_timer);
4988     }
4989 
4990     if (queued) {
             /* Drop the application use taken when the work item was posted. */
4991         nxt_router_app_use(task, app, -1);
4992     }
4993 }
4994 
4995 
4996 static void
4997 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4998 {
4999     nxt_timer_t      *timer;
5000     nxt_app_joint_t  *app_joint;
5001 
5002     timer = obj;
5003     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5004 
5005     if (nxt_fast_path(app_joint->app != NULL)) {
5006         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5007     }
5008 }
5009 
5010 
5011 static void
5012 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5013 {
5014     nxt_timer_t      *timer;
5015     nxt_app_joint_t  *app_joint;
5016 
5017     timer = obj;
5018     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5019 
5020     nxt_router_app_joint_use(task, app_joint, -1);
5021 }
5022 
5023 
5024 static void
5025 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5026 {
5027     nxt_app_t        *app;
5028     nxt_port_t       *port;
5029     nxt_app_joint_t  *app_joint;
5030 
5031     app_joint = obj;
5032     app = app_joint->app;
5033 
5034     for ( ;; ) {
5035         port = nxt_router_app_get_port_for_quit(task, app);
5036         if (port == NULL) {
5037             break;
5038         }
5039 
5040         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
5041 
5042         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5043 
5044         nxt_port_use(task, port, -1);
5045     }
5046 
5047     nxt_thread_mutex_lock(&app->mutex);
5048 
5049     for ( ;; ) {
5050         port = nxt_port_hash_retrieve(&app->port_hash);
5051         if (port == NULL) {
5052             break;
5053         }
5054 
5055         app->port_hash_count--;
5056 
5057         port->app = NULL;
5058 
5059         nxt_port_close(task, port);
5060 
5061         nxt_port_use(task, port, -1);
5062     }
5063 
5064     nxt_thread_mutex_unlock(&app->mutex);
5065 
5066     nxt_assert(app->processes == 0);
5067     nxt_assert(app->active_requests == 0);
5068     nxt_assert(app->port_hash_count == 0);
5069     nxt_assert(app->idle_processes == 0);
5070     nxt_assert(nxt_queue_is_empty(&app->ports));
5071     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5072     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5073 
5074     nxt_port_mmaps_destroy(&app->outgoing, 1);
5075 
5076     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5077 
5078     if (app->shared_port != NULL) {
5079         app->shared_port->app = NULL;
5080         nxt_port_close(task, app->shared_port);
5081         nxt_port_use(task, app->shared_port, -1);
5082 
5083         app->shared_port = NULL;
5084     }
5085 
5086     nxt_thread_mutex_destroy(&app->mutex);
5087     nxt_mp_destroy(app->mem_pool);
5088 
5089     app_joint->app = NULL;
5090 
5091     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5092         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5093         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5094 
5095     } else {
5096         nxt_router_app_joint_use(task, app_joint, -1);
5097     }
5098 }
5099 
5100 
5101 static void
5102 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5103     nxt_request_rpc_data_t *req_rpc_data)
5104 {
5105     nxt_bool_t          start_process;
5106     nxt_port_t          *port;
5107     nxt_http_request_t  *r;
5108 
5109     start_process = 0;
5110 
5111     nxt_thread_mutex_lock(&app->mutex);
5112 
5113     port = app->shared_port;
5114     nxt_port_inc_use(port);
5115 
5116     app->active_requests++;
5117 
5118     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5119         app->pending_processes++;
5120         start_process = 1;
5121     }
5122 
5123     r = req_rpc_data->request;
5124 
5125     /*
5126      * Put request into application-wide list to be able to cancel request
5127      * if something goes wrong with application processes.
5128      */
5129     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5130 
5131     nxt_thread_mutex_unlock(&app->mutex);
5132 
5133     /*
5134      * Retain request memory pool while request is linked in ack_waiting_req
5135      * to guarantee request structure memory is accessble.
5136      */
5137     nxt_mp_retain(r->mem_pool);
5138 
5139     req_rpc_data->app_port = port;
5140     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5141 
5142     if (start_process) {
5143         nxt_router_start_app_process(task, app);
5144     }
5145 }
5146 
5147 
5148 void
5149 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5150     nxt_http_action_t *action)
5151 {
5152     nxt_event_engine_t      *engine;
5153     nxt_http_app_conf_t     *conf;
5154     nxt_request_rpc_data_t  *req_rpc_data;
5155 
5156     conf = action->u.conf;
5157     engine = task->thread->engine;
5158 
5159     r->app_target = conf->target;
5160 
5161     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5162                                           nxt_router_response_ready_handler,
5163                                           nxt_router_response_error_handler,
5164                                           sizeof(nxt_request_rpc_data_t));
5165     if (nxt_slow_path(req_rpc_data == NULL)) {
5166         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5167         return;
5168     }
5169 
5170     /*
5171      * At this point we have request req_rpc_data allocated and registered
5172      * in port handlers.  Need to fixup request memory pool.  Counterpart
5173      * release will be called via following call chain:
5174      *    nxt_request_rpc_data_unlink() ->
5175      *        nxt_router_http_request_release_post() ->
5176      *            nxt_router_http_request_release()
5177      */
5178     nxt_mp_retain(r->mem_pool);
5179 
5180     r->timer.task = &engine->task;
5181     r->timer.work_queue = &engine->fast_work_queue;
5182     r->timer.log = engine->task.log;
5183     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5184 
5185     r->engine = engine;
5186     r->err_work.handler = nxt_router_http_request_error;
5187     r->err_work.task = task;
5188     r->err_work.obj = r;
5189 
5190     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5191     req_rpc_data->app = conf->app;
5192     req_rpc_data->msg_info.body_fd = -1;
5193     req_rpc_data->rpc_cancel = 1;
5194 
5195     nxt_router_app_use(task, conf->app, 1);
5196 
5197     req_rpc_data->request = r;
5198     r->req_rpc_data = req_rpc_data;
5199 
5200     if (r->last != NULL) {
5201         r->last->completion_handler = nxt_router_http_request_done;
5202     }
5203 
5204     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5205     nxt_router_app_prepare_request(task, req_rpc_data);
5206 }
5207 
5208 
5209 static void
5210 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5211 {
5212     nxt_http_request_t  *r;
5213 
5214     r = obj;
5215 
5216     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5217 
5218     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5219 
5220     if (r->req_rpc_data != NULL) {
5221         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5222     }
5223 
5224     nxt_mp_release(r->mem_pool);
5225 }
5226 
5227 
5228 static void
5229 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5230 {
5231     nxt_http_request_t  *r;
5232 
5233     r = data;
5234 
5235     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5236 
5237     if (r->req_rpc_data != NULL) {
5238         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5239     }
5240 
5241     nxt_http_request_close_handler(task, r, r->proto.any);
5242 }
5243 
5244 
5245 static void
5246 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
5247 {
5248 }
5249 
5250 
5251 static void
5252 nxt_router_app_prepare_request(nxt_task_t *task,
5253     nxt_request_rpc_data_t *req_rpc_data)
5254 {
5255     nxt_app_t         *app;
5256     nxt_buf_t         *buf, *body;
5257     nxt_int_t         res;
5258     nxt_port_t        *port, *reply_port;
5259 
5260     int                   notify;
5261     struct {
5262         nxt_port_msg_t       pm;
5263         nxt_port_mmap_msg_t  mm;
5264     } msg;
5265 
5266 
5267     app = req_rpc_data->app;
5268 
5269     nxt_assert(app != NULL);
5270 
5271     port = req_rpc_data->app_port;
5272 
5273     nxt_assert(port != NULL);
5274     nxt_assert(port->queue != NULL);
5275 
5276     reply_port = task->thread->engine->port;
5277 
5278     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5279                                  nxt_app_msg_prefix[app->type]);
5280     if (nxt_slow_path(buf == NULL)) {
5281         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5282                   req_rpc_data->stream, &app->name);
5283 
5284         nxt_http_request_error(task, req_rpc_data->request,
5285                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5286 
5287         return;
5288     }
5289 
5290     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5291                     nxt_buf_used_size(buf),
5292                     port->socket.fd);
5293 
5294     req_rpc_data->msg_info.buf = buf;
5295 
5296     body = req_rpc_data->request->body;
5297 
5298     if (body != NULL && nxt_buf_is_file(body)) {
5299         req_rpc_data->msg_info.body_fd = body->file->fd;
5300 
5301         body->file->fd = -1;
5302 
5303     } else {
5304         req_rpc_data->msg_info.body_fd = -1;
5305     }
5306 
5307     msg.pm.stream = req_rpc_data->stream;
5308     msg.pm.pid = reply_port->pid;
5309     msg.pm.reply_port = reply_port->id;
5310     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5311     msg.pm.last = 0;
5312     msg.pm.mmap = 1;
5313     msg.pm.nf = 0;
5314     msg.pm.mf = 0;
5315     msg.pm.tracking = 0;
5316 
5317     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5318     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5319 
5320     msg.mm.mmap_id = hdr->id;
5321     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5322     msg.mm.size = nxt_buf_used_size(buf);
5323 
5324     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5325                              req_rpc_data->stream, &notify,
5326                              &req_rpc_data->msg_info.tracking_cookie);
5327     if (nxt_fast_path(res == NXT_OK)) {
5328         if (notify != 0) {
5329             (void) nxt_port_socket_write(task, port,
5330                                          NXT_PORT_MSG_READ_QUEUE,
5331                                          -1, req_rpc_data->stream,
5332                                          reply_port->id, NULL);
5333 
5334         } else {
5335             nxt_debug(task, "queue is not empty");
5336         }
5337 
5338         buf->is_port_mmap_sent = 1;
5339         buf->mem.pos = buf->mem.free;
5340 
5341     } else {
5342         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5343                   req_rpc_data->stream, &app->name);
5344 
5345         nxt_http_request_error(task, req_rpc_data->request,
5346                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5347     }
5348 }
5349 
5350 
5351 struct nxt_fields_iter_s {
5352     nxt_list_part_t   *part;
5353     nxt_http_field_t  *field;
5354 };
5355 
5356 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5357 
5358 
5359 static nxt_http_field_t *
5360 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5361 {
5362     if (part == NULL) {
5363         return NULL;
5364     }
5365 
5366     while (part->nelts == 0) {
5367         part = part->next;
5368         if (part == NULL) {
5369             return NULL;
5370         }
5371     }
5372 
5373     i->part = part;
5374     i->field = nxt_list_data(i->part);
5375 
5376     return i->field;
5377 }
5378 
5379 
5380 static nxt_http_field_t *
5381 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5382 {
5383     return nxt_fields_part_first(nxt_list_part(fields), i);
5384 }
5385 
5386 
5387 static nxt_http_field_t *
5388 nxt_fields_next(nxt_fields_iter_t *i)
5389 {
5390     nxt_http_field_t  *end = nxt_list_data(i->part);
5391 
5392     end += i->part->nelts;
5393     i->field++;
5394 
5395     if (i->field < end) {
5396         return i->field;
5397     }
5398 
5399     return nxt_fields_part_first(i->part->next, i);
5400 }
5401 
5402 
     /*
      * Serializes HTTP request "r" for application "app" into buffers obtained
      * from app->outgoing with nxt_port_mmap_get_buf().
      *
      * The first buffer starts with an nxt_unit_request_t, followed by its field
      * array and the NUL-terminated strings the structure refers to (method,
      * version, remote and local addresses, server name, target, optionally
      * path, then header names and values).  The request body is appended after
      * that area and continues in additional chained buffers when needed.
      *
      * "prefix" is prepended to every header name.  When it is not empty, the
      * name is also rewritten ('a'..'z' to upper case, '-' to '_') and the
      * values of later fields with the same name are appended to the first one,
      * separated by ", "; the merged fields get their "skip" flag set in
      * r->fields.
      *
      * Returns the head of the buffer chain, or NULL if the header part does not
      * fit into PORT_MMAP_DATA_SIZE or a buffer cannot be obtained.
      */
5403 static nxt_buf_t *
5404 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5405     nxt_app_t *app, const nxt_str_t *prefix)
5406 {
5407     void                *target_pos, *query_pos;
5408     u_char              *pos, *end, *p, c;
5409     size_t              fields_count, req_size, size, free_size;
5410     size_t              copy_size;
5411     nxt_off_t           content_length;
5412     nxt_buf_t           *b, *buf, *out, **tail;
5413     nxt_http_field_t    *field, *dup;
5414     nxt_unit_field_t    *dst_field;
5415     nxt_fields_iter_t   iter, dup_iter;
5416     nxt_unit_request_t  *req;
5417
         /*
          * Size of the header part: the fixed structure plus every string with
          * its terminating NUL.  The path is counted separately only when it
          * does not start at the same address as the target.
          */
5418     req_size = sizeof(nxt_unit_request_t)
5419                + r->method->length + 1
5420                + r->version.length + 1
5421                + r->remote->length + 1
5422                + r->local->length + 1
5423                + r->server_name.length + 1
5424                + r->target.length + 1
5425                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5426
         /* A negative content_length_n is treated as an empty body here. */
5427     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5428     fields_count = 0;
5429
         /*
          * Every field is counted here, including the ones the copy loop below
          * skips, so the field array and string area are sized for all of them.
          */
5430     nxt_list_each(field, r->fields) {
5431         fields_count++;
5432
5433         req_size += field->name_length + prefix->length + 1
5434                     + field->value_length + 1;
5435     } nxt_list_loop;
5436
5437     req_size += fields_count * sizeof(nxt_unit_field_t);
5438
5439     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5440         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5441                   (int) req_size);
5442
5443         return NULL;
5444     }
5445
         /* Ask for room for the body as well, capped at PORT_MMAP_DATA_SIZE. */
5446     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5447               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5448     if (nxt_slow_path(out == NULL)) {
5449         return NULL;
5450     }
5451
         /*
          * Reserve the whole header area up front; body bytes are appended at
          * out->mem.free, i.e. right after it.
          */
5452     req = (nxt_unit_request_t *) out->mem.free;
5453     out->mem.free += req_size;
5454
5455     req->app_target = r->app_target;
5456
5457     req->content_length = content_length;
5458
         /* Strings are written right after the field array. */
5459     p = (u_char *) (req->fields + fields_count);
5460
5461     nxt_debug(task, "fields_count=%d", (int) fields_count);
5462
5463     req->method_length = r->method->length;
5464     nxt_unit_sptr_set(&req->method, p);
5465     p = nxt_cpymem(p, r->method->start, r->method->length);
5466     *p++ = '\0';
5467
5468     req->version_length = r->version.length;
5469     nxt_unit_sptr_set(&req->version, p);
5470     p = nxt_cpymem(p, r->version.start, r->version.length);
5471     *p++ = '\0';
5472
5473     req->remote_length = r->remote->address_length;
5474     nxt_unit_sptr_set(&req->remote, p);
5475     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5476                    r->remote->address_length);
5477     *p++ = '\0';
5478
5479     req->local_length = r->local->address_length;
5480     nxt_unit_sptr_set(&req->local, p);
5481     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5482     *p++ = '\0';
5483
5484     req->tls = (r->tls != NULL);
5485     req->websocket_handshake = r->websocket_handshake;
5486
5487     req->server_name_length = r->server_name.length;
5488     nxt_unit_sptr_set(&req->server_name, p);
5489     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5490     *p++ = '\0';
5491
         /* Remember where the target copy starts: path and query may alias it. */
5492     target_pos = p;
5493     req->target_length = (uint32_t) r->target.length;
5494     nxt_unit_sptr_set(&req->target, p);
5495     p = nxt_cpymem(p, r->target.start, r->target.length);
5496     *p++ = '\0';
5497
5498     req->path_length = (uint32_t) r->path->length;
5499     if (r->path->start == r->target.start) {
             /* The path starts where the target does: reuse the target copy. */
5500         nxt_unit_sptr_set(&req->path, target_pos);
5501
5502     } else {
5503         nxt_unit_sptr_set(&req->path, p);
5504         p = nxt_cpymem(p, r->path->start, r->path->length);
5505         *p++ = '\0';
5506     }
5507
5508     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5509     if (r->args != NULL && r->args->start != NULL) {
             /*
              * The query is not copied: it is addressed inside the target copy
              * at the same offset r->args->start has from r->target.start.
              */
5510         query_pos = nxt_pointer_to(target_pos,
5511                                    r->args->start - r->target.start);
5512
5513         nxt_unit_sptr_set(&req->query, query_pos);
5514
5515     } else {
5516         req->query.offset = 0;
5517     }
5518
5519     req->content_length_field = NXT_UNIT_NONE_FIELD;
5520     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5521     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5522     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5523
5524     dst_field = req->fields;
5525
5526     for (field = nxt_fields_first(r->fields, &iter);
5527          field != NULL;
5528          field = nxt_fields_next(&iter))
5529     {
5530         if (field->skip) {
5531             continue;
5532         }
5533
5534         dst_field->hash = field->hash;
5535         dst_field->skip = 0;
5536         dst_field->name_length = field->name_length + prefix->length;
5537         dst_field->value_length = field->value_length;
5538
             /* Record the index of the well-known fields in the output array. */
5539         if (field == r->content_length) {
5540             req->content_length_field = dst_field - req->fields;
5541
5542         } else if (field == r->content_type) {
5543             req->content_type_field = dst_field - req->fields;
5544
5545         } else if (field == r->cookie) {
5546             req->cookie_field = dst_field - req->fields;
5547
5548         } else if (field == r->authorization) {
5549             req->authorization_field = dst_field - req->fields;
5550         }
5551
5552         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5553                   (int) field->hash, (int) field->skip,
5554                   (int) field->name_length, field->name,
5555                   (int) field->value_length, field->value);
5556
5557         if (prefix->length != 0) {
5558             nxt_unit_sptr_set(&dst_field->name, p);
5559             p = nxt_cpymem(p, prefix->start, prefix->length);
5560
                 /* Prefixed name: upper-case 'a'..'z' and turn '-' into '_'. */
5561             end = field->name + field->name_length;
5562             for (pos = field->name; pos < end; pos++) {
5563                 c = *pos;
5564
5565                 if (c >= 'a' && c <= 'z') {
5566                     *p++ = (c & ~0x20);
5567                     continue;
5568                 }
5569
5570                 if (c == '-') {
5571                     *p++ = '_';
5572                     continue;
5573                 }
5574
5575                 *p++ = c;
5576             }
5577
5578         } else {
5579             nxt_unit_sptr_set(&dst_field->name, p);
5580             p = nxt_cpymem(p, field->name, field->name_length);
5581         }
5582
5583         *p++ = '\0';
5584
5585         nxt_unit_sptr_set(&dst_field->value, p);
5586         p = nxt_cpymem(p, field->value, field->value_length);
5587
5588         if (prefix->length != 0) {
                 /*
                  * Scan the fields after the current one using a copy of the
                  * iterator.  A later, not yet skipped field with the same
                  * length and hash whose name also passes the nxt_memcasecmp()
                  * check has its value appended after ", " and is marked
                  * "skip", so the outer loop does not emit it again.
                  */
5589             dup_iter = iter;
5590
5591             for (dup = nxt_fields_next(&dup_iter);
5592                  dup != NULL;
5593                  dup = nxt_fields_next(&dup_iter))
5594             {
5595                 if (dup->name_length != field->name_length
5596                     || dup->skip
5597                     || dup->hash != field->hash
5598                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5599                 {
5600                     continue;
5601                 }
5602
5603                 p = nxt_cpymem(p, ", ", 2);
5604                 p = nxt_cpymem(p, dup->value, dup->value_length);
5605
5606                 dst_field->value_length += 2 + dup->value_length;
5607
5608                 dup->skip = 1;
5609             }
5610         }
5611
5612         *p++ = '\0';
5613
5614         dst_field++;
5615     }
5616
         /* Number of fields actually written (skipped ones are not included). */
5617     req->fields_count = (uint32_t) (dst_field - req->fields);
5618
         /* Body bytes start right after the reserved header area. */
5619     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5620
5621     buf = out;
5622     tail = &buf->next;
5623
         /*
          * Copy the memory part of every body buffer into the output chain.
          * "buf" is the buffer currently being filled; NULL means a new one
          * has to be obtained and linked at "tail".
          */
5624     for (b = r->body; b != NULL; b = b->next) {
5625         size = nxt_buf_mem_used_size(&b->mem);
5626         pos = b->mem.pos;
5627
5628         while (size > 0) {
5629             if (buf == NULL) {
5630                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5631
5632                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5633                 if (nxt_slow_path(buf == NULL)) {
                         /*
                          * Give up: detach every buffer of the chain built so
                          * far and pass it to its completion handler.
                          */
5634                     while (out != NULL) {
5635                         buf = out->next;
5636                         out->next = NULL;
5637                         out->completion_handler(task, out, out->parent);
5638                         out = buf;
5639                     }
5640                     return NULL;
5641                 }
5642
5643                 *tail = buf;
5644                 tail = &buf->next;
5645
5646             } else {
                     /* Try to enlarge the current buffer before starting a new one. */
5647                 free_size = nxt_buf_mem_free_size(&buf->mem);
5648                 if (free_size < size
5649                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5650                        == NXT_OK)
5651                 {
5652                     free_size = nxt_buf_mem_free_size(&buf->mem);
5653                 }
5654             }
5655
5656             if (free_size > 0) {
5657                 copy_size = nxt_min(free_size, size);
5658
5659                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5660
5661                 size -= copy_size;
5662                 pos += copy_size;
5663
5664                 if (size == 0) {
                         /* Keep "buf": the next body buffer continues in it. */
5665                     break;
5666                 }
5667             }
5668
                 /* Current buffer is full: the next iteration gets a new one. */
5669             buf = NULL;
5670         }
5671     }
5672
5673     return out;
5674 }
5675 
5676 
5677 static void
5678 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5679 {
5680     nxt_timer_t              *timer;
5681     nxt_http_request_t       *r;
5682     nxt_request_rpc_data_t   *req_rpc_data;
5683 
5684     timer = obj;
5685 
5686     nxt_debug(task, "router app timeout");
5687 
5688     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5689     req_rpc_data = r->timer_data;
5690 
5691     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5692 
5693     nxt_request_rpc_data_unlink(task, req_rpc_data);
5694 }
5695 
5696 
5697 static void
5698 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5699 {
5700     r->timer.handler = nxt_router_http_request_release;
5701     nxt_timer_add(task->thread->engine, &r->timer, 0);
5702 }
5703 
5704 
5705 static void
5706 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5707 {
5708     nxt_http_request_t  *r;
5709 
5710     nxt_debug(task, "http request pool release");
5711 
5712     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5713 
5714     nxt_mp_release(r->mem_pool);
5715 }
5716 
5717 
5718 static void
5719 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5720 {
5721     size_t                   mi;
5722     uint32_t                 i;
5723     nxt_bool_t               ack;
5724     nxt_process_t            *process;
5725     nxt_free_map_t           *m;
5726     nxt_port_mmap_handler_t  *mmap_handler;
5727 
5728     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5729 
5730     process = nxt_runtime_process_find(task->thread->runtime,
5731                                        msg->port_msg.pid);
5732     if (nxt_slow_path(process == NULL)) {
5733         return;
5734     }
5735 
5736     ack = 0;
5737 
5738     /*
5739      * To mitigate possible racing condition (when OOSM message received
5740      * after some of the memory was already freed), need to try to find
5741      * first free segment in shared memory and send ACK if found.
5742      */
5743 
5744     nxt_thread_mutex_lock(&process->incoming.mutex);
5745 
5746     for (i = 0; i < process->incoming.size; i++) {
5747         mmap_handler = process->incoming.elts[i].mmap_handler;
5748 
5749         if (nxt_slow_path(mmap_handler == NULL)) {
5750             continue;
5751         }
5752 
5753         m = mmap_handler->hdr->free_map;
5754 
5755         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5756             if (m[mi] != 0) {
5757                 ack = 1;
5758 
5759                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5760                           i, mi, m[mi]);
5761 
5762                 break;
5763             }
5764         }
5765     }
5766 
5767     nxt_thread_mutex_unlock(&process->incoming.mutex);
5768 
5769     if (ack) {
5770         nxt_process_broadcast_shm_ack(task, process);
5771     }
5772 }
5773 
5774 
5775 static void
5776 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5777 {
5778     nxt_fd_t                 fd;
5779     nxt_port_t               *port;
5780     nxt_runtime_t            *rt;
5781     nxt_port_mmaps_t         *mmaps;
5782     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5783     nxt_port_mmap_handler_t  *mmap_handler;
5784 
5785     rt = task->thread->runtime;
5786 
5787     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5788                                  msg->port_msg.reply_port);
5789     if (nxt_slow_path(port == NULL)) {
5790         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5791                   msg->port_msg.pid, msg->port_msg.reply_port);
5792 
5793         return;
5794     }
5795 
5796     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5797                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5798     {
5799         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5800                   (int) nxt_buf_used_size(msg->buf));
5801 
5802         return;
5803     }
5804 
5805     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5806 
5807     nxt_assert(port->type == NXT_PROCESS_APP);
5808 
5809     if (nxt_slow_path(port->app == NULL)) {
5810         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5811                   port->pid, port->id);
5812 
5813         // FIXME
5814         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5815                               -1, msg->port_msg.stream, 0, NULL);
5816 
5817         return;
5818     }
5819 
5820     mmaps = &port->app->outgoing;
5821     nxt_thread_mutex_lock(&mmaps->mutex);
5822 
5823     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5824         nxt_thread_mutex_unlock(&mmaps->mutex);
5825 
5826         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5827                   (int) get_mmap_msg->id);
5828 
5829         // FIXME
5830         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5831                               -1, msg->port_msg.stream, 0, NULL);
5832         return;
5833     }
5834 
5835     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5836 
5837     fd = mmap_handler->fd;
5838 
5839     nxt_thread_mutex_unlock(&mmaps->mutex);
5840 
5841     nxt_debug(task, "get mmap %PI:%d found",
5842               msg->port_msg.pid, (int) get_mmap_msg->id);
5843 
5844     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5845 }
5846 
5847 
5848 static void
5849 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5850 {
5851     nxt_port_t               *port, *reply_port;
5852     nxt_runtime_t            *rt;
5853     nxt_port_msg_get_port_t  *get_port_msg;
5854 
5855     rt = task->thread->runtime;
5856 
5857     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5858                                        msg->port_msg.reply_port);
5859     if (nxt_slow_path(reply_port == NULL)) {
5860         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5861                   msg->port_msg.pid, msg->port_msg.reply_port);
5862 
5863         return;
5864     }
5865 
5866     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5867                       < (int) sizeof(nxt_port_msg_get_port_t)))
5868     {
5869         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5870                   (int) nxt_buf_used_size(msg->buf));
5871 
5872         return;
5873     }
5874 
5875     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5876 
5877     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5878     if (nxt_slow_path(port == NULL)) {
5879         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5880                   get_port_msg->pid, get_port_msg->id);
5881 
5882         return;
5883     }
5884 
5885     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5886               get_port_msg->id);
5887 
5888     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5889 }
5890