xref: /unit/src/nxt_router.c (revision 1867:7f1b2eaa2d58)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 typedef struct {
22     nxt_str_t         type;
23     uint32_t          processes;
24     uint32_t          max_processes;
25     uint32_t          spare_processes;
26     nxt_msec_t        timeout;
27     nxt_msec_t        idle_timeout;
28     uint32_t          requests;
29     nxt_conf_value_t  *limits_value;
30     nxt_conf_value_t  *processes_value;
31     nxt_conf_value_t  *targets_value;
32 } nxt_router_app_conf_t;
33 
34 
35 typedef struct {
36     nxt_str_t         pass;
37     nxt_str_t         application;
38 } nxt_router_listener_conf_t;
39 
40 
#if (NXT_TLS)

/*
 * One TLS-related item waiting to be processed for a socket configuration.
 * Only compiled when TLS support is enabled.
 */
typedef struct {
    nxt_str_t          name;
    nxt_socket_conf_t  *conf;

    /*
     * Queue link.  nxt_router_conf_apply() takes items of this type off
     * the "tls" queue of nxt_router_temp_conf_t through this member.
     */
    nxt_queue_link_t   link;
} nxt_router_tlssock_t;

#endif
51 
52 
53 typedef struct {
54     nxt_str_t               *name;
55     nxt_socket_conf_t       *socket_conf;
56     nxt_router_temp_conf_t  *temp_conf;
57     nxt_bool_t              last;
58 } nxt_socket_rpc_t;
59 
60 
61 typedef struct {
62     nxt_app_t               *app;
63     nxt_router_temp_conf_t  *temp_conf;
64 } nxt_app_rpc_t;
65 
66 
67 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
68     nxt_mp_t *mp);
69 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
70 static void nxt_router_greet_controller(nxt_task_t *task,
71     nxt_port_t *controller_port);
72 
73 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
74 
75 static void nxt_router_new_port_handler(nxt_task_t *task,
76     nxt_port_recv_msg_t *msg);
77 static void nxt_router_conf_data_handler(nxt_task_t *task,
78     nxt_port_recv_msg_t *msg);
79 static void nxt_router_remove_pid_handler(nxt_task_t *task,
80     nxt_port_recv_msg_t *msg);
81 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
82     nxt_port_recv_msg_t *msg);
83 
84 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
85 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
86 static void nxt_router_conf_ready(nxt_task_t *task,
87     nxt_router_temp_conf_t *tmcf);
88 static void nxt_router_conf_error(nxt_task_t *task,
89     nxt_router_temp_conf_t *tmcf);
90 static void nxt_router_conf_send(nxt_task_t *task,
91     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
92 
93 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
94     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
95 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
96     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
97 
98 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
99 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
100 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
101     nxt_app_t *app);
102 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
103     nxt_str_t *name);
104 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
105     int i);
106 
107 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
108     nxt_port_t *port);
109 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
110     nxt_port_t *port);
111 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
112     nxt_port_t *port, nxt_fd_t fd);
113 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
114     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
115 static void nxt_router_listen_socket_ready(nxt_task_t *task,
116     nxt_port_recv_msg_t *msg, void *data);
117 static void nxt_router_listen_socket_error(nxt_task_t *task,
118     nxt_port_recv_msg_t *msg, void *data);
119 #if (NXT_TLS)
120 static void nxt_router_tls_rpc_create(nxt_task_t *task,
121     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls, nxt_bool_t last);
122 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
123     nxt_port_recv_msg_t *msg, void *data);
124 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
125     nxt_conf_value_t *value, nxt_socket_conf_t *skcf);
126 #endif
127 static void nxt_router_app_rpc_create(nxt_task_t *task,
128     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
129 static void nxt_router_app_prefork_ready(nxt_task_t *task,
130     nxt_port_recv_msg_t *msg, void *data);
131 static void nxt_router_app_prefork_error(nxt_task_t *task,
132     nxt_port_recv_msg_t *msg, void *data);
133 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
134     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
135 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
136     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
137 
138 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
139     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
140     const nxt_event_interface_t *interface);
141 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
142     nxt_router_engine_conf_t *recf);
143 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
144     nxt_router_engine_conf_t *recf);
145 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
146     nxt_router_engine_conf_t *recf);
147 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
148     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
149     nxt_work_handler_t handler);
150 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
151     nxt_router_engine_conf_t *recf);
152 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
153     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
154 
155 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
156     nxt_router_temp_conf_t *tmcf);
157 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
158     nxt_event_engine_t *engine);
159 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
160     nxt_router_temp_conf_t *tmcf);
161 
162 static void nxt_router_engines_post(nxt_router_t *router,
163     nxt_router_temp_conf_t *tmcf);
164 static void nxt_router_engine_post(nxt_event_engine_t *engine,
165     nxt_work_t *jobs);
166 
167 static void nxt_router_thread_start(void *data);
168 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
169     void *data);
170 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
171     void *data);
172 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
173     void *data);
174 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
175     void *data);
176 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
177     void *data);
178 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
179     void *data);
180 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
181     void *data);
182 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
183     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
184 static void nxt_router_listen_socket_release(nxt_task_t *task,
185     nxt_socket_conf_t *skcf);
186 
187 static void nxt_router_access_log_writer(nxt_task_t *task,
188     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
189 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
190     struct tm *tm, size_t size, const char *format);
191 static void nxt_router_access_log_open(nxt_task_t *task,
192     nxt_router_temp_conf_t *tmcf);
193 static void nxt_router_access_log_ready(nxt_task_t *task,
194     nxt_port_recv_msg_t *msg, void *data);
195 static void nxt_router_access_log_error(nxt_task_t *task,
196     nxt_port_recv_msg_t *msg, void *data);
197 static void nxt_router_access_log_release(nxt_task_t *task,
198     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
199 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
200     void *data);
201 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
202     nxt_port_recv_msg_t *msg, void *data);
203 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
204     nxt_port_recv_msg_t *msg, void *data);
205 
206 static void nxt_router_app_port_ready(nxt_task_t *task,
207     nxt_port_recv_msg_t *msg, void *data);
208 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
209     nxt_port_t *app_port);
210 static void nxt_router_app_port_error(nxt_task_t *task,
211     nxt_port_recv_msg_t *msg, void *data);
212 
213 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
214 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
215 
216 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
217     nxt_apr_action_t action);
218 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
219     nxt_request_rpc_data_t *req_rpc_data);
220 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
221     void *data);
222 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
223     void *data);
224 
225 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
226     void *data);
227 static void nxt_router_app_prepare_request(nxt_task_t *task,
228     nxt_request_rpc_data_t *req_rpc_data);
229 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
230     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
231 
232 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
233 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
234     void *data);
235 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
236     void *data);
237 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
238     void *data);
239 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
240 
241 static const nxt_http_request_state_t  nxt_http_request_send_state;
242 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
243 
244 static void nxt_router_app_joint_use(nxt_task_t *task,
245     nxt_app_joint_t *app_joint, int i);
246 
247 static void nxt_router_http_request_release_post(nxt_task_t *task,
248     nxt_http_request_t *r);
249 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
250     void *data);
251 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
252 static void nxt_router_get_port_handler(nxt_task_t *task,
253     nxt_port_recv_msg_t *msg);
254 static void nxt_router_get_mmap_handler(nxt_task_t *task,
255     nxt_port_recv_msg_t *msg);
256 
257 extern const nxt_http_request_state_t  nxt_http_websocket;
258 
259 static nxt_router_t  *nxt_router;
260 
261 static const nxt_str_t http_prefix = nxt_string("HTTP_");
262 static const nxt_str_t empty_prefix = nxt_string("");
263 
264 static const nxt_str_t  *nxt_app_msg_prefix[] = {
265     &empty_prefix,
266     &empty_prefix,
267     &http_prefix,
268     &http_prefix,
269     &http_prefix,
270     &empty_prefix,
271 };
272 
273 
274 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
275     .quit         = nxt_signal_quit_handler,
276     .new_port     = nxt_router_new_port_handler,
277     .get_port     = nxt_router_get_port_handler,
278     .change_file  = nxt_port_change_log_file_handler,
279     .mmap         = nxt_port_mmap_handler,
280     .get_mmap     = nxt_router_get_mmap_handler,
281     .data         = nxt_router_conf_data_handler,
282     .remove_pid   = nxt_router_remove_pid_handler,
283     .access_log   = nxt_router_access_log_reopen_handler,
284     .rpc_ready    = nxt_port_rpc_handler,
285     .rpc_error    = nxt_port_rpc_handler,
286     .oosm         = nxt_router_oosm_handler,
287 };
288 
289 
290 const nxt_process_init_t  nxt_router_process = {
291     .name           = "router",
292     .type           = NXT_PROCESS_ROUTER,
293     .prefork        = nxt_router_prefork,
294     .restart        = 1,
295     .setup          = nxt_process_core_setup,
296     .start          = nxt_router_start,
297     .port_handlers  = &nxt_router_process_port_handlers,
298     .signals        = nxt_process_signals,
299 };
300 
301 
302 /* Queues of nxt_socket_conf_t */
303 nxt_queue_t  creating_sockets;
304 nxt_queue_t  pending_sockets;
305 nxt_queue_t  updating_sockets;
306 nxt_queue_t  keeping_sockets;
307 nxt_queue_t  deleting_sockets;
308 
309 
310 static nxt_int_t
311 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
312 {
313     nxt_runtime_stop_app_processes(task, task->thread->runtime);
314 
315     return NXT_OK;
316 }
317 
318 
319 static nxt_int_t
320 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
321 {
322     nxt_int_t      ret;
323     nxt_port_t     *controller_port;
324     nxt_router_t   *router;
325     nxt_runtime_t  *rt;
326 
327     rt = task->thread->runtime;
328 
329     nxt_log(task, NXT_LOG_INFO, "router started");
330 
331 #if (NXT_TLS)
332     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
333     if (nxt_slow_path(rt->tls == NULL)) {
334         return NXT_ERROR;
335     }
336 
337     ret = rt->tls->library_init(task);
338     if (nxt_slow_path(ret != NXT_OK)) {
339         return ret;
340     }
341 #endif
342 
343     ret = nxt_http_init(task);
344     if (nxt_slow_path(ret != NXT_OK)) {
345         return ret;
346     }
347 
348     router = nxt_zalloc(sizeof(nxt_router_t));
349     if (nxt_slow_path(router == NULL)) {
350         return NXT_ERROR;
351     }
352 
353     nxt_queue_init(&router->engines);
354     nxt_queue_init(&router->sockets);
355     nxt_queue_init(&router->apps);
356 
357     nxt_router = router;
358 
359     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
360     if (controller_port != NULL) {
361         nxt_router_greet_controller(task, controller_port);
362     }
363 
364     return NXT_OK;
365 }
366 
367 
368 static void
369 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
370 {
371     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
372                           -1, 0, 0, NULL);
373 }
374 
375 
376 static void
377 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
378     void *data)
379 {
380     size_t         size;
381     uint32_t       stream;
382     nxt_mp_t       *mp;
383     nxt_int_t      ret;
384     nxt_app_t      *app;
385     nxt_buf_t      *b;
386     nxt_port_t     *main_port;
387     nxt_runtime_t  *rt;
388 
389     app = data;
390 
391     rt = task->thread->runtime;
392     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
393 
394     nxt_debug(task, "app '%V' %p start process", &app->name, app);
395 
396     size = app->name.length + 1 + app->conf.length;
397 
398     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
399 
400     if (nxt_slow_path(b == NULL)) {
401         goto failed;
402     }
403 
404     nxt_buf_cpystr(b, &app->name);
405     *b->mem.free++ = '\0';
406     nxt_buf_cpystr(b, &app->conf);
407 
408     nxt_router_app_joint_use(task, app->joint, 1);
409 
410     stream = nxt_port_rpc_register_handler(task, port,
411                                            nxt_router_app_port_ready,
412                                            nxt_router_app_port_error,
413                                            -1, app->joint);
414 
415     if (nxt_slow_path(stream == 0)) {
416         nxt_router_app_joint_use(task, app->joint, -1);
417 
418         goto failed;
419     }
420 
421     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
422                                 -1, stream, port->id, b);
423 
424     if (nxt_slow_path(ret != NXT_OK)) {
425         nxt_port_rpc_cancel(task, port, stream);
426 
427         nxt_router_app_joint_use(task, app->joint, -1);
428 
429         goto failed;
430     }
431 
432     nxt_router_app_use(task, app, -1);
433 
434     return;
435 
436 failed:
437 
438     if (b != NULL) {
439         mp = b->data;
440         nxt_mp_free(mp, b);
441         nxt_mp_release(mp);
442     }
443 
444     nxt_thread_mutex_lock(&app->mutex);
445 
446     app->pending_processes--;
447 
448     nxt_thread_mutex_unlock(&app->mutex);
449 
450     nxt_router_app_use(task, app, -1);
451 }
452 
453 
454 static void
455 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
456 {
457     app_joint->use_count += i;
458 
459     if (app_joint->use_count == 0) {
460         nxt_assert(app_joint->app == NULL);
461 
462         nxt_free(app_joint);
463     }
464 }
465 
466 
467 static nxt_int_t
468 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
469 {
470     nxt_int_t      res;
471     nxt_port_t     *router_port;
472     nxt_runtime_t  *rt;
473 
474     nxt_debug(task, "app '%V' start process", &app->name);
475 
476     rt = task->thread->runtime;
477     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
478 
479     nxt_router_app_use(task, app, 1);
480 
481     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
482                         app);
483 
484     if (res == NXT_OK) {
485         return res;
486     }
487 
488     nxt_thread_mutex_lock(&app->mutex);
489 
490     app->pending_processes--;
491 
492     nxt_thread_mutex_unlock(&app->mutex);
493 
494     nxt_router_app_use(task, app, -1);
495 
496     return NXT_ERROR;
497 }
498 
499 
500 nxt_inline nxt_bool_t
501 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
502 {
503     nxt_buf_t       *b, *next;
504     nxt_bool_t      cancelled;
505     nxt_msg_info_t  *msg_info;
506 
507     msg_info = &req_rpc_data->msg_info;
508 
509     if (msg_info->buf == NULL) {
510         return 0;
511     }
512 
513     cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
514                                      msg_info->tracking_cookie,
515                                      req_rpc_data->stream);
516 
517     if (cancelled) {
518         nxt_debug(task, "stream #%uD: cancelled by router",
519                   req_rpc_data->stream);
520     }
521 
522     for (b = msg_info->buf; b != NULL; b = next) {
523         next = b->next;
524         b->next = NULL;
525 
526         if (b->is_port_mmap_sent) {
527             b->is_port_mmap_sent = cancelled == 0;
528         }
529 
530         b->completion_handler(task, b, b->parent);
531     }
532 
533     msg_info->buf = NULL;
534 
535     return cancelled;
536 }
537 
538 
539 nxt_inline nxt_bool_t
540 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
541 {
542     if (lnk->next != NULL) {
543         nxt_queue_remove(lnk);
544 
545         lnk->next = NULL;
546 
547         return 1;
548     }
549 
550     return 0;
551 }
552 
553 
554 nxt_inline void
555 nxt_request_rpc_data_unlink(nxt_task_t *task,
556     nxt_request_rpc_data_t *req_rpc_data)
557 {
558     nxt_app_t           *app;
559     nxt_bool_t          unlinked;
560     nxt_http_request_t  *r;
561 
562     nxt_router_msg_cancel(task, req_rpc_data);
563 
564     if (req_rpc_data->app_port != NULL) {
565         nxt_router_app_port_release(task, req_rpc_data->app_port,
566                                     req_rpc_data->apr_action);
567 
568         req_rpc_data->app_port = NULL;
569     }
570 
571     app = req_rpc_data->app;
572     r = req_rpc_data->request;
573 
574     if (r != NULL) {
575         r->timer_data = NULL;
576 
577         nxt_router_http_request_release_post(task, r);
578 
579         r->req_rpc_data = NULL;
580         req_rpc_data->request = NULL;
581 
582         if (app != NULL) {
583             unlinked = 0;
584 
585             nxt_thread_mutex_lock(&app->mutex);
586 
587             if (r->app_link.next != NULL) {
588                 nxt_queue_remove(&r->app_link);
589                 r->app_link.next = NULL;
590 
591                 unlinked = 1;
592             }
593 
594             nxt_thread_mutex_unlock(&app->mutex);
595 
596             if (unlinked) {
597                 nxt_mp_release(r->mem_pool);
598             }
599         }
600     }
601 
602     if (app != NULL) {
603         nxt_router_app_use(task, app, -1);
604 
605         req_rpc_data->app = NULL;
606     }
607 
608     if (req_rpc_data->msg_info.body_fd != -1) {
609         nxt_fd_close(req_rpc_data->msg_info.body_fd);
610 
611         req_rpc_data->msg_info.body_fd = -1;
612     }
613 
614     if (req_rpc_data->rpc_cancel) {
615         req_rpc_data->rpc_cancel = 0;
616 
617         nxt_port_rpc_cancel(task, task->thread->engine->port,
618                             req_rpc_data->stream);
619     }
620 }
621 
622 
623 static void
624 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
625 {
626     nxt_int_t      res;
627     nxt_app_t      *app;
628     nxt_port_t     *port, *main_app_port;
629     nxt_runtime_t  *rt;
630 
631     nxt_port_new_port_handler(task, msg);
632 
633     port = msg->u.new_port;
634 
635     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
636         nxt_router_greet_controller(task, msg->u.new_port);
637     }
638 
639     if (port == NULL || port->type != NXT_PROCESS_APP) {
640 
641         if (msg->port_msg.stream == 0) {
642             return;
643         }
644 
645         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
646 
647     } else {
648         if (msg->fd[1] != -1) {
649             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
650             if (nxt_slow_path(res != NXT_OK)) {
651                 return;
652             }
653 
654             nxt_fd_close(msg->fd[1]);
655             msg->fd[1] = -1;
656         }
657     }
658 
659     if (msg->port_msg.stream != 0) {
660         nxt_port_rpc_handler(task, msg);
661         return;
662     }
663 
664     /*
665      * Port with "id == 0" is application 'main' port and it always
666      * should come with non-zero stream.
667      */
668     nxt_assert(port->id != 0);
669 
670     /* Find 'main' app port and get app reference. */
671     rt = task->thread->runtime;
672 
673     /*
674      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
675      * sent to main port (with id == 0) and processed in main thread.
676      */
677     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
678     nxt_assert(main_app_port != NULL);
679 
680     app = main_app_port->app;
681     nxt_assert(app != NULL);
682 
683     nxt_thread_mutex_lock(&app->mutex);
684 
685     /* TODO here should be find-and-add code because there can be
686        port waiters in port_hash */
687     nxt_port_hash_add(&app->port_hash, port);
688     app->port_hash_count++;
689 
690     nxt_thread_mutex_unlock(&app->mutex);
691 
692     port->app = app;
693     port->main_app_port = main_app_port;
694 
695     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
696 }
697 
698 
699 static void
700 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
701 {
702     void                    *p;
703     size_t                  size;
704     nxt_int_t               ret;
705     nxt_port_t              *port;
706     nxt_router_temp_conf_t  *tmcf;
707 
708     port = nxt_runtime_port_find(task->thread->runtime,
709                                  msg->port_msg.pid,
710                                  msg->port_msg.reply_port);
711     if (nxt_slow_path(port == NULL)) {
712         nxt_alert(task, "conf_data_handler: reply port not found");
713         return;
714     }
715 
716     p = MAP_FAILED;
717 
718     /*
719      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
720      * initialized in 'cleanup' section.
721      */
722     size = 0;
723 
724     tmcf = nxt_router_temp_conf(task);
725     if (nxt_slow_path(tmcf == NULL)) {
726         goto fail;
727     }
728 
729     if (nxt_slow_path(msg->fd[0] == -1)) {
730         nxt_alert(task, "conf_data_handler: invalid shm fd");
731         goto fail;
732     }
733 
734     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
735         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
736                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
737         goto fail;
738     }
739 
740     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
741 
742     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
743 
744     nxt_fd_close(msg->fd[0]);
745     msg->fd[0] = -1;
746 
747     if (nxt_slow_path(p == MAP_FAILED)) {
748         goto fail;
749     }
750 
751     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
752 
753     tmcf->router_conf->router = nxt_router;
754     tmcf->stream = msg->port_msg.stream;
755     tmcf->port = port;
756 
757     nxt_port_use(task, tmcf->port, 1);
758 
759     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
760 
761     if (nxt_fast_path(ret == NXT_OK)) {
762         nxt_router_conf_apply(task, tmcf, NULL);
763 
764     } else {
765         nxt_router_conf_error(task, tmcf);
766     }
767 
768     goto cleanup;
769 
770 fail:
771 
772     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
773                           msg->port_msg.stream, 0, NULL);
774 
775     if (tmcf != NULL) {
776         nxt_mp_destroy(tmcf->mem_pool);
777     }
778 
779 cleanup:
780 
781     if (p != MAP_FAILED) {
782         nxt_mem_munmap(p, size);
783     }
784 
785     if (msg->fd[0] != -1) {
786         nxt_fd_close(msg->fd[0]);
787         msg->fd[0] = -1;
788     }
789 }
790 
791 
792 static void
793 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
794     void *data)
795 {
796     union {
797         nxt_pid_t  removed_pid;
798         void       *data;
799     } u;
800 
801     u.data = data;
802 
803     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
804 }
805 
806 
807 static void
808 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
809 {
810     nxt_event_engine_t  *engine;
811 
812     nxt_port_remove_pid_handler(task, msg);
813 
814     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
815     {
816         if (nxt_fast_path(engine->port != NULL)) {
817             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
818                           msg->u.data);
819         }
820     }
821     nxt_queue_loop;
822 
823     if (msg->port_msg.stream == 0) {
824         return;
825     }
826 
827     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
828 
829     nxt_port_rpc_handler(task, msg);
830 }
831 
832 
833 static nxt_router_temp_conf_t *
834 nxt_router_temp_conf(nxt_task_t *task)
835 {
836     nxt_mp_t                *mp, *tmp;
837     nxt_router_conf_t       *rtcf;
838     nxt_router_temp_conf_t  *tmcf;
839 
840     mp = nxt_mp_create(1024, 128, 256, 32);
841     if (nxt_slow_path(mp == NULL)) {
842         return NULL;
843     }
844 
845     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
846     if (nxt_slow_path(rtcf == NULL)) {
847         goto fail;
848     }
849 
850     rtcf->mem_pool = mp;
851 
852     tmp = nxt_mp_create(1024, 128, 256, 32);
853     if (nxt_slow_path(tmp == NULL)) {
854         goto fail;
855     }
856 
857     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
858     if (nxt_slow_path(tmcf == NULL)) {
859         goto temp_fail;
860     }
861 
862     tmcf->mem_pool = tmp;
863     tmcf->router_conf = rtcf;
864     tmcf->count = 1;
865     tmcf->engine = task->thread->engine;
866 
867     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
868                                      sizeof(nxt_router_engine_conf_t));
869     if (nxt_slow_path(tmcf->engines == NULL)) {
870         goto temp_fail;
871     }
872 
873     nxt_queue_init(&creating_sockets);
874     nxt_queue_init(&pending_sockets);
875     nxt_queue_init(&updating_sockets);
876     nxt_queue_init(&keeping_sockets);
877     nxt_queue_init(&deleting_sockets);
878 
879 #if (NXT_TLS)
880     nxt_queue_init(&tmcf->tls);
881 #endif
882 
883     nxt_queue_init(&tmcf->apps);
884     nxt_queue_init(&tmcf->previous);
885 
886     return tmcf;
887 
888 temp_fail:
889 
890     nxt_mp_destroy(tmp);
891 
892 fail:
893 
894     nxt_mp_destroy(mp);
895 
896     return NULL;
897 }
898 
899 
900 nxt_inline nxt_bool_t
901 nxt_router_app_can_start(nxt_app_t *app)
902 {
903     return app->processes + app->pending_processes < app->max_processes
904             && app->pending_processes < app->max_pending_processes;
905 }
906 
907 
908 nxt_inline nxt_bool_t
909 nxt_router_app_need_start(nxt_app_t *app)
910 {
911     return (app->active_requests
912               > app->port_hash_count + app->pending_processes)
913            || (app->spare_processes
914                 > app->idle_processes + app->pending_processes);
915 }
916 
917 
918 static void
919 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
920 {
921     nxt_int_t                    ret;
922     nxt_app_t                    *app;
923     nxt_router_t                 *router;
924     nxt_runtime_t                *rt;
925     nxt_queue_link_t             *qlk;
926     nxt_socket_conf_t            *skcf;
927     nxt_router_conf_t            *rtcf;
928     nxt_router_temp_conf_t       *tmcf;
929     const nxt_event_interface_t  *interface;
930 #if (NXT_TLS)
931     nxt_router_tlssock_t         *tls;
932 #endif
933 
934     tmcf = obj;
935 
936     qlk = nxt_queue_first(&pending_sockets);
937 
938     if (qlk != nxt_queue_tail(&pending_sockets)) {
939         nxt_queue_remove(qlk);
940         nxt_queue_insert_tail(&creating_sockets, qlk);
941 
942         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
943 
944         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
945 
946         return;
947     }
948 
949 #if (NXT_TLS)
950     qlk = nxt_queue_last(&tmcf->tls);
951 
952     if (qlk != nxt_queue_head(&tmcf->tls)) {
953         nxt_queue_remove(qlk);
954 
955         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
956 
957         nxt_router_tls_rpc_create(task, tmcf, tls,
958                                   nxt_queue_is_empty(&tmcf->tls));
959         return;
960     }
961 #endif
962 
963     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
964 
965         if (nxt_router_app_need_start(app)) {
966             nxt_router_app_rpc_create(task, tmcf, app);
967             return;
968         }
969 
970     } nxt_queue_loop;
971 
972     rtcf = tmcf->router_conf;
973 
974     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
975         nxt_router_access_log_open(task, tmcf);
976         return;
977     }
978 
979     rt = task->thread->runtime;
980 
981     interface = nxt_service_get(rt->services, "engine", NULL);
982 
983     router = rtcf->router;
984 
985     ret = nxt_router_engines_create(task, router, tmcf, interface);
986     if (nxt_slow_path(ret != NXT_OK)) {
987         goto fail;
988     }
989 
990     ret = nxt_router_threads_create(task, rt, tmcf);
991     if (nxt_slow_path(ret != NXT_OK)) {
992         goto fail;
993     }
994 
995     nxt_router_apps_sort(task, router, tmcf);
996 
997     nxt_router_apps_hash_use(task, rtcf, 1);
998 
999     nxt_router_engines_post(router, tmcf);
1000 
1001     nxt_queue_add(&router->sockets, &updating_sockets);
1002     nxt_queue_add(&router->sockets, &creating_sockets);
1003 
1004     router->access_log = rtcf->access_log;
1005 
1006     nxt_router_conf_ready(task, tmcf);
1007 
1008     return;
1009 
1010 fail:
1011 
1012     nxt_router_conf_error(task, tmcf);
1013 
1014     return;
1015 }
1016 
1017 
1018 static void
1019 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1020 {
1021     nxt_joint_job_t  *job;
1022 
1023     job = obj;
1024 
1025     nxt_router_conf_ready(task, job->tmcf);
1026 }
1027 
1028 
1029 static void
1030 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1031 {
1032     uint32_t               count;
1033     nxt_router_conf_t      *rtcf;
1034     nxt_thread_spinlock_t  *lock;
1035 
1036     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1037 
1038     if (--tmcf->count > 0) {
1039         return;
1040     }
1041 
1042     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1043 
1044     rtcf = tmcf->router_conf;
1045 
1046     lock = &rtcf->router->lock;
1047 
1048     nxt_thread_spin_lock(lock);
1049 
1050     count = rtcf->count;
1051 
1052     nxt_thread_spin_unlock(lock);
1053 
1054     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1055 
1056     if (count == 0) {
1057         nxt_router_apps_hash_use(task, rtcf, -1);
1058 
1059         nxt_router_access_log_release(task, lock, rtcf->access_log);
1060 
1061         nxt_mp_destroy(rtcf->mem_pool);
1062     }
1063 
1064     nxt_mp_destroy(tmcf->mem_pool);
1065 }
1066 
1067 
1068 static void
1069 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1070 {
1071     nxt_app_t          *app;
1072     nxt_queue_t        new_socket_confs;
1073     nxt_socket_t       s;
1074     nxt_router_t       *router;
1075     nxt_queue_link_t   *qlk;
1076     nxt_socket_conf_t  *skcf;
1077     nxt_router_conf_t  *rtcf;
1078 
1079     nxt_alert(task, "failed to apply new conf");
1080 
1081     for (qlk = nxt_queue_first(&creating_sockets);
1082          qlk != nxt_queue_tail(&creating_sockets);
1083          qlk = nxt_queue_next(qlk))
1084     {
1085         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1086         s = skcf->listen->socket;
1087 
1088         if (s != -1) {
1089             nxt_socket_close(task, s);
1090         }
1091 
1092         nxt_free(skcf->listen);
1093     }
1094 
1095     nxt_queue_init(&new_socket_confs);
1096     nxt_queue_add(&new_socket_confs, &updating_sockets);
1097     nxt_queue_add(&new_socket_confs, &pending_sockets);
1098     nxt_queue_add(&new_socket_confs, &creating_sockets);
1099 
1100     rtcf = tmcf->router_conf;
1101 
1102     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1103 
1104         nxt_router_app_unlink(task, app);
1105 
1106     } nxt_queue_loop;
1107 
1108     router = rtcf->router;
1109 
1110     nxt_queue_add(&router->sockets, &keeping_sockets);
1111     nxt_queue_add(&router->sockets, &deleting_sockets);
1112 
1113     nxt_queue_add(&router->apps, &tmcf->previous);
1114 
1115     // TODO: new engines and threads
1116 
1117     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1118 
1119     nxt_mp_destroy(rtcf->mem_pool);
1120 
1121     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1122 
1123     nxt_mp_destroy(tmcf->mem_pool);
1124 }
1125 
1126 
1127 static void
1128 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1129     nxt_port_msg_type_t type)
1130 {
1131     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1132 
1133     nxt_port_use(task, tmcf->port, -1);
1134 
1135     tmcf->port = NULL;
1136 }
1137 
1138 
1139 static nxt_conf_map_t  nxt_router_conf[] = {
1140     {
1141         nxt_string("listeners_threads"),
1142         NXT_CONF_MAP_INT32,
1143         offsetof(nxt_router_conf_t, threads),
1144     },
1145 };
1146 
1147 
1148 static nxt_conf_map_t  nxt_router_app_conf[] = {
1149     {
1150         nxt_string("type"),
1151         NXT_CONF_MAP_STR,
1152         offsetof(nxt_router_app_conf_t, type),
1153     },
1154 
1155     {
1156         nxt_string("limits"),
1157         NXT_CONF_MAP_PTR,
1158         offsetof(nxt_router_app_conf_t, limits_value),
1159     },
1160 
1161     {
1162         nxt_string("processes"),
1163         NXT_CONF_MAP_INT32,
1164         offsetof(nxt_router_app_conf_t, processes),
1165     },
1166 
1167     {
1168         nxt_string("processes"),
1169         NXT_CONF_MAP_PTR,
1170         offsetof(nxt_router_app_conf_t, processes_value),
1171     },
1172 
1173     {
1174         nxt_string("targets"),
1175         NXT_CONF_MAP_PTR,
1176         offsetof(nxt_router_app_conf_t, targets_value),
1177     },
1178 };
1179 
1180 
1181 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1182     {
1183         nxt_string("timeout"),
1184         NXT_CONF_MAP_MSEC,
1185         offsetof(nxt_router_app_conf_t, timeout),
1186     },
1187 
1188     {
1189         nxt_string("requests"),
1190         NXT_CONF_MAP_INT32,
1191         offsetof(nxt_router_app_conf_t, requests),
1192     },
1193 };
1194 
1195 
1196 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1197     {
1198         nxt_string("spare"),
1199         NXT_CONF_MAP_INT32,
1200         offsetof(nxt_router_app_conf_t, spare_processes),
1201     },
1202 
1203     {
1204         nxt_string("max"),
1205         NXT_CONF_MAP_INT32,
1206         offsetof(nxt_router_app_conf_t, max_processes),
1207     },
1208 
1209     {
1210         nxt_string("idle_timeout"),
1211         NXT_CONF_MAP_MSEC,
1212         offsetof(nxt_router_app_conf_t, idle_timeout),
1213     },
1214 };
1215 
1216 
1217 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1218     {
1219         nxt_string("pass"),
1220         NXT_CONF_MAP_STR_COPY,
1221         offsetof(nxt_router_listener_conf_t, pass),
1222     },
1223 
1224     {
1225         nxt_string("application"),
1226         NXT_CONF_MAP_STR_COPY,
1227         offsetof(nxt_router_listener_conf_t, application),
1228     },
1229 };
1230 
1231 
1232 static nxt_conf_map_t  nxt_router_http_conf[] = {
1233     {
1234         nxt_string("header_buffer_size"),
1235         NXT_CONF_MAP_SIZE,
1236         offsetof(nxt_socket_conf_t, header_buffer_size),
1237     },
1238 
1239     {
1240         nxt_string("large_header_buffer_size"),
1241         NXT_CONF_MAP_SIZE,
1242         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1243     },
1244 
1245     {
1246         nxt_string("large_header_buffers"),
1247         NXT_CONF_MAP_SIZE,
1248         offsetof(nxt_socket_conf_t, large_header_buffers),
1249     },
1250 
1251     {
1252         nxt_string("body_buffer_size"),
1253         NXT_CONF_MAP_SIZE,
1254         offsetof(nxt_socket_conf_t, body_buffer_size),
1255     },
1256 
1257     {
1258         nxt_string("max_body_size"),
1259         NXT_CONF_MAP_SIZE,
1260         offsetof(nxt_socket_conf_t, max_body_size),
1261     },
1262 
1263     {
1264         nxt_string("idle_timeout"),
1265         NXT_CONF_MAP_MSEC,
1266         offsetof(nxt_socket_conf_t, idle_timeout),
1267     },
1268 
1269     {
1270         nxt_string("header_read_timeout"),
1271         NXT_CONF_MAP_MSEC,
1272         offsetof(nxt_socket_conf_t, header_read_timeout),
1273     },
1274 
1275     {
1276         nxt_string("body_read_timeout"),
1277         NXT_CONF_MAP_MSEC,
1278         offsetof(nxt_socket_conf_t, body_read_timeout),
1279     },
1280 
1281     {
1282         nxt_string("send_timeout"),
1283         NXT_CONF_MAP_MSEC,
1284         offsetof(nxt_socket_conf_t, send_timeout),
1285     },
1286 
1287     {
1288         nxt_string("body_temp_path"),
1289         NXT_CONF_MAP_STR,
1290         offsetof(nxt_socket_conf_t, body_temp_path),
1291     },
1292 
1293     {
1294         nxt_string("discard_unsafe_fields"),
1295         NXT_CONF_MAP_INT8,
1296         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1297     },
1298 };
1299 
1300 
1301 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1302     {
1303         nxt_string("max_frame_size"),
1304         NXT_CONF_MAP_SIZE,
1305         offsetof(nxt_websocket_conf_t, max_frame_size),
1306     },
1307 
1308     {
1309         nxt_string("read_timeout"),
1310         NXT_CONF_MAP_MSEC,
1311         offsetof(nxt_websocket_conf_t, read_timeout),
1312     },
1313 
1314     {
1315         nxt_string("keepalive_interval"),
1316         NXT_CONF_MAP_MSEC,
1317         offsetof(nxt_websocket_conf_t, keepalive_interval),
1318     },
1319 
1320 };
1321 
1322 
1323 static nxt_int_t
1324 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1325     u_char *start, u_char *end)
1326 {
1327     u_char                      *p;
1328     size_t                      size;
1329     nxt_mp_t                    *mp, *app_mp;
1330     uint32_t                    next, next_target;
1331     nxt_int_t                   ret;
1332     nxt_str_t                   name, path, target;
1333     nxt_app_t                   *app, *prev;
1334     nxt_str_t                   *t, *s, *targets;
1335     nxt_uint_t                  n, i;
1336     nxt_port_t                  *port;
1337     nxt_router_t                *router;
1338     nxt_app_joint_t             *app_joint;
1339 #if (NXT_TLS)
1340     nxt_conf_value_t            *certificate;
1341 #endif
1342     nxt_conf_value_t            *conf, *http, *value, *websocket;
1343     nxt_conf_value_t            *applications, *application;
1344     nxt_conf_value_t            *listeners, *listener;
1345     nxt_conf_value_t            *routes_conf, *static_conf;
1346     nxt_socket_conf_t           *skcf;
1347     nxt_http_routes_t           *routes;
1348     nxt_event_engine_t          *engine;
1349     nxt_app_lang_module_t       *lang;
1350     nxt_router_app_conf_t       apcf;
1351     nxt_router_access_log_t     *access_log;
1352     nxt_router_listener_conf_t  lscf;
1353 
1354     static nxt_str_t  http_path = nxt_string("/settings/http");
1355     static nxt_str_t  applications_path = nxt_string("/applications");
1356     static nxt_str_t  listeners_path = nxt_string("/listeners");
1357     static nxt_str_t  routes_path = nxt_string("/routes");
1358     static nxt_str_t  access_log_path = nxt_string("/access_log");
1359 #if (NXT_TLS)
1360     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1361 #endif
1362     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1363     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1364 
1365     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1366     if (conf == NULL) {
1367         nxt_alert(task, "configuration parsing error");
1368         return NXT_ERROR;
1369     }
1370 
1371     mp = tmcf->router_conf->mem_pool;
1372 
1373     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1374                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1375     if (ret != NXT_OK) {
1376         nxt_alert(task, "root map error");
1377         return NXT_ERROR;
1378     }
1379 
1380     if (tmcf->router_conf->threads == 0) {
1381         tmcf->router_conf->threads = nxt_ncpu;
1382     }
1383 
1384     static_conf = nxt_conf_get_path(conf, &static_path);
1385 
1386     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1387     if (nxt_slow_path(ret != NXT_OK)) {
1388         return NXT_ERROR;
1389     }
1390 
1391     router = tmcf->router_conf->router;
1392 
1393     applications = nxt_conf_get_path(conf, &applications_path);
1394 
1395     if (applications != NULL) {
1396         next = 0;
1397 
1398         for ( ;; ) {
1399             application = nxt_conf_next_object_member(applications,
1400                                                       &name, &next);
1401             if (application == NULL) {
1402                 break;
1403             }
1404 
1405             nxt_debug(task, "application \"%V\"", &name);
1406 
1407             size = nxt_conf_json_length(application, NULL);
1408 
1409             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1410             if (nxt_slow_path(app_mp == NULL)) {
1411                 goto fail;
1412             }
1413 
1414             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1415             if (app == NULL) {
1416                 goto app_fail;
1417             }
1418 
1419             nxt_memzero(app, sizeof(nxt_app_t));
1420 
1421             app->mem_pool = app_mp;
1422 
1423             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1424             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1425                                                   + name.length);
1426 
1427             p = nxt_conf_json_print(app->conf.start, application, NULL);
1428             app->conf.length = p - app->conf.start;
1429 
1430             nxt_assert(app->conf.length <= size);
1431 
1432             nxt_debug(task, "application conf \"%V\"", &app->conf);
1433 
1434             prev = nxt_router_app_find(&router->apps, &name);
1435 
1436             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1437                 nxt_mp_destroy(app_mp);
1438 
1439                 nxt_queue_remove(&prev->link);
1440                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1441 
1442                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1443                 if (nxt_slow_path(ret != NXT_OK)) {
1444                     goto fail;
1445                 }
1446 
1447                 continue;
1448             }
1449 
1450             apcf.processes = 1;
1451             apcf.max_processes = 1;
1452             apcf.spare_processes = 0;
1453             apcf.timeout = 0;
1454             apcf.idle_timeout = 15000;
1455             apcf.requests = 0;
1456             apcf.limits_value = NULL;
1457             apcf.processes_value = NULL;
1458             apcf.targets_value = NULL;
1459 
1460             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1461             if (nxt_slow_path(app_joint == NULL)) {
1462                 goto app_fail;
1463             }
1464 
1465             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1466 
1467             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1468                                       nxt_nitems(nxt_router_app_conf), &apcf);
1469             if (ret != NXT_OK) {
1470                 nxt_alert(task, "application map error");
1471                 goto app_fail;
1472             }
1473 
1474             if (apcf.limits_value != NULL) {
1475 
1476                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1477                     nxt_alert(task, "application limits is not object");
1478                     goto app_fail;
1479                 }
1480 
1481                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1482                                         nxt_router_app_limits_conf,
1483                                         nxt_nitems(nxt_router_app_limits_conf),
1484                                         &apcf);
1485                 if (ret != NXT_OK) {
1486                     nxt_alert(task, "application limits map error");
1487                     goto app_fail;
1488                 }
1489             }
1490 
1491             if (apcf.processes_value != NULL
1492                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1493             {
1494                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1495                                      nxt_router_app_processes_conf,
1496                                      nxt_nitems(nxt_router_app_processes_conf),
1497                                      &apcf);
1498                 if (ret != NXT_OK) {
1499                     nxt_alert(task, "application processes map error");
1500                     goto app_fail;
1501                 }
1502 
1503             } else {
1504                 apcf.max_processes = apcf.processes;
1505                 apcf.spare_processes = apcf.processes;
1506             }
1507 
1508             if (apcf.targets_value != NULL) {
1509                 n = nxt_conf_object_members_count(apcf.targets_value);
1510 
1511                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1512                 if (nxt_slow_path(targets == NULL)) {
1513                     goto app_fail;
1514                 }
1515 
1516                 next_target = 0;
1517 
1518                 for (i = 0; i < n; i++) {
1519                     (void) nxt_conf_next_object_member(apcf.targets_value,
1520                                                        &target, &next_target);
1521 
1522                     s = nxt_str_dup(app_mp, &targets[i], &target);
1523                     if (nxt_slow_path(s == NULL)) {
1524                         goto app_fail;
1525                     }
1526                 }
1527 
1528             } else {
1529                 targets = NULL;
1530             }
1531 
1532             nxt_debug(task, "application type: %V", &apcf.type);
1533             nxt_debug(task, "application processes: %D", apcf.processes);
1534             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1535             nxt_debug(task, "application requests: %D", apcf.requests);
1536 
1537             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1538 
1539             if (lang == NULL) {
1540                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1541                 goto app_fail;
1542             }
1543 
1544             nxt_debug(task, "application language module: \"%s\"", lang->file);
1545 
1546             ret = nxt_thread_mutex_create(&app->mutex);
1547             if (ret != NXT_OK) {
1548                 goto app_fail;
1549             }
1550 
1551             nxt_queue_init(&app->ports);
1552             nxt_queue_init(&app->spare_ports);
1553             nxt_queue_init(&app->idle_ports);
1554             nxt_queue_init(&app->ack_waiting_req);
1555 
1556             app->name.length = name.length;
1557             nxt_memcpy(app->name.start, name.start, name.length);
1558 
1559             app->type = lang->type;
1560             app->max_processes = apcf.max_processes;
1561             app->spare_processes = apcf.spare_processes;
1562             app->max_pending_processes = apcf.spare_processes
1563                                          ? apcf.spare_processes : 1;
1564             app->timeout = apcf.timeout;
1565             app->idle_timeout = apcf.idle_timeout;
1566             app->max_requests = apcf.requests;
1567 
1568             app->targets = targets;
1569 
1570             engine = task->thread->engine;
1571 
1572             app->engine = engine;
1573 
1574             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1575             app->adjust_idle_work.task = &engine->task;
1576             app->adjust_idle_work.obj = app;
1577 
1578             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1579 
1580             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1581             if (nxt_slow_path(ret != NXT_OK)) {
1582                 goto app_fail;
1583             }
1584 
1585             nxt_router_app_use(task, app, 1);
1586 
1587             app->joint = app_joint;
1588 
1589             app_joint->use_count = 1;
1590             app_joint->app = app;
1591 
1592             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1593             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1594             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1595             app_joint->idle_timer.task = &engine->task;
1596             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1597 
1598             app_joint->free_app_work.handler = nxt_router_free_app;
1599             app_joint->free_app_work.task = &engine->task;
1600             app_joint->free_app_work.obj = app_joint;
1601 
1602             port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1603                                 NXT_PROCESS_APP);
1604             if (nxt_slow_path(port == NULL)) {
1605                 return NXT_ERROR;
1606             }
1607 
1608             ret = nxt_port_socket_init(task, port, 0);
1609             if (nxt_slow_path(ret != NXT_OK)) {
1610                 nxt_port_use(task, port, -1);
1611                 return NXT_ERROR;
1612             }
1613 
1614             ret = nxt_router_app_queue_init(task, port);
1615             if (nxt_slow_path(ret != NXT_OK)) {
1616                 nxt_port_write_close(port);
1617                 nxt_port_read_close(port);
1618                 nxt_port_use(task, port, -1);
1619                 return NXT_ERROR;
1620             }
1621 
1622             nxt_port_write_enable(task, port);
1623             port->app = app;
1624 
1625             app->shared_port = port;
1626 
1627             nxt_thread_mutex_create(&app->outgoing.mutex);
1628         }
1629     }
1630 
1631     routes_conf = nxt_conf_get_path(conf, &routes_path);
1632     if (nxt_fast_path(routes_conf != NULL)) {
1633         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1634         if (nxt_slow_path(routes == NULL)) {
1635             return NXT_ERROR;
1636         }
1637         tmcf->router_conf->routes = routes;
1638     }
1639 
1640     ret = nxt_upstreams_create(task, tmcf, conf);
1641     if (nxt_slow_path(ret != NXT_OK)) {
1642         return ret;
1643     }
1644 
1645     http = nxt_conf_get_path(conf, &http_path);
1646 #if 0
1647     if (http == NULL) {
1648         nxt_alert(task, "no \"http\" block");
1649         return NXT_ERROR;
1650     }
1651 #endif
1652 
1653     websocket = nxt_conf_get_path(conf, &websocket_path);
1654 
1655     listeners = nxt_conf_get_path(conf, &listeners_path);
1656 
1657     if (listeners != NULL) {
1658         next = 0;
1659 
1660         for ( ;; ) {
1661             listener = nxt_conf_next_object_member(listeners, &name, &next);
1662             if (listener == NULL) {
1663                 break;
1664             }
1665 
1666             skcf = nxt_router_socket_conf(task, tmcf, &name);
1667             if (skcf == NULL) {
1668                 goto fail;
1669             }
1670 
1671             nxt_memzero(&lscf, sizeof(lscf));
1672 
1673             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1674                                       nxt_nitems(nxt_router_listener_conf),
1675                                       &lscf);
1676             if (ret != NXT_OK) {
1677                 nxt_alert(task, "listener map error");
1678                 goto fail;
1679             }
1680 
1681             nxt_debug(task, "application: %V", &lscf.application);
1682 
1683             // STUB, default values if http block is not defined.
1684             skcf->header_buffer_size = 2048;
1685             skcf->large_header_buffer_size = 8192;
1686             skcf->large_header_buffers = 4;
1687             skcf->discard_unsafe_fields = 1;
1688             skcf->body_buffer_size = 16 * 1024;
1689             skcf->max_body_size = 8 * 1024 * 1024;
1690             skcf->proxy_header_buffer_size = 64 * 1024;
1691             skcf->proxy_buffer_size = 4096;
1692             skcf->proxy_buffers = 256;
1693             skcf->idle_timeout = 180 * 1000;
1694             skcf->header_read_timeout = 30 * 1000;
1695             skcf->body_read_timeout = 30 * 1000;
1696             skcf->send_timeout = 30 * 1000;
1697             skcf->proxy_timeout = 60 * 1000;
1698             skcf->proxy_send_timeout = 30 * 1000;
1699             skcf->proxy_read_timeout = 30 * 1000;
1700 
1701             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1702             skcf->websocket_conf.read_timeout = 60 * 1000;
1703             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1704 
1705             nxt_str_null(&skcf->body_temp_path);
1706 
1707             if (http != NULL) {
1708                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1709                                           nxt_nitems(nxt_router_http_conf),
1710                                           skcf);
1711                 if (ret != NXT_OK) {
1712                     nxt_alert(task, "http map error");
1713                     goto fail;
1714                 }
1715             }
1716 
1717             if (websocket != NULL) {
1718                 ret = nxt_conf_map_object(mp, websocket,
1719                                           nxt_router_websocket_conf,
1720                                           nxt_nitems(nxt_router_websocket_conf),
1721                                           &skcf->websocket_conf);
1722                 if (ret != NXT_OK) {
1723                     nxt_alert(task, "websocket map error");
1724                     goto fail;
1725                 }
1726             }
1727 
1728             t = &skcf->body_temp_path;
1729 
1730             if (t->length == 0) {
1731                 t->start = (u_char *) task->thread->runtime->tmp;
1732                 t->length = nxt_strlen(t->start);
1733             }
1734 
1735 #if (NXT_TLS)
1736             certificate = nxt_conf_get_path(listener, &certificate_path);
1737 
1738             if (certificate != NULL) {
1739                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1740                     n = nxt_conf_array_elements_count(certificate);
1741 
1742                     for (i = 0; i < n; i++) {
1743                         value = nxt_conf_get_array_element(certificate, i);
1744 
1745                         nxt_assert(value != NULL);
1746 
1747                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf);
1748                         if (nxt_slow_path(ret != NXT_OK)) {
1749                             goto fail;
1750                         }
1751                     }
1752 
1753                 } else {
1754                     /* NXT_CONF_STRING */
1755                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf);
1756                     if (nxt_slow_path(ret != NXT_OK)) {
1757                         goto fail;
1758                     }
1759                 }
1760             }
1761 #endif
1762 
1763             skcf->listen->handler = nxt_http_conn_init;
1764             skcf->router_conf = tmcf->router_conf;
1765             skcf->router_conf->count++;
1766 
1767             if (lscf.pass.length != 0) {
1768                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1769 
1770             /* COMPATIBILITY: listener application. */
1771             } else if (lscf.application.length > 0) {
1772                 skcf->action = nxt_http_pass_application(task,
1773                                                          tmcf->router_conf,
1774                                                          &lscf.application);
1775             }
1776 
1777             if (nxt_slow_path(skcf->action == NULL)) {
1778                 goto fail;
1779             }
1780         }
1781     }
1782 
1783     ret = nxt_http_routes_resolve(task, tmcf);
1784     if (nxt_slow_path(ret != NXT_OK)) {
1785         goto fail;
1786     }
1787 
1788     value = nxt_conf_get_path(conf, &access_log_path);
1789 
1790     if (value != NULL) {
1791         nxt_conf_get_string(value, &path);
1792 
1793         access_log = router->access_log;
1794 
1795         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1796             nxt_thread_spin_lock(&router->lock);
1797             access_log->count++;
1798             nxt_thread_spin_unlock(&router->lock);
1799 
1800         } else {
1801             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1802                                     + path.length);
1803             if (access_log == NULL) {
1804                 nxt_alert(task, "failed to allocate access log structure");
1805                 goto fail;
1806             }
1807 
1808             access_log->fd = -1;
1809             access_log->handler = &nxt_router_access_log_writer;
1810             access_log->count = 1;
1811 
1812             access_log->path.length = path.length;
1813             access_log->path.start = (u_char *) access_log
1814                                      + sizeof(nxt_router_access_log_t);
1815 
1816             nxt_memcpy(access_log->path.start, path.start, path.length);
1817         }
1818 
1819         tmcf->router_conf->access_log = access_log;
1820     }
1821 
1822     nxt_queue_add(&deleting_sockets, &router->sockets);
1823     nxt_queue_init(&router->sockets);
1824 
1825     return NXT_OK;
1826 
1827 app_fail:
1828 
1829     nxt_mp_destroy(app_mp);
1830 
1831 fail:
1832 
1833     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1834 
1835         nxt_queue_remove(&app->link);
1836         nxt_thread_mutex_destroy(&app->mutex);
1837         nxt_mp_destroy(app->mem_pool);
1838 
1839     } nxt_queue_loop;
1840 
1841     return NXT_ERROR;
1842 }
1843 
1844 
1845 #if (NXT_TLS)
1846 
1847 static nxt_int_t
1848 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1849     nxt_conf_value_t *value, nxt_socket_conf_t *skcf)
1850 {
1851     nxt_mp_t              *mp;
1852     nxt_str_t             str;
1853     nxt_router_tlssock_t  *tls;
1854 
1855     mp = tmcf->router_conf->mem_pool;
1856 
1857     tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1858     if (nxt_slow_path(tls == NULL)) {
1859         return NXT_ERROR;
1860     }
1861 
1862     tls->conf = skcf;
1863     nxt_conf_get_string(value, &str);
1864 
1865     if (nxt_slow_path(nxt_str_dup(mp, &tls->name, &str) == NULL)) {
1866         return NXT_ERROR;
1867     }
1868 
1869     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1870 
1871     return NXT_OK;
1872 }
1873 
1874 #endif
1875 
1876 
1877 static nxt_int_t
1878 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
1879     nxt_conf_value_t *conf)
1880 {
1881     uint32_t          next, i;
1882     nxt_mp_t          *mp;
1883     nxt_str_t         *type, extension, str;
1884     nxt_int_t         ret;
1885     nxt_uint_t        exts;
1886     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
1887 
1888     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
1889 
1890     mp = rtcf->mem_pool;
1891 
1892     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
1893     if (nxt_slow_path(ret != NXT_OK)) {
1894         return NXT_ERROR;
1895     }
1896 
1897     if (conf == NULL) {
1898         return NXT_OK;
1899     }
1900 
1901     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
1902 
1903     if (mtypes_conf != NULL) {
1904         next = 0;
1905 
1906         for ( ;; ) {
1907             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
1908 
1909             if (ext_conf == NULL) {
1910                 break;
1911             }
1912 
1913             type = nxt_str_dup(mp, NULL, &str);
1914             if (nxt_slow_path(type == NULL)) {
1915                 return NXT_ERROR;
1916             }
1917 
1918             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
1919                 nxt_conf_get_string(ext_conf, &str);
1920 
1921                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1922                     return NXT_ERROR;
1923                 }
1924 
1925                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1926                                                       &extension, type);
1927                 if (nxt_slow_path(ret != NXT_OK)) {
1928                     return NXT_ERROR;
1929                 }
1930 
1931                 continue;
1932             }
1933 
1934             exts = nxt_conf_array_elements_count(ext_conf);
1935 
1936             for (i = 0; i < exts; i++) {
1937                 value = nxt_conf_get_array_element(ext_conf, i);
1938 
1939                 nxt_conf_get_string(value, &str);
1940 
1941                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1942                     return NXT_ERROR;
1943                 }
1944 
1945                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1946                                                       &extension, type);
1947                 if (nxt_slow_path(ret != NXT_OK)) {
1948                     return NXT_ERROR;
1949                 }
1950             }
1951         }
1952     }
1953 
1954     return NXT_OK;
1955 }
1956 
1957 
1958 static nxt_app_t *
1959 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1960 {
1961     nxt_app_t  *app;
1962 
1963     nxt_queue_each(app, queue, nxt_app_t, link) {
1964 
1965         if (nxt_strstr_eq(name, &app->name)) {
1966             return app;
1967         }
1968 
1969     } nxt_queue_loop;
1970 
1971     return NULL;
1972 }
1973 
1974 
1975 static nxt_int_t
1976 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
1977 {
1978     void       *mem;
1979     nxt_int_t  fd;
1980 
1981     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
1982     if (nxt_slow_path(fd == -1)) {
1983         return NXT_ERROR;
1984     }
1985 
1986     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
1987                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1988     if (nxt_slow_path(mem == MAP_FAILED)) {
1989         nxt_fd_close(fd);
1990 
1991         return NXT_ERROR;
1992     }
1993 
1994     nxt_app_queue_init(mem);
1995 
1996     port->queue_fd = fd;
1997     port->queue = mem;
1998 
1999     return NXT_OK;
2000 }
2001 
2002 
2003 static nxt_int_t
2004 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2005 {
2006     void       *mem;
2007     nxt_int_t  fd;
2008 
2009     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2010     if (nxt_slow_path(fd == -1)) {
2011         return NXT_ERROR;
2012     }
2013 
2014     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2015                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2016     if (nxt_slow_path(mem == MAP_FAILED)) {
2017         nxt_fd_close(fd);
2018 
2019         return NXT_ERROR;
2020     }
2021 
2022     nxt_port_queue_init(mem);
2023 
2024     port->queue_fd = fd;
2025     port->queue = mem;
2026 
2027     return NXT_OK;
2028 }
2029 
2030 
2031 static nxt_int_t
2032 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2033 {
2034     void  *mem;
2035 
2036     nxt_assert(fd != -1);
2037 
2038     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2039                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2040     if (nxt_slow_path(mem == MAP_FAILED)) {
2041 
2042         return NXT_ERROR;
2043     }
2044 
2045     port->queue = mem;
2046 
2047     return NXT_OK;
2048 }
2049 
2050 
/*
 * Level hash prototype used for the applications hash of a router
 * configuration (rtcf->apps_hash): entries are matched with
 * nxt_router_apps_hash_test() and the nxt_mp_lvlhsh_alloc/free callbacks
 * are supplied for the hash's own allocations.
 */
static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_router_apps_hash_test,
    nxt_mp_lvlhsh_alloc,
    nxt_mp_lvlhsh_free,
};
2057 
2058 
2059 static nxt_int_t
2060 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2061 {
2062     nxt_app_t  *app;
2063 
2064     app = data;
2065 
2066     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2067 }
2068 
2069 
2070 static nxt_int_t
2071 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2072 {
2073     nxt_lvlhsh_query_t  lhq;
2074 
2075     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2076     lhq.replace = 0;
2077     lhq.key = app->name;
2078     lhq.value = app;
2079     lhq.proto = &nxt_router_apps_hash_proto;
2080     lhq.pool = rtcf->mem_pool;
2081 
2082     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2083 
2084     case NXT_OK:
2085         return NXT_OK;
2086 
2087     case NXT_DECLINED:
2088         nxt_thread_log_alert("router app hash adding failed: "
2089                              "\"%V\" is already in hash", &lhq.key);
2090         /* Fall through. */
2091     default:
2092         return NXT_ERROR;
2093     }
2094 }
2095 
2096 
2097 static nxt_app_t *
2098 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2099 {
2100     nxt_lvlhsh_query_t  lhq;
2101 
2102     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2103     lhq.key = *name;
2104     lhq.proto = &nxt_router_apps_hash_proto;
2105 
2106     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2107         return NULL;
2108     }
2109 
2110     return lhq.value;
2111 }
2112 
2113 
2114 static void
2115 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2116 {
2117     nxt_app_t          *app;
2118     nxt_lvlhsh_each_t  lhe;
2119 
2120     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2121 
2122     for ( ;; ) {
2123         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2124 
2125         if (app == NULL) {
2126             break;
2127         }
2128 
2129         nxt_router_app_use(task, app, i);
2130     }
2131 }
2132 
2133 
2134 
2135 nxt_int_t
2136 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
2137     nxt_http_action_t *action)
2138 {
2139     nxt_app_t  *app;
2140 
2141     app = nxt_router_apps_hash_get(rtcf, name);
2142 
2143     if (app == NULL) {
2144         return NXT_DECLINED;
2145     }
2146 
2147     action->u.app.application = app;
2148     action->handler = nxt_http_application_handler;
2149 
2150     return NXT_OK;
2151 }
2152 
2153 
2154 static nxt_socket_conf_t *
2155 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2156     nxt_str_t *name)
2157 {
2158     size_t               size;
2159     nxt_int_t            ret;
2160     nxt_bool_t           wildcard;
2161     nxt_sockaddr_t       *sa;
2162     nxt_socket_conf_t    *skcf;
2163     nxt_listen_socket_t  *ls;
2164 
2165     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2166     if (nxt_slow_path(sa == NULL)) {
2167         nxt_alert(task, "invalid listener \"%V\"", name);
2168         return NULL;
2169     }
2170 
2171     sa->type = SOCK_STREAM;
2172 
2173     nxt_debug(task, "router listener: \"%*s\"",
2174               (size_t) sa->length, nxt_sockaddr_start(sa));
2175 
2176     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2177     if (nxt_slow_path(skcf == NULL)) {
2178         return NULL;
2179     }
2180 
2181     size = nxt_sockaddr_size(sa);
2182 
2183     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2184 
2185     if (ret != NXT_OK) {
2186 
2187         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2188         if (nxt_slow_path(ls == NULL)) {
2189             return NULL;
2190         }
2191 
2192         skcf->listen = ls;
2193 
2194         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2195         nxt_memcpy(ls->sockaddr, sa, size);
2196 
2197         nxt_listen_socket_remote_size(ls);
2198 
2199         ls->socket = -1;
2200         ls->backlog = NXT_LISTEN_BACKLOG;
2201         ls->flags = NXT_NONBLOCK;
2202         ls->read_after_accept = 1;
2203     }
2204 
2205     switch (sa->u.sockaddr.sa_family) {
2206 #if (NXT_HAVE_UNIX_DOMAIN)
2207     case AF_UNIX:
2208         wildcard = 0;
2209         break;
2210 #endif
2211 #if (NXT_INET6)
2212     case AF_INET6:
2213         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2214         break;
2215 #endif
2216     case AF_INET:
2217     default:
2218         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2219         break;
2220     }
2221 
2222     if (!wildcard) {
2223         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2224         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2225             return NULL;
2226         }
2227 
2228         nxt_memcpy(skcf->sockaddr, sa, size);
2229     }
2230 
2231     return skcf;
2232 }
2233 
2234 
2235 static nxt_int_t
2236 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2237     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2238 {
2239     nxt_router_t       *router;
2240     nxt_queue_link_t   *qlk;
2241     nxt_socket_conf_t  *skcf;
2242 
2243     router = tmcf->router_conf->router;
2244 
2245     for (qlk = nxt_queue_first(&router->sockets);
2246          qlk != nxt_queue_tail(&router->sockets);
2247          qlk = nxt_queue_next(qlk))
2248     {
2249         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2250 
2251         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2252             nskcf->listen = skcf->listen;
2253 
2254             nxt_queue_remove(qlk);
2255             nxt_queue_insert_tail(&keeping_sockets, qlk);
2256 
2257             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2258 
2259             return NXT_OK;
2260         }
2261     }
2262 
2263     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2264 
2265     return NXT_DECLINED;
2266 }
2267 
2268 
2269 static void
2270 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2271     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2272 {
2273     size_t            size;
2274     uint32_t          stream;
2275     nxt_int_t         ret;
2276     nxt_buf_t         *b;
2277     nxt_port_t        *main_port, *router_port;
2278     nxt_runtime_t     *rt;
2279     nxt_socket_rpc_t  *rpc;
2280 
2281     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2282     if (rpc == NULL) {
2283         goto fail;
2284     }
2285 
2286     rpc->socket_conf = skcf;
2287     rpc->temp_conf = tmcf;
2288 
2289     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2290 
2291     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2292     if (b == NULL) {
2293         goto fail;
2294     }
2295 
2296     b->completion_handler = nxt_router_dummy_buf_completion;
2297 
2298     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2299 
2300     rt = task->thread->runtime;
2301     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2302     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2303 
2304     stream = nxt_port_rpc_register_handler(task, router_port,
2305                                            nxt_router_listen_socket_ready,
2306                                            nxt_router_listen_socket_error,
2307                                            main_port->pid, rpc);
2308     if (nxt_slow_path(stream == 0)) {
2309         goto fail;
2310     }
2311 
2312     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2313                                 stream, router_port->id, b);
2314 
2315     if (nxt_slow_path(ret != NXT_OK)) {
2316         nxt_port_rpc_cancel(task, router_port, stream);
2317         goto fail;
2318     }
2319 
2320     return;
2321 
2322 fail:
2323 
2324     nxt_router_conf_error(task, tmcf);
2325 }
2326 
2327 
2328 static void
2329 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2330     void *data)
2331 {
2332     nxt_int_t         ret;
2333     nxt_socket_t      s;
2334     nxt_socket_rpc_t  *rpc;
2335 
2336     rpc = data;
2337 
2338     s = msg->fd[0];
2339 
2340     ret = nxt_socket_nonblocking(task, s);
2341     if (nxt_slow_path(ret != NXT_OK)) {
2342         goto fail;
2343     }
2344 
2345     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2346 
2347     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2348     if (nxt_slow_path(ret != NXT_OK)) {
2349         goto fail;
2350     }
2351 
2352     rpc->socket_conf->listen->socket = s;
2353 
2354     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2355                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2356 
2357     return;
2358 
2359 fail:
2360 
2361     nxt_socket_close(task, s);
2362 
2363     nxt_router_conf_error(task, rpc->temp_conf);
2364 }
2365 
2366 
/*
 * RPC error handler for a listen socket request; it is registered
 * together with nxt_router_listen_socket_ready() in
 * nxt_router_listen_socket_rpc_create().  "data" is the nxt_socket_rpc_t
 * passed at registration.
 *
 * The only compiled code path fails the pending configuration.  The
 * section under "#if 0" (formatting the error report from msg->buf into
 * a debug log line) is disabled and not compiled.
 */
static void
nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
    void *data)
{
    nxt_socket_rpc_t        *rpc;
    nxt_router_temp_conf_t  *tmcf;

    rpc = data;
    tmcf = rpc->temp_conf;

#if 0
    u_char                  *p;
    size_t                  size;
    uint8_t                 error;
    nxt_buf_t               *in, *out;
    nxt_sockaddr_t          *sa;

    static nxt_str_t  socket_errors[] = {
        nxt_string("ListenerSystem"),
        nxt_string("ListenerNoIPv6"),
        nxt_string("ListenerPort"),
        nxt_string("ListenerInUse"),
        nxt_string("ListenerNoAddress"),
        nxt_string("ListenerNoAccess"),
        nxt_string("ListenerPath"),
    };

    sa = rpc->socket_conf->listen->sockaddr;

    in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);

    if (nxt_slow_path(in == NULL)) {
        return;
    }

    p = in->mem.pos;

    error = *p++;

    size = nxt_length("listen socket error: ")
           + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
           + sa->length + socket_errors[error].length + (in->mem.free - p);

    out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
    if (nxt_slow_path(out == NULL)) {
        return;
    }

    out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
                        "listen socket error: "
                        "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
                        (size_t) sa->length, nxt_sockaddr_start(sa),
                        &socket_errors[error], in->mem.free - p, p);

    nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
#endif

    /* Whatever the reported reason, the new configuration cannot proceed. */
    nxt_router_conf_error(task, tmcf);
}
2426 
2427 
2428 #if (NXT_TLS)
2429 
2430 static void
2431 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2432     nxt_router_tlssock_t *tls, nxt_bool_t last)
2433 {
2434     nxt_socket_rpc_t  *rpc;
2435 
2436     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2437     if (rpc == NULL) {
2438         nxt_router_conf_error(task, tmcf);
2439         return;
2440     }
2441 
2442     rpc->name = &tls->name;
2443     rpc->socket_conf = tls->conf;
2444     rpc->temp_conf = tmcf;
2445     rpc->last = last;
2446 
2447     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2448                        nxt_router_tls_rpc_handler, rpc);
2449 }
2450 
2451 
2452 static void
2453 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2454     void *data)
2455 {
2456     nxt_mp_t                *mp;
2457     nxt_int_t               ret;
2458     nxt_tls_conf_t          *tlscf;
2459     nxt_socket_rpc_t        *rpc;
2460     nxt_tls_bundle_conf_t   *bundle;
2461     nxt_router_temp_conf_t  *tmcf;
2462 
2463     nxt_debug(task, "tls rpc handler");
2464 
2465     rpc = data;
2466     tmcf = rpc->temp_conf;
2467 
2468     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2469         goto fail;
2470     }
2471 
2472     mp = tmcf->router_conf->mem_pool;
2473 
2474     if (rpc->socket_conf->tls == NULL){
2475         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2476         if (nxt_slow_path(tlscf == NULL)) {
2477             goto fail;
2478         }
2479 
2480         rpc->socket_conf->tls = tlscf;
2481 
2482     } else {
2483         tlscf = rpc->socket_conf->tls;
2484     }
2485 
2486     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2487     if (nxt_slow_path(bundle == NULL)) {
2488         goto fail;
2489     }
2490 
2491     bundle->name = rpc->name;
2492     bundle->chain_file = msg->fd[0];
2493     bundle->next = tlscf->bundle;
2494     tlscf->bundle = bundle;
2495 
2496     ret = task->thread->runtime->tls->server_init(task, tlscf, mp, rpc->last);
2497     if (nxt_slow_path(ret != NXT_OK)) {
2498         goto fail;
2499     }
2500 
2501     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2502                        nxt_router_conf_apply, task, tmcf, NULL);
2503     return;
2504 
2505 fail:
2506 
2507     nxt_router_conf_error(task, tmcf);
2508 }
2509 
2510 #endif
2511 
2512 
2513 static void
2514 nxt_router_app_rpc_create(nxt_task_t *task,
2515     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2516 {
2517     size_t         size;
2518     uint32_t       stream;
2519     nxt_int_t      ret;
2520     nxt_buf_t      *b;
2521     nxt_port_t     *main_port, *router_port;
2522     nxt_runtime_t  *rt;
2523     nxt_app_rpc_t  *rpc;
2524 
2525     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2526     if (rpc == NULL) {
2527         goto fail;
2528     }
2529 
2530     rpc->app = app;
2531     rpc->temp_conf = tmcf;
2532 
2533     nxt_debug(task, "app '%V' prefork", &app->name);
2534 
2535     size = app->name.length + 1 + app->conf.length;
2536 
2537     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2538     if (nxt_slow_path(b == NULL)) {
2539         goto fail;
2540     }
2541 
2542     b->completion_handler = nxt_router_dummy_buf_completion;
2543 
2544     nxt_buf_cpystr(b, &app->name);
2545     *b->mem.free++ = '\0';
2546     nxt_buf_cpystr(b, &app->conf);
2547 
2548     rt = task->thread->runtime;
2549     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2550     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2551 
2552     stream = nxt_port_rpc_register_handler(task, router_port,
2553                                            nxt_router_app_prefork_ready,
2554                                            nxt_router_app_prefork_error,
2555                                            -1, rpc);
2556     if (nxt_slow_path(stream == 0)) {
2557         goto fail;
2558     }
2559 
2560     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2561                                 -1, stream, router_port->id, b);
2562 
2563     if (nxt_slow_path(ret != NXT_OK)) {
2564         nxt_port_rpc_cancel(task, router_port, stream);
2565         goto fail;
2566     }
2567 
2568     app->pending_processes++;
2569 
2570     return;
2571 
2572 fail:
2573 
2574     nxt_router_conf_error(task, tmcf);
2575 }
2576 
2577 
2578 static void
2579 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2580     void *data)
2581 {
2582     nxt_app_t           *app;
2583     nxt_port_t          *port;
2584     nxt_app_rpc_t       *rpc;
2585     nxt_event_engine_t  *engine;
2586 
2587     rpc = data;
2588     app = rpc->app;
2589 
2590     port = msg->u.new_port;
2591 
2592     nxt_assert(port != NULL);
2593     nxt_assert(port->type == NXT_PROCESS_APP);
2594     nxt_assert(port->id == 0);
2595 
2596     port->app = app;
2597     port->main_app_port = port;
2598 
2599     app->pending_processes--;
2600     app->processes++;
2601     app->idle_processes++;
2602 
2603     engine = task->thread->engine;
2604 
2605     nxt_queue_insert_tail(&app->ports, &port->app_link);
2606     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2607 
2608     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2609               &app->name, port->pid, port->id);
2610 
2611     nxt_port_hash_add(&app->port_hash, port);
2612     app->port_hash_count++;
2613 
2614     port->idle_start = 0;
2615 
2616     nxt_port_inc_use(port);
2617 
2618     nxt_router_app_shared_port_send(task, port);
2619 
2620     nxt_work_queue_add(&engine->fast_work_queue,
2621                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2622 }
2623 
2624 
2625 static void
2626 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2627     void *data)
2628 {
2629     nxt_app_t               *app;
2630     nxt_app_rpc_t           *rpc;
2631     nxt_router_temp_conf_t  *tmcf;
2632 
2633     rpc = data;
2634     app = rpc->app;
2635     tmcf = rpc->temp_conf;
2636 
2637     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2638             &app->name);
2639 
2640     app->pending_processes--;
2641 
2642     nxt_router_conf_error(task, tmcf);
2643 }
2644 
2645 
2646 static nxt_int_t
2647 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2648     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2649 {
2650     nxt_int_t                 ret;
2651     nxt_uint_t                n, threads;
2652     nxt_queue_link_t          *qlk;
2653     nxt_router_engine_conf_t  *recf;
2654 
2655     threads = tmcf->router_conf->threads;
2656 
2657     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2658                                      sizeof(nxt_router_engine_conf_t));
2659     if (nxt_slow_path(tmcf->engines == NULL)) {
2660         return NXT_ERROR;
2661     }
2662 
2663     n = 0;
2664 
2665     for (qlk = nxt_queue_first(&router->engines);
2666          qlk != nxt_queue_tail(&router->engines);
2667          qlk = nxt_queue_next(qlk))
2668     {
2669         recf = nxt_array_zero_add(tmcf->engines);
2670         if (nxt_slow_path(recf == NULL)) {
2671             return NXT_ERROR;
2672         }
2673 
2674         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2675 
2676         if (n < threads) {
2677             recf->action = NXT_ROUTER_ENGINE_KEEP;
2678             ret = nxt_router_engine_conf_update(tmcf, recf);
2679 
2680         } else {
2681             recf->action = NXT_ROUTER_ENGINE_DELETE;
2682             ret = nxt_router_engine_conf_delete(tmcf, recf);
2683         }
2684 
2685         if (nxt_slow_path(ret != NXT_OK)) {
2686             return ret;
2687         }
2688 
2689         n++;
2690     }
2691 
2692     tmcf->new_threads = n;
2693 
2694     while (n < threads) {
2695         recf = nxt_array_zero_add(tmcf->engines);
2696         if (nxt_slow_path(recf == NULL)) {
2697             return NXT_ERROR;
2698         }
2699 
2700         recf->action = NXT_ROUTER_ENGINE_ADD;
2701 
2702         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2703         if (nxt_slow_path(recf->engine == NULL)) {
2704             return NXT_ERROR;
2705         }
2706 
2707         ret = nxt_router_engine_conf_create(tmcf, recf);
2708         if (nxt_slow_path(ret != NXT_OK)) {
2709             return ret;
2710         }
2711 
2712         n++;
2713     }
2714 
2715     return NXT_OK;
2716 }
2717 
2718 
2719 static nxt_int_t
2720 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2721     nxt_router_engine_conf_t *recf)
2722 {
2723     nxt_int_t  ret;
2724 
2725     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2726                                           nxt_router_listen_socket_create);
2727     if (nxt_slow_path(ret != NXT_OK)) {
2728         return ret;
2729     }
2730 
2731     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2732                                           nxt_router_listen_socket_create);
2733     if (nxt_slow_path(ret != NXT_OK)) {
2734         return ret;
2735     }
2736 
2737     return ret;
2738 }
2739 
2740 
2741 static nxt_int_t
2742 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2743     nxt_router_engine_conf_t *recf)
2744 {
2745     nxt_int_t  ret;
2746 
2747     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2748                                           nxt_router_listen_socket_create);
2749     if (nxt_slow_path(ret != NXT_OK)) {
2750         return ret;
2751     }
2752 
2753     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2754                                           nxt_router_listen_socket_update);
2755     if (nxt_slow_path(ret != NXT_OK)) {
2756         return ret;
2757     }
2758 
2759     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2760     if (nxt_slow_path(ret != NXT_OK)) {
2761         return ret;
2762     }
2763 
2764     return ret;
2765 }
2766 
2767 
2768 static nxt_int_t
2769 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2770     nxt_router_engine_conf_t *recf)
2771 {
2772     nxt_int_t  ret;
2773 
2774     ret = nxt_router_engine_quit(tmcf, recf);
2775     if (nxt_slow_path(ret != NXT_OK)) {
2776         return ret;
2777     }
2778 
2779     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2780     if (nxt_slow_path(ret != NXT_OK)) {
2781         return ret;
2782     }
2783 
2784     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2785 }
2786 
2787 
/*
 * For every socket configuration on "sockets", prepares one job for the
 * engine described by "recf" and one socket/engine joint.
 *
 * Each job is allocated from the temporary pool, pushed to the front of
 * recf->jobs and will run "handler" with the job as obj and the joint as
 * data.  Each joint is allocated from the router configuration pool,
 * gets its upstreams via nxt_upstreams_joint_create(), starts with a
 * count of 1 and references both the socket configuration (whose count
 * is incremented) and the engine.
 *
 * Returns NXT_OK, NXT_ERROR on allocation failure, or the failing status
 * of nxt_upstreams_joint_create().
 *
 * NOTE(review): on a failure after the job has been linked, the job stays
 * on recf->jobs and tmcf->count stays incremented -- presumably the caller
 * discards the whole temporary configuration; confirm.
 */
static nxt_int_t
nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
    nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
    nxt_work_handler_t handler)
{
    nxt_int_t                ret;
    nxt_joint_job_t          *job;
    nxt_queue_link_t         *qlk;
    nxt_socket_conf_t        *skcf;
    nxt_socket_conf_joint_t  *joint;

    for (qlk = nxt_queue_first(sockets);
         qlk != nxt_queue_tail(sockets);
         qlk = nxt_queue_next(qlk))
    {
        job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
        if (nxt_slow_path(job == NULL)) {
            return NXT_ERROR;
        }

        /* Push the job to the head of this engine's job list. */
        job->work.next = recf->jobs;
        recf->jobs = &job->work;

        /* The job carries its own copy of the task, taken from tmcf->engine. */
        job->task = tmcf->engine->task;
        job->work.handler = handler;
        job->work.task = &job->task;
        job->work.obj = job;
        job->tmcf = tmcf;

        /* One more job counted against this temporary configuration. */
        tmcf->count++;

        joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
                             sizeof(nxt_socket_conf_joint_t));
        if (nxt_slow_path(joint == NULL)) {
            return NXT_ERROR;
        }

        job->work.data = joint;

        ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
        if (nxt_slow_path(ret != NXT_OK)) {
            return ret;
        }

        joint->count = 1;

        /* The joint holds a counted reference to the socket configuration. */
        skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
        skcf->count++;
        joint->socket_conf = skcf;

        joint->engine = recf->engine;
    }

    return NXT_OK;
}
2843 
2844 
2845 static nxt_int_t
2846 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2847     nxt_router_engine_conf_t *recf)
2848 {
2849     nxt_joint_job_t  *job;
2850 
2851     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2852     if (nxt_slow_path(job == NULL)) {
2853         return NXT_ERROR;
2854     }
2855 
2856     job->work.next = recf->jobs;
2857     recf->jobs = &job->work;
2858 
2859     job->task = tmcf->engine->task;
2860     job->work.handler = nxt_router_worker_thread_quit;
2861     job->work.task = &job->task;
2862     job->work.obj = NULL;
2863     job->work.data = NULL;
2864     job->tmcf = NULL;
2865 
2866     return NXT_OK;
2867 }
2868 
2869 
2870 static nxt_int_t
2871 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2872     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2873 {
2874     nxt_joint_job_t   *job;
2875     nxt_queue_link_t  *qlk;
2876 
2877     for (qlk = nxt_queue_first(sockets);
2878          qlk != nxt_queue_tail(sockets);
2879          qlk = nxt_queue_next(qlk))
2880     {
2881         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2882         if (nxt_slow_path(job == NULL)) {
2883             return NXT_ERROR;
2884         }
2885 
2886         job->work.next = recf->jobs;
2887         recf->jobs = &job->work;
2888 
2889         job->task = tmcf->engine->task;
2890         job->work.handler = nxt_router_listen_socket_delete;
2891         job->work.task = &job->task;
2892         job->work.obj = job;
2893         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2894         job->tmcf = tmcf;
2895 
2896         tmcf->count++;
2897     }
2898 
2899     return NXT_OK;
2900 }
2901 
2902 
2903 static nxt_int_t
2904 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2905     nxt_router_temp_conf_t *tmcf)
2906 {
2907     nxt_int_t                 ret;
2908     nxt_uint_t                i, threads;
2909     nxt_router_engine_conf_t  *recf;
2910 
2911     recf = tmcf->engines->elts;
2912     threads = tmcf->router_conf->threads;
2913 
2914     for (i = tmcf->new_threads; i < threads; i++) {
2915         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2916         if (nxt_slow_path(ret != NXT_OK)) {
2917             return ret;
2918         }
2919     }
2920 
2921     return NXT_OK;
2922 }
2923 
2924 
2925 static nxt_int_t
2926 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2927     nxt_event_engine_t *engine)
2928 {
2929     nxt_int_t            ret;
2930     nxt_thread_link_t    *link;
2931     nxt_thread_handle_t  handle;
2932 
2933     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2934 
2935     if (nxt_slow_path(link == NULL)) {
2936         return NXT_ERROR;
2937     }
2938 
2939     link->start = nxt_router_thread_start;
2940     link->engine = engine;
2941     link->work.handler = nxt_router_thread_exit_handler;
2942     link->work.task = task;
2943     link->work.data = link;
2944 
2945     nxt_queue_insert_tail(&rt->engines, &engine->link);
2946 
2947     ret = nxt_thread_create(&handle, link);
2948 
2949     if (nxt_slow_path(ret != NXT_OK)) {
2950         nxt_queue_remove(&engine->link);
2951     }
2952 
2953     return ret;
2954 }
2955 
2956 
2957 static void
2958 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2959     nxt_router_temp_conf_t *tmcf)
2960 {
2961     nxt_app_t  *app;
2962 
2963     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2964 
2965         nxt_router_app_unlink(task, app);
2966 
2967     } nxt_queue_loop;
2968 
2969     nxt_queue_add(&router->apps, &tmcf->previous);
2970     nxt_queue_add(&router->apps, &tmcf->apps);
2971 }
2972 
2973 
2974 static void
2975 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2976 {
2977     nxt_uint_t                n;
2978     nxt_event_engine_t        *engine;
2979     nxt_router_engine_conf_t  *recf;
2980 
2981     recf = tmcf->engines->elts;
2982 
2983     for (n = tmcf->engines->nelts; n != 0; n--) {
2984         engine = recf->engine;
2985 
2986         switch (recf->action) {
2987 
2988         case NXT_ROUTER_ENGINE_KEEP:
2989             break;
2990 
2991         case NXT_ROUTER_ENGINE_ADD:
2992             nxt_queue_insert_tail(&router->engines, &engine->link0);
2993             break;
2994 
2995         case NXT_ROUTER_ENGINE_DELETE:
2996             nxt_queue_remove(&engine->link0);
2997             break;
2998         }
2999 
3000         nxt_router_engine_post(engine, recf->jobs);
3001 
3002         recf++;
3003     }
3004 }
3005 
3006 
3007 static void
3008 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
3009 {
3010     nxt_work_t  *work, *next;
3011 
3012     for (work = jobs; work != NULL; work = next) {
3013         next = work->next;
3014         work->next = NULL;
3015 
3016         nxt_event_engine_post(engine, work);
3017     }
3018 }
3019 
3020 
/*
 * Message handlers enabled on the port that each router thread creates
 * in nxt_router_thread_start().  RPC errors, data and request header
 * acknowledgements all go to the generic RPC dispatcher; mmap and oosm
 * messages have dedicated handlers.
 */
static nxt_port_handlers_t  nxt_router_app_port_handlers = {
    .rpc_error       = nxt_port_rpc_handler,
    .mmap            = nxt_port_mmap_handler,
    .data            = nxt_port_rpc_handler,
    .oosm            = nxt_router_oosm_handler,
    .req_headers_ack = nxt_port_rpc_handler,
};
3028 
3029 
/*
 * Start routine of a router thread (installed as link->start by
 * nxt_router_thread_create()); "data" is the nxt_thread_link_t.
 *
 * The routine binds the engine and the current thread to each other,
 * creates the engine's memory pool, creates and enables a router port
 * with a shared port queue, posts a nxt_router_rt_add_port() work item
 * for that port to the engine of the creating thread, and finally runs
 * the engine's event loop.
 *
 * NOTE(review): each early return below leaves the thread without
 * starting the engine and does not release what was set up before it
 * (engine->mem_pool, and on the last one the port) -- confirm this is
 * acceptable for the callers.
 */
static void
nxt_router_thread_start(void *data)
{
    nxt_int_t           ret;
    nxt_port_t          *port;
    nxt_task_t          *task;
    nxt_work_t          *work;
    nxt_thread_t        *thread;
    nxt_thread_link_t   *link;
    nxt_event_engine_t  *engine;

    link = data;
    engine = link->engine;
    task = &engine->task;

    thread = nxt_thread();

    nxt_event_engine_thread_adopt(engine);

    /* STUB */
    /* Inherit the runtime from the thread the engine task referenced so far. */
    thread->runtime = engine->task.thread->runtime;

    /* Cross-link the engine, its task and the current thread. */
    engine->task.thread = thread;
    engine->task.log = thread->log;
    thread->engine = engine;
    thread->task = &engine->task;
#if 0
    thread->fiber = &engine->fibers->fiber;
#endif

    engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
    if (nxt_slow_path(engine->mem_pool == NULL)) {
        return;
    }

    /* A router-type port of this process, with a freshly allocated id. */
    port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
                        NXT_PROCESS_ROUTER);
    if (nxt_slow_path(port == NULL)) {
        return;
    }

    ret = nxt_port_socket_init(task, port, 0);
    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_port_use(task, port, -1);
        return;
    }

    ret = nxt_router_port_queue_init(task, port);
    if (nxt_slow_path(ret != NXT_OK)) {
        nxt_port_use(task, port, -1);
        return;
    }

    engine->port = port;

    nxt_port_enable(task, port, &nxt_router_app_port_handlers);

    work = nxt_zalloc(sizeof(nxt_work_t));
    if (nxt_slow_path(work == NULL)) {
        return;
    }

    /*
     * The work item passes itself as obj so that nxt_router_rt_add_port()
     * can free it; the new port travels as data.
     */
    work->handler = nxt_router_rt_add_port;
    work->task = link->work.task;
    work->obj = work;
    work->data = port;

    /* Executed by the engine of the thread that created this one. */
    nxt_event_engine_post(link->work.task->thread->engine, work);

    nxt_event_engine_start(engine);
}
3101 
3102 
3103 static void
3104 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3105 {
3106     nxt_int_t      res;
3107     nxt_port_t     *port;
3108     nxt_runtime_t  *rt;
3109 
3110     rt = task->thread->runtime;
3111     port = data;
3112 
3113     nxt_free(obj);
3114 
3115     res = nxt_port_hash_add(&rt->ports, port);
3116 
3117     if (nxt_fast_path(res == NXT_OK)) {
3118         nxt_port_use(task, port, 1);
3119     }
3120 }
3121 
3122 
3123 static void
3124 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3125 {
3126     nxt_joint_job_t          *job;
3127     nxt_socket_conf_t        *skcf;
3128     nxt_listen_event_t       *lev;
3129     nxt_listen_socket_t      *ls;
3130     nxt_thread_spinlock_t    *lock;
3131     nxt_socket_conf_joint_t  *joint;
3132 
3133     job = obj;
3134     joint = data;
3135 
3136     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3137 
3138     skcf = joint->socket_conf;
3139     ls = skcf->listen;
3140 
3141     lev = nxt_listen_event(task, ls);
3142     if (nxt_slow_path(lev == NULL)) {
3143         nxt_router_listen_socket_release(task, skcf);
3144         return;
3145     }
3146 
3147     lev->socket.data = joint;
3148 
3149     lock = &skcf->router_conf->router->lock;
3150 
3151     nxt_thread_spin_lock(lock);
3152     ls->count++;
3153     nxt_thread_spin_unlock(lock);
3154 
3155     job->work.next = NULL;
3156     job->work.handler = nxt_router_conf_wait;
3157 
3158     nxt_event_engine_post(job->tmcf->engine, &job->work);
3159 }
3160 
3161 
3162 nxt_inline nxt_listen_event_t *
3163 nxt_router_listen_event(nxt_queue_t *listen_connections,
3164     nxt_socket_conf_t *skcf)
3165 {
3166     nxt_socket_t        fd;
3167     nxt_queue_link_t    *qlk;
3168     nxt_listen_event_t  *lev;
3169 
3170     fd = skcf->listen->socket;
3171 
3172     for (qlk = nxt_queue_first(listen_connections);
3173          qlk != nxt_queue_tail(listen_connections);
3174          qlk = nxt_queue_next(qlk))
3175     {
3176         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3177 
3178         if (fd == lev->socket.fd) {
3179             return lev;
3180         }
3181     }
3182 
3183     return NULL;
3184 }
3185 
3186 
3187 static void
3188 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3189 {
3190     nxt_joint_job_t          *job;
3191     nxt_event_engine_t       *engine;
3192     nxt_listen_event_t       *lev;
3193     nxt_socket_conf_joint_t  *joint, *old;
3194 
3195     job = obj;
3196     joint = data;
3197 
3198     engine = task->thread->engine;
3199 
3200     nxt_queue_insert_tail(&engine->joints, &joint->link);
3201 
3202     lev = nxt_router_listen_event(&engine->listen_connections,
3203                                   joint->socket_conf);
3204 
3205     old = lev->socket.data;
3206     lev->socket.data = joint;
3207     lev->listen = joint->socket_conf->listen;
3208 
3209     job->work.next = NULL;
3210     job->work.handler = nxt_router_conf_wait;
3211 
3212     nxt_event_engine_post(job->tmcf->engine, &job->work);
3213 
3214     /*
3215      * The task is allocated from configuration temporary
3216      * memory pool so it can be freed after engine post operation.
3217      */
3218 
3219     nxt_router_conf_release(&engine->task, old);
3220 }
3221 
3222 
3223 static void
3224 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3225 {
3226     nxt_socket_conf_t        *skcf;
3227     nxt_listen_event_t       *lev;
3228     nxt_event_engine_t       *engine;
3229     nxt_socket_conf_joint_t  *joint;
3230 
3231     skcf = data;
3232 
3233     engine = task->thread->engine;
3234 
3235     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3236 
3237     nxt_fd_event_delete(engine, &lev->socket);
3238 
3239     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3240               lev->socket.fd);
3241 
3242     joint = lev->socket.data;
3243     joint->close_job = obj;
3244 
3245     lev->timer.handler = nxt_router_listen_socket_close;
3246     lev->timer.work_queue = &engine->fast_work_queue;
3247 
3248     nxt_timer_add(engine, &lev->timer, 0);
3249 }
3250 
3251 
3252 static void
3253 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3254 {
3255     nxt_event_engine_t  *engine;
3256 
3257     nxt_debug(task, "router worker thread quit");
3258 
3259     engine = task->thread->engine;
3260 
3261     engine->shutdown = 1;
3262 
3263     if (nxt_queue_is_empty(&engine->joints)) {
3264         nxt_thread_exit(task->thread);
3265     }
3266 }
3267 
3268 
3269 static void
3270 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3271 {
3272     nxt_timer_t              *timer;
3273     nxt_joint_job_t          *job;
3274     nxt_listen_event_t       *lev;
3275     nxt_socket_conf_joint_t  *joint;
3276 
3277     timer = obj;
3278     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3279 
3280     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3281               lev->socket.fd);
3282 
3283     nxt_queue_remove(&lev->link);
3284 
3285     joint = lev->socket.data;
3286     lev->socket.data = NULL;
3287 
3288     job = joint->close_job;
3289     job->work.next = NULL;
3290     job->work.handler = nxt_router_conf_wait;
3291 
3292     nxt_event_engine_post(job->tmcf->engine, &job->work);
3293 
3294     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3295     task = &task->thread->engine->task;
3296 
3297     nxt_router_listen_socket_release(task, joint->socket_conf);
3298 
3299     nxt_router_listen_event_release(task, lev, joint);
3300 }
3301 
3302 
3303 static void
3304 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3305 {
3306     nxt_listen_socket_t    *ls;
3307     nxt_thread_spinlock_t  *lock;
3308 
3309     ls = skcf->listen;
3310     lock = &skcf->router_conf->router->lock;
3311 
3312     nxt_thread_spin_lock(lock);
3313 
3314     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3315               task->thread->engine, ls->count);
3316 
3317     if (--ls->count != 0) {
3318         ls = NULL;
3319     }
3320 
3321     nxt_thread_spin_unlock(lock);
3322 
3323     if (ls != NULL) {
3324         nxt_socket_close(task, ls->socket);
3325         nxt_free(ls);
3326     }
3327 }
3328 
3329 
3330 void
3331 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3332     nxt_socket_conf_joint_t *joint)
3333 {
3334     nxt_event_engine_t  *engine;
3335 
3336     nxt_debug(task, "listen event count: %D", lev->count);
3337 
3338     engine = task->thread->engine;
3339 
3340     if (--lev->count == 0) {
3341         if (lev->next != NULL) {
3342             nxt_sockaddr_cache_free(engine, lev->next);
3343 
3344             nxt_conn_free(task, lev->next);
3345         }
3346 
3347         nxt_free(lev);
3348     }
3349 
3350     if (joint != NULL) {
3351         nxt_router_conf_release(task, joint);
3352     }
3353 
3354     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3355         nxt_thread_exit(task->thread);
3356     }
3357 }
3358 
3359 
3360 void
3361 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3362 {
3363     nxt_socket_conf_t      *skcf;
3364     nxt_router_conf_t      *rtcf;
3365     nxt_thread_spinlock_t  *lock;
3366 
3367     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3368 
3369     if (--joint->count != 0) {
3370         return;
3371     }
3372 
3373     nxt_queue_remove(&joint->link);
3374 
3375     /*
3376      * The joint content can not be safely used after the critical
3377      * section protected by the spinlock because its memory pool may
3378      * be already destroyed by another thread.
3379      */
3380     skcf = joint->socket_conf;
3381     rtcf = skcf->router_conf;
3382     lock = &rtcf->router->lock;
3383 
3384     nxt_thread_spin_lock(lock);
3385 
3386     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3387               rtcf, rtcf->count);
3388 
3389     if (--skcf->count != 0) {
3390         skcf = NULL;
3391         rtcf = NULL;
3392 
3393     } else {
3394         nxt_queue_remove(&skcf->link);
3395 
3396         if (--rtcf->count != 0) {
3397             rtcf = NULL;
3398         }
3399     }
3400 
3401     nxt_thread_spin_unlock(lock);
3402 
3403 #if (NXT_TLS)
3404     if (skcf != NULL && skcf->tls != NULL) {
3405         task->thread->runtime->tls->server_free(task, skcf->tls);
3406     }
3407 #endif
3408 
3409     /* TODO remove engine->port */
3410 
3411     if (rtcf != NULL) {
3412         nxt_debug(task, "old router conf is destroyed");
3413 
3414         nxt_router_apps_hash_use(task, rtcf, -1);
3415 
3416         nxt_router_access_log_release(task, lock, rtcf->access_log);
3417 
3418         nxt_mp_thread_adopt(rtcf->mem_pool);
3419 
3420         nxt_mp_destroy(rtcf->mem_pool);
3421     }
3422 }
3423 
3424 
3425 static void
3426 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3427     nxt_router_access_log_t *access_log)
3428 {
3429     size_t     size;
3430     u_char     *buf, *p;
3431     nxt_off_t  bytes;
3432 
3433     static nxt_time_string_t  date_cache = {
3434         (nxt_atomic_uint_t) -1,
3435         nxt_router_access_log_date,
3436         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3437         nxt_length("31/Dec/1986:19:40:00 +0300"),
3438         NXT_THREAD_TIME_LOCAL,
3439         NXT_THREAD_TIME_SEC,
3440     };
3441 
3442     size = r->remote->address_length
3443            + 6                  /* ' - - [' */
3444            + date_cache.size
3445            + 3                  /* '] "' */
3446            + r->method->length
3447            + 1                  /* space */
3448            + r->target.length
3449            + 1                  /* space */
3450            + r->version.length
3451            + 2                  /* '" ' */
3452            + 3                  /* status */
3453            + 1                  /* space */
3454            + NXT_OFF_T_LEN
3455            + 2                  /* ' "' */
3456            + (r->referer != NULL ? r->referer->value_length : 1)
3457            + 3                  /* '" "' */
3458            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3459            + 2                  /* '"\n' */
3460     ;
3461 
3462     buf = nxt_mp_nget(r->mem_pool, size);
3463     if (nxt_slow_path(buf == NULL)) {
3464         return;
3465     }
3466 
3467     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3468                    r->remote->address_length);
3469 
3470     p = nxt_cpymem(p, " - - [", 6);
3471 
3472     p = nxt_thread_time_string(task->thread, &date_cache, p);
3473 
3474     p = nxt_cpymem(p, "] \"", 3);
3475 
3476     if (r->method->length != 0) {
3477         p = nxt_cpymem(p, r->method->start, r->method->length);
3478 
3479         if (r->target.length != 0) {
3480             *p++ = ' ';
3481             p = nxt_cpymem(p, r->target.start, r->target.length);
3482 
3483             if (r->version.length != 0) {
3484                 *p++ = ' ';
3485                 p = nxt_cpymem(p, r->version.start, r->version.length);
3486             }
3487         }
3488 
3489     } else {
3490         *p++ = '-';
3491     }
3492 
3493     p = nxt_cpymem(p, "\" ", 2);
3494 
3495     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3496 
3497     *p++ = ' ';
3498 
3499     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3500 
3501     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3502 
3503     p = nxt_cpymem(p, " \"", 2);
3504 
3505     if (r->referer != NULL) {
3506         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3507 
3508     } else {
3509         *p++ = '-';
3510     }
3511 
3512     p = nxt_cpymem(p, "\" \"", 3);
3513 
3514     if (r->user_agent != NULL) {
3515         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3516 
3517     } else {
3518         *p++ = '-';
3519     }
3520 
3521     p = nxt_cpymem(p, "\"\n", 2);
3522 
3523     nxt_fd_write(access_log->fd, buf, p - buf);
3524 }
3525 
3526 
3527 static u_char *
3528 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3529     size_t size, const char *format)
3530 {
3531     u_char  sign;
3532     time_t  gmtoff;
3533 
3534     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3535                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3536 
3537     gmtoff = nxt_timezone(tm) / 60;
3538 
3539     if (gmtoff < 0) {
3540         gmtoff = -gmtoff;
3541         sign = '-';
3542 
3543     } else {
3544         sign = '+';
3545     }
3546 
3547     return nxt_sprintf(buf, buf + size, format,
3548                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3549                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3550                        sign, gmtoff / 60, gmtoff % 60);
3551 }
3552 
3553 
3554 static void
3555 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3556 {
3557     uint32_t                 stream;
3558     nxt_int_t                ret;
3559     nxt_buf_t                *b;
3560     nxt_port_t               *main_port, *router_port;
3561     nxt_runtime_t            *rt;
3562     nxt_router_access_log_t  *access_log;
3563 
3564     access_log = tmcf->router_conf->access_log;
3565 
3566     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3567     if (nxt_slow_path(b == NULL)) {
3568         goto fail;
3569     }
3570 
3571     b->completion_handler = nxt_router_dummy_buf_completion;
3572 
3573     nxt_buf_cpystr(b, &access_log->path);
3574     *b->mem.free++ = '\0';
3575 
3576     rt = task->thread->runtime;
3577     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3578     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3579 
3580     stream = nxt_port_rpc_register_handler(task, router_port,
3581                                            nxt_router_access_log_ready,
3582                                            nxt_router_access_log_error,
3583                                            -1, tmcf);
3584     if (nxt_slow_path(stream == 0)) {
3585         goto fail;
3586     }
3587 
3588     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3589                                 stream, router_port->id, b);
3590 
3591     if (nxt_slow_path(ret != NXT_OK)) {
3592         nxt_port_rpc_cancel(task, router_port, stream);
3593         goto fail;
3594     }
3595 
3596     return;
3597 
3598 fail:
3599 
3600     nxt_router_conf_error(task, tmcf);
3601 }
3602 
3603 
3604 static void
3605 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3606     void *data)
3607 {
3608     nxt_router_temp_conf_t   *tmcf;
3609     nxt_router_access_log_t  *access_log;
3610 
3611     tmcf = data;
3612 
3613     access_log = tmcf->router_conf->access_log;
3614 
3615     access_log->fd = msg->fd[0];
3616 
3617     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3618                        nxt_router_conf_apply, task, tmcf, NULL);
3619 }
3620 
3621 
3622 static void
3623 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3624     void *data)
3625 {
3626     nxt_router_temp_conf_t  *tmcf;
3627 
3628     tmcf = data;
3629 
3630     nxt_router_conf_error(task, tmcf);
3631 }
3632 
3633 
3634 static void
3635 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3636     nxt_router_access_log_t *access_log)
3637 {
3638     if (access_log == NULL) {
3639         return;
3640     }
3641 
3642     nxt_thread_spin_lock(lock);
3643 
3644     if (--access_log->count != 0) {
3645         access_log = NULL;
3646     }
3647 
3648     nxt_thread_spin_unlock(lock);
3649 
3650     if (access_log != NULL) {
3651 
3652         if (access_log->fd != -1) {
3653             nxt_fd_close(access_log->fd);
3654         }
3655 
3656         nxt_free(access_log);
3657     }
3658 }
3659 
3660 
3661 typedef struct {
3662     nxt_mp_t                 *mem_pool;
3663     nxt_router_access_log_t  *access_log;
3664 } nxt_router_access_log_reopen_t;
3665 
3666 
3667 static void
3668 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3669 {
3670     nxt_mp_t                        *mp;
3671     uint32_t                        stream;
3672     nxt_int_t                       ret;
3673     nxt_buf_t                       *b;
3674     nxt_port_t                      *main_port, *router_port;
3675     nxt_runtime_t                   *rt;
3676     nxt_router_access_log_t         *access_log;
3677     nxt_router_access_log_reopen_t  *reopen;
3678 
3679     access_log = nxt_router->access_log;
3680 
3681     if (access_log == NULL) {
3682         return;
3683     }
3684 
3685     mp = nxt_mp_create(1024, 128, 256, 32);
3686     if (nxt_slow_path(mp == NULL)) {
3687         return;
3688     }
3689 
3690     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3691     if (nxt_slow_path(reopen == NULL)) {
3692         goto fail;
3693     }
3694 
3695     reopen->mem_pool = mp;
3696     reopen->access_log = access_log;
3697 
3698     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3699     if (nxt_slow_path(b == NULL)) {
3700         goto fail;
3701     }
3702 
3703     b->completion_handler = nxt_router_access_log_reopen_completion;
3704 
3705     nxt_buf_cpystr(b, &access_log->path);
3706     *b->mem.free++ = '\0';
3707 
3708     rt = task->thread->runtime;
3709     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3710     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3711 
3712     stream = nxt_port_rpc_register_handler(task, router_port,
3713                                            nxt_router_access_log_reopen_ready,
3714                                            nxt_router_access_log_reopen_error,
3715                                            -1, reopen);
3716     if (nxt_slow_path(stream == 0)) {
3717         goto fail;
3718     }
3719 
3720     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3721                                 stream, router_port->id, b);
3722 
3723     if (nxt_slow_path(ret != NXT_OK)) {
3724         nxt_port_rpc_cancel(task, router_port, stream);
3725         goto fail;
3726     }
3727 
3728     nxt_mp_retain(mp);
3729 
3730     return;
3731 
3732 fail:
3733 
3734     nxt_mp_destroy(mp);
3735 }
3736 
3737 
3738 static void
3739 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3740 {
3741     nxt_mp_t   *mp;
3742     nxt_buf_t  *b;
3743 
3744     b = obj;
3745     mp = b->data;
3746 
3747     nxt_mp_release(mp);
3748 }
3749 
3750 
3751 static void
3752 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3753     void *data)
3754 {
3755     nxt_router_access_log_t         *access_log;
3756     nxt_router_access_log_reopen_t  *reopen;
3757 
3758     reopen = data;
3759 
3760     access_log = reopen->access_log;
3761 
3762     if (access_log == nxt_router->access_log) {
3763 
3764         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3765             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3766                       msg->fd[0], access_log->fd, nxt_errno);
3767         }
3768     }
3769 
3770     nxt_fd_close(msg->fd[0]);
3771     nxt_mp_release(reopen->mem_pool);
3772 }
3773 
3774 
3775 static void
3776 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3777     void *data)
3778 {
3779     nxt_router_access_log_reopen_t  *reopen;
3780 
3781     reopen = data;
3782 
3783     nxt_mp_release(reopen->mem_pool);
3784 }
3785 
3786 
3787 static void
3788 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3789 {
3790     nxt_port_t           *port;
3791     nxt_thread_link_t    *link;
3792     nxt_event_engine_t   *engine;
3793     nxt_thread_handle_t  handle;
3794 
3795     handle = (nxt_thread_handle_t) (uintptr_t) obj;
3796     link = data;
3797 
3798     nxt_thread_wait(handle);
3799 
3800     engine = link->engine;
3801 
3802     nxt_queue_remove(&engine->link);
3803 
3804     port = engine->port;
3805 
3806     // TODO notify all apps
3807 
3808     port->engine = task->thread->engine;
3809     nxt_mp_thread_adopt(port->mem_pool);
3810     nxt_port_use(task, port, -1);
3811 
3812     nxt_mp_thread_adopt(engine->mem_pool);
3813     nxt_mp_destroy(engine->mem_pool);
3814 
3815     nxt_event_engine_free(engine);
3816 
3817     nxt_free(link);
3818 }
3819 
3820 
3821 static void
3822 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3823     void *data)
3824 {
3825     size_t                  b_size, count;
3826     nxt_int_t               ret;
3827     nxt_app_t               *app;
3828     nxt_buf_t               *b, *next;
3829     nxt_port_t              *app_port;
3830     nxt_unit_field_t        *f;
3831     nxt_http_field_t        *field;
3832     nxt_http_request_t      *r;
3833     nxt_unit_response_t     *resp;
3834     nxt_request_rpc_data_t  *req_rpc_data;
3835 
3836     req_rpc_data = data;
3837 
3838     r = req_rpc_data->request;
3839     if (nxt_slow_path(r == NULL)) {
3840         return;
3841     }
3842 
3843     if (r->error) {
3844         nxt_request_rpc_data_unlink(task, req_rpc_data);
3845         return;
3846     }
3847 
3848     app = req_rpc_data->app;
3849     nxt_assert(app != NULL);
3850 
3851     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3852         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3853 
3854         return;
3855     }
3856 
3857     b = (msg->size == 0) ? NULL : msg->buf;
3858 
3859     if (msg->port_msg.last != 0) {
3860         nxt_debug(task, "router data create last buf");
3861 
3862         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3863 
3864         req_rpc_data->rpc_cancel = 0;
3865 
3866         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
3867             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
3868         }
3869 
3870         nxt_request_rpc_data_unlink(task, req_rpc_data);
3871 
3872     } else {
3873         if (app->timeout != 0) {
3874             r->timer.handler = nxt_router_app_timeout;
3875             r->timer_data = req_rpc_data;
3876             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3877         }
3878     }
3879 
3880     if (b == NULL) {
3881         return;
3882     }
3883 
3884     if (msg->buf == b) {
3885         /* Disable instant buffer completion/re-using by port. */
3886         msg->buf = NULL;
3887     }
3888 
3889     if (r->header_sent) {
3890         nxt_buf_chain_add(&r->out, b);
3891         nxt_http_request_send_body(task, r, NULL);
3892 
3893     } else {
3894         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
3895 
3896         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
3897             nxt_alert(task, "response buffer too small: %z", b_size);
3898             goto fail;
3899         }
3900 
3901         resp = (void *) b->mem.pos;
3902         count = (b_size - sizeof(nxt_unit_response_t))
3903                     / sizeof(nxt_unit_field_t);
3904 
3905         if (nxt_slow_path(count < resp->fields_count)) {
3906             nxt_alert(task, "response buffer too small for fields count: %D",
3907                       resp->fields_count);
3908             goto fail;
3909         }
3910 
3911         field = NULL;
3912 
3913         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3914             if (f->skip) {
3915                 continue;
3916             }
3917 
3918             field = nxt_list_add(r->resp.fields);
3919 
3920             if (nxt_slow_path(field == NULL)) {
3921                 goto fail;
3922             }
3923 
3924             field->hash = f->hash;
3925             field->skip = 0;
3926             field->hopbyhop = 0;
3927 
3928             field->name_length = f->name_length;
3929             field->value_length = f->value_length;
3930             field->name = nxt_unit_sptr_get(&f->name);
3931             field->value = nxt_unit_sptr_get(&f->value);
3932 
3933             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
3934             if (nxt_slow_path(ret != NXT_OK)) {
3935                 goto fail;
3936             }
3937 
3938             nxt_debug(task, "header%s: %*s: %*s",
3939                       (field->skip ? " skipped" : ""),
3940                       (size_t) field->name_length, field->name,
3941                       (size_t) field->value_length, field->value);
3942 
3943             if (field->skip) {
3944                 r->resp.fields->last->nelts--;
3945             }
3946         }
3947 
3948         r->status = resp->status;
3949 
3950         if (resp->piggyback_content_length != 0) {
3951             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3952             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3953 
3954         } else {
3955             b->mem.pos = b->mem.free;
3956         }
3957 
3958         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3959             next = b->next;
3960             b->next = NULL;
3961 
3962             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3963                                b->completion_handler, task, b, b->parent);
3964 
3965             b = next;
3966         }
3967 
3968         if (b != NULL) {
3969             nxt_buf_chain_add(&r->out, b);
3970         }
3971 
3972         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3973 
3974         if (r->websocket_handshake
3975             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3976         {
3977             app_port = req_rpc_data->app_port;
3978             if (nxt_slow_path(app_port == NULL)) {
3979                 goto fail;
3980             }
3981 
3982             nxt_thread_mutex_lock(&app->mutex);
3983 
3984             app_port->main_app_port->active_websockets++;
3985 
3986             nxt_thread_mutex_unlock(&app->mutex);
3987 
3988             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3989             req_rpc_data->apr_action = NXT_APR_CLOSE;
3990 
3991             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
3992 
3993             r->state = &nxt_http_websocket;
3994 
3995         } else {
3996             r->state = &nxt_http_request_send_state;
3997         }
3998     }
3999 
4000     return;
4001 
4002 fail:
4003 
4004     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4005 
4006     nxt_request_rpc_data_unlink(task, req_rpc_data);
4007 }
4008 
4009 
4010 static void
4011 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4012     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4013 {
4014     int                 res;
4015     nxt_app_t           *app;
4016     nxt_buf_t           *b;
4017     nxt_bool_t          start_process, unlinked;
4018     nxt_port_t          *app_port, *main_app_port, *idle_port;
4019     nxt_queue_link_t    *idle_lnk;
4020     nxt_http_request_t  *r;
4021 
4022     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4023               req_rpc_data->stream,
4024               msg->port_msg.pid, msg->port_msg.reply_port);
4025 
4026     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4027                              msg->port_msg.pid);
4028 
4029     app = req_rpc_data->app;
4030     r = req_rpc_data->request;
4031 
4032     start_process = 0;
4033     unlinked = 0;
4034 
4035     nxt_thread_mutex_lock(&app->mutex);
4036 
4037     if (r->app_link.next != NULL) {
4038         nxt_queue_remove(&r->app_link);
4039         r->app_link.next = NULL;
4040 
4041         unlinked = 1;
4042     }
4043 
4044     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4045                                   msg->port_msg.reply_port);
4046     if (nxt_slow_path(app_port == NULL)) {
4047         nxt_thread_mutex_unlock(&app->mutex);
4048 
4049         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4050 
4051         if (unlinked) {
4052             nxt_mp_release(r->mem_pool);
4053         }
4054 
4055         return;
4056     }
4057 
4058     main_app_port = app_port->main_app_port;
4059 
4060     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4061         app->idle_processes--;
4062 
4063         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4064                   &app->name, main_app_port->pid, main_app_port->id,
4065                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4066 
4067         /* Check port was in 'spare_ports' using idle_start field. */
4068         if (main_app_port->idle_start == 0
4069             && app->idle_processes >= app->spare_processes)
4070         {
4071             /*
4072              * If there is a vacant space in spare ports,
4073              * move the last idle to spare_ports.
4074              */
4075             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4076 
4077             idle_lnk = nxt_queue_last(&app->idle_ports);
4078             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4079             nxt_queue_remove(idle_lnk);
4080 
4081             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4082 
4083             idle_port->idle_start = 0;
4084 
4085             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4086                       "to spare_ports",
4087                       &app->name, idle_port->pid, idle_port->id);
4088         }
4089 
4090         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4091             app->pending_processes++;
4092             start_process = 1;
4093         }
4094     }
4095 
4096     main_app_port->active_requests++;
4097 
4098     nxt_port_inc_use(app_port);
4099 
4100     nxt_thread_mutex_unlock(&app->mutex);
4101 
4102     if (unlinked) {
4103         nxt_mp_release(r->mem_pool);
4104     }
4105 
4106     if (start_process) {
4107         nxt_router_start_app_process(task, app);
4108     }
4109 
4110     nxt_port_use(task, req_rpc_data->app_port, -1);
4111 
4112     req_rpc_data->app_port = app_port;
4113 
4114     b = req_rpc_data->msg_info.buf;
4115 
4116     if (b != NULL) {
4117         /* First buffer is already sent.  Start from second. */
4118         b = b->next;
4119 
4120         req_rpc_data->msg_info.buf->next = NULL;
4121     }
4122 
4123     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4124         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4125                   req_rpc_data->msg_info.body_fd);
4126 
4127         if (req_rpc_data->msg_info.body_fd != -1) {
4128             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4129         }
4130 
4131         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4132                                     req_rpc_data->msg_info.body_fd,
4133                                     req_rpc_data->stream,
4134                                     task->thread->engine->port->id, b);
4135 
4136         if (nxt_slow_path(res != NXT_OK)) {
4137             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4138         }
4139     }
4140 
4141     if (app->timeout != 0) {
4142         r->timer.handler = nxt_router_app_timeout;
4143         r->timer_data = req_rpc_data;
4144         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4145     }
4146 }
4147 
4148 
4149 static const nxt_http_request_state_t  nxt_http_request_send_state
4150     nxt_aligned(64) =
4151 {
4152     .error_handler = nxt_http_request_error_handler,
4153 };
4154 
4155 
4156 static void
4157 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4158 {
4159     nxt_buf_t           *out;
4160     nxt_http_request_t  *r;
4161 
4162     r = obj;
4163 
4164     out = r->out;
4165 
4166     if (out != NULL) {
4167         r->out = NULL;
4168         nxt_http_request_send(task, r, out);
4169     }
4170 }
4171 
4172 
4173 static void
4174 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4175     void *data)
4176 {
4177     nxt_request_rpc_data_t  *req_rpc_data;
4178 
4179     req_rpc_data = data;
4180 
4181     req_rpc_data->rpc_cancel = 0;
4182 
4183     /* TODO cancel message and return if cancelled. */
4184     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4185 
4186     if (req_rpc_data->request != NULL) {
4187         nxt_http_request_error(task, req_rpc_data->request,
4188                                NXT_HTTP_SERVICE_UNAVAILABLE);
4189     }
4190 
4191     nxt_request_rpc_data_unlink(task, req_rpc_data);
4192 }
4193 
4194 
4195 static void
4196 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4197     void *data)
4198 {
4199     nxt_app_t        *app;
4200     nxt_port_t       *port;
4201     nxt_app_joint_t  *app_joint;
4202 
4203     app_joint = data;
4204     port = msg->u.new_port;
4205 
4206     nxt_assert(app_joint != NULL);
4207     nxt_assert(port != NULL);
4208     nxt_assert(port->type == NXT_PROCESS_APP);
4209     nxt_assert(port->id == 0);
4210 
4211     app = app_joint->app;
4212 
4213     nxt_router_app_joint_use(task, app_joint, -1);
4214 
4215     if (nxt_slow_path(app == NULL)) {
4216         nxt_debug(task, "new port ready for released app, send QUIT");
4217 
4218         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4219 
4220         return;
4221     }
4222 
4223     port->app = app;
4224     port->main_app_port = port;
4225 
4226     nxt_thread_mutex_lock(&app->mutex);
4227 
4228     nxt_assert(app->pending_processes != 0);
4229 
4230     app->pending_processes--;
4231     app->processes++;
4232     nxt_port_hash_add(&app->port_hash, port);
4233     app->port_hash_count++;
4234 
4235     nxt_thread_mutex_unlock(&app->mutex);
4236 
4237     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4238               &app->name, port->pid, app->processes, app->pending_processes);
4239 
4240     nxt_router_app_shared_port_send(task, port);
4241 
4242     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4243 }
4244 
4245 
4246 static nxt_int_t
4247 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4248 {
4249     nxt_buf_t                *b;
4250     nxt_port_t               *port;
4251     nxt_port_msg_new_port_t  *msg;
4252 
4253     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4254                              sizeof(nxt_port_data_t));
4255     if (nxt_slow_path(b == NULL)) {
4256         return NXT_ERROR;
4257     }
4258 
4259     port = app_port->app->shared_port;
4260 
4261     nxt_debug(task, "send port %FD to process %PI",
4262               port->pair[0], app_port->pid);
4263 
4264     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4265     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4266 
4267     msg->id = port->id;
4268     msg->pid = port->pid;
4269     msg->max_size = port->max_size;
4270     msg->max_share = port->max_share;
4271     msg->type = port->type;
4272 
4273     return nxt_port_socket_write2(task, app_port,
4274                                   NXT_PORT_MSG_NEW_PORT,
4275                                   port->pair[0], port->queue_fd,
4276                                   0, 0, b);
4277 }
4278 
4279 
4280 static void
4281 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4282     void *data)
4283 {
4284     nxt_app_t           *app;
4285     nxt_app_joint_t     *app_joint;
4286     nxt_queue_link_t    *link;
4287     nxt_http_request_t  *r;
4288 
4289     app_joint = data;
4290 
4291     nxt_assert(app_joint != NULL);
4292 
4293     app = app_joint->app;
4294 
4295     nxt_router_app_joint_use(task, app_joint, -1);
4296 
4297     if (nxt_slow_path(app == NULL)) {
4298         nxt_debug(task, "start error for released app");
4299 
4300         return;
4301     }
4302 
4303     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4304 
4305     link = NULL;
4306 
4307     nxt_thread_mutex_lock(&app->mutex);
4308 
4309     nxt_assert(app->pending_processes != 0);
4310 
4311     app->pending_processes--;
4312 
4313     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4314         link = nxt_queue_first(&app->ack_waiting_req);
4315 
4316         nxt_queue_remove(link);
4317         link->next = NULL;
4318     }
4319 
4320     nxt_thread_mutex_unlock(&app->mutex);
4321 
4322     while (link != NULL) {
4323         r = nxt_container_of(link, nxt_http_request_t, app_link);
4324 
4325         nxt_event_engine_post(r->engine, &r->err_work);
4326 
4327         link = NULL;
4328 
4329         nxt_thread_mutex_lock(&app->mutex);
4330 
4331         if (app->processes == 0 && app->pending_processes == 0
4332             && !nxt_queue_is_empty(&app->ack_waiting_req))
4333         {
4334             link = nxt_queue_first(&app->ack_waiting_req);
4335 
4336             nxt_queue_remove(link);
4337             link->next = NULL;
4338         }
4339 
4340         nxt_thread_mutex_unlock(&app->mutex);
4341     }
4342 }
4343 
4344 
4345 
4346 nxt_inline nxt_port_t *
4347 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4348 {
4349     nxt_port_t  *port;
4350 
4351     port = NULL;
4352 
4353     nxt_thread_mutex_lock(&app->mutex);
4354 
4355     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4356 
4357         /* Caller is responsible to decrease port use count. */
4358         nxt_queue_chk_remove(&port->app_link);
4359 
4360         if (nxt_queue_chk_remove(&port->idle_link)) {
4361             app->idle_processes--;
4362 
4363             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4364                       &app->name, port->pid, port->id,
4365                       (port->idle_start ? "idle_ports" : "spare_ports"));
4366         }
4367 
4368         nxt_port_hash_remove(&app->port_hash, port);
4369         app->port_hash_count--;
4370 
4371         port->app = NULL;
4372         app->processes--;
4373 
4374         break;
4375 
4376     } nxt_queue_loop;
4377 
4378     nxt_thread_mutex_unlock(&app->mutex);
4379 
4380     return port;
4381 }
4382 
4383 
4384 static void
4385 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4386 {
4387     int  c;
4388 
4389     c = nxt_atomic_fetch_add(&app->use_count, i);
4390 
4391     if (i < 0 && c == -i) {
4392 
4393         if (task->thread->engine != app->engine) {
4394             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4395 
4396         } else {
4397             nxt_router_free_app(task, app->joint, NULL);
4398         }
4399     }
4400 }
4401 
4402 
4403 static void
4404 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4405 {
4406     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4407 
4408     nxt_queue_remove(&app->link);
4409 
4410     nxt_router_app_use(task, app, -1);
4411 }
4412 
4413 
4414 static void
4415 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4416     nxt_apr_action_t action)
4417 {
4418     int         inc_use;
4419     uint32_t    got_response, dec_requests;
4420     nxt_app_t   *app;
4421     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4422     nxt_port_t  *main_app_port;
4423 
4424     nxt_assert(port != NULL);
4425     nxt_assert(port->app != NULL);
4426 
4427     app = port->app;
4428 
4429     inc_use = 0;
4430     got_response = 0;
4431     dec_requests = 0;
4432 
4433     switch (action) {
4434     case NXT_APR_NEW_PORT:
4435         break;
4436     case NXT_APR_REQUEST_FAILED:
4437         dec_requests = 1;
4438         inc_use = -1;
4439         break;
4440     case NXT_APR_GOT_RESPONSE:
4441         got_response = 1;
4442         inc_use = -1;
4443         break;
4444     case NXT_APR_UPGRADE:
4445         got_response = 1;
4446         break;
4447     case NXT_APR_CLOSE:
4448         inc_use = -1;
4449         break;
4450     }
4451 
4452     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4453               port->pid, port->id,
4454               (int) inc_use, (int) got_response);
4455 
4456     if (port == app->shared_port) {
4457         nxt_thread_mutex_lock(&app->mutex);
4458 
4459         app->active_requests -= got_response + dec_requests;
4460 
4461         nxt_thread_mutex_unlock(&app->mutex);
4462 
4463         goto adjust_use;
4464     }
4465 
4466     main_app_port = port->main_app_port;
4467 
4468     nxt_thread_mutex_lock(&app->mutex);
4469 
4470     main_app_port->app_responses += got_response;
4471     main_app_port->active_requests -= got_response + dec_requests;
4472     app->active_requests -= got_response + dec_requests;
4473 
4474     if (main_app_port->pair[1] != -1
4475         && (app->max_requests == 0
4476             || main_app_port->app_responses < app->max_requests))
4477     {
4478         if (main_app_port->app_link.next == NULL) {
4479             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4480 
4481             nxt_port_inc_use(main_app_port);
4482         }
4483     }
4484 
4485     send_quit = (app->max_requests > 0
4486                  && main_app_port->app_responses >= app->max_requests);
4487 
4488     if (send_quit) {
4489         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4490 
4491         nxt_port_hash_remove(&app->port_hash, main_app_port);
4492         app->port_hash_count--;
4493 
4494         main_app_port->app = NULL;
4495         app->processes--;
4496 
4497     } else {
4498         port_unchained = 0;
4499     }
4500 
4501     adjust_idle_timer = 0;
4502 
4503     if (main_app_port->pair[1] != -1 && !send_quit
4504         && main_app_port->active_requests == 0
4505         && main_app_port->active_websockets == 0
4506         && main_app_port->idle_link.next == NULL)
4507     {
4508         if (app->idle_processes == app->spare_processes
4509             && app->adjust_idle_work.data == NULL)
4510         {
4511             adjust_idle_timer = 1;
4512             app->adjust_idle_work.data = app;
4513             app->adjust_idle_work.next = NULL;
4514         }
4515 
4516         if (app->idle_processes < app->spare_processes) {
4517             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4518 
4519             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4520                       &app->name, main_app_port->pid, main_app_port->id);
4521         } else {
4522             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4523 
4524             main_app_port->idle_start = task->thread->engine->timers.now;
4525 
4526             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4527                       &app->name, main_app_port->pid, main_app_port->id);
4528         }
4529 
4530         app->idle_processes++;
4531     }
4532 
4533     nxt_thread_mutex_unlock(&app->mutex);
4534 
4535     if (adjust_idle_timer) {
4536         nxt_router_app_use(task, app, 1);
4537         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4538     }
4539 
4540     /* ? */
4541     if (main_app_port->pair[1] == -1) {
4542         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4543                   &app->name, app, main_app_port, main_app_port->pid);
4544 
4545         goto adjust_use;
4546     }
4547 
4548     if (send_quit) {
4549         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4550 
4551         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4552                               NULL);
4553 
4554         if (port_unchained) {
4555             nxt_port_use(task, main_app_port, -1);
4556         }
4557 
4558         goto adjust_use;
4559     }
4560 
4561     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4562               &app->name, app);
4563 
4564 adjust_use:
4565 
4566     nxt_port_use(task, port, inc_use);
4567 }
4568 
4569 
4570 void
4571 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4572 {
4573     nxt_app_t         *app;
4574     nxt_bool_t        unchain, start_process;
4575     nxt_port_t        *idle_port;
4576     nxt_queue_link_t  *idle_lnk;
4577 
4578     app = port->app;
4579 
4580     nxt_assert(app != NULL);
4581 
4582     nxt_thread_mutex_lock(&app->mutex);
4583 
4584     nxt_port_hash_remove(&app->port_hash, port);
4585     app->port_hash_count--;
4586 
4587     if (port->id != 0) {
4588         nxt_thread_mutex_unlock(&app->mutex);
4589 
4590         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4591                   port->pid, port->id);
4592 
4593         return;
4594     }
4595 
4596     unchain = nxt_queue_chk_remove(&port->app_link);
4597 
4598     if (nxt_queue_chk_remove(&port->idle_link)) {
4599         app->idle_processes--;
4600 
4601         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4602                   &app->name, port->pid, port->id,
4603                   (port->idle_start ? "idle_ports" : "spare_ports"));
4604 
4605         if (port->idle_start == 0
4606             && app->idle_processes >= app->spare_processes)
4607         {
4608             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4609 
4610             idle_lnk = nxt_queue_last(&app->idle_ports);
4611             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4612             nxt_queue_remove(idle_lnk);
4613 
4614             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4615 
4616             idle_port->idle_start = 0;
4617 
4618             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4619                       "to spare_ports",
4620                       &app->name, idle_port->pid, idle_port->id);
4621         }
4622     }
4623 
4624     app->processes--;
4625 
4626     start_process = !task->thread->engine->shutdown
4627                     && nxt_router_app_can_start(app)
4628                     && nxt_router_app_need_start(app);
4629 
4630     if (start_process) {
4631         app->pending_processes++;
4632     }
4633 
4634     nxt_thread_mutex_unlock(&app->mutex);
4635 
4636     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4637 
4638     if (unchain) {
4639         nxt_port_use(task, port, -1);
4640     }
4641 
4642     if (start_process) {
4643         nxt_router_start_app_process(task, app);
4644     }
4645 }
4646 
4647 
4648 static void
4649 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4650 {
4651     nxt_app_t           *app;
4652     nxt_bool_t          queued;
4653     nxt_port_t          *port;
4654     nxt_msec_t          timeout, threshold;
4655     nxt_queue_link_t    *lnk;
4656     nxt_event_engine_t  *engine;
4657 
4658     app = obj;
4659     queued = (data == app);
4660 
4661     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4662               &app->name, queued);
4663 
4664     engine = task->thread->engine;
4665 
4666     nxt_assert(app->engine == engine);
4667 
4668     threshold = engine->timers.now + app->joint->idle_timer.bias;
4669     timeout = 0;
4670 
4671     nxt_thread_mutex_lock(&app->mutex);
4672 
4673     if (queued) {
4674         app->adjust_idle_work.data = NULL;
4675     }
4676 
4677     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4678               &app->name,
4679               (int) app->idle_processes, (int) app->spare_processes);
4680 
4681     while (app->idle_processes > app->spare_processes) {
4682 
4683         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4684 
4685         lnk = nxt_queue_first(&app->idle_ports);
4686         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4687 
4688         timeout = port->idle_start + app->idle_timeout;
4689 
4690         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4691                   &app->name, port->pid,
4692                   port->idle_start, timeout, threshold);
4693 
4694         if (timeout > threshold) {
4695             break;
4696         }
4697 
4698         nxt_queue_remove(lnk);
4699         lnk->next = NULL;
4700 
4701         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4702                   &app->name, port->pid, port->id);
4703 
4704         nxt_queue_chk_remove(&port->app_link);
4705 
4706         nxt_port_hash_remove(&app->port_hash, port);
4707         app->port_hash_count--;
4708 
4709         app->idle_processes--;
4710         app->processes--;
4711         port->app = NULL;
4712 
4713         nxt_thread_mutex_unlock(&app->mutex);
4714 
4715         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4716                   &app->name, port->pid);
4717 
4718         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4719 
4720         nxt_port_use(task, port, -1);
4721 
4722         nxt_thread_mutex_lock(&app->mutex);
4723     }
4724 
4725     nxt_thread_mutex_unlock(&app->mutex);
4726 
4727     if (timeout > threshold) {
4728         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4729 
4730     } else {
4731         nxt_timer_disable(engine, &app->joint->idle_timer);
4732     }
4733 
4734     if (queued) {
4735         nxt_router_app_use(task, app, -1);
4736     }
4737 }
4738 
4739 
4740 static void
4741 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4742 {
4743     nxt_timer_t      *timer;
4744     nxt_app_joint_t  *app_joint;
4745 
4746     timer = obj;
4747     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4748 
4749     if (nxt_fast_path(app_joint->app != NULL)) {
4750         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4751     }
4752 }
4753 
4754 
4755 static void
4756 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4757 {
4758     nxt_timer_t      *timer;
4759     nxt_app_joint_t  *app_joint;
4760 
4761     timer = obj;
4762     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4763 
4764     nxt_router_app_joint_use(task, app_joint, -1);
4765 }
4766 
4767 
4768 static void
4769 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4770 {
4771     nxt_app_t        *app;
4772     nxt_port_t       *port;
4773     nxt_app_joint_t  *app_joint;
4774 
4775     app_joint = obj;
4776     app = app_joint->app;
4777 
4778     for ( ;; ) {
4779         port = nxt_router_app_get_port_for_quit(task, app);
4780         if (port == NULL) {
4781             break;
4782         }
4783 
4784         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4785 
4786         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4787 
4788         nxt_port_use(task, port, -1);
4789     }
4790 
4791     nxt_thread_mutex_lock(&app->mutex);
4792 
4793     for ( ;; ) {
4794         port = nxt_port_hash_retrieve(&app->port_hash);
4795         if (port == NULL) {
4796             break;
4797         }
4798 
4799         app->port_hash_count--;
4800 
4801         port->app = NULL;
4802 
4803         nxt_port_close(task, port);
4804 
4805         nxt_port_use(task, port, -1);
4806     }
4807 
4808     nxt_thread_mutex_unlock(&app->mutex);
4809 
4810     nxt_assert(app->processes == 0);
4811     nxt_assert(app->active_requests == 0);
4812     nxt_assert(app->port_hash_count == 0);
4813     nxt_assert(app->idle_processes == 0);
4814     nxt_assert(nxt_queue_is_empty(&app->ports));
4815     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4816     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4817 
4818     nxt_port_mmaps_destroy(&app->outgoing, 1);
4819 
4820     nxt_thread_mutex_destroy(&app->outgoing.mutex);
4821 
4822     if (app->shared_port != NULL) {
4823         app->shared_port->app = NULL;
4824         nxt_port_close(task, app->shared_port);
4825         nxt_port_use(task, app->shared_port, -1);
4826     }
4827 
4828     nxt_thread_mutex_destroy(&app->mutex);
4829     nxt_mp_destroy(app->mem_pool);
4830 
4831     app_joint->app = NULL;
4832 
4833     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4834         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4835         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4836 
4837     } else {
4838         nxt_router_app_joint_use(task, app_joint, -1);
4839     }
4840 }
4841 
4842 
4843 static void
4844 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4845     nxt_request_rpc_data_t *req_rpc_data)
4846 {
4847     nxt_bool_t          start_process;
4848     nxt_port_t          *port;
4849     nxt_http_request_t  *r;
4850 
4851     start_process = 0;
4852 
4853     nxt_thread_mutex_lock(&app->mutex);
4854 
4855     port = app->shared_port;
4856     nxt_port_inc_use(port);
4857 
4858     app->active_requests++;
4859 
4860     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4861         app->pending_processes++;
4862         start_process = 1;
4863     }
4864 
4865     r = req_rpc_data->request;
4866 
4867     /*
4868      * Put request into application-wide list to be able to cancel request
4869      * if something goes wrong with application processes.
4870      */
4871     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
4872 
4873     nxt_thread_mutex_unlock(&app->mutex);
4874 
4875     /*
4876      * Retain request memory pool while request is linked in ack_waiting_req
4877      * to guarantee request structure memory is accessble.
4878      */
4879     nxt_mp_retain(r->mem_pool);
4880 
4881     req_rpc_data->app_port = port;
4882     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4883 
4884     if (start_process) {
4885         nxt_router_start_app_process(task, app);
4886     }
4887 }
4888 
4889 
4890 void
4891 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4892     nxt_app_t *app)
4893 {
4894     nxt_event_engine_t      *engine;
4895     nxt_request_rpc_data_t  *req_rpc_data;
4896 
4897     engine = task->thread->engine;
4898 
4899     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4900                                           nxt_router_response_ready_handler,
4901                                           nxt_router_response_error_handler,
4902                                           sizeof(nxt_request_rpc_data_t));
4903     if (nxt_slow_path(req_rpc_data == NULL)) {
4904         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4905         return;
4906     }
4907 
4908     /*
4909      * At this point we have request req_rpc_data allocated and registered
4910      * in port handlers.  Need to fixup request memory pool.  Counterpart
4911      * release will be called via following call chain:
4912      *    nxt_request_rpc_data_unlink() ->
4913      *        nxt_router_http_request_release_post() ->
4914      *            nxt_router_http_request_release()
4915      */
4916     nxt_mp_retain(r->mem_pool);
4917 
4918     r->timer.task = &engine->task;
4919     r->timer.work_queue = &engine->fast_work_queue;
4920     r->timer.log = engine->task.log;
4921     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4922 
4923     r->engine = engine;
4924     r->err_work.handler = nxt_router_http_request_error;
4925     r->err_work.task = task;
4926     r->err_work.obj = r;
4927 
4928     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4929     req_rpc_data->app = app;
4930     req_rpc_data->msg_info.body_fd = -1;
4931     req_rpc_data->rpc_cancel = 1;
4932 
4933     nxt_router_app_use(task, app, 1);
4934 
4935     req_rpc_data->request = r;
4936     r->req_rpc_data = req_rpc_data;
4937 
4938     if (r->last != NULL) {
4939         r->last->completion_handler = nxt_router_http_request_done;
4940     }
4941 
4942     nxt_router_app_port_get(task, app, req_rpc_data);
4943     nxt_router_app_prepare_request(task, req_rpc_data);
4944 }
4945 
4946 
4947 static void
4948 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
4949 {
4950     nxt_http_request_t  *r;
4951 
4952     r = obj;
4953 
4954     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
4955 
4956     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4957 
4958     if (r->req_rpc_data != NULL) {
4959         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4960     }
4961 
4962     nxt_mp_release(r->mem_pool);
4963 }
4964 
4965 
4966 static void
4967 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4968 {
4969     nxt_http_request_t  *r;
4970 
4971     r = data;
4972 
4973     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4974 
4975     if (r->req_rpc_data != NULL) {
4976         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4977     }
4978 
4979     nxt_http_request_close_handler(task, r, r->proto.any);
4980 }
4981 
4982 
4983 static void
4984 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4985 {
4986 }
4987 
4988 
4989 static void
4990 nxt_router_app_prepare_request(nxt_task_t *task,
4991     nxt_request_rpc_data_t *req_rpc_data)
4992 {
4993     nxt_app_t         *app;
4994     nxt_buf_t         *buf, *body;
4995     nxt_int_t         res;
4996     nxt_port_t        *port, *reply_port;
4997 
4998     int                   notify;
4999     struct {
5000         nxt_port_msg_t       pm;
5001         nxt_port_mmap_msg_t  mm;
5002     } msg;
5003 
5004 
5005     app = req_rpc_data->app;
5006 
5007     nxt_assert(app != NULL);
5008 
5009     port = req_rpc_data->app_port;
5010 
5011     nxt_assert(port != NULL);
5012     nxt_assert(port->queue != NULL);
5013 
5014     reply_port = task->thread->engine->port;
5015 
5016     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5017                                  nxt_app_msg_prefix[app->type]);
5018     if (nxt_slow_path(buf == NULL)) {
5019         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5020                   req_rpc_data->stream, &app->name);
5021 
5022         nxt_http_request_error(task, req_rpc_data->request,
5023                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5024 
5025         return;
5026     }
5027 
5028     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5029                     nxt_buf_used_size(buf),
5030                     port->socket.fd);
5031 
5032     req_rpc_data->msg_info.buf = buf;
5033 
5034     body = req_rpc_data->request->body;
5035 
5036     if (body != NULL && nxt_buf_is_file(body)) {
5037         req_rpc_data->msg_info.body_fd = body->file->fd;
5038 
5039         body->file->fd = -1;
5040 
5041     } else {
5042         req_rpc_data->msg_info.body_fd = -1;
5043     }
5044 
5045     msg.pm.stream = req_rpc_data->stream;
5046     msg.pm.pid = reply_port->pid;
5047     msg.pm.reply_port = reply_port->id;
5048     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5049     msg.pm.last = 0;
5050     msg.pm.mmap = 1;
5051     msg.pm.nf = 0;
5052     msg.pm.mf = 0;
5053     msg.pm.tracking = 0;
5054 
5055     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5056     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5057 
5058     msg.mm.mmap_id = hdr->id;
5059     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5060     msg.mm.size = nxt_buf_used_size(buf);
5061 
5062     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5063                              req_rpc_data->stream, &notify,
5064                              &req_rpc_data->msg_info.tracking_cookie);
5065     if (nxt_fast_path(res == NXT_OK)) {
5066         if (notify != 0) {
5067             (void) nxt_port_socket_write(task, port,
5068                                          NXT_PORT_MSG_READ_QUEUE,
5069                                          -1, req_rpc_data->stream,
5070                                          reply_port->id, NULL);
5071 
5072         } else {
5073             nxt_debug(task, "queue is not empty");
5074         }
5075 
5076         buf->is_port_mmap_sent = 1;
5077         buf->mem.pos = buf->mem.free;
5078 
5079     } else {
5080         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5081                   req_rpc_data->stream, &app->name);
5082 
5083         nxt_http_request_error(task, req_rpc_data->request,
5084                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5085     }
5086 }
5087 
5088 
5089 struct nxt_fields_iter_s {
5090     nxt_list_part_t   *part;
5091     nxt_http_field_t  *field;
5092 };
5093 
5094 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5095 
5096 
5097 static nxt_http_field_t *
5098 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5099 {
5100     if (part == NULL) {
5101         return NULL;
5102     }
5103 
5104     while (part->nelts == 0) {
5105         part = part->next;
5106         if (part == NULL) {
5107             return NULL;
5108         }
5109     }
5110 
5111     i->part = part;
5112     i->field = nxt_list_data(i->part);
5113 
5114     return i->field;
5115 }
5116 
5117 
5118 static nxt_http_field_t *
5119 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5120 {
5121     return nxt_fields_part_first(nxt_list_part(fields), i);
5122 }
5123 
5124 
5125 static nxt_http_field_t *
5126 nxt_fields_next(nxt_fields_iter_t *i)
5127 {
5128     nxt_http_field_t  *end = nxt_list_data(i->part);
5129 
5130     end += i->part->nelts;
5131     i->field++;
5132 
5133     if (i->field < end) {
5134         return i->field;
5135     }
5136 
5137     return nxt_fields_part_first(i->part->next, i);
5138 }
5139 
5140 
5141 static nxt_buf_t *
5142 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5143     nxt_app_t *app, const nxt_str_t *prefix)
5144 {
5145     void                *target_pos, *query_pos;
5146     u_char              *pos, *end, *p, c;
5147     size_t              fields_count, req_size, size, free_size;
5148     size_t              copy_size;
5149     nxt_off_t           content_length;
5150     nxt_buf_t           *b, *buf, *out, **tail;
5151     nxt_http_field_t    *field, *dup;
5152     nxt_unit_field_t    *dst_field;
5153     nxt_fields_iter_t   iter, dup_iter;
5154     nxt_unit_request_t  *req;
5155 
5156     req_size = sizeof(nxt_unit_request_t)
5157                + r->method->length + 1
5158                + r->version.length + 1
5159                + r->remote->length + 1
5160                + r->local->length + 1
5161                + r->server_name.length + 1
5162                + r->target.length + 1
5163                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5164 
5165     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5166     fields_count = 0;
5167 
5168     nxt_list_each(field, r->fields) {
5169         fields_count++;
5170 
5171         req_size += field->name_length + prefix->length + 1
5172                     + field->value_length + 1;
5173     } nxt_list_loop;
5174 
5175     req_size += fields_count * sizeof(nxt_unit_field_t);
5176 
5177     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5178         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5179                   (int) req_size);
5180 
5181         return NULL;
5182     }
5183 
5184     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5185               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5186     if (nxt_slow_path(out == NULL)) {
5187         return NULL;
5188     }
5189 
5190     req = (nxt_unit_request_t *) out->mem.free;
5191     out->mem.free += req_size;
5192 
5193     req->app_target = r->app_target;
5194 
5195     req->content_length = content_length;
5196 
5197     p = (u_char *) (req->fields + fields_count);
5198 
5199     nxt_debug(task, "fields_count=%d", (int) fields_count);
5200 
5201     req->method_length = r->method->length;
5202     nxt_unit_sptr_set(&req->method, p);
5203     p = nxt_cpymem(p, r->method->start, r->method->length);
5204     *p++ = '\0';
5205 
5206     req->version_length = r->version.length;
5207     nxt_unit_sptr_set(&req->version, p);
5208     p = nxt_cpymem(p, r->version.start, r->version.length);
5209     *p++ = '\0';
5210 
5211     req->remote_length = r->remote->address_length;
5212     nxt_unit_sptr_set(&req->remote, p);
5213     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5214                    r->remote->address_length);
5215     *p++ = '\0';
5216 
5217     req->local_length = r->local->address_length;
5218     nxt_unit_sptr_set(&req->local, p);
5219     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5220     *p++ = '\0';
5221 
5222     req->tls = (r->tls != NULL);
5223     req->websocket_handshake = r->websocket_handshake;
5224 
5225     req->server_name_length = r->server_name.length;
5226     nxt_unit_sptr_set(&req->server_name, p);
5227     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5228     *p++ = '\0';
5229 
5230     target_pos = p;
5231     req->target_length = (uint32_t) r->target.length;
5232     nxt_unit_sptr_set(&req->target, p);
5233     p = nxt_cpymem(p, r->target.start, r->target.length);
5234     *p++ = '\0';
5235 
5236     req->path_length = (uint32_t) r->path->length;
5237     if (r->path->start == r->target.start) {
5238         nxt_unit_sptr_set(&req->path, target_pos);
5239 
5240     } else {
5241         nxt_unit_sptr_set(&req->path, p);
5242         p = nxt_cpymem(p, r->path->start, r->path->length);
5243         *p++ = '\0';
5244     }
5245 
5246     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5247     if (r->args != NULL && r->args->start != NULL) {
5248         query_pos = nxt_pointer_to(target_pos,
5249                                    r->args->start - r->target.start);
5250 
5251         nxt_unit_sptr_set(&req->query, query_pos);
5252 
5253     } else {
5254         req->query.offset = 0;
5255     }
5256 
5257     req->content_length_field = NXT_UNIT_NONE_FIELD;
5258     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5259     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5260     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5261 
5262     dst_field = req->fields;
5263 
5264     for (field = nxt_fields_first(r->fields, &iter);
5265          field != NULL;
5266          field = nxt_fields_next(&iter))
5267     {
5268         if (field->skip) {
5269             continue;
5270         }
5271 
5272         dst_field->hash = field->hash;
5273         dst_field->skip = 0;
5274         dst_field->name_length = field->name_length + prefix->length;
5275         dst_field->value_length = field->value_length;
5276 
5277         if (field == r->content_length) {
5278             req->content_length_field = dst_field - req->fields;
5279 
5280         } else if (field == r->content_type) {
5281             req->content_type_field = dst_field - req->fields;
5282 
5283         } else if (field == r->cookie) {
5284             req->cookie_field = dst_field - req->fields;
5285 
5286         } else if (field == r->authorization) {
5287             req->authorization_field = dst_field - req->fields;
5288         }
5289 
5290         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5291                   (int) field->hash, (int) field->skip,
5292                   (int) field->name_length, field->name,
5293                   (int) field->value_length, field->value);
5294 
5295         if (prefix->length != 0) {
5296             nxt_unit_sptr_set(&dst_field->name, p);
5297             p = nxt_cpymem(p, prefix->start, prefix->length);
5298 
5299             end = field->name + field->name_length;
5300             for (pos = field->name; pos < end; pos++) {
5301                 c = *pos;
5302 
5303                 if (c >= 'a' && c <= 'z') {
5304                     *p++ = (c & ~0x20);
5305                     continue;
5306                 }
5307 
5308                 if (c == '-') {
5309                     *p++ = '_';
5310                     continue;
5311                 }
5312 
5313                 *p++ = c;
5314             }
5315 
5316         } else {
5317             nxt_unit_sptr_set(&dst_field->name, p);
5318             p = nxt_cpymem(p, field->name, field->name_length);
5319         }
5320 
5321         *p++ = '\0';
5322 
5323         nxt_unit_sptr_set(&dst_field->value, p);
5324         p = nxt_cpymem(p, field->value, field->value_length);
5325 
5326         if (prefix->length != 0) {
5327             dup_iter = iter;
5328 
5329             for (dup = nxt_fields_next(&dup_iter);
5330                  dup != NULL;
5331                  dup = nxt_fields_next(&dup_iter))
5332             {
5333                 if (dup->name_length != field->name_length
5334                     || dup->skip
5335                     || dup->hash != field->hash
5336                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5337                 {
5338                     continue;
5339                 }
5340 
5341                 p = nxt_cpymem(p, ", ", 2);
5342                 p = nxt_cpymem(p, dup->value, dup->value_length);
5343 
5344                 dst_field->value_length += 2 + dup->value_length;
5345 
5346                 dup->skip = 1;
5347             }
5348         }
5349 
5350         *p++ = '\0';
5351 
5352         dst_field++;
5353     }
5354 
5355     req->fields_count = (uint32_t) (dst_field - req->fields);
5356 
5357     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5358 
5359     buf = out;
5360     tail = &buf->next;
5361 
5362     for (b = r->body; b != NULL; b = b->next) {
5363         size = nxt_buf_mem_used_size(&b->mem);
5364         pos = b->mem.pos;
5365 
5366         while (size > 0) {
5367             if (buf == NULL) {
5368                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5369 
5370                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5371                 if (nxt_slow_path(buf == NULL)) {
5372                     while (out != NULL) {
5373                         buf = out->next;
5374                         out->next = NULL;
5375                         out->completion_handler(task, out, out->parent);
5376                         out = buf;
5377                     }
5378                     return NULL;
5379                 }
5380 
5381                 *tail = buf;
5382                 tail = &buf->next;
5383 
5384             } else {
5385                 free_size = nxt_buf_mem_free_size(&buf->mem);
5386                 if (free_size < size
5387                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5388                        == NXT_OK)
5389                 {
5390                     free_size = nxt_buf_mem_free_size(&buf->mem);
5391                 }
5392             }
5393 
5394             if (free_size > 0) {
5395                 copy_size = nxt_min(free_size, size);
5396 
5397                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5398 
5399                 size -= copy_size;
5400                 pos += copy_size;
5401 
5402                 if (size == 0) {
5403                     break;
5404                 }
5405             }
5406 
5407             buf = NULL;
5408         }
5409     }
5410 
5411     return out;
5412 }
5413 
5414 
5415 static void
5416 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5417 {
5418     nxt_timer_t              *timer;
5419     nxt_http_request_t       *r;
5420     nxt_request_rpc_data_t   *req_rpc_data;
5421 
5422     timer = obj;
5423 
5424     nxt_debug(task, "router app timeout");
5425 
5426     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5427     req_rpc_data = r->timer_data;
5428 
5429     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5430 
5431     nxt_request_rpc_data_unlink(task, req_rpc_data);
5432 }
5433 
5434 
5435 static void
5436 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5437 {
5438     r->timer.handler = nxt_router_http_request_release;
5439     nxt_timer_add(task->thread->engine, &r->timer, 0);
5440 }
5441 
5442 
5443 static void
5444 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5445 {
5446     nxt_http_request_t  *r;
5447 
5448     nxt_debug(task, "http request pool release");
5449 
5450     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5451 
5452     nxt_mp_release(r->mem_pool);
5453 }
5454 
5455 
5456 static void
5457 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5458 {
5459     size_t                   mi;
5460     uint32_t                 i;
5461     nxt_bool_t               ack;
5462     nxt_process_t            *process;
5463     nxt_free_map_t           *m;
5464     nxt_port_mmap_handler_t  *mmap_handler;
5465 
5466     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5467 
5468     process = nxt_runtime_process_find(task->thread->runtime,
5469                                        msg->port_msg.pid);
5470     if (nxt_slow_path(process == NULL)) {
5471         return;
5472     }
5473 
5474     ack = 0;
5475 
5476     /*
5477      * To mitigate possible racing condition (when OOSM message received
5478      * after some of the memory was already freed), need to try to find
5479      * first free segment in shared memory and send ACK if found.
5480      */
5481 
5482     nxt_thread_mutex_lock(&process->incoming.mutex);
5483 
5484     for (i = 0; i < process->incoming.size; i++) {
5485         mmap_handler = process->incoming.elts[i].mmap_handler;
5486 
5487         if (nxt_slow_path(mmap_handler == NULL)) {
5488             continue;
5489         }
5490 
5491         m = mmap_handler->hdr->free_map;
5492 
5493         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5494             if (m[mi] != 0) {
5495                 ack = 1;
5496 
5497                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5498                           i, mi, m[mi]);
5499 
5500                 break;
5501             }
5502         }
5503     }
5504 
5505     nxt_thread_mutex_unlock(&process->incoming.mutex);
5506 
5507     if (ack) {
5508         nxt_process_broadcast_shm_ack(task, process);
5509     }
5510 }
5511 
5512 
5513 static void
5514 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5515 {
5516     nxt_fd_t                 fd;
5517     nxt_port_t               *port;
5518     nxt_runtime_t            *rt;
5519     nxt_port_mmaps_t         *mmaps;
5520     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5521     nxt_port_mmap_handler_t  *mmap_handler;
5522 
5523     rt = task->thread->runtime;
5524 
5525     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5526                                  msg->port_msg.reply_port);
5527     if (nxt_slow_path(port == NULL)) {
5528         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5529                   msg->port_msg.pid, msg->port_msg.reply_port);
5530 
5531         return;
5532     }
5533 
5534     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5535                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5536     {
5537         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5538                   (int) nxt_buf_used_size(msg->buf));
5539 
5540         return;
5541     }
5542 
5543     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5544 
5545     nxt_assert(port->type == NXT_PROCESS_APP);
5546 
5547     if (nxt_slow_path(port->app == NULL)) {
5548         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5549                   port->pid, port->id);
5550 
5551         // FIXME
5552         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5553                               -1, msg->port_msg.stream, 0, NULL);
5554 
5555         return;
5556     }
5557 
5558     mmaps = &port->app->outgoing;
5559     nxt_thread_mutex_lock(&mmaps->mutex);
5560 
5561     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5562         nxt_thread_mutex_unlock(&mmaps->mutex);
5563 
5564         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5565                   (int) get_mmap_msg->id);
5566 
5567         // FIXME
5568         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5569                               -1, msg->port_msg.stream, 0, NULL);
5570         return;
5571     }
5572 
5573     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5574 
5575     fd = mmap_handler->fd;
5576 
5577     nxt_thread_mutex_unlock(&mmaps->mutex);
5578 
5579     nxt_debug(task, "get mmap %PI:%d found",
5580               msg->port_msg.pid, (int) get_mmap_msg->id);
5581 
5582     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5583 }
5584 
5585 
5586 static void
5587 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5588 {
5589     nxt_port_t               *port, *reply_port;
5590     nxt_runtime_t            *rt;
5591     nxt_port_msg_get_port_t  *get_port_msg;
5592 
5593     rt = task->thread->runtime;
5594 
5595     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5596                                        msg->port_msg.reply_port);
5597     if (nxt_slow_path(reply_port == NULL)) {
5598         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5599                   msg->port_msg.pid, msg->port_msg.reply_port);
5600 
5601         return;
5602     }
5603 
5604     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5605                       < (int) sizeof(nxt_port_msg_get_port_t)))
5606     {
5607         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5608                   (int) nxt_buf_used_size(msg->buf));
5609 
5610         return;
5611     }
5612 
5613     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5614 
5615     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5616     if (nxt_slow_path(port == NULL)) {
5617         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5618                   get_port_msg->pid, get_port_msg->id);
5619 
5620         return;
5621     }
5622 
5623     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5624               get_port_msg->id);
5625 
5626     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5627 }
5628