xref: /unit/src/nxt_router.c (revision 1940:29c2c9d80c5b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
/*
 * Per-application settings record.
 *
 * Nothing in the visible part of this file reads or writes it.
 * NOTE(review): presumably filled while an application object of the
 * configuration is parsed -- confirm against nxt_router_conf_create().
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    uint32_t          requests;
    /* Raw configuration sub-values kept for later inspection. */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
35 
36 
/*
 * Per-listener settings record holding two strings.
 *
 * NOTE(review): presumably "pass" is the routing target and "application"
 * a directly referenced application name -- confirm at the use site,
 * which is outside the visible part of this file.
 */
typedef struct {
    nxt_str_t         pass;
    nxt_str_t         application;
} nxt_router_listener_conf_t;
41 
42 
43 #if (NXT_TLS)
44 
/*
 * State of one pending TLS-related request (only built with NXT_TLS).
 *
 * NOTE(review): presumably created by nxt_router_conf_tls_insert() and
 * consumed by nxt_router_tls_rpc_handler() -- confirm at the use sites.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    /*
     * NOTE(review): the only TLS queue initialized in the visible code is
     * nxt_router_temp_conf_t.tls (see nxt_router_temp_conf()); confirm
     * which queue this link really belongs to.
     */
    nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
} nxt_router_tlssock_t;
54 
55 #endif
56 
57 
/*
 * Context of an RPC request concerning one listening socket.
 *
 * NOTE(review): presumably allocated by
 * nxt_router_listen_socket_rpc_create() and handed as "data" to
 * nxt_router_listen_socket_ready()/_error() -- confirm at the use sites.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
64 
65 
/*
 * Context of an RPC request that ties an application to the temporary
 * configuration being applied.
 *
 * NOTE(review): presumably used by nxt_router_app_rpc_create() and the
 * nxt_router_app_prefork_ready()/_error() callbacks -- confirm.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
} nxt_app_rpc_t;
70 
71 
/*
 * Context of a "start process" RPC request.
 *
 * It is allocated by nxt_port_rpc_register_handler_ex() in
 * nxt_router_start_app_process_handler(), which stores the application
 * joint and the application generation current at the time of the request.
 */
typedef struct {
    nxt_app_joint_t         *app_joint;
    uint32_t                generation;
} nxt_app_joint_rpc_t;
76 
77 
/* Router process callbacks referenced by nxt_router_process. */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

/* Posts a "start application process" job to the router port. */
static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

/* Port message handlers installed in nxt_router_process_port_handlers. */
static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_app_restart_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);

/* Temporary configuration: creation, application and result reporting. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

/* Building a router configuration from the received [start, end) text. */
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
    nxt_conf_value_t *conf);

/* Application lookup by name: queue search and per-configuration hash. */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);
122 
/* Shared-memory queues attached to ports. */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
/* Listening socket RPC request and its ready/error callbacks. */
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
/* TLS support, compiled only when NXT_TLS is set. */
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
/* Application RPC request and its ready/error callbacks. */
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);

/* Per-engine configuration jobs. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Thread creation and application list handling. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

/* Posting prepared jobs to engines. */
static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);

/* Thread entry point and work handlers with the (task, obj, data) shape. */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);
201 
/* Access log: writing, opening, releasing and reopening. */
static void nxt_router_access_log_writer(nxt_task_t *task,
    nxt_http_request_t *r, nxt_router_access_log_t *access_log);
static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
    struct tm *tm, size_t size, const char *format);
static void nxt_router_access_log_open(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_access_log_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_release(nxt_task_t *task,
    nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_reopen_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/*
 * Callbacks of the "start process" RPC registered in
 * nxt_router_start_app_process_handler(), plus the shared port sender.
 */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
    nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application reference counting ("i" is added to the counter). */
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

/* Application port acquisition/release and request completion. */
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

/* Building the request message sent to an application. */
static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

/* Timers and deferred release of application objects. */
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

/* Tentative definition; the initialized one appears later in the file. */
static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

/* Application joint reference counting ("i" is added to the counter). */
static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

/* Request release and the remaining port message handlers. */
static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
269 
/* Request state defined in another translation unit. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/*
 * The router object of this process; allocated and assigned once in
 * nxt_router_start().
 */
static nxt_router_t  *nxt_router;
273 
/* The two possible name prefixes: "HTTP_" and the empty string. */
static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Six-entry table selecting one of the prefixes above.
 *
 * NOTE(review): presumably indexed by the application type and passed as
 * the "prefix" argument of nxt_router_prepare_msg() -- confirm that the
 * entry order matches the corresponding enum.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
285 
286 
/*
 * Port message dispatch table of the router process; referenced by
 * nxt_router_process.port_handlers.  Both RPC result types are routed to
 * the generic nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
302 
303 
/*
 * Description of the "router" process: its type, the prefork/setup/start
 * callbacks, the port handlers table above and the signal set.
 * The symbol has external linkage, so it is meant to be referenced from
 * outside this file.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
314 
315 
/*
 * Queues of nxt_socket_conf_t.
 *
 * These are process-wide and are re-initialized (emptied) each time
 * nxt_router_temp_conf() creates a temporary configuration.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
322 
323 
324 static nxt_int_t
325 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
326 {
327     nxt_runtime_stop_app_processes(task, task->thread->runtime);
328 
329     return NXT_OK;
330 }
331 
332 
333 static nxt_int_t
334 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
335 {
336     nxt_int_t      ret;
337     nxt_port_t     *controller_port;
338     nxt_router_t   *router;
339     nxt_runtime_t  *rt;
340 
341     rt = task->thread->runtime;
342 
343     nxt_log(task, NXT_LOG_INFO, "router started");
344 
345 #if (NXT_TLS)
346     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
347     if (nxt_slow_path(rt->tls == NULL)) {
348         return NXT_ERROR;
349     }
350 
351     ret = rt->tls->library_init(task);
352     if (nxt_slow_path(ret != NXT_OK)) {
353         return ret;
354     }
355 #endif
356 
357     ret = nxt_http_init(task);
358     if (nxt_slow_path(ret != NXT_OK)) {
359         return ret;
360     }
361 
362     router = nxt_zalloc(sizeof(nxt_router_t));
363     if (nxt_slow_path(router == NULL)) {
364         return NXT_ERROR;
365     }
366 
367     nxt_queue_init(&router->engines);
368     nxt_queue_init(&router->sockets);
369     nxt_queue_init(&router->apps);
370 
371     nxt_router = router;
372 
373     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
374     if (controller_port != NULL) {
375         nxt_router_greet_controller(task, controller_port);
376     }
377 
378     return NXT_OK;
379 }
380 
381 
382 static void
383 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
384 {
385     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
386                           -1, 0, 0, NULL);
387 }
388 
389 
390 static void
391 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
392     void *data)
393 {
394     size_t               size;
395     uint32_t             stream;
396     nxt_mp_t             *mp;
397     nxt_int_t            ret;
398     nxt_app_t            *app;
399     nxt_buf_t            *b;
400     nxt_port_t           *main_port;
401     nxt_runtime_t        *rt;
402     nxt_app_joint_rpc_t  *app_joint_rpc;
403 
404     app = data;
405 
406     rt = task->thread->runtime;
407     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
408 
409     nxt_debug(task, "app '%V' %p start process", &app->name, app);
410 
411     size = app->name.length + 1 + app->conf.length;
412 
413     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
414 
415     if (nxt_slow_path(b == NULL)) {
416         goto failed;
417     }
418 
419     nxt_buf_cpystr(b, &app->name);
420     *b->mem.free++ = '\0';
421     nxt_buf_cpystr(b, &app->conf);
422 
423     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
424                                                      nxt_router_app_port_ready,
425                                                      nxt_router_app_port_error,
426                                                    sizeof(nxt_app_joint_rpc_t));
427     if (nxt_slow_path(app_joint_rpc == NULL)) {
428         goto failed;
429     }
430 
431     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
432 
433     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
434                                 -1, stream, port->id, b);
435     if (nxt_slow_path(ret != NXT_OK)) {
436         nxt_port_rpc_cancel(task, port, stream);
437 
438         goto failed;
439     }
440 
441     app_joint_rpc->app_joint = app->joint;
442     app_joint_rpc->generation = app->generation;
443 
444     nxt_router_app_joint_use(task, app->joint, 1);
445 
446     nxt_router_app_use(task, app, -1);
447 
448     return;
449 
450 failed:
451 
452     if (b != NULL) {
453         mp = b->data;
454         nxt_mp_free(mp, b);
455         nxt_mp_release(mp);
456     }
457 
458     nxt_thread_mutex_lock(&app->mutex);
459 
460     app->pending_processes--;
461 
462     nxt_thread_mutex_unlock(&app->mutex);
463 
464     nxt_router_app_use(task, app, -1);
465 }
466 
467 
468 static void
469 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
470 {
471     app_joint->use_count += i;
472 
473     if (app_joint->use_count == 0) {
474         nxt_assert(app_joint->app == NULL);
475 
476         nxt_free(app_joint);
477     }
478 }
479 
480 
481 static nxt_int_t
482 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
483 {
484     nxt_int_t      res;
485     nxt_port_t     *router_port;
486     nxt_runtime_t  *rt;
487 
488     nxt_debug(task, "app '%V' start process", &app->name);
489 
490     rt = task->thread->runtime;
491     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
492 
493     nxt_router_app_use(task, app, 1);
494 
495     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
496                         app);
497 
498     if (res == NXT_OK) {
499         return res;
500     }
501 
502     nxt_thread_mutex_lock(&app->mutex);
503 
504     app->pending_processes--;
505 
506     nxt_thread_mutex_unlock(&app->mutex);
507 
508     nxt_router_app_use(task, app, -1);
509 
510     return NXT_ERROR;
511 }
512 
513 
514 nxt_inline nxt_bool_t
515 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
516 {
517     nxt_buf_t       *b, *next;
518     nxt_bool_t      cancelled;
519     nxt_port_t      *app_port;
520     nxt_msg_info_t  *msg_info;
521 
522     msg_info = &req_rpc_data->msg_info;
523 
524     if (msg_info->buf == NULL) {
525         return 0;
526     }
527 
528     app_port = req_rpc_data->app_port;
529 
530     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
531         cancelled = nxt_app_queue_cancel(app_port->queue,
532                                          msg_info->tracking_cookie,
533                                          req_rpc_data->stream);
534 
535         if (cancelled) {
536             nxt_debug(task, "stream #%uD: cancelled by router",
537                       req_rpc_data->stream);
538         }
539 
540     } else {
541         cancelled = 0;
542     }
543 
544     for (b = msg_info->buf; b != NULL; b = next) {
545         next = b->next;
546         b->next = NULL;
547 
548         if (b->is_port_mmap_sent) {
549             b->is_port_mmap_sent = cancelled == 0;
550         }
551 
552         b->completion_handler(task, b, b->parent);
553     }
554 
555     msg_info->buf = NULL;
556 
557     return cancelled;
558 }
559 
560 
561 nxt_inline nxt_bool_t
562 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
563 {
564     if (lnk->next != NULL) {
565         nxt_queue_remove(lnk);
566 
567         lnk->next = NULL;
568 
569         return 1;
570     }
571 
572     return 0;
573 }
574 
575 
576 nxt_inline void
577 nxt_request_rpc_data_unlink(nxt_task_t *task,
578     nxt_request_rpc_data_t *req_rpc_data)
579 {
580     nxt_app_t           *app;
581     nxt_bool_t          unlinked;
582     nxt_http_request_t  *r;
583 
584     nxt_router_msg_cancel(task, req_rpc_data);
585 
586     if (req_rpc_data->app_port != NULL) {
587         nxt_router_app_port_release(task, req_rpc_data->app_port,
588                                     req_rpc_data->apr_action);
589 
590         req_rpc_data->app_port = NULL;
591     }
592 
593     app = req_rpc_data->app;
594     r = req_rpc_data->request;
595 
596     if (r != NULL) {
597         r->timer_data = NULL;
598 
599         nxt_router_http_request_release_post(task, r);
600 
601         r->req_rpc_data = NULL;
602         req_rpc_data->request = NULL;
603 
604         if (app != NULL) {
605             unlinked = 0;
606 
607             nxt_thread_mutex_lock(&app->mutex);
608 
609             if (r->app_link.next != NULL) {
610                 nxt_queue_remove(&r->app_link);
611                 r->app_link.next = NULL;
612 
613                 unlinked = 1;
614             }
615 
616             nxt_thread_mutex_unlock(&app->mutex);
617 
618             if (unlinked) {
619                 nxt_mp_release(r->mem_pool);
620             }
621         }
622     }
623 
624     if (app != NULL) {
625         nxt_router_app_use(task, app, -1);
626 
627         req_rpc_data->app = NULL;
628     }
629 
630     if (req_rpc_data->msg_info.body_fd != -1) {
631         nxt_fd_close(req_rpc_data->msg_info.body_fd);
632 
633         req_rpc_data->msg_info.body_fd = -1;
634     }
635 
636     if (req_rpc_data->rpc_cancel) {
637         req_rpc_data->rpc_cancel = 0;
638 
639         nxt_port_rpc_cancel(task, task->thread->engine->port,
640                             req_rpc_data->stream);
641     }
642 }
643 
644 
645 static void
646 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
647 {
648     nxt_int_t      res;
649     nxt_app_t      *app;
650     nxt_port_t     *port, *main_app_port;
651     nxt_runtime_t  *rt;
652 
653     nxt_port_new_port_handler(task, msg);
654 
655     port = msg->u.new_port;
656 
657     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
658         nxt_router_greet_controller(task, msg->u.new_port);
659     }
660 
661     if (port == NULL || port->type != NXT_PROCESS_APP) {
662 
663         if (msg->port_msg.stream == 0) {
664             return;
665         }
666 
667         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
668 
669     } else {
670         if (msg->fd[1] != -1) {
671             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
672             if (nxt_slow_path(res != NXT_OK)) {
673                 return;
674             }
675 
676             nxt_fd_close(msg->fd[1]);
677             msg->fd[1] = -1;
678         }
679     }
680 
681     if (msg->port_msg.stream != 0) {
682         nxt_port_rpc_handler(task, msg);
683         return;
684     }
685 
686     /*
687      * Port with "id == 0" is application 'main' port and it always
688      * should come with non-zero stream.
689      */
690     nxt_assert(port->id != 0);
691 
692     /* Find 'main' app port and get app reference. */
693     rt = task->thread->runtime;
694 
695     /*
696      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
697      * sent to main port (with id == 0) and processed in main thread.
698      */
699     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
700     nxt_assert(main_app_port != NULL);
701 
702     app = main_app_port->app;
703 
704     if (nxt_fast_path(app != NULL)) {
705         nxt_thread_mutex_lock(&app->mutex);
706 
707         /* TODO here should be find-and-add code because there can be
708            port waiters in port_hash */
709         nxt_port_hash_add(&app->port_hash, port);
710         app->port_hash_count++;
711 
712         nxt_thread_mutex_unlock(&app->mutex);
713 
714         port->app = app;
715     }
716 
717     port->main_app_port = main_app_port;
718 
719     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
720 }
721 
722 
723 static void
724 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
725 {
726     void                    *p;
727     size_t                  size;
728     nxt_int_t               ret;
729     nxt_port_t              *port;
730     nxt_router_temp_conf_t  *tmcf;
731 
732     port = nxt_runtime_port_find(task->thread->runtime,
733                                  msg->port_msg.pid,
734                                  msg->port_msg.reply_port);
735     if (nxt_slow_path(port == NULL)) {
736         nxt_alert(task, "conf_data_handler: reply port not found");
737         return;
738     }
739 
740     p = MAP_FAILED;
741 
742     /*
743      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
744      * initialized in 'cleanup' section.
745      */
746     size = 0;
747 
748     tmcf = nxt_router_temp_conf(task);
749     if (nxt_slow_path(tmcf == NULL)) {
750         goto fail;
751     }
752 
753     if (nxt_slow_path(msg->fd[0] == -1)) {
754         nxt_alert(task, "conf_data_handler: invalid shm fd");
755         goto fail;
756     }
757 
758     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
759         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
760                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
761         goto fail;
762     }
763 
764     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
765 
766     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
767 
768     nxt_fd_close(msg->fd[0]);
769     msg->fd[0] = -1;
770 
771     if (nxt_slow_path(p == MAP_FAILED)) {
772         goto fail;
773     }
774 
775     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
776 
777     tmcf->router_conf->router = nxt_router;
778     tmcf->stream = msg->port_msg.stream;
779     tmcf->port = port;
780 
781     nxt_port_use(task, tmcf->port, 1);
782 
783     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
784 
785     if (nxt_fast_path(ret == NXT_OK)) {
786         nxt_router_conf_apply(task, tmcf, NULL);
787 
788     } else {
789         nxt_router_conf_error(task, tmcf);
790     }
791 
792     goto cleanup;
793 
794 fail:
795 
796     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
797                           msg->port_msg.stream, 0, NULL);
798 
799     if (tmcf != NULL) {
800         nxt_mp_release(tmcf->mem_pool);
801     }
802 
803 cleanup:
804 
805     if (p != MAP_FAILED) {
806         nxt_mem_munmap(p, size);
807     }
808 
809     if (msg->fd[0] != -1) {
810         nxt_fd_close(msg->fd[0]);
811         msg->fd[0] = -1;
812     }
813 }
814 
815 
816 static void
817 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
818 {
819     nxt_app_t            *app;
820     nxt_int_t            ret;
821     nxt_str_t            app_name;
822     nxt_port_t           *port, *reply_port, *shared_port, *old_shared_port;
823     nxt_port_msg_type_t  reply;
824 
825     reply_port = nxt_runtime_port_find(task->thread->runtime,
826                                        msg->port_msg.pid,
827                                        msg->port_msg.reply_port);
828     if (nxt_slow_path(reply_port == NULL)) {
829         nxt_alert(task, "app_restart_handler: reply port not found");
830         return;
831     }
832 
833     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
834     app_name.start = msg->buf->mem.pos;
835 
836     nxt_debug(task, "app_restart_handler: %V", &app_name);
837 
838     app = nxt_router_app_find(&nxt_router->apps, &app_name);
839 
840     if (nxt_fast_path(app != NULL)) {
841         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
842                                    NXT_PROCESS_APP);
843         if (nxt_slow_path(shared_port == NULL)) {
844             goto fail;
845         }
846 
847         ret = nxt_port_socket_init(task, shared_port, 0);
848         if (nxt_slow_path(ret != NXT_OK)) {
849             nxt_port_use(task, shared_port, -1);
850             goto fail;
851         }
852 
853         ret = nxt_router_app_queue_init(task, shared_port);
854         if (nxt_slow_path(ret != NXT_OK)) {
855             nxt_port_write_close(shared_port);
856             nxt_port_read_close(shared_port);
857             nxt_port_use(task, shared_port, -1);
858             goto fail;
859         }
860 
861         nxt_port_write_enable(task, shared_port);
862 
863         nxt_thread_mutex_lock(&app->mutex);
864 
865         nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
866 
867             (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
868                                          0, 0, NULL);
869 
870         } nxt_queue_loop;
871 
872         app->generation++;
873 
874         shared_port->app = app;
875 
876         old_shared_port = app->shared_port;
877         old_shared_port->app = NULL;
878 
879         app->shared_port = shared_port;
880 
881         nxt_thread_mutex_unlock(&app->mutex);
882 
883         nxt_port_close(task, old_shared_port);
884         nxt_port_use(task, old_shared_port, -1);
885 
886         reply = NXT_PORT_MSG_RPC_READY_LAST;
887 
888     } else {
889 
890 fail:
891 
892         reply = NXT_PORT_MSG_RPC_ERROR;
893     }
894 
895     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
896                           0, NULL);
897 }
898 
899 
900 static void
901 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
902     void *data)
903 {
904     union {
905         nxt_pid_t  removed_pid;
906         void       *data;
907     } u;
908 
909     u.data = data;
910 
911     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
912 }
913 
914 
915 static void
916 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
917 {
918     nxt_event_engine_t  *engine;
919 
920     nxt_port_remove_pid_handler(task, msg);
921 
922     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
923     {
924         if (nxt_fast_path(engine->port != NULL)) {
925             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
926                           msg->u.data);
927         }
928     }
929     nxt_queue_loop;
930 
931     if (msg->port_msg.stream == 0) {
932         return;
933     }
934 
935     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
936 
937     nxt_port_rpc_handler(task, msg);
938 }
939 
940 
941 static nxt_router_temp_conf_t *
942 nxt_router_temp_conf(nxt_task_t *task)
943 {
944     nxt_mp_t                *mp, *tmp;
945     nxt_router_conf_t       *rtcf;
946     nxt_router_temp_conf_t  *tmcf;
947 
948     mp = nxt_mp_create(1024, 128, 256, 32);
949     if (nxt_slow_path(mp == NULL)) {
950         return NULL;
951     }
952 
953     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
954     if (nxt_slow_path(rtcf == NULL)) {
955         goto fail;
956     }
957 
958     rtcf->mem_pool = mp;
959 
960     tmp = nxt_mp_create(1024, 128, 256, 32);
961     if (nxt_slow_path(tmp == NULL)) {
962         goto fail;
963     }
964 
965     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
966     if (nxt_slow_path(tmcf == NULL)) {
967         goto temp_fail;
968     }
969 
970     tmcf->mem_pool = tmp;
971     tmcf->router_conf = rtcf;
972     tmcf->count = 1;
973     tmcf->engine = task->thread->engine;
974 
975     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
976                                      sizeof(nxt_router_engine_conf_t));
977     if (nxt_slow_path(tmcf->engines == NULL)) {
978         goto temp_fail;
979     }
980 
981     nxt_queue_init(&creating_sockets);
982     nxt_queue_init(&pending_sockets);
983     nxt_queue_init(&updating_sockets);
984     nxt_queue_init(&keeping_sockets);
985     nxt_queue_init(&deleting_sockets);
986 
987 #if (NXT_TLS)
988     nxt_queue_init(&tmcf->tls);
989 #endif
990 
991     nxt_queue_init(&tmcf->apps);
992     nxt_queue_init(&tmcf->previous);
993 
994     return tmcf;
995 
996 temp_fail:
997 
998     nxt_mp_destroy(tmp);
999 
1000 fail:
1001 
1002     nxt_mp_destroy(mp);
1003 
1004     return NULL;
1005 }
1006 
1007 
1008 nxt_inline nxt_bool_t
1009 nxt_router_app_can_start(nxt_app_t *app)
1010 {
1011     return app->processes + app->pending_processes < app->max_processes
1012             && app->pending_processes < app->max_pending_processes;
1013 }
1014 
1015 
1016 nxt_inline nxt_bool_t
1017 nxt_router_app_need_start(nxt_app_t *app)
1018 {
1019     return (app->active_requests
1020               > app->port_hash_count + app->pending_processes)
1021            || (app->spare_processes
1022                 > app->idle_processes + app->pending_processes);
1023 }
1024 
1025 
1026 static void
1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1028 {
1029     nxt_int_t                    ret;
1030     nxt_app_t                    *app;
1031     nxt_router_t                 *router;
1032     nxt_runtime_t                *rt;
1033     nxt_queue_link_t             *qlk;
1034     nxt_socket_conf_t            *skcf;
1035     nxt_router_conf_t            *rtcf;
1036     nxt_router_temp_conf_t       *tmcf;
1037     const nxt_event_interface_t  *interface;
1038 #if (NXT_TLS)
1039     nxt_router_tlssock_t         *tls;
1040 #endif
1041 
1042     tmcf = obj;
1043 
1044     qlk = nxt_queue_first(&pending_sockets);
1045 
1046     if (qlk != nxt_queue_tail(&pending_sockets)) {
1047         nxt_queue_remove(qlk);
1048         nxt_queue_insert_tail(&creating_sockets, qlk);
1049 
1050         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1051 
1052         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1053 
1054         return;
1055     }
1056 
1057 #if (NXT_TLS)
1058     qlk = nxt_queue_last(&tmcf->tls);
1059 
1060     if (qlk != nxt_queue_head(&tmcf->tls)) {
1061         nxt_queue_remove(qlk);
1062 
1063         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1064 
1065         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1066                            nxt_router_tls_rpc_handler, tls);
1067         return;
1068     }
1069 #endif
1070 
1071     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1072 
1073         if (nxt_router_app_need_start(app)) {
1074             nxt_router_app_rpc_create(task, tmcf, app);
1075             return;
1076         }
1077 
1078     } nxt_queue_loop;
1079 
1080     rtcf = tmcf->router_conf;
1081 
1082     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1083         nxt_router_access_log_open(task, tmcf);
1084         return;
1085     }
1086 
1087     rt = task->thread->runtime;
1088 
1089     interface = nxt_service_get(rt->services, "engine", NULL);
1090 
1091     router = rtcf->router;
1092 
1093     ret = nxt_router_engines_create(task, router, tmcf, interface);
1094     if (nxt_slow_path(ret != NXT_OK)) {
1095         goto fail;
1096     }
1097 
1098     ret = nxt_router_threads_create(task, rt, tmcf);
1099     if (nxt_slow_path(ret != NXT_OK)) {
1100         goto fail;
1101     }
1102 
1103     nxt_router_apps_sort(task, router, tmcf);
1104 
1105     nxt_router_apps_hash_use(task, rtcf, 1);
1106 
1107     nxt_router_engines_post(router, tmcf);
1108 
1109     nxt_queue_add(&router->sockets, &updating_sockets);
1110     nxt_queue_add(&router->sockets, &creating_sockets);
1111 
1112     router->access_log = rtcf->access_log;
1113 
1114     nxt_router_conf_ready(task, tmcf);
1115 
1116     return;
1117 
1118 fail:
1119 
1120     nxt_router_conf_error(task, tmcf);
1121 
1122     return;
1123 }
1124 
1125 
1126 static void
1127 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1128 {
1129     nxt_joint_job_t  *job;
1130 
1131     job = obj;
1132 
1133     nxt_router_conf_ready(task, job->tmcf);
1134 }
1135 
1136 
1137 static void
1138 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1139 {
1140     uint32_t               count;
1141     nxt_router_conf_t      *rtcf;
1142     nxt_thread_spinlock_t  *lock;
1143 
1144     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1145 
1146     if (--tmcf->count > 0) {
1147         return;
1148     }
1149 
1150     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1151 
1152     rtcf = tmcf->router_conf;
1153 
1154     lock = &rtcf->router->lock;
1155 
1156     nxt_thread_spin_lock(lock);
1157 
1158     count = rtcf->count;
1159 
1160     nxt_thread_spin_unlock(lock);
1161 
1162     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1163 
1164     if (count == 0) {
1165         nxt_router_apps_hash_use(task, rtcf, -1);
1166 
1167         nxt_router_access_log_release(task, lock, rtcf->access_log);
1168 
1169         nxt_mp_destroy(rtcf->mem_pool);
1170     }
1171 
1172     nxt_mp_release(tmcf->mem_pool);
1173 }
1174 
1175 
1176 static void
1177 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1178 {
1179     nxt_app_t          *app;
1180     nxt_queue_t        new_socket_confs;
1181     nxt_socket_t       s;
1182     nxt_router_t       *router;
1183     nxt_queue_link_t   *qlk;
1184     nxt_socket_conf_t  *skcf;
1185     nxt_router_conf_t  *rtcf;
1186 
1187     nxt_alert(task, "failed to apply new conf");
1188 
1189     for (qlk = nxt_queue_first(&creating_sockets);
1190          qlk != nxt_queue_tail(&creating_sockets);
1191          qlk = nxt_queue_next(qlk))
1192     {
1193         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1194         s = skcf->listen->socket;
1195 
1196         if (s != -1) {
1197             nxt_socket_close(task, s);
1198         }
1199 
1200         nxt_free(skcf->listen);
1201     }
1202 
1203     nxt_queue_init(&new_socket_confs);
1204     nxt_queue_add(&new_socket_confs, &updating_sockets);
1205     nxt_queue_add(&new_socket_confs, &pending_sockets);
1206     nxt_queue_add(&new_socket_confs, &creating_sockets);
1207 
1208     rtcf = tmcf->router_conf;
1209 
1210     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1211 
1212         nxt_router_app_unlink(task, app);
1213 
1214     } nxt_queue_loop;
1215 
1216     router = rtcf->router;
1217 
1218     nxt_queue_add(&router->sockets, &keeping_sockets);
1219     nxt_queue_add(&router->sockets, &deleting_sockets);
1220 
1221     nxt_queue_add(&router->apps, &tmcf->previous);
1222 
1223     // TODO: new engines and threads
1224 
1225     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1226 
1227     nxt_mp_destroy(rtcf->mem_pool);
1228 
1229     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1230 
1231     nxt_mp_release(tmcf->mem_pool);
1232 }
1233 
1234 
1235 static void
1236 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1237     nxt_port_msg_type_t type)
1238 {
1239     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1240 
1241     nxt_port_use(task, tmcf->port, -1);
1242 
1243     tmcf->port = NULL;
1244 }
1245 
1246 
1247 static nxt_conf_map_t  nxt_router_conf[] = {
1248     {
1249         nxt_string("listeners_threads"),
1250         NXT_CONF_MAP_INT32,
1251         offsetof(nxt_router_conf_t, threads),
1252     },
1253 };
1254 
1255 
1256 static nxt_conf_map_t  nxt_router_app_conf[] = {
1257     {
1258         nxt_string("type"),
1259         NXT_CONF_MAP_STR,
1260         offsetof(nxt_router_app_conf_t, type),
1261     },
1262 
1263     {
1264         nxt_string("limits"),
1265         NXT_CONF_MAP_PTR,
1266         offsetof(nxt_router_app_conf_t, limits_value),
1267     },
1268 
1269     {
1270         nxt_string("processes"),
1271         NXT_CONF_MAP_INT32,
1272         offsetof(nxt_router_app_conf_t, processes),
1273     },
1274 
1275     {
1276         nxt_string("processes"),
1277         NXT_CONF_MAP_PTR,
1278         offsetof(nxt_router_app_conf_t, processes_value),
1279     },
1280 
1281     {
1282         nxt_string("targets"),
1283         NXT_CONF_MAP_PTR,
1284         offsetof(nxt_router_app_conf_t, targets_value),
1285     },
1286 };
1287 
1288 
1289 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1290     {
1291         nxt_string("timeout"),
1292         NXT_CONF_MAP_MSEC,
1293         offsetof(nxt_router_app_conf_t, timeout),
1294     },
1295 
1296     {
1297         nxt_string("requests"),
1298         NXT_CONF_MAP_INT32,
1299         offsetof(nxt_router_app_conf_t, requests),
1300     },
1301 };
1302 
1303 
1304 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1305     {
1306         nxt_string("spare"),
1307         NXT_CONF_MAP_INT32,
1308         offsetof(nxt_router_app_conf_t, spare_processes),
1309     },
1310 
1311     {
1312         nxt_string("max"),
1313         NXT_CONF_MAP_INT32,
1314         offsetof(nxt_router_app_conf_t, max_processes),
1315     },
1316 
1317     {
1318         nxt_string("idle_timeout"),
1319         NXT_CONF_MAP_MSEC,
1320         offsetof(nxt_router_app_conf_t, idle_timeout),
1321     },
1322 };
1323 
1324 
1325 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1326     {
1327         nxt_string("pass"),
1328         NXT_CONF_MAP_STR_COPY,
1329         offsetof(nxt_router_listener_conf_t, pass),
1330     },
1331 
1332     {
1333         nxt_string("application"),
1334         NXT_CONF_MAP_STR_COPY,
1335         offsetof(nxt_router_listener_conf_t, application),
1336     },
1337 };
1338 
1339 
1340 static nxt_conf_map_t  nxt_router_http_conf[] = {
1341     {
1342         nxt_string("header_buffer_size"),
1343         NXT_CONF_MAP_SIZE,
1344         offsetof(nxt_socket_conf_t, header_buffer_size),
1345     },
1346 
1347     {
1348         nxt_string("large_header_buffer_size"),
1349         NXT_CONF_MAP_SIZE,
1350         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1351     },
1352 
1353     {
1354         nxt_string("large_header_buffers"),
1355         NXT_CONF_MAP_SIZE,
1356         offsetof(nxt_socket_conf_t, large_header_buffers),
1357     },
1358 
1359     {
1360         nxt_string("body_buffer_size"),
1361         NXT_CONF_MAP_SIZE,
1362         offsetof(nxt_socket_conf_t, body_buffer_size),
1363     },
1364 
1365     {
1366         nxt_string("max_body_size"),
1367         NXT_CONF_MAP_SIZE,
1368         offsetof(nxt_socket_conf_t, max_body_size),
1369     },
1370 
1371     {
1372         nxt_string("idle_timeout"),
1373         NXT_CONF_MAP_MSEC,
1374         offsetof(nxt_socket_conf_t, idle_timeout),
1375     },
1376 
1377     {
1378         nxt_string("header_read_timeout"),
1379         NXT_CONF_MAP_MSEC,
1380         offsetof(nxt_socket_conf_t, header_read_timeout),
1381     },
1382 
1383     {
1384         nxt_string("body_read_timeout"),
1385         NXT_CONF_MAP_MSEC,
1386         offsetof(nxt_socket_conf_t, body_read_timeout),
1387     },
1388 
1389     {
1390         nxt_string("send_timeout"),
1391         NXT_CONF_MAP_MSEC,
1392         offsetof(nxt_socket_conf_t, send_timeout),
1393     },
1394 
1395     {
1396         nxt_string("body_temp_path"),
1397         NXT_CONF_MAP_STR,
1398         offsetof(nxt_socket_conf_t, body_temp_path),
1399     },
1400 
1401     {
1402         nxt_string("discard_unsafe_fields"),
1403         NXT_CONF_MAP_INT8,
1404         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1405     },
1406 };
1407 
1408 
1409 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1410     {
1411         nxt_string("max_frame_size"),
1412         NXT_CONF_MAP_SIZE,
1413         offsetof(nxt_websocket_conf_t, max_frame_size),
1414     },
1415 
1416     {
1417         nxt_string("read_timeout"),
1418         NXT_CONF_MAP_MSEC,
1419         offsetof(nxt_websocket_conf_t, read_timeout),
1420     },
1421 
1422     {
1423         nxt_string("keepalive_interval"),
1424         NXT_CONF_MAP_MSEC,
1425         offsetof(nxt_websocket_conf_t, keepalive_interval),
1426     },
1427 
1428 };
1429 
1430 
1431 static nxt_int_t
1432 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1433     u_char *start, u_char *end)
1434 {
1435     u_char                      *p;
1436     size_t                      size;
1437     nxt_mp_t                    *mp, *app_mp;
1438     uint32_t                    next, next_target;
1439     nxt_int_t                   ret;
1440     nxt_str_t                   name, path, target;
1441     nxt_app_t                   *app, *prev;
1442     nxt_str_t                   *t, *s, *targets;
1443     nxt_uint_t                  n, i;
1444     nxt_port_t                  *port;
1445     nxt_router_t                *router;
1446     nxt_app_joint_t             *app_joint;
1447 #if (NXT_TLS)
1448     nxt_tls_init_t              *tls_init;
1449     nxt_conf_value_t            *certificate;
1450 #endif
1451     nxt_conf_value_t            *conf, *http, *value, *websocket;
1452     nxt_conf_value_t            *applications, *application;
1453     nxt_conf_value_t            *listeners, *listener;
1454     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1455     nxt_socket_conf_t           *skcf;
1456     nxt_http_routes_t           *routes;
1457     nxt_event_engine_t          *engine;
1458     nxt_app_lang_module_t       *lang;
1459     nxt_router_app_conf_t       apcf;
1460     nxt_router_access_log_t     *access_log;
1461     nxt_router_listener_conf_t  lscf;
1462 
1463     static nxt_str_t  http_path = nxt_string("/settings/http");
1464     static nxt_str_t  applications_path = nxt_string("/applications");
1465     static nxt_str_t  listeners_path = nxt_string("/listeners");
1466     static nxt_str_t  routes_path = nxt_string("/routes");
1467     static nxt_str_t  access_log_path = nxt_string("/access_log");
1468 #if (NXT_TLS)
1469     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1470     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1471     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1472     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1473 #endif
1474     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1475     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1476     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1477 
1478     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1479     if (conf == NULL) {
1480         nxt_alert(task, "configuration parsing error");
1481         return NXT_ERROR;
1482     }
1483 
1484     mp = tmcf->router_conf->mem_pool;
1485 
1486     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1487                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1488     if (ret != NXT_OK) {
1489         nxt_alert(task, "root map error");
1490         return NXT_ERROR;
1491     }
1492 
1493     if (tmcf->router_conf->threads == 0) {
1494         tmcf->router_conf->threads = nxt_ncpu;
1495     }
1496 
1497     static_conf = nxt_conf_get_path(conf, &static_path);
1498 
1499     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1500     if (nxt_slow_path(ret != NXT_OK)) {
1501         return NXT_ERROR;
1502     }
1503 
1504     router = tmcf->router_conf->router;
1505 
1506     applications = nxt_conf_get_path(conf, &applications_path);
1507 
1508     if (applications != NULL) {
1509         next = 0;
1510 
1511         for ( ;; ) {
1512             application = nxt_conf_next_object_member(applications,
1513                                                       &name, &next);
1514             if (application == NULL) {
1515                 break;
1516             }
1517 
1518             nxt_debug(task, "application \"%V\"", &name);
1519 
1520             size = nxt_conf_json_length(application, NULL);
1521 
1522             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1523             if (nxt_slow_path(app_mp == NULL)) {
1524                 goto fail;
1525             }
1526 
1527             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1528             if (app == NULL) {
1529                 goto app_fail;
1530             }
1531 
1532             nxt_memzero(app, sizeof(nxt_app_t));
1533 
1534             app->mem_pool = app_mp;
1535 
1536             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1537             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1538                                                   + name.length);
1539 
1540             p = nxt_conf_json_print(app->conf.start, application, NULL);
1541             app->conf.length = p - app->conf.start;
1542 
1543             nxt_assert(app->conf.length <= size);
1544 
1545             nxt_debug(task, "application conf \"%V\"", &app->conf);
1546 
1547             prev = nxt_router_app_find(&router->apps, &name);
1548 
1549             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1550                 nxt_mp_destroy(app_mp);
1551 
1552                 nxt_queue_remove(&prev->link);
1553                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1554 
1555                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1556                 if (nxt_slow_path(ret != NXT_OK)) {
1557                     goto fail;
1558                 }
1559 
1560                 continue;
1561             }
1562 
1563             apcf.processes = 1;
1564             apcf.max_processes = 1;
1565             apcf.spare_processes = 0;
1566             apcf.timeout = 0;
1567             apcf.idle_timeout = 15000;
1568             apcf.requests = 0;
1569             apcf.limits_value = NULL;
1570             apcf.processes_value = NULL;
1571             apcf.targets_value = NULL;
1572 
1573             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1574             if (nxt_slow_path(app_joint == NULL)) {
1575                 goto app_fail;
1576             }
1577 
1578             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1579 
1580             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1581                                       nxt_nitems(nxt_router_app_conf), &apcf);
1582             if (ret != NXT_OK) {
1583                 nxt_alert(task, "application map error");
1584                 goto app_fail;
1585             }
1586 
1587             if (apcf.limits_value != NULL) {
1588 
1589                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1590                     nxt_alert(task, "application limits is not object");
1591                     goto app_fail;
1592                 }
1593 
1594                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1595                                         nxt_router_app_limits_conf,
1596                                         nxt_nitems(nxt_router_app_limits_conf),
1597                                         &apcf);
1598                 if (ret != NXT_OK) {
1599                     nxt_alert(task, "application limits map error");
1600                     goto app_fail;
1601                 }
1602             }
1603 
1604             if (apcf.processes_value != NULL
1605                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1606             {
1607                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1608                                      nxt_router_app_processes_conf,
1609                                      nxt_nitems(nxt_router_app_processes_conf),
1610                                      &apcf);
1611                 if (ret != NXT_OK) {
1612                     nxt_alert(task, "application processes map error");
1613                     goto app_fail;
1614                 }
1615 
1616             } else {
1617                 apcf.max_processes = apcf.processes;
1618                 apcf.spare_processes = apcf.processes;
1619             }
1620 
1621             if (apcf.targets_value != NULL) {
1622                 n = nxt_conf_object_members_count(apcf.targets_value);
1623 
1624                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1625                 if (nxt_slow_path(targets == NULL)) {
1626                     goto app_fail;
1627                 }
1628 
1629                 next_target = 0;
1630 
1631                 for (i = 0; i < n; i++) {
1632                     (void) nxt_conf_next_object_member(apcf.targets_value,
1633                                                        &target, &next_target);
1634 
1635                     s = nxt_str_dup(app_mp, &targets[i], &target);
1636                     if (nxt_slow_path(s == NULL)) {
1637                         goto app_fail;
1638                     }
1639                 }
1640 
1641             } else {
1642                 targets = NULL;
1643             }
1644 
1645             nxt_debug(task, "application type: %V", &apcf.type);
1646             nxt_debug(task, "application processes: %D", apcf.processes);
1647             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1648             nxt_debug(task, "application requests: %D", apcf.requests);
1649 
1650             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1651 
1652             if (lang == NULL) {
1653                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1654                 goto app_fail;
1655             }
1656 
1657             nxt_debug(task, "application language module: \"%s\"", lang->file);
1658 
1659             ret = nxt_thread_mutex_create(&app->mutex);
1660             if (ret != NXT_OK) {
1661                 goto app_fail;
1662             }
1663 
1664             nxt_queue_init(&app->ports);
1665             nxt_queue_init(&app->spare_ports);
1666             nxt_queue_init(&app->idle_ports);
1667             nxt_queue_init(&app->ack_waiting_req);
1668 
1669             app->name.length = name.length;
1670             nxt_memcpy(app->name.start, name.start, name.length);
1671 
1672             app->type = lang->type;
1673             app->max_processes = apcf.max_processes;
1674             app->spare_processes = apcf.spare_processes;
1675             app->max_pending_processes = apcf.spare_processes
1676                                          ? apcf.spare_processes : 1;
1677             app->timeout = apcf.timeout;
1678             app->idle_timeout = apcf.idle_timeout;
1679             app->max_requests = apcf.requests;
1680 
1681             app->targets = targets;
1682 
1683             engine = task->thread->engine;
1684 
1685             app->engine = engine;
1686 
1687             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1688             app->adjust_idle_work.task = &engine->task;
1689             app->adjust_idle_work.obj = app;
1690 
1691             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1692 
1693             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1694             if (nxt_slow_path(ret != NXT_OK)) {
1695                 goto app_fail;
1696             }
1697 
1698             nxt_router_app_use(task, app, 1);
1699 
1700             app->joint = app_joint;
1701 
1702             app_joint->use_count = 1;
1703             app_joint->app = app;
1704 
1705             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1706             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1707             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1708             app_joint->idle_timer.task = &engine->task;
1709             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1710 
1711             app_joint->free_app_work.handler = nxt_router_free_app;
1712             app_joint->free_app_work.task = &engine->task;
1713             app_joint->free_app_work.obj = app_joint;
1714 
1715             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1716                                 NXT_PROCESS_APP);
1717             if (nxt_slow_path(port == NULL)) {
1718                 return NXT_ERROR;
1719             }
1720 
1721             ret = nxt_port_socket_init(task, port, 0);
1722             if (nxt_slow_path(ret != NXT_OK)) {
1723                 nxt_port_use(task, port, -1);
1724                 return NXT_ERROR;
1725             }
1726 
1727             ret = nxt_router_app_queue_init(task, port);
1728             if (nxt_slow_path(ret != NXT_OK)) {
1729                 nxt_port_write_close(port);
1730                 nxt_port_read_close(port);
1731                 nxt_port_use(task, port, -1);
1732                 return NXT_ERROR;
1733             }
1734 
1735             nxt_port_write_enable(task, port);
1736             port->app = app;
1737 
1738             app->shared_port = port;
1739 
1740             nxt_thread_mutex_create(&app->outgoing.mutex);
1741         }
1742     }
1743 
1744     routes_conf = nxt_conf_get_path(conf, &routes_path);
1745     if (nxt_fast_path(routes_conf != NULL)) {
1746         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1747         if (nxt_slow_path(routes == NULL)) {
1748             return NXT_ERROR;
1749         }
1750         tmcf->router_conf->routes = routes;
1751     }
1752 
1753     ret = nxt_upstreams_create(task, tmcf, conf);
1754     if (nxt_slow_path(ret != NXT_OK)) {
1755         return ret;
1756     }
1757 
1758     http = nxt_conf_get_path(conf, &http_path);
1759 #if 0
1760     if (http == NULL) {
1761         nxt_alert(task, "no \"http\" block");
1762         return NXT_ERROR;
1763     }
1764 #endif
1765 
1766     websocket = nxt_conf_get_path(conf, &websocket_path);
1767 
1768     listeners = nxt_conf_get_path(conf, &listeners_path);
1769 
1770     if (listeners != NULL) {
1771         next = 0;
1772 
1773         for ( ;; ) {
1774             listener = nxt_conf_next_object_member(listeners, &name, &next);
1775             if (listener == NULL) {
1776                 break;
1777             }
1778 
1779             skcf = nxt_router_socket_conf(task, tmcf, &name);
1780             if (skcf == NULL) {
1781                 goto fail;
1782             }
1783 
1784             nxt_memzero(&lscf, sizeof(lscf));
1785 
1786             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1787                                       nxt_nitems(nxt_router_listener_conf),
1788                                       &lscf);
1789             if (ret != NXT_OK) {
1790                 nxt_alert(task, "listener map error");
1791                 goto fail;
1792             }
1793 
1794             nxt_debug(task, "application: %V", &lscf.application);
1795 
1796             // STUB, default values if http block is not defined.
1797             skcf->header_buffer_size = 2048;
1798             skcf->large_header_buffer_size = 8192;
1799             skcf->large_header_buffers = 4;
1800             skcf->discard_unsafe_fields = 1;
1801             skcf->body_buffer_size = 16 * 1024;
1802             skcf->max_body_size = 8 * 1024 * 1024;
1803             skcf->proxy_header_buffer_size = 64 * 1024;
1804             skcf->proxy_buffer_size = 4096;
1805             skcf->proxy_buffers = 256;
1806             skcf->idle_timeout = 180 * 1000;
1807             skcf->header_read_timeout = 30 * 1000;
1808             skcf->body_read_timeout = 30 * 1000;
1809             skcf->send_timeout = 30 * 1000;
1810             skcf->proxy_timeout = 60 * 1000;
1811             skcf->proxy_send_timeout = 30 * 1000;
1812             skcf->proxy_read_timeout = 30 * 1000;
1813 
1814             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1815             skcf->websocket_conf.read_timeout = 60 * 1000;
1816             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1817 
1818             nxt_str_null(&skcf->body_temp_path);
1819 
1820             if (http != NULL) {
1821                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1822                                           nxt_nitems(nxt_router_http_conf),
1823                                           skcf);
1824                 if (ret != NXT_OK) {
1825                     nxt_alert(task, "http map error");
1826                     goto fail;
1827                 }
1828             }
1829 
1830             if (websocket != NULL) {
1831                 ret = nxt_conf_map_object(mp, websocket,
1832                                           nxt_router_websocket_conf,
1833                                           nxt_nitems(nxt_router_websocket_conf),
1834                                           &skcf->websocket_conf);
1835                 if (ret != NXT_OK) {
1836                     nxt_alert(task, "websocket map error");
1837                     goto fail;
1838                 }
1839             }
1840 
1841             t = &skcf->body_temp_path;
1842 
1843             if (t->length == 0) {
1844                 t->start = (u_char *) task->thread->runtime->tmp;
1845                 t->length = nxt_strlen(t->start);
1846             }
1847 
1848             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1849             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1850                                                     client_ip_conf);
1851             if (nxt_slow_path(ret != NXT_OK)) {
1852                 return NXT_ERROR;
1853             }
1854 
1855 #if (NXT_TLS)
1856             certificate = nxt_conf_get_path(listener, &certificate_path);
1857 
1858             if (certificate != NULL) {
1859                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1860                 if (nxt_slow_path(tls_init == NULL)) {
1861                     return NXT_ERROR;
1862                 }
1863 
1864                 tls_init->cache_size = 0;
1865                 tls_init->timeout = 300;
1866 
1867                 value = nxt_conf_get_path(listener, &conf_cache_path);
1868                 if (value != NULL) {
1869                     tls_init->cache_size = nxt_conf_get_number(value);
1870                 }
1871 
1872                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1873                 if (value != NULL) {
1874                     tls_init->timeout = nxt_conf_get_number(value);
1875                 }
1876 
1877                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1878                                                         &conf_commands_path);
1879 
1880                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1881                     n = nxt_conf_array_elements_count(certificate);
1882 
1883                     for (i = 0; i < n; i++) {
1884                         value = nxt_conf_get_array_element(certificate, i);
1885 
1886                         nxt_assert(value != NULL);
1887 
1888                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1889                                                          tls_init, i == 0);
1890                         if (nxt_slow_path(ret != NXT_OK)) {
1891                             goto fail;
1892                         }
1893                     }
1894 
1895                 } else {
1896                     /* NXT_CONF_STRING */
1897                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1898                                                      tls_init, 1);
1899                     if (nxt_slow_path(ret != NXT_OK)) {
1900                         goto fail;
1901                     }
1902                 }
1903             }
1904 #endif
1905 
1906             skcf->listen->handler = nxt_http_conn_init;
1907             skcf->router_conf = tmcf->router_conf;
1908             skcf->router_conf->count++;
1909 
1910             if (lscf.pass.length != 0) {
1911                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1912 
1913             /* COMPATIBILITY: listener application. */
1914             } else if (lscf.application.length > 0) {
1915                 skcf->action = nxt_http_pass_application(task,
1916                                                          tmcf->router_conf,
1917                                                          &lscf.application);
1918             }
1919 
1920             if (nxt_slow_path(skcf->action == NULL)) {
1921                 goto fail;
1922             }
1923         }
1924     }
1925 
1926     ret = nxt_http_routes_resolve(task, tmcf);
1927     if (nxt_slow_path(ret != NXT_OK)) {
1928         goto fail;
1929     }
1930 
1931     value = nxt_conf_get_path(conf, &access_log_path);
1932 
1933     if (value != NULL) {
1934         nxt_conf_get_string(value, &path);
1935 
1936         access_log = router->access_log;
1937 
1938         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1939             nxt_thread_spin_lock(&router->lock);
1940             access_log->count++;
1941             nxt_thread_spin_unlock(&router->lock);
1942 
1943         } else {
1944             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1945                                     + path.length);
1946             if (access_log == NULL) {
1947                 nxt_alert(task, "failed to allocate access log structure");
1948                 goto fail;
1949             }
1950 
1951             access_log->fd = -1;
1952             access_log->handler = &nxt_router_access_log_writer;
1953             access_log->count = 1;
1954 
1955             access_log->path.length = path.length;
1956             access_log->path.start = (u_char *) access_log
1957                                      + sizeof(nxt_router_access_log_t);
1958 
1959             nxt_memcpy(access_log->path.start, path.start, path.length);
1960         }
1961 
1962         tmcf->router_conf->access_log = access_log;
1963     }
1964 
1965     nxt_queue_add(&deleting_sockets, &router->sockets);
1966     nxt_queue_init(&router->sockets);
1967 
1968     return NXT_OK;
1969 
1970 app_fail:
1971 
1972     nxt_mp_destroy(app_mp);
1973 
1974 fail:
1975 
1976     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1977 
1978         nxt_queue_remove(&app->link);
1979         nxt_thread_mutex_destroy(&app->mutex);
1980         nxt_mp_destroy(app->mem_pool);
1981 
1982     } nxt_queue_loop;
1983 
1984     return NXT_ERROR;
1985 }
1986 
1987 
1988 #if (NXT_TLS)
1989 
1990 static nxt_int_t
1991 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1992     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1993     nxt_tls_init_t *tls_init, nxt_bool_t last)
1994 {
1995     nxt_router_tlssock_t  *tls;
1996 
1997     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1998     if (nxt_slow_path(tls == NULL)) {
1999         return NXT_ERROR;
2000     }
2001 
2002     tls->tls_init = tls_init;
2003     tls->socket_conf = skcf;
2004     tls->temp_conf = tmcf;
2005     tls->last = last;
2006     nxt_conf_get_string(value, &tls->name);
2007 
2008     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2009 
2010     return NXT_OK;
2011 }
2012 
2013 #endif
2014 
2015 
2016 static nxt_int_t
2017 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2018     nxt_conf_value_t *conf)
2019 {
2020     uint32_t          next, i;
2021     nxt_mp_t          *mp;
2022     nxt_str_t         *type, exten, str;
2023     nxt_int_t         ret;
2024     nxt_uint_t        exts;
2025     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2026 
2027     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2028 
2029     mp = rtcf->mem_pool;
2030 
2031     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2032     if (nxt_slow_path(ret != NXT_OK)) {
2033         return NXT_ERROR;
2034     }
2035 
2036     if (conf == NULL) {
2037         return NXT_OK;
2038     }
2039 
2040     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2041 
2042     if (mtypes_conf != NULL) {
2043         next = 0;
2044 
2045         for ( ;; ) {
2046             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2047 
2048             if (ext_conf == NULL) {
2049                 break;
2050             }
2051 
2052             type = nxt_str_dup(mp, NULL, &str);
2053             if (nxt_slow_path(type == NULL)) {
2054                 return NXT_ERROR;
2055             }
2056 
2057             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2058                 nxt_conf_get_string(ext_conf, &str);
2059 
2060                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2061                     return NXT_ERROR;
2062                 }
2063 
2064                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2065                                                       &exten, type);
2066                 if (nxt_slow_path(ret != NXT_OK)) {
2067                     return NXT_ERROR;
2068                 }
2069 
2070                 continue;
2071             }
2072 
2073             exts = nxt_conf_array_elements_count(ext_conf);
2074 
2075             for (i = 0; i < exts; i++) {
2076                 value = nxt_conf_get_array_element(ext_conf, i);
2077 
2078                 nxt_conf_get_string(value, &str);
2079 
2080                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2081                     return NXT_ERROR;
2082                 }
2083 
2084                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2085                                                       &exten, type);
2086                 if (nxt_slow_path(ret != NXT_OK)) {
2087                     return NXT_ERROR;
2088                 }
2089             }
2090         }
2091     }
2092 
2093     return NXT_OK;
2094 }
2095 
2096 
2097 static nxt_int_t
2098 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2099     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2100 {
2101     char                        c;
2102     size_t                      i;
2103     nxt_mp_t                    *mp;
2104     uint32_t                    hash;
2105     nxt_str_t                   header;
2106     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2107     nxt_http_client_ip_t        *client_ip;
2108     nxt_http_route_addr_rule_t  *source;
2109 
2110     static nxt_str_t  header_path = nxt_string("/header");
2111     static nxt_str_t  source_path = nxt_string("/source");
2112     static nxt_str_t  recursive_path = nxt_string("/recursive");
2113 
2114     if (conf == NULL) {
2115         skcf->client_ip = NULL;
2116 
2117         return NXT_OK;
2118     }
2119 
2120     mp = tmcf->router_conf->mem_pool;
2121 
2122     source_conf = nxt_conf_get_path(conf, &source_path);
2123     header_conf = nxt_conf_get_path(conf, &header_path);
2124     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2125 
2126     if (source_conf == NULL || header_conf == NULL) {
2127         return NXT_ERROR;
2128     }
2129 
2130     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2131     if (nxt_slow_path(client_ip == NULL)) {
2132         return NXT_ERROR;
2133     }
2134 
2135     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2136     if (nxt_slow_path(source == NULL)) {
2137         return NXT_ERROR;
2138     }
2139 
2140     client_ip->source = source;
2141 
2142     nxt_conf_get_string(header_conf, &header);
2143 
2144     if (recursive_conf != NULL) {
2145         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2146     }
2147 
2148     client_ip->header = nxt_str_dup(mp, NULL, &header);
2149     if (nxt_slow_path(client_ip->header == NULL)) {
2150         return NXT_ERROR;
2151     }
2152 
2153     hash = NXT_HTTP_FIELD_HASH_INIT;
2154 
2155     for (i = 0; i < client_ip->header->length; i++) {
2156         c = client_ip->header->start[i];
2157         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2158     }
2159 
2160     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2161 
2162     client_ip->header_hash = hash;
2163 
2164     skcf->client_ip = client_ip;
2165 
2166     return NXT_OK;
2167 }
2168 
2169 
2170 static nxt_app_t *
2171 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2172 {
2173     nxt_app_t  *app;
2174 
2175     nxt_queue_each(app, queue, nxt_app_t, link) {
2176 
2177         if (nxt_strstr_eq(name, &app->name)) {
2178             return app;
2179         }
2180 
2181     } nxt_queue_loop;
2182 
2183     return NULL;
2184 }
2185 
2186 
2187 static nxt_int_t
2188 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2189 {
2190     void       *mem;
2191     nxt_int_t  fd;
2192 
2193     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2194     if (nxt_slow_path(fd == -1)) {
2195         return NXT_ERROR;
2196     }
2197 
2198     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2199                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2200     if (nxt_slow_path(mem == MAP_FAILED)) {
2201         nxt_fd_close(fd);
2202 
2203         return NXT_ERROR;
2204     }
2205 
2206     nxt_app_queue_init(mem);
2207 
2208     port->queue_fd = fd;
2209     port->queue = mem;
2210 
2211     return NXT_OK;
2212 }
2213 
2214 
2215 static nxt_int_t
2216 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2217 {
2218     void       *mem;
2219     nxt_int_t  fd;
2220 
2221     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2222     if (nxt_slow_path(fd == -1)) {
2223         return NXT_ERROR;
2224     }
2225 
2226     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2227                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2228     if (nxt_slow_path(mem == MAP_FAILED)) {
2229         nxt_fd_close(fd);
2230 
2231         return NXT_ERROR;
2232     }
2233 
2234     nxt_port_queue_init(mem);
2235 
2236     port->queue_fd = fd;
2237     port->queue = mem;
2238 
2239     return NXT_OK;
2240 }
2241 
2242 
2243 static nxt_int_t
2244 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2245 {
2246     void  *mem;
2247 
2248     nxt_assert(fd != -1);
2249 
2250     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2251                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2252     if (nxt_slow_path(mem == MAP_FAILED)) {
2253 
2254         return NXT_ERROR;
2255     }
2256 
2257     port->queue = mem;
2258 
2259     return NXT_OK;
2260 }
2261 
2262 
/*
 * Level hash prototype shared by the nxt_router_apps_hash_*() functions
 * that operate on rtcf->apps_hash.  Entries are compared with
 * nxt_router_apps_hash_test(); the last two members are the allocation
 * and release callbacks (presumably memory pool based, judging by their
 * names -- confirm against nxt_mp).
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2269 
2270 
2271 static nxt_int_t
2272 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2273 {
2274     nxt_app_t  *app;
2275 
2276     app = data;
2277 
2278     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2279 }
2280 
2281 
2282 static nxt_int_t
2283 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2284 {
2285     nxt_lvlhsh_query_t  lhq;
2286 
2287     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2288     lhq.replace = 0;
2289     lhq.key = app->name;
2290     lhq.value = app;
2291     lhq.proto = &nxt_router_apps_hash_proto;
2292     lhq.pool = rtcf->mem_pool;
2293 
2294     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2295 
2296     case NXT_OK:
2297         return NXT_OK;
2298 
2299     case NXT_DECLINED:
2300         nxt_thread_log_alert("router app hash adding failed: "
2301                              "\"%V\" is already in hash", &lhq.key);
2302         /* Fall through. */
2303     default:
2304         return NXT_ERROR;
2305     }
2306 }
2307 
2308 
2309 static nxt_app_t *
2310 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2311 {
2312     nxt_lvlhsh_query_t  lhq;
2313 
2314     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2315     lhq.key = *name;
2316     lhq.proto = &nxt_router_apps_hash_proto;
2317 
2318     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2319         return NULL;
2320     }
2321 
2322     return lhq.value;
2323 }
2324 
2325 
2326 static void
2327 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2328 {
2329     nxt_app_t          *app;
2330     nxt_lvlhsh_each_t  lhe;
2331 
2332     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2333 
2334     for ( ;; ) {
2335         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2336 
2337         if (app == NULL) {
2338             break;
2339         }
2340 
2341         nxt_router_app_use(task, app, i);
2342     }
2343 }
2344 
2345 
/*
 * Action configuration stored in action->u.conf by
 * nxt_router_application_init().
 */
typedef struct {
    nxt_app_t  *app;     /* application found in the router apps hash */
    nxt_int_t  target;   /* index into app->targets; 0 if none was given */
} nxt_http_app_conf_t;
2350 
2351 
2352 nxt_int_t
2353 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2354     nxt_str_t *target, nxt_http_action_t *action)
2355 {
2356     nxt_app_t            *app;
2357     nxt_str_t            *targets;
2358     nxt_uint_t           i;
2359     nxt_http_app_conf_t  *conf;
2360 
2361     app = nxt_router_apps_hash_get(rtcf, name);
2362     if (app == NULL) {
2363         return NXT_DECLINED;
2364     }
2365 
2366     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2367     if (nxt_slow_path(conf == NULL)) {
2368         return NXT_ERROR;
2369     }
2370 
2371     action->handler = nxt_http_application_handler;
2372     action->u.conf = conf;
2373 
2374     conf->app = app;
2375 
2376     if (target != NULL && target->length != 0) {
2377         targets = app->targets;
2378 
2379         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2380 
2381         conf->target = i;
2382 
2383     } else {
2384         conf->target = 0;
2385     }
2386 
2387     return NXT_OK;
2388 }
2389 
2390 
2391 static nxt_socket_conf_t *
2392 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2393     nxt_str_t *name)
2394 {
2395     size_t               size;
2396     nxt_int_t            ret;
2397     nxt_bool_t           wildcard;
2398     nxt_sockaddr_t       *sa;
2399     nxt_socket_conf_t    *skcf;
2400     nxt_listen_socket_t  *ls;
2401 
2402     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2403     if (nxt_slow_path(sa == NULL)) {
2404         nxt_alert(task, "invalid listener \"%V\"", name);
2405         return NULL;
2406     }
2407 
2408     sa->type = SOCK_STREAM;
2409 
2410     nxt_debug(task, "router listener: \"%*s\"",
2411               (size_t) sa->length, nxt_sockaddr_start(sa));
2412 
2413     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2414     if (nxt_slow_path(skcf == NULL)) {
2415         return NULL;
2416     }
2417 
2418     size = nxt_sockaddr_size(sa);
2419 
2420     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2421 
2422     if (ret != NXT_OK) {
2423 
2424         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2425         if (nxt_slow_path(ls == NULL)) {
2426             return NULL;
2427         }
2428 
2429         skcf->listen = ls;
2430 
2431         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2432         nxt_memcpy(ls->sockaddr, sa, size);
2433 
2434         nxt_listen_socket_remote_size(ls);
2435 
2436         ls->socket = -1;
2437         ls->backlog = NXT_LISTEN_BACKLOG;
2438         ls->flags = NXT_NONBLOCK;
2439         ls->read_after_accept = 1;
2440     }
2441 
2442     switch (sa->u.sockaddr.sa_family) {
2443 #if (NXT_HAVE_UNIX_DOMAIN)
2444     case AF_UNIX:
2445         wildcard = 0;
2446         break;
2447 #endif
2448 #if (NXT_INET6)
2449     case AF_INET6:
2450         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2451         break;
2452 #endif
2453     case AF_INET:
2454     default:
2455         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2456         break;
2457     }
2458 
2459     if (!wildcard) {
2460         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2461         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2462             return NULL;
2463         }
2464 
2465         nxt_memcpy(skcf->sockaddr, sa, size);
2466     }
2467 
2468     return skcf;
2469 }
2470 
2471 
2472 static nxt_int_t
2473 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2474     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2475 {
2476     nxt_router_t       *router;
2477     nxt_queue_link_t   *qlk;
2478     nxt_socket_conf_t  *skcf;
2479 
2480     router = tmcf->router_conf->router;
2481 
2482     for (qlk = nxt_queue_first(&router->sockets);
2483          qlk != nxt_queue_tail(&router->sockets);
2484          qlk = nxt_queue_next(qlk))
2485     {
2486         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2487 
2488         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2489             nskcf->listen = skcf->listen;
2490 
2491             nxt_queue_remove(qlk);
2492             nxt_queue_insert_tail(&keeping_sockets, qlk);
2493 
2494             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2495 
2496             return NXT_OK;
2497         }
2498     }
2499 
2500     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2501 
2502     return NXT_DECLINED;
2503 }
2504 
2505 
2506 static void
2507 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2508     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2509 {
2510     size_t            size;
2511     uint32_t          stream;
2512     nxt_int_t         ret;
2513     nxt_buf_t         *b;
2514     nxt_port_t        *main_port, *router_port;
2515     nxt_runtime_t     *rt;
2516     nxt_socket_rpc_t  *rpc;
2517 
2518     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2519     if (rpc == NULL) {
2520         goto fail;
2521     }
2522 
2523     rpc->socket_conf = skcf;
2524     rpc->temp_conf = tmcf;
2525 
2526     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2527 
2528     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2529     if (b == NULL) {
2530         goto fail;
2531     }
2532 
2533     b->completion_handler = nxt_buf_dummy_completion;
2534 
2535     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2536 
2537     rt = task->thread->runtime;
2538     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2539     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2540 
2541     stream = nxt_port_rpc_register_handler(task, router_port,
2542                                            nxt_router_listen_socket_ready,
2543                                            nxt_router_listen_socket_error,
2544                                            main_port->pid, rpc);
2545     if (nxt_slow_path(stream == 0)) {
2546         goto fail;
2547     }
2548 
2549     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2550                                 stream, router_port->id, b);
2551 
2552     if (nxt_slow_path(ret != NXT_OK)) {
2553         nxt_port_rpc_cancel(task, router_port, stream);
2554         goto fail;
2555     }
2556 
2557     return;
2558 
2559 fail:
2560 
2561     nxt_router_conf_error(task, tmcf);
2562 }
2563 
2564 
2565 static void
2566 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2567     void *data)
2568 {
2569     nxt_int_t         ret;
2570     nxt_socket_t      s;
2571     nxt_socket_rpc_t  *rpc;
2572 
2573     rpc = data;
2574 
2575     s = msg->fd[0];
2576 
2577     ret = nxt_socket_nonblocking(task, s);
2578     if (nxt_slow_path(ret != NXT_OK)) {
2579         goto fail;
2580     }
2581 
2582     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2583 
2584     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2585     if (nxt_slow_path(ret != NXT_OK)) {
2586         goto fail;
2587     }
2588 
2589     rpc->socket_conf->listen->socket = s;
2590 
2591     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2592                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2593 
2594     return;
2595 
2596 fail:
2597 
2598     nxt_socket_close(task, s);
2599 
2600     nxt_router_conf_error(task, rpc->temp_conf);
2601 }
2602 
2603 
/*
 * RPC error handler for a listen socket request: "data" is the
 * nxt_socket_rpc_t passed at registration.
 *
 * The only active behaviour is to call nxt_router_conf_error() for the
 * temporary configuration.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

    /*
     * Disabled code: it would decode the error byte and message text from
     * the reply and format them, together with the listener address, into
     * a debug log line.  NOTE(review): "error" is used to index
     * socket_errors[] without a range check; add one before enabling.
     */
#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    nxt_router_conf_error(task, tmcf);
}
2663 
2664 
2665 #if (NXT_TLS)
2666 
2667 static void
2668 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2669     void *data)
2670 {
2671     nxt_mp_t                *mp;
2672     nxt_int_t               ret;
2673     nxt_tls_conf_t          *tlscf;
2674     nxt_router_tlssock_t    *tls;
2675     nxt_tls_bundle_conf_t   *bundle;
2676     nxt_router_temp_conf_t  *tmcf;
2677 
2678     nxt_debug(task, "tls rpc handler");
2679 
2680     tls = data;
2681     tmcf = tls->temp_conf;
2682 
2683     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2684         goto fail;
2685     }
2686 
2687     mp = tmcf->router_conf->mem_pool;
2688 
2689     if (tls->socket_conf->tls == NULL){
2690         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2691         if (nxt_slow_path(tlscf == NULL)) {
2692             goto fail;
2693         }
2694 
2695         tlscf->no_wait_shutdown = 1;
2696         tls->socket_conf->tls = tlscf;
2697 
2698     } else {
2699         tlscf = tls->socket_conf->tls;
2700     }
2701 
2702     tls->tls_init->conf = tlscf;
2703 
2704     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2705     if (nxt_slow_path(bundle == NULL)) {
2706         goto fail;
2707     }
2708 
2709     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2710         goto fail;
2711     }
2712 
2713     bundle->chain_file = msg->fd[0];
2714     bundle->next = tlscf->bundle;
2715     tlscf->bundle = bundle;
2716 
2717     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2718                                                   tls->last);
2719     if (nxt_slow_path(ret != NXT_OK)) {
2720         goto fail;
2721     }
2722 
2723     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2724                        nxt_router_conf_apply, task, tmcf, NULL);
2725     return;
2726 
2727 fail:
2728 
2729     nxt_router_conf_error(task, tmcf);
2730 }
2731 
2732 #endif
2733 
2734 
2735 static void
2736 nxt_router_app_rpc_create(nxt_task_t *task,
2737     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2738 {
2739     size_t         size;
2740     uint32_t       stream;
2741     nxt_int_t      ret;
2742     nxt_buf_t      *b;
2743     nxt_port_t     *main_port, *router_port;
2744     nxt_runtime_t  *rt;
2745     nxt_app_rpc_t  *rpc;
2746 
2747     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2748     if (rpc == NULL) {
2749         goto fail;
2750     }
2751 
2752     rpc->app = app;
2753     rpc->temp_conf = tmcf;
2754 
2755     nxt_debug(task, "app '%V' prefork", &app->name);
2756 
2757     size = app->name.length + 1 + app->conf.length;
2758 
2759     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2760     if (nxt_slow_path(b == NULL)) {
2761         goto fail;
2762     }
2763 
2764     b->completion_handler = nxt_buf_dummy_completion;
2765 
2766     nxt_buf_cpystr(b, &app->name);
2767     *b->mem.free++ = '\0';
2768     nxt_buf_cpystr(b, &app->conf);
2769 
2770     rt = task->thread->runtime;
2771     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2772     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2773 
2774     stream = nxt_port_rpc_register_handler(task, router_port,
2775                                            nxt_router_app_prefork_ready,
2776                                            nxt_router_app_prefork_error,
2777                                            -1, rpc);
2778     if (nxt_slow_path(stream == 0)) {
2779         goto fail;
2780     }
2781 
2782     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2783                                 -1, stream, router_port->id, b);
2784 
2785     if (nxt_slow_path(ret != NXT_OK)) {
2786         nxt_port_rpc_cancel(task, router_port, stream);
2787         goto fail;
2788     }
2789 
2790     app->pending_processes++;
2791 
2792     return;
2793 
2794 fail:
2795 
2796     nxt_router_conf_error(task, tmcf);
2797 }
2798 
2799 
2800 static void
2801 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2802     void *data)
2803 {
2804     nxt_app_t           *app;
2805     nxt_port_t          *port;
2806     nxt_app_rpc_t       *rpc;
2807     nxt_event_engine_t  *engine;
2808 
2809     rpc = data;
2810     app = rpc->app;
2811 
2812     port = msg->u.new_port;
2813 
2814     nxt_assert(port != NULL);
2815     nxt_assert(port->type == NXT_PROCESS_APP);
2816     nxt_assert(port->id == 0);
2817 
2818     port->app = app;
2819     port->main_app_port = port;
2820 
2821     app->pending_processes--;
2822     app->processes++;
2823     app->idle_processes++;
2824 
2825     engine = task->thread->engine;
2826 
2827     nxt_queue_insert_tail(&app->ports, &port->app_link);
2828     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2829 
2830     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2831               &app->name, port->pid, port->id);
2832 
2833     nxt_port_hash_add(&app->port_hash, port);
2834     app->port_hash_count++;
2835 
2836     port->idle_start = 0;
2837 
2838     nxt_port_inc_use(port);
2839 
2840     nxt_router_app_shared_port_send(task, port);
2841 
2842     nxt_work_queue_add(&engine->fast_work_queue,
2843                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2844 }
2845 
2846 
2847 static void
2848 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2849     void *data)
2850 {
2851     nxt_app_t               *app;
2852     nxt_app_rpc_t           *rpc;
2853     nxt_router_temp_conf_t  *tmcf;
2854 
2855     rpc = data;
2856     app = rpc->app;
2857     tmcf = rpc->temp_conf;
2858 
2859     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2860             &app->name);
2861 
2862     app->pending_processes--;
2863 
2864     nxt_router_conf_error(task, tmcf);
2865 }
2866 
2867 
2868 static nxt_int_t
2869 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2870     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2871 {
2872     nxt_int_t                 ret;
2873     nxt_uint_t                n, threads;
2874     nxt_queue_link_t          *qlk;
2875     nxt_router_engine_conf_t  *recf;
2876 
2877     threads = tmcf->router_conf->threads;
2878 
2879     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2880                                      sizeof(nxt_router_engine_conf_t));
2881     if (nxt_slow_path(tmcf->engines == NULL)) {
2882         return NXT_ERROR;
2883     }
2884 
2885     n = 0;
2886 
2887     for (qlk = nxt_queue_first(&router->engines);
2888          qlk != nxt_queue_tail(&router->engines);
2889          qlk = nxt_queue_next(qlk))
2890     {
2891         recf = nxt_array_zero_add(tmcf->engines);
2892         if (nxt_slow_path(recf == NULL)) {
2893             return NXT_ERROR;
2894         }
2895 
2896         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2897 
2898         if (n < threads) {
2899             recf->action = NXT_ROUTER_ENGINE_KEEP;
2900             ret = nxt_router_engine_conf_update(tmcf, recf);
2901 
2902         } else {
2903             recf->action = NXT_ROUTER_ENGINE_DELETE;
2904             ret = nxt_router_engine_conf_delete(tmcf, recf);
2905         }
2906 
2907         if (nxt_slow_path(ret != NXT_OK)) {
2908             return ret;
2909         }
2910 
2911         n++;
2912     }
2913 
2914     tmcf->new_threads = n;
2915 
2916     while (n < threads) {
2917         recf = nxt_array_zero_add(tmcf->engines);
2918         if (nxt_slow_path(recf == NULL)) {
2919             return NXT_ERROR;
2920         }
2921 
2922         recf->action = NXT_ROUTER_ENGINE_ADD;
2923 
2924         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2925         if (nxt_slow_path(recf->engine == NULL)) {
2926             return NXT_ERROR;
2927         }
2928 
2929         ret = nxt_router_engine_conf_create(tmcf, recf);
2930         if (nxt_slow_path(ret != NXT_OK)) {
2931             return ret;
2932         }
2933 
2934         n++;
2935     }
2936 
2937     return NXT_OK;
2938 }
2939 
2940 
2941 static nxt_int_t
2942 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2943     nxt_router_engine_conf_t *recf)
2944 {
2945     nxt_int_t  ret;
2946 
2947     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2948                                           nxt_router_listen_socket_create);
2949     if (nxt_slow_path(ret != NXT_OK)) {
2950         return ret;
2951     }
2952 
2953     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2954                                           nxt_router_listen_socket_create);
2955     if (nxt_slow_path(ret != NXT_OK)) {
2956         return ret;
2957     }
2958 
2959     return ret;
2960 }
2961 
2962 
2963 static nxt_int_t
2964 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2965     nxt_router_engine_conf_t *recf)
2966 {
2967     nxt_int_t  ret;
2968 
2969     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2970                                           nxt_router_listen_socket_create);
2971     if (nxt_slow_path(ret != NXT_OK)) {
2972         return ret;
2973     }
2974 
2975     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2976                                           nxt_router_listen_socket_update);
2977     if (nxt_slow_path(ret != NXT_OK)) {
2978         return ret;
2979     }
2980 
2981     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2982     if (nxt_slow_path(ret != NXT_OK)) {
2983         return ret;
2984     }
2985 
2986     return ret;
2987 }
2988 
2989 
2990 static nxt_int_t
2991 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2992     nxt_router_engine_conf_t *recf)
2993 {
2994     nxt_int_t  ret;
2995 
2996     ret = nxt_router_engine_quit(tmcf, recf);
2997     if (nxt_slow_path(ret != NXT_OK)) {
2998         return ret;
2999     }
3000 
3001     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3002     if (nxt_slow_path(ret != NXT_OK)) {
3003         return ret;
3004     }
3005 
3006     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3007 }
3008 
3009 
3010 static nxt_int_t
3011 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3012     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3013     nxt_work_handler_t handler)
3014 {
3015     nxt_int_t                ret;
3016     nxt_joint_job_t          *job;
3017     nxt_queue_link_t         *qlk;
3018     nxt_socket_conf_t        *skcf;
3019     nxt_socket_conf_joint_t  *joint;
3020 
3021     for (qlk = nxt_queue_first(sockets);
3022          qlk != nxt_queue_tail(sockets);
3023          qlk = nxt_queue_next(qlk))
3024     {
3025         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3026         if (nxt_slow_path(job == NULL)) {
3027             return NXT_ERROR;
3028         }
3029 
3030         job->work.next = recf->jobs;
3031         recf->jobs = &job->work;
3032 
3033         job->task = tmcf->engine->task;
3034         job->work.handler = handler;
3035         job->work.task = &job->task;
3036         job->work.obj = job;
3037         job->tmcf = tmcf;
3038 
3039         tmcf->count++;
3040 
3041         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3042                              sizeof(nxt_socket_conf_joint_t));
3043         if (nxt_slow_path(joint == NULL)) {
3044             return NXT_ERROR;
3045         }
3046 
3047         job->work.data = joint;
3048 
3049         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3050         if (nxt_slow_path(ret != NXT_OK)) {
3051             return ret;
3052         }
3053 
3054         joint->count = 1;
3055 
3056         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3057         skcf->count++;
3058         joint->socket_conf = skcf;
3059 
3060         joint->engine = recf->engine;
3061     }
3062 
3063     return NXT_OK;
3064 }
3065 
3066 
3067 static nxt_int_t
3068 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3069     nxt_router_engine_conf_t *recf)
3070 {
3071     nxt_joint_job_t  *job;
3072 
3073     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3074     if (nxt_slow_path(job == NULL)) {
3075         return NXT_ERROR;
3076     }
3077 
3078     job->work.next = recf->jobs;
3079     recf->jobs = &job->work;
3080 
3081     job->task = tmcf->engine->task;
3082     job->work.handler = nxt_router_worker_thread_quit;
3083     job->work.task = &job->task;
3084     job->work.obj = NULL;
3085     job->work.data = NULL;
3086     job->tmcf = NULL;
3087 
3088     return NXT_OK;
3089 }
3090 
3091 
3092 static nxt_int_t
3093 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3094     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3095 {
3096     nxt_joint_job_t   *job;
3097     nxt_queue_link_t  *qlk;
3098 
3099     for (qlk = nxt_queue_first(sockets);
3100          qlk != nxt_queue_tail(sockets);
3101          qlk = nxt_queue_next(qlk))
3102     {
3103         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3104         if (nxt_slow_path(job == NULL)) {
3105             return NXT_ERROR;
3106         }
3107 
3108         job->work.next = recf->jobs;
3109         recf->jobs = &job->work;
3110 
3111         job->task = tmcf->engine->task;
3112         job->work.handler = nxt_router_listen_socket_delete;
3113         job->work.task = &job->task;
3114         job->work.obj = job;
3115         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3116         job->tmcf = tmcf;
3117 
3118         tmcf->count++;
3119     }
3120 
3121     return NXT_OK;
3122 }
3123 
3124 
3125 static nxt_int_t
3126 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3127     nxt_router_temp_conf_t *tmcf)
3128 {
3129     nxt_int_t                 ret;
3130     nxt_uint_t                i, threads;
3131     nxt_router_engine_conf_t  *recf;
3132 
3133     recf = tmcf->engines->elts;
3134     threads = tmcf->router_conf->threads;
3135 
3136     for (i = tmcf->new_threads; i < threads; i++) {
3137         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3138         if (nxt_slow_path(ret != NXT_OK)) {
3139             return ret;
3140         }
3141     }
3142 
3143     return NXT_OK;
3144 }
3145 
3146 
3147 static nxt_int_t
3148 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3149     nxt_event_engine_t *engine)
3150 {
3151     nxt_int_t            ret;
3152     nxt_thread_link_t    *link;
3153     nxt_thread_handle_t  handle;
3154 
3155     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3156 
3157     if (nxt_slow_path(link == NULL)) {
3158         return NXT_ERROR;
3159     }
3160 
3161     link->start = nxt_router_thread_start;
3162     link->engine = engine;
3163     link->work.handler = nxt_router_thread_exit_handler;
3164     link->work.task = task;
3165     link->work.data = link;
3166 
3167     nxt_queue_insert_tail(&rt->engines, &engine->link);
3168 
3169     ret = nxt_thread_create(&handle, link);
3170 
3171     if (nxt_slow_path(ret != NXT_OK)) {
3172         nxt_queue_remove(&engine->link);
3173     }
3174 
3175     return ret;
3176 }
3177 
3178 
3179 static void
3180 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3181     nxt_router_temp_conf_t *tmcf)
3182 {
3183     nxt_app_t  *app;
3184 
3185     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3186 
3187         nxt_router_app_unlink(task, app);
3188 
3189     } nxt_queue_loop;
3190 
3191     nxt_queue_add(&router->apps, &tmcf->previous);
3192     nxt_queue_add(&router->apps, &tmcf->apps);
3193 }
3194 
3195 
3196 static void
3197 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3198 {
3199     nxt_uint_t                n;
3200     nxt_event_engine_t        *engine;
3201     nxt_router_engine_conf_t  *recf;
3202 
3203     recf = tmcf->engines->elts;
3204 
3205     for (n = tmcf->engines->nelts; n != 0; n--) {
3206         engine = recf->engine;
3207 
3208         switch (recf->action) {
3209 
3210         case NXT_ROUTER_ENGINE_KEEP:
3211             break;
3212 
3213         case NXT_ROUTER_ENGINE_ADD:
3214             nxt_queue_insert_tail(&router->engines, &engine->link0);
3215             break;
3216 
3217         case NXT_ROUTER_ENGINE_DELETE:
3218             nxt_queue_remove(&engine->link0);
3219             break;
3220         }
3221 
3222         nxt_router_engine_post(engine, recf->jobs);
3223 
3224         recf++;
3225     }
3226 }
3227 
3228 
3229 static void
3230 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3231 {
3232     nxt_work_t  *work, *next;
3233 
3234     for (work = jobs; work != NULL; work = next) {
3235         next = work->next;
3236         work->next = NULL;
3237 
3238         nxt_event_engine_post(engine, work);
3239     }
3240 }
3241 
3242 
3243 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3244     .rpc_error       = nxt_port_rpc_handler,
3245     .mmap            = nxt_port_mmap_handler,
3246     .data            = nxt_port_rpc_handler,
3247     .oosm            = nxt_router_oosm_handler,
3248     .req_headers_ack = nxt_port_rpc_handler,
3249 };
3250 
3251 
3252 static void
3253 nxt_router_thread_start(void *data)
3254 {
3255     nxt_int_t           ret;
3256     nxt_port_t          *port;
3257     nxt_task_t          *task;
3258     nxt_work_t          *work;
3259     nxt_thread_t        *thread;
3260     nxt_thread_link_t   *link;
3261     nxt_event_engine_t  *engine;
3262 
3263     link = data;
3264     engine = link->engine;
3265     task = &engine->task;
3266 
3267     thread = nxt_thread();
3268 
3269     nxt_event_engine_thread_adopt(engine);
3270 
3271     /* STUB */
3272     thread->runtime = engine->task.thread->runtime;
3273 
3274     engine->task.thread = thread;
3275     engine->task.log = thread->log;
3276     thread->engine = engine;
3277     thread->task = &engine->task;
3278 #if 0
3279     thread->fiber = &engine->fibers->fiber;
3280 #endif
3281 
3282     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3283     if (nxt_slow_path(engine->mem_pool == NULL)) {
3284         return;
3285     }
3286 
3287     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3288                         NXT_PROCESS_ROUTER);
3289     if (nxt_slow_path(port == NULL)) {
3290         return;
3291     }
3292 
3293     ret = nxt_port_socket_init(task, port, 0);
3294     if (nxt_slow_path(ret != NXT_OK)) {
3295         nxt_port_use(task, port, -1);
3296         return;
3297     }
3298 
3299     ret = nxt_router_port_queue_init(task, port);
3300     if (nxt_slow_path(ret != NXT_OK)) {
3301         nxt_port_use(task, port, -1);
3302         return;
3303     }
3304 
3305     engine->port = port;
3306 
3307     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3308 
3309     work = nxt_zalloc(sizeof(nxt_work_t));
3310     if (nxt_slow_path(work == NULL)) {
3311         return;
3312     }
3313 
3314     work->handler = nxt_router_rt_add_port;
3315     work->task = link->work.task;
3316     work->obj = work;
3317     work->data = port;
3318 
3319     nxt_event_engine_post(link->work.task->thread->engine, work);
3320 
3321     nxt_event_engine_start(engine);
3322 }
3323 
3324 
3325 static void
3326 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3327 {
3328     nxt_int_t      res;
3329     nxt_port_t     *port;
3330     nxt_runtime_t  *rt;
3331 
3332     rt = task->thread->runtime;
3333     port = data;
3334 
3335     nxt_free(obj);
3336 
3337     res = nxt_port_hash_add(&rt->ports, port);
3338 
3339     if (nxt_fast_path(res == NXT_OK)) {
3340         nxt_port_use(task, port, 1);
3341     }
3342 }
3343 
3344 
3345 static void
3346 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3347 {
3348     nxt_joint_job_t          *job;
3349     nxt_socket_conf_t        *skcf;
3350     nxt_listen_event_t       *lev;
3351     nxt_listen_socket_t      *ls;
3352     nxt_thread_spinlock_t    *lock;
3353     nxt_socket_conf_joint_t  *joint;
3354 
3355     job = obj;
3356     joint = data;
3357 
3358     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3359 
3360     skcf = joint->socket_conf;
3361     ls = skcf->listen;
3362 
3363     lev = nxt_listen_event(task, ls);
3364     if (nxt_slow_path(lev == NULL)) {
3365         nxt_router_listen_socket_release(task, skcf);
3366         return;
3367     }
3368 
3369     lev->socket.data = joint;
3370 
3371     lock = &skcf->router_conf->router->lock;
3372 
3373     nxt_thread_spin_lock(lock);
3374     ls->count++;
3375     nxt_thread_spin_unlock(lock);
3376 
3377     job->work.next = NULL;
3378     job->work.handler = nxt_router_conf_wait;
3379 
3380     nxt_event_engine_post(job->tmcf->engine, &job->work);
3381 }
3382 
3383 
3384 nxt_inline nxt_listen_event_t *
3385 nxt_router_listen_event(nxt_queue_t *listen_connections,
3386     nxt_socket_conf_t *skcf)
3387 {
3388     nxt_socket_t        fd;
3389     nxt_queue_link_t    *qlk;
3390     nxt_listen_event_t  *lev;
3391 
3392     fd = skcf->listen->socket;
3393 
3394     for (qlk = nxt_queue_first(listen_connections);
3395          qlk != nxt_queue_tail(listen_connections);
3396          qlk = nxt_queue_next(qlk))
3397     {
3398         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3399 
3400         if (fd == lev->socket.fd) {
3401             return lev;
3402         }
3403     }
3404 
3405     return NULL;
3406 }
3407 
3408 
3409 static void
3410 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3411 {
3412     nxt_joint_job_t          *job;
3413     nxt_event_engine_t       *engine;
3414     nxt_listen_event_t       *lev;
3415     nxt_socket_conf_joint_t  *joint, *old;
3416 
3417     job = obj;
3418     joint = data;
3419 
3420     engine = task->thread->engine;
3421 
3422     nxt_queue_insert_tail(&engine->joints, &joint->link);
3423 
3424     lev = nxt_router_listen_event(&engine->listen_connections,
3425                                   joint->socket_conf);
3426 
3427     old = lev->socket.data;
3428     lev->socket.data = joint;
3429     lev->listen = joint->socket_conf->listen;
3430 
3431     job->work.next = NULL;
3432     job->work.handler = nxt_router_conf_wait;
3433 
3434     nxt_event_engine_post(job->tmcf->engine, &job->work);
3435 
3436     /*
3437      * The task is allocated from configuration temporary
3438      * memory pool so it can be freed after engine post operation.
3439      */
3440 
3441     nxt_router_conf_release(&engine->task, old);
3442 }
3443 
3444 
3445 static void
3446 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3447 {
3448     nxt_socket_conf_t        *skcf;
3449     nxt_listen_event_t       *lev;
3450     nxt_event_engine_t       *engine;
3451     nxt_socket_conf_joint_t  *joint;
3452 
3453     skcf = data;
3454 
3455     engine = task->thread->engine;
3456 
3457     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3458 
3459     nxt_fd_event_delete(engine, &lev->socket);
3460 
3461     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3462               lev->socket.fd);
3463 
3464     joint = lev->socket.data;
3465     joint->close_job = obj;
3466 
3467     lev->timer.handler = nxt_router_listen_socket_close;
3468     lev->timer.work_queue = &engine->fast_work_queue;
3469 
3470     nxt_timer_add(engine, &lev->timer, 0);
3471 }
3472 
3473 
3474 static void
3475 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3476 {
3477     nxt_event_engine_t  *engine;
3478 
3479     nxt_debug(task, "router worker thread quit");
3480 
3481     engine = task->thread->engine;
3482 
3483     engine->shutdown = 1;
3484 
3485     if (nxt_queue_is_empty(&engine->joints)) {
3486         nxt_thread_exit(task->thread);
3487     }
3488 }
3489 
3490 
3491 static void
3492 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3493 {
3494     nxt_timer_t              *timer;
3495     nxt_joint_job_t          *job;
3496     nxt_listen_event_t       *lev;
3497     nxt_socket_conf_joint_t  *joint;
3498 
3499     timer = obj;
3500     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3501 
3502     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3503               lev->socket.fd);
3504 
3505     nxt_queue_remove(&lev->link);
3506 
3507     joint = lev->socket.data;
3508     lev->socket.data = NULL;
3509 
3510     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3511     task = &task->thread->engine->task;
3512 
3513     nxt_router_listen_socket_release(task, joint->socket_conf);
3514 
3515     job = joint->close_job;
3516     job->work.next = NULL;
3517     job->work.handler = nxt_router_conf_wait;
3518 
3519     nxt_event_engine_post(job->tmcf->engine, &job->work);
3520 
3521     nxt_router_listen_event_release(task, lev, joint);
3522 }
3523 
3524 
3525 static void
3526 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3527 {
3528     nxt_listen_socket_t    *ls;
3529     nxt_thread_spinlock_t  *lock;
3530 
3531     ls = skcf->listen;
3532     lock = &skcf->router_conf->router->lock;
3533 
3534     nxt_thread_spin_lock(lock);
3535 
3536     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3537               task->thread->engine, ls->count);
3538 
3539     if (--ls->count != 0) {
3540         ls = NULL;
3541     }
3542 
3543     nxt_thread_spin_unlock(lock);
3544 
3545     if (ls != NULL) {
3546         nxt_socket_close(task, ls->socket);
3547         nxt_free(ls);
3548     }
3549 }
3550 
3551 
3552 void
3553 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3554     nxt_socket_conf_joint_t *joint)
3555 {
3556     nxt_event_engine_t  *engine;
3557 
3558     nxt_debug(task, "listen event count: %D", lev->count);
3559 
3560     engine = task->thread->engine;
3561 
3562     if (--lev->count == 0) {
3563         if (lev->next != NULL) {
3564             nxt_sockaddr_cache_free(engine, lev->next);
3565 
3566             nxt_conn_free(task, lev->next);
3567         }
3568 
3569         nxt_free(lev);
3570     }
3571 
3572     if (joint != NULL) {
3573         nxt_router_conf_release(task, joint);
3574     }
3575 
3576     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3577         nxt_thread_exit(task->thread);
3578     }
3579 }
3580 
3581 
3582 void
3583 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3584 {
3585     nxt_socket_conf_t      *skcf;
3586     nxt_router_conf_t      *rtcf;
3587     nxt_thread_spinlock_t  *lock;
3588 
3589     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3590 
3591     if (--joint->count != 0) {
3592         return;
3593     }
3594 
3595     nxt_queue_remove(&joint->link);
3596 
3597     /*
3598      * The joint content can not be safely used after the critical
3599      * section protected by the spinlock because its memory pool may
3600      * be already destroyed by another thread.
3601      */
3602     skcf = joint->socket_conf;
3603     rtcf = skcf->router_conf;
3604     lock = &rtcf->router->lock;
3605 
3606     nxt_thread_spin_lock(lock);
3607 
3608     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3609               rtcf, rtcf->count);
3610 
3611     if (--skcf->count != 0) {
3612         skcf = NULL;
3613         rtcf = NULL;
3614 
3615     } else {
3616         nxt_queue_remove(&skcf->link);
3617 
3618         if (--rtcf->count != 0) {
3619             rtcf = NULL;
3620         }
3621     }
3622 
3623     nxt_thread_spin_unlock(lock);
3624 
3625 #if (NXT_TLS)
3626     if (skcf != NULL && skcf->tls != NULL) {
3627         task->thread->runtime->tls->server_free(task, skcf->tls);
3628     }
3629 #endif
3630 
3631     /* TODO remove engine->port */
3632 
3633     if (rtcf != NULL) {
3634         nxt_debug(task, "old router conf is destroyed");
3635 
3636         nxt_router_apps_hash_use(task, rtcf, -1);
3637 
3638         nxt_router_access_log_release(task, lock, rtcf->access_log);
3639 
3640         nxt_mp_thread_adopt(rtcf->mem_pool);
3641 
3642         nxt_mp_destroy(rtcf->mem_pool);
3643     }
3644 }
3645 
3646 
3647 static void
3648 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3649     nxt_router_access_log_t *access_log)
3650 {
3651     size_t     size;
3652     u_char     *buf, *p;
3653     nxt_off_t  bytes;
3654 
3655     static nxt_time_string_t  date_cache = {
3656         (nxt_atomic_uint_t) -1,
3657         nxt_router_access_log_date,
3658         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3659         nxt_length("31/Dec/1986:19:40:00 +0300"),
3660         NXT_THREAD_TIME_LOCAL,
3661         NXT_THREAD_TIME_SEC,
3662     };
3663 
3664     size = r->remote->address_length
3665            + 6                  /* ' - - [' */
3666            + date_cache.size
3667            + 3                  /* '] "' */
3668            + r->method->length
3669            + 1                  /* space */
3670            + r->target.length
3671            + 1                  /* space */
3672            + r->version.length
3673            + 2                  /* '" ' */
3674            + 3                  /* status */
3675            + 1                  /* space */
3676            + NXT_OFF_T_LEN
3677            + 2                  /* ' "' */
3678            + (r->referer != NULL ? r->referer->value_length : 1)
3679            + 3                  /* '" "' */
3680            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3681            + 2                  /* '"\n' */
3682     ;
3683 
3684     buf = nxt_mp_nget(r->mem_pool, size);
3685     if (nxt_slow_path(buf == NULL)) {
3686         return;
3687     }
3688 
3689     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3690                    r->remote->address_length);
3691 
3692     p = nxt_cpymem(p, " - - [", 6);
3693 
3694     p = nxt_thread_time_string(task->thread, &date_cache, p);
3695 
3696     p = nxt_cpymem(p, "] \"", 3);
3697 
3698     if (r->method->length != 0) {
3699         p = nxt_cpymem(p, r->method->start, r->method->length);
3700 
3701         if (r->target.length != 0) {
3702             *p++ = ' ';
3703             p = nxt_cpymem(p, r->target.start, r->target.length);
3704 
3705             if (r->version.length != 0) {
3706                 *p++ = ' ';
3707                 p = nxt_cpymem(p, r->version.start, r->version.length);
3708             }
3709         }
3710 
3711     } else {
3712         *p++ = '-';
3713     }
3714 
3715     p = nxt_cpymem(p, "\" ", 2);
3716 
3717     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3718 
3719     *p++ = ' ';
3720 
3721     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3722 
3723     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3724 
3725     p = nxt_cpymem(p, " \"", 2);
3726 
3727     if (r->referer != NULL) {
3728         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3729 
3730     } else {
3731         *p++ = '-';
3732     }
3733 
3734     p = nxt_cpymem(p, "\" \"", 3);
3735 
3736     if (r->user_agent != NULL) {
3737         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3738 
3739     } else {
3740         *p++ = '-';
3741     }
3742 
3743     p = nxt_cpymem(p, "\"\n", 2);
3744 
3745     nxt_fd_write(access_log->fd, buf, p - buf);
3746 }
3747 
3748 
3749 static u_char *
3750 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3751     size_t size, const char *format)
3752 {
3753     u_char  sign;
3754     time_t  gmtoff;
3755 
3756     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3757                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3758 
3759     gmtoff = nxt_timezone(tm) / 60;
3760 
3761     if (gmtoff < 0) {
3762         gmtoff = -gmtoff;
3763         sign = '-';
3764 
3765     } else {
3766         sign = '+';
3767     }
3768 
3769     return nxt_sprintf(buf, buf + size, format,
3770                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3771                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3772                        sign, gmtoff / 60, gmtoff % 60);
3773 }
3774 
3775 
3776 static void
3777 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3778 {
3779     uint32_t                 stream;
3780     nxt_int_t                ret;
3781     nxt_buf_t                *b;
3782     nxt_port_t               *main_port, *router_port;
3783     nxt_runtime_t            *rt;
3784     nxt_router_access_log_t  *access_log;
3785 
3786     access_log = tmcf->router_conf->access_log;
3787 
3788     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3789     if (nxt_slow_path(b == NULL)) {
3790         goto fail;
3791     }
3792 
3793     b->completion_handler = nxt_buf_dummy_completion;
3794 
3795     nxt_buf_cpystr(b, &access_log->path);
3796     *b->mem.free++ = '\0';
3797 
3798     rt = task->thread->runtime;
3799     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3800     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3801 
3802     stream = nxt_port_rpc_register_handler(task, router_port,
3803                                            nxt_router_access_log_ready,
3804                                            nxt_router_access_log_error,
3805                                            -1, tmcf);
3806     if (nxt_slow_path(stream == 0)) {
3807         goto fail;
3808     }
3809 
3810     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3811                                 stream, router_port->id, b);
3812 
3813     if (nxt_slow_path(ret != NXT_OK)) {
3814         nxt_port_rpc_cancel(task, router_port, stream);
3815         goto fail;
3816     }
3817 
3818     return;
3819 
3820 fail:
3821 
3822     nxt_router_conf_error(task, tmcf);
3823 }
3824 
3825 
3826 static void
3827 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3828     void *data)
3829 {
3830     nxt_router_temp_conf_t   *tmcf;
3831     nxt_router_access_log_t  *access_log;
3832 
3833     tmcf = data;
3834 
3835     access_log = tmcf->router_conf->access_log;
3836 
3837     access_log->fd = msg->fd[0];
3838 
3839     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3840                        nxt_router_conf_apply, task, tmcf, NULL);
3841 }
3842 
3843 
3844 static void
3845 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3846     void *data)
3847 {
3848     nxt_router_temp_conf_t  *tmcf;
3849 
3850     tmcf = data;
3851 
3852     nxt_router_conf_error(task, tmcf);
3853 }
3854 
3855 
3856 static void
3857 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3858     nxt_router_access_log_t *access_log)
3859 {
3860     if (access_log == NULL) {
3861         return;
3862     }
3863 
3864     nxt_thread_spin_lock(lock);
3865 
3866     if (--access_log->count != 0) {
3867         access_log = NULL;
3868     }
3869 
3870     nxt_thread_spin_unlock(lock);
3871 
3872     if (access_log != NULL) {
3873 
3874         if (access_log->fd != -1) {
3875             nxt_fd_close(access_log->fd);
3876         }
3877 
3878         nxt_free(access_log);
3879     }
3880 }
3881 
3882 
/*
 * State of one pending access log reopen request.  It is allocated from
 * its own mem_pool by nxt_router_access_log_reopen_handler() and handed
 * to the reopen ready/error RPC handlers as their data.
 */
3883 typedef struct {
3884     nxt_mp_t                 *mem_pool;    /* pool holding this struct and the request buffer */
3885     nxt_router_access_log_t  *access_log;  /* value of nxt_router->access_log at request time */
3886 } nxt_router_access_log_reopen_t;
3887 
3888 
3889 static void
3890 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3891 {
3892     nxt_mp_t                        *mp;
3893     uint32_t                        stream;
3894     nxt_int_t                       ret;
3895     nxt_buf_t                       *b;
3896     nxt_port_t                      *main_port, *router_port;
3897     nxt_runtime_t                   *rt;
3898     nxt_router_access_log_t         *access_log;
3899     nxt_router_access_log_reopen_t  *reopen;
3900 
3901     access_log = nxt_router->access_log;
3902 
3903     if (access_log == NULL) {
3904         return;
3905     }
3906 
3907     mp = nxt_mp_create(1024, 128, 256, 32);
3908     if (nxt_slow_path(mp == NULL)) {
3909         return;
3910     }
3911 
3912     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3913     if (nxt_slow_path(reopen == NULL)) {
3914         goto fail;
3915     }
3916 
3917     reopen->mem_pool = mp;
3918     reopen->access_log = access_log;
3919 
3920     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3921     if (nxt_slow_path(b == NULL)) {
3922         goto fail;
3923     }
3924 
3925     b->completion_handler = nxt_router_access_log_reopen_completion;
3926 
3927     nxt_buf_cpystr(b, &access_log->path);
3928     *b->mem.free++ = '\0';
3929 
3930     rt = task->thread->runtime;
3931     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3932     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3933 
3934     stream = nxt_port_rpc_register_handler(task, router_port,
3935                                            nxt_router_access_log_reopen_ready,
3936                                            nxt_router_access_log_reopen_error,
3937                                            -1, reopen);
3938     if (nxt_slow_path(stream == 0)) {
3939         goto fail;
3940     }
3941 
3942     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3943                                 stream, router_port->id, b);
3944 
3945     if (nxt_slow_path(ret != NXT_OK)) {
3946         nxt_port_rpc_cancel(task, router_port, stream);
3947         goto fail;
3948     }
3949 
3950     nxt_mp_retain(mp);
3951 
3952     return;
3953 
3954 fail:
3955 
3956     nxt_mp_destroy(mp);
3957 }
3958 
3959 
3960 static void
3961 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3962 {
3963     nxt_mp_t   *mp;
3964     nxt_buf_t  *b;
3965 
3966     b = obj;
3967     mp = b->data;
3968 
3969     nxt_mp_release(mp);
3970 }
3971 
3972 
3973 static void
3974 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3975     void *data)
3976 {
3977     nxt_router_access_log_t         *access_log;
3978     nxt_router_access_log_reopen_t  *reopen;
3979 
3980     reopen = data;
3981 
3982     access_log = reopen->access_log;
3983 
3984     if (access_log == nxt_router->access_log) {
3985 
3986         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3987             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3988                       msg->fd[0], access_log->fd, nxt_errno);
3989         }
3990     }
3991 
3992     nxt_fd_close(msg->fd[0]);
3993     nxt_mp_release(reopen->mem_pool);
3994 }
3995 
3996 
3997 static void
3998 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3999     void *data)
4000 {
4001     nxt_router_access_log_reopen_t  *reopen;
4002 
4003     reopen = data;
4004 
4005     nxt_mp_release(reopen->mem_pool);
4006 }
4007 
4008 
4009 static void
4010 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4011 {
4012     nxt_port_t           *port;
4013     nxt_thread_link_t    *link;
4014     nxt_event_engine_t   *engine;
4015     nxt_thread_handle_t  handle;
4016 
4017     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4018     link = data;
4019 
4020     nxt_thread_wait(handle);
4021 
4022     engine = link->engine;
4023 
4024     nxt_queue_remove(&engine->link);
4025 
4026     port = engine->port;
4027 
4028     // TODO notify all apps
4029 
4030     port->engine = task->thread->engine;
4031     nxt_mp_thread_adopt(port->mem_pool);
4032     nxt_port_use(task, port, -1);
4033 
4034     nxt_mp_thread_adopt(engine->mem_pool);
4035     nxt_mp_destroy(engine->mem_pool);
4036 
4037     nxt_event_engine_free(engine);
4038 
4039     nxt_free(link);
4040 }
4041 
4042 
4043 static void
4044 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4045     void *data)
4046 {
4047     size_t                  b_size, count;
4048     nxt_int_t               ret;
4049     nxt_app_t               *app;
4050     nxt_buf_t               *b, *next;
4051     nxt_port_t              *app_port;
4052     nxt_unit_field_t        *f;
4053     nxt_http_field_t        *field;
4054     nxt_http_request_t      *r;
4055     nxt_unit_response_t     *resp;
4056     nxt_request_rpc_data_t  *req_rpc_data;
4057 
4058     req_rpc_data = data;
4059 
4060     r = req_rpc_data->request;
4061     if (nxt_slow_path(r == NULL)) {
4062         return;
4063     }
4064 
4065     if (r->error) {
4066         nxt_request_rpc_data_unlink(task, req_rpc_data);
4067         return;
4068     }
4069 
4070     app = req_rpc_data->app;
4071     nxt_assert(app != NULL);
4072 
4073     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4074         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4075 
4076         return;
4077     }
4078 
4079     b = (msg->size == 0) ? NULL : msg->buf;
4080 
4081     if (msg->port_msg.last != 0) {
4082         nxt_debug(task, "router data create last buf");
4083 
4084         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4085 
4086         req_rpc_data->rpc_cancel = 0;
4087 
4088         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4089             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4090         }
4091 
4092         nxt_request_rpc_data_unlink(task, req_rpc_data);
4093 
4094     } else {
4095         if (app->timeout != 0) {
4096             r->timer.handler = nxt_router_app_timeout;
4097             r->timer_data = req_rpc_data;
4098             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4099         }
4100     }
4101 
4102     if (b == NULL) {
4103         return;
4104     }
4105 
4106     if (msg->buf == b) {
4107         /* Disable instant buffer completion/re-using by port. */
4108         msg->buf = NULL;
4109     }
4110 
4111     if (r->header_sent) {
4112         nxt_buf_chain_add(&r->out, b);
4113         nxt_http_request_send_body(task, r, NULL);
4114 
4115     } else {
4116         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4117 
4118         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4119             nxt_alert(task, "response buffer too small: %z", b_size);
4120             goto fail;
4121         }
4122 
4123         resp = (void *) b->mem.pos;
4124         count = (b_size - sizeof(nxt_unit_response_t))
4125                     / sizeof(nxt_unit_field_t);
4126 
4127         if (nxt_slow_path(count < resp->fields_count)) {
4128             nxt_alert(task, "response buffer too small for fields count: %D",
4129                       resp->fields_count);
4130             goto fail;
4131         }
4132 
4133         field = NULL;
4134 
4135         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4136             if (f->skip) {
4137                 continue;
4138             }
4139 
4140             field = nxt_list_add(r->resp.fields);
4141 
4142             if (nxt_slow_path(field == NULL)) {
4143                 goto fail;
4144             }
4145 
4146             field->hash = f->hash;
4147             field->skip = 0;
4148             field->hopbyhop = 0;
4149 
4150             field->name_length = f->name_length;
4151             field->value_length = f->value_length;
4152             field->name = nxt_unit_sptr_get(&f->name);
4153             field->value = nxt_unit_sptr_get(&f->value);
4154 
4155             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4156             if (nxt_slow_path(ret != NXT_OK)) {
4157                 goto fail;
4158             }
4159 
4160             nxt_debug(task, "header%s: %*s: %*s",
4161                       (field->skip ? " skipped" : ""),
4162                       (size_t) field->name_length, field->name,
4163                       (size_t) field->value_length, field->value);
4164 
4165             if (field->skip) {
4166                 r->resp.fields->last->nelts--;
4167             }
4168         }
4169 
4170         r->status = resp->status;
4171 
4172         if (resp->piggyback_content_length != 0) {
4173             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4174             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4175 
4176         } else {
4177             b->mem.pos = b->mem.free;
4178         }
4179 
4180         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4181             next = b->next;
4182             b->next = NULL;
4183 
4184             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4185                                b->completion_handler, task, b, b->parent);
4186 
4187             b = next;
4188         }
4189 
4190         if (b != NULL) {
4191             nxt_buf_chain_add(&r->out, b);
4192         }
4193 
4194         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4195 
4196         if (r->websocket_handshake
4197             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4198         {
4199             app_port = req_rpc_data->app_port;
4200             if (nxt_slow_path(app_port == NULL)) {
4201                 goto fail;
4202             }
4203 
4204             nxt_thread_mutex_lock(&app->mutex);
4205 
4206             app_port->main_app_port->active_websockets++;
4207 
4208             nxt_thread_mutex_unlock(&app->mutex);
4209 
4210             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
4211             req_rpc_data->apr_action = NXT_APR_CLOSE;
4212 
4213             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4214 
4215             r->state = &nxt_http_websocket;
4216 
4217         } else {
4218             r->state = &nxt_http_request_send_state;
4219         }
4220     }
4221 
4222     return;
4223 
4224 fail:
4225 
4226     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4227 
4228     nxt_request_rpc_data_unlink(task, req_rpc_data);
4229 }
4230 
4231 
4232 static void
4233 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4234     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4235 {
4236     int                 res;
4237     nxt_app_t           *app;
4238     nxt_buf_t           *b;
4239     nxt_bool_t          start_process, unlinked;
4240     nxt_port_t          *app_port, *main_app_port, *idle_port;
4241     nxt_queue_link_t    *idle_lnk;
4242     nxt_http_request_t  *r;
4243 
4244     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4245               req_rpc_data->stream,
4246               msg->port_msg.pid, msg->port_msg.reply_port);
4247 
4248     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4249                              msg->port_msg.pid);
4250 
4251     app = req_rpc_data->app;
4252     r = req_rpc_data->request;
4253 
4254     start_process = 0;
4255     unlinked = 0;
4256 
4257     nxt_thread_mutex_lock(&app->mutex);
4258 
4259     if (r->app_link.next != NULL) {
4260         nxt_queue_remove(&r->app_link);
4261         r->app_link.next = NULL;
4262 
4263         unlinked = 1;
4264     }
4265 
4266     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4267                                   msg->port_msg.reply_port);
4268     if (nxt_slow_path(app_port == NULL)) {
4269         nxt_thread_mutex_unlock(&app->mutex);
4270 
4271         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4272 
4273         if (unlinked) {
4274             nxt_mp_release(r->mem_pool);
4275         }
4276 
4277         return;
4278     }
4279 
4280     main_app_port = app_port->main_app_port;
4281 
4282     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4283         app->idle_processes--;
4284 
4285         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4286                   &app->name, main_app_port->pid, main_app_port->id,
4287                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4288 
4289         /* Check port was in 'spare_ports' using idle_start field. */
4290         if (main_app_port->idle_start == 0
4291             && app->idle_processes >= app->spare_processes)
4292         {
4293             /*
4294              * If there is a vacant space in spare ports,
4295              * move the last idle to spare_ports.
4296              */
4297             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4298 
4299             idle_lnk = nxt_queue_last(&app->idle_ports);
4300             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4301             nxt_queue_remove(idle_lnk);
4302 
4303             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4304 
4305             idle_port->idle_start = 0;
4306 
4307             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4308                       "to spare_ports",
4309                       &app->name, idle_port->pid, idle_port->id);
4310         }
4311 
4312         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4313             app->pending_processes++;
4314             start_process = 1;
4315         }
4316     }
4317 
4318     main_app_port->active_requests++;
4319 
4320     nxt_port_inc_use(app_port);
4321 
4322     nxt_thread_mutex_unlock(&app->mutex);
4323 
4324     if (unlinked) {
4325         nxt_mp_release(r->mem_pool);
4326     }
4327 
4328     if (start_process) {
4329         nxt_router_start_app_process(task, app);
4330     }
4331 
4332     nxt_port_use(task, req_rpc_data->app_port, -1);
4333 
4334     req_rpc_data->app_port = app_port;
4335 
4336     b = req_rpc_data->msg_info.buf;
4337 
4338     if (b != NULL) {
4339         /* First buffer is already sent.  Start from second. */
4340         b = b->next;
4341 
4342         req_rpc_data->msg_info.buf->next = NULL;
4343     }
4344 
4345     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4346         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4347                   req_rpc_data->msg_info.body_fd);
4348 
4349         if (req_rpc_data->msg_info.body_fd != -1) {
4350             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4351         }
4352 
4353         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4354                                     req_rpc_data->msg_info.body_fd,
4355                                     req_rpc_data->stream,
4356                                     task->thread->engine->port->id, b);
4357 
4358         if (nxt_slow_path(res != NXT_OK)) {
4359             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4360         }
4361     }
4362 
4363     if (app->timeout != 0) {
4364         r->timer.handler = nxt_router_app_timeout;
4365         r->timer_data = req_rpc_data;
4366         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4367     }
4368 }
4369 
4370 
4371 static const nxt_http_request_state_t  nxt_http_request_send_state
4372     nxt_aligned(64) =
4373 {
4374     .error_handler = nxt_http_request_error_handler,
4375 };
4376 
4377 
4378 static void
4379 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4380 {
4381     nxt_buf_t           *out;
4382     nxt_http_request_t  *r;
4383 
4384     r = obj;
4385 
4386     out = r->out;
4387 
4388     if (out != NULL) {
4389         r->out = NULL;
4390         nxt_http_request_send(task, r, out);
4391     }
4392 }
4393 
4394 
4395 static void
4396 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4397     void *data)
4398 {
4399     nxt_request_rpc_data_t  *req_rpc_data;
4400 
4401     req_rpc_data = data;
4402 
4403     req_rpc_data->rpc_cancel = 0;
4404 
4405     /* TODO cancel message and return if cancelled. */
4406     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4407 
4408     if (req_rpc_data->request != NULL) {
4409         nxt_http_request_error(task, req_rpc_data->request,
4410                                NXT_HTTP_SERVICE_UNAVAILABLE);
4411     }
4412 
4413     nxt_request_rpc_data_unlink(task, req_rpc_data);
4414 }
4415 
4416 
4417 static void
4418 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4419     void *data)
4420 {
4421     nxt_app_t            *app;
4422     nxt_bool_t           start_process;
4423     nxt_port_t           *port;
4424     nxt_app_joint_t      *app_joint;
4425     nxt_app_joint_rpc_t  *app_joint_rpc;
4426 
4427     nxt_assert(data != NULL);
4428 
4429     app_joint_rpc = data;
4430     app_joint = app_joint_rpc->app_joint;
4431     port = msg->u.new_port;
4432 
4433     nxt_assert(app_joint != NULL);
4434     nxt_assert(port != NULL);
4435     nxt_assert(port->type == NXT_PROCESS_APP);
4436     nxt_assert(port->id == 0);
4437 
4438     app = app_joint->app;
4439 
4440     nxt_router_app_joint_use(task, app_joint, -1);
4441 
4442     if (nxt_slow_path(app == NULL)) {
4443         nxt_debug(task, "new port ready for released app, send QUIT");
4444 
4445         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4446 
4447         return;
4448     }
4449 
4450     nxt_thread_mutex_lock(&app->mutex);
4451 
4452     nxt_assert(app->pending_processes != 0);
4453 
4454     app->pending_processes--;
4455 
4456     if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
4457         nxt_debug(task, "new port ready for restarted app, send QUIT");
4458 
4459         start_process = !task->thread->engine->shutdown
4460                         && nxt_router_app_can_start(app)
4461                         && nxt_router_app_need_start(app);
4462 
4463         if (start_process) {
4464             app->pending_processes++;
4465         }
4466 
4467         nxt_thread_mutex_unlock(&app->mutex);
4468 
4469         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4470 
4471         if (start_process) {
4472             nxt_router_start_app_process(task, app);
4473         }
4474 
4475         return;
4476     }
4477 
4478     port->app = app;
4479     port->main_app_port = port;
4480 
4481     app->processes++;
4482     nxt_port_hash_add(&app->port_hash, port);
4483     app->port_hash_count++;
4484 
4485     nxt_thread_mutex_unlock(&app->mutex);
4486 
4487     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4488               &app->name, port->pid, app->processes, app->pending_processes);
4489 
4490     nxt_router_app_shared_port_send(task, port);
4491 
4492     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4493 }
4494 
4495 
4496 static nxt_int_t
4497 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4498 {
4499     nxt_buf_t                *b;
4500     nxt_port_t               *port;
4501     nxt_port_msg_new_port_t  *msg;
4502 
4503     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4504                              sizeof(nxt_port_data_t));
4505     if (nxt_slow_path(b == NULL)) {
4506         return NXT_ERROR;
4507     }
4508 
4509     port = app_port->app->shared_port;
4510 
4511     nxt_debug(task, "send port %FD to process %PI",
4512               port->pair[0], app_port->pid);
4513 
4514     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4515     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4516 
4517     msg->id = port->id;
4518     msg->pid = port->pid;
4519     msg->max_size = port->max_size;
4520     msg->max_share = port->max_share;
4521     msg->type = port->type;
4522 
4523     return nxt_port_socket_write2(task, app_port,
4524                                   NXT_PORT_MSG_NEW_PORT,
4525                                   port->pair[0], port->queue_fd,
4526                                   0, 0, b);
4527 }
4528 
4529 
4530 static void
4531 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4532     void *data)
4533 {
4534     nxt_app_t            *app;
4535     nxt_app_joint_t      *app_joint;
4536     nxt_queue_link_t     *link;
4537     nxt_http_request_t   *r;
4538     nxt_app_joint_rpc_t  *app_joint_rpc;
4539 
4540     nxt_assert(data != NULL);
4541 
4542     app_joint_rpc = data;
4543     app_joint = app_joint_rpc->app_joint;
4544 
4545     nxt_assert(app_joint != NULL);
4546 
4547     app = app_joint->app;
4548 
4549     nxt_router_app_joint_use(task, app_joint, -1);
4550 
4551     if (nxt_slow_path(app == NULL)) {
4552         nxt_debug(task, "start error for released app");
4553 
4554         return;
4555     }
4556 
4557     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4558 
4559     link = NULL;
4560 
4561     nxt_thread_mutex_lock(&app->mutex);
4562 
4563     nxt_assert(app->pending_processes != 0);
4564 
4565     app->pending_processes--;
4566 
4567     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4568         link = nxt_queue_first(&app->ack_waiting_req);
4569 
4570         nxt_queue_remove(link);
4571         link->next = NULL;
4572     }
4573 
4574     nxt_thread_mutex_unlock(&app->mutex);
4575 
4576     while (link != NULL) {
4577         r = nxt_container_of(link, nxt_http_request_t, app_link);
4578 
4579         nxt_event_engine_post(r->engine, &r->err_work);
4580 
4581         link = NULL;
4582 
4583         nxt_thread_mutex_lock(&app->mutex);
4584 
4585         if (app->processes == 0 && app->pending_processes == 0
4586             && !nxt_queue_is_empty(&app->ack_waiting_req))
4587         {
4588             link = nxt_queue_first(&app->ack_waiting_req);
4589 
4590             nxt_queue_remove(link);
4591             link->next = NULL;
4592         }
4593 
4594         nxt_thread_mutex_unlock(&app->mutex);
4595     }
4596 }
4597 
4598 
4599 
4600 nxt_inline nxt_port_t *
4601 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4602 {
4603     nxt_port_t  *port;
4604 
4605     port = NULL;
4606 
4607     nxt_thread_mutex_lock(&app->mutex);
4608 
4609     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4610 
4611         /* Caller is responsible to decrease port use count. */
4612         nxt_queue_chk_remove(&port->app_link);
4613 
4614         if (nxt_queue_chk_remove(&port->idle_link)) {
4615             app->idle_processes--;
4616 
4617             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4618                       &app->name, port->pid, port->id,
4619                       (port->idle_start ? "idle_ports" : "spare_ports"));
4620         }
4621 
4622         nxt_port_hash_remove(&app->port_hash, port);
4623         app->port_hash_count--;
4624 
4625         port->app = NULL;
4626         app->processes--;
4627 
4628         break;
4629 
4630     } nxt_queue_loop;
4631 
4632     nxt_thread_mutex_unlock(&app->mutex);
4633 
4634     return port;
4635 }
4636 
4637 
4638 static void
4639 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4640 {
4641     int  c;
4642 
4643     c = nxt_atomic_fetch_add(&app->use_count, i);
4644 
4645     if (i < 0 && c == -i) {
4646 
4647         if (task->thread->engine != app->engine) {
4648             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4649 
4650         } else {
4651             nxt_router_free_app(task, app->joint, NULL);
4652         }
4653     }
4654 }
4655 
4656 
4657 static void
4658 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4659 {
4660     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4661 
4662     nxt_queue_remove(&app->link);
4663 
4664     nxt_router_app_use(task, app, -1);
4665 }
4666 
4667 
4668 static void
4669 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4670     nxt_apr_action_t action)
4671 {
4672     int         inc_use;
4673     uint32_t    got_response, dec_requests;
4674     nxt_app_t   *app;
4675     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4676     nxt_port_t  *main_app_port;
4677 
4678     nxt_assert(port != NULL);
4679     nxt_assert(port->app != NULL);
4680 
4681     app = port->app;
4682 
4683     inc_use = 0;
4684     got_response = 0;
4685     dec_requests = 0;
4686 
4687     switch (action) {
4688     case NXT_APR_NEW_PORT:
4689         break;
4690     case NXT_APR_REQUEST_FAILED:
4691         dec_requests = 1;
4692         inc_use = -1;
4693         break;
4694     case NXT_APR_GOT_RESPONSE:
4695         got_response = 1;
4696         inc_use = -1;
4697         break;
4698     case NXT_APR_UPGRADE:
4699         got_response = 1;
4700         break;
4701     case NXT_APR_CLOSE:
4702         inc_use = -1;
4703         break;
4704     }
4705 
4706     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4707               port->pid, port->id,
4708               (int) inc_use, (int) got_response);
4709 
4710     if (port->id == NXT_SHARED_PORT_ID) {
4711         nxt_thread_mutex_lock(&app->mutex);
4712 
4713         app->active_requests -= got_response + dec_requests;
4714 
4715         nxt_thread_mutex_unlock(&app->mutex);
4716 
4717         goto adjust_use;
4718     }
4719 
4720     main_app_port = port->main_app_port;
4721 
4722     nxt_thread_mutex_lock(&app->mutex);
4723 
4724     main_app_port->app_responses += got_response;
4725     main_app_port->active_requests -= got_response + dec_requests;
4726     app->active_requests -= got_response + dec_requests;
4727 
4728     if (main_app_port->pair[1] != -1
4729         && (app->max_requests == 0
4730             || main_app_port->app_responses < app->max_requests))
4731     {
4732         if (main_app_port->app_link.next == NULL) {
4733             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4734 
4735             nxt_port_inc_use(main_app_port);
4736         }
4737     }
4738 
4739     send_quit = (app->max_requests > 0
4740                  && main_app_port->app_responses >= app->max_requests);
4741 
4742     if (send_quit) {
4743         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4744 
4745         nxt_port_hash_remove(&app->port_hash, main_app_port);
4746         app->port_hash_count--;
4747 
4748         main_app_port->app = NULL;
4749         app->processes--;
4750 
4751     } else {
4752         port_unchained = 0;
4753     }
4754 
4755     adjust_idle_timer = 0;
4756 
4757     if (main_app_port->pair[1] != -1 && !send_quit
4758         && main_app_port->active_requests == 0
4759         && main_app_port->active_websockets == 0
4760         && main_app_port->idle_link.next == NULL)
4761     {
4762         if (app->idle_processes == app->spare_processes
4763             && app->adjust_idle_work.data == NULL)
4764         {
4765             adjust_idle_timer = 1;
4766             app->adjust_idle_work.data = app;
4767             app->adjust_idle_work.next = NULL;
4768         }
4769 
4770         if (app->idle_processes < app->spare_processes) {
4771             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4772 
4773             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4774                       &app->name, main_app_port->pid, main_app_port->id);
4775         } else {
4776             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4777 
4778             main_app_port->idle_start = task->thread->engine->timers.now;
4779 
4780             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4781                       &app->name, main_app_port->pid, main_app_port->id);
4782         }
4783 
4784         app->idle_processes++;
4785     }
4786 
4787     nxt_thread_mutex_unlock(&app->mutex);
4788 
4789     if (adjust_idle_timer) {
4790         nxt_router_app_use(task, app, 1);
4791         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4792     }
4793 
4794     /* ? */
4795     if (main_app_port->pair[1] == -1) {
4796         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4797                   &app->name, app, main_app_port, main_app_port->pid);
4798 
4799         goto adjust_use;
4800     }
4801 
4802     if (send_quit) {
4803         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4804 
4805         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4806                               NULL);
4807 
4808         if (port_unchained) {
4809             nxt_port_use(task, main_app_port, -1);
4810         }
4811 
4812         goto adjust_use;
4813     }
4814 
4815     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4816               &app->name, app);
4817 
4818 adjust_use:
4819 
4820     nxt_port_use(task, port, inc_use);
4821 }
4822 
4823 
4824 void
4825 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4826 {
4827     nxt_app_t         *app;
4828     nxt_bool_t        unchain, start_process;
4829     nxt_port_t        *idle_port;
4830     nxt_queue_link_t  *idle_lnk;
4831 
4832     app = port->app;
4833 
4834     nxt_assert(app != NULL);
4835 
4836     nxt_thread_mutex_lock(&app->mutex);
4837 
4838     nxt_port_hash_remove(&app->port_hash, port);
4839     app->port_hash_count--;
4840 
4841     if (port->id != 0) {
4842         nxt_thread_mutex_unlock(&app->mutex);
4843 
4844         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4845                   port->pid, port->id);
4846 
4847         return;
4848     }
4849 
4850     unchain = nxt_queue_chk_remove(&port->app_link);
4851 
4852     if (nxt_queue_chk_remove(&port->idle_link)) {
4853         app->idle_processes--;
4854 
4855         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4856                   &app->name, port->pid, port->id,
4857                   (port->idle_start ? "idle_ports" : "spare_ports"));
4858 
4859         if (port->idle_start == 0
4860             && app->idle_processes >= app->spare_processes)
4861         {
4862             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4863 
4864             idle_lnk = nxt_queue_last(&app->idle_ports);
4865             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4866             nxt_queue_remove(idle_lnk);
4867 
4868             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4869 
4870             idle_port->idle_start = 0;
4871 
4872             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4873                       "to spare_ports",
4874                       &app->name, idle_port->pid, idle_port->id);
4875         }
4876     }
4877 
4878     app->processes--;
4879 
4880     start_process = !task->thread->engine->shutdown
4881                     && nxt_router_app_can_start(app)
4882                     && nxt_router_app_need_start(app);
4883 
4884     if (start_process) {
4885         app->pending_processes++;
4886     }
4887 
4888     nxt_thread_mutex_unlock(&app->mutex);
4889 
4890     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4891 
4892     if (unchain) {
4893         nxt_port_use(task, port, -1);
4894     }
4895 
4896     if (start_process) {
4897         nxt_router_start_app_process(task, app);
4898     }
4899 }
4900 
4901 
4902 static void
4903 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4904 {
4905     nxt_app_t           *app;
4906     nxt_bool_t          queued;
4907     nxt_port_t          *port;
4908     nxt_msec_t          timeout, threshold;
4909     nxt_queue_link_t    *lnk;
4910     nxt_event_engine_t  *engine;
4911 
4912     app = obj;
4913     queued = (data == app);
4914 
4915     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4916               &app->name, queued);
4917 
4918     engine = task->thread->engine;
4919 
4920     nxt_assert(app->engine == engine);
4921 
4922     threshold = engine->timers.now + app->joint->idle_timer.bias;
4923     timeout = 0;
4924 
4925     nxt_thread_mutex_lock(&app->mutex);
4926 
4927     if (queued) {
4928         app->adjust_idle_work.data = NULL;
4929     }
4930 
4931     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4932               &app->name,
4933               (int) app->idle_processes, (int) app->spare_processes);
4934 
4935     while (app->idle_processes > app->spare_processes) {
4936 
4937         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4938 
4939         lnk = nxt_queue_first(&app->idle_ports);
4940         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4941 
4942         timeout = port->idle_start + app->idle_timeout;
4943 
4944         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4945                   &app->name, port->pid,
4946                   port->idle_start, timeout, threshold);
4947 
4948         if (timeout > threshold) {
4949             break;
4950         }
4951 
4952         nxt_queue_remove(lnk);
4953         lnk->next = NULL;
4954 
4955         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4956                   &app->name, port->pid, port->id);
4957 
4958         nxt_queue_chk_remove(&port->app_link);
4959 
4960         nxt_port_hash_remove(&app->port_hash, port);
4961         app->port_hash_count--;
4962 
4963         app->idle_processes--;
4964         app->processes--;
4965         port->app = NULL;
4966 
4967         nxt_thread_mutex_unlock(&app->mutex);
4968 
4969         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4970                   &app->name, port->pid);
4971 
4972         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4973 
4974         nxt_port_use(task, port, -1);
4975 
4976         nxt_thread_mutex_lock(&app->mutex);
4977     }
4978 
4979     nxt_thread_mutex_unlock(&app->mutex);
4980 
4981     if (timeout > threshold) {
4982         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4983 
4984     } else {
4985         nxt_timer_disable(engine, &app->joint->idle_timer);
4986     }
4987 
4988     if (queued) {
4989         nxt_router_app_use(task, app, -1);
4990     }
4991 }
4992 
4993 
4994 static void
4995 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4996 {
4997     nxt_timer_t      *timer;
4998     nxt_app_joint_t  *app_joint;
4999 
5000     timer = obj;
5001     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5002 
5003     if (nxt_fast_path(app_joint->app != NULL)) {
5004         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5005     }
5006 }
5007 
5008 
5009 static void
5010 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5011 {
5012     nxt_timer_t      *timer;
5013     nxt_app_joint_t  *app_joint;
5014 
5015     timer = obj;
5016     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5017 
5018     nxt_router_app_joint_use(task, app_joint, -1);
5019 }
5020 
5021 
5022 static void
5023 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5024 {
5025     nxt_app_t        *app;
5026     nxt_port_t       *port;
5027     nxt_app_joint_t  *app_joint;
5028 
5029     app_joint = obj;
5030     app = app_joint->app;
5031 
5032     for ( ;; ) {
5033         port = nxt_router_app_get_port_for_quit(task, app);
5034         if (port == NULL) {
5035             break;
5036         }
5037 
5038         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
5039 
5040         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5041 
5042         nxt_port_use(task, port, -1);
5043     }
5044 
5045     nxt_thread_mutex_lock(&app->mutex);
5046 
5047     for ( ;; ) {
5048         port = nxt_port_hash_retrieve(&app->port_hash);
5049         if (port == NULL) {
5050             break;
5051         }
5052 
5053         app->port_hash_count--;
5054 
5055         port->app = NULL;
5056 
5057         nxt_port_close(task, port);
5058 
5059         nxt_port_use(task, port, -1);
5060     }
5061 
5062     nxt_thread_mutex_unlock(&app->mutex);
5063 
5064     nxt_assert(app->processes == 0);
5065     nxt_assert(app->active_requests == 0);
5066     nxt_assert(app->port_hash_count == 0);
5067     nxt_assert(app->idle_processes == 0);
5068     nxt_assert(nxt_queue_is_empty(&app->ports));
5069     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5070     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5071 
5072     nxt_port_mmaps_destroy(&app->outgoing, 1);
5073 
5074     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5075 
5076     if (app->shared_port != NULL) {
5077         app->shared_port->app = NULL;
5078         nxt_port_close(task, app->shared_port);
5079         nxt_port_use(task, app->shared_port, -1);
5080 
5081         app->shared_port = NULL;
5082     }
5083 
5084     nxt_thread_mutex_destroy(&app->mutex);
5085     nxt_mp_destroy(app->mem_pool);
5086 
5087     app_joint->app = NULL;
5088 
5089     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5090         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5091         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5092 
5093     } else {
5094         nxt_router_app_joint_use(task, app_joint, -1);
5095     }
5096 }
5097 
5098 
5099 static void
5100 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5101     nxt_request_rpc_data_t *req_rpc_data)
5102 {
5103     nxt_bool_t          start_process;
5104     nxt_port_t          *port;
5105     nxt_http_request_t  *r;
5106 
5107     start_process = 0;
5108 
5109     nxt_thread_mutex_lock(&app->mutex);
5110 
5111     port = app->shared_port;
5112     nxt_port_inc_use(port);
5113 
5114     app->active_requests++;
5115 
5116     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5117         app->pending_processes++;
5118         start_process = 1;
5119     }
5120 
5121     r = req_rpc_data->request;
5122 
5123     /*
5124      * Put request into application-wide list to be able to cancel request
5125      * if something goes wrong with application processes.
5126      */
5127     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5128 
5129     nxt_thread_mutex_unlock(&app->mutex);
5130 
5131     /*
5132      * Retain request memory pool while request is linked in ack_waiting_req
5133      * to guarantee request structure memory is accessble.
5134      */
5135     nxt_mp_retain(r->mem_pool);
5136 
5137     req_rpc_data->app_port = port;
5138     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5139 
5140     if (start_process) {
5141         nxt_router_start_app_process(task, app);
5142     }
5143 }
5144 
5145 
5146 void
5147 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5148     nxt_http_action_t *action)
5149 {
5150     nxt_event_engine_t      *engine;
5151     nxt_http_app_conf_t     *conf;
5152     nxt_request_rpc_data_t  *req_rpc_data;
5153 
5154     conf = action->u.conf;
5155     engine = task->thread->engine;
5156 
5157     r->app_target = conf->target;
5158 
5159     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5160                                           nxt_router_response_ready_handler,
5161                                           nxt_router_response_error_handler,
5162                                           sizeof(nxt_request_rpc_data_t));
5163     if (nxt_slow_path(req_rpc_data == NULL)) {
5164         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5165         return;
5166     }
5167 
5168     /*
5169      * At this point we have request req_rpc_data allocated and registered
5170      * in port handlers.  Need to fixup request memory pool.  Counterpart
5171      * release will be called via following call chain:
5172      *    nxt_request_rpc_data_unlink() ->
5173      *        nxt_router_http_request_release_post() ->
5174      *            nxt_router_http_request_release()
5175      */
5176     nxt_mp_retain(r->mem_pool);
5177 
5178     r->timer.task = &engine->task;
5179     r->timer.work_queue = &engine->fast_work_queue;
5180     r->timer.log = engine->task.log;
5181     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5182 
5183     r->engine = engine;
5184     r->err_work.handler = nxt_router_http_request_error;
5185     r->err_work.task = task;
5186     r->err_work.obj = r;
5187 
5188     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5189     req_rpc_data->app = conf->app;
5190     req_rpc_data->msg_info.body_fd = -1;
5191     req_rpc_data->rpc_cancel = 1;
5192 
5193     nxt_router_app_use(task, conf->app, 1);
5194 
5195     req_rpc_data->request = r;
5196     r->req_rpc_data = req_rpc_data;
5197 
5198     if (r->last != NULL) {
5199         r->last->completion_handler = nxt_router_http_request_done;
5200     }
5201 
5202     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5203     nxt_router_app_prepare_request(task, req_rpc_data);
5204 }
5205 
5206 
5207 static void
5208 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5209 {
5210     nxt_http_request_t  *r;
5211 
5212     r = obj;
5213 
5214     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5215 
5216     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5217 
5218     if (r->req_rpc_data != NULL) {
5219         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5220     }
5221 
5222     nxt_mp_release(r->mem_pool);
5223 }
5224 
5225 
5226 static void
5227 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5228 {
5229     nxt_http_request_t  *r;
5230 
5231     r = data;
5232 
5233     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5234 
5235     if (r->req_rpc_data != NULL) {
5236         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5237     }
5238 
5239     nxt_http_request_close_handler(task, r, r->proto.any);
5240 }
5241 
5242 
5243 static void
5244 nxt_router_app_prepare_request(nxt_task_t *task,
5245     nxt_request_rpc_data_t *req_rpc_data)
5246 {
5247     nxt_app_t         *app;
5248     nxt_buf_t         *buf, *body;
5249     nxt_int_t         res;
5250     nxt_port_t        *port, *reply_port;
5251 
5252     int                   notify;
5253     struct {
5254         nxt_port_msg_t       pm;
5255         nxt_port_mmap_msg_t  mm;
5256     } msg;
5257 
5258 
5259     app = req_rpc_data->app;
5260 
5261     nxt_assert(app != NULL);
5262 
5263     port = req_rpc_data->app_port;
5264 
5265     nxt_assert(port != NULL);
5266     nxt_assert(port->queue != NULL);
5267 
5268     reply_port = task->thread->engine->port;
5269 
5270     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5271                                  nxt_app_msg_prefix[app->type]);
5272     if (nxt_slow_path(buf == NULL)) {
5273         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5274                   req_rpc_data->stream, &app->name);
5275 
5276         nxt_http_request_error(task, req_rpc_data->request,
5277                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5278 
5279         return;
5280     }
5281 
5282     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5283                     nxt_buf_used_size(buf),
5284                     port->socket.fd);
5285 
5286     req_rpc_data->msg_info.buf = buf;
5287 
5288     body = req_rpc_data->request->body;
5289 
5290     if (body != NULL && nxt_buf_is_file(body)) {
5291         req_rpc_data->msg_info.body_fd = body->file->fd;
5292 
5293         body->file->fd = -1;
5294 
5295     } else {
5296         req_rpc_data->msg_info.body_fd = -1;
5297     }
5298 
5299     msg.pm.stream = req_rpc_data->stream;
5300     msg.pm.pid = reply_port->pid;
5301     msg.pm.reply_port = reply_port->id;
5302     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5303     msg.pm.last = 0;
5304     msg.pm.mmap = 1;
5305     msg.pm.nf = 0;
5306     msg.pm.mf = 0;
5307     msg.pm.tracking = 0;
5308 
5309     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5310     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5311 
5312     msg.mm.mmap_id = hdr->id;
5313     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5314     msg.mm.size = nxt_buf_used_size(buf);
5315 
5316     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5317                              req_rpc_data->stream, &notify,
5318                              &req_rpc_data->msg_info.tracking_cookie);
5319     if (nxt_fast_path(res == NXT_OK)) {
5320         if (notify != 0) {
5321             (void) nxt_port_socket_write(task, port,
5322                                          NXT_PORT_MSG_READ_QUEUE,
5323                                          -1, req_rpc_data->stream,
5324                                          reply_port->id, NULL);
5325 
5326         } else {
5327             nxt_debug(task, "queue is not empty");
5328         }
5329 
5330         buf->is_port_mmap_sent = 1;
5331         buf->mem.pos = buf->mem.free;
5332 
5333     } else {
5334         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5335                   req_rpc_data->stream, &app->name);
5336 
5337         nxt_http_request_error(task, req_rpc_data->request,
5338                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5339     }
5340 }
5341 
5342 
5343 struct nxt_fields_iter_s {
5344     nxt_list_part_t   *part;
5345     nxt_http_field_t  *field;
5346 };
5347 
5348 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5349 
5350 
5351 static nxt_http_field_t *
5352 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5353 {
5354     if (part == NULL) {
5355         return NULL;
5356     }
5357 
5358     while (part->nelts == 0) {
5359         part = part->next;
5360         if (part == NULL) {
5361             return NULL;
5362         }
5363     }
5364 
5365     i->part = part;
5366     i->field = nxt_list_data(i->part);
5367 
5368     return i->field;
5369 }
5370 
5371 
5372 static nxt_http_field_t *
5373 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5374 {
5375     return nxt_fields_part_first(nxt_list_part(fields), i);
5376 }
5377 
5378 
5379 static nxt_http_field_t *
5380 nxt_fields_next(nxt_fields_iter_t *i)
5381 {
5382     nxt_http_field_t  *end = nxt_list_data(i->part);
5383 
5384     end += i->part->nelts;
5385     i->field++;
5386 
5387     if (i->field < end) {
5388         return i->field;
5389     }
5390 
5391     return nxt_fields_part_first(i->part->next, i);
5392 }
5393 
5394 
5395 static nxt_buf_t *
5396 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5397     nxt_app_t *app, const nxt_str_t *prefix)
5398 {
5399     void                *target_pos, *query_pos;
5400     u_char              *pos, *end, *p, c;
5401     size_t              fields_count, req_size, size, free_size;
5402     size_t              copy_size;
5403     nxt_off_t           content_length;
5404     nxt_buf_t           *b, *buf, *out, **tail;
5405     nxt_http_field_t    *field, *dup;
5406     nxt_unit_field_t    *dst_field;
5407     nxt_fields_iter_t   iter, dup_iter;
5408     nxt_unit_request_t  *req;
5409 
5410     req_size = sizeof(nxt_unit_request_t)
5411                + r->method->length + 1
5412                + r->version.length + 1
5413                + r->remote->length + 1
5414                + r->local->length + 1
5415                + r->server_name.length + 1
5416                + r->target.length + 1
5417                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5418 
5419     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5420     fields_count = 0;
5421 
5422     nxt_list_each(field, r->fields) {
5423         fields_count++;
5424 
5425         req_size += field->name_length + prefix->length + 1
5426                     + field->value_length + 1;
5427     } nxt_list_loop;
5428 
5429     req_size += fields_count * sizeof(nxt_unit_field_t);
5430 
5431     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5432         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5433                   (int) req_size);
5434 
5435         return NULL;
5436     }
5437 
5438     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5439               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5440     if (nxt_slow_path(out == NULL)) {
5441         return NULL;
5442     }
5443 
5444     req = (nxt_unit_request_t *) out->mem.free;
5445     out->mem.free += req_size;
5446 
5447     req->app_target = r->app_target;
5448 
5449     req->content_length = content_length;
5450 
5451     p = (u_char *) (req->fields + fields_count);
5452 
5453     nxt_debug(task, "fields_count=%d", (int) fields_count);
5454 
5455     req->method_length = r->method->length;
5456     nxt_unit_sptr_set(&req->method, p);
5457     p = nxt_cpymem(p, r->method->start, r->method->length);
5458     *p++ = '\0';
5459 
5460     req->version_length = r->version.length;
5461     nxt_unit_sptr_set(&req->version, p);
5462     p = nxt_cpymem(p, r->version.start, r->version.length);
5463     *p++ = '\0';
5464 
5465     req->remote_length = r->remote->address_length;
5466     nxt_unit_sptr_set(&req->remote, p);
5467     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5468                    r->remote->address_length);
5469     *p++ = '\0';
5470 
5471     req->local_length = r->local->address_length;
5472     nxt_unit_sptr_set(&req->local, p);
5473     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5474     *p++ = '\0';
5475 
5476     req->tls = (r->tls != NULL);
5477     req->websocket_handshake = r->websocket_handshake;
5478 
5479     req->server_name_length = r->server_name.length;
5480     nxt_unit_sptr_set(&req->server_name, p);
5481     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5482     *p++ = '\0';
5483 
5484     target_pos = p;
5485     req->target_length = (uint32_t) r->target.length;
5486     nxt_unit_sptr_set(&req->target, p);
5487     p = nxt_cpymem(p, r->target.start, r->target.length);
5488     *p++ = '\0';
5489 
5490     req->path_length = (uint32_t) r->path->length;
5491     if (r->path->start == r->target.start) {
5492         nxt_unit_sptr_set(&req->path, target_pos);
5493 
5494     } else {
5495         nxt_unit_sptr_set(&req->path, p);
5496         p = nxt_cpymem(p, r->path->start, r->path->length);
5497         *p++ = '\0';
5498     }
5499 
5500     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5501     if (r->args != NULL && r->args->start != NULL) {
5502         query_pos = nxt_pointer_to(target_pos,
5503                                    r->args->start - r->target.start);
5504 
5505         nxt_unit_sptr_set(&req->query, query_pos);
5506 
5507     } else {
5508         req->query.offset = 0;
5509     }
5510 
5511     req->content_length_field = NXT_UNIT_NONE_FIELD;
5512     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5513     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5514     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5515 
5516     dst_field = req->fields;
5517 
5518     for (field = nxt_fields_first(r->fields, &iter);
5519          field != NULL;
5520          field = nxt_fields_next(&iter))
5521     {
5522         if (field->skip) {
5523             continue;
5524         }
5525 
5526         dst_field->hash = field->hash;
5527         dst_field->skip = 0;
5528         dst_field->name_length = field->name_length + prefix->length;
5529         dst_field->value_length = field->value_length;
5530 
5531         if (field == r->content_length) {
5532             req->content_length_field = dst_field - req->fields;
5533 
5534         } else if (field == r->content_type) {
5535             req->content_type_field = dst_field - req->fields;
5536 
5537         } else if (field == r->cookie) {
5538             req->cookie_field = dst_field - req->fields;
5539 
5540         } else if (field == r->authorization) {
5541             req->authorization_field = dst_field - req->fields;
5542         }
5543 
5544         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5545                   (int) field->hash, (int) field->skip,
5546                   (int) field->name_length, field->name,
5547                   (int) field->value_length, field->value);
5548 
5549         if (prefix->length != 0) {
5550             nxt_unit_sptr_set(&dst_field->name, p);
5551             p = nxt_cpymem(p, prefix->start, prefix->length);
5552 
5553             end = field->name + field->name_length;
5554             for (pos = field->name; pos < end; pos++) {
5555                 c = *pos;
5556 
5557                 if (c >= 'a' && c <= 'z') {
5558                     *p++ = (c & ~0x20);
5559                     continue;
5560                 }
5561 
5562                 if (c == '-') {
5563                     *p++ = '_';
5564                     continue;
5565                 }
5566 
5567                 *p++ = c;
5568             }
5569 
5570         } else {
5571             nxt_unit_sptr_set(&dst_field->name, p);
5572             p = nxt_cpymem(p, field->name, field->name_length);
5573         }
5574 
5575         *p++ = '\0';
5576 
5577         nxt_unit_sptr_set(&dst_field->value, p);
5578         p = nxt_cpymem(p, field->value, field->value_length);
5579 
5580         if (prefix->length != 0) {
5581             dup_iter = iter;
5582 
5583             for (dup = nxt_fields_next(&dup_iter);
5584                  dup != NULL;
5585                  dup = nxt_fields_next(&dup_iter))
5586             {
5587                 if (dup->name_length != field->name_length
5588                     || dup->skip
5589                     || dup->hash != field->hash
5590                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5591                 {
5592                     continue;
5593                 }
5594 
5595                 p = nxt_cpymem(p, ", ", 2);
5596                 p = nxt_cpymem(p, dup->value, dup->value_length);
5597 
5598                 dst_field->value_length += 2 + dup->value_length;
5599 
5600                 dup->skip = 1;
5601             }
5602         }
5603 
5604         *p++ = '\0';
5605 
5606         dst_field++;
5607     }
5608 
5609     req->fields_count = (uint32_t) (dst_field - req->fields);
5610 
5611     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5612 
5613     buf = out;
5614     tail = &buf->next;
5615 
5616     for (b = r->body; b != NULL; b = b->next) {
5617         size = nxt_buf_mem_used_size(&b->mem);
5618         pos = b->mem.pos;
5619 
5620         while (size > 0) {
5621             if (buf == NULL) {
5622                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5623 
5624                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5625                 if (nxt_slow_path(buf == NULL)) {
5626                     while (out != NULL) {
5627                         buf = out->next;
5628                         out->next = NULL;
5629                         out->completion_handler(task, out, out->parent);
5630                         out = buf;
5631                     }
5632                     return NULL;
5633                 }
5634 
5635                 *tail = buf;
5636                 tail = &buf->next;
5637 
5638             } else {
5639                 free_size = nxt_buf_mem_free_size(&buf->mem);
5640                 if (free_size < size
5641                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5642                        == NXT_OK)
5643                 {
5644                     free_size = nxt_buf_mem_free_size(&buf->mem);
5645                 }
5646             }
5647 
5648             if (free_size > 0) {
5649                 copy_size = nxt_min(free_size, size);
5650 
5651                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5652 
5653                 size -= copy_size;
5654                 pos += copy_size;
5655 
5656                 if (size == 0) {
5657                     break;
5658                 }
5659             }
5660 
5661             buf = NULL;
5662         }
5663     }
5664 
5665     return out;
5666 }
5667 
5668 
5669 static void
5670 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5671 {
5672     nxt_timer_t              *timer;
5673     nxt_http_request_t       *r;
5674     nxt_request_rpc_data_t   *req_rpc_data;
5675 
5676     timer = obj;
5677 
5678     nxt_debug(task, "router app timeout");
5679 
5680     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5681     req_rpc_data = r->timer_data;
5682 
5683     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5684 
5685     nxt_request_rpc_data_unlink(task, req_rpc_data);
5686 }
5687 
5688 
5689 static void
5690 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5691 {
5692     r->timer.handler = nxt_router_http_request_release;
5693     nxt_timer_add(task->thread->engine, &r->timer, 0);
5694 }
5695 
5696 
5697 static void
5698 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5699 {
5700     nxt_http_request_t  *r;
5701 
5702     nxt_debug(task, "http request pool release");
5703 
5704     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5705 
5706     nxt_mp_release(r->mem_pool);
5707 }
5708 
5709 
5710 static void
5711 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5712 {
5713     size_t                   mi;
5714     uint32_t                 i;
5715     nxt_bool_t               ack;
5716     nxt_process_t            *process;
5717     nxt_free_map_t           *m;
5718     nxt_port_mmap_handler_t  *mmap_handler;
5719 
5720     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5721 
5722     process = nxt_runtime_process_find(task->thread->runtime,
5723                                        msg->port_msg.pid);
5724     if (nxt_slow_path(process == NULL)) {
5725         return;
5726     }
5727 
5728     ack = 0;
5729 
5730     /*
5731      * To mitigate possible racing condition (when OOSM message received
5732      * after some of the memory was already freed), need to try to find
5733      * first free segment in shared memory and send ACK if found.
5734      */
5735 
5736     nxt_thread_mutex_lock(&process->incoming.mutex);
5737 
5738     for (i = 0; i < process->incoming.size; i++) {
5739         mmap_handler = process->incoming.elts[i].mmap_handler;
5740 
5741         if (nxt_slow_path(mmap_handler == NULL)) {
5742             continue;
5743         }
5744 
5745         m = mmap_handler->hdr->free_map;
5746 
5747         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5748             if (m[mi] != 0) {
5749                 ack = 1;
5750 
5751                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5752                           i, mi, m[mi]);
5753 
5754                 break;
5755             }
5756         }
5757     }
5758 
5759     nxt_thread_mutex_unlock(&process->incoming.mutex);
5760 
5761     if (ack) {
5762         nxt_process_broadcast_shm_ack(task, process);
5763     }
5764 }
5765 
5766 
5767 static void
5768 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5769 {
5770     nxt_fd_t                 fd;
5771     nxt_port_t               *port;
5772     nxt_runtime_t            *rt;
5773     nxt_port_mmaps_t         *mmaps;
5774     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5775     nxt_port_mmap_handler_t  *mmap_handler;
5776 
5777     rt = task->thread->runtime;
5778 
5779     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5780                                  msg->port_msg.reply_port);
5781     if (nxt_slow_path(port == NULL)) {
5782         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5783                   msg->port_msg.pid, msg->port_msg.reply_port);
5784 
5785         return;
5786     }
5787 
5788     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5789                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5790     {
5791         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5792                   (int) nxt_buf_used_size(msg->buf));
5793 
5794         return;
5795     }
5796 
5797     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5798 
5799     nxt_assert(port->type == NXT_PROCESS_APP);
5800 
5801     if (nxt_slow_path(port->app == NULL)) {
5802         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5803                   port->pid, port->id);
5804 
5805         // FIXME
5806         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5807                               -1, msg->port_msg.stream, 0, NULL);
5808 
5809         return;
5810     }
5811 
5812     mmaps = &port->app->outgoing;
5813     nxt_thread_mutex_lock(&mmaps->mutex);
5814 
5815     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5816         nxt_thread_mutex_unlock(&mmaps->mutex);
5817 
5818         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5819                   (int) get_mmap_msg->id);
5820 
5821         // FIXME
5822         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5823                               -1, msg->port_msg.stream, 0, NULL);
5824         return;
5825     }
5826 
5827     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5828 
5829     fd = mmap_handler->fd;
5830 
5831     nxt_thread_mutex_unlock(&mmaps->mutex);
5832 
5833     nxt_debug(task, "get mmap %PI:%d found",
5834               msg->port_msg.pid, (int) get_mmap_msg->id);
5835 
5836     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5837 }
5838 
5839 
5840 static void
5841 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5842 {
5843     nxt_port_t               *port, *reply_port;
5844     nxt_runtime_t            *rt;
5845     nxt_port_msg_get_port_t  *get_port_msg;
5846 
5847     rt = task->thread->runtime;
5848 
5849     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5850                                        msg->port_msg.reply_port);
5851     if (nxt_slow_path(reply_port == NULL)) {
5852         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5853                   msg->port_msg.pid, msg->port_msg.reply_port);
5854 
5855         return;
5856     }
5857 
5858     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5859                       < (int) sizeof(nxt_port_msg_get_port_t)))
5860     {
5861         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5862                   (int) nxt_buf_used_size(msg->buf));
5863 
5864         return;
5865     }
5866 
5867     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5868 
5869     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5870     if (nxt_slow_path(port == NULL)) {
5871         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5872                   get_port_msg->pid, get_port_msg->id);
5873 
5874         return;
5875     }
5876 
5877     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5878               get_port_msg->id);
5879 
5880     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5881 }
5882