xref: /unit/src/nxt_router.c (revision 804:fe8d2dea28dd)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 
18 
/*
 * Plain record of per-application settings: a type string, three process
 * counters, three nxt_msec_t timeouts, a request counter and two pointers
 * to configuration sub-values.
 *
 * NOTE(review): the code that fills and consumes this record is outside
 * this view; the exact meaning of each field should be confirmed against
 * the configuration parser.
 */
19 typedef struct {
20     nxt_str_t         type;
21     uint32_t          processes;
22     uint32_t          max_processes;
23     uint32_t          spare_processes;
24     nxt_msec_t        timeout;
25     nxt_msec_t        res_timeout;
26     nxt_msec_t        idle_timeout;
27     uint32_t          requests;
28     nxt_conf_value_t  *limits_value;
29     nxt_conf_value_t  *processes_value;
30 } nxt_router_app_conf_t;
31 
32 
/*
 * Per-listener settings record holding a single string, "application".
 * NOTE(review): presumably the name of the application the listener
 * forwards to -- confirm against the configuration parser.
 */
33 typedef struct {
34     nxt_str_t  application;
35 } nxt_router_listener_conf_t;
36 
37 
38 #if (NXT_TLS)
39 
/*
 * Queue entry pairing a name with a socket configuration; compiled only
 * when NXT_TLS is enabled.  nxt_router_conf_apply() pops these entries
 * from the temporary configuration's "tls" queue and passes each one to
 * nxt_router_tls_rpc_create().
 */
40 typedef struct {
41     nxt_str_t          name;
42     nxt_socket_conf_t  *conf;
43 
44     nxt_queue_link_t   link;  /* for nxt_router_temp_conf_t.tls */
45 } nxt_router_tlssock_t;
46 
47 #endif
48 
49 
/*
 * Bookkeeping for a message buffer chain, as consumed by
 * nxt_router_msg_cancel():
 *   buf                - head of the buffer chain (NULL when nothing is
 *                        tracked);
 *   tracking           - state handed to nxt_port_mmap_tracking_cancel();
 *   completion_handler - handler written back into every buffer of the
 *                        chain when the message is cancelled.
 */
50 typedef struct nxt_msg_info_s {
51     nxt_buf_t                 *buf;
52     nxt_port_mmap_tracking_t  tracking;
53     nxt_work_handler_t        completion_handler;
54 } nxt_msg_info_t;
55 
56 
/* Forward typedef: the two link structures below point at each other. */
57 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
58 
59 
/*
 * Connection-side state of a request ("rc" throughout this file).  It
 * carries the stream id, the application and application port in use, the
 * parse context, tracked message buffers and a back pointer to the
 * application-side link (ra).  nxt_router_rc_unlink() tears all of these
 * down.
 */
60 typedef struct {
61     uint32_t                 stream;
62     nxt_app_t                *app;
63     nxt_port_t               *app_port;
64     nxt_app_parse_ctx_t      *ap;
65     nxt_msg_info_t           msg_info;
66     nxt_req_app_link_t       *ra;
67 
68     nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
69 } nxt_req_conn_link_t;
70 
71 
/*
 * Application-side state of a request ("ra" throughout this file).
 *
 * use_count is changed only through atomic fetch-add, see
 * nxt_router_ra_use().  "work" is pre-filled by nxt_router_ra_init() with
 * the creating engine in work.data, and is reused to post the link back
 * to that engine.  mem_pool is non-NULL only for links allocated by
 * nxt_router_ra_create(); nxt_router_ra_release() frees the link into
 * that pool.  err_code and err_str are set by nxt_router_ra_error().
 */
72 struct nxt_req_app_link_s {
73     uint32_t             stream;
74     nxt_atomic_t         use_count;
75     nxt_port_t           *app_port;
76     nxt_port_t           *reply_port;
77     nxt_app_parse_ctx_t  *ap;
78     nxt_msg_info_t       msg_info;
79     nxt_req_conn_link_t  *rc;
80 
81     nxt_nsec_t           res_time;
82 
83     nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
84     nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
85     nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */
86 
87     nxt_mp_t             *mem_pool;
88     nxt_work_t           work;
89 
90     int                  err_code;
91     const char           *err_str;
92 };
93 
94 
/*
 * Pairs a socket configuration with the temporary configuration it
 * belongs to.
 * NOTE(review): presumably the context passed to the listen socket RPC
 * callbacks -- confirm at nxt_router_listen_socket_rpc_create().
 */
95 typedef struct {
96     nxt_socket_conf_t       *socket_conf;
97     nxt_router_temp_conf_t  *temp_conf;
98 } nxt_socket_rpc_t;
99 
100 
/*
 * Pairs an application with the temporary configuration it belongs to.
 * NOTE(review): presumably the context passed to the application prefork
 * RPC callbacks -- confirm at nxt_router_app_rpc_create().
 */
101 typedef struct {
102     nxt_app_t               *app;
103     nxt_router_temp_conf_t  *temp_conf;
104 } nxt_app_rpc_t;
105 
106 
/*
 * State shared between nxt_router_port_select() and
 * nxt_router_port_post_select(), both declared below.
 * NOTE(review): the code using these fields is outside this view; confirm
 * field semantics there.
 */
107 struct nxt_port_select_state_s {
108     nxt_app_t           *app;
109     nxt_req_app_link_t  *ra;
110 
111     nxt_port_t          *failed_port;
112     int                 failed_port_use_delta;
113 
114     uint8_t             start_process;    /* 1 bit */
115     nxt_req_app_link_t  *shared_ra;
116     nxt_port_t          *port;
117 };
118 
119 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
120 
/* Sends the "process ready" message to the controller port. */
121 static void nxt_router_greet_controller(nxt_task_t *task,
122     nxt_port_t *controller_port);
123 
/* Forward declarations of the port selection helpers. */
124 static void nxt_router_port_select(nxt_task_t *task,
125     nxt_port_select_state_t *state);
126 
127 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
128     nxt_port_select_state_t *state);
129 
/* Posts a request to start one more process of the application. */
130 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
131 
132 nxt_inline void
133 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
134 {
135     nxt_atomic_fetch_add(&ra->use_count, 1);
136 }
137 
138 nxt_inline void
139 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
140 {
141 #if (NXT_DEBUG)
142     int  c;
143 
144     c = nxt_atomic_fetch_add(&ra->use_count, -1);
145 
146     nxt_assert(c > 1);
147 #else
148     (void) nxt_atomic_fetch_add(&ra->use_count, -1);
149 #endif
150 }
151 
/* Adjusts the use counter of a link by i; releases the link when it drains. */
152 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
153 
/* Temporary configuration life cycle: allocate, apply, report the result. */
154 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
155 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
156 static void nxt_router_conf_ready(nxt_task_t *task,
157     nxt_router_temp_conf_t *tmcf);
158 static void nxt_router_conf_error(nxt_task_t *task,
159     nxt_router_temp_conf_t *tmcf);
160 static void nxt_router_conf_send(nxt_task_t *task,
161     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
162 
/*
 * Configuration parsing plus the listen socket, TLS and application
 * prefork RPC helpers used while a configuration is applied.
 */
163 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
164     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
165 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
166 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
167     nxt_str_t *name);
168 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
169     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
170 static void nxt_router_listen_socket_ready(nxt_task_t *task,
171     nxt_port_recv_msg_t *msg, void *data);
172 static void nxt_router_listen_socket_error(nxt_task_t *task,
173     nxt_port_recv_msg_t *msg, void *data);
174 #if (NXT_TLS)
175 static void nxt_router_tls_rpc_create(nxt_task_t *task,
176     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls);
177 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
178     nxt_port_recv_msg_t *msg, void *data);
179 #endif
180 static void nxt_router_app_rpc_create(nxt_task_t *task,
181     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
182 static void nxt_router_app_prefork_ready(nxt_task_t *task,
183     nxt_port_recv_msg_t *msg, void *data);
184 static void nxt_router_app_prefork_error(nxt_task_t *task,
185     nxt_port_recv_msg_t *msg, void *data);
186 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
187     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
188 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
189     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
190 
/* Per-engine configuration: create, update and delete engine joints. */
191 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
192     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
193     const nxt_event_interface_t *interface);
194 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
195     nxt_router_engine_conf_t *recf);
196 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
197     nxt_router_engine_conf_t *recf);
198 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
199     nxt_router_engine_conf_t *recf);
200 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
201     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
202     nxt_work_handler_t handler);
203 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
204     nxt_router_engine_conf_t *recf);
205 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
206     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
207 
/* Worker thread creation and application list ordering. */
208 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
209     nxt_router_temp_conf_t *tmcf);
210 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
211     nxt_event_engine_t *engine);
212 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
213     nxt_router_temp_conf_t *tmcf);
214 
/* Posting of prepared jobs to the engines. */
215 static void nxt_router_engines_post(nxt_router_t *router,
216     nxt_router_temp_conf_t *tmcf);
217 static void nxt_router_engine_post(nxt_event_engine_t *engine,
218     nxt_work_t *jobs);
219 
/* Thread entry point and the listen socket / thread work handlers. */
220 static void nxt_router_thread_start(void *data);
221 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
222     void *data);
223 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
224     void *data);
225 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
226     void *data);
227 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
228     void *data);
229 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
230     void *data);
231 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
232     void *data);
233 static void nxt_router_listen_socket_release(nxt_task_t *task,
234     nxt_socket_conf_t *skcf);
234     nxt_socket_conf_t *skcf);
235 
/* Access log: writing, date formatting, opening, releasing and reopening. */
236 static void nxt_router_access_log_writer(nxt_task_t *task,
237     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
238 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
239     struct tm *tm, size_t size, const char *format);
240 static void nxt_router_access_log_open(nxt_task_t *task,
241     nxt_router_temp_conf_t *tmcf);
242 static void nxt_router_access_log_ready(nxt_task_t *task,
243     nxt_port_recv_msg_t *msg, void *data);
244 static void nxt_router_access_log_error(nxt_task_t *task,
245     nxt_port_recv_msg_t *msg, void *data);
246 static void nxt_router_access_log_release(nxt_task_t *task,
247     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
248 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
249     void *data);
250 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
251     nxt_port_recv_msg_t *msg, void *data);
252 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
253     nxt_port_recv_msg_t *msg, void *data);
254 
/*
 * RPC callbacks registered by nxt_router_start_app_process_handler() for
 * the "start worker" request.
 */
255 static void nxt_router_app_port_ready(nxt_task_t *task,
256     nxt_port_recv_msg_t *msg, void *data);
257 static void nxt_router_app_port_error(nxt_task_t *task,
258     nxt_port_recv_msg_t *msg, void *data);
259 
/* Application unlinking and application port acquisition/release. */
260 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
261 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
262     uint32_t request_failed, uint32_t got_response);
263 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
264     nxt_req_app_link_t *ra);
265 
/* Request message preparation. */
266 static void nxt_router_app_prepare_request(nxt_task_t *task,
267     nxt_req_app_link_t *ra);
268 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
269     nxt_port_t *port, const nxt_str_t *prefix);
270 
/* Timer handlers and deferred release of application objects. */
271 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
272 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
273     void *data);
274 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
275     void *data);
276 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
277     void *data);
278 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
279 
/*
 * Tentative definition of the request state object; the initialized
 * definition is expected elsewhere in this translation unit.
 */
280 static const nxt_http_request_state_t  nxt_http_request_send_state;
281 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
282 
/* Adjusts the use counter of an application joint; frees it at zero. */
283 static void nxt_router_app_joint_use(nxt_task_t *task,
284     nxt_app_joint_t *app_joint, int i);
285 
/* The router instance of this process; assigned once in nxt_router_start(). */
286 static nxt_router_t  *nxt_router;
287 
/* Prefix strings referenced from the table below. */
288 static const nxt_str_t http_prefix = nxt_string("HTTP_");
289 static const nxt_str_t empty_prefix = nxt_string("");
290 
/*
 * Prefix lookup table: the first entry is the empty prefix, the remaining
 * four are "HTTP_".
 * NOTE(review): presumably indexed by application type when header fields
 * are serialized for an application -- confirm at the point of use.
 */
291 static const nxt_str_t  *nxt_app_msg_prefix[] = {
292     &empty_prefix,
293     &http_prefix,
294     &http_prefix,
295     &http_prefix,
296     &http_prefix,
297 };
298 
299 
/*
 * Port message handler table of the router process.  The new_port, data,
 * remove_pid and access_log entries use router-specific handlers; both
 * rpc_ready and rpc_error are routed to the generic nxt_port_rpc_handler().
 */
300 nxt_port_handlers_t  nxt_router_process_port_handlers = {
301     .quit         = nxt_worker_process_quit_handler,
302     .new_port     = nxt_router_new_port_handler,
303     .change_file  = nxt_port_change_log_file_handler,
304     .mmap         = nxt_port_mmap_handler,
305     .data         = nxt_router_conf_data_handler,
306     .remove_pid   = nxt_router_remove_pid_handler,
307     .access_log   = nxt_router_access_log_reopen_handler,
308     .rpc_ready    = nxt_port_rpc_handler,
309     .rpc_error    = nxt_port_rpc_handler,
310 };
311 
312 
313 nxt_int_t
314 nxt_router_start(nxt_task_t *task, void *data)
315 {
316     nxt_int_t      ret;
317     nxt_port_t     *controller_port;
318     nxt_router_t   *router;
319     nxt_runtime_t  *rt;
320 
321     rt = task->thread->runtime;
322 
323 #if (NXT_TLS)
324     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
325     if (nxt_slow_path(rt->tls == NULL)) {
326         return NXT_ERROR;
327     }
328 
329     ret = rt->tls->library_init(task);
330     if (nxt_slow_path(ret != NXT_OK)) {
331         return ret;
332     }
333 #endif
334 
335     ret = nxt_http_init(task, rt);
336     if (nxt_slow_path(ret != NXT_OK)) {
337         return ret;
338     }
339 
340     router = nxt_zalloc(sizeof(nxt_router_t));
341     if (nxt_slow_path(router == NULL)) {
342         return NXT_ERROR;
343     }
344 
345     nxt_queue_init(&router->engines);
346     nxt_queue_init(&router->sockets);
347     nxt_queue_init(&router->apps);
348 
349     nxt_router = router;
350 
351     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
352     if (controller_port != NULL) {
353         nxt_router_greet_controller(task, controller_port);
354     }
355 
356     return NXT_OK;
357 }
358 
359 
360 static void
361 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
362 {
363     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
364                           -1, 0, 0, NULL);
365 }
366 
367 
368 static void
369 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
370     void *data)
371 {
372     size_t         size;
373     uint32_t       stream;
374     nxt_mp_t       *mp;
375     nxt_int_t      ret;
376     nxt_app_t      *app;
377     nxt_buf_t      *b;
378     nxt_port_t     *main_port;
379     nxt_runtime_t  *rt;
380 
381     app = data;
382 
383     rt = task->thread->runtime;
384     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
385 
386     nxt_debug(task, "app '%V' %p start process", &app->name, app);
387 
388     size = app->name.length + 1 + app->conf.length;
389 
390     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
391 
392     if (nxt_slow_path(b == NULL)) {
393         goto failed;
394     }
395 
396     nxt_buf_cpystr(b, &app->name);
397     *b->mem.free++ = '\0';
398     nxt_buf_cpystr(b, &app->conf);
399 
400     nxt_router_app_joint_use(task, app->joint, 1);
401 
402     stream = nxt_port_rpc_register_handler(task, port,
403                                            nxt_router_app_port_ready,
404                                            nxt_router_app_port_error,
405                                            -1, app->joint);
406 
407     if (nxt_slow_path(stream == 0)) {
408         nxt_router_app_joint_use(task, app->joint, -1);
409 
410         goto failed;
411     }
412 
413     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
414                                 stream, port->id, b);
415 
416     if (nxt_slow_path(ret != NXT_OK)) {
417         nxt_port_rpc_cancel(task, port, stream);
418 
419         nxt_router_app_joint_use(task, app->joint, -1);
420 
421         goto failed;
422     }
423 
424     nxt_router_app_use(task, app, -1);
425 
426     return;
427 
428 failed:
429 
430     if (b != NULL) {
431         mp = b->data;
432         nxt_mp_free(mp, b);
433         nxt_mp_release(mp);
434     }
435 
436     nxt_thread_mutex_lock(&app->mutex);
437 
438     app->pending_processes--;
439 
440     nxt_thread_mutex_unlock(&app->mutex);
441 
442     nxt_router_app_use(task, app, -1);
443 }
444 
445 
446 static void
447 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
448 {
449     app_joint->use_count += i;
450 
451     if (app_joint->use_count == 0) {
452         nxt_assert(app_joint->app == NULL);
453 
454         nxt_free(app_joint);
455     }
456 }
457 
458 
459 static nxt_int_t
460 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
461 {
462     nxt_int_t      res;
463     nxt_port_t     *router_port;
464     nxt_runtime_t  *rt;
465 
466     rt = task->thread->runtime;
467     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
468 
469     nxt_router_app_use(task, app, 1);
470 
471     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
472                         app);
473 
474     if (res == NXT_OK) {
475         return res;
476     }
477 
478     nxt_thread_mutex_lock(&app->mutex);
479 
480     app->pending_processes--;
481 
482     nxt_thread_mutex_unlock(&app->mutex);
483 
484     nxt_router_app_use(task, app, -1);
485 
486     return NXT_ERROR;
487 }
488 
489 
490 nxt_inline void
491 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
492     nxt_req_conn_link_t *rc)
493 {
494     nxt_event_engine_t  *engine;
495 
496     engine = task->thread->engine;
497 
498     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
499 
500     ra->stream = rc->stream;
501     ra->use_count = 1;
502     ra->rc = rc;
503     rc->ra = ra;
504     ra->reply_port = engine->port;
505     ra->ap = rc->ap;
506 
507     ra->work.handler = NULL;
508     ra->work.task = &engine->task;
509     ra->work.obj = ra;
510     ra->work.data = engine;
511 }
512 
513 
514 nxt_inline nxt_req_app_link_t *
515 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
516 {
517     nxt_mp_t            *mp;
518     nxt_req_app_link_t  *ra;
519 
520     if (ra_src->mem_pool != NULL) {
521         return ra_src;
522     }
523 
524     mp = ra_src->ap->mem_pool;
525 
526     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
527 
528     if (nxt_slow_path(ra == NULL)) {
529 
530         ra_src->rc->ra = NULL;
531         ra_src->rc = NULL;
532 
533         return NULL;
534     }
535 
536     nxt_mp_retain(mp);
537 
538     nxt_router_ra_init(task, ra, ra_src->rc);
539 
540     ra->mem_pool = mp;
541 
542     return ra;
543 }
544 
545 
546 nxt_inline nxt_bool_t
547 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
548     uint32_t stream)
549 {
550     nxt_buf_t   *b, *next;
551     nxt_bool_t  cancelled;
552 
553     if (msg_info->buf == NULL) {
554         return 0;
555     }
556 
557     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
558                                               stream);
559 
560     if (cancelled) {
561         nxt_debug(task, "stream #%uD: cancelled by router", stream);
562     }
563 
564     for (b = msg_info->buf; b != NULL; b = next) {
565         next = b->next;
566 
567         b->completion_handler = msg_info->completion_handler;
568 
569         if (b->is_port_mmap_sent) {
570             b->is_port_mmap_sent = cancelled == 0;
571             b->completion_handler(task, b, b->parent);
572         }
573     }
574 
575     msg_info->buf = NULL;
576 
577     return cancelled;
578 }
579 
580 
581 static void
582 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
583 
584 
585 static void
586 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
587 {
588     nxt_req_app_link_t  *ra;
589 
590     ra = obj;
591 
592     nxt_router_ra_update_peer(task, ra);
593 
594     nxt_router_ra_use(task, ra, -1);
595 }
596 
597 
598 static void
599 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
600 {
601     nxt_event_engine_t   *engine;
602     nxt_req_conn_link_t  *rc;
603 
604     engine = ra->work.data;
605 
606     if (task->thread->engine != engine) {
607         nxt_router_ra_inc_use(ra);
608 
609         ra->work.handler = nxt_router_ra_update_peer_handler;
610         ra->work.task = &engine->task;
611         ra->work.next = NULL;
612 
613         nxt_debug(task, "ra stream #%uD post update peer to %p",
614                   ra->stream, engine);
615 
616         nxt_event_engine_post(engine, &ra->work);
617 
618         return;
619     }
620 
621     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
622 
623     rc = ra->rc;
624 
625     if (rc != NULL && ra->app_port != NULL) {
626         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
627     }
628 
629     nxt_router_ra_use(task, ra, -1);
630 }
631 
632 
633 static void
634 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
635 {
636     nxt_mp_t                *mp;
637     nxt_req_conn_link_t     *rc;
638 
639     nxt_assert(task->thread->engine == ra->work.data);
640     nxt_assert(ra->use_count == 0);
641 
642     nxt_debug(task, "ra stream #%uD release", ra->stream);
643 
644     rc = ra->rc;
645 
646     if (rc != NULL) {
647         if (nxt_slow_path(ra->err_code != 0)) {
648             nxt_http_request_error(task, rc->ap->request, ra->err_code);
649 
650         } else {
651             rc->app_port = ra->app_port;
652             rc->msg_info = ra->msg_info;
653 
654             if (rc->app->timeout != 0) {
655                 rc->ap->timer.handler = nxt_router_app_timeout;
656                 rc->ap->timer_data = rc;
657                 nxt_timer_add(task->thread->engine, &rc->ap->timer,
658                               rc->app->timeout);
659             }
660 
661             ra->app_port = NULL;
662             ra->msg_info.buf = NULL;
663         }
664 
665         rc->ra = NULL;
666         ra->rc = NULL;
667     }
668 
669     if (ra->app_port != NULL) {
670         nxt_router_app_port_release(task, ra->app_port, 0, 1);
671 
672         ra->app_port = NULL;
673     }
674 
675     nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
676 
677     mp = ra->mem_pool;
678 
679     if (mp != NULL) {
680         nxt_mp_free(mp, ra);
681         nxt_mp_release(mp);
682     }
683 }
684 
685 
686 static void
687 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
688 {
689     nxt_req_app_link_t  *ra;
690 
691     ra = obj;
692 
693     nxt_assert(ra->work.data == data);
694 
695     nxt_atomic_fetch_add(&ra->use_count, -1);
696 
697     nxt_router_ra_release(task, ra);
698 }
699 
700 
701 static void
702 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
703 {
704     int                 c;
705     nxt_event_engine_t  *engine;
706 
707     c = nxt_atomic_fetch_add(&ra->use_count, i);
708 
709     if (i < 0 && c == -i) {
710         engine = ra->work.data;
711 
712         if (task->thread->engine == engine) {
713             nxt_router_ra_release(task, ra);
714 
715             return;
716         }
717 
718         nxt_router_ra_inc_use(ra);
719 
720         ra->work.handler = nxt_router_ra_release_handler;
721         ra->work.task = &engine->task;
722         ra->work.next = NULL;
723 
724         nxt_debug(task, "ra stream #%uD post release to %p",
725                   ra->stream, engine);
726 
727         nxt_event_engine_post(engine, &ra->work);
728     }
729 }
730 
731 
732 nxt_inline void
733 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
734 {
735     ra->app_port = NULL;
736     ra->err_code = code;
737     ra->err_str = str;
738 }
739 
740 
741 nxt_inline void
742 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
743 {
744     nxt_queue_insert_tail(&ra->app_port->pending_requests,
745                           &ra->link_port_pending);
746     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
747 
748     nxt_router_ra_inc_use(ra);
749 
750     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
751 
752     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
753 }
754 
755 
756 nxt_inline nxt_bool_t
757 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
758 {
759     if (lnk->next != NULL) {
760         nxt_queue_remove(lnk);
761 
762         lnk->next = NULL;
763 
764         return 1;
765     }
766 
767     return 0;
768 }
769 
770 
771 nxt_inline void
772 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
773 {
774     int                 ra_use_delta;
775     nxt_req_app_link_t  *ra;
776 
777     if (rc->app_port != NULL) {
778         nxt_router_app_port_release(task, rc->app_port, 0, 1);
779 
780         rc->app_port = NULL;
781     }
782 
783     nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
784 
785     ra = rc->ra;
786 
787     if (ra != NULL) {
788         rc->ra = NULL;
789         ra->rc = NULL;
790 
791         ra_use_delta = 0;
792 
793         nxt_thread_mutex_lock(&rc->app->mutex);
794 
795         if (ra->link_app_requests.next == NULL
796             && ra->link_port_pending.next == NULL
797             && ra->link_app_pending.next == NULL)
798         {
799             ra = NULL;
800 
801         } else {
802             ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
803             ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
804             nxt_queue_chk_remove(&ra->link_app_pending);
805         }
806 
807         nxt_thread_mutex_unlock(&rc->app->mutex);
808 
809         if (ra != NULL) {
810             nxt_router_ra_use(task, ra, ra_use_delta);
811         }
812     }
813 
814     if (rc->app != NULL) {
815         nxt_router_app_use(task, rc->app, -1);
816 
817         rc->app = NULL;
818     }
819 
820     if (rc->ap != NULL) {
821         rc->ap->timer_data = NULL;
822 
823         nxt_app_http_req_done(task, rc->ap);
824 
825         rc->ap = NULL;
826     }
827 }
828 
829 
830 void
831 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
832 {
833     nxt_port_new_port_handler(task, msg);
834 
835     if (msg->u.new_port != NULL
836         && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
837     {
838         nxt_router_greet_controller(task, msg->u.new_port);
839     }
840 
841     if (msg->port_msg.stream == 0) {
842         return;
843     }
844 
845     if (msg->u.new_port == NULL
846         || msg->u.new_port->type != NXT_PROCESS_WORKER)
847     {
848         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
849     }
850 
851     nxt_port_rpc_handler(task, msg);
852 }
853 
854 
855 void
856 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
857 {
858     nxt_int_t               ret;
859     nxt_buf_t               *b;
860     nxt_router_temp_conf_t  *tmcf;
861 
862     tmcf = nxt_router_temp_conf(task);
863     if (nxt_slow_path(tmcf == NULL)) {
864         return;
865     }
866 
867     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
868               nxt_buf_used_size(msg->buf),
869               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
870 
871     tmcf->router_conf->router = nxt_router;
872     tmcf->stream = msg->port_msg.stream;
873     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
874                                        msg->port_msg.pid,
875                                        msg->port_msg.reply_port);
876 
877     if (nxt_slow_path(tmcf->port == NULL)) {
878         nxt_alert(task, "reply port not found");
879 
880         return;
881     }
882 
883     nxt_port_use(task, tmcf->port, 1);
884 
885     b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
886                                msg->buf, msg->size);
887     if (nxt_slow_path(b == NULL)) {
888         nxt_router_conf_error(task, tmcf);
889 
890         return;
891     }
892 
893     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
894 
895     if (nxt_fast_path(ret == NXT_OK)) {
896         nxt_router_conf_apply(task, tmcf, NULL);
897 
898     } else {
899         nxt_router_conf_error(task, tmcf);
900     }
901 }
902 
903 
904 static void
905 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
906     void *data)
907 {
908     union {
909         nxt_pid_t  removed_pid;
910         void       *data;
911     } u;
912 
913     u.data = data;
914 
915     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
916 }
917 
918 
919 void
920 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
921 {
922     nxt_event_engine_t  *engine;
923 
924     nxt_port_remove_pid_handler(task, msg);
925 
926     if (msg->port_msg.stream == 0) {
927         return;
928     }
929 
930     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
931     {
932         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
933                       msg->u.data);
934     }
935     nxt_queue_loop;
936 
937     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
938 
939     nxt_port_rpc_handler(task, msg);
940 }
941 
942 
943 static nxt_router_temp_conf_t *
944 nxt_router_temp_conf(nxt_task_t *task)
945 {
946     nxt_mp_t                *mp, *tmp;
947     nxt_router_conf_t       *rtcf;
948     nxt_router_temp_conf_t  *tmcf;
949 
950     mp = nxt_mp_create(1024, 128, 256, 32);
951     if (nxt_slow_path(mp == NULL)) {
952         return NULL;
953     }
954 
955     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
956     if (nxt_slow_path(rtcf == NULL)) {
957         goto fail;
958     }
959 
960     rtcf->mem_pool = mp;
961 
962     tmp = nxt_mp_create(1024, 128, 256, 32);
963     if (nxt_slow_path(tmp == NULL)) {
964         goto fail;
965     }
966 
967     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
968     if (nxt_slow_path(tmcf == NULL)) {
969         goto temp_fail;
970     }
971 
972     tmcf->mem_pool = tmp;
973     tmcf->router_conf = rtcf;
974     tmcf->count = 1;
975     tmcf->engine = task->thread->engine;
976 
977     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
978                                      sizeof(nxt_router_engine_conf_t));
979     if (nxt_slow_path(tmcf->engines == NULL)) {
980         goto temp_fail;
981     }
982 
983     nxt_queue_init(&tmcf->deleting);
984     nxt_queue_init(&tmcf->keeping);
985     nxt_queue_init(&tmcf->updating);
986     nxt_queue_init(&tmcf->pending);
987     nxt_queue_init(&tmcf->creating);
988 
989 #if (NXT_TLS)
990     nxt_queue_init(&tmcf->tls);
991 #endif
992 
993     nxt_queue_init(&tmcf->apps);
994     nxt_queue_init(&tmcf->previous);
995 
996     return tmcf;
997 
998 temp_fail:
999 
1000     nxt_mp_destroy(tmp);
1001 
1002 fail:
1003 
1004     nxt_mp_destroy(mp);
1005 
1006     return NULL;
1007 }
1008 
1009 
1010 nxt_inline nxt_bool_t
1011 nxt_router_app_can_start(nxt_app_t *app)
1012 {
1013     return app->processes + app->pending_processes < app->max_processes
1014             && app->pending_processes < app->max_pending_processes;
1015 }
1016 
1017 
1018 nxt_inline nxt_bool_t
1019 nxt_router_app_need_start(nxt_app_t *app)
1020 {
1021     return app->idle_processes + app->pending_processes
1022             < app->spare_processes;
1023 }
1024 
1025 
1026 static void
1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1028 {
1029     nxt_int_t                    ret;
1030     nxt_app_t                    *app;
1031     nxt_router_t                 *router;
1032     nxt_runtime_t                *rt;
1033     nxt_queue_link_t             *qlk;
1034     nxt_socket_conf_t            *skcf;
1035     nxt_router_conf_t            *rtcf;
1036     nxt_router_temp_conf_t       *tmcf;
1037     const nxt_event_interface_t  *interface;
1038 #if (NXT_TLS)
1039     nxt_router_tlssock_t         *tls;
1040 #endif
1041 
1042     tmcf = obj;
1043 
1044     qlk = nxt_queue_first(&tmcf->pending);
1045 
1046     if (qlk != nxt_queue_tail(&tmcf->pending)) {
1047         nxt_queue_remove(qlk);
1048         nxt_queue_insert_tail(&tmcf->creating, qlk);
1049 
1050         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1051 
1052         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1053 
1054         return;
1055     }
1056 
1057 #if (NXT_TLS)
1058     qlk = nxt_queue_first(&tmcf->tls);
1059 
1060     if (qlk != nxt_queue_tail(&tmcf->tls)) {
1061         nxt_queue_remove(qlk);
1062 
1063         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1064 
1065         nxt_router_tls_rpc_create(task, tmcf, tls);
1066         return;
1067     }
1068 #endif
1069 
1070     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1071 
1072         if (nxt_router_app_need_start(app)) {
1073             nxt_router_app_rpc_create(task, tmcf, app);
1074             return;
1075         }
1076 
1077     } nxt_queue_loop;
1078 
1079     rtcf = tmcf->router_conf;
1080 
1081     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1082         nxt_router_access_log_open(task, tmcf);
1083         return;
1084     }
1085 
1086     rt = task->thread->runtime;
1087 
1088     interface = nxt_service_get(rt->services, "engine", NULL);
1089 
1090     router = rtcf->router;
1091 
1092     ret = nxt_router_engines_create(task, router, tmcf, interface);
1093     if (nxt_slow_path(ret != NXT_OK)) {
1094         goto fail;
1095     }
1096 
1097     ret = nxt_router_threads_create(task, rt, tmcf);
1098     if (nxt_slow_path(ret != NXT_OK)) {
1099         goto fail;
1100     }
1101 
1102     nxt_router_apps_sort(task, router, tmcf);
1103 
1104     nxt_router_engines_post(router, tmcf);
1105 
1106     nxt_queue_add(&router->sockets, &tmcf->updating);
1107     nxt_queue_add(&router->sockets, &tmcf->creating);
1108 
1109     router->access_log = rtcf->access_log;
1110 
1111     nxt_router_conf_ready(task, tmcf);
1112 
1113     return;
1114 
1115 fail:
1116 
1117     nxt_router_conf_error(task, tmcf);
1118 
1119     return;
1120 }
1121 
1122 
1123 static void
1124 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1125 {
1126     nxt_joint_job_t  *job;
1127 
1128     job = obj;
1129 
1130     nxt_router_conf_ready(task, job->tmcf);
1131 }
1132 
1133 
1134 static void
1135 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1136 {
1137     nxt_debug(task, "temp conf count:%D", tmcf->count);
1138 
1139     if (--tmcf->count == 0) {
1140         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1141     }
1142 }
1143 
1144 
1145 static void
1146 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1147 {
1148     nxt_app_t          *app;
1149     nxt_queue_t        new_socket_confs;
1150     nxt_socket_t       s;
1151     nxt_router_t       *router;
1152     nxt_queue_link_t   *qlk;
1153     nxt_socket_conf_t  *skcf;
1154     nxt_router_conf_t  *rtcf;
1155 
1156     nxt_alert(task, "failed to apply new conf");
1157 
1158     for (qlk = nxt_queue_first(&tmcf->creating);
1159          qlk != nxt_queue_tail(&tmcf->creating);
1160          qlk = nxt_queue_next(qlk))
1161     {
1162         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1163         s = skcf->listen->socket;
1164 
1165         if (s != -1) {
1166             nxt_socket_close(task, s);
1167         }
1168 
1169         nxt_free(skcf->listen);
1170     }
1171 
1172     nxt_queue_init(&new_socket_confs);
1173     nxt_queue_add(&new_socket_confs, &tmcf->updating);
1174     nxt_queue_add(&new_socket_confs, &tmcf->pending);
1175     nxt_queue_add(&new_socket_confs, &tmcf->creating);
1176 
1177     nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1178 
1179         if (skcf->application != NULL) {
1180             nxt_router_app_use(task, skcf->application, -1);
1181             skcf->application = NULL;
1182         }
1183 
1184     } nxt_queue_loop;
1185 
1186     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1187 
1188         nxt_router_app_unlink(task, app);
1189 
1190     } nxt_queue_loop;
1191 
1192     rtcf = tmcf->router_conf;
1193     router = rtcf->router;
1194 
1195     nxt_queue_add(&router->sockets, &tmcf->keeping);
1196     nxt_queue_add(&router->sockets, &tmcf->deleting);
1197 
1198     nxt_queue_add(&router->apps, &tmcf->previous);
1199 
1200     // TODO: new engines and threads
1201 
1202     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1203 
1204     nxt_mp_destroy(rtcf->mem_pool);
1205 
1206     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1207 }
1208 
1209 
1210 static void
1211 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1212     nxt_port_msg_type_t type)
1213 {
1214     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1215 
1216     nxt_port_use(task, tmcf->port, -1);
1217 
1218     tmcf->port = NULL;
1219 }
1220 
1221 
1222 static nxt_conf_map_t  nxt_router_conf[] = {
1223     {
1224         nxt_string("listeners_threads"),
1225         NXT_CONF_MAP_INT32,
1226         offsetof(nxt_router_conf_t, threads),
1227     },
1228 };
1229 
1230 
1231 static nxt_conf_map_t  nxt_router_app_conf[] = {
1232     {
1233         nxt_string("type"),
1234         NXT_CONF_MAP_STR,
1235         offsetof(nxt_router_app_conf_t, type),
1236     },
1237 
1238     {
1239         nxt_string("limits"),
1240         NXT_CONF_MAP_PTR,
1241         offsetof(nxt_router_app_conf_t, limits_value),
1242     },
1243 
1244     {
1245         nxt_string("processes"),
1246         NXT_CONF_MAP_INT32,
1247         offsetof(nxt_router_app_conf_t, processes),
1248     },
1249 
1250     {
1251         nxt_string("processes"),
1252         NXT_CONF_MAP_PTR,
1253         offsetof(nxt_router_app_conf_t, processes_value),
1254     },
1255 };
1256 
1257 
1258 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1259     {
1260         nxt_string("timeout"),
1261         NXT_CONF_MAP_MSEC,
1262         offsetof(nxt_router_app_conf_t, timeout),
1263     },
1264 
1265     {
1266         nxt_string("reschedule_timeout"),
1267         NXT_CONF_MAP_MSEC,
1268         offsetof(nxt_router_app_conf_t, res_timeout),
1269     },
1270 
1271     {
1272         nxt_string("requests"),
1273         NXT_CONF_MAP_INT32,
1274         offsetof(nxt_router_app_conf_t, requests),
1275     },
1276 };
1277 
1278 
1279 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1280     {
1281         nxt_string("spare"),
1282         NXT_CONF_MAP_INT32,
1283         offsetof(nxt_router_app_conf_t, spare_processes),
1284     },
1285 
1286     {
1287         nxt_string("max"),
1288         NXT_CONF_MAP_INT32,
1289         offsetof(nxt_router_app_conf_t, max_processes),
1290     },
1291 
1292     {
1293         nxt_string("idle_timeout"),
1294         NXT_CONF_MAP_MSEC,
1295         offsetof(nxt_router_app_conf_t, idle_timeout),
1296     },
1297 };
1298 
1299 
1300 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1301     {
1302         nxt_string("application"),
1303         NXT_CONF_MAP_STR,
1304         offsetof(nxt_router_listener_conf_t, application),
1305     },
1306 };
1307 
1308 
1309 static nxt_conf_map_t  nxt_router_http_conf[] = {
1310     {
1311         nxt_string("header_buffer_size"),
1312         NXT_CONF_MAP_SIZE,
1313         offsetof(nxt_socket_conf_t, header_buffer_size),
1314     },
1315 
1316     {
1317         nxt_string("large_header_buffer_size"),
1318         NXT_CONF_MAP_SIZE,
1319         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1320     },
1321 
1322     {
1323         nxt_string("large_header_buffers"),
1324         NXT_CONF_MAP_SIZE,
1325         offsetof(nxt_socket_conf_t, large_header_buffers),
1326     },
1327 
1328     {
1329         nxt_string("body_buffer_size"),
1330         NXT_CONF_MAP_SIZE,
1331         offsetof(nxt_socket_conf_t, body_buffer_size),
1332     },
1333 
1334     {
1335         nxt_string("max_body_size"),
1336         NXT_CONF_MAP_SIZE,
1337         offsetof(nxt_socket_conf_t, max_body_size),
1338     },
1339 
1340     {
1341         nxt_string("idle_timeout"),
1342         NXT_CONF_MAP_MSEC,
1343         offsetof(nxt_socket_conf_t, idle_timeout),
1344     },
1345 
1346     {
1347         nxt_string("header_read_timeout"),
1348         NXT_CONF_MAP_MSEC,
1349         offsetof(nxt_socket_conf_t, header_read_timeout),
1350     },
1351 
1352     {
1353         nxt_string("body_read_timeout"),
1354         NXT_CONF_MAP_MSEC,
1355         offsetof(nxt_socket_conf_t, body_read_timeout),
1356     },
1357 
1358     {
1359         nxt_string("send_timeout"),
1360         NXT_CONF_MAP_MSEC,
1361         offsetof(nxt_socket_conf_t, send_timeout),
1362     },
1363 };
1364 
1365 
1366 static nxt_int_t
1367 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1368     u_char *start, u_char *end)
1369 {
1370     u_char                      *p;
1371     size_t                      size;
1372     nxt_mp_t                    *mp;
1373     uint32_t                    next;
1374     nxt_int_t                   ret;
1375     nxt_str_t                   name, path;
1376     nxt_app_t                   *app, *prev;
1377     nxt_router_t                *router;
1378     nxt_app_joint_t             *app_joint;
1379     nxt_conf_value_t            *conf, *http, *value;
1380     nxt_conf_value_t            *applications, *application;
1381     nxt_conf_value_t            *listeners, *listener;
1382     nxt_socket_conf_t           *skcf;
1383     nxt_event_engine_t          *engine;
1384     nxt_app_lang_module_t       *lang;
1385     nxt_router_app_conf_t       apcf;
1386     nxt_router_access_log_t     *access_log;
1387     nxt_router_listener_conf_t  lscf;
1388 #if (NXT_TLS)
1389     nxt_router_tlssock_t        *tls;
1390 #endif
1391 
1392     static nxt_str_t  http_path = nxt_string("/settings/http");
1393     static nxt_str_t  applications_path = nxt_string("/applications");
1394     static nxt_str_t  listeners_path = nxt_string("/listeners");
1395     static nxt_str_t  access_log_path = nxt_string("/access_log");
1396 #if (NXT_TLS)
1397     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1398 #endif
1399 
1400     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1401     if (conf == NULL) {
1402         nxt_alert(task, "configuration parsing error");
1403         return NXT_ERROR;
1404     }
1405 
1406     mp = tmcf->router_conf->mem_pool;
1407 
1408     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1409                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1410     if (ret != NXT_OK) {
1411         nxt_alert(task, "root map error");
1412         return NXT_ERROR;
1413     }
1414 
1415     if (tmcf->router_conf->threads == 0) {
1416         tmcf->router_conf->threads = nxt_ncpu;
1417     }
1418 
1419     applications = nxt_conf_get_path(conf, &applications_path);
1420     if (applications == NULL) {
1421         nxt_alert(task, "no \"applications\" block");
1422         return NXT_ERROR;
1423     }
1424 
1425     router = tmcf->router_conf->router;
1426 
1427     next = 0;
1428 
1429     for ( ;; ) {
1430         application = nxt_conf_next_object_member(applications, &name, &next);
1431         if (application == NULL) {
1432             break;
1433         }
1434 
1435         nxt_debug(task, "application \"%V\"", &name);
1436 
1437         size = nxt_conf_json_length(application, NULL);
1438 
1439         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1440         if (app == NULL) {
1441             goto fail;
1442         }
1443 
1444         nxt_memzero(app, sizeof(nxt_app_t));
1445 
1446         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1447         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1448 
1449         p = nxt_conf_json_print(app->conf.start, application, NULL);
1450         app->conf.length = p - app->conf.start;
1451 
1452         nxt_assert(app->conf.length <= size);
1453 
1454         nxt_debug(task, "application conf \"%V\"", &app->conf);
1455 
1456         prev = nxt_router_app_find(&router->apps, &name);
1457 
1458         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1459             nxt_free(app);
1460 
1461             nxt_queue_remove(&prev->link);
1462             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1463             continue;
1464         }
1465 
1466         apcf.processes = 1;
1467         apcf.max_processes = 1;
1468         apcf.spare_processes = 0;
1469         apcf.timeout = 0;
1470         apcf.res_timeout = 1000;
1471         apcf.idle_timeout = 15000;
1472         apcf.requests = 0;
1473         apcf.limits_value = NULL;
1474         apcf.processes_value = NULL;
1475 
1476         app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1477         if (nxt_slow_path(app_joint == NULL)) {
1478             goto app_fail;
1479         }
1480 
1481         nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1482 
1483         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1484                                   nxt_nitems(nxt_router_app_conf), &apcf);
1485         if (ret != NXT_OK) {
1486             nxt_alert(task, "application map error");
1487             goto app_fail;
1488         }
1489 
1490         if (apcf.limits_value != NULL) {
1491 
1492             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1493                 nxt_alert(task, "application limits is not object");
1494                 goto app_fail;
1495             }
1496 
1497             ret = nxt_conf_map_object(mp, apcf.limits_value,
1498                                       nxt_router_app_limits_conf,
1499                                       nxt_nitems(nxt_router_app_limits_conf),
1500                                       &apcf);
1501             if (ret != NXT_OK) {
1502                 nxt_alert(task, "application limits map error");
1503                 goto app_fail;
1504             }
1505         }
1506 
1507         if (apcf.processes_value != NULL
1508             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1509         {
1510             ret = nxt_conf_map_object(mp, apcf.processes_value,
1511                                       nxt_router_app_processes_conf,
1512                                       nxt_nitems(nxt_router_app_processes_conf),
1513                                       &apcf);
1514             if (ret != NXT_OK) {
1515                 nxt_alert(task, "application processes map error");
1516                 goto app_fail;
1517             }
1518 
1519         } else {
1520             apcf.max_processes = apcf.processes;
1521             apcf.spare_processes = apcf.processes;
1522         }
1523 
1524         nxt_debug(task, "application type: %V", &apcf.type);
1525         nxt_debug(task, "application processes: %D", apcf.processes);
1526         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1527         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1528         nxt_debug(task, "application requests: %D", apcf.requests);
1529 
1530         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1531 
1532         if (lang == NULL) {
1533             nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1534             goto app_fail;
1535         }
1536 
1537         nxt_debug(task, "application language module: \"%s\"", lang->file);
1538 
1539         ret = nxt_thread_mutex_create(&app->mutex);
1540         if (ret != NXT_OK) {
1541             goto app_fail;
1542         }
1543 
1544         nxt_queue_init(&app->ports);
1545         nxt_queue_init(&app->spare_ports);
1546         nxt_queue_init(&app->idle_ports);
1547         nxt_queue_init(&app->requests);
1548         nxt_queue_init(&app->pending);
1549 
1550         app->name.length = name.length;
1551         nxt_memcpy(app->name.start, name.start, name.length);
1552 
1553         app->type = lang->type;
1554         app->max_processes = apcf.max_processes;
1555         app->spare_processes = apcf.spare_processes;
1556         app->max_pending_processes = apcf.spare_processes
1557                                       ? apcf.spare_processes : 1;
1558         app->timeout = apcf.timeout;
1559         app->res_timeout = apcf.res_timeout * 1000000;
1560         app->idle_timeout = apcf.idle_timeout;
1561         app->max_pending_responses = 2;
1562         app->max_requests = apcf.requests;
1563 
1564         engine = task->thread->engine;
1565 
1566         app->engine = engine;
1567 
1568         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1569         app->adjust_idle_work.task = &engine->task;
1570         app->adjust_idle_work.obj = app;
1571 
1572         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1573 
1574         nxt_router_app_use(task, app, 1);
1575 
1576         app->joint = app_joint;
1577 
1578         app_joint->use_count = 1;
1579         app_joint->app = app;
1580 
1581         app_joint->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
1582         app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1583         app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1584         app_joint->idle_timer.task = &engine->task;
1585         app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1586 
1587         app_joint->free_app_work.handler = nxt_router_free_app;
1588         app_joint->free_app_work.task = &engine->task;
1589         app_joint->free_app_work.obj = app_joint;
1590     }
1591 
1592     http = nxt_conf_get_path(conf, &http_path);
1593 #if 0
1594     if (http == NULL) {
1595         nxt_alert(task, "no \"http\" block");
1596         return NXT_ERROR;
1597     }
1598 #endif
1599 
1600     listeners = nxt_conf_get_path(conf, &listeners_path);
1601     if (listeners == NULL) {
1602         nxt_alert(task, "no \"listeners\" block");
1603         return NXT_ERROR;
1604     }
1605 
1606     next = 0;
1607 
1608     for ( ;; ) {
1609         listener = nxt_conf_next_object_member(listeners, &name, &next);
1610         if (listener == NULL) {
1611             break;
1612         }
1613 
1614         skcf = nxt_router_socket_conf(task, tmcf, &name);
1615         if (skcf == NULL) {
1616             goto fail;
1617         }
1618 
1619         nxt_memzero(&lscf, sizeof(lscf));
1620 
1621         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1622                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1623         if (ret != NXT_OK) {
1624             nxt_alert(task, "listener map error");
1625             goto fail;
1626         }
1627 
1628         nxt_debug(task, "application: %V", &lscf.application);
1629 
1630         // STUB, default values if http block is not defined.
1631         skcf->header_buffer_size = 2048;
1632         skcf->large_header_buffer_size = 8192;
1633         skcf->large_header_buffers = 4;
1634         skcf->body_buffer_size = 16 * 1024;
1635         skcf->max_body_size = 8 * 1024 * 1024;
1636         skcf->idle_timeout = 180 * 1000;
1637         skcf->header_read_timeout = 30 * 1000;
1638         skcf->body_read_timeout = 30 * 1000;
1639         skcf->send_timeout = 30 * 1000;
1640 
1641         if (http != NULL) {
1642             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1643                                       nxt_nitems(nxt_router_http_conf), skcf);
1644             if (ret != NXT_OK) {
1645                 nxt_alert(task, "http map error");
1646                 goto fail;
1647             }
1648         }
1649 
1650 #if (NXT_TLS)
1651 
1652         value = nxt_conf_get_path(listener, &certificate_path);
1653 
1654         if (value != NULL) {
1655             nxt_conf_get_string(value, &name);
1656 
1657             tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1658             if (nxt_slow_path(tls == NULL)) {
1659                 goto fail;
1660             }
1661 
1662             tls->name = name;
1663             tls->conf = skcf;
1664 
1665             nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1666         }
1667 
1668 #endif
1669 
1670         skcf->listen->handler = nxt_http_conn_init;
1671         skcf->router_conf = tmcf->router_conf;
1672         skcf->router_conf->count++;
1673 
1674         if (lscf.application.length > 0) {
1675             skcf->application = nxt_router_listener_application(tmcf,
1676                                                             &lscf.application);
1677             nxt_router_app_use(task, skcf->application, 1);
1678         }
1679     }
1680 
1681     value = nxt_conf_get_path(conf, &access_log_path);
1682 
1683     if (value != NULL) {
1684         nxt_conf_get_string(value, &path);
1685 
1686         access_log = router->access_log;
1687 
1688         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1689             nxt_thread_spin_lock(&router->lock);
1690             access_log->count++;
1691             nxt_thread_spin_unlock(&router->lock);
1692 
1693         } else {
1694             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1695                                     + path.length);
1696             if (access_log == NULL) {
1697                 nxt_alert(task, "failed to allocate access log structure");
1698                 goto fail;
1699             }
1700 
1701             access_log->fd = -1;
1702             access_log->handler = &nxt_router_access_log_writer;
1703             access_log->count = 1;
1704 
1705             access_log->path.length = path.length;
1706             access_log->path.start = (u_char *) access_log
1707                                      + sizeof(nxt_router_access_log_t);
1708 
1709             nxt_memcpy(access_log->path.start, path.start, path.length);
1710         }
1711 
1712         tmcf->router_conf->access_log = access_log;
1713     }
1714 
1715     nxt_queue_add(&tmcf->deleting, &router->sockets);
1716     nxt_queue_init(&router->sockets);
1717 
1718     return NXT_OK;
1719 
1720 app_fail:
1721 
1722     nxt_free(app);
1723 
1724 fail:
1725 
1726     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1727 
1728         nxt_queue_remove(&app->link);
1729         nxt_thread_mutex_destroy(&app->mutex);
1730         nxt_free(app);
1731 
1732     } nxt_queue_loop;
1733 
1734     return NXT_ERROR;
1735 }
1736 
1737 
1738 static nxt_app_t *
1739 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1740 {
1741     nxt_app_t  *app;
1742 
1743     nxt_queue_each(app, queue, nxt_app_t, link) {
1744 
1745         if (nxt_strstr_eq(name, &app->name)) {
1746             return app;
1747         }
1748 
1749     } nxt_queue_loop;
1750 
1751     return NULL;
1752 }
1753 
1754 
1755 static nxt_app_t *
1756 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1757 {
1758     nxt_app_t  *app;
1759 
1760     app = nxt_router_app_find(&tmcf->apps, name);
1761 
1762     if (app == NULL) {
1763         app = nxt_router_app_find(&tmcf->previous, name);
1764     }
1765 
1766     return app;
1767 }
1768 
1769 
1770 static nxt_socket_conf_t *
1771 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1772     nxt_str_t *name)
1773 {
1774     size_t               size;
1775     nxt_int_t            ret;
1776     nxt_bool_t           wildcard;
1777     nxt_sockaddr_t       *sa;
1778     nxt_socket_conf_t    *skcf;
1779     nxt_listen_socket_t  *ls;
1780 
1781     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1782     if (nxt_slow_path(sa == NULL)) {
1783         nxt_alert(task, "invalid listener \"%V\"", name);
1784         return NULL;
1785     }
1786 
1787     sa->type = SOCK_STREAM;
1788 
1789     nxt_debug(task, "router listener: \"%*s\"",
1790               (size_t) sa->length, nxt_sockaddr_start(sa));
1791 
1792     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
1793     if (nxt_slow_path(skcf == NULL)) {
1794         return NULL;
1795     }
1796 
1797     size = nxt_sockaddr_size(sa);
1798 
1799     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1800 
1801     if (ret != NXT_OK) {
1802 
1803         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1804         if (nxt_slow_path(ls == NULL)) {
1805             return NULL;
1806         }
1807 
1808         skcf->listen = ls;
1809 
1810         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1811         nxt_memcpy(ls->sockaddr, sa, size);
1812 
1813         nxt_listen_socket_remote_size(ls);
1814 
1815         ls->socket = -1;
1816         ls->backlog = NXT_LISTEN_BACKLOG;
1817         ls->flags = NXT_NONBLOCK;
1818         ls->read_after_accept = 1;
1819     }
1820 
1821     switch (sa->u.sockaddr.sa_family) {
1822 #if (NXT_HAVE_UNIX_DOMAIN)
1823     case AF_UNIX:
1824         wildcard = 0;
1825         break;
1826 #endif
1827 #if (NXT_INET6)
1828     case AF_INET6:
1829         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1830         break;
1831 #endif
1832     case AF_INET:
1833     default:
1834         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1835         break;
1836     }
1837 
1838     if (!wildcard) {
1839         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
1840         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1841             return NULL;
1842         }
1843 
1844         nxt_memcpy(skcf->sockaddr, sa, size);
1845     }
1846 
1847     return skcf;
1848 }
1849 
1850 
1851 static nxt_int_t
1852 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1853     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1854 {
1855     nxt_router_t       *router;
1856     nxt_queue_link_t   *qlk;
1857     nxt_socket_conf_t  *skcf;
1858 
1859     router = tmcf->router_conf->router;
1860 
1861     for (qlk = nxt_queue_first(&router->sockets);
1862          qlk != nxt_queue_tail(&router->sockets);
1863          qlk = nxt_queue_next(qlk))
1864     {
1865         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1866 
1867         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1868             nskcf->listen = skcf->listen;
1869 
1870             nxt_queue_remove(qlk);
1871             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1872 
1873             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1874 
1875             return NXT_OK;
1876         }
1877     }
1878 
1879     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1880 
1881     return NXT_DECLINED;
1882 }
1883 
1884 
1885 static void
1886 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1887     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1888 {
1889     size_t            size;
1890     uint32_t          stream;
1891     nxt_int_t         ret;
1892     nxt_buf_t         *b;
1893     nxt_port_t        *main_port, *router_port;
1894     nxt_runtime_t     *rt;
1895     nxt_socket_rpc_t  *rpc;
1896 
1897     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1898     if (rpc == NULL) {
1899         goto fail;
1900     }
1901 
1902     rpc->socket_conf = skcf;
1903     rpc->temp_conf = tmcf;
1904 
1905     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1906 
1907     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1908     if (b == NULL) {
1909         goto fail;
1910     }
1911 
1912     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1913 
1914     rt = task->thread->runtime;
1915     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1916     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1917 
1918     stream = nxt_port_rpc_register_handler(task, router_port,
1919                                            nxt_router_listen_socket_ready,
1920                                            nxt_router_listen_socket_error,
1921                                            main_port->pid, rpc);
1922     if (nxt_slow_path(stream == 0)) {
1923         goto fail;
1924     }
1925 
1926     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1927                                 stream, router_port->id, b);
1928 
1929     if (nxt_slow_path(ret != NXT_OK)) {
1930         nxt_port_rpc_cancel(task, router_port, stream);
1931         goto fail;
1932     }
1933 
1934     return;
1935 
1936 fail:
1937 
1938     nxt_router_conf_error(task, tmcf);
1939 }
1940 
1941 
1942 static void
1943 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1944     void *data)
1945 {
1946     nxt_int_t         ret;
1947     nxt_socket_t      s;
1948     nxt_socket_rpc_t  *rpc;
1949 
1950     rpc = data;
1951 
1952     s = msg->fd;
1953 
1954     ret = nxt_socket_nonblocking(task, s);
1955     if (nxt_slow_path(ret != NXT_OK)) {
1956         goto fail;
1957     }
1958 
1959     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1960 
1961     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1962     if (nxt_slow_path(ret != NXT_OK)) {
1963         goto fail;
1964     }
1965 
1966     rpc->socket_conf->listen->socket = s;
1967 
1968     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1969                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1970 
1971     return;
1972 
1973 fail:
1974 
1975     nxt_socket_close(task, s);
1976 
1977     nxt_router_conf_error(task, rpc->temp_conf);
1978 }
1979 
1980 
1981 static void
1982 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1983     void *data)
1984 {
1985     u_char                  *p;
1986     size_t                  size;
1987     uint8_t                 error;
1988     nxt_buf_t               *in, *out;
1989     nxt_sockaddr_t          *sa;
1990     nxt_socket_rpc_t        *rpc;
1991     nxt_router_temp_conf_t  *tmcf;
1992 
1993     static nxt_str_t  socket_errors[] = {
1994         nxt_string("ListenerSystem"),
1995         nxt_string("ListenerNoIPv6"),
1996         nxt_string("ListenerPort"),
1997         nxt_string("ListenerInUse"),
1998         nxt_string("ListenerNoAddress"),
1999         nxt_string("ListenerNoAccess"),
2000         nxt_string("ListenerPath"),
2001     };
2002 
2003     rpc = data;
2004     sa = rpc->socket_conf->listen->sockaddr;
2005     tmcf = rpc->temp_conf;
2006 
2007     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2008 
2009     if (nxt_slow_path(in == NULL)) {
2010         return;
2011     }
2012 
2013     p = in->mem.pos;
2014 
2015     error = *p++;
2016 
2017     size = nxt_length("listen socket error: ")
2018            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2019            + sa->length + socket_errors[error].length + (in->mem.free - p);
2020 
2021     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2022     if (nxt_slow_path(out == NULL)) {
2023         return;
2024     }
2025 
2026     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2027                         "listen socket error: "
2028                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2029                         (size_t) sa->length, nxt_sockaddr_start(sa),
2030                         &socket_errors[error], in->mem.free - p, p);
2031 
2032     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2033 
2034     nxt_router_conf_error(task, tmcf);
2035 }
2036 
2037 
2038 #if (NXT_TLS)
2039 
2040 static void
2041 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2042     nxt_router_tlssock_t *tls)
2043 {
2044     nxt_socket_rpc_t  *rpc;
2045 
2046     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2047     if (rpc == NULL) {
2048         nxt_router_conf_error(task, tmcf);
2049         return;
2050     }
2051 
2052     rpc->socket_conf = tls->conf;
2053     rpc->temp_conf = tmcf;
2054 
2055     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2056                        nxt_router_tls_rpc_handler, rpc);
2057 }
2058 
2059 
2060 static void
2061 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2062     void *data)
2063 {
2064     nxt_mp_t               *mp;
2065     nxt_int_t              ret;
2066     nxt_tls_conf_t         *tlscf;
2067     nxt_socket_rpc_t       *rpc;
2068     nxt_router_temp_conf_t *tmcf;
2069 
2070     nxt_debug(task, "tls rpc handler");
2071 
2072     rpc = data;
2073     tmcf = rpc->temp_conf;
2074 
2075     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2076         goto fail;
2077     }
2078 
2079     mp = tmcf->router_conf->mem_pool;
2080 
2081     tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2082     if (nxt_slow_path(tlscf == NULL)) {
2083         goto fail;
2084     }
2085 
2086     tlscf->chain_file = msg->fd;
2087 
2088     ret = task->thread->runtime->tls->server_init(task, tlscf);
2089     if (nxt_slow_path(ret != NXT_OK)) {
2090         goto fail;
2091     }
2092 
2093     rpc->socket_conf->tls = tlscf;
2094 
2095     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2096                        nxt_router_conf_apply, task, tmcf, NULL);
2097     return;
2098 
2099 fail:
2100 
2101     nxt_router_conf_error(task, tmcf);
2102 }
2103 
2104 #endif
2105 
2106 
2107 static void
2108 nxt_router_app_rpc_create(nxt_task_t *task,
2109     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2110 {
2111     size_t         size;
2112     uint32_t       stream;
2113     nxt_int_t      ret;
2114     nxt_buf_t      *b;
2115     nxt_port_t     *main_port, *router_port;
2116     nxt_runtime_t  *rt;
2117     nxt_app_rpc_t  *rpc;
2118 
2119     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2120     if (rpc == NULL) {
2121         goto fail;
2122     }
2123 
2124     rpc->app = app;
2125     rpc->temp_conf = tmcf;
2126 
2127     nxt_debug(task, "app '%V' prefork", &app->name);
2128 
2129     size = app->name.length + 1 + app->conf.length;
2130 
2131     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2132     if (nxt_slow_path(b == NULL)) {
2133         goto fail;
2134     }
2135 
2136     nxt_buf_cpystr(b, &app->name);
2137     *b->mem.free++ = '\0';
2138     nxt_buf_cpystr(b, &app->conf);
2139 
2140     rt = task->thread->runtime;
2141     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2142     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2143 
2144     stream = nxt_port_rpc_register_handler(task, router_port,
2145                                            nxt_router_app_prefork_ready,
2146                                            nxt_router_app_prefork_error,
2147                                            -1, rpc);
2148     if (nxt_slow_path(stream == 0)) {
2149         goto fail;
2150     }
2151 
2152     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2153                                 stream, router_port->id, b);
2154 
2155     if (nxt_slow_path(ret != NXT_OK)) {
2156         nxt_port_rpc_cancel(task, router_port, stream);
2157         goto fail;
2158     }
2159 
2160     app->pending_processes++;
2161 
2162     return;
2163 
2164 fail:
2165 
2166     nxt_router_conf_error(task, tmcf);
2167 }
2168 
2169 
2170 static void
2171 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2172     void *data)
2173 {
2174     nxt_app_t           *app;
2175     nxt_port_t          *port;
2176     nxt_app_rpc_t       *rpc;
2177     nxt_event_engine_t  *engine;
2178 
2179     rpc = data;
2180     app = rpc->app;
2181 
2182     port = msg->u.new_port;
2183     port->app = app;
2184 
2185     app->pending_processes--;
2186     app->processes++;
2187     app->idle_processes++;
2188 
2189     engine = task->thread->engine;
2190 
2191     nxt_queue_insert_tail(&app->ports, &port->app_link);
2192     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2193 
2194     port->idle_start = 0;
2195 
2196     nxt_port_inc_use(port);
2197 
2198     nxt_work_queue_add(&engine->fast_work_queue,
2199                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2200 }
2201 
2202 
2203 static void
2204 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2205     void *data)
2206 {
2207     nxt_app_t               *app;
2208     nxt_app_rpc_t           *rpc;
2209     nxt_router_temp_conf_t  *tmcf;
2210 
2211     rpc = data;
2212     app = rpc->app;
2213     tmcf = rpc->temp_conf;
2214 
2215     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2216             &app->name);
2217 
2218     app->pending_processes--;
2219 
2220     nxt_router_conf_error(task, tmcf);
2221 }
2222 
2223 
2224 static nxt_int_t
2225 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2226     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2227 {
2228     nxt_int_t                 ret;
2229     nxt_uint_t                n, threads;
2230     nxt_queue_link_t          *qlk;
2231     nxt_router_engine_conf_t  *recf;
2232 
2233     threads = tmcf->router_conf->threads;
2234 
2235     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2236                                      sizeof(nxt_router_engine_conf_t));
2237     if (nxt_slow_path(tmcf->engines == NULL)) {
2238         return NXT_ERROR;
2239     }
2240 
2241     n = 0;
2242 
2243     for (qlk = nxt_queue_first(&router->engines);
2244          qlk != nxt_queue_tail(&router->engines);
2245          qlk = nxt_queue_next(qlk))
2246     {
2247         recf = nxt_array_zero_add(tmcf->engines);
2248         if (nxt_slow_path(recf == NULL)) {
2249             return NXT_ERROR;
2250         }
2251 
2252         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2253 
2254         if (n < threads) {
2255             recf->action = NXT_ROUTER_ENGINE_KEEP;
2256             ret = nxt_router_engine_conf_update(tmcf, recf);
2257 
2258         } else {
2259             recf->action = NXT_ROUTER_ENGINE_DELETE;
2260             ret = nxt_router_engine_conf_delete(tmcf, recf);
2261         }
2262 
2263         if (nxt_slow_path(ret != NXT_OK)) {
2264             return ret;
2265         }
2266 
2267         n++;
2268     }
2269 
2270     tmcf->new_threads = n;
2271 
2272     while (n < threads) {
2273         recf = nxt_array_zero_add(tmcf->engines);
2274         if (nxt_slow_path(recf == NULL)) {
2275             return NXT_ERROR;
2276         }
2277 
2278         recf->action = NXT_ROUTER_ENGINE_ADD;
2279 
2280         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2281         if (nxt_slow_path(recf->engine == NULL)) {
2282             return NXT_ERROR;
2283         }
2284 
2285         ret = nxt_router_engine_conf_create(tmcf, recf);
2286         if (nxt_slow_path(ret != NXT_OK)) {
2287             return ret;
2288         }
2289 
2290         n++;
2291     }
2292 
2293     return NXT_OK;
2294 }
2295 
2296 
2297 static nxt_int_t
2298 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2299     nxt_router_engine_conf_t *recf)
2300 {
2301     nxt_int_t  ret;
2302 
2303     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2304                                           nxt_router_listen_socket_create);
2305     if (nxt_slow_path(ret != NXT_OK)) {
2306         return ret;
2307     }
2308 
2309     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2310                                           nxt_router_listen_socket_create);
2311     if (nxt_slow_path(ret != NXT_OK)) {
2312         return ret;
2313     }
2314 
2315     return ret;
2316 }
2317 
2318 
2319 static nxt_int_t
2320 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2321     nxt_router_engine_conf_t *recf)
2322 {
2323     nxt_int_t  ret;
2324 
2325     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2326                                           nxt_router_listen_socket_create);
2327     if (nxt_slow_path(ret != NXT_OK)) {
2328         return ret;
2329     }
2330 
2331     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2332                                           nxt_router_listen_socket_update);
2333     if (nxt_slow_path(ret != NXT_OK)) {
2334         return ret;
2335     }
2336 
2337     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2338     if (nxt_slow_path(ret != NXT_OK)) {
2339         return ret;
2340     }
2341 
2342     return ret;
2343 }
2344 
2345 
2346 static nxt_int_t
2347 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2348     nxt_router_engine_conf_t *recf)
2349 {
2350     nxt_int_t  ret;
2351 
2352     ret = nxt_router_engine_quit(tmcf, recf);
2353     if (nxt_slow_path(ret != NXT_OK)) {
2354         return ret;
2355     }
2356 
2357     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2358     if (nxt_slow_path(ret != NXT_OK)) {
2359         return ret;
2360     }
2361 
2362     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2363 }
2364 
2365 
2366 static nxt_int_t
2367 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2368     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2369     nxt_work_handler_t handler)
2370 {
2371     nxt_joint_job_t          *job;
2372     nxt_queue_link_t         *qlk;
2373     nxt_socket_conf_t        *skcf;
2374     nxt_socket_conf_joint_t  *joint;
2375 
2376     for (qlk = nxt_queue_first(sockets);
2377          qlk != nxt_queue_tail(sockets);
2378          qlk = nxt_queue_next(qlk))
2379     {
2380         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2381         if (nxt_slow_path(job == NULL)) {
2382             return NXT_ERROR;
2383         }
2384 
2385         job->work.next = recf->jobs;
2386         recf->jobs = &job->work;
2387 
2388         job->task = tmcf->engine->task;
2389         job->work.handler = handler;
2390         job->work.task = &job->task;
2391         job->work.obj = job;
2392         job->tmcf = tmcf;
2393 
2394         tmcf->count++;
2395 
2396         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2397                              sizeof(nxt_socket_conf_joint_t));
2398         if (nxt_slow_path(joint == NULL)) {
2399             return NXT_ERROR;
2400         }
2401 
2402         job->work.data = joint;
2403 
2404         joint->count = 1;
2405 
2406         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2407         skcf->count++;
2408         joint->socket_conf = skcf;
2409 
2410         joint->engine = recf->engine;
2411     }
2412 
2413     return NXT_OK;
2414 }
2415 
2416 
2417 static nxt_int_t
2418 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2419     nxt_router_engine_conf_t *recf)
2420 {
2421     nxt_joint_job_t  *job;
2422 
2423     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2424     if (nxt_slow_path(job == NULL)) {
2425         return NXT_ERROR;
2426     }
2427 
2428     job->work.next = recf->jobs;
2429     recf->jobs = &job->work;
2430 
2431     job->task = tmcf->engine->task;
2432     job->work.handler = nxt_router_worker_thread_quit;
2433     job->work.task = &job->task;
2434     job->work.obj = NULL;
2435     job->work.data = NULL;
2436     job->tmcf = NULL;
2437 
2438     return NXT_OK;
2439 }
2440 
2441 
2442 static nxt_int_t
2443 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2444     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2445 {
2446     nxt_joint_job_t   *job;
2447     nxt_queue_link_t  *qlk;
2448 
2449     for (qlk = nxt_queue_first(sockets);
2450          qlk != nxt_queue_tail(sockets);
2451          qlk = nxt_queue_next(qlk))
2452     {
2453         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2454         if (nxt_slow_path(job == NULL)) {
2455             return NXT_ERROR;
2456         }
2457 
2458         job->work.next = recf->jobs;
2459         recf->jobs = &job->work;
2460 
2461         job->task = tmcf->engine->task;
2462         job->work.handler = nxt_router_listen_socket_delete;
2463         job->work.task = &job->task;
2464         job->work.obj = job;
2465         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2466         job->tmcf = tmcf;
2467 
2468         tmcf->count++;
2469     }
2470 
2471     return NXT_OK;
2472 }
2473 
2474 
2475 static nxt_int_t
2476 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2477     nxt_router_temp_conf_t *tmcf)
2478 {
2479     nxt_int_t                 ret;
2480     nxt_uint_t                i, threads;
2481     nxt_router_engine_conf_t  *recf;
2482 
2483     recf = tmcf->engines->elts;
2484     threads = tmcf->router_conf->threads;
2485 
2486     for (i = tmcf->new_threads; i < threads; i++) {
2487         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2488         if (nxt_slow_path(ret != NXT_OK)) {
2489             return ret;
2490         }
2491     }
2492 
2493     return NXT_OK;
2494 }
2495 
2496 
2497 static nxt_int_t
2498 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2499     nxt_event_engine_t *engine)
2500 {
2501     nxt_int_t            ret;
2502     nxt_thread_link_t    *link;
2503     nxt_thread_handle_t  handle;
2504 
2505     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2506 
2507     if (nxt_slow_path(link == NULL)) {
2508         return NXT_ERROR;
2509     }
2510 
2511     link->start = nxt_router_thread_start;
2512     link->engine = engine;
2513     link->work.handler = nxt_router_thread_exit_handler;
2514     link->work.task = task;
2515     link->work.data = link;
2516 
2517     nxt_queue_insert_tail(&rt->engines, &engine->link);
2518 
2519     ret = nxt_thread_create(&handle, link);
2520 
2521     if (nxt_slow_path(ret != NXT_OK)) {
2522         nxt_queue_remove(&engine->link);
2523     }
2524 
2525     return ret;
2526 }
2527 
2528 
/*
 * Calls nxt_router_app_unlink() for every application currently on
 * router->apps, then passes tmcf->previous and tmcf->apps, in that
 * order, to nxt_queue_add() with router->apps as the destination
 * (presumably appending both queues to it - confirm against nxt_queue.h).
 */
2529 static void
2530 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2531     nxt_router_temp_conf_t *tmcf)
2532 {
2533     nxt_app_t  *app;
2534 
2535     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2536 
2537         nxt_router_app_unlink(task, app);
2538 
2539     } nxt_queue_loop;
2540 
2541     nxt_queue_add(&router->apps, &tmcf->previous);
2542     nxt_queue_add(&router->apps, &tmcf->apps);
2543 }
2544 
2545 
2546 static void
2547 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2548 {
2549     nxt_uint_t                n;
2550     nxt_event_engine_t        *engine;
2551     nxt_router_engine_conf_t  *recf;
2552 
2553     recf = tmcf->engines->elts;
2554 
2555     for (n = tmcf->engines->nelts; n != 0; n--) {
2556         engine = recf->engine;
2557 
2558         switch (recf->action) {
2559 
2560         case NXT_ROUTER_ENGINE_KEEP:
2561             break;
2562 
2563         case NXT_ROUTER_ENGINE_ADD:
2564             nxt_queue_insert_tail(&router->engines, &engine->link0);
2565             break;
2566 
2567         case NXT_ROUTER_ENGINE_DELETE:
2568             nxt_queue_remove(&engine->link0);
2569             break;
2570         }
2571 
2572         nxt_router_engine_post(engine, recf->jobs);
2573 
2574         recf++;
2575     }
2576 }
2577 
2578 
2579 static void
2580 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2581 {
2582     nxt_work_t  *work, *next;
2583 
2584     for (work = jobs; work != NULL; work = next) {
2585         next = work->next;
2586         work->next = NULL;
2587 
2588         nxt_event_engine_post(engine, work);
2589     }
2590 }
2591 
2592 
2593 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2594     .rpc_error = nxt_port_rpc_handler,
2595     .mmap      = nxt_port_mmap_handler,
2596     .data      = nxt_port_rpc_handler,
2597 };
2598 
2599 
/*
 * Start routine of a router worker thread (installed as link->start by
 * nxt_router_thread_create()); "data" is the nxt_thread_link_t.
 *
 * Binds the link's engine and its task to the calling thread, creates
 * the engine memory pool, creates and initializes a port of type
 * NXT_PROCESS_ROUTER, enables it with nxt_router_app_port_handlers and
 * finally starts the event engine.
 *
 * Returns early, without starting the engine, if the memory pool or the
 * port cannot be created or if the port socket cannot be initialized.
 */
2600 static void
2601 nxt_router_thread_start(void *data)
2602 {
2603     nxt_int_t           ret;
2604     nxt_port_t          *port;
2605     nxt_task_t          *task;
2606     nxt_thread_t        *thread;
2607     nxt_thread_link_t   *link;
2608     nxt_event_engine_t  *engine;
2609 
2610     link = data;
2611     engine = link->engine;
2612     task = &engine->task;
2613 
2614     thread = nxt_thread();
2615 
2616     nxt_event_engine_thread_adopt(engine);
2617 
         /*
          * The runtime is read through the thread the engine task still
          * points to; the task is re-pointed at this thread right below.
          */
2618     /* STUB */
2619     thread->runtime = engine->task.thread->runtime;
2620 
2621     engine->task.thread = thread;
2622     engine->task.log = thread->log;
2623     thread->engine = engine;
2624     thread->task = &engine->task;
2625 #if 0
2626     thread->fiber = &engine->fibers->fiber;
2627 #endif
2628 
2629     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2630     if (nxt_slow_path(engine->mem_pool == NULL)) {
2631         return;
2632     }
2633 
2634     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2635                         NXT_PROCESS_ROUTER);
2636     if (nxt_slow_path(port == NULL)) {
2637         return;
2638     }
2639 
2640     ret = nxt_port_socket_init(task, port, 0);
2641     if (nxt_slow_path(ret != NXT_OK)) {
             /* Presumably drops the reference from nxt_port_new() - confirm. */
2642         nxt_port_use(task, port, -1);
2643         return;
2644     }
2645 
2646     engine->port = port;
2647 
2648     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2649 
2650     nxt_event_engine_start(engine);
2651 }
2652 
2653 
2654 static void
2655 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2656 {
2657     nxt_joint_job_t          *job;
2658     nxt_socket_conf_t        *skcf;
2659     nxt_listen_event_t       *lev;
2660     nxt_listen_socket_t      *ls;
2661     nxt_thread_spinlock_t    *lock;
2662     nxt_socket_conf_joint_t  *joint;
2663 
2664     job = obj;
2665     joint = data;
2666 
2667     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2668 
2669     skcf = joint->socket_conf;
2670     ls = skcf->listen;
2671 
2672     lev = nxt_listen_event(task, ls);
2673     if (nxt_slow_path(lev == NULL)) {
2674         nxt_router_listen_socket_release(task, skcf);
2675         return;
2676     }
2677 
2678     lev->socket.data = joint;
2679 
2680     lock = &skcf->router_conf->router->lock;
2681 
2682     nxt_thread_spin_lock(lock);
2683     ls->count++;
2684     nxt_thread_spin_unlock(lock);
2685 
2686     job->work.next = NULL;
2687     job->work.handler = nxt_router_conf_wait;
2688 
2689     nxt_event_engine_post(job->tmcf->engine, &job->work);
2690 }
2691 
2692 
2693 nxt_inline nxt_listen_event_t *
2694 nxt_router_listen_event(nxt_queue_t *listen_connections,
2695     nxt_socket_conf_t *skcf)
2696 {
2697     nxt_socket_t        fd;
2698     nxt_queue_link_t    *qlk;
2699     nxt_listen_event_t  *lev;
2700 
2701     fd = skcf->listen->socket;
2702 
2703     for (qlk = nxt_queue_first(listen_connections);
2704          qlk != nxt_queue_tail(listen_connections);
2705          qlk = nxt_queue_next(qlk))
2706     {
2707         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2708 
2709         if (fd == lev->socket.fd) {
2710             return lev;
2711         }
2712     }
2713 
2714     return NULL;
2715 }
2716 
2717 
/*
 * Job handler for a listening socket whose configuration is replaced;
 * "obj" is the nxt_joint_job_t and "data" the new joint.
 *
 * The new joint is linked onto the engine's joints queue, the engine's
 * listen event for the socket is pointed at the new joint and at the
 * new configuration's listen socket, the job is posted back to the
 * configuration engine with nxt_router_conf_wait() as its handler, and
 * finally the joint previously attached to the event is released.
 *
 * NOTE(review): nxt_router_listen_event() can return NULL, but "lev" is
 * dereferenced without a check - confirm a matching event always exists
 * on this path.
 */
2718 static void
2719 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2720 {
2721     nxt_joint_job_t          *job;
2722     nxt_event_engine_t       *engine;
2723     nxt_listen_event_t       *lev;
2724     nxt_socket_conf_joint_t  *joint, *old;
2725 
2726     job = obj;
2727     joint = data;
2728 
2729     engine = task->thread->engine;
2730 
2731     nxt_queue_insert_tail(&engine->joints, &joint->link);
2732 
2733     lev = nxt_router_listen_event(&engine->listen_connections,
2734                                   joint->socket_conf);
2735 
2736     old = lev->socket.data;
2737     lev->socket.data = joint;
2738     lev->listen = joint->socket_conf->listen;
2739 
2740     job->work.next = NULL;
2741     job->work.handler = nxt_router_conf_wait;
2742 
2743     nxt_event_engine_post(job->tmcf->engine, &job->work);
2744 
2745     /*
2746      * The task is allocated from configuration temporary
2747      * memory pool so it can be freed after engine post operation.
2748      */
2749 
         /* Hence the engine's own task is used for the release below. */
2750     nxt_router_conf_release(&engine->task, old);
2751 }
2752 
2753 
2754 static void
2755 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2756 {
2757     nxt_joint_job_t     *job;
2758     nxt_socket_conf_t   *skcf;
2759     nxt_listen_event_t  *lev;
2760     nxt_event_engine_t  *engine;
2761 
2762     job = obj;
2763     skcf = data;
2764 
2765     engine = task->thread->engine;
2766 
2767     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2768 
2769     nxt_fd_event_delete(engine, &lev->socket);
2770 
2771     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2772               lev->socket.fd);
2773 
2774     lev->timer.handler = nxt_router_listen_socket_close;
2775     lev->timer.work_queue = &engine->fast_work_queue;
2776 
2777     nxt_timer_add(engine, &lev->timer, 0);
2778 
2779     job->work.next = NULL;
2780     job->work.handler = nxt_router_conf_wait;
2781 
2782     nxt_event_engine_post(job->tmcf->engine, &job->work);
2783 }
2784 
2785 
2786 static void
2787 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2788 {
2789     nxt_event_engine_t  *engine;
2790 
2791     nxt_debug(task, "router worker thread quit");
2792 
2793     engine = task->thread->engine;
2794 
2795     engine->shutdown = 1;
2796 
2797     if (nxt_queue_is_empty(&engine->joints)) {
2798         nxt_thread_exit(task->thread);
2799     }
2800 }
2801 
2802 
/*
 * Timer handler armed by nxt_router_listen_socket_delete(); "obj" is the
 * timer embedded in the listen event.
 *
 * Unlinks the listen event from its queue, detaches the joint from the
 * event, then releases the listen socket of the joint's socket
 * configuration and finally the listen event together with the joint.
 */
2803 static void
2804 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2805 {
2806     nxt_timer_t              *timer;
2807     nxt_listen_event_t       *lev;
2808     nxt_socket_conf_joint_t  *joint;
2809 
2810     timer = obj;
2811     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2812 
2813     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2814               lev->socket.fd);
2815 
2816     nxt_queue_remove(&lev->link);
2817 
2818     joint = lev->socket.data;
2819     lev->socket.data = NULL;
2820 
2821     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2822     task = &task->thread->engine->task;
2823 
2824     nxt_router_listen_socket_release(task, joint->socket_conf);
2825 
2826     nxt_router_listen_event_release(task, lev, joint);
2827 }
2828 
2829 
2830 static void
2831 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2832 {
2833     nxt_listen_socket_t    *ls;
2834     nxt_thread_spinlock_t  *lock;
2835 
2836     ls = skcf->listen;
2837     lock = &skcf->router_conf->router->lock;
2838 
2839     nxt_thread_spin_lock(lock);
2840 
2841     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2842               task->thread->engine, ls->count);
2843 
2844     if (--ls->count != 0) {
2845         ls = NULL;
2846     }
2847 
2848     nxt_thread_spin_unlock(lock);
2849 
2850     if (ls != NULL) {
2851         nxt_socket_close(task, ls->socket);
2852         nxt_free(ls);
2853     }
2854 }
2855 
2856 
2857 void
2858 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
2859     nxt_socket_conf_joint_t *joint)
2860 {
2861     nxt_event_engine_t  *engine;
2862 
2863     nxt_debug(task, "listen event count: %D", lev->count);
2864 
2865     if (--lev->count == 0) {
2866         nxt_free(lev);
2867     }
2868 
2869     if (joint != NULL) {
2870         nxt_router_conf_release(task, joint);
2871     }
2872 
2873     engine = task->thread->engine;
2874 
2875     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2876         nxt_thread_exit(task->thread);
2877     }
2878 }
2879 
2880 
/*
 * Drops one reference on "joint".
 *
 * Nothing else happens unless the joint count reaches zero.  In that
 * case the joint is unlinked and, under the router spinlock, the count
 * of its socket configuration is decremented.  If that count reaches
 * zero too, the socket configuration is unlinked and the count of its
 * router configuration is decremented.
 *
 * After the lock is dropped: if the socket configuration count reached
 * zero, nxt_router_app_use(task, app, -1) is called for its application
 * (when set); if the router configuration count reached zero, the TLS
 * configuration of this socket configuration is freed (NXT_TLS builds),
 * the access log is released and the router configuration memory pool
 * is adopted by this thread and destroyed.
 *
 * NOTE(review): only the TLS configuration of the socket configuration
 * released last is freed here - confirm that other socket
 * configurations of the same router configuration are covered.
 */
2881 void
2882 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2883 {
2884     nxt_app_t              *app;
2885     nxt_socket_conf_t      *skcf;
2886     nxt_router_conf_t      *rtcf;
2887     nxt_thread_spinlock_t  *lock;
2888 
2889     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2890 
2891     if (--joint->count != 0) {
2892         return;
2893     }
2894 
2895     nxt_queue_remove(&joint->link);
2896 
2897     /*
2898      * The joint content can not be safely used after the critical
2899      * section protected by the spinlock because its memory pool may
2900      * be already destroyed by another thread.
2901      */
2902     skcf = joint->socket_conf;
2903     app = skcf->application;
2904     rtcf = skcf->router_conf;
2905     lock = &rtcf->router->lock;
2906 
2907     nxt_thread_spin_lock(lock);
2908 
2909     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2910               rtcf, rtcf->count);
2911 
         /*
          * "app" and "rtcf" are cleared here to signal to the code after
          * the lock that no further release step is needed for them.
          */
2912     if (--skcf->count != 0) {
2913         rtcf = NULL;
2914         app = NULL;
2915 
2916     } else {
2917         nxt_queue_remove(&skcf->link);
2918 
2919         if (--rtcf->count != 0) {
2920             rtcf = NULL;
2921         }
2922     }
2923 
2924     nxt_thread_spin_unlock(lock);
2925 
2926     if (app != NULL) {
2927         nxt_router_app_use(task, app, -1);
2928     }
2929 
2930     /* TODO remove engine->port */
2931     /* TODO exclude from connected ports */
2932 
2933     if (rtcf != NULL) {
2934         nxt_debug(task, "old router conf is destroyed");
2935 
2936 #if (NXT_TLS)
2937         if (skcf->tls != NULL) {
2938             task->thread->runtime->tls->server_free(task, skcf->tls);
2939         }
2940 #endif
2941 
2942         nxt_router_access_log_release(task, lock, rtcf->access_log);
2943 
2944         nxt_mp_thread_adopt(rtcf->mem_pool);
2945 
2946         nxt_mp_destroy(rtcf->mem_pool);
2947     }
2948 }
2949 
2950 
/*
 * Formats one access log line for request "r" and writes it to
 * access_log->fd.
 *
 * Line layout:
 *
 *   <addr> - - [<date>] "<method> <target> <version>" <status> <bytes>
 *   "<referer>" "<user agent>"\n
 *
 * (a single line in the output).  An empty method, a missing Referer
 * field and a missing User-Agent field are each written as "-".  The
 * target is written only when the method is non-empty, and the version
 * only when the target is non-empty.
 *
 * The line buffer is taken from r->mem_pool; if the allocation fails
 * nothing is written.  The result of nxt_fd_write() is not checked.
 */
2951 static void
2952 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
2953     nxt_router_access_log_t *access_log)
2954 {
2955     size_t     size;
2956     u_char     *buf, *p;
2957     nxt_off_t  bytes;
2958 
2959     static nxt_time_string_t  date_cache = {
2960         (nxt_atomic_uint_t) -1,
2961         nxt_router_access_log_date,
2962         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
2963         nxt_length("31/Dec/1986:19:40:00 +0300"),
2964         NXT_THREAD_TIME_LOCAL,
2965         NXT_THREAD_TIME_SEC,
2966     };
2967 
         /*
          * Upper bound of the line length.  Every term below matches one
          * write further down, in the same order.
          */
2968     size = r->remote->address_length
2969            + 6                  /* ' - - [' */
2970            + date_cache.size
2971            + 3                  /* '] "' */
2972            + r->method->length
2973            + 1                  /* space */
2974            + r->target.length
2975            + 1                  /* space */
2976            + r->version.length
2977            + 2                  /* '" ' */
2978            + 3                  /* status */
2979            + 1                  /* space */
2980            + NXT_OFF_T_LEN
2981            + 2                  /* ' "' */
2982            + (r->referer != NULL ? r->referer->value_length : 1)
2983            + 3                  /* '" "' */
2984            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
2985            + 2                  /* '"\n' */
2986     ;
2987 
2988     buf = nxt_mp_nget(r->mem_pool, size);
2989     if (nxt_slow_path(buf == NULL)) {
2990         return;
2991     }
2992 
2993     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
2994                    r->remote->address_length);
2995 
2996     p = nxt_cpymem(p, " - - [", 6);
2997 
2998     p = nxt_thread_time_string(task->thread, &date_cache, p);
2999 
3000     p = nxt_cpymem(p, "] \"", 3);
3001 
3002     if (r->method->length != 0) {
3003         p = nxt_cpymem(p, r->method->start, r->method->length);
3004 
3005         if (r->target.length != 0) {
3006             *p++ = ' ';
3007             p = nxt_cpymem(p, r->target.start, r->target.length);
3008 
3009             if (r->version.length != 0) {
3010                 *p++ = ' ';
3011                 p = nxt_cpymem(p, r->version.start, r->version.length);
3012             }
3013         }
3014 
3015     } else {
             /* Fits in the bytes reserved above for the separators. */
3016         *p++ = '-';
3017     }
3018 
3019     p = nxt_cpymem(p, "\" ", 2);
3020 
         /* Output is bounded to the 3 bytes reserved for the status. */
3021     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3022 
3023     *p++ = ' ';
3024 
3025     bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto);
3026 
3027     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3028 
3029     p = nxt_cpymem(p, " \"", 2);
3030 
3031     if (r->referer != NULL) {
3032         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3033 
3034     } else {
3035         *p++ = '-';
3036     }
3037 
3038     p = nxt_cpymem(p, "\" \"", 3);
3039 
3040     if (r->user_agent != NULL) {
3041         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3042 
3043     } else {
3044         *p++ = '-';
3045     }
3046 
3047     p = nxt_cpymem(p, "\"\n", 2);
3048 
3049     nxt_fd_write(access_log->fd, buf, p - buf);
3050 }
3051 
3052 
3053 static u_char *
3054 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3055     size_t size, const char *format)
3056 {
3057     u_char  sign;
3058     time_t  gmtoff;
3059 
3060     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3061                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3062 
3063     gmtoff = nxt_timezone(tm) / 60;
3064 
3065     if (gmtoff < 0) {
3066         gmtoff = -gmtoff;
3067         sign = '-';
3068 
3069     } else {
3070         sign = '+';
3071     }
3072 
3073     return nxt_sprintf(buf, buf + size, format,
3074                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3075                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3076                        sign, gmtoff / 60, gmtoff % 60);
3077 }
3078 
3079 
3080 static void
3081 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3082 {
3083     uint32_t                 stream;
3084     nxt_int_t                ret;
3085     nxt_buf_t                *b;
3086     nxt_port_t               *main_port, *router_port;
3087     nxt_runtime_t            *rt;
3088     nxt_router_access_log_t  *access_log;
3089 
3090     access_log = tmcf->router_conf->access_log;
3091 
3092     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3093     if (nxt_slow_path(b == NULL)) {
3094         goto fail;
3095     }
3096 
3097     nxt_buf_cpystr(b, &access_log->path);
3098     *b->mem.free++ = '\0';
3099 
3100     rt = task->thread->runtime;
3101     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3102     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3103 
3104     stream = nxt_port_rpc_register_handler(task, router_port,
3105                                            nxt_router_access_log_ready,
3106                                            nxt_router_access_log_error,
3107                                            -1, tmcf);
3108     if (nxt_slow_path(stream == 0)) {
3109         goto fail;
3110     }
3111 
3112     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3113                                 stream, router_port->id, b);
3114 
3115     if (nxt_slow_path(ret != NXT_OK)) {
3116         nxt_port_rpc_cancel(task, router_port, stream);
3117         goto fail;
3118     }
3119 
3120     return;
3121 
3122 fail:
3123 
3124     nxt_router_conf_error(task, tmcf);
3125 }
3126 
3127 
3128 static void
3129 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3130     void *data)
3131 {
3132     nxt_router_temp_conf_t   *tmcf;
3133     nxt_router_access_log_t  *access_log;
3134 
3135     tmcf = data;
3136 
3137     access_log = tmcf->router_conf->access_log;
3138 
3139     access_log->fd = msg->fd;
3140 
3141     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3142                        nxt_router_conf_apply, task, tmcf, NULL);
3143 }
3144 
3145 
3146 static void
3147 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3148     void *data)
3149 {
3150     nxt_router_temp_conf_t  *tmcf;
3151 
3152     tmcf = data;
3153 
3154     nxt_router_conf_error(task, tmcf);
3155 }
3156 
3157 
3158 static void
3159 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3160     nxt_router_access_log_t *access_log)
3161 {
3162     if (access_log == NULL) {
3163         return;
3164     }
3165 
3166     nxt_thread_spin_lock(lock);
3167 
3168     if (--access_log->count != 0) {
3169         access_log = NULL;
3170     }
3171 
3172     nxt_thread_spin_unlock(lock);
3173 
3174     if (access_log != NULL) {
3175 
3176         if (access_log->fd != -1) {
3177             nxt_fd_close(access_log->fd);
3178         }
3179 
3180         nxt_free(access_log);
3181     }
3182 }
3183 
3184 
3185 typedef struct {
3186     nxt_mp_t                 *mem_pool;
3187