xref: /unit/src/nxt_router.c (revision 1915:48167dd83aa5)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 #include <nxt_app_queue.h>
19 #include <nxt_port_queue.h>
20 
21 typedef struct {
22     nxt_str_t         type;
23     uint32_t          processes;
24     uint32_t          max_processes;
25     uint32_t          spare_processes;
26     nxt_msec_t        timeout;
27     nxt_msec_t        idle_timeout;
28     uint32_t          requests;
29     nxt_conf_value_t  *limits_value;
30     nxt_conf_value_t  *processes_value;
31     nxt_conf_value_t  *targets_value;
32 } nxt_router_app_conf_t;
33 
34 
35 typedef struct {
36     nxt_str_t         pass;
37     nxt_str_t         application;
38 } nxt_router_listener_conf_t;
39 
40 
41 #if (NXT_TLS)
42 
43 typedef struct {
44     nxt_str_t               name;
45     nxt_socket_conf_t       *socket_conf;
46     nxt_router_temp_conf_t  *temp_conf;
47     nxt_conf_value_t        *conf_cmds;
48     nxt_bool_t              last;
49 
50     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
51 } nxt_router_tlssock_t;
52 
53 #endif
54 
55 
56 typedef struct {
57     nxt_str_t               *name;
58     nxt_socket_conf_t       *socket_conf;
59     nxt_router_temp_conf_t  *temp_conf;
60     nxt_bool_t              last;
61 } nxt_socket_rpc_t;
62 
63 
64 typedef struct {
65     nxt_app_t               *app;
66     nxt_router_temp_conf_t  *temp_conf;
67 } nxt_app_rpc_t;
68 
69 
70 static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
71     nxt_mp_t *mp);
72 static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
73 static void nxt_router_greet_controller(nxt_task_t *task,
74     nxt_port_t *controller_port);
75 
76 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
77 
78 static void nxt_router_new_port_handler(nxt_task_t *task,
79     nxt_port_recv_msg_t *msg);
80 static void nxt_router_conf_data_handler(nxt_task_t *task,
81     nxt_port_recv_msg_t *msg);
82 static void nxt_router_remove_pid_handler(nxt_task_t *task,
83     nxt_port_recv_msg_t *msg);
84 static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
85     nxt_port_recv_msg_t *msg);
86 
87 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
88 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
89 static void nxt_router_conf_ready(nxt_task_t *task,
90     nxt_router_temp_conf_t *tmcf);
91 static void nxt_router_conf_error(nxt_task_t *task,
92     nxt_router_temp_conf_t *tmcf);
93 static void nxt_router_conf_send(nxt_task_t *task,
94     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
95 
96 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
97     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
98 static nxt_int_t nxt_router_conf_process_static(nxt_task_t *task,
99     nxt_router_conf_t *rtcf, nxt_conf_value_t *conf);
100 
101 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
102 static nxt_int_t nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data);
103 static nxt_int_t nxt_router_apps_hash_add(nxt_router_conf_t *rtcf,
104     nxt_app_t *app);
105 static nxt_app_t *nxt_router_apps_hash_get(nxt_router_conf_t *rtcf,
106     nxt_str_t *name);
107 static void nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf,
108     int i);
109 
110 static nxt_int_t nxt_router_app_queue_init(nxt_task_t *task,
111     nxt_port_t *port);
112 static nxt_int_t nxt_router_port_queue_init(nxt_task_t *task,
113     nxt_port_t *port);
114 static nxt_int_t nxt_router_port_queue_map(nxt_task_t *task,
115     nxt_port_t *port, nxt_fd_t fd);
116 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
117     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
118 static void nxt_router_listen_socket_ready(nxt_task_t *task,
119     nxt_port_recv_msg_t *msg, void *data);
120 static void nxt_router_listen_socket_error(nxt_task_t *task,
121     nxt_port_recv_msg_t *msg, void *data);
122 #if (NXT_TLS)
123 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
124     nxt_port_recv_msg_t *msg, void *data);
125 static nxt_int_t nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
126     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
127     nxt_conf_value_t * conf_cmds, nxt_bool_t last);
128 #endif
129 static void nxt_router_app_rpc_create(nxt_task_t *task,
130     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
131 static void nxt_router_app_prefork_ready(nxt_task_t *task,
132     nxt_port_recv_msg_t *msg, void *data);
133 static void nxt_router_app_prefork_error(nxt_task_t *task,
134     nxt_port_recv_msg_t *msg, void *data);
135 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
136     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
137 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
138     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
139 
140 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
141     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
142     const nxt_event_interface_t *interface);
143 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
144     nxt_router_engine_conf_t *recf);
145 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
146     nxt_router_engine_conf_t *recf);
147 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
148     nxt_router_engine_conf_t *recf);
149 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
150     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
151     nxt_work_handler_t handler);
152 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
153     nxt_router_engine_conf_t *recf);
154 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
155     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
156 
157 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
158     nxt_router_temp_conf_t *tmcf);
159 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
160     nxt_event_engine_t *engine);
161 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
162     nxt_router_temp_conf_t *tmcf);
163 
164 static void nxt_router_engines_post(nxt_router_t *router,
165     nxt_router_temp_conf_t *tmcf);
166 static void nxt_router_engine_post(nxt_event_engine_t *engine,
167     nxt_work_t *jobs);
168 
169 static void nxt_router_thread_start(void *data);
170 static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
171     void *data);
172 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
173     void *data);
174 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
175     void *data);
176 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
177     void *data);
178 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
179     void *data);
180 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
181     void *data);
182 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
183     void *data);
184 static void nxt_router_req_headers_ack_handler(nxt_task_t *task,
185     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data);
186 static void nxt_router_listen_socket_release(nxt_task_t *task,
187     nxt_socket_conf_t *skcf);
188 
189 static void nxt_router_access_log_writer(nxt_task_t *task,
190     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
191 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
192     struct tm *tm, size_t size, const char *format);
193 static void nxt_router_access_log_open(nxt_task_t *task,
194     nxt_router_temp_conf_t *tmcf);
195 static void nxt_router_access_log_ready(nxt_task_t *task,
196     nxt_port_recv_msg_t *msg, void *data);
197 static void nxt_router_access_log_error(nxt_task_t *task,
198     nxt_port_recv_msg_t *msg, void *data);
199 static void nxt_router_access_log_release(nxt_task_t *task,
200     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
201 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
202     void *data);
203 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
204     nxt_port_recv_msg_t *msg, void *data);
205 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
206     nxt_port_recv_msg_t *msg, void *data);
207 
208 static void nxt_router_app_port_ready(nxt_task_t *task,
209     nxt_port_recv_msg_t *msg, void *data);
210 static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
211     nxt_port_t *app_port);
212 static void nxt_router_app_port_error(nxt_task_t *task,
213     nxt_port_recv_msg_t *msg, void *data);
214 
215 static void nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i);
216 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
217 
218 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
219     nxt_apr_action_t action);
220 static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
221     nxt_request_rpc_data_t *req_rpc_data);
222 static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
223     void *data);
224 static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
225     void *data);
226 
227 static void nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj,
228     void *data);
229 static void nxt_router_app_prepare_request(nxt_task_t *task,
230     nxt_request_rpc_data_t *req_rpc_data);
231 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
232     nxt_http_request_t *r, nxt_app_t *app, const nxt_str_t *prefix);
233 
234 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
235 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
236     void *data);
237 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
238     void *data);
239 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
240     void *data);
241 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
242 
243 static const nxt_http_request_state_t  nxt_http_request_send_state;
244 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
245 
246 static void nxt_router_app_joint_use(nxt_task_t *task,
247     nxt_app_joint_t *app_joint, int i);
248 
249 static void nxt_router_http_request_release_post(nxt_task_t *task,
250     nxt_http_request_t *r);
251 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
254 static void nxt_router_get_port_handler(nxt_task_t *task,
255     nxt_port_recv_msg_t *msg);
256 static void nxt_router_get_mmap_handler(nxt_task_t *task,
257     nxt_port_recv_msg_t *msg);
258 
259 extern const nxt_http_request_state_t  nxt_http_websocket;
260 
261 static nxt_router_t  *nxt_router;
262 
263 static const nxt_str_t http_prefix = nxt_string("HTTP_");
264 static const nxt_str_t empty_prefix = nxt_string("");
265 
266 static const nxt_str_t  *nxt_app_msg_prefix[] = {
267     &empty_prefix,
268     &empty_prefix,
269     &http_prefix,
270     &http_prefix,
271     &http_prefix,
272     &empty_prefix,
273 };
274 
275 
276 static const nxt_port_handlers_t  nxt_router_process_port_handlers = {
277     .quit         = nxt_signal_quit_handler,
278     .new_port     = nxt_router_new_port_handler,
279     .get_port     = nxt_router_get_port_handler,
280     .change_file  = nxt_port_change_log_file_handler,
281     .mmap         = nxt_port_mmap_handler,
282     .get_mmap     = nxt_router_get_mmap_handler,
283     .data         = nxt_router_conf_data_handler,
284     .remove_pid   = nxt_router_remove_pid_handler,
285     .access_log   = nxt_router_access_log_reopen_handler,
286     .rpc_ready    = nxt_port_rpc_handler,
287     .rpc_error    = nxt_port_rpc_handler,
288     .oosm         = nxt_router_oosm_handler,
289 };
290 
291 
292 const nxt_process_init_t  nxt_router_process = {
293     .name           = "router",
294     .type           = NXT_PROCESS_ROUTER,
295     .prefork        = nxt_router_prefork,
296     .restart        = 1,
297     .setup          = nxt_process_core_setup,
298     .start          = nxt_router_start,
299     .port_handlers  = &nxt_router_process_port_handlers,
300     .signals        = nxt_process_signals,
301 };
302 
303 
304 /* Queues of nxt_socket_conf_t */
305 nxt_queue_t  creating_sockets;
306 nxt_queue_t  pending_sockets;
307 nxt_queue_t  updating_sockets;
308 nxt_queue_t  keeping_sockets;
309 nxt_queue_t  deleting_sockets;
310 
311 
312 static nxt_int_t
313 nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
314 {
315     nxt_runtime_stop_app_processes(task, task->thread->runtime);
316 
317     return NXT_OK;
318 }
319 
320 
321 static nxt_int_t
322 nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
323 {
324     nxt_int_t      ret;
325     nxt_port_t     *controller_port;
326     nxt_router_t   *router;
327     nxt_runtime_t  *rt;
328 
329     rt = task->thread->runtime;
330 
331     nxt_log(task, NXT_LOG_INFO, "router started");
332 
333 #if (NXT_TLS)
334     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
335     if (nxt_slow_path(rt->tls == NULL)) {
336         return NXT_ERROR;
337     }
338 
339     ret = rt->tls->library_init(task);
340     if (nxt_slow_path(ret != NXT_OK)) {
341         return ret;
342     }
343 #endif
344 
345     ret = nxt_http_init(task);
346     if (nxt_slow_path(ret != NXT_OK)) {
347         return ret;
348     }
349 
350     router = nxt_zalloc(sizeof(nxt_router_t));
351     if (nxt_slow_path(router == NULL)) {
352         return NXT_ERROR;
353     }
354 
355     nxt_queue_init(&router->engines);
356     nxt_queue_init(&router->sockets);
357     nxt_queue_init(&router->apps);
358 
359     nxt_router = router;
360 
361     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
362     if (controller_port != NULL) {
363         nxt_router_greet_controller(task, controller_port);
364     }
365 
366     return NXT_OK;
367 }
368 
369 
370 static void
371 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
372 {
373     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
374                           -1, 0, 0, NULL);
375 }
376 
377 
378 static void
379 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
380     void *data)
381 {
382     size_t         size;
383     uint32_t       stream;
384     nxt_mp_t       *mp;
385     nxt_int_t      ret;
386     nxt_app_t      *app;
387     nxt_buf_t      *b;
388     nxt_port_t     *main_port;
389     nxt_runtime_t  *rt;
390 
391     app = data;
392 
393     rt = task->thread->runtime;
394     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
395 
396     nxt_debug(task, "app '%V' %p start process", &app->name, app);
397 
398     size = app->name.length + 1 + app->conf.length;
399 
400     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
401 
402     if (nxt_slow_path(b == NULL)) {
403         goto failed;
404     }
405 
406     nxt_buf_cpystr(b, &app->name);
407     *b->mem.free++ = '\0';
408     nxt_buf_cpystr(b, &app->conf);
409 
410     nxt_router_app_joint_use(task, app->joint, 1);
411 
412     stream = nxt_port_rpc_register_handler(task, port,
413                                            nxt_router_app_port_ready,
414                                            nxt_router_app_port_error,
415                                            -1, app->joint);
416 
417     if (nxt_slow_path(stream == 0)) {
418         nxt_router_app_joint_use(task, app->joint, -1);
419 
420         goto failed;
421     }
422 
423     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
424                                 -1, stream, port->id, b);
425 
426     if (nxt_slow_path(ret != NXT_OK)) {
427         nxt_port_rpc_cancel(task, port, stream);
428 
429         nxt_router_app_joint_use(task, app->joint, -1);
430 
431         goto failed;
432     }
433 
434     nxt_router_app_use(task, app, -1);
435 
436     return;
437 
438 failed:
439 
440     if (b != NULL) {
441         mp = b->data;
442         nxt_mp_free(mp, b);
443         nxt_mp_release(mp);
444     }
445 
446     nxt_thread_mutex_lock(&app->mutex);
447 
448     app->pending_processes--;
449 
450     nxt_thread_mutex_unlock(&app->mutex);
451 
452     nxt_router_app_use(task, app, -1);
453 }
454 
455 
456 static void
457 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
458 {
459     app_joint->use_count += i;
460 
461     if (app_joint->use_count == 0) {
462         nxt_assert(app_joint->app == NULL);
463 
464         nxt_free(app_joint);
465     }
466 }
467 
468 
469 static nxt_int_t
470 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
471 {
472     nxt_int_t      res;
473     nxt_port_t     *router_port;
474     nxt_runtime_t  *rt;
475 
476     nxt_debug(task, "app '%V' start process", &app->name);
477 
478     rt = task->thread->runtime;
479     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
480 
481     nxt_router_app_use(task, app, 1);
482 
483     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
484                         app);
485 
486     if (res == NXT_OK) {
487         return res;
488     }
489 
490     nxt_thread_mutex_lock(&app->mutex);
491 
492     app->pending_processes--;
493 
494     nxt_thread_mutex_unlock(&app->mutex);
495 
496     nxt_router_app_use(task, app, -1);
497 
498     return NXT_ERROR;
499 }
500 
501 
502 nxt_inline nxt_bool_t
503 nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
504 {
505     nxt_buf_t       *b, *next;
506     nxt_bool_t      cancelled;
507     nxt_msg_info_t  *msg_info;
508 
509     msg_info = &req_rpc_data->msg_info;
510 
511     if (msg_info->buf == NULL) {
512         return 0;
513     }
514 
515     cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
516                                      msg_info->tracking_cookie,
517                                      req_rpc_data->stream);
518 
519     if (cancelled) {
520         nxt_debug(task, "stream #%uD: cancelled by router",
521                   req_rpc_data->stream);
522     }
523 
524     for (b = msg_info->buf; b != NULL; b = next) {
525         next = b->next;
526         b->next = NULL;
527 
528         if (b->is_port_mmap_sent) {
529             b->is_port_mmap_sent = cancelled == 0;
530         }
531 
532         b->completion_handler(task, b, b->parent);
533     }
534 
535     msg_info->buf = NULL;
536 
537     return cancelled;
538 }
539 
540 
541 nxt_inline nxt_bool_t
542 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
543 {
544     if (lnk->next != NULL) {
545         nxt_queue_remove(lnk);
546 
547         lnk->next = NULL;
548 
549         return 1;
550     }
551 
552     return 0;
553 }
554 
555 
556 nxt_inline void
557 nxt_request_rpc_data_unlink(nxt_task_t *task,
558     nxt_request_rpc_data_t *req_rpc_data)
559 {
560     nxt_app_t           *app;
561     nxt_bool_t          unlinked;
562     nxt_http_request_t  *r;
563 
564     nxt_router_msg_cancel(task, req_rpc_data);
565 
566     if (req_rpc_data->app_port != NULL) {
567         nxt_router_app_port_release(task, req_rpc_data->app_port,
568                                     req_rpc_data->apr_action);
569 
570         req_rpc_data->app_port = NULL;
571     }
572 
573     app = req_rpc_data->app;
574     r = req_rpc_data->request;
575 
576     if (r != NULL) {
577         r->timer_data = NULL;
578 
579         nxt_router_http_request_release_post(task, r);
580 
581         r->req_rpc_data = NULL;
582         req_rpc_data->request = NULL;
583 
584         if (app != NULL) {
585             unlinked = 0;
586 
587             nxt_thread_mutex_lock(&app->mutex);
588 
589             if (r->app_link.next != NULL) {
590                 nxt_queue_remove(&r->app_link);
591                 r->app_link.next = NULL;
592 
593                 unlinked = 1;
594             }
595 
596             nxt_thread_mutex_unlock(&app->mutex);
597 
598             if (unlinked) {
599                 nxt_mp_release(r->mem_pool);
600             }
601         }
602     }
603 
604     if (app != NULL) {
605         nxt_router_app_use(task, app, -1);
606 
607         req_rpc_data->app = NULL;
608     }
609 
610     if (req_rpc_data->msg_info.body_fd != -1) {
611         nxt_fd_close(req_rpc_data->msg_info.body_fd);
612 
613         req_rpc_data->msg_info.body_fd = -1;
614     }
615 
616     if (req_rpc_data->rpc_cancel) {
617         req_rpc_data->rpc_cancel = 0;
618 
619         nxt_port_rpc_cancel(task, task->thread->engine->port,
620                             req_rpc_data->stream);
621     }
622 }
623 
624 
625 static void
626 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
627 {
628     nxt_int_t      res;
629     nxt_app_t      *app;
630     nxt_port_t     *port, *main_app_port;
631     nxt_runtime_t  *rt;
632 
633     nxt_port_new_port_handler(task, msg);
634 
635     port = msg->u.new_port;
636 
637     if (port != NULL && port->type == NXT_PROCESS_CONTROLLER) {
638         nxt_router_greet_controller(task, msg->u.new_port);
639     }
640 
641     if (port == NULL || port->type != NXT_PROCESS_APP) {
642 
643         if (msg->port_msg.stream == 0) {
644             return;
645         }
646 
647         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
648 
649     } else {
650         if (msg->fd[1] != -1) {
651             res = nxt_router_port_queue_map(task, port, msg->fd[1]);
652             if (nxt_slow_path(res != NXT_OK)) {
653                 return;
654             }
655 
656             nxt_fd_close(msg->fd[1]);
657             msg->fd[1] = -1;
658         }
659     }
660 
661     if (msg->port_msg.stream != 0) {
662         nxt_port_rpc_handler(task, msg);
663         return;
664     }
665 
666     /*
667      * Port with "id == 0" is application 'main' port and it always
668      * should come with non-zero stream.
669      */
670     nxt_assert(port->id != 0);
671 
672     /* Find 'main' app port and get app reference. */
673     rt = task->thread->runtime;
674 
675     /*
676      * It is safe to access 'runtime->ports' hash because 'NEW_PORT'
677      * sent to main port (with id == 0) and processed in main thread.
678      */
679     main_app_port = nxt_port_hash_find(&rt->ports, port->pid, 0);
680     nxt_assert(main_app_port != NULL);
681 
682     app = main_app_port->app;
683 
684     if (nxt_fast_path(app != NULL)) {
685         nxt_thread_mutex_lock(&app->mutex);
686 
687         /* TODO here should be find-and-add code because there can be
688            port waiters in port_hash */
689         nxt_port_hash_add(&app->port_hash, port);
690         app->port_hash_count++;
691 
692         nxt_thread_mutex_unlock(&app->mutex);
693 
694         port->app = app;
695     }
696 
697     port->main_app_port = main_app_port;
698 
699     nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
700 }
701 
702 
703 static void
704 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
705 {
706     void                    *p;
707     size_t                  size;
708     nxt_int_t               ret;
709     nxt_port_t              *port;
710     nxt_router_temp_conf_t  *tmcf;
711 
712     port = nxt_runtime_port_find(task->thread->runtime,
713                                  msg->port_msg.pid,
714                                  msg->port_msg.reply_port);
715     if (nxt_slow_path(port == NULL)) {
716         nxt_alert(task, "conf_data_handler: reply port not found");
717         return;
718     }
719 
720     p = MAP_FAILED;
721 
722     /*
723      * Ancient compilers like gcc 4.8.5 on CentOS 7 wants 'size' to be
724      * initialized in 'cleanup' section.
725      */
726     size = 0;
727 
728     tmcf = nxt_router_temp_conf(task);
729     if (nxt_slow_path(tmcf == NULL)) {
730         goto fail;
731     }
732 
733     if (nxt_slow_path(msg->fd[0] == -1)) {
734         nxt_alert(task, "conf_data_handler: invalid shm fd");
735         goto fail;
736     }
737 
738     if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
739         nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
740                   (int) nxt_buf_mem_used_size(&msg->buf->mem));
741         goto fail;
742     }
743 
744     nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
745 
746     p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
747 
748     nxt_fd_close(msg->fd[0]);
749     msg->fd[0] = -1;
750 
751     if (nxt_slow_path(p == MAP_FAILED)) {
752         goto fail;
753     }
754 
755     nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
756 
757     tmcf->router_conf->router = nxt_router;
758     tmcf->stream = msg->port_msg.stream;
759     tmcf->port = port;
760 
761     nxt_port_use(task, tmcf->port, 1);
762 
763     ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
764 
765     if (nxt_fast_path(ret == NXT_OK)) {
766         nxt_router_conf_apply(task, tmcf, NULL);
767 
768     } else {
769         nxt_router_conf_error(task, tmcf);
770     }
771 
772     goto cleanup;
773 
774 fail:
775 
776     nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
777                           msg->port_msg.stream, 0, NULL);
778 
779     if (tmcf != NULL) {
780         nxt_mp_release(tmcf->mem_pool);
781     }
782 
783 cleanup:
784 
785     if (p != MAP_FAILED) {
786         nxt_mem_munmap(p, size);
787     }
788 
789     if (msg->fd[0] != -1) {
790         nxt_fd_close(msg->fd[0]);
791         msg->fd[0] = -1;
792     }
793 }
794 
795 
796 static void
797 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
798     void *data)
799 {
800     union {
801         nxt_pid_t  removed_pid;
802         void       *data;
803     } u;
804 
805     u.data = data;
806 
807     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
808 }
809 
810 
811 static void
812 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
813 {
814     nxt_event_engine_t  *engine;
815 
816     nxt_port_remove_pid_handler(task, msg);
817 
818     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
819     {
820         if (nxt_fast_path(engine->port != NULL)) {
821             nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
822                           msg->u.data);
823         }
824     }
825     nxt_queue_loop;
826 
827     if (msg->port_msg.stream == 0) {
828         return;
829     }
830 
831     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
832 
833     nxt_port_rpc_handler(task, msg);
834 }
835 
836 
837 static nxt_router_temp_conf_t *
838 nxt_router_temp_conf(nxt_task_t *task)
839 {
840     nxt_mp_t                *mp, *tmp;
841     nxt_router_conf_t       *rtcf;
842     nxt_router_temp_conf_t  *tmcf;
843 
844     mp = nxt_mp_create(1024, 128, 256, 32);
845     if (nxt_slow_path(mp == NULL)) {
846         return NULL;
847     }
848 
849     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
850     if (nxt_slow_path(rtcf == NULL)) {
851         goto fail;
852     }
853 
854     rtcf->mem_pool = mp;
855 
856     tmp = nxt_mp_create(1024, 128, 256, 32);
857     if (nxt_slow_path(tmp == NULL)) {
858         goto fail;
859     }
860 
861     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
862     if (nxt_slow_path(tmcf == NULL)) {
863         goto temp_fail;
864     }
865 
866     tmcf->mem_pool = tmp;
867     tmcf->router_conf = rtcf;
868     tmcf->count = 1;
869     tmcf->engine = task->thread->engine;
870 
871     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
872                                      sizeof(nxt_router_engine_conf_t));
873     if (nxt_slow_path(tmcf->engines == NULL)) {
874         goto temp_fail;
875     }
876 
877     nxt_queue_init(&creating_sockets);
878     nxt_queue_init(&pending_sockets);
879     nxt_queue_init(&updating_sockets);
880     nxt_queue_init(&keeping_sockets);
881     nxt_queue_init(&deleting_sockets);
882 
883 #if (NXT_TLS)
884     nxt_queue_init(&tmcf->tls);
885 #endif
886 
887     nxt_queue_init(&tmcf->apps);
888     nxt_queue_init(&tmcf->previous);
889 
890     return tmcf;
891 
892 temp_fail:
893 
894     nxt_mp_destroy(tmp);
895 
896 fail:
897 
898     nxt_mp_destroy(mp);
899 
900     return NULL;
901 }
902 
903 
904 nxt_inline nxt_bool_t
905 nxt_router_app_can_start(nxt_app_t *app)
906 {
907     return app->processes + app->pending_processes < app->max_processes
908             && app->pending_processes < app->max_pending_processes;
909 }
910 
911 
912 nxt_inline nxt_bool_t
913 nxt_router_app_need_start(nxt_app_t *app)
914 {
915     return (app->active_requests
916               > app->port_hash_count + app->pending_processes)
917            || (app->spare_processes
918                 > app->idle_processes + app->pending_processes);
919 }
920 
921 
922 static void
923 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
924 {
925     nxt_int_t                    ret;
926     nxt_app_t                    *app;
927     nxt_router_t                 *router;
928     nxt_runtime_t                *rt;
929     nxt_queue_link_t             *qlk;
930     nxt_socket_conf_t            *skcf;
931     nxt_router_conf_t            *rtcf;
932     nxt_router_temp_conf_t       *tmcf;
933     const nxt_event_interface_t  *interface;
934 #if (NXT_TLS)
935     nxt_router_tlssock_t         *tls;
936 #endif
937 
938     tmcf = obj;
939 
940     qlk = nxt_queue_first(&pending_sockets);
941 
942     if (qlk != nxt_queue_tail(&pending_sockets)) {
943         nxt_queue_remove(qlk);
944         nxt_queue_insert_tail(&creating_sockets, qlk);
945 
946         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
947 
948         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
949 
950         return;
951     }
952 
953 #if (NXT_TLS)
954     qlk = nxt_queue_last(&tmcf->tls);
955 
956     if (qlk != nxt_queue_head(&tmcf->tls)) {
957         nxt_queue_remove(qlk);
958 
959         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
960 
961         nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
962                            nxt_router_tls_rpc_handler, tls);
963         return;
964     }
965 #endif
966 
967     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
968 
969         if (nxt_router_app_need_start(app)) {
970             nxt_router_app_rpc_create(task, tmcf, app);
971             return;
972         }
973 
974     } nxt_queue_loop;
975 
976     rtcf = tmcf->router_conf;
977 
978     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
979         nxt_router_access_log_open(task, tmcf);
980         return;
981     }
982 
983     rt = task->thread->runtime;
984 
985     interface = nxt_service_get(rt->services, "engine", NULL);
986 
987     router = rtcf->router;
988 
989     ret = nxt_router_engines_create(task, router, tmcf, interface);
990     if (nxt_slow_path(ret != NXT_OK)) {
991         goto fail;
992     }
993 
994     ret = nxt_router_threads_create(task, rt, tmcf);
995     if (nxt_slow_path(ret != NXT_OK)) {
996         goto fail;
997     }
998 
999     nxt_router_apps_sort(task, router, tmcf);
1000 
1001     nxt_router_apps_hash_use(task, rtcf, 1);
1002 
1003     nxt_router_engines_post(router, tmcf);
1004 
1005     nxt_queue_add(&router->sockets, &updating_sockets);
1006     nxt_queue_add(&router->sockets, &creating_sockets);
1007 
1008     router->access_log = rtcf->access_log;
1009 
1010     nxt_router_conf_ready(task, tmcf);
1011 
1012     return;
1013 
1014 fail:
1015 
1016     nxt_router_conf_error(task, tmcf);
1017 
1018     return;
1019 }
1020 
1021 
1022 static void
1023 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1024 {
1025     nxt_joint_job_t  *job;
1026 
1027     job = obj;
1028 
1029     nxt_router_conf_ready(task, job->tmcf);
1030 }
1031 
1032 
1033 static void
1034 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1035 {
1036     uint32_t               count;
1037     nxt_router_conf_t      *rtcf;
1038     nxt_thread_spinlock_t  *lock;
1039 
1040     nxt_debug(task, "temp conf %p count: %D", tmcf, tmcf->count);
1041 
1042     if (--tmcf->count > 0) {
1043         return;
1044     }
1045 
1046     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1047 
1048     rtcf = tmcf->router_conf;
1049 
1050     lock = &rtcf->router->lock;
1051 
1052     nxt_thread_spin_lock(lock);
1053 
1054     count = rtcf->count;
1055 
1056     nxt_thread_spin_unlock(lock);
1057 
1058     nxt_debug(task, "rtcf %p: %D", rtcf, count);
1059 
1060     if (count == 0) {
1061         nxt_router_apps_hash_use(task, rtcf, -1);
1062 
1063         nxt_router_access_log_release(task, lock, rtcf->access_log);
1064 
1065         nxt_mp_destroy(rtcf->mem_pool);
1066     }
1067 
1068     nxt_mp_release(tmcf->mem_pool);
1069 }
1070 
1071 
1072 static void
1073 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1074 {
1075     nxt_app_t          *app;
1076     nxt_queue_t        new_socket_confs;
1077     nxt_socket_t       s;
1078     nxt_router_t       *router;
1079     nxt_queue_link_t   *qlk;
1080     nxt_socket_conf_t  *skcf;
1081     nxt_router_conf_t  *rtcf;
1082 
1083     nxt_alert(task, "failed to apply new conf");
1084 
1085     for (qlk = nxt_queue_first(&creating_sockets);
1086          qlk != nxt_queue_tail(&creating_sockets);
1087          qlk = nxt_queue_next(qlk))
1088     {
1089         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1090         s = skcf->listen->socket;
1091 
1092         if (s != -1) {
1093             nxt_socket_close(task, s);
1094         }
1095 
1096         nxt_free(skcf->listen);
1097     }
1098 
1099     nxt_queue_init(&new_socket_confs);
1100     nxt_queue_add(&new_socket_confs, &updating_sockets);
1101     nxt_queue_add(&new_socket_confs, &pending_sockets);
1102     nxt_queue_add(&new_socket_confs, &creating_sockets);
1103 
1104     rtcf = tmcf->router_conf;
1105 
1106     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1107 
1108         nxt_router_app_unlink(task, app);
1109 
1110     } nxt_queue_loop;
1111 
1112     router = rtcf->router;
1113 
1114     nxt_queue_add(&router->sockets, &keeping_sockets);
1115     nxt_queue_add(&router->sockets, &deleting_sockets);
1116 
1117     nxt_queue_add(&router->apps, &tmcf->previous);
1118 
1119     // TODO: new engines and threads
1120 
1121     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1122 
1123     nxt_mp_destroy(rtcf->mem_pool);
1124 
1125     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1126 
1127     nxt_mp_release(tmcf->mem_pool);
1128 }
1129 
1130 
1131 static void
1132 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1133     nxt_port_msg_type_t type)
1134 {
1135     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1136 
1137     nxt_port_use(task, tmcf->port, -1);
1138 
1139     tmcf->port = NULL;
1140 }
1141 
1142 
1143 static nxt_conf_map_t  nxt_router_conf[] = {
1144     {
1145         nxt_string("listeners_threads"),
1146         NXT_CONF_MAP_INT32,
1147         offsetof(nxt_router_conf_t, threads),
1148     },
1149 };
1150 
1151 
1152 static nxt_conf_map_t  nxt_router_app_conf[] = {
1153     {
1154         nxt_string("type"),
1155         NXT_CONF_MAP_STR,
1156         offsetof(nxt_router_app_conf_t, type),
1157     },
1158 
1159     {
1160         nxt_string("limits"),
1161         NXT_CONF_MAP_PTR,
1162         offsetof(nxt_router_app_conf_t, limits_value),
1163     },
1164 
1165     {
1166         nxt_string("processes"),
1167         NXT_CONF_MAP_INT32,
1168         offsetof(nxt_router_app_conf_t, processes),
1169     },
1170 
1171     {
1172         nxt_string("processes"),
1173         NXT_CONF_MAP_PTR,
1174         offsetof(nxt_router_app_conf_t, processes_value),
1175     },
1176 
1177     {
1178         nxt_string("targets"),
1179         NXT_CONF_MAP_PTR,
1180         offsetof(nxt_router_app_conf_t, targets_value),
1181     },
1182 };
1183 
1184 
1185 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1186     {
1187         nxt_string("timeout"),
1188         NXT_CONF_MAP_MSEC,
1189         offsetof(nxt_router_app_conf_t, timeout),
1190     },
1191 
1192     {
1193         nxt_string("requests"),
1194         NXT_CONF_MAP_INT32,
1195         offsetof(nxt_router_app_conf_t, requests),
1196     },
1197 };
1198 
1199 
1200 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1201     {
1202         nxt_string("spare"),
1203         NXT_CONF_MAP_INT32,
1204         offsetof(nxt_router_app_conf_t, spare_processes),
1205     },
1206 
1207     {
1208         nxt_string("max"),
1209         NXT_CONF_MAP_INT32,
1210         offsetof(nxt_router_app_conf_t, max_processes),
1211     },
1212 
1213     {
1214         nxt_string("idle_timeout"),
1215         NXT_CONF_MAP_MSEC,
1216         offsetof(nxt_router_app_conf_t, idle_timeout),
1217     },
1218 };
1219 
1220 
1221 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1222     {
1223         nxt_string("pass"),
1224         NXT_CONF_MAP_STR_COPY,
1225         offsetof(nxt_router_listener_conf_t, pass),
1226     },
1227 
1228     {
1229         nxt_string("application"),
1230         NXT_CONF_MAP_STR_COPY,
1231         offsetof(nxt_router_listener_conf_t, application),
1232     },
1233 };
1234 
1235 
1236 static nxt_conf_map_t  nxt_router_http_conf[] = {
1237     {
1238         nxt_string("header_buffer_size"),
1239         NXT_CONF_MAP_SIZE,
1240         offsetof(nxt_socket_conf_t, header_buffer_size),
1241     },
1242 
1243     {
1244         nxt_string("large_header_buffer_size"),
1245         NXT_CONF_MAP_SIZE,
1246         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1247     },
1248 
1249     {
1250         nxt_string("large_header_buffers"),
1251         NXT_CONF_MAP_SIZE,
1252         offsetof(nxt_socket_conf_t, large_header_buffers),
1253     },
1254 
1255     {
1256         nxt_string("body_buffer_size"),
1257         NXT_CONF_MAP_SIZE,
1258         offsetof(nxt_socket_conf_t, body_buffer_size),
1259     },
1260 
1261     {
1262         nxt_string("max_body_size"),
1263         NXT_CONF_MAP_SIZE,
1264         offsetof(nxt_socket_conf_t, max_body_size),
1265     },
1266 
1267     {
1268         nxt_string("idle_timeout"),
1269         NXT_CONF_MAP_MSEC,
1270         offsetof(nxt_socket_conf_t, idle_timeout),
1271     },
1272 
1273     {
1274         nxt_string("header_read_timeout"),
1275         NXT_CONF_MAP_MSEC,
1276         offsetof(nxt_socket_conf_t, header_read_timeout),
1277     },
1278 
1279     {
1280         nxt_string("body_read_timeout"),
1281         NXT_CONF_MAP_MSEC,
1282         offsetof(nxt_socket_conf_t, body_read_timeout),
1283     },
1284 
1285     {
1286         nxt_string("send_timeout"),
1287         NXT_CONF_MAP_MSEC,
1288         offsetof(nxt_socket_conf_t, send_timeout),
1289     },
1290 
1291     {
1292         nxt_string("body_temp_path"),
1293         NXT_CONF_MAP_STR,
1294         offsetof(nxt_socket_conf_t, body_temp_path),
1295     },
1296 
1297     {
1298         nxt_string("discard_unsafe_fields"),
1299         NXT_CONF_MAP_INT8,
1300         offsetof(nxt_socket_conf_t, discard_unsafe_fields),
1301     },
1302 };
1303 
1304 
1305 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1306     {
1307         nxt_string("max_frame_size"),
1308         NXT_CONF_MAP_SIZE,
1309         offsetof(nxt_websocket_conf_t, max_frame_size),
1310     },
1311 
1312     {
1313         nxt_string("read_timeout"),
1314         NXT_CONF_MAP_MSEC,
1315         offsetof(nxt_websocket_conf_t, read_timeout),
1316     },
1317 
1318     {
1319         nxt_string("keepalive_interval"),
1320         NXT_CONF_MAP_MSEC,
1321         offsetof(nxt_websocket_conf_t, keepalive_interval),
1322     },
1323 
1324 };
1325 
1326 
1327 static nxt_int_t
1328 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1329     u_char *start, u_char *end)
1330 {
1331     u_char                      *p;
1332     size_t                      size;
1333     nxt_mp_t                    *mp, *app_mp;
1334     uint32_t                    next, next_target;
1335     nxt_int_t                   ret;
1336     nxt_str_t                   name, path, target;
1337     nxt_app_t                   *app, *prev;
1338     nxt_str_t                   *t, *s, *targets;
1339     nxt_uint_t                  n, i;
1340     nxt_port_t                  *port;
1341     nxt_router_t                *router;
1342     nxt_app_joint_t             *app_joint;
1343 #if (NXT_TLS)
1344     nxt_conf_value_t            *certificate, *conf_cmds;
1345 #endif
1346     nxt_conf_value_t            *conf, *http, *value, *websocket;
1347     nxt_conf_value_t            *applications, *application;
1348     nxt_conf_value_t            *listeners, *listener;
1349     nxt_conf_value_t            *routes_conf, *static_conf;
1350     nxt_socket_conf_t           *skcf;
1351     nxt_http_routes_t           *routes;
1352     nxt_event_engine_t          *engine;
1353     nxt_app_lang_module_t       *lang;
1354     nxt_router_app_conf_t       apcf;
1355     nxt_router_access_log_t     *access_log;
1356     nxt_router_listener_conf_t  lscf;
1357 
1358     static nxt_str_t  http_path = nxt_string("/settings/http");
1359     static nxt_str_t  applications_path = nxt_string("/applications");
1360     static nxt_str_t  listeners_path = nxt_string("/listeners");
1361     static nxt_str_t  routes_path = nxt_string("/routes");
1362     static nxt_str_t  access_log_path = nxt_string("/access_log");
1363 #if (NXT_TLS)
1364     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1365     static nxt_str_t  conf_commands_path = nxt_string("/tls/conf_commands");
1366 #endif
1367     static nxt_str_t  static_path = nxt_string("/settings/http/static");
1368     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1369 
1370     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1371     if (conf == NULL) {
1372         nxt_alert(task, "configuration parsing error");
1373         return NXT_ERROR;
1374     }
1375 
1376     mp = tmcf->router_conf->mem_pool;
1377 
1378     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1379                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1380     if (ret != NXT_OK) {
1381         nxt_alert(task, "root map error");
1382         return NXT_ERROR;
1383     }
1384 
1385     if (tmcf->router_conf->threads == 0) {
1386         tmcf->router_conf->threads = nxt_ncpu;
1387     }
1388 
1389     static_conf = nxt_conf_get_path(conf, &static_path);
1390 
1391     ret = nxt_router_conf_process_static(task, tmcf->router_conf, static_conf);
1392     if (nxt_slow_path(ret != NXT_OK)) {
1393         return NXT_ERROR;
1394     }
1395 
1396     router = tmcf->router_conf->router;
1397 
1398     applications = nxt_conf_get_path(conf, &applications_path);
1399 
1400     if (applications != NULL) {
1401         next = 0;
1402 
1403         for ( ;; ) {
1404             application = nxt_conf_next_object_member(applications,
1405                                                       &name, &next);
1406             if (application == NULL) {
1407                 break;
1408             }
1409 
1410             nxt_debug(task, "application \"%V\"", &name);
1411 
1412             size = nxt_conf_json_length(application, NULL);
1413 
1414             app_mp = nxt_mp_create(4096, 128, 1024, 64);
1415             if (nxt_slow_path(app_mp == NULL)) {
1416                 goto fail;
1417             }
1418 
1419             app = nxt_mp_get(app_mp, sizeof(nxt_app_t) + name.length + size);
1420             if (app == NULL) {
1421                 goto app_fail;
1422             }
1423 
1424             nxt_memzero(app, sizeof(nxt_app_t));
1425 
1426             app->mem_pool = app_mp;
1427 
1428             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1429             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1430                                                   + name.length);
1431 
1432             p = nxt_conf_json_print(app->conf.start, application, NULL);
1433             app->conf.length = p - app->conf.start;
1434 
1435             nxt_assert(app->conf.length <= size);
1436 
1437             nxt_debug(task, "application conf \"%V\"", &app->conf);
1438 
1439             prev = nxt_router_app_find(&router->apps, &name);
1440 
1441             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1442                 nxt_mp_destroy(app_mp);
1443 
1444                 nxt_queue_remove(&prev->link);
1445                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1446 
1447                 ret = nxt_router_apps_hash_add(tmcf->router_conf, prev);
1448                 if (nxt_slow_path(ret != NXT_OK)) {
1449                     goto fail;
1450                 }
1451 
1452                 continue;
1453             }
1454 
1455             apcf.processes = 1;
1456             apcf.max_processes = 1;
1457             apcf.spare_processes = 0;
1458             apcf.timeout = 0;
1459             apcf.idle_timeout = 15000;
1460             apcf.requests = 0;
1461             apcf.limits_value = NULL;
1462             apcf.processes_value = NULL;
1463             apcf.targets_value = NULL;
1464 
1465             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1466             if (nxt_slow_path(app_joint == NULL)) {
1467                 goto app_fail;
1468             }
1469 
1470             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1471 
1472             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1473                                       nxt_nitems(nxt_router_app_conf), &apcf);
1474             if (ret != NXT_OK) {
1475                 nxt_alert(task, "application map error");
1476                 goto app_fail;
1477             }
1478 
1479             if (apcf.limits_value != NULL) {
1480 
1481                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1482                     nxt_alert(task, "application limits is not object");
1483                     goto app_fail;
1484                 }
1485 
1486                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1487                                         nxt_router_app_limits_conf,
1488                                         nxt_nitems(nxt_router_app_limits_conf),
1489                                         &apcf);
1490                 if (ret != NXT_OK) {
1491                     nxt_alert(task, "application limits map error");
1492                     goto app_fail;
1493                 }
1494             }
1495 
1496             if (apcf.processes_value != NULL
1497                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1498             {
1499                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1500                                      nxt_router_app_processes_conf,
1501                                      nxt_nitems(nxt_router_app_processes_conf),
1502                                      &apcf);
1503                 if (ret != NXT_OK) {
1504                     nxt_alert(task, "application processes map error");
1505                     goto app_fail;
1506                 }
1507 
1508             } else {
1509                 apcf.max_processes = apcf.processes;
1510                 apcf.spare_processes = apcf.processes;
1511             }
1512 
1513             if (apcf.targets_value != NULL) {
1514                 n = nxt_conf_object_members_count(apcf.targets_value);
1515 
1516                 targets = nxt_mp_get(app_mp, sizeof(nxt_str_t) * n);
1517                 if (nxt_slow_path(targets == NULL)) {
1518                     goto app_fail;
1519                 }
1520 
1521                 next_target = 0;
1522 
1523                 for (i = 0; i < n; i++) {
1524                     (void) nxt_conf_next_object_member(apcf.targets_value,
1525                                                        &target, &next_target);
1526 
1527                     s = nxt_str_dup(app_mp, &targets[i], &target);
1528                     if (nxt_slow_path(s == NULL)) {
1529                         goto app_fail;
1530                     }
1531                 }
1532 
1533             } else {
1534                 targets = NULL;
1535             }
1536 
1537             nxt_debug(task, "application type: %V", &apcf.type);
1538             nxt_debug(task, "application processes: %D", apcf.processes);
1539             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1540             nxt_debug(task, "application requests: %D", apcf.requests);
1541 
1542             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1543 
1544             if (lang == NULL) {
1545                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1546                 goto app_fail;
1547             }
1548 
1549             nxt_debug(task, "application language module: \"%s\"", lang->file);
1550 
1551             ret = nxt_thread_mutex_create(&app->mutex);
1552             if (ret != NXT_OK) {
1553                 goto app_fail;
1554             }
1555 
1556             nxt_queue_init(&app->ports);
1557             nxt_queue_init(&app->spare_ports);
1558             nxt_queue_init(&app->idle_ports);
1559             nxt_queue_init(&app->ack_waiting_req);
1560 
1561             app->name.length = name.length;
1562             nxt_memcpy(app->name.start, name.start, name.length);
1563 
1564             app->type = lang->type;
1565             app->max_processes = apcf.max_processes;
1566             app->spare_processes = apcf.spare_processes;
1567             app->max_pending_processes = apcf.spare_processes
1568                                          ? apcf.spare_processes : 1;
1569             app->timeout = apcf.timeout;
1570             app->idle_timeout = apcf.idle_timeout;
1571             app->max_requests = apcf.requests;
1572 
1573             app->targets = targets;
1574 
1575             engine = task->thread->engine;
1576 
1577             app->engine = engine;
1578 
1579             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1580             app->adjust_idle_work.task = &engine->task;
1581             app->adjust_idle_work.obj = app;
1582 
1583             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1584 
1585             ret = nxt_router_apps_hash_add(tmcf->router_conf, app);
1586             if (nxt_slow_path(ret != NXT_OK)) {
1587                 goto app_fail;
1588             }
1589 
1590             nxt_router_app_use(task, app, 1);
1591 
1592             app->joint = app_joint;
1593 
1594             app_joint->use_count = 1;
1595             app_joint->app = app;
1596 
1597             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1598             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1599             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1600             app_joint->idle_timer.task = &engine->task;
1601             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1602 
1603             app_joint->free_app_work.handler = nxt_router_free_app;
1604             app_joint->free_app_work.task = &engine->task;
1605             app_joint->free_app_work.obj = app_joint;
1606 
1607             port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
1608                                 NXT_PROCESS_APP);
1609             if (nxt_slow_path(port == NULL)) {
1610                 return NXT_ERROR;
1611             }
1612 
1613             ret = nxt_port_socket_init(task, port, 0);
1614             if (nxt_slow_path(ret != NXT_OK)) {
1615                 nxt_port_use(task, port, -1);
1616                 return NXT_ERROR;
1617             }
1618 
1619             ret = nxt_router_app_queue_init(task, port);
1620             if (nxt_slow_path(ret != NXT_OK)) {
1621                 nxt_port_write_close(port);
1622                 nxt_port_read_close(port);
1623                 nxt_port_use(task, port, -1);
1624                 return NXT_ERROR;
1625             }
1626 
1627             nxt_port_write_enable(task, port);
1628             port->app = app;
1629 
1630             app->shared_port = port;
1631 
1632             nxt_thread_mutex_create(&app->outgoing.mutex);
1633         }
1634     }
1635 
1636     routes_conf = nxt_conf_get_path(conf, &routes_path);
1637     if (nxt_fast_path(routes_conf != NULL)) {
1638         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1639         if (nxt_slow_path(routes == NULL)) {
1640             return NXT_ERROR;
1641         }
1642         tmcf->router_conf->routes = routes;
1643     }
1644 
1645     ret = nxt_upstreams_create(task, tmcf, conf);
1646     if (nxt_slow_path(ret != NXT_OK)) {
1647         return ret;
1648     }
1649 
1650     http = nxt_conf_get_path(conf, &http_path);
1651 #if 0
1652     if (http == NULL) {
1653         nxt_alert(task, "no \"http\" block");
1654         return NXT_ERROR;
1655     }
1656 #endif
1657 
1658     websocket = nxt_conf_get_path(conf, &websocket_path);
1659 
1660     listeners = nxt_conf_get_path(conf, &listeners_path);
1661 
1662     if (listeners != NULL) {
1663         next = 0;
1664 
1665         for ( ;; ) {
1666             listener = nxt_conf_next_object_member(listeners, &name, &next);
1667             if (listener == NULL) {
1668                 break;
1669             }
1670 
1671             skcf = nxt_router_socket_conf(task, tmcf, &name);
1672             if (skcf == NULL) {
1673                 goto fail;
1674             }
1675 
1676             nxt_memzero(&lscf, sizeof(lscf));
1677 
1678             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1679                                       nxt_nitems(nxt_router_listener_conf),
1680                                       &lscf);
1681             if (ret != NXT_OK) {
1682                 nxt_alert(task, "listener map error");
1683                 goto fail;
1684             }
1685 
1686             nxt_debug(task, "application: %V", &lscf.application);
1687 
1688             // STUB, default values if http block is not defined.
1689             skcf->header_buffer_size = 2048;
1690             skcf->large_header_buffer_size = 8192;
1691             skcf->large_header_buffers = 4;
1692             skcf->discard_unsafe_fields = 1;
1693             skcf->body_buffer_size = 16 * 1024;
1694             skcf->max_body_size = 8 * 1024 * 1024;
1695             skcf->proxy_header_buffer_size = 64 * 1024;
1696             skcf->proxy_buffer_size = 4096;
1697             skcf->proxy_buffers = 256;
1698             skcf->idle_timeout = 180 * 1000;
1699             skcf->header_read_timeout = 30 * 1000;
1700             skcf->body_read_timeout = 30 * 1000;
1701             skcf->send_timeout = 30 * 1000;
1702             skcf->proxy_timeout = 60 * 1000;
1703             skcf->proxy_send_timeout = 30 * 1000;
1704             skcf->proxy_read_timeout = 30 * 1000;
1705 
1706             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1707             skcf->websocket_conf.read_timeout = 60 * 1000;
1708             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1709 
1710             nxt_str_null(&skcf->body_temp_path);
1711 
1712             if (http != NULL) {
1713                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1714                                           nxt_nitems(nxt_router_http_conf),
1715                                           skcf);
1716                 if (ret != NXT_OK) {
1717                     nxt_alert(task, "http map error");
1718                     goto fail;
1719                 }
1720             }
1721 
1722             if (websocket != NULL) {
1723                 ret = nxt_conf_map_object(mp, websocket,
1724                                           nxt_router_websocket_conf,
1725                                           nxt_nitems(nxt_router_websocket_conf),
1726                                           &skcf->websocket_conf);
1727                 if (ret != NXT_OK) {
1728                     nxt_alert(task, "websocket map error");
1729                     goto fail;
1730                 }
1731             }
1732 
1733             t = &skcf->body_temp_path;
1734 
1735             if (t->length == 0) {
1736                 t->start = (u_char *) task->thread->runtime->tmp;
1737                 t->length = nxt_strlen(t->start);
1738             }
1739 
1740 #if (NXT_TLS)
1741             certificate = nxt_conf_get_path(listener, &certificate_path);
1742 
1743             if (certificate != NULL) {
1744                 conf_cmds = nxt_conf_get_path(listener, &conf_commands_path);
1745 
1746                 if (nxt_conf_type(certificate) == NXT_CONF_ARRAY) {
1747                     n = nxt_conf_array_elements_count(certificate);
1748 
1749                     for (i = 0; i < n; i++) {
1750                         value = nxt_conf_get_array_element(certificate, i);
1751 
1752                         nxt_assert(value != NULL);
1753 
1754                         ret = nxt_router_conf_tls_insert(tmcf, value, skcf,
1755                                                          conf_cmds, i == 0);
1756                         if (nxt_slow_path(ret != NXT_OK)) {
1757                             goto fail;
1758                         }
1759                     }
1760 
1761                 } else {
1762                     /* NXT_CONF_STRING */
1763                     ret = nxt_router_conf_tls_insert(tmcf, certificate, skcf,
1764                                                      conf_cmds, 1);
1765                     if (nxt_slow_path(ret != NXT_OK)) {
1766                         goto fail;
1767                     }
1768                 }
1769             }
1770 #endif
1771 
1772             skcf->listen->handler = nxt_http_conn_init;
1773             skcf->router_conf = tmcf->router_conf;
1774             skcf->router_conf->count++;
1775 
1776             if (lscf.pass.length != 0) {
1777                 skcf->action = nxt_http_action_create(task, tmcf, &lscf.pass);
1778 
1779             /* COMPATIBILITY: listener application. */
1780             } else if (lscf.application.length > 0) {
1781                 skcf->action = nxt_http_pass_application(task,
1782                                                          tmcf->router_conf,
1783                                                          &lscf.application);
1784             }
1785 
1786             if (nxt_slow_path(skcf->action == NULL)) {
1787                 goto fail;
1788             }
1789         }
1790     }
1791 
1792     ret = nxt_http_routes_resolve(task, tmcf);
1793     if (nxt_slow_path(ret != NXT_OK)) {
1794         goto fail;
1795     }
1796 
1797     value = nxt_conf_get_path(conf, &access_log_path);
1798 
1799     if (value != NULL) {
1800         nxt_conf_get_string(value, &path);
1801 
1802         access_log = router->access_log;
1803 
1804         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1805             nxt_thread_spin_lock(&router->lock);
1806             access_log->count++;
1807             nxt_thread_spin_unlock(&router->lock);
1808 
1809         } else {
1810             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1811                                     + path.length);
1812             if (access_log == NULL) {
1813                 nxt_alert(task, "failed to allocate access log structure");
1814                 goto fail;
1815             }
1816 
1817             access_log->fd = -1;
1818             access_log->handler = &nxt_router_access_log_writer;
1819             access_log->count = 1;
1820 
1821             access_log->path.length = path.length;
1822             access_log->path.start = (u_char *) access_log
1823                                      + sizeof(nxt_router_access_log_t);
1824 
1825             nxt_memcpy(access_log->path.start, path.start, path.length);
1826         }
1827 
1828         tmcf->router_conf->access_log = access_log;
1829     }
1830 
1831     nxt_queue_add(&deleting_sockets, &router->sockets);
1832     nxt_queue_init(&router->sockets);
1833 
1834     return NXT_OK;
1835 
1836 app_fail:
1837 
1838     nxt_mp_destroy(app_mp);
1839 
1840 fail:
1841 
1842     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1843 
1844         nxt_queue_remove(&app->link);
1845         nxt_thread_mutex_destroy(&app->mutex);
1846         nxt_mp_destroy(app->mem_pool);
1847 
1848     } nxt_queue_loop;
1849 
1850     return NXT_ERROR;
1851 }
1852 
1853 
1854 #if (NXT_TLS)
1855 
1856 static nxt_int_t
1857 nxt_router_conf_tls_insert(nxt_router_temp_conf_t *tmcf,
1858     nxt_conf_value_t *value, nxt_socket_conf_t *skcf,
1859     nxt_conf_value_t *conf_cmds, nxt_bool_t last)
1860 {
1861     nxt_router_tlssock_t  *tls;
1862 
1863     tls = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_router_tlssock_t));
1864     if (nxt_slow_path(tls == NULL)) {
1865         return NXT_ERROR;
1866     }
1867 
1868     tls->socket_conf = skcf;
1869     tls->conf_cmds = conf_cmds;
1870     tls->temp_conf = tmcf;
1871     tls->last = last;
1872     nxt_conf_get_string(value, &tls->name);
1873 
1874     nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1875 
1876     return NXT_OK;
1877 }
1878 
1879 #endif
1880 
1881 
1882 static nxt_int_t
1883 nxt_router_conf_process_static(nxt_task_t *task, nxt_router_conf_t *rtcf,
1884     nxt_conf_value_t *conf)
1885 {
1886     uint32_t          next, i;
1887     nxt_mp_t          *mp;
1888     nxt_str_t         *type, extension, str;
1889     nxt_int_t         ret;
1890     nxt_uint_t        exts;
1891     nxt_conf_value_t  *mtypes_conf, *ext_conf, *value;
1892 
1893     static nxt_str_t  mtypes_path = nxt_string("/mime_types");
1894 
1895     mp = rtcf->mem_pool;
1896 
1897     ret = nxt_http_static_mtypes_init(mp, &rtcf->mtypes_hash);
1898     if (nxt_slow_path(ret != NXT_OK)) {
1899         return NXT_ERROR;
1900     }
1901 
1902     if (conf == NULL) {
1903         return NXT_OK;
1904     }
1905 
1906     mtypes_conf = nxt_conf_get_path(conf, &mtypes_path);
1907 
1908     if (mtypes_conf != NULL) {
1909         next = 0;
1910 
1911         for ( ;; ) {
1912             ext_conf = nxt_conf_next_object_member(mtypes_conf, &str, &next);
1913 
1914             if (ext_conf == NULL) {
1915                 break;
1916             }
1917 
1918             type = nxt_str_dup(mp, NULL, &str);
1919             if (nxt_slow_path(type == NULL)) {
1920                 return NXT_ERROR;
1921             }
1922 
1923             if (nxt_conf_type(ext_conf) == NXT_CONF_STRING) {
1924                 nxt_conf_get_string(ext_conf, &str);
1925 
1926                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1927                     return NXT_ERROR;
1928                 }
1929 
1930                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1931                                                       &extension, type);
1932                 if (nxt_slow_path(ret != NXT_OK)) {
1933                     return NXT_ERROR;
1934                 }
1935 
1936                 continue;
1937             }
1938 
1939             exts = nxt_conf_array_elements_count(ext_conf);
1940 
1941             for (i = 0; i < exts; i++) {
1942                 value = nxt_conf_get_array_element(ext_conf, i);
1943 
1944                 nxt_conf_get_string(value, &str);
1945 
1946                 if (nxt_slow_path(nxt_str_dup(mp, &extension, &str) == NULL)) {
1947                     return NXT_ERROR;
1948                 }
1949 
1950                 ret = nxt_http_static_mtypes_hash_add(mp, &rtcf->mtypes_hash,
1951                                                       &extension, type);
1952                 if (nxt_slow_path(ret != NXT_OK)) {
1953                     return NXT_ERROR;
1954                 }
1955             }
1956         }
1957     }
1958 
1959     return NXT_OK;
1960 }
1961 
1962 
1963 static nxt_app_t *
1964 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1965 {
1966     nxt_app_t  *app;
1967 
1968     nxt_queue_each(app, queue, nxt_app_t, link) {
1969 
1970         if (nxt_strstr_eq(name, &app->name)) {
1971             return app;
1972         }
1973 
1974     } nxt_queue_loop;
1975 
1976     return NULL;
1977 }
1978 
1979 
1980 static nxt_int_t
1981 nxt_router_app_queue_init(nxt_task_t *task, nxt_port_t *port)
1982 {
1983     void       *mem;
1984     nxt_int_t  fd;
1985 
1986     fd = nxt_shm_open(task, sizeof(nxt_app_queue_t));
1987     if (nxt_slow_path(fd == -1)) {
1988         return NXT_ERROR;
1989     }
1990 
1991     mem = nxt_mem_mmap(NULL, sizeof(nxt_app_queue_t),
1992                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
1993     if (nxt_slow_path(mem == MAP_FAILED)) {
1994         nxt_fd_close(fd);
1995 
1996         return NXT_ERROR;
1997     }
1998 
1999     nxt_app_queue_init(mem);
2000 
2001     port->queue_fd = fd;
2002     port->queue = mem;
2003 
2004     return NXT_OK;
2005 }
2006 
2007 
2008 static nxt_int_t
2009 nxt_router_port_queue_init(nxt_task_t *task, nxt_port_t *port)
2010 {
2011     void       *mem;
2012     nxt_int_t  fd;
2013 
2014     fd = nxt_shm_open(task, sizeof(nxt_port_queue_t));
2015     if (nxt_slow_path(fd == -1)) {
2016         return NXT_ERROR;
2017     }
2018 
2019     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2020                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2021     if (nxt_slow_path(mem == MAP_FAILED)) {
2022         nxt_fd_close(fd);
2023 
2024         return NXT_ERROR;
2025     }
2026 
2027     nxt_port_queue_init(mem);
2028 
2029     port->queue_fd = fd;
2030     port->queue = mem;
2031 
2032     return NXT_OK;
2033 }
2034 
2035 
2036 static nxt_int_t
2037 nxt_router_port_queue_map(nxt_task_t *task, nxt_port_t *port, nxt_fd_t fd)
2038 {
2039     void  *mem;
2040 
2041     nxt_assert(fd != -1);
2042 
2043     mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
2044                        PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
2045     if (nxt_slow_path(mem == MAP_FAILED)) {
2046 
2047         return NXT_ERROR;
2048     }
2049 
2050     port->queue = mem;
2051 
2052     return NXT_OK;
2053 }
2054 
2055 
2056 static const nxt_lvlhsh_proto_t  nxt_router_apps_hash_proto  nxt_aligned(64) = {
2057     NXT_LVLHSH_DEFAULT,
2058     nxt_router_apps_hash_test,
2059     nxt_mp_lvlhsh_alloc,
2060     nxt_mp_lvlhsh_free,
2061 };
2062 
2063 
2064 static nxt_int_t
2065 nxt_router_apps_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
2066 {
2067     nxt_app_t  *app;
2068 
2069     app = data;
2070 
2071     return nxt_strstr_eq(&lhq->key, &app->name) ? NXT_OK : NXT_DECLINED;
2072 }
2073 
2074 
2075 static nxt_int_t
2076 nxt_router_apps_hash_add(nxt_router_conf_t *rtcf, nxt_app_t *app)
2077 {
2078     nxt_lvlhsh_query_t  lhq;
2079 
2080     lhq.key_hash = nxt_djb_hash(app->name.start, app->name.length);
2081     lhq.replace = 0;
2082     lhq.key = app->name;
2083     lhq.value = app;
2084     lhq.proto = &nxt_router_apps_hash_proto;
2085     lhq.pool = rtcf->mem_pool;
2086 
2087     switch (nxt_lvlhsh_insert(&rtcf->apps_hash, &lhq)) {
2088 
2089     case NXT_OK:
2090         return NXT_OK;
2091 
2092     case NXT_DECLINED:
2093         nxt_thread_log_alert("router app hash adding failed: "
2094                              "\"%V\" is already in hash", &lhq.key);
2095         /* Fall through. */
2096     default:
2097         return NXT_ERROR;
2098     }
2099 }
2100 
2101 
2102 static nxt_app_t *
2103 nxt_router_apps_hash_get(nxt_router_conf_t *rtcf, nxt_str_t *name)
2104 {
2105     nxt_lvlhsh_query_t  lhq;
2106 
2107     lhq.key_hash = nxt_djb_hash(name->start, name->length);
2108     lhq.key = *name;
2109     lhq.proto = &nxt_router_apps_hash_proto;
2110 
2111     if (nxt_lvlhsh_find(&rtcf->apps_hash, &lhq) != NXT_OK) {
2112         return NULL;
2113     }
2114 
2115     return lhq.value;
2116 }
2117 
2118 
2119 static void
2120 nxt_router_apps_hash_use(nxt_task_t *task, nxt_router_conf_t *rtcf, int i)
2121 {
2122     nxt_app_t          *app;
2123     nxt_lvlhsh_each_t  lhe;
2124 
2125     nxt_lvlhsh_each_init(&lhe, &nxt_router_apps_hash_proto);
2126 
2127     for ( ;; ) {
2128         app = nxt_lvlhsh_each(&rtcf->apps_hash, &lhe);
2129 
2130         if (app == NULL) {
2131             break;
2132         }
2133 
2134         nxt_router_app_use(task, app, i);
2135     }
2136 }
2137 
2138 
2139 
2140 nxt_int_t
2141 nxt_router_listener_application(nxt_router_conf_t *rtcf, nxt_str_t *name,
2142     nxt_http_action_t *action)
2143 {
2144     nxt_app_t  *app;
2145 
2146     app = nxt_router_apps_hash_get(rtcf, name);
2147 
2148     if (app == NULL) {
2149         return NXT_DECLINED;
2150     }
2151 
2152     action->u.app.application = app;
2153     action->handler = nxt_http_application_handler;
2154 
2155     return NXT_OK;
2156 }
2157 
2158 
2159 static nxt_socket_conf_t *
2160 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2161     nxt_str_t *name)
2162 {
2163     size_t               size;
2164     nxt_int_t            ret;
2165     nxt_bool_t           wildcard;
2166     nxt_sockaddr_t       *sa;
2167     nxt_socket_conf_t    *skcf;
2168     nxt_listen_socket_t  *ls;
2169 
2170     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
2171     if (nxt_slow_path(sa == NULL)) {
2172         nxt_alert(task, "invalid listener \"%V\"", name);
2173         return NULL;
2174     }
2175 
2176     sa->type = SOCK_STREAM;
2177 
2178     nxt_debug(task, "router listener: \"%*s\"",
2179               (size_t) sa->length, nxt_sockaddr_start(sa));
2180 
2181     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
2182     if (nxt_slow_path(skcf == NULL)) {
2183         return NULL;
2184     }
2185 
2186     size = nxt_sockaddr_size(sa);
2187 
2188     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
2189 
2190     if (ret != NXT_OK) {
2191 
2192         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
2193         if (nxt_slow_path(ls == NULL)) {
2194             return NULL;
2195         }
2196 
2197         skcf->listen = ls;
2198 
2199         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
2200         nxt_memcpy(ls->sockaddr, sa, size);
2201 
2202         nxt_listen_socket_remote_size(ls);
2203 
2204         ls->socket = -1;
2205         ls->backlog = NXT_LISTEN_BACKLOG;
2206         ls->flags = NXT_NONBLOCK;
2207         ls->read_after_accept = 1;
2208     }
2209 
2210     switch (sa->u.sockaddr.sa_family) {
2211 #if (NXT_HAVE_UNIX_DOMAIN)
2212     case AF_UNIX:
2213         wildcard = 0;
2214         break;
2215 #endif
2216 #if (NXT_INET6)
2217     case AF_INET6:
2218         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
2219         break;
2220 #endif
2221     case AF_INET:
2222     default:
2223         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
2224         break;
2225     }
2226 
2227     if (!wildcard) {
2228         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
2229         if (nxt_slow_path(skcf->sockaddr == NULL)) {
2230             return NULL;
2231         }
2232 
2233         nxt_memcpy(skcf->sockaddr, sa, size);
2234     }
2235 
2236     return skcf;
2237 }
2238 
2239 
2240 static nxt_int_t
2241 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
2242     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
2243 {
2244     nxt_router_t       *router;
2245     nxt_queue_link_t   *qlk;
2246     nxt_socket_conf_t  *skcf;
2247 
2248     router = tmcf->router_conf->router;
2249 
2250     for (qlk = nxt_queue_first(&router->sockets);
2251          qlk != nxt_queue_tail(&router->sockets);
2252          qlk = nxt_queue_next(qlk))
2253     {
2254         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2255 
2256         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
2257             nskcf->listen = skcf->listen;
2258 
2259             nxt_queue_remove(qlk);
2260             nxt_queue_insert_tail(&keeping_sockets, qlk);
2261 
2262             nxt_queue_insert_tail(&updating_sockets, &nskcf->link);
2263 
2264             return NXT_OK;
2265         }
2266     }
2267 
2268     nxt_queue_insert_tail(&pending_sockets, &nskcf->link);
2269 
2270     return NXT_DECLINED;
2271 }
2272 
2273 
2274 static void
2275 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
2276     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
2277 {
2278     size_t            size;
2279     uint32_t          stream;
2280     nxt_int_t         ret;
2281     nxt_buf_t         *b;
2282     nxt_port_t        *main_port, *router_port;
2283     nxt_runtime_t     *rt;
2284     nxt_socket_rpc_t  *rpc;
2285 
2286     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2287     if (rpc == NULL) {
2288         goto fail;
2289     }
2290 
2291     rpc->socket_conf = skcf;
2292     rpc->temp_conf = tmcf;
2293 
2294     size = nxt_sockaddr_size(skcf->listen->sockaddr);
2295 
2296     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2297     if (b == NULL) {
2298         goto fail;
2299     }
2300 
2301     b->completion_handler = nxt_router_dummy_buf_completion;
2302 
2303     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
2304 
2305     rt = task->thread->runtime;
2306     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2307     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2308 
2309     stream = nxt_port_rpc_register_handler(task, router_port,
2310                                            nxt_router_listen_socket_ready,
2311                                            nxt_router_listen_socket_error,
2312                                            main_port->pid, rpc);
2313     if (nxt_slow_path(stream == 0)) {
2314         goto fail;
2315     }
2316 
2317     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
2318                                 stream, router_port->id, b);
2319 
2320     if (nxt_slow_path(ret != NXT_OK)) {
2321         nxt_port_rpc_cancel(task, router_port, stream);
2322         goto fail;
2323     }
2324 
2325     return;
2326 
2327 fail:
2328 
2329     nxt_router_conf_error(task, tmcf);
2330 }
2331 
2332 
2333 static void
2334 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2335     void *data)
2336 {
2337     nxt_int_t         ret;
2338     nxt_socket_t      s;
2339     nxt_socket_rpc_t  *rpc;
2340 
2341     rpc = data;
2342 
2343     s = msg->fd[0];
2344 
2345     ret = nxt_socket_nonblocking(task, s);
2346     if (nxt_slow_path(ret != NXT_OK)) {
2347         goto fail;
2348     }
2349 
2350     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2351 
2352     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2353     if (nxt_slow_path(ret != NXT_OK)) {
2354         goto fail;
2355     }
2356 
2357     rpc->socket_conf->listen->socket = s;
2358 
2359     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2360                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2361 
2362     return;
2363 
2364 fail:
2365 
2366     nxt_socket_close(task, s);
2367 
2368     nxt_router_conf_error(task, rpc->temp_conf);
2369 }
2370 
2371 
2372 static void
2373 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2374     void *data)
2375 {
2376     nxt_socket_rpc_t        *rpc;
2377     nxt_router_temp_conf_t  *tmcf;
2378 
2379     rpc = data;
2380     tmcf = rpc->temp_conf;
2381 
2382 #if 0
2383     u_char                  *p;
2384     size_t                  size;
2385     uint8_t                 error;
2386     nxt_buf_t               *in, *out;
2387     nxt_sockaddr_t          *sa;
2388 
2389     static nxt_str_t  socket_errors[] = {
2390         nxt_string("ListenerSystem"),
2391         nxt_string("ListenerNoIPv6"),
2392         nxt_string("ListenerPort"),
2393         nxt_string("ListenerInUse"),
2394         nxt_string("ListenerNoAddress"),
2395         nxt_string("ListenerNoAccess"),
2396         nxt_string("ListenerPath"),
2397     };
2398 
2399     sa = rpc->socket_conf->listen->sockaddr;
2400 
2401     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2402 
2403     if (nxt_slow_path(in == NULL)) {
2404         return;
2405     }
2406 
2407     p = in->mem.pos;
2408 
2409     error = *p++;
2410 
2411     size = nxt_length("listen socket error: ")
2412            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2413            + sa->length + socket_errors[error].length + (in->mem.free - p);
2414 
2415     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2416     if (nxt_slow_path(out == NULL)) {
2417         return;
2418     }
2419 
2420     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2421                         "listen socket error: "
2422                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2423                         (size_t) sa->length, nxt_sockaddr_start(sa),
2424                         &socket_errors[error], in->mem.free - p, p);
2425 
2426     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2427 #endif
2428 
2429     nxt_router_conf_error(task, tmcf);
2430 }
2431 
2432 
2433 #if (NXT_TLS)
2434 
2435 static void
2436 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2437     void *data)
2438 {
2439     nxt_mp_t                *mp;
2440     nxt_int_t               ret;
2441     nxt_tls_conf_t          *tlscf;
2442     nxt_router_tlssock_t    *tls;
2443     nxt_tls_bundle_conf_t   *bundle;
2444     nxt_router_temp_conf_t  *tmcf;
2445 
2446     nxt_debug(task, "tls rpc handler");
2447 
2448     tls = data;
2449     tmcf = tls->temp_conf;
2450 
2451     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2452         goto fail;
2453     }
2454 
2455     mp = tmcf->router_conf->mem_pool;
2456 
2457     if (tls->socket_conf->tls == NULL){
2458         tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2459         if (nxt_slow_path(tlscf == NULL)) {
2460             goto fail;
2461         }
2462 
2463         tlscf->no_wait_shutdown = 1;
2464         tls->socket_conf->tls = tlscf;
2465 
2466     } else {
2467         tlscf = tls->socket_conf->tls;
2468     }
2469 
2470     bundle = nxt_mp_get(mp, sizeof(nxt_tls_bundle_conf_t));
2471     if (nxt_slow_path(bundle == NULL)) {
2472         goto fail;
2473     }
2474 
2475     if (nxt_slow_path(nxt_str_dup(mp, &bundle->name, &tls->name) == NULL)) {
2476         goto fail;
2477     }
2478 
2479     bundle->chain_file = msg->fd[0];
2480     bundle->next = tlscf->bundle;
2481     tlscf->bundle = bundle;
2482 
2483     ret = task->thread->runtime->tls->server_init(task, tlscf, mp,
2484                                                   tls->conf_cmds, tls->last);
2485     if (nxt_slow_path(ret != NXT_OK)) {
2486         goto fail;
2487     }
2488 
2489     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2490                        nxt_router_conf_apply, task, tmcf, NULL);
2491     return;
2492 
2493 fail:
2494 
2495     nxt_router_conf_error(task, tmcf);
2496 }
2497 
2498 #endif
2499 
2500 
2501 static void
2502 nxt_router_app_rpc_create(nxt_task_t *task,
2503     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2504 {
2505     size_t         size;
2506     uint32_t       stream;
2507     nxt_int_t      ret;
2508     nxt_buf_t      *b;
2509     nxt_port_t     *main_port, *router_port;
2510     nxt_runtime_t  *rt;
2511     nxt_app_rpc_t  *rpc;
2512 
2513     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2514     if (rpc == NULL) {
2515         goto fail;
2516     }
2517 
2518     rpc->app = app;
2519     rpc->temp_conf = tmcf;
2520 
2521     nxt_debug(task, "app '%V' prefork", &app->name);
2522 
2523     size = app->name.length + 1 + app->conf.length;
2524 
2525     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2526     if (nxt_slow_path(b == NULL)) {
2527         goto fail;
2528     }
2529 
2530     b->completion_handler = nxt_router_dummy_buf_completion;
2531 
2532     nxt_buf_cpystr(b, &app->name);
2533     *b->mem.free++ = '\0';
2534     nxt_buf_cpystr(b, &app->conf);
2535 
2536     rt = task->thread->runtime;
2537     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2538     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2539 
2540     stream = nxt_port_rpc_register_handler(task, router_port,
2541                                            nxt_router_app_prefork_ready,
2542                                            nxt_router_app_prefork_error,
2543                                            -1, rpc);
2544     if (nxt_slow_path(stream == 0)) {
2545         goto fail;
2546     }
2547 
2548     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
2549                                 -1, stream, router_port->id, b);
2550 
2551     if (nxt_slow_path(ret != NXT_OK)) {
2552         nxt_port_rpc_cancel(task, router_port, stream);
2553         goto fail;
2554     }
2555 
2556     app->pending_processes++;
2557 
2558     return;
2559 
2560 fail:
2561 
2562     nxt_router_conf_error(task, tmcf);
2563 }
2564 
2565 
2566 static void
2567 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2568     void *data)
2569 {
2570     nxt_app_t           *app;
2571     nxt_port_t          *port;
2572     nxt_app_rpc_t       *rpc;
2573     nxt_event_engine_t  *engine;
2574 
2575     rpc = data;
2576     app = rpc->app;
2577 
2578     port = msg->u.new_port;
2579 
2580     nxt_assert(port != NULL);
2581     nxt_assert(port->type == NXT_PROCESS_APP);
2582     nxt_assert(port->id == 0);
2583 
2584     port->app = app;
2585     port->main_app_port = port;
2586 
2587     app->pending_processes--;
2588     app->processes++;
2589     app->idle_processes++;
2590 
2591     engine = task->thread->engine;
2592 
2593     nxt_queue_insert_tail(&app->ports, &port->app_link);
2594     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2595 
2596     nxt_debug(task, "app '%V' move new port %PI:%d to spare_ports",
2597               &app->name, port->pid, port->id);
2598 
2599     nxt_port_hash_add(&app->port_hash, port);
2600     app->port_hash_count++;
2601 
2602     port->idle_start = 0;
2603 
2604     nxt_port_inc_use(port);
2605 
2606     nxt_router_app_shared_port_send(task, port);
2607 
2608     nxt_work_queue_add(&engine->fast_work_queue,
2609                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2610 }
2611 
2612 
2613 static void
2614 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2615     void *data)
2616 {
2617     nxt_app_t               *app;
2618     nxt_app_rpc_t           *rpc;
2619     nxt_router_temp_conf_t  *tmcf;
2620 
2621     rpc = data;
2622     app = rpc->app;
2623     tmcf = rpc->temp_conf;
2624 
2625     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2626             &app->name);
2627 
2628     app->pending_processes--;
2629 
2630     nxt_router_conf_error(task, tmcf);
2631 }
2632 
2633 
2634 static nxt_int_t
2635 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2636     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2637 {
2638     nxt_int_t                 ret;
2639     nxt_uint_t                n, threads;
2640     nxt_queue_link_t          *qlk;
2641     nxt_router_engine_conf_t  *recf;
2642 
2643     threads = tmcf->router_conf->threads;
2644 
2645     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2646                                      sizeof(nxt_router_engine_conf_t));
2647     if (nxt_slow_path(tmcf->engines == NULL)) {
2648         return NXT_ERROR;
2649     }
2650 
2651     n = 0;
2652 
2653     for (qlk = nxt_queue_first(&router->engines);
2654          qlk != nxt_queue_tail(&router->engines);
2655          qlk = nxt_queue_next(qlk))
2656     {
2657         recf = nxt_array_zero_add(tmcf->engines);
2658         if (nxt_slow_path(recf == NULL)) {
2659             return NXT_ERROR;
2660         }
2661 
2662         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2663 
2664         if (n < threads) {
2665             recf->action = NXT_ROUTER_ENGINE_KEEP;
2666             ret = nxt_router_engine_conf_update(tmcf, recf);
2667 
2668         } else {
2669             recf->action = NXT_ROUTER_ENGINE_DELETE;
2670             ret = nxt_router_engine_conf_delete(tmcf, recf);
2671         }
2672 
2673         if (nxt_slow_path(ret != NXT_OK)) {
2674             return ret;
2675         }
2676 
2677         n++;
2678     }
2679 
2680     tmcf->new_threads = n;
2681 
2682     while (n < threads) {
2683         recf = nxt_array_zero_add(tmcf->engines);
2684         if (nxt_slow_path(recf == NULL)) {
2685             return NXT_ERROR;
2686         }
2687 
2688         recf->action = NXT_ROUTER_ENGINE_ADD;
2689 
2690         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2691         if (nxt_slow_path(recf->engine == NULL)) {
2692             return NXT_ERROR;
2693         }
2694 
2695         ret = nxt_router_engine_conf_create(tmcf, recf);
2696         if (nxt_slow_path(ret != NXT_OK)) {
2697             return ret;
2698         }
2699 
2700         n++;
2701     }
2702 
2703     return NXT_OK;
2704 }
2705 
2706 
2707 static nxt_int_t
2708 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2709     nxt_router_engine_conf_t *recf)
2710 {
2711     nxt_int_t  ret;
2712 
2713     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2714                                           nxt_router_listen_socket_create);
2715     if (nxt_slow_path(ret != NXT_OK)) {
2716         return ret;
2717     }
2718 
2719     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2720                                           nxt_router_listen_socket_create);
2721     if (nxt_slow_path(ret != NXT_OK)) {
2722         return ret;
2723     }
2724 
2725     return ret;
2726 }
2727 
2728 
2729 static nxt_int_t
2730 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2731     nxt_router_engine_conf_t *recf)
2732 {
2733     nxt_int_t  ret;
2734 
2735     ret = nxt_router_engine_joints_create(tmcf, recf, &creating_sockets,
2736                                           nxt_router_listen_socket_create);
2737     if (nxt_slow_path(ret != NXT_OK)) {
2738         return ret;
2739     }
2740 
2741     ret = nxt_router_engine_joints_create(tmcf, recf, &updating_sockets,
2742                                           nxt_router_listen_socket_update);
2743     if (nxt_slow_path(ret != NXT_OK)) {
2744         return ret;
2745     }
2746 
2747     ret = nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2748     if (nxt_slow_path(ret != NXT_OK)) {
2749         return ret;
2750     }
2751 
2752     return ret;
2753 }
2754 
2755 
2756 static nxt_int_t
2757 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2758     nxt_router_engine_conf_t *recf)
2759 {
2760     nxt_int_t  ret;
2761 
2762     ret = nxt_router_engine_quit(tmcf, recf);
2763     if (nxt_slow_path(ret != NXT_OK)) {
2764         return ret;
2765     }
2766 
2767     ret = nxt_router_engine_joints_delete(tmcf, recf, &updating_sockets);
2768     if (nxt_slow_path(ret != NXT_OK)) {
2769         return ret;
2770     }
2771 
2772     return nxt_router_engine_joints_delete(tmcf, recf, &deleting_sockets);
2773 }
2774 
2775 
2776 static nxt_int_t
2777 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2778     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2779     nxt_work_handler_t handler)
2780 {
2781     nxt_int_t                ret;
2782     nxt_joint_job_t          *job;
2783     nxt_queue_link_t         *qlk;
2784     nxt_socket_conf_t        *skcf;
2785     nxt_socket_conf_joint_t  *joint;
2786 
2787     for (qlk = nxt_queue_first(sockets);
2788          qlk != nxt_queue_tail(sockets);
2789          qlk = nxt_queue_next(qlk))
2790     {
2791         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2792         if (nxt_slow_path(job == NULL)) {
2793             return NXT_ERROR;
2794         }
2795 
2796         job->work.next = recf->jobs;
2797         recf->jobs = &job->work;
2798 
2799         job->task = tmcf->engine->task;
2800         job->work.handler = handler;
2801         job->work.task = &job->task;
2802         job->work.obj = job;
2803         job->tmcf = tmcf;
2804 
2805         tmcf->count++;
2806 
2807         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2808                              sizeof(nxt_socket_conf_joint_t));
2809         if (nxt_slow_path(joint == NULL)) {
2810             return NXT_ERROR;
2811         }
2812 
2813         job->work.data = joint;
2814 
2815         ret = nxt_upstreams_joint_create(tmcf, &joint->upstreams);
2816         if (nxt_slow_path(ret != NXT_OK)) {
2817             return ret;
2818         }
2819 
2820         joint->count = 1;
2821 
2822         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2823         skcf->count++;
2824         joint->socket_conf = skcf;
2825 
2826         joint->engine = recf->engine;
2827     }
2828 
2829     return NXT_OK;
2830 }
2831 
2832 
2833 static nxt_int_t
2834 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2835     nxt_router_engine_conf_t *recf)
2836 {
2837     nxt_joint_job_t  *job;
2838 
2839     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2840     if (nxt_slow_path(job == NULL)) {
2841         return NXT_ERROR;
2842     }
2843 
2844     job->work.next = recf->jobs;
2845     recf->jobs = &job->work;
2846 
2847     job->task = tmcf->engine->task;
2848     job->work.handler = nxt_router_worker_thread_quit;
2849     job->work.task = &job->task;
2850     job->work.obj = NULL;
2851     job->work.data = NULL;
2852     job->tmcf = NULL;
2853 
2854     return NXT_OK;
2855 }
2856 
2857 
2858 static nxt_int_t
2859 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2860     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2861 {
2862     nxt_joint_job_t   *job;
2863     nxt_queue_link_t  *qlk;
2864 
2865     for (qlk = nxt_queue_first(sockets);
2866          qlk != nxt_queue_tail(sockets);
2867          qlk = nxt_queue_next(qlk))
2868     {
2869         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2870         if (nxt_slow_path(job == NULL)) {
2871             return NXT_ERROR;
2872         }
2873 
2874         job->work.next = recf->jobs;
2875         recf->jobs = &job->work;
2876 
2877         job->task = tmcf->engine->task;
2878         job->work.handler = nxt_router_listen_socket_delete;
2879         job->work.task = &job->task;
2880         job->work.obj = job;
2881         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2882         job->tmcf = tmcf;
2883 
2884         tmcf->count++;
2885     }
2886 
2887     return NXT_OK;
2888 }
2889 
2890 
2891 static nxt_int_t
2892 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2893     nxt_router_temp_conf_t *tmcf)
2894 {
2895     nxt_int_t                 ret;
2896     nxt_uint_t                i, threads;
2897     nxt_router_engine_conf_t  *recf;
2898 
2899     recf = tmcf->engines->elts;
2900     threads = tmcf->router_conf->threads;
2901 
2902     for (i = tmcf->new_threads; i < threads; i++) {
2903         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2904         if (nxt_slow_path(ret != NXT_OK)) {
2905             return ret;
2906         }
2907     }
2908 
2909     return NXT_OK;
2910 }
2911 
2912 
2913 static nxt_int_t
2914 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2915     nxt_event_engine_t *engine)
2916 {
2917     nxt_int_t            ret;
2918     nxt_thread_link_t    *link;
2919     nxt_thread_handle_t  handle;
2920 
2921     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2922 
2923     if (nxt_slow_path(link == NULL)) {
2924         return NXT_ERROR;
2925     }
2926 
2927     link->start = nxt_router_thread_start;
2928     link->engine = engine;
2929     link->work.handler = nxt_router_thread_exit_handler;
2930     link->work.task = task;
2931     link->work.data = link;
2932 
2933     nxt_queue_insert_tail(&rt->engines, &engine->link);
2934 
2935     ret = nxt_thread_create(&handle, link);
2936 
2937     if (nxt_slow_path(ret != NXT_OK)) {
2938         nxt_queue_remove(&engine->link);
2939     }
2940 
2941     return ret;
2942 }
2943 
2944 
2945 static void
2946 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2947     nxt_router_temp_conf_t *tmcf)
2948 {
2949     nxt_app_t  *app;
2950 
2951     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2952 
2953         nxt_router_app_unlink(task, app);
2954 
2955     } nxt_queue_loop;
2956 
2957     nxt_queue_add(&router->apps, &tmcf->previous);
2958     nxt_queue_add(&router->apps, &tmcf->apps);
2959 }
2960 
2961 
2962 static void
2963 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2964 {
2965     nxt_uint_t                n;
2966     nxt_event_engine_t        *engine;
2967     nxt_router_engine_conf_t  *recf;
2968 
2969     recf = tmcf->engines->elts;
2970 
2971     for (n = tmcf->engines->nelts; n != 0; n--) {
2972         engine = recf->engine;
2973 
2974         switch (recf->action) {
2975 
2976         case NXT_ROUTER_ENGINE_KEEP:
2977             break;
2978 
2979         case NXT_ROUTER_ENGINE_ADD:
2980             nxt_queue_insert_tail(&router->engines, &engine->link0);
2981             break;
2982 
2983         case NXT_ROUTER_ENGINE_DELETE:
2984             nxt_queue_remove(&engine->link0);
2985             break;
2986         }
2987 
2988         nxt_router_engine_post(engine, recf->jobs);
2989 
2990         recf++;
2991     }
2992 }
2993 
2994 
2995 static void
2996 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2997 {
2998     nxt_work_t  *work, *next;
2999 
3000     for (work = jobs; work != NULL; work = next) {
3001         next = work->next;
3002         work->next = NULL;
3003 
3004         nxt_event_engine_post(engine, work);
3005     }
3006 }
3007 
3008 
3009 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
3010     .rpc_error       = nxt_port_rpc_handler,
3011     .mmap            = nxt_port_mmap_handler,
3012     .data            = nxt_port_rpc_handler,
3013     .oosm            = nxt_router_oosm_handler,
3014     .req_headers_ack = nxt_port_rpc_handler,
3015 };
3016 
3017 
3018 static void
3019 nxt_router_thread_start(void *data)
3020 {
3021     nxt_int_t           ret;
3022     nxt_port_t          *port;
3023     nxt_task_t          *task;
3024     nxt_work_t          *work;
3025     nxt_thread_t        *thread;
3026     nxt_thread_link_t   *link;
3027     nxt_event_engine_t  *engine;
3028 
3029     link = data;
3030     engine = link->engine;
3031     task = &engine->task;
3032 
3033     thread = nxt_thread();
3034 
3035     nxt_event_engine_thread_adopt(engine);
3036 
3037     /* STUB */
3038     thread->runtime = engine->task.thread->runtime;
3039 
3040     engine->task.thread = thread;
3041     engine->task.log = thread->log;
3042     thread->engine = engine;
3043     thread->task = &engine->task;
3044 #if 0
3045     thread->fiber = &engine->fibers->fiber;
3046 #endif
3047 
3048     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
3049     if (nxt_slow_path(engine->mem_pool == NULL)) {
3050         return;
3051     }
3052 
3053     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
3054                         NXT_PROCESS_ROUTER);
3055     if (nxt_slow_path(port == NULL)) {
3056         return;
3057     }
3058 
3059     ret = nxt_port_socket_init(task, port, 0);
3060     if (nxt_slow_path(ret != NXT_OK)) {
3061         nxt_port_use(task, port, -1);
3062         return;
3063     }
3064 
3065     ret = nxt_router_port_queue_init(task, port);
3066     if (nxt_slow_path(ret != NXT_OK)) {
3067         nxt_port_use(task, port, -1);
3068         return;
3069     }
3070 
3071     engine->port = port;
3072 
3073     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
3074 
3075     work = nxt_zalloc(sizeof(nxt_work_t));
3076     if (nxt_slow_path(work == NULL)) {
3077         return;
3078     }
3079 
3080     work->handler = nxt_router_rt_add_port;
3081     work->task = link->work.task;
3082     work->obj = work;
3083     work->data = port;
3084 
3085     nxt_event_engine_post(link->work.task->thread->engine, work);
3086 
3087     nxt_event_engine_start(engine);
3088 }
3089 
3090 
3091 static void
3092 nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
3093 {
3094     nxt_int_t      res;
3095     nxt_port_t     *port;
3096     nxt_runtime_t  *rt;
3097 
3098     rt = task->thread->runtime;
3099     port = data;
3100 
3101     nxt_free(obj);
3102 
3103     res = nxt_port_hash_add(&rt->ports, port);
3104 
3105     if (nxt_fast_path(res == NXT_OK)) {
3106         nxt_port_use(task, port, 1);
3107     }
3108 }
3109 
3110 
3111 static void
3112 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
3113 {
3114     nxt_joint_job_t          *job;
3115     nxt_socket_conf_t        *skcf;
3116     nxt_listen_event_t       *lev;
3117     nxt_listen_socket_t      *ls;
3118     nxt_thread_spinlock_t    *lock;
3119     nxt_socket_conf_joint_t  *joint;
3120 
3121     job = obj;
3122     joint = data;
3123 
3124     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
3125 
3126     skcf = joint->socket_conf;
3127     ls = skcf->listen;
3128 
3129     lev = nxt_listen_event(task, ls);
3130     if (nxt_slow_path(lev == NULL)) {
3131         nxt_router_listen_socket_release(task, skcf);
3132         return;
3133     }
3134 
3135     lev->socket.data = joint;
3136 
3137     lock = &skcf->router_conf->router->lock;
3138 
3139     nxt_thread_spin_lock(lock);
3140     ls->count++;
3141     nxt_thread_spin_unlock(lock);
3142 
3143     job->work.next = NULL;
3144     job->work.handler = nxt_router_conf_wait;
3145 
3146     nxt_event_engine_post(job->tmcf->engine, &job->work);
3147 }
3148 
3149 
3150 nxt_inline nxt_listen_event_t *
3151 nxt_router_listen_event(nxt_queue_t *listen_connections,
3152     nxt_socket_conf_t *skcf)
3153 {
3154     nxt_socket_t        fd;
3155     nxt_queue_link_t    *qlk;
3156     nxt_listen_event_t  *lev;
3157 
3158     fd = skcf->listen->socket;
3159 
3160     for (qlk = nxt_queue_first(listen_connections);
3161          qlk != nxt_queue_tail(listen_connections);
3162          qlk = nxt_queue_next(qlk))
3163     {
3164         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
3165 
3166         if (fd == lev->socket.fd) {
3167             return lev;
3168         }
3169     }
3170 
3171     return NULL;
3172 }
3173 
3174 
3175 static void
3176 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
3177 {
3178     nxt_joint_job_t          *job;
3179     nxt_event_engine_t       *engine;
3180     nxt_listen_event_t       *lev;
3181     nxt_socket_conf_joint_t  *joint, *old;
3182 
3183     job = obj;
3184     joint = data;
3185 
3186     engine = task->thread->engine;
3187 
3188     nxt_queue_insert_tail(&engine->joints, &joint->link);
3189 
3190     lev = nxt_router_listen_event(&engine->listen_connections,
3191                                   joint->socket_conf);
3192 
3193     old = lev->socket.data;
3194     lev->socket.data = joint;
3195     lev->listen = joint->socket_conf->listen;
3196 
3197     job->work.next = NULL;
3198     job->work.handler = nxt_router_conf_wait;
3199 
3200     nxt_event_engine_post(job->tmcf->engine, &job->work);
3201 
3202     /*
3203      * The task is allocated from configuration temporary
3204      * memory pool so it can be freed after engine post operation.
3205      */
3206 
3207     nxt_router_conf_release(&engine->task, old);
3208 }
3209 
3210 
3211 static void
3212 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
3213 {
3214     nxt_socket_conf_t        *skcf;
3215     nxt_listen_event_t       *lev;
3216     nxt_event_engine_t       *engine;
3217     nxt_socket_conf_joint_t  *joint;
3218 
3219     skcf = data;
3220 
3221     engine = task->thread->engine;
3222 
3223     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
3224 
3225     nxt_fd_event_delete(engine, &lev->socket);
3226 
3227     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
3228               lev->socket.fd);
3229 
3230     joint = lev->socket.data;
3231     joint->close_job = obj;
3232 
3233     lev->timer.handler = nxt_router_listen_socket_close;
3234     lev->timer.work_queue = &engine->fast_work_queue;
3235 
3236     nxt_timer_add(engine, &lev->timer, 0);
3237 }
3238 
3239 
3240 static void
3241 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
3242 {
3243     nxt_event_engine_t  *engine;
3244 
3245     nxt_debug(task, "router worker thread quit");
3246 
3247     engine = task->thread->engine;
3248 
3249     engine->shutdown = 1;
3250 
3251     if (nxt_queue_is_empty(&engine->joints)) {
3252         nxt_thread_exit(task->thread);
3253     }
3254 }
3255 
3256 
3257 static void
3258 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
3259 {
3260     nxt_timer_t              *timer;
3261     nxt_joint_job_t          *job;
3262     nxt_listen_event_t       *lev;
3263     nxt_socket_conf_joint_t  *joint;
3264 
3265     timer = obj;
3266     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
3267 
3268     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
3269               lev->socket.fd);
3270 
3271     nxt_queue_remove(&lev->link);
3272 
3273     joint = lev->socket.data;
3274     lev->socket.data = NULL;
3275 
3276     /* 'task' refers to lev->task and we cannot use after nxt_free() */
3277     task = &task->thread->engine->task;
3278 
3279     nxt_router_listen_socket_release(task, joint->socket_conf);
3280 
3281     job = joint->close_job;
3282     job->work.next = NULL;
3283     job->work.handler = nxt_router_conf_wait;
3284 
3285     nxt_event_engine_post(job->tmcf->engine, &job->work);
3286 
3287     nxt_router_listen_event_release(task, lev, joint);
3288 }
3289 
3290 
3291 static void
3292 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
3293 {
3294     nxt_listen_socket_t    *ls;
3295     nxt_thread_spinlock_t  *lock;
3296 
3297     ls = skcf->listen;
3298     lock = &skcf->router_conf->router->lock;
3299 
3300     nxt_thread_spin_lock(lock);
3301 
3302     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
3303               task->thread->engine, ls->count);
3304 
3305     if (--ls->count != 0) {
3306         ls = NULL;
3307     }
3308 
3309     nxt_thread_spin_unlock(lock);
3310 
3311     if (ls != NULL) {
3312         nxt_socket_close(task, ls->socket);
3313         nxt_free(ls);
3314     }
3315 }
3316 
3317 
3318 void
3319 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
3320     nxt_socket_conf_joint_t *joint)
3321 {
3322     nxt_event_engine_t  *engine;
3323 
3324     nxt_debug(task, "listen event count: %D", lev->count);
3325 
3326     engine = task->thread->engine;
3327 
3328     if (--lev->count == 0) {
3329         if (lev->next != NULL) {
3330             nxt_sockaddr_cache_free(engine, lev->next);
3331 
3332             nxt_conn_free(task, lev->next);
3333         }
3334 
3335         nxt_free(lev);
3336     }
3337 
3338     if (joint != NULL) {
3339         nxt_router_conf_release(task, joint);
3340     }
3341 
3342     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
3343         nxt_thread_exit(task->thread);
3344     }
3345 }
3346 
3347 
3348 void
3349 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
3350 {
3351     nxt_socket_conf_t      *skcf;
3352     nxt_router_conf_t      *rtcf;
3353     nxt_thread_spinlock_t  *lock;
3354 
3355     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
3356 
3357     if (--joint->count != 0) {
3358         return;
3359     }
3360 
3361     nxt_queue_remove(&joint->link);
3362 
3363     /*
3364      * The joint content can not be safely used after the critical
3365      * section protected by the spinlock because its memory pool may
3366      * be already destroyed by another thread.
3367      */
3368     skcf = joint->socket_conf;
3369     rtcf = skcf->router_conf;
3370     lock = &rtcf->router->lock;
3371 
3372     nxt_thread_spin_lock(lock);
3373 
3374     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
3375               rtcf, rtcf->count);
3376 
3377     if (--skcf->count != 0) {
3378         skcf = NULL;
3379         rtcf = NULL;
3380 
3381     } else {
3382         nxt_queue_remove(&skcf->link);
3383 
3384         if (--rtcf->count != 0) {
3385             rtcf = NULL;
3386         }
3387     }
3388 
3389     nxt_thread_spin_unlock(lock);
3390 
3391 #if (NXT_TLS)
3392     if (skcf != NULL && skcf->tls != NULL) {
3393         task->thread->runtime->tls->server_free(task, skcf->tls);
3394     }
3395 #endif
3396 
3397     /* TODO remove engine->port */
3398 
3399     if (rtcf != NULL) {
3400         nxt_debug(task, "old router conf is destroyed");
3401 
3402         nxt_router_apps_hash_use(task, rtcf, -1);
3403 
3404         nxt_router_access_log_release(task, lock, rtcf->access_log);
3405 
3406         nxt_mp_thread_adopt(rtcf->mem_pool);
3407 
3408         nxt_mp_destroy(rtcf->mem_pool);
3409     }
3410 }
3411 
3412 
3413 static void
3414 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3415     nxt_router_access_log_t *access_log)
3416 {
3417     size_t     size;
3418     u_char     *buf, *p;
3419     nxt_off_t  bytes;
3420 
3421     static nxt_time_string_t  date_cache = {
3422         (nxt_atomic_uint_t) -1,
3423         nxt_router_access_log_date,
3424         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3425         nxt_length("31/Dec/1986:19:40:00 +0300"),
3426         NXT_THREAD_TIME_LOCAL,
3427         NXT_THREAD_TIME_SEC,
3428     };
3429 
3430     size = r->remote->address_length
3431            + 6                  /* ' - - [' */
3432            + date_cache.size
3433            + 3                  /* '] "' */
3434            + r->method->length
3435            + 1                  /* space */
3436            + r->target.length
3437            + 1                  /* space */
3438            + r->version.length
3439            + 2                  /* '" ' */
3440            + 3                  /* status */
3441            + 1                  /* space */
3442            + NXT_OFF_T_LEN
3443            + 2                  /* ' "' */
3444            + (r->referer != NULL ? r->referer->value_length : 1)
3445            + 3                  /* '" "' */
3446            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3447            + 2                  /* '"\n' */
3448     ;
3449 
3450     buf = nxt_mp_nget(r->mem_pool, size);
3451     if (nxt_slow_path(buf == NULL)) {
3452         return;
3453     }
3454 
3455     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3456                    r->remote->address_length);
3457 
3458     p = nxt_cpymem(p, " - - [", 6);
3459 
3460     p = nxt_thread_time_string(task->thread, &date_cache, p);
3461 
3462     p = nxt_cpymem(p, "] \"", 3);
3463 
3464     if (r->method->length != 0) {
3465         p = nxt_cpymem(p, r->method->start, r->method->length);
3466 
3467         if (r->target.length != 0) {
3468             *p++ = ' ';
3469             p = nxt_cpymem(p, r->target.start, r->target.length);
3470 
3471             if (r->version.length != 0) {
3472                 *p++ = ' ';
3473                 p = nxt_cpymem(p, r->version.start, r->version.length);
3474             }
3475         }
3476 
3477     } else {
3478         *p++ = '-';
3479     }
3480 
3481     p = nxt_cpymem(p, "\" ", 2);
3482 
3483     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3484 
3485     *p++ = ' ';
3486 
3487     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3488 
3489     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3490 
3491     p = nxt_cpymem(p, " \"", 2);
3492 
3493     if (r->referer != NULL) {
3494         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3495 
3496     } else {
3497         *p++ = '-';
3498     }
3499 
3500     p = nxt_cpymem(p, "\" \"", 3);
3501 
3502     if (r->user_agent != NULL) {
3503         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3504 
3505     } else {
3506         *p++ = '-';
3507     }
3508 
3509     p = nxt_cpymem(p, "\"\n", 2);
3510 
3511     nxt_fd_write(access_log->fd, buf, p - buf);
3512 }
3513 
3514 
3515 static u_char *
3516 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3517     size_t size, const char *format)
3518 {
3519     u_char  sign;
3520     time_t  gmtoff;
3521 
3522     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3523                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3524 
3525     gmtoff = nxt_timezone(tm) / 60;
3526 
3527     if (gmtoff < 0) {
3528         gmtoff = -gmtoff;
3529         sign = '-';
3530 
3531     } else {
3532         sign = '+';
3533     }
3534 
3535     return nxt_sprintf(buf, buf + size, format,
3536                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3537                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3538                        sign, gmtoff / 60, gmtoff % 60);
3539 }
3540 
3541 
3542 static void
3543 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3544 {
3545     uint32_t                 stream;
3546     nxt_int_t                ret;
3547     nxt_buf_t                *b;
3548     nxt_port_t               *main_port, *router_port;
3549     nxt_runtime_t            *rt;
3550     nxt_router_access_log_t  *access_log;
3551 
3552     access_log = tmcf->router_conf->access_log;
3553 
3554     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3555     if (nxt_slow_path(b == NULL)) {
3556         goto fail;
3557     }
3558 
3559     b->completion_handler = nxt_router_dummy_buf_completion;
3560 
3561     nxt_buf_cpystr(b, &access_log->path);
3562     *b->mem.free++ = '\0';
3563 
3564     rt = task->thread->runtime;
3565     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3566     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3567 
3568     stream = nxt_port_rpc_register_handler(task, router_port,
3569                                            nxt_router_access_log_ready,
3570                                            nxt_router_access_log_error,
3571                                            -1, tmcf);
3572     if (nxt_slow_path(stream == 0)) {
3573         goto fail;
3574     }
3575 
3576     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3577                                 stream, router_port->id, b);
3578 
3579     if (nxt_slow_path(ret != NXT_OK)) {
3580         nxt_port_rpc_cancel(task, router_port, stream);
3581         goto fail;
3582     }
3583 
3584     return;
3585 
3586 fail:
3587 
3588     nxt_router_conf_error(task, tmcf);
3589 }
3590 
3591 
3592 static void
3593 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3594     void *data)
3595 {
3596     nxt_router_temp_conf_t   *tmcf;
3597     nxt_router_access_log_t  *access_log;
3598 
3599     tmcf = data;
3600 
3601     access_log = tmcf->router_conf->access_log;
3602 
3603     access_log->fd = msg->fd[0];
3604 
3605     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3606                        nxt_router_conf_apply, task, tmcf, NULL);
3607 }
3608 
3609 
3610 static void
3611 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3612     void *data)
3613 {
3614     nxt_router_temp_conf_t  *tmcf;
3615 
3616     tmcf = data;
3617 
3618     nxt_router_conf_error(task, tmcf);
3619 }
3620 
3621 
3622 static void
3623 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3624     nxt_router_access_log_t *access_log)
3625 {
3626     if (access_log == NULL) {
3627         return;
3628     }
3629 
3630     nxt_thread_spin_lock(lock);
3631 
3632     if (--access_log->count != 0) {
3633         access_log = NULL;
3634     }
3635 
3636     nxt_thread_spin_unlock(lock);
3637 
3638     if (access_log != NULL) {
3639 
3640         if (access_log->fd != -1) {
3641             nxt_fd_close(access_log->fd);
3642         }
3643 
3644         nxt_free(access_log);
3645     }
3646 }
3647 
3648 
3649 typedef struct {
3650     nxt_mp_t                 *mem_pool;
3651     nxt_router_access_log_t  *access_log;
3652 } nxt_router_access_log_reopen_t;
3653 
3654 
3655 static void
3656 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3657 {
3658     nxt_mp_t                        *mp;
3659     uint32_t                        stream;
3660     nxt_int_t                       ret;
3661     nxt_buf_t                       *b;
3662     nxt_port_t                      *main_port, *router_port;
3663     nxt_runtime_t                   *rt;
3664     nxt_router_access_log_t         *access_log;
3665     nxt_router_access_log_reopen_t  *reopen;
3666 
3667     access_log = nxt_router->access_log;
3668 
3669     if (access_log == NULL) {
3670         return;
3671     }
3672 
3673     mp = nxt_mp_create(1024, 128, 256, 32);
3674     if (nxt_slow_path(mp == NULL)) {
3675         return;
3676     }
3677 
3678     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3679     if (nxt_slow_path(reopen == NULL)) {
3680         goto fail;
3681     }
3682 
3683     reopen->mem_pool = mp;
3684     reopen->access_log = access_log;
3685 
3686     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3687     if (nxt_slow_path(b == NULL)) {
3688         goto fail;
3689     }
3690 
3691     b->completion_handler = nxt_router_access_log_reopen_completion;
3692 
3693     nxt_buf_cpystr(b, &access_log->path);
3694     *b->mem.free++ = '\0';
3695 
3696     rt = task->thread->runtime;
3697     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3698     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3699 
3700     stream = nxt_port_rpc_register_handler(task, router_port,
3701                                            nxt_router_access_log_reopen_ready,
3702                                            nxt_router_access_log_reopen_error,
3703                                            -1, reopen);
3704     if (nxt_slow_path(stream == 0)) {
3705         goto fail;
3706     }
3707 
3708     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3709                                 stream, router_port->id, b);
3710 
3711     if (nxt_slow_path(ret != NXT_OK)) {
3712         nxt_port_rpc_cancel(task, router_port, stream);
3713         goto fail;
3714     }
3715 
3716     nxt_mp_retain(mp);
3717 
3718     return;
3719 
3720 fail:
3721 
3722     nxt_mp_destroy(mp);
3723 }
3724 
3725 
3726 static void
3727 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3728 {
3729     nxt_mp_t   *mp;
3730     nxt_buf_t  *b;
3731 
3732     b = obj;
3733     mp = b->data;
3734 
3735     nxt_mp_release(mp);
3736 }
3737 
3738 
3739 static void
3740 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3741     void *data)
3742 {
3743     nxt_router_access_log_t         *access_log;
3744     nxt_router_access_log_reopen_t  *reopen;
3745 
3746     reopen = data;
3747 
3748     access_log = reopen->access_log;
3749 
3750     if (access_log == nxt_router->access_log) {
3751 
3752         if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
3753             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3754                       msg->fd[0], access_log->fd, nxt_errno);
3755         }
3756     }
3757 
3758     nxt_fd_close(msg->fd[0]);
3759     nxt_mp_release(reopen->mem_pool);
3760 }
3761 
3762 
3763 static void
3764 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3765     void *data)
3766 {
3767     nxt_router_access_log_reopen_t  *reopen;
3768 
3769     reopen = data;
3770 
3771     nxt_mp_release(reopen->mem_pool);
3772 }
3773 
3774 
3775 static void
3776 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3777 {
3778     nxt_port_t           *port;
3779     nxt_thread_link_t    *link;
3780     nxt_event_engine_t   *engine;
3781     nxt_thread_handle_t  handle;
3782 
3783     handle = (nxt_thread_handle_t) (uintptr_t) obj;
3784     link = data;
3785 
3786     nxt_thread_wait(handle);
3787 
3788     engine = link->engine;
3789 
3790     nxt_queue_remove(&engine->link);
3791 
3792     port = engine->port;
3793 
3794     // TODO notify all apps
3795 
3796     port->engine = task->thread->engine;
3797     nxt_mp_thread_adopt(port->mem_pool);
3798     nxt_port_use(task, port, -1);
3799 
3800     nxt_mp_thread_adopt(engine->mem_pool);
3801     nxt_mp_destroy(engine->mem_pool);
3802 
3803     nxt_event_engine_free(engine);
3804 
3805     nxt_free(link);
3806 }
3807 
3808 
3809 static void
3810 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3811     void *data)
3812 {
3813     size_t                  b_size, count;
3814     nxt_int_t               ret;
3815     nxt_app_t               *app;
3816     nxt_buf_t               *b, *next;
3817     nxt_port_t              *app_port;
3818     nxt_unit_field_t        *f;
3819     nxt_http_field_t        *field;
3820     nxt_http_request_t      *r;
3821     nxt_unit_response_t     *resp;
3822     nxt_request_rpc_data_t  *req_rpc_data;
3823 
3824     req_rpc_data = data;
3825 
3826     r = req_rpc_data->request;
3827     if (nxt_slow_path(r == NULL)) {
3828         return;
3829     }
3830 
3831     if (r->error) {
3832         nxt_request_rpc_data_unlink(task, req_rpc_data);
3833         return;
3834     }
3835 
3836     app = req_rpc_data->app;
3837     nxt_assert(app != NULL);
3838 
3839     if (msg->port_msg.type == _NXT_PORT_MSG_REQ_HEADERS_ACK) {
3840         nxt_router_req_headers_ack_handler(task, msg, req_rpc_data);
3841 
3842         return;
3843     }
3844 
3845     b = (msg->size == 0) ? NULL : msg->buf;
3846 
3847     if (msg->port_msg.last != 0) {
3848         nxt_debug(task, "router data create last buf");
3849 
3850         nxt_buf_chain_add(&b, nxt_http_buf_last(r));
3851 
3852         req_rpc_data->rpc_cancel = 0;
3853 
3854         if (req_rpc_data->apr_action == NXT_APR_REQUEST_FAILED) {
3855             req_rpc_data->apr_action = NXT_APR_GOT_RESPONSE;
3856         }
3857 
3858         nxt_request_rpc_data_unlink(task, req_rpc_data);
3859 
3860     } else {
3861         if (app->timeout != 0) {
3862             r->timer.handler = nxt_router_app_timeout;
3863             r->timer_data = req_rpc_data;
3864             nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
3865         }
3866     }
3867 
3868     if (b == NULL) {
3869         return;
3870     }
3871 
3872     if (msg->buf == b) {
3873         /* Disable instant buffer completion/re-using by port. */
3874         msg->buf = NULL;
3875     }
3876 
3877     if (r->header_sent) {
3878         nxt_buf_chain_add(&r->out, b);
3879         nxt_http_request_send_body(task, r, NULL);
3880 
3881     } else {
3882         b_size = nxt_buf_is_mem(b) ? nxt_buf_mem_used_size(&b->mem) : 0;
3883 
3884         if (nxt_slow_path(b_size < sizeof(nxt_unit_response_t))) {
3885             nxt_alert(task, "response buffer too small: %z", b_size);
3886             goto fail;
3887         }
3888 
3889         resp = (void *) b->mem.pos;
3890         count = (b_size - sizeof(nxt_unit_response_t))
3891                     / sizeof(nxt_unit_field_t);
3892 
3893         if (nxt_slow_path(count < resp->fields_count)) {
3894             nxt_alert(task, "response buffer too small for fields count: %D",
3895                       resp->fields_count);
3896             goto fail;
3897         }
3898 
3899         field = NULL;
3900 
3901         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3902             if (f->skip) {
3903                 continue;
3904             }
3905 
3906             field = nxt_list_add(r->resp.fields);
3907 
3908             if (nxt_slow_path(field == NULL)) {
3909                 goto fail;
3910             }
3911 
3912             field->hash = f->hash;
3913             field->skip = 0;
3914             field->hopbyhop = 0;
3915 
3916             field->name_length = f->name_length;
3917             field->value_length = f->value_length;
3918             field->name = nxt_unit_sptr_get(&f->name);
3919             field->value = nxt_unit_sptr_get(&f->value);
3920 
3921             ret = nxt_http_field_process(field, &nxt_response_fields_hash, r);
3922             if (nxt_slow_path(ret != NXT_OK)) {
3923                 goto fail;
3924             }
3925 
3926             nxt_debug(task, "header%s: %*s: %*s",
3927                       (field->skip ? " skipped" : ""),
3928                       (size_t) field->name_length, field->name,
3929                       (size_t) field->value_length, field->value);
3930 
3931             if (field->skip) {
3932                 r->resp.fields->last->nelts--;
3933             }
3934         }
3935 
3936         r->status = resp->status;
3937 
3938         if (resp->piggyback_content_length != 0) {
3939             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3940             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3941 
3942         } else {
3943             b->mem.pos = b->mem.free;
3944         }
3945 
3946         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3947             next = b->next;
3948             b->next = NULL;
3949 
3950             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3951                                b->completion_handler, task, b, b->parent);
3952 
3953             b = next;
3954         }
3955 
3956         if (b != NULL) {
3957             nxt_buf_chain_add(&r->out, b);
3958         }
3959 
3960         nxt_http_request_header_send(task, r, nxt_http_request_send_body, NULL);
3961 
3962         if (r->websocket_handshake
3963             && r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
3964         {
3965             app_port = req_rpc_data->app_port;
3966             if (nxt_slow_path(app_port == NULL)) {
3967                 goto fail;
3968             }
3969 
3970             nxt_thread_mutex_lock(&app->mutex);
3971 
3972             app_port->main_app_port->active_websockets++;
3973 
3974             nxt_thread_mutex_unlock(&app->mutex);
3975 
3976             nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
3977             req_rpc_data->apr_action = NXT_APR_CLOSE;
3978 
3979             nxt_debug(task, "stream #%uD upgrade", req_rpc_data->stream);
3980 
3981             r->state = &nxt_http_websocket;
3982 
3983         } else {
3984             r->state = &nxt_http_request_send_state;
3985         }
3986     }
3987 
3988     return;
3989 
3990 fail:
3991 
3992     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
3993 
3994     nxt_request_rpc_data_unlink(task, req_rpc_data);
3995 }
3996 
3997 
3998 static void
3999 nxt_router_req_headers_ack_handler(nxt_task_t *task,
4000     nxt_port_recv_msg_t *msg, nxt_request_rpc_data_t *req_rpc_data)
4001 {
4002     int                 res;
4003     nxt_app_t           *app;
4004     nxt_buf_t           *b;
4005     nxt_bool_t          start_process, unlinked;
4006     nxt_port_t          *app_port, *main_app_port, *idle_port;
4007     nxt_queue_link_t    *idle_lnk;
4008     nxt_http_request_t  *r;
4009 
4010     nxt_debug(task, "stream #%uD: got ack from %PI:%d",
4011               req_rpc_data->stream,
4012               msg->port_msg.pid, msg->port_msg.reply_port);
4013 
4014     nxt_port_rpc_ex_set_peer(task, msg->port, req_rpc_data,
4015                              msg->port_msg.pid);
4016 
4017     app = req_rpc_data->app;
4018     r = req_rpc_data->request;
4019 
4020     start_process = 0;
4021     unlinked = 0;
4022 
4023     nxt_thread_mutex_lock(&app->mutex);
4024 
4025     if (r->app_link.next != NULL) {
4026         nxt_queue_remove(&r->app_link);
4027         r->app_link.next = NULL;
4028 
4029         unlinked = 1;
4030     }
4031 
4032     app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
4033                                   msg->port_msg.reply_port);
4034     if (nxt_slow_path(app_port == NULL)) {
4035         nxt_thread_mutex_unlock(&app->mutex);
4036 
4037         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4038 
4039         if (unlinked) {
4040             nxt_mp_release(r->mem_pool);
4041         }
4042 
4043         return;
4044     }
4045 
4046     main_app_port = app_port->main_app_port;
4047 
4048     if (nxt_queue_chk_remove(&main_app_port->idle_link)) {
4049         app->idle_processes--;
4050 
4051         nxt_debug(task, "app '%V' move port %PI:%d out of %s (ack)",
4052                   &app->name, main_app_port->pid, main_app_port->id,
4053                   (main_app_port->idle_start ? "idle_ports" : "spare_ports"));
4054 
4055         /* Check port was in 'spare_ports' using idle_start field. */
4056         if (main_app_port->idle_start == 0
4057             && app->idle_processes >= app->spare_processes)
4058         {
4059             /*
4060              * If there is a vacant space in spare ports,
4061              * move the last idle to spare_ports.
4062              */
4063             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4064 
4065             idle_lnk = nxt_queue_last(&app->idle_ports);
4066             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4067             nxt_queue_remove(idle_lnk);
4068 
4069             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4070 
4071             idle_port->idle_start = 0;
4072 
4073             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4074                       "to spare_ports",
4075                       &app->name, idle_port->pid, idle_port->id);
4076         }
4077 
4078         if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4079             app->pending_processes++;
4080             start_process = 1;
4081         }
4082     }
4083 
4084     main_app_port->active_requests++;
4085 
4086     nxt_port_inc_use(app_port);
4087 
4088     nxt_thread_mutex_unlock(&app->mutex);
4089 
4090     if (unlinked) {
4091         nxt_mp_release(r->mem_pool);
4092     }
4093 
4094     if (start_process) {
4095         nxt_router_start_app_process(task, app);
4096     }
4097 
4098     nxt_port_use(task, req_rpc_data->app_port, -1);
4099 
4100     req_rpc_data->app_port = app_port;
4101 
4102     b = req_rpc_data->msg_info.buf;
4103 
4104     if (b != NULL) {
4105         /* First buffer is already sent.  Start from second. */
4106         b = b->next;
4107 
4108         req_rpc_data->msg_info.buf->next = NULL;
4109     }
4110 
4111     if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
4112         nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
4113                   req_rpc_data->msg_info.body_fd);
4114 
4115         if (req_rpc_data->msg_info.body_fd != -1) {
4116             lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
4117         }
4118 
4119         res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
4120                                     req_rpc_data->msg_info.body_fd,
4121                                     req_rpc_data->stream,
4122                                     task->thread->engine->port->id, b);
4123 
4124         if (nxt_slow_path(res != NXT_OK)) {
4125             nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4126         }
4127     }
4128 
4129     if (app->timeout != 0) {
4130         r->timer.handler = nxt_router_app_timeout;
4131         r->timer_data = req_rpc_data;
4132         nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
4133     }
4134 }
4135 
4136 
4137 static const nxt_http_request_state_t  nxt_http_request_send_state
4138     nxt_aligned(64) =
4139 {
4140     .error_handler = nxt_http_request_error_handler,
4141 };
4142 
4143 
4144 static void
4145 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
4146 {
4147     nxt_buf_t           *out;
4148     nxt_http_request_t  *r;
4149 
4150     r = obj;
4151 
4152     out = r->out;
4153 
4154     if (out != NULL) {
4155         r->out = NULL;
4156         nxt_http_request_send(task, r, out);
4157     }
4158 }
4159 
4160 
4161 static void
4162 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4163     void *data)
4164 {
4165     nxt_request_rpc_data_t  *req_rpc_data;
4166 
4167     req_rpc_data = data;
4168 
4169     req_rpc_data->rpc_cancel = 0;
4170 
4171     /* TODO cancel message and return if cancelled. */
4172     // nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
4173 
4174     if (req_rpc_data->request != NULL) {
4175         nxt_http_request_error(task, req_rpc_data->request,
4176                                NXT_HTTP_SERVICE_UNAVAILABLE);
4177     }
4178 
4179     nxt_request_rpc_data_unlink(task, req_rpc_data);
4180 }
4181 
4182 
4183 static void
4184 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4185     void *data)
4186 {
4187     nxt_app_t        *app;
4188     nxt_port_t       *port;
4189     nxt_app_joint_t  *app_joint;
4190 
4191     app_joint = data;
4192     port = msg->u.new_port;
4193 
4194     nxt_assert(app_joint != NULL);
4195     nxt_assert(port != NULL);
4196     nxt_assert(port->type == NXT_PROCESS_APP);
4197     nxt_assert(port->id == 0);
4198 
4199     app = app_joint->app;
4200 
4201     nxt_router_app_joint_use(task, app_joint, -1);
4202 
4203     if (nxt_slow_path(app == NULL)) {
4204         nxt_debug(task, "new port ready for released app, send QUIT");
4205 
4206         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4207 
4208         return;
4209     }
4210 
4211     port->app = app;
4212     port->main_app_port = port;
4213 
4214     nxt_thread_mutex_lock(&app->mutex);
4215 
4216     nxt_assert(app->pending_processes != 0);
4217 
4218     app->pending_processes--;
4219     app->processes++;
4220     nxt_port_hash_add(&app->port_hash, port);
4221     app->port_hash_count++;
4222 
4223     nxt_thread_mutex_unlock(&app->mutex);
4224 
4225     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
4226               &app->name, port->pid, app->processes, app->pending_processes);
4227 
4228     nxt_router_app_shared_port_send(task, port);
4229 
4230     nxt_router_app_port_release(task, port, NXT_APR_NEW_PORT);
4231 }
4232 
4233 
4234 static nxt_int_t
4235 nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
4236 {
4237     nxt_buf_t                *b;
4238     nxt_port_t               *port;
4239     nxt_port_msg_new_port_t  *msg;
4240 
4241     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
4242                              sizeof(nxt_port_data_t));
4243     if (nxt_slow_path(b == NULL)) {
4244         return NXT_ERROR;
4245     }
4246 
4247     port = app_port->app->shared_port;
4248 
4249     nxt_debug(task, "send port %FD to process %PI",
4250               port->pair[0], app_port->pid);
4251 
4252     b->mem.free += sizeof(nxt_port_msg_new_port_t);
4253     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
4254 
4255     msg->id = port->id;
4256     msg->pid = port->pid;
4257     msg->max_size = port->max_size;
4258     msg->max_share = port->max_share;
4259     msg->type = port->type;
4260 
4261     return nxt_port_socket_write2(task, app_port,
4262                                   NXT_PORT_MSG_NEW_PORT,
4263                                   port->pair[0], port->queue_fd,
4264                                   0, 0, b);
4265 }
4266 
4267 
4268 static void
4269 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
4270     void *data)
4271 {
4272     nxt_app_t           *app;
4273     nxt_app_joint_t     *app_joint;
4274     nxt_queue_link_t    *link;
4275     nxt_http_request_t  *r;
4276 
4277     app_joint = data;
4278 
4279     nxt_assert(app_joint != NULL);
4280 
4281     app = app_joint->app;
4282 
4283     nxt_router_app_joint_use(task, app_joint, -1);
4284 
4285     if (nxt_slow_path(app == NULL)) {
4286         nxt_debug(task, "start error for released app");
4287 
4288         return;
4289     }
4290 
4291     nxt_debug(task, "app '%V' %p start error", &app->name, app);
4292 
4293     link = NULL;
4294 
4295     nxt_thread_mutex_lock(&app->mutex);
4296 
4297     nxt_assert(app->pending_processes != 0);
4298 
4299     app->pending_processes--;
4300 
4301     if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
4302         link = nxt_queue_first(&app->ack_waiting_req);
4303 
4304         nxt_queue_remove(link);
4305         link->next = NULL;
4306     }
4307 
4308     nxt_thread_mutex_unlock(&app->mutex);
4309 
4310     while (link != NULL) {
4311         r = nxt_container_of(link, nxt_http_request_t, app_link);
4312 
4313         nxt_event_engine_post(r->engine, &r->err_work);
4314 
4315         link = NULL;
4316 
4317         nxt_thread_mutex_lock(&app->mutex);
4318 
4319         if (app->processes == 0 && app->pending_processes == 0
4320             && !nxt_queue_is_empty(&app->ack_waiting_req))
4321         {
4322             link = nxt_queue_first(&app->ack_waiting_req);
4323 
4324             nxt_queue_remove(link);
4325             link->next = NULL;
4326         }
4327 
4328         nxt_thread_mutex_unlock(&app->mutex);
4329     }
4330 }
4331 
4332 
4333 
4334 nxt_inline nxt_port_t *
4335 nxt_router_app_get_port_for_quit(nxt_task_t *task, nxt_app_t *app)
4336 {
4337     nxt_port_t  *port;
4338 
4339     port = NULL;
4340 
4341     nxt_thread_mutex_lock(&app->mutex);
4342 
4343     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
4344 
4345         /* Caller is responsible to decrease port use count. */
4346         nxt_queue_chk_remove(&port->app_link);
4347 
4348         if (nxt_queue_chk_remove(&port->idle_link)) {
4349             app->idle_processes--;
4350 
4351             nxt_debug(task, "app '%V' move port %PI:%d out of %s for quit",
4352                       &app->name, port->pid, port->id,
4353                       (port->idle_start ? "idle_ports" : "spare_ports"));
4354         }
4355 
4356         nxt_port_hash_remove(&app->port_hash, port);
4357         app->port_hash_count--;
4358 
4359         port->app = NULL;
4360         app->processes--;
4361 
4362         break;
4363 
4364     } nxt_queue_loop;
4365 
4366     nxt_thread_mutex_unlock(&app->mutex);
4367 
4368     return port;
4369 }
4370 
4371 
4372 static void
4373 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
4374 {
4375     int  c;
4376 
4377     c = nxt_atomic_fetch_add(&app->use_count, i);
4378 
4379     if (i < 0 && c == -i) {
4380 
4381         if (task->thread->engine != app->engine) {
4382             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
4383 
4384         } else {
4385             nxt_router_free_app(task, app->joint, NULL);
4386         }
4387     }
4388 }
4389 
4390 
4391 static void
4392 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
4393 {
4394     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
4395 
4396     nxt_queue_remove(&app->link);
4397 
4398     nxt_router_app_use(task, app, -1);
4399 }
4400 
4401 
4402 static void
4403 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
4404     nxt_apr_action_t action)
4405 {
4406     int         inc_use;
4407     uint32_t    got_response, dec_requests;
4408     nxt_app_t   *app;
4409     nxt_bool_t  port_unchained, send_quit, adjust_idle_timer;
4410     nxt_port_t  *main_app_port;
4411 
4412     nxt_assert(port != NULL);
4413     nxt_assert(port->app != NULL);
4414 
4415     app = port->app;
4416 
4417     inc_use = 0;
4418     got_response = 0;
4419     dec_requests = 0;
4420 
4421     switch (action) {
4422     case NXT_APR_NEW_PORT:
4423         break;
4424     case NXT_APR_REQUEST_FAILED:
4425         dec_requests = 1;
4426         inc_use = -1;
4427         break;
4428     case NXT_APR_GOT_RESPONSE:
4429         got_response = 1;
4430         inc_use = -1;
4431         break;
4432     case NXT_APR_UPGRADE:
4433         got_response = 1;
4434         break;
4435     case NXT_APR_CLOSE:
4436         inc_use = -1;
4437         break;
4438     }
4439 
4440     nxt_debug(task, "app '%V' release port %PI:%d: %d %d", &app->name,
4441               port->pid, port->id,
4442               (int) inc_use, (int) got_response);
4443 
4444     if (port == app->shared_port) {
4445         nxt_thread_mutex_lock(&app->mutex);
4446 
4447         app->active_requests -= got_response + dec_requests;
4448 
4449         nxt_thread_mutex_unlock(&app->mutex);
4450 
4451         goto adjust_use;
4452     }
4453 
4454     main_app_port = port->main_app_port;
4455 
4456     nxt_thread_mutex_lock(&app->mutex);
4457 
4458     main_app_port->app_responses += got_response;
4459     main_app_port->active_requests -= got_response + dec_requests;
4460     app->active_requests -= got_response + dec_requests;
4461 
4462     if (main_app_port->pair[1] != -1
4463         && (app->max_requests == 0
4464             || main_app_port->app_responses < app->max_requests))
4465     {
4466         if (main_app_port->app_link.next == NULL) {
4467             nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
4468 
4469             nxt_port_inc_use(main_app_port);
4470         }
4471     }
4472 
4473     send_quit = (app->max_requests > 0
4474                  && main_app_port->app_responses >= app->max_requests);
4475 
4476     if (send_quit) {
4477         port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
4478 
4479         nxt_port_hash_remove(&app->port_hash, main_app_port);
4480         app->port_hash_count--;
4481 
4482         main_app_port->app = NULL;
4483         app->processes--;
4484 
4485     } else {
4486         port_unchained = 0;
4487     }
4488 
4489     adjust_idle_timer = 0;
4490 
4491     if (main_app_port->pair[1] != -1 && !send_quit
4492         && main_app_port->active_requests == 0
4493         && main_app_port->active_websockets == 0
4494         && main_app_port->idle_link.next == NULL)
4495     {
4496         if (app->idle_processes == app->spare_processes
4497             && app->adjust_idle_work.data == NULL)
4498         {
4499             adjust_idle_timer = 1;
4500             app->adjust_idle_work.data = app;
4501             app->adjust_idle_work.next = NULL;
4502         }
4503 
4504         if (app->idle_processes < app->spare_processes) {
4505             nxt_queue_insert_tail(&app->spare_ports, &main_app_port->idle_link);
4506 
4507             nxt_debug(task, "app '%V' move port %PI:%d to spare_ports",
4508                       &app->name, main_app_port->pid, main_app_port->id);
4509         } else {
4510             nxt_queue_insert_tail(&app->idle_ports, &main_app_port->idle_link);
4511 
4512             main_app_port->idle_start = task->thread->engine->timers.now;
4513 
4514             nxt_debug(task, "app '%V' move port %PI:%d to idle_ports",
4515                       &app->name, main_app_port->pid, main_app_port->id);
4516         }
4517 
4518         app->idle_processes++;
4519     }
4520 
4521     nxt_thread_mutex_unlock(&app->mutex);
4522 
4523     if (adjust_idle_timer) {
4524         nxt_router_app_use(task, app, 1);
4525         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4526     }
4527 
4528     /* ? */
4529     if (main_app_port->pair[1] == -1) {
4530         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4531                   &app->name, app, main_app_port, main_app_port->pid);
4532 
4533         goto adjust_use;
4534     }
4535 
4536     if (send_quit) {
4537         nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
4538 
4539         nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
4540                               NULL);
4541 
4542         if (port_unchained) {
4543             nxt_port_use(task, main_app_port, -1);
4544         }
4545 
4546         goto adjust_use;
4547     }
4548 
4549     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4550               &app->name, app);
4551 
4552 adjust_use:
4553 
4554     nxt_port_use(task, port, inc_use);
4555 }
4556 
4557 
4558 void
4559 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4560 {
4561     nxt_app_t         *app;
4562     nxt_bool_t        unchain, start_process;
4563     nxt_port_t        *idle_port;
4564     nxt_queue_link_t  *idle_lnk;
4565 
4566     app = port->app;
4567 
4568     nxt_assert(app != NULL);
4569 
4570     nxt_thread_mutex_lock(&app->mutex);
4571 
4572     nxt_port_hash_remove(&app->port_hash, port);
4573     app->port_hash_count--;
4574 
4575     if (port->id != 0) {
4576         nxt_thread_mutex_unlock(&app->mutex);
4577 
4578         nxt_debug(task, "app '%V' port (%PI, %d) closed", &app->name,
4579                   port->pid, port->id);
4580 
4581         return;
4582     }
4583 
4584     unchain = nxt_queue_chk_remove(&port->app_link);
4585 
4586     if (nxt_queue_chk_remove(&port->idle_link)) {
4587         app->idle_processes--;
4588 
4589         nxt_debug(task, "app '%V' move port %PI:%d out of %s before close",
4590                   &app->name, port->pid, port->id,
4591                   (port->idle_start ? "idle_ports" : "spare_ports"));
4592 
4593         if (port->idle_start == 0
4594             && app->idle_processes >= app->spare_processes)
4595         {
4596             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4597 
4598             idle_lnk = nxt_queue_last(&app->idle_ports);
4599             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4600             nxt_queue_remove(idle_lnk);
4601 
4602             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4603 
4604             idle_port->idle_start = 0;
4605 
4606             nxt_debug(task, "app '%V' move port %PI:%d from idle_ports "
4607                       "to spare_ports",
4608                       &app->name, idle_port->pid, idle_port->id);
4609         }
4610     }
4611 
4612     app->processes--;
4613 
4614     start_process = !task->thread->engine->shutdown
4615                     && nxt_router_app_can_start(app)
4616                     && nxt_router_app_need_start(app);
4617 
4618     if (start_process) {
4619         app->pending_processes++;
4620     }
4621 
4622     nxt_thread_mutex_unlock(&app->mutex);
4623 
4624     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4625 
4626     if (unchain) {
4627         nxt_port_use(task, port, -1);
4628     }
4629 
4630     if (start_process) {
4631         nxt_router_start_app_process(task, app);
4632     }
4633 }
4634 
4635 
4636 static void
4637 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4638 {
4639     nxt_app_t           *app;
4640     nxt_bool_t          queued;
4641     nxt_port_t          *port;
4642     nxt_msec_t          timeout, threshold;
4643     nxt_queue_link_t    *lnk;
4644     nxt_event_engine_t  *engine;
4645 
4646     app = obj;
4647     queued = (data == app);
4648 
4649     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4650               &app->name, queued);
4651 
4652     engine = task->thread->engine;
4653 
4654     nxt_assert(app->engine == engine);
4655 
4656     threshold = engine->timers.now + app->joint->idle_timer.bias;
4657     timeout = 0;
4658 
4659     nxt_thread_mutex_lock(&app->mutex);
4660 
4661     if (queued) {
4662         app->adjust_idle_work.data = NULL;
4663     }
4664 
4665     nxt_debug(task, "app '%V' idle_processes %d, spare_processes %d",
4666               &app->name,
4667               (int) app->idle_processes, (int) app->spare_processes);
4668 
4669     while (app->idle_processes > app->spare_processes) {
4670 
4671         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4672 
4673         lnk = nxt_queue_first(&app->idle_ports);
4674         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4675 
4676         timeout = port->idle_start + app->idle_timeout;
4677 
4678         nxt_debug(task, "app '%V' pid %PI, start %M, timeout %M, threshold %M",
4679                   &app->name, port->pid,
4680                   port->idle_start, timeout, threshold);
4681 
4682         if (timeout > threshold) {
4683             break;
4684         }
4685 
4686         nxt_queue_remove(lnk);
4687         lnk->next = NULL;
4688 
4689         nxt_debug(task, "app '%V' move port %PI:%d out of idle_ports (timeout)",
4690                   &app->name, port->pid, port->id);
4691 
4692         nxt_queue_chk_remove(&port->app_link);
4693 
4694         nxt_port_hash_remove(&app->port_hash, port);
4695         app->port_hash_count--;
4696 
4697         app->idle_processes--;
4698         app->processes--;
4699         port->app = NULL;
4700 
4701         nxt_thread_mutex_unlock(&app->mutex);
4702 
4703         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4704                   &app->name, port->pid);
4705 
4706         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4707 
4708         nxt_port_use(task, port, -1);
4709 
4710         nxt_thread_mutex_lock(&app->mutex);
4711     }
4712 
4713     nxt_thread_mutex_unlock(&app->mutex);
4714 
4715     if (timeout > threshold) {
4716         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4717 
4718     } else {
4719         nxt_timer_disable(engine, &app->joint->idle_timer);
4720     }
4721 
4722     if (queued) {
4723         nxt_router_app_use(task, app, -1);
4724     }
4725 }
4726 
4727 
4728 static void
4729 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4730 {
4731     nxt_timer_t      *timer;
4732     nxt_app_joint_t  *app_joint;
4733 
4734     timer = obj;
4735     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4736 
4737     if (nxt_fast_path(app_joint->app != NULL)) {
4738         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4739     }
4740 }
4741 
4742 
4743 static void
4744 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4745 {
4746     nxt_timer_t      *timer;
4747     nxt_app_joint_t  *app_joint;
4748 
4749     timer = obj;
4750     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4751 
4752     nxt_router_app_joint_use(task, app_joint, -1);
4753 }
4754 
4755 
4756 static void
4757 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4758 {
4759     nxt_app_t        *app;
4760     nxt_port_t       *port;
4761     nxt_app_joint_t  *app_joint;
4762 
4763     app_joint = obj;
4764     app = app_joint->app;
4765 
4766     for ( ;; ) {
4767         port = nxt_router_app_get_port_for_quit(task, app);
4768         if (port == NULL) {
4769             break;
4770         }
4771 
4772         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4773 
4774         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4775 
4776         nxt_port_use(task, port, -1);
4777     }
4778 
4779     nxt_thread_mutex_lock(&app->mutex);
4780 
4781     for ( ;; ) {
4782         port = nxt_port_hash_retrieve(&app->port_hash);
4783         if (port == NULL) {
4784             break;
4785         }
4786 
4787         app->port_hash_count--;
4788 
4789         port->app = NULL;
4790 
4791         nxt_port_close(task, port);
4792 
4793         nxt_port_use(task, port, -1);
4794     }
4795 
4796     nxt_thread_mutex_unlock(&app->mutex);
4797 
4798     nxt_assert(app->processes == 0);
4799     nxt_assert(app->active_requests == 0);
4800     nxt_assert(app->port_hash_count == 0);
4801     nxt_assert(app->idle_processes == 0);
4802     nxt_assert(nxt_queue_is_empty(&app->ports));
4803     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4804     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4805 
4806     nxt_port_mmaps_destroy(&app->outgoing, 1);
4807 
4808     nxt_thread_mutex_destroy(&app->outgoing.mutex);
4809 
4810     if (app->shared_port != NULL) {
4811         app->shared_port->app = NULL;
4812         nxt_port_close(task, app->shared_port);
4813         nxt_port_use(task, app->shared_port, -1);
4814     }
4815 
4816     nxt_thread_mutex_destroy(&app->mutex);
4817     nxt_mp_destroy(app->mem_pool);
4818 
4819     app_joint->app = NULL;
4820 
4821     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4822         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4823         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4824 
4825     } else {
4826         nxt_router_app_joint_use(task, app_joint, -1);
4827     }
4828 }
4829 
4830 
4831 static void
4832 nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
4833     nxt_request_rpc_data_t *req_rpc_data)
4834 {
4835     nxt_bool_t          start_process;
4836     nxt_port_t          *port;
4837     nxt_http_request_t  *r;
4838 
4839     start_process = 0;
4840 
4841     nxt_thread_mutex_lock(&app->mutex);
4842 
4843     port = app->shared_port;
4844     nxt_port_inc_use(port);
4845 
4846     app->active_requests++;
4847 
4848     if (nxt_router_app_can_start(app) && nxt_router_app_need_start(app)) {
4849         app->pending_processes++;
4850         start_process = 1;
4851     }
4852 
4853     r = req_rpc_data->request;
4854 
4855     /*
4856      * Put request into application-wide list to be able to cancel request
4857      * if something goes wrong with application processes.
4858      */
4859     nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
4860 
4861     nxt_thread_mutex_unlock(&app->mutex);
4862 
4863     /*
4864      * Retain request memory pool while request is linked in ack_waiting_req
4865      * to guarantee request structure memory is accessble.
4866      */
4867     nxt_mp_retain(r->mem_pool);
4868 
4869     req_rpc_data->app_port = port;
4870     req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
4871 
4872     if (start_process) {
4873         nxt_router_start_app_process(task, app);
4874     }
4875 }
4876 
4877 
4878 void
4879 nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
4880     nxt_app_t *app)
4881 {
4882     nxt_event_engine_t      *engine;
4883     nxt_request_rpc_data_t  *req_rpc_data;
4884 
4885     engine = task->thread->engine;
4886 
4887     req_rpc_data = nxt_port_rpc_register_handler_ex(task, engine->port,
4888                                           nxt_router_response_ready_handler,
4889                                           nxt_router_response_error_handler,
4890                                           sizeof(nxt_request_rpc_data_t));
4891     if (nxt_slow_path(req_rpc_data == NULL)) {
4892         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4893         return;
4894     }
4895 
4896     /*
4897      * At this point we have request req_rpc_data allocated and registered
4898      * in port handlers.  Need to fixup request memory pool.  Counterpart
4899      * release will be called via following call chain:
4900      *    nxt_request_rpc_data_unlink() ->
4901      *        nxt_router_http_request_release_post() ->
4902      *            nxt_router_http_request_release()
4903      */
4904     nxt_mp_retain(r->mem_pool);
4905 
4906     r->timer.task = &engine->task;
4907     r->timer.work_queue = &engine->fast_work_queue;
4908     r->timer.log = engine->task.log;
4909     r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
4910 
4911     r->engine = engine;
4912     r->err_work.handler = nxt_router_http_request_error;
4913     r->err_work.task = task;
4914     r->err_work.obj = r;
4915 
4916     req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
4917     req_rpc_data->app = app;
4918     req_rpc_data->msg_info.body_fd = -1;
4919     req_rpc_data->rpc_cancel = 1;
4920 
4921     nxt_router_app_use(task, app, 1);
4922 
4923     req_rpc_data->request = r;
4924     r->req_rpc_data = req_rpc_data;
4925 
4926     if (r->last != NULL) {
4927         r->last->completion_handler = nxt_router_http_request_done;
4928     }
4929 
4930     nxt_router_app_port_get(task, app, req_rpc_data);
4931     nxt_router_app_prepare_request(task, req_rpc_data);
4932 }
4933 
4934 
4935 static void
4936 nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
4937 {
4938     nxt_http_request_t  *r;
4939 
4940     r = obj;
4941 
4942     nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
4943 
4944     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
4945 
4946     if (r->req_rpc_data != NULL) {
4947         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4948     }
4949 
4950     nxt_mp_release(r->mem_pool);
4951 }
4952 
4953 
4954 static void
4955 nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
4956 {
4957     nxt_http_request_t  *r;
4958 
4959     r = data;
4960 
4961     nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
4962 
4963     if (r->req_rpc_data != NULL) {
4964         nxt_request_rpc_data_unlink(task, r->req_rpc_data);
4965     }
4966 
4967     nxt_http_request_close_handler(task, r, r->proto.any);
4968 }
4969 
4970 
4971 static void
4972 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4973 {
4974 }
4975 
4976 
4977 static void
4978 nxt_router_app_prepare_request(nxt_task_t *task,
4979     nxt_request_rpc_data_t *req_rpc_data)
4980 {
4981     nxt_app_t         *app;
4982     nxt_buf_t         *buf, *body;
4983     nxt_int_t         res;
4984     nxt_port_t        *port, *reply_port;
4985 
4986     int                   notify;
4987     struct {
4988         nxt_port_msg_t       pm;
4989         nxt_port_mmap_msg_t  mm;
4990     } msg;
4991 
4992 
4993     app = req_rpc_data->app;
4994 
4995     nxt_assert(app != NULL);
4996 
4997     port = req_rpc_data->app_port;
4998 
4999     nxt_assert(port != NULL);
5000     nxt_assert(port->queue != NULL);
5001 
5002     reply_port = task->thread->engine->port;
5003 
5004     buf = nxt_router_prepare_msg(task, req_rpc_data->request, app,
5005                                  nxt_app_msg_prefix[app->type]);
5006     if (nxt_slow_path(buf == NULL)) {
5007         nxt_alert(task, "stream #%uD, app '%V': failed to prepare app message",
5008                   req_rpc_data->stream, &app->name);
5009 
5010         nxt_http_request_error(task, req_rpc_data->request,
5011                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5012 
5013         return;
5014     }
5015 
5016     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
5017                     nxt_buf_used_size(buf),
5018                     port->socket.fd);
5019 
5020     req_rpc_data->msg_info.buf = buf;
5021 
5022     body = req_rpc_data->request->body;
5023 
5024     if (body != NULL && nxt_buf_is_file(body)) {
5025         req_rpc_data->msg_info.body_fd = body->file->fd;
5026 
5027         body->file->fd = -1;
5028 
5029     } else {
5030         req_rpc_data->msg_info.body_fd = -1;
5031     }
5032 
5033     msg.pm.stream = req_rpc_data->stream;
5034     msg.pm.pid = reply_port->pid;
5035     msg.pm.reply_port = reply_port->id;
5036     msg.pm.type = NXT_PORT_MSG_REQ_HEADERS;
5037     msg.pm.last = 0;
5038     msg.pm.mmap = 1;
5039     msg.pm.nf = 0;
5040     msg.pm.mf = 0;
5041     msg.pm.tracking = 0;
5042 
5043     nxt_port_mmap_handler_t *mmap_handler = buf->parent;
5044     nxt_port_mmap_header_t *hdr = mmap_handler->hdr;
5045 
5046     msg.mm.mmap_id = hdr->id;
5047     msg.mm.chunk_id = nxt_port_mmap_chunk_id(hdr, buf->mem.pos);
5048     msg.mm.size = nxt_buf_used_size(buf);
5049 
5050     res = nxt_app_queue_send(port->queue, &msg, sizeof(msg),
5051                              req_rpc_data->stream, &notify,
5052                              &req_rpc_data->msg_info.tracking_cookie);
5053     if (nxt_fast_path(res == NXT_OK)) {
5054         if (notify != 0) {
5055             (void) nxt_port_socket_write(task, port,
5056                                          NXT_PORT_MSG_READ_QUEUE,
5057                                          -1, req_rpc_data->stream,
5058                                          reply_port->id, NULL);
5059 
5060         } else {
5061             nxt_debug(task, "queue is not empty");
5062         }
5063 
5064         buf->is_port_mmap_sent = 1;
5065         buf->mem.pos = buf->mem.free;
5066 
5067     } else {
5068         nxt_alert(task, "stream #%uD, app '%V': failed to send app message",
5069                   req_rpc_data->stream, &app->name);
5070 
5071         nxt_http_request_error(task, req_rpc_data->request,
5072                                NXT_HTTP_INTERNAL_SERVER_ERROR);
5073     }
5074 }
5075 
5076 
5077 struct nxt_fields_iter_s {
5078     nxt_list_part_t   *part;
5079     nxt_http_field_t  *field;
5080 };
5081 
5082 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
5083 
5084 
5085 static nxt_http_field_t *
5086 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
5087 {
5088     if (part == NULL) {
5089         return NULL;
5090     }
5091 
5092     while (part->nelts == 0) {
5093         part = part->next;
5094         if (part == NULL) {
5095             return NULL;
5096         }
5097     }
5098 
5099     i->part = part;
5100     i->field = nxt_list_data(i->part);
5101 
5102     return i->field;
5103 }
5104 
5105 
5106 static nxt_http_field_t *
5107 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
5108 {
5109     return nxt_fields_part_first(nxt_list_part(fields), i);
5110 }
5111 
5112 
5113 static nxt_http_field_t *
5114 nxt_fields_next(nxt_fields_iter_t *i)
5115 {
5116     nxt_http_field_t  *end = nxt_list_data(i->part);
5117 
5118     end += i->part->nelts;
5119     i->field++;
5120 
5121     if (i->field < end) {
5122         return i->field;
5123     }
5124 
5125     return nxt_fields_part_first(i->part->next, i);
5126 }
5127 
5128 
5129 static nxt_buf_t *
5130 nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
5131     nxt_app_t *app, const nxt_str_t *prefix)
5132 {
5133     void                *target_pos, *query_pos;
5134     u_char              *pos, *end, *p, c;
5135     size_t              fields_count, req_size, size, free_size;
5136     size_t              copy_size;
5137     nxt_off_t           content_length;
5138     nxt_buf_t           *b, *buf, *out, **tail;
5139     nxt_http_field_t    *field, *dup;
5140     nxt_unit_field_t    *dst_field;
5141     nxt_fields_iter_t   iter, dup_iter;
5142     nxt_unit_request_t  *req;
5143 
5144     req_size = sizeof(nxt_unit_request_t)
5145                + r->method->length + 1
5146                + r->version.length + 1
5147                + r->remote->length + 1
5148                + r->local->length + 1
5149                + r->server_name.length + 1
5150                + r->target.length + 1
5151                + (r->path->start != r->target.start ? r->path->length + 1 : 0);
5152 
5153     content_length = r->content_length_n < 0 ? 0 : r->content_length_n;
5154     fields_count = 0;
5155 
5156     nxt_list_each(field, r->fields) {
5157         fields_count++;
5158 
5159         req_size += field->name_length + prefix->length + 1
5160                     + field->value_length + 1;
5161     } nxt_list_loop;
5162 
5163     req_size += fields_count * sizeof(nxt_unit_field_t);
5164 
5165     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5166         nxt_alert(task, "headers to big to fit in shared memory (%d)",
5167                   (int) req_size);
5168 
5169         return NULL;
5170     }
5171 
5172     out = nxt_port_mmap_get_buf(task, &app->outgoing,
5173               nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5174     if (nxt_slow_path(out == NULL)) {
5175         return NULL;
5176     }
5177 
5178     req = (nxt_unit_request_t *) out->mem.free;
5179     out->mem.free += req_size;
5180 
5181     req->app_target = r->app_target;
5182 
5183     req->content_length = content_length;
5184 
5185     p = (u_char *) (req->fields + fields_count);
5186 
5187     nxt_debug(task, "fields_count=%d", (int) fields_count);
5188 
5189     req->method_length = r->method->length;
5190     nxt_unit_sptr_set(&req->method, p);
5191     p = nxt_cpymem(p, r->method->start, r->method->length);
5192     *p++ = '\0';
5193 
5194     req->version_length = r->version.length;
5195     nxt_unit_sptr_set(&req->version, p);
5196     p = nxt_cpymem(p, r->version.start, r->version.length);
5197     *p++ = '\0';
5198 
5199     req->remote_length = r->remote->address_length;
5200     nxt_unit_sptr_set(&req->remote, p);
5201     p = nxt_cpymem(p, nxt_sockaddr_address(r->remote),
5202                    r->remote->address_length);
5203     *p++ = '\0';
5204 
5205     req->local_length = r->local->address_length;
5206     nxt_unit_sptr_set(&req->local, p);
5207     p = nxt_cpymem(p, nxt_sockaddr_address(r->local), r->local->address_length);
5208     *p++ = '\0';
5209 
5210     req->tls = (r->tls != NULL);
5211     req->websocket_handshake = r->websocket_handshake;
5212 
5213     req->server_name_length = r->server_name.length;
5214     nxt_unit_sptr_set(&req->server_name, p);
5215     p = nxt_cpymem(p, r->server_name.start, r->server_name.length);
5216     *p++ = '\0';
5217 
5218     target_pos = p;
5219     req->target_length = (uint32_t) r->target.length;
5220     nxt_unit_sptr_set(&req->target, p);
5221     p = nxt_cpymem(p, r->target.start, r->target.length);
5222     *p++ = '\0';
5223 
5224     req->path_length = (uint32_t) r->path->length;
5225     if (r->path->start == r->target.start) {
5226         nxt_unit_sptr_set(&req->path, target_pos);
5227 
5228     } else {
5229         nxt_unit_sptr_set(&req->path, p);
5230         p = nxt_cpymem(p, r->path->start, r->path->length);
5231         *p++ = '\0';
5232     }
5233 
5234     req->query_length = r->args != NULL ? (uint32_t) r->args->length : 0;
5235     if (r->args != NULL && r->args->start != NULL) {
5236         query_pos = nxt_pointer_to(target_pos,
5237                                    r->args->start - r->target.start);
5238 
5239         nxt_unit_sptr_set(&req->query, query_pos);
5240 
5241     } else {
5242         req->query.offset = 0;
5243     }
5244 
5245     req->content_length_field = NXT_UNIT_NONE_FIELD;
5246     req->content_type_field   = NXT_UNIT_NONE_FIELD;
5247     req->cookie_field         = NXT_UNIT_NONE_FIELD;
5248     req->authorization_field  = NXT_UNIT_NONE_FIELD;
5249 
5250     dst_field = req->fields;
5251 
5252     for (field = nxt_fields_first(r->fields, &iter);
5253          field != NULL;
5254          field = nxt_fields_next(&iter))
5255     {
5256         if (field->skip) {
5257             continue;
5258         }
5259 
5260         dst_field->hash = field->hash;
5261         dst_field->skip = 0;
5262         dst_field->name_length = field->name_length + prefix->length;
5263         dst_field->value_length = field->value_length;
5264 
5265         if (field == r->content_length) {
5266             req->content_length_field = dst_field - req->fields;
5267 
5268         } else if (field == r->content_type) {
5269             req->content_type_field = dst_field - req->fields;
5270 
5271         } else if (field == r->cookie) {
5272             req->cookie_field = dst_field - req->fields;
5273 
5274         } else if (field == r->authorization) {
5275             req->authorization_field = dst_field - req->fields;
5276         }
5277 
5278         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
5279                   (int) field->hash, (int) field->skip,
5280                   (int) field->name_length, field->name,
5281                   (int) field->value_length, field->value);
5282 
5283         if (prefix->length != 0) {
5284             nxt_unit_sptr_set(&dst_field->name, p);
5285             p = nxt_cpymem(p, prefix->start, prefix->length);
5286 
5287             end = field->name + field->name_length;
5288             for (pos = field->name; pos < end; pos++) {
5289                 c = *pos;
5290 
5291                 if (c >= 'a' && c <= 'z') {
5292                     *p++ = (c & ~0x20);
5293                     continue;
5294                 }
5295 
5296                 if (c == '-') {
5297                     *p++ = '_';
5298                     continue;
5299                 }
5300 
5301                 *p++ = c;
5302             }
5303 
5304         } else {
5305             nxt_unit_sptr_set(&dst_field->name, p);
5306             p = nxt_cpymem(p, field->name, field->name_length);
5307         }
5308 
5309         *p++ = '\0';
5310 
5311         nxt_unit_sptr_set(&dst_field->value, p);
5312         p = nxt_cpymem(p, field->value, field->value_length);
5313 
5314         if (prefix->length != 0) {
5315             dup_iter = iter;
5316 
5317             for (dup = nxt_fields_next(&dup_iter);
5318                  dup != NULL;
5319                  dup = nxt_fields_next(&dup_iter))
5320             {
5321                 if (dup->name_length != field->name_length
5322                     || dup->skip
5323                     || dup->hash != field->hash
5324                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
5325                 {
5326                     continue;
5327                 }
5328 
5329                 p = nxt_cpymem(p, ", ", 2);
5330                 p = nxt_cpymem(p, dup->value, dup->value_length);
5331 
5332                 dst_field->value_length += 2 + dup->value_length;
5333 
5334                 dup->skip = 1;
5335             }
5336         }
5337 
5338         *p++ = '\0';
5339 
5340         dst_field++;
5341     }
5342 
5343     req->fields_count = (uint32_t) (dst_field - req->fields);
5344 
5345     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
5346 
5347     buf = out;
5348     tail = &buf->next;
5349 
5350     for (b = r->body; b != NULL; b = b->next) {
5351         size = nxt_buf_mem_used_size(&b->mem);
5352         pos = b->mem.pos;
5353 
5354         while (size > 0) {
5355             if (buf == NULL) {
5356                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5357 
5358                 buf = nxt_port_mmap_get_buf(task, &app->outgoing, free_size);
5359                 if (nxt_slow_path(buf == NULL)) {
5360                     while (out != NULL) {
5361                         buf = out->next;
5362                         out->next = NULL;
5363                         out->completion_handler(task, out, out->parent);
5364                         out = buf;
5365                     }
5366                     return NULL;
5367                 }
5368 
5369                 *tail = buf;
5370                 tail = &buf->next;
5371 
5372             } else {
5373                 free_size = nxt_buf_mem_free_size(&buf->mem);
5374                 if (free_size < size
5375                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
5376                        == NXT_OK)
5377                 {
5378                     free_size = nxt_buf_mem_free_size(&buf->mem);
5379                 }
5380             }
5381 
5382             if (free_size > 0) {
5383                 copy_size = nxt_min(free_size, size);
5384 
5385                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
5386 
5387                 size -= copy_size;
5388                 pos += copy_size;
5389 
5390                 if (size == 0) {
5391                     break;
5392                 }
5393             }
5394 
5395             buf = NULL;
5396         }
5397     }
5398 
5399     return out;
5400 }
5401 
5402 
5403 static void
5404 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
5405 {
5406     nxt_timer_t              *timer;
5407     nxt_http_request_t       *r;
5408     nxt_request_rpc_data_t   *req_rpc_data;
5409 
5410     timer = obj;
5411 
5412     nxt_debug(task, "router app timeout");
5413 
5414     r = nxt_timer_data(timer, nxt_http_request_t, timer);
5415     req_rpc_data = r->timer_data;
5416 
5417     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
5418 
5419     nxt_request_rpc_data_unlink(task, req_rpc_data);
5420 }
5421 
5422 
5423 static void
5424 nxt_router_http_request_release_post(nxt_task_t *task, nxt_http_request_t *r)
5425 {
5426     r->timer.handler = nxt_router_http_request_release;
5427     nxt_timer_add(task->thread->engine, &r->timer, 0);
5428 }
5429 
5430 
5431 static void
5432 nxt_router_http_request_release(nxt_task_t *task, void *obj, void *data)
5433 {
5434     nxt_http_request_t  *r;
5435 
5436     nxt_debug(task, "http request pool release");
5437 
5438     r = nxt_timer_data(obj, nxt_http_request_t, timer);
5439 
5440     nxt_mp_release(r->mem_pool);
5441 }
5442 
5443 
5444 static void
5445 nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5446 {
5447     size_t                   mi;
5448     uint32_t                 i;
5449     nxt_bool_t               ack;
5450     nxt_process_t            *process;
5451     nxt_free_map_t           *m;
5452     nxt_port_mmap_handler_t  *mmap_handler;
5453 
5454     nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5455 
5456     process = nxt_runtime_process_find(task->thread->runtime,
5457                                        msg->port_msg.pid);
5458     if (nxt_slow_path(process == NULL)) {
5459         return;
5460     }
5461 
5462     ack = 0;
5463 
5464     /*
5465      * To mitigate possible racing condition (when OOSM message received
5466      * after some of the memory was already freed), need to try to find
5467      * first free segment in shared memory and send ACK if found.
5468      */
5469 
5470     nxt_thread_mutex_lock(&process->incoming.mutex);
5471 
5472     for (i = 0; i < process->incoming.size; i++) {
5473         mmap_handler = process->incoming.elts[i].mmap_handler;
5474 
5475         if (nxt_slow_path(mmap_handler == NULL)) {
5476             continue;
5477         }
5478 
5479         m = mmap_handler->hdr->free_map;
5480 
5481         for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5482             if (m[mi] != 0) {
5483                 ack = 1;
5484 
5485                 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5486                           i, mi, m[mi]);
5487 
5488                 break;
5489             }
5490         }
5491     }
5492 
5493     nxt_thread_mutex_unlock(&process->incoming.mutex);
5494 
5495     if (ack) {
5496         nxt_process_broadcast_shm_ack(task, process);
5497     }
5498 }
5499 
5500 
5501 static void
5502 nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5503 {
5504     nxt_fd_t                 fd;
5505     nxt_port_t               *port;
5506     nxt_runtime_t            *rt;
5507     nxt_port_mmaps_t         *mmaps;
5508     nxt_port_msg_get_mmap_t  *get_mmap_msg;
5509     nxt_port_mmap_handler_t  *mmap_handler;
5510 
5511     rt = task->thread->runtime;
5512 
5513     port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5514                                  msg->port_msg.reply_port);
5515     if (nxt_slow_path(port == NULL)) {
5516         nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5517                   msg->port_msg.pid, msg->port_msg.reply_port);
5518 
5519         return;
5520     }
5521 
5522     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5523                       < (int) sizeof(nxt_port_msg_get_mmap_t)))
5524     {
5525         nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5526                   (int) nxt_buf_used_size(msg->buf));
5527 
5528         return;
5529     }
5530 
5531     get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5532 
5533     nxt_assert(port->type == NXT_PROCESS_APP);
5534 
5535     if (nxt_slow_path(port->app == NULL)) {
5536         nxt_alert(task, "get_mmap_handler: app == NULL for reply port %PI:%d",
5537                   port->pid, port->id);
5538 
5539         // FIXME
5540         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5541                               -1, msg->port_msg.stream, 0, NULL);
5542 
5543         return;
5544     }
5545 
5546     mmaps = &port->app->outgoing;
5547     nxt_thread_mutex_lock(&mmaps->mutex);
5548 
5549     if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5550         nxt_thread_mutex_unlock(&mmaps->mutex);
5551 
5552         nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5553                   (int) get_mmap_msg->id);
5554 
5555         // FIXME
5556         nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
5557                               -1, msg->port_msg.stream, 0, NULL);
5558         return;
5559     }
5560 
5561     mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5562 
5563     fd = mmap_handler->fd;
5564 
5565     nxt_thread_mutex_unlock(&mmaps->mutex);
5566 
5567     nxt_debug(task, "get mmap %PI:%d found",
5568               msg->port_msg.pid, (int) get_mmap_msg->id);
5569 
5570     (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5571 }
5572 
5573 
5574 static void
5575 nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5576 {
5577     nxt_port_t               *port, *reply_port;
5578     nxt_runtime_t            *rt;
5579     nxt_port_msg_get_port_t  *get_port_msg;
5580 
5581     rt = task->thread->runtime;
5582 
5583     reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5584                                        msg->port_msg.reply_port);
5585     if (nxt_slow_path(reply_port == NULL)) {
5586         nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
5587                   msg->port_msg.pid, msg->port_msg.reply_port);
5588 
5589         return;
5590     }
5591 
5592     if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5593                       < (int) sizeof(nxt_port_msg_get_port_t)))
5594     {
5595         nxt_alert(task, "get_port_handler: message buffer too small (%d)",
5596                   (int) nxt_buf_used_size(msg->buf));
5597 
5598         return;
5599     }
5600 
5601     get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
5602 
5603     port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
5604     if (nxt_slow_path(port == NULL)) {
5605         nxt_alert(task, "get_port_handler: port %PI:%d not found",
5606                   get_port_msg->pid, get_port_msg->id);
5607 
5608         return;
5609     }
5610 
5611     nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
5612               get_port_msg->id);
5613 
5614     (void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
5615 }
5616