xref: /unit/src/nxt_router.c (revision 2186:47d365005fab)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_status.h>
11 #if (NXT_TLS)
12 #include <nxt_cert.h>
13 #endif
14 #include <nxt_http.h>
15 #include <nxt_port_memory_int.h>
16 #include <nxt_unit_request.h>
17 #include <nxt_unit_response.h>
18 #include <nxt_router_request.h>
19 #include <nxt_app_queue.h>
20 #include <nxt_port_queue.h>
21 
/*
 * Port id given to an application's shared port: it is passed to
 * nxt_port_new() when the shared port is created and compared against
 * when deciding whether a queued request can be cancelled.
 */
#define NXT_SHARED_PORT_ID  0xFFFFu
23 
24 typedef struct {
25     nxt_str_t         type;
26     uint32_t          processes;
27     uint32_t          max_processes;
28     uint32_t          spare_processes;
29     nxt_msec_t        timeout;
30     nxt_msec_t        idle_timeout;
31     nxt_conf_value_t  *limits_value;
32     nxt_conf_value_t  *processes_value;
33     nxt_conf_value_t  *targets_value;
34 } nxt_router_app_conf_t;
35 
36 
37 typedef struct {
38     nxt_str_t         pass;
39     nxt_str_t         application;
40 } nxt_router_listener_conf_t;
41 
42 
#if (NXT_TLS)

/*
 * State attached to one TLS-related request for a listening socket.
 * Compiled only when TLS support is enabled.
 *
 * NOTE(review): presumably used as the context of
 * nxt_router_tls_rpc_handler() -- confirm against its definition.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    /* Links this entry into the nxt_socket_conf_t.tls queue. */
    nxt_queue_link_t        link;
} nxt_router_tlssock_t;

#endif
56 
57 
58 typedef struct {
59     nxt_str_t               *name;
60     nxt_socket_conf_t       *socket_conf;
61     nxt_router_temp_conf_t  *temp_conf;
62     nxt_bool_t              last;
63 } nxt_socket_rpc_t;
64 
65 
66 typedef struct {
67     nxt_app_t               *app;
68     nxt_router_temp_conf_t  *temp_conf;
69     uint8_t                 proto;  /* 1 bit */
70 } nxt_app_rpc_t;
71 
72 
73 typedef struct {
74     nxt_app_joint_t         *app_joint;
75     uint32_t                generation;
76     uint8_t                 proto;  /* 1 bit */
77 } nxt_app_joint_rpc_t;
78 
79 
80 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
81     nxt_mp_t *mp);
82 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
83 static void nxt_router_greet_controller(nxt_task_t *task,
84     nxt_port_t *controller_port);
85 
86 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
87 
88 static void nxt_router_new_port_handler(nxt_task_t *task,
89     nxt_port_recv_msg_t *msg);
90 static void nxt_router_conf_data_handler(nxt_task_t *task,
91     nxt_port_recv_msg_t *msg);
92 static void nxt_router_app_restart_handler(nxt_task_t *task,
93     nxt_port_recv_msg_t *msg);
94 static void nxt_router_status_handler(nxt_task_t *task,
95     nxt_port_recv_msg_t *msg);
96 static void nxt_router_remove_pid_handler(nxt_task_t *task,
97     nxt_port_recv_msg_t *msg);
98 
99 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_send(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
104 
105 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
106     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
107 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
108     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
109 static nxt_http_forward_t *nxt_router_conf_forward(nxt_task_t *task,
110     nxt_mp_t *mp, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_forward_header(nxt_mp_t *mp,
112     nxt_conf_value_t *conf, nxt_http_forward_header_t *fh);
113 
114 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
115 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
116 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
117     nxt_app_t *app);
118 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
119     nxt_str_t *name);
120 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
121     int i);
122 
123 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
124     nxt_port_t *port);
125 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
126     nxt_port_t *port);
127 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
128     nxt_port_t *port, nxt_fd_t fd);
129 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
130     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
131 static void nxt_router_listen_socket_ready(nxt_task_t *task,
132     nxt_port_recv_msg_t *msg, void *data);
133 static void nxt_router_listen_socket_error(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 #if (NXT_TLS)
136 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
137     nxt_port_recv_msg_t *msg, void *data);
138 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
139     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
140     nxt_bool_t last);
141 #endif
142 static void nxt_router_app_rpc_create(nxt_task_t *task,
143     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
144 static void nxt_router_app_prefork_ready(nxt_task_t *task,
145     nxt_port_recv_msg_t *msg, void *data);
146 static void nxt_router_app_prefork_error(nxt_task_t *task,
147     nxt_port_recv_msg_t *msg, void *data);
148 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
149     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
150 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
151     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
152 
153 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
154     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
155     const nxt_event_interface_t *interface);
156 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
157     nxt_router_engine_conf_t *recf);
158 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
159     nxt_router_engine_conf_t *recf);
160 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
161     nxt_router_engine_conf_t *recf);
162 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
163     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
164     nxt_work_handler_t handler);
165 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
166     nxt_router_engine_conf_t *recf);
167 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
168     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
169 
170 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
171     nxt_router_temp_conf_t *tmcf);
172 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
173     nxt_event_engine_t *engine);
174 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
175     nxt_router_temp_conf_t *tmcf);
176 
177 static void nxt_router_engines_post(nxt_router_t *router,
178     nxt_router_temp_conf_t *tmcf);
179 static void nxt_router_engine_post(nxt_event_engine_t *engine,
180     nxt_work_t *jobs);
181 
182 static void nxt_router_thread_start(void *data);
183 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
184     void *data);
185 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
186     void *data);
187 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
188     void *data);
189 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
190     void *data);
191 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
192     void *data);
193 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
194     void *data);
195 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
196     void *data);
197 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
198     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
199 static void nxt_router_listen_socket_release(nxt_task_t *task,
200     nxt_socket_conf_t *skcf);
201 
202 static void nxt_router_app_port_ready(nxt_task_t *task,
203     nxt_port_recv_msg_t *msg, void *data);
204 static void nxt_router_app_port_error(nxt_task_t *task,
205     nxt_port_recv_msg_t *msg, void *data);
206 
207 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
208 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
209 
210 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
211     nxt_port_t *port, nxt_apr_action_t action);
212 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
213     nxt_request_rpc_data_t *req_rpc_data);
214 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
215     void *data);
216 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
217     void *data);
218 
219 static void nxt_router_app_prepare_request(nxt_task_t *task,
220     nxt_request_rpc_data_t *req_rpc_data);
221 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
222     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
223 
224 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
225 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
226     void *data);
227 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
228     void *data);
229 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
230     void *data);
231 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
232 
233 static const nxt_http_request_state_t  nxt_http_request_send_state;
234 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
235 
236 static void nxt_router_app_joint_use(nxt_task_t *task,
237     nxt_app_joint_t *app_joint, int i);
238 
239 static void nxt_router_http_request_release_post(nxt_task_t *task,
240     nxt_http_request_t *r);
241 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
242     void *data);
243 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
244 static void nxt_router_get_port_handler(nxt_task_t *task,
245     nxt_port_recv_msg_t *msg);
246 static void nxt_router_get_mmap_handler(nxt_task_t *task,
247     nxt_port_recv_msg_t *msg);
248 
249 extern const nxt_http_request_state_t  nxt_http_websocket;
250 
251 nxt_router_t  *nxt_router;
252 
253 static const nxt_str_t http_prefix = nxt_string("HTTP_");
254 static const nxt_str_t empty_prefix = nxt_string("");
255 
256 static const nxt_str_t  *nxt_app_msg_prefix[] = {
257     &empty_prefix,
258     &empty_prefix,
259     &http_prefix,
260     &http_prefix,
261     &http_prefix,
262     &empty_prefix,
263 };
264 
265 
266 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
267     .quit         = nxt_signal_quit_handler,
268     .new_port     = nxt_router_new_port_handler,
269     .get_port     = nxt_router_get_port_handler,
270     .change_file  = nxt_port_change_log_file_handler,
271     .mmap         = nxt_port_mmap_handler,
272     .get_mmap     = nxt_router_get_mmap_handler,
273     .data         = nxt_router_conf_data_handler,
274     .app_restart  = nxt_router_app_restart_handler,
275     .status       = nxt_router_status_handler,
276     .remove_pid   = nxt_router_remove_pid_handler,
277     .access_log   = nxt_router_access_log_reopen_handler,
278     .rpc_ready    = nxt_port_rpc_handler,
279     .rpc_error    = nxt_port_rpc_handler,
280     .oosm         = nxt_router_oosm_handler,
281 };
282 
283 
284 const nxt_process_init_t  nxt_router_process = {
285     .name           = "router",
286     .type           = NXT_PROCESS_ROUTER,
287     .prefork        = nxt_router_prefork,
288     .restart        = 1,
289     .setup          = nxt_process_core_setup,
290     .start          = nxt_router_start,
291     .port_handlers  = &nxt_router_process_port_handlers,
292     .signals        = nxt_process_signals,
293 };
294 
295 
296 /* Queues of nxt_socket_conf_t */
297 nxt_queue_t  creating_sockets;
298 nxt_queue_t  pending_sockets;
299 nxt_queue_t  updating_sockets;
300 nxt_queue_t  keeping_sockets;
301 nxt_queue_t  deleting_sockets;
302 
303 
304 static nxt_int_t
305 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
306 {
307     nxt_runtime_stop_app_processes(task, task->thread->runtime);
308 
309     return NXT_OK;
310 }
311 
312 
313 static nxt_int_t
314 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
315 {
316     nxt_int_t      ret;
317     nxt_port_t     *controller_port;
318     nxt_router_t   *router;
319     nxt_runtime_t  *rt;
320 
321     rt = task->thread->runtime;
322 
323     nxt_log(task, NXT_LOG_INFO, "router started");
324 
325 #if (NXT_TLS)
326     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
327     if (nxt_slow_path(rt->tls == NULL)) {
328         return NXT_ERROR;
329     }
330 
331     ret = rt->tls->library_init(task);
332     if (nxt_slow_path(ret != NXT_OK)) {
333         return ret;
334     }
335 #endif
336 
337     ret = nxt_http_init(task);
338     if (nxt_slow_path(ret != NXT_OK)) {
339         return ret;
340     }
341 
342     router = nxt_zalloc(sizeof(nxt_router_t));
343     if (nxt_slow_path(router == NULL)) {
344         return NXT_ERROR;
345     }
346 
347     nxt_queue_init(&router->engines);
348     nxt_queue_init(&router->sockets);
349     nxt_queue_init(&router->apps);
350 
351     nxt_router = router;
352 
353     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
354     if (controller_port != NULL) {
355         nxt_router_greet_controller(task, controller_port);
356     }
357 
358     return NXT_OK;
359 }
360 
361 
362 static void
363 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
364 {
365     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
366                           -1, 0, 0, NULL);
367 }
368 
369 
370 static void
371 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
372     void *data)
373 {
374     size_t               size;
375     uint32_t             stream;
376     nxt_fd_t             port_fd, queue_fd;
377     nxt_int_t            ret;
378     nxt_app_t            *app;
379     nxt_buf_t            *b;
380     nxt_port_t           *dport;
381     nxt_runtime_t        *rt;
382     nxt_app_joint_rpc_t  *app_joint_rpc;
383 
384     app = data;
385 
386     nxt_thread_mutex_lock(&app->mutex);
387 
388     dport = app->proto_port;
389 
390     nxt_thread_mutex_unlock(&app->mutex);
391 
392     if (dport != NULL) {
393         nxt_debug(task, "app '%V' %p start process", &app->name, app);
394 
395         b = NULL;
396         port_fd = -1;
397         queue_fd = -1;
398 
399     } else {
400         if (app->proto_port_requests > 0) {
401             nxt_debug(task, "app '%V' %p wait for prototype process",
402                       &app->name, app);
403 
404             app->proto_port_requests++;
405 
406             goto skip;
407         }
408 
409         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
410 
411         rt = task->thread->runtime;
412         dport = rt->port_by_type[NXT_PROCESS_MAIN];
413 
414         size = app->name.length + 1 + app->conf.length;
415 
416         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
417         if (nxt_slow_path(b == NULL)) {
418             goto failed;
419         }
420 
421         nxt_buf_cpystr(b, &app->name);
422         *b->mem.free++ = '\0';
423         nxt_buf_cpystr(b, &app->conf);
424 
425         port_fd = app->shared_port->pair[0];
426         queue_fd = app->shared_port->queue_fd;
427     }
428 
429     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
430                                                      nxt_router_app_port_ready,
431                                                      nxt_router_app_port_error,
432                                                    sizeof(nxt_app_joint_rpc_t));
433     if (nxt_slow_path(app_joint_rpc == NULL)) {
434         goto failed;
435     }
436 
437     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
438 
439     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
440                                  port_fd, queue_fd, stream, port->id, b);
441     if (nxt_slow_path(ret != NXT_OK)) {
442         nxt_port_rpc_cancel(task, port, stream);
443 
444         goto failed;
445     }
446 
447     app_joint_rpc->app_joint = app->joint;
448     app_joint_rpc->generation = app->generation;
449     app_joint_rpc->proto = (b != NULL);
450 
451     if (b != NULL) {
452         app->proto_port_requests++;
453 
454         b = NULL;
455     }
456 
457     nxt_router_app_joint_use(task, app->joint, 1);
458 
459 failed:
460 
461     if (b != NULL) {
462         nxt_mp_free(b->data, b);
463     }
464 
465 skip:
466 
467     nxt_router_app_use(task, app, -1);
468 }
469 
470 
471 static void
472 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
473 {
474     app_joint->use_count += i;
475 
476     if (app_joint->use_count == 0) {
477         nxt_assert(app_joint->app == NULL);
478 
479         nxt_free(app_joint);
480     }
481 }
482 
483 
484 static nxt_int_t
485 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
486 {
487     nxt_int_t      res;
488     nxt_port_t     *router_port;
489     nxt_runtime_t  *rt;
490 
491     nxt_debug(task, "app '%V' start process", &app->name);
492 
493     rt = task->thread->runtime;
494     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
495 
496     nxt_router_app_use(task, app, 1);
497 
498     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
499                         app);
500 
501     if (res == NXT_OK) {
502         return res;
503     }
504 
505     nxt_thread_mutex_lock(&app->mutex);
506 
507     app->pending_processes--;
508 
509     nxt_thread_mutex_unlock(&app->mutex);
510 
511     nxt_router_app_use(task, app, -1);
512 
513     return NXT_ERROR;
514 }
515 
516 
517 nxt_inline nxt_bool_t
518 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
519 {
520     nxt_buf_t       *b, *next;
521     nxt_bool_t      cancelled;
522     nxt_port_t      *app_port;
523     nxt_msg_info_t  *msg_info;
524 
525     msg_info = &req_rpc_data->msg_info;
526 
527     if (msg_info->buf == NULL) {
528         return 0;
529     }
530 
531     app_port = req_rpc_data->app_port;
532 
533     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
534         cancelled = nxt_app_queue_cancel(app_port->queue,
535                                          msg_info->tracking_cookie,
536                                          req_rpc_data->stream);
537 
538         if (cancelled) {
539             nxt_debug(task, "stream #%uD: cancelled by router",
540                       req_rpc_data->stream);
541         }
542 
543     } else {
544         cancelled = 0;
545     }
546 
547     for (b = msg_info->buf; b != NULL; b = next) {
548         next = b->next;
549         b->next = NULL;
550 
551         if (b->is_port_mmap_sent) {
552             b->is_port_mmap_sent = cancelled == 0;
553         }
554 
555         b->completion_handler(task, b, b->parent);
556     }
557 
558     msg_info->buf = NULL;
559 
560     return cancelled;
561 }
562 
563 
564 nxt_inline nxt_bool_t
565 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
566 {
567     if (lnk->next != NULL) {
568         nxt_queue_remove(lnk);
569 
570         lnk->next = NULL;
571 
572         return 1;
573     }
574 
575     return 0;
576 }
577 
578 
579 nxt_inline void
580 nxt_request_rpc_data_unlink(nxt_task_t *task,
581     nxt_request_rpc_data_t *req_rpc_data)
582 {
583     nxt_app_t           *app;
584     nxt_bool_t          unlinked;
585     nxt_http_request_t  *r;
586 
587     nxt_router_msg_cancel(task, req_rpc_data);
588 
589     app = req_rpc_data->app;
590 
591     if (req_rpc_data->app_port != NULL) {
592         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
593                                     req_rpc_data->apr_action);
594 
595         req_rpc_data->app_port = NULL;
596     }
597 
598     r = req_rpc_data->request;
599 
600     if (r != NULL) {
601         r->timer_data = NULL;
602 
603         nxt_router_http_request_release_post(task, r);
604 
605         r->req_rpc_data = NULL;
606         req_rpc_data->request = NULL;
607 
608         if (app != NULL) {
609             unlinked = 0;
610 
611             nxt_thread_mutex_lock(&app->mutex);
612 
613             if (r->app_link.next != NULL) {
614                 nxt_queue_remove(&r->app_link);
615                 r->app_link.next = NULL;
616 
617                 unlinked = 1;
618             }
619 
620             nxt_thread_mutex_unlock(&app->mutex);
621 
622             if (unlinked) {
623                 nxt_mp_release(r->mem_pool);
624             }
625         }
626     }
627 
628     if (app != NULL) {
629         nxt_router_app_use(task, app, -1);
630 
631         req_rpc_data->app = NULL;
632     }
633 
634     if (req_rpc_data->msg_info.body_fd != -1) {
635         nxt_fd_close(req_rpc_data->msg_info.body_fd);
636 
637         req_rpc_data->msg_info.body_fd = -1;
638     }
639 
640     if (req_rpc_data->rpc_cancel) {
641         req_rpc_data->rpc_cancel = 0;
642 
643         nxt_port_rpc_cancel(task, task->thread->engine->port,
644                             req_rpc_data->stream);
645     }
646 }
647 
648 
649 static void
650 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
651 {
652     nxt_int_t      res;
653     nxt_app_t      *app;
654     nxt_port_t     *port, *main_app_port;
655     nxt_runtime_t  *rt;
656 
657     nxt_port_new_port_handler(task, msg);
658 
659     port = msg->u.new_port;
660 
661     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
662         nxt_router_greet_controller(task, msg->u.new_port);
663     }
664 
665     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
666         nxt_port_rpc_handler(task, msg);
667 
668         return;
669     }
670 
671     if (port == NULL || port->type != NXT_PROCESS_APP) {
672 
673         if (msg->port_msg.stream == 0) {
674             return;
675         }
676 
677         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
678 
679     } else {
680         if (msg->fd[1] != -1) {
681             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
682             if (nxt_slow_path(res != NXT_OK)) {
683                 return;
684             }
685 
686             nxt_fd_close(msg->fd[1]);
687             msg->fd[1] = -1;
688         }
689     }
690 
691     if (msg->port_msg.stream != 0) {
692         nxt_port_rpc_handler(task, msg);
693         return;
694     }
695 
696     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
697 
698     /*
699      * Port with "id == 0" is application 'main' port and it always
700      * should come with non-zero stream.
701      */
702     nxt_assert(port->id != 0);
703 
704     /* Find 'main' app port and get app reference. */
705     rt = task->thread->runtime;
706 
707     /*
708      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
709      * sent to main port (with id == 0) and processed in main thread.
710      */
711     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
712     nxt_assert(main_app_port != NULL);
713 
714     app = main_app_port->app;
715 
716     if (nxt_fast_path(app != NULL)) {
717         nxt_thread_mutex_lock(&app->mutex);
718 
719         /* TODO here should be find-and-add code because there can be
720            port waiters in port_hash */
721         nxt_port_hash_add(&app->port_hash, port);
722         app->port_hash_count++;
723 
724         nxt_thread_mutex_unlock(&app->mutex);
725 
726         port->app = app;
727     }
728 
729     port->main_app_port = main_app_port;
730 
731     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
732 }
733 
734 
735 static void
736 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
737 {
738     void                    *p;
739     size_t                  size;
740     nxt_int_t               ret;
741     nxt_port_t              *port;
742     nxt_router_temp_conf_t  *tmcf;
743 
744     port = nxt_runtime_port_find(task->thread->runtime,
745                                  msg->port_msg.pid,
746                                  msg->port_msg.reply_port);
747     if (nxt_slow_path(port == NULL)) {
748         nxt_alert(task, "conf_data_handler: reply port not found");
749         return;
750     }
751 
752     p = MAP_FAILED;
753 
754     /*
755      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
756      * initialized in 'cleanup' section.
757      */
758     size = 0;
759 
760     tmcf = nxt_router_temp_conf(task);
761     if (nxt_slow_path(tmcf == NULL)) {
762         goto fail;
763     }
764 
765     if (nxt_slow_path(msg->fd[0] == -1)) {
766         nxt_alert(task, "conf_data_handler: invalid shm fd");
767         goto fail;
768     }
769 
770     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
771         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
772                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
773         goto fail;
774     }
775 
776     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
777 
778     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
779 
780     nxt_fd_close(msg->fd[0]);
781     msg->fd[0] = -1;
782 
783     if (nxt_slow_path(p == MAP_FAILED)) {
784         goto fail;
785     }
786 
787     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
788 
789     tmcf->router_conf->router = nxt_router;
790     tmcf->stream = msg->port_msg.stream;
791     tmcf->port = port;
792 
793     nxt_port_use(task, tmcf->port, 1);
794 
795     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
796 
797     if (nxt_fast_path(ret == NXT_OK)) {
798         nxt_router_conf_apply(task, tmcf, NULL);
799 
800     } else {
801         nxt_router_conf_error(task, tmcf);
802     }
803 
804     goto cleanup;
805 
806 fail:
807 
808     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
809                           msg->port_msg.stream, 0, NULL);
810 
811     if (tmcf != NULL) {
812         nxt_mp_release(tmcf->mem_pool);
813     }
814 
815 cleanup:
816 
817     if (p != MAP_FAILED) {
818         nxt_mem_munmap(p, size);
819     }
820 
821     if (msg->fd[0] != -1) {
822         nxt_fd_close(msg->fd[0]);
823         msg->fd[0] = -1;
824     }
825 }
826 
827 
828 static void
829 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
830 {
831     nxt_app_t            *app;
832     nxt_int_t            ret;
833     nxt_str_t            app_name;
834     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
835     nxt_port_t           *proto_port;
836     nxt_port_msg_type_t  reply;
837 
838     reply_port = nxt_runtime_port_find(task->thread->runtime,
839                                        msg->port_msg.pid,
840                                        msg->port_msg.reply_port);
841     if (nxt_slow_path(reply_port == NULL)) {
842         nxt_alert(task, "app_restart_handler: reply port not found");
843         return;
844     }
845 
846     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
847     app_name.start = msg->buf->mem.pos;
848 
849     nxt_debug(task, "app_restart_handler: %V", &app_name);
850 
851     app = nxt_router_app_find(&nxt_router->apps, &app_name);
852 
853     if (nxt_fast_path(app != NULL)) {
854         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
855                                    NXT_PROCESS_APP);
856         if (nxt_slow_path(shared_port == NULL)) {
857             goto fail;
858         }
859 
860         ret = nxt_port_socket_init(task, shared_port, 0);
861         if (nxt_slow_path(ret != NXT_OK)) {
862             nxt_port_use(task, shared_port, -1);
863             goto fail;
864         }
865 
866         ret = nxt_router_app_queue_init(task, shared_port);
867         if (nxt_slow_path(ret != NXT_OK)) {
868             nxt_port_write_close(shared_port);
869             nxt_port_read_close(shared_port);
870             nxt_port_use(task, shared_port, -1);
871             goto fail;
872         }
873 
874         nxt_port_write_enable(task, shared_port);
875 
876         nxt_thread_mutex_lock(&app->mutex);
877 
878         proto_port = app->proto_port;
879 
880         if (proto_port != NULL) {
881             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
882                       proto_port->pid);
883 
884             app->proto_port = NULL;
885             proto_port->app = NULL;
886         }
887 
888         app->generation++;
889 
890         shared_port->app = app;
891 
892         old_shared_port = app->shared_port;
893         old_shared_port->app = NULL;
894 
895         app->shared_port = shared_port;
896 
897         nxt_thread_mutex_unlock(&app->mutex);
898 
899         nxt_port_close(task, old_shared_port);
900         nxt_port_use(task, old_shared_port, -1);
901 
902         if (proto_port != NULL) {
903             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
904                                          -1, 0, 0, NULL);
905 
906             nxt_port_close(task, proto_port);
907 
908             nxt_port_use(task, proto_port, -1);
909         }
910 
911         reply = NXT_PORT_MSG_RPC_READY_LAST;
912 
913     } else {
914 
915 fail:
916 
917         reply = NXT_PORT_MSG_RPC_ERROR;
918     }
919 
920     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
921                           0, NULL);
922 }
923 
924 
925 static void
926 nxt_router_status_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
927 {
928     u_char               *p;
929     size_t               alloc;
930     nxt_app_t            *app;
931     nxt_buf_t            *b;
932     nxt_uint_t           type;
933     nxt_port_t           *port;
934     nxt_status_app_t     *app_stat;
935     nxt_event_engine_t   *engine;
936     nxt_status_report_t  *report;
937 
938     port = nxt_runtime_port_find(task->thread->runtime,
939                                  msg->port_msg.pid,
940                                  msg->port_msg.reply_port);
941     if (nxt_slow_path(port == NULL)) {
942         nxt_alert(task, "nxt_router_status_handler(): reply port not found");
943         return;
944     }
945 
946     alloc = sizeof(nxt_status_report_t);
947 
948     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
949 
950         alloc += sizeof(nxt_status_app_t) + app->name.length;
951 
952     } nxt_queue_loop;
953 
954     b = nxt_buf_mem_alloc(port->mem_pool, alloc, 0);
955     if (nxt_slow_path(b == NULL)) {
956         type = NXT_PORT_MSG_RPC_ERROR;
957         goto fail;
958     }
959 
960     report = (nxt_status_report_t *) b->mem.free;
961     b->mem.free = b->mem.end;
962 
963     nxt_memzero(report, sizeof(nxt_status_report_t));
964 
965     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0) {
966 
967         report->accepted_conns += engine->accepted_conns_cnt;
968         report->idle_conns += engine->idle_conns_cnt;
969         report->closed_conns += engine->closed_conns_cnt;
970         report->requests += engine->requests_cnt;
971 
972     } nxt_queue_loop;
973 
974     report->apps_count = 0;
975     app_stat = report->apps;
976     p = b->mem.end;
977 
978     nxt_queue_each(app, &nxt_router->apps, nxt_app_t, link) {
979         p -= app->name.length;
980 
981         nxt_memcpy(p, app->name.start, app->name.length);
982 
983         app_stat->name.length = app->name.length;
984         app_stat->name.start = (u_char *) (p - b->mem.pos);
985 
986         app_stat->active_requests = app->active_requests;
987         app_stat->pending_processes = app->pending_processes;
988         app_stat->processes = app->processes;
989         app_stat->idle_processes = app->idle_processes;
990 
991         report->apps_count++;
992         app_stat++;
993     } nxt_queue_loop;
994 
995     type = NXT_PORT_MSG_RPC_READY_LAST;
996 
997 fail:
998 
999     nxt_port_socket_write(task, port, type, -1, msg->port_msg.stream, 0, b);
1000 }
1001 
1002 
1003 static void
1004 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
1005     void *data)
1006 {
1007     union {
1008         nxt_pid_t  removed_pid;
1009         void       *data;
1010     } u;
1011 
1012     u.data = data;
1013 
1014     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
1015 }
1016 
1017 
1018 static void
1019 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
1020 {
1021     nxt_event_engine_t  *engine;
1022 
1023     nxt_port_remove_pid_handler(task, msg);
1024 
1025     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
1026     {
1027         if (nxt_fast_path(engine->port != NULL)) {
1028             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
1029                           msg->u.data);
1030         }
1031     }
1032     nxt_queue_loop;
1033 
1034     if (msg->port_msg.stream == 0) {
1035         return;
1036     }
1037 
1038     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
1039 
1040     nxt_port_rpc_handler(task, msg);
1041 }
1042 
1043 
1044 static nxt_router_temp_conf_t *
1045 nxt_router_temp_conf(nxt_task_t *task)
1046 {
1047     nxt_mp_t                *mp, *tmp;
1048     nxt_router_conf_t       *rtcf;
1049     nxt_router_temp_conf_t  *tmcf;
1050 
1051     mp = nxt_mp_create(1024, 128, 256, 32);
1052     if (nxt_slow_path(mp == NULL)) {
1053         return NULL;
1054     }
1055 
1056     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1057     if (nxt_slow_path(rtcf == NULL)) {
1058         goto fail;
1059     }
1060 
1061     rtcf->mem_pool = mp;
1062 
1063     rtcf->var_fields = nxt_array_create(mp, 4, sizeof(nxt_var_field_t));
1064     if (nxt_slow_path(rtcf->var_fields == NULL)) {
1065         goto fail;
1066     }
1067 
1068     tmp = nxt_mp_create(1024, 128, 256, 32);
1069     if (nxt_slow_path(tmp == NULL)) {
1070         goto fail;
1071     }
1072 
1073     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1074     if (nxt_slow_path(tmcf == NULL)) {
1075         goto temp_fail;
1076     }
1077 
1078     tmcf->mem_pool = tmp;
1079     tmcf->router_conf = rtcf;
1080     tmcf->count = 1;
1081     tmcf->engine = task->thread->engine;
1082 
1083     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1084                                      sizeof(nxt_router_engine_conf_t));
1085     if (nxt_slow_path(tmcf->engines == NULL)) {
1086         goto temp_fail;
1087     }
1088 
1089     nxt_queue_init(&creating_sockets);
1090     nxt_queue_init(&pending_sockets);
1091     nxt_queue_init(&updating_sockets);
1092     nxt_queue_init(&keeping_sockets);
1093     nxt_queue_init(&deleting_sockets);
1094 
1095 #if (NXT_TLS)
1096     nxt_queue_init(&tmcf->tls);
1097 #endif
1098 
1099     nxt_queue_init(&tmcf->apps);
1100     nxt_queue_init(&tmcf->previous);
1101 
1102     return tmcf;
1103 
1104 temp_fail:
1105 
1106     nxt_mp_destroy(tmp);
1107 
1108 fail:
1109 
1110     nxt_mp_destroy(mp);
1111 
1112     return NULL;
1113 }
1114 
1115 
1116 nxt_inline nxt_bool_t
1117 nxt_router_app_can_start(nxt_app_t *app)
1118 {
1119     return app->processes + app->pending_processes < app->max_processes
1120             && app->pending_processes < app->max_pending_processes;
1121 }
1122 
1123 
1124 nxt_inline nxt_bool_t
1125 nxt_router_app_need_start(nxt_app_t *app)
1126 {
1127     return (app->active_requests
1128               > app->port_hash_count + app->pending_processes)
1129            || (app->spare_processes
1130                 > app->idle_processes + app->pending_processes);
1131 }
1132 
1133 
1134 void
1135 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1136 {
1137     nxt_int_t                    ret;
1138     nxt_app_t                    *app;
1139     nxt_router_t                 *router;
1140     nxt_runtime_t                *rt;
1141     nxt_queue_link_t             *qlk;
1142     nxt_socket_conf_t            *skcf;
1143     nxt_router_conf_t            *rtcf;
1144     nxt_router_temp_conf_t       *tmcf;
1145     const nxt_event_interface_t  *interface;
1146 #if (NXT_TLS)
1147     nxt_router_tlssock_t         *tls;
1148 #endif
1149 
1150     tmcf = obj;
1151 
1152     qlk = nxt_queue_first(&pending_sockets);
1153 
1154     if (qlk != nxt_queue_tail(&pending_sockets)) {
1155         nxt_queue_remove(qlk);
1156         nxt_queue_insert_tail(&creating_sockets, qlk);
1157 
1158         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1159 
1160         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1161 
1162         return;
1163     }
1164 
1165 #if (NXT_TLS)
1166     qlk = nxt_queue_last(&tmcf->tls);
1167 
1168     if (qlk != nxt_queue_head(&tmcf->tls)) {
1169         nxt_queue_remove(qlk);
1170 
1171         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1172 
1173         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1174                            nxt_router_tls_rpc_handler, tls);
1175         return;
1176     }
1177 #endif
1178 
1179     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1180 
1181         if (nxt_router_app_need_start(app)) {
1182             nxt_router_app_rpc_create(task, tmcf, app);
1183             return;
1184         }
1185 
1186     } nxt_queue_loop;
1187 
1188     rtcf = tmcf->router_conf;
1189 
1190     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1191         nxt_router_access_log_open(task, tmcf);
1192         return;
1193     }
1194 
1195     rt = task->thread->runtime;
1196 
1197     interface = nxt_service_get(rt->services, "engine", NULL);
1198 
1199     router = rtcf->router;
1200 
1201     ret = nxt_router_engines_create(task, router, tmcf, interface);
1202     if (nxt_slow_path(ret != NXT_OK)) {
1203         goto fail;
1204     }
1205 
1206     ret = nxt_router_threads_create(task, rt, tmcf);
1207     if (nxt_slow_path(ret != NXT_OK)) {
1208         goto fail;
1209     }
1210 
1211     nxt_router_apps_sort(task, router, tmcf);
1212 
1213     nxt_router_apps_hash_use(task, rtcf, 1);
1214 
1215     nxt_router_engines_post(router, tmcf);
1216 
1217     nxt_queue_add(&router->sockets, &updating_sockets);
1218     nxt_queue_add(&router->sockets, &creating_sockets);
1219 
1220     if (router->access_log != rtcf->access_log) {
1221         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1222 
1223         nxt_router_access_log_release(task, &router->lock, router->access_log);
1224 
1225         router->access_log = rtcf->access_log;
1226     }
1227 
1228     nxt_router_conf_ready(task, tmcf);
1229 
1230     return;
1231 
1232 fail:
1233 
1234     nxt_router_conf_error(task, tmcf);
1235 
1236     return;
1237 }
1238 
1239 
1240 static void
1241 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1242 {
1243     nxt_joint_job_t  *job;
1244 
1245     job = obj;
1246 
1247     nxt_router_conf_ready(task, job->tmcf);
1248 }
1249 
1250 
1251 static void
1252 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1253 {
1254     uint32_t               count;
1255     nxt_router_conf_t      *rtcf;
1256     nxt_thread_spinlock_t  *lock;
1257 
1258     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1259 
1260     if (--tmcf->count > 0) {
1261         return;
1262     }
1263 
1264     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1265 
1266     rtcf = tmcf->router_conf;
1267 
1268     lock = &rtcf->router->lock;
1269 
1270     nxt_thread_spin_lock(lock);
1271 
1272     count = rtcf->count;
1273 
1274     nxt_thread_spin_unlock(lock);
1275 
1276     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1277 
1278     if (count == 0) {
1279         nxt_router_apps_hash_use(task, rtcf, -1);
1280 
1281         nxt_router_access_log_release(task, lock, rtcf->access_log);
1282 
1283         nxt_mp_destroy(rtcf->mem_pool);
1284     }
1285 
1286     nxt_mp_release(tmcf->mem_pool);
1287 }
1288 
1289 
1290 void
1291 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1292 {
1293     nxt_app_t          *app;
1294     nxt_socket_t       s;
1295     nxt_router_t       *router;
1296     nxt_queue_link_t   *qlk;
1297     nxt_socket_conf_t  *skcf;
1298     nxt_router_conf_t  *rtcf;
1299 
1300     nxt_alert(task, "failed to apply new conf");
1301 
1302     for (qlk = nxt_queue_first(&creating_sockets);
1303          qlk != nxt_queue_tail(&creating_sockets);
1304          qlk = nxt_queue_next(qlk))
1305     {
1306         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1307         s = skcf->listen->socket;
1308 
1309         if (s != -1) {
1310             nxt_socket_close(task, s);
1311         }
1312 
1313         nxt_free(skcf->listen);
1314     }
1315 
1316     rtcf = tmcf->router_conf;
1317 
1318     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1319 
1320         nxt_router_app_unlink(task, app);
1321 
1322     } nxt_queue_loop;
1323 
1324     router = rtcf->router;
1325 
1326     nxt_queue_add(&router->sockets, &keeping_sockets);
1327     nxt_queue_add(&router->sockets, &deleting_sockets);
1328 
1329     nxt_queue_add(&router->apps, &tmcf->previous);
1330 
1331     // TODO: new engines and threads
1332 
1333     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1334 
1335     nxt_mp_destroy(rtcf->mem_pool);
1336 
1337     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1338 
1339     nxt_mp_release(tmcf->mem_pool);
1340 }
1341 
1342 
1343 static void
1344 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1345     nxt_port_msg_type_t type)
1346 {
1347     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1348 
1349     nxt_port_use(task, tmcf->port, -1);
1350 
1351     tmcf->port = NULL;
1352 }
1353 
1354 
1355 static nxt_conf_map_t  nxt_router_conf[] = {
1356     {
1357         nxt_string("listeners_threads"),
1358         NXT_CONF_MAP_INT32,
1359         offsetof(nxt_router_conf_t, threads),
1360     },
1361 };
1362 
1363 
1364 static nxt_conf_map_t  nxt_router_app_conf[] = {
1365     {
1366         nxt_string("type"),
1367         NXT_CONF_MAP_STR,
1368         offsetof(nxt_router_app_conf_t, type),
1369     },
1370 
1371     {
1372         nxt_string("limits"),
1373         NXT_CONF_MAP_PTR,
1374         offsetof(nxt_router_app_conf_t, limits_value),
1375     },
1376 
1377     {
1378         nxt_string("processes"),
1379         NXT_CONF_MAP_INT32,
1380         offsetof(nxt_router_app_conf_t, processes),
1381     },
1382 
1383     {
1384         nxt_string("processes"),
1385         NXT_CONF_MAP_PTR,
1386         offsetof(nxt_router_app_conf_t, processes_value),
1387     },
1388 
1389     {
1390         nxt_string("targets"),
1391         NXT_CONF_MAP_PTR,
1392         offsetof(nxt_router_app_conf_t, targets_value),
1393     },
1394 };
1395 
1396 
1397 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1398     {
1399         nxt_string("timeout"),
1400         NXT_CONF_MAP_MSEC,
1401         offsetof(nxt_router_app_conf_t, timeout),
1402     },
1403 };
1404 
1405 
1406 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1407     {
1408         nxt_string("spare"),
1409         NXT_CONF_MAP_INT32,
1410         offsetof(nxt_router_app_conf_t, spare_processes),
1411     },
1412 
1413     {
1414         nxt_string("max"),
1415         NXT_CONF_MAP_INT32,
1416         offsetof(nxt_router_app_conf_t, max_processes),
1417     },
1418 
1419     {
1420         nxt_string("idle_timeout"),
1421         NXT_CONF_MAP_MSEC,
1422         offsetof(nxt_router_app_conf_t, idle_timeout),
1423     },
1424 };
1425 
1426 
1427 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1428     {
1429         nxt_string("pass"),
1430         NXT_CONF_MAP_STR_COPY,
1431         offsetof(nxt_router_listener_conf_t, pass),
1432     },
1433 
1434     {
1435         nxt_string("application"),
1436         NXT_CONF_MAP_STR_COPY,
1437         offsetof(nxt_router_listener_conf_t, application),
1438     },
1439 };
1440 
1441 
1442 static nxt_conf_map_t  nxt_router_http_conf[] = {
1443     {
1444         nxt_string("header_buffer_size"),
1445         NXT_CONF_MAP_SIZE,
1446         offsetof(nxt_socket_conf_t, header_buffer_size),
1447     },
1448 
1449     {
1450         nxt_string("large_header_buffer_size"),
1451         NXT_CONF_MAP_SIZE,
1452         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1453     },
1454 
1455     {
1456         nxt_string("large_header_buffers"),
1457         NXT_CONF_MAP_SIZE,
1458         offsetof(nxt_socket_conf_t, large_header_buffers),
1459     },
1460 
1461     {
1462         nxt_string("body_buffer_size"),
1463         NXT_CONF_MAP_SIZE,
1464         offsetof(nxt_socket_conf_t, body_buffer_size),
1465     },
1466 
1467     {
1468         nxt_string("max_body_size"),
1469         NXT_CONF_MAP_SIZE,
1470         offsetof(nxt_socket_conf_t, max_body_size),
1471     },
1472 
1473     {
1474         nxt_string("idle_timeout"),
1475         NXT_CONF_MAP_MSEC,
1476         offsetof(nxt_socket_conf_t, idle_timeout),
1477     },
1478 
1479     {
1480         nxt_string("header_read_timeout"),
1481         NXT_CONF_MAP_MSEC,
1482         offsetof(nxt_socket_conf_t, header_read_timeout),
1483     },
1484 
1485     {
1486         nxt_string("body_read_timeout"),
1487         NXT_CONF_MAP_MSEC,
1488         offsetof(nxt_socket_conf_t, body_read_timeout),
1489     },
1490 
1491     {
1492         nxt_string("send_timeout"),
1493         NXT_CONF_MAP_MSEC,
1494         offsetof(nxt_socket_conf_t, send_timeout),
1495     },
1496 
1497     {
1498         nxt_string("body_temp_path"),
1499         NXT_CONF_MAP_STR,
1500         offsetof(nxt_socket_conf_t, body_temp_path),
1501     },
1502 
1503     {
1504         nxt_string("discard_unsafe_fields"),
1505         NXT_CONF_MAP_INT8,
1506         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1507     },
1508 };
1509 
1510 
1511 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1512     {
1513         nxt_string("max_frame_size"),
1514         NXT_CONF_MAP_SIZE,
1515         offsetof(nxt_websocket_conf_t, max_frame_size),
1516     },
1517 
1518     {
1519         nxt_string("read_timeout"),
1520         NXT_CONF_MAP_MSEC,
1521         offsetof(nxt_websocket_conf_t, read_timeout),
1522     },
1523 
1524     {
1525         nxt_string("keepalive_interval"),
1526         NXT_CONF_MAP_MSEC,
1527         offsetof(nxt_websocket_conf_t, keepalive_interval),
1528     },
1529 
1530 };
1531 
1532 
1533 static nxt_int_t
1534 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1535     u_char *start, u_char *end)
1536 {
1537     u_char                      *p;
1538     size_t                      size;
1539     nxt_mp_t                    *mp, *app_mp;
1540     uint32_t                    next, next_target;
1541     nxt_int_t                   ret;
1542     nxt_str_t                   name, target;
1543     nxt_app_t                   *app, *prev;
1544     nxt_str_t                   *t, *s, *targets;
1545     nxt_uint_t                  n, i;
1546     nxt_port_t                  *port;
1547     nxt_router_t                *router;
1548     nxt_app_joint_t             *app_joint;
1549 #if (NXT_TLS)
1550     nxt_tls_init_t              *tls_init;
1551     nxt_conf_value_t            *certificate;
1552 #endif
1553     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1554     nxt_conf_value_t            *applications, *application;
1555     nxt_conf_value_t            *listeners, *listener;
1556     nxt_socket_conf_t           *skcf;
1557     nxt_router_conf_t           *rtcf;
1558     nxt_http_routes_t           *routes;
1559     nxt_event_engine_t          *engine;
1560     nxt_app_lang_module_t       *lang;
1561     nxt_router_app_conf_t       apcf;
1562     nxt_router_listener_conf_t  lscf;
1563 
1564     static nxt_str_t  http_path = nxt_string("/settings/http");
1565     static nxt_str_t  applications_path = nxt_string("/applications");
1566     static nxt_str_t  listeners_path = nxt_string("/listeners");
1567     static nxt_str_t  routes_path = nxt_string("/routes");
1568     static nxt_str_t  access_log_path = nxt_string("/access_log");
1569 #if (NXT_TLS)
1570     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1571     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1572     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1573     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1574     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1575 #endif
1576     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1577     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1578     static nxt_str_t  forwarded_path = nxt_string("/forwarded");
1579     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1580 
1581     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1582     if (root == NULL) {
1583         nxt_alert(task, "configuration parsing error");
1584         return NXT_ERROR;
1585     }
1586 
1587     rtcf = tmcf->router_conf;
1588     mp = rtcf->mem_pool;
1589 
1590     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1591                               nxt_nitems(nxt_router_conf), rtcf);
1592     if (ret != NXT_OK) {
1593         nxt_alert(task, "root map error");
1594         return NXT_ERROR;
1595     }
1596 
1597     if (rtcf->threads == 0) {
1598         rtcf->threads = nxt_ncpu;
1599     }
1600 
1601     conf = nxt_conf_get_path(root, &static_path);
1602 
1603     ret = nxt_router_conf_process_static(task, rtcf, conf);
1604     if (nxt_slow_path(ret != NXT_OK)) {
1605         return NXT_ERROR;
1606     }
1607 
1608     router = rtcf->router;
1609 
1610     applications = nxt_conf_get_path(root, &applications_path);
1611 
1612     if (applications != NULL) {
1613         next = 0;
1614 
1615         for ( ;; ) {
1616             application = nxt_conf_next_object_member(applications,
1617                                                       &name, &next);
1618             if (application == NULL) {
1619                 break;
1620             }
1621 
1622             nxt_debug(task, "application \"%V\"", &name);
1623 
1624             size = nxt_conf_json_length(application, NULL);
1625 
1626             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1627             if (nxt_slow_path(app_mp == NULL)) {
1628                 goto fail;
1629             }
1630 
1631             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1632             if (app == NULL) {
1633                 goto app_fail;
1634             }
1635 
1636             nxt_memzero(app, sizeof(nxt_app_t));
1637 
1638             app->mem_pool = app_mp;
1639 
1640             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1641             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1642                                                   + name.length);
1643 
1644             p = nxt_conf_json_print(app->conf.start, application, NULL);
1645             app->conf.length = p - app->conf.start;
1646 
1647             nxt_assert(app->conf.length <= size);
1648 
1649             nxt_debug(task, "application conf \"%V\"", &app->conf);
1650 
1651             prev = nxt_router_app_find(&router->apps, &name);
1652 
1653             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1654                 nxt_mp_destroy(app_mp);
1655 
1656                 nxt_queue_remove(&prev->link);
1657                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1658 
1659                 ret = nxt_router_apps_hash_add(rtcf, prev);
1660                 if (nxt_slow_path(ret != NXT_OK)) {
1661                     goto fail;
1662                 }
1663 
1664                 continue;
1665             }
1666 
1667             apcf.processes = 1;
1668             apcf.max_processes = 1;
1669             apcf.spare_processes = 0;
1670             apcf.timeout = 0;
1671             apcf.idle_timeout = 15000;
1672             apcf.limits_value = NULL;
1673             apcf.processes_value = NULL;
1674             apcf.targets_value = NULL;
1675 
1676             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1677             if (nxt_slow_path(app_joint == NULL)) {
1678                 goto app_fail;
1679             }
1680 
1681             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1682 
1683             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1684                                       nxt_nitems(nxt_router_app_conf), &apcf);
1685             if (ret != NXT_OK) {
1686                 nxt_alert(task, "application map error");
1687                 goto app_fail;
1688             }
1689 
1690             if (apcf.limits_value != NULL) {
1691 
1692                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1693                     nxt_alert(task, "application limits is not object");
1694                     goto app_fail;
1695                 }
1696 
1697                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1698                                         nxt_router_app_limits_conf,
1699                                         nxt_nitems(nxt_router_app_limits_conf),
1700                                         &apcf);
1701                 if (ret != NXT_OK) {
1702                     nxt_alert(task, "application limits map error");
1703                     goto app_fail;
1704                 }
1705             }
1706 
1707             if (apcf.processes_value != NULL
1708                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1709             {
1710                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1711                                      nxt_router_app_processes_conf,
1712                                      nxt_nitems(nxt_router_app_processes_conf),
1713                                      &apcf);
1714                 if (ret != NXT_OK) {
1715                     nxt_alert(task, "application processes map error");
1716                     goto app_fail;
1717                 }
1718 
1719             } else {
1720                 apcf.max_processes = apcf.processes;
1721                 apcf.spare_processes = apcf.processes;
1722             }
1723 
1724             if (apcf.targets_value != NULL) {
1725                 n = nxt_conf_object_members_count(apcf.targets_value);
1726 
1727                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1728                 if (nxt_slow_path(targets == NULL)) {
1729                     goto app_fail;
1730                 }
1731 
1732                 next_target = 0;
1733 
1734                 for (i = 0; i < n; i++) {
1735                     (void) nxt_conf_next_object_member(apcf.targets_value,
1736                                                        &target, &next_target);
1737 
1738                     s = nxt_str_dup(app_mp, &targets[i], &target);
1739                     if (nxt_slow_path(s == NULL)) {
1740                         goto app_fail;
1741                     }
1742                 }
1743 
1744             } else {
1745                 targets = NULL;
1746             }
1747 
1748             nxt_debug(task, "application type: %V", &apcf.type);
1749             nxt_debug(task, "application processes: %D", apcf.processes);
1750             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1751 
1752             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1753 
1754             if (lang == NULL) {
1755                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1756                 goto app_fail;
1757             }
1758 
1759             nxt_debug(task, "application language module: \"%s\"", lang->file);
1760 
1761             ret = nxt_thread_mutex_create(&app->mutex);
1762             if (ret != NXT_OK) {
1763                 goto app_fail;
1764             }
1765 
1766             nxt_queue_init(&app->ports);
1767             nxt_queue_init(&app->spare_ports);
1768             nxt_queue_init(&app->idle_ports);
1769             nxt_queue_init(&app->ack_waiting_req);
1770 
1771             app->name.length = name.length;
1772             nxt_memcpy(app->name.start, name.start, name.length);
1773 
1774             app->type = lang->type;
1775             app->max_processes = apcf.max_processes;
1776             app->spare_processes = apcf.spare_processes;
1777             app->max_pending_processes = apcf.spare_processes
1778                                          ? apcf.spare_processes : 1;
1779             app->timeout = apcf.timeout;
1780             app->idle_timeout = apcf.idle_timeout;
1781 
1782             app->targets = targets;
1783 
1784             engine = task->thread->engine;
1785 
1786             app->engine = engine;
1787 
1788             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1789             app->adjust_idle_work.task = &engine->task;
1790             app->adjust_idle_work.obj = app;
1791 
1792             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1793 
1794             ret = nxt_router_apps_hash_add(rtcf, app);
1795             if (nxt_slow_path(ret != NXT_OK)) {
1796                 goto app_fail;
1797             }
1798 
1799             nxt_router_app_use(task, app, 1);
1800 
1801             app->joint = app_joint;
1802 
1803             app_joint->use_count = 1;
1804             app_joint->app = app;
1805 
1806             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1807             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1808             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1809             app_joint->idle_timer.task = &engine->task;
1810             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1811 
1812             app_joint->free_app_work.handler = nxt_router_free_app;
1813             app_joint->free_app_work.task = &engine->task;
1814             app_joint->free_app_work.obj = app_joint;
1815 
1816             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1817                                 NXT_PROCESS_APP);
1818             if (nxt_slow_path(port == NULL)) {
1819                 return NXT_ERROR;
1820             }
1821 
1822             ret = nxt_port_socket_init(task, port, 0);
1823             if (nxt_slow_path(ret != NXT_OK)) {
1824                 nxt_port_use(task, port, -1);
1825                 return NXT_ERROR;
1826             }
1827 
1828             ret = nxt_router_app_queue_init(task, port);
1829             if (nxt_slow_path(ret != NXT_OK)) {
1830                 nxt_port_write_close(port);
1831                 nxt_port_read_close(port);
1832                 nxt_port_use(task, port, -1);
1833                 return NXT_ERROR;
1834             }
1835 
1836             nxt_port_write_enable(task, port);
1837             port->app = app;
1838 
1839             app->shared_port = port;
1840 
1841             nxt_thread_mutex_create(&app->outgoing.mutex);
1842         }
1843     }
1844 
1845     conf = nxt_conf_get_path(root, &routes_path);
1846     if (nxt_fast_path(conf != NULL)) {
1847         routes = nxt_http_routes_create(task, tmcf, conf);
1848         if (nxt_slow_path(routes == NULL)) {
1849             return NXT_ERROR;
1850         }
1851 
1852         rtcf->routes = routes;
1853     }
1854 
1855     ret = nxt_upstreams_create(task, tmcf, root);
1856     if (nxt_slow_path(ret != NXT_OK)) {
1857         return ret;
1858     }
1859 
1860     http = nxt_conf_get_path(root, &http_path);
1861 #if 0
1862     if (http == NULL) {
1863         nxt_alert(task, "no \"http\" block");
1864         return NXT_ERROR;
1865     }
1866 #endif
1867 
1868     websocket = nxt_conf_get_path(root, &websocket_path);
1869 
1870     listeners = nxt_conf_get_path(root, &listeners_path);
1871 
1872     if (listeners != NULL) {
1873         next = 0;
1874 
1875         for ( ;; ) {
1876             listener = nxt_conf_next_object_member(listeners, &name, &next);
1877             if (listener == NULL) {
1878                 break;
1879             }
1880 
1881             skcf = nxt_router_socket_conf(task, tmcf, &name);
1882             if (skcf == NULL) {
1883                 goto fail;
1884             }
1885 
1886             nxt_memzero(&lscf, sizeof(lscf));
1887 
1888             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1889                                       nxt_nitems(nxt_router_listener_conf),
1890                                       &lscf);
1891             if (ret != NXT_OK) {
1892                 nxt_alert(task, "listener map error");
1893                 goto fail;
1894             }
1895 
1896             nxt_debug(task, "application: %V", &lscf.application);
1897 
1898             // STUB, default values if http block is not defined.
1899             skcf->header_buffer_size = 2048;
1900             skcf->large_header_buffer_size = 8192;
1901             skcf->large_header_buffers = 4;
1902             skcf->discard_unsafe_fields = 1;
1903             skcf->body_buffer_size = 16 * 1024;
1904             skcf->max_body_size = 8 * 1024 * 1024;
1905             skcf->proxy_header_buffer_size = 64 * 1024;
1906             skcf->proxy_buffer_size = 4096;
1907             skcf->proxy_buffers = 256;
1908             skcf->idle_timeout = 180 * 1000;
1909             skcf->header_read_timeout = 30 * 1000;
1910             skcf->body_read_timeout = 30 * 1000;
1911             skcf->send_timeout = 30 * 1000;
1912             skcf->proxy_timeout = 60 * 1000;
1913             skcf->proxy_send_timeout = 30 * 1000;
1914             skcf->proxy_read_timeout = 30 * 1000;
1915 
1916             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1917             skcf->websocket_conf.read_timeout = 60 * 1000;
1918             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1919 
1920             nxt_str_null(&skcf->body_temp_path);
1921 
1922             if (http != NULL) {
1923                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1924                                           nxt_nitems(nxt_router_http_conf),
1925                                           skcf);
1926                 if (ret != NXT_OK) {
1927                     nxt_alert(task, "http map error");
1928                     goto fail;
1929                 }
1930             }
1931 
1932             if (websocket != NULL) {
1933                 ret = nxt_conf_map_object(mp, websocket,
1934                                           nxt_router_websocket_conf,
1935                                           nxt_nitems(nxt_router_websocket_conf),
1936                                           &skcf->websocket_conf);
1937                 if (ret != NXT_OK) {
1938                     nxt_alert(task, "websocket map error");
1939                     goto fail;
1940                 }
1941             }
1942 
1943             t = &skcf->body_temp_path;
1944 
1945             if (t->length == 0) {
1946                 t->start = (u_char *) task->thread->runtime->tmp;
1947                 t->length = nxt_strlen(t->start);
1948             }
1949 
1950             conf = nxt_conf_get_path(listener, &forwarded_path);
1951 
1952             if (conf != NULL) {
1953                 skcf->forwarded = nxt_router_conf_forward(task, mp, conf);
1954                 if (nxt_slow_path(skcf->forwarded == NULL)) {
1955                     return NXT_ERROR;
1956                 }
1957             }
1958 
1959             conf = nxt_conf_get_path(listener, &client_ip_path);
1960 
1961             if (conf != NULL) {
1962                 skcf->client_ip = nxt_router_conf_forward(task, mp, conf);
1963                 if (nxt_slow_path(skcf->client_ip == NULL)) {
1964                     return NXT_ERROR;
1965                 }
1966             }
1967 
1968 #if (NXT_TLS)
1969             certificate = nxt_conf_get_path(listener, &certificate_path);
1970 
1971             if (certificate != NULL) {
1972                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1973                 if (nxt_slow_path(tls_init == NULL)) {
1974                     return NXT_ERROR;
1975                 }
1976 
1977                 tls_init->cache_size = 0;
1978                 tls_init->timeout = 300;
1979 
1980                 value = nxt_conf_get_path(listener, &conf_cache_path);
1981                 if (value != NULL) {
1982                     tls_init->cache_size = nxt_conf_get_number(value);
1983                 }
1984 
1985                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1986                 if (value != NULL) {
1987                     tls_init->timeout = nxt_conf_get_number(value);
1988                 }
1989 
1990                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1991                                                         &conf_commands_path);
1992 
1993                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1994                                                            &conf_tickets);
1995 
1996                 n = nxt_conf_array_elements_count_or_1(certificate);
1997 
1998                 for (i = 0; i < n; i++) {
1999                     value = nxt_conf_get_array_element_or_itself(certificate,
2000                                                                  i);
2001                     nxt_assert(value != NULL);
2002 
2003                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
2004                                                      tls_init, i == 0);
2005                     if (nxt_slow_path(ret != NXT_OK)) {
2006                         goto fail;
2007                     }
2008                 }
2009             }
2010 #endif
2011 
2012             skcf->listen->handler = nxt_http_conn_init;
2013             skcf->router_conf = rtcf;
2014             skcf->router_conf->count++;
2015 
2016             if (lscf.pass.length != 0) {
2017                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
2018 
2019             /* COMPATIBILITY: listener application. */
2020             } else if (lscf.application.length > 0) {
2021                 skcf->action = nxt_http_pass_application(task, rtcf,
2022                                                          &lscf.application);
2023             }
2024 
2025             if (nxt_slow_path(skcf->action == NULL)) {
2026                 goto fail;
2027             }
2028         }
2029     }
2030 
2031     ret = nxt_http_routes_resolve(task, tmcf);
2032     if (nxt_slow_path(ret != NXT_OK)) {
2033         goto fail;
2034     }
2035 
2036     value = nxt_conf_get_path(root, &access_log_path);
2037 
2038     if (value != NULL) {
2039         ret = nxt_router_access_log_create(task, rtcf, value);
2040         if (nxt_slow_path(ret != NXT_OK)) {
2041             goto fail;
2042         }
2043     }
2044 
2045     nxt_queue_add(&deleting_sockets, &router->sockets);
2046     nxt_queue_init(&router->sockets);
2047 
2048     return NXT_OK;
2049 
2050 app_fail:
2051 
2052     nxt_mp_destroy(app_mp);
2053 
2054 fail:
2055 
2056     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2057 
2058         nxt_queue_remove(&app->link);
2059         nxt_thread_mutex_destroy(&app->mutex);
2060         nxt_mp_destroy(app->mem_pool);
2061 
2062     } nxt_queue_loop;
2063 
2064     return NXT_ERROR;
2065 }
2066 
2067 
2068 #if (NXT_TLS)
2069 
2070 static nxt_int_t
2071 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2072     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2073     nxt_tls_init_t *tls_init, nxt_bool_t last)
2074 {
2075     nxt_router_tlssock_t  *tls;
2076 
2077     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2078     if (nxt_slow_path(tls == NULL)) {
2079         return NXT_ERROR;
2080     }
2081 
2082     tls->tls_init = tls_init;
2083     tls->socket_conf = skcf;
2084     tls->temp_conf = tmcf;
2085     tls->last = last;
2086     nxt_conf_get_string(value, &tls->name);
2087 
2088     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2089 
2090     return NXT_OK;
2091 }
2092 
2093 #endif
2094 
2095 
2096 static nxt_int_t
2097 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2098     nxt_conf_value_t *conf)
2099 {
2100     uint32_t          next, i;
2101     nxt_mp_t          *mp;
2102     nxt_str_t         *type, exten, str;
2103     nxt_int_t         ret;
2104     nxt_uint_t        exts;
2105     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2106 
2107     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2108 
2109     mp = rtcf->mem_pool;
2110 
2111     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2112     if (nxt_slow_path(ret != NXT_OK)) {
2113         return NXT_ERROR;
2114     }
2115 
2116     if (conf == NULL) {
2117         return NXT_OK;
2118     }
2119 
2120     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2121 
2122     if (mtypes_conf != NULL) {
2123         next = 0;
2124 
2125         for ( ;; ) {
2126             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2127 
2128             if (ext_conf == NULL) {
2129                 break;
2130             }
2131 
2132             type = nxt_str_dup(mp, NULL, &str);
2133             if (nxt_slow_path(type == NULL)) {
2134                 return NXT_ERROR;
2135             }
2136 
2137             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2138                 nxt_conf_get_string(ext_conf, &str);
2139 
2140                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2141                     return NXT_ERROR;
2142                 }
2143 
2144                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2145                                                       &exten, type);
2146                 if (nxt_slow_path(ret != NXT_OK)) {
2147                     return NXT_ERROR;
2148                 }
2149 
2150                 continue;
2151             }
2152 
2153             exts = nxt_conf_array_elements_count(ext_conf);
2154 
2155             for (i = 0; i < exts; i++) {
2156                 value = nxt_conf_get_array_element(ext_conf, i);
2157 
2158                 nxt_conf_get_string(value, &str);
2159 
2160                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2161                     return NXT_ERROR;
2162                 }
2163 
2164                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2165                                                       &exten, type);
2166                 if (nxt_slow_path(ret != NXT_OK)) {
2167                     return NXT_ERROR;
2168                 }
2169             }
2170         }
2171     }
2172 
2173     return NXT_OK;
2174 }
2175 
2176 
2177 static nxt_http_forward_t *
2178 nxt_router_conf_forward(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf)
2179 {
2180     nxt_int_t                   ret;
2181     nxt_conf_value_t            *header_conf, *client_ip_conf, *protocol_conf;
2182     nxt_conf_value_t            *source_conf, *recursive_conf;
2183     nxt_http_forward_t          *forward;
2184     nxt_http_route_addr_rule_t  *source;
2185 
2186     static nxt_str_t  header_path = nxt_string("/header");
2187     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
2188     static nxt_str_t  protocol_path = nxt_string("/protocol");
2189     static nxt_str_t  source_path = nxt_string("/source");
2190     static nxt_str_t  recursive_path = nxt_string("/recursive");
2191 
2192     header_conf = nxt_conf_get_path(conf, &header_path);
2193 
2194     if (header_conf != NULL) {
2195         client_ip_conf = nxt_conf_get_path(conf, &header_path);
2196         protocol_conf = NULL;
2197 
2198     } else {
2199         client_ip_conf = nxt_conf_get_path(conf, &client_ip_path);
2200         protocol_conf = nxt_conf_get_path(conf, &protocol_path);
2201     }
2202 
2203     source_conf = nxt_conf_get_path(conf, &source_path);
2204     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2205 
2206     if (source_conf == NULL
2207         || (protocol_conf == NULL && client_ip_conf == NULL))
2208     {
2209         return NULL;
2210     }
2211 
2212     forward = nxt_mp_zget(mp, sizeof(nxt_http_forward_t));
2213     if (nxt_slow_path(forward == NULL)) {
2214         return NULL;
2215     }
2216 
2217     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2218     if (nxt_slow_path(source == NULL)) {
2219         return NULL;
2220     }
2221 
2222     forward->source = source;
2223 
2224     if (recursive_conf != NULL) {
2225         forward->recursive = nxt_conf_get_boolean(recursive_conf);
2226     }
2227 
2228     if (client_ip_conf != NULL) {
2229         ret = nxt_router_conf_forward_header(mp, client_ip_conf,
2230                                              &forward->client_ip);
2231         if (nxt_slow_path(ret != NXT_OK)) {
2232             return NULL;
2233         }
2234     }
2235 
2236     if (protocol_conf != NULL) {
2237         ret = nxt_router_conf_forward_header(mp, protocol_conf,
2238                                              &forward->protocol);
2239         if (nxt_slow_path(ret != NXT_OK)) {
2240             return NULL;
2241         }
2242     }
2243 
2244     return forward;
2245 }
2246 
2247 
2248 static nxt_int_t
2249 nxt_router_conf_forward_header(nxt_mp_t *mp, nxt_conf_value_t *conf,
2250     nxt_http_forward_header_t *fh)
2251 {
2252     char       c;
2253     size_t     i;
2254     uint32_t   hash;
2255     nxt_str_t  header;
2256 
2257     nxt_conf_get_string(conf, &header);
2258 
2259     fh->header = nxt_str_dup(mp, NULL, &header);
2260     if (nxt_slow_path(fh->header == NULL)) {
2261         return NXT_ERROR;
2262     }
2263 
2264     hash = NXT_HTTP_FIELD_HASH_INIT;
2265 
2266     for (i = 0; i < fh->header->length; i++) {
2267         c = fh->header->start[i];
2268         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2269     }
2270 
2271     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2272 
2273     fh->header_hash = hash;
2274 
2275     return NXT_OK;
2276 }
2277 
2278 
2279 static nxt_app_t *
2280 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2281 {
2282     nxt_app_t  *app;
2283 
2284     nxt_queue_each(app, queue, nxt_app_t, link) {
2285 
2286         if (nxt_strstr_eq(name, &app->name)) {
2287             return app;
2288         }
2289 
2290     } nxt_queue_loop;
2291 
2292     return NULL;
2293 }
2294 
2295 
2296 static nxt_int_t
2297 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2298 {
2299     void       *mem;
2300     nxt_int_t  fd;
2301 
2302     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2303     if (nxt_slow_path(fd == -1)) {
2304         return NXT_ERROR;
2305     }
2306 
2307     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2308                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2309     if (nxt_slow_path(mem == MAP_FAILED)) {
2310         nxt_fd_close(fd);
2311 
2312         return NXT_ERROR;
2313     }
2314 
2315     nxt_app_queue_init(mem);
2316 
2317     port->queue_fd = fd;
2318     port->queue = mem;
2319 
2320     return NXT_OK;
2321 }
2322 
2323 
2324 static nxt_int_t
2325 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2326 {
2327     void       *mem;
2328     nxt_int_t  fd;
2329 
2330     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2331     if (nxt_slow_path(fd == -1)) {
2332         return NXT_ERROR;
2333     }
2334 
2335     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2336                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2337     if (nxt_slow_path(mem == MAP_FAILED)) {
2338         nxt_fd_close(fd);
2339 
2340         return NXT_ERROR;
2341     }
2342 
2343     nxt_port_queue_init(mem);
2344 
2345     port->queue_fd = fd;
2346     port->queue = mem;
2347 
2348     return NXT_OK;
2349 }
2350 
2351 
2352 static nxt_int_t
2353 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2354 {
2355     void  *mem;
2356 
2357     nxt_assert(fd != -1);
2358 
2359     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2360                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2361     if (nxt_slow_path(mem == MAP_FAILED)) {
2362 
2363         return NXT_ERROR;
2364     }
2365 
2366     port->queue = mem;
2367 
2368     return NXT_OK;
2369 }
2370 
2371 
/*
 * Level hash descriptor used by the nxt_router_apps_hash_*() helpers:
 * entries are matched with nxt_router_apps_hash_test() and hash memory is
 * obtained through the nxt_mp_lvlhsh_alloc()/nxt_mp_lvlhsh_free() callbacks.
 */
2372 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2373     NXT_LVLHSH_DEFAULT,
2374     nxt_router_apps_hash_test,
2375     nxt_mp_lvlhsh_alloc,
2376     nxt_mp_lvlhsh_free,
2377 };
2378 
2379 
2380 static nxt_int_t
2381 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2382 {
2383     nxt_app_t  *app;
2384 
2385     app = data;
2386 
2387     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2388 }
2389 
2390 
2391 static nxt_int_t
2392 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2393 {
2394     nxt_lvlhsh_query_t  lhq;
2395 
2396     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2397     lhq.replace = 0;
2398     lhq.key = app->name;
2399     lhq.value = app;
2400     lhq.proto = &nxt_router_apps_hash_proto;
2401     lhq.pool = rtcf->mem_pool;
2402 
2403     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2404 
2405     case NXT_OK:
2406         return NXT_OK;
2407 
2408     case NXT_DECLINED:
2409         nxt_thread_log_alert("router app hash adding failed: "
2410                              "\"%V\" is already in hash", &lhq.key);
2411         /* Fall through. */
2412     default:
2413         return NXT_ERROR;
2414     }
2415 }
2416 
2417 
2418 static nxt_app_t *
2419 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2420 {
2421     nxt_lvlhsh_query_t  lhq;
2422 
2423     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2424     lhq.key = *name;
2425     lhq.proto = &nxt_router_apps_hash_proto;
2426 
2427     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2428         return NULL;
2429     }
2430 
2431     return lhq.value;
2432 }
2433 
2434 
2435 static void
2436 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2437 {
2438     nxt_app_t          *app;
2439     nxt_lvlhsh_each_t  lhe;
2440 
2441     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2442 
2443     for ( ;; ) {
2444         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2445 
2446         if (app == NULL) {
2447             break;
2448         }
2449 
2450         nxt_router_app_use(task, app, i);
2451     }
2452 }
2453 
2454 
/*
 * Application data attached to an HTTP action (action->u.conf) by
 * nxt_router_application_init().
 */
2455 typedef struct {
2456     nxt_app_t  *app;       /* application found by name in the apps hash */
2457     nxt_int_t  target;     /* index into app->targets; 0 if none was given */
2458 } nxt_http_app_conf_t;
2459 
2460 
2461 nxt_int_t
2462 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2463     nxt_str_t *target, nxt_http_action_t *action)
2464 {
2465     nxt_app_t            *app;
2466     nxt_str_t            *targets;
2467     nxt_uint_t           i;
2468     nxt_http_app_conf_t  *conf;
2469 
2470     app = nxt_router_apps_hash_get(rtcf, name);
2471     if (app == NULL) {
2472         return NXT_DECLINED;
2473     }
2474 
2475     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2476     if (nxt_slow_path(conf == NULL)) {
2477         return NXT_ERROR;
2478     }
2479 
2480     action->handler = nxt_http_application_handler;
2481     action->u.conf = conf;
2482 
2483     conf->app = app;
2484 
2485     if (target != NULL && target->length != 0) {
2486         targets = app->targets;
2487 
2488         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2489 
2490         conf->target = i;
2491 
2492     } else {
2493         conf->target = 0;
2494     }
2495 
2496     return NXT_OK;
2497 }
2498 
2499 
2500 static nxt_socket_conf_t *
2501 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2502     nxt_str_t *name)
2503 {
2504     size_t               size;
2505     nxt_int_t            ret;
2506     nxt_bool_t           wildcard;
2507     nxt_sockaddr_t       *sa;
2508     nxt_socket_conf_t    *skcf;
2509     nxt_listen_socket_t  *ls;
2510 
2511     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2512     if (nxt_slow_path(sa == NULL)) {
2513         nxt_alert(task, "invalid listener \"%V\"", name);
2514         return NULL;
2515     }
2516 
2517     sa->type = SOCK_STREAM;
2518 
2519     nxt_debug(task, "router listener: \"%*s\"",
2520               (size_t) sa->length, nxt_sockaddr_start(sa));
2521 
2522     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2523     if (nxt_slow_path(skcf == NULL)) {
2524         return NULL;
2525     }
2526 
2527     size = nxt_sockaddr_size(sa);
2528 
2529     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2530 
2531     if (ret != NXT_OK) {
2532 
2533         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2534         if (nxt_slow_path(ls == NULL)) {
2535             return NULL;
2536         }
2537 
2538         skcf->listen = ls;
2539 
2540         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2541         nxt_memcpy(ls->sockaddr, sa, size);
2542 
2543         nxt_listen_socket_remote_size(ls);
2544 
2545         ls->socket = -1;
2546         ls->backlog = NXT_LISTEN_BACKLOG;
2547         ls->flags = NXT_NONBLOCK;
2548         ls->read_after_accept = 1;
2549     }
2550 
2551     switch (sa->u.sockaddr.sa_family) {
2552 #if (NXT_HAVE_UNIX_DOMAIN)
2553     case AF_UNIX:
2554         wildcard = 0;
2555         break;
2556 #endif
2557 #if (NXT_INET6)
2558     case AF_INET6:
2559         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2560         break;
2561 #endif
2562     case AF_INET:
2563     default:
2564         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2565         break;
2566     }
2567 
2568     if (!wildcard) {
2569         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2570         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2571             return NULL;
2572         }
2573 
2574         nxt_memcpy(skcf->sockaddr, sa, size);
2575     }
2576 
2577     return skcf;
2578 }
2579 
2580 
2581 static nxt_int_t
2582 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2583     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2584 {
2585     nxt_router_t       *router;
2586     nxt_queue_link_t   *qlk;
2587     nxt_socket_conf_t  *skcf;
2588 
2589     router = tmcf->router_conf->router;
2590 
2591     for (qlk = nxt_queue_first(&router->sockets);
2592          qlk != nxt_queue_tail(&router->sockets);
2593          qlk = nxt_queue_next(qlk))
2594     {
2595         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2596 
2597         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2598             nskcf->listen = skcf->listen;
2599 
2600             nxt_queue_remove(qlk);
2601             nxt_queue_insert_tail(&keeping_sockets, qlk);
2602 
2603             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2604 
2605             return NXT_OK;
2606         }
2607     }
2608 
2609     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2610 
2611     return NXT_DECLINED;
2612 }
2613 
2614 
2615 static void
2616 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2617     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2618 {
2619     size_t            size;
2620     uint32_t          stream;
2621     nxt_int_t         ret;
2622     nxt_buf_t         *b;
2623     nxt_port_t        *main_port, *router_port;
2624     nxt_runtime_t     *rt;
2625     nxt_socket_rpc_t  *rpc;
2626 
2627     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2628     if (rpc == NULL) {
2629         goto fail;
2630     }
2631 
2632     rpc->socket_conf = skcf;
2633     rpc->temp_conf = tmcf;
2634 
2635     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2636 
2637     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2638     if (b == NULL) {
2639         goto fail;
2640     }
2641 
2642     b->completion_handler = nxt_buf_dummy_completion;
2643 
2644     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2645 
2646     rt = task->thread->runtime;
2647     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2648     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2649 
2650     stream = nxt_port_rpc_register_handler(task, router_port,
2651                                            nxt_router_listen_socket_ready,
2652                                            nxt_router_listen_socket_error,
2653                                            main_port->pid, rpc);
2654     if (nxt_slow_path(stream == 0)) {
2655         goto fail;
2656     }
2657 
2658     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2659                                 stream, router_port->id, b);
2660 
2661     if (nxt_slow_path(ret != NXT_OK)) {
2662         nxt_port_rpc_cancel(task, router_port, stream);
2663         goto fail;
2664     }
2665 
2666     return;
2667 
2668 fail:
2669 
2670     nxt_router_conf_error(task, tmcf);
2671 }
2672 
2673 
2674 static void
2675 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2676     void *data)
2677 {
2678     nxt_int_t         ret;
2679     nxt_socket_t      s;
2680     nxt_socket_rpc_t  *rpc;
2681 
2682     rpc = data;
2683 
2684     s = msg->fd[0];
2685 
2686     ret = nxt_socket_nonblocking(task, s);
2687     if (nxt_slow_path(ret != NXT_OK)) {
2688         goto fail;
2689     }
2690 
2691     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2692 
2693     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2694     if (nxt_slow_path(ret != NXT_OK)) {
2695         goto fail;
2696     }
2697 
2698     rpc->socket_conf->listen->socket = s;
2699 
2700     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2701                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2702 
2703     return;
2704 
2705 fail:
2706 
2707     nxt_socket_close(task, s);
2708 
2709     nxt_router_conf_error(task, rpc->temp_conf);
2710 }
2711 
2712 
/*
 * RPC error handler for a listen socket request; it is registered
 * together with nxt_router_listen_socket_ready() in
 * nxt_router_listen_socket_rpc_create().
 *
 * "data" is the nxt_socket_rpc_t of the request.  The failure is
 * reported through nxt_router_conf_error() for the temporary
 * configuration the request belongs to.
 */
2713 static void
2714 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2715     void *data)
2716 {
2717     nxt_socket_rpc_t        *rpc;
2718     nxt_router_temp_conf_t  *tmcf;
2719 
2720     rpc = data;
2721     tmcf = rpc->temp_conf;
2722 
    /*
     * Compiled out.  This code would build a diagnostic string from the
     * reply: the first byte of the message selects an entry of
     * socket_errors[], the rest is used as the message text, and the
     * result is only written to the debug log.
     */
2723 #if 0
2724     u_char                  *p;
2725     size_t                  size;
2726     uint8_t                 error;
2727     nxt_buf_t               *in, *out;
2728     nxt_sockaddr_t          *sa;
2729 
2730     static nxt_str_t  socket_errors[] = {
2731         nxt_string("ListenerSystem"),
2732         nxt_string("ListenerNoIPv6"),
2733         nxt_string("ListenerPort"),
2734         nxt_string("ListenerInUse"),
2735         nxt_string("ListenerNoAddress"),
2736         nxt_string("ListenerNoAccess"),
2737         nxt_string("ListenerPath"),
2738     };
2739 
2740     sa = rpc->socket_conf->listen->sockaddr;
2741 
2742     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2743 
2744     if (nxt_slow_path(in == NULL)) {
2745         return;
2746     }
2747 
2748     p = in->mem.pos;
2749 
2750     error = *p++;
2751 
2752     size = nxt_length("listen socket error: ")
2753            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2754            + sa->length + socket_errors[error].length + (in->mem.free - p);
2755 
2756     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2757     if (nxt_slow_path(out == NULL)) {
2758         return;
2759     }
2760 
2761     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2762                         "listen socket error: "
2763                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2764                         (size_t) sa->length, nxt_sockaddr_start(sa),
2765                         &socket_errors[error], in->mem.free - p, p);
2766 
2767     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2768 #endif
2769 
2770     nxt_router_conf_error(task, tmcf);
2771 }
2772 
2773 
2774 #if (NXT_TLS)
2775 
2776 static void
2777 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2778     void *data)
2779 {
2780     nxt_mp_t                *mp;
2781     nxt_int_t               ret;
2782     nxt_tls_conf_t          *tlscf;
2783     nxt_router_tlssock_t    *tls;
2784     nxt_tls_bundle_conf_t   *bundle;
2785     nxt_router_temp_conf_t  *tmcf;
2786 
2787     nxt_debug(task, "tls rpc handler");
2788 
2789     tls = data;
2790     tmcf = tls->temp_conf;
2791 
2792     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2793         goto fail;
2794     }
2795 
2796     mp = tmcf->router_conf->mem_pool;
2797 
2798     if (tls->socket_conf->tls == NULL){
2799         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2800         if (nxt_slow_path(tlscf == NULL)) {
2801             goto fail;
2802         }
2803 
2804         tlscf->no_wait_shutdown = 1;
2805         tls->socket_conf->tls = tlscf;
2806 
2807     } else {
2808         tlscf = tls->socket_conf->tls;
2809     }
2810 
2811     tls->tls_init->conf = tlscf;
2812 
2813     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2814     if (nxt_slow_path(bundle == NULL)) {
2815         goto fail;
2816     }
2817 
2818     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2819         goto fail;
2820     }
2821 
2822     bundle->chain_file = msg->fd[0];
2823     bundle->next = tlscf->bundle;
2824     tlscf->bundle = bundle;
2825 
2826     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2827                                                   tls->last);
2828     if (nxt_slow_path(ret != NXT_OK)) {
2829         goto fail;
2830     }
2831 
2832     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2833                        nxt_router_conf_apply, task, tmcf, NULL);
2834     return;
2835 
2836 fail:
2837 
2838     nxt_router_conf_error(task, tmcf);
2839 }
2840 
2841 #endif
2842 
2843 
2844 static void
2845 nxt_router_app_rpc_create(nxt_task_t *task,
2846     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2847 {
2848     size_t         size;
2849     uint32_t       stream;
2850     nxt_fd_t       port_fd, queue_fd;
2851     nxt_int_t      ret;
2852     nxt_buf_t      *b;
2853     nxt_port_t     *router_port, *dport;
2854     nxt_runtime_t  *rt;
2855     nxt_app_rpc_t  *rpc;
2856 
2857     rt = task->thread->runtime;
2858 
2859     dport = app->proto_port;
2860 
2861     if (dport == NULL) {
2862         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2863 
2864         size = app->name.length + 1 + app->conf.length;
2865 
2866         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2867         if (nxt_slow_path(b == NULL)) {
2868             goto fail;
2869         }
2870 
2871         b->completion_handler = nxt_buf_dummy_completion;
2872 
2873         nxt_buf_cpystr(b, &app->name);
2874         *b->mem.free++ = '\0';
2875         nxt_buf_cpystr(b, &app->conf);
2876 
2877         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2878 
2879         port_fd = app->shared_port->pair[0];
2880         queue_fd = app->shared_port->queue_fd;
2881 
2882     } else {
2883         nxt_debug(task, "app '%V' prefork", &app->name);
2884 
2885         b = NULL;
2886         port_fd = -1;
2887         queue_fd = -1;
2888     }
2889 
2890     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2891 
2892     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2893                                            nxt_router_app_prefork_ready,
2894                                            nxt_router_app_prefork_error,
2895                                            sizeof(nxt_app_rpc_t));
2896     if (nxt_slow_path(rpc == NULL)) {
2897         goto fail;
2898     }
2899 
2900     rpc->app = app;
2901     rpc->temp_conf = tmcf;
2902     rpc->proto = (b != NULL);
2903 
2904     stream = nxt_port_rpc_ex_stream(rpc);
2905 
2906     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2907                                  port_fd, queue_fd, stream, router_port->id, b);
2908     if (nxt_slow_path(ret != NXT_OK)) {
2909         nxt_port_rpc_cancel(task, router_port, stream);
2910         goto fail;
2911     }
2912 
2913     if (b == NULL) {
2914         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2915 
2916         app->pending_processes++;
2917     }
2918 
2919     return;
2920 
2921 fail:
2922 
2923     nxt_router_conf_error(task, tmcf);
2924 }
2925 
2926 
2927 static void
2928 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2929     void *data)
2930 {
2931     nxt_app_t           *app;
2932     nxt_port_t          *port;
2933     nxt_app_rpc_t       *rpc;
2934     nxt_event_engine_t  *engine;
2935 
2936     rpc = data;
2937     app = rpc->app;
2938 
2939     port = msg->u.new_port;
2940 
2941     nxt_assert(port != NULL);
2942     nxt_assert(port->id == 0);
2943 
2944     if (rpc->proto) {
2945         nxt_assert(app->proto_port == NULL);
2946         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2947 
2948         nxt_port_inc_use(port);
2949 
2950         app->proto_port = port;
2951         port->app = app;
2952 
2953         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2954 
2955         return;
2956     }
2957 
2958     nxt_assert(port->type == NXT_PROCESS_APP);
2959 
2960     port->app = app;
2961     port->main_app_port = port;
2962 
2963     app->pending_processes--;
2964     app->processes++;
2965     app->idle_processes++;
2966 
2967     engine = task->thread->engine;
2968 
2969     nxt_queue_insert_tail(&app->ports, &port->app_link);
2970     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2971 
2972     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2973               &app->name, port->pid, port->id);
2974 
2975     nxt_port_hash_add(&app->port_hash, port);
2976     app->port_hash_count++;
2977 
2978     port->idle_start = 0;
2979 
2980     nxt_port_inc_use(port);
2981 
2982     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2983 
2984     nxt_work_queue_add(&engine->fast_work_queue,
2985                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2986 }
2987 
2988 
2989 static void
2990 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2991     void *data)
2992 {
2993     nxt_app_t               *app;
2994     nxt_app_rpc_t           *rpc;
2995     nxt_router_temp_conf_t  *tmcf;
2996 
2997     rpc = data;
2998     app = rpc->app;
2999     tmcf = rpc->temp_conf;
3000 
3001     if (rpc->proto) {
3002         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
3003                 &app->name);
3004 
3005     } else {
3006         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
3007                 &app->name);
3008 
3009         app->pending_processes--;
3010     }
3011 
3012     nxt_router_conf_error(task, tmcf);
3013 }
3014 
3015 
3016 static nxt_int_t
3017 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
3018     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
3019 {
3020     nxt_int_t                 ret;
3021     nxt_uint_t                n, threads;
3022     nxt_queue_link_t          *qlk;
3023     nxt_router_engine_conf_t  *recf;
3024 
3025     threads = tmcf->router_conf->threads;
3026 
3027     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
3028                                      sizeof(nxt_router_engine_conf_t));
3029     if (nxt_slow_path(tmcf->engines == NULL)) {
3030         return NXT_ERROR;
3031     }
3032 
3033     n = 0;
3034 
3035     for (qlk = nxt_queue_first(&router->engines);
3036          qlk != nxt_queue_tail(&router->engines);
3037          qlk = nxt_queue_next(qlk))
3038     {
3039         recf = nxt_array_zero_add(tmcf->engines);
3040         if (nxt_slow_path(recf == NULL)) {
3041             return NXT_ERROR;
3042         }
3043 
3044         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
3045 
3046         if (n < threads) {
3047             recf->action = NXT_ROUTER_ENGINE_KEEP;
3048             ret = nxt_router_engine_conf_update(tmcf, recf);
3049 
3050         } else {
3051             recf->action = NXT_ROUTER_ENGINE_DELETE;
3052             ret = nxt_router_engine_conf_delete(tmcf, recf);
3053         }
3054 
3055         if (nxt_slow_path(ret != NXT_OK)) {
3056             return ret;
3057         }
3058 
3059         n++;
3060     }
3061 
3062     tmcf->new_threads = n;
3063 
3064     while (n < threads) {
3065         recf = nxt_array_zero_add(tmcf->engines);
3066         if (nxt_slow_path(recf == NULL)) {
3067             return NXT_ERROR;
3068         }
3069 
3070         recf->action = NXT_ROUTER_ENGINE_ADD;
3071 
3072         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
3073         if (nxt_slow_path(recf->engine == NULL)) {
3074             return NXT_ERROR;
3075         }
3076 
3077         ret = nxt_router_engine_conf_create(tmcf, recf);
3078         if (nxt_slow_path(ret != NXT_OK)) {
3079             return ret;
3080         }
3081 
3082         n++;
3083     }
3084 
3085     return NXT_OK;
3086 }
3087 
3088 
3089 static nxt_int_t
3090 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3091     nxt_router_engine_conf_t *recf)
3092 {
3093     nxt_int_t  ret;
3094 
3095     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3096                                           nxt_router_listen_socket_create);
3097     if (nxt_slow_path(ret != NXT_OK)) {
3098         return ret;
3099     }
3100 
3101     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3102                                           nxt_router_listen_socket_create);
3103     if (nxt_slow_path(ret != NXT_OK)) {
3104         return ret;
3105     }
3106 
3107     return ret;
3108 }
3109 
3110 
3111 static nxt_int_t
3112 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3113     nxt_router_engine_conf_t *recf)
3114 {
3115     nxt_int_t  ret;
3116 
3117     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3118                                           nxt_router_listen_socket_create);
3119     if (nxt_slow_path(ret != NXT_OK)) {
3120         return ret;
3121     }
3122 
3123     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3124                                           nxt_router_listen_socket_update);
3125     if (nxt_slow_path(ret != NXT_OK)) {
3126         return ret;
3127     }
3128 
3129     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3130     if (nxt_slow_path(ret != NXT_OK)) {
3131         return ret;
3132     }
3133 
3134     return ret;
3135 }
3136 
3137 
3138 static nxt_int_t
3139 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3140     nxt_router_engine_conf_t *recf)
3141 {
3142     nxt_int_t  ret;
3143 
3144     ret = nxt_router_engine_quit(tmcf, recf);
3145     if (nxt_slow_path(ret != NXT_OK)) {
3146         return ret;
3147     }
3148 
3149     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3150     if (nxt_slow_path(ret != NXT_OK)) {
3151         return ret;
3152     }
3153 
3154     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3155 }
3156 
3157 
3158 static nxt_int_t
3159 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3160     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3161     nxt_work_handler_t handler)
3162 {
3163     nxt_int_t                ret;
3164     nxt_joint_job_t          *job;
3165     nxt_queue_link_t         *qlk;
3166     nxt_socket_conf_t        *skcf;
3167     nxt_socket_conf_joint_t  *joint;
3168 
3169     for (qlk = nxt_queue_first(sockets);
3170          qlk != nxt_queue_tail(sockets);
3171          qlk = nxt_queue_next(qlk))
3172     {
3173         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3174         if (nxt_slow_path(job == NULL)) {
3175             return NXT_ERROR;
3176         }
3177 
3178         job->work.next = recf->jobs;
3179         recf->jobs = &job->work;
3180 
3181         job->task = tmcf->engine->task;
3182         job->work.handler = handler;
3183         job->work.task = &job->task;
3184         job->work.obj = job;
3185         job->tmcf = tmcf;
3186 
3187         tmcf->count++;
3188 
3189         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3190                              sizeof(nxt_socket_conf_joint_t));
3191         if (nxt_slow_path(joint == NULL)) {
3192             return NXT_ERROR;
3193         }
3194 
3195         job->work.data = joint;
3196 
3197         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3198         if (nxt_slow_path(ret != NXT_OK)) {
3199             return ret;
3200         }
3201 
3202         joint->count = 1;
3203 
3204         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3205         skcf->count++;
3206         joint->socket_conf = skcf;
3207 
3208         joint->engine = recf->engine;
3209     }
3210 
3211     return NXT_OK;
3212 }
3213 
3214 
3215 static nxt_int_t
3216 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3217     nxt_router_engine_conf_t *recf)
3218 {
3219     nxt_joint_job_t  *job;
3220 
3221     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3222     if (nxt_slow_path(job == NULL)) {
3223         return NXT_ERROR;
3224     }
3225 
3226     job->work.next = recf->jobs;
3227     recf->jobs = &job->work;
3228 
3229     job->task = tmcf->engine->task;
3230     job->work.handler = nxt_router_worker_thread_quit;
3231     job->work.task = &job->task;
3232     job->work.obj = NULL;
3233     job->work.data = NULL;
3234     job->tmcf = NULL;
3235 
3236     return NXT_OK;
3237 }
3238 
3239 
3240 static nxt_int_t
3241 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3242     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3243 {
3244     nxt_joint_job_t   *job;
3245     nxt_queue_link_t  *qlk;
3246 
3247     for (qlk = nxt_queue_first(sockets);
3248          qlk != nxt_queue_tail(sockets);
3249          qlk = nxt_queue_next(qlk))
3250     {
3251         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3252         if (nxt_slow_path(job == NULL)) {
3253             return NXT_ERROR;
3254         }
3255 
3256         job->work.next = recf->jobs;
3257         recf->jobs = &job->work;
3258 
3259         job->task = tmcf->engine->task;
3260         job->work.handler = nxt_router_listen_socket_delete;
3261         job->work.task = &job->task;
3262         job->work.obj = job;
3263         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3264         job->tmcf = tmcf;
3265 
3266         tmcf->count++;
3267     }
3268 
3269     return NXT_OK;
3270 }
3271 
3272 
3273 static nxt_int_t
3274 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3275     nxt_router_temp_conf_t *tmcf)
3276 {
3277     nxt_int_t                 ret;
3278     nxt_uint_t                i, threads;
3279     nxt_router_engine_conf_t  *recf;
3280 
3281     recf = tmcf->engines->elts;
3282     threads = tmcf->router_conf->threads;
3283 
3284     for (i = tmcf->new_threads; i < threads; i++) {
3285         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3286         if (nxt_slow_path(ret != NXT_OK)) {
3287             return ret;
3288         }
3289     }
3290 
3291     return NXT_OK;
3292 }
3293 
3294 
3295 static nxt_int_t
3296 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3297     nxt_event_engine_t *engine)
3298 {
3299     nxt_int_t            ret;
3300     nxt_thread_link_t    *link;
3301     nxt_thread_handle_t  handle;
3302 
3303     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3304 
3305     if (nxt_slow_path(link == NULL)) {
3306         return NXT_ERROR;
3307     }
3308 
3309     link->start = nxt_router_thread_start;
3310     link->engine = engine;
3311     link->work.handler = nxt_router_thread_exit_handler;
3312     link->work.task = task;
3313     link->work.data = link;
3314 
3315     nxt_queue_insert_tail(&rt->engines, &engine->link);
3316 
3317     ret = nxt_thread_create(&handle, link);
3318 
3319     if (nxt_slow_path(ret != NXT_OK)) {
3320         nxt_queue_remove(&engine->link);
3321     }
3322 
3323     return ret;
3324 }
3325 
3326 
3327 static void
3328 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3329     nxt_router_temp_conf_t *tmcf)
3330 {
3331     nxt_app_t  *app;
3332 
3333     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3334 
3335         nxt_router_app_unlink(task, app);
3336 
3337     } nxt_queue_loop;
3338 
3339     nxt_queue_add(&router->apps, &tmcf->previous);
3340     nxt_queue_add(&router->apps, &tmcf->apps);
3341 }
3342 
3343 
3344 static void
3345 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3346 {
3347     nxt_uint_t                n;
3348     nxt_event_engine_t        *engine;
3349     nxt_router_engine_conf_t  *recf;
3350 
3351     recf = tmcf->engines->elts;
3352 
3353     for (n = tmcf->engines->nelts; n != 0; n--) {
3354         engine = recf->engine;
3355 
3356         switch (recf->action) {
3357 
3358         case NXT_ROUTER_ENGINE_KEEP:
3359             break;
3360 
3361         case NXT_ROUTER_ENGINE_ADD:
3362             nxt_queue_insert_tail(&router->engines, &engine->link0);
3363             break;
3364 
3365         case NXT_ROUTER_ENGINE_DELETE:
3366             nxt_queue_remove(&engine->link0);
3367             break;
3368         }
3369 
3370         nxt_router_engine_post(engine, recf->jobs);
3371 
3372         recf++;
3373     }
3374 }
3375 
3376 
3377 static void
3378 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3379 {
3380     nxt_work_t  *work, *next;
3381 
3382     for (work = jobs; work != NULL; work = next) {
3383         next = work->next;
3384         work->next = NULL;
3385 
3386         nxt_event_engine_post(engine, work);
3387     }
3388 }
3389 
3390 
3391 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3392     .rpc_error       = nxt_port_rpc_handler,
3393     .mmap            = nxt_port_mmap_handler,
3394     .data            = nxt_port_rpc_handler,
3395     .oosm            = nxt_router_oosm_handler,
3396     .req_headers_ack = nxt_port_rpc_handler,
3397 };
3398 
3399 
3400 static void
3401 nxt_router_thread_start(void *data)
3402 {
3403     nxt_int_t           ret;
3404     nxt_port_t          *port;
3405     nxt_task_t          *task;
3406     nxt_work_t          *work;
3407     nxt_thread_t        *thread;
3408     nxt_thread_link_t   *link;
3409     nxt_event_engine_t  *engine;
3410 
3411     link = data;
3412     engine = link->engine;
3413     task = &engine->task;
3414 
3415     thread = nxt_thread();
3416 
3417     nxt_event_engine_thread_adopt(engine);
3418 
3419     /* STUB */
3420     thread->runtime = engine->task.thread->runtime;
3421 
3422     engine->task.thread = thread;
3423     engine->task.log = thread->log;
3424     thread->engine = engine;
3425     thread->task = &engine->task;
3426 #if 0
3427     thread->fiber = &engine->fibers->fiber;
3428 #endif
3429 
3430     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3431     if (nxt_slow_path(engine->mem_pool == NULL)) {
3432         return;
3433     }
3434 
3435     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3436                         NXT_PROCESS_ROUTER);
3437     if (nxt_slow_path(port == NULL)) {
3438         return;
3439     }
3440 
3441     ret = nxt_port_socket_init(task, port, 0);
3442     if (nxt_slow_path(ret != NXT_OK)) {
3443         nxt_port_use(task, port, -1);
3444         return;
3445     }
3446 
3447     ret = nxt_router_port_queue_init(task, port);
3448     if (nxt_slow_path(ret != NXT_OK)) {
3449         nxt_port_use(task, port, -1);
3450         return;
3451     }
3452 
3453     engine->port = port;
3454 
3455     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3456 
3457     work = nxt_zalloc(sizeof(nxt_work_t));
3458     if (nxt_slow_path(work == NULL)) {
3459         return;
3460     }
3461 
3462     work->handler = nxt_router_rt_add_port;
3463     work->task = link->work.task;
3464     work->obj = work;
3465     work->data = port;
3466 
3467     nxt_event_engine_post(link->work.task->thread->engine, work);
3468 
3469     nxt_event_engine_start(engine);
3470 }
3471 
3472 
3473 static void
3474 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3475 {
3476     nxt_int_t      res;
3477     nxt_port_t     *port;
3478     nxt_runtime_t  *rt;
3479 
3480     rt = task->thread->runtime;
3481     port = data;
3482 
3483     nxt_free(obj);
3484 
3485     res = nxt_port_hash_add(&rt->ports, port);
3486 
3487     if (nxt_fast_path(res == NXT_OK)) {
3488         nxt_port_use(task, port, 1);
3489     }
3490 }
3491 
3492 
3493 static void
3494 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3495 {
3496     nxt_joint_job_t          *job;
3497     nxt_socket_conf_t        *skcf;
3498     nxt_listen_event_t       *lev;
3499     nxt_listen_socket_t      *ls;
3500     nxt_thread_spinlock_t    *lock;
3501     nxt_socket_conf_joint_t  *joint;
3502 
3503     job = obj;
3504     joint = data;
3505 
3506     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3507 
3508     skcf = joint->socket_conf;
3509     ls = skcf->listen;
3510 
3511     lev = nxt_listen_event(task, ls);
3512     if (nxt_slow_path(lev == NULL)) {
3513         nxt_router_listen_socket_release(task, skcf);
3514         return;
3515     }
3516 
3517     lev->socket.data = joint;
3518 
3519     lock = &skcf->router_conf->router->lock;
3520 
3521     nxt_thread_spin_lock(lock);
3522     ls->count++;
3523     nxt_thread_spin_unlock(lock);
3524 
3525     job->work.next = NULL;
3526     job->work.handler = nxt_router_conf_wait;
3527 
3528     nxt_event_engine_post(job->tmcf->engine, &job->work);
3529 }
3530 
3531 
3532 nxt_inline nxt_listen_event_t *
3533 nxt_router_listen_event(nxt_queue_t *listen_connections,
3534     nxt_socket_conf_t *skcf)
3535 {
3536     nxt_socket_t        fd;
3537     nxt_queue_link_t    *qlk;
3538     nxt_listen_event_t  *lev;
3539 
3540     fd = skcf->listen->socket;
3541 
3542     for (qlk = nxt_queue_first(listen_connections);
3543          qlk != nxt_queue_tail(listen_connections);
3544          qlk = nxt_queue_next(qlk))
3545     {
3546         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3547 
3548         if (fd == lev->socket.fd) {
3549             return lev;
3550         }
3551     }
3552 
3553     return NULL;
3554 }
3555 
3556 
3557 static void
3558 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3559 {
3560     nxt_joint_job_t          *job;
3561     nxt_event_engine_t       *engine;
3562     nxt_listen_event_t       *lev;
3563     nxt_socket_conf_joint_t  *joint, *old;
3564 
3565     job = obj;
3566     joint = data;
3567 
3568     engine = task->thread->engine;
3569 
3570     nxt_queue_insert_tail(&engine->joints, &joint->link);
3571 
3572     lev = nxt_router_listen_event(&engine->listen_connections,
3573                                   joint->socket_conf);
3574 
3575     old = lev->socket.data;
3576     lev->socket.data = joint;
3577     lev->listen = joint->socket_conf->listen;
3578 
3579     job->work.next = NULL;
3580     job->work.handler = nxt_router_conf_wait;
3581 
3582     nxt_event_engine_post(job->tmcf->engine, &job->work);
3583 
3584     /*
3585      * The task is allocated from configuration temporary
3586      * memory pool so it can be freed after engine post operation.
3587      */
3588 
3589     nxt_router_conf_release(&engine->task, old);
3590 }
3591 
3592 
3593 static void
3594 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3595 {
3596     nxt_socket_conf_t        *skcf;
3597     nxt_listen_event_t       *lev;
3598     nxt_event_engine_t       *engine;
3599     nxt_socket_conf_joint_t  *joint;
3600 
3601     skcf = data;
3602 
3603     engine = task->thread->engine;
3604 
3605     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3606 
3607     nxt_fd_event_delete(engine, &lev->socket);
3608 
3609     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3610               lev->socket.fd);
3611 
3612     joint = lev->socket.data;
3613     joint->close_job = obj;
3614 
3615     lev->timer.handler = nxt_router_listen_socket_close;
3616     lev->timer.work_queue = &engine->fast_work_queue;
3617 
3618     nxt_timer_add(engine, &lev->timer, 0);
3619 }
3620 
3621 
3622 static void
3623 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3624 {
3625     nxt_event_engine_t  *engine;
3626 
3627     nxt_debug(task, "router worker thread quit");
3628 
3629     engine = task->thread->engine;
3630 
3631     engine->shutdown = 1;
3632 
3633     if (nxt_queue_is_empty(&engine->joints)) {
3634         nxt_thread_exit(task->thread);
3635     }
3636 }
3637 
3638 
3639 static void
3640 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3641 {
3642     nxt_timer_t              *timer;
3643     nxt_joint_job_t          *job;
3644     nxt_listen_event_t       *lev;
3645     nxt_socket_conf_joint_t  *joint;
3646 
3647     timer = obj;
3648     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3649 
3650     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3651               lev->socket.fd);
3652 
3653     nxt_queue_remove(&lev->link);
3654 
3655     joint = lev->socket.data;
3656     lev->socket.data = NULL;
3657 
3658     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3659     task = &task->thread->engine->task;
3660 
3661     nxt_router_listen_socket_release(task, joint->socket_conf);
3662 
3663     job = joint->close_job;
3664     job->work.next = NULL;
3665     job->work.handler = nxt_router_conf_wait;
3666 
3667     nxt_event_engine_post(job->tmcf->engine, &job->work);
3668 
3669     nxt_router_listen_event_release(task, lev, joint);
3670 }
3671 
3672 
3673 static void
3674 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3675 {
3676     nxt_listen_socket_t    *ls;
3677     nxt_thread_spinlock_t  *lock;
3678 
3679     ls = skcf->listen;
3680     lock = &skcf->router_conf->router->lock;
3681 
3682     nxt_thread_spin_lock(lock);
3683 
3684     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3685               task->thread->engine, ls->count);
3686 
3687     if (--ls->count != 0) {
3688         ls = NULL;
3689     }
3690 
3691     nxt_thread_spin_unlock(lock);
3692 
3693     if (ls != NULL) {
3694         nxt_socket_close(task, ls->socket);
3695         nxt_free(ls);
3696     }
3697 }
3698 
3699 
3700 void
3701 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3702     nxt_socket_conf_joint_t *joint)
3703 {
3704     nxt_event_engine_t  *engine;
3705 
3706     nxt_debug(task, "listen event count: %D", lev->count);
3707 
3708     engine = task->thread->engine;
3709 
3710     if (--lev->count == 0) {
3711         if (lev->next != NULL) {
3712             nxt_sockaddr_cache_free(engine, lev->next);
3713 
3714             nxt_conn_free(task, lev->next);
3715         }
3716 
3717         nxt_free(lev);
3718     }
3719 
3720     if (joint != NULL) {
3721         nxt_router_conf_release(task, joint);
3722     }
3723 
3724     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3725         nxt_thread_exit(task->thread);
3726     }
3727 }
3728 
3729 
3730 void
3731 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3732 {
3733     nxt_socket_conf_t      *skcf;
3734     nxt_router_conf_t      *rtcf;
3735     nxt_thread_spinlock_t  *lock;
3736 
3737     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3738 
3739     if (--joint->count != 0) {
3740         return;
3741     }
3742 
3743     nxt_queue_remove(&joint->link);
3744 
3745     /*
3746      * The joint content can not be safely used after the critical
3747      * section protected by the spinlock because its memory pool may
3748      * be already destroyed by another thread.
3749      */
3750     skcf = joint->socket_conf;
3751     rtcf = skcf->router_conf;
3752     lock = &rtcf->router->lock;
3753 
3754     nxt_thread_spin_lock(lock);
3755 
3756     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3757               rtcf, rtcf->count);
3758 
3759     if (--skcf->count != 0) {
3760         skcf = NULL;
3761         rtcf = NULL;
3762 
3763     } else {
3764         nxt_queue_remove(&skcf->link);
3765 
3766         if (--rtcf->count != 0) {
3767             rtcf = NULL;
3768         }
3769     }
3770 
3771     nxt_thread_spin_unlock(lock);
3772 
3773 #if (NXT_TLS)
3774     if (skcf != NULL && skcf->tls != NULL) {
3775         task->thread->runtime->tls->server_free(task, skcf->tls);
3776     }
3777 #endif
3778 
3779     /* TODO remove engine->port */
3780 
3781     if (rtcf != NULL) {
3782         nxt_debug(task, "old router conf is destroyed");
3783 
3784         nxt_router_apps_hash_use(task, rtcf, -1);
3785 
3786         nxt_router_access_log_release(task, lock, rtcf->access_log);
3787 
3788         nxt_mp_thread_adopt(rtcf->mem_pool);
3789 
3790         nxt_mp_destroy(rtcf->mem_pool);
3791     }
3792 }
3793 
3794 
3795 static void
3796 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3797 {
3798     nxt_port_t           *port;
3799     nxt_thread_link_t    *link;
3800     nxt_event_engine_t   *engine;
3801     nxt_thread_handle_t  handle;
3802 
3803     handle = (nxt_thread_handle_t) (uintptr_t) obj;
3804     link = data;
3805 
3806     nxt_thread_wait(handle);
3807 
3808     engine = link->engine;
3809 
3810     nxt_queue_remove(&engine->link);
3811 
3812     port = engine->port;
3813 
3814     // TODO notify all apps
3815 
3816     port->engine = task->thread->engine;
3817     nxt_mp_thread_adopt(port->mem_pool);
3818     nxt_port_use(task, port, -1);
3819 
3820     nxt_mp_thread_adopt(engine->mem_pool);
3821     nxt_mp_destroy(engine->mem_pool);
3822 
3823     nxt_event_engine_free(engine);
3824 
3825     nxt_free(link);
3826 }
3827 
3828 
3829 static void
3830 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3831     void *data)
3832 {
3833     size_t                  b_size, count;
3834     nxt_int_t               ret;
3835     nxt_app_t               *app;
3836     nxt_buf_t               *b, *next;
3837     nxt_port_t              *app_port;
3838     nxt_unit_field_t        *f;
3839     nxt_http_field_t        *field;
3840     nxt_http_request_t      *r;
3841     nxt_unit_response_t     *resp;
3842     nxt_request_rpc_data_t  *req_rpc_data;
3843 
3844     req_rpc_data = data;
3845 
3846     r = req_rpc_data->request;
3847     if (nxt_slow_path(r == NULL)) {
3848         return;
3849     }
3850 
3851     if (r->error) {
3852         nxt_request_rpc_data_unlink(task, req_rpc_data);
3853         return;
3854     }
3855 
3856     app = req_rpc_data->app;
3857     nxt_assert(app != NULL);
3858 
3859     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3860         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3861 
3862         return;
3863     }
3864 
3865     b = (msg->size == 0) ? NULL : msg->buf;
3866 
3867     if (msg->port_msg.last != 0) {
3868         nxt_debug(task, "router data create last buf");
3869 
3870         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3871 
3872         req_rpc_data->rpc_cancel = 0;
3873 
3874         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
3875             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
3876         }
3877 
3878         nxt_request_rpc_data_unlink(task, req_rpc_data);
3879 
3880     } else {
3881         if (app->timeout != 0) {
3882             r->timer.handler = nxt_router_app_timeout;
3883             r->timer_data = req_rpc_data;
3884             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3885         }
3886     }
3887 
3888     if (b == NULL) {
3889         return;
3890     }
3891 
3892     if (msg->buf == b) {
3893         /* Disable instant buffer completion/re-using by port. */
3894         msg->buf = NULL;
3895     }
3896 
3897     if (r->header_sent) {
3898         nxt_buf_chain_add(&r->out, b);
3899         nxt_http_request_send_body(task, r, NULL);
3900 
3901     } else {
3902         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
3903 
3904         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
3905             nxt_alert(task, "response buffer too small: %z", b_size);
3906             goto fail;
3907         }
3908 
3909         resp = (void *) b->mem.pos;
3910         count = (b_size - sizeof(nxt_unit_response_t))
3911                     / sizeof(nxt_unit_field_t);
3912 
3913         if (nxt_slow_path(count < resp->fields_count)) {
3914             nxt_alert(task, "response buffer too small for fields count: %D",
3915                       resp->fields_count);
3916             goto fail;
3917         }
3918 
3919         field = NULL;
3920 
3921         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3922             if (f->skip) {
3923                 continue;
3924             }
3925 
3926             field = nxt_list_add(r->resp.fields);
3927 
3928             if (nxt_slow_path(field == NULL)) {
3929                 goto fail;
3930             }
3931 
3932             field->hash = f->hash;
3933             field->skip = 0;
3934             field->hopbyhop = 0;
3935 
3936             field->name_length = f->name_length;
3937             field->value_length = f->value_length;
3938             field->name = nxt_unit_sptr_get(&f->name);
3939             field->value = nxt_unit_sptr_get(&f->value);
3940 
3941             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
3942             if (nxt_slow_path(ret != NXT_OK)) {
3943                 goto fail;
3944             }
3945 
3946             nxt_debug(task, "header%s: %*s: %*s",
3947                       (field->skip ? " skipped" : ""),
3948                       (size_t) field->name_length, field->name,
3949                       (size_t) field->value_length, field->value);
3950 
3951             if (field->skip) {
3952                 r->resp.fields->last->nelts--;
3953             }
3954         }
3955 
3956         r->status = resp->status;
3957 
3958         if (resp->piggyback_content_length != 0) {
3959             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3960             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3961 
3962         } else {
3963             b->mem.pos = b->mem.free;
3964         }
3965 
3966         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3967             next = b->next;
3968             b->next = NULL;
3969 
3970             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3971                                b->completion_handler, task, b, b->parent);
3972 
3973             b = next;
3974         }
3975 
3976         if (b != NULL) {
3977             nxt_buf_chain_add(&r->out, b);
3978         }
3979 
3980         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3981 
3982         if (r->websocket_handshake
3983             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3984         {
3985             app_port = req_rpc_data->app_port;
3986             if (nxt_slow_path(app_port == NULL)) {
3987                 goto fail;
3988             }
3989 
3990             nxt_thread_mutex_lock(&app->mutex);
3991 
3992             app_port->main_app_port->active_websockets++;
3993 
3994             nxt_thread_mutex_unlock(&app->mutex);
3995 
3996             nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
3997             req_rpc_data->apr_action = NXT_APR_CLOSE;
3998 
3999             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4000 
4001             r->state = &nxt_http_websocket;
4002 
4003         } else {
4004             r->state = &nxt_http_request_send_state;
4005         }
4006     }
4007 
4008     return;
4009 
4010 fail:
4011 
4012     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4013 
4014     nxt_request_rpc_data_unlink(task, req_rpc_data);
4015 }
4016 
4017 
4018 static void
4019 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4020     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4021 {
4022     int                 res;
4023     nxt_app_t           *app;
4024     nxt_buf_t           *b;
4025     nxt_bool_t          start_process, unlinked;
4026     nxt_port_t          *app_port, *main_app_port, *idle_port;
4027     nxt_queue_link_t    *idle_lnk;
4028     nxt_http_request_t  *r;
4029 
4030     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4031               req_rpc_data->stream,
4032               msg->port_msg.pid, msg->port_msg.reply_port);
4033 
4034     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4035                              msg->port_msg.pid);
4036 
4037     app = req_rpc_data->app;
4038     r = req_rpc_data->request;
4039 
4040     start_process = 0;
4041     unlinked = 0;
4042 
4043     nxt_thread_mutex_lock(&app->mutex);
4044 
4045     if (r->app_link.next != NULL) {
4046         nxt_queue_remove(&r->app_link);
4047         r->app_link.next = NULL;
4048 
4049         unlinked = 1;
4050     }
4051 
4052     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4053                                   msg->port_msg.reply_port);
4054     if (nxt_slow_path(app_port == NULL)) {
4055         nxt_thread_mutex_unlock(&app->mutex);
4056 
4057         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4058 
4059         if (unlinked) {
4060             nxt_mp_release(r->mem_pool);
4061         }
4062 
4063         return;
4064     }
4065 
4066     main_app_port = app_port->main_app_port;
4067 
4068     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4069         app->idle_processes--;
4070 
4071         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4072                   &app->name, main_app_port->pid, main_app_port->id,
4073                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4074 
4075         /* Check port was in 'spare_ports' using idle_start field. */
4076         if (main_app_port->idle_start == 0
4077             && app->idle_processes >= app->spare_processes)
4078         {
4079             /*
4080              * If there is a vacant space in spare ports,
4081              * move the last idle to spare_ports.
4082              */
4083             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4084 
4085             idle_lnk = nxt_queue_last(&app->idle_ports);
4086             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4087             nxt_queue_remove(idle_lnk);
4088 
4089             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4090 
4091             idle_port->idle_start = 0;
4092 
4093             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4094                       "to spare_ports",
4095                       &app->name, idle_port->pid, idle_port->id);
4096         }
4097 
4098         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4099             app->pending_processes++;
4100             start_process = 1;
4101         }
4102     }
4103 
4104     main_app_port->active_requests++;
4105 
4106     nxt_port_inc_use(app_port);
4107 
4108     nxt_thread_mutex_unlock(&app->mutex);
4109 
4110     if (unlinked) {
4111         nxt_mp_release(r->mem_pool);
4112     }
4113 
4114     if (start_process) {
4115         nxt_router_start_app_process(task, app);
4116     }
4117 
4118     nxt_port_use(task, req_rpc_data->app_port, -1);
4119 
4120     req_rpc_data->app_port = app_port;
4121 
4122     b = req_rpc_data->msg_info.buf;
4123 
4124     if (b != NULL) {
4125         /* First buffer is already sent.  Start from second. */
4126         b = b->next;
4127 
4128         req_rpc_data->msg_info.buf->next = NULL;
4129     }
4130 
4131     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4132         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4133                   req_rpc_data->msg_info.body_fd);
4134 
4135         if (req_rpc_data->msg_info.body_fd != -1) {
4136             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4137         }
4138 
4139         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4140                                     req_rpc_data->msg_info.body_fd,
4141                                     req_rpc_data->stream,
4142                                     task->thread->engine->port->id, b);
4143 
4144         if (nxt_slow_path(res != NXT_OK)) {
4145             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4146         }
4147     }
4148 
4149     if (app->timeout != 0) {
4150         r->timer.handler = nxt_router_app_timeout;
4151         r->timer_data = req_rpc_data;
4152         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4153     }
4154 }
4155 
4156 
/*
 * Request state in which only the error handler is populated; all other
 * members of nxt_http_request_state_t are left zero-initialized.  The
 * object is aligned to 64 bytes via nxt_aligned(64).
 * NOTE(review): the code that installs this state on a request is not
 * visible in this part of the file -- confirm where it is used.
 */
static const nxt_http_request_state_t  nxt_http_request_send_state
    nxt_aligned(64) =
{
    .error_handler = nxt_http_request_error_handler,
};
4162 
4163 
4164 static void
4165 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4166 {
4167     nxt_buf_t           *out;
4168     nxt_http_request_t  *r;
4169 
4170     r = obj;
4171 
4172     out = r->out;
4173 
4174     if (out != NULL) {
4175         r->out = NULL;
4176         nxt_http_request_send(task, r, out);
4177     }
4178 }
4179 
4180 
4181 static void
4182 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4183     void *data)
4184 {
4185     nxt_request_rpc_data_t  *req_rpc_data;
4186 
4187     req_rpc_data = data;
4188 
4189     req_rpc_data->rpc_cancel = 0;
4190 
4191     /* TODO cancel message and return if cancelled. */
4192     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4193 
4194     if (req_rpc_data->request != NULL) {
4195         nxt_http_request_error(task, req_rpc_data->request,
4196                                NXT_HTTP_SERVICE_UNAVAILABLE);
4197     }
4198 
4199     nxt_request_rpc_data_unlink(task, req_rpc_data);
4200 }
4201 
4202 
4203 static void
4204 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4205     void *data)
4206 {
4207     uint32_t             n;
4208     nxt_app_t            *app;
4209     nxt_bool_t           start_process, restarted;
4210     nxt_port_t           *port;
4211     nxt_app_joint_t      *app_joint;
4212     nxt_app_joint_rpc_t  *app_joint_rpc;
4213 
4214     nxt_assert(data != NULL);
4215 
4216     app_joint_rpc = data;
4217     app_joint = app_joint_rpc->app_joint;
4218     port = msg->u.new_port;
4219 
4220     nxt_assert(app_joint != NULL);
4221     nxt_assert(port != NULL);
4222     nxt_assert(port->id == 0);
4223 
4224     app = app_joint->app;
4225 
4226     nxt_router_app_joint_use(task, app_joint, -1);
4227 
4228     if (nxt_slow_path(app == NULL)) {
4229         nxt_debug(task, "new port ready for released app, send QUIT");
4230 
4231         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4232 
4233         return;
4234     }
4235 
4236     nxt_thread_mutex_lock(&app->mutex);
4237 
4238     restarted = (app->generation != app_joint_rpc->generation);
4239 
4240     if (app_joint_rpc->proto) {
4241         nxt_assert(app->proto_port == NULL);
4242         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4243 
4244         n = app->proto_port_requests;
4245         app->proto_port_requests = 0;
4246 
4247         if (nxt_slow_path(restarted)) {
4248             nxt_thread_mutex_unlock(&app->mutex);
4249 
4250             nxt_debug(task, "proto port ready for restarted app, send QUIT");
4251 
4252             nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4253                                   NULL);
4254 
4255         } else {
4256             port->app = app;
4257             app->proto_port = port;
4258 
4259             nxt_thread_mutex_unlock(&app->mutex);
4260 
4261             nxt_port_use(task, port, 1);
4262         }
4263 
4264         port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4265 
4266         while (n > 0) {
4267             nxt_router_app_use(task, app, 1);
4268 
4269             nxt_router_start_app_process_handler(task, port, app);
4270 
4271             n--;
4272         }
4273 
4274         return;
4275     }
4276 
4277     nxt_assert(port->type == NXT_PROCESS_APP);
4278     nxt_assert(app->pending_processes != 0);
4279 
4280     app->pending_processes--;
4281 
4282     if (nxt_slow_path(restarted)) {
4283         nxt_debug(task, "new port ready for restarted app, send QUIT");
4284 
4285         start_process = !task->thread->engine->shutdown
4286                         && nxt_router_app_can_start(app)
4287                         && nxt_router_app_need_start(app);
4288 
4289         if (start_process) {
4290             app->pending_processes++;
4291         }
4292 
4293         nxt_thread_mutex_unlock(&app->mutex);
4294 
4295         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4296 
4297         if (start_process) {
4298             nxt_router_start_app_process(task, app);
4299         }
4300 
4301         return;
4302     }
4303 
4304     port->app = app;
4305     port->main_app_port = port;
4306 
4307     app->processes++;
4308     nxt_port_hash_add(&app->port_hash, port);
4309     app->port_hash_count++;
4310 
4311     nxt_thread_mutex_unlock(&app->mutex);
4312 
4313     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4314               &app->name, port->pid, app->processes, app->pending_processes);
4315 
4316     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4317 
4318     nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4319 }
4320 
4321 
4322 static void
4323 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4324     void *data)
4325 {
4326     nxt_app_t            *app;
4327     nxt_app_joint_t      *app_joint;
4328     nxt_queue_link_t     *link;
4329     nxt_http_request_t   *r;
4330     nxt_app_joint_rpc_t  *app_joint_rpc;
4331 
4332     nxt_assert(data != NULL);
4333 
4334     app_joint_rpc = data;
4335     app_joint = app_joint_rpc->app_joint;
4336 
4337     nxt_assert(app_joint != NULL);
4338 
4339     app = app_joint->app;
4340 
4341     nxt_router_app_joint_use(task, app_joint, -1);
4342 
4343     if (nxt_slow_path(app == NULL)) {
4344         nxt_debug(task, "start error for released app");
4345 
4346         return;
4347     }
4348 
4349     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4350 
4351     link = NULL;
4352 
4353     nxt_thread_mutex_lock(&app->mutex);
4354 
4355     nxt_assert(app->pending_processes != 0);
4356 
4357     app->pending_processes--;
4358 
4359     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4360         link = nxt_queue_first(&app->ack_waiting_req);
4361 
4362         nxt_queue_remove(link);
4363         link->next = NULL;
4364     }
4365 
4366     nxt_thread_mutex_unlock(&app->mutex);
4367 
4368     while (link != NULL) {
4369         r = nxt_container_of(link, nxt_http_request_t, app_link);
4370 
4371         nxt_event_engine_post(r->engine, &r->err_work);
4372 
4373         link = NULL;
4374 
4375         nxt_thread_mutex_lock(&app->mutex);
4376 
4377         if (app->processes == 0 && app->pending_processes == 0
4378             && !nxt_queue_is_empty(&app->ack_waiting_req))
4379         {
4380             link = nxt_queue_first(&app->ack_waiting_req);
4381 
4382             nxt_queue_remove(link);
4383             link->next = NULL;
4384         }
4385 
4386         nxt_thread_mutex_unlock(&app->mutex);
4387     }
4388 }
4389 
4390 
4391 nxt_inline nxt_port_t *
4392 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4393 {
4394     nxt_port_t  *port;
4395 
4396     port = NULL;
4397 
4398     nxt_thread_mutex_lock(&app->mutex);
4399 
4400     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4401 
4402         /* Caller is responsible to decrease port use count. */
4403         nxt_queue_chk_remove(&port->app_link);
4404 
4405         if (nxt_queue_chk_remove(&port->idle_link)) {
4406             app->idle_processes--;
4407 
4408             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4409                       &app->name, port->pid, port->id,
4410                       (port->idle_start ? "idle_ports" : "spare_ports"));
4411         }
4412 
4413         nxt_port_hash_remove(&app->port_hash, port);
4414         app->port_hash_count--;
4415 
4416         port->app = NULL;
4417         app->processes--;
4418 
4419         break;
4420 
4421     } nxt_queue_loop;
4422 
4423     nxt_thread_mutex_unlock(&app->mutex);
4424 
4425     return port;
4426 }
4427 
4428 
4429 static void
4430 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4431 {
4432     int  c;
4433 
4434     c = nxt_atomic_fetch_add(&app->use_count, i);
4435 
4436     if (i < 0 && c == -i) {
4437 
4438         if (task->thread->engine != app->engine) {
4439             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4440 
4441         } else {
4442             nxt_router_free_app(task, app->joint, NULL);
4443         }
4444     }
4445 }
4446 
4447 
4448 static void
4449 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4450 {
4451     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4452 
4453     nxt_queue_remove(&app->link);
4454 
4455     nxt_router_app_use(task, app, -1);
4456 }
4457 
4458 
4459 static void
4460 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4461     nxt_apr_action_t action)
4462 {
4463     int         inc_use;
4464     uint32_t    got_response, dec_requests;
4465     nxt_bool_t  adjust_idle_timer;
4466     nxt_port_t  *main_app_port;
4467 
4468     nxt_assert(port != NULL);
4469 
4470     inc_use = 0;
4471     got_response = 0;
4472     dec_requests = 0;
4473 
4474     switch (action) {
4475     case NXT_APR_NEW_PORT:
4476         break;
4477     case NXT_APR_REQUEST_FAILED:
4478         dec_requests = 1;
4479         inc_use = -1;
4480         break;
4481     case NXT_APR_GOT_RESPONSE:
4482         got_response = 1;
4483         inc_use = -1;
4484         break;
4485     case NXT_APR_UPGRADE:
4486         got_response = 1;
4487         break;
4488     case NXT_APR_CLOSE:
4489         inc_use = -1;
4490         break;
4491     }
4492 
4493     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4494               port->pid, port->id,
4495               (int) inc_use, (int) got_response);
4496 
4497     if (port->id == NXT_SHARED_PORT_ID) {
4498         nxt_thread_mutex_lock(&app->mutex);
4499 
4500         app->active_requests -= got_response + dec_requests;
4501 
4502         nxt_thread_mutex_unlock(&app->mutex);
4503 
4504         goto adjust_use;
4505     }
4506 
4507     main_app_port = port->main_app_port;
4508 
4509     nxt_thread_mutex_lock(&app->mutex);
4510 
4511     main_app_port->active_requests -= got_response + dec_requests;
4512     app->active_requests -= got_response + dec_requests;
4513 
4514     if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4515         nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4516 
4517         nxt_port_inc_use(main_app_port);
4518     }
4519 
4520     adjust_idle_timer = 0;
4521 
4522     if (main_app_port->pair[1] != -1
4523         && main_app_port->active_requests == 0
4524         && main_app_port->active_websockets == 0
4525         && main_app_port->idle_link.next == NULL)
4526     {
4527         if (app->idle_processes == app->spare_processes
4528             && app->adjust_idle_work.data == NULL)
4529         {
4530             adjust_idle_timer = 1;
4531             app->adjust_idle_work.data = app;
4532             app->adjust_idle_work.next = NULL;
4533         }
4534 
4535         if (app->idle_processes < app->spare_processes) {
4536             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4537 
4538             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4539                       &app->name, main_app_port->pid, main_app_port->id);
4540         } else {
4541             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4542 
4543             main_app_port->idle_start = task->thread->engine->timers.now;
4544 
4545             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4546                       &app->name, main_app_port->pid, main_app_port->id);
4547         }
4548 
4549         app->idle_processes++;
4550     }
4551 
4552     nxt_thread_mutex_unlock(&app->mutex);
4553 
4554     if (adjust_idle_timer) {
4555         nxt_router_app_use(task, app, 1);
4556         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4557     }
4558 
4559     /* ? */
4560     if (main_app_port->pair[1] == -1) {
4561         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4562                   &app->name, app, main_app_port, main_app_port->pid);
4563 
4564         goto adjust_use;
4565     }
4566 
4567     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4568               &app->name, app);
4569 
4570 adjust_use:
4571 
4572     nxt_port_use(task, port, inc_use);
4573 }
4574 
4575 
4576 void
4577 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4578 {
4579     nxt_app_t         *app;
4580     nxt_bool_t        unchain, start_process;
4581     nxt_port_t        *idle_port;
4582     nxt_queue_link_t  *idle_lnk;
4583 
4584     app = port->app;
4585 
4586     nxt_assert(app != NULL);
4587 
4588     nxt_thread_mutex_lock(&app->mutex);
4589 
4590     if (port == app->proto_port) {
4591         app->proto_port = NULL;
4592         port->app = NULL;
4593 
4594         nxt_thread_mutex_unlock(&app->mutex);
4595 
4596         nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4597                   port->pid);
4598 
4599         nxt_port_use(task, port, -1);
4600 
4601         return;
4602     }
4603 
4604     nxt_port_hash_remove(&app->port_hash, port);
4605     app->port_hash_count--;
4606 
4607     if (port->id != 0) {
4608         nxt_thread_mutex_unlock(&app->mutex);
4609 
4610         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4611                   port->pid, port->id);
4612 
4613         return;
4614     }
4615 
4616     unchain = nxt_queue_chk_remove(&port->app_link);
4617 
4618     if (nxt_queue_chk_remove(&port->idle_link)) {
4619         app->idle_processes--;
4620 
4621         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4622                   &app->name, port->pid, port->id,
4623                   (port->idle_start ? "idle_ports" : "spare_ports"));
4624 
4625         if (port->idle_start == 0
4626             && app->idle_processes >= app->spare_processes)
4627         {
4628             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4629 
4630             idle_lnk = nxt_queue_last(&app->idle_ports);
4631             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4632             nxt_queue_remove(idle_lnk);
4633 
4634             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4635 
4636             idle_port->idle_start = 0;
4637 
4638             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4639                       "to spare_ports",
4640                       &app->name, idle_port->pid, idle_port->id);
4641         }
4642     }
4643 
4644     app->processes--;
4645 
4646     start_process = !task->thread->engine->shutdown
4647                     && nxt_router_app_can_start(app)
4648                     && nxt_router_app_need_start(app);
4649 
4650     if (start_process) {
4651         app->pending_processes++;
4652     }
4653 
4654     nxt_thread_mutex_unlock(&app->mutex);
4655 
4656     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4657 
4658     if (unchain) {
4659         nxt_port_use(task, port, -1);
4660     }
4661 
4662     if (start_process) {
4663         nxt_router_start_app_process(task, app);
4664     }
4665 }
4666 
4667 
4668 static void
4669 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4670 {
4671     nxt_app_t           *app;
4672     nxt_bool_t          queued;
4673     nxt_port_t          *port;
4674     nxt_msec_t          timeout, threshold;
4675     nxt_queue_link_t    *lnk;
4676     nxt_event_engine_t  *engine;
4677 
4678     app = obj;
4679     queued = (data == app);
4680 
4681     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4682               &app->name, queued);
4683 
4684     engine = task->thread->engine;
4685 
4686     nxt_assert(app->engine == engine);
4687 
4688     threshold = engine->timers.now + app->joint->idle_timer.bias;
4689     timeout = 0;
4690 
4691     nxt_thread_mutex_lock(&app->mutex);
4692 
4693     if (queued) {
4694         app->adjust_idle_work.data = NULL;
4695     }
4696 
4697     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4698               &app->name,
4699               (int) app->idle_processes, (int) app->spare_processes);
4700 
4701     while (app->idle_processes > app->spare_processes) {
4702 
4703         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4704 
4705         lnk = nxt_queue_first(&app->idle_ports);
4706         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4707 
4708         timeout = port->idle_start + app->idle_timeout;
4709 
4710         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4711                   &app->name, port->pid,
4712                   port->idle_start, timeout, threshold);
4713 
4714         if (timeout > threshold) {
4715             break;
4716         }
4717 
4718         nxt_queue_remove(lnk);
4719         lnk->next = NULL;
4720 
4721         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4722                   &app->name, port->pid, port->id);
4723 
4724         nxt_queue_chk_remove(&port->app_link);
4725 
4726         nxt_port_hash_remove(&app->port_hash, port);
4727         app->port_hash_count--;
4728 
4729         app->idle_processes--;
4730         app->processes--;
4731         port->app = NULL;
4732 
4733         nxt_thread_mutex_unlock(&app->mutex);
4734 
4735         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4736                   &app->name, port->pid);
4737 
4738         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4739 
4740         nxt_port_use(task, port, -1);
4741 
4742         nxt_thread_mutex_lock(&app->mutex);
4743     }
4744 
4745     nxt_thread_mutex_unlock(&app->mutex);
4746 
4747     if (timeout > threshold) {
4748         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4749 
4750     } else {
4751         nxt_timer_disable(engine, &app->joint->idle_timer);
4752     }
4753 
4754     if (queued) {
4755         nxt_router_app_use(task, app, -1);
4756     }
4757 }
4758 
4759 
4760 static void
4761 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4762 {
4763     nxt_timer_t      *timer;
4764     nxt_app_joint_t  *app_joint;
4765 
4766     timer = obj;
4767     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4768 
4769     if (nxt_fast_path(app_joint->app != NULL)) {
4770         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4771     }
4772 }
4773 
4774 
4775 static void
4776 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4777 {
4778     nxt_timer_t      *timer;
4779     nxt_app_joint_t  *app_joint;
4780 
4781     timer = obj;
4782     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4783 
4784     nxt_router_app_joint_use(task, app_joint, -1);
4785 }
4786 
4787 
4788 static void
4789 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4790 {
4791     nxt_app_t        *app;
4792     nxt_port_t       *port, *proto_port;
4793     nxt_app_joint_t  *app_joint;
4794 
4795     app_joint = obj;
4796     app = app_joint->app;
4797 
4798     for ( ;; ) {
4799         port = nxt_router_app_get_port_for_quit(task, app);
4800         if (port == NULL) {
4801             break;
4802         }
4803 
4804         nxt_port_use(task, port, -1);
4805     }
4806 
4807     nxt_thread_mutex_lock(&app->mutex);
4808 
4809     for ( ;; ) {
4810         port = nxt_port_hash_retrieve(&app->port_hash);
4811         if (port == NULL) {
4812             break;
4813         }
4814 
4815         app->port_hash_count--;
4816 
4817         port->app = NULL;
4818 
4819         nxt_port_close(task, port);
4820 
4821         nxt_port_use(task, port, -1);
4822     }
4823 
4824     proto_port = app->proto_port;
4825 
4826     if (proto_port != NULL) {
4827         nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
4828                   proto_port->pid);
4829 
4830         app->proto_port = NULL;
4831         proto_port->app = NULL;
4832     }
4833 
4834     nxt_thread_mutex_unlock(&app->mutex);
4835 
4836     if (proto_port != NULL) {
4837         nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
4838                               -1, 0, 0, NULL);
4839 
4840         nxt_port_close(task, proto_port);
4841 
4842         nxt_port_use(task, proto_port, -1);
4843     }
4844 
4845     nxt_assert(app->proto_port == NULL);
4846     nxt_assert(app->processes == 0);
4847     nxt_assert(app->active_requests == 0);
4848     nxt_assert(app->port_hash_count == 0);
4849     nxt_assert(app->idle_processes == 0);
4850     nxt_assert(nxt_queue_is_empty(&app->ports));
4851     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4852     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4853 
4854     nxt_port_mmaps_destroy(&app->outgoing, 1);
4855 
4856     nxt_thread_mutex_destroy(&app->outgoing.mutex);
4857 
4858     if (app->shared_port != NULL) {
4859         app->shared_port->app = NULL;
4860         nxt_port_close(task, app->shared_port);
4861         nxt_port_use(task, app->shared_port, -1);
4862 
4863         app->shared_port = NULL;
4864     }
4865 
4866     nxt_thread_mutex_destroy(&app->mutex);
4867     nxt_mp_destroy(app->mem_pool);
4868 
4869     app_joint->app = NULL;
4870 
4871     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4872         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4873         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4874 
4875     } else {
4876         nxt_router_app_joint_use(task, app_joint, -1);
4877     }
4878 }
4879 
4880 
4881 static void
4882 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4883     nxt_request_rpc_data_t *req_rpc_data)
4884 {
4885     nxt_bool_t          start_process;
4886     nxt_port_t          *port;
4887     nxt_http_request_t  *r;
4888 
4889     start_process = 0;
4890 
4891     nxt_thread_mutex_lock(&app->mutex);
4892 
4893     port = app->shared_port;
4894     nxt_port_inc_use(port);
4895 
4896     app->active_requests++;
4897 
4898     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4899         app->pending_processes++;
4900         start_process = 1;
4901     }
4902 
4903     r = req_rpc_data->request;
4904 
4905     /*
4906      * Put request into application-wide list to be able to cancel request
4907      * if something goes wrong with application processes.
4908      */
4909     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
4910 
4911     nxt_thread_mutex_unlock(&app->mutex);
4912 
4913     /*
4914      * Retain request memory pool while request is linked in ack_waiting_req
4915      * to guarantee request structure memory is accessble.
4916      */
4917     nxt_mp_retain(r->mem_pool);
4918 
4919     req_rpc_data->app_port = port;
4920     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4921 
4922     if (start_process) {
4923         nxt_router_start_app_process(task, app);
4924     }
4925 }
4926 
4927 
4928 void
4929 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4930     nxt_http_action_t *action)
4931 {
4932     nxt_event_engine_t      *engine;
4933     nxt_http_app_conf_t     *conf;
4934     nxt_request_rpc_data_t  *req_rpc_data;
4935 
4936     conf = action->u.conf;
4937     engine = task->thread->engine;
4938 
4939     r->app_target = conf->target;
4940 
4941     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4942                                           nxt_router_response_ready_handler,
4943                                           nxt_router_response_error_handler,
4944                                           sizeof(nxt_request_rpc_data_t));
4945     if (nxt_slow_path(req_rpc_data == NULL)) {
4946         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4947         return;
4948     }
4949 
4950     /*
4951      * At this point we have request req_rpc_data allocated and registered
4952      * in port handlers.  Need to fixup request memory pool.  Counterpart
4953      * release will be called via following call chain:
4954      *    nxt_request_rpc_data_unlink() ->
4955      *        nxt_router_http_request_release_post() ->
4956      *            nxt_router_http_request_release()
4957      */
4958     nxt_mp_retain(r->mem_pool);
4959 
4960     r->timer.task = &engine->task;
4961     r->timer.work_queue = &engine->fast_work_queue;
4962     r->timer.log = engine->task.log;
4963     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4964 
4965     r->engine = engine;
4966     r->err_work.handler = nxt_router_http_request_error;
4967     r->err_work.task = task;
4968     r->err_work.obj = r;
4969 
4970     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4971     req_rpc_data->app = conf->app;
4972     req_rpc_data->msg_info.body_fd = -1;
4973     req_rpc_data->rpc_cancel = 1;
4974 
4975     nxt_router_app_use(task, conf->app, 1);
4976 
4977     req_rpc_data->request = r;
4978     r->req_rpc_data = req_rpc_data;
4979 
4980     if (r->last != NULL) {
4981         r->last->completion_handler = nxt_router_http_request_done;
4982     }
4983 
4984     nxt_router_app_port_get(task, conf->app, req_rpc_data);
4985     nxt_router_app_prepare_request(task, req_rpc_data);
4986 }
4987 
4988 
4989 static void
4990 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
4991 {
4992     nxt_http_request_t  *r;
4993 
4994     r = obj;
4995 
4996     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
4997 
4998     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4999 
5000     if (r->req_rpc_data != NULL) {
5001         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5002     }
5003 
5004     nxt_mp_release(r->mem_pool);
5005 }
5006 
5007 
5008 static void
5009 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5010 {
5011     nxt_http_request_t  *r;
5012 
5013     r = data;
5014 
5015     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5016 
5017     if (r->req_rpc_data != NULL) {
5018         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5019     }
5020 
5021     nxt_http_request_close_handler(task, r, r->proto.any);
5022 }
5023 
5024 
5025 static void
5026 nxt_router_app_prepare_request(nxt_task_t *task,
5027     nxt_request_rpc_data_t *req_rpc_data)
5028 {
5029     nxt_app_t         *app;
5030     nxt_buf_t         *buf, *body;
5031     nxt_int_t         res;
5032     nxt_port_t        *port, *reply_port;
5033 
5034     int                   notify;
5035     struct {
5036         nxt_port_msg_t       pm;
5037         nxt_port_mmap_msg_t  mm;
5038     } msg;
5039 
5040 
5041     app = req_rpc_data->app;
5042 
5043     nxt_assert(app != NULL);
5044 
5045     port = req_rpc_data->app_port;
5046 
5047     nxt_assert(port != NULL);
5048     nxt_assert(port->queue != NULL);
5049 
5050     reply_port = task->thread->engine->port;
5051 
5052     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5053                                  nxt_app_msg_prefix[app->type]);
5054     if (nxt_slow_path(buf == NULL)) {
5055         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5056                   req_rpc_data->stream, &app->name);
5057 
5058         nxt_http_request_error(task, req_rpc_data->request,
5059                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5060 
5061         return;
5062     }
5063 
5064     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5065                     nxt_buf_used_size(buf),
5066                     port->socket.fd);
5067 
5068     req_rpc_data->msg_info.buf = buf;
5069 
5070     body = req_rpc_data->request->body;
5071 
5072     if (body != NULL && nxt_buf_is_file(body)) {
5073         req_rpc_data->msg_info.body_fd = body->file->fd;
5074 
5075         body->file->fd = -1;
5076 
5077     } else {
5078         req_rpc_data->msg_info.body_fd = -1;
5079     }
5080 
5081     msg.pm.stream = req_rpc_data->stream;
5082     msg.pm.pid = reply_port->pid;
5083     msg.pm.reply_port = reply_port->id;
5084     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5085     msg.pm.last = 0;
5086     msg.pm.mmap = 1;
5087     msg.pm.nf = 0;
5088     msg.pm.mf = 0;
5089 
5090     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5091     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5092 
5093     msg.mm.mmap_id = hdr->id;
5094     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5095     msg.mm.size = nxt_buf_used_size(buf);
5096 
5097     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5098                              req_rpc_data->stream, &notify,
5099                              &req_rpc_data->msg_info.tracking_cookie);
5100     if (nxt_fast_path(res == NXT_OK)) {
5101         if (notify != 0) {
5102             (void) nxt_port_socket_write(task, port,
5103                                          NXT_PORT_MSG_READ_QUEUE,
5104                                          -1, req_rpc_data->stream,
5105                                          reply_port->id, NULL);
5106 
5107         } else {
5108             nxt_debug(task, "queue is not empty");
5109         }
5110 
5111         buf->is_port_mmap_sent = 1;
5112         buf->mem.pos = buf->mem.free;
5113 
5114     } else {
5115         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5116                   req_rpc_data->stream, &app->name);
5117 
5118         nxt_http_request_error(task, req_rpc_data->request,
5119                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5120     }
5121 }
5122 
5123 
5124 struct nxt_fields_iter_s {
5125     nxt_list_part_t   *part;
5126     nxt_http_field_t  *field;
5127 };
5128 
5129 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5130 
5131 
5132 static nxt_http_field_t *
5133 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5134 {
5135     if (part == NULL) {
5136         return NULL;
5137     }
5138 
5139     while (part->nelts == 0) {
5140         part = part->next;
5141         if (part == NULL) {
5142             return NULL;
5143         }
5144     }
5145 
5146     i->part = part;
5147     i->field = nxt_list_data(i->part);
5148 
5149     return i->field;
5150 }
5151 
5152 
5153 static nxt_http_field_t *
5154 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5155 {
5156     return nxt_fields_part_first(nxt_list_part(fields), i);
5157 }
5158 
5159 
5160 static nxt_http_field_t *
5161 nxt_fields_next(nxt_fields_iter_t *i)
5162 {
5163     nxt_http_field_t  *end = nxt_list_data(i->part);
5164 
5165     end += i->part->nelts;
5166     i->field++;
5167 
5168     if (i->field < end) {
5169         return i->field;
5170     }
5171 
5172     return nxt_fields_part_first(i->part->next, i);
5173 }
5174 
5175 
5176 static nxt_buf_t *
5177 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5178     nxt_app_t *app, const nxt_str_t *prefix)
5179 {
5180     void                *target_pos, *query_pos;
5181     u_char              *pos, *end, *p, c;
5182     size_t              fields_count, req_size, size, free_size;
5183     size_t              copy_size;
5184     nxt_off_t           content_length;
5185     nxt_buf_t           *b, *buf, *out, **tail;
5186     nxt_http_field_t    *field, *dup;
5187     nxt_unit_field_t    *dst_field;
5188     nxt_fields_iter_t   iter, dup_iter;
5189     nxt_unit_request_t  *req;
5190 
5191     req_size = sizeof(nxt_unit_request_t)
5192                + r->method->length + 1
5193                + r->version.length + 1
5194                + r->remote->length + 1
5195                + r->local->length + 1
5196                + r->server_name.length + 1
5197                + r->target.length + 1
5198                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5199 
5200     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5201     fields_count = 0;
5202 
5203     nxt_list_each(field, r->fields) {
5204         fields_count++;
5205 
5206         req_size += field->name_length + prefix->length + 1
5207                     + field->value_length + 1;
5208     } nxt_list_loop;
5209 
5210     req_size += fields_count * sizeof(nxt_unit_field_t);
5211 
5212     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5213         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5214                   (int) req_size);
5215 
5216         return NULL;
5217     }
5218 
5219     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5220               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5221     if (nxt_slow_path(out == NULL)) {
5222         return NULL;
5223     }
5224 
5225     req = (nxt_unit_request_t *) out->mem.free;
5226     out->mem.free += req_size;
5227 
5228     req->app_target = r->app_target;
5229 
5230     req->content_length = content_length;
5231 
5232     p = (u_char *) (req->fields + fields_count);
5233 
5234     nxt_debug(task, "fields_count=%d", (int) fields_count);
5235 
5236     req->method_length = r->method->length;
5237     nxt_unit_sptr_set(&req->method, p);
5238     p = nxt_cpymem(p, r->method->start, r->method->length);
5239     *p++ = '\0';
5240 
5241     req->version_length = r->version.length;
5242     nxt_unit_sptr_set(&req->version, p);
5243     p = nxt_cpymem(p, r->version.start, r->version.length);
5244     *p++ = '\0';
5245 
5246     req->remote_length = r->remote->address_length;
5247     nxt_unit_sptr_set(&req->remote, p);
5248     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5249                    r->remote->address_length);
5250     *p++ = '\0';
5251 
5252     req->local_length = r->local->address_length;
5253     nxt_unit_sptr_set(&req->local, p);
5254     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5255     *p++ = '\0';
5256 
5257     req->tls = r->tls;
5258     req->websocket_handshake = r->websocket_handshake;
5259 
5260     req->server_name_length = r->server_name.length;
5261     nxt_unit_sptr_set(&req->server_name, p);
5262     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5263     *p++ = '\0';
5264 
5265     target_pos = p;
5266     req->target_length = (uint32_t) r->target.length;
5267     nxt_unit_sptr_set(&req->target, p);
5268     p = nxt_cpymem(p, r->target.start, r->target.length);
5269     *p++ = '\0';
5270 
5271     req->path_length = (uint32_t) r->path->length;
5272     if (r->path->start == r->target.start) {
5273         nxt_unit_sptr_set(&req->path, target_pos);
5274 
5275     } else {
5276         nxt_unit_sptr_set(&req->path, p);
5277         p = nxt_cpymem(p, r->path->start, r->path->length);
5278         *p++ = '\0';
5279     }
5280 
5281     req->query_length = (uint32_t) r->args->length;
5282     if (r->args->start != NULL) {
5283         query_pos = nxt_pointer_to(target_pos,
5284                                    r->args->start - r->target.start);
5285 
5286         nxt_unit_sptr_set(&req->query, query_pos);
5287 
5288     } else {
5289         req->query.offset = 0;
5290     }
5291 
5292     req->content_length_field = NXT_UNIT_NONE_FIELD;
5293     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5294     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5295     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5296 
5297     dst_field = req->fields;
5298 
5299     for (field = nxt_fields_first(r->fields, &iter);
5300          field != NULL;
5301          field = nxt_fields_next(&iter))
5302     {
5303         if (field->skip) {
5304             continue;
5305         }
5306 
5307         dst_field->hash = field->hash;
5308         dst_field->skip = 0;
5309         dst_field->name_length = field->name_length + prefix->length;
5310         dst_field->value_length = field->value_length;
5311 
5312         if (field == r->content_length) {
5313             req->content_length_field = dst_field - req->fields;
5314 
5315         } else if (field == r->content_type) {
5316             req->content_type_field = dst_field - req->fields;
5317 
5318         } else if (field == r->cookie) {
5319             req->cookie_field = dst_field - req->fields;
5320 
5321         } else if (field == r->authorization) {
5322             req->authorization_field = dst_field - req->fields;
5323         }
5324 
5325         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5326                   (int) field->hash, (int) field->skip,
5327                   (int) field->name_length, field->name,
5328                   (int) field->value_length, field->value);
5329 
5330         if (prefix->length != 0) {
5331             nxt_unit_sptr_set(&dst_field->name, p);
5332             p = nxt_cpymem(p, prefix->start, prefix->length);
5333 
5334             end = field->name + field->name_length;
5335             for (pos = field->name; pos < end; pos++) {
5336                 c = *pos;
5337 
5338                 if (c >= 'a' && c <= 'z') {
5339                     *p++ = (c & ~0x20);
5340                     continue;
5341                 }
5342 
5343                 if (c == '-') {
5344                     *p++ = '_';
5345                     continue;
5346                 }
5347 
5348                 *p++ = c;
5349             }
5350 
5351         } else {
5352             nxt_unit_sptr_set(&dst_field->name, p);
5353             p = nxt_cpymem(p, field->name, field->name_length);
5354         }
5355 
5356         *p++ = '\0';
5357 
5358         nxt_unit_sptr_set(&dst_field->value, p);
5359         p = nxt_cpymem(p, field->value, field->value_length);
5360 
5361         if (prefix->length != 0) {
5362             dup_iter = iter;
5363 
5364             for (dup = nxt_fields_next(&dup_iter);
5365                  dup != NULL;
5366                  dup = nxt_fields_next(&dup_iter))
5367             {
5368                 if (dup->name_length != field->name_length
5369                     || dup->skip
5370                     || dup->hash != field->hash
5371                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5372                 {
5373                     continue;
5374                 }
5375 
5376                 p = nxt_cpymem(p, ", ", 2);
5377                 p = nxt_cpymem(p, dup->value, dup->value_length);
5378 
5379                 dst_field->value_length += 2 + dup->value_length;
5380 
5381                 dup->skip = 1;
5382             }
5383         }
5384 
5385         *p++ = '\0';
5386 
5387         dst_field++;
5388     }
5389 
5390     req->fields_count = (uint32_t) (dst_field - req->fields);
5391 
5392     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5393 
5394     buf = out;
5395     tail = &buf->next;
5396 
5397     for (b = r->body; b != NULL; b = b->next) {
5398         size = nxt_buf_mem_used_size(&b->mem);
5399         pos = b->mem.pos;
5400 
5401         while (size > 0) {
5402             if (buf == NULL) {
5403                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5404 
5405                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5406                 if (nxt_slow_path(buf == NULL)) {
5407                     while (out != NULL) {
5408                         buf = out->next;
5409                         out->next = NULL;
5410                         out->completion_handler(task, out, out->parent);
5411                         out = buf;
5412                     }
5413                     return NULL;
5414                 }
5415 
5416                 *tail = buf;
5417                 tail = &buf->next;
5418 
5419             } else {
5420                 free_size = nxt_buf_mem_free_size(&buf->mem);
5421                 if (free_size < size
5422                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5423                        == NXT_OK)
5424                 {
5425                     free_size = nxt_buf_mem_free_size(&buf->mem);
5426                 }
5427             }
5428 
5429             if (free_size > 0) {
5430                 copy_size = nxt_min(free_size, size);
5431 
5432                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5433 
5434                 size -= copy_size;
5435                 pos += copy_size;
5436 
5437                 if (size == 0) {
5438                     break;
5439                 }
5440             }
5441 
5442             buf = NULL;
5443         }
5444     }
5445 
5446     return out;
5447 }
5448 
5449 
5450 static void
5451 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5452 {
5453     nxt_timer_t              *timer;
5454     nxt_http_request_t       *r;
5455     nxt_request_rpc_data_t   *req_rpc_data;
5456 
5457     timer = obj;
5458 
5459     nxt_debug(task, "router app timeout");
5460 
5461     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5462     req_rpc_data = r->timer_data;
5463 
5464     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5465 
5466     nxt_request_rpc_data_unlink(task, req_rpc_data);
5467 }
5468 
5469 
5470 static void
5471 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5472 {
5473     r->timer.handler = nxt_router_http_request_release;
5474     nxt_timer_add(task->thread->engine, &r->timer, 0);
5475 }
5476 
5477 
5478 static void
5479 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5480 {
5481     nxt_http_request_t  *r;
5482 
5483     nxt_debug(task, "http request pool release");
5484 
5485     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5486 
5487     nxt_mp_release(r->mem_pool);
5488 }
5489 
5490 
5491 static void
5492 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5493 {
5494     size_t                   mi;
5495     uint32_t                 i;
5496     nxt_bool_t               ack;
5497     nxt_process_t            *process;
5498     nxt_free_map_t           *m;
5499     nxt_port_mmap_handler_t  *mmap_handler;
5500 
5501     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5502 
5503     process = nxt_runtime_process_find(task->thread->runtime,
5504                                        msg->port_msg.pid);
5505     if (nxt_slow_path(process == NULL)) {
5506         return;
5507     }
5508 
5509     ack = 0;
5510 
5511     /*
5512      * To mitigate possible racing condition (when OOSM message received
5513      * after some of the memory was already freed), need to try to find
5514      * first free segment in shared memory and send ACK if found.
5515      */
5516 
5517     nxt_thread_mutex_lock(&process->incoming.mutex);
5518 
5519     for (i = 0; i < process->incoming.size; i++) {
5520         mmap_handler = process->incoming.elts[i].mmap_handler;
5521 
5522         if (nxt_slow_path(mmap_handler == NULL)) {
5523             continue;
5524         }
5525 
5526         m = mmap_handler->hdr->free_map;
5527 
5528         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5529             if (m[mi] != 0) {
5530                 ack = 1;
5531 
5532                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5533                           i, mi, m[mi]);
5534 
5535                 break;
5536             }
5537         }
5538     }
5539 
5540     nxt_thread_mutex_unlock(&process->incoming.mutex);
5541 
5542     if (ack) {
5543         nxt_process_broadcast_shm_ack(task, process);
5544     }
5545 }
5546 
5547 
5548 static void
5549 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5550 {
5551     nxt_fd_t                 fd;
5552     nxt_port_t               *port;
5553     nxt_runtime_t            *rt;
5554     nxt_port_mmaps_t         *mmaps;
5555     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5556     nxt_port_mmap_handler_t  *mmap_handler;
5557 
5558     rt = task->thread->runtime;
5559 
5560     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5561                                  msg->port_msg.reply_port);
5562     if (nxt_slow_path(port == NULL)) {
5563         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5564                   msg->port_msg.pid, msg->port_msg.reply_port);
5565 
5566         return;
5567     }
5568 
5569     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5570                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5571     {
5572         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5573                   (int) nxt_buf_used_size(msg->buf));
5574 
5575         return;
5576     }
5577 
5578     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5579 
5580     nxt_assert(port->type == NXT_PROCESS_APP);
5581 
5582     if (nxt_slow_path(port->app == NULL)) {
5583         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5584                   port->pid, port->id);
5585 
5586         // FIXME
5587         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5588                               -1, msg->port_msg.stream, 0, NULL);
5589 
5590         return;
5591     }
5592 
5593     mmaps = &port->app->outgoing;
5594     nxt_thread_mutex_lock(&mmaps->mutex);
5595 
5596     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5597         nxt_thread_mutex_unlock(&mmaps->mutex);
5598 
5599         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5600                   (int) get_mmap_msg->id);
5601 
5602         // FIXME
5603         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5604                               -1, msg->port_msg.stream, 0, NULL);
5605         return;
5606     }
5607 
5608     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5609 
5610     fd = mmap_handler->fd;
5611 
5612     nxt_thread_mutex_unlock(&mmaps->mutex);
5613 
5614     nxt_debug(task, "get mmap %PI:%d found",
5615               msg->port_msg.pid, (int) get_mmap_msg->id);
5616 
5617     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5618 }
5619 
5620 
5621 static void
5622 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5623 {
5624     nxt_port_t               *port, *reply_port;
5625     nxt_runtime_t            *rt;
5626     nxt_port_msg_get_port_t  *get_port_msg;
5627 
5628     rt = task->thread->runtime;
5629 
5630     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5631                                        msg->port_msg.reply_port);
5632     if (nxt_slow_path(reply_port == NULL)) {
5633         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5634                   msg->port_msg.pid, msg->port_msg.reply_port);
5635 
5636         return;
5637     }
5638 
5639     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5640                       < (int) sizeof(nxt_port_msg_get_port_t)))
5641     {
5642         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5643                   (int) nxt_buf_used_size(msg->buf));
5644 
5645         return;
5646     }
5647 
5648     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5649 
5650     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5651     if (nxt_slow_path(port == NULL)) {
5652         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5653                   get_port_msg->pid, get_port_msg->id);
5654 
5655         return;
5656     }
5657 
5658     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5659               get_port_msg->id);
5660 
5661     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5662 }
5663