xref: /unit/src/nxt_router.c (revision 2131:aea375f03b0b)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 #define NXT_SHARED_PORT_ID  0xFFFFu
22 
23 typedef struct {
24     nxt_str_t         type;
25     uint32_t          processes;
26     uint32_t          max_processes;
27     uint32_t          spare_processes;
28     nxt_msec_t        timeout;
29     nxt_msec_t        idle_timeout;
30     nxt_conf_value_t  *limits_value;
31     nxt_conf_value_t  *processes_value;
32     nxt_conf_value_t  *targets_value;
33 } nxt_router_app_conf_t;
34 
35 
36 typedef struct {
37     nxt_str_t         pass;
38     nxt_str_t         application;
39 } nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
44 typedef struct {
45     nxt_str_t               name;
46     nxt_socket_conf_t       *socket_conf;
47     nxt_router_temp_conf_t  *temp_conf;
48     nxt_tls_init_t          *tls_init;
49     nxt_bool_t              last;
50 
51     nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
52 } nxt_router_tlssock_t;
53 
54 #endif
55 
56 
57 typedef struct {
58     nxt_str_t               *name;
59     nxt_socket_conf_t       *socket_conf;
60     nxt_router_temp_conf_t  *temp_conf;
61     nxt_bool_t              last;
62 } nxt_socket_rpc_t;
63 
64 
65 typedef struct {
66     nxt_app_t               *app;
67     nxt_router_temp_conf_t  *temp_conf;
68     uint8_t                 proto;  /* 1 bit */
69 } nxt_app_rpc_t;
70 
71 
72 typedef struct {
73     nxt_app_joint_t         *app_joint;
74     uint32_t                generation;
75     uint8_t                 proto;  /* 1 bit */
76 } nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static void nxt_router_app_port_error(nxt_task_t *task,
227     nxt_port_recv_msg_t *msg, void *data);
228 
229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
231 
232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
233     nxt_port_t *port, nxt_apr_action_t action);
234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
235     nxt_request_rpc_data_t *req_rpc_data);
236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
237     void *data);
238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
239     void *data);
240 
241 static void nxt_router_app_prepare_request(nxt_task_t *task,
242     nxt_request_rpc_data_t *req_rpc_data);
243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
244     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
245 
246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
254 
255 static const nxt_http_request_state_t  nxt_http_request_send_state;
256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
257 
258 static void nxt_router_app_joint_use(nxt_task_t *task,
259     nxt_app_joint_t *app_joint, int i);
260 
261 static void nxt_router_http_request_release_post(nxt_task_t *task,
262     nxt_http_request_t *r);
263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
264     void *data);
265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_port_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_mmap_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 
271 extern const nxt_http_request_state_t  nxt_http_websocket;
272 
273 static nxt_router_t  *nxt_router;
274 
275 static const nxt_str_t http_prefix = nxt_string("HTTP_");
276 static const nxt_str_t empty_prefix = nxt_string("");
277 
278 static const nxt_str_t  *nxt_app_msg_prefix[] = {
279     &empty_prefix,
280     &empty_prefix,
281     &http_prefix,
282     &http_prefix,
283     &http_prefix,
284     &empty_prefix,
285 };
286 
287 
288 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
289     .quit         = nxt_signal_quit_handler,
290     .new_port     = nxt_router_new_port_handler,
291     .get_port     = nxt_router_get_port_handler,
292     .change_file  = nxt_port_change_log_file_handler,
293     .mmap         = nxt_port_mmap_handler,
294     .get_mmap     = nxt_router_get_mmap_handler,
295     .data         = nxt_router_conf_data_handler,
296     .app_restart  = nxt_router_app_restart_handler,
297     .remove_pid   = nxt_router_remove_pid_handler,
298     .access_log   = nxt_router_access_log_reopen_handler,
299     .rpc_ready    = nxt_port_rpc_handler,
300     .rpc_error    = nxt_port_rpc_handler,
301     .oosm         = nxt_router_oosm_handler,
302 };
303 
304 
305 const nxt_process_init_t  nxt_router_process = {
306     .name           = "router",
307     .type           = NXT_PROCESS_ROUTER,
308     .prefork        = nxt_router_prefork,
309     .restart        = 1,
310     .setup          = nxt_process_core_setup,
311     .start          = nxt_router_start,
312     .port_handlers  = &nxt_router_process_port_handlers,
313     .signals        = nxt_process_signals,
314 };
315 
316 
317 /* Queues of nxt_socket_conf_t */
318 nxt_queue_t  creating_sockets;
319 nxt_queue_t  pending_sockets;
320 nxt_queue_t  updating_sockets;
321 nxt_queue_t  keeping_sockets;
322 nxt_queue_t  deleting_sockets;
323 
324 
325 static nxt_int_t
326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328     nxt_runtime_stop_app_processes(task, task->thread->runtime);
329 
330     return NXT_OK;
331 }
332 
333 
334 static nxt_int_t
335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337     nxt_int_t      ret;
338     nxt_port_t     *controller_port;
339     nxt_router_t   *router;
340     nxt_runtime_t  *rt;
341 
342     rt = task->thread->runtime;
343 
344     nxt_log(task, NXT_LOG_INFO, "router started");
345 
346 #if (NXT_TLS)
347     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348     if (nxt_slow_path(rt->tls == NULL)) {
349         return NXT_ERROR;
350     }
351 
352     ret = rt->tls->library_init(task);
353     if (nxt_slow_path(ret != NXT_OK)) {
354         return ret;
355     }
356 #endif
357 
358     ret = nxt_http_init(task);
359     if (nxt_slow_path(ret != NXT_OK)) {
360         return ret;
361     }
362 
363     router = nxt_zalloc(sizeof(nxt_router_t));
364     if (nxt_slow_path(router == NULL)) {
365         return NXT_ERROR;
366     }
367 
368     nxt_queue_init(&router->engines);
369     nxt_queue_init(&router->sockets);
370     nxt_queue_init(&router->apps);
371 
372     nxt_router = router;
373 
374     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375     if (controller_port != NULL) {
376         nxt_router_greet_controller(task, controller_port);
377     }
378 
379     return NXT_OK;
380 }
381 
382 
383 static void
384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387                           -1, 0, 0, NULL);
388 }
389 
390 
391 static void
392 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
393     void *data)
394 {
395     size_t               size;
396     uint32_t             stream;
397     nxt_fd_t             port_fd, queue_fd;
398     nxt_int_t            ret;
399     nxt_app_t            *app;
400     nxt_buf_t            *b;
401     nxt_port_t           *dport;
402     nxt_runtime_t        *rt;
403     nxt_app_joint_rpc_t  *app_joint_rpc;
404 
405     app = data;
406 
407     nxt_thread_mutex_lock(&app->mutex);
408 
409     dport = app->proto_port;
410 
411     nxt_thread_mutex_unlock(&app->mutex);
412 
413     if (dport != NULL) {
414         nxt_debug(task, "app '%V' %p start process", &app->name, app);
415 
416         b = NULL;
417         port_fd = -1;
418         queue_fd = -1;
419 
420     } else {
421         if (app->proto_port_requests > 0) {
422             nxt_debug(task, "app '%V' %p wait for prototype process",
423                       &app->name, app);
424 
425             app->proto_port_requests++;
426 
427             goto skip;
428         }
429 
430         nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);
431 
432         rt = task->thread->runtime;
433         dport = rt->port_by_type[NXT_PROCESS_MAIN];
434 
435         size = app->name.length + 1 + app->conf.length;
436 
437         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
438         if (nxt_slow_path(b == NULL)) {
439             goto failed;
440         }
441 
442         nxt_buf_cpystr(b, &app->name);
443         *b->mem.free++ = '\0';
444         nxt_buf_cpystr(b, &app->conf);
445 
446         port_fd = app->shared_port->pair[0];
447         queue_fd = app->shared_port->queue_fd;
448     }
449 
450     app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
451                                                      nxt_router_app_port_ready,
452                                                      nxt_router_app_port_error,
453                                                    sizeof(nxt_app_joint_rpc_t));
454     if (nxt_slow_path(app_joint_rpc == NULL)) {
455         goto failed;
456     }
457 
458     stream = nxt_port_rpc_ex_stream(app_joint_rpc);
459 
460     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
461                                  port_fd, queue_fd, stream, port->id, b);
462     if (nxt_slow_path(ret != NXT_OK)) {
463         nxt_port_rpc_cancel(task, port, stream);
464 
465         goto failed;
466     }
467 
468     app_joint_rpc->app_joint = app->joint;
469     app_joint_rpc->generation = app->generation;
470     app_joint_rpc->proto = (b != NULL);
471 
472     if (b != NULL) {
473         app->proto_port_requests++;
474 
475         b = NULL;
476     }
477 
478     nxt_router_app_joint_use(task, app->joint, 1);
479 
480 failed:
481 
482     if (b != NULL) {
483         nxt_mp_free(b->data, b);
484     }
485 
486 skip:
487 
488     nxt_router_app_use(task, app, -1);
489 }
490 
491 
492 static void
493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
494 {
495     app_joint->use_count += i;
496 
497     if (app_joint->use_count == 0) {
498         nxt_assert(app_joint->app == NULL);
499 
500         nxt_free(app_joint);
501     }
502 }
503 
504 
505 static nxt_int_t
506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
507 {
508     nxt_int_t      res;
509     nxt_port_t     *router_port;
510     nxt_runtime_t  *rt;
511 
512     nxt_debug(task, "app '%V' start process", &app->name);
513 
514     rt = task->thread->runtime;
515     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
516 
517     nxt_router_app_use(task, app, 1);
518 
519     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
520                         app);
521 
522     if (res == NXT_OK) {
523         return res;
524     }
525 
526     nxt_thread_mutex_lock(&app->mutex);
527 
528     app->pending_processes--;
529 
530     nxt_thread_mutex_unlock(&app->mutex);
531 
532     nxt_router_app_use(task, app, -1);
533 
534     return NXT_ERROR;
535 }
536 
537 
538 nxt_inline nxt_bool_t
539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
540 {
541     nxt_buf_t       *b, *next;
542     nxt_bool_t      cancelled;
543     nxt_port_t      *app_port;
544     nxt_msg_info_t  *msg_info;
545 
546     msg_info = &req_rpc_data->msg_info;
547 
548     if (msg_info->buf == NULL) {
549         return 0;
550     }
551 
552     app_port = req_rpc_data->app_port;
553 
554     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
555         cancelled = nxt_app_queue_cancel(app_port->queue,
556                                          msg_info->tracking_cookie,
557                                          req_rpc_data->stream);
558 
559         if (cancelled) {
560             nxt_debug(task, "stream #%uD: cancelled by router",
561                       req_rpc_data->stream);
562         }
563 
564     } else {
565         cancelled = 0;
566     }
567 
568     for (b = msg_info->buf; b != NULL; b = next) {
569         next = b->next;
570         b->next = NULL;
571 
572         if (b->is_port_mmap_sent) {
573             b->is_port_mmap_sent = cancelled == 0;
574         }
575 
576         b->completion_handler(task, b, b->parent);
577     }
578 
579     msg_info->buf = NULL;
580 
581     return cancelled;
582 }
583 
584 
585 nxt_inline nxt_bool_t
586 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
587 {
588     if (lnk->next != NULL) {
589         nxt_queue_remove(lnk);
590 
591         lnk->next = NULL;
592 
593         return 1;
594     }
595 
596     return 0;
597 }
598 
599 
600 nxt_inline void
601 nxt_request_rpc_data_unlink(nxt_task_t *task,
602     nxt_request_rpc_data_t *req_rpc_data)
603 {
604     nxt_app_t           *app;
605     nxt_bool_t          unlinked;
606     nxt_http_request_t  *r;
607 
608     nxt_router_msg_cancel(task, req_rpc_data);
609 
610     app = req_rpc_data->app;
611 
612     if (req_rpc_data->app_port != NULL) {
613         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
614                                     req_rpc_data->apr_action);
615 
616         req_rpc_data->app_port = NULL;
617     }
618 
619     r = req_rpc_data->request;
620 
621     if (r != NULL) {
622         r->timer_data = NULL;
623 
624         nxt_router_http_request_release_post(task, r);
625 
626         r->req_rpc_data = NULL;
627         req_rpc_data->request = NULL;
628 
629         if (app != NULL) {
630             unlinked = 0;
631 
632             nxt_thread_mutex_lock(&app->mutex);
633 
634             if (r->app_link.next != NULL) {
635                 nxt_queue_remove(&r->app_link);
636                 r->app_link.next = NULL;
637 
638                 unlinked = 1;
639             }
640 
641             nxt_thread_mutex_unlock(&app->mutex);
642 
643             if (unlinked) {
644                 nxt_mp_release(r->mem_pool);
645             }
646         }
647     }
648 
649     if (app != NULL) {
650         nxt_router_app_use(task, app, -1);
651 
652         req_rpc_data->app = NULL;
653     }
654 
655     if (req_rpc_data->msg_info.body_fd != -1) {
656         nxt_fd_close(req_rpc_data->msg_info.body_fd);
657 
658         req_rpc_data->msg_info.body_fd = -1;
659     }
660 
661     if (req_rpc_data->rpc_cancel) {
662         req_rpc_data->rpc_cancel = 0;
663 
664         nxt_port_rpc_cancel(task, task->thread->engine->port,
665                             req_rpc_data->stream);
666     }
667 }
668 
669 
670 static void
671 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
672 {
673     nxt_int_t      res;
674     nxt_app_t      *app;
675     nxt_port_t     *port, *main_app_port;
676     nxt_runtime_t  *rt;
677 
678     nxt_port_new_port_handler(task, msg);
679 
680     port = msg->u.new_port;
681 
682     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
683         nxt_router_greet_controller(task, msg->u.new_port);
684     }
685 
686     if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
687         nxt_port_rpc_handler(task, msg);
688 
689         return;
690     }
691 
692     if (port == NULL || port->type != NXT_PROCESS_APP) {
693 
694         if (msg->port_msg.stream == 0) {
695             return;
696         }
697 
698         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
699 
700     } else {
701         if (msg->fd[1] != -1) {
702             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
703             if (nxt_slow_path(res != NXT_OK)) {
704                 return;
705             }
706 
707             nxt_fd_close(msg->fd[1]);
708             msg->fd[1] = -1;
709         }
710     }
711 
712     if (msg->port_msg.stream != 0) {
713         nxt_port_rpc_handler(task, msg);
714         return;
715     }
716 
717     nxt_debug(task, "new port id %d (%d)", port->id, port->type);
718 
719     /*
720      * Port with "id == 0" is application 'main' port and it always
721      * should come with non-zero stream.
722      */
723     nxt_assert(port->id != 0);
724 
725     /* Find 'main' app port and get app reference. */
726     rt = task->thread->runtime;
727 
728     /*
729      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
730      * sent to main port (with id == 0) and processed in main thread.
731      */
732     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
733     nxt_assert(main_app_port != NULL);
734 
735     app = main_app_port->app;
736 
737     if (nxt_fast_path(app != NULL)) {
738         nxt_thread_mutex_lock(&app->mutex);
739 
740         /* TODO here should be find-and-add code because there can be
741            port waiters in port_hash */
742         nxt_port_hash_add(&app->port_hash, port);
743         app->port_hash_count++;
744 
745         nxt_thread_mutex_unlock(&app->mutex);
746 
747         port->app = app;
748     }
749 
750     port->main_app_port = main_app_port;
751 
752     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
753 }
754 
755 
756 static void
757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
758 {
759     void                    *p;
760     size_t                  size;
761     nxt_int_t               ret;
762     nxt_port_t              *port;
763     nxt_router_temp_conf_t  *tmcf;
764 
765     port = nxt_runtime_port_find(task->thread->runtime,
766                                  msg->port_msg.pid,
767                                  msg->port_msg.reply_port);
768     if (nxt_slow_path(port == NULL)) {
769         nxt_alert(task, "conf_data_handler: reply port not found");
770         return;
771     }
772 
773     p = MAP_FAILED;
774 
775     /*
776      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
777      * initialized in 'cleanup' section.
778      */
779     size = 0;
780 
781     tmcf = nxt_router_temp_conf(task);
782     if (nxt_slow_path(tmcf == NULL)) {
783         goto fail;
784     }
785 
786     if (nxt_slow_path(msg->fd[0] == -1)) {
787         nxt_alert(task, "conf_data_handler: invalid shm fd");
788         goto fail;
789     }
790 
791     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
792         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
793                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
794         goto fail;
795     }
796 
797     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
798 
799     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
800 
801     nxt_fd_close(msg->fd[0]);
802     msg->fd[0] = -1;
803 
804     if (nxt_slow_path(p == MAP_FAILED)) {
805         goto fail;
806     }
807 
808     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
809 
810     tmcf->router_conf->router = nxt_router;
811     tmcf->stream = msg->port_msg.stream;
812     tmcf->port = port;
813 
814     nxt_port_use(task, tmcf->port, 1);
815 
816     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
817 
818     if (nxt_fast_path(ret == NXT_OK)) {
819         nxt_router_conf_apply(task, tmcf, NULL);
820 
821     } else {
822         nxt_router_conf_error(task, tmcf);
823     }
824 
825     goto cleanup;
826 
827 fail:
828 
829     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
830                           msg->port_msg.stream, 0, NULL);
831 
832     if (tmcf != NULL) {
833         nxt_mp_release(tmcf->mem_pool);
834     }
835 
836 cleanup:
837 
838     if (p != MAP_FAILED) {
839         nxt_mem_munmap(p, size);
840     }
841 
842     if (msg->fd[0] != -1) {
843         nxt_fd_close(msg->fd[0]);
844         msg->fd[0] = -1;
845     }
846 }
847 
848 
849 static void
850 nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
851 {
852     nxt_app_t            *app;
853     nxt_int_t            ret;
854     nxt_str_t            app_name;
855     nxt_port_t           *reply_port, *shared_port, *old_shared_port;
856     nxt_port_t           *proto_port;
857     nxt_port_msg_type_t  reply;
858 
859     reply_port = nxt_runtime_port_find(task->thread->runtime,
860                                        msg->port_msg.pid,
861                                        msg->port_msg.reply_port);
862     if (nxt_slow_path(reply_port == NULL)) {
863         nxt_alert(task, "app_restart_handler: reply port not found");
864         return;
865     }
866 
867     app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
868     app_name.start = msg->buf->mem.pos;
869 
870     nxt_debug(task, "app_restart_handler: %V", &app_name);
871 
872     app = nxt_router_app_find(&nxt_router->apps, &app_name);
873 
874     if (nxt_fast_path(app != NULL)) {
875         shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
876                                    NXT_PROCESS_APP);
877         if (nxt_slow_path(shared_port == NULL)) {
878             goto fail;
879         }
880 
881         ret = nxt_port_socket_init(task, shared_port, 0);
882         if (nxt_slow_path(ret != NXT_OK)) {
883             nxt_port_use(task, shared_port, -1);
884             goto fail;
885         }
886 
887         ret = nxt_router_app_queue_init(task, shared_port);
888         if (nxt_slow_path(ret != NXT_OK)) {
889             nxt_port_write_close(shared_port);
890             nxt_port_read_close(shared_port);
891             nxt_port_use(task, shared_port, -1);
892             goto fail;
893         }
894 
895         nxt_port_write_enable(task, shared_port);
896 
897         nxt_thread_mutex_lock(&app->mutex);
898 
899         proto_port = app->proto_port;
900 
901         if (proto_port != NULL) {
902             nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
903                       proto_port->pid);
904 
905             app->proto_port = NULL;
906             proto_port->app = NULL;
907         }
908 
909         app->generation++;
910 
911         shared_port->app = app;
912 
913         old_shared_port = app->shared_port;
914         old_shared_port->app = NULL;
915 
916         app->shared_port = shared_port;
917 
918         nxt_thread_mutex_unlock(&app->mutex);
919 
920         nxt_port_close(task, old_shared_port);
921         nxt_port_use(task, old_shared_port, -1);
922 
923         if (proto_port != NULL) {
924             (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
925                                          -1, 0, 0, NULL);
926 
927             nxt_port_close(task, proto_port);
928 
929             nxt_port_use(task, proto_port, -1);
930         }
931 
932         reply = NXT_PORT_MSG_RPC_READY_LAST;
933 
934     } else {
935 
936 fail:
937 
938         reply = NXT_PORT_MSG_RPC_ERROR;
939     }
940 
941     nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
942                           0, NULL);
943 }
944 
945 
946 static void
947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
948     void *data)
949 {
950     union {
951         nxt_pid_t  removed_pid;
952         void       *data;
953     } u;
954 
955     u.data = data;
956 
957     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
958 }
959 
960 
961 static void
962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
963 {
964     nxt_event_engine_t  *engine;
965 
966     nxt_port_remove_pid_handler(task, msg);
967 
968     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
969     {
970         if (nxt_fast_path(engine->port != NULL)) {
971             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
972                           msg->u.data);
973         }
974     }
975     nxt_queue_loop;
976 
977     if (msg->port_msg.stream == 0) {
978         return;
979     }
980 
981     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
982 
983     nxt_port_rpc_handler(task, msg);
984 }
985 
986 
987 static nxt_router_temp_conf_t *
988 nxt_router_temp_conf(nxt_task_t *task)
989 {
990     nxt_mp_t                *mp, *tmp;
991     nxt_router_conf_t       *rtcf;
992     nxt_router_temp_conf_t  *tmcf;
993 
994     mp = nxt_mp_create(1024, 128, 256, 32);
995     if (nxt_slow_path(mp == NULL)) {
996         return NULL;
997     }
998 
999     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1000     if (nxt_slow_path(rtcf == NULL)) {
1001         goto fail;
1002     }
1003 
1004     rtcf->mem_pool = mp;
1005 
1006     tmp = nxt_mp_create(1024, 128, 256, 32);
1007     if (nxt_slow_path(tmp == NULL)) {
1008         goto fail;
1009     }
1010 
1011     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1012     if (nxt_slow_path(tmcf == NULL)) {
1013         goto temp_fail;
1014     }
1015 
1016     tmcf->mem_pool = tmp;
1017     tmcf->router_conf = rtcf;
1018     tmcf->count = 1;
1019     tmcf->engine = task->thread->engine;
1020 
1021     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1022                                      sizeof(nxt_router_engine_conf_t));
1023     if (nxt_slow_path(tmcf->engines == NULL)) {
1024         goto temp_fail;
1025     }
1026 
1027     nxt_queue_init(&creating_sockets);
1028     nxt_queue_init(&pending_sockets);
1029     nxt_queue_init(&updating_sockets);
1030     nxt_queue_init(&keeping_sockets);
1031     nxt_queue_init(&deleting_sockets);
1032 
1033 #if (NXT_TLS)
1034     nxt_queue_init(&tmcf->tls);
1035 #endif
1036 
1037     nxt_queue_init(&tmcf->apps);
1038     nxt_queue_init(&tmcf->previous);
1039 
1040     return tmcf;
1041 
1042 temp_fail:
1043 
1044     nxt_mp_destroy(tmp);
1045 
1046 fail:
1047 
1048     nxt_mp_destroy(mp);
1049 
1050     return NULL;
1051 }
1052 
1053 
1054 nxt_inline nxt_bool_t
1055 nxt_router_app_can_start(nxt_app_t *app)
1056 {
1057     return app->processes + app->pending_processes < app->max_processes
1058             && app->pending_processes < app->max_pending_processes;
1059 }
1060 
1061 
1062 nxt_inline nxt_bool_t
1063 nxt_router_app_need_start(nxt_app_t *app)
1064 {
1065     return (app->active_requests
1066               > app->port_hash_count + app->pending_processes)
1067            || (app->spare_processes
1068                 > app->idle_processes + app->pending_processes);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1074 {
1075     nxt_int_t                    ret;
1076     nxt_app_t                    *app;
1077     nxt_router_t                 *router;
1078     nxt_runtime_t                *rt;
1079     nxt_queue_link_t             *qlk;
1080     nxt_socket_conf_t            *skcf;
1081     nxt_router_conf_t            *rtcf;
1082     nxt_router_temp_conf_t       *tmcf;
1083     const nxt_event_interface_t  *interface;
1084 #if (NXT_TLS)
1085     nxt_router_tlssock_t         *tls;
1086 #endif
1087 
1088     tmcf = obj;
1089 
1090     qlk = nxt_queue_first(&pending_sockets);
1091 
1092     if (qlk != nxt_queue_tail(&pending_sockets)) {
1093         nxt_queue_remove(qlk);
1094         nxt_queue_insert_tail(&creating_sockets, qlk);
1095 
1096         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1097 
1098         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1099 
1100         return;
1101     }
1102 
1103 #if (NXT_TLS)
1104     qlk = nxt_queue_last(&tmcf->tls);
1105 
1106     if (qlk != nxt_queue_head(&tmcf->tls)) {
1107         nxt_queue_remove(qlk);
1108 
1109         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1110 
1111         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1112                            nxt_router_tls_rpc_handler, tls);
1113         return;
1114     }
1115 #endif
1116 
1117     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1118 
1119         if (nxt_router_app_need_start(app)) {
1120             nxt_router_app_rpc_create(task, tmcf, app);
1121             return;
1122         }
1123 
1124     } nxt_queue_loop;
1125 
1126     rtcf = tmcf->router_conf;
1127 
1128     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1129         nxt_router_access_log_open(task, tmcf);
1130         return;
1131     }
1132 
1133     rt = task->thread->runtime;
1134 
1135     interface = nxt_service_get(rt->services, "engine", NULL);
1136 
1137     router = rtcf->router;
1138 
1139     ret = nxt_router_engines_create(task, router, tmcf, interface);
1140     if (nxt_slow_path(ret != NXT_OK)) {
1141         goto fail;
1142     }
1143 
1144     ret = nxt_router_threads_create(task, rt, tmcf);
1145     if (nxt_slow_path(ret != NXT_OK)) {
1146         goto fail;
1147     }
1148 
1149     nxt_router_apps_sort(task, router, tmcf);
1150 
1151     nxt_router_apps_hash_use(task, rtcf, 1);
1152 
1153     nxt_router_engines_post(router, tmcf);
1154 
1155     nxt_queue_add(&router->sockets, &updating_sockets);
1156     nxt_queue_add(&router->sockets, &creating_sockets);
1157 
1158     if (router->access_log != rtcf->access_log) {
1159         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1160 
1161         nxt_router_access_log_release(task, &router->lock, router->access_log);
1162 
1163         router->access_log = rtcf->access_log;
1164     }
1165 
1166     nxt_router_conf_ready(task, tmcf);
1167 
1168     return;
1169 
1170 fail:
1171 
1172     nxt_router_conf_error(task, tmcf);
1173 
1174     return;
1175 }
1176 
1177 
1178 static void
1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1180 {
1181     nxt_joint_job_t  *job;
1182 
1183     job = obj;
1184 
1185     nxt_router_conf_ready(task, job->tmcf);
1186 }
1187 
1188 
1189 static void
1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191 {
1192     uint32_t               count;
1193     nxt_router_conf_t      *rtcf;
1194     nxt_thread_spinlock_t  *lock;
1195 
1196     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1197 
1198     if (--tmcf->count > 0) {
1199         return;
1200     }
1201 
1202     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1203 
1204     rtcf = tmcf->router_conf;
1205 
1206     lock = &rtcf->router->lock;
1207 
1208     nxt_thread_spin_lock(lock);
1209 
1210     count = rtcf->count;
1211 
1212     nxt_thread_spin_unlock(lock);
1213 
1214     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1215 
1216     if (count == 0) {
1217         nxt_router_apps_hash_use(task, rtcf, -1);
1218 
1219         nxt_router_access_log_release(task, lock, rtcf->access_log);
1220 
1221         nxt_mp_destroy(rtcf->mem_pool);
1222     }
1223 
1224     nxt_mp_release(tmcf->mem_pool);
1225 }
1226 
1227 
1228 static void
1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1230 {
1231     nxt_app_t          *app;
1232     nxt_socket_t       s;
1233     nxt_router_t       *router;
1234     nxt_queue_link_t   *qlk;
1235     nxt_socket_conf_t  *skcf;
1236     nxt_router_conf_t  *rtcf;
1237 
1238     nxt_alert(task, "failed to apply new conf");
1239 
1240     for (qlk = nxt_queue_first(&creating_sockets);
1241          qlk != nxt_queue_tail(&creating_sockets);
1242          qlk = nxt_queue_next(qlk))
1243     {
1244         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1245         s = skcf->listen->socket;
1246 
1247         if (s != -1) {
1248             nxt_socket_close(task, s);
1249         }
1250 
1251         nxt_free(skcf->listen);
1252     }
1253 
1254     rtcf = tmcf->router_conf;
1255 
1256     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1257 
1258         nxt_router_app_unlink(task, app);
1259 
1260     } nxt_queue_loop;
1261 
1262     router = rtcf->router;
1263 
1264     nxt_queue_add(&router->sockets, &keeping_sockets);
1265     nxt_queue_add(&router->sockets, &deleting_sockets);
1266 
1267     nxt_queue_add(&router->apps, &tmcf->previous);
1268 
1269     // TODO: new engines and threads
1270 
1271     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1272 
1273     nxt_mp_destroy(rtcf->mem_pool);
1274 
1275     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1276 
1277     nxt_mp_release(tmcf->mem_pool);
1278 }
1279 
1280 
1281 static void
1282 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1283     nxt_port_msg_type_t type)
1284 {
1285     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1286 
1287     nxt_port_use(task, tmcf->port, -1);
1288 
1289     tmcf->port = NULL;
1290 }
1291 
1292 
1293 static nxt_conf_map_t  nxt_router_conf[] = {
1294     {
1295         nxt_string("listeners_threads"),
1296         NXT_CONF_MAP_INT32,
1297         offsetof(nxt_router_conf_t, threads),
1298     },
1299 };
1300 
1301 
1302 static nxt_conf_map_t  nxt_router_app_conf[] = {
1303     {
1304         nxt_string("type"),
1305         NXT_CONF_MAP_STR,
1306         offsetof(nxt_router_app_conf_t, type),
1307     },
1308 
1309     {
1310         nxt_string("limits"),
1311         NXT_CONF_MAP_PTR,
1312         offsetof(nxt_router_app_conf_t, limits_value),
1313     },
1314 
1315     {
1316         nxt_string("processes"),
1317         NXT_CONF_MAP_INT32,
1318         offsetof(nxt_router_app_conf_t, processes),
1319     },
1320 
1321     {
1322         nxt_string("processes"),
1323         NXT_CONF_MAP_PTR,
1324         offsetof(nxt_router_app_conf_t, processes_value),
1325     },
1326 
1327     {
1328         nxt_string("targets"),
1329         NXT_CONF_MAP_PTR,
1330         offsetof(nxt_router_app_conf_t, targets_value),
1331     },
1332 };
1333 
1334 
1335 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1336     {
1337         nxt_string("timeout"),
1338         NXT_CONF_MAP_MSEC,
1339         offsetof(nxt_router_app_conf_t, timeout),
1340     },
1341 };
1342 
1343 
1344 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1345     {
1346         nxt_string("spare"),
1347         NXT_CONF_MAP_INT32,
1348         offsetof(nxt_router_app_conf_t, spare_processes),
1349     },
1350 
1351     {
1352         nxt_string("max"),
1353         NXT_CONF_MAP_INT32,
1354         offsetof(nxt_router_app_conf_t, max_processes),
1355     },
1356 
1357     {
1358         nxt_string("idle_timeout"),
1359         NXT_CONF_MAP_MSEC,
1360         offsetof(nxt_router_app_conf_t, idle_timeout),
1361     },
1362 };
1363 
1364 
1365 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1366     {
1367         nxt_string("pass"),
1368         NXT_CONF_MAP_STR_COPY,
1369         offsetof(nxt_router_listener_conf_t, pass),
1370     },
1371 
1372     {
1373         nxt_string("application"),
1374         NXT_CONF_MAP_STR_COPY,
1375         offsetof(nxt_router_listener_conf_t, application),
1376     },
1377 };
1378 
1379 
1380 static nxt_conf_map_t  nxt_router_http_conf[] = {
1381     {
1382         nxt_string("header_buffer_size"),
1383         NXT_CONF_MAP_SIZE,
1384         offsetof(nxt_socket_conf_t, header_buffer_size),
1385     },
1386 
1387     {
1388         nxt_string("large_header_buffer_size"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1391     },
1392 
1393     {
1394         nxt_string("large_header_buffers"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, large_header_buffers),
1397     },
1398 
1399     {
1400         nxt_string("body_buffer_size"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, body_buffer_size),
1403     },
1404 
1405     {
1406         nxt_string("max_body_size"),
1407         NXT_CONF_MAP_SIZE,
1408         offsetof(nxt_socket_conf_t, max_body_size),
1409     },
1410 
1411     {
1412         nxt_string("idle_timeout"),
1413         NXT_CONF_MAP_MSEC,
1414         offsetof(nxt_socket_conf_t, idle_timeout),
1415     },
1416 
1417     {
1418         nxt_string("header_read_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, header_read_timeout),
1421     },
1422 
1423     {
1424         nxt_string("body_read_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, body_read_timeout),
1427     },
1428 
1429     {
1430         nxt_string("send_timeout"),
1431         NXT_CONF_MAP_MSEC,
1432         offsetof(nxt_socket_conf_t, send_timeout),
1433     },
1434 
1435     {
1436         nxt_string("body_temp_path"),
1437         NXT_CONF_MAP_STR,
1438         offsetof(nxt_socket_conf_t, body_temp_path),
1439     },
1440 
1441     {
1442         nxt_string("discard_unsafe_fields"),
1443         NXT_CONF_MAP_INT8,
1444         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1445     },
1446 };
1447 
1448 
1449 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1450     {
1451         nxt_string("max_frame_size"),
1452         NXT_CONF_MAP_SIZE,
1453         offsetof(nxt_websocket_conf_t, max_frame_size),
1454     },
1455 
1456     {
1457         nxt_string("read_timeout"),
1458         NXT_CONF_MAP_MSEC,
1459         offsetof(nxt_websocket_conf_t, read_timeout),
1460     },
1461 
1462     {
1463         nxt_string("keepalive_interval"),
1464         NXT_CONF_MAP_MSEC,
1465         offsetof(nxt_websocket_conf_t, keepalive_interval),
1466     },
1467 
1468 };
1469 
1470 
1471 static nxt_int_t
1472 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1473     u_char *start, u_char *end)
1474 {
1475     u_char                      *p;
1476     size_t                      size;
1477     nxt_mp_t                    *mp, *app_mp;
1478     uint32_t                    next, next_target;
1479     nxt_int_t                   ret;
1480     nxt_str_t                   name, path, target;
1481     nxt_app_t                   *app, *prev;
1482     nxt_str_t                   *t, *s, *targets;
1483     nxt_uint_t                  n, i;
1484     nxt_port_t                  *port;
1485     nxt_router_t                *router;
1486     nxt_app_joint_t             *app_joint;
1487 #if (NXT_TLS)
1488     nxt_tls_init_t              *tls_init;
1489     nxt_conf_value_t            *certificate;
1490 #endif
1491     nxt_conf_value_t            *root, *conf, *http, *value, *websocket;
1492     nxt_conf_value_t            *applications, *application;
1493     nxt_conf_value_t            *listeners, *listener;
1494     nxt_socket_conf_t           *skcf;
1495     nxt_router_conf_t           *rtcf;
1496     nxt_http_routes_t           *routes;
1497     nxt_event_engine_t          *engine;
1498     nxt_app_lang_module_t       *lang;
1499     nxt_router_app_conf_t       apcf;
1500     nxt_router_access_log_t     *access_log;
1501     nxt_router_listener_conf_t  lscf;
1502 
1503     static nxt_str_t  http_path = nxt_string("/settings/http");
1504     static nxt_str_t  applications_path = nxt_string("/applications");
1505     static nxt_str_t  listeners_path = nxt_string("/listeners");
1506     static nxt_str_t  routes_path = nxt_string("/routes");
1507     static nxt_str_t  access_log_path = nxt_string("/access_log");
1508 #if (NXT_TLS)
1509     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1510     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1511     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1512     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1513     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1514 #endif
1515     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1516     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1517     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1518 
1519     root = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1520     if (root == NULL) {
1521         nxt_alert(task, "configuration parsing error");
1522         return NXT_ERROR;
1523     }
1524 
1525     rtcf = tmcf->router_conf;
1526     mp = rtcf->mem_pool;
1527 
1528     ret = nxt_conf_map_object(mp, root, nxt_router_conf,
1529                               nxt_nitems(nxt_router_conf), rtcf);
1530     if (ret != NXT_OK) {
1531         nxt_alert(task, "root map error");
1532         return NXT_ERROR;
1533     }
1534 
1535     if (rtcf->threads == 0) {
1536         rtcf->threads = nxt_ncpu;
1537     }
1538 
1539     conf = nxt_conf_get_path(root, &static_path);
1540 
1541     ret = nxt_router_conf_process_static(task, rtcf, conf);
1542     if (nxt_slow_path(ret != NXT_OK)) {
1543         return NXT_ERROR;
1544     }
1545 
1546     router = rtcf->router;
1547 
1548     applications = nxt_conf_get_path(root, &applications_path);
1549 
1550     if (applications != NULL) {
1551         next = 0;
1552 
1553         for ( ;; ) {
1554             application = nxt_conf_next_object_member(applications,
1555                                                       &name, &next);
1556             if (application == NULL) {
1557                 break;
1558             }
1559 
1560             nxt_debug(task, "application \"%V\"", &name);
1561 
1562             size = nxt_conf_json_length(application, NULL);
1563 
1564             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1565             if (nxt_slow_path(app_mp == NULL)) {
1566                 goto fail;
1567             }
1568 
1569             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1570             if (app == NULL) {
1571                 goto app_fail;
1572             }
1573 
1574             nxt_memzero(app, sizeof(nxt_app_t));
1575 
1576             app->mem_pool = app_mp;
1577 
1578             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1579             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1580                                                   + name.length);
1581 
1582             p = nxt_conf_json_print(app->conf.start, application, NULL);
1583             app->conf.length = p - app->conf.start;
1584 
1585             nxt_assert(app->conf.length <= size);
1586 
1587             nxt_debug(task, "application conf \"%V\"", &app->conf);
1588 
1589             prev = nxt_router_app_find(&router->apps, &name);
1590 
1591             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1592                 nxt_mp_destroy(app_mp);
1593 
1594                 nxt_queue_remove(&prev->link);
1595                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1596 
1597                 ret = nxt_router_apps_hash_add(rtcf, prev);
1598                 if (nxt_slow_path(ret != NXT_OK)) {
1599                     goto fail;
1600                 }
1601 
1602                 continue;
1603             }
1604 
1605             apcf.processes = 1;
1606             apcf.max_processes = 1;
1607             apcf.spare_processes = 0;
1608             apcf.timeout = 0;
1609             apcf.idle_timeout = 15000;
1610             apcf.limits_value = NULL;
1611             apcf.processes_value = NULL;
1612             apcf.targets_value = NULL;
1613 
1614             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1615             if (nxt_slow_path(app_joint == NULL)) {
1616                 goto app_fail;
1617             }
1618 
1619             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1620 
1621             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1622                                       nxt_nitems(nxt_router_app_conf), &apcf);
1623             if (ret != NXT_OK) {
1624                 nxt_alert(task, "application map error");
1625                 goto app_fail;
1626             }
1627 
1628             if (apcf.limits_value != NULL) {
1629 
1630                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1631                     nxt_alert(task, "application limits is not object");
1632                     goto app_fail;
1633                 }
1634 
1635                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1636                                         nxt_router_app_limits_conf,
1637                                         nxt_nitems(nxt_router_app_limits_conf),
1638                                         &apcf);
1639                 if (ret != NXT_OK) {
1640                     nxt_alert(task, "application limits map error");
1641                     goto app_fail;
1642                 }
1643             }
1644 
1645             if (apcf.processes_value != NULL
1646                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1647             {
1648                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1649                                      nxt_router_app_processes_conf,
1650                                      nxt_nitems(nxt_router_app_processes_conf),
1651                                      &apcf);
1652                 if (ret != NXT_OK) {
1653                     nxt_alert(task, "application processes map error");
1654                     goto app_fail;
1655                 }
1656 
1657             } else {
1658                 apcf.max_processes = apcf.processes;
1659                 apcf.spare_processes = apcf.processes;
1660             }
1661 
1662             if (apcf.targets_value != NULL) {
1663                 n = nxt_conf_object_members_count(apcf.targets_value);
1664 
1665                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1666                 if (nxt_slow_path(targets == NULL)) {
1667                     goto app_fail;
1668                 }
1669 
1670                 next_target = 0;
1671 
1672                 for (i = 0; i < n; i++) {
1673                     (void) nxt_conf_next_object_member(apcf.targets_value,
1674                                                        &target, &next_target);
1675 
1676                     s = nxt_str_dup(app_mp, &targets[i], &target);
1677                     if (nxt_slow_path(s == NULL)) {
1678                         goto app_fail;
1679                     }
1680                 }
1681 
1682             } else {
1683                 targets = NULL;
1684             }
1685 
1686             nxt_debug(task, "application type: %V", &apcf.type);
1687             nxt_debug(task, "application processes: %D", apcf.processes);
1688             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1689 
1690             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1691 
1692             if (lang == NULL) {
1693                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1694                 goto app_fail;
1695             }
1696 
1697             nxt_debug(task, "application language module: \"%s\"", lang->file);
1698 
1699             ret = nxt_thread_mutex_create(&app->mutex);
1700             if (ret != NXT_OK) {
1701                 goto app_fail;
1702             }
1703 
1704             nxt_queue_init(&app->ports);
1705             nxt_queue_init(&app->spare_ports);
1706             nxt_queue_init(&app->idle_ports);
1707             nxt_queue_init(&app->ack_waiting_req);
1708 
1709             app->name.length = name.length;
1710             nxt_memcpy(app->name.start, name.start, name.length);
1711 
1712             app->type = lang->type;
1713             app->max_processes = apcf.max_processes;
1714             app->spare_processes = apcf.spare_processes;
1715             app->max_pending_processes = apcf.spare_processes
1716                                          ? apcf.spare_processes : 1;
1717             app->timeout = apcf.timeout;
1718             app->idle_timeout = apcf.idle_timeout;
1719 
1720             app->targets = targets;
1721 
1722             engine = task->thread->engine;
1723 
1724             app->engine = engine;
1725 
1726             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1727             app->adjust_idle_work.task = &engine->task;
1728             app->adjust_idle_work.obj = app;
1729 
1730             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1731 
1732             ret = nxt_router_apps_hash_add(rtcf, app);
1733             if (nxt_slow_path(ret != NXT_OK)) {
1734                 goto app_fail;
1735             }
1736 
1737             nxt_router_app_use(task, app, 1);
1738 
1739             app->joint = app_joint;
1740 
1741             app_joint->use_count = 1;
1742             app_joint->app = app;
1743 
1744             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1745             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1746             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1747             app_joint->idle_timer.task = &engine->task;
1748             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1749 
1750             app_joint->free_app_work.handler = nxt_router_free_app;
1751             app_joint->free_app_work.task = &engine->task;
1752             app_joint->free_app_work.obj = app_joint;
1753 
1754             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1755                                 NXT_PROCESS_APP);
1756             if (nxt_slow_path(port == NULL)) {
1757                 return NXT_ERROR;
1758             }
1759 
1760             ret = nxt_port_socket_init(task, port, 0);
1761             if (nxt_slow_path(ret != NXT_OK)) {
1762                 nxt_port_use(task, port, -1);
1763                 return NXT_ERROR;
1764             }
1765 
1766             ret = nxt_router_app_queue_init(task, port);
1767             if (nxt_slow_path(ret != NXT_OK)) {
1768                 nxt_port_write_close(port);
1769                 nxt_port_read_close(port);
1770                 nxt_port_use(task, port, -1);
1771                 return NXT_ERROR;
1772             }
1773 
1774             nxt_port_write_enable(task, port);
1775             port->app = app;
1776 
1777             app->shared_port = port;
1778 
1779             nxt_thread_mutex_create(&app->outgoing.mutex);
1780         }
1781     }
1782 
1783     conf = nxt_conf_get_path(root, &routes_path);
1784     if (nxt_fast_path(conf != NULL)) {
1785         routes = nxt_http_routes_create(task, tmcf, conf);
1786         if (nxt_slow_path(routes == NULL)) {
1787             return NXT_ERROR;
1788         }
1789         rtcf->routes = routes;
1790     }
1791 
1792     ret = nxt_upstreams_create(task, tmcf, root);
1793     if (nxt_slow_path(ret != NXT_OK)) {
1794         return ret;
1795     }
1796 
1797     http = nxt_conf_get_path(root, &http_path);
1798 #if 0
1799     if (http == NULL) {
1800         nxt_alert(task, "no \"http\" block");
1801         return NXT_ERROR;
1802     }
1803 #endif
1804 
1805     websocket = nxt_conf_get_path(root, &websocket_path);
1806 
1807     listeners = nxt_conf_get_path(root, &listeners_path);
1808 
1809     if (listeners != NULL) {
1810         next = 0;
1811 
1812         for ( ;; ) {
1813             listener = nxt_conf_next_object_member(listeners, &name, &next);
1814             if (listener == NULL) {
1815                 break;
1816             }
1817 
1818             skcf = nxt_router_socket_conf(task, tmcf, &name);
1819             if (skcf == NULL) {
1820                 goto fail;
1821             }
1822 
1823             nxt_memzero(&lscf, sizeof(lscf));
1824 
1825             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1826                                       nxt_nitems(nxt_router_listener_conf),
1827                                       &lscf);
1828             if (ret != NXT_OK) {
1829                 nxt_alert(task, "listener map error");
1830                 goto fail;
1831             }
1832 
1833             nxt_debug(task, "application: %V", &lscf.application);
1834 
1835             // STUB, default values if http block is not defined.
1836             skcf->header_buffer_size = 2048;
1837             skcf->large_header_buffer_size = 8192;
1838             skcf->large_header_buffers = 4;
1839             skcf->discard_unsafe_fields = 1;
1840             skcf->body_buffer_size = 16 * 1024;
1841             skcf->max_body_size = 8 * 1024 * 1024;
1842             skcf->proxy_header_buffer_size = 64 * 1024;
1843             skcf->proxy_buffer_size = 4096;
1844             skcf->proxy_buffers = 256;
1845             skcf->idle_timeout = 180 * 1000;
1846             skcf->header_read_timeout = 30 * 1000;
1847             skcf->body_read_timeout = 30 * 1000;
1848             skcf->send_timeout = 30 * 1000;
1849             skcf->proxy_timeout = 60 * 1000;
1850             skcf->proxy_send_timeout = 30 * 1000;
1851             skcf->proxy_read_timeout = 30 * 1000;
1852 
1853             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1854             skcf->websocket_conf.read_timeout = 60 * 1000;
1855             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1856 
1857             nxt_str_null(&skcf->body_temp_path);
1858 
1859             if (http != NULL) {
1860                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1861                                           nxt_nitems(nxt_router_http_conf),
1862                                           skcf);
1863                 if (ret != NXT_OK) {
1864                     nxt_alert(task, "http map error");
1865                     goto fail;
1866                 }
1867             }
1868 
1869             if (websocket != NULL) {
1870                 ret = nxt_conf_map_object(mp, websocket,
1871                                           nxt_router_websocket_conf,
1872                                           nxt_nitems(nxt_router_websocket_conf),
1873                                           &skcf->websocket_conf);
1874                 if (ret != NXT_OK) {
1875                     nxt_alert(task, "websocket map error");
1876                     goto fail;
1877                 }
1878             }
1879 
1880             t = &skcf->body_temp_path;
1881 
1882             if (t->length == 0) {
1883                 t->start = (u_char *) task->thread->runtime->tmp;
1884                 t->length = nxt_strlen(t->start);
1885             }
1886 
1887             conf = nxt_conf_get_path(listener, &client_ip_path);
1888             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1889                                                     conf);
1890             if (nxt_slow_path(ret != NXT_OK)) {
1891                 return NXT_ERROR;
1892             }
1893 
1894 #if (NXT_TLS)
1895             certificate = nxt_conf_get_path(listener, &certificate_path);
1896 
1897             if (certificate != NULL) {
1898                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1899                 if (nxt_slow_path(tls_init == NULL)) {
1900                     return NXT_ERROR;
1901                 }
1902 
1903                 tls_init->cache_size = 0;
1904                 tls_init->timeout = 300;
1905 
1906                 value = nxt_conf_get_path(listener, &conf_cache_path);
1907                 if (value != NULL) {
1908                     tls_init->cache_size = nxt_conf_get_number(value);
1909                 }
1910 
1911                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1912                 if (value != NULL) {
1913                     tls_init->timeout = nxt_conf_get_number(value);
1914                 }
1915 
1916                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1917                                                         &conf_commands_path);
1918 
1919                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1920                                                            &conf_tickets);
1921 
1922                 n = nxt_conf_array_elements_count_or_1(certificate);
1923 
1924                 for (i = 0; i < n; i++) {
1925                     value = nxt_conf_get_array_element_or_itself(certificate,
1926                                                                  i);
1927                     nxt_assert(value != NULL);
1928 
1929                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1930                                                      tls_init, i == 0);
1931                     if (nxt_slow_path(ret != NXT_OK)) {
1932                         goto fail;
1933                     }
1934                 }
1935             }
1936 #endif
1937 
1938             skcf->listen->handler = nxt_http_conn_init;
1939             skcf->router_conf = rtcf;
1940             skcf->router_conf->count++;
1941 
1942             if (lscf.pass.length != 0) {
1943                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1944 
1945             /* COMPATIBILITY: listener application. */
1946             } else if (lscf.application.length > 0) {
1947                 skcf->action = nxt_http_pass_application(task, rtcf,
1948                                                          &lscf.application);
1949             }
1950 
1951             if (nxt_slow_path(skcf->action == NULL)) {
1952                 goto fail;
1953             }
1954         }
1955     }
1956 
1957     ret = nxt_http_routes_resolve(task, tmcf);
1958     if (nxt_slow_path(ret != NXT_OK)) {
1959         goto fail;
1960     }
1961 
1962     value = nxt_conf_get_path(root, &access_log_path);
1963 
1964     if (value != NULL) {
1965         nxt_conf_get_string(value, &path);
1966 
1967         access_log = router->access_log;
1968 
1969         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1970             nxt_router_access_log_use(&router->lock, access_log);
1971 
1972         } else {
1973             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1974                                     + path.length);
1975             if (access_log == NULL) {
1976                 nxt_alert(task, "failed to allocate access log structure");
1977                 goto fail;
1978             }
1979 
1980             access_log->fd = -1;
1981             access_log->handler = &nxt_router_access_log_writer;
1982             access_log->count = 1;
1983 
1984             access_log->path.length = path.length;
1985             access_log->path.start = (u_char *) access_log
1986                                      + sizeof(nxt_router_access_log_t);
1987 
1988             nxt_memcpy(access_log->path.start, path.start, path.length);
1989         }
1990 
1991         rtcf->access_log = access_log;
1992     }
1993 
1994     nxt_queue_add(&deleting_sockets, &router->sockets);
1995     nxt_queue_init(&router->sockets);
1996 
1997     return NXT_OK;
1998 
1999 app_fail:
2000 
2001     nxt_mp_destroy(app_mp);
2002 
2003 fail:
2004 
2005     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2006 
2007         nxt_queue_remove(&app->link);
2008         nxt_thread_mutex_destroy(&app->mutex);
2009         nxt_mp_destroy(app->mem_pool);
2010 
2011     } nxt_queue_loop;
2012 
2013     return NXT_ERROR;
2014 }
2015 
2016 
2017 #if (NXT_TLS)
2018 
2019 static nxt_int_t
2020 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2021     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2022     nxt_tls_init_t *tls_init, nxt_bool_t last)
2023 {
2024     nxt_router_tlssock_t  *tls;
2025 
2026     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2027     if (nxt_slow_path(tls == NULL)) {
2028         return NXT_ERROR;
2029     }
2030 
2031     tls->tls_init = tls_init;
2032     tls->socket_conf = skcf;
2033     tls->temp_conf = tmcf;
2034     tls->last = last;
2035     nxt_conf_get_string(value, &tls->name);
2036 
2037     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2038 
2039     return NXT_OK;
2040 }
2041 
2042 #endif
2043 
2044 
2045 static nxt_int_t
2046 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2047     nxt_conf_value_t *conf)
2048 {
2049     uint32_t          next, i;
2050     nxt_mp_t          *mp;
2051     nxt_str_t         *type, exten, str;
2052     nxt_int_t         ret;
2053     nxt_uint_t        exts;
2054     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2055 
2056     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2057 
2058     mp = rtcf->mem_pool;
2059 
2060     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2061     if (nxt_slow_path(ret != NXT_OK)) {
2062         return NXT_ERROR;
2063     }
2064 
2065     if (conf == NULL) {
2066         return NXT_OK;
2067     }
2068 
2069     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2070 
2071     if (mtypes_conf != NULL) {
2072         next = 0;
2073 
2074         for ( ;; ) {
2075             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2076 
2077             if (ext_conf == NULL) {
2078                 break;
2079             }
2080 
2081             type = nxt_str_dup(mp, NULL, &str);
2082             if (nxt_slow_path(type == NULL)) {
2083                 return NXT_ERROR;
2084             }
2085 
2086             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2087                 nxt_conf_get_string(ext_conf, &str);
2088 
2089                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2090                     return NXT_ERROR;
2091                 }
2092 
2093                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2094                                                       &exten, type);
2095                 if (nxt_slow_path(ret != NXT_OK)) {
2096                     return NXT_ERROR;
2097                 }
2098 
2099                 continue;
2100             }
2101 
2102             exts = nxt_conf_array_elements_count(ext_conf);
2103 
2104             for (i = 0; i < exts; i++) {
2105                 value = nxt_conf_get_array_element(ext_conf, i);
2106 
2107                 nxt_conf_get_string(value, &str);
2108 
2109                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2110                     return NXT_ERROR;
2111                 }
2112 
2113                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2114                                                       &exten, type);
2115                 if (nxt_slow_path(ret != NXT_OK)) {
2116                     return NXT_ERROR;
2117                 }
2118             }
2119         }
2120     }
2121 
2122     return NXT_OK;
2123 }
2124 
2125 
2126 static nxt_int_t
2127 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2128     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2129 {
2130     char                        c;
2131     size_t                      i;
2132     nxt_mp_t                    *mp;
2133     uint32_t                    hash;
2134     nxt_str_t                   header;
2135     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2136     nxt_http_client_ip_t        *client_ip;
2137     nxt_http_route_addr_rule_t  *source;
2138 
2139     static nxt_str_t  header_path = nxt_string("/header");
2140     static nxt_str_t  source_path = nxt_string("/source");
2141     static nxt_str_t  recursive_path = nxt_string("/recursive");
2142 
2143     if (conf == NULL) {
2144         skcf->client_ip = NULL;
2145 
2146         return NXT_OK;
2147     }
2148 
2149     mp = tmcf->router_conf->mem_pool;
2150 
2151     source_conf = nxt_conf_get_path(conf, &source_path);
2152     header_conf = nxt_conf_get_path(conf, &header_path);
2153     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2154 
2155     if (source_conf == NULL || header_conf == NULL) {
2156         return NXT_ERROR;
2157     }
2158 
2159     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2160     if (nxt_slow_path(client_ip == NULL)) {
2161         return NXT_ERROR;
2162     }
2163 
2164     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2165     if (nxt_slow_path(source == NULL)) {
2166         return NXT_ERROR;
2167     }
2168 
2169     client_ip->source = source;
2170 
2171     nxt_conf_get_string(header_conf, &header);
2172 
2173     if (recursive_conf != NULL) {
2174         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2175     }
2176 
2177     client_ip->header = nxt_str_dup(mp, NULL, &header);
2178     if (nxt_slow_path(client_ip->header == NULL)) {
2179         return NXT_ERROR;
2180     }
2181 
2182     hash = NXT_HTTP_FIELD_HASH_INIT;
2183 
2184     for (i = 0; i < client_ip->header->length; i++) {
2185         c = client_ip->header->start[i];
2186         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2187     }
2188 
2189     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2190 
2191     client_ip->header_hash = hash;
2192 
2193     skcf->client_ip = client_ip;
2194 
2195     return NXT_OK;
2196 }
2197 
2198 
2199 static nxt_app_t *
2200 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2201 {
2202     nxt_app_t  *app;
2203 
2204     nxt_queue_each(app, queue, nxt_app_t, link) {
2205 
2206         if (nxt_strstr_eq(name, &app->name)) {
2207             return app;
2208         }
2209 
2210     } nxt_queue_loop;
2211 
2212     return NULL;
2213 }
2214 
2215 
2216 static nxt_int_t
2217 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2218 {
2219     void       *mem;
2220     nxt_int_t  fd;
2221 
2222     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2223     if (nxt_slow_path(fd == -1)) {
2224         return NXT_ERROR;
2225     }
2226 
2227     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2228                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2229     if (nxt_slow_path(mem == MAP_FAILED)) {
2230         nxt_fd_close(fd);
2231 
2232         return NXT_ERROR;
2233     }
2234 
2235     nxt_app_queue_init(mem);
2236 
2237     port->queue_fd = fd;
2238     port->queue = mem;
2239 
2240     return NXT_OK;
2241 }
2242 
2243 
2244 static nxt_int_t
2245 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2246 {
2247     void       *mem;
2248     nxt_int_t  fd;
2249 
2250     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2251     if (nxt_slow_path(fd == -1)) {
2252         return NXT_ERROR;
2253     }
2254 
2255     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2256                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2257     if (nxt_slow_path(mem == MAP_FAILED)) {
2258         nxt_fd_close(fd);
2259 
2260         return NXT_ERROR;
2261     }
2262 
2263     nxt_port_queue_init(mem);
2264 
2265     port->queue_fd = fd;
2266     port->queue = mem;
2267 
2268     return NXT_OK;
2269 }
2270 
2271 
2272 static nxt_int_t
2273 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2274 {
2275     void  *mem;
2276 
2277     nxt_assert(fd != -1);
2278 
2279     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2280                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2281     if (nxt_slow_path(mem == MAP_FAILED)) {
2282 
2283         return NXT_ERROR;
2284     }
2285 
2286     port->queue = mem;
2287 
2288     return NXT_OK;
2289 }
2290 
2291 
2292 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2293     NXT_LVLHSH_DEFAULT,
2294     nxt_router_apps_hash_test,
2295     nxt_mp_lvlhsh_alloc,
2296     nxt_mp_lvlhsh_free,
2297 };
2298 
2299 
2300 static nxt_int_t
2301 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2302 {
2303     nxt_app_t  *app;
2304 
2305     app = data;
2306 
2307     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2308 }
2309 
2310 
2311 static nxt_int_t
2312 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2313 {
2314     nxt_lvlhsh_query_t  lhq;
2315 
2316     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2317     lhq.replace = 0;
2318     lhq.key = app->name;
2319     lhq.value = app;
2320     lhq.proto = &nxt_router_apps_hash_proto;
2321     lhq.pool = rtcf->mem_pool;
2322 
2323     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2324 
2325     case NXT_OK:
2326         return NXT_OK;
2327 
2328     case NXT_DECLINED:
2329         nxt_thread_log_alert("router app hash adding failed: "
2330                              "\"%V\" is already in hash", &lhq.key);
2331         /* Fall through. */
2332     default:
2333         return NXT_ERROR;
2334     }
2335 }
2336 
2337 
2338 static nxt_app_t *
2339 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2340 {
2341     nxt_lvlhsh_query_t  lhq;
2342 
2343     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2344     lhq.key = *name;
2345     lhq.proto = &nxt_router_apps_hash_proto;
2346 
2347     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2348         return NULL;
2349     }
2350 
2351     return lhq.value;
2352 }
2353 
2354 
2355 static void
2356 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2357 {
2358     nxt_app_t          *app;
2359     nxt_lvlhsh_each_t  lhe;
2360 
2361     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2362 
2363     for ( ;; ) {
2364         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2365 
2366         if (app == NULL) {
2367             break;
2368         }
2369 
2370         nxt_router_app_use(task, app, i);
2371     }
2372 }
2373 
2374 
2375 typedef struct {
2376     nxt_app_t  *app;
2377     nxt_int_t  target;
2378 } nxt_http_app_conf_t;
2379 
2380 
2381 nxt_int_t
2382 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2383     nxt_str_t *target, nxt_http_action_t *action)
2384 {
2385     nxt_app_t            *app;
2386     nxt_str_t            *targets;
2387     nxt_uint_t           i;
2388     nxt_http_app_conf_t  *conf;
2389 
2390     app = nxt_router_apps_hash_get(rtcf, name);
2391     if (app == NULL) {
2392         return NXT_DECLINED;
2393     }
2394 
2395     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2396     if (nxt_slow_path(conf == NULL)) {
2397         return NXT_ERROR;
2398     }
2399 
2400     action->handler = nxt_http_application_handler;
2401     action->u.conf = conf;
2402 
2403     conf->app = app;
2404 
2405     if (target != NULL && target->length != 0) {
2406         targets = app->targets;
2407 
2408         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2409 
2410         conf->target = i;
2411 
2412     } else {
2413         conf->target = 0;
2414     }
2415 
2416     return NXT_OK;
2417 }
2418 
2419 
2420 static nxt_socket_conf_t *
2421 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2422     nxt_str_t *name)
2423 {
2424     size_t               size;
2425     nxt_int_t            ret;
2426     nxt_bool_t           wildcard;
2427     nxt_sockaddr_t       *sa;
2428     nxt_socket_conf_t    *skcf;
2429     nxt_listen_socket_t  *ls;
2430 
2431     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2432     if (nxt_slow_path(sa == NULL)) {
2433         nxt_alert(task, "invalid listener \"%V\"", name);
2434         return NULL;
2435     }
2436 
2437     sa->type = SOCK_STREAM;
2438 
2439     nxt_debug(task, "router listener: \"%*s\"",
2440               (size_t) sa->length, nxt_sockaddr_start(sa));
2441 
2442     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2443     if (nxt_slow_path(skcf == NULL)) {
2444         return NULL;
2445     }
2446 
2447     size = nxt_sockaddr_size(sa);
2448 
2449     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2450 
2451     if (ret != NXT_OK) {
2452 
2453         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2454         if (nxt_slow_path(ls == NULL)) {
2455             return NULL;
2456         }
2457 
2458         skcf->listen = ls;
2459 
2460         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2461         nxt_memcpy(ls->sockaddr, sa, size);
2462 
2463         nxt_listen_socket_remote_size(ls);
2464 
2465         ls->socket = -1;
2466         ls->backlog = NXT_LISTEN_BACKLOG;
2467         ls->flags = NXT_NONBLOCK;
2468         ls->read_after_accept = 1;
2469     }
2470 
2471     switch (sa->u.sockaddr.sa_family) {
2472 #if (NXT_HAVE_UNIX_DOMAIN)
2473     case AF_UNIX:
2474         wildcard = 0;
2475         break;
2476 #endif
2477 #if (NXT_INET6)
2478     case AF_INET6:
2479         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2480         break;
2481 #endif
2482     case AF_INET:
2483     default:
2484         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2485         break;
2486     }
2487 
2488     if (!wildcard) {
2489         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2490         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2491             return NULL;
2492         }
2493 
2494         nxt_memcpy(skcf->sockaddr, sa, size);
2495     }
2496 
2497     return skcf;
2498 }
2499 
2500 
2501 static nxt_int_t
2502 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2503     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2504 {
2505     nxt_router_t       *router;
2506     nxt_queue_link_t   *qlk;
2507     nxt_socket_conf_t  *skcf;
2508 
2509     router = tmcf->router_conf->router;
2510 
2511     for (qlk = nxt_queue_first(&router->sockets);
2512          qlk != nxt_queue_tail(&router->sockets);
2513          qlk = nxt_queue_next(qlk))
2514     {
2515         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2516 
2517         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2518             nskcf->listen = skcf->listen;
2519 
2520             nxt_queue_remove(qlk);
2521             nxt_queue_insert_tail(&keeping_sockets, qlk);
2522 
2523             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2524 
2525             return NXT_OK;
2526         }
2527     }
2528 
2529     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2530 
2531     return NXT_DECLINED;
2532 }
2533 
2534 
2535 static void
2536 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2537     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2538 {
2539     size_t            size;
2540     uint32_t          stream;
2541     nxt_int_t         ret;
2542     nxt_buf_t         *b;
2543     nxt_port_t        *main_port, *router_port;
2544     nxt_runtime_t     *rt;
2545     nxt_socket_rpc_t  *rpc;
2546 
2547     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2548     if (rpc == NULL) {
2549         goto fail;
2550     }
2551 
2552     rpc->socket_conf = skcf;
2553     rpc->temp_conf = tmcf;
2554 
2555     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2556 
2557     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2558     if (b == NULL) {
2559         goto fail;
2560     }
2561 
2562     b->completion_handler = nxt_buf_dummy_completion;
2563 
2564     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2565 
2566     rt = task->thread->runtime;
2567     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2568     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2569 
2570     stream = nxt_port_rpc_register_handler(task, router_port,
2571                                            nxt_router_listen_socket_ready,
2572                                            nxt_router_listen_socket_error,
2573                                            main_port->pid, rpc);
2574     if (nxt_slow_path(stream == 0)) {
2575         goto fail;
2576     }
2577 
2578     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2579                                 stream, router_port->id, b);
2580 
2581     if (nxt_slow_path(ret != NXT_OK)) {
2582         nxt_port_rpc_cancel(task, router_port, stream);
2583         goto fail;
2584     }
2585 
2586     return;
2587 
2588 fail:
2589 
2590     nxt_router_conf_error(task, tmcf);
2591 }
2592 
2593 
2594 static void
2595 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2596     void *data)
2597 {
2598     nxt_int_t         ret;
2599     nxt_socket_t      s;
2600     nxt_socket_rpc_t  *rpc;
2601 
2602     rpc = data;
2603 
2604     s = msg->fd[0];
2605 
2606     ret = nxt_socket_nonblocking(task, s);
2607     if (nxt_slow_path(ret != NXT_OK)) {
2608         goto fail;
2609     }
2610 
2611     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2612 
2613     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2614     if (nxt_slow_path(ret != NXT_OK)) {
2615         goto fail;
2616     }
2617 
2618     rpc->socket_conf->listen->socket = s;
2619 
2620     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2621                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2622 
2623     return;
2624 
2625 fail:
2626 
2627     nxt_socket_close(task, s);
2628 
2629     nxt_router_conf_error(task, rpc->temp_conf);
2630 }
2631 
2632 
2633 static void
2634 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2635     void *data)
2636 {
2637     nxt_socket_rpc_t        *rpc;
2638     nxt_router_temp_conf_t  *tmcf;
2639 
2640     rpc = data;
2641     tmcf = rpc->temp_conf;
2642 
2643 #if 0
2644     u_char                  *p;
2645     size_t                  size;
2646     uint8_t                 error;
2647     nxt_buf_t               *in, *out;
2648     nxt_sockaddr_t          *sa;
2649 
2650     static nxt_str_t  socket_errors[] = {
2651         nxt_string("ListenerSystem"),
2652         nxt_string("ListenerNoIPv6"),
2653         nxt_string("ListenerPort"),
2654         nxt_string("ListenerInUse"),
2655         nxt_string("ListenerNoAddress"),
2656         nxt_string("ListenerNoAccess"),
2657         nxt_string("ListenerPath"),
2658     };
2659 
2660     sa = rpc->socket_conf->listen->sockaddr;
2661 
2662     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2663 
2664     if (nxt_slow_path(in == NULL)) {
2665         return;
2666     }
2667 
2668     p = in->mem.pos;
2669 
2670     error = *p++;
2671 
2672     size = nxt_length("listen socket error: ")
2673            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2674            + sa->length + socket_errors[error].length + (in->mem.free - p);
2675 
2676     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2677     if (nxt_slow_path(out == NULL)) {
2678         return;
2679     }
2680 
2681     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2682                         "listen socket error: "
2683                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2684                         (size_t) sa->length, nxt_sockaddr_start(sa),
2685                         &socket_errors[error], in->mem.free - p, p);
2686 
2687     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2688 #endif
2689 
2690     nxt_router_conf_error(task, tmcf);
2691 }
2692 
2693 
2694 #if (NXT_TLS)
2695 
2696 static void
2697 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2698     void *data)
2699 {
2700     nxt_mp_t                *mp;
2701     nxt_int_t               ret;
2702     nxt_tls_conf_t          *tlscf;
2703     nxt_router_tlssock_t    *tls;
2704     nxt_tls_bundle_conf_t   *bundle;
2705     nxt_router_temp_conf_t  *tmcf;
2706 
2707     nxt_debug(task, "tls rpc handler");
2708 
2709     tls = data;
2710     tmcf = tls->temp_conf;
2711 
2712     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2713         goto fail;
2714     }
2715 
2716     mp = tmcf->router_conf->mem_pool;
2717 
2718     if (tls->socket_conf->tls == NULL){
2719         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2720         if (nxt_slow_path(tlscf == NULL)) {
2721             goto fail;
2722         }
2723 
2724         tlscf->no_wait_shutdown = 1;
2725         tls->socket_conf->tls = tlscf;
2726 
2727     } else {
2728         tlscf = tls->socket_conf->tls;
2729     }
2730 
2731     tls->tls_init->conf = tlscf;
2732 
2733     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2734     if (nxt_slow_path(bundle == NULL)) {
2735         goto fail;
2736     }
2737 
2738     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2739         goto fail;
2740     }
2741 
2742     bundle->chain_file = msg->fd[0];
2743     bundle->next = tlscf->bundle;
2744     tlscf->bundle = bundle;
2745 
2746     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2747                                                   tls->last);
2748     if (nxt_slow_path(ret != NXT_OK)) {
2749         goto fail;
2750     }
2751 
2752     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2753                        nxt_router_conf_apply, task, tmcf, NULL);
2754     return;
2755 
2756 fail:
2757 
2758     nxt_router_conf_error(task, tmcf);
2759 }
2760 
2761 #endif
2762 
2763 
2764 static void
2765 nxt_router_app_rpc_create(nxt_task_t *task,
2766     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2767 {
2768     size_t         size;
2769     uint32_t       stream;
2770     nxt_fd_t       port_fd, queue_fd;
2771     nxt_int_t      ret;
2772     nxt_buf_t      *b;
2773     nxt_port_t     *router_port, *dport;
2774     nxt_runtime_t  *rt;
2775     nxt_app_rpc_t  *rpc;
2776 
2777     rt = task->thread->runtime;
2778 
2779     dport = app->proto_port;
2780 
2781     if (dport == NULL) {
2782         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2783 
2784         size = app->name.length + 1 + app->conf.length;
2785 
2786         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2787         if (nxt_slow_path(b == NULL)) {
2788             goto fail;
2789         }
2790 
2791         b->completion_handler = nxt_buf_dummy_completion;
2792 
2793         nxt_buf_cpystr(b, &app->name);
2794         *b->mem.free++ = '\0';
2795         nxt_buf_cpystr(b, &app->conf);
2796 
2797         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2798 
2799         port_fd = app->shared_port->pair[0];
2800         queue_fd = app->shared_port->queue_fd;
2801 
2802     } else {
2803         nxt_debug(task, "app '%V' prefork", &app->name);
2804 
2805         b = NULL;
2806         port_fd = -1;
2807         queue_fd = -1;
2808     }
2809 
2810     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2811 
2812     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2813                                            nxt_router_app_prefork_ready,
2814                                            nxt_router_app_prefork_error,
2815                                            sizeof(nxt_app_rpc_t));
2816     if (nxt_slow_path(rpc == NULL)) {
2817         goto fail;
2818     }
2819 
2820     rpc->app = app;
2821     rpc->temp_conf = tmcf;
2822     rpc->proto = (b != NULL);
2823 
2824     stream = nxt_port_rpc_ex_stream(rpc);
2825 
2826     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2827                                  port_fd, queue_fd, stream, router_port->id, b);
2828     if (nxt_slow_path(ret != NXT_OK)) {
2829         nxt_port_rpc_cancel(task, router_port, stream);
2830         goto fail;
2831     }
2832 
2833     if (b == NULL) {
2834         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2835 
2836         app->pending_processes++;
2837     }
2838 
2839     return;
2840 
2841 fail:
2842 
2843     nxt_router_conf_error(task, tmcf);
2844 }
2845 
2846 
2847 static void
2848 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2849     void *data)
2850 {
2851     nxt_app_t           *app;
2852     nxt_port_t          *port;
2853     nxt_app_rpc_t       *rpc;
2854     nxt_event_engine_t  *engine;
2855 
2856     rpc = data;
2857     app = rpc->app;
2858 
2859     port = msg->u.new_port;
2860 
2861     nxt_assert(port != NULL);
2862     nxt_assert(port->id == 0);
2863 
2864     if (rpc->proto) {
2865         nxt_assert(app->proto_port == NULL);
2866         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2867 
2868         nxt_port_inc_use(port);
2869 
2870         app->proto_port = port;
2871         port->app = app;
2872 
2873         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2874 
2875         return;
2876     }
2877 
2878     nxt_assert(port->type == NXT_PROCESS_APP);
2879 
2880     port->app = app;
2881     port->main_app_port = port;
2882 
2883     app->pending_processes--;
2884     app->processes++;
2885     app->idle_processes++;
2886 
2887     engine = task->thread->engine;
2888 
2889     nxt_queue_insert_tail(&app->ports, &port->app_link);
2890     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2891 
2892     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2893               &app->name, port->pid, port->id);
2894 
2895     nxt_port_hash_add(&app->port_hash, port);
2896     app->port_hash_count++;
2897 
2898     port->idle_start = 0;
2899 
2900     nxt_port_inc_use(port);
2901 
2902     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2903 
2904     nxt_work_queue_add(&engine->fast_work_queue,
2905                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2906 }
2907 
2908 
2909 static void
2910 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2911     void *data)
2912 {
2913     nxt_app_t               *app;
2914     nxt_app_rpc_t           *rpc;
2915     nxt_router_temp_conf_t  *tmcf;
2916 
2917     rpc = data;
2918     app = rpc->app;
2919     tmcf = rpc->temp_conf;
2920 
2921     if (rpc->proto) {
2922         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2923                 &app->name);
2924 
2925     } else {
2926         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2927                 &app->name);
2928 
2929         app->pending_processes--;
2930     }
2931 
2932     nxt_router_conf_error(task, tmcf);
2933 }
2934 
2935 
2936 static nxt_int_t
2937 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2938     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2939 {
2940     nxt_int_t                 ret;
2941     nxt_uint_t                n, threads;
2942     nxt_queue_link_t          *qlk;
2943     nxt_router_engine_conf_t  *recf;
2944 
2945     threads = tmcf->router_conf->threads;
2946 
2947     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2948                                      sizeof(nxt_router_engine_conf_t));
2949     if (nxt_slow_path(tmcf->engines == NULL)) {
2950         return NXT_ERROR;
2951     }
2952 
2953     n = 0;
2954 
2955     for (qlk = nxt_queue_first(&router->engines);
2956          qlk != nxt_queue_tail(&router->engines);
2957          qlk = nxt_queue_next(qlk))
2958     {
2959         recf = nxt_array_zero_add(tmcf->engines);
2960         if (nxt_slow_path(recf == NULL)) {
2961             return NXT_ERROR;
2962         }
2963 
2964         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2965 
2966         if (n < threads) {
2967             recf->action = NXT_ROUTER_ENGINE_KEEP;
2968             ret = nxt_router_engine_conf_update(tmcf, recf);
2969 
2970         } else {
2971             recf->action = NXT_ROUTER_ENGINE_DELETE;
2972             ret = nxt_router_engine_conf_delete(tmcf, recf);
2973         }
2974 
2975         if (nxt_slow_path(ret != NXT_OK)) {
2976             return ret;
2977         }
2978 
2979         n++;
2980     }
2981 
2982     tmcf->new_threads = n;
2983 
2984     while (n < threads) {
2985         recf = nxt_array_zero_add(tmcf->engines);
2986         if (nxt_slow_path(recf == NULL)) {
2987             return NXT_ERROR;
2988         }
2989 
2990         recf->action = NXT_ROUTER_ENGINE_ADD;
2991 
2992         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2993         if (nxt_slow_path(recf->engine == NULL)) {
2994             return NXT_ERROR;
2995         }
2996 
2997         ret = nxt_router_engine_conf_create(tmcf, recf);
2998         if (nxt_slow_path(ret != NXT_OK)) {
2999             return ret;
3000         }
3001 
3002         n++;
3003     }
3004 
3005     return NXT_OK;
3006 }
3007 
3008 
3009 static nxt_int_t
3010 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3011     nxt_router_engine_conf_t *recf)
3012 {
3013     nxt_int_t  ret;
3014 
3015     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3016                                           nxt_router_listen_socket_create);
3017     if (nxt_slow_path(ret != NXT_OK)) {
3018         return ret;
3019     }
3020 
3021     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3022                                           nxt_router_listen_socket_create);
3023     if (nxt_slow_path(ret != NXT_OK)) {
3024         return ret;
3025     }
3026 
3027     return ret;
3028 }
3029 
3030 
3031 static nxt_int_t
3032 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3033     nxt_router_engine_conf_t *recf)
3034 {
3035     nxt_int_t  ret;
3036 
3037     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3038                                           nxt_router_listen_socket_create);
3039     if (nxt_slow_path(ret != NXT_OK)) {
3040         return ret;
3041     }
3042 
3043     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3044                                           nxt_router_listen_socket_update);
3045     if (nxt_slow_path(ret != NXT_OK)) {
3046         return ret;
3047     }
3048 
3049     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3050     if (nxt_slow_path(ret != NXT_OK)) {
3051         return ret;
3052     }
3053 
3054     return ret;
3055 }
3056 
3057 
3058 static nxt_int_t
3059 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3060     nxt_router_engine_conf_t *recf)
3061 {
3062     nxt_int_t  ret;
3063 
3064     ret = nxt_router_engine_quit(tmcf, recf);
3065     if (nxt_slow_path(ret != NXT_OK)) {
3066         return ret;
3067     }
3068 
3069     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3070     if (nxt_slow_path(ret != NXT_OK)) {
3071         return ret;
3072     }
3073 
3074     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3075 }
3076 
3077 
3078 static nxt_int_t
3079 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3080     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3081     nxt_work_handler_t handler)
3082 {
3083     nxt_int_t                ret;
3084     nxt_joint_job_t          *job;
3085     nxt_queue_link_t         *qlk;
3086     nxt_socket_conf_t        *skcf;
3087     nxt_socket_conf_joint_t  *joint;
3088 
3089     for (qlk = nxt_queue_first(sockets);
3090          qlk != nxt_queue_tail(sockets);
3091          qlk = nxt_queue_next(qlk))
3092     {
3093         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3094         if (nxt_slow_path(job == NULL)) {
3095             return NXT_ERROR;
3096         }
3097 
3098         job->work.next = recf->jobs;
3099         recf->jobs = &job->work;
3100 
3101         job->task = tmcf->engine->task;
3102         job->work.handler = handler;
3103         job->work.task = &job->task;
3104         job->work.obj = job;
3105         job->tmcf = tmcf;
3106 
3107         tmcf->count++;
3108 
3109         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3110                              sizeof(nxt_socket_conf_joint_t));
3111         if (nxt_slow_path(joint == NULL)) {
3112             return NXT_ERROR;
3113         }
3114 
3115         job->work.data = joint;
3116 
3117         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3118         if (nxt_slow_path(ret != NXT_OK)) {
3119             return ret;
3120         }
3121 
3122         joint->count = 1;
3123 
3124         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3125         skcf->count++;
3126         joint->socket_conf = skcf;
3127 
3128         joint->engine = recf->engine;
3129     }
3130 
3131     return NXT_OK;
3132 }
3133 
3134 
3135 static nxt_int_t
3136 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3137     nxt_router_engine_conf_t *recf)
3138 {
3139     nxt_joint_job_t  *job;
3140 
3141     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3142     if (nxt_slow_path(job == NULL)) {
3143         return NXT_ERROR;
3144     }
3145 
3146     job->work.next = recf->jobs;
3147     recf->jobs = &job->work;
3148 
3149     job->task = tmcf->engine->task;
3150     job->work.handler = nxt_router_worker_thread_quit;
3151     job->work.task = &job->task;
3152     job->work.obj = NULL;
3153     job->work.data = NULL;
3154     job->tmcf = NULL;
3155 
3156     return NXT_OK;
3157 }
3158 
3159 
3160 static nxt_int_t
3161 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3162     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3163 {
3164     nxt_joint_job_t   *job;
3165     nxt_queue_link_t  *qlk;
3166 
3167     for (qlk = nxt_queue_first(sockets);
3168          qlk != nxt_queue_tail(sockets);
3169          qlk = nxt_queue_next(qlk))
3170     {
3171         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3172         if (nxt_slow_path(job == NULL)) {
3173             return NXT_ERROR;
3174         }
3175 
3176         job->work.next = recf->jobs;
3177         recf->jobs = &job->work;
3178 
3179         job->task = tmcf->engine->task;
3180         job->work.handler = nxt_router_listen_socket_delete;
3181         job->work.task = &job->task;
3182         job->work.obj = job;
3183         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3184         job->tmcf = tmcf;
3185 
3186         tmcf->count++;
3187     }
3188 
3189     return NXT_OK;
3190 }
3191 
3192 
3193 static nxt_int_t
3194 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3195     nxt_router_temp_conf_t *tmcf)
3196 {
3197     nxt_int_t                 ret;
3198     nxt_uint_t                i, threads;
3199     nxt_router_engine_conf_t  *recf;
3200 
3201     recf = tmcf->engines->elts;
3202     threads = tmcf->router_conf->threads;
3203 
3204     for (i = tmcf->new_threads; i < threads; i++) {
3205         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3206         if (nxt_slow_path(ret != NXT_OK)) {
3207             return ret;
3208         }
3209     }
3210 
3211     return NXT_OK;
3212 }
3213 
3214 
3215 static nxt_int_t
3216 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3217     nxt_event_engine_t *engine)
3218 {
3219     nxt_int_t            ret;
3220     nxt_thread_link_t    *link;
3221     nxt_thread_handle_t  handle;
3222 
3223     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3224 
3225     if (nxt_slow_path(link == NULL)) {
3226         return NXT_ERROR;
3227     }
3228 
3229     link->start = nxt_router_thread_start;
3230     link->engine = engine;
3231     link->work.handler = nxt_router_thread_exit_handler;
3232     link->work.task = task;
3233     link->work.data = link;
3234 
3235     nxt_queue_insert_tail(&rt->engines, &engine->link);
3236 
3237     ret = nxt_thread_create(&handle, link);
3238 
3239     if (nxt_slow_path(ret != NXT_OK)) {
3240         nxt_queue_remove(&engine->link);
3241     }
3242 
3243     return ret;
3244 }
3245 
3246 
3247 static void
3248 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3249     nxt_router_temp_conf_t *tmcf)
3250 {
3251     nxt_app_t  *app;
3252 
3253     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3254 
3255         nxt_router_app_unlink(task, app);
3256 
3257     } nxt_queue_loop;
3258 
3259     nxt_queue_add(&router->apps, &tmcf->previous);
3260     nxt_queue_add(&router->apps, &tmcf->apps);
3261 }
3262 
3263 
3264 static void
3265 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3266 {
3267     nxt_uint_t                n;
3268     nxt_event_engine_t        *engine;
3269     nxt_router_engine_conf_t  *recf;
3270 
3271     recf = tmcf->engines->elts;
3272 
3273     for (n = tmcf->engines->nelts; n != 0; n--) {
3274         engine = recf->engine;
3275 
3276         switch (recf->action) {
3277 
3278         case NXT_ROUTER_ENGINE_KEEP:
3279             break;
3280 
3281         case NXT_ROUTER_ENGINE_ADD:
3282             nxt_queue_insert_tail(&router->engines, &engine->link0);
3283             break;
3284 
3285         case NXT_ROUTER_ENGINE_DELETE:
3286             nxt_queue_remove(&engine->link0);
3287             break;
3288         }
3289 
3290         nxt_router_engine_post(engine, recf->jobs);
3291 
3292         recf++;
3293     }
3294 }
3295 
3296 
3297 static void
3298 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3299 {
3300     nxt_work_t  *work, *next;
3301 
3302     for (work = jobs; work != NULL; work = next) {
3303         next = work->next;
3304         work->next = NULL;
3305 
3306         nxt_event_engine_post(engine, work);
3307     }
3308 }
3309 
3310 
3311 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3312     .rpc_error       = nxt_port_rpc_handler,
3313     .mmap            = nxt_port_mmap_handler,
3314     .data            = nxt_port_rpc_handler,
3315     .oosm            = nxt_router_oosm_handler,
3316     .req_headers_ack = nxt_port_rpc_handler,
3317 };
3318 
3319 
3320 static void
3321 nxt_router_thread_start(void *data)
3322 {
3323     nxt_int_t           ret;
3324     nxt_port_t          *port;
3325     nxt_task_t          *task;
3326     nxt_work_t          *work;
3327     nxt_thread_t        *thread;
3328     nxt_thread_link_t   *link;
3329     nxt_event_engine_t  *engine;
3330 
3331     link = data;
3332     engine = link->engine;
3333     task = &engine->task;
3334 
3335     thread = nxt_thread();
3336 
3337     nxt_event_engine_thread_adopt(engine);
3338 
3339     /* STUB */
3340     thread->runtime = engine->task.thread->runtime;
3341 
3342     engine->task.thread = thread;
3343     engine->task.log = thread->log;
3344     thread->engine = engine;
3345     thread->task = &engine->task;
3346 #if 0
3347     thread->fiber = &engine->fibers->fiber;
3348 #endif
3349 
3350     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3351     if (nxt_slow_path(engine->mem_pool == NULL)) {
3352         return;
3353     }
3354 
3355     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3356                         NXT_PROCESS_ROUTER);
3357     if (nxt_slow_path(port == NULL)) {
3358         return;
3359     }
3360 
3361     ret = nxt_port_socket_init(task, port, 0);
3362     if (nxt_slow_path(ret != NXT_OK)) {
3363         nxt_port_use(task, port, -1);
3364         return;
3365     }
3366 
3367     ret = nxt_router_port_queue_init(task, port);
3368     if (nxt_slow_path(ret != NXT_OK)) {
3369         nxt_port_use(task, port, -1);
3370         return;
3371     }
3372 
3373     engine->port = port;
3374 
3375     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3376 
3377     work = nxt_zalloc(sizeof(nxt_work_t));
3378     if (nxt_slow_path(work == NULL)) {
3379         return;
3380     }
3381 
3382     work->handler = nxt_router_rt_add_port;
3383     work->task = link->work.task;
3384     work->obj = work;
3385     work->data = port;
3386 
3387     nxt_event_engine_post(link->work.task->thread->engine, work);
3388 
3389     nxt_event_engine_start(engine);
3390 }
3391 
3392 
3393 static void
3394 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3395 {
3396     nxt_int_t      res;
3397     nxt_port_t     *port;
3398     nxt_runtime_t  *rt;
3399 
3400     rt = task->thread->runtime;
3401     port = data;
3402 
3403     nxt_free(obj);
3404 
3405     res = nxt_port_hash_add(&rt->ports, port);
3406 
3407     if (nxt_fast_path(res == NXT_OK)) {
3408         nxt_port_use(task, port, 1);
3409     }
3410 }
3411 
3412 
3413 static void
3414 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3415 {
3416     nxt_joint_job_t          *job;
3417     nxt_socket_conf_t        *skcf;
3418     nxt_listen_event_t       *lev;
3419     nxt_listen_socket_t      *ls;
3420     nxt_thread_spinlock_t    *lock;
3421     nxt_socket_conf_joint_t  *joint;
3422 
3423     job = obj;
3424     joint = data;
3425 
3426     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3427 
3428     skcf = joint->socket_conf;
3429     ls = skcf->listen;
3430 
3431     lev = nxt_listen_event(task, ls);
3432     if (nxt_slow_path(lev == NULL)) {
3433         nxt_router_listen_socket_release(task, skcf);
3434         return;
3435     }
3436 
3437     lev->socket.data = joint;
3438 
3439     lock = &skcf->router_conf->router->lock;
3440 
3441     nxt_thread_spin_lock(lock);
3442     ls->count++;
3443     nxt_thread_spin_unlock(lock);
3444 
3445     job->work.next = NULL;
3446     job->work.handler = nxt_router_conf_wait;
3447 
3448     nxt_event_engine_post(job->tmcf->engine, &job->work);
3449 }
3450 
3451 
3452 nxt_inline nxt_listen_event_t *
3453 nxt_router_listen_event(nxt_queue_t *listen_connections,
3454     nxt_socket_conf_t *skcf)
3455 {
3456     nxt_socket_t        fd;
3457     nxt_queue_link_t    *qlk;
3458     nxt_listen_event_t  *lev;
3459 
3460     fd = skcf->listen->socket;
3461 
3462     for (qlk = nxt_queue_first(listen_connections);
3463          qlk != nxt_queue_tail(listen_connections);
3464          qlk = nxt_queue_next(qlk))
3465     {
3466         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3467 
3468         if (fd == lev->socket.fd) {
3469             return lev;
3470         }
3471     }
3472 
3473     return NULL;
3474 }
3475 
3476 
3477 static void
3478 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3479 {
3480     nxt_joint_job_t          *job;
3481     nxt_event_engine_t       *engine;
3482     nxt_listen_event_t       *lev;
3483     nxt_socket_conf_joint_t  *joint, *old;
3484 
3485     job = obj;
3486     joint = data;
3487 
3488     engine = task->thread->engine;
3489 
3490     nxt_queue_insert_tail(&engine->joints, &joint->link);
3491 
3492     lev = nxt_router_listen_event(&engine->listen_connections,
3493                                   joint->socket_conf);
3494 
3495     old = lev->socket.data;
3496     lev->socket.data = joint;
3497     lev->listen = joint->socket_conf->listen;
3498 
3499     job->work.next = NULL;
3500     job->work.handler = nxt_router_conf_wait;
3501 
3502     nxt_event_engine_post(job->tmcf->engine, &job->work);
3503 
3504     /*
3505      * The task is allocated from configuration temporary
3506      * memory pool so it can be freed after engine post operation.
3507      */
3508 
3509     nxt_router_conf_release(&engine->task, old);
3510 }
3511 
3512 
3513 static void
3514 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3515 {
3516     nxt_socket_conf_t        *skcf;
3517     nxt_listen_event_t       *lev;
3518     nxt_event_engine_t       *engine;
3519     nxt_socket_conf_joint_t  *joint;
3520 
3521     skcf = data;
3522 
3523     engine = task->thread->engine;
3524 
3525     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3526 
3527     nxt_fd_event_delete(engine, &lev->socket);
3528 
3529     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3530               lev->socket.fd);
3531 
3532     joint = lev->socket.data;
3533     joint->close_job = obj;
3534 
3535     lev->timer.handler = nxt_router_listen_socket_close;
3536     lev->timer.work_queue = &engine->fast_work_queue;
3537 
3538     nxt_timer_add(engine, &lev->timer, 0);
3539 }
3540 
3541 
3542 static void
3543 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3544 {
3545     nxt_event_engine_t  *engine;
3546 
3547     nxt_debug(task, "router worker thread quit");
3548 
3549     engine = task->thread->engine;
3550 
3551     engine->shutdown = 1;
3552 
3553     if (nxt_queue_is_empty(&engine->joints)) {
3554         nxt_thread_exit(task->thread);
3555     }
3556 }
3557 
3558 
3559 static void
3560 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3561 {
3562     nxt_timer_t              *timer;
3563     nxt_joint_job_t          *job;
3564     nxt_listen_event_t       *lev;
3565     nxt_socket_conf_joint_t  *joint;
3566 
3567     timer = obj;
3568     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3569 
3570     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3571               lev->socket.fd);
3572 
3573     nxt_queue_remove(&lev->link);
3574 
3575     joint = lev->socket.data;
3576     lev->socket.data = NULL;
3577 
3578     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3579     task = &task->thread->engine->task;
3580 
3581     nxt_router_listen_socket_release(task, joint->socket_conf);
3582 
3583     job = joint->close_job;
3584     job->work.next = NULL;
3585     job->work.handler = nxt_router_conf_wait;
3586 
3587     nxt_event_engine_post(job->tmcf->engine, &job->work);
3588 
3589     nxt_router_listen_event_release(task, lev, joint);
3590 }
3591 
3592 
3593 static void
3594 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3595 {
3596     nxt_listen_socket_t    *ls;
3597     nxt_thread_spinlock_t  *lock;
3598 
3599     ls = skcf->listen;
3600     lock = &skcf->router_conf->router->lock;
3601 
3602     nxt_thread_spin_lock(lock);
3603 
3604     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3605               task->thread->engine, ls->count);
3606 
3607     if (--ls->count != 0) {
3608         ls = NULL;
3609     }
3610 
3611     nxt_thread_spin_unlock(lock);
3612 
3613     if (ls != NULL) {
3614         nxt_socket_close(task, ls->socket);
3615         nxt_free(ls);
3616     }
3617 }
3618 
3619 
3620 void
3621 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3622     nxt_socket_conf_joint_t *joint)
3623 {
3624     nxt_event_engine_t  *engine;
3625 
3626     nxt_debug(task, "listen event count: %D", lev->count);
3627 
3628     engine = task->thread->engine;
3629 
3630     if (--lev->count == 0) {
3631         if (lev->next != NULL) {
3632             nxt_sockaddr_cache_free(engine, lev->next);
3633 
3634             nxt_conn_free(task, lev->next);
3635         }
3636 
3637         nxt_free(lev);
3638     }
3639 
3640     if (joint != NULL) {
3641         nxt_router_conf_release(task, joint);
3642     }
3643 
3644     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3645         nxt_thread_exit(task->thread);
3646     }
3647 }
3648 
3649 
3650 void
3651 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3652 {
3653     nxt_socket_conf_t      *skcf;
3654     nxt_router_conf_t      *rtcf;
3655     nxt_thread_spinlock_t  *lock;
3656 
3657     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3658 
3659     if (--joint->count != 0) {
3660         return;
3661     }
3662 
3663     nxt_queue_remove(&joint->link);
3664 
3665     /*
3666      * The joint content can not be safely used after the critical
3667      * section protected by the spinlock because its memory pool may
3668      * be already destroyed by another thread.
3669      */
3670     skcf = joint->socket_conf;
3671     rtcf = skcf->router_conf;
3672     lock = &rtcf->router->lock;
3673 
3674     nxt_thread_spin_lock(lock);
3675 
3676     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3677               rtcf, rtcf->count);
3678 
3679     if (--skcf->count != 0) {
3680         skcf = NULL;
3681         rtcf = NULL;
3682 
3683     } else {
3684         nxt_queue_remove(&skcf->link);
3685 
3686         if (--rtcf->count != 0) {
3687             rtcf = NULL;
3688         }
3689     }
3690 
3691     nxt_thread_spin_unlock(lock);
3692 
3693 #if (NXT_TLS)
3694     if (skcf != NULL && skcf->tls != NULL) {
3695         task->thread->runtime->tls->server_free(task, skcf->tls);
3696     }
3697 #endif
3698 
3699     /* TODO remove engine->port */
3700 
3701     if (rtcf != NULL) {
3702         nxt_debug(task, "old router conf is destroyed");
3703 
3704         nxt_router_apps_hash_use(task, rtcf, -1);
3705 
3706         nxt_router_access_log_release(task, lock, rtcf->access_log);
3707 
3708         nxt_mp_thread_adopt(rtcf->mem_pool);
3709 
3710         nxt_mp_destroy(rtcf->mem_pool);
3711     }
3712 }
3713 
3714 
3715 static void
3716 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3717     nxt_router_access_log_t *access_log)
3718 {
3719     size_t     size;
3720     u_char     *buf, *p;
3721     nxt_off_t  bytes;
3722 
3723     static nxt_time_string_t  date_cache = {
3724         (nxt_atomic_uint_t) -1,
3725         nxt_router_access_log_date,
3726         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3727         nxt_length("31/Dec/1986:19:40:00 +0300"),
3728         NXT_THREAD_TIME_LOCAL,
3729         NXT_THREAD_TIME_SEC,
3730     };
3731 
3732     size = r->remote->address_length
3733            + 6                  /* ' - - [' */
3734            + date_cache.size
3735            + 3                  /* '] "' */
3736            + r->method->length
3737            + 1                  /* space */
3738            + r->target.length
3739            + 1                  /* space */
3740            + r->version.length
3741            + 2                  /* '" ' */
3742            + 3                  /* status */
3743            + 1                  /* space */
3744            + NXT_OFF_T_LEN
3745            + 2                  /* ' "' */
3746            + (r->referer != NULL ? r->referer->value_length : 1)
3747            + 3                  /* '" "' */
3748            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3749            + 2                  /* '"\n' */
3750     ;
3751 
3752     buf = nxt_mp_nget(r->mem_pool, size);
3753     if (nxt_slow_path(buf == NULL)) {
3754         return;
3755     }
3756 
3757     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3758                    r->remote->address_length);
3759 
3760     p = nxt_cpymem(p, " - - [", 6);
3761 
3762     p = nxt_thread_time_string(task->thread, &date_cache, p);
3763 
3764     p = nxt_cpymem(p, "] \"", 3);
3765 
3766     if (r->method->length != 0) {
3767         p = nxt_cpymem(p, r->method->start, r->method->length);
3768 
3769         if (r->target.length != 0) {
3770             *p++ = ' ';
3771             p = nxt_cpymem(p, r->target.start, r->target.length);
3772 
3773             if (r->version.length != 0) {
3774                 *p++ = ' ';
3775                 p = nxt_cpymem(p, r->version.start, r->version.length);
3776             }
3777         }
3778 
3779     } else {
3780         *p++ = '-';
3781     }
3782 
3783     p = nxt_cpymem(p, "\" ", 2);
3784 
3785     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3786 
3787     *p++ = ' ';
3788 
3789     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3790 
3791     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3792 
3793     p = nxt_cpymem(p, " \"", 2);
3794 
3795     if (r->referer != NULL) {
3796         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3797 
3798     } else {
3799         *p++ = '-';
3800     }
3801 
3802     p = nxt_cpymem(p, "\" \"", 3);
3803 
3804     if (r->user_agent != NULL) {
3805         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3806 
3807     } else {
3808         *p++ = '-';
3809     }
3810 
3811     p = nxt_cpymem(p, "\"\n", 2);
3812 
3813     nxt_fd_write(access_log->fd, buf, p - buf);
3814 }
3815 
3816 
3817 static u_char *
3818 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3819     size_t size, const char *format)
3820 {
3821     u_char  sign;
3822     time_t  gmtoff;
3823 
3824     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3825                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3826 
3827     gmtoff = nxt_timezone(tm) / 60;
3828 
3829     if (gmtoff < 0) {
3830         gmtoff = -gmtoff;
3831         sign = '-';
3832 
3833     } else {
3834         sign = '+';
3835     }
3836 
3837     return nxt_sprintf(buf, buf + size, format,
3838                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3839                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3840                        sign, gmtoff / 60, gmtoff % 60);
3841 }
3842 
3843 
3844 static void
3845 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3846 {
3847     uint32_t                 stream;
3848     nxt_int_t                ret;
3849     nxt_buf_t                *b;
3850     nxt_port_t               *main_port, *router_port;
3851     nxt_runtime_t            *rt;
3852     nxt_router_access_log_t  *access_log;
3853 
3854     access_log = tmcf->router_conf->access_log;
3855 
3856     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3857     if (nxt_slow_path(b == NULL)) {
3858         goto fail;
3859     }
3860 
3861     b->completion_handler = nxt_buf_dummy_completion;
3862 
3863     nxt_buf_cpystr(b, &access_log->path);
3864     *b->mem.free++ = '\0';
3865 
3866     rt = task->thread->runtime;
3867     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3868     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3869 
3870     stream = nxt_port_rpc_register_handler(task, router_port,
3871                                            nxt_router_access_log_ready,
3872                                            nxt_router_access_log_error,
3873                                            -1, tmcf);
3874     if (nxt_slow_path(stream == 0)) {
3875         goto fail;
3876     }
3877 
3878     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3879                                 stream, router_port->id, b);
3880 
3881     if (nxt_slow_path(ret != NXT_OK)) {
3882         nxt_port_rpc_cancel(task, router_port, stream);
3883         goto fail;
3884     }
3885 
3886     return;
3887 
3888 fail:
3889 
3890     nxt_router_conf_error(task, tmcf);
3891 }
3892 
3893 
3894 static void
3895 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3896     void *data)
3897 {
3898     nxt_router_temp_conf_t   *tmcf;
3899     nxt_router_access_log_t  *access_log;
3900 
3901     tmcf = data;
3902 
3903     access_log = tmcf->router_conf->access_log;
3904 
3905     access_log->fd = msg->fd[0];
3906 
3907     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3908                        nxt_router_conf_apply, task, tmcf, NULL);
3909 }
3910 
3911 
3912 static void
3913 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3914     void *data)
3915 {
3916     nxt_router_temp_conf_t  *tmcf;
3917 
3918     tmcf = data;
3919 
3920     nxt_router_conf_error(task, tmcf);
3921 }
3922 
3923 
3924 static void
3925 nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
3926     nxt_router_access_log_t *access_log)
3927 {
3928     if (access_log == NULL) {
3929         return;
3930     }
3931 
3932     nxt_thread_spin_lock(lock);
3933 
3934     access_log->count++;
3935 
3936     nxt_thread_spin_unlock(lock);
3937 }
3938 
3939 
3940 static void
3941 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3942     nxt_router_access_log_t *access_log)
3943 {
3944     if (access_log == NULL) {
3945         return;
3946     }
3947 
3948     nxt_thread_spin_lock(lock);
3949 
3950     if (--access_log->count != 0) {
3951         access_log = NULL;
3952     }
3953 
3954     nxt_thread_spin_unlock(lock);
3955 
3956     if (access_log != NULL) {
3957 
3958         if (access_log->fd != -1) {
3959             nxt_fd_close(access_log->fd);
3960         }
3961 
3962         nxt_free(access_log);
3963     }
3964 }
3965 
3966 
3967 typedef struct {
3968     nxt_mp_t                 *mem_pool;
3969     nxt_router_access_log_t  *access_log;
3970 } nxt_router_access_log_reopen_t;
3971 
3972 
3973 static void
3974 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3975 {
3976     nxt_mp_t                        *mp;
3977     uint32_t                        stream;
3978     nxt_int_t                       ret;
3979     nxt_buf_t                       *b;
3980     nxt_port_t                      *main_port, *router_port;
3981     nxt_runtime_t                   *rt;
3982     nxt_router_access_log_t         *access_log;
3983     nxt_router_access_log_reopen_t  *reopen;
3984 
3985     access_log = nxt_router->access_log;
3986 
3987     if (access_log == NULL) {
3988         return;
3989     }
3990 
3991     mp = nxt_mp_create(1024, 128, 256, 32);
3992     if (nxt_slow_path(mp == NULL)) {
3993         return;
3994     }
3995 
3996     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3997     if (nxt_slow_path(reopen == NULL)) {
3998         goto fail;
3999     }
4000 
4001     reopen->mem_pool = mp;
4002     reopen->access_log = access_log;
4003 
4004     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
4005     if (nxt_slow_path(b == NULL)) {
4006         goto fail;
4007     }
4008 
4009     b->completion_handler = nxt_router_access_log_reopen_completion;
4010 
4011     nxt_buf_cpystr(b, &access_log->path);
4012     *b->mem.free++ = '\0';
4013 
4014     rt = task->thread->runtime;
4015     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
4016     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
4017 
4018     stream = nxt_port_rpc_register_handler(task, router_port,
4019                                            nxt_router_access_log_reopen_ready,
4020                                            nxt_router_access_log_reopen_error,
4021                                            -1, reopen);
4022     if (nxt_slow_path(stream == 0)) {
4023         goto fail;
4024     }
4025 
4026     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
4027                                 stream, router_port->id, b);
4028 
4029     if (nxt_slow_path(ret != NXT_OK)) {
4030         nxt_port_rpc_cancel(task, router_port, stream);
4031         goto fail;
4032     }
4033 
4034     nxt_mp_retain(mp);
4035 
4036     return;
4037 
4038 fail:
4039 
4040     nxt_mp_destroy(mp);
4041 }
4042 
4043 
4044 static void
4045 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
4046 {
4047     nxt_mp_t   *mp;
4048     nxt_buf_t  *b;
4049 
4050     b = obj;
4051     mp = b->data;
4052 
4053     nxt_mp_release(mp);
4054 }
4055 
4056 
4057 static void
4058 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4059     void *data)
4060 {
4061     nxt_router_access_log_t         *access_log;
4062     nxt_router_access_log_reopen_t  *reopen;
4063 
4064     reopen = data;
4065 
4066     access_log = reopen->access_log;
4067 
4068     if (access_log == nxt_router->access_log) {
4069 
4070         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
4071             nxt_alert(task, "dup2(%FD, %FD) failed %E",
4072                       msg->fd[0], access_log->fd, nxt_errno);
4073         }
4074     }
4075 
4076     nxt_fd_close(msg->fd[0]);
4077     nxt_mp_release(reopen->mem_pool);
4078 }
4079 
4080 
4081 static void
4082 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4083     void *data)
4084 {
4085     nxt_router_access_log_reopen_t  *reopen;
4086 
4087     reopen = data;
4088 
4089     nxt_mp_release(reopen->mem_pool);
4090 }
4091 
4092 
4093 static void
4094 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4095 {
4096     nxt_port_t           *port;
4097     nxt_thread_link_t    *link;
4098     nxt_event_engine_t   *engine;
4099     nxt_thread_handle_t  handle;
4100 
4101     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4102     link = data;
4103 
4104     nxt_thread_wait(handle);
4105 
4106     engine = link->engine;
4107 
4108     nxt_queue_remove(&engine->link);
4109 
4110     port = engine->port;
4111 
4112     // TODO notify all apps
4113 
4114     port->engine = task->thread->engine;
4115     nxt_mp_thread_adopt(port->mem_pool);
4116     nxt_port_use(task, port, -1);
4117 
4118     nxt_mp_thread_adopt(engine->mem_pool);
4119     nxt_mp_destroy(engine->mem_pool);
4120 
4121     nxt_event_engine_free(engine);
4122 
4123     nxt_free(link);
4124 }
4125 
4126 
4127 static void
4128 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4129     void *data)
4130 {
4131     size_t                  b_size, count;
4132     nxt_int_t               ret;
4133     nxt_app_t               *app;
4134     nxt_buf_t               *b, *next;
4135     nxt_port_t              *app_port;
4136     nxt_unit_field_t        *f;
4137     nxt_http_field_t        *field;
4138     nxt_http_request_t      *r;
4139     nxt_unit_response_t     *resp;
4140     nxt_request_rpc_data_t  *req_rpc_data;
4141 
4142     req_rpc_data = data;
4143 
4144     r = req_rpc_data->request;
4145     if (nxt_slow_path(r == NULL)) {
4146         return;
4147     }
4148 
4149     if (r->error) {
4150         nxt_request_rpc_data_unlink(task, req_rpc_data);
4151         return;
4152     }
4153 
4154     app = req_rpc_data->app;
4155     nxt_assert(app != NULL);
4156 
4157     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
4158         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
4159 
4160         return;
4161     }
4162 
4163     b = (msg->size == 0) ? NULL : msg->buf;
4164 
4165     if (msg->port_msg.last != 0) {
4166         nxt_debug(task, "router data create last buf");
4167 
4168         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
4169 
4170         req_rpc_data->rpc_cancel = 0;
4171 
4172         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
4173             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
4174         }
4175 
4176         nxt_request_rpc_data_unlink(task, req_rpc_data);
4177 
4178     } else {
4179         if (app->timeout != 0) {
4180             r->timer.handler = nxt_router_app_timeout;
4181             r->timer_data = req_rpc_data;
4182             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4183         }
4184     }
4185 
4186     if (b == NULL) {
4187         return;
4188     }
4189 
4190     if (msg->buf == b) {
4191         /* Disable instant buffer completion/re-using by port. */
4192         msg->buf = NULL;
4193     }
4194 
4195     if (r->header_sent) {
4196         nxt_buf_chain_add(&r->out, b);
4197         nxt_http_request_send_body(task, r, NULL);
4198 
4199     } else {
4200         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
4201 
4202         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
4203             nxt_alert(task, "response buffer too small: %z", b_size);
4204             goto fail;
4205         }
4206 
4207         resp = (void *) b->mem.pos;
4208         count = (b_size - sizeof(nxt_unit_response_t))
4209                     / sizeof(nxt_unit_field_t);
4210 
4211         if (nxt_slow_path(count < resp->fields_count)) {
4212             nxt_alert(task, "response buffer too small for fields count: %D",
4213                       resp->fields_count);
4214             goto fail;
4215         }
4216 
4217         field = NULL;
4218 
4219         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
4220             if (f->skip) {
4221                 continue;
4222             }
4223 
4224             field = nxt_list_add(r->resp.fields);
4225 
4226             if (nxt_slow_path(field == NULL)) {
4227                 goto fail;
4228             }
4229 
4230             field->hash = f->hash;
4231             field->skip = 0;
4232             field->hopbyhop = 0;
4233 
4234             field->name_length = f->name_length;
4235             field->value_length = f->value_length;
4236             field->name = nxt_unit_sptr_get(&f->name);
4237             field->value = nxt_unit_sptr_get(&f->value);
4238 
4239             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
4240             if (nxt_slow_path(ret != NXT_OK)) {
4241                 goto fail;
4242             }
4243 
4244             nxt_debug(task, "header%s: %*s: %*s",
4245                       (field->skip ? " skipped" : ""),
4246                       (size_t) field->name_length, field->name,
4247                       (size_t) field->value_length, field->value);
4248 
4249             if (field->skip) {
4250                 r->resp.fields->last->nelts--;
4251             }
4252         }
4253 
4254         r->status = resp->status;
4255 
4256         if (resp->piggyback_content_length != 0) {
4257             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
4258             b->mem.free = b->mem.pos + resp->piggyback_content_length;
4259 
4260         } else {
4261             b->mem.pos = b->mem.free;
4262         }
4263 
4264         if (nxt_buf_mem_used_size(&b->mem) == 0) {
4265             next = b->next;
4266             b->next = NULL;
4267 
4268             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4269                                b->completion_handler, task, b, b->parent);
4270 
4271             b = next;
4272         }
4273 
4274         if (b != NULL) {
4275             nxt_buf_chain_add(&r->out, b);
4276         }
4277 
4278         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
4279 
4280         if (r->websocket_handshake
4281             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
4282         {
4283             app_port = req_rpc_data->app_port;
4284             if (nxt_slow_path(app_port == NULL)) {
4285                 goto fail;
4286             }
4287 
4288             nxt_thread_mutex_lock(&app->mutex);
4289 
4290             app_port->main_app_port->active_websockets++;
4291 
4292             nxt_thread_mutex_unlock(&app->mutex);
4293 
4294             nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
4295             req_rpc_data->apr_action = NXT_APR_CLOSE;
4296 
4297             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
4298 
4299             r->state = &nxt_http_websocket;
4300 
4301         } else {
4302             r->state = &nxt_http_request_send_state;
4303         }
4304     }
4305 
4306     return;
4307 
4308 fail:
4309 
4310     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4311 
4312     nxt_request_rpc_data_unlink(task, req_rpc_data);
4313 }
4314 
4315 
4316 static void
4317 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4318     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4319 {
4320     int                 res;
4321     nxt_app_t           *app;
4322     nxt_buf_t           *b;
4323     nxt_bool_t          start_process, unlinked;
4324     nxt_port_t          *app_port, *main_app_port, *idle_port;
4325     nxt_queue_link_t    *idle_lnk;
4326     nxt_http_request_t  *r;
4327 
4328     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4329               req_rpc_data->stream,
4330               msg->port_msg.pid, msg->port_msg.reply_port);
4331 
4332     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4333                              msg->port_msg.pid);
4334 
4335     app = req_rpc_data->app;
4336     r = req_rpc_data->request;
4337 
4338     start_process = 0;
4339     unlinked = 0;
4340 
4341     nxt_thread_mutex_lock(&app->mutex);
4342 
4343     if (r->app_link.next != NULL) {
4344         nxt_queue_remove(&r->app_link);
4345         r->app_link.next = NULL;
4346 
4347         unlinked = 1;
4348     }
4349 
4350     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4351                                   msg->port_msg.reply_port);
4352     if (nxt_slow_path(app_port == NULL)) {
4353         nxt_thread_mutex_unlock(&app->mutex);
4354 
4355         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4356 
4357         if (unlinked) {
4358             nxt_mp_release(r->mem_pool);
4359         }
4360 
4361         return;
4362     }
4363 
4364     main_app_port = app_port->main_app_port;
4365 
4366     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4367         app->idle_processes--;
4368 
4369         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4370                   &app->name, main_app_port->pid, main_app_port->id,
4371                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4372 
4373         /* Check port was in 'spare_ports' using idle_start field. */
4374         if (main_app_port->idle_start == 0
4375             && app->idle_processes >= app->spare_processes)
4376         {
4377             /*
4378              * If there is a vacant space in spare ports,
4379              * move the last idle to spare_ports.
4380              */
4381             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4382 
4383             idle_lnk = nxt_queue_last(&app->idle_ports);
4384             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4385             nxt_queue_remove(idle_lnk);
4386 
4387             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4388 
4389             idle_port->idle_start = 0;
4390 
4391             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4392                       "to spare_ports",
4393                       &app->name, idle_port->pid, idle_port->id);
4394         }
4395 
4396         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4397             app->pending_processes++;
4398             start_process = 1;
4399         }
4400     }
4401 
4402     main_app_port->active_requests++;
4403 
4404     nxt_port_inc_use(app_port);
4405 
4406     nxt_thread_mutex_unlock(&app->mutex);
4407 
4408     if (unlinked) {
4409         nxt_mp_release(r->mem_pool);
4410     }
4411 
4412     if (start_process) {
4413         nxt_router_start_app_process(task, app);
4414     }
4415 
4416     nxt_port_use(task, req_rpc_data->app_port, -1);
4417 
4418     req_rpc_data->app_port = app_port;
4419 
4420     b = req_rpc_data->msg_info.buf;
4421 
4422     if (b != NULL) {
4423         /* First buffer is already sent.  Start from second. */
4424         b = b->next;
4425 
4426         req_rpc_data->msg_info.buf->next = NULL;
4427     }
4428 
4429     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4430         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4431                   req_rpc_data->msg_info.body_fd);
4432 
4433         if (req_rpc_data->msg_info.body_fd != -1) {
4434             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4435         }
4436 
4437         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4438                                     req_rpc_data->msg_info.body_fd,
4439                                     req_rpc_data->stream,
4440                                     task->thread->engine->port->id, b);
4441 
4442         if (nxt_slow_path(res != NXT_OK)) {
4443             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4444         }
4445     }
4446 
4447     if (app->timeout != 0) {
4448         r->timer.handler = nxt_router_app_timeout;
4449         r->timer_data = req_rpc_data;
4450         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4451     }
4452 }
4453 
4454 
4455 static const nxt_http_request_state_t  nxt_http_request_send_state
4456     nxt_aligned(64) =
4457 {
4458     .error_handler = nxt_http_request_error_handler,
4459 };
4460 
4461 
4462 static void
4463 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4464 {
4465     nxt_buf_t           *out;
4466     nxt_http_request_t  *r;
4467 
4468     r = obj;
4469 
4470     out = r->out;
4471 
4472     if (out != NULL) {
4473         r->out = NULL;
4474         nxt_http_request_send(task, r, out);
4475     }
4476 }
4477 
4478 
4479 static void
4480 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4481     void *data)
4482 {
4483     nxt_request_rpc_data_t  *req_rpc_data;
4484 
4485     req_rpc_data = data;
4486 
4487     req_rpc_data->rpc_cancel = 0;
4488 
4489     /* TODO cancel message and return if cancelled. */
4490     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4491 
4492     if (req_rpc_data->request != NULL) {
4493         nxt_http_request_error(task, req_rpc_data->request,
4494                                NXT_HTTP_SERVICE_UNAVAILABLE);
4495     }
4496 
4497     nxt_request_rpc_data_unlink(task, req_rpc_data);
4498 }
4499 
4500 
4501 static void
4502 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4503     void *data)
4504 {
4505     uint32_t             n;
4506     nxt_app_t            *app;
4507     nxt_bool_t           start_process, restarted;
4508     nxt_port_t           *port;
4509     nxt_app_joint_t      *app_joint;
4510     nxt_app_joint_rpc_t  *app_joint_rpc;
4511 
4512     nxt_assert(data != NULL);
4513 
4514     app_joint_rpc = data;
4515     app_joint = app_joint_rpc->app_joint;
4516     port = msg->u.new_port;
4517 
4518     nxt_assert(app_joint != NULL);
4519     nxt_assert(port != NULL);
4520     nxt_assert(port->id == 0);
4521 
4522     app = app_joint->app;
4523 
4524     nxt_router_app_joint_use(task, app_joint, -1);
4525 
4526     if (nxt_slow_path(app == NULL)) {
4527         nxt_debug(task, "new port ready for released app, send QUIT");
4528 
4529         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4530 
4531         return;
4532     }
4533 
4534     nxt_thread_mutex_lock(&app->mutex);
4535 
4536     restarted = (app->generation != app_joint_rpc->generation);
4537 
4538     if (app_joint_rpc->proto) {
4539         nxt_assert(app->proto_port == NULL);
4540         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
4541 
4542         n = app->proto_port_requests;
4543         app->proto_port_requests = 0;
4544 
4545         if (nxt_slow_path(restarted)) {
4546             nxt_thread_mutex_unlock(&app->mutex);
4547 
4548             nxt_debug(task, "proto port ready for restarted app, send QUIT");
4549 
4550             nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4551                                   NULL);
4552 
4553         } else {
4554             port->app = app;
4555             app->proto_port = port;
4556 
4557             nxt_thread_mutex_unlock(&app->mutex);
4558 
4559             nxt_port_use(task, port, 1);
4560         }
4561 
4562         port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];
4563 
4564         while (n > 0) {
4565             nxt_router_app_use(task, app, 1);
4566 
4567             nxt_router_start_app_process_handler(task, port, app);
4568 
4569             n--;
4570         }
4571 
4572         return;
4573     }
4574 
4575     nxt_assert(port->type == NXT_PROCESS_APP);
4576     nxt_assert(app->pending_processes != 0);
4577 
4578     app->pending_processes--;
4579 
4580     if (nxt_slow_path(restarted)) {
4581         nxt_debug(task, "new port ready for restarted app, send QUIT");
4582 
4583         start_process = !task->thread->engine->shutdown
4584                         && nxt_router_app_can_start(app)
4585                         && nxt_router_app_need_start(app);
4586 
4587         if (start_process) {
4588             app->pending_processes++;
4589         }
4590 
4591         nxt_thread_mutex_unlock(&app->mutex);
4592 
4593         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4594 
4595         if (start_process) {
4596             nxt_router_start_app_process(task, app);
4597         }
4598 
4599         return;
4600     }
4601 
4602     port->app = app;
4603     port->main_app_port = port;
4604 
4605     app->processes++;
4606     nxt_port_hash_add(&app->port_hash, port);
4607     app->port_hash_count++;
4608 
4609     nxt_thread_mutex_unlock(&app->mutex);
4610 
4611     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4612               &app->name, port->pid, app->processes, app->pending_processes);
4613 
4614     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
4615 
4616     nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
4617 }
4618 
4619 
4620 static void
4621 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4622     void *data)
4623 {
4624     nxt_app_t            *app;
4625     nxt_app_joint_t      *app_joint;
4626     nxt_queue_link_t     *link;
4627     nxt_http_request_t   *r;
4628     nxt_app_joint_rpc_t  *app_joint_rpc;
4629 
4630     nxt_assert(data != NULL);
4631 
4632     app_joint_rpc = data;
4633     app_joint = app_joint_rpc->app_joint;
4634 
4635     nxt_assert(app_joint != NULL);
4636 
4637     app = app_joint->app;
4638 
4639     nxt_router_app_joint_use(task, app_joint, -1);
4640 
4641     if (nxt_slow_path(app == NULL)) {
4642         nxt_debug(task, "start error for released app");
4643 
4644         return;
4645     }
4646 
4647     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4648 
4649     link = NULL;
4650 
4651     nxt_thread_mutex_lock(&app->mutex);
4652 
4653     nxt_assert(app->pending_processes != 0);
4654 
4655     app->pending_processes--;
4656 
4657     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4658         link = nxt_queue_first(&app->ack_waiting_req);
4659 
4660         nxt_queue_remove(link);
4661         link->next = NULL;
4662     }
4663 
4664     nxt_thread_mutex_unlock(&app->mutex);
4665 
4666     while (link != NULL) {
4667         r = nxt_container_of(link, nxt_http_request_t, app_link);
4668 
4669         nxt_event_engine_post(r->engine, &r->err_work);
4670 
4671         link = NULL;
4672 
4673         nxt_thread_mutex_lock(&app->mutex);
4674 
4675         if (app->processes == 0 && app->pending_processes == 0
4676             && !nxt_queue_is_empty(&app->ack_waiting_req))
4677         {
4678             link = nxt_queue_first(&app->ack_waiting_req);
4679 
4680             nxt_queue_remove(link);
4681             link->next = NULL;
4682         }
4683 
4684         nxt_thread_mutex_unlock(&app->mutex);
4685     }
4686 }
4687 
4688 
4689 nxt_inline nxt_port_t *
4690 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4691 {
4692     nxt_port_t  *port;
4693 
4694     port = NULL;
4695 
4696     nxt_thread_mutex_lock(&app->mutex);
4697 
4698     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4699 
4700         /* Caller is responsible to decrease port use count. */
4701         nxt_queue_chk_remove(&port->app_link);
4702 
4703         if (nxt_queue_chk_remove(&port->idle_link)) {
4704             app->idle_processes--;
4705 
4706             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4707                       &app->name, port->pid, port->id,
4708                       (port->idle_start ? "idle_ports" : "spare_ports"));
4709         }
4710 
4711         nxt_port_hash_remove(&app->port_hash, port);
4712         app->port_hash_count--;
4713 
4714         port->app = NULL;
4715         app->processes--;
4716 
4717         break;
4718 
4719     } nxt_queue_loop;
4720 
4721     nxt_thread_mutex_unlock(&app->mutex);
4722 
4723     return port;
4724 }
4725 
4726 
4727 static void
4728 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4729 {
4730     int  c;
4731 
4732     c = nxt_atomic_fetch_add(&app->use_count, i);
4733 
4734     if (i < 0 && c == -i) {
4735 
4736         if (task->thread->engine != app->engine) {
4737             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4738 
4739         } else {
4740             nxt_router_free_app(task, app->joint, NULL);
4741         }
4742     }
4743 }
4744 
4745 
4746 static void
4747 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4748 {
4749     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4750 
4751     nxt_queue_remove(&app->link);
4752 
4753     nxt_router_app_use(task, app, -1);
4754 }
4755 
4756 
4757 static void
4758 nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
4759     nxt_apr_action_t action)
4760 {
4761     int         inc_use;
4762     uint32_t    got_response, dec_requests;
4763     nxt_bool_t  adjust_idle_timer;
4764     nxt_port_t  *main_app_port;
4765 
4766     nxt_assert(port != NULL);
4767 
4768     inc_use = 0;
4769     got_response = 0;
4770     dec_requests = 0;
4771 
4772     switch (action) {
4773     case NXT_APR_NEW_PORT:
4774         break;
4775     case NXT_APR_REQUEST_FAILED:
4776         dec_requests = 1;
4777         inc_use = -1;
4778         break;
4779     case NXT_APR_GOT_RESPONSE:
4780         got_response = 1;
4781         inc_use = -1;
4782         break;
4783     case NXT_APR_UPGRADE:
4784         got_response = 1;
4785         break;
4786     case NXT_APR_CLOSE:
4787         inc_use = -1;
4788         break;
4789     }
4790 
4791     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4792               port->pid, port->id,
4793               (int) inc_use, (int) got_response);
4794 
4795     if (port->id == NXT_SHARED_PORT_ID) {
4796         nxt_thread_mutex_lock(&app->mutex);
4797 
4798         app->active_requests -= got_response + dec_requests;
4799 
4800         nxt_thread_mutex_unlock(&app->mutex);
4801 
4802         goto adjust_use;
4803     }
4804 
4805     main_app_port = port->main_app_port;
4806 
4807     nxt_thread_mutex_lock(&app->mutex);
4808 
4809     main_app_port->active_requests -= got_response + dec_requests;
4810     app->active_requests -= got_response + dec_requests;
4811 
4812     if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
4813         nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4814 
4815         nxt_port_inc_use(main_app_port);
4816     }
4817 
4818     adjust_idle_timer = 0;
4819 
4820     if (main_app_port->pair[1] != -1
4821         && main_app_port->active_requests == 0
4822         && main_app_port->active_websockets == 0
4823         && main_app_port->idle_link.next == NULL)
4824     {
4825         if (app->idle_processes == app->spare_processes
4826             && app->adjust_idle_work.data == NULL)
4827         {
4828             adjust_idle_timer = 1;
4829             app->adjust_idle_work.data = app;
4830             app->adjust_idle_work.next = NULL;
4831         }
4832 
4833         if (app->idle_processes < app->spare_processes) {
4834             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4835 
4836             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4837                       &app->name, main_app_port->pid, main_app_port->id);
4838         } else {
4839             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4840 
4841             main_app_port->idle_start = task->thread->engine->timers.now;
4842 
4843             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4844                       &app->name, main_app_port->pid, main_app_port->id);
4845         }
4846 
4847         app->idle_processes++;
4848     }
4849 
4850     nxt_thread_mutex_unlock(&app->mutex);
4851 
4852     if (adjust_idle_timer) {
4853         nxt_router_app_use(task, app, 1);
4854         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4855     }
4856 
4857     /* ? */
4858     if (main_app_port->pair[1] == -1) {
4859         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4860                   &app->name, app, main_app_port, main_app_port->pid);
4861 
4862         goto adjust_use;
4863     }
4864 
4865     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4866               &app->name, app);
4867 
4868 adjust_use:
4869 
4870     nxt_port_use(task, port, inc_use);
4871 }
4872 
4873 
4874 void
4875 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4876 {
4877     nxt_app_t         *app;
4878     nxt_bool_t        unchain, start_process;
4879     nxt_port_t        *idle_port;
4880     nxt_queue_link_t  *idle_lnk;
4881 
4882     app = port->app;
4883 
4884     nxt_assert(app != NULL);
4885 
4886     nxt_thread_mutex_lock(&app->mutex);
4887 
4888     if (port == app->proto_port) {
4889         app->proto_port = NULL;
4890         port->app = NULL;
4891 
4892         nxt_thread_mutex_unlock(&app->mutex);
4893 
4894         nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
4895                   port->pid);
4896 
4897         nxt_port_use(task, port, -1);
4898 
4899         return;
4900     }
4901 
4902     nxt_port_hash_remove(&app->port_hash, port);
4903     app->port_hash_count--;
4904 
4905     if (port->id != 0) {
4906         nxt_thread_mutex_unlock(&app->mutex);
4907 
4908         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4909                   port->pid, port->id);
4910 
4911         return;
4912     }
4913 
4914     unchain = nxt_queue_chk_remove(&port->app_link);
4915 
4916     if (nxt_queue_chk_remove(&port->idle_link)) {
4917         app->idle_processes--;
4918 
4919         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4920                   &app->name, port->pid, port->id,
4921                   (port->idle_start ? "idle_ports" : "spare_ports"));
4922 
4923         if (port->idle_start == 0
4924             && app->idle_processes >= app->spare_processes)
4925         {
4926             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4927 
4928             idle_lnk = nxt_queue_last(&app->idle_ports);
4929             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4930             nxt_queue_remove(idle_lnk);
4931 
4932             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4933 
4934             idle_port->idle_start = 0;
4935 
4936             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4937                       "to spare_ports",
4938                       &app->name, idle_port->pid, idle_port->id);
4939         }
4940     }
4941 
4942     app->processes--;
4943 
4944     start_process = !task->thread->engine->shutdown
4945                     && nxt_router_app_can_start(app)
4946                     && nxt_router_app_need_start(app);
4947 
4948     if (start_process) {
4949         app->pending_processes++;
4950     }
4951 
4952     nxt_thread_mutex_unlock(&app->mutex);
4953 
4954     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4955 
4956     if (unchain) {
4957         nxt_port_use(task, port, -1);
4958     }
4959 
4960     if (start_process) {
4961         nxt_router_start_app_process(task, app);
4962     }
4963 }
4964 
4965 
4966 static void
4967 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4968 {
4969     nxt_app_t           *app;
4970     nxt_bool_t          queued;
4971     nxt_port_t          *port;
4972     nxt_msec_t          timeout, threshold;
4973     nxt_queue_link_t    *lnk;
4974     nxt_event_engine_t  *engine;
4975 
4976     app = obj;
4977     queued = (data == app);
4978 
4979     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4980               &app->name, queued);
4981 
4982     engine = task->thread->engine;
4983 
4984     nxt_assert(app->engine == engine);
4985 
4986     threshold = engine->timers.now + app->joint->idle_timer.bias;
4987     timeout = 0;
4988 
4989     nxt_thread_mutex_lock(&app->mutex);
4990 
4991     if (queued) {
4992         app->adjust_idle_work.data = NULL;
4993     }
4994 
4995     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4996               &app->name,
4997               (int) app->idle_processes, (int) app->spare_processes);
4998 
4999     while (app->idle_processes > app->spare_processes) {
5000 
5001         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
5002 
5003         lnk = nxt_queue_first(&app->idle_ports);
5004         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
5005 
5006         timeout = port->idle_start + app->idle_timeout;
5007 
5008         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
5009                   &app->name, port->pid,
5010                   port->idle_start, timeout, threshold);
5011 
5012         if (timeout > threshold) {
5013             break;
5014         }
5015 
5016         nxt_queue_remove(lnk);
5017         lnk->next = NULL;
5018 
5019         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
5020                   &app->name, port->pid, port->id);
5021 
5022         nxt_queue_chk_remove(&port->app_link);
5023 
5024         nxt_port_hash_remove(&app->port_hash, port);
5025         app->port_hash_count--;
5026 
5027         app->idle_processes--;
5028         app->processes--;
5029         port->app = NULL;
5030 
5031         nxt_thread_mutex_unlock(&app->mutex);
5032 
5033         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
5034                   &app->name, port->pid);
5035 
5036         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
5037 
5038         nxt_port_use(task, port, -1);
5039 
5040         nxt_thread_mutex_lock(&app->mutex);
5041     }
5042 
5043     nxt_thread_mutex_unlock(&app->mutex);
5044 
5045     if (timeout > threshold) {
5046         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
5047 
5048     } else {
5049         nxt_timer_disable(engine, &app->joint->idle_timer);
5050     }
5051 
5052     if (queued) {
5053         nxt_router_app_use(task, app, -1);
5054     }
5055 }
5056 
5057 
5058 static void
5059 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5060 {
5061     nxt_timer_t      *timer;
5062     nxt_app_joint_t  *app_joint;
5063 
5064     timer = obj;
5065     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5066 
5067     if (nxt_fast_path(app_joint->app != NULL)) {
5068         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5069     }
5070 }
5071 
5072 
5073 static void
5074 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5075 {
5076     nxt_timer_t      *timer;
5077     nxt_app_joint_t  *app_joint;
5078 
5079     timer = obj;
5080     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5081 
5082     nxt_router_app_joint_use(task, app_joint, -1);
5083 }
5084 
5085 
5086 static void
5087 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5088 {
5089     nxt_app_t        *app;
5090     nxt_port_t       *port, *proto_port;
5091     nxt_app_joint_t  *app_joint;
5092 
5093     app_joint = obj;
5094     app = app_joint->app;
5095 
5096     for ( ;; ) {
5097         port = nxt_router_app_get_port_for_quit(task, app);
5098         if (port == NULL) {
5099             break;
5100         }
5101 
5102         nxt_port_use(task, port, -1);
5103     }
5104 
5105     nxt_thread_mutex_lock(&app->mutex);
5106 
5107     for ( ;; ) {
5108         port = nxt_port_hash_retrieve(&app->port_hash);
5109         if (port == NULL) {
5110             break;
5111         }
5112 
5113         app->port_hash_count--;
5114 
5115         port->app = NULL;
5116 
5117         nxt_port_close(task, port);
5118 
5119         nxt_port_use(task, port, -1);
5120     }
5121 
5122     proto_port = app->proto_port;
5123 
5124     if (proto_port != NULL) {
5125         nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5126                   proto_port->pid);
5127 
5128         app->proto_port = NULL;
5129         proto_port->app = NULL;
5130     }
5131 
5132     nxt_thread_mutex_unlock(&app->mutex);
5133 
5134     if (proto_port != NULL) {
5135         nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5136                               -1, 0, 0, NULL);
5137 
5138         nxt_port_close(task, proto_port);
5139 
5140         nxt_port_use(task, proto_port, -1);
5141     }
5142 
5143     nxt_assert(app->proto_port == NULL);
5144     nxt_assert(app->processes == 0);
5145     nxt_assert(app->active_requests == 0);
5146     nxt_assert(app->port_hash_count == 0);
5147     nxt_assert(app->idle_processes == 0);
5148     nxt_assert(nxt_queue_is_empty(&app->ports));
5149     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5150     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5151 
5152     nxt_port_mmaps_destroy(&app->outgoing, 1);
5153 
5154     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5155 
5156     if (app->shared_port != NULL) {
5157         app->shared_port->app = NULL;
5158         nxt_port_close(task, app->shared_port);
5159         nxt_port_use(task, app->shared_port, -1);
5160 
5161         app->shared_port = NULL;
5162     }
5163 
5164     nxt_thread_mutex_destroy(&app->mutex);
5165     nxt_mp_destroy(app->mem_pool);
5166 
5167     app_joint->app = NULL;
5168 
5169     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5170         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5171         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5172 
5173     } else {
5174         nxt_router_app_joint_use(task, app_joint, -1);
5175     }
5176 }
5177 
5178 
5179 static void
5180 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5181     nxt_request_rpc_data_t *req_rpc_data)
5182 {
5183     nxt_bool_t          start_process;
5184     nxt_port_t          *port;
5185     nxt_http_request_t  *r;
5186 
5187     start_process = 0;
5188 
5189     nxt_thread_mutex_lock(&app->mutex);
5190 
5191     port = app->shared_port;
5192     nxt_port_inc_use(port);
5193 
5194     app->active_requests++;
5195 
5196     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5197         app->pending_processes++;
5198         start_process = 1;
5199     }
5200 
5201     r = req_rpc_data->request;
5202 
5203     /*
5204      * Put request into application-wide list to be able to cancel request
5205      * if something goes wrong with application processes.
5206      */
5207     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5208 
5209     nxt_thread_mutex_unlock(&app->mutex);
5210 
5211     /*
5212      * Retain request memory pool while request is linked in ack_waiting_req
5213      * to guarantee request structure memory is accessble.
5214      */
5215     nxt_mp_retain(r->mem_pool);
5216 
5217     req_rpc_data->app_port = port;
5218     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5219 
5220     if (start_process) {
5221         nxt_router_start_app_process(task, app);
5222     }
5223 }
5224 
5225 
5226 void
5227 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5228     nxt_http_action_t *action)
5229 {
5230     nxt_event_engine_t      *engine;
5231     nxt_http_app_conf_t     *conf;
5232     nxt_request_rpc_data_t  *req_rpc_data;
5233 
5234     conf = action->u.conf;
5235     engine = task->thread->engine;
5236 
5237     r->app_target = conf->target;
5238 
5239     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5240                                           nxt_router_response_ready_handler,
5241                                           nxt_router_response_error_handler,
5242                                           sizeof(nxt_request_rpc_data_t));
5243     if (nxt_slow_path(req_rpc_data == NULL)) {
5244         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5245         return;
5246     }
5247 
5248     /*
5249      * At this point we have request req_rpc_data allocated and registered
5250      * in port handlers.  Need to fixup request memory pool.  Counterpart
5251      * release will be called via following call chain:
5252      *    nxt_request_rpc_data_unlink() ->
5253      *        nxt_router_http_request_release_post() ->
5254      *            nxt_router_http_request_release()
5255      */
5256     nxt_mp_retain(r->mem_pool);
5257 
5258     r->timer.task = &engine->task;
5259     r->timer.work_queue = &engine->fast_work_queue;
5260     r->timer.log = engine->task.log;
5261     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5262 
5263     r->engine = engine;
5264     r->err_work.handler = nxt_router_http_request_error;
5265     r->err_work.task = task;
5266     r->err_work.obj = r;
5267 
5268     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5269     req_rpc_data->app = conf->app;
5270     req_rpc_data->msg_info.body_fd = -1;
5271     req_rpc_data->rpc_cancel = 1;
5272 
5273     nxt_router_app_use(task, conf->app, 1);
5274 
5275     req_rpc_data->request = r;
5276     r->req_rpc_data = req_rpc_data;
5277 
5278     if (r->last != NULL) {
5279         r->last->completion_handler = nxt_router_http_request_done;
5280     }
5281 
5282     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5283     nxt_router_app_prepare_request(task, req_rpc_data);
5284 }
5285 
5286 
5287 static void
5288 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5289 {
5290     nxt_http_request_t  *r;
5291 
5292     r = obj;
5293 
5294     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5295 
5296     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5297 
5298     if (r->req_rpc_data != NULL) {
5299         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5300     }
5301 
5302     nxt_mp_release(r->mem_pool);
5303 }
5304 
5305 
5306 static void
5307 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5308 {
5309     nxt_http_request_t  *r;
5310 
5311     r = data;
5312 
5313     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5314 
5315     if (r->req_rpc_data != NULL) {
5316         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5317     }
5318 
5319     nxt_http_request_close_handler(task, r, r->proto.any);
5320 }
5321 
5322 
5323 static void
5324 nxt_router_app_prepare_request(nxt_task_t *task,
5325     nxt_request_rpc_data_t *req_rpc_data)
5326 {
5327     nxt_app_t         *app;
5328     nxt_buf_t         *buf, *body;
5329     nxt_int_t         res;
5330     nxt_port_t        *port, *reply_port;
5331 
5332     int                   notify;
5333     struct {
5334         nxt_port_msg_t       pm;
5335         nxt_port_mmap_msg_t  mm;
5336     } msg;
5337 
5338 
5339     app = req_rpc_data->app;
5340 
5341     nxt_assert(app != NULL);
5342 
5343     port = req_rpc_data->app_port;
5344 
5345     nxt_assert(port != NULL);
5346     nxt_assert(port->queue != NULL);
5347 
5348     reply_port = task->thread->engine->port;
5349 
5350     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5351                                  nxt_app_msg_prefix[app->type]);
5352     if (nxt_slow_path(buf == NULL)) {
5353         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5354                   req_rpc_data->stream, &app->name);
5355 
5356         nxt_http_request_error(task, req_rpc_data->request,
5357                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5358 
5359         return;
5360     }
5361 
5362     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5363                     nxt_buf_used_size(buf),
5364                     port->socket.fd);
5365 
5366     req_rpc_data->msg_info.buf = buf;
5367 
5368     body = req_rpc_data->request->body;
5369 
5370     if (body != NULL && nxt_buf_is_file(body)) {
5371         req_rpc_data->msg_info.body_fd = body->file->fd;
5372 
5373         body->file->fd = -1;
5374 
5375     } else {
5376         req_rpc_data->msg_info.body_fd = -1;
5377     }
5378 
5379     msg.pm.stream = req_rpc_data->stream;
5380     msg.pm.pid = reply_port->pid;
5381     msg.pm.reply_port = reply_port->id;
5382     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5383     msg.pm.last = 0;
5384     msg.pm.mmap = 1;
5385     msg.pm.nf = 0;
5386     msg.pm.mf = 0;
5387 
5388     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5389     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5390 
5391     msg.mm.mmap_id = hdr->id;
5392     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5393     msg.mm.size = nxt_buf_used_size(buf);
5394 
5395     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5396                              req_rpc_data->stream, &notify,
5397                              &req_rpc_data->msg_info.tracking_cookie);
5398     if (nxt_fast_path(res == NXT_OK)) {
5399         if (notify != 0) {
5400             (void) nxt_port_socket_write(task, port,
5401                                          NXT_PORT_MSG_READ_QUEUE,
5402                                          -1, req_rpc_data->stream,
5403                                          reply_port->id, NULL);
5404 
5405         } else {
5406             nxt_debug(task, "queue is not empty");
5407         }
5408 
5409         buf->is_port_mmap_sent = 1;
5410         buf->mem.pos = buf->mem.free;
5411 
5412     } else {
5413         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5414                   req_rpc_data->stream, &app->name);
5415 
5416         nxt_http_request_error(task, req_rpc_data->request,
5417                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5418     }
5419 }
5420 
5421 
5422 struct nxt_fields_iter_s {
5423     nxt_list_part_t   *part;
5424     nxt_http_field_t  *field;
5425 };
5426 
5427 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5428 
5429 
5430 static nxt_http_field_t *
5431 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5432 {
5433     if (part == NULL) {
5434         return NULL;
5435     }
5436 
5437     while (part->nelts == 0) {
5438         part = part->next;
5439         if (part == NULL) {
5440             return NULL;
5441         }
5442     }
5443 
5444     i->part = part;
5445     i->field = nxt_list_data(i->part);
5446 
5447     return i->field;
5448 }
5449 
5450 
5451 static nxt_http_field_t *
5452 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5453 {
5454     return nxt_fields_part_first(nxt_list_part(fields), i);
5455 }
5456 
5457 
5458 static nxt_http_field_t *
5459 nxt_fields_next(nxt_fields_iter_t *i)
5460 {
5461     nxt_http_field_t  *end = nxt_list_data(i->part);
5462 
5463     end += i->part->nelts;
5464     i->field++;
5465 
5466     if (i->field < end) {
5467         return i->field;
5468     }
5469 
5470     return nxt_fields_part_first(i->part->next, i);
5471 }
5472 
5473 
5474 static nxt_buf_t *
5475 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5476     nxt_app_t *app, const nxt_str_t *prefix)
5477 {
5478     void                *target_pos, *query_pos;
5479     u_char              *pos, *end, *p, c;
5480     size_t              fields_count, req_size, size, free_size;
5481     size_t              copy_size;
5482     nxt_off_t           content_length;
5483     nxt_buf_t           *b, *buf, *out, **tail;
5484     nxt_http_field_t    *field, *dup;
5485     nxt_unit_field_t    *dst_field;
5486     nxt_fields_iter_t   iter, dup_iter;
5487     nxt_unit_request_t  *req;
5488 
5489     req_size = sizeof(nxt_unit_request_t)
5490                + r->method->length + 1
5491                + r->version.length + 1
5492                + r->remote->length + 1
5493                + r->local->length + 1
5494                + r->server_name.length + 1
5495                + r->target.length + 1
5496                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5497 
5498     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5499     fields_count = 0;
5500 
5501     nxt_list_each(field, r->fields) {
5502         fields_count++;
5503 
5504         req_size += field->name_length + prefix->length + 1
5505                     + field->value_length + 1;
5506     } nxt_list_loop;
5507 
5508     req_size += fields_count * sizeof(nxt_unit_field_t);
5509 
5510     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5511         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5512                   (int) req_size);
5513 
5514         return NULL;
5515     }
5516 
5517     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5518               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5519     if (nxt_slow_path(out == NULL)) {
5520         return NULL;
5521     }
5522 
5523     req = (nxt_unit_request_t *) out->mem.free;
5524     out->mem.free += req_size;
5525 
5526     req->app_target = r->app_target;
5527 
5528     req->content_length = content_length;
5529 
5530     p = (u_char *) (req->fields + fields_count);
5531 
5532     nxt_debug(task, "fields_count=%d", (int) fields_count);
5533 
5534     req->method_length = r->method->length;
5535     nxt_unit_sptr_set(&req->method, p);
5536     p = nxt_cpymem(p, r->method->start, r->method->length);
5537     *p++ = '\0';
5538 
5539     req->version_length = r->version.length;
5540     nxt_unit_sptr_set(&req->version, p);
5541     p = nxt_cpymem(p, r->version.start, r->version.length);
5542     *p++ = '\0';
5543 
5544     req->remote_length = r->remote->address_length;
5545     nxt_unit_sptr_set(&req->remote, p);
5546     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5547                    r->remote->address_length);
5548     *p++ = '\0';
5549 
5550     req->local_length = r->local->address_length;
5551     nxt_unit_sptr_set(&req->local, p);
5552     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5553     *p++ = '\0';
5554 
5555     req->tls = (r->tls != NULL);
5556     req->websocket_handshake = r->websocket_handshake;
5557 
5558     req->server_name_length = r->server_name.length;
5559     nxt_unit_sptr_set(&req->server_name, p);
5560     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5561     *p++ = '\0';
5562 
5563     target_pos = p;
5564     req->target_length = (uint32_t) r->target.length;
5565     nxt_unit_sptr_set(&req->target, p);
5566     p = nxt_cpymem(p, r->target.start, r->target.length);
5567     *p++ = '\0';
5568 
5569     req->path_length = (uint32_t) r->path->length;
5570     if (r->path->start == r->target.start) {
5571         nxt_unit_sptr_set(&req->path, target_pos);
5572 
5573     } else {
5574         nxt_unit_sptr_set(&req->path, p);
5575         p = nxt_cpymem(p, r->path->start, r->path->length);
5576         *p++ = '\0';
5577     }
5578 
5579     req->query_length = (uint32_t) r->args->length;
5580     if (r->args->start != NULL) {
5581         query_pos = nxt_pointer_to(target_pos,
5582                                    r->args->start - r->target.start);
5583 
5584         nxt_unit_sptr_set(&req->query, query_pos);
5585 
5586     } else {
5587         req->query.offset = 0;
5588     }
5589 
5590     req->content_length_field = NXT_UNIT_NONE_FIELD;
5591     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5592     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5593     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5594 
5595     dst_field = req->fields;
5596 
5597     for (field = nxt_fields_first(r->fields, &iter);
5598          field != NULL;
5599          field = nxt_fields_next(&iter))
5600     {
5601         if (field->skip) {
5602             continue;
5603         }
5604 
5605         dst_field->hash = field->hash;
5606         dst_field->skip = 0;
5607         dst_field->name_length = field->name_length + prefix->length;
5608         dst_field->value_length = field->value_length;
5609 
5610         if (field == r->content_length) {
5611             req->content_length_field = dst_field - req->fields;
5612 
5613         } else if (field == r->content_type) {
5614             req->content_type_field = dst_field - req->fields;
5615 
5616         } else if (field == r->cookie) {
5617             req->cookie_field = dst_field - req->fields;
5618 
5619         } else if (field == r->authorization) {
5620             req->authorization_field = dst_field - req->fields;
5621         }
5622 
5623         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5624                   (int) field->hash, (int) field->skip,
5625                   (int) field->name_length, field->name,
5626                   (int) field->value_length, field->value);
5627 
5628         if (prefix->length != 0) {
5629             nxt_unit_sptr_set(&dst_field->name, p);
5630             p = nxt_cpymem(p, prefix->start, prefix->length);
5631 
5632             end = field->name + field->name_length;
5633             for (pos = field->name; pos < end; pos++) {
5634                 c = *pos;
5635 
5636                 if (c >= 'a' && c <= 'z') {
5637                     *p++ = (c & ~0x20);
5638                     continue;
5639                 }
5640 
5641                 if (c == '-') {
5642                     *p++ = '_';
5643                     continue;
5644                 }
5645 
5646                 *p++ = c;
5647             }
5648 
5649         } else {
5650             nxt_unit_sptr_set(&dst_field->name, p);
5651             p = nxt_cpymem(p, field->name, field->name_length);
5652         }
5653 
5654         *p++ = '\0';
5655 
5656         nxt_unit_sptr_set(&dst_field->value, p);
5657         p = nxt_cpymem(p, field->value, field->value_length);
5658 
5659         if (prefix->length != 0) {
5660             dup_iter = iter;
5661 
5662             for (dup = nxt_fields_next(&dup_iter);
5663                  dup != NULL;
5664                  dup = nxt_fields_next(&dup_iter))
5665             {
5666                 if (dup->name_length != field->name_length
5667                     || dup->skip
5668                     || dup->hash != field->hash
5669                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5670                 {
5671                     continue;
5672                 }
5673 
5674                 p = nxt_cpymem(p, ", ", 2);
5675                 p = nxt_cpymem(p, dup->value, dup->value_length);
5676 
5677                 dst_field->value_length += 2 + dup->value_length;
5678 
5679                 dup->skip = 1;
5680             }
5681         }
5682 
5683         *p++ = '\0';
5684 
5685         dst_field++;
5686     }
5687 
5688     req->fields_count = (uint32_t) (dst_field - req->fields);
5689 
5690     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5691 
5692     buf = out;
5693     tail = &buf->next;
5694 
5695     for (b = r->body; b != NULL; b = b->next) {
5696         size = nxt_buf_mem_used_size(&b->mem);
5697         pos = b->mem.pos;
5698 
5699         while (size > 0) {
5700             if (buf == NULL) {
5701                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5702 
5703                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5704                 if (nxt_slow_path(buf == NULL)) {
5705                     while (out != NULL) {
5706                         buf = out->next;
5707                         out->next = NULL;
5708                         out->completion_handler(task, out, out->parent);
5709                         out = buf;
5710                     }
5711                     return NULL;
5712                 }
5713 
5714                 *tail = buf;
5715                 tail = &buf->next;
5716 
5717             } else {
5718                 free_size = nxt_buf_mem_free_size(&buf->mem);
5719                 if (free_size < size
5720                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5721                        == NXT_OK)
5722                 {
5723                     free_size = nxt_buf_mem_free_size(&buf->mem);
5724                 }
5725             }
5726 
5727             if (free_size > 0) {
5728                 copy_size = nxt_min(free_size, size);
5729 
5730                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5731 
5732                 size -= copy_size;
5733                 pos += copy_size;
5734 
5735                 if (size == 0) {
5736                     break;
5737                 }
5738             }
5739 
5740             buf = NULL;
5741         }
5742     }
5743 
5744     return out;
5745 }
5746 
5747 
5748 static void
5749 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5750 {
5751     nxt_timer_t              *timer;
5752     nxt_http_request_t       *r;
5753     nxt_request_rpc_data_t   *req_rpc_data;
5754 
5755     timer = obj;
5756 
5757     nxt_debug(task, "router app timeout");
5758 
5759     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5760     req_rpc_data = r->timer_data;
5761 
5762     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5763 
5764     nxt_request_rpc_data_unlink(task, req_rpc_data);
5765 }
5766 
5767 
5768 static void
5769 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5770 {
5771     r->timer.handler = nxt_router_http_request_release;
5772     nxt_timer_add(task->thread->engine, &r->timer, 0);
5773 }
5774 
5775 
5776 static void
5777 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5778 {
5779     nxt_http_request_t  *r;
5780 
5781     nxt_debug(task, "http request pool release");
5782 
5783     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5784 
5785     nxt_mp_release(r->mem_pool);
5786 }
5787 
5788 
5789 static void
5790 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5791 {
5792     size_t                   mi;
5793     uint32_t                 i;
5794     nxt_bool_t               ack;
5795     nxt_process_t            *process;
5796     nxt_free_map_t           *m;
5797     nxt_port_mmap_handler_t  *mmap_handler;
5798 
5799     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5800 
5801     process = nxt_runtime_process_find(task->thread->runtime,
5802                                        msg->port_msg.pid);
5803     if (nxt_slow_path(process == NULL)) {
5804         return;
5805     }
5806 
5807     ack = 0;
5808 
5809     /*
5810      * To mitigate possible racing condition (when OOSM message received
5811      * after some of the memory was already freed), need to try to find
5812      * first free segment in shared memory and send ACK if found.
5813      */
5814 
5815     nxt_thread_mutex_lock(&process->incoming.mutex);
5816 
5817     for (i = 0; i < process->incoming.size; i++) {
5818         mmap_handler = process->incoming.elts[i].mmap_handler;
5819 
5820         if (nxt_slow_path(mmap_handler == NULL)) {
5821             continue;
5822         }
5823 
5824         m = mmap_handler->hdr->free_map;
5825 
5826         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5827             if (m[mi] != 0) {
5828                 ack = 1;
5829 
5830                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5831                           i, mi, m[mi]);
5832 
5833                 break;
5834             }
5835         }
5836     }
5837 
5838     nxt_thread_mutex_unlock(&process->incoming.mutex);
5839 
5840     if (ack) {
5841         nxt_process_broadcast_shm_ack(task, process);
5842     }
5843 }
5844 
5845 
5846 static void
5847 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5848 {
5849     nxt_fd_t                 fd;
5850     nxt_port_t               *port;
5851     nxt_runtime_t            *rt;
5852     nxt_port_mmaps_t         *mmaps;
5853     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5854     nxt_port_mmap_handler_t  *mmap_handler;
5855 
5856     rt = task->thread->runtime;
5857 
5858     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5859                                  msg->port_msg.reply_port);
5860     if (nxt_slow_path(port == NULL)) {
5861         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5862                   msg->port_msg.pid, msg->port_msg.reply_port);
5863 
5864         return;
5865     }
5866 
5867     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5868                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5869     {
5870         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5871                   (int) nxt_buf_used_size(msg->buf));
5872 
5873         return;
5874     }
5875 
5876     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5877 
5878     nxt_assert(port->type == NXT_PROCESS_APP);
5879 
5880     if (nxt_slow_path(port->app == NULL)) {
5881         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5882                   port->pid, port->id);
5883 
5884         // FIXME
5885         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5886                               -1, msg->port_msg.stream, 0, NULL);
5887 
5888         return;
5889     }
5890 
5891     mmaps = &port->app->outgoing;
5892     nxt_thread_mutex_lock(&mmaps->mutex);
5893 
5894     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5895         nxt_thread_mutex_unlock(&mmaps->mutex);
5896 
5897         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5898                   (int) get_mmap_msg->id);
5899 
5900         // FIXME
5901         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5902                               -1, msg->port_msg.stream, 0, NULL);
5903         return;
5904     }
5905 
5906     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5907 
5908     fd = mmap_handler->fd;
5909 
5910     nxt_thread_mutex_unlock(&mmaps->mutex);
5911 
5912     nxt_debug(task, "get mmap %PI:%d found",
5913               msg->port_msg.pid, (int) get_mmap_msg->id);
5914 
5915     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5916 }
5917 
5918 
5919 static void
5920 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5921 {
5922     nxt_port_t               *port, *reply_port;
5923     nxt_runtime_t            *rt;
5924     nxt_port_msg_get_port_t  *get_port_msg;
5925 
5926     rt = task->thread->runtime;
5927 
5928     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5929                                        msg->port_msg.reply_port);
5930     if (nxt_slow_path(reply_port == NULL)) {
5931         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5932                   msg->port_msg.pid, msg->port_msg.reply_port);
5933 
5934         return;
5935     }
5936 
5937     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5938                       < (int) sizeof(nxt_port_msg_get_port_t)))
5939     {
5940         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5941                   (int) nxt_buf_used_size(msg->buf));
5942 
5943         return;
5944     }
5945 
5946     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5947 
5948     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5949     if (nxt_slow_path(port == NULL)) {
5950         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5951                   get_port_msg->pid, get_port_msg->id);
5952 
5953         return;
5954     }
5955 
5956     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5957               get_port_msg->id);
5958 
5959     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5960 }
5961