xref: /unit/src/nxt_router.c (revision 2077:624e51cfe97a)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
/*
 * Port id given to an application's shared port (see the nxt_port_new()
 * call in nxt_router_app_restart_handler()); nxt_router_msg_cancel()
 * compares against it to recognize requests sent through the shared port.
 */
#define NXT_SHARED_PORT_ID  0xFFFFu
22 
/*
 * Application settings gathered in one structure.
 *
 * NOTE(review): the code that fills and reads this structure is outside
 * this part of the file; the notes on the fields are inferred from their
 * names and types only - confirm against the configuration parsing code.
 */
typedef struct {
    nxt_str_t         type;
    uint32_t          processes;
    uint32_t          max_processes;
    uint32_t          spare_processes;
    nxt_msec_t        timeout;       /* presumably milliseconds (nxt_msec_t) */
    nxt_msec_t        idle_timeout;  /* presumably milliseconds (nxt_msec_t) */
    nxt_conf_value_t  *limits_value;
    nxt_conf_value_t  *processes_value;
    nxt_conf_value_t  *targets_value;
} nxt_router_app_conf_t;
34 
35 
/*
 * String settings of a listener.
 *
 * NOTE(review): users of this structure are outside this part of the file;
 * presumably "pass" and "application" are the two ways a listener names
 * its request destination - confirm against the configuration parser.
 */
typedef struct {
    nxt_str_t         pass;
    nxt_str_t         application;
} nxt_router_listener_conf_t;
40 
41 
42 #if (NXT_TLS)
43 
/*
 * Queue element tying a name to a socket configuration, the temporary
 * configuration being built and TLS initialization data.
 *
 * NOTE(review): the code using this structure is outside this part of the
 * file; presumably "name" identifies a certificate and "last" marks the
 * final element for a socket - confirm.
 */
typedef struct {
    nxt_str_t               name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_tls_init_t          *tls_init;
    nxt_bool_t              last;

    nxt_queue_link_t        link;  /* for nxt_socket_conf_t.tls */
} nxt_router_tlssock_t;
53 
54 #endif
55 
56 
/*
 * Context of an RPC request made on behalf of a socket configuration.
 *
 * NOTE(review): the code using this structure is outside this part of the
 * file; the purpose of "name" and "last" should be confirmed there.
 */
typedef struct {
    nxt_str_t               *name;
    nxt_socket_conf_t       *socket_conf;
    nxt_router_temp_conf_t  *temp_conf;
    nxt_bool_t              last;
} nxt_socket_rpc_t;
63 
64 
/*
 * Context of an RPC request made on behalf of an application while a
 * temporary configuration is being processed.
 *
 * NOTE(review): the code using this structure is outside this part of the
 * file; presumably "proto" has the same meaning as in nxt_app_joint_rpc_t
 * (the request concerns a prototype process) - confirm.
 */
typedef struct {
    nxt_app_t               *app;
    nxt_router_temp_conf_t  *temp_conf;
    uint8_t                 proto;  /* 1 bit */
} nxt_app_rpc_t;
70 
71 
/*
 * RPC context of a "start process" request; filled in by
 * nxt_router_start_app_process_handler() once the request has been sent.
 */
typedef struct {
    nxt_app_joint_t         *app_joint;   /* app->joint at request time */
    uint32_t                generation;   /* app->generation at request time */
    uint8_t                 proto;  /* 1 bit */
                                    /* set when the request carried the
                                       name/conf payload, i.e. it asked for
                                       a prototype process */
} nxt_app_joint_rpc_t;
77 
78 
79 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
80     nxt_mp_t *mp);
81 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
82 static void nxt_router_greet_controller(nxt_task_t *task,
83     nxt_port_t *controller_port);
84 
85 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
86 
87 static void nxt_router_new_port_handler(nxt_task_t *task,
88     nxt_port_recv_msg_t *msg);
89 static void nxt_router_conf_data_handler(nxt_task_t *task,
90     nxt_port_recv_msg_t *msg);
91 static void nxt_router_app_restart_handler(nxt_task_t *task,
92     nxt_port_recv_msg_t *msg);
93 static void nxt_router_remove_pid_handler(nxt_task_t *task,
94     nxt_port_recv_msg_t *msg);
95 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
96     nxt_port_recv_msg_t *msg);
97 
98 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
99 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
100 static void nxt_router_conf_ready(nxt_task_t *task,
101     nxt_router_temp_conf_t *tmcf);
102 static void nxt_router_conf_error(nxt_task_t *task,
103     nxt_router_temp_conf_t *tmcf);
104 static void nxt_router_conf_send(nxt_task_t *task,
105     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
106 
107 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
108     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
109 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
110     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
111 static nxt_int_t nxt_router_conf_process_client_ip(nxt_task_t *task,
112     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf,
113     nxt_conf_value_t *conf);
114 
115 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
116 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
117 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
118     nxt_app_t *app);
119 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
120     nxt_str_t *name);
121 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
122     int i);
123 
124 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
125     nxt_port_t *port);
126 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
127     nxt_port_t *port);
128 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
129     nxt_port_t *port, nxt_fd_t fd);
130 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
131     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
132 static void nxt_router_listen_socket_ready(nxt_task_t *task,
133     nxt_port_recv_msg_t *msg, void *data);
134 static void nxt_router_listen_socket_error(nxt_task_t *task,
135     nxt_port_recv_msg_t *msg, void *data);
136 #if (NXT_TLS)
137 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
138     nxt_port_recv_msg_t *msg, void *data);
139 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
140     nxt_conf_value_t *value, nxt_socket_conf_t *skcf, nxt_tls_init_t *tls_init,
141     nxt_bool_t last);
142 #endif
143 static void nxt_router_app_rpc_create(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
145 static void nxt_router_app_prefork_ready(nxt_task_t *task,
146     nxt_port_recv_msg_t *msg, void *data);
147 static void nxt_router_app_prefork_error(nxt_task_t *task,
148     nxt_port_recv_msg_t *msg, void *data);
149 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
150     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
151 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
152     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
153 
154 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
155     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
156     const nxt_event_interface_t *interface);
157 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
158     nxt_router_engine_conf_t *recf);
159 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
160     nxt_router_engine_conf_t *recf);
161 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
165     nxt_work_handler_t handler);
166 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
167     nxt_router_engine_conf_t *recf);
168 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
169     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
170 
171 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
172     nxt_router_temp_conf_t *tmcf);
173 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
174     nxt_event_engine_t *engine);
175 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
176     nxt_router_temp_conf_t *tmcf);
177 
178 static void nxt_router_engines_post(nxt_router_t *router,
179     nxt_router_temp_conf_t *tmcf);
180 static void nxt_router_engine_post(nxt_event_engine_t *engine,
181     nxt_work_t *jobs);
182 
183 static void nxt_router_thread_start(void *data);
184 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
185     void *data);
186 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
187     void *data);
188 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
189     void *data);
190 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
191     void *data);
192 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
193     void *data);
194 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
195     void *data);
196 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
197     void *data);
198 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
199     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
200 static void nxt_router_listen_socket_release(nxt_task_t *task,
201     nxt_socket_conf_t *skcf);
202 
203 static void nxt_router_access_log_writer(nxt_task_t *task,
204     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
205 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
206     struct tm *tm, size_t size, const char *format);
207 static void nxt_router_access_log_open(nxt_task_t *task,
208     nxt_router_temp_conf_t *tmcf);
209 static void nxt_router_access_log_ready(nxt_task_t *task,
210     nxt_port_recv_msg_t *msg, void *data);
211 static void nxt_router_access_log_error(nxt_task_t *task,
212     nxt_port_recv_msg_t *msg, void *data);
213 static void nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
214     nxt_router_access_log_t *access_log);
215 static void nxt_router_access_log_release(nxt_task_t *task,
216     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
217 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
218     void *data);
219 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
220     nxt_port_recv_msg_t *msg, void *data);
221 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 
224 static void nxt_router_app_port_ready(nxt_task_t *task,
225     nxt_port_recv_msg_t *msg, void *data);
226 static void nxt_router_app_port_error(nxt_task_t *task,
227     nxt_port_recv_msg_t *msg, void *data);
228 
229 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
230 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
231 
232 static void nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app,
233     nxt_port_t *port, nxt_apr_action_t action);
234 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
235     nxt_request_rpc_data_t *req_rpc_data);
236 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
237     void *data);
238 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
239     void *data);
240 
241 static void nxt_router_app_prepare_request(nxt_task_t *task,
242     nxt_request_rpc_data_t *req_rpc_data);
243 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
244     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
245 
246 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
247 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
254 
255 static const nxt_http_request_state_t  nxt_http_request_send_state;
256 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
257 
258 static void nxt_router_app_joint_use(nxt_task_t *task,
259     nxt_app_joint_t *app_joint, int i);
260 
261 static void nxt_router_http_request_release_post(nxt_task_t *task,
262     nxt_http_request_t *r);
263 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
264     void *data);
265 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
266 static void nxt_router_get_port_handler(nxt_task_t *task,
267     nxt_port_recv_msg_t *msg);
268 static void nxt_router_get_mmap_handler(nxt_task_t *task,
269     nxt_port_recv_msg_t *msg);
270 
271 extern const nxt_http_request_state_t  nxt_http_websocket;
272 
/* The router object of this process; allocated in nxt_router_start(). */
static nxt_router_t  *nxt_router;
274 
/* The two prefixes referenced from the table below. */
static const nxt_str_t http_prefix = nxt_string("HTTP_");
static const nxt_str_t empty_prefix = nxt_string("");

/*
 * Prefix lookup table; each entry is either the empty prefix or "HTTP_".
 *
 * NOTE(review): the lookup site is outside this part of the file;
 * presumably the table is indexed by application type and the order of the
 * entries must follow that enumeration - confirm before reordering.
 */
static const nxt_str_t  *nxt_app_msg_prefix[] = {
    &empty_prefix,
    &empty_prefix,
    &http_prefix,
    &http_prefix,
    &http_prefix,
    &empty_prefix,
};
286 
287 
/*
 * Port message handlers of the router process; referenced by
 * nxt_router_process.port_handlers.  RPC "ready" and "error" replies share
 * a single dispatcher, nxt_port_rpc_handler().
 */
static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
    .quit         = nxt_signal_quit_handler,
    .new_port     = nxt_router_new_port_handler,
    .get_port     = nxt_router_get_port_handler,
    .change_file  = nxt_port_change_log_file_handler,
    .mmap         = nxt_port_mmap_handler,
    .get_mmap     = nxt_router_get_mmap_handler,
    .data         = nxt_router_conf_data_handler,
    .app_restart  = nxt_router_app_restart_handler,
    .remove_pid   = nxt_router_remove_pid_handler,
    .access_log   = nxt_router_access_log_reopen_handler,
    .rpc_ready    = nxt_port_rpc_handler,
    .rpc_error    = nxt_port_rpc_handler,
    .oosm         = nxt_router_oosm_handler,
};
303 
304 
/*
 * Description of the router process: its name and type, the prefork and
 * start callbacks defined in this file, and the port handler table above.
 *
 * NOTE(review): ".restart = 1" presumably asks for the process to be
 * started again after it exits - confirm in the process management code.
 */
const nxt_process_init_t  nxt_router_process = {
    .name           = "router",
    .type           = NXT_PROCESS_ROUTER,
    .prefork        = nxt_router_prefork,
    .restart        = 1,
    .setup          = nxt_process_core_setup,
    .start          = nxt_router_start,
    .port_handlers  = &nxt_router_process_port_handlers,
    .signals        = nxt_process_signals,
};
315 
316 
/*
 * Queues of nxt_socket_conf_t.
 *
 * All five are re-initialized by nxt_router_temp_conf() each time a
 * temporary configuration is created.
 */
nxt_queue_t  creating_sockets;
nxt_queue_t  pending_sockets;
nxt_queue_t  updating_sockets;
nxt_queue_t  keeping_sockets;
nxt_queue_t  deleting_sockets;
323 
324 
325 static nxt_int_t
326 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
327 {
328     nxt_runtime_stop_app_processes(task, task->thread->runtime);
329 
330     return NXT_OK;
331 }
332 
333 
334 static nxt_int_t
335 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
336 {
337     nxt_int_t      ret;
338     nxt_port_t     *controller_port;
339     nxt_router_t   *router;
340     nxt_runtime_t  *rt;
341 
342     rt = task->thread->runtime;
343 
344     nxt_log(task, NXT_LOG_INFO, "router started");
345 
346 #if (NXT_TLS)
347     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
348     if (nxt_slow_path(rt->tls == NULL)) {
349         return NXT_ERROR;
350     }
351 
352     ret = rt->tls->library_init(task);
353     if (nxt_slow_path(ret != NXT_OK)) {
354         return ret;
355     }
356 #endif
357 
358     ret = nxt_http_init(task);
359     if (nxt_slow_path(ret != NXT_OK)) {
360         return ret;
361     }
362 
363     router = nxt_zalloc(sizeof(nxt_router_t));
364     if (nxt_slow_path(router == NULL)) {
365         return NXT_ERROR;
366     }
367 
368     nxt_queue_init(&router->engines);
369     nxt_queue_init(&router->sockets);
370     nxt_queue_init(&router->apps);
371 
372     nxt_router = router;
373 
374     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
375     if (controller_port != NULL) {
376         nxt_router_greet_controller(task, controller_port);
377     }
378 
379     return NXT_OK;
380 }
381 
382 
383 static void
384 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
385 {
386     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
387                           -1, 0, 0, NULL);
388 }
389 
390 
/*
 * Port-posted job requesting a new process for the application passed in
 * "data" (nxt_app_t *).  "port" is the port the job runs on; the RPC reply
 * handlers are registered on it and its id is sent as the reply port.
 *
 * If the application has a prototype port, START_PROCESS is sent to it
 * without payload or descriptors.  Otherwise the request goes to the main
 * process port with "<app name>\0<app conf>" as payload and the shared
 * port descriptors attached; if such a request is already outstanding
 * (proto_port_requests > 0) only the counter is incremented.
 *
 * On every path the function ends by dropping one application reference;
 * nxt_router_start_app_process() takes that reference before posting.
 */
static void
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
    void *data)
{
    size_t               size;
    uint32_t             stream;
    nxt_fd_t             port_fd, queue_fd;
    nxt_int_t            ret;
    nxt_app_t            *app;
    nxt_buf_t            *b;
    nxt_port_t           *dport;
    nxt_runtime_t        *rt;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    app = data;

    /* Take a snapshot of the prototype port under the application mutex. */
    nxt_thread_mutex_lock(&app->mutex);

    dport = app->proto_port;

    nxt_thread_mutex_unlock(&app->mutex);

    if (dport != NULL) {
        nxt_debug(task, "app '%V' %p start process", &app->name, app);

        /* Request to the prototype: no payload, no descriptors. */
        b = NULL;
        port_fd = -1;
        queue_fd = -1;

    } else {
        if (app->proto_port_requests > 0) {
            nxt_debug(task, "app '%V' %p wait for prototype process",
                      &app->name, app);

            /* A prototype start is already pending; just count this one. */
            app->proto_port_requests++;

            goto skip;
        }

        nxt_debug(task, "app '%V' %p start prototype process", &app->name, app);

        rt = task->thread->runtime;
        dport = rt->port_by_type[NXT_PROCESS_MAIN];

        /* Payload layout: name, '\0' separator, configuration. */
        size = app->name.length + 1 + app->conf.length;

        b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, size, 0);
        if (nxt_slow_path(b == NULL)) {
            goto failed;
        }

        nxt_buf_cpystr(b, &app->name);
        *b->mem.free++ = '\0';
        nxt_buf_cpystr(b, &app->conf);

        port_fd = app->shared_port->pair[0];
        queue_fd = app->shared_port->queue_fd;
    }

    app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
                                                     nxt_router_app_port_ready,
                                                     nxt_router_app_port_error,
                                                   sizeof(nxt_app_joint_rpc_t));
    if (nxt_slow_path(app_joint_rpc == NULL)) {
        goto failed;
    }

    stream = nxt_port_rpc_ex_stream(app_joint_rpc);

    ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
                                 port_fd, queue_fd, stream, port->id, b);
    if (nxt_slow_path(ret != NXT_OK)) {
        /* Nothing was sent, so no reply can arrive for this stream. */
        nxt_port_rpc_cancel(task, port, stream);

        goto failed;
    }

    app_joint_rpc->app_joint = app->joint;
    app_joint_rpc->generation = app->generation;
    app_joint_rpc->proto = (b != NULL);

    if (b != NULL) {
        app->proto_port_requests++;

        /* The buffer was handed to the write; do not free it below. */
        b = NULL;
    }

    /* Reference held on behalf of the registered RPC context. */
    nxt_router_app_joint_use(task, app->joint, 1);

failed:

    /* Reached on success too; "b" is non-NULL only if it was never sent. */
    if (b != NULL) {
        nxt_mp_free(b->data, b);
    }

skip:

    nxt_router_app_use(task, app, -1);
}
490 
491 
492 static void
493 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
494 {
495     app_joint->use_count += i;
496 
497     if (app_joint->use_count == 0) {
498         nxt_assert(app_joint->app == NULL);
499 
500         nxt_free(app_joint);
501     }
502 }
503 
504 
505 static nxt_int_t
506 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
507 {
508     nxt_int_t      res;
509     nxt_port_t     *router_port;
510     nxt_runtime_t  *rt;
511 
512     nxt_debug(task, "app '%V' start process", &app->name);
513 
514     rt = task->thread->runtime;
515     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
516 
517     nxt_router_app_use(task, app, 1);
518 
519     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
520                         app);
521 
522     if (res == NXT_OK) {
523         return res;
524     }
525 
526     nxt_thread_mutex_lock(&app->mutex);
527 
528     app->pending_processes--;
529 
530     nxt_thread_mutex_unlock(&app->mutex);
531 
532     nxt_router_app_use(task, app, -1);
533 
534     return NXT_ERROR;
535 }
536 
537 
538 nxt_inline nxt_bool_t
539 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
540 {
541     nxt_buf_t       *b, *next;
542     nxt_bool_t      cancelled;
543     nxt_port_t      *app_port;
544     nxt_msg_info_t  *msg_info;
545 
546     msg_info = &req_rpc_data->msg_info;
547 
548     if (msg_info->buf == NULL) {
549         return 0;
550     }
551 
552     app_port = req_rpc_data->app_port;
553 
554     if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
555         cancelled = nxt_app_queue_cancel(app_port->queue,
556                                          msg_info->tracking_cookie,
557                                          req_rpc_data->stream);
558 
559         if (cancelled) {
560             nxt_debug(task, "stream #%uD: cancelled by router",
561                       req_rpc_data->stream);
562         }
563 
564     } else {
565         cancelled = 0;
566     }
567 
568     for (b = msg_info->buf; b != NULL; b = next) {
569         next = b->next;
570         b->next = NULL;
571 
572         if (b->is_port_mmap_sent) {
573             b->is_port_mmap_sent = cancelled == 0;
574         }
575 
576         b->completion_handler(task, b, b->parent);
577     }
578 
579     msg_info->buf = NULL;
580 
581     return cancelled;
582 }
583 
584 
585 nxt_inline nxt_bool_t
586 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
587 {
588     if (lnk->next != NULL) {
589         nxt_queue_remove(lnk);
590 
591         lnk->next = NULL;
592 
593         return 1;
594     }
595 
596     return 0;
597 }
598 
599 
600 nxt_inline void
601 nxt_request_rpc_data_unlink(nxt_task_t *task,
602     nxt_request_rpc_data_t *req_rpc_data)
603 {
604     nxt_app_t           *app;
605     nxt_bool_t          unlinked;
606     nxt_http_request_t  *r;
607 
608     nxt_router_msg_cancel(task, req_rpc_data);
609 
610     app = req_rpc_data->app;
611 
612     if (req_rpc_data->app_port != NULL) {
613         nxt_router_app_port_release(task, app, req_rpc_data->app_port,
614                                     req_rpc_data->apr_action);
615 
616         req_rpc_data->app_port = NULL;
617     }
618 
619     r = req_rpc_data->request;
620 
621     if (r != NULL) {
622         r->timer_data = NULL;
623 
624         nxt_router_http_request_release_post(task, r);
625 
626         r->req_rpc_data = NULL;
627         req_rpc_data->request = NULL;
628 
629         if (app != NULL) {
630             unlinked = 0;
631 
632             nxt_thread_mutex_lock(&app->mutex);
633 
634             if (r->app_link.next != NULL) {
635                 nxt_queue_remove(&r->app_link);
636                 r->app_link.next = NULL;
637 
638                 unlinked = 1;
639             }
640 
641             nxt_thread_mutex_unlock(&app->mutex);
642 
643             if (unlinked) {
644                 nxt_mp_release(r->mem_pool);
645             }
646         }
647     }
648 
649     if (app != NULL) {
650         nxt_router_app_use(task, app, -1);
651 
652         req_rpc_data->app = NULL;
653     }
654 
655     if (req_rpc_data->msg_info.body_fd != -1) {
656         nxt_fd_close(req_rpc_data->msg_info.body_fd);
657 
658         req_rpc_data->msg_info.body_fd = -1;
659     }
660 
661     if (req_rpc_data->rpc_cancel) {
662         req_rpc_data->rpc_cancel = 0;
663 
664         nxt_port_rpc_cancel(task, task->thread->engine->port,
665                             req_rpc_data->stream);
666     }
667 }
668 
669 
/*
 * NEW_PORT message handler of the router process.
 *
 * After the generic nxt_port_new_port_handler() processing:
 *  - a controller port is greeted with PROCESS_READY;
 *  - a message about a prototype port goes straight to the RPC handler;
 *  - if the port is missing or is not an application port, the message is
 *    dropped when its stream is 0, otherwise turned into an RPC error;
 *  - for an application port, the descriptor in fd[1], if present, is
 *    mapped as the port queue and then closed.
 * A message with a non-zero stream is then dispatched to the RPC handler.
 * What remains is an application port announced with stream 0: it is added
 * to the port hash of the application that owns the process "main" port
 * (id 0) and acknowledged with PORT_ACK.
 */
static void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_int_t      res;
    nxt_app_t      *app;
    nxt_port_t     *port, *main_app_port;
    nxt_runtime_t  *rt;

    nxt_port_new_port_handler(task, msg);

    port = msg->u.new_port;

    if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
        nxt_router_greet_controller(task, msg->u.new_port);
    }

    if (port != NULL && port->type == NXT_PROCESS_PROTOTYPE)  {
        nxt_port_rpc_handler(task, msg);

        return;
    }

    if (port == NULL || port->type != NXT_PROCESS_APP) {

        /* Nobody waits for a reply on stream 0. */
        if (msg->port_msg.stream == 0) {
            return;
        }

        /* Make the RPC dispatch below invoke the waiter's error handler. */
        msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;

    } else {
        if (msg->fd[1] != -1) {
            res = nxt_router_port_queue_map(task, port, msg->fd[1]);
            if (nxt_slow_path(res != NXT_OK)) {
                return;
            }

            nxt_fd_close(msg->fd[1]);
            msg->fd[1] = -1;
        }
    }

    if (msg->port_msg.stream != 0) {
        nxt_port_rpc_handler(task, msg);
        return;
    }

    /* From here on "port" is a non-NULL application port with stream 0. */
    nxt_debug(task, "new port id %d (%d)", port->id, port->type);

    /*
     * Port with "id == 0" is application 'main' port and it always
     * should come with non-zero stream.
     */
    nxt_assert(port->id != 0);

    /* Find 'main' app port and get app reference. */
    rt = task->thread->runtime;

    /*
     * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
     * sent to main port (with id == 0) and processed in main thread.
     */
    main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
    nxt_assert(main_app_port != NULL);

    app = main_app_port->app;

    if (nxt_fast_path(app != NULL)) {
        nxt_thread_mutex_lock(&app->mutex);

        /* TODO here should be find-and-add code because there can be
           port waiters in port_hash */
        nxt_port_hash_add(&app->port_hash, port);
        app->port_hash_count++;

        nxt_thread_mutex_unlock(&app->mutex);

        port->app = app;
    }

    port->main_app_port = main_app_port;

    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
}
754 
755 
756 static void
757 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
758 {
759     void                    *p;
760     size_t                  size;
761     nxt_int_t               ret;
762     nxt_port_t              *port;
763     nxt_router_temp_conf_t  *tmcf;
764 
765     port = nxt_runtime_port_find(task->thread->runtime,
766                                  msg->port_msg.pid,
767                                  msg->port_msg.reply_port);
768     if (nxt_slow_path(port == NULL)) {
769         nxt_alert(task, "conf_data_handler: reply port not found");
770         return;
771     }
772 
773     p = MAP_FAILED;
774 
775     /*
776      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
777      * initialized in 'cleanup' section.
778      */
779     size = 0;
780 
781     tmcf = nxt_router_temp_conf(task);
782     if (nxt_slow_path(tmcf == NULL)) {
783         goto fail;
784     }
785 
786     if (nxt_slow_path(msg->fd[0] == -1)) {
787         nxt_alert(task, "conf_data_handler: invalid shm fd");
788         goto fail;
789     }
790 
791     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
792         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
793                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
794         goto fail;
795     }
796 
797     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
798 
799     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
800 
801     nxt_fd_close(msg->fd[0]);
802     msg->fd[0] = -1;
803 
804     if (nxt_slow_path(p == MAP_FAILED)) {
805         goto fail;
806     }
807 
808     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
809 
810     tmcf->router_conf->router = nxt_router;
811     tmcf->stream = msg->port_msg.stream;
812     tmcf->port = port;
813 
814     nxt_port_use(task, tmcf->port, 1);
815 
816     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
817 
818     if (nxt_fast_path(ret == NXT_OK)) {
819         nxt_router_conf_apply(task, tmcf, NULL);
820 
821     } else {
822         nxt_router_conf_error(task, tmcf);
823     }
824 
825     goto cleanup;
826 
827 fail:
828 
829     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
830                           msg->port_msg.stream, 0, NULL);
831 
832     if (tmcf != NULL) {
833         nxt_mp_release(tmcf->mem_pool);
834     }
835 
836 cleanup:
837 
838     if (p != MAP_FAILED) {
839         nxt_mem_munmap(p, size);
840     }
841 
842     if (msg->fd[0] != -1) {
843         nxt_fd_close(msg->fd[0]);
844         msg->fd[0] = -1;
845     }
846 }
847 
848 
/*
 * APP_RESTART message handler.  The message buffer carries the name of the
 * application to restart.
 *
 * For a known application a new shared port with its queue is created and
 * swapped in under app->mutex, app->generation is incremented and the
 * prototype port, if any, is detached from the application.  Afterwards
 * the old shared port is closed and released, and the detached prototype
 * is sent QUIT, closed and released.
 *
 * The sender is answered with RPC_READY_LAST on success, or with RPC_ERROR
 * if the application is unknown or the new shared port cannot be set up.
 * No reply is sent if the reply port itself cannot be found.
 */
static void
nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
    nxt_app_t            *app;
    nxt_int_t            ret;
    nxt_str_t            app_name;
    nxt_port_t           *reply_port, *shared_port, *old_shared_port;
    nxt_port_t           *proto_port;
    nxt_port_msg_type_t  reply;

    reply_port = nxt_runtime_port_find(task->thread->runtime,
                                       msg->port_msg.pid,
                                       msg->port_msg.reply_port);
    if (nxt_slow_path(reply_port == NULL)) {
        nxt_alert(task, "app_restart_handler: reply port not found");
        return;
    }

    /* The name is used in place; it is not copied out of the buffer. */
    app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
    app_name.start = msg->buf->mem.pos;

    nxt_debug(task, "app_restart_handler: %V", &app_name);

    app = nxt_router_app_find(&nxt_router->apps, &app_name);

    if (nxt_fast_path(app != NULL)) {
        /* Prepare the replacement port completely before taking the lock. */
        shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
                                   NXT_PROCESS_APP);
        if (nxt_slow_path(shared_port == NULL)) {
            goto fail;
        }

        ret = nxt_port_socket_init(task, shared_port, 0);
        if (nxt_slow_path(ret != NXT_OK)) {
            nxt_port_use(task, shared_port, -1);
            goto fail;
        }

        ret = nxt_router_app_queue_init(task, shared_port);
        if (nxt_slow_path(ret != NXT_OK)) {
            nxt_port_write_close(shared_port);
            nxt_port_read_close(shared_port);
            nxt_port_use(task, shared_port, -1);
            goto fail;
        }

        nxt_port_write_enable(task, shared_port);

        nxt_thread_mutex_lock(&app->mutex);

        proto_port = app->proto_port;

        if (proto_port != NULL) {
            nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
                      proto_port->pid);

            /* Only detach here; QUIT is sent after the mutex is released. */
            app->proto_port = NULL;
            proto_port->app = NULL;
        }

        app->generation++;

        shared_port->app = app;

        old_shared_port = app->shared_port;
        old_shared_port->app = NULL;

        app->shared_port = shared_port;

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_port_close(task, old_shared_port);
        nxt_port_use(task, old_shared_port, -1);

        if (proto_port != NULL) {
            (void) nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
                                         -1, 0, 0, NULL);

            nxt_port_close(task, proto_port);

            nxt_port_use(task, proto_port, -1);
        }

        reply = NXT_PORT_MSG_RPC_READY_LAST;

    } else {

/* Also the landing point for port setup failures in the branch above. */
fail:

        reply = NXT_PORT_MSG_RPC_ERROR;
    }

    nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
                          0, NULL);
}
944 
945 
946 static void
947 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
948     void *data)
949 {
950     union {
951         nxt_pid_t  removed_pid;
952         void       *data;
953     } u;
954 
955     u.data = data;
956 
957     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
958 }
959 
960 
961 static void
962 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
963 {
964     nxt_event_engine_t  *engine;
965 
966     nxt_port_remove_pid_handler(task, msg);
967 
968     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
969     {
970         if (nxt_fast_path(engine->port != NULL)) {
971             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
972                           msg->u.data);
973         }
974     }
975     nxt_queue_loop;
976 
977     if (msg->port_msg.stream == 0) {
978         return;
979     }
980 
981     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
982 
983     nxt_port_rpc_handler(task, msg);
984 }
985 
986 
987 static nxt_router_temp_conf_t *
988 nxt_router_temp_conf(nxt_task_t *task)
989 {
990     nxt_mp_t                *mp, *tmp;
991     nxt_router_conf_t       *rtcf;
992     nxt_router_temp_conf_t  *tmcf;
993 
994     mp = nxt_mp_create(1024, 128, 256, 32);
995     if (nxt_slow_path(mp == NULL)) {
996         return NULL;
997     }
998 
999     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
1000     if (nxt_slow_path(rtcf == NULL)) {
1001         goto fail;
1002     }
1003 
1004     rtcf->mem_pool = mp;
1005 
1006     tmp = nxt_mp_create(1024, 128, 256, 32);
1007     if (nxt_slow_path(tmp == NULL)) {
1008         goto fail;
1009     }
1010 
1011     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
1012     if (nxt_slow_path(tmcf == NULL)) {
1013         goto temp_fail;
1014     }
1015 
1016     tmcf->mem_pool = tmp;
1017     tmcf->router_conf = rtcf;
1018     tmcf->count = 1;
1019     tmcf->engine = task->thread->engine;
1020 
1021     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
1022                                      sizeof(nxt_router_engine_conf_t));
1023     if (nxt_slow_path(tmcf->engines == NULL)) {
1024         goto temp_fail;
1025     }
1026 
1027     nxt_queue_init(&creating_sockets);
1028     nxt_queue_init(&pending_sockets);
1029     nxt_queue_init(&updating_sockets);
1030     nxt_queue_init(&keeping_sockets);
1031     nxt_queue_init(&deleting_sockets);
1032 
1033 #if (NXT_TLS)
1034     nxt_queue_init(&tmcf->tls);
1035 #endif
1036 
1037     nxt_queue_init(&tmcf->apps);
1038     nxt_queue_init(&tmcf->previous);
1039 
1040     return tmcf;
1041 
1042 temp_fail:
1043 
1044     nxt_mp_destroy(tmp);
1045 
1046 fail:
1047 
1048     nxt_mp_destroy(mp);
1049 
1050     return NULL;
1051 }
1052 
1053 
1054 nxt_inline nxt_bool_t
1055 nxt_router_app_can_start(nxt_app_t *app)
1056 {
1057     return app->processes + app->pending_processes < app->max_processes
1058             && app->pending_processes < app->max_pending_processes;
1059 }
1060 
1061 
1062 nxt_inline nxt_bool_t
1063 nxt_router_app_need_start(nxt_app_t *app)
1064 {
1065     return (app->active_requests
1066               > app->port_hash_count + app->pending_processes)
1067            || (app->spare_processes
1068                 > app->idle_processes + app->pending_processes);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1074 {
1075     nxt_int_t                    ret;
1076     nxt_app_t                    *app;
1077     nxt_router_t                 *router;
1078     nxt_runtime_t                *rt;
1079     nxt_queue_link_t             *qlk;
1080     nxt_socket_conf_t            *skcf;
1081     nxt_router_conf_t            *rtcf;
1082     nxt_router_temp_conf_t       *tmcf;
1083     const nxt_event_interface_t  *interface;
1084 #if (NXT_TLS)
1085     nxt_router_tlssock_t         *tls;
1086 #endif
1087 
1088     tmcf = obj;
1089 
1090     qlk = nxt_queue_first(&pending_sockets);
1091 
1092     if (qlk != nxt_queue_tail(&pending_sockets)) {
1093         nxt_queue_remove(qlk);
1094         nxt_queue_insert_tail(&creating_sockets, qlk);
1095 
1096         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1097 
1098         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1099 
1100         return;
1101     }
1102 
1103 #if (NXT_TLS)
1104     qlk = nxt_queue_last(&tmcf->tls);
1105 
1106     if (qlk != nxt_queue_head(&tmcf->tls)) {
1107         nxt_queue_remove(qlk);
1108 
1109         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1110 
1111         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
1112                            nxt_router_tls_rpc_handler, tls);
1113         return;
1114     }
1115 #endif
1116 
1117     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1118 
1119         if (nxt_router_app_need_start(app)) {
1120             nxt_router_app_rpc_create(task, tmcf, app);
1121             return;
1122         }
1123 
1124     } nxt_queue_loop;
1125 
1126     rtcf = tmcf->router_conf;
1127 
1128     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1129         nxt_router_access_log_open(task, tmcf);
1130         return;
1131     }
1132 
1133     rt = task->thread->runtime;
1134 
1135     interface = nxt_service_get(rt->services, "engine", NULL);
1136 
1137     router = rtcf->router;
1138 
1139     ret = nxt_router_engines_create(task, router, tmcf, interface);
1140     if (nxt_slow_path(ret != NXT_OK)) {
1141         goto fail;
1142     }
1143 
1144     ret = nxt_router_threads_create(task, rt, tmcf);
1145     if (nxt_slow_path(ret != NXT_OK)) {
1146         goto fail;
1147     }
1148 
1149     nxt_router_apps_sort(task, router, tmcf);
1150 
1151     nxt_router_apps_hash_use(task, rtcf, 1);
1152 
1153     nxt_router_engines_post(router, tmcf);
1154 
1155     nxt_queue_add(&router->sockets, &updating_sockets);
1156     nxt_queue_add(&router->sockets, &creating_sockets);
1157 
1158     if (router->access_log != rtcf->access_log) {
1159         nxt_router_access_log_use(&router->lock, rtcf->access_log);
1160 
1161         nxt_router_access_log_release(task, &router->lock, router->access_log);
1162 
1163         router->access_log = rtcf->access_log;
1164     }
1165 
1166     nxt_router_conf_ready(task, tmcf);
1167 
1168     return;
1169 
1170 fail:
1171 
1172     nxt_router_conf_error(task, tmcf);
1173 
1174     return;
1175 }
1176 
1177 
1178 static void
1179 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1180 {
1181     nxt_joint_job_t  *job;
1182 
1183     job = obj;
1184 
1185     nxt_router_conf_ready(task, job->tmcf);
1186 }
1187 
1188 
1189 static void
1190 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1191 {
1192     uint32_t               count;
1193     nxt_router_conf_t      *rtcf;
1194     nxt_thread_spinlock_t  *lock;
1195 
1196     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1197 
1198     if (--tmcf->count > 0) {
1199         return;
1200     }
1201 
1202     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1203 
1204     rtcf = tmcf->router_conf;
1205 
1206     lock = &rtcf->router->lock;
1207 
1208     nxt_thread_spin_lock(lock);
1209 
1210     count = rtcf->count;
1211 
1212     nxt_thread_spin_unlock(lock);
1213 
1214     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1215 
1216     if (count == 0) {
1217         nxt_router_apps_hash_use(task, rtcf, -1);
1218 
1219         nxt_router_access_log_release(task, lock, rtcf->access_log);
1220 
1221         nxt_mp_destroy(rtcf->mem_pool);
1222     }
1223 
1224     nxt_mp_release(tmcf->mem_pool);
1225 }
1226 
1227 
1228 static void
1229 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1230 {
1231     nxt_app_t          *app;
1232     nxt_queue_t        new_socket_confs;
1233     nxt_socket_t       s;
1234     nxt_router_t       *router;
1235     nxt_queue_link_t   *qlk;
1236     nxt_socket_conf_t  *skcf;
1237     nxt_router_conf_t  *rtcf;
1238 
1239     nxt_alert(task, "failed to apply new conf");
1240 
1241     for (qlk = nxt_queue_first(&creating_sockets);
1242          qlk != nxt_queue_tail(&creating_sockets);
1243          qlk = nxt_queue_next(qlk))
1244     {
1245         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1246         s = skcf->listen->socket;
1247 
1248         if (s != -1) {
1249             nxt_socket_close(task, s);
1250         }
1251 
1252         nxt_free(skcf->listen);
1253     }
1254 
1255     nxt_queue_init(&new_socket_confs);
1256     nxt_queue_add(&new_socket_confs, &updating_sockets);
1257     nxt_queue_add(&new_socket_confs, &pending_sockets);
1258     nxt_queue_add(&new_socket_confs, &creating_sockets);
1259 
1260     rtcf = tmcf->router_conf;
1261 
1262     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1263 
1264         nxt_router_app_unlink(task, app);
1265 
1266     } nxt_queue_loop;
1267 
1268     router = rtcf->router;
1269 
1270     nxt_queue_add(&router->sockets, &keeping_sockets);
1271     nxt_queue_add(&router->sockets, &deleting_sockets);
1272 
1273     nxt_queue_add(&router->apps, &tmcf->previous);
1274 
1275     // TODO: new engines and threads
1276 
1277     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1278 
1279     nxt_mp_destroy(rtcf->mem_pool);
1280 
1281     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1282 
1283     nxt_mp_release(tmcf->mem_pool);
1284 }
1285 
1286 
1287 static void
1288 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1289     nxt_port_msg_type_t type)
1290 {
1291     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1292 
1293     nxt_port_use(task, tmcf->port, -1);
1294 
1295     tmcf->port = NULL;
1296 }
1297 
1298 
1299 static nxt_conf_map_t  nxt_router_conf[] = {
1300     {
1301         nxt_string("listeners_threads"),
1302         NXT_CONF_MAP_INT32,
1303         offsetof(nxt_router_conf_t, threads),
1304     },
1305 };
1306 
1307 
1308 static nxt_conf_map_t  nxt_router_app_conf[] = {
1309     {
1310         nxt_string("type"),
1311         NXT_CONF_MAP_STR,
1312         offsetof(nxt_router_app_conf_t, type),
1313     },
1314 
1315     {
1316         nxt_string("limits"),
1317         NXT_CONF_MAP_PTR,
1318         offsetof(nxt_router_app_conf_t, limits_value),
1319     },
1320 
1321     {
1322         nxt_string("processes"),
1323         NXT_CONF_MAP_INT32,
1324         offsetof(nxt_router_app_conf_t, processes),
1325     },
1326 
1327     {
1328         nxt_string("processes"),
1329         NXT_CONF_MAP_PTR,
1330         offsetof(nxt_router_app_conf_t, processes_value),
1331     },
1332 
1333     {
1334         nxt_string("targets"),
1335         NXT_CONF_MAP_PTR,
1336         offsetof(nxt_router_app_conf_t, targets_value),
1337     },
1338 };
1339 
1340 
1341 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1342     {
1343         nxt_string("timeout"),
1344         NXT_CONF_MAP_MSEC,
1345         offsetof(nxt_router_app_conf_t, timeout),
1346     },
1347 };
1348 
1349 
1350 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1351     {
1352         nxt_string("spare"),
1353         NXT_CONF_MAP_INT32,
1354         offsetof(nxt_router_app_conf_t, spare_processes),
1355     },
1356 
1357     {
1358         nxt_string("max"),
1359         NXT_CONF_MAP_INT32,
1360         offsetof(nxt_router_app_conf_t, max_processes),
1361     },
1362 
1363     {
1364         nxt_string("idle_timeout"),
1365         NXT_CONF_MAP_MSEC,
1366         offsetof(nxt_router_app_conf_t, idle_timeout),
1367     },
1368 };
1369 
1370 
1371 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1372     {
1373         nxt_string("pass"),
1374         NXT_CONF_MAP_STR_COPY,
1375         offsetof(nxt_router_listener_conf_t, pass),
1376     },
1377 
1378     {
1379         nxt_string("application"),
1380         NXT_CONF_MAP_STR_COPY,
1381         offsetof(nxt_router_listener_conf_t, application),
1382     },
1383 };
1384 
1385 
1386 static nxt_conf_map_t  nxt_router_http_conf[] = {
1387     {
1388         nxt_string("header_buffer_size"),
1389         NXT_CONF_MAP_SIZE,
1390         offsetof(nxt_socket_conf_t, header_buffer_size),
1391     },
1392 
1393     {
1394         nxt_string("large_header_buffer_size"),
1395         NXT_CONF_MAP_SIZE,
1396         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1397     },
1398 
1399     {
1400         nxt_string("large_header_buffers"),
1401         NXT_CONF_MAP_SIZE,
1402         offsetof(nxt_socket_conf_t, large_header_buffers),
1403     },
1404 
1405     {
1406         nxt_string("body_buffer_size"),
1407         NXT_CONF_MAP_SIZE,
1408         offsetof(nxt_socket_conf_t, body_buffer_size),
1409     },
1410 
1411     {
1412         nxt_string("max_body_size"),
1413         NXT_CONF_MAP_SIZE,
1414         offsetof(nxt_socket_conf_t, max_body_size),
1415     },
1416 
1417     {
1418         nxt_string("idle_timeout"),
1419         NXT_CONF_MAP_MSEC,
1420         offsetof(nxt_socket_conf_t, idle_timeout),
1421     },
1422 
1423     {
1424         nxt_string("header_read_timeout"),
1425         NXT_CONF_MAP_MSEC,
1426         offsetof(nxt_socket_conf_t, header_read_timeout),
1427     },
1428 
1429     {
1430         nxt_string("body_read_timeout"),
1431         NXT_CONF_MAP_MSEC,
1432         offsetof(nxt_socket_conf_t, body_read_timeout),
1433     },
1434 
1435     {
1436         nxt_string("send_timeout"),
1437         NXT_CONF_MAP_MSEC,
1438         offsetof(nxt_socket_conf_t, send_timeout),
1439     },
1440 
1441     {
1442         nxt_string("body_temp_path"),
1443         NXT_CONF_MAP_STR,
1444         offsetof(nxt_socket_conf_t, body_temp_path),
1445     },
1446 
1447     {
1448         nxt_string("discard_unsafe_fields"),
1449         NXT_CONF_MAP_INT8,
1450         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1451     },
1452 };
1453 
1454 
1455 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1456     {
1457         nxt_string("max_frame_size"),
1458         NXT_CONF_MAP_SIZE,
1459         offsetof(nxt_websocket_conf_t, max_frame_size),
1460     },
1461 
1462     {
1463         nxt_string("read_timeout"),
1464         NXT_CONF_MAP_MSEC,
1465         offsetof(nxt_websocket_conf_t, read_timeout),
1466     },
1467 
1468     {
1469         nxt_string("keepalive_interval"),
1470         NXT_CONF_MAP_MSEC,
1471         offsetof(nxt_websocket_conf_t, keepalive_interval),
1472     },
1473 
1474 };
1475 
1476 
1477 static nxt_int_t
1478 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1479     u_char *start, u_char *end)
1480 {
1481     u_char                      *p;
1482     size_t                      size;
1483     nxt_mp_t                    *mp, *app_mp;
1484     uint32_t                    next, next_target;
1485     nxt_int_t                   ret;
1486     nxt_str_t                   name, path, target;
1487     nxt_app_t                   *app, *prev;
1488     nxt_str_t                   *t, *s, *targets;
1489     nxt_uint_t                  n, i;
1490     nxt_port_t                  *port;
1491     nxt_router_t                *router;
1492     nxt_app_joint_t             *app_joint;
1493 #if (NXT_TLS)
1494     nxt_tls_init_t              *tls_init;
1495     nxt_conf_value_t            *certificate;
1496 #endif
1497     nxt_conf_value_t            *conf, *http, *value, *websocket;
1498     nxt_conf_value_t            *applications, *application;
1499     nxt_conf_value_t            *listeners, *listener;
1500     nxt_conf_value_t            *routes_conf, *static_conf, *client_ip_conf;
1501     nxt_socket_conf_t           *skcf;
1502     nxt_http_routes_t           *routes;
1503     nxt_event_engine_t          *engine;
1504     nxt_app_lang_module_t       *lang;
1505     nxt_router_app_conf_t       apcf;
1506     nxt_router_access_log_t     *access_log;
1507     nxt_router_listener_conf_t  lscf;
1508 
1509     static nxt_str_t  http_path = nxt_string("/settings/http");
1510     static nxt_str_t  applications_path = nxt_string("/applications");
1511     static nxt_str_t  listeners_path = nxt_string("/listeners");
1512     static nxt_str_t  routes_path = nxt_string("/routes");
1513     static nxt_str_t  access_log_path = nxt_string("/access_log");
1514 #if (NXT_TLS)
1515     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1516     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1517     static nxt_str_t  conf_cache_path = nxt_string("/tls/session/cache_size");
1518     static nxt_str_t  conf_timeout_path = nxt_string("/tls/session/timeout");
1519     static nxt_str_t  conf_tickets = nxt_string("/tls/session/tickets");
1520 #endif
1521     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1522     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1523     static nxt_str_t  client_ip_path = nxt_string("/client_ip");
1524 
1525     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1526     if (conf == NULL) {
1527         nxt_alert(task, "configuration parsing error");
1528         return NXT_ERROR;
1529     }
1530 
1531     mp = tmcf->router_conf->mem_pool;
1532 
1533     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1534                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1535     if (ret != NXT_OK) {
1536         nxt_alert(task, "root map error");
1537         return NXT_ERROR;
1538     }
1539 
1540     if (tmcf->router_conf->threads == 0) {
1541         tmcf->router_conf->threads = nxt_ncpu;
1542     }
1543 
1544     static_conf = nxt_conf_get_path(conf, &static_path);
1545 
1546     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1547     if (nxt_slow_path(ret != NXT_OK)) {
1548         return NXT_ERROR;
1549     }
1550 
1551     router = tmcf->router_conf->router;
1552 
1553     applications = nxt_conf_get_path(conf, &applications_path);
1554 
1555     if (applications != NULL) {
1556         next = 0;
1557 
1558         for ( ;; ) {
1559             application = nxt_conf_next_object_member(applications,
1560                                                       &name, &next);
1561             if (application == NULL) {
1562                 break;
1563             }
1564 
1565             nxt_debug(task, "application \"%V\"", &name);
1566 
1567             size = nxt_conf_json_length(application, NULL);
1568 
1569             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1570             if (nxt_slow_path(app_mp == NULL)) {
1571                 goto fail;
1572             }
1573 
1574             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1575             if (app == NULL) {
1576                 goto app_fail;
1577             }
1578 
1579             nxt_memzero(app, sizeof(nxt_app_t));
1580 
1581             app->mem_pool = app_mp;
1582 
1583             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1584             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1585                                                   + name.length);
1586 
1587             p = nxt_conf_json_print(app->conf.start, application, NULL);
1588             app->conf.length = p - app->conf.start;
1589 
1590             nxt_assert(app->conf.length <= size);
1591 
1592             nxt_debug(task, "application conf \"%V\"", &app->conf);
1593 
1594             prev = nxt_router_app_find(&router->apps, &name);
1595 
1596             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1597                 nxt_mp_destroy(app_mp);
1598 
1599                 nxt_queue_remove(&prev->link);
1600                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1601 
1602                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1603                 if (nxt_slow_path(ret != NXT_OK)) {
1604                     goto fail;
1605                 }
1606 
1607                 continue;
1608             }
1609 
1610             apcf.processes = 1;
1611             apcf.max_processes = 1;
1612             apcf.spare_processes = 0;
1613             apcf.timeout = 0;
1614             apcf.idle_timeout = 15000;
1615             apcf.limits_value = NULL;
1616             apcf.processes_value = NULL;
1617             apcf.targets_value = NULL;
1618 
1619             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1620             if (nxt_slow_path(app_joint == NULL)) {
1621                 goto app_fail;
1622             }
1623 
1624             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1625 
1626             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1627                                       nxt_nitems(nxt_router_app_conf), &apcf);
1628             if (ret != NXT_OK) {
1629                 nxt_alert(task, "application map error");
1630                 goto app_fail;
1631             }
1632 
1633             if (apcf.limits_value != NULL) {
1634 
1635                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1636                     nxt_alert(task, "application limits is not object");
1637                     goto app_fail;
1638                 }
1639 
1640                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1641                                         nxt_router_app_limits_conf,
1642                                         nxt_nitems(nxt_router_app_limits_conf),
1643                                         &apcf);
1644                 if (ret != NXT_OK) {
1645                     nxt_alert(task, "application limits map error");
1646                     goto app_fail;
1647                 }
1648             }
1649 
1650             if (apcf.processes_value != NULL
1651                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1652             {
1653                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1654                                      nxt_router_app_processes_conf,
1655                                      nxt_nitems(nxt_router_app_processes_conf),
1656                                      &apcf);
1657                 if (ret != NXT_OK) {
1658                     nxt_alert(task, "application processes map error");
1659                     goto app_fail;
1660                 }
1661 
1662             } else {
1663                 apcf.max_processes = apcf.processes;
1664                 apcf.spare_processes = apcf.processes;
1665             }
1666 
1667             if (apcf.targets_value != NULL) {
1668                 n = nxt_conf_object_members_count(apcf.targets_value);
1669 
1670                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1671                 if (nxt_slow_path(targets == NULL)) {
1672                     goto app_fail;
1673                 }
1674 
1675                 next_target = 0;
1676 
1677                 for (i = 0; i < n; i++) {
1678                     (void) nxt_conf_next_object_member(apcf.targets_value,
1679                                                        &target, &next_target);
1680 
1681                     s = nxt_str_dup(app_mp, &targets[i], &target);
1682                     if (nxt_slow_path(s == NULL)) {
1683                         goto app_fail;
1684                     }
1685                 }
1686 
1687             } else {
1688                 targets = NULL;
1689             }
1690 
1691             nxt_debug(task, "application type: %V", &apcf.type);
1692             nxt_debug(task, "application processes: %D", apcf.processes);
1693             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1694 
1695             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1696 
1697             if (lang == NULL) {
1698                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1699                 goto app_fail;
1700             }
1701 
1702             nxt_debug(task, "application language module: \"%s\"", lang->file);
1703 
1704             ret = nxt_thread_mutex_create(&app->mutex);
1705             if (ret != NXT_OK) {
1706                 goto app_fail;
1707             }
1708 
1709             nxt_queue_init(&app->ports);
1710             nxt_queue_init(&app->spare_ports);
1711             nxt_queue_init(&app->idle_ports);
1712             nxt_queue_init(&app->ack_waiting_req);
1713 
1714             app->name.length = name.length;
1715             nxt_memcpy(app->name.start, name.start, name.length);
1716 
1717             app->type = lang->type;
1718             app->max_processes = apcf.max_processes;
1719             app->spare_processes = apcf.spare_processes;
1720             app->max_pending_processes = apcf.spare_processes
1721                                          ? apcf.spare_processes : 1;
1722             app->timeout = apcf.timeout;
1723             app->idle_timeout = apcf.idle_timeout;
1724 
1725             app->targets = targets;
1726 
1727             engine = task->thread->engine;
1728 
1729             app->engine = engine;
1730 
1731             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1732             app->adjust_idle_work.task = &engine->task;
1733             app->adjust_idle_work.obj = app;
1734 
1735             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1736 
1737             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1738             if (nxt_slow_path(ret != NXT_OK)) {
1739                 goto app_fail;
1740             }
1741 
1742             nxt_router_app_use(task, app, 1);
1743 
1744             app->joint = app_joint;
1745 
1746             app_joint->use_count = 1;
1747             app_joint->app = app;
1748 
1749             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1750             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1751             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1752             app_joint->idle_timer.task = &engine->task;
1753             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1754 
1755             app_joint->free_app_work.handler = nxt_router_free_app;
1756             app_joint->free_app_work.task = &engine->task;
1757             app_joint->free_app_work.obj = app_joint;
1758 
1759             port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
1760                                 NXT_PROCESS_APP);
1761             if (nxt_slow_path(port == NULL)) {
1762                 return NXT_ERROR;
1763             }
1764 
1765             ret = nxt_port_socket_init(task, port, 0);
1766             if (nxt_slow_path(ret != NXT_OK)) {
1767                 nxt_port_use(task, port, -1);
1768                 return NXT_ERROR;
1769             }
1770 
1771             ret = nxt_router_app_queue_init(task, port);
1772             if (nxt_slow_path(ret != NXT_OK)) {
1773                 nxt_port_write_close(port);
1774                 nxt_port_read_close(port);
1775                 nxt_port_use(task, port, -1);
1776                 return NXT_ERROR;
1777             }
1778 
1779             nxt_port_write_enable(task, port);
1780             port->app = app;
1781 
1782             app->shared_port = port;
1783 
1784             nxt_thread_mutex_create(&app->outgoing.mutex);
1785         }
1786     }
1787 
1788     routes_conf = nxt_conf_get_path(conf, &routes_path);
1789     if (nxt_fast_path(routes_conf != NULL)) {
1790         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1791         if (nxt_slow_path(routes == NULL)) {
1792             return NXT_ERROR;
1793         }
1794         tmcf->router_conf->routes = routes;
1795     }
1796 
1797     ret = nxt_upstreams_create(task, tmcf, conf);
1798     if (nxt_slow_path(ret != NXT_OK)) {
1799         return ret;
1800     }
1801 
1802     http = nxt_conf_get_path(conf, &http_path);
1803 #if 0
1804     if (http == NULL) {
1805         nxt_alert(task, "no \"http\" block");
1806         return NXT_ERROR;
1807     }
1808 #endif
1809 
1810     websocket = nxt_conf_get_path(conf, &websocket_path);
1811 
1812     listeners = nxt_conf_get_path(conf, &listeners_path);
1813 
1814     if (listeners != NULL) {
1815         next = 0;
1816 
1817         for ( ;; ) {
1818             listener = nxt_conf_next_object_member(listeners, &name, &next);
1819             if (listener == NULL) {
1820                 break;
1821             }
1822 
1823             skcf = nxt_router_socket_conf(task, tmcf, &name);
1824             if (skcf == NULL) {
1825                 goto fail;
1826             }
1827 
1828             nxt_memzero(&lscf, sizeof(lscf));
1829 
1830             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1831                                       nxt_nitems(nxt_router_listener_conf),
1832                                       &lscf);
1833             if (ret != NXT_OK) {
1834                 nxt_alert(task, "listener map error");
1835                 goto fail;
1836             }
1837 
1838             nxt_debug(task, "application: %V", &lscf.application);
1839 
1840             // STUB, default values if http block is not defined.
1841             skcf->header_buffer_size = 2048;
1842             skcf->large_header_buffer_size = 8192;
1843             skcf->large_header_buffers = 4;
1844             skcf->discard_unsafe_fields = 1;
1845             skcf->body_buffer_size = 16 * 1024;
1846             skcf->max_body_size = 8 * 1024 * 1024;
1847             skcf->proxy_header_buffer_size = 64 * 1024;
1848             skcf->proxy_buffer_size = 4096;
1849             skcf->proxy_buffers = 256;
1850             skcf->idle_timeout = 180 * 1000;
1851             skcf->header_read_timeout = 30 * 1000;
1852             skcf->body_read_timeout = 30 * 1000;
1853             skcf->send_timeout = 30 * 1000;
1854             skcf->proxy_timeout = 60 * 1000;
1855             skcf->proxy_send_timeout = 30 * 1000;
1856             skcf->proxy_read_timeout = 30 * 1000;
1857 
1858             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1859             skcf->websocket_conf.read_timeout = 60 * 1000;
1860             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1861 
1862             nxt_str_null(&skcf->body_temp_path);
1863 
1864             if (http != NULL) {
1865                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1866                                           nxt_nitems(nxt_router_http_conf),
1867                                           skcf);
1868                 if (ret != NXT_OK) {
1869                     nxt_alert(task, "http map error");
1870                     goto fail;
1871                 }
1872             }
1873 
1874             if (websocket != NULL) {
1875                 ret = nxt_conf_map_object(mp, websocket,
1876                                           nxt_router_websocket_conf,
1877                                           nxt_nitems(nxt_router_websocket_conf),
1878                                           &skcf->websocket_conf);
1879                 if (ret != NXT_OK) {
1880                     nxt_alert(task, "websocket map error");
1881                     goto fail;
1882                 }
1883             }
1884 
1885             t = &skcf->body_temp_path;
1886 
1887             if (t->length == 0) {
1888                 t->start = (u_char *) task->thread->runtime->tmp;
1889                 t->length = nxt_strlen(t->start);
1890             }
1891 
1892             client_ip_conf = nxt_conf_get_path(listener, &client_ip_path);
1893             ret = nxt_router_conf_process_client_ip(task, tmcf, skcf,
1894                                                     client_ip_conf);
1895             if (nxt_slow_path(ret != NXT_OK)) {
1896                 return NXT_ERROR;
1897             }
1898 
1899 #if (NXT_TLS)
1900             certificate = nxt_conf_get_path(listener, &certificate_path);
1901 
1902             if (certificate != NULL) {
1903                 tls_init = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_tls_init_t));
1904                 if (nxt_slow_path(tls_init == NULL)) {
1905                     return NXT_ERROR;
1906                 }
1907 
1908                 tls_init->cache_size = 0;
1909                 tls_init->timeout = 300;
1910 
1911                 value = nxt_conf_get_path(listener, &conf_cache_path);
1912                 if (value != NULL) {
1913                     tls_init->cache_size = nxt_conf_get_number(value);
1914                 }
1915 
1916                 value = nxt_conf_get_path(listener, &conf_timeout_path);
1917                 if (value != NULL) {
1918                     tls_init->timeout = nxt_conf_get_number(value);
1919                 }
1920 
1921                 tls_init->conf_cmds = nxt_conf_get_path(listener,
1922                                                         &conf_commands_path);
1923 
1924                 tls_init->tickets_conf = nxt_conf_get_path(listener,
1925                                                            &conf_tickets);
1926 
1927                 n = nxt_conf_array_elements_count_or_1(certificate);
1928 
1929                 for (i = 0; i < n; i++) {
1930                     value = nxt_conf_get_array_element_or_itself(certificate,
1931                                                                  i);
1932                     nxt_assert(value != NULL);
1933 
1934                     ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1935                                                      tls_init, i == 0);
1936                     if (nxt_slow_path(ret != NXT_OK)) {
1937                         goto fail;
1938                     }
1939                 }
1940             }
1941 #endif
1942 
1943             skcf->listen->handler = nxt_http_conn_init;
1944             skcf->router_conf = tmcf->router_conf;
1945             skcf->router_conf->count++;
1946 
1947             if (lscf.pass.length != 0) {
1948                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1949 
1950             /* COMPATIBILITY: listener application. */
1951             } else if (lscf.application.length > 0) {
1952                 skcf->action = nxt_http_pass_application(task,
1953                                                          tmcf->router_conf,
1954                                                          &lscf.application);
1955             }
1956 
1957             if (nxt_slow_path(skcf->action == NULL)) {
1958                 goto fail;
1959             }
1960         }
1961     }
1962 
1963     ret = nxt_http_routes_resolve(task, tmcf);
1964     if (nxt_slow_path(ret != NXT_OK)) {
1965         goto fail;
1966     }
1967 
1968     value = nxt_conf_get_path(conf, &access_log_path);
1969 
1970     if (value != NULL) {
1971         nxt_conf_get_string(value, &path);
1972 
1973         access_log = router->access_log;
1974 
1975         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1976             nxt_router_access_log_use(&router->lock, access_log);
1977 
1978         } else {
1979             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1980                                     + path.length);
1981             if (access_log == NULL) {
1982                 nxt_alert(task, "failed to allocate access log structure");
1983                 goto fail;
1984             }
1985 
1986             access_log->fd = -1;
1987             access_log->handler = &nxt_router_access_log_writer;
1988             access_log->count = 1;
1989 
1990             access_log->path.length = path.length;
1991             access_log->path.start = (u_char *) access_log
1992                                      + sizeof(nxt_router_access_log_t);
1993 
1994             nxt_memcpy(access_log->path.start, path.start, path.length);
1995         }
1996 
1997         tmcf->router_conf->access_log = access_log;
1998     }
1999 
2000     nxt_queue_add(&deleting_sockets, &router->sockets);
2001     nxt_queue_init(&router->sockets);
2002 
2003     return NXT_OK;
2004 
2005 app_fail:
2006 
2007     nxt_mp_destroy(app_mp);
2008 
2009 fail:
2010 
2011     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
2012 
2013         nxt_queue_remove(&app->link);
2014         nxt_thread_mutex_destroy(&app->mutex);
2015         nxt_mp_destroy(app->mem_pool);
2016 
2017     } nxt_queue_loop;
2018 
2019     return NXT_ERROR;
2020 }
2021 
2022 
2023 #if (NXT_TLS)
2024 
2025 static nxt_int_t
2026 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
2027     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
2028     nxt_tls_init_t *tls_init, nxt_bool_t last)
2029 {
2030     nxt_router_tlssock_t  *tls;
2031 
2032     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
2033     if (nxt_slow_path(tls == NULL)) {
2034         return NXT_ERROR;
2035     }
2036 
2037     tls->tls_init = tls_init;
2038     tls->socket_conf = skcf;
2039     tls->temp_conf = tmcf;
2040     tls->last = last;
2041     nxt_conf_get_string(value, &tls->name);
2042 
2043     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
2044 
2045     return NXT_OK;
2046 }
2047 
2048 #endif
2049 
2050 
2051 static nxt_int_t
2052 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
2053     nxt_conf_value_t *conf)
2054 {
2055     uint32_t          next, i;
2056     nxt_mp_t          *mp;
2057     nxt_str_t         *type, exten, str;
2058     nxt_int_t         ret;
2059     nxt_uint_t        exts;
2060     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
2061 
2062     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
2063 
2064     mp = rtcf->mem_pool;
2065 
2066     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
2067     if (nxt_slow_path(ret != NXT_OK)) {
2068         return NXT_ERROR;
2069     }
2070 
2071     if (conf == NULL) {
2072         return NXT_OK;
2073     }
2074 
2075     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
2076 
2077     if (mtypes_conf != NULL) {
2078         next = 0;
2079 
2080         for ( ;; ) {
2081             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
2082 
2083             if (ext_conf == NULL) {
2084                 break;
2085             }
2086 
2087             type = nxt_str_dup(mp, NULL, &str);
2088             if (nxt_slow_path(type == NULL)) {
2089                 return NXT_ERROR;
2090             }
2091 
2092             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
2093                 nxt_conf_get_string(ext_conf, &str);
2094 
2095                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2096                     return NXT_ERROR;
2097                 }
2098 
2099                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2100                                                       &exten, type);
2101                 if (nxt_slow_path(ret != NXT_OK)) {
2102                     return NXT_ERROR;
2103                 }
2104 
2105                 continue;
2106             }
2107 
2108             exts = nxt_conf_array_elements_count(ext_conf);
2109 
2110             for (i = 0; i < exts; i++) {
2111                 value = nxt_conf_get_array_element(ext_conf, i);
2112 
2113                 nxt_conf_get_string(value, &str);
2114 
2115                 if (nxt_slow_path(nxt_str_dup(mp, &exten, &str) == NULL)) {
2116                     return NXT_ERROR;
2117                 }
2118 
2119                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
2120                                                       &exten, type);
2121                 if (nxt_slow_path(ret != NXT_OK)) {
2122                     return NXT_ERROR;
2123                 }
2124             }
2125         }
2126     }
2127 
2128     return NXT_OK;
2129 }
2130 
2131 
2132 static nxt_int_t
2133 nxt_router_conf_process_client_ip(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2134     nxt_socket_conf_t *skcf, nxt_conf_value_t *conf)
2135 {
2136     char                        c;
2137     size_t                      i;
2138     nxt_mp_t                    *mp;
2139     uint32_t                    hash;
2140     nxt_str_t                   header;
2141     nxt_conf_value_t            *source_conf, *header_conf, *recursive_conf;
2142     nxt_http_client_ip_t        *client_ip;
2143     nxt_http_route_addr_rule_t  *source;
2144 
2145     static nxt_str_t  header_path = nxt_string("/header");
2146     static nxt_str_t  source_path = nxt_string("/source");
2147     static nxt_str_t  recursive_path = nxt_string("/recursive");
2148 
2149     if (conf == NULL) {
2150         skcf->client_ip = NULL;
2151 
2152         return NXT_OK;
2153     }
2154 
2155     mp = tmcf->router_conf->mem_pool;
2156 
2157     source_conf = nxt_conf_get_path(conf, &source_path);
2158     header_conf = nxt_conf_get_path(conf, &header_path);
2159     recursive_conf = nxt_conf_get_path(conf, &recursive_path);
2160 
2161     if (source_conf == NULL || header_conf == NULL) {
2162         return NXT_ERROR;
2163     }
2164 
2165     client_ip = nxt_mp_zget(mp, sizeof(nxt_http_client_ip_t));
2166     if (nxt_slow_path(client_ip == NULL)) {
2167         return NXT_ERROR;
2168     }
2169 
2170     source = nxt_http_route_addr_rule_create(task, mp, source_conf);
2171     if (nxt_slow_path(source == NULL)) {
2172         return NXT_ERROR;
2173     }
2174 
2175     client_ip->source = source;
2176 
2177     nxt_conf_get_string(header_conf, &header);
2178 
2179     if (recursive_conf != NULL) {
2180         client_ip->recursive = nxt_conf_get_boolean(recursive_conf);
2181     }
2182 
2183     client_ip->header = nxt_str_dup(mp, NULL, &header);
2184     if (nxt_slow_path(client_ip->header == NULL)) {
2185         return NXT_ERROR;
2186     }
2187 
2188     hash = NXT_HTTP_FIELD_HASH_INIT;
2189 
2190     for (i = 0; i < client_ip->header->length; i++) {
2191         c = client_ip->header->start[i];
2192         hash = nxt_http_field_hash_char(hash, nxt_lowcase(c));
2193     }
2194 
2195     hash = nxt_http_field_hash_end(hash) & 0xFFFF;
2196 
2197     client_ip->header_hash = hash;
2198 
2199     skcf->client_ip = client_ip;
2200 
2201     return NXT_OK;
2202 }
2203 
2204 
2205 static nxt_app_t *
2206 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
2207 {
2208     nxt_app_t  *app;
2209 
2210     nxt_queue_each(app, queue, nxt_app_t, link) {
2211 
2212         if (nxt_strstr_eq(name, &app->name)) {
2213             return app;
2214         }
2215 
2216     } nxt_queue_loop;
2217 
2218     return NULL;
2219 }
2220 
2221 
2222 static nxt_int_t
2223 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
2224 {
2225     void       *mem;
2226     nxt_int_t  fd;
2227 
2228     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
2229     if (nxt_slow_path(fd == -1)) {
2230         return NXT_ERROR;
2231     }
2232 
2233     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
2234                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2235     if (nxt_slow_path(mem == MAP_FAILED)) {
2236         nxt_fd_close(fd);
2237 
2238         return NXT_ERROR;
2239     }
2240 
2241     nxt_app_queue_init(mem);
2242 
2243     port->queue_fd = fd;
2244     port->queue = mem;
2245 
2246     return NXT_OK;
2247 }
2248 
2249 
2250 static nxt_int_t
2251 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2252 {
2253     void       *mem;
2254     nxt_int_t  fd;
2255 
2256     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2257     if (nxt_slow_path(fd == -1)) {
2258         return NXT_ERROR;
2259     }
2260 
2261     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2262                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2263     if (nxt_slow_path(mem == MAP_FAILED)) {
2264         nxt_fd_close(fd);
2265 
2266         return NXT_ERROR;
2267     }
2268 
2269     nxt_port_queue_init(mem);
2270 
2271     port->queue_fd = fd;
2272     port->queue = mem;
2273 
2274     return NXT_OK;
2275 }
2276 
2277 
2278 static nxt_int_t
2279 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2280 {
2281     void  *mem;
2282 
2283     nxt_assert(fd != -1);
2284 
2285     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2286                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2287     if (nxt_slow_path(mem == MAP_FAILED)) {
2288 
2289         return NXT_ERROR;
2290     }
2291 
2292     port->queue = mem;
2293 
2294     return NXT_OK;
2295 }
2296 
2297 
/*
 * Level hash descriptor of the per-configuration application table
 * (rtcf->apps_hash).  Entries are matched with nxt_router_apps_hash_test();
 * hash memory is obtained and released through the nxt_mp_lvlhsh_alloc()
 * and nxt_mp_lvlhsh_free() callbacks.
 */
2298 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2299     NXT_LVLHSH_DEFAULT,
2300     nxt_router_apps_hash_test,
2301     nxt_mp_lvlhsh_alloc,
2302     nxt_mp_lvlhsh_free,
2303 };
2304 
2305 
2306 static nxt_int_t
2307 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2308 {
2309     nxt_app_t  *app;
2310 
2311     app = data;
2312 
2313     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2314 }
2315 
2316 
2317 static nxt_int_t
2318 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2319 {
2320     nxt_lvlhsh_query_t  lhq;
2321 
2322     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2323     lhq.replace = 0;
2324     lhq.key = app->name;
2325     lhq.value = app;
2326     lhq.proto = &nxt_router_apps_hash_proto;
2327     lhq.pool = rtcf->mem_pool;
2328 
2329     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2330 
2331     case NXT_OK:
2332         return NXT_OK;
2333 
2334     case NXT_DECLINED:
2335         nxt_thread_log_alert("router app hash adding failed: "
2336                              "\"%V\" is already in hash", &lhq.key);
2337         /* Fall through. */
2338     default:
2339         return NXT_ERROR;
2340     }
2341 }
2342 
2343 
2344 static nxt_app_t *
2345 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2346 {
2347     nxt_lvlhsh_query_t  lhq;
2348 
2349     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2350     lhq.key = *name;
2351     lhq.proto = &nxt_router_apps_hash_proto;
2352 
2353     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2354         return NULL;
2355     }
2356 
2357     return lhq.value;
2358 }
2359 
2360 
2361 static void
2362 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2363 {
2364     nxt_app_t          *app;
2365     nxt_lvlhsh_each_t  lhe;
2366 
2367     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2368 
2369     for ( ;; ) {
2370         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2371 
2372         if (app == NULL) {
2373             break;
2374         }
2375 
2376         nxt_router_app_use(task, app, i);
2377     }
2378 }
2379 
2380 
/*
 * Per-action application settings created by nxt_router_application_init():
 * the application the action passes requests to and the index of the
 * selected target in app->targets (0 when no target is given).
 */
2381 typedef struct {
2382     nxt_app_t  *app;
2383     nxt_int_t  target;
2384 } nxt_http_app_conf_t;
2385 
2386 
2387 nxt_int_t
2388 nxt_router_application_init(nxt_router_conf_t *rtcf, nxt_str_t *name,
2389     nxt_str_t *target, nxt_http_action_t *action)
2390 {
2391     nxt_app_t            *app;
2392     nxt_str_t            *targets;
2393     nxt_uint_t           i;
2394     nxt_http_app_conf_t  *conf;
2395 
2396     app = nxt_router_apps_hash_get(rtcf, name);
2397     if (app == NULL) {
2398         return NXT_DECLINED;
2399     }
2400 
2401     conf = nxt_mp_get(rtcf->mem_pool, sizeof(nxt_http_app_conf_t));
2402     if (nxt_slow_path(conf == NULL)) {
2403         return NXT_ERROR;
2404     }
2405 
2406     action->handler = nxt_http_application_handler;
2407     action->u.conf = conf;
2408 
2409     conf->app = app;
2410 
2411     if (target != NULL && target->length != 0) {
2412         targets = app->targets;
2413 
2414         for (i = 0; !nxt_strstr_eq(target, &targets[i]); i++);
2415 
2416         conf->target = i;
2417 
2418     } else {
2419         conf->target = 0;
2420     }
2421 
2422     return NXT_OK;
2423 }
2424 
2425 
2426 static nxt_socket_conf_t *
2427 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2428     nxt_str_t *name)
2429 {
2430     size_t               size;
2431     nxt_int_t            ret;
2432     nxt_bool_t           wildcard;
2433     nxt_sockaddr_t       *sa;
2434     nxt_socket_conf_t    *skcf;
2435     nxt_listen_socket_t  *ls;
2436 
2437     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2438     if (nxt_slow_path(sa == NULL)) {
2439         nxt_alert(task, "invalid listener \"%V\"", name);
2440         return NULL;
2441     }
2442 
2443     sa->type = SOCK_STREAM;
2444 
2445     nxt_debug(task, "router listener: \"%*s\"",
2446               (size_t) sa->length, nxt_sockaddr_start(sa));
2447 
2448     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2449     if (nxt_slow_path(skcf == NULL)) {
2450         return NULL;
2451     }
2452 
2453     size = nxt_sockaddr_size(sa);
2454 
2455     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2456 
2457     if (ret != NXT_OK) {
2458 
2459         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2460         if (nxt_slow_path(ls == NULL)) {
2461             return NULL;
2462         }
2463 
2464         skcf->listen = ls;
2465 
2466         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2467         nxt_memcpy(ls->sockaddr, sa, size);
2468 
2469         nxt_listen_socket_remote_size(ls);
2470 
2471         ls->socket = -1;
2472         ls->backlog = NXT_LISTEN_BACKLOG;
2473         ls->flags = NXT_NONBLOCK;
2474         ls->read_after_accept = 1;
2475     }
2476 
2477     switch (sa->u.sockaddr.sa_family) {
2478 #if (NXT_HAVE_UNIX_DOMAIN)
2479     case AF_UNIX:
2480         wildcard = 0;
2481         break;
2482 #endif
2483 #if (NXT_INET6)
2484     case AF_INET6:
2485         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2486         break;
2487 #endif
2488     case AF_INET:
2489     default:
2490         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2491         break;
2492     }
2493 
2494     if (!wildcard) {
2495         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2496         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2497             return NULL;
2498         }
2499 
2500         nxt_memcpy(skcf->sockaddr, sa, size);
2501     }
2502 
2503     return skcf;
2504 }
2505 
2506 
2507 static nxt_int_t
2508 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2509     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2510 {
2511     nxt_router_t       *router;
2512     nxt_queue_link_t   *qlk;
2513     nxt_socket_conf_t  *skcf;
2514 
2515     router = tmcf->router_conf->router;
2516 
2517     for (qlk = nxt_queue_first(&router->sockets);
2518          qlk != nxt_queue_tail(&router->sockets);
2519          qlk = nxt_queue_next(qlk))
2520     {
2521         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2522 
2523         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2524             nskcf->listen = skcf->listen;
2525 
2526             nxt_queue_remove(qlk);
2527             nxt_queue_insert_tail(&keeping_sockets, qlk);
2528 
2529             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2530 
2531             return NXT_OK;
2532         }
2533     }
2534 
2535     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2536 
2537     return NXT_DECLINED;
2538 }
2539 
2540 
2541 static void
2542 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2543     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2544 {
2545     size_t            size;
2546     uint32_t          stream;
2547     nxt_int_t         ret;
2548     nxt_buf_t         *b;
2549     nxt_port_t        *main_port, *router_port;
2550     nxt_runtime_t     *rt;
2551     nxt_socket_rpc_t  *rpc;
2552 
2553     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2554     if (rpc == NULL) {
2555         goto fail;
2556     }
2557 
2558     rpc->socket_conf = skcf;
2559     rpc->temp_conf = tmcf;
2560 
2561     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2562 
2563     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2564     if (b == NULL) {
2565         goto fail;
2566     }
2567 
2568     b->completion_handler = nxt_buf_dummy_completion;
2569 
2570     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2571 
2572     rt = task->thread->runtime;
2573     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2574     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2575 
2576     stream = nxt_port_rpc_register_handler(task, router_port,
2577                                            nxt_router_listen_socket_ready,
2578                                            nxt_router_listen_socket_error,
2579                                            main_port->pid, rpc);
2580     if (nxt_slow_path(stream == 0)) {
2581         goto fail;
2582     }
2583 
2584     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2585                                 stream, router_port->id, b);
2586 
2587     if (nxt_slow_path(ret != NXT_OK)) {
2588         nxt_port_rpc_cancel(task, router_port, stream);
2589         goto fail;
2590     }
2591 
2592     return;
2593 
2594 fail:
2595 
2596     nxt_router_conf_error(task, tmcf);
2597 }
2598 
2599 
2600 static void
2601 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2602     void *data)
2603 {
2604     nxt_int_t         ret;
2605     nxt_socket_t      s;
2606     nxt_socket_rpc_t  *rpc;
2607 
2608     rpc = data;
2609 
2610     s = msg->fd[0];
2611 
2612     ret = nxt_socket_nonblocking(task, s);
2613     if (nxt_slow_path(ret != NXT_OK)) {
2614         goto fail;
2615     }
2616 
2617     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2618 
2619     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2620     if (nxt_slow_path(ret != NXT_OK)) {
2621         goto fail;
2622     }
2623 
2624     rpc->socket_conf->listen->socket = s;
2625 
2626     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2627                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2628 
2629     return;
2630 
2631 fail:
2632 
2633     nxt_socket_close(task, s);
2634 
2635     nxt_router_conf_error(task, rpc->temp_conf);
2636 }
2637 
2638 
2639 static void
2640 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2641     void *data)
2642 {
2643     nxt_socket_rpc_t        *rpc;
2644     nxt_router_temp_conf_t  *tmcf;
2645 
2646     rpc = data;
2647     tmcf = rpc->temp_conf;
2648 
2649 #if 0
2650     u_char                  *p;
2651     size_t                  size;
2652     uint8_t                 error;
2653     nxt_buf_t               *in, *out;
2654     nxt_sockaddr_t          *sa;
2655 
2656     static nxt_str_t  socket_errors[] = {
2657         nxt_string("ListenerSystem"),
2658         nxt_string("ListenerNoIPv6"),
2659         nxt_string("ListenerPort"),
2660         nxt_string("ListenerInUse"),
2661         nxt_string("ListenerNoAddress"),
2662         nxt_string("ListenerNoAccess"),
2663         nxt_string("ListenerPath"),
2664     };
2665 
2666     sa = rpc->socket_conf->listen->sockaddr;
2667 
2668     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2669 
2670     if (nxt_slow_path(in == NULL)) {
2671         return;
2672     }
2673 
2674     p = in->mem.pos;
2675 
2676     error = *p++;
2677 
2678     size = nxt_length("listen socket error: ")
2679            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2680            + sa->length + socket_errors[error].length + (in->mem.free - p);
2681 
2682     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2683     if (nxt_slow_path(out == NULL)) {
2684         return;
2685     }
2686 
2687     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2688                         "listen socket error: "
2689                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2690                         (size_t) sa->length, nxt_sockaddr_start(sa),
2691                         &socket_errors[error], in->mem.free - p, p);
2692 
2693     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2694 #endif
2695 
2696     nxt_router_conf_error(task, tmcf);
2697 }
2698 
2699 
2700 #if (NXT_TLS)
2701 
2702 static void
2703 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2704     void *data)
2705 {
2706     nxt_mp_t                *mp;
2707     nxt_int_t               ret;
2708     nxt_tls_conf_t          *tlscf;
2709     nxt_router_tlssock_t    *tls;
2710     nxt_tls_bundle_conf_t   *bundle;
2711     nxt_router_temp_conf_t  *tmcf;
2712 
2713     nxt_debug(task, "tls rpc handler");
2714 
2715     tls = data;
2716     tmcf = tls->temp_conf;
2717 
2718     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2719         goto fail;
2720     }
2721 
2722     mp = tmcf->router_conf->mem_pool;
2723 
2724     if (tls->socket_conf->tls == NULL){
2725         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2726         if (nxt_slow_path(tlscf == NULL)) {
2727             goto fail;
2728         }
2729 
2730         tlscf->no_wait_shutdown = 1;
2731         tls->socket_conf->tls = tlscf;
2732 
2733     } else {
2734         tlscf = tls->socket_conf->tls;
2735     }
2736 
2737     tls->tls_init->conf = tlscf;
2738 
2739     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2740     if (nxt_slow_path(bundle == NULL)) {
2741         goto fail;
2742     }
2743 
2744     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2745         goto fail;
2746     }
2747 
2748     bundle->chain_file = msg->fd[0];
2749     bundle->next = tlscf->bundle;
2750     tlscf->bundle = bundle;
2751 
2752     ret = task->thread->runtime->tls->server_init(task, mp, tls->tls_init,
2753                                                   tls->last);
2754     if (nxt_slow_path(ret != NXT_OK)) {
2755         goto fail;
2756     }
2757 
2758     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2759                        nxt_router_conf_apply, task, tmcf, NULL);
2760     return;
2761 
2762 fail:
2763 
2764     nxt_router_conf_error(task, tmcf);
2765 }
2766 
2767 #endif
2768 
2769 
2770 static void
2771 nxt_router_app_rpc_create(nxt_task_t *task,
2772     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2773 {
2774     size_t         size;
2775     uint32_t       stream;
2776     nxt_fd_t       port_fd, queue_fd;
2777     nxt_int_t      ret;
2778     nxt_buf_t      *b;
2779     nxt_port_t     *router_port, *dport;
2780     nxt_runtime_t  *rt;
2781     nxt_app_rpc_t  *rpc;
2782 
2783     rt = task->thread->runtime;
2784 
2785     dport = app->proto_port;
2786 
2787     if (dport == NULL) {
2788         nxt_debug(task, "app '%V' prototype prefork", &app->name);
2789 
2790         size = app->name.length + 1 + app->conf.length;
2791 
2792         b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2793         if (nxt_slow_path(b == NULL)) {
2794             goto fail;
2795         }
2796 
2797         b->completion_handler = nxt_buf_dummy_completion;
2798 
2799         nxt_buf_cpystr(b, &app->name);
2800         *b->mem.free++ = '\0';
2801         nxt_buf_cpystr(b, &app->conf);
2802 
2803         dport = rt->port_by_type[NXT_PROCESS_MAIN];
2804 
2805         port_fd = app->shared_port->pair[0];
2806         queue_fd = app->shared_port->queue_fd;
2807 
2808     } else {
2809         nxt_debug(task, "app '%V' prefork", &app->name);
2810 
2811         b = NULL;
2812         port_fd = -1;
2813         queue_fd = -1;
2814     }
2815 
2816     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2817 
2818     rpc = nxt_port_rpc_register_handler_ex(task, router_port,
2819                                            nxt_router_app_prefork_ready,
2820                                            nxt_router_app_prefork_error,
2821                                            sizeof(nxt_app_rpc_t));
2822     if (nxt_slow_path(rpc == NULL)) {
2823         goto fail;
2824     }
2825 
2826     rpc->app = app;
2827     rpc->temp_conf = tmcf;
2828     rpc->proto = (b != NULL);
2829 
2830     stream = nxt_port_rpc_ex_stream(rpc);
2831 
2832     ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
2833                                  port_fd, queue_fd, stream, router_port->id, b);
2834     if (nxt_slow_path(ret != NXT_OK)) {
2835         nxt_port_rpc_cancel(task, router_port, stream);
2836         goto fail;
2837     }
2838 
2839     if (b == NULL) {
2840         nxt_port_rpc_ex_set_peer(task, router_port, rpc, dport->pid);
2841 
2842         app->pending_processes++;
2843     }
2844 
2845     return;
2846 
2847 fail:
2848 
2849     nxt_router_conf_error(task, tmcf);
2850 }
2851 
2852 
2853 static void
2854 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2855     void *data)
2856 {
2857     nxt_app_t           *app;
2858     nxt_port_t          *port;
2859     nxt_app_rpc_t       *rpc;
2860     nxt_event_engine_t  *engine;
2861 
2862     rpc = data;
2863     app = rpc->app;
2864 
2865     port = msg->u.new_port;
2866 
2867     nxt_assert(port != NULL);
2868     nxt_assert(port->id == 0);
2869 
2870     if (rpc->proto) {
2871         nxt_assert(app->proto_port == NULL);
2872         nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);
2873 
2874         nxt_port_inc_use(port);
2875 
2876         app->proto_port = port;
2877         port->app = app;
2878 
2879         nxt_router_app_rpc_create(task, rpc->temp_conf, app);
2880 
2881         return;
2882     }
2883 
2884     nxt_assert(port->type == NXT_PROCESS_APP);
2885 
2886     port->app = app;
2887     port->main_app_port = port;
2888 
2889     app->pending_processes--;
2890     app->processes++;
2891     app->idle_processes++;
2892 
2893     engine = task->thread->engine;
2894 
2895     nxt_queue_insert_tail(&app->ports, &port->app_link);
2896     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2897 
2898     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2899               &app->name, port->pid, port->id);
2900 
2901     nxt_port_hash_add(&app->port_hash, port);
2902     app->port_hash_count++;
2903 
2904     port->idle_start = 0;
2905 
2906     nxt_port_inc_use(port);
2907 
2908     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
2909 
2910     nxt_work_queue_add(&engine->fast_work_queue,
2911                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2912 }
2913 
2914 
2915 static void
2916 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2917     void *data)
2918 {
2919     nxt_app_t               *app;
2920     nxt_app_rpc_t           *rpc;
2921     nxt_router_temp_conf_t  *tmcf;
2922 
2923     rpc = data;
2924     app = rpc->app;
2925     tmcf = rpc->temp_conf;
2926 
2927     if (rpc->proto) {
2928         nxt_log(task, NXT_LOG_WARN, "failed to start prototype \"%V\"",
2929                 &app->name);
2930 
2931     } else {
2932         nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2933                 &app->name);
2934 
2935         app->pending_processes--;
2936     }
2937 
2938     nxt_router_conf_error(task, tmcf);
2939 }
2940 
2941 
2942 static nxt_int_t
2943 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2944     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2945 {
2946     nxt_int_t                 ret;
2947     nxt_uint_t                n, threads;
2948     nxt_queue_link_t          *qlk;
2949     nxt_router_engine_conf_t  *recf;
2950 
2951     threads = tmcf->router_conf->threads;
2952 
2953     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2954                                      sizeof(nxt_router_engine_conf_t));
2955     if (nxt_slow_path(tmcf->engines == NULL)) {
2956         return NXT_ERROR;
2957     }
2958 
2959     n = 0;
2960 
2961     for (qlk = nxt_queue_first(&router->engines);
2962          qlk != nxt_queue_tail(&router->engines);
2963          qlk = nxt_queue_next(qlk))
2964     {
2965         recf = nxt_array_zero_add(tmcf->engines);
2966         if (nxt_slow_path(recf == NULL)) {
2967             return NXT_ERROR;
2968         }
2969 
2970         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2971 
2972         if (n < threads) {
2973             recf->action = NXT_ROUTER_ENGINE_KEEP;
2974             ret = nxt_router_engine_conf_update(tmcf, recf);
2975 
2976         } else {
2977             recf->action = NXT_ROUTER_ENGINE_DELETE;
2978             ret = nxt_router_engine_conf_delete(tmcf, recf);
2979         }
2980 
2981         if (nxt_slow_path(ret != NXT_OK)) {
2982             return ret;
2983         }
2984 
2985         n++;
2986     }
2987 
2988     tmcf->new_threads = n;
2989 
2990     while (n < threads) {
2991         recf = nxt_array_zero_add(tmcf->engines);
2992         if (nxt_slow_path(recf == NULL)) {
2993             return NXT_ERROR;
2994         }
2995 
2996         recf->action = NXT_ROUTER_ENGINE_ADD;
2997 
2998         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2999         if (nxt_slow_path(recf->engine == NULL)) {
3000             return NXT_ERROR;
3001         }
3002 
3003         ret = nxt_router_engine_conf_create(tmcf, recf);
3004         if (nxt_slow_path(ret != NXT_OK)) {
3005             return ret;
3006         }
3007 
3008         n++;
3009     }
3010 
3011     return NXT_OK;
3012 }
3013 
3014 
3015 static nxt_int_t
3016 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
3017     nxt_router_engine_conf_t *recf)
3018 {
3019     nxt_int_t  ret;
3020 
3021     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3022                                           nxt_router_listen_socket_create);
3023     if (nxt_slow_path(ret != NXT_OK)) {
3024         return ret;
3025     }
3026 
3027     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3028                                           nxt_router_listen_socket_create);
3029     if (nxt_slow_path(ret != NXT_OK)) {
3030         return ret;
3031     }
3032 
3033     return ret;
3034 }
3035 
3036 
3037 static nxt_int_t
3038 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
3039     nxt_router_engine_conf_t *recf)
3040 {
3041     nxt_int_t  ret;
3042 
3043     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
3044                                           nxt_router_listen_socket_create);
3045     if (nxt_slow_path(ret != NXT_OK)) {
3046         return ret;
3047     }
3048 
3049     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
3050                                           nxt_router_listen_socket_update);
3051     if (nxt_slow_path(ret != NXT_OK)) {
3052         return ret;
3053     }
3054 
3055     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3056     if (nxt_slow_path(ret != NXT_OK)) {
3057         return ret;
3058     }
3059 
3060     return ret;
3061 }
3062 
3063 
3064 static nxt_int_t
3065 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
3066     nxt_router_engine_conf_t *recf)
3067 {
3068     nxt_int_t  ret;
3069 
3070     ret = nxt_router_engine_quit(tmcf, recf);
3071     if (nxt_slow_path(ret != NXT_OK)) {
3072         return ret;
3073     }
3074 
3075     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
3076     if (nxt_slow_path(ret != NXT_OK)) {
3077         return ret;
3078     }
3079 
3080     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
3081 }
3082 
3083 
3084 static nxt_int_t
3085 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
3086     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
3087     nxt_work_handler_t handler)
3088 {
3089     nxt_int_t                ret;
3090     nxt_joint_job_t          *job;
3091     nxt_queue_link_t         *qlk;
3092     nxt_socket_conf_t        *skcf;
3093     nxt_socket_conf_joint_t  *joint;
3094 
3095     for (qlk = nxt_queue_first(sockets);
3096          qlk != nxt_queue_tail(sockets);
3097          qlk = nxt_queue_next(qlk))
3098     {
3099         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3100         if (nxt_slow_path(job == NULL)) {
3101             return NXT_ERROR;
3102         }
3103 
3104         job->work.next = recf->jobs;
3105         recf->jobs = &job->work;
3106 
3107         job->task = tmcf->engine->task;
3108         job->work.handler = handler;
3109         job->work.task = &job->task;
3110         job->work.obj = job;
3111         job->tmcf = tmcf;
3112 
3113         tmcf->count++;
3114 
3115         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
3116                              sizeof(nxt_socket_conf_joint_t));
3117         if (nxt_slow_path(joint == NULL)) {
3118             return NXT_ERROR;
3119         }
3120 
3121         job->work.data = joint;
3122 
3123         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
3124         if (nxt_slow_path(ret != NXT_OK)) {
3125             return ret;
3126         }
3127 
3128         joint->count = 1;
3129 
3130         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3131         skcf->count++;
3132         joint->socket_conf = skcf;
3133 
3134         joint->engine = recf->engine;
3135     }
3136 
3137     return NXT_OK;
3138 }
3139 
3140 
3141 static nxt_int_t
3142 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
3143     nxt_router_engine_conf_t *recf)
3144 {
3145     nxt_joint_job_t  *job;
3146 
3147     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3148     if (nxt_slow_path(job == NULL)) {
3149         return NXT_ERROR;
3150     }
3151 
3152     job->work.next = recf->jobs;
3153     recf->jobs = &job->work;
3154 
3155     job->task = tmcf->engine->task;
3156     job->work.handler = nxt_router_worker_thread_quit;
3157     job->work.task = &job->task;
3158     job->work.obj = NULL;
3159     job->work.data = NULL;
3160     job->tmcf = NULL;
3161 
3162     return NXT_OK;
3163 }
3164 
3165 
3166 static nxt_int_t
3167 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
3168     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
3169 {
3170     nxt_joint_job_t   *job;
3171     nxt_queue_link_t  *qlk;
3172 
3173     for (qlk = nxt_queue_first(sockets);
3174          qlk != nxt_queue_tail(sockets);
3175          qlk = nxt_queue_next(qlk))
3176     {
3177         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
3178         if (nxt_slow_path(job == NULL)) {
3179             return NXT_ERROR;
3180         }
3181 
3182         job->work.next = recf->jobs;
3183         recf->jobs = &job->work;
3184 
3185         job->task = tmcf->engine->task;
3186         job->work.handler = nxt_router_listen_socket_delete;
3187         job->work.task = &job->task;
3188         job->work.obj = job;
3189         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
3190         job->tmcf = tmcf;
3191 
3192         tmcf->count++;
3193     }
3194 
3195     return NXT_OK;
3196 }
3197 
3198 
3199 static nxt_int_t
3200 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
3201     nxt_router_temp_conf_t *tmcf)
3202 {
3203     nxt_int_t                 ret;
3204     nxt_uint_t                i, threads;
3205     nxt_router_engine_conf_t  *recf;
3206 
3207     recf = tmcf->engines->elts;
3208     threads = tmcf->router_conf->threads;
3209 
3210     for (i = tmcf->new_threads; i < threads; i++) {
3211         ret = nxt_router_thread_create(task, rt, recf[i].engine);
3212         if (nxt_slow_path(ret != NXT_OK)) {
3213             return ret;
3214         }
3215     }
3216 
3217     return NXT_OK;
3218 }
3219 
3220 
3221 static nxt_int_t
3222 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
3223     nxt_event_engine_t *engine)
3224 {
3225     nxt_int_t            ret;
3226     nxt_thread_link_t    *link;
3227     nxt_thread_handle_t  handle;
3228 
3229     link = nxt_zalloc(sizeof(nxt_thread_link_t));
3230 
3231     if (nxt_slow_path(link == NULL)) {
3232         return NXT_ERROR;
3233     }
3234 
3235     link->start = nxt_router_thread_start;
3236     link->engine = engine;
3237     link->work.handler = nxt_router_thread_exit_handler;
3238     link->work.task = task;
3239     link->work.data = link;
3240 
3241     nxt_queue_insert_tail(&rt->engines, &engine->link);
3242 
3243     ret = nxt_thread_create(&handle, link);
3244 
3245     if (nxt_slow_path(ret != NXT_OK)) {
3246         nxt_queue_remove(&engine->link);
3247     }
3248 
3249     return ret;
3250 }
3251 
3252 
3253 static void
3254 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
3255     nxt_router_temp_conf_t *tmcf)
3256 {
3257     nxt_app_t  *app;
3258 
3259     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
3260 
3261         nxt_router_app_unlink(task, app);
3262 
3263     } nxt_queue_loop;
3264 
3265     nxt_queue_add(&router->apps, &tmcf->previous);
3266     nxt_queue_add(&router->apps, &tmcf->apps);
3267 }
3268 
3269 
3270 static void
3271 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
3272 {
3273     nxt_uint_t                n;
3274     nxt_event_engine_t        *engine;
3275     nxt_router_engine_conf_t  *recf;
3276 
3277     recf = tmcf->engines->elts;
3278 
3279     for (n = tmcf->engines->nelts; n != 0; n--) {
3280         engine = recf->engine;
3281 
3282         switch (recf->action) {
3283 
3284         case NXT_ROUTER_ENGINE_KEEP:
3285             break;
3286 
3287         case NXT_ROUTER_ENGINE_ADD:
3288             nxt_queue_insert_tail(&router->engines, &engine->link0);
3289             break;
3290 
3291         case NXT_ROUTER_ENGINE_DELETE:
3292             nxt_queue_remove(&engine->link0);
3293             break;
3294         }
3295 
3296         nxt_router_engine_post(engine, recf->jobs);
3297 
3298         recf++;
3299     }
3300 }
3301 
3302 
3303 static void
3304 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3305 {
3306     nxt_work_t  *work, *next;
3307 
3308     for (work = jobs; work != NULL; work = next) {
3309         next = work->next;
3310         work->next = NULL;
3311 
3312         nxt_event_engine_post(engine, work);
3313     }
3314 }
3315 
3316 
3317 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3318     .rpc_error       = nxt_port_rpc_handler,
3319     .mmap            = nxt_port_mmap_handler,
3320     .data            = nxt_port_rpc_handler,
3321     .oosm            = nxt_router_oosm_handler,
3322     .req_headers_ack = nxt_port_rpc_handler,
3323 };
3324 
3325 
3326 static void
3327 nxt_router_thread_start(void *data)
3328 {
3329     nxt_int_t           ret;
3330     nxt_port_t          *port;
3331     nxt_task_t          *task;
3332     nxt_work_t          *work;
3333     nxt_thread_t        *thread;
3334     nxt_thread_link_t   *link;
3335     nxt_event_engine_t  *engine;
3336 
3337     link = data;
3338     engine = link->engine;
3339     task = &engine->task;
3340 
3341     thread = nxt_thread();
3342 
3343     nxt_event_engine_thread_adopt(engine);
3344 
3345     /* STUB */
3346     thread->runtime = engine->task.thread->runtime;
3347 
3348     engine->task.thread = thread;
3349     engine->task.log = thread->log;
3350     thread->engine = engine;
3351     thread->task = &engine->task;
3352 #if 0
3353     thread->fiber = &engine->fibers->fiber;
3354 #endif
3355 
3356     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3357     if (nxt_slow_path(engine->mem_pool == NULL)) {
3358         return;
3359     }
3360 
3361     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3362                         NXT_PROCESS_ROUTER);
3363     if (nxt_slow_path(port == NULL)) {
3364         return;
3365     }
3366 
3367     ret = nxt_port_socket_init(task, port, 0);
3368     if (nxt_slow_path(ret != NXT_OK)) {
3369         nxt_port_use(task, port, -1);
3370         return;
3371     }
3372 
3373     ret = nxt_router_port_queue_init(task, port);
3374     if (nxt_slow_path(ret != NXT_OK)) {
3375         nxt_port_use(task, port, -1);
3376         return;
3377     }
3378 
3379     engine->port = port;
3380 
3381     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3382 
3383     work = nxt_zalloc(sizeof(nxt_work_t));
3384     if (nxt_slow_path(work == NULL)) {
3385         return;
3386     }
3387 
3388     work->handler = nxt_router_rt_add_port;
3389     work->task = link->work.task;
3390     work->obj = work;
3391     work->data = port;
3392 
3393     nxt_event_engine_post(link->work.task->thread->engine, work);
3394 
3395     nxt_event_engine_start(engine);
3396 }
3397 
3398 
3399 static void
3400 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3401 {
3402     nxt_int_t      res;
3403     nxt_port_t     *port;
3404     nxt_runtime_t  *rt;
3405 
3406     rt = task->thread->runtime;
3407     port = data;
3408 
3409     nxt_free(obj);
3410 
3411     res = nxt_port_hash_add(&rt->ports, port);
3412 
3413     if (nxt_fast_path(res == NXT_OK)) {
3414         nxt_port_use(task, port, 1);
3415     }
3416 }
3417 
3418 
3419 static void
3420 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3421 {
3422     nxt_joint_job_t          *job;
3423     nxt_socket_conf_t        *skcf;
3424     nxt_listen_event_t       *lev;
3425     nxt_listen_socket_t      *ls;
3426     nxt_thread_spinlock_t    *lock;
3427     nxt_socket_conf_joint_t  *joint;
3428 
3429     job = obj;
3430     joint = data;
3431 
3432     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3433 
3434     skcf = joint->socket_conf;
3435     ls = skcf->listen;
3436 
3437     lev = nxt_listen_event(task, ls);
3438     if (nxt_slow_path(lev == NULL)) {
3439         nxt_router_listen_socket_release(task, skcf);
3440         return;
3441     }
3442 
3443     lev->socket.data = joint;
3444 
3445     lock = &skcf->router_conf->router->lock;
3446 
3447     nxt_thread_spin_lock(lock);
3448     ls->count++;
3449     nxt_thread_spin_unlock(lock);
3450 
3451     job->work.next = NULL;
3452     job->work.handler = nxt_router_conf_wait;
3453 
3454     nxt_event_engine_post(job->tmcf->engine, &job->work);
3455 }
3456 
3457 
3458 nxt_inline nxt_listen_event_t *
3459 nxt_router_listen_event(nxt_queue_t *listen_connections,
3460     nxt_socket_conf_t *skcf)
3461 {
3462     nxt_socket_t        fd;
3463     nxt_queue_link_t    *qlk;
3464     nxt_listen_event_t  *lev;
3465 
3466     fd = skcf->listen->socket;
3467 
3468     for (qlk = nxt_queue_first(listen_connections);
3469          qlk != nxt_queue_tail(listen_connections);
3470          qlk = nxt_queue_next(qlk))
3471     {
3472         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3473 
3474         if (fd == lev->socket.fd) {
3475             return lev;
3476         }
3477     }
3478 
3479     return NULL;
3480 }
3481 
3482 
3483 static void
3484 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3485 {
3486     nxt_joint_job_t          *job;
3487     nxt_event_engine_t       *engine;
3488     nxt_listen_event_t       *lev;
3489     nxt_socket_conf_joint_t  *joint, *old;
3490 
3491     job = obj;
3492     joint = data;
3493 
3494     engine = task->thread->engine;
3495 
3496     nxt_queue_insert_tail(&engine->joints, &joint->link);
3497 
3498     lev = nxt_router_listen_event(&engine->listen_connections,
3499                                   joint->socket_conf);
3500 
3501     old = lev->socket.data;
3502     lev->socket.data = joint;
3503     lev->listen = joint->socket_conf->listen;
3504 
3505     job->work.next = NULL;
3506     job->work.handler = nxt_router_conf_wait;
3507 
3508     nxt_event_engine_post(job->tmcf->engine, &job->work);
3509 
3510     /*
3511      * The task is allocated from configuration temporary
3512      * memory pool so it can be freed after engine post operation.
3513      */
3514 
3515     nxt_router_conf_release(&engine->task, old);
3516 }
3517 
3518 
3519 static void
3520 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3521 {
3522     nxt_socket_conf_t        *skcf;
3523     nxt_listen_event_t       *lev;
3524     nxt_event_engine_t       *engine;
3525     nxt_socket_conf_joint_t  *joint;
3526 
3527     skcf = data;
3528 
3529     engine = task->thread->engine;
3530 
3531     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3532 
3533     nxt_fd_event_delete(engine, &lev->socket);
3534 
3535     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3536               lev->socket.fd);
3537 
3538     joint = lev->socket.data;
3539     joint->close_job = obj;
3540 
3541     lev->timer.handler = nxt_router_listen_socket_close;
3542     lev->timer.work_queue = &engine->fast_work_queue;
3543 
3544     nxt_timer_add(engine, &lev->timer, 0);
3545 }
3546 
3547 
3548 static void
3549 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3550 {
3551     nxt_event_engine_t  *engine;
3552 
3553     nxt_debug(task, "router worker thread quit");
3554 
3555     engine = task->thread->engine;
3556 
3557     engine->shutdown = 1;
3558 
3559     if (nxt_queue_is_empty(&engine->joints)) {
3560         nxt_thread_exit(task->thread);
3561     }
3562 }
3563 
3564 
3565 static void
3566 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3567 {
3568     nxt_timer_t              *timer;
3569     nxt_joint_job_t          *job;
3570     nxt_listen_event_t       *lev;
3571     nxt_socket_conf_joint_t  *joint;
3572 
3573     timer = obj;
3574     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3575 
3576     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3577               lev->socket.fd);
3578 
3579     nxt_queue_remove(&lev->link);
3580 
3581     joint = lev->socket.data;
3582     lev->socket.data = NULL;
3583 
3584     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3585     task = &task->thread->engine->task;
3586 
3587     nxt_router_listen_socket_release(task, joint->socket_conf);
3588 
3589     job = joint->close_job;
3590     job->work.next = NULL;
3591     job->work.handler = nxt_router_conf_wait;
3592 
3593     nxt_event_engine_post(job->tmcf->engine, &job->work);
3594 
3595     nxt_router_listen_event_release(task, lev, joint);
3596 }
3597 
3598 
3599 static void
3600 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3601 {
3602     nxt_listen_socket_t    *ls;
3603     nxt_thread_spinlock_t  *lock;
3604 
3605     ls = skcf->listen;
3606     lock = &skcf->router_conf->router->lock;
3607 
3608     nxt_thread_spin_lock(lock);
3609 
3610     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3611               task->thread->engine, ls->count);
3612 
3613     if (--ls->count != 0) {
3614         ls = NULL;
3615     }
3616 
3617     nxt_thread_spin_unlock(lock);
3618 
3619     if (ls != NULL) {
3620         nxt_socket_close(task, ls->socket);
3621         nxt_free(ls);
3622     }
3623 }
3624 
3625 
3626 void
3627 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3628     nxt_socket_conf_joint_t *joint)
3629 {
3630     nxt_event_engine_t  *engine;
3631 
3632     nxt_debug(task, "listen event count: %D", lev->count);
3633 
3634     engine = task->thread->engine;
3635 
3636     if (--lev->count == 0) {
3637         if (lev->next != NULL) {
3638             nxt_sockaddr_cache_free(engine, lev->next);
3639 
3640             nxt_conn_free(task, lev->next);
3641         }
3642 
3643         nxt_free(lev);
3644     }
3645 
3646     if (joint != NULL) {
3647         nxt_router_conf_release(task, joint);
3648     }
3649 
3650     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3651         nxt_thread_exit(task->thread);
3652     }
3653 }
3654 
3655 
3656 void
3657 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3658 {
3659     nxt_socket_conf_t      *skcf;
3660     nxt_router_conf_t      *rtcf;
3661     nxt_thread_spinlock_t  *lock;
3662 
3663     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3664 
3665     if (--joint->count != 0) {
3666         return;
3667     }
3668 
3669     nxt_queue_remove(&joint->link);
3670 
3671     /*
3672      * The joint content can not be safely used after the critical
3673      * section protected by the spinlock because its memory pool may
3674      * be already destroyed by another thread.
3675      */
3676     skcf = joint->socket_conf;
3677     rtcf = skcf->router_conf;
3678     lock = &rtcf->router->lock;
3679 
3680     nxt_thread_spin_lock(lock);
3681 
3682     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3683               rtcf, rtcf->count);
3684 
3685     if (--skcf->count != 0) {
3686         skcf = NULL;
3687         rtcf = NULL;
3688 
3689     } else {
3690         nxt_queue_remove(&skcf->link);
3691 
3692         if (--rtcf->count != 0) {
3693             rtcf = NULL;
3694         }
3695     }
3696 
3697     nxt_thread_spin_unlock(lock);
3698 
3699 #if (NXT_TLS)
3700     if (skcf != NULL && skcf->tls != NULL) {
3701         task->thread->runtime->tls->server_free(task, skcf->tls);
3702     }
3703 #endif
3704 
3705     /* TODO remove engine->port */
3706 
3707     if (rtcf != NULL) {
3708         nxt_debug(task, "old router conf is destroyed");
3709 
3710         nxt_router_apps_hash_use(task, rtcf, -1);
3711 
3712         nxt_router_access_log_release(task, lock, rtcf->access_log);
3713 
3714         nxt_mp_thread_adopt(rtcf->mem_pool);
3715 
3716         nxt_mp_destroy(rtcf->mem_pool);
3717     }
3718 }
3719 
3720 
3721 static void
3722 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3723     nxt_router_access_log_t *access_log)
3724 {
3725     size_t     size;
3726     u_char     *buf, *p;
3727     nxt_off_t  bytes;
3728 
3729     static nxt_time_string_t  date_cache = {
3730         (nxt_atomic_uint_t) -1,
3731         nxt_router_access_log_date,
3732         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3733         nxt_length("31/Dec/1986:19:40:00 +0300"),
3734         NXT_THREAD_TIME_LOCAL,
3735         NXT_THREAD_TIME_SEC,
3736     };
3737 
3738     size = r->remote->address_length
3739            + 6                  /* ' - - [' */
3740            + date_cache.size
3741            + 3                  /* '] "' */
3742            + r->method->length
3743            + 1                  /* space */
3744            + r->target.length
3745            + 1                  /* space */
3746            + r->version.length
3747            + 2                  /* '" ' */
3748            + 3                  /* status */
3749            + 1                  /* space */
3750            + NXT_OFF_T_LEN
3751            + 2                  /* ' "' */
3752            + (r->referer != NULL ? r->referer->value_length : 1)
3753            + 3                  /* '" "' */
3754            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3755            + 2                  /* '"\n' */
3756     ;
3757 
3758     buf = nxt_mp_nget(r->mem_pool, size);
3759     if (nxt_slow_path(buf == NULL)) {
3760         return;
3761     }
3762 
3763     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3764                    r->remote->address_length);
3765 
3766     p = nxt_cpymem(p, " - - [", 6);
3767 
3768     p = nxt_thread_time_string(task->thread, &date_cache, p);
3769 
3770     p = nxt_cpymem(p, "] \"", 3);
3771 
3772     if (r->method->length != 0) {
3773         p = nxt_cpymem(p, r->method->start, r->method->length);
3774 
3775         if (r->target.length != 0) {
3776             *p++ = ' ';
3777             p = nxt_cpymem(p, r->target.start, r->target.length);
3778 
3779             if (r->version.length != 0) {
3780                 *p++ = ' ';
3781                 p = nxt_cpymem(p, r->version.start, r->version.length);
3782             }
3783         }
3784 
3785     } else {
3786         *p++ = '-';
3787     }
3788 
3789     p = nxt_cpymem(p, "\" ", 2);
3790 
3791     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3792 
3793     *p++ = ' ';
3794 
3795     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3796 
3797     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3798 
3799     p = nxt_cpymem(p, " \"", 2);
3800 
3801     if (r->referer != NULL) {
3802         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3803 
3804     } else {
3805         *p++ = '-';
3806     }
3807 
3808     p = nxt_cpymem(p, "\" \"", 3);
3809 
3810     if (r->user_agent != NULL) {
3811         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3812 
3813     } else {
3814         *p++ = '-';
3815     }
3816 
3817     p = nxt_cpymem(p, "\"\n", 2);
3818 
3819     nxt_fd_write(access_log->fd, buf, p - buf);
3820 }
3821 
3822 
3823 static u_char *
3824 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3825     size_t size, const char *format)
3826 {
3827     u_char  sign;
3828     time_t  gmtoff;
3829 
3830     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3831                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3832 
3833     gmtoff = nxt_timezone(tm) / 60;
3834 
3835     if (gmtoff < 0) {
3836         gmtoff = -gmtoff;
3837         sign = '-';
3838 
3839     } else {
3840         sign = '+';
3841     }
3842 
3843     return nxt_sprintf(buf, buf + size, format,
3844                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3845                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3846                        sign, gmtoff / 60, gmtoff % 60);
3847 }
3848 
3849 
3850 static void
3851 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3852 {
3853     uint32_t                 stream;
3854     nxt_int_t                ret;
3855     nxt_buf_t                *b;
3856     nxt_port_t               *main_port, *router_port;
3857     nxt_runtime_t            *rt;
3858     nxt_router_access_log_t  *access_log;
3859 
3860     access_log = tmcf->router_conf->access_log;
3861 
3862     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3863     if (nxt_slow_path(b == NULL)) {
3864         goto fail;
3865     }
3866 
3867     b->completion_handler = nxt_buf_dummy_completion;
3868 
3869     nxt_buf_cpystr(b, &access_log->path);
3870     *b->mem.free++ = '\0';
3871 
3872     rt = task->thread->runtime;
3873     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3874     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3875 
3876     stream = nxt_port_rpc_register_handler(task, router_port,
3877                                            nxt_router_access_log_ready,
3878                                            nxt_router_access_log_error,
3879                                            -1, tmcf);
3880     if (nxt_slow_path(stream == 0)) {
3881         goto fail;
3882     }
3883 
3884     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3885                                 stream, router_port->id, b);
3886 
3887     if (nxt_slow_path(ret != NXT_OK)) {
3888         nxt_port_rpc_cancel(task, router_port, stream);
3889         goto fail;
3890     }
3891 
3892     return;
3893 
3894 fail:
3895 
3896     nxt_router_conf_error(task, tmcf);
3897 }
3898 
3899 
3900 static void
3901 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3902     void *data)
3903 {
3904     nxt_router_temp_conf_t   *tmcf;
3905     nxt_router_access_log_t  *access_log;
3906 
3907     tmcf = data;
3908 
3909     access_log = tmcf->router_conf->access_log;
3910 
3911     access_log->fd = msg->fd[0];
3912 
3913     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3914                        nxt_router_conf_apply, task, tmcf, NULL);
3915 }
3916 
3917 
3918 static void
3919 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3920     void *data)
3921 {
3922     nxt_router_temp_conf_t  *tmcf;
3923 
3924     tmcf = data;
3925 
3926     nxt_router_conf_error(task, tmcf);
3927 }
3928 
3929 
3930 static void
3931 nxt_router_access_log_use(nxt_thread_spinlock_t *lock,
3932     nxt_router_access_log_t *access_log)
3933 {
3934     if (access_log == NULL) {
3935         return;
3936     }
3937 
3938     nxt_thread_spin_lock(lock);
3939 
3940     access_log->count++;
3941 
3942     nxt_thread_spin_unlock(lock);
3943 }
3944 
3945 
3946 static void
3947 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3948     nxt_router_access_log_t *access_log)
3949 {
3950     if (access_log == NULL) {
3951         return;
3952     }
3953 
3954     nxt_thread_spin_lock(lock);
3955 
3956     if (--access_log->count != 0) {
3957         access_log = NULL;
3958     }
3959 
3960     nxt_thread_spin_unlock(lock);
3961 
3962     if (access_log != NULL) {
3963 
3964         if (access_log->fd != -1) {
3965             nxt_fd_close(access_log->fd);
3966         }
3967 
3968         nxt_free(access_log);
3969     }
3970 }
3971 
3972 
/*
 * Context of one access log reopen request: the memory pool the request
 * was allocated from and the access log that was current when the
 * request was issued.
 */
typedef struct {
    nxt_mp_t                 *mem_pool;
    nxt_router_access_log_t  *access_log;
} nxt_router_access_log_reopen_t;
3977 
3978 
3979 static void
3980 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3981 {
3982     nxt_mp_t                        *mp;
3983     uint32_t                        stream;
3984     nxt_int_t                       ret;
3985     nxt_buf_t                       *b;
3986     nxt_port_t                      *main_port, *router_port;
3987     nxt_runtime_t                   *rt;
3988     nxt_router_access_log_t         *access_log;
3989     nxt_router_access_log_reopen_t  *reopen;
3990 
3991     access_log = nxt_router->access_log;
3992 
3993     if (access_log == NULL) {
3994         return;
3995     }
3996 
3997     mp = nxt_mp_create(1024, 128, 256, 32);
3998     if (nxt_slow_path(mp == NULL)) {
3999         return;
4000     }
4001 
4002     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
4003     if (nxt_slow_path(reopen == NULL)) {
4004         goto fail;
4005     }
4006 
4007     reopen->mem_pool = mp;
4008     reopen->access_log = access_log;
4009 
4010     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
4011     if (nxt_slow_path(b == NULL)) {
4012         goto fail;
4013     }
4014 
4015     b->completion_handler = nxt_router_access_log_reopen_completion;
4016 
4017     nxt_buf_cpystr(b, &access_log->path);
4018     *b->mem.free++ = '\0';
4019 
4020     rt = task->thread->runtime;
4021     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
4022     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
4023 
4024     stream = nxt_port_rpc_register_handler(task, router_port,
4025                                            nxt_router_access_log_reopen_ready,
4026                                            nxt_router_access_log_reopen_error,
4027                                            -1, reopen);
4028     if (nxt_slow_path(stream == 0)) {
4029         goto fail;
4030     }
4031 
4032     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
4033                                 stream, router_port->id, b);
4034 
4035     if (nxt_slow_path(ret != NXT_OK)) {
4036         nxt_port_rpc_cancel(task, router_port, stream);
4037         goto fail;
4038     }
4039 
4040     nxt_mp_retain(mp);
4041 
4042     return;
4043 
4044 fail:
4045 
4046     nxt_mp_destroy(mp);
4047 }
4048 
4049 
4050 static void
4051 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
4052 {
4053     nxt_mp_t   *mp;
4054     nxt_buf_t  *b;
4055 
4056     b = obj;
4057     mp = b->data;
4058 
4059     nxt_mp_release(mp);
4060 }
4061 
4062 
4063 static void
4064 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4065     void *data)
4066 {
4067     nxt_router_access_log_t         *access_log;
4068     nxt_router_access_log_reopen_t  *reopen;
4069 
4070     reopen = data;
4071 
4072     access_log = reopen->access_log;
4073 
4074     if (access_log == nxt_router->access_log) {
4075 
4076         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
4077             nxt_alert(task, "dup2(%FD, %FD) failed %E",
4078                       msg->fd[0], access_log->fd, nxt_errno);
4079         }
4080     }
4081 
4082     nxt_fd_close(msg->fd[0]);
4083     nxt_mp_release(reopen->mem_pool);
4084 }
4085 
4086 
4087 static void
4088 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4089     void *data)
4090 {
4091     nxt_router_access_log_reopen_t  *reopen;
4092 
4093     reopen = data;
4094 
4095     nxt_mp_release(reopen->mem_pool);
4096 }
4097 
4098 
4099 static void
4100 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
4101 {
4102     nxt_port_t           *port;
4103     nxt_thread_link_t    *link;
4104     nxt_event_engine_t   *engine;
4105     nxt_thread_handle_t  handle;
4106 
4107     handle = (nxt_thread_handle_t) (uintptr_t) obj;
4108     link = data;
4109 
4110     nxt_thread_wait(handle);
4111 
4112     engine = link->engine;
4113 
4114     nxt_queue_remove(&engine->link);
4115 
4116     port = engine->port;
4117 
4118     // TODO notify all apps
4119 
4120     port->engine = task->thread->engine;
4121     nxt_mp_thread_adopt(port->mem_pool);
4122     nxt_port_use(task, port, -1);
4123 
4124     nxt_mp_thread_adopt(engine->mem_pool);
4125     nxt_mp_destroy(engine->mem_pool);
4126 
4127     nxt_event_engine_free(engine);
4128 
4129     nxt_free(link);
4130 }
4131 
4132 
/*
 * RPC "ready" handler for messages received in reply to a request that was
 * forwarded to an application.
 *
 * "data" is the request's nxt_request_rpc_data_t.  The handler:
 *   - returns at once if the RPC data no longer references a request;
 *   - unlinks the RPC data and returns if the request is already in error;
 *   - hands _NXT_PORT_MSG_REQ_HEADERS_ACK messages over to
 *     nxt_router_req_headers_ack_handler();
 *   - otherwise treats the message buffer as response data: if the header
 *     was already sent the buffer is appended to the body output, else it
 *     is parsed as an nxt_unit_response_t (fields, status, piggybacked
 *     content) and the response header is sent.
 *
 * On a malformed response or a field allocation/processing failure the
 * request is answered with NXT_HTTP_SERVICE_UNAVAILABLE and the RPC data
 * is unlinked.
 */
static void
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    size_t                  b_size, count;
    nxt_int_t               ret;
    nxt_app_t               *app;
    nxt_buf_t               *b, *next;
    nxt_port_t              *app_port;
    nxt_unit_field_t        *f;
    nxt_http_field_t        *field;
    nxt_http_request_t      *r;
    nxt_unit_response_t     *resp;
    nxt_request_rpc_data_t  *req_rpc_data;

    req_rpc_data = data;

    r = req_rpc_data->request;
    if (nxt_slow_path(r == NULL)) {
        return;
    }

    if (r->error) {
        nxt_request_rpc_data_unlink(task, req_rpc_data);
        return;
    }

    app = req_rpc_data->app;
    nxt_assert(app != NULL);

    if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
        nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);

        return;
    }

    /* An empty message carries no data buffer to forward. */
    b = (msg->size == 0) ? NULL : msg->buf;

    if (msg->port_msg.last != 0) {
        nxt_debug(task, "router data create last buf");

        /* Terminate the chain with the request's "last" buffer. */
        nxt_buf_chain_add(&b, nxt_http_buf_last(r));

        req_rpc_data->rpc_cancel = 0;

        /* A complete response was received: upgrade the release action. */
        if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
            req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
        }

        nxt_request_rpc_data_unlink(task, req_rpc_data);

    } else {
        /* More data is expected: (re)arm the application timeout. */
        if (app->timeout != 0) {
            r->timer.handler = nxt_router_app_timeout;
            r->timer_data = req_rpc_data;
            nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
        }
    }

    if (b == NULL) {
        return;
    }

    if (msg->buf == b) {
        /* Disable instant buffer completion/re-using by port. */
        msg->buf = NULL;
    }

    if (r->header_sent) {
        /* Header already went out: this is a body chunk. */
        nxt_buf_chain_add(&r->out, b);
        nxt_http_request_send_body(task, r, NULL);

    } else {
        b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;

        if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
            nxt_alert(task, "response buffer too small: %z", b_size);
            goto fail;
        }

        resp = (void *) b->mem.pos;

        /* Maximum number of field records that fit after the header. */
        count = (b_size - sizeof(nxt_unit_response_t))
                    / sizeof(nxt_unit_field_t);

        if (nxt_slow_path(count < resp->fields_count)) {
            nxt_alert(task, "response buffer too small for fields count: %D",
                      resp->fields_count);
            goto fail;
        }

        field = NULL;

        /*
         * NOTE(review): field name/value locations and lengths are taken
         * from the application's data as-is; no bounds check against the
         * buffer is visible here -- confirm whether one exists elsewhere.
         */
        for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
            if (f->skip) {
                continue;
            }

            field = nxt_list_add(r->resp.fields);

            if (nxt_slow_path(field == NULL)) {
                goto fail;
            }

            field->hash = f->hash;
            field->skip = 0;
            field->hopbyhop = 0;

            field->name_length = f->name_length;
            field->value_length = f->value_length;
            field->name = nxt_unit_sptr_get(&f->name);
            field->value = nxt_unit_sptr_get(&f->value);

            ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
            if (nxt_slow_path(ret != NXT_OK)) {
                goto fail;
            }

            nxt_debug(task, "header%s: %*s: %*s",
                      (field->skip ? " skipped" : ""),
                      (size_t) field->name_length, field->name,
                      (size_t) field->value_length, field->value);

            /* Field processing marked the field skipped: drop the entry. */
            if (field->skip) {
                r->resp.fields->last->nelts--;
            }
        }

        r->status = resp->status;

        /*
         * Narrow the buffer to the piggybacked body content, or make it
         * empty when there is none.
         *
         * NOTE(review): the piggyback location/length are not validated
         * against the buffer bounds here -- confirm.
         */
        if (resp->piggyback_content_length != 0) {
            b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
            b->mem.free = b->mem.pos + resp->piggyback_content_length;

        } else {
            b->mem.pos = b->mem.free;
        }

        /* Nothing left in the first buffer: complete it and move on. */
        if (nxt_buf_mem_used_size(&b->mem) == 0) {
            next = b->next;
            b->next = NULL;

            nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                               b->completion_handler, task, b, b->parent);

            b = next;
        }

        if (b != NULL) {
            nxt_buf_chain_add(&r->out, b);
        }

        nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);

        if (r->websocket_handshake
            && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
        {
            app_port = req_rpc_data->app_port;
            if (nxt_slow_path(app_port == NULL)) {
                goto fail;
            }

            nxt_thread_mutex_lock(&app->mutex);

            app_port->main_app_port->active_websockets++;

            nxt_thread_mutex_unlock(&app->mutex);

            /* Release as an upgrade now; the later release is a CLOSE. */
            nxt_router_app_port_release(task, app, app_port, NXT_APR_UPGRADE);
            req_rpc_data->apr_action = NXT_APR_CLOSE;

            nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);

            r->state = &nxt_http_websocket;

        } else {
            r->state = &nxt_http_request_send_state;
        }
    }

    return;

fail:

    nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);

    nxt_request_rpc_data_unlink(task, req_rpc_data);
}
4320 
4321 
4322 static void
4323 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4324     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4325 {
4326     int                 res;
4327     nxt_app_t           *app;
4328     nxt_buf_t           *b;
4329     nxt_bool_t          start_process, unlinked;
4330     nxt_port_t          *app_port, *main_app_port, *idle_port;
4331     nxt_queue_link_t    *idle_lnk;
4332     nxt_http_request_t  *r;
4333 
4334     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4335               req_rpc_data->stream,
4336               msg->port_msg.pid, msg->port_msg.reply_port);
4337 
4338     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4339                              msg->port_msg.pid);
4340 
4341     app = req_rpc_data->app;
4342     r = req_rpc_data->request;
4343 
4344     start_process = 0;
4345     unlinked = 0;
4346 
4347     nxt_thread_mutex_lock(&app->mutex);
4348 
4349     if (r->app_link.next != NULL) {
4350         nxt_queue_remove(&r->app_link);
4351         r->app_link.next = NULL;
4352 
4353         unlinked = 1;
4354     }
4355 
4356     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4357                                   msg->port_msg.reply_port);
4358     if (nxt_slow_path(app_port == NULL)) {
4359         nxt_thread_mutex_unlock(&app->mutex);
4360 
4361         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4362 
4363         if (unlinked) {
4364             nxt_mp_release(r->mem_pool);
4365         }
4366 
4367         return;
4368     }
4369 
4370     main_app_port = app_port->main_app_port;
4371 
4372     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4373         app->idle_processes--;
4374 
4375         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4376                   &app->name, main_app_port->pid, main_app_port->id,
4377                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4378 
4379         /* Check port was in 'spare_ports' using idle_start field. */
4380         if (main_app_port->idle_start == 0
4381             && app->idle_processes >= app->spare_processes)
4382         {
4383             /*
4384              * If there is a vacant space in spare ports,
4385              * move the last idle to spare_ports.
4386              */
4387             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4388 
4389             idle_lnk = nxt_queue_last(&app->idle_ports);
4390             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4391             nxt_queue_remove(idle_lnk);
4392 
4393             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4394 
4395             idle_port->idle_start = 0;
4396 
4397             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4398                       "to spare_ports",
4399                       &app->name, idle_port->pid, idle_port->id);
4400         }
4401 
4402         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4403             app->pending_processes++;
4404             start_process = 1;
4405         }
4406     }
4407 
4408     main_app_port->active_requests++;
4409 
4410     nxt_port_inc_use(app_port);
4411 
4412     nxt_thread_mutex_unlock(&app->mutex);
4413 
4414     if (unlinked) {
4415         nxt_mp_release(r->mem_pool);
4416     }
4417 
4418     if (start_process) {
4419         nxt_router_start_app_process(task, app);
4420     }
4421 
4422     nxt_port_use(task, req_rpc_data->app_port, -1);
4423 
4424     req_rpc_data->app_port = app_port;
4425 
4426     b = req_rpc_data->msg_info.buf;
4427 
4428     if (b != NULL) {
4429         /* First buffer is already sent.  Start from second. */
4430         b = b->next;
4431 
4432         req_rpc_data->msg_info.buf->next = NULL;
4433     }
4434 
4435     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4436         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4437                   req_rpc_data->msg_info.body_fd);
4438 
4439         if (req_rpc_data->msg_info.body_fd != -1) {
4440             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4441         }
4442 
4443         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4444                                     req_rpc_data->msg_info.body_fd,
4445                                     req_rpc_data->stream,
4446                                     task->thread->engine->port->id, b);
4447 
4448         if (nxt_slow_path(res != NXT_OK)) {
4449             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4450         }
4451     }
4452 
4453     if (app->timeout != 0) {
4454         r->timer.handler = nxt_router_app_timeout;
4455         r->timer_data = req_rpc_data;
4456         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4457     }
4458 }
4459 
4460 
/*
 * Request state assigned by nxt_router_response_ready_handler() once the
 * response header has been handed over for sending (non-websocket case).
 * Only the error handler is set here.
 */
static const nxt_http_request_state_t  nxt_http_request_send_state
    nxt_aligned(64) =
{
    .error_handler = nxt_http_request_error_handler,
};
4466 
4467 
4468 static void
4469 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4470 {
4471     nxt_buf_t           *out;
4472     nxt_http_request_t  *r;
4473 
4474     r = obj;
4475 
4476     out = r->out;
4477 
4478     if (out != NULL) {
4479         r->out = NULL;
4480         nxt_http_request_send(task, r, out);
4481     }
4482 }
4483 
4484 
4485 static void
4486 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4487     void *data)
4488 {
4489     nxt_request_rpc_data_t  *req_rpc_data;
4490 
4491     req_rpc_data = data;
4492 
4493     req_rpc_data->rpc_cancel = 0;
4494 
4495     /* TODO cancel message and return if cancelled. */
4496     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4497 
4498     if (req_rpc_data->request != NULL) {
4499         nxt_http_request_error(task, req_rpc_data->request,
4500                                NXT_HTTP_SERVICE_UNAVAILABLE);
4501     }
4502 
4503     nxt_request_rpc_data_unlink(task, req_rpc_data);
4504 }
4505 
4506 
/*
 * RPC "ready" handler invoked when a newly started process has reported
 * its port (msg->u.new_port) to the router.
 *
 * "data" is the nxt_app_joint_rpc_t of the start request; the joint
 * reference held for the request is dropped here.
 *
 *   - If the application is already released (joint->app == NULL), the new
 *     process is told to QUIT.
 *   - For a prototype process port (app_joint_rpc->proto): the port becomes
 *     app->proto_port unless the app generation has changed since the
 *     request (then QUIT is sent); afterwards one application process start
 *     is issued for each request counted in app->proto_port_requests.
 *   - For an application process port: pending_processes is decremented;
 *     if the app generation has changed, QUIT is sent and a replacement
 *     start may be issued; otherwise the port is registered with the app,
 *     acknowledged with NXT_PORT_MSG_PORT_ACK and released as a new port.
 */
static void
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    uint32_t             n;
    nxt_app_t            *app;
    nxt_bool_t           start_process, restarted;
    nxt_port_t           *port;
    nxt_app_joint_t      *app_joint;
    nxt_app_joint_rpc_t  *app_joint_rpc;

    nxt_assert(data != NULL);

    app_joint_rpc = data;
    app_joint = app_joint_rpc->app_joint;
    port = msg->u.new_port;

    nxt_assert(app_joint != NULL);
    nxt_assert(port != NULL);
    nxt_assert(port->id == 0);

    /* Read the app pointer before dropping the joint reference. */
    app = app_joint->app;

    nxt_router_app_joint_use(task, app_joint, -1);

    if (nxt_slow_path(app == NULL)) {
        nxt_debug(task, "new port ready for released app, send QUIT");

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        return;
    }

    nxt_thread_mutex_lock(&app->mutex);

    /* The app generation changed while the process was starting. */
    restarted = (app->generation != app_joint_rpc->generation);

    if (app_joint_rpc->proto) {
        nxt_assert(app->proto_port == NULL);
        nxt_assert(port->type == NXT_PROCESS_PROTOTYPE);

        /* Take over the count of starts that waited for the prototype. */
        n = app->proto_port_requests;
        app->proto_port_requests = 0;

        if (nxt_slow_path(restarted)) {
            nxt_thread_mutex_unlock(&app->mutex);

            nxt_debug(task, "proto port ready for restarted app, send QUIT");

            nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0,
                                  NULL);

        } else {
            port->app = app;
            app->proto_port = port;

            nxt_thread_mutex_unlock(&app->mutex);

            nxt_port_use(task, port, 1);
        }

        /* From here on "port" is the router port, not the new port. */
        port = task->thread->runtime->port_by_type[NXT_PROCESS_ROUTER];

        while (n > 0) {
            nxt_router_app_use(task, app, 1);

            nxt_router_start_app_process_handler(task, port, app);

            n--;
        }

        return;
    }

    nxt_assert(port->type == NXT_PROCESS_APP);
    nxt_assert(app->pending_processes != 0);

    app->pending_processes--;

    if (nxt_slow_path(restarted)) {
        nxt_debug(task, "new port ready for restarted app, send QUIT");

        start_process = !task->thread->engine->shutdown
                        && nxt_router_app_can_start(app)
                        && nxt_router_app_need_start(app);

        /* Reserve the pending slot under the lock; start after unlock. */
        if (start_process) {
            app->pending_processes++;
        }

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        if (start_process) {
            nxt_router_start_app_process(task, app);
        }

        return;
    }

    /* Register the port with the app; it is its own main port. */
    port->app = app;
    port->main_app_port = port;

    app->processes++;
    nxt_port_hash_add(&app->port_hash, port);
    app->port_hash_count++;

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
              &app->name, port->pid, app->processes, app->pending_processes);

    nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);

    nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
}
4624 
4625 
4626 static void
4627 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4628     void *data)
4629 {
4630     nxt_app_t            *app;
4631     nxt_app_joint_t      *app_joint;
4632     nxt_queue_link_t     *link;
4633     nxt_http_request_t   *r;
4634     nxt_app_joint_rpc_t  *app_joint_rpc;
4635 
4636     nxt_assert(data != NULL);
4637 
4638     app_joint_rpc = data;
4639     app_joint = app_joint_rpc->app_joint;
4640 
4641     nxt_assert(app_joint != NULL);
4642 
4643     app = app_joint->app;
4644 
4645     nxt_router_app_joint_use(task, app_joint, -1);
4646 
4647     if (nxt_slow_path(app == NULL)) {
4648         nxt_debug(task, "start error for released app");
4649 
4650         return;
4651     }
4652 
4653     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4654 
4655     link = NULL;
4656 
4657     nxt_thread_mutex_lock(&app->mutex);
4658 
4659     nxt_assert(app->pending_processes != 0);
4660 
4661     app->pending_processes--;
4662 
4663     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4664         link = nxt_queue_first(&app->ack_waiting_req);
4665 
4666         nxt_queue_remove(link);
4667         link->next = NULL;
4668     }
4669 
4670     nxt_thread_mutex_unlock(&app->mutex);
4671 
4672     while (link != NULL) {
4673         r = nxt_container_of(link, nxt_http_request_t, app_link);
4674 
4675         nxt_event_engine_post(r->engine, &r->err_work);
4676 
4677         link = NULL;
4678 
4679         nxt_thread_mutex_lock(&app->mutex);
4680 
4681         if (app->processes == 0 && app->pending_processes == 0
4682             && !nxt_queue_is_empty(&app->ack_waiting_req))
4683         {
4684             link = nxt_queue_first(&app->ack_waiting_req);
4685 
4686             nxt_queue_remove(link);
4687             link->next = NULL;
4688         }
4689 
4690         nxt_thread_mutex_unlock(&app->mutex);
4691     }
4692 }
4693 
4694 
4695 nxt_inline nxt_port_t *
4696 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4697 {
4698     nxt_port_t  *port;
4699 
4700     port = NULL;
4701 
4702     nxt_thread_mutex_lock(&app->mutex);
4703 
4704     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4705 
4706         /* Caller is responsible to decrease port use count. */
4707         nxt_queue_chk_remove(&port->app_link);
4708 
4709         if (nxt_queue_chk_remove(&port->idle_link)) {
4710             app->idle_processes--;
4711 
4712             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4713                       &app->name, port->pid, port->id,
4714                       (port->idle_start ? "idle_ports" : "spare_ports"));
4715         }
4716 
4717         nxt_port_hash_remove(&app->port_hash, port);
4718         app->port_hash_count--;
4719 
4720         port->app = NULL;
4721         app->processes--;
4722 
4723         break;
4724 
4725     } nxt_queue_loop;
4726 
4727     nxt_thread_mutex_unlock(&app->mutex);
4728 
4729     return port;
4730 }
4731 
4732 
4733 static void
4734 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4735 {
4736     int  c;
4737 
4738     c = nxt_atomic_fetch_add(&app->use_count, i);
4739 
4740     if (i < 0 && c == -i) {
4741 
4742         if (task->thread->engine != app->engine) {
4743             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4744 
4745         } else {
4746             nxt_router_free_app(task, app->joint, NULL);
4747         }
4748     }
4749 }
4750 
4751 
/*
 * Removes the application from the queue it is linked in via app->link
 * and drops one use of it.
 */
static void
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
    nxt_debug(task, "app '%V' %p unlink", &app->name, app);

    nxt_queue_remove(&app->link);

    nxt_router_app_use(task, app, -1);
}
4761 
4762 
/*
 * Updates application/port accounting after an event on "port" and
 * adjusts the port use count accordingly.
 *
 * Effect of "action":
 *   NXT_APR_NEW_PORT        no counter change, use count unchanged;
 *   NXT_APR_REQUEST_FAILED  one active request less, use count -1;
 *   NXT_APR_GOT_RESPONSE    one active request less, use count -1;
 *   NXT_APR_UPGRADE         one active request less, use count unchanged;
 *   NXT_APR_CLOSE           no counter change, use count -1.
 *
 * For the shared port (id NXT_SHARED_PORT_ID) only app->active_requests is
 * updated.  For other ports the main port's counter is updated as well;
 * a main port whose pair[1] is still open is (re)linked into app->ports
 * and, once it has no active requests or websockets, put on spare_ports
 * or idle_ports.  When the idle count equals the spare limit at that
 * point, app->adjust_idle_work is posted to the app's engine.
 */
static void
nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
    nxt_apr_action_t action)
{
    int         inc_use;
    uint32_t    got_response, dec_requests;
    nxt_bool_t  adjust_idle_timer;
    nxt_port_t  *main_app_port;

    nxt_assert(port != NULL);

    inc_use = 0;
    got_response = 0;
    dec_requests = 0;

    switch (action) {
    case NXT_APR_NEW_PORT:
        break;
    case NXT_APR_REQUEST_FAILED:
        dec_requests = 1;
        inc_use = -1;
        break;
    case NXT_APR_GOT_RESPONSE:
        got_response = 1;
        inc_use = -1;
        break;
    case NXT_APR_UPGRADE:
        got_response = 1;
        break;
    case NXT_APR_CLOSE:
        inc_use = -1;
        break;
    }

    nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
              port->pid, port->id,
              (int) inc_use, (int) got_response);

    /* The shared port has no per-port accounting or idle tracking here. */
    if (port->id == NXT_SHARED_PORT_ID) {
        nxt_thread_mutex_lock(&app->mutex);

        app->active_requests -= got_response + dec_requests;

        nxt_thread_mutex_unlock(&app->mutex);

        goto adjust_use;
    }

    main_app_port = port->main_app_port;

    nxt_thread_mutex_lock(&app->mutex);

    main_app_port->active_requests -= got_response + dec_requests;
    app->active_requests -= got_response + dec_requests;

    /* Link a still-open main port into app->ports, holding a reference. */
    if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
        nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);

        nxt_port_inc_use(main_app_port);
    }

    adjust_idle_timer = 0;

    /* The port is open, has no work and is not on an idle/spare list yet. */
    if (main_app_port->pair[1] != -1
        && main_app_port->active_requests == 0
        && main_app_port->active_websockets == 0
        && main_app_port->idle_link.next == NULL)
    {
        /*
         * A non-NULL adjust_idle_work.data marks the work as already
         * queued, so it is posted at most once at a time.
         */
        if (app->idle_processes == app->spare_processes
            && app->adjust_idle_work.data == NULL)
        {
            adjust_idle_timer = 1;
            app->adjust_idle_work.data = app;
            app->adjust_idle_work.next = NULL;
        }

        if (app->idle_processes < app->spare_processes) {
            nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);

            nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        } else {
            nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);

            /* A non-zero idle_start distinguishes idle from spare ports. */
            main_app_port->idle_start = task->thread->engine->timers.now;

            nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
                      &app->name, main_app_port->pid, main_app_port->id);
        }

        app->idle_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (adjust_idle_timer) {
        /* This use is dropped by nxt_router_adjust_idle_timer(). */
        nxt_router_app_use(task, app, 1);
        nxt_event_engine_post(app->engine, &app->adjust_idle_work);
    }

    /* ? */
    if (main_app_port->pair[1] == -1) {
        nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
                  &app->name, app, main_app_port, main_app_port->pid);

        goto adjust_use;
    }

    nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
              &app->name, app);

adjust_use:

    nxt_port_use(task, port, inc_use);
}
4878 
4879 
/*
 * Detaches a closed port from its application (port->app must be set).
 *
 *   - The prototype port is simply forgotten and its reference dropped.
 *   - Any other port is removed from the port hash; for ports with a
 *     non-zero id nothing else is done.
 *   - For a port with id 0 the port is also removed from app->ports and
 *     from the idle/spare list, spare_ports is refilled from idle_ports
 *     when a spare port was removed, app->processes is decremented and a
 *     replacement process is started if allowed and needed (never while
 *     the engine is shutting down).
 */
void
nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
{
    nxt_app_t         *app;
    nxt_bool_t        unchain, start_process;
    nxt_port_t        *idle_port;
    nxt_queue_link_t  *idle_lnk;

    app = port->app;

    nxt_assert(app != NULL);

    nxt_thread_mutex_lock(&app->mutex);

    if (port == app->proto_port) {
        app->proto_port = NULL;
        port->app = NULL;

        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' prototype pid %PI closed", &app->name,
                  port->pid);

        nxt_port_use(task, port, -1);

        return;
    }

    nxt_port_hash_remove(&app->port_hash, port);
    app->port_hash_count--;

    /* Only the port with id 0 takes part in the list/process accounting. */
    if (port->id != 0) {
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
                  port->pid, port->id);

        return;
    }

    /* Remember whether the app->ports reference has to be dropped. */
    unchain = nxt_queue_chk_remove(&port->app_link);

    if (nxt_queue_chk_remove(&port->idle_link)) {
        app->idle_processes--;

        nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
                  &app->name, port->pid, port->id,
                  (port->idle_start ? "idle_ports" : "spare_ports"));

        /*
         * idle_start == 0 means the port was on spare_ports: move the
         * last idle port to spare_ports to fill the vacancy.
         */
        if (port->idle_start == 0
            && app->idle_processes >= app->spare_processes)
        {
            nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

            idle_lnk = nxt_queue_last(&app->idle_ports);
            idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
            nxt_queue_remove(idle_lnk);

            nxt_queue_insert_tail(&app->spare_ports, idle_lnk);

            idle_port->idle_start = 0;

            nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
                      "to spare_ports",
                      &app->name, idle_port->pid, idle_port->id);
        }
    }

    app->processes--;

    start_process = !task->thread->engine->shutdown
                    && nxt_router_app_can_start(app)
                    && nxt_router_app_need_start(app);

    /* Reserve the pending slot under the lock; start after unlock. */
    if (start_process) {
        app->pending_processes++;
    }

    nxt_thread_mutex_unlock(&app->mutex);

    nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);

    if (unchain) {
        nxt_port_use(task, port, -1);
    }

    if (start_process) {
        nxt_router_start_app_process(task, app);
    }
}
4970 
4971 
/*
 * Stops idle application processes whose idle timeout has expired and
 * re-arms (or disables) the joint's idle timer.
 *
 * "obj" is the application.  "data" equals the application when the call
 * comes from the posted app->adjust_idle_work; in that case the work's
 * "queued" marker is cleared and the app use taken when the work was
 * posted is dropped at the end.  Must run on the app's engine.
 *
 * While there are more idle processes than spare_processes, ports are
 * taken from the head of idle_ports; each expired one is detached from
 * the app, sent NXT_PORT_MSG_QUIT and released.  The loop stops at the
 * first port that has not expired yet, and the timer is set to fire when
 * that port expires.
 */
static void
nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
{
    nxt_app_t           *app;
    nxt_bool_t          queued;
    nxt_port_t          *port;
    nxt_msec_t          timeout, threshold;
    nxt_queue_link_t    *lnk;
    nxt_event_engine_t  *engine;

    app = obj;
    queued = (data == app);

    nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
              &app->name, queued);

    engine = task->thread->engine;

    nxt_assert(app->engine == engine);

    /* Ports expiring up to the timer's bias past "now" count as expired. */
    threshold = engine->timers.now + app->joint->idle_timer.bias;
    timeout = 0;

    nxt_thread_mutex_lock(&app->mutex);

    if (queued) {
        /* Allow the work to be posted again. */
        app->adjust_idle_work.data = NULL;
    }

    nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
              &app->name,
              (int) app->idle_processes, (int) app->spare_processes);

    while (app->idle_processes > app->spare_processes) {

        nxt_assert(!nxt_queue_is_empty(&app->idle_ports));

        lnk = nxt_queue_first(&app->idle_ports);
        port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);

        timeout = port->idle_start + app->idle_timeout;

        nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
                  &app->name, port->pid,
                  port->idle_start, timeout, threshold);

        /* Not expired yet; "timeout" is kept for re-arming the timer. */
        if (timeout > threshold) {
            break;
        }

        nxt_queue_remove(lnk);
        lnk->next = NULL;

        nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
                  &app->name, port->pid, port->id);

        nxt_queue_chk_remove(&port->app_link);

        nxt_port_hash_remove(&app->port_hash, port);
        app->port_hash_count--;

        app->idle_processes--;
        app->processes--;
        port->app = NULL;

        /* The mutex is not held while writing to the port. */
        nxt_thread_mutex_unlock(&app->mutex);

        nxt_debug(task, "app '%V' send QUIT to idle port %PI",
                  &app->name, port->pid);

        nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);

        nxt_port_use(task, port, -1);

        nxt_thread_mutex_lock(&app->mutex);
    }

    nxt_thread_mutex_unlock(&app->mutex);

    if (timeout > threshold) {
        nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);

    } else {
        nxt_timer_disable(engine, &app->joint->idle_timer);
    }

    if (queued) {
        nxt_router_app_use(task, app, -1);
    }
}
5062 
5063 
5064 static void
5065 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
5066 {
5067     nxt_timer_t      *timer;
5068     nxt_app_joint_t  *app_joint;
5069 
5070     timer = obj;
5071     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5072 
5073     if (nxt_fast_path(app_joint->app != NULL)) {
5074         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
5075     }
5076 }
5077 
5078 
5079 static void
5080 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
5081 {
5082     nxt_timer_t      *timer;
5083     nxt_app_joint_t  *app_joint;
5084 
5085     timer = obj;
5086     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
5087 
5088     nxt_router_app_joint_use(task, app_joint, -1);
5089 }
5090 
5091 
5092 static void
5093 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
5094 {
5095     nxt_app_t        *app;
5096     nxt_port_t       *port, *proto_port;
5097     nxt_app_joint_t  *app_joint;
5098 
5099     app_joint = obj;
5100     app = app_joint->app;
5101 
5102     for ( ;; ) {
5103         port = nxt_router_app_get_port_for_quit(task, app);
5104         if (port == NULL) {
5105             break;
5106         }
5107 
5108         nxt_port_use(task, port, -1);
5109     }
5110 
5111     nxt_thread_mutex_lock(&app->mutex);
5112 
5113     for ( ;; ) {
5114         port = nxt_port_hash_retrieve(&app->port_hash);
5115         if (port == NULL) {
5116             break;
5117         }
5118 
5119         app->port_hash_count--;
5120 
5121         port->app = NULL;
5122 
5123         nxt_port_close(task, port);
5124 
5125         nxt_port_use(task, port, -1);
5126     }
5127 
5128     proto_port = app->proto_port;
5129 
5130     if (proto_port != NULL) {
5131         nxt_debug(task, "send QUIT to prototype '%V' pid %PI", &app->name,
5132                   proto_port->pid);
5133 
5134         app->proto_port = NULL;
5135         proto_port->app = NULL;
5136     }
5137 
5138     nxt_thread_mutex_unlock(&app->mutex);
5139 
5140     if (proto_port != NULL) {
5141         nxt_port_socket_write(task, proto_port, NXT_PORT_MSG_QUIT,
5142                               -1, 0, 0, NULL);
5143 
5144         nxt_port_close(task, proto_port);
5145 
5146         nxt_port_use(task, proto_port, -1);
5147     }
5148 
5149     nxt_assert(app->proto_port == NULL);
5150     nxt_assert(app->processes == 0);
5151     nxt_assert(app->active_requests == 0);
5152     nxt_assert(app->port_hash_count == 0);
5153     nxt_assert(app->idle_processes == 0);
5154     nxt_assert(nxt_queue_is_empty(&app->ports));
5155     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
5156     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
5157 
5158     nxt_port_mmaps_destroy(&app->outgoing, 1);
5159 
5160     nxt_thread_mutex_destroy(&app->outgoing.mutex);
5161 
5162     if (app->shared_port != NULL) {
5163         app->shared_port->app = NULL;
5164         nxt_port_close(task, app->shared_port);
5165         nxt_port_use(task, app->shared_port, -1);
5166 
5167         app->shared_port = NULL;
5168     }
5169 
5170     nxt_thread_mutex_destroy(&app->mutex);
5171     nxt_mp_destroy(app->mem_pool);
5172 
5173     app_joint->app = NULL;
5174 
5175     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
5176         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
5177         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
5178 
5179     } else {
5180         nxt_router_app_joint_use(task, app_joint, -1);
5181     }
5182 }
5183 
5184 
5185 static void
5186 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
5187     nxt_request_rpc_data_t *req_rpc_data)
5188 {
5189     nxt_bool_t          start_process;
5190     nxt_port_t          *port;
5191     nxt_http_request_t  *r;
5192 
5193     start_process = 0;
5194 
5195     nxt_thread_mutex_lock(&app->mutex);
5196 
5197     port = app->shared_port;
5198     nxt_port_inc_use(port);
5199 
5200     app->active_requests++;
5201 
5202     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
5203         app->pending_processes++;
5204         start_process = 1;
5205     }
5206 
5207     r = req_rpc_data->request;
5208 
5209     /*
5210      * Put request into application-wide list to be able to cancel request
5211      * if something goes wrong with application processes.
5212      */
5213     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
5214 
5215     nxt_thread_mutex_unlock(&app->mutex);
5216 
5217     /*
5218      * Retain request memory pool while request is linked in ack_waiting_req
5219      * to guarantee request structure memory is accessble.
5220      */
5221     nxt_mp_retain(r->mem_pool);
5222 
5223     req_rpc_data->app_port = port;
5224     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
5225 
5226     if (start_process) {
5227         nxt_router_start_app_process(task, app);
5228     }
5229 }
5230 
5231 
5232 void
5233 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
5234     nxt_http_action_t *action)
5235 {
5236     nxt_event_engine_t      *engine;
5237     nxt_http_app_conf_t     *conf;
5238     nxt_request_rpc_data_t  *req_rpc_data;
5239 
5240     conf = action->u.conf;
5241     engine = task->thread->engine;
5242 
5243     r->app_target = conf->target;
5244 
5245     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
5246                                           nxt_router_response_ready_handler,
5247                                           nxt_router_response_error_handler,
5248                                           sizeof(nxt_request_rpc_data_t));
5249     if (nxt_slow_path(req_rpc_data == NULL)) {
5250         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
5251         return;
5252     }
5253 
5254     /*
5255      * At this point we have request req_rpc_data allocated and registered
5256      * in port handlers.  Need to fixup request memory pool.  Counterpart
5257      * release will be called via following call chain:
5258      *    nxt_request_rpc_data_unlink() ->
5259      *        nxt_router_http_request_release_post() ->
5260      *            nxt_router_http_request_release()
5261      */
5262     nxt_mp_retain(r->mem_pool);
5263 
5264     r->timer.task = &engine->task;
5265     r->timer.work_queue = &engine->fast_work_queue;
5266     r->timer.log = engine->task.log;
5267     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
5268 
5269     r->engine = engine;
5270     r->err_work.handler = nxt_router_http_request_error;
5271     r->err_work.task = task;
5272     r->err_work.obj = r;
5273 
5274     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
5275     req_rpc_data->app = conf->app;
5276     req_rpc_data->msg_info.body_fd = -1;
5277     req_rpc_data->rpc_cancel = 1;
5278 
5279     nxt_router_app_use(task, conf->app, 1);
5280 
5281     req_rpc_data->request = r;
5282     r->req_rpc_data = req_rpc_data;
5283 
5284     if (r->last != NULL) {
5285         r->last->completion_handler = nxt_router_http_request_done;
5286     }
5287 
5288     nxt_router_app_port_get(task, conf->app, req_rpc_data);
5289     nxt_router_app_prepare_request(task, req_rpc_data);
5290 }
5291 
5292 
5293 static void
5294 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
5295 {
5296     nxt_http_request_t  *r;
5297 
5298     r = obj;
5299 
5300     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
5301 
5302     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5303 
5304     if (r->req_rpc_data != NULL) {
5305         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5306     }
5307 
5308     nxt_mp_release(r->mem_pool);
5309 }
5310 
5311 
5312 static void
5313 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
5314 {
5315     nxt_http_request_t  *r;
5316 
5317     r = data;
5318 
5319     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
5320 
5321     if (r->req_rpc_data != NULL) {
5322         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
5323     }
5324 
5325     nxt_http_request_close_handler(task, r, r->proto.any);
5326 }
5327 
5328 
5329 static void
5330 nxt_router_app_prepare_request(nxt_task_t *task,
5331     nxt_request_rpc_data_t *req_rpc_data)
5332 {
5333     nxt_app_t         *app;
5334     nxt_buf_t         *buf, *body;
5335     nxt_int_t         res;
5336     nxt_port_t        *port, *reply_port;
5337 
5338     int                   notify;
5339     struct {
5340         nxt_port_msg_t       pm;
5341         nxt_port_mmap_msg_t  mm;
5342     } msg;
5343 
5344 
5345     app = req_rpc_data->app;
5346 
5347     nxt_assert(app != NULL);
5348 
5349     port = req_rpc_data->app_port;
5350 
5351     nxt_assert(port != NULL);
5352     nxt_assert(port->queue != NULL);
5353 
5354     reply_port = task->thread->engine->port;
5355 
5356     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5357                                  nxt_app_msg_prefix[app->type]);
5358     if (nxt_slow_path(buf == NULL)) {
5359         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5360                   req_rpc_data->stream, &app->name);
5361 
5362         nxt_http_request_error(task, req_rpc_data->request,
5363                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5364 
5365         return;
5366     }
5367 
5368     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5369                     nxt_buf_used_size(buf),
5370                     port->socket.fd);
5371 
5372     req_rpc_data->msg_info.buf = buf;
5373 
5374     body = req_rpc_data->request->body;
5375 
5376     if (body != NULL && nxt_buf_is_file(body)) {
5377         req_rpc_data->msg_info.body_fd = body->file->fd;
5378 
5379         body->file->fd = -1;
5380 
5381     } else {
5382         req_rpc_data->msg_info.body_fd = -1;
5383     }
5384 
5385     msg.pm.stream = req_rpc_data->stream;
5386     msg.pm.pid = reply_port->pid;
5387     msg.pm.reply_port = reply_port->id;
5388     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5389     msg.pm.last = 0;
5390     msg.pm.mmap = 1;
5391     msg.pm.nf = 0;
5392     msg.pm.mf = 0;
5393     msg.pm.tracking = 0;
5394 
5395     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5396     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5397 
5398     msg.mm.mmap_id = hdr->id;
5399     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5400     msg.mm.size = nxt_buf_used_size(buf);
5401 
5402     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5403                              req_rpc_data->stream, &notify,
5404                              &req_rpc_data->msg_info.tracking_cookie);
5405     if (nxt_fast_path(res == NXT_OK)) {
5406         if (notify != 0) {
5407             (void) nxt_port_socket_write(task, port,
5408                                          NXT_PORT_MSG_READ_QUEUE,
5409                                          -1, req_rpc_data->stream,
5410                                          reply_port->id, NULL);
5411 
5412         } else {
5413             nxt_debug(task, "queue is not empty");
5414         }
5415 
5416         buf->is_port_mmap_sent = 1;
5417         buf->mem.pos = buf->mem.free;
5418 
5419     } else {
5420         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5421                   req_rpc_data->stream, &app->name);
5422 
5423         nxt_http_request_error(task, req_rpc_data->request,
5424                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5425     }
5426 }
5427 
5428 
5429 struct nxt_fields_iter_s {
5430     nxt_list_part_t   *part;
5431     nxt_http_field_t  *field;
5432 };
5433 
5434 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5435 
5436 
5437 static nxt_http_field_t *
5438 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5439 {
5440     if (part == NULL) {
5441         return NULL;
5442     }
5443 
5444     while (part->nelts == 0) {
5445         part = part->next;
5446         if (part == NULL) {
5447             return NULL;
5448         }
5449     }
5450 
5451     i->part = part;
5452     i->field = nxt_list_data(i->part);
5453 
5454     return i->field;
5455 }
5456 
5457 
5458 static nxt_http_field_t *
5459 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5460 {
5461     return nxt_fields_part_first(nxt_list_part(fields), i);
5462 }
5463 
5464 
5465 static nxt_http_field_t *
5466 nxt_fields_next(nxt_fields_iter_t *i)
5467 {
5468     nxt_http_field_t  *end = nxt_list_data(i->part);
5469 
5470     end += i->part->nelts;
5471     i->field++;
5472 
5473     if (i->field < end) {
5474         return i->field;
5475     }
5476 
5477     return nxt_fields_part_first(i->part->next, i);
5478 }
5479 
5480 
5481 static nxt_buf_t *
5482 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5483     nxt_app_t *app, const nxt_str_t *prefix)
5484 {
5485     void                *target_pos, *query_pos;
5486     u_char              *pos, *end, *p, c;
5487     size_t              fields_count, req_size, size, free_size;
5488     size_t              copy_size;
5489     nxt_off_t           content_length;
5490     nxt_buf_t           *b, *buf, *out, **tail;
5491     nxt_http_field_t    *field, *dup;
5492     nxt_unit_field_t    *dst_field;
5493     nxt_fields_iter_t   iter, dup_iter;
5494     nxt_unit_request_t  *req;
5495 
5496     req_size = sizeof(nxt_unit_request_t)
5497                + r->method->length + 1
5498                + r->version.length + 1
5499                + r->remote->length + 1
5500                + r->local->length + 1
5501                + r->server_name.length + 1
5502                + r->target.length + 1
5503                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5504 
5505     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5506     fields_count = 0;
5507 
5508     nxt_list_each(field, r->fields) {
5509         fields_count++;
5510 
5511         req_size += field->name_length + prefix->length + 1
5512                     + field->value_length + 1;
5513     } nxt_list_loop;
5514 
5515     req_size += fields_count * sizeof(nxt_unit_field_t);
5516 
5517     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5518         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5519                   (int) req_size);
5520 
5521         return NULL;
5522     }
5523 
5524     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5525               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5526     if (nxt_slow_path(out == NULL)) {
5527         return NULL;
5528     }
5529 
5530     req = (nxt_unit_request_t *) out->mem.free;
5531     out->mem.free += req_size;
5532 
5533     req->app_target = r->app_target;
5534 
5535     req->content_length = content_length;
5536 
5537     p = (u_char *) (req->fields + fields_count);
5538 
5539     nxt_debug(task, "fields_count=%d", (int) fields_count);
5540 
5541     req->method_length = r->method->length;
5542     nxt_unit_sptr_set(&req->method, p);
5543     p = nxt_cpymem(p, r->method->start, r->method->length);
5544     *p++ = '\0';
5545 
5546     req->version_length = r->version.length;
5547     nxt_unit_sptr_set(&req->version, p);
5548     p = nxt_cpymem(p, r->version.start, r->version.length);
5549     *p++ = '\0';
5550 
5551     req->remote_length = r->remote->address_length;
5552     nxt_unit_sptr_set(&req->remote, p);
5553     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5554                    r->remote->address_length);
5555     *p++ = '\0';
5556 
5557     req->local_length = r->local->address_length;
5558     nxt_unit_sptr_set(&req->local, p);
5559     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5560     *p++ = '\0';
5561 
5562     req->tls = (r->tls != NULL);
5563     req->websocket_handshake = r->websocket_handshake;
5564 
5565     req->server_name_length = r->server_name.length;
5566     nxt_unit_sptr_set(&req->server_name, p);
5567     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5568     *p++ = '\0';
5569 
5570     target_pos = p;
5571     req->target_length = (uint32_t) r->target.length;
5572     nxt_unit_sptr_set(&req->target, p);
5573     p = nxt_cpymem(p, r->target.start, r->target.length);
5574     *p++ = '\0';
5575 
5576     req->path_length = (uint32_t) r->path->length;
5577     if (r->path->start == r->target.start) {
5578         nxt_unit_sptr_set(&req->path, target_pos);
5579 
5580     } else {
5581         nxt_unit_sptr_set(&req->path, p);
5582         p = nxt_cpymem(p, r->path->start, r->path->length);
5583         *p++ = '\0';
5584     }
5585 
5586     req->query_length = (uint32_t) r->args->length;
5587     if (r->args->start != NULL) {
5588         query_pos = nxt_pointer_to(target_pos,
5589                                    r->args->start - r->target.start);
5590 
5591         nxt_unit_sptr_set(&req->query, query_pos);
5592 
5593     } else {
5594         req->query.offset = 0;
5595     }
5596 
5597     req->content_length_field = NXT_UNIT_NONE_FIELD;
5598     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5599     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5600     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5601 
5602     dst_field = req->fields;
5603 
5604     for (field = nxt_fields_first(r->fields, &iter);
5605          field != NULL;
5606          field = nxt_fields_next(&iter))
5607     {
5608         if (field->skip) {
5609             continue;
5610         }
5611 
5612         dst_field->hash = field->hash;
5613         dst_field->skip = 0;
5614         dst_field->name_length = field->name_length + prefix->length;
5615         dst_field->value_length = field->value_length;
5616 
5617         if (field == r->content_length) {
5618             req->content_length_field = dst_field - req->fields;
5619 
5620         } else if (field == r->content_type) {
5621             req->content_type_field = dst_field - req->fields;
5622 
5623         } else if (field == r->cookie) {
5624             req->cookie_field = dst_field - req->fields;
5625 
5626         } else if (field == r->authorization) {
5627             req->authorization_field = dst_field - req->fields;
5628         }
5629 
5630         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5631                   (int) field->hash, (int) field->skip,
5632                   (int) field->name_length, field->name,
5633                   (int) field->value_length, field->value);
5634 
5635         if (prefix->length != 0) {
5636             nxt_unit_sptr_set(&dst_field->name, p);
5637             p = nxt_cpymem(p, prefix->start, prefix->length);
5638 
5639             end = field->name + field->name_length;
5640             for (pos = field->name; pos < end; pos++) {
5641                 c = *pos;
5642 
5643                 if (c >= 'a' && c <= 'z') {
5644                     *p++ = (c & ~0x20);
5645                     continue;
5646                 }
5647 
5648                 if (c == '-') {
5649                     *p++ = '_';
5650                     continue;
5651                 }
5652 
5653                 *p++ = c;
5654             }
5655 
5656         } else {
5657             nxt_unit_sptr_set(&dst_field->name, p);
5658             p = nxt_cpymem(p, field->name, field->name_length);
5659         }
5660 
5661         *p++ = '\0';
5662 
5663         nxt_unit_sptr_set(&dst_field->value, p);
5664         p = nxt_cpymem(p, field->value, field->value_length);
5665 
5666         if (prefix->length != 0) {
5667             dup_iter = iter;
5668 
5669             for (dup = nxt_fields_next(&dup_iter);
5670                  dup != NULL;
5671                  dup = nxt_fields_next(&dup_iter))
5672             {
5673                 if (dup->name_length != field->name_length
5674                     || dup->skip
5675                     || dup->hash != field->hash
5676                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5677                 {
5678                     continue;
5679                 }
5680 
5681                 p = nxt_cpymem(p, ", ", 2);
5682                 p = nxt_cpymem(p, dup->value, dup->value_length);
5683 
5684                 dst_field->value_length += 2 + dup->value_length;
5685 
5686                 dup->skip = 1;
5687             }
5688         }
5689 
5690         *p++ = '\0';
5691 
5692         dst_field++;
5693     }
5694 
5695     req->fields_count = (uint32_t) (dst_field - req->fields);
5696 
5697     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5698 
5699     buf = out;
5700     tail = &buf->next;
5701 
5702     for (b = r->body; b != NULL; b = b->next) {
5703         size = nxt_buf_mem_used_size(&b->mem);
5704         pos = b->mem.pos;
5705 
5706         while (size > 0) {
5707             if (buf == NULL) {
5708                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5709 
5710                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5711                 if (nxt_slow_path(buf == NULL)) {
5712                     while (out != NULL) {
5713                         buf = out->next;
5714                         out->next = NULL;
5715                         out->completion_handler(task, out, out->parent);
5716                         out = buf;
5717                     }
5718                     return NULL;
5719                 }
5720 
5721                 *tail = buf;
5722                 tail = &buf->next;
5723 
5724             } else {
5725                 free_size = nxt_buf_mem_free_size(&buf->mem);
5726                 if (free_size < size
5727                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5728                        == NXT_OK)
5729                 {
5730                     free_size = nxt_buf_mem_free_size(&buf->mem);
5731                 }
5732             }
5733 
5734             if (free_size > 0) {
5735                 copy_size = nxt_min(free_size, size);
5736 
5737                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5738 
5739                 size -= copy_size;
5740                 pos += copy_size;
5741 
5742                 if (size == 0) {
5743                     break;
5744                 }
5745             }
5746 
5747             buf = NULL;
5748         }
5749     }
5750 
5751     return out;
5752 }
5753 
5754 
5755 static void
5756 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5757 {
5758     nxt_timer_t              *timer;
5759     nxt_http_request_t       *r;
5760     nxt_request_rpc_data_t   *req_rpc_data;
5761 
5762     timer = obj;
5763 
5764     nxt_debug(task, "router app timeout");
5765 
5766     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5767     req_rpc_data = r->timer_data;
5768 
5769     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5770 
5771     nxt_request_rpc_data_unlink(task, req_rpc_data);
5772 }
5773 
5774 
5775 static void
5776 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5777 {
5778     r->timer.handler = nxt_router_http_request_release;
5779     nxt_timer_add(task->thread->engine, &r->timer, 0);
5780 }
5781 
5782 
5783 static void
5784 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5785 {
5786     nxt_http_request_t  *r;
5787 
5788     nxt_debug(task, "http request pool release");
5789 
5790     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5791 
5792     nxt_mp_release(r->mem_pool);
5793 }
5794 
5795 
5796 static void
5797 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5798 {
5799     size_t                   mi;
5800     uint32_t                 i;
5801     nxt_bool_t               ack;
5802     nxt_process_t            *process;
5803     nxt_free_map_t           *m;
5804     nxt_port_mmap_handler_t  *mmap_handler;
5805 
5806     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5807 
5808     process = nxt_runtime_process_find(task->thread->runtime,
5809                                        msg->port_msg.pid);
5810     if (nxt_slow_path(process == NULL)) {
5811         return;
5812     }
5813 
5814     ack = 0;
5815 
5816     /*
5817      * To mitigate possible racing condition (when OOSM message received
5818      * after some of the memory was already freed), need to try to find
5819      * first free segment in shared memory and send ACK if found.
5820      */
5821 
5822     nxt_thread_mutex_lock(&process->incoming.mutex);
5823 
5824     for (i = 0; i < process->incoming.size; i++) {
5825         mmap_handler = process->incoming.elts[i].mmap_handler;
5826 
5827         if (nxt_slow_path(mmap_handler == NULL)) {
5828             continue;
5829         }
5830 
5831         m = mmap_handler->hdr->free_map;
5832 
5833         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5834             if (m[mi] != 0) {
5835                 ack = 1;
5836 
5837                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5838                           i, mi, m[mi]);
5839 
5840                 break;
5841             }
5842         }
5843     }
5844 
5845     nxt_thread_mutex_unlock(&process->incoming.mutex);
5846 
5847     if (ack) {
5848         nxt_process_broadcast_shm_ack(task, process);
5849     }
5850 }
5851 
5852 
5853 static void
5854 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5855 {
5856     nxt_fd_t                 fd;
5857     nxt_port_t               *port;
5858     nxt_runtime_t            *rt;
5859     nxt_port_mmaps_t         *mmaps;
5860     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5861     nxt_port_mmap_handler_t  *mmap_handler;
5862 
5863     rt = task->thread->runtime;
5864 
5865     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5866                                  msg->port_msg.reply_port);
5867     if (nxt_slow_path(port == NULL)) {
5868         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5869                   msg->port_msg.pid, msg->port_msg.reply_port);
5870 
5871         return;
5872     }
5873 
5874     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5875                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5876     {
5877         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5878                   (int) nxt_buf_used_size(msg->buf));
5879 
5880         return;
5881     }
5882 
5883     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5884 
5885     nxt_assert(port->type == NXT_PROCESS_APP);
5886 
5887     if (nxt_slow_path(port->app == NULL)) {
5888         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5889                   port->pid, port->id);
5890 
5891         // FIXME
5892         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5893                               -1, msg->port_msg.stream, 0, NULL);
5894 
5895         return;
5896     }
5897 
5898     mmaps = &port->app->outgoing;
5899     nxt_thread_mutex_lock(&mmaps->mutex);
5900 
5901     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5902         nxt_thread_mutex_unlock(&mmaps->mutex);
5903 
5904         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5905                   (int) get_mmap_msg->id);
5906 
5907         // FIXME
5908         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5909                               -1, msg->port_msg.stream, 0, NULL);
5910         return;
5911     }
5912 
5913     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5914 
5915     fd = mmap_handler->fd;
5916 
5917     nxt_thread_mutex_unlock(&mmaps->mutex);
5918 
5919     nxt_debug(task, "get mmap %PI:%d found",
5920               msg->port_msg.pid, (int) get_mmap_msg->id);
5921 
5922     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5923 }
5924 
5925 
5926 static void
5927 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5928 {
5929     nxt_port_t               *port, *reply_port;
5930     nxt_runtime_t            *rt;
5931     nxt_port_msg_get_port_t  *get_port_msg;
5932 
5933     rt = task->thread->runtime;
5934 
5935     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5936                                        msg->port_msg.reply_port);
5937     if (nxt_slow_path(reply_port == NULL)) {
5938         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5939                   msg->port_msg.pid, msg->port_msg.reply_port);
5940 
5941         return;
5942     }
5943 
5944     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5945                       < (int) sizeof(nxt_port_msg_get_port_t)))
5946     {
5947         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5948                   (int) nxt_buf_used_size(msg->buf));
5949 
5950         return;
5951     }
5952 
5953     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5954 
5955     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5956     if (nxt_slow_path(port == NULL)) {
5957         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5958                   get_port_msg->pid, get_port_msg->id);
5959 
5960         return;
5961     }
5962 
5963     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5964               get_port_msg->id);
5965 
5966     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5967 }
5968