xref: /unit/src/nxt_router.c (revision 1131:ec7d924d8dfb)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 #include <nxt_router_request.h>
18 
19 typedef struct {
20     nxt_str_t         type;
21     uint32_t          processes;
22     uint32_t          max_processes;
23     uint32_t          spare_processes;
24     nxt_msec_t        timeout;
25     nxt_msec_t        res_timeout;
26     nxt_msec_t        idle_timeout;
27     uint32_t          requests;
28     nxt_conf_value_t  *limits_value;
29     nxt_conf_value_t  *processes_value;
30 } nxt_router_app_conf_t;
31 
32 
33 typedef struct {
34     nxt_str_t         pass;
35     nxt_str_t         application;
36 } nxt_router_listener_conf_t;
37 
38 
39 #if (NXT_TLS)
40 
41 typedef struct {
42     nxt_str_t          name;
43     nxt_socket_conf_t  *conf;
44 
45     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
46 } nxt_router_tlssock_t;
47 
48 #endif
49 
50 
51 typedef struct {
52     nxt_socket_conf_t       *socket_conf;
53     nxt_router_temp_conf_t  *temp_conf;
54 } nxt_socket_rpc_t;
55 
56 
57 typedef struct {
58     nxt_app_t               *app;
59     nxt_router_temp_conf_t  *temp_conf;
60 } nxt_app_rpc_t;
61 
62 
63 struct nxt_port_select_state_s {
64     nxt_app_t               *app;
65     nxt_request_app_link_t  *req_app_link;
66 
67     nxt_port_t              *failed_port;
68     int                     failed_port_use_delta;
69 
70     uint8_t                 start_process;    /* 1 bit */
71     nxt_request_app_link_t  *shared_ra;
72     nxt_port_t              *port;
73 };
74 
75 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
76 
77 static void nxt_router_greet_controller(nxt_task_t *task,
78     nxt_port_t *controller_port);
79 
80 static void nxt_router_port_select(nxt_task_t *task,
81     nxt_port_select_state_t *state);
82 
83 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
84     nxt_port_select_state_t *state);
85 
86 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
87 static void nxt_request_app_link_update_peer(nxt_task_t *task,
88     nxt_request_app_link_t *req_app_link);
89 
90 
91 nxt_inline void
92 nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
93 {
94     nxt_atomic_fetch_add(&req_app_link->use_count, 1);
95 }
96 
97 nxt_inline void
98 nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link)
99 {
100 #if (NXT_DEBUG)
101     int  c;
102 
103     c = nxt_atomic_fetch_add(&req_app_link->use_count, -1);
104 
105     nxt_assert(c > 1);
106 #else
107     (void) nxt_atomic_fetch_add(&req_app_link->use_count, -1);
108 #endif
109 }
110 
111 static void nxt_request_app_link_use(nxt_task_t *task,
112     nxt_request_app_link_t *req_app_link, int i);
113 
114 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
115 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
116 static void nxt_router_conf_ready(nxt_task_t *task,
117     nxt_router_temp_conf_t *tmcf);
118 static void nxt_router_conf_error(nxt_task_t *task,
119     nxt_router_temp_conf_t *tmcf);
120 static void nxt_router_conf_send(nxt_task_t *task,
121     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
122 
123 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
124     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
125 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
126 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
127     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
128 static void nxt_router_listen_socket_ready(nxt_task_t *task,
129     nxt_port_recv_msg_t *msg, void *data);
130 static void nxt_router_listen_socket_error(nxt_task_t *task,
131     nxt_port_recv_msg_t *msg, void *data);
132 #if (NXT_TLS)
133 static void nxt_router_tls_rpc_create(nxt_task_t *task,
134     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls);
135 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
136     nxt_port_recv_msg_t *msg, void *data);
137 #endif
138 static void nxt_router_app_rpc_create(nxt_task_t *task,
139     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
140 static void nxt_router_app_prefork_ready(nxt_task_t *task,
141     nxt_port_recv_msg_t *msg, void *data);
142 static void nxt_router_app_prefork_error(nxt_task_t *task,
143     nxt_port_recv_msg_t *msg, void *data);
144 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
145     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
146 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
147     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
148 
149 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
150     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
151     const nxt_event_interface_t *interface);
152 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
153     nxt_router_engine_conf_t *recf);
154 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
155     nxt_router_engine_conf_t *recf);
156 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
157     nxt_router_engine_conf_t *recf);
158 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
159     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
160     nxt_work_handler_t handler);
161 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
162     nxt_router_engine_conf_t *recf);
163 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
164     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
165 
166 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
167     nxt_router_temp_conf_t *tmcf);
168 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
169     nxt_event_engine_t *engine);
170 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
171     nxt_router_temp_conf_t *tmcf);
172 
173 static void nxt_router_engines_post(nxt_router_t *router,
174     nxt_router_temp_conf_t *tmcf);
175 static void nxt_router_engine_post(nxt_event_engine_t *engine,
176     nxt_work_t *jobs);
177 
178 static void nxt_router_thread_start(void *data);
179 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
180     void *data);
181 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
182     void *data);
183 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
184     void *data);
185 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
186     void *data);
187 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
188     void *data);
189 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
190     void *data);
191 static void nxt_router_listen_socket_release(nxt_task_t *task,
192     nxt_socket_conf_t *skcf);
193 
194 static void nxt_router_access_log_writer(nxt_task_t *task,
195     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
196 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
197     struct tm *tm, size_t size, const char *format);
198 static void nxt_router_access_log_open(nxt_task_t *task,
199     nxt_router_temp_conf_t *tmcf);
200 static void nxt_router_access_log_ready(nxt_task_t *task,
201     nxt_port_recv_msg_t *msg, void *data);
202 static void nxt_router_access_log_error(nxt_task_t *task,
203     nxt_port_recv_msg_t *msg, void *data);
204 static void nxt_router_access_log_release(nxt_task_t *task,
205     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
206 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
207     void *data);
208 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
209     nxt_port_recv_msg_t *msg, void *data);
210 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
211     nxt_port_recv_msg_t *msg, void *data);
212 
213 static void nxt_router_app_port_ready(nxt_task_t *task,
214     nxt_port_recv_msg_t *msg, void *data);
215 static void nxt_router_app_port_error(nxt_task_t *task,
216     nxt_port_recv_msg_t *msg, void *data);
217 
218 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
219 
220 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
221     nxt_apr_action_t action);
222 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
223     nxt_request_app_link_t *req_app_link);
224 
225 static void nxt_router_app_prepare_request(nxt_task_t *task,
226     nxt_request_app_link_t *req_app_link);
227 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task,
228     nxt_http_request_t *r, nxt_port_t *port, const nxt_str_t *prefix);
229 
230 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
231 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
232     void *data);
233 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
234     void *data);
235 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
236     void *data);
237 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
238 
239 static const nxt_http_request_state_t  nxt_http_request_send_state;
240 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
241 
242 static void nxt_router_app_joint_use(nxt_task_t *task,
243     nxt_app_joint_t *app_joint, int i);
244 
245 static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
246     nxt_http_request_t *r);
247 static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
248     void *data);
249 
250 const nxt_http_request_state_t  nxt_http_websocket;
251 
252 static nxt_router_t  *nxt_router;
253 
254 static const nxt_str_t http_prefix = nxt_string("HTTP_");
255 static const nxt_str_t empty_prefix = nxt_string("");
256 
257 static const nxt_str_t  *nxt_app_msg_prefix[] = {
258     &empty_prefix,
259     &http_prefix,
260     &http_prefix,
261     &http_prefix,
262     &http_prefix,
263     &empty_prefix,
264 };
265 
266 
267 nxt_port_handlers_t  nxt_router_process_port_handlers = {
268     .quit         = nxt_worker_process_quit_handler,
269     .new_port     = nxt_router_new_port_handler,
270     .change_file  = nxt_port_change_log_file_handler,
271     .mmap         = nxt_port_mmap_handler,
272     .data         = nxt_router_conf_data_handler,
273     .remove_pid   = nxt_router_remove_pid_handler,
274     .access_log   = nxt_router_access_log_reopen_handler,
275     .rpc_ready    = nxt_port_rpc_handler,
276     .rpc_error    = nxt_port_rpc_handler,
277 };
278 
279 
280 nxt_int_t
281 nxt_router_start(nxt_task_t *task, void *data)
282 {
283     nxt_int_t      ret;
284     nxt_port_t     *controller_port;
285     nxt_router_t   *router;
286     nxt_runtime_t  *rt;
287 
288     rt = task->thread->runtime;
289 
290 #if (NXT_TLS)
291     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
292     if (nxt_slow_path(rt->tls == NULL)) {
293         return NXT_ERROR;
294     }
295 
296     ret = rt->tls->library_init(task);
297     if (nxt_slow_path(ret != NXT_OK)) {
298         return ret;
299     }
300 #endif
301 
302     ret = nxt_http_init(task, rt);
303     if (nxt_slow_path(ret != NXT_OK)) {
304         return ret;
305     }
306 
307     router = nxt_zalloc(sizeof(nxt_router_t));
308     if (nxt_slow_path(router == NULL)) {
309         return NXT_ERROR;
310     }
311 
312     nxt_queue_init(&router->engines);
313     nxt_queue_init(&router->sockets);
314     nxt_queue_init(&router->apps);
315 
316     nxt_router = router;
317 
318     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
319     if (controller_port != NULL) {
320         nxt_router_greet_controller(task, controller_port);
321     }
322 
323     return NXT_OK;
324 }
325 
326 
327 static void
328 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
329 {
330     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
331                           -1, 0, 0, NULL);
332 }
333 
334 
335 static void
336 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
337     void *data)
338 {
339     size_t         size;
340     uint32_t       stream;
341     nxt_mp_t       *mp;
342     nxt_int_t      ret;
343     nxt_app_t      *app;
344     nxt_buf_t      *b;
345     nxt_port_t     *main_port;
346     nxt_runtime_t  *rt;
347 
348     app = data;
349 
350     rt = task->thread->runtime;
351     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
352 
353     nxt_debug(task, "app '%V' %p start process", &app->name, app);
354 
355     size = app->name.length + 1 + app->conf.length;
356 
357     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
358 
359     if (nxt_slow_path(b == NULL)) {
360         goto failed;
361     }
362 
363     nxt_buf_cpystr(b, &app->name);
364     *b->mem.free++ = '\0';
365     nxt_buf_cpystr(b, &app->conf);
366 
367     nxt_router_app_joint_use(task, app->joint, 1);
368 
369     stream = nxt_port_rpc_register_handler(task, port,
370                                            nxt_router_app_port_ready,
371                                            nxt_router_app_port_error,
372                                            -1, app->joint);
373 
374     if (nxt_slow_path(stream == 0)) {
375         nxt_router_app_joint_use(task, app->joint, -1);
376 
377         goto failed;
378     }
379 
380     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
381                                 stream, port->id, b);
382 
383     if (nxt_slow_path(ret != NXT_OK)) {
384         nxt_port_rpc_cancel(task, port, stream);
385 
386         nxt_router_app_joint_use(task, app->joint, -1);
387 
388         goto failed;
389     }
390 
391     nxt_router_app_use(task, app, -1);
392 
393     return;
394 
395 failed:
396 
397     if (b != NULL) {
398         mp = b->data;
399         nxt_mp_free(mp, b);
400         nxt_mp_release(mp);
401     }
402 
403     nxt_thread_mutex_lock(&app->mutex);
404 
405     app->pending_processes--;
406 
407     nxt_thread_mutex_unlock(&app->mutex);
408 
409     nxt_router_app_use(task, app, -1);
410 }
411 
412 
413 static void
414 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
415 {
416     app_joint->use_count += i;
417 
418     if (app_joint->use_count == 0) {
419         nxt_assert(app_joint->app == NULL);
420 
421         nxt_free(app_joint);
422     }
423 }
424 
425 
426 static nxt_int_t
427 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
428 {
429     nxt_int_t      res;
430     nxt_port_t     *router_port;
431     nxt_runtime_t  *rt;
432 
433     rt = task->thread->runtime;
434     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
435 
436     nxt_router_app_use(task, app, 1);
437 
438     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
439                         app);
440 
441     if (res == NXT_OK) {
442         return res;
443     }
444 
445     nxt_thread_mutex_lock(&app->mutex);
446 
447     app->pending_processes--;
448 
449     nxt_thread_mutex_unlock(&app->mutex);
450 
451     nxt_router_app_use(task, app, -1);
452 
453     return NXT_ERROR;
454 }
455 
456 
457 nxt_inline void
458 nxt_request_app_link_init(nxt_task_t *task,
459     nxt_request_app_link_t *req_app_link, nxt_request_rpc_data_t *req_rpc_data)
460 {
461     nxt_event_engine_t  *engine;
462 
463     engine = task->thread->engine;
464 
465     nxt_memzero(req_app_link, sizeof(nxt_request_app_link_t));
466 
467     req_app_link->stream = req_rpc_data->stream;
468     req_app_link->use_count = 1;
469     req_app_link->req_rpc_data = req_rpc_data;
470     req_rpc_data->req_app_link = req_app_link;
471     req_app_link->reply_port = engine->port;
472     req_app_link->request = req_rpc_data->request;
473     req_app_link->apr_action = NXT_APR_GOT_RESPONSE;
474 
475     req_app_link->work.handler = NULL;
476     req_app_link->work.task = &engine->task;
477     req_app_link->work.obj = req_app_link;
478     req_app_link->work.data = engine;
479 }
480 
481 
482 nxt_inline nxt_request_app_link_t *
483 nxt_request_app_link_alloc(nxt_task_t *task,
484     nxt_request_app_link_t *ra_src, nxt_request_rpc_data_t *req_rpc_data)
485 {
486     nxt_mp_t                *mp;
487     nxt_request_app_link_t  *req_app_link;
488 
489     if (ra_src != NULL && ra_src->mem_pool != NULL) {
490         return ra_src;
491     }
492 
493     mp = req_rpc_data->request->mem_pool;
494 
495     req_app_link = nxt_mp_alloc(mp, sizeof(nxt_request_app_link_t));
496 
497     if (nxt_slow_path(req_app_link == NULL)) {
498 
499         req_rpc_data->req_app_link = NULL;
500 
501         if (ra_src != NULL) {
502             ra_src->req_rpc_data = NULL;
503         }
504 
505         return NULL;
506     }
507 
508     nxt_mp_retain(mp);
509 
510     nxt_request_app_link_init(task, req_app_link, req_rpc_data);
511 
512     req_app_link->mem_pool = mp;
513 
514     return req_app_link;
515 }
516 
517 
518 nxt_inline nxt_bool_t
519 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
520     uint32_t stream)
521 {
522     nxt_buf_t   *b, *next;
523     nxt_bool_t  cancelled;
524 
525     if (msg_info->buf == NULL) {
526         return 0;
527     }
528 
529     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
530                                               stream);
531 
532     if (cancelled) {
533         nxt_debug(task, "stream #%uD: cancelled by router", stream);
534     }
535 
536     for (b = msg_info->buf; b != NULL; b = next) {
537         next = b->next;
538 
539         b->completion_handler = msg_info->completion_handler;
540 
541         if (b->is_port_mmap_sent) {
542             b->is_port_mmap_sent = cancelled == 0;
543             b->completion_handler(task, b, b->parent);
544         }
545     }
546 
547     msg_info->buf = NULL;
548 
549     return cancelled;
550 }
551 
552 
553 static void
554 nxt_request_app_link_update_peer_handler(nxt_task_t *task, void *obj,
555     void *data)
556 {
557     nxt_request_app_link_t  *req_app_link;
558 
559     req_app_link = obj;
560 
561     nxt_request_app_link_update_peer(task, req_app_link);
562 
563     nxt_request_app_link_use(task, req_app_link, -1);
564 }
565 
566 
567 static void
568 nxt_request_app_link_update_peer(nxt_task_t *task,
569     nxt_request_app_link_t *req_app_link)
570 {
571     nxt_event_engine_t      *engine;
572     nxt_request_rpc_data_t  *req_rpc_data;
573 
574     engine = req_app_link->work.data;
575 
576     if (task->thread->engine != engine) {
577         nxt_request_app_link_inc_use(req_app_link);
578 
579         req_app_link->work.handler = nxt_request_app_link_update_peer_handler;
580         req_app_link->work.task = &engine->task;
581         req_app_link->work.next = NULL;
582 
583         nxt_debug(task, "req_app_link stream #%uD post update peer to %p",
584                   req_app_link->stream, engine);
585 
586         nxt_event_engine_post(engine, &req_app_link->work);
587 
588         return;
589     }
590 
591     nxt_debug(task, "req_app_link stream #%uD update peer",
592               req_app_link->stream);
593 
594     req_rpc_data = req_app_link->req_rpc_data;
595 
596     if (req_rpc_data != NULL && req_app_link->app_port != NULL) {
597         nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
598                                  req_app_link->app_port->pid);
599     }
600 
601     nxt_request_app_link_use(task, req_app_link, -1);
602 }
603 
604 
605 static void
606 nxt_request_app_link_release(nxt_task_t *task,
607     nxt_request_app_link_t *req_app_link)
608 {
609     nxt_mp_t                *mp;
610     nxt_http_request_t      *r;
611     nxt_request_rpc_data_t  *req_rpc_data;
612 
613     nxt_assert(task->thread->engine == req_app_link->work.data);
614     nxt_assert(req_app_link->use_count == 0);
615 
616     nxt_debug(task, "req_app_link stream #%uD release", req_app_link->stream);
617 
618     req_rpc_data = req_app_link->req_rpc_data;
619 
620     if (req_rpc_data != NULL) {
621         if (nxt_slow_path(req_app_link->err_code != 0)) {
622             nxt_http_request_error(task, req_rpc_data->request,
623                                    req_app_link->err_code);
624 
625         } else {
626             req_rpc_data->app_port = req_app_link->app_port;
627             req_rpc_data->apr_action = req_app_link->apr_action;
628             req_rpc_data->msg_info = req_app_link->msg_info;
629 
630             if (req_rpc_data->app->timeout != 0) {
631                 r = req_rpc_data->request;
632 
633                 r->timer.handler = nxt_router_app_timeout;
634                 r->timer_data = req_rpc_data;
635                 nxt_timer_add(task->thread->engine, &r->timer,
636                               req_rpc_data->app->timeout);
637             }
638 
639             req_app_link->app_port = NULL;
640             req_app_link->msg_info.buf = NULL;
641         }
642 
643         req_rpc_data->req_app_link = NULL;
644         req_app_link->req_rpc_data = NULL;
645     }
646 
647     if (req_app_link->app_port != NULL) {
648         nxt_router_app_port_release(task, req_app_link->app_port,
649                                     req_app_link->apr_action);
650 
651         req_app_link->app_port = NULL;
652     }
653 
654     nxt_router_msg_cancel(task, &req_app_link->msg_info, req_app_link->stream);
655 
656     mp = req_app_link->mem_pool;
657 
658     if (mp != NULL) {
659         nxt_mp_free(mp, req_app_link);
660         nxt_mp_release(mp);
661     }
662 }
663 
664 
665 static void
666 nxt_request_app_link_release_handler(nxt_task_t *task, void *obj, void *data)
667 {
668     nxt_request_app_link_t  *req_app_link;
669 
670     req_app_link = obj;
671 
672     nxt_assert(req_app_link->work.data == data);
673 
674     nxt_atomic_fetch_add(&req_app_link->use_count, -1);
675 
676     nxt_request_app_link_release(task, req_app_link);
677 }
678 
679 
680 static void
681 nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
682     int i)
683 {
684     int                 c;
685     nxt_event_engine_t  *engine;
686 
687     c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
688 
689     if (i < 0 && c == -i) {
690         engine = req_app_link->work.data;
691 
692         if (task->thread->engine == engine) {
693             nxt_request_app_link_release(task, req_app_link);
694 
695             return;
696         }
697 
698         nxt_request_app_link_inc_use(req_app_link);
699 
700         req_app_link->work.handler = nxt_request_app_link_release_handler;
701         req_app_link->work.task = &engine->task;
702         req_app_link->work.next = NULL;
703 
704         nxt_debug(task, "req_app_link stream #%uD post release to %p",
705                   req_app_link->stream, engine);
706 
707         nxt_event_engine_post(engine, &req_app_link->work);
708     }
709 }
710 
711 
712 nxt_inline void
713 nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code,
714     const char *str)
715 {
716     req_app_link->app_port = NULL;
717     req_app_link->err_code = code;
718     req_app_link->err_str = str;
719 }
720 
721 
722 nxt_inline void
723 nxt_request_app_link_pending(nxt_task_t *task, nxt_app_t *app,
724     nxt_request_app_link_t *req_app_link)
725 {
726     nxt_queue_insert_tail(&req_app_link->app_port->pending_requests,
727                           &req_app_link->link_port_pending);
728     nxt_queue_insert_tail(&app->pending, &req_app_link->link_app_pending);
729 
730     nxt_request_app_link_inc_use(req_app_link);
731 
732     req_app_link->res_time = nxt_thread_monotonic_time(task->thread)
733                              + app->res_timeout;
734 
735     nxt_debug(task, "req_app_link stream #%uD enqueue to pending_requests",
736               req_app_link->stream);
737 }
738 
739 
740 nxt_inline nxt_bool_t
741 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
742 {
743     if (lnk->next != NULL) {
744         nxt_queue_remove(lnk);
745 
746         lnk->next = NULL;
747 
748         return 1;
749     }
750 
751     return 0;
752 }
753 
754 
755 nxt_inline void
756 nxt_request_rpc_data_unlink(nxt_task_t *task,
757     nxt_request_rpc_data_t *req_rpc_data)
758 {
759     int                     ra_use_delta;
760     nxt_request_app_link_t  *req_app_link;
761 
762     if (req_rpc_data->app_port != NULL) {
763         nxt_router_app_port_release(task, req_rpc_data->app_port,
764                                     req_rpc_data->apr_action);
765 
766         req_rpc_data->app_port = NULL;
767     }
768 
769     nxt_router_msg_cancel(task, &req_rpc_data->msg_info, req_rpc_data->stream);
770 
771     req_app_link = req_rpc_data->req_app_link;
772     if (req_app_link != NULL) {
773         req_rpc_data->req_app_link = NULL;
774         req_app_link->req_rpc_data = NULL;
775 
776         ra_use_delta = 0;
777 
778         nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
779 
780         if (req_app_link->link_app_requests.next == NULL
781             && req_app_link->link_port_pending.next == NULL
782             && req_app_link->link_app_pending.next == NULL
783             && req_app_link->link_port_websockets.next == NULL)
784         {
785             req_app_link = NULL;
786 
787         } else {
788             ra_use_delta -=
789                 nxt_queue_chk_remove(&req_app_link->link_app_requests)
790                 + nxt_queue_chk_remove(&req_app_link->link_port_pending)
791                 + nxt_queue_chk_remove(&req_app_link->link_port_websockets);
792 
793             nxt_queue_chk_remove(&req_app_link->link_app_pending);
794         }
795 
796         nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
797 
798         if (req_app_link != NULL) {
799             nxt_request_app_link_use(task, req_app_link, ra_use_delta);
800         }
801     }
802 
803     if (req_rpc_data->app != NULL) {
804         nxt_router_app_use(task, req_rpc_data->app, -1);
805 
806         req_rpc_data->app = NULL;
807     }
808 
809     if (req_rpc_data->request != NULL) {
810         req_rpc_data->request->timer_data = NULL;
811 
812         nxt_router_http_request_done(task, req_rpc_data->request);
813 
814         req_rpc_data->request->req_rpc_data = NULL;
815         req_rpc_data->request = NULL;
816     }
817 }
818 
819 
820 void
821 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
822 {
823     nxt_port_new_port_handler(task, msg);
824 
825     if (msg->u.new_port != NULL
826         && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
827     {
828         nxt_router_greet_controller(task, msg->u.new_port);
829     }
830 
831     if (msg->port_msg.stream == 0) {
832         return;
833     }
834 
835     if (msg->u.new_port == NULL
836         || msg->u.new_port->type != NXT_PROCESS_WORKER)
837     {
838         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
839     }
840 
841     nxt_port_rpc_handler(task, msg);
842 }
843 
844 
845 void
846 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
847 {
848     nxt_int_t               ret;
849     nxt_buf_t               *b;
850     nxt_router_temp_conf_t  *tmcf;
851 
852     tmcf = nxt_router_temp_conf(task);
853     if (nxt_slow_path(tmcf == NULL)) {
854         return;
855     }
856 
857     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
858               nxt_buf_used_size(msg->buf),
859               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
860 
861     tmcf->router_conf->router = nxt_router;
862     tmcf->stream = msg->port_msg.stream;
863     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
864                                        msg->port_msg.pid,
865                                        msg->port_msg.reply_port);
866 
867     if (nxt_slow_path(tmcf->port == NULL)) {
868         nxt_alert(task, "reply port not found");
869 
870         return;
871     }
872 
873     nxt_port_use(task, tmcf->port, 1);
874 
875     b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
876                                msg->buf, msg->size);
877     if (nxt_slow_path(b == NULL)) {
878         nxt_router_conf_error(task, tmcf);
879 
880         return;
881     }
882 
883     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
884 
885     if (nxt_fast_path(ret == NXT_OK)) {
886         nxt_router_conf_apply(task, tmcf, NULL);
887 
888     } else {
889         nxt_router_conf_error(task, tmcf);
890     }
891 }
892 
893 
894 static void
895 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
896     void *data)
897 {
898     union {
899         nxt_pid_t  removed_pid;
900         void       *data;
901     } u;
902 
903     u.data = data;
904 
905     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
906 }
907 
908 
909 void
910 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
911 {
912     nxt_event_engine_t  *engine;
913 
914     nxt_port_remove_pid_handler(task, msg);
915 
916     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
917     {
918         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
919                       msg->u.data);
920     }
921     nxt_queue_loop;
922 
923     if (msg->port_msg.stream == 0) {
924         return;
925     }
926 
927     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
928 
929     nxt_port_rpc_handler(task, msg);
930 }
931 
932 
933 static nxt_router_temp_conf_t *
934 nxt_router_temp_conf(nxt_task_t *task)
935 {
936     nxt_mp_t                *mp, *tmp;
937     nxt_router_conf_t       *rtcf;
938     nxt_router_temp_conf_t  *tmcf;
939 
940     mp = nxt_mp_create(1024, 128, 256, 32);
941     if (nxt_slow_path(mp == NULL)) {
942         return NULL;
943     }
944 
945     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
946     if (nxt_slow_path(rtcf == NULL)) {
947         goto fail;
948     }
949 
950     rtcf->mem_pool = mp;
951 
952     tmp = nxt_mp_create(1024, 128, 256, 32);
953     if (nxt_slow_path(tmp == NULL)) {
954         goto fail;
955     }
956 
957     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
958     if (nxt_slow_path(tmcf == NULL)) {
959         goto temp_fail;
960     }
961 
962     tmcf->mem_pool = tmp;
963     tmcf->router_conf = rtcf;
964     tmcf->count = 1;
965     tmcf->engine = task->thread->engine;
966 
967     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
968                                      sizeof(nxt_router_engine_conf_t));
969     if (nxt_slow_path(tmcf->engines == NULL)) {
970         goto temp_fail;
971     }
972 
973     nxt_queue_init(&tmcf->deleting);
974     nxt_queue_init(&tmcf->keeping);
975     nxt_queue_init(&tmcf->updating);
976     nxt_queue_init(&tmcf->pending);
977     nxt_queue_init(&tmcf->creating);
978 
979 #if (NXT_TLS)
980     nxt_queue_init(&tmcf->tls);
981 #endif
982 
983     nxt_queue_init(&tmcf->apps);
984     nxt_queue_init(&tmcf->previous);
985 
986     return tmcf;
987 
988 temp_fail:
989 
990     nxt_mp_destroy(tmp);
991 
992 fail:
993 
994     nxt_mp_destroy(mp);
995 
996     return NULL;
997 }
998 
999 
1000 nxt_inline nxt_bool_t
1001 nxt_router_app_can_start(nxt_app_t *app)
1002 {
1003     return app->processes + app->pending_processes < app->max_processes
1004             && app->pending_processes < app->max_pending_processes;
1005 }
1006 
1007 
1008 nxt_inline nxt_bool_t
1009 nxt_router_app_need_start(nxt_app_t *app)
1010 {
1011     return app->idle_processes + app->pending_processes
1012             < app->spare_processes;
1013 }
1014 
1015 
1016 static void
1017 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1018 {
1019     nxt_int_t                    ret;
1020     nxt_app_t                    *app;
1021     nxt_router_t                 *router;
1022     nxt_runtime_t                *rt;
1023     nxt_queue_link_t             *qlk;
1024     nxt_socket_conf_t            *skcf;
1025     nxt_router_conf_t            *rtcf;
1026     nxt_router_temp_conf_t       *tmcf;
1027     const nxt_event_interface_t  *interface;
1028 #if (NXT_TLS)
1029     nxt_router_tlssock_t         *tls;
1030 #endif
1031 
1032     tmcf = obj;
1033 
1034     qlk = nxt_queue_first(&tmcf->pending);
1035 
1036     if (qlk != nxt_queue_tail(&tmcf->pending)) {
1037         nxt_queue_remove(qlk);
1038         nxt_queue_insert_tail(&tmcf->creating, qlk);
1039 
1040         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1041 
1042         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1043 
1044         return;
1045     }
1046 
1047 #if (NXT_TLS)
1048     qlk = nxt_queue_first(&tmcf->tls);
1049 
1050     if (qlk != nxt_queue_tail(&tmcf->tls)) {
1051         nxt_queue_remove(qlk);
1052 
1053         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1054 
1055         nxt_router_tls_rpc_create(task, tmcf, tls);
1056         return;
1057     }
1058 #endif
1059 
1060     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1061 
1062         if (nxt_router_app_need_start(app)) {
1063             nxt_router_app_rpc_create(task, tmcf, app);
1064             return;
1065         }
1066 
1067     } nxt_queue_loop;
1068 
1069     rtcf = tmcf->router_conf;
1070 
1071     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1072         nxt_router_access_log_open(task, tmcf);
1073         return;
1074     }
1075 
1076     rt = task->thread->runtime;
1077 
1078     interface = nxt_service_get(rt->services, "engine", NULL);
1079 
1080     router = rtcf->router;
1081 
1082     ret = nxt_router_engines_create(task, router, tmcf, interface);
1083     if (nxt_slow_path(ret != NXT_OK)) {
1084         goto fail;
1085     }
1086 
1087     ret = nxt_router_threads_create(task, rt, tmcf);
1088     if (nxt_slow_path(ret != NXT_OK)) {
1089         goto fail;
1090     }
1091 
1092     nxt_router_apps_sort(task, router, tmcf);
1093 
1094     nxt_router_engines_post(router, tmcf);
1095 
1096     nxt_queue_add(&router->sockets, &tmcf->updating);
1097     nxt_queue_add(&router->sockets, &tmcf->creating);
1098 
1099     router->access_log = rtcf->access_log;
1100 
1101     nxt_router_conf_ready(task, tmcf);
1102 
1103     return;
1104 
1105 fail:
1106 
1107     nxt_router_conf_error(task, tmcf);
1108 
1109     return;
1110 }
1111 
1112 
1113 static void
1114 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1115 {
1116     nxt_joint_job_t  *job;
1117 
1118     job = obj;
1119 
1120     nxt_router_conf_ready(task, job->tmcf);
1121 }
1122 
1123 
1124 static void
1125 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1126 {
1127     nxt_debug(task, "temp conf count:%D", tmcf->count);
1128 
1129     if (--tmcf->count == 0) {
1130         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1131     }
1132 }
1133 
1134 
1135 static void
1136 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1137 {
1138     nxt_app_t          *app;
1139     nxt_queue_t        new_socket_confs;
1140     nxt_socket_t       s;
1141     nxt_router_t       *router;
1142     nxt_queue_link_t   *qlk;
1143     nxt_socket_conf_t  *skcf;
1144     nxt_router_conf_t  *rtcf;
1145 
1146     nxt_alert(task, "failed to apply new conf");
1147 
1148     for (qlk = nxt_queue_first(&tmcf->creating);
1149          qlk != nxt_queue_tail(&tmcf->creating);
1150          qlk = nxt_queue_next(qlk))
1151     {
1152         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1153         s = skcf->listen->socket;
1154 
1155         if (s != -1) {
1156             nxt_socket_close(task, s);
1157         }
1158 
1159         nxt_free(skcf->listen);
1160     }
1161 
1162     nxt_queue_init(&new_socket_confs);
1163     nxt_queue_add(&new_socket_confs, &tmcf->updating);
1164     nxt_queue_add(&new_socket_confs, &tmcf->pending);
1165     nxt_queue_add(&new_socket_confs, &tmcf->creating);
1166 
1167     rtcf = tmcf->router_conf;
1168 
1169     nxt_http_routes_cleanup(task, rtcf->routes);
1170 
1171     nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1172 
1173         if (skcf->pass != NULL) {
1174             nxt_http_pass_cleanup(task, skcf->pass);
1175         }
1176 
1177     } nxt_queue_loop;
1178 
1179     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1180 
1181         nxt_router_app_unlink(task, app);
1182 
1183     } nxt_queue_loop;
1184 
1185     router = rtcf->router;
1186 
1187     nxt_queue_add(&router->sockets, &tmcf->keeping);
1188     nxt_queue_add(&router->sockets, &tmcf->deleting);
1189 
1190     nxt_queue_add(&router->apps, &tmcf->previous);
1191 
1192     // TODO: new engines and threads
1193 
1194     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1195 
1196     nxt_mp_destroy(rtcf->mem_pool);
1197 
1198     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1199 }
1200 
1201 
1202 static void
1203 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1204     nxt_port_msg_type_t type)
1205 {
1206     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1207 
1208     nxt_port_use(task, tmcf->port, -1);
1209 
1210     tmcf->port = NULL;
1211 }
1212 
1213 
1214 static nxt_conf_map_t  nxt_router_conf[] = {
1215     {
1216         nxt_string("listeners_threads"),
1217         NXT_CONF_MAP_INT32,
1218         offsetof(nxt_router_conf_t, threads),
1219     },
1220 };
1221 
1222 
1223 static nxt_conf_map_t  nxt_router_app_conf[] = {
1224     {
1225         nxt_string("type"),
1226         NXT_CONF_MAP_STR,
1227         offsetof(nxt_router_app_conf_t, type),
1228     },
1229 
1230     {
1231         nxt_string("limits"),
1232         NXT_CONF_MAP_PTR,
1233         offsetof(nxt_router_app_conf_t, limits_value),
1234     },
1235 
1236     {
1237         nxt_string("processes"),
1238         NXT_CONF_MAP_INT32,
1239         offsetof(nxt_router_app_conf_t, processes),
1240     },
1241 
1242     {
1243         nxt_string("processes"),
1244         NXT_CONF_MAP_PTR,
1245         offsetof(nxt_router_app_conf_t, processes_value),
1246     },
1247 };
1248 
1249 
1250 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1251     {
1252         nxt_string("timeout"),
1253         NXT_CONF_MAP_MSEC,
1254         offsetof(nxt_router_app_conf_t, timeout),
1255     },
1256 
1257     {
1258         nxt_string("reschedule_timeout"),
1259         NXT_CONF_MAP_MSEC,
1260         offsetof(nxt_router_app_conf_t, res_timeout),
1261     },
1262 
1263     {
1264         nxt_string("requests"),
1265         NXT_CONF_MAP_INT32,
1266         offsetof(nxt_router_app_conf_t, requests),
1267     },
1268 };
1269 
1270 
1271 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1272     {
1273         nxt_string("spare"),
1274         NXT_CONF_MAP_INT32,
1275         offsetof(nxt_router_app_conf_t, spare_processes),
1276     },
1277 
1278     {
1279         nxt_string("max"),
1280         NXT_CONF_MAP_INT32,
1281         offsetof(nxt_router_app_conf_t, max_processes),
1282     },
1283 
1284     {
1285         nxt_string("idle_timeout"),
1286         NXT_CONF_MAP_MSEC,
1287         offsetof(nxt_router_app_conf_t, idle_timeout),
1288     },
1289 };
1290 
1291 
1292 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1293     {
1294         nxt_string("pass"),
1295         NXT_CONF_MAP_STR_COPY,
1296         offsetof(nxt_router_listener_conf_t, pass),
1297     },
1298 
1299     {
1300         nxt_string("application"),
1301         NXT_CONF_MAP_STR_COPY,
1302         offsetof(nxt_router_listener_conf_t, application),
1303     },
1304 };
1305 
1306 
1307 static nxt_conf_map_t  nxt_router_http_conf[] = {
1308     {
1309         nxt_string("header_buffer_size"),
1310         NXT_CONF_MAP_SIZE,
1311         offsetof(nxt_socket_conf_t, header_buffer_size),
1312     },
1313 
1314     {
1315         nxt_string("large_header_buffer_size"),
1316         NXT_CONF_MAP_SIZE,
1317         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1318     },
1319 
1320     {
1321         nxt_string("large_header_buffers"),
1322         NXT_CONF_MAP_SIZE,
1323         offsetof(nxt_socket_conf_t, large_header_buffers),
1324     },
1325 
1326     {
1327         nxt_string("body_buffer_size"),
1328         NXT_CONF_MAP_SIZE,
1329         offsetof(nxt_socket_conf_t, body_buffer_size),
1330     },
1331 
1332     {
1333         nxt_string("max_body_size"),
1334         NXT_CONF_MAP_SIZE,
1335         offsetof(nxt_socket_conf_t, max_body_size),
1336     },
1337 
1338     {
1339         nxt_string("idle_timeout"),
1340         NXT_CONF_MAP_MSEC,
1341         offsetof(nxt_socket_conf_t, idle_timeout),
1342     },
1343 
1344     {
1345         nxt_string("header_read_timeout"),
1346         NXT_CONF_MAP_MSEC,
1347         offsetof(nxt_socket_conf_t, header_read_timeout),
1348     },
1349 
1350     {
1351         nxt_string("body_read_timeout"),
1352         NXT_CONF_MAP_MSEC,
1353         offsetof(nxt_socket_conf_t, body_read_timeout),
1354     },
1355 
1356     {
1357         nxt_string("send_timeout"),
1358         NXT_CONF_MAP_MSEC,
1359         offsetof(nxt_socket_conf_t, send_timeout),
1360     },
1361 };
1362 
1363 
1364 static nxt_conf_map_t  nxt_router_websocket_conf[] = {
1365     {
1366         nxt_string("max_frame_size"),
1367         NXT_CONF_MAP_SIZE,
1368         offsetof(nxt_websocket_conf_t, max_frame_size),
1369     },
1370 
1371     {
1372         nxt_string("read_timeout"),
1373         NXT_CONF_MAP_MSEC,
1374         offsetof(nxt_websocket_conf_t, read_timeout),
1375     },
1376 
1377     {
1378         nxt_string("keepalive_interval"),
1379         NXT_CONF_MAP_MSEC,
1380         offsetof(nxt_websocket_conf_t, keepalive_interval),
1381     },
1382 
1383 };
1384 
1385 
1386 static nxt_int_t
1387 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1388     u_char *start, u_char *end)
1389 {
1390     u_char                      *p;
1391     size_t                      size;
1392     nxt_mp_t                    *mp;
1393     uint32_t                    next;
1394     nxt_int_t                   ret;
1395     nxt_str_t                   name, path;
1396     nxt_app_t                   *app, *prev;
1397     nxt_router_t                *router;
1398     nxt_app_joint_t             *app_joint;
1399     nxt_conf_value_t            *conf, *http, *value, *websocket;
1400     nxt_conf_value_t            *applications, *application;
1401     nxt_conf_value_t            *listeners, *listener;
1402     nxt_conf_value_t            *routes_conf;
1403     nxt_socket_conf_t           *skcf;
1404     nxt_http_routes_t           *routes;
1405     nxt_event_engine_t          *engine;
1406     nxt_app_lang_module_t       *lang;
1407     nxt_router_app_conf_t       apcf;
1408     nxt_router_access_log_t     *access_log;
1409     nxt_router_listener_conf_t  lscf;
1410 #if (NXT_TLS)
1411     nxt_router_tlssock_t        *tls;
1412 #endif
1413 
1414     static nxt_str_t  http_path = nxt_string("/settings/http");
1415     static nxt_str_t  applications_path = nxt_string("/applications");
1416     static nxt_str_t  listeners_path = nxt_string("/listeners");
1417     static nxt_str_t  routes_path = nxt_string("/routes");
1418     static nxt_str_t  access_log_path = nxt_string("/access_log");
1419 #if (NXT_TLS)
1420     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1421 #endif
1422     static nxt_str_t  websocket_path = nxt_string("/settings/http/websocket");
1423 
1424     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1425     if (conf == NULL) {
1426         nxt_alert(task, "configuration parsing error");
1427         return NXT_ERROR;
1428     }
1429 
1430     mp = tmcf->router_conf->mem_pool;
1431 
1432     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1433                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1434     if (ret != NXT_OK) {
1435         nxt_alert(task, "root map error");
1436         return NXT_ERROR;
1437     }
1438 
1439     if (tmcf->router_conf->threads == 0) {
1440         tmcf->router_conf->threads = nxt_ncpu;
1441     }
1442 
1443     router = tmcf->router_conf->router;
1444 
1445     applications = nxt_conf_get_path(conf, &applications_path);
1446 
1447     if (applications != NULL) {
1448         next = 0;
1449 
1450         for ( ;; ) {
1451             application = nxt_conf_next_object_member(applications, &name, &next);
1452             if (application == NULL) {
1453                 break;
1454             }
1455 
1456             nxt_debug(task, "application \"%V\"", &name);
1457 
1458             size = nxt_conf_json_length(application, NULL);
1459 
1460             app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1461             if (app == NULL) {
1462                 goto fail;
1463             }
1464 
1465             nxt_memzero(app, sizeof(nxt_app_t));
1466 
1467             app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1468             app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t)
1469                                                   + name.length);
1470 
1471             p = nxt_conf_json_print(app->conf.start, application, NULL);
1472             app->conf.length = p - app->conf.start;
1473 
1474             nxt_assert(app->conf.length <= size);
1475 
1476             nxt_debug(task, "application conf \"%V\"", &app->conf);
1477 
1478             prev = nxt_router_app_find(&router->apps, &name);
1479 
1480             if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1481                 nxt_free(app);
1482 
1483                 nxt_queue_remove(&prev->link);
1484                 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1485                 continue;
1486             }
1487 
1488             apcf.processes = 1;
1489             apcf.max_processes = 1;
1490             apcf.spare_processes = 0;
1491             apcf.timeout = 0;
1492             apcf.res_timeout = 1000;
1493             apcf.idle_timeout = 15000;
1494             apcf.requests = 0;
1495             apcf.limits_value = NULL;
1496             apcf.processes_value = NULL;
1497 
1498             app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1499             if (nxt_slow_path(app_joint == NULL)) {
1500                 goto app_fail;
1501             }
1502 
1503             nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1504 
1505             ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1506                                       nxt_nitems(nxt_router_app_conf), &apcf);
1507             if (ret != NXT_OK) {
1508                 nxt_alert(task, "application map error");
1509                 goto app_fail;
1510             }
1511 
1512             if (apcf.limits_value != NULL) {
1513 
1514                 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1515                     nxt_alert(task, "application limits is not object");
1516                     goto app_fail;
1517                 }
1518 
1519                 ret = nxt_conf_map_object(mp, apcf.limits_value,
1520                                         nxt_router_app_limits_conf,
1521                                         nxt_nitems(nxt_router_app_limits_conf),
1522                                         &apcf);
1523                 if (ret != NXT_OK) {
1524                     nxt_alert(task, "application limits map error");
1525                     goto app_fail;
1526                 }
1527             }
1528 
1529             if (apcf.processes_value != NULL
1530                 && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1531             {
1532                 ret = nxt_conf_map_object(mp, apcf.processes_value,
1533                                      nxt_router_app_processes_conf,
1534                                      nxt_nitems(nxt_router_app_processes_conf),
1535                                      &apcf);
1536                 if (ret != NXT_OK) {
1537                     nxt_alert(task, "application processes map error");
1538                     goto app_fail;
1539                 }
1540 
1541             } else {
1542                 apcf.max_processes = apcf.processes;
1543                 apcf.spare_processes = apcf.processes;
1544             }
1545 
1546             nxt_debug(task, "application type: %V", &apcf.type);
1547             nxt_debug(task, "application processes: %D", apcf.processes);
1548             nxt_debug(task, "application request timeout: %M", apcf.timeout);
1549             nxt_debug(task, "application reschedule timeout: %M",
1550                       apcf.res_timeout);
1551             nxt_debug(task, "application requests: %D", apcf.requests);
1552 
1553             lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1554 
1555             if (lang == NULL) {
1556                 nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1557                 goto app_fail;
1558             }
1559 
1560             nxt_debug(task, "application language module: \"%s\"", lang->file);
1561 
1562             ret = nxt_thread_mutex_create(&app->mutex);
1563             if (ret != NXT_OK) {
1564                 goto app_fail;
1565             }
1566 
1567             nxt_queue_init(&app->ports);
1568             nxt_queue_init(&app->spare_ports);
1569             nxt_queue_init(&app->idle_ports);
1570             nxt_queue_init(&app->requests);
1571             nxt_queue_init(&app->pending);
1572 
1573             app->name.length = name.length;
1574             nxt_memcpy(app->name.start, name.start, name.length);
1575 
1576             app->type = lang->type;
1577             app->max_processes = apcf.max_processes;
1578             app->spare_processes = apcf.spare_processes;
1579             app->max_pending_processes = apcf.spare_processes
1580                                          ? apcf.spare_processes : 1;
1581             app->timeout = apcf.timeout;
1582             app->res_timeout = apcf.res_timeout * 1000000;
1583             app->idle_timeout = apcf.idle_timeout;
1584             app->max_pending_responses = 2;
1585             app->max_requests = apcf.requests;
1586 
1587             engine = task->thread->engine;
1588 
1589             app->engine = engine;
1590 
1591             app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1592             app->adjust_idle_work.task = &engine->task;
1593             app->adjust_idle_work.obj = app;
1594 
1595             nxt_queue_insert_tail(&tmcf->apps, &app->link);
1596 
1597             nxt_router_app_use(task, app, 1);
1598 
1599             app->joint = app_joint;
1600 
1601             app_joint->use_count = 1;
1602             app_joint->app = app;
1603 
1604             app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1605             app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1606             app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1607             app_joint->idle_timer.task = &engine->task;
1608             app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1609 
1610             app_joint->free_app_work.handler = nxt_router_free_app;
1611             app_joint->free_app_work.task = &engine->task;
1612             app_joint->free_app_work.obj = app_joint;
1613         }
1614     }
1615 
1616     routes_conf = nxt_conf_get_path(conf, &routes_path);
1617     if (nxt_fast_path(routes_conf != NULL)) {
1618         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1619         if (nxt_slow_path(routes == NULL)) {
1620             return NXT_ERROR;
1621         }
1622         tmcf->router_conf->routes = routes;
1623     }
1624 
1625     http = nxt_conf_get_path(conf, &http_path);
1626 #if 0
1627     if (http == NULL) {
1628         nxt_alert(task, "no \"http\" block");
1629         return NXT_ERROR;
1630     }
1631 #endif
1632 
1633     websocket = nxt_conf_get_path(conf, &websocket_path);
1634 
1635     listeners = nxt_conf_get_path(conf, &listeners_path);
1636 
1637     if (listeners != NULL) {
1638         next = 0;
1639 
1640         for ( ;; ) {
1641             listener = nxt_conf_next_object_member(listeners, &name, &next);
1642             if (listener == NULL) {
1643                 break;
1644             }
1645 
1646             skcf = nxt_router_socket_conf(task, tmcf, &name);
1647             if (skcf == NULL) {
1648                 goto fail;
1649             }
1650 
1651             nxt_memzero(&lscf, sizeof(lscf));
1652 
1653             ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1654                                       nxt_nitems(nxt_router_listener_conf),
1655                                       &lscf);
1656             if (ret != NXT_OK) {
1657                 nxt_alert(task, "listener map error");
1658                 goto fail;
1659             }
1660 
1661             nxt_debug(task, "application: %V", &lscf.application);
1662 
1663             // STUB, default values if http block is not defined.
1664             skcf->header_buffer_size = 2048;
1665             skcf->large_header_buffer_size = 8192;
1666             skcf->large_header_buffers = 4;
1667             skcf->body_buffer_size = 16 * 1024;
1668             skcf->max_body_size = 8 * 1024 * 1024;
1669             skcf->idle_timeout = 180 * 1000;
1670             skcf->header_read_timeout = 30 * 1000;
1671             skcf->body_read_timeout = 30 * 1000;
1672             skcf->send_timeout = 30 * 1000;
1673 
1674             skcf->websocket_conf.max_frame_size = 1024 * 1024;
1675             skcf->websocket_conf.read_timeout = 60 * 1000;
1676             skcf->websocket_conf.keepalive_interval = 30 * 1000;
1677 
1678             if (http != NULL) {
1679                 ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1680                                           nxt_nitems(nxt_router_http_conf),
1681                                           skcf);
1682                 if (ret != NXT_OK) {
1683                     nxt_alert(task, "http map error");
1684                     goto fail;
1685                 }
1686             }
1687 
1688             if (websocket != NULL) {
1689                 ret = nxt_conf_map_object(mp, websocket,
1690                                           nxt_router_websocket_conf,
1691                                           nxt_nitems(nxt_router_websocket_conf),
1692                                           &skcf->websocket_conf);
1693                 if (ret != NXT_OK) {
1694                     nxt_alert(task, "websocket map error");
1695                     goto fail;
1696                 }
1697             }
1698 
1699 #if (NXT_TLS)
1700             value = nxt_conf_get_path(listener, &certificate_path);
1701 
1702             if (value != NULL) {
1703                 nxt_conf_get_string(value, &name);
1704 
1705                 tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1706                 if (nxt_slow_path(tls == NULL)) {
1707                     goto fail;
1708                 }
1709 
1710                 tls->name = name;
1711                 tls->conf = skcf;
1712 
1713                 nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1714             }
1715 #endif
1716 
1717             skcf->listen->handler = nxt_http_conn_init;
1718             skcf->router_conf = tmcf->router_conf;
1719             skcf->router_conf->count++;
1720 
1721             if (lscf.pass.length != 0) {
1722                 skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
1723 
1724             /* COMPATIBILITY: listener application. */
1725             } else if (lscf.application.length > 0) {
1726                 skcf->pass = nxt_http_pass_application(task, tmcf,
1727                                                        &lscf.application);
1728             }
1729         }
1730     }
1731 
1732     value = nxt_conf_get_path(conf, &access_log_path);
1733 
1734     if (value != NULL) {
1735         nxt_conf_get_string(value, &path);
1736 
1737         access_log = router->access_log;
1738 
1739         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1740             nxt_thread_spin_lock(&router->lock);
1741             access_log->count++;
1742             nxt_thread_spin_unlock(&router->lock);
1743 
1744         } else {
1745             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1746                                     + path.length);
1747             if (access_log == NULL) {
1748                 nxt_alert(task, "failed to allocate access log structure");
1749                 goto fail;
1750             }
1751 
1752             access_log->fd = -1;
1753             access_log->handler = &nxt_router_access_log_writer;
1754             access_log->count = 1;
1755 
1756             access_log->path.length = path.length;
1757             access_log->path.start = (u_char *) access_log
1758                                      + sizeof(nxt_router_access_log_t);
1759 
1760             nxt_memcpy(access_log->path.start, path.start, path.length);
1761         }
1762 
1763         tmcf->router_conf->access_log = access_log;
1764     }
1765 
1766     nxt_http_routes_resolve(task, tmcf);
1767 
1768     nxt_queue_add(&tmcf->deleting, &router->sockets);
1769     nxt_queue_init(&router->sockets);
1770 
1771     return NXT_OK;
1772 
1773 app_fail:
1774 
1775     nxt_free(app);
1776 
1777 fail:
1778 
1779     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1780 
1781         nxt_queue_remove(&app->link);
1782         nxt_thread_mutex_destroy(&app->mutex);
1783         nxt_free(app);
1784 
1785     } nxt_queue_loop;
1786 
1787     return NXT_ERROR;
1788 }
1789 
1790 
1791 static nxt_app_t *
1792 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1793 {
1794     nxt_app_t  *app;
1795 
1796     nxt_queue_each(app, queue, nxt_app_t, link) {
1797 
1798         if (nxt_strstr_eq(name, &app->name)) {
1799             return app;
1800         }
1801 
1802     } nxt_queue_loop;
1803 
1804     return NULL;
1805 }
1806 
1807 
1808 nxt_app_t *
1809 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1810 {
1811     nxt_app_t  *app;
1812 
1813     app = nxt_router_app_find(&tmcf->apps, name);
1814 
1815     if (app == NULL) {
1816         app = nxt_router_app_find(&tmcf->previous, name);
1817     }
1818 
1819     return app;
1820 }
1821 
1822 
1823 static nxt_socket_conf_t *
1824 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1825     nxt_str_t *name)
1826 {
1827     size_t               size;
1828     nxt_int_t            ret;
1829     nxt_bool_t           wildcard;
1830     nxt_sockaddr_t       *sa;
1831     nxt_socket_conf_t    *skcf;
1832     nxt_listen_socket_t  *ls;
1833 
1834     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1835     if (nxt_slow_path(sa == NULL)) {
1836         nxt_alert(task, "invalid listener \"%V\"", name);
1837         return NULL;
1838     }
1839 
1840     sa->type = SOCK_STREAM;
1841 
1842     nxt_debug(task, "router listener: \"%*s\"",
1843               (size_t) sa->length, nxt_sockaddr_start(sa));
1844 
1845     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
1846     if (nxt_slow_path(skcf == NULL)) {
1847         return NULL;
1848     }
1849 
1850     size = nxt_sockaddr_size(sa);
1851 
1852     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1853 
1854     if (ret != NXT_OK) {
1855 
1856         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1857         if (nxt_slow_path(ls == NULL)) {
1858             return NULL;
1859         }
1860 
1861         skcf->listen = ls;
1862 
1863         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1864         nxt_memcpy(ls->sockaddr, sa, size);
1865 
1866         nxt_listen_socket_remote_size(ls);
1867 
1868         ls->socket = -1;
1869         ls->backlog = NXT_LISTEN_BACKLOG;
1870         ls->flags = NXT_NONBLOCK;
1871         ls->read_after_accept = 1;
1872     }
1873 
1874     switch (sa->u.sockaddr.sa_family) {
1875 #if (NXT_HAVE_UNIX_DOMAIN)
1876     case AF_UNIX:
1877         wildcard = 0;
1878         break;
1879 #endif
1880 #if (NXT_INET6)
1881     case AF_INET6:
1882         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1883         break;
1884 #endif
1885     case AF_INET:
1886     default:
1887         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1888         break;
1889     }
1890 
1891     if (!wildcard) {
1892         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
1893         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1894             return NULL;
1895         }
1896 
1897         nxt_memcpy(skcf->sockaddr, sa, size);
1898     }
1899 
1900     return skcf;
1901 }
1902 
1903 
1904 static nxt_int_t
1905 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1906     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1907 {
1908     nxt_router_t       *router;
1909     nxt_queue_link_t   *qlk;
1910     nxt_socket_conf_t  *skcf;
1911 
1912     router = tmcf->router_conf->router;
1913 
1914     for (qlk = nxt_queue_first(&router->sockets);
1915          qlk != nxt_queue_tail(&router->sockets);
1916          qlk = nxt_queue_next(qlk))
1917     {
1918         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1919 
1920         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1921             nskcf->listen = skcf->listen;
1922 
1923             nxt_queue_remove(qlk);
1924             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1925 
1926             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1927 
1928             return NXT_OK;
1929         }
1930     }
1931 
1932     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1933 
1934     return NXT_DECLINED;
1935 }
1936 
1937 
1938 static void
1939 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1940     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1941 {
1942     size_t            size;
1943     uint32_t          stream;
1944     nxt_int_t         ret;
1945     nxt_buf_t         *b;
1946     nxt_port_t        *main_port, *router_port;
1947     nxt_runtime_t     *rt;
1948     nxt_socket_rpc_t  *rpc;
1949 
1950     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1951     if (rpc == NULL) {
1952         goto fail;
1953     }
1954 
1955     rpc->socket_conf = skcf;
1956     rpc->temp_conf = tmcf;
1957 
1958     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1959 
1960     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1961     if (b == NULL) {
1962         goto fail;
1963     }
1964 
1965     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1966 
1967     rt = task->thread->runtime;
1968     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1969     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1970 
1971     stream = nxt_port_rpc_register_handler(task, router_port,
1972                                            nxt_router_listen_socket_ready,
1973                                            nxt_router_listen_socket_error,
1974                                            main_port->pid, rpc);
1975     if (nxt_slow_path(stream == 0)) {
1976         goto fail;
1977     }
1978 
1979     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1980                                 stream, router_port->id, b);
1981 
1982     if (nxt_slow_path(ret != NXT_OK)) {
1983         nxt_port_rpc_cancel(task, router_port, stream);
1984         goto fail;
1985     }
1986 
1987     return;
1988 
1989 fail:
1990 
1991     nxt_router_conf_error(task, tmcf);
1992 }
1993 
1994 
1995 static void
1996 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1997     void *data)
1998 {
1999     nxt_int_t         ret;
2000     nxt_socket_t      s;
2001     nxt_socket_rpc_t  *rpc;
2002 
2003     rpc = data;
2004 
2005     s = msg->fd;
2006 
2007     ret = nxt_socket_nonblocking(task, s);
2008     if (nxt_slow_path(ret != NXT_OK)) {
2009         goto fail;
2010     }
2011 
2012     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
2013 
2014     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
2015     if (nxt_slow_path(ret != NXT_OK)) {
2016         goto fail;
2017     }
2018 
2019     rpc->socket_conf->listen->socket = s;
2020 
2021     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2022                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2023 
2024     return;
2025 
2026 fail:
2027 
2028     nxt_socket_close(task, s);
2029 
2030     nxt_router_conf_error(task, rpc->temp_conf);
2031 }
2032 
2033 
2034 static void
2035 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2036     void *data)
2037 {
2038     nxt_socket_rpc_t        *rpc;
2039     nxt_router_temp_conf_t  *tmcf;
2040 
2041     rpc = data;
2042     tmcf = rpc->temp_conf;
2043 
2044 #if 0
2045     u_char                  *p;
2046     size_t                  size;
2047     uint8_t                 error;
2048     nxt_buf_t               *in, *out;
2049     nxt_sockaddr_t          *sa;
2050 
2051     static nxt_str_t  socket_errors[] = {
2052         nxt_string("ListenerSystem"),
2053         nxt_string("ListenerNoIPv6"),
2054         nxt_string("ListenerPort"),
2055         nxt_string("ListenerInUse"),
2056         nxt_string("ListenerNoAddress"),
2057         nxt_string("ListenerNoAccess"),
2058         nxt_string("ListenerPath"),
2059     };
2060 
2061     sa = rpc->socket_conf->listen->sockaddr;
2062 
2063     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2064 
2065     if (nxt_slow_path(in == NULL)) {
2066         return;
2067     }
2068 
2069     p = in->mem.pos;
2070 
2071     error = *p++;
2072 
2073     size = nxt_length("listen socket error: ")
2074            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2075            + sa->length + socket_errors[error].length + (in->mem.free - p);
2076 
2077     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2078     if (nxt_slow_path(out == NULL)) {
2079         return;
2080     }
2081 
2082     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2083                         "listen socket error: "
2084                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2085                         (size_t) sa->length, nxt_sockaddr_start(sa),
2086                         &socket_errors[error], in->mem.free - p, p);
2087 
2088     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2089 #endif
2090 
2091     nxt_router_conf_error(task, tmcf);
2092 }
2093 
2094 
2095 #if (NXT_TLS)
2096 
2097 static void
2098 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2099     nxt_router_tlssock_t *tls)
2100 {
2101     nxt_socket_rpc_t  *rpc;
2102 
2103     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2104     if (rpc == NULL) {
2105         nxt_router_conf_error(task, tmcf);
2106         return;
2107     }
2108 
2109     rpc->socket_conf = tls->conf;
2110     rpc->temp_conf = tmcf;
2111 
2112     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2113                        nxt_router_tls_rpc_handler, rpc);
2114 }
2115 
2116 
2117 static void
2118 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2119     void *data)
2120 {
2121     nxt_mp_t               *mp;
2122     nxt_int_t              ret;
2123     nxt_tls_conf_t         *tlscf;
2124     nxt_socket_rpc_t       *rpc;
2125     nxt_router_temp_conf_t *tmcf;
2126 
2127     nxt_debug(task, "tls rpc handler");
2128 
2129     rpc = data;
2130     tmcf = rpc->temp_conf;
2131 
2132     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2133         goto fail;
2134     }
2135 
2136     mp = tmcf->router_conf->mem_pool;
2137 
2138     tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2139     if (nxt_slow_path(tlscf == NULL)) {
2140         goto fail;
2141     }
2142 
2143     tlscf->chain_file = msg->fd;
2144 
2145     ret = task->thread->runtime->tls->server_init(task, tlscf);
2146     if (nxt_slow_path(ret != NXT_OK)) {
2147         goto fail;
2148     }
2149 
2150     rpc->socket_conf->tls = tlscf;
2151 
2152     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2153                        nxt_router_conf_apply, task, tmcf, NULL);
2154     return;
2155 
2156 fail:
2157 
2158     nxt_router_conf_error(task, tmcf);
2159 }
2160 
2161 #endif
2162 
2163 
2164 static void
2165 nxt_router_app_rpc_create(nxt_task_t *task,
2166     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2167 {
2168     size_t         size;
2169     uint32_t       stream;
2170     nxt_int_t      ret;
2171     nxt_buf_t      *b;
2172     nxt_port_t     *main_port, *router_port;
2173     nxt_runtime_t  *rt;
2174     nxt_app_rpc_t  *rpc;
2175 
2176     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2177     if (rpc == NULL) {
2178         goto fail;
2179     }
2180 
2181     rpc->app = app;
2182     rpc->temp_conf = tmcf;
2183 
2184     nxt_debug(task, "app '%V' prefork", &app->name);
2185 
2186     size = app->name.length + 1 + app->conf.length;
2187 
2188     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2189     if (nxt_slow_path(b == NULL)) {
2190         goto fail;
2191     }
2192 
2193     nxt_buf_cpystr(b, &app->name);
2194     *b->mem.free++ = '\0';
2195     nxt_buf_cpystr(b, &app->conf);
2196 
2197     rt = task->thread->runtime;
2198     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2199     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2200 
2201     stream = nxt_port_rpc_register_handler(task, router_port,
2202                                            nxt_router_app_prefork_ready,
2203                                            nxt_router_app_prefork_error,
2204                                            -1, rpc);
2205     if (nxt_slow_path(stream == 0)) {
2206         goto fail;
2207     }
2208 
2209     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2210                                 stream, router_port->id, b);
2211 
2212     if (nxt_slow_path(ret != NXT_OK)) {
2213         nxt_port_rpc_cancel(task, router_port, stream);
2214         goto fail;
2215     }
2216 
2217     app->pending_processes++;
2218 
2219     return;
2220 
2221 fail:
2222 
2223     nxt_router_conf_error(task, tmcf);
2224 }
2225 
2226 
2227 static void
2228 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2229     void *data)
2230 {
2231     nxt_app_t           *app;
2232     nxt_port_t          *port;
2233     nxt_app_rpc_t       *rpc;
2234     nxt_event_engine_t  *engine;
2235 
2236     rpc = data;
2237     app = rpc->app;
2238 
2239     port = msg->u.new_port;
2240     port->app = app;
2241 
2242     app->pending_processes--;
2243     app->processes++;
2244     app->idle_processes++;
2245 
2246     engine = task->thread->engine;
2247 
2248     nxt_queue_insert_tail(&app->ports, &port->app_link);
2249     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2250 
2251     port->idle_start = 0;
2252 
2253     nxt_port_inc_use(port);
2254 
2255     nxt_work_queue_add(&engine->fast_work_queue,
2256                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2257 }
2258 
2259 
2260 static void
2261 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2262     void *data)
2263 {
2264     nxt_app_t               *app;
2265     nxt_app_rpc_t           *rpc;
2266     nxt_router_temp_conf_t  *tmcf;
2267 
2268     rpc = data;
2269     app = rpc->app;
2270     tmcf = rpc->temp_conf;
2271 
2272     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2273             &app->name);
2274 
2275     app->pending_processes--;
2276 
2277     nxt_router_conf_error(task, tmcf);
2278 }
2279 
2280 
2281 static nxt_int_t
2282 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2283     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2284 {
2285     nxt_int_t                 ret;
2286     nxt_uint_t                n, threads;
2287     nxt_queue_link_t          *qlk;
2288     nxt_router_engine_conf_t  *recf;
2289 
2290     threads = tmcf->router_conf->threads;
2291 
2292     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2293                                      sizeof(nxt_router_engine_conf_t));
2294     if (nxt_slow_path(tmcf->engines == NULL)) {
2295         return NXT_ERROR;
2296     }
2297 
2298     n = 0;
2299 
2300     for (qlk = nxt_queue_first(&router->engines);
2301          qlk != nxt_queue_tail(&router->engines);
2302          qlk = nxt_queue_next(qlk))
2303     {
2304         recf = nxt_array_zero_add(tmcf->engines);
2305         if (nxt_slow_path(recf == NULL)) {
2306             return NXT_ERROR;
2307         }
2308 
2309         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2310 
2311         if (n < threads) {
2312             recf->action = NXT_ROUTER_ENGINE_KEEP;
2313             ret = nxt_router_engine_conf_update(tmcf, recf);
2314 
2315         } else {
2316             recf->action = NXT_ROUTER_ENGINE_DELETE;
2317             ret = nxt_router_engine_conf_delete(tmcf, recf);
2318         }
2319 
2320         if (nxt_slow_path(ret != NXT_OK)) {
2321             return ret;
2322         }
2323 
2324         n++;
2325     }
2326 
2327     tmcf->new_threads = n;
2328 
2329     while (n < threads) {
2330         recf = nxt_array_zero_add(tmcf->engines);
2331         if (nxt_slow_path(recf == NULL)) {
2332             return NXT_ERROR;
2333         }
2334 
2335         recf->action = NXT_ROUTER_ENGINE_ADD;
2336 
2337         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2338         if (nxt_slow_path(recf->engine == NULL)) {
2339             return NXT_ERROR;
2340         }
2341 
2342         ret = nxt_router_engine_conf_create(tmcf, recf);
2343         if (nxt_slow_path(ret != NXT_OK)) {
2344             return ret;
2345         }
2346 
2347         n++;
2348     }
2349 
2350     return NXT_OK;
2351 }
2352 
2353 
2354 static nxt_int_t
2355 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2356     nxt_router_engine_conf_t *recf)
2357 {
2358     nxt_int_t  ret;
2359 
2360     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2361                                           nxt_router_listen_socket_create);
2362     if (nxt_slow_path(ret != NXT_OK)) {
2363         return ret;
2364     }
2365 
2366     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2367                                           nxt_router_listen_socket_create);
2368     if (nxt_slow_path(ret != NXT_OK)) {
2369         return ret;
2370     }
2371 
2372     return ret;
2373 }
2374 
2375 
2376 static nxt_int_t
2377 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2378     nxt_router_engine_conf_t *recf)
2379 {
2380     nxt_int_t  ret;
2381 
2382     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2383                                           nxt_router_listen_socket_create);
2384     if (nxt_slow_path(ret != NXT_OK)) {
2385         return ret;
2386     }
2387 
2388     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2389                                           nxt_router_listen_socket_update);
2390     if (nxt_slow_path(ret != NXT_OK)) {
2391         return ret;
2392     }
2393 
2394     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2395     if (nxt_slow_path(ret != NXT_OK)) {
2396         return ret;
2397     }
2398 
2399     return ret;
2400 }
2401 
2402 
2403 static nxt_int_t
2404 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2405     nxt_router_engine_conf_t *recf)
2406 {
2407     nxt_int_t  ret;
2408 
2409     ret = nxt_router_engine_quit(tmcf, recf);
2410     if (nxt_slow_path(ret != NXT_OK)) {
2411         return ret;
2412     }
2413 
2414     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2415     if (nxt_slow_path(ret != NXT_OK)) {
2416         return ret;
2417     }
2418 
2419     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2420 }
2421 
2422 
2423 static nxt_int_t
2424 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2425     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2426     nxt_work_handler_t handler)
2427 {
2428     nxt_joint_job_t          *job;
2429     nxt_queue_link_t         *qlk;
2430     nxt_socket_conf_t        *skcf;
2431     nxt_socket_conf_joint_t  *joint;
2432 
2433     for (qlk = nxt_queue_first(sockets);
2434          qlk != nxt_queue_tail(sockets);
2435          qlk = nxt_queue_next(qlk))
2436     {
2437         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2438         if (nxt_slow_path(job == NULL)) {
2439             return NXT_ERROR;
2440         }
2441 
2442         job->work.next = recf->jobs;
2443         recf->jobs = &job->work;
2444 
2445         job->task = tmcf->engine->task;
2446         job->work.handler = handler;
2447         job->work.task = &job->task;
2448         job->work.obj = job;
2449         job->tmcf = tmcf;
2450 
2451         tmcf->count++;
2452 
2453         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2454                              sizeof(nxt_socket_conf_joint_t));
2455         if (nxt_slow_path(joint == NULL)) {
2456             return NXT_ERROR;
2457         }
2458 
2459         job->work.data = joint;
2460 
2461         joint->count = 1;
2462 
2463         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2464         skcf->count++;
2465         joint->socket_conf = skcf;
2466 
2467         joint->engine = recf->engine;
2468     }
2469 
2470     return NXT_OK;
2471 }
2472 
2473 
2474 static nxt_int_t
2475 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2476     nxt_router_engine_conf_t *recf)
2477 {
2478     nxt_joint_job_t  *job;
2479 
2480     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2481     if (nxt_slow_path(job == NULL)) {
2482         return NXT_ERROR;
2483     }
2484 
2485     job->work.next = recf->jobs;
2486     recf->jobs = &job->work;
2487 
2488     job->task = tmcf->engine->task;
2489     job->work.handler = nxt_router_worker_thread_quit;
2490     job->work.task = &job->task;
2491     job->work.obj = NULL;
2492     job->work.data = NULL;
2493     job->tmcf = NULL;
2494 
2495     return NXT_OK;
2496 }
2497 
2498 
2499 static nxt_int_t
2500 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2501     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2502 {
2503     nxt_joint_job_t   *job;
2504     nxt_queue_link_t  *qlk;
2505 
2506     for (qlk = nxt_queue_first(sockets);
2507          qlk != nxt_queue_tail(sockets);
2508          qlk = nxt_queue_next(qlk))
2509     {
2510         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2511         if (nxt_slow_path(job == NULL)) {
2512             return NXT_ERROR;
2513         }
2514 
2515         job->work.next = recf->jobs;
2516         recf->jobs = &job->work;
2517 
2518         job->task = tmcf->engine->task;
2519         job->work.handler = nxt_router_listen_socket_delete;
2520         job->work.task = &job->task;
2521         job->work.obj = job;
2522         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2523         job->tmcf = tmcf;
2524 
2525         tmcf->count++;
2526     }
2527 
2528     return NXT_OK;
2529 }
2530 
2531 
2532 static nxt_int_t
2533 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2534     nxt_router_temp_conf_t *tmcf)
2535 {
2536     nxt_int_t                 ret;
2537     nxt_uint_t                i, threads;
2538     nxt_router_engine_conf_t  *recf;
2539 
2540     recf = tmcf->engines->elts;
2541     threads = tmcf->router_conf->threads;
2542 
2543     for (i = tmcf->new_threads; i < threads; i++) {
2544         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2545         if (nxt_slow_path(ret != NXT_OK)) {
2546             return ret;
2547         }
2548     }
2549 
2550     return NXT_OK;
2551 }
2552 
2553 
2554 static nxt_int_t
2555 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2556     nxt_event_engine_t *engine)
2557 {
2558     nxt_int_t            ret;
2559     nxt_thread_link_t    *link;
2560     nxt_thread_handle_t  handle;
2561 
2562     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2563 
2564     if (nxt_slow_path(link == NULL)) {
2565         return NXT_ERROR;
2566     }
2567 
2568     link->start = nxt_router_thread_start;
2569     link->engine = engine;
2570     link->work.handler = nxt_router_thread_exit_handler;
2571     link->work.task = task;
2572     link->work.data = link;
2573 
2574     nxt_queue_insert_tail(&rt->engines, &engine->link);
2575 
2576     ret = nxt_thread_create(&handle, link);
2577 
2578     if (nxt_slow_path(ret != NXT_OK)) {
2579         nxt_queue_remove(&engine->link);
2580     }
2581 
2582     return ret;
2583 }
2584 
2585 
2586 static void
2587 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2588     nxt_router_temp_conf_t *tmcf)
2589 {
2590     nxt_app_t  *app;
2591 
2592     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2593 
2594         nxt_router_app_unlink(task, app);
2595 
2596     } nxt_queue_loop;
2597 
2598     nxt_queue_add(&router->apps, &tmcf->previous);
2599     nxt_queue_add(&router->apps, &tmcf->apps);
2600 }
2601 
2602 
2603 static void
2604 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2605 {
2606     nxt_uint_t                n;
2607     nxt_event_engine_t        *engine;
2608     nxt_router_engine_conf_t  *recf;
2609 
2610     recf = tmcf->engines->elts;
2611 
2612     for (n = tmcf->engines->nelts; n != 0; n--) {
2613         engine = recf->engine;
2614 
2615         switch (recf->action) {
2616 
2617         case NXT_ROUTER_ENGINE_KEEP:
2618             break;
2619 
2620         case NXT_ROUTER_ENGINE_ADD:
2621             nxt_queue_insert_tail(&router->engines, &engine->link0);
2622             break;
2623 
2624         case NXT_ROUTER_ENGINE_DELETE:
2625             nxt_queue_remove(&engine->link0);
2626             break;
2627         }
2628 
2629         nxt_router_engine_post(engine, recf->jobs);
2630 
2631         recf++;
2632     }
2633 }
2634 
2635 
2636 static void
2637 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2638 {
2639     nxt_work_t  *work, *next;
2640 
2641     for (work = jobs; work != NULL; work = next) {
2642         next = work->next;
2643         work->next = NULL;
2644 
2645         nxt_event_engine_post(engine, work);
2646     }
2647 }
2648 
2649 
2650 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2651     .rpc_error = nxt_port_rpc_handler,
2652     .mmap      = nxt_port_mmap_handler,
2653     .data      = nxt_port_rpc_handler,
2654 };
2655 
2656 
2657 static void
2658 nxt_router_thread_start(void *data)
2659 {
2660     nxt_int_t           ret;
2661     nxt_port_t          *port;
2662     nxt_task_t          *task;
2663     nxt_thread_t        *thread;
2664     nxt_thread_link_t   *link;
2665     nxt_event_engine_t  *engine;
2666 
2667     link = data;
2668     engine = link->engine;
2669     task = &engine->task;
2670 
2671     thread = nxt_thread();
2672 
2673     nxt_event_engine_thread_adopt(engine);
2674 
2675     /* STUB */
2676     thread->runtime = engine->task.thread->runtime;
2677 
2678     engine->task.thread = thread;
2679     engine->task.log = thread->log;
2680     thread->engine = engine;
2681     thread->task = &engine->task;
2682 #if 0
2683     thread->fiber = &engine->fibers->fiber;
2684 #endif
2685 
2686     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2687     if (nxt_slow_path(engine->mem_pool == NULL)) {
2688         return;
2689     }
2690 
2691     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2692                         NXT_PROCESS_ROUTER);
2693     if (nxt_slow_path(port == NULL)) {
2694         return;
2695     }
2696 
2697     ret = nxt_port_socket_init(task, port, 0);
2698     if (nxt_slow_path(ret != NXT_OK)) {
2699         nxt_port_use(task, port, -1);
2700         return;
2701     }
2702 
2703     engine->port = port;
2704 
2705     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2706 
2707     nxt_event_engine_start(engine);
2708 }
2709 
2710 
2711 static void
2712 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2713 {
2714     nxt_joint_job_t          *job;
2715     nxt_socket_conf_t        *skcf;
2716     nxt_listen_event_t       *lev;
2717     nxt_listen_socket_t      *ls;
2718     nxt_thread_spinlock_t    *lock;
2719     nxt_socket_conf_joint_t  *joint;
2720 
2721     job = obj;
2722     joint = data;
2723 
2724     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2725 
2726     skcf = joint->socket_conf;
2727     ls = skcf->listen;
2728 
2729     lev = nxt_listen_event(task, ls);
2730     if (nxt_slow_path(lev == NULL)) {
2731         nxt_router_listen_socket_release(task, skcf);
2732         return;
2733     }
2734 
2735     lev->socket.data = joint;
2736 
2737     lock = &skcf->router_conf->router->lock;
2738 
2739     nxt_thread_spin_lock(lock);
2740     ls->count++;
2741     nxt_thread_spin_unlock(lock);
2742 
2743     job->work.next = NULL;
2744     job->work.handler = nxt_router_conf_wait;
2745 
2746     nxt_event_engine_post(job->tmcf->engine, &job->work);
2747 }
2748 
2749 
2750 nxt_inline nxt_listen_event_t *
2751 nxt_router_listen_event(nxt_queue_t *listen_connections,
2752     nxt_socket_conf_t *skcf)
2753 {
2754     nxt_socket_t        fd;
2755     nxt_queue_link_t    *qlk;
2756     nxt_listen_event_t  *lev;
2757 
2758     fd = skcf->listen->socket;
2759 
2760     for (qlk = nxt_queue_first(listen_connections);
2761          qlk != nxt_queue_tail(listen_connections);
2762          qlk = nxt_queue_next(qlk))
2763     {
2764         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2765 
2766         if (fd == lev->socket.fd) {
2767             return lev;
2768         }
2769     }
2770 
2771     return NULL;
2772 }
2773 
2774 
2775 static void
2776 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2777 {
2778     nxt_joint_job_t          *job;
2779     nxt_event_engine_t       *engine;
2780     nxt_listen_event_t       *lev;
2781     nxt_socket_conf_joint_t  *joint, *old;
2782 
2783     job = obj;
2784     joint = data;
2785 
2786     engine = task->thread->engine;
2787 
2788     nxt_queue_insert_tail(&engine->joints, &joint->link);
2789 
2790     lev = nxt_router_listen_event(&engine->listen_connections,
2791                                   joint->socket_conf);
2792 
2793     old = lev->socket.data;
2794     lev->socket.data = joint;
2795     lev->listen = joint->socket_conf->listen;
2796 
2797     job->work.next = NULL;
2798     job->work.handler = nxt_router_conf_wait;
2799 
2800     nxt_event_engine_post(job->tmcf->engine, &job->work);
2801 
2802     /*
2803      * The task is allocated from configuration temporary
2804      * memory pool so it can be freed after engine post operation.
2805      */
2806 
2807     nxt_router_conf_release(&engine->task, old);
2808 }
2809 
2810 
2811 static void
2812 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2813 {
2814     nxt_joint_job_t     *job;
2815     nxt_socket_conf_t   *skcf;
2816     nxt_listen_event_t  *lev;
2817     nxt_event_engine_t  *engine;
2818 
2819     job = obj;
2820     skcf = data;
2821 
2822     engine = task->thread->engine;
2823 
2824     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2825 
2826     nxt_fd_event_delete(engine, &lev->socket);
2827 
2828     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2829               lev->socket.fd);
2830 
2831     lev->timer.handler = nxt_router_listen_socket_close;
2832     lev->timer.work_queue = &engine->fast_work_queue;
2833 
2834     nxt_timer_add(engine, &lev->timer, 0);
2835 
2836     job->work.next = NULL;
2837     job->work.handler = nxt_router_conf_wait;
2838 
2839     nxt_event_engine_post(job->tmcf->engine, &job->work);
2840 }
2841 
2842 
2843 static void
2844 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2845 {
2846     nxt_event_engine_t  *engine;
2847 
2848     nxt_debug(task, "router worker thread quit");
2849 
2850     engine = task->thread->engine;
2851 
2852     engine->shutdown = 1;
2853 
2854     if (nxt_queue_is_empty(&engine->joints)) {
2855         nxt_thread_exit(task->thread);
2856     }
2857 }
2858 
2859 
2860 static void
2861 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2862 {
2863     nxt_timer_t              *timer;
2864     nxt_listen_event_t       *lev;
2865     nxt_socket_conf_joint_t  *joint;
2866 
2867     timer = obj;
2868     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2869 
2870     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2871               lev->socket.fd);
2872 
2873     nxt_queue_remove(&lev->link);
2874 
2875     joint = lev->socket.data;
2876     lev->socket.data = NULL;
2877 
2878     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2879     task = &task->thread->engine->task;
2880 
2881     nxt_router_listen_socket_release(task, joint->socket_conf);
2882 
2883     nxt_router_listen_event_release(task, lev, joint);
2884 }
2885 
2886 
2887 static void
2888 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2889 {
2890     nxt_listen_socket_t    *ls;
2891     nxt_thread_spinlock_t  *lock;
2892 
2893     ls = skcf->listen;
2894     lock = &skcf->router_conf->router->lock;
2895 
2896     nxt_thread_spin_lock(lock);
2897 
2898     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2899               task->thread->engine, ls->count);
2900 
2901     if (--ls->count != 0) {
2902         ls = NULL;
2903     }
2904 
2905     nxt_thread_spin_unlock(lock);
2906 
2907     if (ls != NULL) {
2908         nxt_socket_close(task, ls->socket);
2909         nxt_free(ls);
2910     }
2911 }
2912 
2913 
2914 void
2915 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
2916     nxt_socket_conf_joint_t *joint)
2917 {
2918     nxt_event_engine_t  *engine;
2919 
2920     nxt_debug(task, "listen event count: %D", lev->count);
2921 
2922     if (--lev->count == 0) {
2923         nxt_free(lev);
2924     }
2925 
2926     if (joint != NULL) {
2927         nxt_router_conf_release(task, joint);
2928     }
2929 
2930     engine = task->thread->engine;
2931 
2932     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2933         nxt_thread_exit(task->thread);
2934     }
2935 }
2936 
2937 
2938 void
2939 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2940 {
2941     nxt_socket_conf_t      *skcf;
2942     nxt_router_conf_t      *rtcf;
2943     nxt_thread_spinlock_t  *lock;
2944 
2945     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2946 
2947     if (--joint->count != 0) {
2948         return;
2949     }
2950 
2951     nxt_queue_remove(&joint->link);
2952 
2953     /*
2954      * The joint content can not be safely used after the critical
2955      * section protected by the spinlock because its memory pool may
2956      * be already destroyed by another thread.
2957      */
2958     skcf = joint->socket_conf;
2959     rtcf = skcf->router_conf;
2960     lock = &rtcf->router->lock;
2961 
2962     nxt_thread_spin_lock(lock);
2963 
2964     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2965               rtcf, rtcf->count);
2966 
2967     if (--skcf->count != 0) {
2968         skcf = NULL;
2969         rtcf = NULL;
2970 
2971     } else {
2972         nxt_queue_remove(&skcf->link);
2973 
2974         if (--rtcf->count != 0) {
2975             rtcf = NULL;
2976         }
2977     }
2978 
2979     nxt_thread_spin_unlock(lock);
2980 
2981     if (skcf != NULL) {
2982         if (skcf->pass != NULL) {
2983             nxt_http_pass_cleanup(task, skcf->pass);
2984         }
2985 
2986 #if (NXT_TLS)
2987         if (skcf->tls != NULL) {
2988             task->thread->runtime->tls->server_free(task, skcf->tls);
2989         }
2990 #endif
2991     }
2992 
2993     /* TODO remove engine->port */
2994     /* TODO excude from connected ports */
2995 
2996     if (rtcf != NULL) {
2997         nxt_debug(task, "old router conf is destroyed");
2998 
2999         nxt_http_routes_cleanup(task, rtcf->routes);
3000 
3001         nxt_router_access_log_release(task, lock, rtcf->access_log);
3002 
3003         nxt_mp_thread_adopt(rtcf->mem_pool);
3004 
3005         nxt_mp_destroy(rtcf->mem_pool);
3006     }
3007 }
3008 
3009 
3010 static void
3011 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
3012     nxt_router_access_log_t *access_log)
3013 {
3014     size_t     size;
3015     u_char     *buf, *p;
3016     nxt_off_t  bytes;
3017 
3018     static nxt_time_string_t  date_cache = {
3019         (nxt_atomic_uint_t) -1,
3020         nxt_router_access_log_date,
3021         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
3022         nxt_length("31/Dec/1986:19:40:00 +0300"),
3023         NXT_THREAD_TIME_LOCAL,
3024         NXT_THREAD_TIME_SEC,
3025     };
3026 
3027     size = r->remote->address_length
3028            + 6                  /* ' - - [' */
3029            + date_cache.size
3030            + 3                  /* '] "' */
3031            + r->method->length
3032            + 1                  /* space */
3033            + r->target.length
3034            + 1                  /* space */
3035            + r->version.length
3036            + 2                  /* '" ' */
3037            + 3                  /* status */
3038            + 1                  /* space */
3039            + NXT_OFF_T_LEN
3040            + 2                  /* ' "' */
3041            + (r->referer != NULL ? r->referer->value_length : 1)
3042            + 3                  /* '" "' */
3043            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3044            + 2                  /* '"\n' */
3045     ;
3046 
3047     buf = nxt_mp_nget(r->mem_pool, size);
3048     if (nxt_slow_path(buf == NULL)) {
3049         return;
3050     }
3051 
3052     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3053                    r->remote->address_length);
3054 
3055     p = nxt_cpymem(p, " - - [", 6);
3056 
3057     p = nxt_thread_time_string(task->thread, &date_cache, p);
3058 
3059     p = nxt_cpymem(p, "] \"", 3);
3060 
3061     if (r->method->length != 0) {
3062         p = nxt_cpymem(p, r->method->start, r->method->length);
3063 
3064         if (r->target.length != 0) {
3065             *p++ = ' ';
3066             p = nxt_cpymem(p, r->target.start, r->target.length);
3067 
3068             if (r->version.length != 0) {
3069                 *p++ = ' ';
3070                 p = nxt_cpymem(p, r->version.start, r->version.length);
3071             }
3072         }
3073 
3074     } else {
3075         *p++ = '-';
3076     }
3077 
3078     p = nxt_cpymem(p, "\" ", 2);
3079 
3080     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3081 
3082     *p++ = ' ';
3083 
3084     bytes = nxt_http_proto[r->protocol].body_bytes_sent(task, r->proto);
3085 
3086     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3087 
3088     p = nxt_cpymem(p, " \"", 2);
3089 
3090     if (r->referer != NULL) {
3091         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3092 
3093     } else {
3094         *p++ = '-';
3095     }
3096 
3097     p = nxt_cpymem(p, "\" \"", 3);
3098 
3099     if (r->user_agent != NULL) {
3100         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3101 
3102     } else {
3103         *p++ = '-';
3104     }
3105 
3106     p = nxt_cpymem(p, "\"\n", 2);
3107 
3108     nxt_fd_write(access_log->fd, buf, p - buf);
3109 }
3110 
3111 
3112 static u_char *
3113 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3114     size_t size, const char *format)
3115 {
3116     u_char  sign;
3117     time_t  gmtoff;
3118 
3119     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3120                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3121 
3122     gmtoff = nxt_timezone(tm) / 60;
3123 
3124     if (gmtoff < 0) {
3125         gmtoff = -gmtoff;
3126         sign = '-';
3127 
3128     } else {
3129         sign = '+';
3130     }
3131 
3132     return nxt_sprintf(buf, buf + size, format,
3133                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3134                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3135                        sign, gmtoff / 60, gmtoff % 60);
3136 }
3137 
3138 
3139 static void
3140 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3141 {
3142     uint32_t                 stream;
3143     nxt_int_t                ret;
3144     nxt_buf_t                *b;
3145     nxt_port_t               *main_port, *router_port;
3146     nxt_runtime_t            *rt;
3147     nxt_router_access_log_t  *access_log;
3148 
3149     access_log = tmcf->router_conf->access_log;
3150 
3151     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3152     if (nxt_slow_path(b == NULL)) {
3153         goto fail;
3154     }
3155 
3156     nxt_buf_cpystr(b, &access_log->path);
3157     *b->mem.free++ = '\0';
3158 
3159     rt = task->thread->runtime;
3160     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3161     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3162 
3163     stream = nxt_port_rpc_register_handler(task, router_port,
3164                                            nxt_router_access_log_ready,
3165                                            nxt_router_access_log_error,
3166