xref: /unit/src/nxt_router.c (revision 1923:9f268a8a1a2f)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
/*
 * Per-application settings.
 *
 * NOTE(review): the code that fills and consumes this structure is outside
 * this chunk; the field notes below are inferred from the field names and
 * should be confirmed against the configuration parsing code.
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;
    nxt_msec_t        idle_timeout;
    uint32_t          requests;
    /* Raw configuration sub-values -- presumably parsed further elsewhere. */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
33 
34 
/*
 * Per-listener settings.
 *
 * NOTE(review): presumably "pass" and "application" mirror the listener
 * options of the same names in the configuration -- confirm against the
 * parsing code, which is outside this chunk.
 */
typedef struct {
    nxt_str_t         pass;
    nxt_str_t         application;
} nxt_router_listener_conf_t;
39 
40 
41 #if (NXT_TLS)
42 
/*
 * A pending TLS certificate lookup for a listening socket.
 *
 * nxt_router_conf_apply() takes these entries off the temporary
 * configuration's "tls" queue one at a time and passes "name" together with
 * the entry itself to nxt_cert_store_get().
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    nxt_queue_link_t        link;  /* for nxt_router_temp_conf_t.tls */
} nxt_router_tlssock_t;
52 
53 #endif
54 
55 
/*
 * Context for a socket-related RPC request.
 *
 * NOTE(review): the users of this structure are outside this chunk;
 * presumably it is the "data" argument of the listen socket RPC handlers
 * -- confirm.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
62 
63 
/*
 * Context pairing an application with the temporary configuration that is
 * being applied.
 *
 * NOTE(review): presumably the "data" argument of the application prefork
 * RPC handlers -- confirm; the users are outside this chunk.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
} nxt_app_rpc_t;
68 
69 
/* Router process callbacks referenced by nxt_router_process. */
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
    nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task,
    nxt_port_t *controller_port);

static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);

/* Port message handlers installed in nxt_router_process_port_handlers. */
static void nxt_router_new_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_conf_data_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_remove_pid_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);

/* Temporary configuration life cycle. */
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_error(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
    nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);

/* Application lookup and the applications hash. */
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
    nxt_app_t *app);
static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
    nxt_str_t *name);
static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
    int i);

/* Port queues, listening sockets, TLS and application prefork RPC. */
static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
    nxt_port_t *port);
static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
    nxt_port_t *port, nxt_fd_t fd);
static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
static void nxt_router_listen_socket_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
#if (NXT_TLS)
static void nxt_router_tls_rpc_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
    nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
    nxt_bool_t last);
#endif
static void nxt_router_app_rpc_create(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
static void nxt_router_app_prefork_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_prefork_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
    nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);

/* Per-engine configuration. */
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
    nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
    const nxt_event_interface_t *interface);
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);

/* Worker threads. */
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
    nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);

static void nxt_router_engines_post(nxt_router_t *router,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_event_engine_t *engine,
    nxt_work_t *jobs);

/* Work handlers; all but thread_start share the (task, obj, data) shape. */
static void nxt_router_thread_start(void *data);
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
    nxt_socket_conf_t *skcf);

/* Access log. */
static void nxt_router_access_log_writer(nxt_task_t *task,
    nxt_http_request_t *r, nxt_router_access_log_t *access_log);
static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
    struct tm *tm, size_t size, const char *format);
static void nxt_router_access_log_open(nxt_task_t *task,
    nxt_router_temp_conf_t *tmcf);
static void nxt_router_access_log_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_release(nxt_task_t *task,
    nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_access_log_reopen_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application ports; port_ready/port_error are the start-process RPC pair. */
static void nxt_router_app_port_ready(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
    nxt_port_t *app_port);
static void nxt_router_app_port_error(nxt_task_t *task,
    nxt_port_recv_msg_t *msg, void *data);

/* Application reference counting. */
static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);

static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
    nxt_apr_action_t action);
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
    nxt_request_rpc_data_t *req_rpc_data);
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
    void *data);

/* Request forwarding to applications. */
static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_prepare_request(nxt_task_t *task,
    nxt_request_rpc_data_t *req_rpc_data);
static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
    nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);

/* Timers and deferred release. */
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
    void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);

/* Tentative definition; the initialized definition is outside this chunk. */
static const nxt_http_request_state_t  nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);

static void nxt_router_app_joint_use(nxt_task_t *task,
    nxt_app_joint_t *app_joint, int i);

static void nxt_router_http_request_release_post(nxt_task_t *task,
    nxt_http_request_t *r);
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
    void *data);
/* More port message handlers from nxt_router_process_port_handlers. */
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_router_get_port_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
static void nxt_router_get_mmap_handler(nxt_task_t *task,
    nxt_port_recv_msg_t *msg);
258 
/* Defined in another translation unit. */
extern const nxt_http_request_state_t  nxt_http_websocket;

/* The router instance of this process; set once by nxt_router_start(). */
static nxt_router_t  *nxt_router;

static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Prefix table with six slots, each either "HTTP_" or the empty string.
 *
 * NOTE(review): presumably indexed by application type, so the order must
 * match that enumeration -- confirm against the code that reads this table
 * (outside this chunk).
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
274 
275 
/*
 * Port message handlers of the router process; referenced from
 * nxt_router_process.port_handlers.  Both RPC outcomes (rpc_ready and
 * rpc_error) are routed to the same generic nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
290 
291 
/*
 * Process descriptor of the router: its name and type, the prefork/setup/
 * start callbacks, the port message handlers and the signal table.
 *
 * NOTE(review): ".restart = 1" presumably asks the main process to restart
 * the router when it exits -- confirm against the nxt_process_init_t users.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
302 
303 
/*
 * Queues of nxt_socket_conf_t.
 *
 * All five are re-initialized by nxt_router_temp_conf() each time a new
 * temporary configuration is created.  nxt_router_conf_apply() moves entries
 * from pending_sockets to creating_sockets one at a time and, once the
 * configuration is applied, appends updating_sockets and creating_sockets
 * to the router's socket list.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
310 
311 
312 static nxt_int_t
313 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
314 {
315     nxt_runtime_stop_app_processes(task, task->thread->runtime);
316 
317     return NXT_OK;
318 }
319 
320 
321 static nxt_int_t
322 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
323 {
324     nxt_int_t      ret;
325     nxt_port_t     *controller_port;
326     nxt_router_t   *router;
327     nxt_runtime_t  *rt;
328 
329     rt = task->thread->runtime;
330 
331     nxt_log(task, NXT_LOG_INFO, "router started");
332 
333 #if (NXT_TLS)
334     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
335     if (nxt_slow_path(rt->tls == NULL)) {
336         return NXT_ERROR;
337     }
338 
339     ret = rt->tls->library_init(task);
340     if (nxt_slow_path(ret != NXT_OK)) {
341         return ret;
342     }
343 #endif
344 
345     ret = nxt_http_init(task);
346     if (nxt_slow_path(ret != NXT_OK)) {
347         return ret;
348     }
349 
350     router = nxt_zalloc(sizeof(nxt_router_t));
351     if (nxt_slow_path(router == NULL)) {
352         return NXT_ERROR;
353     }
354 
355     nxt_queue_init(&router->engines);
356     nxt_queue_init(&router->sockets);
357     nxt_queue_init(&router->apps);
358 
359     nxt_router = router;
360 
361     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
362     if (controller_port != NULL) {
363         nxt_router_greet_controller(task, controller_port);
364     }
365 
366     return NXT_OK;
367 }
368 
369 
370 static void
371 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
372 {
373     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
374                           -1, 0, 0, NULL);
375 }
376 
377 
378 static void
379 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
380     void *data)
381 {
382     size_t         size;
383     uint32_t       stream;
384     nxt_mp_t       *mp;
385     nxt_int_t      ret;
386     nxt_app_t      *app;
387     nxt_buf_t      *b;
388     nxt_port_t     *main_port;
389     nxt_runtime_t  *rt;
390 
391     app = data;
392 
393     rt = task->thread->runtime;
394     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
395 
396     nxt_debug(task, "app '%V' %p start process", &app->name, app);
397 
398     size = app->name.length + 1 + app->conf.length;
399 
400     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
401 
402     if (nxt_slow_path(b == NULL)) {
403         goto failed;
404     }
405 
406     nxt_buf_cpystr(b, &app->name);
407     *b->mem.free++ = '\0';
408     nxt_buf_cpystr(b, &app->conf);
409 
410     nxt_router_app_joint_use(task, app->joint, 1);
411 
412     stream = nxt_port_rpc_register_handler(task, port,
413                                            nxt_router_app_port_ready,
414                                            nxt_router_app_port_error,
415                                            -1, app->joint);
416 
417     if (nxt_slow_path(stream == 0)) {
418         nxt_router_app_joint_use(task, app->joint, -1);
419 
420         goto failed;
421     }
422 
423     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
424                                 -1, stream, port->id, b);
425 
426     if (nxt_slow_path(ret != NXT_OK)) {
427         nxt_port_rpc_cancel(task, port, stream);
428 
429         nxt_router_app_joint_use(task, app->joint, -1);
430 
431         goto failed;
432     }
433 
434     nxt_router_app_use(task, app, -1);
435 
436     return;
437 
438 failed:
439 
440     if (b != NULL) {
441         mp = b->data;
442         nxt_mp_free(mp, b);
443         nxt_mp_release(mp);
444     }
445 
446     nxt_thread_mutex_lock(&app->mutex);
447 
448     app->pending_processes--;
449 
450     nxt_thread_mutex_unlock(&app->mutex);
451 
452     nxt_router_app_use(task, app, -1);
453 }
454 
455 
456 static void
457 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
458 {
459     app_joint->use_count += i;
460 
461     if (app_joint->use_count == 0) {
462         nxt_assert(app_joint->app == NULL);
463 
464         nxt_free(app_joint);
465     }
466 }
467 
468 
469 static nxt_int_t
470 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
471 {
472     nxt_int_t      res;
473     nxt_port_t     *router_port;
474     nxt_runtime_t  *rt;
475 
476     nxt_debug(task, "app '%V' start process", &app->name);
477 
478     rt = task->thread->runtime;
479     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
480 
481     nxt_router_app_use(task, app, 1);
482 
483     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
484                         app);
485 
486     if (res == NXT_OK) {
487         return res;
488     }
489 
490     nxt_thread_mutex_lock(&app->mutex);
491 
492     app->pending_processes--;
493 
494     nxt_thread_mutex_unlock(&app->mutex);
495 
496     nxt_router_app_use(task, app, -1);
497 
498     return NXT_ERROR;
499 }
500 
501 
502 nxt_inline nxt_bool_t
503 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
504 {
505     nxt_buf_t       *b, *next;
506     nxt_bool_t      cancelled;
507     nxt_msg_info_t  *msg_info;
508 
509     msg_info = &req_rpc_data->msg_info;
510 
511     if (msg_info->buf == NULL) {
512         return 0;
513     }
514 
515     cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
516                                      msg_info->tracking_cookie,
517                                      req_rpc_data->stream);
518 
519     if (cancelled) {
520         nxt_debug(task, "stream #%uD: cancelled by router",
521                   req_rpc_data->stream);
522     }
523 
524     for (b = msg_info->buf; b != NULL; b = next) {
525         next = b->next;
526         b->next = NULL;
527 
528         if (b->is_port_mmap_sent) {
529             b->is_port_mmap_sent = cancelled == 0;
530         }
531 
532         b->completion_handler(task, b, b->parent);
533     }
534 
535     msg_info->buf = NULL;
536 
537     return cancelled;
538 }
539 
540 
541 nxt_inline nxt_bool_t
542 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
543 {
544     if (lnk->next != NULL) {
545         nxt_queue_remove(lnk);
546 
547         lnk->next = NULL;
548 
549         return 1;
550     }
551 
552     return 0;
553 }
554 
555 
556 nxt_inline void
557 nxt_request_rpc_data_unlink(nxt_task_t *task,
558     nxt_request_rpc_data_t *req_rpc_data)
559 {
560     nxt_app_t           *app;
561     nxt_bool_t          unlinked;
562     nxt_http_request_t  *r;
563 
564     nxt_router_msg_cancel(task, req_rpc_data);
565 
566     if (req_rpc_data->app_port != NULL) {
567         nxt_router_app_port_release(task, req_rpc_data->app_port,
568                                     req_rpc_data->apr_action);
569 
570         req_rpc_data->app_port = NULL;
571     }
572 
573     app = req_rpc_data->app;
574     r = req_rpc_data->request;
575 
576     if (r != NULL) {
577         r->timer_data = NULL;
578 
579         nxt_router_http_request_release_post(task, r);
580 
581         r->req_rpc_data = NULL;
582         req_rpc_data->request = NULL;
583 
584         if (app != NULL) {
585             unlinked = 0;
586 
587             nxt_thread_mutex_lock(&app->mutex);
588 
589             if (r->app_link.next != NULL) {
590                 nxt_queue_remove(&r->app_link);
591                 r->app_link.next = NULL;
592 
593                 unlinked = 1;
594             }
595 
596             nxt_thread_mutex_unlock(&app->mutex);
597 
598             if (unlinked) {
599                 nxt_mp_release(r->mem_pool);
600             }
601         }
602     }
603 
604     if (app != NULL) {
605         nxt_router_app_use(task, app, -1);
606 
607         req_rpc_data->app = NULL;
608     }
609 
610     if (req_rpc_data->msg_info.body_fd != -1) {
611         nxt_fd_close(req_rpc_data->msg_info.body_fd);
612 
613         req_rpc_data->msg_info.body_fd = -1;
614     }
615 
616     if (req_rpc_data->rpc_cancel) {
617         req_rpc_data->rpc_cancel = 0;
618 
619         nxt_port_rpc_cancel(task, task->thread->engine->port,
620                             req_rpc_data->stream);
621     }
622 }
623 
624 
625 static void
626 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
627 {
628     nxt_int_t      res;
629     nxt_app_t      *app;
630     nxt_port_t     *port, *main_app_port;
631     nxt_runtime_t  *rt;
632 
633     nxt_port_new_port_handler(task, msg);
634 
635     port = msg->u.new_port;
636 
637     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
638         nxt_router_greet_controller(task, msg->u.new_port);
639     }
640 
641     if (port == NULL || port->type != NXT_PROCESS_APP) {
642 
643         if (msg->port_msg.stream == 0) {
644             return;
645         }
646 
647         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
648 
649     } else {
650         if (msg->fd[1] != -1) {
651             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
652             if (nxt_slow_path(res != NXT_OK)) {
653                 return;
654             }
655 
656             nxt_fd_close(msg->fd[1]);
657             msg->fd[1] = -1;
658         }
659     }
660 
661     if (msg->port_msg.stream != 0) {
662         nxt_port_rpc_handler(task, msg);
663         return;
664     }
665 
666     /*
667      * Port with "id == 0" is application 'main' port and it always
668      * should come with non-zero stream.
669      */
670     nxt_assert(port->id != 0);
671 
672     /* Find 'main' app port and get app reference. */
673     rt = task->thread->runtime;
674 
675     /*
676      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
677      * sent to main port (with id == 0) and processed in main thread.
678      */
679     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
680     nxt_assert(main_app_port != NULL);
681 
682     app = main_app_port->app;
683 
684     if (nxt_fast_path(app != NULL)) {
685         nxt_thread_mutex_lock(&app->mutex);
686 
687         /* TODO here should be find-and-add code because there can be
688            port waiters in port_hash */
689         nxt_port_hash_add(&app->port_hash, port);
690         app->port_hash_count++;
691 
692         nxt_thread_mutex_unlock(&app->mutex);
693 
694         port->app = app;
695     }
696 
697     port->main_app_port = main_app_port;
698 
699     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
700 }
701 
702 
703 static void
704 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
705 {
706     void                    *p;
707     size_t                  size;
708     nxt_int_t               ret;
709     nxt_port_t              *port;
710     nxt_router_temp_conf_t  *tmcf;
711 
712     port = nxt_runtime_port_find(task->thread->runtime,
713                                  msg->port_msg.pid,
714                                  msg->port_msg.reply_port);
715     if (nxt_slow_path(port == NULL)) {
716         nxt_alert(task, "conf_data_handler: reply port not found");
717         return;
718     }
719 
720     p = MAP_FAILED;
721 
722     /*
723      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
724      * initialized in 'cleanup' section.
725      */
726     size = 0;
727 
728     tmcf = nxt_router_temp_conf(task);
729     if (nxt_slow_path(tmcf == NULL)) {
730         goto fail;
731     }
732 
733     if (nxt_slow_path(msg->fd[0] == -1)) {
734         nxt_alert(task, "conf_data_handler: invalid shm fd");
735         goto fail;
736     }
737 
738     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
739         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
740                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
741         goto fail;
742     }
743 
744     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
745 
746     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
747 
748     nxt_fd_close(msg->fd[0]);
749     msg->fd[0] = -1;
750 
751     if (nxt_slow_path(p == MAP_FAILED)) {
752         goto fail;
753     }
754 
755     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
756 
757     tmcf->router_conf->router = nxt_router;
758     tmcf->stream = msg->port_msg.stream;
759     tmcf->port = port;
760 
761     nxt_port_use(task, tmcf->port, 1);
762 
763     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
764 
765     if (nxt_fast_path(ret == NXT_OK)) {
766         nxt_router_conf_apply(task, tmcf, NULL);
767 
768     } else {
769         nxt_router_conf_error(task, tmcf);
770     }
771 
772     goto cleanup;
773 
774 fail:
775 
776     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
777                           msg->port_msg.stream, 0, NULL);
778 
779     if (tmcf != NULL) {
780         nxt_mp_release(tmcf->mem_pool);
781     }
782 
783 cleanup:
784 
785     if (p != MAP_FAILED) {
786         nxt_mem_munmap(p, size);
787     }
788 
789     if (msg->fd[0] != -1) {
790         nxt_fd_close(msg->fd[0]);
791         msg->fd[0] = -1;
792     }
793 }
794 
795 
796 static void
797 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
798     void *data)
799 {
800     union {
801         nxt_pid_t  removed_pid;
802         void       *data;
803     } u;
804 
805     u.data = data;
806 
807     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
808 }
809 
810 
811 static void
812 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
813 {
814     nxt_event_engine_t  *engine;
815 
816     nxt_port_remove_pid_handler(task, msg);
817 
818     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
819     {
820         if (nxt_fast_path(engine->port != NULL)) {
821             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
822                           msg->u.data);
823         }
824     }
825     nxt_queue_loop;
826 
827     if (msg->port_msg.stream == 0) {
828         return;
829     }
830 
831     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
832 
833     nxt_port_rpc_handler(task, msg);
834 }
835 
836 
837 static nxt_router_temp_conf_t *
838 nxt_router_temp_conf(nxt_task_t *task)
839 {
840     nxt_mp_t                *mp, *tmp;
841     nxt_router_conf_t       *rtcf;
842     nxt_router_temp_conf_t  *tmcf;
843 
844     mp = nxt_mp_create(1024, 128, 256, 32);
845     if (nxt_slow_path(mp == NULL)) {
846         return NULL;
847     }
848 
849     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
850     if (nxt_slow_path(rtcf == NULL)) {
851         goto fail;
852     }
853 
854     rtcf->mem_pool = mp;
855 
856     tmp = nxt_mp_create(1024, 128, 256, 32);
857     if (nxt_slow_path(tmp == NULL)) {
858         goto fail;
859     }
860 
861     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
862     if (nxt_slow_path(tmcf == NULL)) {
863         goto temp_fail;
864     }
865 
866     tmcf->mem_pool = tmp;
867     tmcf->router_conf = rtcf;
868     tmcf->count = 1;
869     tmcf->engine = task->thread->engine;
870 
871     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
872                                      sizeof(nxt_router_engine_conf_t));
873     if (nxt_slow_path(tmcf->engines == NULL)) {
874         goto temp_fail;
875     }
876 
877     nxt_queue_init(&creating_sockets);
878     nxt_queue_init(&pending_sockets);
879     nxt_queue_init(&updating_sockets);
880     nxt_queue_init(&keeping_sockets);
881     nxt_queue_init(&deleting_sockets);
882 
883 #if (NXT_TLS)
884     nxt_queue_init(&tmcf->tls);
885 #endif
886 
887     nxt_queue_init(&tmcf->apps);
888     nxt_queue_init(&tmcf->previous);
889 
890     return tmcf;
891 
892 temp_fail:
893 
894     nxt_mp_destroy(tmp);
895 
896 fail:
897 
898     nxt_mp_destroy(mp);
899 
900     return NULL;
901 }
902 
903 
904 nxt_inline nxt_bool_t
905 nxt_router_app_can_start(nxt_app_t *app)
906 {
907     return app->processes + app->pending_processes < app->max_processes
908             && app->pending_processes < app->max_pending_processes;
909 }
910 
911 
912 nxt_inline nxt_bool_t
913 nxt_router_app_need_start(nxt_app_t *app)
914 {
915     return (app->active_requests
916               > app->port_hash_count + app->pending_processes)
917            || (app->spare_processes
918                 > app->idle_processes + app->pending_processes);
919 }
920 
921 
922 static void
923 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
924 {
925     nxt_int_t                    ret;
926     nxt_app_t                    *app;
927     nxt_router_t                 *router;
928     nxt_runtime_t                *rt;
929     nxt_queue_link_t             *qlk;
930     nxt_socket_conf_t            *skcf;
931     nxt_router_conf_t            *rtcf;
932     nxt_router_temp_conf_t       *tmcf;
933     const nxt_event_interface_t  *interface;
934 #if (NXT_TLS)
935     nxt_router_tlssock_t         *tls;
936 #endif
937 
938     tmcf = obj;
939 
940     qlk = nxt_queue_first(&pending_sockets);
941 
942     if (qlk != nxt_queue_tail(&pending_sockets)) {
943         nxt_queue_remove(qlk);
944         nxt_queue_insert_tail(&creating_sockets, qlk);
945 
946         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
947 
948         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
949 
950         return;
951     }
952 
953 #if (NXT_TLS)
954     qlk = nxt_queue_last(&tmcf->tls);
955 
956     if (qlk != nxt_queue_head(&tmcf->tls)) {
957         nxt_queue_remove(qlk);
958 
959         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
960 
961         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
962                            nxt_router_tls_rpc_handler, tls);
963         return;
964     }
965 #endif
966 
967     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
968 
969         if (nxt_router_app_need_start(app)) {
970             nxt_router_app_rpc_create(task, tmcf, app);
971             return;
972         }
973 
974     } nxt_queue_loop;
975 
976     rtcf = tmcf->router_conf;
977 
978     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
979         nxt_router_access_log_open(task, tmcf);
980         return;
981     }
982 
983     rt = task->thread->runtime;
984 
985     interface = nxt_service_get(rt->services, "engine", NULL);
986 
987     router = rtcf->router;
988 
989     ret = nxt_router_engines_create(task, router, tmcf, interface);
990     if (nxt_slow_path(ret != NXT_OK)) {
991         goto fail;
992     }
993 
994     ret = nxt_router_threads_create(task, rt, tmcf);
995     if (nxt_slow_path(ret != NXT_OK)) {
996         goto fail;
997     }
998 
999     nxt_router_apps_sort(task, router, tmcf);
1000 
1001     nxt_router_apps_hash_use(task, rtcf, 1);
1002 
1003     nxt_router_engines_post(router, tmcf);
1004 
1005     nxt_queue_add(&router->sockets, &updating_sockets);
1006     nxt_queue_add(&router->sockets, &creating_sockets);
1007 
1008     router->access_log = rtcf->access_log;
1009 
1010     nxt_router_conf_ready(task, tmcf);
1011 
1012     return;
1013 
1014 fail:
1015 
1016     nxt_router_conf_error(task, tmcf);
1017 
1018     return;
1019 }
1020 
1021 
1022 static void
1023 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1024 {
1025     nxt_joint_job_t  *job;
1026 
1027     job = obj;
1028 
1029     nxt_router_conf_ready(task, job->tmcf);
1030 }
1031 
1032 
1033 static void
1034 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1035 {
1036     uint32_t               count;
1037     nxt_router_conf_t      *rtcf;
1038     nxt_thread_spinlock_t  *lock;
1039 
1040     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1041 
1042     if (--tmcf->count > 0) {
1043         return;
1044     }
1045 
1046     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1047 
1048     rtcf = tmcf->router_conf;
1049 
1050     lock = &rtcf->router->lock;
1051 
1052     nxt_thread_spin_lock(lock);
1053 
1054     count = rtcf->count;
1055 
1056     nxt_thread_spin_unlock(lock);
1057 
1058     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1059 
1060     if (count == 0) {
1061         nxt_router_apps_hash_use(task, rtcf, -1);
1062 
1063         nxt_router_access_log_release(task, lock, rtcf->access_log);
1064 
1065         nxt_mp_destroy(rtcf->mem_pool);
1066     }
1067 
1068     nxt_mp_release(tmcf->mem_pool);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1074 {
1075     nxt_app_t          *app;
1076     nxt_queue_t        new_socket_confs;
1077     nxt_socket_t       s;
1078     nxt_router_t       *router;
1079     nxt_queue_link_t   *qlk;
1080     nxt_socket_conf_t  *skcf;
1081     nxt_router_conf_t  *rtcf;
1082 
1083     nxt_alert(task, "failed to apply new conf");
1084 
1085     for (qlk = nxt_queue_first(&creating_sockets);
1086          qlk != nxt_queue_tail(&creating_sockets);
1087          qlk = nxt_queue_next(qlk))
1088     {
1089         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1090         s = skcf->listen->socket;
1091 
1092         if (s != -1) {
1093             nxt_socket_close(task, s);
1094         }
1095 
1096         nxt_free(skcf->listen);
1097     }
1098 
1099     nxt_queue_init(&new_socket_confs);
1100     nxt_queue_add(&new_socket_confs, &updating_sockets);
1101     nxt_queue_add(&new_socket_confs, &pending_sockets);
1102     nxt_queue_add(&new_socket_confs, &creating_sockets);
1103 
1104     rtcf = tmcf->router_conf;
1105 
1106     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1107 
1108         nxt_router_app_unlink(task, app);
1109 
1110     } nxt_queue_loop;
1111 
1112     router = rtcf->router;
1113 
1114     nxt_queue_add(&router->sockets, &keeping_sockets);
1115     nxt_queue_add(&router->sockets, &deleting_sockets);
1116 
1117     nxt_queue_add(&router->apps, &tmcf->previous);
1118 
1119     // TODO: new engines and threads
1120 
1121     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1122 
1123     nxt_mp_destroy(rtcf->mem_pool);
1124 
1125     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1126 
1127     nxt_mp_release(tmcf->mem_pool);
1128 }
1129 
1130 
1131 static void
1132 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1133     nxt_port_msg_type_t type)
1134 {
1135     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1136 
1137     nxt_port_use(task, tmcf->port, -1);
1138 
1139     tmcf->port = NULL;
1140 }
1141 
1142 
1143 static nxt_conf_map_t  nxt_router_conf[] = {
1144     {
1145         nxt_string("listeners_threads"),
1146         NXT_CONF_MAP_INT32,
1147         offsetof(nxt_router_conf_t, threads),
1148     },
1149 };
1150 
1151 
1152 static nxt_conf_map_t  nxt_router_app_conf[] = {
1153     {
1154         nxt_string("type"),
1155         NXT_CONF_MAP_STR,
1156         offsetof(nxt_router_app_conf_t, type),
1157     },
1158 
1159     {
1160         nxt_string("limits"),
1161         NXT_CONF_MAP_PTR,
1162         offsetof(nxt_router_app_conf_t, limits_value),
1163     },
1164 
1165     {
1166         nxt_string("processes"),
1167         NXT_CONF_MAP_INT32,
1168         offsetof(nxt_router_app_conf_t, processes),
1169     },
1170 
1171     {
1172         nxt_string("processes"),
1173         NXT_CONF_MAP_PTR,
1174         offsetof(nxt_router_app_conf_t, processes_value),
1175     },
1176 
1177     {
1178         nxt_string("targets"),
1179         NXT_CONF_MAP_PTR,
1180         offsetof(nxt_router_app_conf_t, targets_value),
1181     },
1182 };
1183 
1184 
1185 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1186     {
1187         nxt_string("timeout"),
1188         NXT_CONF_MAP_MSEC,
1189         offsetof(nxt_router_app_conf_t, timeout),
1190     },
1191 
1192     {
1193         nxt_string("requests"),
1194         NXT_CONF_MAP_INT32,
1195         offsetof(nxt_router_app_conf_t, requests),
1196     },
1197 };
1198 
1199 
1200 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1201     {
1202         nxt_string("spare"),
1203         NXT_CONF_MAP_INT32,
1204         offsetof(nxt_router_app_conf_t, spare_processes),
1205     },
1206 
1207     {
1208         nxt_string("max"),
1209         NXT_CONF_MAP_INT32,
1210         offsetof(nxt_router_app_conf_t, max_processes),
1211     },
1212 
1213     {
1214         nxt_string("idle_timeout"),
1215         NXT_CONF_MAP_MSEC,
1216         offsetof(nxt_router_app_conf_t, idle_timeout),
1217     },
1218 };
1219 
1220 
1221 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1222     {
1223         nxt_string("pass"),
1224         NXT_CONF_MAP_STR_COPY,
1225         offsetof(nxt_router_listener_conf_t, pass),
1226     },
1227 
1228     {
1229         nxt_string("application"),
1230         NXT_CONF_MAP_STR_COPY,
1231         offsetof(nxt_router_listener_conf_t, application),
1232     },
1233 };
1234 
1235 
1236 static nxt_conf_map_t  nxt_router_http_conf[] = {
1237     {
1238         nxt_string("header_buffer_size"),
1239         NXT_CONF_MAP_SIZE,
1240         offsetof(nxt_socket_conf_t, header_buffer_size),
1241     },
1242 
1243     {
1244         nxt_string("large_header_buffer_size"),
1245         NXT_CONF_MAP_SIZE,
1246         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1247     },
1248 
1249     {
1250         nxt_string("large_header_buffers"),
1251         NXT_CONF_MAP_SIZE,
1252         offsetof(nxt_socket_conf_t, large_header_buffers),
1253     },
1254 
1255     {
1256         nxt_string("body_buffer_size"),
1257         NXT_CONF_MAP_SIZE,
1258         offsetof(nxt_socket_conf_t, body_buffer_size),
1259     },
1260 
1261     {
1262         nxt_string("max_body_size"),
1263         NXT_CONF_MAP_SIZE,
1264         offsetof(nxt_socket_conf_t, max_body_size),
1265     },
1266 
1267     {
1268         nxt_string("idle_timeout"),
1269         NXT_CONF_MAP_MSEC,
1270         offsetof(nxt_socket_conf_t, idle_timeout),
1271     },
1272 
1273     {
1274         nxt_string("header_read_timeout"),
1275         NXT_CONF_MAP_MSEC,
1276         offsetof(nxt_socket_conf_t, header_read_timeout),
1277     },
1278 
1279     {
1280         nxt_string("body_read_timeout"),
1281         NXT_CONF_MAP_MSEC,
1282         offsetof(nxt_socket_conf_t, body_read_timeout),
1283     },
1284 
1285     {
1286         nxt_string("send_timeout"),
1287         NXT_CONF_MAP_MSEC,
1288         offsetof(nxt_socket_conf_t, send_timeout),
1289     },
1290 
1291     {
1292         nxt_string("body_temp_path"),
1293         NXT_CONF_MAP_STR,
1294         offsetof(nxt_socket_conf_t, body_temp_path),
1295     },
1296 
1297     {
1298         nxt_string("discard_unsafe_fields"),
1299         NXT_CONF_MAP_INT8,
1300         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1301     },
1302 };
1303 
1304 
1305 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1306     {
1307         nxt_string("max_frame_size"),
1308         NXT_CONF_MAP_SIZE,
1309         offsetof(nxt_websocket_conf_t, max_frame_size),
1310     },
1311 
1312     {
1313         nxt_string("read_timeout"),
1314         NXT_CONF_MAP_MSEC,
1315         offsetof(nxt_websocket_conf_t, read_timeout),
1316     },
1317 
1318     {
1319         nxt_string("keepalive_interval"),
1320         NXT_CONF_MAP_MSEC,
1321         offsetof(nxt_websocket_conf_t, keepalive_interval),
1322     },
1323 
1324 };
1325 
1326 
1327 static nxt_int_t
1328 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1329     u_char *start, u_char *end)
1330 {
1331     u_char                      *p;
1332     size_t                      size;
1333     nxt_mp_t                    *mp, *app_mp;
1334     uint32_t                    next, next_target;
1335     nxt_int_t                   ret;
1336     nxt_str_t                   name, path, target;
1337     nxt_app_t                   *app, *prev;
1338     nxt_str_t                   *t, *s, *targets;
1339     nxt_uint_t                  n, i;
1340     nxt_port_t                  *port;
1341     nxt_router_t                *router;
1342     nxt_app_joint_t             *app_joint;
1343 #if (NXT_TLS)
1344     nxt_tls_init_t              *tls_init;
1345     nxt_conf_value_t            *certificate;
1346 #endif
1347     nxt_conf_value_t            *conf, *http, *value, *websocket;
1348     nxt_conf_value_t            *applications, *application;
1349     nxt_conf_value_t            *listeners, *listener;
1350     nxt_conf_value_t            *routes_conf, *static_conf;
1351     nxt_socket_conf_t           *skcf;
1352     nxt_http_routes_t           *routes;
1353     nxt_event_engine_t          *engine;
1354     nxt_app_lang_module_t       *lang;
1355     nxt_router_app_conf_t       apcf;
1356     nxt_router_access_log_t     *access_log;
1357     nxt_router_listener_conf_t  lscf;
1358 
1359     static nxt_str_t  http_path = nxt_string("/settings/http");
1360     static nxt_str_t  applications_path = nxt_string("/applications");
1361     static nxt_str_t  listeners_path = nxt_string("/listeners");
1362     static nxt_str_t  routes_path = nxt_string("/routes");
1363     static nxt_str_t  access_log_path = nxt_string("/access_log");
1364 #if (NXT_TLS)
1365     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1366     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1367     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1368     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1369 #endif
1370     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1371     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1372 
1373     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1374     if (conf == NULL) {
1375         nxt_alert(task, "configuration parsing error");
1376         return NXT_ERROR;
1377     }
1378 
1379     mp = tmcf->router_conf->mem_pool;
1380 
1381     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1382                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1383     if (ret != NXT_OK) {
1384         nxt_alert(task, "root map error");
1385         return NXT_ERROR;
1386     }
1387 
1388     if (tmcf->router_conf->threads == 0) {
1389         tmcf->router_conf->threads = nxt_ncpu;
1390     }
1391 
1392     static_conf = nxt_conf_get_path(conf, &static_path);
1393 
1394     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1395     if (nxt_slow_path(ret != NXT_OK)) {
1396         return NXT_ERROR;
1397     }
1398 
1399     router = tmcf->router_conf->router;
1400 
1401     applications = nxt_conf_get_path(conf, &applications_path);
1402 
1403     if (applications != NULL) {
1404         next = 0;
1405 
1406         for ( ;; ) {
1407             application = nxt_conf_next_object_member(applications,
1408                                                       &name, &next);
1409             if (application == NULL) {
1410                 break;
1411             }
1412 
1413             nxt_debug(task, "application \"%V\"", &name);
1414 
1415             size = nxt_conf_json_length(application, NULL);
1416 
1417             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1418             if (nxt_slow_path(app_mp == NULL)) {
1419                 goto fail;
1420             }
1421 
1422             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1423             if (app == NULL) {
1424                 goto app_fail;
1425             }
1426 
1427             nxt_memzero(app, sizeof(nxt_app_t));
1428 
1429             app->mem_pool = app_mp;
1430 
1431             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1432             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1433                                                   + name.length);
1434 
1435             p = nxt_conf_json_print(app->conf.start, application, NULL);
1436             app->conf.length = p - app->conf.start;
1437 
1438             nxt_assert(app->conf.length <= size);
1439 
1440             nxt_debug(task, "application conf \"%V\"", &app->conf);
1441 
1442             prev = nxt_router_app_find(&router->apps, &name);
1443 
1444             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1445                 nxt_mp_destroy(app_mp);
1446 
1447                 nxt_queue_remove(&prev->link);
1448                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1449 
1450                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1451                 if (nxt_slow_path(ret != NXT_OK)) {
1452                     goto fail;
1453                 }
1454 
1455                 continue;
1456             }
1457 
1458             apcf.processes = 1;
1459             apcf.max_processes = 1;
1460             apcf.spare_processes = 0;
1461             apcf.timeout = 0;
1462             apcf.idle_timeout = 15000;
1463             apcf.requests = 0;
1464             apcf.limits_value = NULL;
1465             apcf.processes_value = NULL;
1466             apcf.targets_value = NULL;
1467 
1468             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1469             if (nxt_slow_path(app_joint == NULL)) {
1470                 goto app_fail;
1471             }
1472 
1473             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1474 
1475             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1476                                       nxt_nitems(nxt_router_app_conf), &apcf);
1477             if (ret != NXT_OK) {
1478                 nxt_alert(task, "application map error");
1479                 goto app_fail;
1480             }
1481 
1482             if (apcf.limits_value != NULL) {
1483 
1484                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1485                     nxt_alert(task, "application limits is not object");
1486                     goto app_fail;
1487                 }
1488 
1489                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1490                                         nxt_router_app_limits_conf,
1491                                         nxt_nitems(nxt_router_app_limits_conf),
1492                                         &apcf);
1493                 if (ret != NXT_OK) {
1494                     nxt_alert(task, "application limits map error");
1495                     goto app_fail;
1496                 }
1497             }
1498 
1499             if (apcf.processes_value != NULL
1500                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1501             {
1502                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1503                                      nxt_router_app_processes_conf,
1504                                      nxt_nitems(nxt_router_app_processes_conf),
1505                                      &apcf);
1506                 if (ret != NXT_OK) {
1507                     nxt_alert(task, "application processes map error");
1508                     goto app_fail;
1509                 }
1510 
1511             } else {
1512                 apcf.max_processes = apcf.processes;
1513                 apcf.spare_processes = apcf.processes;
1514             }
1515 
1516             if (apcf.targets_value != NULL) {
1517                 n = nxt_conf_object_members_count(apcf.targets_value);
1518 
1519                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1520                 if (nxt_slow_path(targets == NULL)) {
1521                     goto app_fail;
1522                 }
1523 
1524                 next_target = 0;
1525 
1526                 for (i = 0; i < n; i++) {
1527                     (void) nxt_conf_next_object_member(apcf.targets_value,
1528                                                        &target, &next_target);
1529 
1530                     s = nxt_str_dup(app_mp, &targets[i], &target);
1531                     if (nxt_slow_path(s == NULL)) {
1532                         goto app_fail;
1533                     }
1534                 }
1535 
1536             } else {
1537                 targets = NULL;
1538             }
1539 
1540             nxt_debug(task, "application type: %V", &apcf.type);
1541             nxt_debug(task, "application processes: %D", apcf.processes);
1542             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1543             nxt_debug(task, "application requests: %D", apcf.requests);
1544 
1545             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1546 
1547             if (lang == NULL) {
1548                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1549                 goto app_fail;
1550             }
1551 
1552             nxt_debug(task, "application language module: \"%s\"", lang->file);
1553 
1554             ret = nxt_thread_mutex_create(&app->mutex);
1555             if (ret != NXT_OK) {
1556                 goto app_fail;
1557             }
1558 
1559             nxt_queue_init(&app->ports);
1560             nxt_queue_init(&app->spare_ports);
1561             nxt_queue_init(&app->idle_ports);
1562             nxt_queue_init(&app->ack_waiting_req);
1563 
1564             app->name.length = name.length;
1565             nxt_memcpy(app->name.start, name.start, name.length);
1566 
1567             app->type = lang->type;
1568             app->max_processes = apcf.max_processes;
1569             app->spare_processes = apcf.spare_processes;
1570             app->max_pending_processes = apcf.spare_processes
1571                                          ? apcf.spare_processes : 1;
1572             app->timeout = apcf.timeout;
1573             app->idle_timeout = apcf.idle_timeout;
1574             app->max_requests = apcf.requests;
1575 
1576             app->targets = targets;
1577 
1578             engine = task->thread->engine;
1579 
1580             app->engine = engine;
1581 
1582             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1583             app->adjust_idle_work.task = &engine->task;
1584             app->adjust_idle_work.obj = app;
1585 
1586             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1587 
1588             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1589             if (nxt_slow_path(ret != NXT_OK)) {
1590                 goto app_fail;
1591             }
1592 
1593             nxt_router_app_use(task, app, 1);
1594 
1595             app->joint = app_joint;
1596 
1597             app_joint->use_count = 1;
1598             app_joint->app = app;
1599 
1600             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1601             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1602             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1603             app_joint->idle_timer.task = &engine->task;
1604             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1605 
1606             app_joint->free_app_work.handler = nxt_router_free_app;
1607             app_joint->free_app_work.task = &engine->task;
1608             app_joint->free_app_work.obj = app_joint;
1609 
1610             port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1611                                 NXT_PROCESS_APP);
1612             if (nxt_slow_path(port == NULL)) {
1613                 return NXT_ERROR;
1614             }
1615 
1616             ret = nxt_port_socket_init(task, port, 0);
1617             if (nxt_slow_path(ret != NXT_OK)) {
1618                 nxt_port_use(task, port, -1);
1619                 return NXT_ERROR;
1620             }
1621 
1622             ret = nxt_router_app_queue_init(task, port);
1623             if (nxt_slow_path(ret != NXT_OK)) {
1624                 nxt_port_write_close(port);
1625                 nxt_port_read_close(port);
1626                 nxt_port_use(task, port, -1);
1627                 return NXT_ERROR;
1628             }
1629 
1630             nxt_port_write_enable(task, port);
1631             port->app = app;
1632 
1633             app->shared_port = port;
1634 
1635             nxt_thread_mutex_create(&app->outgoing.mutex);
1636         }
1637     }
1638 
1639     routes_conf = nxt_conf_get_path(conf, &routes_path);
1640     if (nxt_fast_path(routes_conf != NULL)) {
1641         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1642         if (nxt_slow_path(routes == NULL)) {
1643             return NXT_ERROR;
1644         }
1645         tmcf->router_conf->routes = routes;
1646     }
1647 
1648     ret = nxt_upstreams_create(task, tmcf, conf);
1649     if (nxt_slow_path(ret != NXT_OK)) {
1650         return ret;
1651     }
1652 
1653     http = nxt_conf_get_path(conf, &http_path);
1654 #if 0
1655     if (http == NULL) {
1656         nxt_alert(task, "no \"http\" block");
1657         return NXT_ERROR;
1658     }
1659 #endif
1660 
1661     websocket = nxt_conf_get_path(conf, &websocket_path);
1662 
1663     listeners = nxt_conf_get_path(conf, &listeners_path);
1664 
1665     if (listeners != NULL) {
1666         next = 0;
1667 
1668         for ( ;; ) {
1669             listener = nxt_conf_next_object_member(listeners, &name, &next);
1670             if (listener == NULL) {
1671                 break;
1672             }
1673 
1674             skcf = nxt_router_socket_conf(task, tmcf, &name);
1675             if (skcf == NULL) {
1676                 goto fail;
1677             }
1678 
1679             nxt_memzero(&lscf, sizeof(lscf));
1680 
1681             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1682                                       nxt_nitems(nxt_router_listener_conf),
1683                                       &lscf);
1684             if (ret != NXT_OK) {
1685                 nxt_alert(task, "listener map error");
1686                 goto fail;
1687             }
1688 
1689             nxt_debug(task, "application: %V", &lscf.application);
1690 
1691             // STUB, default values if http block is not defined.
1692             skcf->header_buffer_size = 2048;
1693             skcf->large_header_buffer_size = 8192;
1694             skcf->large_header_buffers = 4;
1695             skcf->discard_unsafe_fields = 1;
1696             skcf->body_buffer_size = 16 * 1024;
1697             skcf->max_body_size = 8 * 1024 * 1024;
1698             skcf->proxy_header_buffer_size = 64 * 1024;
1699             skcf->proxy_buffer_size = 4096;
1700             skcf->proxy_buffers = 256;
1701             skcf->idle_timeout = 180 * 1000;
1702             skcf->header_read_timeout = 30 * 1000;
1703             skcf->body_read_timeout = 30 * 1000;
1704             skcf->send_timeout = 30 * 1000;
1705             skcf->proxy_timeout = 60 * 1000;
1706             skcf->proxy_send_timeout = 30 * 1000;
1707             skcf->proxy_read_timeout = 30 * 1000;
1708 
1709             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1710             skcf->websocket_conf.read_timeout = 60 * 1000;
1711             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1712 
1713             nxt_str_null(&skcf->body_temp_path);
1714 
1715             if (http != NULL) {
1716                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1717                                           nxt_nitems(nxt_router_http_conf),
1718                                           skcf);
1719                 if (ret != NXT_OK) {
1720                     nxt_alert(task, "http map error");
1721                     goto fail;
1722                 }
1723             }
1724 
1725             if (websocket != NULL) {
1726                 ret = nxt_conf_map_object(mp, websocket,
1727                                           nxt_router_websocket_conf,
1728                                           nxt_nitems(nxt_router_websocket_conf),
1729                                           &skcf->websocket_conf);
1730                 if (ret != NXT_OK) {
1731                     nxt_alert(task, "websocket map error");
1732                     goto fail;
1733                 }
1734             }
1735 
1736             t = &skcf->body_temp_path;
1737 
1738             if (t->length == 0) {
1739                 t->start = (u_char *) task->thread->runtime->tmp;
1740                 t->length = nxt_strlen(t->start);
1741             }
1742 
1743 #if (NXT_TLS)
1744             certificate = nxt_conf_get_path(listener, &certificate_path);
1745 
1746             if (certificate != NULL) {
1747                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1748                 if (nxt_slow_path(tls_init == NULL)) {
1749                     return NXT_ERROR;
1750                 }
1751 
1752                 tls_init->cache_size = 0;
1753                 tls_init->timeout = 300;
1754 
1755                 value = nxt_conf_get_path(listener, &conf_cache_path);
1756                 if (value != NULL) {
1757                     tls_init->cache_size = nxt_conf_get_number(value);
1758                 }
1759 
1760                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1761                 if (value != NULL) {
1762                     tls_init->timeout = nxt_conf_get_number(value);
1763                 }
1764 
1765                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1766                                                         &conf_commands_path);
1767 
1768                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1769                     n = nxt_conf_array_elements_count(certificate);
1770 
1771                     for (i = 0; i < n; i++) {
1772                         value = nxt_conf_get_array_element(certificate, i);
1773 
1774                         nxt_assert(value != NULL);
1775 
1776                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1777                                                          tls_init, i == 0);
1778                         if (nxt_slow_path(ret != NXT_OK)) {
1779                             goto fail;
1780                         }
1781                     }
1782 
1783                 } else {
1784                     /* NXT_CONF_STRING */
1785                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1786                                                      tls_init, 1);
1787                     if (nxt_slow_path(ret != NXT_OK)) {
1788                         goto fail;
1789                     }
1790                 }
1791             }
1792 #endif
1793 
1794             skcf->listen->handler = nxt_http_conn_init;
1795             skcf->router_conf = tmcf->router_conf;
1796             skcf->router_conf->count++;
1797 
1798             if (lscf.pass.length != 0) {
1799                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1800 
1801             /* COMPATIBILITY: listener application. */
1802             } else if (lscf.application.length > 0) {
1803                 skcf->action = nxt_http_pass_application(task,
1804                                                          tmcf->router_conf,
1805                                                          &lscf.application);
1806             }
1807 
1808             if (nxt_slow_path(skcf->action == NULL)) {
1809                 goto fail;
1810             }
1811         }
1812     }
1813 
1814     ret = nxt_http_routes_resolve(task, tmcf);
1815     if (nxt_slow_path(ret != NXT_OK)) {
1816         goto fail;
1817     }
1818 
1819     value = nxt_conf_get_path(conf, &access_log_path);
1820 
1821     if (value != NULL) {
1822         nxt_conf_get_string(value, &path);
1823 
1824         access_log = router->access_log;
1825 
1826         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1827             nxt_thread_spin_lock(&router->lock);
1828             access_log->count++;
1829             nxt_thread_spin_unlock(&router->lock);
1830 
1831         } else {
1832             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1833                                     + path.length);
1834             if (access_log == NULL) {
1835                 nxt_alert(task, "failed to allocate access log structure");
1836                 goto fail;
1837             }
1838 
1839             access_log->fd = -1;
1840             access_log->handler = &nxt_router_access_log_writer;
1841             access_log->count = 1;
1842 
1843             access_log->path.length = path.length;
1844             access_log->path.start = (u_char *) access_log
1845                                      + sizeof(nxt_router_access_log_t);
1846 
1847             nxt_memcpy(access_log->path.start, path.start, path.length);
1848         }
1849 
1850         tmcf->router_conf->access_log = access_log;
1851     }
1852 
1853     nxt_queue_add(&deleting_sockets, &router->sockets);
1854     nxt_queue_init(&router->sockets);
1855 
1856     return NXT_OK;
1857 
1858 app_fail:
1859 
1860     nxt_mp_destroy(app_mp);
1861 
1862 fail:
1863 
1864     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1865 
1866         nxt_queue_remove(&app->link);
1867         nxt_thread_mutex_destroy(&app->mutex);
1868         nxt_mp_destroy(app->mem_pool);
1869 
1870     } nxt_queue_loop;
1871 
1872     return NXT_ERROR;
1873 }
1874 
1875 
1876 #if (NXT_TLS)
1877 
1878 static nxt_int_t
1879 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1880     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1881     nxt_tls_init_t *tls_init, nxt_bool_t last)
1882 {
1883     nxt_router_tlssock_t  *tls;
1884 
1885     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1886     if (nxt_slow_path(tls == NULL)) {
1887         return NXT_ERROR;
1888     }
1889 
1890     tls->tls_init = tls_init;
1891     tls->socket_conf = skcf;
1892     tls->temp_conf = tmcf;
1893     tls->last = last;
1894     nxt_conf_get_string(value, &tls->name);
1895 
1896     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1897 
1898     return NXT_OK;
1899 }
1900 
1901 #endif
1902 
1903 
1904 static nxt_int_t
1905 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
1906     nxt_conf_value_t *conf)
1907 {
1908     uint32_t          next, i;
1909     nxt_mp_t          *mp;
1910     nxt_str_t         *type, exten, str;
1911     nxt_int_t         ret;
1912     nxt_uint_t        exts;
1913     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
1914 
1915     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
1916 
1917     mp = rtcf->mem_pool;
1918 
1919     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
1920     if (nxt_slow_path(ret != NXT_OK)) {
1921         return NXT_ERROR;
1922     }
1923 
1924     if (conf == NULL) {
1925         return NXT_OK;
1926     }
1927 
1928     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
1929 
1930     if (mtypes_conf != NULL) {
1931         next = 0;
1932 
1933         for ( ;; ) {
1934             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
1935 
1936             if (ext_conf == NULL) {
1937                 break;
1938             }
1939 
1940             type = nxt_str_dup(mp, NULL, &str);
1941             if (nxt_slow_path(type == NULL)) {
1942                 return NXT_ERROR;
1943             }
1944 
1945             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
1946                 nxt_conf_get_string(ext_conf, &str);
1947 
1948                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
1949                     return NXT_ERROR;
1950                 }
1951 
1952                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1953                                                       &exten, type);
1954                 if (nxt_slow_path(ret != NXT_OK)) {
1955                     return NXT_ERROR;
1956                 }
1957 
1958                 continue;
1959             }
1960 
1961             exts = nxt_conf_array_elements_count(ext_conf);
1962 
1963             for (i = 0; i < exts; i++) {
1964                 value = nxt_conf_get_array_element(ext_conf, i);
1965 
1966                 nxt_conf_get_string(value, &str);
1967 
1968                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
1969                     return NXT_ERROR;
1970                 }
1971 
1972                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1973                                                       &exten, type);
1974                 if (nxt_slow_path(ret != NXT_OK)) {
1975                     return NXT_ERROR;
1976                 }
1977             }
1978         }
1979     }
1980 
1981     return NXT_OK;
1982 }
1983 
1984 
1985 static nxt_app_t *
1986 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1987 {
1988     nxt_app_t  *app;
1989 
1990     nxt_queue_each(app, queue, nxt_app_t, link) {
1991 
1992         if (nxt_strstr_eq(name, &app->name)) {
1993             return app;
1994         }
1995 
1996     } nxt_queue_loop;
1997 
1998     return NULL;
1999 }
2000 
2001 
2002 static nxt_int_t
2003 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2004 {
2005     void       *mem;
2006     nxt_int_t  fd;
2007 
2008     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2009     if (nxt_slow_path(fd == -1)) {
2010         return NXT_ERROR;
2011     }
2012 
2013     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2014                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2015     if (nxt_slow_path(mem == MAP_FAILED)) {
2016         nxt_fd_close(fd);
2017 
2018         return NXT_ERROR;
2019     }
2020 
2021     nxt_app_queue_init(mem);
2022 
2023     port->queue_fd = fd;
2024     port->queue = mem;
2025 
2026     return NXT_OK;
2027 }
2028 
2029 
2030 static nxt_int_t
2031 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2032 {
2033     void       *mem;
2034     nxt_int_t  fd;
2035 
2036     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2037     if (nxt_slow_path(fd == -1)) {
2038         return NXT_ERROR;
2039     }
2040 
2041     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2042                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2043     if (nxt_slow_path(mem == MAP_FAILED)) {
2044         nxt_fd_close(fd);
2045 
2046         return NXT_ERROR;
2047     }
2048 
2049     nxt_port_queue_init(mem);
2050 
2051     port->queue_fd = fd;
2052     port->queue = mem;
2053 
2054     return NXT_OK;
2055 }
2056 
2057 
2058 static nxt_int_t
2059 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2060 {
2061     void  *mem;
2062 
2063     nxt_assert(fd != -1);
2064 
2065     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2066                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2067     if (nxt_slow_path(mem == MAP_FAILED)) {
2068 
2069         return NXT_ERROR;
2070     }
2071 
2072     port->queue = mem;
2073 
2074     return NXT_OK;
2075 }
2076 
2077 
2078 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2079     NXT_LVLHSH_DEFAULT,
2080     nxt_router_apps_hash_test,
2081     nxt_mp_lvlhsh_alloc,
2082     nxt_mp_lvlhsh_free,
2083 };
2084 
2085 
2086 static nxt_int_t
2087 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2088 {
2089     nxt_app_t  *app;
2090 
2091     app = data;
2092 
2093     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2094 }
2095 
2096 
2097 static nxt_int_t
2098 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2099 {
2100     nxt_lvlhsh_query_t  lhq;
2101 
2102     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2103     lhq.replace = 0;
2104     lhq.key = app->name;
2105     lhq.value = app;
2106     lhq.proto = &nxt_router_apps_hash_proto;
2107     lhq.pool = rtcf->mem_pool;
2108 
2109     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2110 
2111     case NXT_OK:
2112         return NXT_OK;
2113 
2114     case NXT_DECLINED:
2115         nxt_thread_log_alert("router app hash adding failed: "
2116                              "\"%V\" is already in hash", &lhq.key);
2117         /* Fall through. */
2118     default:
2119         return NXT_ERROR;
2120     }
2121 }
2122 
2123 
2124 static nxt_app_t *
2125 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2126 {
2127     nxt_lvlhsh_query_t  lhq;
2128 
2129     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2130     lhq.key = *name;
2131     lhq.proto = &nxt_router_apps_hash_proto;
2132 
2133     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2134         return NULL;
2135     }
2136 
2137     return lhq.value;
2138 }
2139 
2140 
2141 static void
2142 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2143 {
2144     nxt_app_t          *app;
2145     nxt_lvlhsh_each_t  lhe;
2146 
2147     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2148 
2149     for ( ;; ) {
2150         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2151 
2152         if (app == NULL) {
2153             break;
2154         }
2155 
2156         nxt_router_app_use(task, app, i);
2157     }
2158 }
2159 
2160 
2161 
2162 nxt_int_t
2163 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
2164     nxt_http_action_t *action)
2165 {
2166     nxt_app_t  *app;
2167 
2168     app = nxt_router_apps_hash_get(rtcf, name);
2169 
2170     if (app == NULL) {
2171         return NXT_DECLINED;
2172     }
2173 
2174     action->u.app.application = app;
2175     action->handler = nxt_http_application_handler;
2176 
2177     return NXT_OK;
2178 }
2179 
2180 
2181 static nxt_socket_conf_t *
2182 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2183     nxt_str_t *name)
2184 {
2185     size_t               size;
2186     nxt_int_t            ret;
2187     nxt_bool_t           wildcard;
2188     nxt_sockaddr_t       *sa;
2189     nxt_socket_conf_t    *skcf;
2190     nxt_listen_socket_t  *ls;
2191 
2192     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2193     if (nxt_slow_path(sa == NULL)) {
2194         nxt_alert(task, "invalid listener \"%V\"", name);
2195         return NULL;
2196     }
2197 
2198     sa->type = SOCK_STREAM;
2199 
2200     nxt_debug(task, "router listener: \"%*s\"",
2201               (size_t) sa->length, nxt_sockaddr_start(sa));
2202 
2203     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2204     if (nxt_slow_path(skcf == NULL)) {
2205         return NULL;
2206     }
2207 
2208     size = nxt_sockaddr_size(sa);
2209 
2210     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2211 
2212     if (ret != NXT_OK) {
2213 
2214         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2215         if (nxt_slow_path(ls == NULL)) {
2216             return NULL;
2217         }
2218 
2219         skcf->listen = ls;
2220 
2221         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2222         nxt_memcpy(ls->sockaddr, sa, size);
2223 
2224         nxt_listen_socket_remote_size(ls);
2225 
2226         ls->socket = -1;
2227         ls->backlog = NXT_LISTEN_BACKLOG;
2228         ls->flags = NXT_NONBLOCK;
2229         ls->read_after_accept = 1;
2230     }
2231 
2232     switch (sa->u.sockaddr.sa_family) {
2233 #if (NXT_HAVE_UNIX_DOMAIN)
2234     case AF_UNIX:
2235         wildcard = 0;
2236         break;
2237 #endif
2238 #if (NXT_INET6)
2239     case AF_INET6:
2240         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2241         break;
2242 #endif
2243     case AF_INET:
2244     default:
2245         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2246         break;
2247     }
2248 
2249     if (!wildcard) {
2250         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2251         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2252             return NULL;
2253         }
2254 
2255         nxt_memcpy(skcf->sockaddr, sa, size);
2256     }
2257 
2258     return skcf;
2259 }
2260 
2261 
2262 static nxt_int_t
2263 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2264     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2265 {
2266     nxt_router_t       *router;
2267     nxt_queue_link_t   *qlk;
2268     nxt_socket_conf_t  *skcf;
2269 
2270     router = tmcf->router_conf->router;
2271 
2272     for (qlk = nxt_queue_first(&router->sockets);
2273          qlk != nxt_queue_tail(&router->sockets);
2274          qlk = nxt_queue_next(qlk))
2275     {
2276         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2277 
2278         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2279             nskcf->listen = skcf->listen;
2280 
2281             nxt_queue_remove(qlk);
2282             nxt_queue_insert_tail(&keeping_sockets, qlk);
2283 
2284             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2285 
2286             return NXT_OK;
2287         }
2288     }
2289 
2290     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2291 
2292     return NXT_DECLINED;
2293 }
2294 
2295 
2296 static void
2297 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2298     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2299 {
2300     size_t            size;
2301     uint32_t          stream;
2302     nxt_int_t         ret;
2303     nxt_buf_t         *b;
2304     nxt_port_t        *main_port, *router_port;
2305     nxt_runtime_t     *rt;
2306     nxt_socket_rpc_t  *rpc;
2307 
2308     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2309     if (rpc == NULL) {
2310         goto fail;
2311     }
2312 
2313     rpc->socket_conf = skcf;
2314     rpc->temp_conf = tmcf;
2315 
2316     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2317 
2318     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2319     if (b == NULL) {
2320         goto fail;
2321     }
2322 
2323     b->completion_handler = nxt_router_dummy_buf_completion;
2324 
2325     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2326 
2327     rt = task->thread->runtime;
2328     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2329     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2330 
2331     stream = nxt_port_rpc_register_handler(task, router_port,
2332                                            nxt_router_listen_socket_ready,
2333                                            nxt_router_listen_socket_error,
2334                                            main_port->pid, rpc);
2335     if (nxt_slow_path(stream == 0)) {
2336         goto fail;
2337     }
2338 
2339     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2340                                 stream, router_port->id, b);
2341 
2342     if (nxt_slow_path(ret != NXT_OK)) {
2343         nxt_port_rpc_cancel(task, router_port, stream);
2344         goto fail;
2345     }
2346 
2347     return;
2348 
2349 fail:
2350 
2351     nxt_router_conf_error(task, tmcf);
2352 }
2353 
2354 
2355 static void
2356 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2357     void *data)
2358 {
2359     nxt_int_t         ret;
2360     nxt_socket_t      s;
2361     nxt_socket_rpc_t  *rpc;
2362 
2363     rpc = data;
2364 
2365     s = msg->fd[0];
2366 
2367     ret = nxt_socket_nonblocking(task, s);
2368     if (nxt_slow_path(ret != NXT_OK)) {
2369         goto fail;
2370     }
2371 
2372     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2373 
2374     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2375     if (nxt_slow_path(ret != NXT_OK)) {
2376         goto fail;
2377     }
2378 
2379     rpc->socket_conf->listen->socket = s;
2380 
2381     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2382                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2383 
2384     return;
2385 
2386 fail:
2387 
2388     nxt_socket_close(task, s);
2389 
2390     nxt_router_conf_error(task, rpc->temp_conf);
2391 }
2392 
2393 
2394 static void
2395 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2396     void *data)
2397 {
2398     nxt_socket_rpc_t        *rpc;
2399     nxt_router_temp_conf_t  *tmcf;
2400 
2401     rpc = data;
2402     tmcf = rpc->temp_conf;
2403 
2404 #if 0
2405     u_char                  *p;
2406     size_t                  size;
2407     uint8_t                 error;
2408     nxt_buf_t               *in, *out;
2409     nxt_sockaddr_t          *sa;
2410 
2411     static nxt_str_t  socket_errors[] = {
2412         nxt_string("ListenerSystem"),
2413         nxt_string("ListenerNoIPv6"),
2414         nxt_string("ListenerPort"),
2415         nxt_string("ListenerInUse"),
2416         nxt_string("ListenerNoAddress"),
2417         nxt_string("ListenerNoAccess"),
2418         nxt_string("ListenerPath"),
2419     };
2420 
2421     sa = rpc->socket_conf->listen->sockaddr;
2422 
2423     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2424 
2425     if (nxt_slow_path(in == NULL)) {
2426         return;
2427     }
2428 
2429     p = in->mem.pos;
2430 
2431     error = *p++;
2432 
2433     size = nxt_length("listen socket error: ")
2434            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2435            + sa->length + socket_errors[error].length + (in->mem.free - p);
2436 
2437     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2438     if (nxt_slow_path(out == NULL)) {
2439         return;
2440     }
2441 
2442     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2443                         "listen socket error: "
2444                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2445                         (size_t) sa->length, nxt_sockaddr_start(sa),
2446                         &socket_errors[error], in->mem.free - p, p);
2447 
2448     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2449 #endif
2450 
2451     nxt_router_conf_error(task, tmcf);
2452 }
2453 
2454 
2455 #if (NXT_TLS)
2456 
2457 static void
2458 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2459     void *data)
2460 {
2461     nxt_mp_t                *mp;
2462     nxt_int_t               ret;
2463     nxt_tls_conf_t          *tlscf;
2464     nxt_router_tlssock_t    *tls;
2465     nxt_tls_bundle_conf_t   *bundle;
2466     nxt_router_temp_conf_t  *tmcf;
2467 
2468     nxt_debug(task, "tls rpc handler");
2469 
2470     tls = data;
2471     tmcf = tls->temp_conf;
2472 
2473     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2474         goto fail;
2475     }
2476 
2477     mp = tmcf->router_conf->mem_pool;
2478 
2479     if (tls->socket_conf->tls == NULL){
2480         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2481         if (nxt_slow_path(tlscf == NULL)) {
2482             goto fail;
2483         }
2484 
2485         tlscf->no_wait_shutdown = 1;
2486         tls->socket_conf->tls = tlscf;
2487 
2488     } else {
2489         tlscf = tls->socket_conf->tls;
2490     }
2491 
2492     tls->tls_init->conf = tlscf;
2493 
2494     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2495     if (nxt_slow_path(bundle == NULL)) {
2496         goto fail;
2497     }
2498 
2499     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2500         goto fail;
2501     }
2502 
2503     bundle->chain_file = msg->fd[0];
2504     bundle->next = tlscf->bundle;
2505     tlscf->bundle = bundle;
2506 
2507     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2508                                                   tls->last);
2509     if (nxt_slow_path(ret != NXT_OK)) {
2510         goto fail;
2511     }
2512 
2513     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2514                        nxt_router_conf_apply, task, tmcf, NULL);
2515     return;
2516 
2517 fail:
2518 
2519     nxt_router_conf_error(task, tmcf);
2520 }
2521 
2522 #endif
2523 
2524 
2525 static void
2526 nxt_router_app_rpc_create(nxt_task_t *task,
2527     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2528 {
2529     size_t         size;
2530     uint32_t       stream;
2531     nxt_int_t      ret;
2532     nxt_buf_t      *b;
2533     nxt_port_t     *main_port, *router_port;
2534     nxt_runtime_t  *rt;
2535     nxt_app_rpc_t  *rpc;
2536 
2537     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2538     if (rpc == NULL) {
2539         goto fail;
2540     }
2541 
2542     rpc->app = app;
2543     rpc->temp_conf = tmcf;
2544 
2545     nxt_debug(task, "app '%V' prefork", &app->name);
2546 
2547     size = app->name.length + 1 + app->conf.length;
2548 
2549     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2550     if (nxt_slow_path(b == NULL)) {
2551         goto fail;
2552     }
2553 
2554     b->completion_handler = nxt_router_dummy_buf_completion;
2555 
2556     nxt_buf_cpystr(b, &app->name);
2557     *b->mem.free++ = '\0';
2558     nxt_buf_cpystr(b, &app->conf);
2559 
2560     rt = task->thread->runtime;
2561     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2562     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2563 
2564     stream = nxt_port_rpc_register_handler(task, router_port,
2565                                            nxt_router_app_prefork_ready,
2566                                            nxt_router_app_prefork_error,
2567                                            -1, rpc);
2568     if (nxt_slow_path(stream == 0)) {
2569         goto fail;
2570     }
2571 
2572     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2573                                 -1, stream, router_port->id, b);
2574 
2575     if (nxt_slow_path(ret != NXT_OK)) {
2576         nxt_port_rpc_cancel(task, router_port, stream);
2577         goto fail;
2578     }
2579 
2580     app->pending_processes++;
2581 
2582     return;
2583 
2584 fail:
2585 
2586     nxt_router_conf_error(task, tmcf);
2587 }
2588 
2589 
2590 static void
2591 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2592     void *data)
2593 {
2594     nxt_app_t           *app;
2595     nxt_port_t          *port;
2596     nxt_app_rpc_t       *rpc;
2597     nxt_event_engine_t  *engine;
2598 
2599     rpc = data;
2600     app = rpc->app;
2601 
2602     port = msg->u.new_port;
2603 
2604     nxt_assert(port != NULL);
2605     nxt_assert(port->type == NXT_PROCESS_APP);
2606     nxt_assert(port->id == 0);
2607 
2608     port->app = app;
2609     port->main_app_port = port;
2610 
2611     app->pending_processes--;
2612     app->processes++;
2613     app->idle_processes++;
2614 
2615     engine = task->thread->engine;
2616 
2617     nxt_queue_insert_tail(&app->ports, &port->app_link);
2618     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2619 
2620     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2621               &app->name, port->pid, port->id);
2622 
2623     nxt_port_hash_add(&app->port_hash, port);
2624     app->port_hash_count++;
2625 
2626     port->idle_start = 0;
2627 
2628     nxt_port_inc_use(port);
2629 
2630     nxt_router_app_shared_port_send(task, port);
2631 
2632     nxt_work_queue_add(&engine->fast_work_queue,
2633                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2634 }
2635 
2636 
2637 static void
2638 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2639     void *data)
2640 {
2641     nxt_app_t               *app;
2642     nxt_app_rpc_t           *rpc;
2643     nxt_router_temp_conf_t  *tmcf;
2644 
2645     rpc = data;
2646     app = rpc->app;
2647     tmcf = rpc->temp_conf;
2648 
2649     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2650             &app->name);
2651 
2652     app->pending_processes--;
2653 
2654     nxt_router_conf_error(task, tmcf);
2655 }
2656 
2657 
2658 static nxt_int_t
2659 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2660     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2661 {
2662     nxt_int_t                 ret;
2663     nxt_uint_t                n, threads;
2664     nxt_queue_link_t          *qlk;
2665     nxt_router_engine_conf_t  *recf;
2666 
2667     threads = tmcf->router_conf->threads;
2668 
2669     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2670                                      sizeof(nxt_router_engine_conf_t));
2671     if (nxt_slow_path(tmcf->engines == NULL)) {
2672         return NXT_ERROR;
2673     }
2674 
2675     n = 0;
2676 
2677     for (qlk = nxt_queue_first(&router->engines);
2678          qlk != nxt_queue_tail(&router->engines);
2679          qlk = nxt_queue_next(qlk))
2680     {
2681         recf = nxt_array_zero_add(tmcf->engines);
2682         if (nxt_slow_path(recf == NULL)) {
2683             return NXT_ERROR;
2684         }
2685 
2686         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2687 
2688         if (n < threads) {
2689             recf->action = NXT_ROUTER_ENGINE_KEEP;
2690             ret = nxt_router_engine_conf_update(tmcf, recf);
2691 
2692         } else {
2693             recf->action = NXT_ROUTER_ENGINE_DELETE;
2694             ret = nxt_router_engine_conf_delete(tmcf, recf);
2695         }
2696 
2697         if (nxt_slow_path(ret != NXT_OK)) {
2698             return ret;
2699         }
2700 
2701         n++;
2702     }
2703 
2704     tmcf->new_threads = n;
2705 
2706     while (n < threads) {
2707         recf = nxt_array_zero_add(tmcf->engines);
2708         if (nxt_slow_path(recf == NULL)) {
2709             return NXT_ERROR;
2710         }
2711 
2712         recf->action = NXT_ROUTER_ENGINE_ADD;
2713 
2714         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2715         if (nxt_slow_path(recf->engine == NULL)) {
2716             return NXT_ERROR;
2717         }
2718 
2719         ret = nxt_router_engine_conf_create(tmcf, recf);
2720         if (nxt_slow_path(ret != NXT_OK)) {
2721             return ret;
2722         }
2723 
2724         n++;
2725     }
2726 
2727     return NXT_OK;
2728 }
2729 
2730 
2731 static nxt_int_t
2732 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2733     nxt_router_engine_conf_t *recf)
2734 {
2735     nxt_int_t  ret;
2736 
2737     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2738                                           nxt_router_listen_socket_create);
2739     if (nxt_slow_path(ret != NXT_OK)) {
2740         return ret;
2741     }
2742 
2743     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2744                                           nxt_router_listen_socket_create);
2745     if (nxt_slow_path(ret != NXT_OK)) {
2746         return ret;
2747     }
2748 
2749     return ret;
2750 }
2751 
2752 
2753 static nxt_int_t
2754 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2755     nxt_router_engine_conf_t *recf)
2756 {
2757     nxt_int_t  ret;
2758 
2759     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2760                                           nxt_router_listen_socket_create);
2761     if (nxt_slow_path(ret != NXT_OK)) {
2762         return ret;
2763     }
2764 
2765     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2766                                           nxt_router_listen_socket_update);
2767     if (nxt_slow_path(ret != NXT_OK)) {
2768         return ret;
2769     }
2770 
2771     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2772     if (nxt_slow_path(ret != NXT_OK)) {
2773         return ret;
2774     }
2775 
2776     return ret;
2777 }
2778 
2779 
2780 static nxt_int_t
2781 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2782     nxt_router_engine_conf_t *recf)
2783 {
2784     nxt_int_t  ret;
2785 
2786     ret = nxt_router_engine_quit(tmcf, recf);
2787     if (nxt_slow_path(ret != NXT_OK)) {
2788         return ret;
2789     }
2790 
2791     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2792     if (nxt_slow_path(ret != NXT_OK)) {
2793         return ret;
2794     }
2795 
2796     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2797 }
2798 
2799 
2800 static nxt_int_t
2801 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2802     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2803     nxt_work_handler_t handler)
2804 {
2805     nxt_int_t                ret;
2806     nxt_joint_job_t          *job;
2807     nxt_queue_link_t         *qlk;
2808     nxt_socket_conf_t        *skcf;
2809     nxt_socket_conf_joint_t  *joint;
2810 
2811     for (qlk = nxt_queue_first(sockets);
2812          qlk != nxt_queue_tail(sockets);
2813          qlk = nxt_queue_next(qlk))
2814     {
2815         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2816         if (nxt_slow_path(job == NULL)) {
2817             return NXT_ERROR;
2818         }
2819 
2820         job->work.next = recf->jobs;
2821         recf->jobs = &job->work;
2822 
2823         job->task = tmcf->engine->task;
2824         job->work.handler = handler;
2825         job->work.task = &job->task;
2826         job->work.obj = job;
2827         job->tmcf = tmcf;
2828 
2829         tmcf->count++;
2830 
2831         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2832                              sizeof(nxt_socket_conf_joint_t));
2833         if (nxt_slow_path(joint == NULL)) {
2834             return NXT_ERROR;
2835         }
2836 
2837         job->work.data = joint;
2838 
2839         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
2840         if (nxt_slow_path(ret != NXT_OK)) {
2841             return ret;
2842         }
2843 
2844         joint->count = 1;
2845 
2846         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2847         skcf->count++;
2848         joint->socket_conf = skcf;
2849 
2850         joint->engine = recf->engine;
2851     }
2852 
2853     return NXT_OK;
2854 }
2855 
2856 
2857 static nxt_int_t
2858 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2859     nxt_router_engine_conf_t *recf)
2860 {
2861     nxt_joint_job_t  *job;
2862 
2863     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2864     if (nxt_slow_path(job == NULL)) {
2865         return NXT_ERROR;
2866     }
2867 
2868     job->work.next = recf->jobs;
2869     recf->jobs = &job->work;
2870 
2871     job->task = tmcf->engine->task;
2872     job->work.handler = nxt_router_worker_thread_quit;
2873     job->work.task = &job->task;
2874     job->work.obj = NULL;
2875     job->work.data = NULL;
2876     job->tmcf = NULL;
2877 
2878     return NXT_OK;
2879 }
2880 
2881 
2882 static nxt_int_t
2883 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2884     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2885 {
2886     nxt_joint_job_t   *job;
2887     nxt_queue_link_t  *qlk;
2888 
2889     for (qlk = nxt_queue_first(sockets);
2890          qlk != nxt_queue_tail(sockets);
2891          qlk = nxt_queue_next(qlk))
2892     {
2893         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2894         if (nxt_slow_path(job == NULL)) {
2895             return NXT_ERROR;
2896         }
2897 
2898         job->work.next = recf->jobs;
2899         recf->jobs = &job->work;
2900 
2901         job->task = tmcf->engine->task;
2902         job->work.handler = nxt_router_listen_socket_delete;
2903         job->work.task = &job->task;
2904         job->work.obj = job;
2905         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2906         job->tmcf = tmcf;
2907 
2908         tmcf->count++;
2909     }
2910 
2911     return NXT_OK;
2912 }
2913 
2914 
2915 static nxt_int_t
2916 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2917     nxt_router_temp_conf_t *tmcf)
2918 {
2919     nxt_int_t                 ret;
2920     nxt_uint_t                i, threads;
2921     nxt_router_engine_conf_t  *recf;
2922 
2923     recf = tmcf->engines->elts;
2924     threads = tmcf->router_conf->threads;
2925 
2926     for (i = tmcf->new_threads; i < threads; i++) {
2927         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2928         if (nxt_slow_path(ret != NXT_OK)) {
2929             return ret;
2930         }
2931     }
2932 
2933     return NXT_OK;
2934 }
2935 
2936 
2937 static nxt_int_t
2938 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2939     nxt_event_engine_t *engine)
2940 {
2941     nxt_int_t            ret;
2942     nxt_thread_link_t    *link;
2943     nxt_thread_handle_t  handle;
2944 
2945     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2946 
2947     if (nxt_slow_path(link == NULL)) {
2948         return NXT_ERROR;
2949     }
2950 
2951     link->start = nxt_router_thread_start;
2952     link->engine = engine;
2953     link->work.handler = nxt_router_thread_exit_handler;
2954     link->work.task = task;
2955     link->work.data = link;
2956 
2957     nxt_queue_insert_tail(&rt->engines, &engine->link);
2958 
2959     ret = nxt_thread_create(&handle, link);
2960 
2961     if (nxt_slow_path(ret != NXT_OK)) {
2962         nxt_queue_remove(&engine->link);
2963     }
2964 
2965     return ret;
2966 }
2967 
2968 
2969 static void
2970 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2971     nxt_router_temp_conf_t *tmcf)
2972 {
2973     nxt_app_t  *app;
2974 
2975     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2976 
2977         nxt_router_app_unlink(task, app);
2978 
2979     } nxt_queue_loop;
2980 
2981     nxt_queue_add(&router->apps, &tmcf->previous);
2982     nxt_queue_add(&router->apps, &tmcf->apps);
2983 }
2984 
2985 
2986 static void
2987 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2988 {
2989     nxt_uint_t                n;
2990     nxt_event_engine_t        *engine;
2991     nxt_router_engine_conf_t  *recf;
2992 
2993     recf = tmcf->engines->elts;
2994 
2995     for (n = tmcf->engines->nelts; n != 0; n--) {
2996         engine = recf->engine;
2997 
2998         switch (recf->action) {
2999 
3000         case NXT_ROUTER_ENGINE_KEEP:
3001             break;
3002 
3003         case NXT_ROUTER_ENGINE_ADD:
3004             nxt_queue_insert_tail(&router->engines, &engine->link0);
3005             break;
3006 
3007         case NXT_ROUTER_ENGINE_DELETE:
3008             nxt_queue_remove(&engine->link0);
3009             break;
3010         }
3011 
3012         nxt_router_engine_post(engine, recf->jobs);
3013 
3014         recf++;
3015     }
3016 }
3017 
3018 
3019 static void
3020 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3021 {
3022     nxt_work_t  *work, *next;
3023 
3024     for (work = jobs; work != NULL; work = next) {
3025         next = work->next;
3026         work->next = NULL;
3027 
3028         nxt_event_engine_post(engine, work);
3029     }
3030 }
3031 
3032 
3033 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3034     .rpc_error       = nxt_port_rpc_handler,
3035     .mmap            = nxt_port_mmap_handler,
3036     .data            = nxt_port_rpc_handler,
3037     .oosm            = nxt_router_oosm_handler,
3038     .req_headers_ack = nxt_port_rpc_handler,
3039 };
3040 
3041 
3042 static void
3043 nxt_router_thread_start(void *data)
3044 {
3045     nxt_int_t           ret;
3046     nxt_port_t          *port;
3047     nxt_task_t          *task;
3048     nxt_work_t          *work;
3049     nxt_thread_t        *thread;
3050     nxt_thread_link_t   *link;
3051     nxt_event_engine_t  *engine;
3052 
3053     link = data;
3054     engine = link->engine;
3055     task = &engine->task;
3056 
3057     thread = nxt_thread();
3058 
3059     nxt_event_engine_thread_adopt(engine);
3060 
3061     /* STUB */
3062     thread->runtime = engine->task.thread->runtime;
3063 
3064     engine->task.thread = thread;
3065     engine->task.log = thread->log;
3066     thread->engine = engine;
3067     thread->task = &engine->task;
3068 #if 0
3069     thread->fiber = &engine->fibers->fiber;
3070 #endif
3071 
3072     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3073     if (nxt_slow_path(engine->mem_pool == NULL)) {
3074         return;
3075     }
3076 
3077     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3078                         NXT_PROCESS_ROUTER);
3079     if (nxt_slow_path(port == NULL)) {
3080         return;
3081     }
3082 
3083     ret = nxt_port_socket_init(task, port, 0);
3084     if (nxt_slow_path(ret != NXT_OK)) {
3085         nxt_port_use(task, port, -1);
3086         return;
3087     }
3088 
3089     ret = nxt_router_port_queue_init(task, port);
3090     if (nxt_slow_path(ret != NXT_OK)) {
3091         nxt_port_use(task, port, -1);
3092         return;
3093     }
3094 
3095     engine->port = port;
3096 
3097     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3098 
3099     work = nxt_zalloc(sizeof(nxt_work_t));
3100     if (nxt_slow_path(work == NULL)) {
3101         return;
3102     }
3103 
3104     work->handler = nxt_router_rt_add_port;
3105     work->task = link->work.task;
3106     work->obj = work;
3107     work->data = port;
3108 
3109     nxt_event_engine_post(link->work.task->thread->engine, work);
3110 
3111     nxt_event_engine_start(engine);
3112 }
3113 
3114 
3115 static void
3116 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3117 {
3118     nxt_int_t      res;
3119     nxt_port_t     *port;
3120     nxt_runtime_t  *rt;
3121 
3122     rt = task->thread->runtime;
3123     port = data;
3124 
3125     nxt_free(obj);
3126 
3127     res = nxt_port_hash_add(&rt->ports, port);
3128 
3129     if (nxt_fast_path(res == NXT_OK)) {
3130         nxt_port_use(task, port, 1);
3131     }
3132 }
3133 
3134 
3135 static void
3136 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3137 {
3138     nxt_joint_job_t          *job;
3139     nxt_socket_conf_t        *skcf;
3140     nxt_listen_event_t       *lev;
3141     nxt_listen_socket_t      *ls;
3142     nxt_thread_spinlock_t    *lock;
3143     nxt_socket_conf_joint_t  *joint;
3144 
3145     job = obj;
3146     joint = data;
3147 
3148     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3149 
3150     skcf = joint->socket_conf;
3151     ls = skcf->listen;
3152 
3153     lev = nxt_listen_event(task, ls);
3154     if (nxt_slow_path(lev == NULL)) {
3155         nxt_router_listen_socket_release(task, skcf);
3156         return;
3157     }
3158 
3159     lev->socket.data = joint;
3160 
3161     lock = &skcf->router_conf->router->lock;
3162 
3163     nxt_thread_spin_lock(lock);
3164     ls->count++;
3165     nxt_thread_spin_unlock(lock);
3166 
3167     job->work.next = NULL;
3168     job->work.handler = nxt_router_conf_wait;
3169 
3170     nxt_event_engine_post(job->tmcf->engine, &job->work);
3171 }
3172 
3173 
3174 nxt_inline nxt_listen_event_t *
3175 nxt_router_listen_event(nxt_queue_t *listen_connections,
3176     nxt_socket_conf_t *skcf)
3177 {
3178     nxt_socket_t        fd;
3179     nxt_queue_link_t    *qlk;
3180     nxt_listen_event_t  *lev;
3181 
3182     fd = skcf->listen->socket;
3183 
3184     for (qlk = nxt_queue_first(listen_connections);
3185          qlk != nxt_queue_tail(listen_connections);
3186          qlk = nxt_queue_next(qlk))
3187     {
3188         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3189 
3190         if (fd == lev->socket.fd) {
3191             return lev;
3192         }
3193     }
3194 
3195     return NULL;
3196 }
3197 
3198 
3199 static void
3200 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3201 {
3202     nxt_joint_job_t          *job;
3203     nxt_event_engine_t       *engine;
3204     nxt_listen_event_t       *lev;
3205     nxt_socket_conf_joint_t  *joint, *old;
3206 
3207     job = obj;
3208     joint = data;
3209 
3210     engine = task->thread->engine;
3211 
3212     nxt_queue_insert_tail(&engine->joints, &joint->link);
3213 
3214     lev = nxt_router_listen_event(&engine->listen_connections,
3215                                   joint->socket_conf);
3216 
3217     old = lev->socket.data;
3218     lev->socket.data = joint;
3219     lev->listen = joint->socket_conf->listen;
3220 
3221     job->work.next = NULL;
3222     job->work.handler = nxt_router_conf_wait;
3223 
3224     nxt_event_engine_post(job->tmcf->engine, &job->work);
3225 
3226     /*
3227      * The task is allocated from configuration temporary
3228      * memory pool so it can be freed after engine post operation.
3229      */
3230 
3231     nxt_router_conf_release(&engine->task, old);
3232 }
3233 
3234 
3235 static void
3236 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3237 {
3238     nxt_socket_conf_t        *skcf;
3239     nxt_listen_event_t       *lev;
3240     nxt_event_engine_t       *engine;
3241     nxt_socket_conf_joint_t  *joint;
3242 
3243     skcf = data;
3244 
3245     engine = task->thread->engine;
3246 
3247     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3248 
3249     nxt_fd_event_delete(engine, &lev->socket);
3250 
3251     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3252               lev->socket.fd);
3253 
3254     joint = lev->socket.data;
3255     joint->close_job = obj;
3256 
3257     lev->timer.handler = nxt_router_listen_socket_close;
3258     lev->timer.work_queue = &engine->fast_work_queue;
3259 
3260     nxt_timer_add(engine, &lev->timer, 0);
3261 }
3262 
3263 
3264 static void
3265 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3266 {
3267     nxt_event_engine_t  *engine;
3268 
3269     nxt_debug(task, "router worker thread quit");
3270 
3271     engine = task->thread->engine;
3272 
3273     engine->shutdown = 1;
3274 
3275     if (nxt_queue_is_empty(&engine->joints)) {
3276         nxt_thread_exit(task->thread);
3277     }
3278 }
3279 
3280 
3281 static void
3282 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3283 {
3284     nxt_timer_t              *timer;
3285     nxt_joint_job_t          *job;
3286     nxt_listen_event_t       *lev;
3287     nxt_socket_conf_joint_t  *joint;
3288 
3289     timer = obj;
3290     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3291 
3292     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3293               lev->socket.fd);
3294 
3295     nxt_queue_remove(&lev->link);
3296 
3297     joint = lev->socket.data;
3298     lev->socket.data = NULL;
3299 
3300     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3301     task = &task->thread->engine->task;
3302 
3303     nxt_router_listen_socket_release(task, joint->socket_conf);
3304 
3305     job = joint->close_job;
3306     job->work.next = NULL;
3307     job->work.handler = nxt_router_conf_wait;
3308 
3309     nxt_event_engine_post(job->tmcf->engine, &job->work);
3310 
3311     nxt_router_listen_event_release(task, lev, joint);
3312 }
3313 
3314 
3315 static void
3316 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3317 {
3318     nxt_listen_socket_t    *ls;
3319     nxt_thread_spinlock_t  *lock;
3320 
3321     ls = skcf->listen;
3322     lock = &skcf->router_conf->router->lock;
3323 
3324     nxt_thread_spin_lock(lock);
3325 
3326     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3327               task->thread->engine, ls->count);
3328 
3329     if (--ls->count != 0) {
3330         ls = NULL;
3331     }
3332 
3333     nxt_thread_spin_unlock(lock);
3334 
3335     if (ls != NULL) {
3336         nxt_socket_close(task, ls->socket);
3337         nxt_free(ls);
3338     }
3339 }
3340 
3341 
3342 void
3343 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3344     nxt_socket_conf_joint_t *joint)
3345 {
3346     nxt_event_engine_t  *engine;
3347 
3348     nxt_debug(task, "listen event count: %D", lev->count);
3349 
3350     engine = task->thread->engine;
3351 
3352     if (--lev->count == 0) {
3353         if (lev->next != NULL) {
3354             nxt_sockaddr_cache_free(engine, lev->next);
3355 
3356             nxt_conn_free(task, lev->next);
3357         }
3358 
3359         nxt_free(lev);
3360     }
3361 
3362     if (joint != NULL) {
3363         nxt_router_conf_release(task, joint);
3364     }
3365 
3366     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3367         nxt_thread_exit(task->thread);
3368     }
3369 }
3370 
3371 
3372 void
3373 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3374 {
3375     nxt_socket_conf_t      *skcf;
3376     nxt_router_conf_t      *rtcf;
3377     nxt_thread_spinlock_t  *lock;
3378 
3379     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3380 
3381     if (--joint->count != 0) {
3382         return;
3383     }
3384 
3385     nxt_queue_remove(&joint->link);
3386 
3387     /*
3388      * The joint content can not be safely used after the critical
3389      * section protected by the spinlock because its memory pool may
3390      * be already destroyed by another thread.
3391      */
3392     skcf = joint->socket_conf;
3393     rtcf = skcf->router_conf;
3394     lock = &rtcf->router->lock;
3395 
3396     nxt_thread_spin_lock(lock);
3397 
3398     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3399               rtcf, rtcf->count);
3400 
3401     if (--skcf->count != 0) {
3402         skcf = NULL;
3403         rtcf = NULL;
3404 
3405     } else {
3406         nxt_queue_remove(&skcf->link);
3407 
3408         if (--rtcf->count != 0) {
3409             rtcf = NULL;
3410         }
3411     }
3412 
3413     nxt_thread_spin_unlock(lock);
3414 
3415 #if (NXT_TLS)
3416     if (skcf != NULL && skcf->tls != NULL) {
3417         task->thread->runtime->tls->server_free(task, skcf->tls);
3418     }
3419 #endif
3420 
3421     /* TODO remove engine->port */
3422 
3423     if (rtcf != NULL) {
3424         nxt_debug(task, "old router conf is destroyed");
3425 
3426         nxt_router_apps_hash_use(task, rtcf, -1);
3427 
3428         nxt_router_access_log_release(task, lock, rtcf->access_log);
3429 
3430         nxt_mp_thread_adopt(rtcf->mem_pool);
3431 
3432         nxt_mp_destroy(rtcf->mem_pool);
3433     }
3434 }
3435 
3436 
3437 static void
3438 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3439     nxt_router_access_log_t *access_log)
3440 {
3441     size_t     size;
3442     u_char     *buf, *p;
3443     nxt_off_t  bytes;
3444 
3445     static nxt_time_string_t  date_cache = {
3446         (nxt_atomic_uint_t) -1,
3447         nxt_router_access_log_date,
3448         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3449         nxt_length("31/Dec/1986:19:40:00 +0300"),
3450         NXT_THREAD_TIME_LOCAL,
3451         NXT_THREAD_TIME_SEC,
3452     };
3453 
3454     size = r->remote->address_length
3455            + 6                  /* ' - - [' */
3456            + date_cache.size
3457            + 3                  /* '] "' */
3458            + r->method->length
3459            + 1                  /* space */
3460            + r->target.length
3461            + 1                  /* space */
3462            + r->version.length
3463            + 2                  /* '" ' */
3464            + 3                  /* status */
3465            + 1                  /* space */
3466            + NXT_OFF_T_LEN
3467            + 2                  /* ' "' */
3468            + (r->referer != NULL ? r->referer->value_length : 1)
3469            + 3                  /* '" "' */
3470            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3471            + 2                  /* '"\n' */
3472     ;
3473 
3474     buf = nxt_mp_nget(r->mem_pool, size);
3475     if (nxt_slow_path(buf == NULL)) {
3476         return;
3477     }
3478 
3479     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3480                    r->remote->address_length);
3481 
3482     p = nxt_cpymem(p, " - - [", 6);
3483 
3484     p = nxt_thread_time_string(task->thread, &date_cache, p);
3485 
3486     p = nxt_cpymem(p, "] \"", 3);
3487 
3488     if (r->method->length != 0) {
3489         p = nxt_cpymem(p, r->method->start, r->method->length);
3490 
3491         if (r->target.length != 0) {
3492             *p++ = ' ';
3493             p = nxt_cpymem(p, r->target.start, r->target.length);
3494 
3495             if (r->version.length != 0) {
3496                 *p++ = ' ';
3497                 p = nxt_cpymem(p, r->version.start, r->version.length);
3498             }
3499         }
3500 
3501     } else {
3502         *p++ = '-';
3503     }
3504 
3505     p = nxt_cpymem(p, "\" ", 2);
3506 
3507     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3508 
3509     *p++ = ' ';
3510 
3511     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3512 
3513     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3514 
3515     p = nxt_cpymem(p, " \"", 2);
3516 
3517     if (r->referer != NULL) {
3518         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3519 
3520     } else {
3521         *p++ = '-';
3522     }
3523 
3524     p = nxt_cpymem(p, "\" \"", 3);
3525 
3526     if (r->user_agent != NULL) {
3527         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3528 
3529     } else {
3530         *p++ = '-';
3531     }
3532 
3533     p = nxt_cpymem(p, "\"\n", 2);
3534 
3535     nxt_fd_write(access_log->fd, buf, p - buf);
3536 }
3537 
3538 
3539 static u_char *
3540 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3541     size_t size, const char *format)
3542 {
3543     u_char  sign;
3544     time_t  gmtoff;
3545 
3546     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3547                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3548 
3549     gmtoff = nxt_timezone(tm) / 60;
3550 
3551     if (gmtoff < 0) {
3552         gmtoff = -gmtoff;
3553         sign = '-';
3554 
3555     } else {
3556         sign = '+';
3557     }
3558 
3559     return nxt_sprintf(buf, buf + size, format,
3560                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3561                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3562                        sign, gmtoff / 60, gmtoff % 60);
3563 }
3564 
3565 
3566 static void
3567 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3568 {
3569     uint32_t                 stream;
3570     nxt_int_t                ret;
3571     nxt_buf_t                *b;
3572     nxt_port_t               *main_port, *router_port;
3573     nxt_runtime_t            *rt;
3574     nxt_router_access_log_t  *access_log;
3575 
3576     access_log = tmcf->router_conf->access_log;
3577 
3578     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3579     if (nxt_slow_path(b == NULL)) {
3580         goto fail;
3581     }
3582 
3583     b->completion_handler = nxt_router_dummy_buf_completion;
3584 
3585     nxt_buf_cpystr(b, &access_log->path);
3586     *b->mem.free++ = '\0';
3587 
3588     rt = task->thread->runtime;
3589     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3590     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3591 
3592     stream = nxt_port_rpc_register_handler(task, router_port,
3593                                            nxt_router_access_log_ready,
3594                                            nxt_router_access_log_error,
3595                                            -1, tmcf);
3596     if (nxt_slow_path(stream == 0)) {
3597         goto fail;
3598     }
3599 
3600     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3601                                 stream, router_port->id, b);
3602 
3603     if (nxt_slow_path(ret != NXT_OK)) {
3604         nxt_port_rpc_cancel(task, router_port, stream);
3605         goto fail;
3606     }
3607 
3608     return;
3609 
3610 fail:
3611 
3612     nxt_router_conf_error(task, tmcf);
3613 }
3614 
3615 
3616 static void
3617 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3618     void *data)
3619 {
3620     nxt_router_temp_conf_t   *tmcf;
3621     nxt_router_access_log_t  *access_log;
3622 
3623     tmcf = data;
3624 
3625     access_log = tmcf->router_conf->access_log;
3626 
3627     access_log->fd = msg->fd[0];
3628 
3629     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3630                        nxt_router_conf_apply, task, tmcf, NULL);
3631 }
3632 
3633 
3634 static void
3635 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3636     void *data)
3637 {
3638     nxt_router_temp_conf_t  *tmcf;
3639 
3640     tmcf = data;
3641 
3642     nxt_router_conf_error(task, tmcf);
3643 }
3644 
3645 
3646 static void
3647 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3648     nxt_router_access_log_t *access_log)
3649 {
3650     if (access_log == NULL) {
3651         return;
3652     }
3653 
3654     nxt_thread_spin_lock(lock);
3655 
3656     if (--access_log->count != 0) {
3657         access_log = NULL;
3658     }
3659 
3660     nxt_thread_spin_unlock(lock);
3661 
3662     if (access_log != NULL) {
3663 
3664         if (access_log->fd != -1) {
3665             nxt_fd_close(access_log->fd);
3666         }
3667 
3668         nxt_free(access_log);
3669     }
3670 }
3671 
3672 
3673 typedef struct {
3674     nxt_mp_t                 *mem_pool;
3675     nxt_router_access_log_t  *access_log;
3676 } nxt_router_access_log_reopen_t;
3677 
3678 
3679 static void
3680 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3681 {
3682     nxt_mp_t                        *mp;
3683     uint32_t                        stream;
3684     nxt_int_t                       ret;
3685     nxt_buf_t                       *b;
3686     nxt_port_t                      *main_port, *router_port;
3687     nxt_runtime_t                   *rt;
3688     nxt_router_access_log_t         *access_log;
3689     nxt_router_access_log_reopen_t  *reopen;
3690 
3691     access_log = nxt_router->access_log;
3692 
3693     if (access_log == NULL) {
3694         return;
3695     }
3696 
3697     mp = nxt_mp_create(1024, 128, 256, 32);
3698     if (nxt_slow_path(mp == NULL)) {
3699         return;
3700     }
3701 
3702     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3703     if (nxt_slow_path(reopen == NULL)) {
3704         goto fail;
3705     }
3706 
3707     reopen->mem_pool = mp;
3708     reopen->access_log = access_log;
3709 
3710     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3711     if (nxt_slow_path(b == NULL)) {
3712         goto fail;
3713     }
3714 
3715     b->completion_handler = nxt_router_access_log_reopen_completion;
3716 
3717     nxt_buf_cpystr(b, &access_log->path);
3718     *b->mem.free++ = '\0';
3719 
3720     rt = task->thread->runtime;
3721     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3722     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3723 
3724     stream = nxt_port_rpc_register_handler(task, router_port,
3725                                            nxt_router_access_log_reopen_ready,
3726                                            nxt_router_access_log_reopen_error,
3727                                            -1, reopen);
3728     if (nxt_slow_path(stream == 0)) {
3729         goto fail;
3730     }
3731 
3732     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3733                                 stream, router_port->id, b);
3734 
3735     if (nxt_slow_path(ret != NXT_OK)) {
3736         nxt_port_rpc_cancel(task, router_port, stream);
3737         goto fail;
3738     }
3739 
3740     nxt_mp_retain(mp);
3741 
3742     return;
3743 
3744 fail:
3745 
3746     nxt_mp_destroy(mp);
3747 }
3748 
3749 
3750 static void
3751 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3752 {
3753     nxt_mp_t   *mp;
3754     nxt_buf_t  *b;
3755 
3756     b = obj;
3757     mp = b->data;
3758 
3759     nxt_mp_release(mp);
3760 }
3761 
3762 
3763 static void
3764 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3765     void *data)
3766 {
3767     nxt_router_access_log_t         *access_log;
3768     nxt_router_access_log_reopen_t  *reopen;
3769 
3770     reopen = data;
3771 
3772     access_log = reopen->access_log;
3773 
3774     if (access_log == nxt_router->access_log) {
3775 
3776         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3777             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3778                       msg->fd[0], access_log->fd, nxt_errno);
3779         }
3780     }
3781 
3782     nxt_fd_close(msg->fd[0]);
3783     nxt_mp_release(reopen->mem_pool);
3784 }
3785 
3786 
3787 static void
3788 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3789     void *data)
3790 {
3791     nxt_router_access_log_reopen_t  *reopen;
3792 
3793     reopen = data;
3794 
3795     nxt_mp_release(reopen->mem_pool);
3796 }
3797 
3798 
3799 static void
3800 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3801 {
3802     nxt_port_t           *port;
3803     nxt_thread_link_t    *link;
3804     nxt_event_engine_t   *engine;
3805     nxt_thread_handle_t  handle;
3806 
3807     handle = (nxt_thread_handle_t) (uintptr_t) obj;
3808     link = data;
3809 
3810     nxt_thread_wait(handle);
3811 
3812     engine = link->engine;
3813 
3814     nxt_queue_remove(&engine->link);
3815 
3816     port = engine->port;
3817 
3818     // TODO notify all apps
3819 
3820     port->engine = task->thread->engine;
3821     nxt_mp_thread_adopt(port->mem_pool);
3822     nxt_port_use(task, port, -1);
3823 
3824     nxt_mp_thread_adopt(engine->mem_pool);
3825     nxt_mp_destroy(engine->mem_pool);
3826 
3827     nxt_event_engine_free(engine);
3828 
3829     nxt_free(link);
3830 }
3831 
3832 
3833 static void
3834 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3835     void *data)
3836 {
3837     size_t                  b_size, count;
3838     nxt_int_t               ret;
3839     nxt_app_t               *app;
3840     nxt_buf_t               *b, *next;
3841     nxt_port_t              *app_port;
3842     nxt_unit_field_t        *f;
3843     nxt_http_field_t        *field;
3844     nxt_http_request_t      *r;
3845     nxt_unit_response_t     *resp;
3846     nxt_request_rpc_data_t  *req_rpc_data;
3847 
3848     req_rpc_data = data;
3849 
3850     r = req_rpc_data->request;
3851     if (nxt_slow_path(r == NULL)) {
3852         return;
3853     }
3854 
3855     if (r->error) {
3856         nxt_request_rpc_data_unlink(task, req_rpc_data);
3857         return;
3858     }
3859 
3860     app = req_rpc_data->app;
3861     nxt_assert(app != NULL);
3862 
3863     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3864         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3865 
3866         return;
3867     }
3868 
3869     b = (msg->size == 0) ? NULL : msg->buf;
3870 
3871     if (msg->port_msg.last != 0) {
3872         nxt_debug(task, "router data create last buf");
3873 
3874         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3875 
3876         req_rpc_data->rpc_cancel = 0;
3877 
3878         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
3879             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
3880         }
3881 
3882         nxt_request_rpc_data_unlink(task, req_rpc_data);
3883 
3884     } else {
3885         if (app->timeout != 0) {
3886             r->timer.handler = nxt_router_app_timeout;
3887             r->timer_data = req_rpc_data;
3888             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3889         }
3890     }
3891 
3892     if (b == NULL) {
3893         return;
3894     }
3895 
3896     if (msg->buf == b) {
3897         /* Disable instant buffer completion/re-using by port. */
3898         msg->buf = NULL;
3899     }
3900 
3901     if (r->header_sent) {
3902         nxt_buf_chain_add(&r->out, b);
3903         nxt_http_request_send_body(task, r, NULL);
3904 
3905     } else {
3906         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
3907 
3908         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
3909             nxt_alert(task, "response buffer too small: %z", b_size);
3910             goto fail;
3911         }
3912 
3913         resp = (void *) b->mem.pos;
3914         count = (b_size - sizeof(nxt_unit_response_t))
3915                     / sizeof(nxt_unit_field_t);
3916 
3917         if (nxt_slow_path(count < resp->fields_count)) {
3918             nxt_alert(task, "response buffer too small for fields count: %D",
3919                       resp->fields_count);
3920             goto fail;
3921         }
3922 
3923         field = NULL;
3924 
3925         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3926             if (f->skip) {
3927                 continue;
3928             }
3929 
3930             field = nxt_list_add(r->resp.fields);
3931 
3932             if (nxt_slow_path(field == NULL)) {
3933                 goto fail;
3934             }
3935 
3936             field->hash = f->hash;
3937             field->skip = 0;
3938             field->hopbyhop = 0;
3939 
3940             field->name_length = f->name_length;
3941             field->value_length = f->value_length;
3942             field->name = nxt_unit_sptr_get(&f->name);
3943             field->value = nxt_unit_sptr_get(&f->value);
3944 
3945             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
3946             if (nxt_slow_path(ret != NXT_OK)) {
3947                 goto fail;
3948             }
3949 
3950             nxt_debug(task, "header%s: %*s: %*s",
3951                       (field->skip ? " skipped" : ""),
3952                       (size_t) field->name_length, field->name,
3953                       (size_t) field->value_length, field->value);
3954 
3955             if (field->skip) {
3956                 r->resp.fields->last->nelts--;
3957             }
3958         }
3959 
3960         r->status = resp->status;
3961 
3962         if (resp->piggyback_content_length != 0) {
3963             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3964             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3965 
3966         } else {
3967             b->mem.pos = b->mem.free;
3968         }
3969 
3970         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3971             next = b->next;
3972             b->next = NULL;
3973 
3974             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3975                                b->completion_handler, task, b, b->parent);
3976 
3977             b = next;
3978         }
3979 
3980         if (b != NULL) {
3981             nxt_buf_chain_add(&r->out, b);
3982         }
3983 
3984         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3985 
3986         if (r->websocket_handshake
3987             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3988         {
3989             app_port = req_rpc_data->app_port;
3990             if (nxt_slow_path(app_port == NULL)) {
3991                 goto fail;
3992             }
3993 
3994             nxt_thread_mutex_lock(&app->mutex);
3995 
3996             app_port->main_app_port->active_websockets++;
3997 
3998             nxt_thread_mutex_unlock(&app->mutex);
3999 
4000             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
4001             req_rpc_data->apr_action = NXT_APR_CLOSE;
4002 
4003             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4004 
4005             r->state = &nxt_http_websocket;
4006 
4007         } else {
4008             r->state = &nxt_http_request_send_state;
4009         }
4010     }
4011 
4012     return;
4013 
4014 fail:
4015 
4016     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4017 
4018     nxt_request_rpc_data_unlink(task, req_rpc_data);
4019 }
4020 
4021 
4022 static void
4023 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4024     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4025 {
4026     int                 res;
4027     nxt_app_t           *app;
4028     nxt_buf_t           *b;
4029     nxt_bool_t          start_process, unlinked;
4030     nxt_port_t          *app_port, *main_app_port, *idle_port;
4031     nxt_queue_link_t    *idle_lnk;
4032     nxt_http_request_t  *r;
4033 
4034     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4035               req_rpc_data->stream,
4036               msg->port_msg.pid, msg->port_msg.reply_port);
4037 
4038     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4039                              msg->port_msg.pid);
4040 
4041     app = req_rpc_data->app;
4042     r = req_rpc_data->request;
4043 
4044     start_process = 0;
4045     unlinked = 0;
4046 
4047     nxt_thread_mutex_lock(&app->mutex);
4048 
4049     if (r->app_link.next != NULL) {
4050         nxt_queue_remove(&r->app_link);
4051         r->app_link.next = NULL;
4052 
4053         unlinked = 1;
4054     }
4055 
4056     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4057                                   msg->port_msg.reply_port);
4058     if (nxt_slow_path(app_port == NULL)) {
4059         nxt_thread_mutex_unlock(&app->mutex);
4060 
4061         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4062 
4063         if (unlinked) {
4064             nxt_mp_release(r->mem_pool);
4065         }
4066 
4067         return;
4068     }
4069 
4070     main_app_port = app_port->main_app_port;
4071 
4072     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4073         app->idle_processes--;
4074 
4075         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4076                   &app->name, main_app_port->pid, main_app_port->id,
4077                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4078 
4079         /* Check port was in 'spare_ports' using idle_start field. */
4080         if (main_app_port->idle_start == 0
4081             && app->idle_processes >= app->spare_processes)
4082         {
4083             /*
4084              * If there is a vacant space in spare ports,
4085              * move the last idle to spare_ports.
4086              */
4087             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4088 
4089             idle_lnk = nxt_queue_last(&app->idle_ports);
4090             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4091             nxt_queue_remove(idle_lnk);
4092 
4093             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4094 
4095             idle_port->idle_start = 0;
4096 
4097             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4098                       "to spare_ports",
4099                       &app->name, idle_port->pid, idle_port->id);
4100         }
4101 
4102         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4103             app->pending_processes++;
4104             start_process = 1;
4105         }
4106     }
4107 
4108     main_app_port->active_requests++;
4109 
4110     nxt_port_inc_use(app_port);
4111 
4112     nxt_thread_mutex_unlock(&app->mutex);
4113 
4114     if (unlinked) {
4115         nxt_mp_release(r->mem_pool);
4116     }
4117 
4118     if (start_process) {
4119         nxt_router_start_app_process(task, app);
4120     }
4121 
4122     nxt_port_use(task, req_rpc_data->app_port, -1);
4123 
4124     req_rpc_data->app_port = app_port;
4125 
4126     b = req_rpc_data->msg_info.buf;
4127 
4128     if (b != NULL) {
4129         /* First buffer is already sent.  Start from second. */
4130         b = b->next;
4131 
4132         req_rpc_data->msg_info.buf->next = NULL;
4133     }
4134 
4135     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4136         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4137                   req_rpc_data->msg_info.body_fd);
4138 
4139         if (req_rpc_data->msg_info.body_fd != -1) {
4140             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4141         }
4142 
4143         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4144                                     req_rpc_data->msg_info.body_fd,
4145                                     req_rpc_data->stream,
4146                                     task->thread->engine->port->id, b);
4147 
4148         if (nxt_slow_path(res != NXT_OK)) {
4149             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4150         }
4151     }
4152 
4153     if (app->timeout != 0) {
4154         r->timer.handler = nxt_router_app_timeout;
4155         r->timer_data = req_rpc_data;
4156         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4157     }
4158 }
4159 
4160 
4161 static const nxt_http_request_state_t  nxt_http_request_send_state
4162     nxt_aligned(64) =
4163 {
4164     .error_handler = nxt_http_request_error_handler,
4165 };
4166 
4167 
4168 static void
4169 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4170 {
4171     nxt_buf_t           *out;
4172     nxt_http_request_t  *r;
4173 
4174     r = obj;
4175 
4176     out = r->out;
4177 
4178     if (out != NULL) {
4179         r->out = NULL;
4180         nxt_http_request_send(task, r, out);
4181     }
4182 }
4183 
4184 
4185 static void
4186 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4187     void *data)
4188 {
4189     nxt_request_rpc_data_t  *req_rpc_data;
4190 
4191     req_rpc_data = data;
4192 
4193     req_rpc_data->rpc_cancel = 0;
4194 
4195     /* TODO cancel message and return if cancelled. */
4196     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4197 
4198     if (req_rpc_data->request != NULL) {
4199         nxt_http_request_error(task, req_rpc_data->request,
4200                                NXT_HTTP_SERVICE_UNAVAILABLE);
4201     }
4202 
4203     nxt_request_rpc_data_unlink(task, req_rpc_data);
4204 }
4205 
4206 
4207 static void
4208 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4209     void *data)
4210 {
4211     nxt_app_t        *app;
4212     nxt_port_t       *port;
4213     nxt_app_joint_t  *app_joint;
4214 
4215     app_joint = data;
4216     port = msg->u.new_port;
4217 
4218     nxt_assert(app_joint != NULL);
4219     nxt_assert(port != NULL);
4220     nxt_assert(port->type == NXT_PROCESS_APP);
4221     nxt_assert(port->id == 0);
4222 
4223     app = app_joint->app;
4224 
4225     nxt_router_app_joint_use(task, app_joint, -1);
4226 
4227     if (nxt_slow_path(app == NULL)) {
4228         nxt_debug(task, "new port ready for released app, send QUIT");
4229 
4230         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4231 
4232         return;
4233     }
4234 
4235     port->app = app;
4236     port->main_app_port = port;
4237 
4238     nxt_thread_mutex_lock(&app->mutex);
4239 
4240     nxt_assert(app->pending_processes != 0);
4241 
4242     app->pending_processes--;
4243     app->processes++;
4244     nxt_port_hash_add(&app->port_hash, port);
4245     app->port_hash_count++;
4246 
4247     nxt_thread_mutex_unlock(&app->mutex);
4248 
4249     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4250               &app->name, port->pid, app->processes, app->pending_processes);
4251 
4252     nxt_router_app_shared_port_send(task, port);
4253 
4254     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4255 }
4256 
4257 
4258 static nxt_int_t
4259 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4260 {
4261     nxt_buf_t                *b;
4262     nxt_port_t               *port;
4263     nxt_port_msg_new_port_t  *msg;
4264 
4265     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4266                              sizeof(nxt_port_data_t));
4267     if (nxt_slow_path(b == NULL)) {
4268         return NXT_ERROR;
4269     }
4270 
4271     port = app_port->app->shared_port;
4272 
4273     nxt_debug(task, "send port %FD to process %PI",
4274               port->pair[0], app_port->pid);
4275 
4276     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4277     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4278 
4279     msg->id = port->id;
4280     msg->pid = port->pid;
4281     msg->max_size = port->max_size;
4282     msg->max_share = port->max_share;
4283     msg->type = port->type;
4284 
4285     return nxt_port_socket_write2(task, app_port,
4286                                   NXT_PORT_MSG_NEW_PORT,
4287                                   port->pair[0], port->queue_fd,
4288                                   0, 0, b);
4289 }
4290 
4291 
4292 static void
4293 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4294     void *data)
4295 {
4296     nxt_app_t           *app;
4297     nxt_app_joint_t     *app_joint;
4298     nxt_queue_link_t    *link;
4299     nxt_http_request_t  *r;
4300 
4301     app_joint = data;
4302 
4303     nxt_assert(app_joint != NULL);
4304 
4305     app = app_joint->app;
4306 
4307     nxt_router_app_joint_use(task, app_joint, -1);
4308 
4309     if (nxt_slow_path(app == NULL)) {
4310         nxt_debug(task, "start error for released app");
4311 
4312         return;
4313     }
4314 
4315     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4316 
4317     link = NULL;
4318 
4319     nxt_thread_mutex_lock(&app->mutex);
4320 
4321     nxt_assert(app->pending_processes != 0);
4322 
4323     app->pending_processes--;
4324 
4325     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4326         link = nxt_queue_first(&app->ack_waiting_req);
4327 
4328         nxt_queue_remove(link);
4329         link->next = NULL;
4330     }
4331 
4332     nxt_thread_mutex_unlock(&app->mutex);
4333 
4334     while (link != NULL) {
4335         r = nxt_container_of(link, nxt_http_request_t, app_link);
4336 
4337         nxt_event_engine_post(r->engine, &r->err_work);
4338 
4339         link = NULL;
4340 
4341         nxt_thread_mutex_lock(&app->mutex);
4342 
4343         if (app->processes == 0 && app->pending_processes == 0
4344             && !nxt_queue_is_empty(&app->ack_waiting_req))
4345         {
4346             link = nxt_queue_first(&app->ack_waiting_req);
4347 
4348             nxt_queue_remove(link);
4349             link->next = NULL;
4350         }
4351 
4352         nxt_thread_mutex_unlock(&app->mutex);
4353     }
4354 }
4355 
4356 
4357 
4358 nxt_inline nxt_port_t *
4359 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4360 {
4361     nxt_port_t  *port;
4362 
4363     port = NULL;
4364 
4365     nxt_thread_mutex_lock(&app->mutex);
4366 
4367     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4368 
4369         /* Caller is responsible to decrease port use count. */
4370         nxt_queue_chk_remove(&port->app_link);
4371 
4372         if (nxt_queue_chk_remove(&port->idle_link)) {
4373             app->idle_processes--;
4374 
4375             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4376                       &app->name, port->pid, port->id,
4377                       (port->idle_start ? "idle_ports" : "spare_ports"));
4378         }
4379 
4380         nxt_port_hash_remove(&app->port_hash, port);
4381         app->port_hash_count--;
4382 
4383         port->app = NULL;
4384         app->processes--;
4385 
4386         break;
4387 
4388     } nxt_queue_loop;
4389 
4390     nxt_thread_mutex_unlock(&app->mutex);
4391 
4392     return port;
4393 }
4394 
4395 
4396 static void
4397 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4398 {
4399     int  c;
4400 
4401     c = nxt_atomic_fetch_add(&app->use_count, i);
4402 
4403     if (i < 0 && c == -i) {
4404 
4405         if (task->thread->engine != app->engine) {
4406             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4407 
4408         } else {
4409             nxt_router_free_app(task, app->joint, NULL);
4410         }
4411     }
4412 }
4413 
4414 
4415 static void
4416 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4417 {
4418     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4419 
4420     nxt_queue_remove(&app->link);
4421 
4422     nxt_router_app_use(task, app, -1);
4423 }
4424 
4425 
4426 static void
4427 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4428     nxt_apr_action_t action)
4429 {
4430     int         inc_use;
4431     uint32_t    got_response, dec_requests;
4432     nxt_app_t   *app;
4433     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4434     nxt_port_t  *main_app_port;
4435 
4436     nxt_assert(port != NULL);
4437     nxt_assert(port->app != NULL);
4438 
4439     app = port->app;
4440 
4441     inc_use = 0;
4442     got_response = 0;
4443     dec_requests = 0;
4444 
4445     switch (action) {
4446     case NXT_APR_NEW_PORT:
4447         break;
4448     case NXT_APR_REQUEST_FAILED:
4449         dec_requests = 1;
4450         inc_use = -1;
4451         break;
4452     case NXT_APR_GOT_RESPONSE:
4453         got_response = 1;
4454         inc_use = -1;
4455         break;
4456     case NXT_APR_UPGRADE:
4457         got_response = 1;
4458         break;
4459     case NXT_APR_CLOSE:
4460         inc_use = -1;
4461         break;
4462     }
4463 
4464     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4465               port->pid, port->id,
4466               (int) inc_use, (int) got_response);
4467 
4468     if (port == app->shared_port) {
4469         nxt_thread_mutex_lock(&app->mutex);
4470 
4471         app->active_requests -= got_response + dec_requests;
4472 
4473         nxt_thread_mutex_unlock(&app->mutex);
4474 
4475         goto adjust_use;
4476     }
4477 
4478     main_app_port = port->main_app_port;
4479 
4480     nxt_thread_mutex_lock(&app->mutex);
4481 
4482     main_app_port->app_responses += got_response;
4483     main_app_port->active_requests -= got_response + dec_requests;
4484     app->active_requests -= got_response + dec_requests;
4485 
4486     if (main_app_port->pair[1] != -1
4487         && (app->max_requests == 0
4488             || main_app_port->app_responses < app->max_requests))
4489     {
4490         if (main_app_port->app_link.next == NULL) {
4491             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4492 
4493             nxt_port_inc_use(main_app_port);
4494         }
4495     }
4496 
4497     send_quit = (app->max_requests > 0
4498                  && main_app_port->app_responses >= app->max_requests);
4499 
4500     if (send_quit) {
4501         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4502 
4503         nxt_port_hash_remove(&app->port_hash, main_app_port);
4504         app->port_hash_count--;
4505 
4506         main_app_port->app = NULL;
4507         app->processes--;
4508 
4509     } else {
4510         port_unchained = 0;
4511     }
4512 
4513     adjust_idle_timer = 0;
4514 
4515     if (main_app_port->pair[1] != -1 && !send_quit
4516         && main_app_port->active_requests == 0
4517         && main_app_port->active_websockets == 0
4518         && main_app_port->idle_link.next == NULL)
4519     {
4520         if (app->idle_processes == app->spare_processes
4521             && app->adjust_idle_work.data == NULL)
4522         {
4523             adjust_idle_timer = 1;
4524             app->adjust_idle_work.data = app;
4525             app->adjust_idle_work.next = NULL;
4526         }
4527 
4528         if (app->idle_processes < app->spare_processes) {
4529             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4530 
4531             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4532                       &app->name, main_app_port->pid, main_app_port->id);
4533         } else {
4534             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4535 
4536             main_app_port->idle_start = task->thread->engine->timers.now;
4537 
4538             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4539                       &app->name, main_app_port->pid, main_app_port->id);
4540         }
4541 
4542         app->idle_processes++;
4543     }
4544 
4545     nxt_thread_mutex_unlock(&app->mutex);
4546 
4547     if (adjust_idle_timer) {
4548         nxt_router_app_use(task, app, 1);
4549         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4550     }
4551 
4552     /* ? */
4553     if (main_app_port->pair[1] == -1) {
4554         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4555                   &app->name, app, main_app_port, main_app_port->pid);
4556 
4557         goto adjust_use;
4558     }
4559 
4560     if (send_quit) {
4561         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4562 
4563         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4564                               NULL);
4565 
4566         if (port_unchained) {
4567             nxt_port_use(task, main_app_port, -1);
4568         }
4569 
4570         goto adjust_use;
4571     }
4572 
4573     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4574               &app->name, app);
4575 
4576 adjust_use:
4577 
4578     nxt_port_use(task, port, inc_use);
4579 }
4580 
4581 
4582 void
4583 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4584 {
4585     nxt_app_t         *app;
4586     nxt_bool_t        unchain, start_process;
4587     nxt_port_t        *idle_port;
4588     nxt_queue_link_t  *idle_lnk;
4589 
4590     app = port->app;
4591 
4592     nxt_assert(app != NULL);
4593 
4594     nxt_thread_mutex_lock(&app->mutex);
4595 
4596     nxt_port_hash_remove(&app->port_hash, port);
4597     app->port_hash_count--;
4598 
4599     if (port->id != 0) {
4600         nxt_thread_mutex_unlock(&app->mutex);
4601 
4602         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4603                   port->pid, port->id);
4604 
4605         return;
4606     }
4607 
4608     unchain = nxt_queue_chk_remove(&port->app_link);
4609 
4610     if (nxt_queue_chk_remove(&port->idle_link)) {
4611         app->idle_processes--;
4612 
4613         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4614                   &app->name, port->pid, port->id,
4615                   (port->idle_start ? "idle_ports" : "spare_ports"));
4616 
4617         if (port->idle_start == 0
4618             && app->idle_processes >= app->spare_processes)
4619         {
4620             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4621 
4622             idle_lnk = nxt_queue_last(&app->idle_ports);
4623             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4624             nxt_queue_remove(idle_lnk);
4625 
4626             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4627 
4628             idle_port->idle_start = 0;
4629 
4630             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4631                       "to spare_ports",
4632                       &app->name, idle_port->pid, idle_port->id);
4633         }
4634     }
4635 
4636     app->processes--;
4637 
4638     start_process = !task->thread->engine->shutdown
4639                     && nxt_router_app_can_start(app)
4640                     && nxt_router_app_need_start(app);
4641 
4642     if (start_process) {
4643         app->pending_processes++;
4644     }
4645 
4646     nxt_thread_mutex_unlock(&app->mutex);
4647 
4648     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4649 
4650     if (unchain) {
4651         nxt_port_use(task, port, -1);
4652     }
4653 
4654     if (start_process) {
4655         nxt_router_start_app_process(task, app);
4656     }
4657 }
4658 
4659 
4660 static void
4661 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4662 {
4663     nxt_app_t           *app;
4664     nxt_bool_t          queued;
4665     nxt_port_t          *port;
4666     nxt_msec_t          timeout, threshold;
4667     nxt_queue_link_t    *lnk;
4668     nxt_event_engine_t  *engine;
4669 
4670     app = obj;
4671     queued = (data == app);
4672 
4673     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4674               &app->name, queued);
4675 
4676     engine = task->thread->engine;
4677 
4678     nxt_assert(app->engine == engine);
4679 
4680     threshold = engine->timers.now + app->joint->idle_timer.bias;
4681     timeout = 0;
4682 
4683     nxt_thread_mutex_lock(&app->mutex);
4684 
4685     if (queued) {
4686         app->adjust_idle_work.data = NULL;
4687     }
4688 
4689     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4690               &app->name,
4691               (int) app->idle_processes, (int) app->spare_processes);
4692 
4693     while (app->idle_processes > app->spare_processes) {
4694 
4695         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4696 
4697         lnk = nxt_queue_first(&app->idle_ports);
4698         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4699 
4700         timeout = port->idle_start + app->idle_timeout;
4701 
4702         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4703                   &app->name, port->pid,
4704                   port->idle_start, timeout, threshold);
4705 
4706         if (timeout > threshold) {
4707             break;
4708         }
4709 
4710         nxt_queue_remove(lnk);
4711         lnk->next = NULL;
4712 
4713         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4714                   &app->name, port->pid, port->id);
4715 
4716         nxt_queue_chk_remove(&port->app_link);
4717 
4718         nxt_port_hash_remove(&app->port_hash, port);
4719         app->port_hash_count--;
4720 
4721         app->idle_processes--;
4722         app->processes--;
4723         port->app = NULL;
4724 
4725         nxt_thread_mutex_unlock(&app->mutex);
4726 
4727         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4728                   &app->name, port->pid);
4729 
4730         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4731 
4732         nxt_port_use(task, port, -1);
4733 
4734         nxt_thread_mutex_lock(&app->mutex);
4735     }
4736 
4737     nxt_thread_mutex_unlock(&app->mutex);
4738 
4739     if (timeout > threshold) {
4740         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4741 
4742     } else {
4743         nxt_timer_disable(engine, &app->joint->idle_timer);
4744     }
4745 
4746     if (queued) {
4747         nxt_router_app_use(task, app, -1);
4748     }
4749 }
4750 
4751 
4752 static void
4753 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4754 {
4755     nxt_timer_t      *timer;
4756     nxt_app_joint_t  *app_joint;
4757 
4758     timer = obj;
4759     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4760 
4761     if (nxt_fast_path(app_joint->app != NULL)) {
4762         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4763     }
4764 }
4765 
4766 
4767 static void
4768 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4769 {
4770     nxt_timer_t      *timer;
4771     nxt_app_joint_t  *app_joint;
4772 
4773     timer = obj;
4774     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4775 
4776     nxt_router_app_joint_use(task, app_joint, -1);
4777 }
4778 
4779 
4780 static void
4781 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4782 {
4783     nxt_app_t        *app;
4784     nxt_port_t       *port;
4785     nxt_app_joint_t  *app_joint;
4786 
4787     app_joint = obj;
4788     app = app_joint->app;
4789 
4790     for ( ;; ) {
4791         port = nxt_router_app_get_port_for_quit(task, app);
4792         if (port == NULL) {
4793             break;
4794         }
4795 
4796         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4797 
4798         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4799 
4800         nxt_port_use(task, port, -1);
4801     }
4802 
4803     nxt_thread_mutex_lock(&app->mutex);
4804 
4805     for ( ;; ) {
4806         port = nxt_port_hash_retrieve(&app->port_hash);
4807         if (port == NULL) {
4808             break;
4809         }
4810 
4811         app->port_hash_count--;
4812 
4813         port->app = NULL;
4814 
4815         nxt_port_close(task, port);
4816 
4817         nxt_port_use(task, port, -1);
4818     }
4819 
4820     nxt_thread_mutex_unlock(&app->mutex);
4821 
4822     nxt_assert(app->processes == 0);
4823     nxt_assert(app->active_requests == 0);
4824     nxt_assert(app->port_hash_count == 0);
4825     nxt_assert(app->idle_processes == 0);
4826     nxt_assert(nxt_queue_is_empty(&app->ports));
4827     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4828     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4829 
4830     nxt_port_mmaps_destroy(&app->outgoing, 1);
4831 
4832     nxt_thread_mutex_destroy(&app->outgoing.mutex);
4833 
4834     if (app->shared_port != NULL) {
4835         app->shared_port->app = NULL;
4836         nxt_port_close(task, app->shared_port);
4837         nxt_port_use(task, app->shared_port, -1);
4838     }
4839 
4840     nxt_thread_mutex_destroy(&app->mutex);
4841     nxt_mp_destroy(app->mem_pool);
4842 
4843     app_joint->app = NULL;
4844 
4845     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4846         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4847         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4848 
4849     } else {
4850         nxt_router_app_joint_use(task, app_joint, -1);
4851     }
4852 }
4853 
4854 
4855 static void
4856 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4857     nxt_request_rpc_data_t *req_rpc_data)
4858 {
4859     nxt_bool_t          start_process;
4860     nxt_port_t          *port;
4861     nxt_http_request_t  *r;
4862 
4863     start_process = 0;
4864 
4865     nxt_thread_mutex_lock(&app->mutex);
4866 
4867     port = app->shared_port;
4868     nxt_port_inc_use(port);
4869 
4870     app->active_requests++;
4871 
4872     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4873         app->pending_processes++;
4874         start_process = 1;
4875     }
4876 
4877     r = req_rpc_data->request;
4878 
4879     /*
4880      * Put request into application-wide list to be able to cancel request
4881      * if something goes wrong with application processes.
4882      */
4883     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
4884 
4885     nxt_thread_mutex_unlock(&app->mutex);
4886 
4887     /*
4888      * Retain request memory pool while request is linked in ack_waiting_req
4889      * to guarantee request structure memory is accessble.
4890      */
4891     nxt_mp_retain(r->mem_pool);
4892 
4893     req_rpc_data->app_port = port;
4894     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4895 
4896     if (start_process) {
4897         nxt_router_start_app_process(task, app);
4898     }
4899 }
4900 
4901 
4902 void
4903 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4904     nxt_app_t *app)
4905 {
4906     nxt_event_engine_t      *engine;
4907     nxt_request_rpc_data_t  *req_rpc_data;
4908 
4909     engine = task->thread->engine;
4910 
4911     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4912                                           nxt_router_response_ready_handler,
4913                                           nxt_router_response_error_handler,
4914                                           sizeof(nxt_request_rpc_data_t));
4915     if (nxt_slow_path(req_rpc_data == NULL)) {
4916         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4917         return;
4918     }
4919 
4920     /*
4921      * At this point we have request req_rpc_data allocated and registered
4922      * in port handlers.  Need to fixup request memory pool.  Counterpart
4923      * release will be called via following call chain:
4924      *    nxt_request_rpc_data_unlink() ->
4925      *        nxt_router_http_request_release_post() ->
4926      *            nxt_router_http_request_release()
4927      */
4928     nxt_mp_retain(r->mem_pool);
4929 
4930     r->timer.task = &engine->task;
4931     r->timer.work_queue = &engine->fast_work_queue;
4932     r->timer.log = engine->task.log;
4933     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4934 
4935     r->engine = engine;
4936     r->err_work.handler = nxt_router_http_request_error;
4937     r->err_work.task = task;
4938     r->err_work.obj = r;
4939 
4940     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4941     req_rpc_data->app = app;
4942     req_rpc_data->msg_info.body_fd = -1;
4943     req_rpc_data->rpc_cancel = 1;
4944 
4945     nxt_router_app_use(task, app, 1);
4946 
4947     req_rpc_data->request = r;
4948     r->req_rpc_data = req_rpc_data;
4949 
4950     if (r->last != NULL) {
4951         r->last->completion_handler = nxt_router_http_request_done;
4952     }
4953 
4954     nxt_router_app_port_get(task, app, req_rpc_data);
4955     nxt_router_app_prepare_request(task, req_rpc_data);
4956 }
4957 
4958 
4959 static void
4960 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
4961 {
4962     nxt_http_request_t  *r;
4963 
4964     r = obj;
4965 
4966     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
4967 
4968     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4969 
4970     if (r->req_rpc_data != NULL) {
4971         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4972     }
4973 
4974     nxt_mp_release(r->mem_pool);
4975 }
4976 
4977 
4978 static void
4979 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4980 {
4981     nxt_http_request_t  *r;
4982 
4983     r = data;
4984 
4985     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4986 
4987     if (r->req_rpc_data != NULL) {
4988         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4989     }
4990 
4991     nxt_http_request_close_handler(task, r, r->proto.any);
4992 }
4993 
4994 
4995 static void
4996 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4997 {
4998 }
4999 
5000 
5001 static void
5002 nxt_router_app_prepare_request(nxt_task_t *task,
5003     nxt_request_rpc_data_t *req_rpc_data)
5004 {
5005     nxt_app_t         *app;
5006     nxt_buf_t         *buf, *body;
5007     nxt_int_t         res;
5008     nxt_port_t        *port, *reply_port;
5009 
5010     int                   notify;
5011     struct {
5012         nxt_port_msg_t       pm;
5013         nxt_port_mmap_msg_t  mm;
5014     } msg;
5015 
5016 
5017     app = req_rpc_data->app;
5018 
5019     nxt_assert(app != NULL);
5020 
5021     port = req_rpc_data->app_port;
5022 
5023     nxt_assert(port != NULL);
5024     nxt_assert(port->queue != NULL);
5025 
5026     reply_port = task->thread->engine->port;
5027 
5028     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5029                                  nxt_app_msg_prefix[app->type]);
5030     if (nxt_slow_path(buf == NULL)) {
5031         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5032                   req_rpc_data->stream, &app->name);
5033 
5034         nxt_http_request_error(task, req_rpc_data->request,
5035                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5036 
5037         return;
5038     }
5039 
5040     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5041                     nxt_buf_used_size(buf),
5042                     port->socket.fd);
5043 
5044     req_rpc_data->msg_info.buf = buf;
5045 
5046     body = req_rpc_data->request->body;
5047 
5048     if (body != NULL && nxt_buf_is_file(body)) {
5049         req_rpc_data->msg_info.body_fd = body->file->fd;
5050 
5051         body->file->fd = -1;
5052 
5053     } else {
5054         req_rpc_data->msg_info.body_fd = -1;
5055     }
5056 
5057     msg.pm.stream = req_rpc_data->stream;
5058     msg.pm.pid = reply_port->pid;
5059     msg.pm.reply_port = reply_port->id;
5060     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5061     msg.pm.last = 0;
5062     msg.pm.mmap = 1;
5063     msg.pm.nf = 0;
5064     msg.pm.mf = 0;
5065     msg.pm.tracking = 0;
5066 
5067     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5068     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5069 
5070     msg.mm.mmap_id = hdr->id;
5071     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5072     msg.mm.size = nxt_buf_used_size(buf);
5073 
5074     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5075                              req_rpc_data->stream, &notify,
5076                              &req_rpc_data->msg_info.tracking_cookie);
5077     if (nxt_fast_path(res == NXT_OK)) {
5078         if (notify != 0) {
5079             (void) nxt_port_socket_write(task, port,
5080                                          NXT_PORT_MSG_READ_QUEUE,
5081                                          -1, req_rpc_data->stream,
5082                                          reply_port->id, NULL);
5083 
5084         } else {
5085             nxt_debug(task, "queue is not empty");
5086         }
5087 
5088         buf->is_port_mmap_sent = 1;
5089         buf->mem.pos = buf->mem.free;
5090 
5091     } else {
5092         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5093                   req_rpc_data->stream, &app->name);
5094 
5095         nxt_http_request_error(task, req_rpc_data->request,
5096                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5097     }
5098 }
5099 
5100 
5101 struct nxt_fields_iter_s {
5102     nxt_list_part_t   *part;
5103     nxt_http_field_t  *field;
5104 };
5105 
5106 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5107 
5108 
5109 static nxt_http_field_t *
5110 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5111 {
5112     if (part == NULL) {
5113         return NULL;
5114     }
5115 
5116     while (part->nelts == 0) {
5117         part = part->next;
5118         if (part == NULL) {
5119             return NULL;
5120         }
5121     }
5122 
5123     i->part = part;
5124     i->field = nxt_list_data(i->part);
5125 
5126     return i->field;
5127 }
5128 
5129 
5130 static nxt_http_field_t *
5131 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5132 {
5133     return nxt_fields_part_first(nxt_list_part(fields), i);
5134 }
5135 
5136 
5137 static nxt_http_field_t *
5138 nxt_fields_next(nxt_fields_iter_t *i)
5139 {
5140     nxt_http_field_t  *end = nxt_list_data(i->part);
5141 
5142     end += i->part->nelts;
5143     i->field++;
5144 
5145     if (i->field < end) {
5146         return i->field;
5147     }
5148 
5149     return nxt_fields_part_first(i->part->next, i);
5150 }
5151 
5152 
5153 static nxt_buf_t *
5154 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5155     nxt_app_t *app, const nxt_str_t *prefix)
5156 {
5157     void                *target_pos, *query_pos;
5158     u_char              *pos, *end, *p, c;
5159     size_t              fields_count, req_size, size, free_size;
5160     size_t              copy_size;
5161     nxt_off_t           content_length;
5162     nxt_buf_t           *b, *buf, *out, **tail;
5163     nxt_http_field_t    *field, *dup;
5164     nxt_unit_field_t    *dst_field;
5165     nxt_fields_iter_t   iter, dup_iter;
5166     nxt_unit_request_t  *req;
5167 
5168     req_size = sizeof(nxt_unit_request_t)
5169                + r->method->length + 1
5170                + r->version.length + 1
5171                + r->remote->length + 1
5172                + r->local->length + 1
5173                + r->server_name.length + 1
5174                + r->target.length + 1
5175                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5176 
5177     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5178     fields_count = 0;
5179 
5180     nxt_list_each(field, r->fields) {
5181         fields_count++;
5182 
5183         req_size += field->name_length + prefix->length + 1
5184                     + field->value_length + 1;
5185     } nxt_list_loop;
5186 
5187     req_size += fields_count * sizeof(nxt_unit_field_t);
5188 
5189     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5190         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5191                   (int) req_size);
5192 
5193         return NULL;
5194     }
5195 
5196     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5197               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5198     if (nxt_slow_path(out == NULL)) {
5199         return NULL;
5200     }
5201 
5202     req = (nxt_unit_request_t *) out->mem.free;
5203     out->mem.free += req_size;
5204 
5205     req->app_target = r->app_target;
5206 
5207     req->content_length = content_length;
5208 
5209     p = (u_char *) (req->fields + fields_count);
5210 
5211     nxt_debug(task, "fields_count=%d", (int) fields_count);
5212 
5213     req->method_length = r->method->length;
5214     nxt_unit_sptr_set(&req->method, p);
5215     p = nxt_cpymem(p, r->method->start, r->method->length);
5216     *p++ = '\0';
5217 
5218     req->version_length = r->version.length;
5219     nxt_unit_sptr_set(&req->version, p);
5220     p = nxt_cpymem(p, r->version.start, r->version.length);
5221     *p++ = '\0';
5222 
5223     req->remote_length = r->remote->address_length;
5224     nxt_unit_sptr_set(&req->remote, p);
5225     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5226                    r->remote->address_length);
5227     *p++ = '\0';
5228 
5229     req->local_length = r->local->address_length;
5230     nxt_unit_sptr_set(&req->local, p);
5231     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5232     *p++ = '\0';
5233 
5234     req->tls = (r->tls != NULL);
5235     req->websocket_handshake = r->websocket_handshake;
5236 
5237     req->server_name_length = r->server_name.length;
5238     nxt_unit_sptr_set(&req->server_name, p);
5239     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5240     *p++ = '\0';
5241 
5242     target_pos = p;
5243     req->target_length = (uint32_t) r->target.length;
5244     nxt_unit_sptr_set(&req->target, p);
5245     p = nxt_cpymem(p, r->target.start, r->target.length);
5246     *p++ = '\0';
5247 
5248     req->path_length = (uint32_t) r->path->length;
5249     if (r->path->start == r->target.start) {
5250         nxt_unit_sptr_set(&req->path, target_pos);
5251 
5252     } else {
5253         nxt_unit_sptr_set(&req->path, p);
5254         p = nxt_cpymem(p, r->path->start, r->path->length);
5255         *p++ = '\0';
5256     }
5257 
5258     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5259     if (r->args != NULL && r->args->start != NULL) {
5260         query_pos = nxt_pointer_to(target_pos,
5261                                    r->args->start - r->target.start);
5262 
5263         nxt_unit_sptr_set(&req->query, query_pos);
5264 
5265     } else {
5266         req->query.offset = 0;
5267     }
5268 
5269     req->content_length_field = NXT_UNIT_NONE_FIELD;
5270     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5271     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5272     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5273 
5274     dst_field = req->fields;
5275 
5276     for (field = nxt_fields_first(r->fields, &iter);
5277          field != NULL;
5278          field = nxt_fields_next(&iter))
5279     {
5280         if (field->skip) {
5281             continue;
5282         }
5283 
5284         dst_field->hash = field->hash;
5285         dst_field->skip = 0;
5286         dst_field->name_length = field->name_length + prefix->length;
5287         dst_field->value_length = field->value_length;
5288 
5289         if (field == r->content_length) {
5290             req->content_length_field = dst_field - req->fields;
5291 
5292         } else if (field == r->content_type) {
5293             req->content_type_field = dst_field - req->fields;
5294 
5295         } else if (field == r->cookie) {
5296             req->cookie_field = dst_field - req->fields;
5297 
5298         } else if (field == r->authorization) {
5299             req->authorization_field = dst_field - req->fields;
5300         }
5301 
5302         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5303                   (int) field->hash, (int) field->skip,
5304                   (int) field->name_length, field->name,
5305                   (int) field->value_length, field->value);
5306 
5307         if (prefix->length != 0) {
5308             nxt_unit_sptr_set(&dst_field->name, p);
5309             p = nxt_cpymem(p, prefix->start, prefix->length);
5310 
5311             end = field->name + field->name_length;
5312             for (pos = field->name; pos < end; pos++) {
5313                 c = *pos;
5314 
5315                 if (c >= 'a' && c <= 'z') {
5316                     *p++ = (c & ~0x20);
5317                     continue;
5318                 }
5319 
5320                 if (c == '-') {
5321                     *p++ = '_';
5322                     continue;
5323                 }
5324 
5325                 *p++ = c;
5326             }
5327 
5328         } else {
5329             nxt_unit_sptr_set(&dst_field->name, p);
5330             p = nxt_cpymem(p, field->name, field->name_length);
5331         }
5332 
5333         *p++ = '\0';
5334 
5335         nxt_unit_sptr_set(&dst_field->value, p);
5336         p = nxt_cpymem(p, field->value, field->value_length);
5337 
5338         if (prefix->length != 0) {
5339             dup_iter = iter;
5340 
5341             for (dup = nxt_fields_next(&dup_iter);
5342                  dup != NULL;
5343                  dup = nxt_fields_next(&dup_iter))
5344             {
5345                 if (dup->name_length != field->name_length
5346                     || dup->skip
5347                     || dup->hash != field->hash
5348                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5349                 {
5350                     continue;
5351                 }
5352 
5353                 p = nxt_cpymem(p, ", ", 2);
5354                 p = nxt_cpymem(p, dup->value, dup->value_length);
5355 
5356                 dst_field->value_length += 2 + dup->value_length;
5357 
5358                 dup->skip = 1;
5359             }
5360         }
5361 
5362         *p++ = '\0';
5363 
5364         dst_field++;
5365     }
5366 
5367     req->fields_count = (uint32_t) (dst_field - req->fields);
5368 
5369     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5370 
5371     buf = out;
5372     tail = &buf->next;
5373 
5374     for (b = r->body; b != NULL; b = b->next) {
5375         size = nxt_buf_mem_used_size(&b->mem);
5376         pos = b->mem.pos;
5377 
5378         while (size > 0) {
5379             if (buf == NULL) {
5380                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5381 
5382                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5383                 if (nxt_slow_path(buf == NULL)) {
5384                     while (out != NULL) {
5385                         buf = out->next;
5386                         out->next = NULL;
5387                         out->completion_handler(task, out, out->parent);
5388                         out = buf;
5389                     }
5390                     return NULL;
5391                 }
5392 
5393                 *tail = buf;
5394                 tail = &buf->next;
5395 
5396             } else {
5397                 free_size = nxt_buf_mem_free_size(&buf->mem);
5398                 if (free_size < size
5399                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5400                        == NXT_OK)
5401                 {
5402                     free_size = nxt_buf_mem_free_size(&buf->mem);
5403                 }
5404             }
5405 
5406             if (free_size > 0) {
5407                 copy_size = nxt_min(free_size, size);
5408 
5409                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5410 
5411                 size -= copy_size;
5412                 pos += copy_size;
5413 
5414                 if (size == 0) {
5415                     break;
5416                 }
5417             }
5418 
5419             buf = NULL;
5420         }
5421     }
5422 
5423     return out;
5424 }
5425 
5426 
5427 static void
5428 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5429 {
5430     nxt_timer_t              *timer;
5431     nxt_http_request_t       *r;
5432     nxt_request_rpc_data_t   *req_rpc_data;
5433 
5434     timer = obj;
5435 
5436     nxt_debug(task, "router app timeout");
5437 
5438     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5439     req_rpc_data = r->timer_data;
5440 
5441     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5442 
5443     nxt_request_rpc_data_unlink(task, req_rpc_data);
5444 }
5445 
5446 
5447 static void
5448 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5449 {
5450     r->timer.handler = nxt_router_http_request_release;
5451     nxt_timer_add(task->thread->engine, &r->timer, 0);
5452 }
5453 
5454 
5455 static void
5456 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5457 {
5458     nxt_http_request_t  *r;
5459 
5460     nxt_debug(task, "http request pool release");
5461 
5462     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5463 
5464     nxt_mp_release(r->mem_pool);
5465 }
5466 
5467 
5468 static void
5469 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5470 {
5471     size_t                   mi;
5472     uint32_t                 i;
5473     nxt_bool_t               ack;
5474     nxt_process_t            *process;
5475     nxt_free_map_t           *m;
5476     nxt_port_mmap_handler_t  *mmap_handler;
5477 
5478     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5479 
5480     process = nxt_runtime_process_find(task->thread->runtime,
5481                                        msg->port_msg.pid);
5482     if (nxt_slow_path(process == NULL)) {
5483         return;
5484     }
5485 
5486     ack = 0;
5487 
5488     /*
5489      * To mitigate possible racing condition (when OOSM message received
5490      * after some of the memory was already freed), need to try to find
5491      * first free segment in shared memory and send ACK if found.
5492      */
5493 
5494     nxt_thread_mutex_lock(&process->incoming.mutex);
5495 
5496     for (i = 0; i < process->incoming.size; i++) {
5497         mmap_handler = process->incoming.elts[i].mmap_handler;
5498 
5499         if (nxt_slow_path(mmap_handler == NULL)) {
5500             continue;
5501         }
5502 
5503         m = mmap_handler->hdr->free_map;
5504 
5505         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5506             if (m[mi] != 0) {
5507                 ack = 1;
5508 
5509                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5510                           i, mi, m[mi]);
5511 
5512                 break;
5513             }
5514         }
5515     }
5516 
5517     nxt_thread_mutex_unlock(&process->incoming.mutex);
5518 
5519     if (ack) {
5520         nxt_process_broadcast_shm_ack(task, process);
5521     }
5522 }
5523 
5524 
5525 static void
5526 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5527 {
5528     nxt_fd_t                 fd;
5529     nxt_port_t               *port;
5530     nxt_runtime_t            *rt;
5531     nxt_port_mmaps_t         *mmaps;
5532     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5533     nxt_port_mmap_handler_t  *mmap_handler;
5534 
5535     rt = task->thread->runtime;
5536 
5537     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5538                                  msg->port_msg.reply_port);
5539     if (nxt_slow_path(port == NULL)) {
5540         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5541                   msg->port_msg.pid, msg->port_msg.reply_port);
5542 
5543         return;
5544     }
5545 
5546     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5547                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5548     {
5549         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5550                   (int) nxt_buf_used_size(msg->buf));
5551 
5552         return;
5553     }
5554 
5555     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5556 
5557     nxt_assert(port->type == NXT_PROCESS_APP);
5558 
5559     if (nxt_slow_path(port->app == NULL)) {
5560         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5561                   port->pid, port->id);
5562 
5563         // FIXME
5564         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5565                               -1, msg->port_msg.stream, 0, NULL);
5566 
5567         return;
5568     }
5569 
5570     mmaps = &port->app->outgoing;
5571     nxt_thread_mutex_lock(&mmaps->mutex);
5572 
5573     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5574         nxt_thread_mutex_unlock(&mmaps->mutex);
5575 
5576         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5577                   (int) get_mmap_msg->id);
5578 
5579         // FIXME
5580         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5581                               -1, msg->port_msg.stream, 0, NULL);
5582         return;
5583     }
5584 
5585     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5586 
5587     fd = mmap_handler->fd;
5588 
5589     nxt_thread_mutex_unlock(&mmaps->mutex);
5590 
5591     nxt_debug(task, "get mmap %PI:%d found",
5592               msg->port_msg.pid, (int) get_mmap_msg->id);
5593 
5594     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5595 }
5596 
5597 
5598 static void
5599 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5600 {
5601     nxt_port_t               *port, *reply_port;
5602     nxt_runtime_t            *rt;
5603     nxt_port_msg_get_port_t  *get_port_msg;
5604 
5605     rt = task->thread->runtime;
5606 
5607     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5608                                        msg->port_msg.reply_port);
5609     if (nxt_slow_path(reply_port == NULL)) {
5610         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5611                   msg->port_msg.pid, msg->port_msg.reply_port);
5612 
5613         return;
5614     }
5615 
5616     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5617                       < (int) sizeof(nxt_port_msg_get_port_t)))
5618     {
5619         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5620                   (int) nxt_buf_used_size(msg->buf));
5621 
5622         return;
5623     }
5624 
5625     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5626 
5627     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5628     if (nxt_slow_path(port == NULL)) {
5629         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5630                   get_port_msg->pid, get_port_msg->id);
5631 
5632         return;
5633     }
5634 
5635     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5636               get_port_msg->id);
5637 
5638     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5639 }
5640