xref: /unit/src/nxt_router.c (revision 743:e0f0cd7d244a)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #include <nxt_http.h>
11 #include <nxt_port_memory_int.h>
12 #include <nxt_unit_request.h>
13 #include <nxt_unit_response.h>
14 
15 
16 typedef struct {
17     nxt_str_t         type;
18     uint32_t          processes;
19     uint32_t          max_processes;
20     uint32_t          spare_processes;
21     nxt_msec_t        timeout;
22     nxt_msec_t        res_timeout;
23     nxt_msec_t        idle_timeout;
24     uint32_t          requests;
25     nxt_conf_value_t  *limits_value;
26     nxt_conf_value_t  *processes_value;
27 } nxt_router_app_conf_t;
28 
29 
30 typedef struct {
31     nxt_str_t  application;
32 } nxt_router_listener_conf_t;
33 
34 
35 typedef struct nxt_msg_info_s {
36     nxt_buf_t                 *buf;
37     nxt_port_mmap_tracking_t  tracking;
38     nxt_work_handler_t        completion_handler;
39 } nxt_msg_info_t;
40 
41 
42 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
43 
44 
45 typedef struct {
46     uint32_t                 stream;
47     nxt_app_t                *app;
48     nxt_port_t               *app_port;
49     nxt_app_parse_ctx_t      *ap;
50     nxt_msg_info_t           msg_info;
51     nxt_req_app_link_t       *ra;
52 
53     nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
54 } nxt_req_conn_link_t;
55 
56 
57 struct nxt_req_app_link_s {
58     uint32_t             stream;
59     nxt_atomic_t         use_count;
60     nxt_port_t           *app_port;
61     nxt_port_t           *reply_port;
62     nxt_app_parse_ctx_t  *ap;
63     nxt_msg_info_t       msg_info;
64     nxt_req_conn_link_t  *rc;
65 
66     nxt_nsec_t           res_time;
67 
68     nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
69     nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
70     nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */
71 
72     nxt_mp_t             *mem_pool;
73     nxt_work_t           work;
74 
75     int                  err_code;
76     const char           *err_str;
77 };
78 
79 
80 typedef struct {
81     nxt_socket_conf_t       *socket_conf;
82     nxt_router_temp_conf_t  *temp_conf;
83 } nxt_socket_rpc_t;
84 
85 
86 typedef struct {
87     nxt_app_t               *app;
88     nxt_router_temp_conf_t  *temp_conf;
89 } nxt_app_rpc_t;
90 
91 
92 struct nxt_port_select_state_s {
93     nxt_app_t           *app;
94     nxt_req_app_link_t  *ra;
95 
96     nxt_port_t          *failed_port;
97     int                 failed_port_use_delta;
98 
99     uint8_t             start_process;    /* 1 bit */
100     nxt_req_app_link_t  *shared_ra;
101     nxt_port_t          *port;
102 };
103 
104 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
105 
106 static void nxt_router_greet_controller(nxt_task_t *task,
107     nxt_port_t *controller_port);
108 
109 static void nxt_router_port_select(nxt_task_t *task,
110     nxt_port_select_state_t *state);
111 
112 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
113     nxt_port_select_state_t *state);
114 
115 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
116 
117 nxt_inline void
118 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
119 {
120     nxt_atomic_fetch_add(&ra->use_count, 1);
121 }
122 
123 nxt_inline void
124 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
125 {
126 #if (NXT_DEBUG)
127     int  c;
128 
129     c = nxt_atomic_fetch_add(&ra->use_count, -1);
130 
131     nxt_assert(c > 1);
132 #else
133     (void) nxt_atomic_fetch_add(&ra->use_count, -1);
134 #endif
135 }
136 
137 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
138 
139 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
140 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
141 static void nxt_router_conf_ready(nxt_task_t *task,
142     nxt_router_temp_conf_t *tmcf);
143 static void nxt_router_conf_error(nxt_task_t *task,
144     nxt_router_temp_conf_t *tmcf);
145 static void nxt_router_conf_send(nxt_task_t *task,
146     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
147 
148 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
149     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
150 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
151 static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
152     nxt_str_t *name);
153 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
154     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
155 static void nxt_router_listen_socket_ready(nxt_task_t *task,
156     nxt_port_recv_msg_t *msg, void *data);
157 static void nxt_router_listen_socket_error(nxt_task_t *task,
158     nxt_port_recv_msg_t *msg, void *data);
159 static void nxt_router_app_rpc_create(nxt_task_t *task,
160     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
161 static void nxt_router_app_prefork_ready(nxt_task_t *task,
162     nxt_port_recv_msg_t *msg, void *data);
163 static void nxt_router_app_prefork_error(nxt_task_t *task,
164     nxt_port_recv_msg_t *msg, void *data);
165 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
166     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
167 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
168     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
169 
170 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
171     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
172     const nxt_event_interface_t *interface);
173 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
174     nxt_router_engine_conf_t *recf);
175 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
176     nxt_router_engine_conf_t *recf);
177 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
178     nxt_router_engine_conf_t *recf);
179 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
180     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
181     nxt_work_handler_t handler);
182 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
183     nxt_router_engine_conf_t *recf);
184 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
185     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
186 
187 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
188     nxt_router_temp_conf_t *tmcf);
189 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
190     nxt_event_engine_t *engine);
191 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
192     nxt_router_temp_conf_t *tmcf);
193 
194 static void nxt_router_engines_post(nxt_router_t *router,
195     nxt_router_temp_conf_t *tmcf);
196 static void nxt_router_engine_post(nxt_event_engine_t *engine,
197     nxt_work_t *jobs);
198 
199 static void nxt_router_thread_start(void *data);
200 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
201     void *data);
202 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
203     void *data);
204 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
205     void *data);
206 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
207     void *data);
208 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
209     void *data);
210 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
211     void *data);
212 static void nxt_router_listen_socket_release(nxt_task_t *task,
213     nxt_socket_conf_t *skcf);
214 
215 static void nxt_router_access_log_writer(nxt_task_t *task,
216     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
217 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
218     struct tm *tm, size_t size, const char *format);
219 static void nxt_router_access_log_open(nxt_task_t *task,
220     nxt_router_temp_conf_t *tmcf);
221 static void nxt_router_access_log_ready(nxt_task_t *task,
222     nxt_port_recv_msg_t *msg, void *data);
223 static void nxt_router_access_log_error(nxt_task_t *task,
224     nxt_port_recv_msg_t *msg, void *data);
225 static void nxt_router_access_log_release(nxt_task_t *task,
226     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
227 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
228     void *data);
229 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
230     nxt_port_recv_msg_t *msg, void *data);
231 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
232     nxt_port_recv_msg_t *msg, void *data);
233 
234 static void nxt_router_app_port_ready(nxt_task_t *task,
235     nxt_port_recv_msg_t *msg, void *data);
236 static void nxt_router_app_port_error(nxt_task_t *task,
237     nxt_port_recv_msg_t *msg, void *data);
238 
239 static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
240 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
241     uint32_t request_failed, uint32_t got_response);
242 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
243     nxt_req_app_link_t *ra);
244 
245 static void nxt_router_app_prepare_request(nxt_task_t *task,
246     nxt_req_app_link_t *ra);
247 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
248     nxt_port_t *port, const nxt_str_t *prefix);
249 
250 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
251 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
252     void *data);
253 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
254     void *data);
255 static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
256     void *data);
257 
258 static const nxt_http_request_state_t  nxt_http_request_send_state;
259 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
260 
261 static nxt_router_t  *nxt_router;
262 
263 static const nxt_str_t http_prefix = nxt_string("HTTP_");
264 static const nxt_str_t empty_prefix = nxt_string("");
265 
266 static const nxt_str_t  *nxt_app_msg_prefix[] = {
267     &http_prefix,
268     &http_prefix,
269     &empty_prefix,
270     &http_prefix,
271     &http_prefix,
272 };
273 
274 
275 nxt_port_handlers_t  nxt_router_process_port_handlers = {
276     .quit         = nxt_worker_process_quit_handler,
277     .new_port     = nxt_router_new_port_handler,
278     .change_file  = nxt_port_change_log_file_handler,
279     .mmap         = nxt_port_mmap_handler,
280     .data         = nxt_router_conf_data_handler,
281     .remove_pid   = nxt_router_remove_pid_handler,
282     .access_log   = nxt_router_access_log_reopen_handler,
283     .rpc_ready    = nxt_port_rpc_handler,
284     .rpc_error    = nxt_port_rpc_handler,
285 };
286 
287 
288 nxt_int_t
289 nxt_router_start(nxt_task_t *task, void *data)
290 {
291     nxt_int_t      ret;
292     nxt_port_t     *controller_port;
293     nxt_router_t   *router;
294     nxt_runtime_t  *rt;
295 
296     rt = task->thread->runtime;
297 
298     ret = nxt_http_init(task, rt);
299     if (nxt_slow_path(ret != NXT_OK)) {
300         return ret;
301     }
302 
303     router = nxt_zalloc(sizeof(nxt_router_t));
304     if (nxt_slow_path(router == NULL)) {
305         return NXT_ERROR;
306     }
307 
308     nxt_queue_init(&router->engines);
309     nxt_queue_init(&router->sockets);
310     nxt_queue_init(&router->apps);
311 
312     nxt_router = router;
313 
314     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
315     if (controller_port != NULL) {
316         nxt_router_greet_controller(task, controller_port);
317     }
318 
319     return NXT_OK;
320 }
321 
322 
323 static void
324 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
325 {
326     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
327                           -1, 0, 0, NULL);
328 }
329 
330 
331 static void
332 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
333     void *data)
334 {
335     size_t         size;
336     uint32_t       stream;
337     nxt_mp_t       *mp;
338     nxt_int_t      ret;
339     nxt_app_t      *app;
340     nxt_buf_t      *b;
341     nxt_port_t     *main_port;
342     nxt_runtime_t  *rt;
343 
344     app = data;
345 
346     rt = task->thread->runtime;
347     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
348 
349     nxt_debug(task, "app '%V' %p start process", &app->name, app);
350 
351     size = app->name.length + 1 + app->conf.length;
352 
353     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
354 
355     if (nxt_slow_path(b == NULL)) {
356         goto failed;
357     }
358 
359     nxt_buf_cpystr(b, &app->name);
360     *b->mem.free++ = '\0';
361     nxt_buf_cpystr(b, &app->conf);
362 
363     stream = nxt_port_rpc_register_handler(task, port,
364                                            nxt_router_app_port_ready,
365                                            nxt_router_app_port_error,
366                                            -1, app);
367 
368     if (nxt_slow_path(stream == 0)) {
369         goto failed;
370     }
371 
372     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
373                                 stream, port->id, b);
374 
375     if (nxt_slow_path(ret != NXT_OK)) {
376         nxt_port_rpc_cancel(task, port, stream);
377         goto failed;
378     }
379 
380     return;
381 
382 failed:
383 
384     if (b != NULL) {
385         mp = b->data;
386         nxt_mp_free(mp, b);
387         nxt_mp_release(mp);
388     }
389 
390     nxt_thread_mutex_lock(&app->mutex);
391 
392     app->pending_processes--;
393 
394     nxt_thread_mutex_unlock(&app->mutex);
395 
396     nxt_router_app_use(task, app, -1);
397 }
398 
399 
400 static nxt_int_t
401 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
402 {
403     nxt_int_t      res;
404     nxt_port_t     *router_port;
405     nxt_runtime_t  *rt;
406 
407     rt = task->thread->runtime;
408     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
409 
410     nxt_router_app_use(task, app, 1);
411 
412     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
413                         app);
414 
415     if (res == NXT_OK) {
416         return res;
417     }
418 
419     nxt_thread_mutex_lock(&app->mutex);
420 
421     app->pending_processes--;
422 
423     nxt_thread_mutex_unlock(&app->mutex);
424 
425     nxt_router_app_use(task, app, -1);
426 
427     return NXT_ERROR;
428 }
429 
430 
431 nxt_inline void
432 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
433     nxt_req_conn_link_t *rc)
434 {
435     nxt_event_engine_t  *engine;
436 
437     engine = task->thread->engine;
438 
439     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
440 
441     ra->stream = rc->stream;
442     ra->use_count = 1;
443     ra->rc = rc;
444     rc->ra = ra;
445     ra->reply_port = engine->port;
446     ra->ap = rc->ap;
447 
448     ra->work.handler = NULL;
449     ra->work.task = &engine->task;
450     ra->work.obj = ra;
451     ra->work.data = engine;
452 }
453 
454 
455 nxt_inline nxt_req_app_link_t *
456 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
457 {
458     nxt_mp_t            *mp;
459     nxt_req_app_link_t  *ra;
460 
461     if (ra_src->mem_pool != NULL) {
462         return ra_src;
463     }
464 
465     mp = ra_src->ap->mem_pool;
466 
467     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
468 
469     if (nxt_slow_path(ra == NULL)) {
470 
471         ra_src->rc->ra = NULL;
472         ra_src->rc = NULL;
473 
474         return NULL;
475     }
476 
477     nxt_mp_retain(mp);
478 
479     nxt_router_ra_init(task, ra, ra_src->rc);
480 
481     ra->mem_pool = mp;
482 
483     return ra;
484 }
485 
486 
487 nxt_inline nxt_bool_t
488 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
489     uint32_t stream)
490 {
491     nxt_buf_t   *b, *next;
492     nxt_bool_t  cancelled;
493 
494     if (msg_info->buf == NULL) {
495         return 0;
496     }
497 
498     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
499                                               stream);
500 
501     if (cancelled) {
502         nxt_debug(task, "stream #%uD: cancelled by router", stream);
503     }
504 
505     for (b = msg_info->buf; b != NULL; b = next) {
506         next = b->next;
507 
508         b->completion_handler = msg_info->completion_handler;
509 
510         if (b->is_port_mmap_sent) {
511             b->is_port_mmap_sent = cancelled == 0;
512             b->completion_handler(task, b, b->parent);
513         }
514     }
515 
516     msg_info->buf = NULL;
517 
518     return cancelled;
519 }
520 
521 
522 static void
523 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
524 
525 
526 static void
527 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
528 {
529     nxt_req_app_link_t  *ra;
530 
531     ra = obj;
532 
533     nxt_router_ra_update_peer(task, ra);
534 
535     nxt_router_ra_use(task, ra, -1);
536 }
537 
538 
539 static void
540 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
541 {
542     nxt_event_engine_t   *engine;
543     nxt_req_conn_link_t  *rc;
544 
545     engine = ra->work.data;
546 
547     if (task->thread->engine != engine) {
548         nxt_router_ra_inc_use(ra);
549 
550         ra->work.handler = nxt_router_ra_update_peer_handler;
551         ra->work.task = &engine->task;
552         ra->work.next = NULL;
553 
554         nxt_debug(task, "ra stream #%uD post update peer to %p",
555                   ra->stream, engine);
556 
557         nxt_event_engine_post(engine, &ra->work);
558 
559         return;
560     }
561 
562     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
563 
564     rc = ra->rc;
565 
566     if (rc != NULL && ra->app_port != NULL) {
567         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
568     }
569 
570     nxt_router_ra_use(task, ra, -1);
571 }
572 
573 
574 static void
575 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
576 {
577     nxt_mp_t                *mp;
578     nxt_req_conn_link_t     *rc;
579 
580     nxt_assert(task->thread->engine == ra->work.data);
581     nxt_assert(ra->use_count == 0);
582 
583     nxt_debug(task, "ra stream #%uD release", ra->stream);
584 
585     rc = ra->rc;
586 
587     if (rc != NULL) {
588         if (nxt_slow_path(ra->err_code != 0)) {
589             nxt_http_request_error(task, rc->ap->request, ra->err_code);
590 
591         } else {
592             rc->app_port = ra->app_port;
593             rc->msg_info = ra->msg_info;
594 
595             if (rc->app->timeout != 0) {
596                 rc->ap->timer.handler = nxt_router_app_timeout;
597                 rc->ap->timer_data = rc;
598                 nxt_timer_add(task->thread->engine, &rc->ap->timer,
599                               rc->app->timeout);
600             }
601 
602             ra->app_port = NULL;
603             ra->msg_info.buf = NULL;
604         }
605 
606         rc->ra = NULL;
607         ra->rc = NULL;
608     }
609 
610     if (ra->app_port != NULL) {
611         nxt_router_app_port_release(task, ra->app_port, 0, 1);
612 
613         ra->app_port = NULL;
614     }
615 
616     nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
617 
618     mp = ra->mem_pool;
619 
620     if (mp != NULL) {
621         nxt_mp_free(mp, ra);
622         nxt_mp_release(mp);
623     }
624 }
625 
626 
627 static void
628 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
629 {
630     nxt_req_app_link_t  *ra;
631 
632     ra = obj;
633 
634     nxt_assert(ra->work.data == data);
635 
636     nxt_atomic_fetch_add(&ra->use_count, -1);
637 
638     nxt_router_ra_release(task, ra);
639 }
640 
641 
642 static void
643 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
644 {
645     int                 c;
646     nxt_event_engine_t  *engine;
647 
648     c = nxt_atomic_fetch_add(&ra->use_count, i);
649 
650     if (i < 0 && c == -i) {
651         engine = ra->work.data;
652 
653         if (task->thread->engine == engine) {
654             nxt_router_ra_release(task, ra);
655 
656             return;
657         }
658 
659         nxt_router_ra_inc_use(ra);
660 
661         ra->work.handler = nxt_router_ra_release_handler;
662         ra->work.task = &engine->task;
663         ra->work.next = NULL;
664 
665         nxt_debug(task, "ra stream #%uD post release to %p",
666                   ra->stream, engine);
667 
668         nxt_event_engine_post(engine, &ra->work);
669     }
670 }
671 
672 
673 nxt_inline void
674 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
675 {
676     ra->app_port = NULL;
677     ra->err_code = code;
678     ra->err_str = str;
679 }
680 
681 
682 nxt_inline void
683 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
684 {
685     nxt_queue_insert_tail(&ra->app_port->pending_requests,
686                           &ra->link_port_pending);
687     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
688 
689     nxt_router_ra_inc_use(ra);
690 
691     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
692 
693     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
694 }
695 
696 
697 nxt_inline nxt_bool_t
698 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
699 {
700     if (lnk->next != NULL) {
701         nxt_queue_remove(lnk);
702 
703         lnk->next = NULL;
704 
705         return 1;
706     }
707 
708     return 0;
709 }
710 
711 
712 nxt_inline void
713 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
714 {
715     int                 ra_use_delta;
716     nxt_req_app_link_t  *ra;
717 
718     if (rc->app_port != NULL) {
719         nxt_router_app_port_release(task, rc->app_port, 0, 1);
720 
721         rc->app_port = NULL;
722     }
723 
724     nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
725 
726     ra = rc->ra;
727 
728     if (ra != NULL) {
729         rc->ra = NULL;
730         ra->rc = NULL;
731 
732         ra_use_delta = 0;
733 
734         nxt_thread_mutex_lock(&rc->app->mutex);
735 
736         if (ra->link_app_requests.next == NULL
737             && ra->link_port_pending.next == NULL
738             && ra->link_app_pending.next == NULL)
739         {
740             ra = NULL;
741 
742         } else {
743             ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
744             ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
745             nxt_queue_chk_remove(&ra->link_app_pending);
746         }
747 
748         nxt_thread_mutex_unlock(&rc->app->mutex);
749 
750         if (ra != NULL) {
751             nxt_router_ra_use(task, ra, ra_use_delta);
752         }
753     }
754 
755     if (rc->app != NULL) {
756         nxt_router_app_use(task, rc->app, -1);
757 
758         rc->app = NULL;
759     }
760 
761     if (rc->ap != NULL) {
762         rc->ap->timer_data = NULL;
763 
764         nxt_app_http_req_done(task, rc->ap);
765 
766         rc->ap = NULL;
767     }
768 }
769 
770 
771 void
772 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
773 {
774     nxt_port_new_port_handler(task, msg);
775 
776     if (msg->u.new_port != NULL
777         && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
778     {
779         nxt_router_greet_controller(task, msg->u.new_port);
780     }
781 
782     if (msg->port_msg.stream == 0) {
783         return;
784     }
785 
786     if (msg->u.new_port == NULL
787         || msg->u.new_port->type != NXT_PROCESS_WORKER)
788     {
789         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
790     }
791 
792     nxt_port_rpc_handler(task, msg);
793 }
794 
795 
796 void
797 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
798 {
799     nxt_int_t               ret;
800     nxt_buf_t               *b;
801     nxt_router_temp_conf_t  *tmcf;
802 
803     tmcf = nxt_router_temp_conf(task);
804     if (nxt_slow_path(tmcf == NULL)) {
805         return;
806     }
807 
808     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
809               nxt_buf_used_size(msg->buf),
810               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
811 
812     tmcf->router_conf->router = nxt_router;
813     tmcf->stream = msg->port_msg.stream;
814     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
815                                        msg->port_msg.pid,
816                                        msg->port_msg.reply_port);
817 
818     b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
819                                msg->buf, msg->size);
820     if (nxt_slow_path(b == NULL)) {
821         nxt_router_conf_error(task, tmcf);
822 
823         return;
824     }
825 
826     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
827 
828     if (nxt_fast_path(ret == NXT_OK)) {
829         nxt_router_conf_apply(task, tmcf, NULL);
830 
831     } else {
832         nxt_router_conf_error(task, tmcf);
833     }
834 }
835 
836 
837 static void
838 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
839     void *data)
840 {
841     union {
842         nxt_pid_t  removed_pid;
843         void       *data;
844     } u;
845 
846     u.data = data;
847 
848     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
849 }
850 
851 
852 void
853 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
854 {
855     nxt_event_engine_t  *engine;
856 
857     nxt_port_remove_pid_handler(task, msg);
858 
859     if (msg->port_msg.stream == 0) {
860         return;
861     }
862 
863     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
864     {
865         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
866                       msg->u.data);
867     }
868     nxt_queue_loop;
869 
870     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
871 
872     nxt_port_rpc_handler(task, msg);
873 }
874 
875 
876 static nxt_router_temp_conf_t *
877 nxt_router_temp_conf(nxt_task_t *task)
878 {
879     nxt_mp_t                *mp, *tmp;
880     nxt_router_conf_t       *rtcf;
881     nxt_router_temp_conf_t  *tmcf;
882 
883     mp = nxt_mp_create(1024, 128, 256, 32);
884     if (nxt_slow_path(mp == NULL)) {
885         return NULL;
886     }
887 
888     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
889     if (nxt_slow_path(rtcf == NULL)) {
890         goto fail;
891     }
892 
893     rtcf->mem_pool = mp;
894 
895     tmp = nxt_mp_create(1024, 128, 256, 32);
896     if (nxt_slow_path(tmp == NULL)) {
897         goto fail;
898     }
899 
900     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
901     if (nxt_slow_path(tmcf == NULL)) {
902         goto temp_fail;
903     }
904 
905     tmcf->mem_pool = tmp;
906     tmcf->router_conf = rtcf;
907     tmcf->count = 1;
908     tmcf->engine = task->thread->engine;
909 
910     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
911                                      sizeof(nxt_router_engine_conf_t));
912     if (nxt_slow_path(tmcf->engines == NULL)) {
913         goto temp_fail;
914     }
915 
916     nxt_queue_init(&tmcf->deleting);
917     nxt_queue_init(&tmcf->keeping);
918     nxt_queue_init(&tmcf->updating);
919     nxt_queue_init(&tmcf->pending);
920     nxt_queue_init(&tmcf->creating);
921 
922     nxt_queue_init(&tmcf->apps);
923     nxt_queue_init(&tmcf->previous);
924 
925     return tmcf;
926 
927 temp_fail:
928 
929     nxt_mp_destroy(tmp);
930 
931 fail:
932 
933     nxt_mp_destroy(mp);
934 
935     return NULL;
936 }
937 
938 
939 nxt_inline nxt_bool_t
940 nxt_router_app_can_start(nxt_app_t *app)
941 {
942     return app->processes + app->pending_processes < app->max_processes
943             && app->pending_processes < app->max_pending_processes;
944 }
945 
946 
947 nxt_inline nxt_bool_t
948 nxt_router_app_need_start(nxt_app_t *app)
949 {
950     return app->idle_processes + app->pending_processes
951             < app->spare_processes;
952 }
953 
954 
955 static void
956 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
957 {
958     nxt_int_t                    ret;
959     nxt_app_t                    *app;
960     nxt_router_t                 *router;
961     nxt_runtime_t                *rt;
962     nxt_queue_link_t             *qlk;
963     nxt_socket_conf_t            *skcf;
964     nxt_router_conf_t            *rtcf;
965     nxt_router_temp_conf_t       *tmcf;
966     const nxt_event_interface_t  *interface;
967 
968     tmcf = obj;
969 
970     qlk = nxt_queue_first(&tmcf->pending);
971 
972     if (qlk != nxt_queue_tail(&tmcf->pending)) {
973         nxt_queue_remove(qlk);
974         nxt_queue_insert_tail(&tmcf->creating, qlk);
975 
976         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
977 
978         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
979 
980         return;
981     }
982 
983     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
984 
985         if (nxt_router_app_need_start(app)) {
986             nxt_router_app_rpc_create(task, tmcf, app);
987             return;
988         }
989 
990     } nxt_queue_loop;
991 
992     rtcf = tmcf->router_conf;
993 
994     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
995         nxt_router_access_log_open(task, tmcf);
996         return;
997     }
998 
999     rt = task->thread->runtime;
1000 
1001     interface = nxt_service_get(rt->services, "engine", NULL);
1002 
1003     router = rtcf->router;
1004 
1005     ret = nxt_router_engines_create(task, router, tmcf, interface);
1006     if (nxt_slow_path(ret != NXT_OK)) {
1007         goto fail;
1008     }
1009 
1010     ret = nxt_router_threads_create(task, rt, tmcf);
1011     if (nxt_slow_path(ret != NXT_OK)) {
1012         goto fail;
1013     }
1014 
1015     nxt_router_apps_sort(task, router, tmcf);
1016 
1017     nxt_router_engines_post(router, tmcf);
1018 
1019     nxt_queue_add(&router->sockets, &tmcf->updating);
1020     nxt_queue_add(&router->sockets, &tmcf->creating);
1021 
1022     router->access_log = rtcf->access_log;
1023 
1024     nxt_router_conf_ready(task, tmcf);
1025 
1026     return;
1027 
1028 fail:
1029 
1030     nxt_router_conf_error(task, tmcf);
1031 
1032     return;
1033 }
1034 
1035 
1036 static void
1037 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1038 {
1039     nxt_joint_job_t  *job;
1040 
1041     job = obj;
1042 
1043     nxt_router_conf_ready(task, job->tmcf);
1044 }
1045 
1046 
1047 static void
1048 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1049 {
1050     nxt_debug(task, "temp conf count:%D", tmcf->count);
1051 
1052     if (--tmcf->count == 0) {
1053         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1054     }
1055 }
1056 
1057 
1058 static void
1059 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1060 {
1061     nxt_app_t          *app;
1062     nxt_queue_t        new_socket_confs;
1063     nxt_socket_t       s;
1064     nxt_router_t       *router;
1065     nxt_queue_link_t   *qlk;
1066     nxt_socket_conf_t  *skcf;
1067     nxt_router_conf_t  *rtcf;
1068 
1069     nxt_alert(task, "failed to apply new conf");
1070 
1071     for (qlk = nxt_queue_first(&tmcf->creating);
1072          qlk != nxt_queue_tail(&tmcf->creating);
1073          qlk = nxt_queue_next(qlk))
1074     {
1075         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1076         s = skcf->listen->socket;
1077 
1078         if (s != -1) {
1079             nxt_socket_close(task, s);
1080         }
1081 
1082         nxt_free(skcf->listen);
1083     }
1084 
1085     nxt_queue_init(&new_socket_confs);
1086     nxt_queue_add(&new_socket_confs, &tmcf->updating);
1087     nxt_queue_add(&new_socket_confs, &tmcf->pending);
1088     nxt_queue_add(&new_socket_confs, &tmcf->creating);
1089 
1090     nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1091 
1092         if (skcf->application != NULL) {
1093             nxt_router_app_use(task, skcf->application, -1);
1094             skcf->application = NULL;
1095         }
1096 
1097     } nxt_queue_loop;
1098 
1099     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1100 
1101         nxt_router_app_quit(task, app);
1102 
1103     } nxt_queue_loop;
1104 
1105     rtcf = tmcf->router_conf;
1106     router = rtcf->router;
1107 
1108     nxt_queue_add(&router->sockets, &tmcf->keeping);
1109     nxt_queue_add(&router->sockets, &tmcf->deleting);
1110 
1111     nxt_queue_add(&router->apps, &tmcf->previous);
1112 
1113     // TODO: new engines and threads
1114 
1115     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1116 
1117     nxt_mp_destroy(rtcf->mem_pool);
1118 
1119     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1120 }
1121 
1122 
1123 static void
1124 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1125     nxt_port_msg_type_t type)
1126 {
1127     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1128 }
1129 
1130 
1131 static nxt_conf_map_t  nxt_router_conf[] = {
1132     {
1133         nxt_string("listeners_threads"),
1134         NXT_CONF_MAP_INT32,
1135         offsetof(nxt_router_conf_t, threads),
1136     },
1137 };
1138 
1139 
1140 static nxt_conf_map_t  nxt_router_app_conf[] = {
1141     {
1142         nxt_string("type"),
1143         NXT_CONF_MAP_STR,
1144         offsetof(nxt_router_app_conf_t, type),
1145     },
1146 
1147     {
1148         nxt_string("limits"),
1149         NXT_CONF_MAP_PTR,
1150         offsetof(nxt_router_app_conf_t, limits_value),
1151     },
1152 
1153     {
1154         nxt_string("processes"),
1155         NXT_CONF_MAP_INT32,
1156         offsetof(nxt_router_app_conf_t, processes),
1157     },
1158 
1159     {
1160         nxt_string("processes"),
1161         NXT_CONF_MAP_PTR,
1162         offsetof(nxt_router_app_conf_t, processes_value),
1163     },
1164 };
1165 
1166 
1167 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1168     {
1169         nxt_string("timeout"),
1170         NXT_CONF_MAP_MSEC,
1171         offsetof(nxt_router_app_conf_t, timeout),
1172     },
1173 
1174     {
1175         nxt_string("reschedule_timeout"),
1176         NXT_CONF_MAP_MSEC,
1177         offsetof(nxt_router_app_conf_t, res_timeout),
1178     },
1179 
1180     {
1181         nxt_string("requests"),
1182         NXT_CONF_MAP_INT32,
1183         offsetof(nxt_router_app_conf_t, requests),
1184     },
1185 };
1186 
1187 
1188 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1189     {
1190         nxt_string("spare"),
1191         NXT_CONF_MAP_INT32,
1192         offsetof(nxt_router_app_conf_t, spare_processes),
1193     },
1194 
1195     {
1196         nxt_string("max"),
1197         NXT_CONF_MAP_INT32,
1198         offsetof(nxt_router_app_conf_t, max_processes),
1199     },
1200 
1201     {
1202         nxt_string("idle_timeout"),
1203         NXT_CONF_MAP_MSEC,
1204         offsetof(nxt_router_app_conf_t, idle_timeout),
1205     },
1206 };
1207 
1208 
1209 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1210     {
1211         nxt_string("application"),
1212         NXT_CONF_MAP_STR,
1213         offsetof(nxt_router_listener_conf_t, application),
1214     },
1215 };
1216 
1217 
1218 static nxt_conf_map_t  nxt_router_http_conf[] = {
1219     {
1220         nxt_string("header_buffer_size"),
1221         NXT_CONF_MAP_SIZE,
1222         offsetof(nxt_socket_conf_t, header_buffer_size),
1223     },
1224 
1225     {
1226         nxt_string("large_header_buffer_size"),
1227         NXT_CONF_MAP_SIZE,
1228         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1229     },
1230 
1231     {
1232         nxt_string("large_header_buffers"),
1233         NXT_CONF_MAP_SIZE,
1234         offsetof(nxt_socket_conf_t, large_header_buffers),
1235     },
1236 
1237     {
1238         nxt_string("body_buffer_size"),
1239         NXT_CONF_MAP_SIZE,
1240         offsetof(nxt_socket_conf_t, body_buffer_size),
1241     },
1242 
1243     {
1244         nxt_string("max_body_size"),
1245         NXT_CONF_MAP_SIZE,
1246         offsetof(nxt_socket_conf_t, max_body_size),
1247     },
1248 
1249     {
1250         nxt_string("idle_timeout"),
1251         NXT_CONF_MAP_MSEC,
1252         offsetof(nxt_socket_conf_t, idle_timeout),
1253     },
1254 
1255     {
1256         nxt_string("header_read_timeout"),
1257         NXT_CONF_MAP_MSEC,
1258         offsetof(nxt_socket_conf_t, header_read_timeout),
1259     },
1260 
1261     {
1262         nxt_string("body_read_timeout"),
1263         NXT_CONF_MAP_MSEC,
1264         offsetof(nxt_socket_conf_t, body_read_timeout),
1265     },
1266 
1267     {
1268         nxt_string("send_timeout"),
1269         NXT_CONF_MAP_MSEC,
1270         offsetof(nxt_socket_conf_t, send_timeout),
1271     },
1272 };
1273 
1274 
1275 static nxt_int_t
1276 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1277     u_char *start, u_char *end)
1278 {
1279     u_char                      *p;
1280     size_t                      size;
1281     nxt_mp_t                    *mp;
1282     uint32_t                    next;
1283     nxt_int_t                   ret;
1284     nxt_str_t                   name, path;
1285     nxt_app_t                   *app, *prev;
1286     nxt_router_t                *router;
1287     nxt_conf_value_t            *conf, *http, *value;
1288     nxt_conf_value_t            *applications, *application;
1289     nxt_conf_value_t            *listeners, *listener;
1290     nxt_socket_conf_t           *skcf;
1291     nxt_event_engine_t          *engine;
1292     nxt_app_lang_module_t       *lang;
1293     nxt_router_app_conf_t       apcf;
1294     nxt_router_access_log_t     *access_log;
1295     nxt_router_listener_conf_t  lscf;
1296 
1297     static nxt_str_t  http_path = nxt_string("/settings/http");
1298     static nxt_str_t  applications_path = nxt_string("/applications");
1299     static nxt_str_t  listeners_path = nxt_string("/listeners");
1300     static nxt_str_t  access_log_path = nxt_string("/access_log");
1301 
1302     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1303     if (conf == NULL) {
1304         nxt_alert(task, "configuration parsing error");
1305         return NXT_ERROR;
1306     }
1307 
1308     mp = tmcf->router_conf->mem_pool;
1309 
1310     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1311                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1312     if (ret != NXT_OK) {
1313         nxt_alert(task, "root map error");
1314         return NXT_ERROR;
1315     }
1316 
1317     if (tmcf->router_conf->threads == 0) {
1318         tmcf->router_conf->threads = nxt_ncpu;
1319     }
1320 
1321     applications = nxt_conf_get_path(conf, &applications_path);
1322     if (applications == NULL) {
1323         nxt_alert(task, "no \"applications\" block");
1324         return NXT_ERROR;
1325     }
1326 
1327     router = tmcf->router_conf->router;
1328 
1329     next = 0;
1330 
1331     for ( ;; ) {
1332         application = nxt_conf_next_object_member(applications, &name, &next);
1333         if (application == NULL) {
1334             break;
1335         }
1336 
1337         nxt_debug(task, "application \"%V\"", &name);
1338 
1339         size = nxt_conf_json_length(application, NULL);
1340 
1341         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1342         if (app == NULL) {
1343             goto fail;
1344         }
1345 
1346         nxt_memzero(app, sizeof(nxt_app_t));
1347 
1348         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1349         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1350 
1351         p = nxt_conf_json_print(app->conf.start, application, NULL);
1352         app->conf.length = p - app->conf.start;
1353 
1354         nxt_assert(app->conf.length <= size);
1355 
1356         nxt_debug(task, "application conf \"%V\"", &app->conf);
1357 
1358         prev = nxt_router_app_find(&router->apps, &name);
1359 
1360         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1361             nxt_free(app);
1362 
1363             nxt_queue_remove(&prev->link);
1364             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1365             continue;
1366         }
1367 
1368         apcf.processes = 1;
1369         apcf.max_processes = 1;
1370         apcf.spare_processes = 0;
1371         apcf.timeout = 0;
1372         apcf.res_timeout = 1000;
1373         apcf.idle_timeout = 15000;
1374         apcf.requests = 0;
1375         apcf.limits_value = NULL;
1376         apcf.processes_value = NULL;
1377 
1378         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1379                                   nxt_nitems(nxt_router_app_conf), &apcf);
1380         if (ret != NXT_OK) {
1381             nxt_alert(task, "application map error");
1382             goto app_fail;
1383         }
1384 
1385         if (apcf.limits_value != NULL) {
1386 
1387             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1388                 nxt_alert(task, "application limits is not object");
1389                 goto app_fail;
1390             }
1391 
1392             ret = nxt_conf_map_object(mp, apcf.limits_value,
1393                                       nxt_router_app_limits_conf,
1394                                       nxt_nitems(nxt_router_app_limits_conf),
1395                                       &apcf);
1396             if (ret != NXT_OK) {
1397                 nxt_alert(task, "application limits map error");
1398                 goto app_fail;
1399             }
1400         }
1401 
1402         if (apcf.processes_value != NULL
1403             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1404         {
1405             ret = nxt_conf_map_object(mp, apcf.processes_value,
1406                                       nxt_router_app_processes_conf,
1407                                       nxt_nitems(nxt_router_app_processes_conf),
1408                                       &apcf);
1409             if (ret != NXT_OK) {
1410                 nxt_alert(task, "application processes map error");
1411                 goto app_fail;
1412             }
1413 
1414         } else {
1415             apcf.max_processes = apcf.processes;
1416             apcf.spare_processes = apcf.processes;
1417         }
1418 
1419         nxt_debug(task, "application type: %V", &apcf.type);
1420         nxt_debug(task, "application processes: %D", apcf.processes);
1421         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1422         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1423         nxt_debug(task, "application requests: %D", apcf.requests);
1424 
1425         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1426 
1427         if (lang == NULL) {
1428             nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1429             goto app_fail;
1430         }
1431 
1432         nxt_debug(task, "application language module: \"%s\"", lang->file);
1433 
1434         ret = nxt_thread_mutex_create(&app->mutex);
1435         if (ret != NXT_OK) {
1436             goto app_fail;
1437         }
1438 
1439         nxt_queue_init(&app->ports);
1440         nxt_queue_init(&app->spare_ports);
1441         nxt_queue_init(&app->idle_ports);
1442         nxt_queue_init(&app->requests);
1443         nxt_queue_init(&app->pending);
1444 
1445         app->name.length = name.length;
1446         nxt_memcpy(app->name.start, name.start, name.length);
1447 
1448         app->type = lang->type;
1449         app->max_processes = apcf.max_processes;
1450         app->spare_processes = apcf.spare_processes;
1451         app->max_pending_processes = apcf.spare_processes
1452                                       ? apcf.spare_processes : 1;
1453         app->timeout = apcf.timeout;
1454         app->res_timeout = apcf.res_timeout * 1000000;
1455         app->idle_timeout = apcf.idle_timeout;
1456         app->live = 1;
1457         app->max_pending_responses = 2;
1458         app->max_requests = apcf.requests;
1459 
1460         engine = task->thread->engine;
1461 
1462         app->engine = engine;
1463 
1464         app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
1465         app->idle_timer.work_queue = &engine->fast_work_queue;
1466         app->idle_timer.handler = nxt_router_app_idle_timeout;
1467         app->idle_timer.task = &engine->task;
1468         app->idle_timer.log = app->idle_timer.task->log;
1469 
1470         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1471         app->adjust_idle_work.task = &engine->task;
1472         app->adjust_idle_work.obj = app;
1473 
1474         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1475 
1476         nxt_router_app_use(task, app, 1);
1477     }
1478 
1479     http = nxt_conf_get_path(conf, &http_path);
1480 #if 0
1481     if (http == NULL) {
1482         nxt_alert(task, "no \"http\" block");
1483         return NXT_ERROR;
1484     }
1485 #endif
1486 
1487     listeners = nxt_conf_get_path(conf, &listeners_path);
1488     if (listeners == NULL) {
1489         nxt_alert(task, "no \"listeners\" block");
1490         return NXT_ERROR;
1491     }
1492 
1493     next = 0;
1494 
1495     for ( ;; ) {
1496         listener = nxt_conf_next_object_member(listeners, &name, &next);
1497         if (listener == NULL) {
1498             break;
1499         }
1500 
1501         skcf = nxt_router_socket_conf(task, tmcf, &name);
1502         if (skcf == NULL) {
1503             goto fail;
1504         }
1505 
1506         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1507                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1508         if (ret != NXT_OK) {
1509             nxt_alert(task, "listener map error");
1510             goto fail;
1511         }
1512 
1513         nxt_debug(task, "application: %V", &lscf.application);
1514 
1515         // STUB, default values if http block is not defined.
1516         skcf->header_buffer_size = 2048;
1517         skcf->large_header_buffer_size = 8192;
1518         skcf->large_header_buffers = 4;
1519         skcf->body_buffer_size = 16 * 1024;
1520         skcf->max_body_size = 8 * 1024 * 1024;
1521         skcf->idle_timeout = 180 * 1000;
1522         skcf->header_read_timeout = 30 * 1000;
1523         skcf->body_read_timeout = 30 * 1000;
1524         skcf->send_timeout = 30 * 1000;
1525 
1526         if (http != NULL) {
1527             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1528                                       nxt_nitems(nxt_router_http_conf), skcf);
1529             if (ret != NXT_OK) {
1530                 nxt_alert(task, "http map error");
1531                 goto fail;
1532             }
1533         }
1534 
1535         skcf->listen->handler = nxt_http_conn_init;
1536         skcf->router_conf = tmcf->router_conf;
1537         skcf->router_conf->count++;
1538         skcf->application = nxt_router_listener_application(tmcf,
1539                                                             &lscf.application);
1540         nxt_router_app_use(task, skcf->application, 1);
1541     }
1542 
1543     value = nxt_conf_get_path(conf, &access_log_path);
1544 
1545     if (value != NULL) {
1546         nxt_conf_get_string(value, &path);
1547 
1548         access_log = router->access_log;
1549 
1550         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1551             nxt_thread_spin_lock(&router->lock);
1552             access_log->count++;
1553             nxt_thread_spin_unlock(&router->lock);
1554 
1555         } else {
1556             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1557                                     + path.length);
1558             if (access_log == NULL) {
1559                 nxt_alert(task, "failed to allocate access log structure");
1560                 goto fail;
1561             }
1562 
1563             access_log->fd = -1;
1564             access_log->handler = &nxt_router_access_log_writer;
1565             access_log->count = 1;
1566 
1567             access_log->path.length = path.length;
1568             access_log->path.start = (u_char *) access_log
1569                                      + sizeof(nxt_router_access_log_t);
1570 
1571             nxt_memcpy(access_log->path.start, path.start, path.length);
1572         }
1573 
1574         tmcf->router_conf->access_log = access_log;
1575     }
1576 
1577     nxt_queue_add(&tmcf->deleting, &router->sockets);
1578     nxt_queue_init(&router->sockets);
1579 
1580     return NXT_OK;
1581 
1582 app_fail:
1583 
1584     nxt_free(app);
1585 
1586 fail:
1587 
1588     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1589 
1590         nxt_queue_remove(&app->link);
1591         nxt_thread_mutex_destroy(&app->mutex);
1592         nxt_free(app);
1593 
1594     } nxt_queue_loop;
1595 
1596     return NXT_ERROR;
1597 }
1598 
1599 
1600 static nxt_app_t *
1601 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1602 {
1603     nxt_app_t  *app;
1604 
1605     nxt_queue_each(app, queue, nxt_app_t, link) {
1606 
1607         if (nxt_strstr_eq(name, &app->name)) {
1608             return app;
1609         }
1610 
1611     } nxt_queue_loop;
1612 
1613     return NULL;
1614 }
1615 
1616 
1617 static nxt_app_t *
1618 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1619 {
1620     nxt_app_t  *app;
1621 
1622     app = nxt_router_app_find(&tmcf->apps, name);
1623 
1624     if (app == NULL) {
1625         app = nxt_router_app_find(&tmcf->previous, name);
1626     }
1627 
1628     return app;
1629 }
1630 
1631 
1632 static nxt_socket_conf_t *
1633 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1634     nxt_str_t *name)
1635 {
1636     size_t               size;
1637     nxt_int_t            ret;
1638     nxt_bool_t           wildcard;
1639     nxt_sockaddr_t       *sa;
1640     nxt_socket_conf_t    *skcf;
1641     nxt_listen_socket_t  *ls;
1642 
1643     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1644     if (nxt_slow_path(sa == NULL)) {
1645         nxt_alert(task, "invalid listener \"%V\"", name);
1646         return NULL;
1647     }
1648 
1649     sa->type = SOCK_STREAM;
1650 
1651     nxt_debug(task, "router listener: \"%*s\"",
1652               (size_t) sa->length, nxt_sockaddr_start(sa));
1653 
1654     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
1655     if (nxt_slow_path(skcf == NULL)) {
1656         return NULL;
1657     }
1658 
1659     size = nxt_sockaddr_size(sa);
1660 
1661     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1662 
1663     if (ret != NXT_OK) {
1664 
1665         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1666         if (nxt_slow_path(ls == NULL)) {
1667             return NULL;
1668         }
1669 
1670         skcf->listen = ls;
1671 
1672         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1673         nxt_memcpy(ls->sockaddr, sa, size);
1674 
1675         nxt_listen_socket_remote_size(ls);
1676 
1677         ls->socket = -1;
1678         ls->backlog = NXT_LISTEN_BACKLOG;
1679         ls->flags = NXT_NONBLOCK;
1680         ls->read_after_accept = 1;
1681     }
1682 
1683     switch (sa->u.sockaddr.sa_family) {
1684 #if (NXT_HAVE_UNIX_DOMAIN)
1685     case AF_UNIX:
1686         wildcard = 0;
1687         break;
1688 #endif
1689 #if (NXT_INET6)
1690     case AF_INET6:
1691         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1692         break;
1693 #endif
1694     case AF_INET:
1695     default:
1696         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1697         break;
1698     }
1699 
1700     if (!wildcard) {
1701         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
1702         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1703             return NULL;
1704         }
1705 
1706         nxt_memcpy(skcf->sockaddr, sa, size);
1707     }
1708 
1709     return skcf;
1710 }
1711 
1712 
1713 static nxt_int_t
1714 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1715     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1716 {
1717     nxt_router_t       *router;
1718     nxt_queue_link_t   *qlk;
1719     nxt_socket_conf_t  *skcf;
1720 
1721     router = tmcf->router_conf->router;
1722 
1723     for (qlk = nxt_queue_first(&router->sockets);
1724          qlk != nxt_queue_tail(&router->sockets);
1725          qlk = nxt_queue_next(qlk))
1726     {
1727         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1728 
1729         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1730             nskcf->listen = skcf->listen;
1731 
1732             nxt_queue_remove(qlk);
1733             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1734 
1735             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1736 
1737             return NXT_OK;
1738         }
1739     }
1740 
1741     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1742 
1743     return NXT_DECLINED;
1744 }
1745 
1746 
1747 static void
1748 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1749     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1750 {
1751     size_t            size;
1752     uint32_t          stream;
1753     nxt_int_t         ret;
1754     nxt_buf_t         *b;
1755     nxt_port_t        *main_port, *router_port;
1756     nxt_runtime_t     *rt;
1757     nxt_socket_rpc_t  *rpc;
1758 
1759     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1760     if (rpc == NULL) {
1761         goto fail;
1762     }
1763 
1764     rpc->socket_conf = skcf;
1765     rpc->temp_conf = tmcf;
1766 
1767     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1768 
1769     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1770     if (b == NULL) {
1771         goto fail;
1772     }
1773 
1774     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1775 
1776     rt = task->thread->runtime;
1777     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1778     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1779 
1780     stream = nxt_port_rpc_register_handler(task, router_port,
1781                                            nxt_router_listen_socket_ready,
1782                                            nxt_router_listen_socket_error,
1783                                            main_port->pid, rpc);
1784     if (nxt_slow_path(stream == 0)) {
1785         goto fail;
1786     }
1787 
1788     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1789                                 stream, router_port->id, b);
1790 
1791     if (nxt_slow_path(ret != NXT_OK)) {
1792         nxt_port_rpc_cancel(task, router_port, stream);
1793         goto fail;
1794     }
1795 
1796     return;
1797 
1798 fail:
1799 
1800     nxt_router_conf_error(task, tmcf);
1801 }
1802 
1803 
1804 static void
1805 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1806     void *data)
1807 {
1808     nxt_int_t         ret;
1809     nxt_socket_t      s;
1810     nxt_socket_rpc_t  *rpc;
1811 
1812     rpc = data;
1813 
1814     s = msg->fd;
1815 
1816     ret = nxt_socket_nonblocking(task, s);
1817     if (nxt_slow_path(ret != NXT_OK)) {
1818         goto fail;
1819     }
1820 
1821     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1822 
1823     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1824     if (nxt_slow_path(ret != NXT_OK)) {
1825         goto fail;
1826     }
1827 
1828     rpc->socket_conf->listen->socket = s;
1829 
1830     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1831                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1832 
1833     return;
1834 
1835 fail:
1836 
1837     nxt_socket_close(task, s);
1838 
1839     nxt_router_conf_error(task, rpc->temp_conf);
1840 }
1841 
1842 
1843 static void
1844 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1845     void *data)
1846 {
1847     u_char                  *p;
1848     size_t                  size;
1849     uint8_t                 error;
1850     nxt_buf_t               *in, *out;
1851     nxt_sockaddr_t          *sa;
1852     nxt_socket_rpc_t        *rpc;
1853     nxt_router_temp_conf_t  *tmcf;
1854 
1855     static nxt_str_t  socket_errors[] = {
1856         nxt_string("ListenerSystem"),
1857         nxt_string("ListenerNoIPv6"),
1858         nxt_string("ListenerPort"),
1859         nxt_string("ListenerInUse"),
1860         nxt_string("ListenerNoAddress"),
1861         nxt_string("ListenerNoAccess"),
1862         nxt_string("ListenerPath"),
1863     };
1864 
1865     rpc = data;
1866     sa = rpc->socket_conf->listen->sockaddr;
1867     tmcf = rpc->temp_conf;
1868 
1869     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
1870 
1871     if (nxt_slow_path(in == NULL)) {
1872         return;
1873     }
1874 
1875     p = in->mem.pos;
1876 
1877     error = *p++;
1878 
1879     size = nxt_length("listen socket error: ")
1880            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
1881            + sa->length + socket_errors[error].length + (in->mem.free - p);
1882 
1883     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1884     if (nxt_slow_path(out == NULL)) {
1885         return;
1886     }
1887 
1888     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
1889                         "listen socket error: "
1890                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
1891                         (size_t) sa->length, nxt_sockaddr_start(sa),
1892                         &socket_errors[error], in->mem.free - p, p);
1893 
1894     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
1895 
1896     nxt_router_conf_error(task, tmcf);
1897 }
1898 
1899 
1900 static void
1901 nxt_router_app_rpc_create(nxt_task_t *task,
1902     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
1903 {
1904     size_t         size;
1905     uint32_t       stream;
1906     nxt_int_t      ret;
1907     nxt_buf_t      *b;
1908     nxt_port_t     *main_port, *router_port;
1909     nxt_runtime_t  *rt;
1910     nxt_app_rpc_t  *rpc;
1911 
1912     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
1913     if (rpc == NULL) {
1914         goto fail;
1915     }
1916 
1917     rpc->app = app;
1918     rpc->temp_conf = tmcf;
1919 
1920     nxt_debug(task, "app '%V' prefork", &app->name);
1921 
1922     size = app->name.length + 1 + app->conf.length;
1923 
1924     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1925     if (nxt_slow_path(b == NULL)) {
1926         goto fail;
1927     }
1928 
1929     nxt_buf_cpystr(b, &app->name);
1930     *b->mem.free++ = '\0';
1931     nxt_buf_cpystr(b, &app->conf);
1932 
1933     rt = task->thread->runtime;
1934     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1935     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1936 
1937     stream = nxt_port_rpc_register_handler(task, router_port,
1938                                            nxt_router_app_prefork_ready,
1939                                            nxt_router_app_prefork_error,
1940                                            -1, rpc);
1941     if (nxt_slow_path(stream == 0)) {
1942         goto fail;
1943     }
1944 
1945     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
1946                                 stream, router_port->id, b);
1947 
1948     if (nxt_slow_path(ret != NXT_OK)) {
1949         nxt_port_rpc_cancel(task, router_port, stream);
1950         goto fail;
1951     }
1952 
1953     app->pending_processes++;
1954 
1955     return;
1956 
1957 fail:
1958 
1959     nxt_router_conf_error(task, tmcf);
1960 }
1961 
1962 
1963 static void
1964 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1965     void *data)
1966 {
1967     nxt_app_t           *app;
1968     nxt_port_t          *port;
1969     nxt_app_rpc_t       *rpc;
1970     nxt_event_engine_t  *engine;
1971 
1972     rpc = data;
1973     app = rpc->app;
1974 
1975     port = msg->u.new_port;
1976     port->app = app;
1977 
1978     nxt_router_app_use(task, app, 1);
1979 
1980     app->pending_processes--;
1981     app->processes++;
1982     app->idle_processes++;
1983 
1984     engine = task->thread->engine;
1985 
1986     nxt_queue_insert_tail(&app->ports, &port->app_link);
1987     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
1988 
1989     port->idle_start = 0;
1990 
1991     nxt_port_inc_use(port);
1992 
1993     nxt_work_queue_add(&engine->fast_work_queue,
1994                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1995 }
1996 
1997 
1998 static void
1999 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2000     void *data)
2001 {
2002     nxt_app_t               *app;
2003     nxt_app_rpc_t           *rpc;
2004     nxt_router_temp_conf_t  *tmcf;
2005 
2006     rpc = data;
2007     app = rpc->app;
2008     tmcf = rpc->temp_conf;
2009 
2010     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2011             &app->name);
2012 
2013     app->pending_processes--;
2014 
2015     nxt_router_conf_error(task, tmcf);
2016 }
2017 
2018 
2019 static nxt_int_t
2020 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2021     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2022 {
2023     nxt_int_t                 ret;
2024     nxt_uint_t                n, threads;
2025     nxt_queue_link_t          *qlk;
2026     nxt_router_engine_conf_t  *recf;
2027 
2028     threads = tmcf->router_conf->threads;
2029 
2030     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2031                                      sizeof(nxt_router_engine_conf_t));
2032     if (nxt_slow_path(tmcf->engines == NULL)) {
2033         return NXT_ERROR;
2034     }
2035 
2036     n = 0;
2037 
2038     for (qlk = nxt_queue_first(&router->engines);
2039          qlk != nxt_queue_tail(&router->engines);
2040          qlk = nxt_queue_next(qlk))
2041     {
2042         recf = nxt_array_zero_add(tmcf->engines);
2043         if (nxt_slow_path(recf == NULL)) {
2044             return NXT_ERROR;
2045         }
2046 
2047         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2048 
2049         if (n < threads) {
2050             recf->action = NXT_ROUTER_ENGINE_KEEP;
2051             ret = nxt_router_engine_conf_update(tmcf, recf);
2052 
2053         } else {
2054             recf->action = NXT_ROUTER_ENGINE_DELETE;
2055             ret = nxt_router_engine_conf_delete(tmcf, recf);
2056         }
2057 
2058         if (nxt_slow_path(ret != NXT_OK)) {
2059             return ret;
2060         }
2061 
2062         n++;
2063     }
2064 
2065     tmcf->new_threads = n;
2066 
2067     while (n < threads) {
2068         recf = nxt_array_zero_add(tmcf->engines);
2069         if (nxt_slow_path(recf == NULL)) {
2070             return NXT_ERROR;
2071         }
2072 
2073         recf->action = NXT_ROUTER_ENGINE_ADD;
2074 
2075         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2076         if (nxt_slow_path(recf->engine == NULL)) {
2077             return NXT_ERROR;
2078         }
2079 
2080         ret = nxt_router_engine_conf_create(tmcf, recf);
2081         if (nxt_slow_path(ret != NXT_OK)) {
2082             return ret;
2083         }
2084 
2085         n++;
2086     }
2087 
2088     return NXT_OK;
2089 }
2090 
2091 
2092 static nxt_int_t
2093 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2094     nxt_router_engine_conf_t *recf)
2095 {
2096     nxt_int_t  ret;
2097 
2098     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2099                                           nxt_router_listen_socket_create);
2100     if (nxt_slow_path(ret != NXT_OK)) {
2101         return ret;
2102     }
2103 
2104     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2105                                           nxt_router_listen_socket_create);
2106     if (nxt_slow_path(ret != NXT_OK)) {
2107         return ret;
2108     }
2109 
2110     return ret;
2111 }
2112 
2113 
2114 static nxt_int_t
2115 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2116     nxt_router_engine_conf_t *recf)
2117 {
2118     nxt_int_t  ret;
2119 
2120     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2121                                           nxt_router_listen_socket_create);
2122     if (nxt_slow_path(ret != NXT_OK)) {
2123         return ret;
2124     }
2125 
2126     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2127                                           nxt_router_listen_socket_update);
2128     if (nxt_slow_path(ret != NXT_OK)) {
2129         return ret;
2130     }
2131 
2132     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2133     if (nxt_slow_path(ret != NXT_OK)) {
2134         return ret;
2135     }
2136 
2137     return ret;
2138 }
2139 
2140 
2141 static nxt_int_t
2142 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2143     nxt_router_engine_conf_t *recf)
2144 {
2145     nxt_int_t  ret;
2146 
2147     ret = nxt_router_engine_quit(tmcf, recf);
2148     if (nxt_slow_path(ret != NXT_OK)) {
2149         return ret;
2150     }
2151 
2152     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2153     if (nxt_slow_path(ret != NXT_OK)) {
2154         return ret;
2155     }
2156 
2157     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2158 }
2159 
2160 
2161 static nxt_int_t
2162 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2163     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2164     nxt_work_handler_t handler)
2165 {
2166     nxt_joint_job_t          *job;
2167     nxt_queue_link_t         *qlk;
2168     nxt_socket_conf_t        *skcf;
2169     nxt_socket_conf_joint_t  *joint;
2170 
2171     for (qlk = nxt_queue_first(sockets);
2172          qlk != nxt_queue_tail(sockets);
2173          qlk = nxt_queue_next(qlk))
2174     {
2175         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2176         if (nxt_slow_path(job == NULL)) {
2177             return NXT_ERROR;
2178         }
2179 
2180         job->work.next = recf->jobs;
2181         recf->jobs = &job->work;
2182 
2183         job->task = tmcf->engine->task;
2184         job->work.handler = handler;
2185         job->work.task = &job->task;
2186         job->work.obj = job;
2187         job->tmcf = tmcf;
2188 
2189         tmcf->count++;
2190 
2191         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2192                              sizeof(nxt_socket_conf_joint_t));
2193         if (nxt_slow_path(joint == NULL)) {
2194             return NXT_ERROR;
2195         }
2196 
2197         job->work.data = joint;
2198 
2199         joint->count = 1;
2200 
2201         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2202         skcf->count++;
2203         joint->socket_conf = skcf;
2204 
2205         joint->engine = recf->engine;
2206     }
2207 
2208     return NXT_OK;
2209 }
2210 
2211 
2212 static nxt_int_t
2213 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2214     nxt_router_engine_conf_t *recf)
2215 {
2216     nxt_joint_job_t  *job;
2217 
2218     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2219     if (nxt_slow_path(job == NULL)) {
2220         return NXT_ERROR;
2221     }
2222 
2223     job->work.next = recf->jobs;
2224     recf->jobs = &job->work;
2225 
2226     job->task = tmcf->engine->task;
2227     job->work.handler = nxt_router_worker_thread_quit;
2228     job->work.task = &job->task;
2229     job->work.obj = NULL;
2230     job->work.data = NULL;
2231     job->tmcf = NULL;
2232 
2233     return NXT_OK;
2234 }
2235 
2236 
2237 static nxt_int_t
2238 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2239     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2240 {
2241     nxt_joint_job_t   *job;
2242     nxt_queue_link_t  *qlk;
2243 
2244     for (qlk = nxt_queue_first(sockets);
2245          qlk != nxt_queue_tail(sockets);
2246          qlk = nxt_queue_next(qlk))
2247     {
2248         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2249         if (nxt_slow_path(job == NULL)) {
2250             return NXT_ERROR;
2251         }
2252 
2253         job->work.next = recf->jobs;
2254         recf->jobs = &job->work;
2255 
2256         job->task = tmcf->engine->task;
2257         job->work.handler = nxt_router_listen_socket_delete;
2258         job->work.task = &job->task;
2259         job->work.obj = job;
2260         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2261         job->tmcf = tmcf;
2262 
2263         tmcf->count++;
2264     }
2265 
2266     return NXT_OK;
2267 }
2268 
2269 
2270 static nxt_int_t
2271 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2272     nxt_router_temp_conf_t *tmcf)
2273 {
2274     nxt_int_t                 ret;
2275     nxt_uint_t                i, threads;
2276     nxt_router_engine_conf_t  *recf;
2277 
2278     recf = tmcf->engines->elts;
2279     threads = tmcf->router_conf->threads;
2280 
2281     for (i = tmcf->new_threads; i < threads; i++) {
2282         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2283         if (nxt_slow_path(ret != NXT_OK)) {
2284             return ret;
2285         }
2286     }
2287 
2288     return NXT_OK;
2289 }
2290 
2291 
2292 static nxt_int_t
2293 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2294     nxt_event_engine_t *engine)
2295 {
2296     nxt_int_t            ret;
2297     nxt_thread_link_t    *link;
2298     nxt_thread_handle_t  handle;
2299 
2300     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2301 
2302     if (nxt_slow_path(link == NULL)) {
2303         return NXT_ERROR;
2304     }
2305 
2306     link->start = nxt_router_thread_start;
2307     link->engine = engine;
2308     link->work.handler = nxt_router_thread_exit_handler;
2309     link->work.task = task;
2310     link->work.data = link;
2311 
2312     nxt_queue_insert_tail(&rt->engines, &engine->link);
2313 
2314     ret = nxt_thread_create(&handle, link);
2315 
2316     if (nxt_slow_path(ret != NXT_OK)) {
2317         nxt_queue_remove(&engine->link);
2318     }
2319 
2320     return ret;
2321 }
2322 
2323 
2324 static void
2325 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2326     nxt_router_temp_conf_t *tmcf)
2327 {
2328     nxt_app_t  *app;
2329 
2330     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2331 
2332         nxt_router_app_quit(task, app);
2333 
2334     } nxt_queue_loop;
2335 
2336     nxt_queue_add(&router->apps, &tmcf->previous);
2337     nxt_queue_add(&router->apps, &tmcf->apps);
2338 }
2339 
2340 
2341 static void
2342 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2343 {
2344     nxt_uint_t                n;
2345     nxt_event_engine_t        *engine;
2346     nxt_router_engine_conf_t  *recf;
2347 
2348     recf = tmcf->engines->elts;
2349 
2350     for (n = tmcf->engines->nelts; n != 0; n--) {
2351         engine = recf->engine;
2352 
2353         switch (recf->action) {
2354 
2355         case NXT_ROUTER_ENGINE_KEEP:
2356             break;
2357 
2358         case NXT_ROUTER_ENGINE_ADD:
2359             nxt_queue_insert_tail(&router->engines, &engine->link0);
2360             break;
2361 
2362         case NXT_ROUTER_ENGINE_DELETE:
2363             nxt_queue_remove(&engine->link0);
2364             break;
2365         }
2366 
2367         nxt_router_engine_post(engine, recf->jobs);
2368 
2369         recf++;
2370     }
2371 }
2372 
2373 
2374 static void
2375 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2376 {
2377     nxt_work_t  *work, *next;
2378 
2379     for (work = jobs; work != NULL; work = next) {
2380         next = work->next;
2381         work->next = NULL;
2382 
2383         nxt_event_engine_post(engine, work);
2384     }
2385 }
2386 
2387 
2388 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2389     .rpc_error = nxt_port_rpc_handler,
2390     .mmap      = nxt_port_mmap_handler,
2391     .data      = nxt_port_rpc_handler,
2392 };
2393 
2394 
2395 static void
2396 nxt_router_thread_start(void *data)
2397 {
2398     nxt_int_t           ret;
2399     nxt_port_t          *port;
2400     nxt_task_t          *task;
2401     nxt_thread_t        *thread;
2402     nxt_thread_link_t   *link;
2403     nxt_event_engine_t  *engine;
2404 
2405     link = data;
2406     engine = link->engine;
2407     task = &engine->task;
2408 
2409     thread = nxt_thread();
2410 
2411     nxt_event_engine_thread_adopt(engine);
2412 
2413     /* STUB */
2414     thread->runtime = engine->task.thread->runtime;
2415 
2416     engine->task.thread = thread;
2417     engine->task.log = thread->log;
2418     thread->engine = engine;
2419     thread->task = &engine->task;
2420 #if 0
2421     thread->fiber = &engine->fibers->fiber;
2422 #endif
2423 
2424     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2425     if (nxt_slow_path(engine->mem_pool == NULL)) {
2426         return;
2427     }
2428 
2429     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2430                         NXT_PROCESS_ROUTER);
2431     if (nxt_slow_path(port == NULL)) {
2432         return;
2433     }
2434 
2435     ret = nxt_port_socket_init(task, port, 0);
2436     if (nxt_slow_path(ret != NXT_OK)) {
2437         nxt_port_use(task, port, -1);
2438         return;
2439     }
2440 
2441     engine->port = port;
2442 
2443     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2444 
2445     nxt_event_engine_start(engine);
2446 }
2447 
2448 
2449 static void
2450 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2451 {
2452     nxt_joint_job_t          *job;
2453     nxt_socket_conf_t        *skcf;
2454     nxt_listen_event_t       *lev;
2455     nxt_listen_socket_t      *ls;
2456     nxt_thread_spinlock_t    *lock;
2457     nxt_socket_conf_joint_t  *joint;
2458 
2459     job = obj;
2460     joint = data;
2461 
2462     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2463 
2464     skcf = joint->socket_conf;
2465     ls = skcf->listen;
2466 
2467     lev = nxt_listen_event(task, ls);
2468     if (nxt_slow_path(lev == NULL)) {
2469         nxt_router_listen_socket_release(task, skcf);
2470         return;
2471     }
2472 
2473     lev->socket.data = joint;
2474 
2475     lock = &skcf->router_conf->router->lock;
2476 
2477     nxt_thread_spin_lock(lock);
2478     ls->count++;
2479     nxt_thread_spin_unlock(lock);
2480 
2481     job->work.next = NULL;
2482     job->work.handler = nxt_router_conf_wait;
2483 
2484     nxt_event_engine_post(job->tmcf->engine, &job->work);
2485 }
2486 
2487 
2488 nxt_inline nxt_listen_event_t *
2489 nxt_router_listen_event(nxt_queue_t *listen_connections,
2490     nxt_socket_conf_t *skcf)
2491 {
2492     nxt_socket_t        fd;
2493     nxt_queue_link_t    *qlk;
2494     nxt_listen_event_t  *lev;
2495 
2496     fd = skcf->listen->socket;
2497 
2498     for (qlk = nxt_queue_first(listen_connections);
2499          qlk != nxt_queue_tail(listen_connections);
2500          qlk = nxt_queue_next(qlk))
2501     {
2502         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2503 
2504         if (fd == lev->socket.fd) {
2505             return lev;
2506         }
2507     }
2508 
2509     return NULL;
2510 }
2511 
2512 
2513 static void
2514 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2515 {
2516     nxt_joint_job_t          *job;
2517     nxt_event_engine_t       *engine;
2518     nxt_listen_event_t       *lev;
2519     nxt_socket_conf_joint_t  *joint, *old;
2520 
2521     job = obj;
2522     joint = data;
2523 
2524     engine = task->thread->engine;
2525 
2526     nxt_queue_insert_tail(&engine->joints, &joint->link);
2527 
2528     lev = nxt_router_listen_event(&engine->listen_connections,
2529                                   joint->socket_conf);
2530 
2531     old = lev->socket.data;
2532     lev->socket.data = joint;
2533     lev->listen = joint->socket_conf->listen;
2534 
2535     job->work.next = NULL;
2536     job->work.handler = nxt_router_conf_wait;
2537 
2538     nxt_event_engine_post(job->tmcf->engine, &job->work);
2539 
2540     /*
2541      * The task is allocated from configuration temporary
2542      * memory pool so it can be freed after engine post operation.
2543      */
2544 
2545     nxt_router_conf_release(&engine->task, old);
2546 }
2547 
2548 
2549 static void
2550 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2551 {
2552     nxt_joint_job_t     *job;
2553     nxt_socket_conf_t   *skcf;
2554     nxt_listen_event_t  *lev;
2555     nxt_event_engine_t  *engine;
2556 
2557     job = obj;
2558     skcf = data;
2559 
2560     engine = task->thread->engine;
2561 
2562     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2563 
2564     nxt_fd_event_delete(engine, &lev->socket);
2565 
2566     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2567               lev->socket.fd);
2568 
2569     lev->timer.handler = nxt_router_listen_socket_close;
2570     lev->timer.work_queue = &engine->fast_work_queue;
2571 
2572     nxt_timer_add(engine, &lev->timer, 0);
2573 
2574     job->work.next = NULL;
2575     job->work.handler = nxt_router_conf_wait;
2576 
2577     nxt_event_engine_post(job->tmcf->engine, &job->work);
2578 }
2579 
2580 
2581 static void
2582 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2583 {
2584     nxt_event_engine_t  *engine;
2585 
2586     nxt_debug(task, "router worker thread quit");
2587 
2588     engine = task->thread->engine;
2589 
2590     engine->shutdown = 1;
2591 
2592     if (nxt_queue_is_empty(&engine->joints)) {
2593         nxt_thread_exit(task->thread);
2594     }
2595 }
2596 
2597 
2598 static void
2599 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2600 {
2601     nxt_timer_t              *timer;
2602     nxt_listen_event_t       *lev;
2603     nxt_socket_conf_joint_t  *joint;
2604 
2605     timer = obj;
2606     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2607 
2608     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2609               lev->socket.fd);
2610 
2611     nxt_queue_remove(&lev->link);
2612 
2613     joint = lev->socket.data;
2614     lev->socket.data = NULL;
2615 
2616     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2617     task = &task->thread->engine->task;
2618 
2619     nxt_router_listen_socket_release(task, joint->socket_conf);
2620 
2621     nxt_router_listen_event_release(task, lev, joint);
2622 }
2623 
2624 
2625 static void
2626 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2627 {
2628     nxt_listen_socket_t    *ls;
2629     nxt_thread_spinlock_t  *lock;
2630 
2631     ls = skcf->listen;
2632     lock = &skcf->router_conf->router->lock;
2633 
2634     nxt_thread_spin_lock(lock);
2635 
2636     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2637               task->thread->engine, ls->count);
2638 
2639     if (--ls->count != 0) {
2640         ls = NULL;
2641     }
2642 
2643     nxt_thread_spin_unlock(lock);
2644 
2645     if (ls != NULL) {
2646         nxt_socket_close(task, ls->socket);
2647         nxt_free(ls);
2648     }
2649 }
2650 
2651 
2652 void
2653 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
2654     nxt_socket_conf_joint_t *joint)
2655 {
2656     nxt_event_engine_t  *engine;
2657 
2658     nxt_debug(task, "listen event count: %D", lev->count);
2659 
2660     if (--lev->count == 0) {
2661         nxt_free(lev);
2662     }
2663 
2664     if (joint != NULL) {
2665         nxt_router_conf_release(task, joint);
2666     }
2667 
2668     engine = task->thread->engine;
2669 
2670     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2671         nxt_thread_exit(task->thread);
2672     }
2673 }
2674 
2675 
2676 void
2677 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2678 {
2679     nxt_app_t              *app;
2680     nxt_socket_conf_t      *skcf;
2681     nxt_router_conf_t      *rtcf;
2682     nxt_thread_spinlock_t  *lock;
2683 
2684     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2685 
2686     if (--joint->count != 0) {
2687         return;
2688     }
2689 
2690     nxt_queue_remove(&joint->link);
2691 
2692     /*
2693      * The joint content can not be safely used after the critical
2694      * section protected by the spinlock because its memory pool may
2695      * be already destroyed by another thread.
2696      */
2697     skcf = joint->socket_conf;
2698     app = skcf->application;
2699     rtcf = skcf->router_conf;
2700     lock = &rtcf->router->lock;
2701 
2702     nxt_thread_spin_lock(lock);
2703 
2704     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2705               rtcf, rtcf->count);
2706 
2707     if (--skcf->count != 0) {
2708         rtcf = NULL;
2709         app = NULL;
2710 
2711     } else {
2712         nxt_queue_remove(&skcf->link);
2713 
2714         if (--rtcf->count != 0) {
2715             rtcf = NULL;
2716         }
2717     }
2718 
2719     nxt_thread_spin_unlock(lock);
2720 
2721     if (app != NULL) {
2722         nxt_router_app_use(task, app, -1);
2723     }
2724 
2725     /* TODO remove engine->port */
2726     /* TODO excude from connected ports */
2727 
2728     if (rtcf != NULL) {
2729         nxt_debug(task, "old router conf is destroyed");
2730 
2731         nxt_router_access_log_release(task, lock, rtcf->access_log);
2732 
2733         nxt_mp_thread_adopt(rtcf->mem_pool);
2734 
2735         nxt_mp_destroy(rtcf->mem_pool);
2736     }
2737 }
2738 
2739 
2740 static void
2741 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
2742     nxt_router_access_log_t *access_log)
2743 {
2744     size_t     size;
2745     u_char     *buf, *p;
2746     nxt_off_t  bytes;
2747 
2748     static nxt_time_string_t  date_cache = {
2749         (nxt_atomic_uint_t) -1,
2750         nxt_router_access_log_date,
2751         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
2752         nxt_length("31/Dec/1986:19:40:00 +0300"),
2753         NXT_THREAD_TIME_LOCAL,
2754         NXT_THREAD_TIME_SEC,
2755     };
2756 
2757     size = r->remote->address_length
2758            + 6                  /* ' - - [' */
2759            + date_cache.size
2760            + 3                  /* '] "' */
2761            + r->method->length
2762            + 1                  /* space */
2763            + r->target.length
2764            + 1                  /* space */
2765            + r->version.length
2766            + 2                  /* '" ' */
2767            + 3                  /* status */
2768            + 1                  /* space */
2769            + NXT_OFF_T_LEN
2770            + 2                  /* ' "' */
2771            + (r->referer != NULL ? r->referer->value_length : 1)
2772            + 3                  /* '" "' */
2773            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
2774            + 2                  /* '"\n' */
2775     ;
2776 
2777     buf = nxt_mp_nget(r->mem_pool, size);
2778     if (nxt_slow_path(buf == NULL)) {
2779         return;
2780     }
2781 
2782     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
2783                    r->remote->address_length);
2784 
2785     p = nxt_cpymem(p, " - - [", 6);
2786 
2787     p = nxt_thread_time_string(task->thread, &date_cache, p);
2788 
2789     p = nxt_cpymem(p, "] \"", 3);
2790 
2791     if (r->method->length != 0) {
2792         p = nxt_cpymem(p, r->method->start, r->method->length);
2793 
2794         if (r->target.length != 0) {
2795             *p++ = ' ';
2796             p = nxt_cpymem(p, r->target.start, r->target.length);
2797 
2798             if (r->version.length != 0) {
2799                 *p++ = ' ';
2800                 p = nxt_cpymem(p, r->version.start, r->version.length);
2801             }
2802         }
2803 
2804     } else {
2805         *p++ = '-';
2806     }
2807 
2808     p = nxt_cpymem(p, "\" ", 2);
2809 
2810     p = nxt_sprintf(p, p + 3, "%03d", r->status);
2811 
2812     *p++ = ' ';
2813 
2814     bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto);
2815 
2816     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
2817 
2818     p = nxt_cpymem(p, " \"", 2);
2819 
2820     if (r->referer != NULL) {
2821         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
2822 
2823     } else {
2824         *p++ = '-';
2825     }
2826 
2827     p = nxt_cpymem(p, "\" \"", 3);
2828 
2829     if (r->user_agent != NULL) {
2830         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
2831 
2832     } else {
2833         *p++ = '-';
2834     }
2835 
2836     p = nxt_cpymem(p, "\"\n", 2);
2837 
2838     nxt_fd_write(access_log->fd, buf, p - buf);
2839 }
2840 
2841 
2842 static u_char *
2843 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
2844     size_t size, const char *format)
2845 {
2846     u_char  sign;
2847     time_t  gmtoff;
2848 
2849     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
2850                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
2851 
2852     gmtoff = nxt_timezone(tm) / 60;
2853 
2854     if (gmtoff < 0) {
2855         gmtoff = -gmtoff;
2856         sign = '-';
2857 
2858     } else {
2859         sign = '+';
2860     }
2861 
2862     return nxt_sprintf(buf, buf + size, format,
2863                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
2864                        tm->tm_hour, tm->tm_min, tm->tm_sec,
2865                        sign, gmtoff / 60, gmtoff % 60);
2866 }
2867 
2868 
2869 static void
2870 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
2871 {
2872     uint32_t                 stream;
2873     nxt_int_t                ret;
2874     nxt_buf_t                *b;
2875     nxt_port_t               *main_port, *router_port;
2876     nxt_runtime_t            *rt;
2877     nxt_router_access_log_t  *access_log;
2878 
2879     access_log = tmcf->router_conf->access_log;
2880 
2881     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
2882     if (nxt_slow_path(b == NULL)) {
2883         goto fail;
2884     }
2885 
2886     nxt_buf_cpystr(b, &access_log->path);
2887     *b->mem.free++ = '\0';
2888 
2889     rt = task->thread->runtime;
2890     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2891     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2892 
2893     stream = nxt_port_rpc_register_handler(task, router_port,
2894                                            nxt_router_access_log_ready,
2895                                            nxt_router_access_log_error,
2896                                            -1, tmcf);
2897     if (nxt_slow_path(stream == 0)) {
2898         goto fail;
2899     }
2900 
2901     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
2902                                 stream, router_port->id, b);
2903 
2904     if (nxt_slow_path(ret != NXT_OK)) {
2905         nxt_port_rpc_cancel(task, router_port, stream);
2906         goto fail;
2907     }
2908 
2909     return;
2910 
2911 fail:
2912 
2913     nxt_router_conf_error(task, tmcf);
2914 }
2915 
2916 
2917 static void
2918 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2919     void *data)
2920 {
2921     nxt_router_temp_conf_t   *tmcf;
2922     nxt_router_access_log_t  *access_log;
2923 
2924     tmcf = data;
2925 
2926     access_log = tmcf->router_conf->access_log;
2927 
2928     access_log->fd = msg->fd;
2929 
2930     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2931                        nxt_router_conf_apply, task, tmcf, NULL);
2932 }
2933 
2934 
2935 static void
2936 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2937     void *data)
2938 {
2939     nxt_router_temp_conf_t  *tmcf;
2940 
2941     tmcf = data;
2942 
2943     nxt_router_conf_error(task, tmcf);
2944 }
2945 
2946 
2947 static void
2948 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
2949     nxt_router_access_log_t *access_log)
2950 {
2951     if (access_log == NULL) {
2952         return;
2953     }
2954 
2955     nxt_thread_spin_lock(lock);
2956 
2957     if (--access_log->count != 0) {
2958         access_log = NULL;
2959     }
2960 
2961     nxt_thread_spin_unlock(lock);
2962 
2963     if (access_log != NULL) {
2964 
2965         if (access_log->fd != -1) {
2966             nxt_fd_close(access_log->fd);
2967         }
2968 
2969         nxt_free(access_log);
2970     }
2971 }
2972 
2973 
2974 typedef struct {
2975     nxt_mp_t                 *mem_pool;
2976     nxt_router_access_log_t  *access_log;
2977 } nxt_router_access_log_reopen_t;
2978 
2979 
2980 void
2981 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
2982 {
2983     nxt_mp_t                        *mp;
2984     uint32_t                        stream;
2985     nxt_int_t                       ret;
2986     nxt_buf_t                       *b;
2987     nxt_port_t                      *main_port, *router_port;
2988     nxt_runtime_t                   *rt;
2989     nxt_router_access_log_t         *access_log;
2990     nxt_router_access_log_reopen_t  *reopen;
2991 
2992     access_log = nxt_router->access_log;
2993 
2994     if (access_log == NULL) {
2995         return;
2996     }
2997 
2998     mp = nxt_mp_create(1024, 128, 256, 32);
2999     if (nxt_slow_path(mp == NULL)) {
3000         return;
3001     }
3002 
3003     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3004     if (nxt_slow_path(reopen == NULL)) {
3005         goto fail;
3006     }
3007 
3008     reopen->mem_pool = mp;
3009     reopen->access_log = access_log;
3010 
3011     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3012     if (nxt_slow_path(b == NULL)) {
3013         goto fail;
3014     }
3015 
3016     b->completion_handler = nxt_router_access_log_reopen_completion;
3017 
3018     nxt_buf_cpystr(b, &access_log->path);
3019     *b->mem.free++ = '\0';
3020 
3021     rt = task->thread->runtime;
3022     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3023     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3024 
3025     stream = nxt_port_rpc_register_handler(task, router_port,
3026                                            nxt_router_access_log_reopen_ready,
3027                                            nxt_router_access_log_reopen_error,
3028                                            -1, reopen);
3029     if (nxt_slow_path(stream == 0)) {
3030         goto fail;
3031     }
3032 
3033     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3034                                 stream, router_port->id, b);
3035 
3036     if (nxt_slow_path(ret != NXT_OK)) {
3037         nxt_port_rpc_cancel(task, router_port, stream);
3038         goto fail;
3039     }
3040 
3041     nxt_mp_retain(mp);
3042 
3043     return;
3044 
3045 fail:
3046 
3047     nxt_mp_destroy(mp);
3048 }
3049 
3050 
3051 static void
3052 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3053 {
3054     nxt_mp_t   *mp;
3055     nxt_buf_t  *b;
3056 
3057     b = obj;
3058     mp = b->data;
3059 
3060     nxt_mp_release(mp);
3061 }
3062 
3063 
3064 static void
3065 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3066     void *data)
3067 {
3068     nxt_router_access_log_t         *access_log;
3069     nxt_router_access_log_reopen_t  *reopen;
3070 
3071     reopen = data;
3072 
3073     access_log = reopen->access_log;
3074 
3075     if (access_log == nxt_router->access_log) {
3076 
3077         if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) {
3078             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3079                       msg->fd, access_log->fd, nxt_errno);
3080         }
3081     }
3082 
3083     nxt_fd_close(msg->fd);
3084     nxt_mp_release(reopen->mem_pool);
3085 }
3086 
3087 
3088 static void
3089 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3090     void *data)
3091 {
3092     nxt_router_access_log_reopen_t  *reopen;
3093 
3094     reopen = data;
3095 
3096     nxt_mp_release(reopen->mem_pool);
3097 }
3098 
3099 
3100 static void
3101 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3102 {
3103     nxt_port_t           *port;
3104     nxt_thread_link_t    *link;
3105     nxt_event_engine_t   *engine;
3106     nxt_thread_handle_t  handle;
3107 
3108     handle = (nxt_thread_handle_t) obj;
3109     link = data;
3110 
3111     nxt_thread_wait(handle);
3112 
3113     engine = link->engine;
3114 
3115     nxt_queue_remove(&engine->link);
3116 
3117     port = engine->port;
3118 
3119     // TODO notify all apps
3120 
3121     port->engine = task->thread->engine;
3122     nxt_mp_thread_adopt(port->mem_pool);
3123     nxt_port_use(task, port, -1);
3124 
3125     nxt_mp_thread_adopt(engine->mem_pool);
3126     nxt_mp_destroy(engine->mem_pool);
3127 
3128     nxt_event_engine_free(engine);
3129 
3130     nxt_free(link);
3131 }
3132 
3133 
3134 static void
3135 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3136     void *data)
3137 {
3138     size_t               dump_size;
3139     nxt_int_t            ret;
3140     nxt_buf_t            *b;
3141     nxt_http_request_t   *r;
3142     nxt_req_conn_link_t  *rc;
3143     nxt_app_parse_ctx_t  *ar;
3144     nxt_unit_response_t  *resp;
3145 
3146     b = msg->buf;
3147     rc = data;
3148 
3149     dump_size = nxt_buf_used_size(b);
3150 
3151     if (dump_size > 300) {
3152         dump_size = 300;
3153     }
3154 
3155     nxt_debug(task, "%srouter app data (%uz): %*s",
3156               msg->port_msg.last ? "last " : "", msg->size, dump_size,
3157               b->mem.pos);
3158 
3159     if (msg->size == 0) {
3160         b = NULL;
3161     }
3162 
3163     ar = rc->ap;
3164     if (nxt_slow_path(ar == NULL)) {
3165         return;
3166     }
3167 
3168     if (ar->request->error) {
3169         nxt_router_rc_unlink(task, rc);
3170         return;
3171     }
3172 
3173     if (msg->port_msg.last != 0) {
3174         nxt_debug(task, "router data create last buf");
3175 
3176         nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request));
3177 
3178         nxt_router_rc_unlink(task, rc);
3179 
3180     } else {
3181         if (rc->app != NULL && rc->app->timeout != 0) {
3182             ar->timer.handler = nxt_router_app_timeout;
3183             ar->timer_data = rc;
3184             nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
3185         }
3186     }
3187 
3188     if (b == NULL) {