xref: /unit/src/nxt_router.c (revision 977:4f9268f27b57)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 
18 
19 typedef struct {
20     nxt_str_t         type;
21     uint32_t          processes;
22     uint32_t          max_processes;
23     uint32_t          spare_processes;
24     nxt_msec_t        timeout;
25     nxt_msec_t        res_timeout;
26     nxt_msec_t        idle_timeout;
27     uint32_t          requests;
28     nxt_conf_value_t  *limits_value;
29     nxt_conf_value_t  *processes_value;
30 } nxt_router_app_conf_t;
31 
32 
33 typedef struct {
34     nxt_str_t         pass;
35     nxt_str_t         application;
36 } nxt_router_listener_conf_t;
37 
38 
39 #if (NXT_TLS)
40 
41 typedef struct {
42     nxt_str_t          name;
43     nxt_socket_conf_t  *conf;
44 
45     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
46 } nxt_router_tlssock_t;
47 
48 #endif
49 
50 
51 typedef struct nxt_msg_info_s {
52     nxt_buf_t                 *buf;
53     nxt_port_mmap_tracking_t  tracking;
54     nxt_work_handler_t        completion_handler;
55 } nxt_msg_info_t;
56 
57 
58 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
59 
60 
61 typedef struct {
62     uint32_t                 stream;
63     nxt_app_t                *app;
64     nxt_port_t               *app_port;
65     nxt_app_parse_ctx_t      *ap;
66     nxt_msg_info_t           msg_info;
67     nxt_req_app_link_t       *ra;
68 
69     nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
70 } nxt_req_conn_link_t;
71 
72 
73 struct nxt_req_app_link_s {
74     uint32_t             stream;
75     nxt_atomic_t         use_count;
76     nxt_port_t           *app_port;
77     nxt_port_t           *reply_port;
78     nxt_app_parse_ctx_t  *ap;
79     nxt_msg_info_t       msg_info;
80     nxt_req_conn_link_t  *rc;
81 
82     nxt_nsec_t           res_time;
83 
84     nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
85     nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
86     nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */
87 
88     nxt_mp_t             *mem_pool;
89     nxt_work_t           work;
90 
91     int                  err_code;
92     const char           *err_str;
93 };
94 
95 
96 typedef struct {
97     nxt_socket_conf_t       *socket_conf;
98     nxt_router_temp_conf_t  *temp_conf;
99 } nxt_socket_rpc_t;
100 
101 
102 typedef struct {
103     nxt_app_t               *app;
104     nxt_router_temp_conf_t  *temp_conf;
105 } nxt_app_rpc_t;
106 
107 
108 struct nxt_port_select_state_s {
109     nxt_app_t           *app;
110     nxt_req_app_link_t  *ra;
111 
112     nxt_port_t          *failed_port;
113     int                 failed_port_use_delta;
114 
115     uint8_t             start_process;    /* 1 bit */
116     nxt_req_app_link_t  *shared_ra;
117     nxt_port_t          *port;
118 };
119 
120 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
121 
122 static void nxt_router_greet_controller(nxt_task_t *task,
123     nxt_port_t *controller_port);
124 
125 static void nxt_router_port_select(nxt_task_t *task,
126     nxt_port_select_state_t *state);
127 
128 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
129     nxt_port_select_state_t *state);
130 
131 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
132 
133 nxt_inline void
134 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
135 {
136     nxt_atomic_fetch_add(&ra->use_count, 1);
137 }
138 
139 nxt_inline void
140 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
141 {
142 #if (NXT_DEBUG)
143     int  c;
144 
145     c = nxt_atomic_fetch_add(&ra->use_count, -1);
146 
147     nxt_assert(c > 1);
148 #else
149     (void) nxt_atomic_fetch_add(&ra->use_count, -1);
150 #endif
151 }
152 
153 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
154 
155 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
156 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
157 static void nxt_router_conf_ready(nxt_task_t *task,
158     nxt_router_temp_conf_t *tmcf);
159 static void nxt_router_conf_error(nxt_task_t *task,
160     nxt_router_temp_conf_t *tmcf);
161 static void nxt_router_conf_send(nxt_task_t *task,
162     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
163 
164 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
165     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
166 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
167 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
168     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
169 static void nxt_router_listen_socket_ready(nxt_task_t *task,
170     nxt_port_recv_msg_t *msg, void *data);
171 static void nxt_router_listen_socket_error(nxt_task_t *task,
172     nxt_port_recv_msg_t *msg, void *data);
173 #if (NXT_TLS)
174 static void nxt_router_tls_rpc_create(nxt_task_t *task,
175     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls);
176 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
177     nxt_port_recv_msg_t *msg, void *data);
178 #endif
179 static void nxt_router_app_rpc_create(nxt_task_t *task,
180     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
181 static void nxt_router_app_prefork_ready(nxt_task_t *task,
182     nxt_port_recv_msg_t *msg, void *data);
183 static void nxt_router_app_prefork_error(nxt_task_t *task,
184     nxt_port_recv_msg_t *msg, void *data);
185 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
186     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
187 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
188     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
189 
190 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
191     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
192     const nxt_event_interface_t *interface);
193 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
194     nxt_router_engine_conf_t *recf);
195 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
196     nxt_router_engine_conf_t *recf);
197 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
198     nxt_router_engine_conf_t *recf);
199 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
200     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
201     nxt_work_handler_t handler);
202 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
203     nxt_router_engine_conf_t *recf);
204 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
205     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
206 
207 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
208     nxt_router_temp_conf_t *tmcf);
209 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
210     nxt_event_engine_t *engine);
211 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
212     nxt_router_temp_conf_t *tmcf);
213 
214 static void nxt_router_engines_post(nxt_router_t *router,
215     nxt_router_temp_conf_t *tmcf);
216 static void nxt_router_engine_post(nxt_event_engine_t *engine,
217     nxt_work_t *jobs);
218 
219 static void nxt_router_thread_start(void *data);
220 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
221     void *data);
222 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
223     void *data);
224 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
225     void *data);
226 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
227     void *data);
228 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
229     void *data);
230 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
231     void *data);
232 static void nxt_router_listen_socket_release(nxt_task_t *task,
233     nxt_socket_conf_t *skcf);
234 
235 static void nxt_router_access_log_writer(nxt_task_t *task,
236     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
237 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
238     struct tm *tm, size_t size, const char *format);
239 static void nxt_router_access_log_open(nxt_task_t *task,
240     nxt_router_temp_conf_t *tmcf);
241 static void nxt_router_access_log_ready(nxt_task_t *task,
242     nxt_port_recv_msg_t *msg, void *data);
243 static void nxt_router_access_log_error(nxt_task_t *task,
244     nxt_port_recv_msg_t *msg, void *data);
245 static void nxt_router_access_log_release(nxt_task_t *task,
246     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
247 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
250     nxt_port_recv_msg_t *msg, void *data);
251 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
252     nxt_port_recv_msg_t *msg, void *data);
253 
254 static void nxt_router_app_port_ready(nxt_task_t *task,
255     nxt_port_recv_msg_t *msg, void *data);
256 static void nxt_router_app_port_error(nxt_task_t *task,
257     nxt_port_recv_msg_t *msg, void *data);
258 
259 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
260 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
261     uint32_t request_failed, uint32_t got_response);
262 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
263     nxt_req_app_link_t *ra);
264 
265 static void nxt_router_app_prepare_request(nxt_task_t *task,
266     nxt_req_app_link_t *ra);
267 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
268     nxt_port_t *port, const nxt_str_t *prefix);
269 
270 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
271 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
272     void *data);
273 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
274     void *data);
275 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
276     void *data);
277 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
278 
279 static const nxt_http_request_state_t  nxt_http_request_send_state;
280 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
281 
282 static void nxt_router_app_joint_use(nxt_task_t *task,
283     nxt_app_joint_t *app_joint, int i);
284 
285 static nxt_router_t  *nxt_router;
286 
287 static const nxt_str_t http_prefix = nxt_string("HTTP_");
288 static const nxt_str_t empty_prefix = nxt_string("");
289 
290 static const nxt_str_t  *nxt_app_msg_prefix[] = {
291     &empty_prefix,
292     &http_prefix,
293     &http_prefix,
294     &http_prefix,
295     &http_prefix,
296     &empty_prefix,
297 };
298 
299 
300 nxt_port_handlers_t  nxt_router_process_port_handlers = {
301     .quit         = nxt_worker_process_quit_handler,
302     .new_port     = nxt_router_new_port_handler,
303     .change_file  = nxt_port_change_log_file_handler,
304     .mmap         = nxt_port_mmap_handler,
305     .data         = nxt_router_conf_data_handler,
306     .remove_pid   = nxt_router_remove_pid_handler,
307     .access_log   = nxt_router_access_log_reopen_handler,
308     .rpc_ready    = nxt_port_rpc_handler,
309     .rpc_error    = nxt_port_rpc_handler,
310 };
311 
312 
313 nxt_int_t
314 nxt_router_start(nxt_task_t *task, void *data)
315 {
316     nxt_int_t      ret;
317     nxt_port_t     *controller_port;
318     nxt_router_t   *router;
319     nxt_runtime_t  *rt;
320 
321     rt = task->thread->runtime;
322 
323 #if (NXT_TLS)
324     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
325     if (nxt_slow_path(rt->tls == NULL)) {
326         return NXT_ERROR;
327     }
328 
329     ret = rt->tls->library_init(task);
330     if (nxt_slow_path(ret != NXT_OK)) {
331         return ret;
332     }
333 #endif
334 
335     ret = nxt_http_init(task, rt);
336     if (nxt_slow_path(ret != NXT_OK)) {
337         return ret;
338     }
339 
340     router = nxt_zalloc(sizeof(nxt_router_t));
341     if (nxt_slow_path(router == NULL)) {
342         return NXT_ERROR;
343     }
344 
345     nxt_queue_init(&router->engines);
346     nxt_queue_init(&router->sockets);
347     nxt_queue_init(&router->apps);
348 
349     nxt_router = router;
350 
351     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
352     if (controller_port != NULL) {
353         nxt_router_greet_controller(task, controller_port);
354     }
355 
356     return NXT_OK;
357 }
358 
359 
360 static void
361 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
362 {
363     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
364                           -1, 0, 0, NULL);
365 }
366 
367 
368 static void
369 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
370     void *data)
371 {
372     size_t         size;
373     uint32_t       stream;
374     nxt_mp_t       *mp;
375     nxt_int_t      ret;
376     nxt_app_t      *app;
377     nxt_buf_t      *b;
378     nxt_port_t     *main_port;
379     nxt_runtime_t  *rt;
380 
381     app = data;
382 
383     rt = task->thread->runtime;
384     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
385 
386     nxt_debug(task, "app '%V' %p start process", &app->name, app);
387 
388     size = app->name.length + 1 + app->conf.length;
389 
390     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
391 
392     if (nxt_slow_path(b == NULL)) {
393         goto failed;
394     }
395 
396     nxt_buf_cpystr(b, &app->name);
397     *b->mem.free++ = '\0';
398     nxt_buf_cpystr(b, &app->conf);
399 
400     nxt_router_app_joint_use(task, app->joint, 1);
401 
402     stream = nxt_port_rpc_register_handler(task, port,
403                                            nxt_router_app_port_ready,
404                                            nxt_router_app_port_error,
405                                            -1, app->joint);
406 
407     if (nxt_slow_path(stream == 0)) {
408         nxt_router_app_joint_use(task, app->joint, -1);
409 
410         goto failed;
411     }
412 
413     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
414                                 stream, port->id, b);
415 
416     if (nxt_slow_path(ret != NXT_OK)) {
417         nxt_port_rpc_cancel(task, port, stream);
418 
419         nxt_router_app_joint_use(task, app->joint, -1);
420 
421         goto failed;
422     }
423 
424     nxt_router_app_use(task, app, -1);
425 
426     return;
427 
428 failed:
429 
430     if (b != NULL) {
431         mp = b->data;
432         nxt_mp_free(mp, b);
433         nxt_mp_release(mp);
434     }
435 
436     nxt_thread_mutex_lock(&app->mutex);
437 
438     app->pending_processes--;
439 
440     nxt_thread_mutex_unlock(&app->mutex);
441 
442     nxt_router_app_use(task, app, -1);
443 }
444 
445 
446 static void
447 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
448 {
449     app_joint->use_count += i;
450 
451     if (app_joint->use_count == 0) {
452         nxt_assert(app_joint->app == NULL);
453 
454         nxt_free(app_joint);
455     }
456 }
457 
458 
459 static nxt_int_t
460 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
461 {
462     nxt_int_t      res;
463     nxt_port_t     *router_port;
464     nxt_runtime_t  *rt;
465 
466     rt = task->thread->runtime;
467     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
468 
469     nxt_router_app_use(task, app, 1);
470 
471     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
472                         app);
473 
474     if (res == NXT_OK) {
475         return res;
476     }
477 
478     nxt_thread_mutex_lock(&app->mutex);
479 
480     app->pending_processes--;
481 
482     nxt_thread_mutex_unlock(&app->mutex);
483 
484     nxt_router_app_use(task, app, -1);
485 
486     return NXT_ERROR;
487 }
488 
489 
490 nxt_inline void
491 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
492     nxt_req_conn_link_t *rc)
493 {
494     nxt_event_engine_t  *engine;
495 
496     engine = task->thread->engine;
497 
498     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
499 
500     ra->stream = rc->stream;
501     ra->use_count = 1;
502     ra->rc = rc;
503     rc->ra = ra;
504     ra->reply_port = engine->port;
505     ra->ap = rc->ap;
506 
507     ra->work.handler = NULL;
508     ra->work.task = &engine->task;
509     ra->work.obj = ra;
510     ra->work.data = engine;
511 }
512 
513 
514 nxt_inline nxt_req_app_link_t *
515 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
516 {
517     nxt_mp_t            *mp;
518     nxt_req_app_link_t  *ra;
519 
520     if (ra_src->mem_pool != NULL) {
521         return ra_src;
522     }
523 
524     mp = ra_src->ap->mem_pool;
525 
526     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
527 
528     if (nxt_slow_path(ra == NULL)) {
529 
530         ra_src->rc->ra = NULL;
531         ra_src->rc = NULL;
532 
533         return NULL;
534     }
535 
536     nxt_mp_retain(mp);
537 
538     nxt_router_ra_init(task, ra, ra_src->rc);
539 
540     ra->mem_pool = mp;
541 
542     return ra;
543 }
544 
545 
546 nxt_inline nxt_bool_t
547 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
548     uint32_t stream)
549 {
550     nxt_buf_t   *b, *next;
551     nxt_bool_t  cancelled;
552 
553     if (msg_info->buf == NULL) {
554         return 0;
555     }
556 
557     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
558                                               stream);
559 
560     if (cancelled) {
561         nxt_debug(task, "stream #%uD: cancelled by router", stream);
562     }
563 
564     for (b = msg_info->buf; b != NULL; b = next) {
565         next = b->next;
566 
567         b->completion_handler = msg_info->completion_handler;
568 
569         if (b->is_port_mmap_sent) {
570             b->is_port_mmap_sent = cancelled == 0;
571             b->completion_handler(task, b, b->parent);
572         }
573     }
574 
575     msg_info->buf = NULL;
576 
577     return cancelled;
578 }
579 
580 
581 static void
582 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
583 
584 
585 static void
586 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
587 {
588     nxt_req_app_link_t  *ra;
589 
590     ra = obj;
591 
592     nxt_router_ra_update_peer(task, ra);
593 
594     nxt_router_ra_use(task, ra, -1);
595 }
596 
597 
598 static void
599 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
600 {
601     nxt_event_engine_t   *engine;
602     nxt_req_conn_link_t  *rc;
603 
604     engine = ra->work.data;
605 
606     if (task->thread->engine != engine) {
607         nxt_router_ra_inc_use(ra);
608 
609         ra->work.handler = nxt_router_ra_update_peer_handler;
610         ra->work.task = &engine->task;
611         ra->work.next = NULL;
612 
613         nxt_debug(task, "ra stream #%uD post update peer to %p",
614                   ra->stream, engine);
615 
616         nxt_event_engine_post(engine, &ra->work);
617 
618         return;
619     }
620 
621     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
622 
623     rc = ra->rc;
624 
625     if (rc != NULL && ra->app_port != NULL) {
626         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
627     }
628 
629     nxt_router_ra_use(task, ra, -1);
630 }
631 
632 
633 static void
634 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
635 {
636     nxt_mp_t                *mp;
637     nxt_req_conn_link_t     *rc;
638 
639     nxt_assert(task->thread->engine == ra->work.data);
640     nxt_assert(ra->use_count == 0);
641 
642     nxt_debug(task, "ra stream #%uD release", ra->stream);
643 
644     rc = ra->rc;
645 
646     if (rc != NULL) {
647         if (nxt_slow_path(ra->err_code != 0)) {
648             nxt_http_request_error(task, rc->ap->request, ra->err_code);
649 
650         } else {
651             rc->app_port = ra->app_port;
652             rc->msg_info = ra->msg_info;
653 
654             if (rc->app->timeout != 0) {
655                 rc->ap->timer.handler = nxt_router_app_timeout;
656                 rc->ap->timer_data = rc;
657                 nxt_timer_add(task->thread->engine, &rc->ap->timer,
658                               rc->app->timeout);
659             }
660 
661             ra->app_port = NULL;
662             ra->msg_info.buf = NULL;
663         }
664 
665         rc->ra = NULL;
666         ra->rc = NULL;
667     }
668 
669     if (ra->app_port != NULL) {
670         nxt_router_app_port_release(task, ra->app_port, 0, 1);
671 
672         ra->app_port = NULL;
673     }
674 
675     nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
676 
677     mp = ra->mem_pool;
678 
679     if (mp != NULL) {
680         nxt_mp_free(mp, ra);
681         nxt_mp_release(mp);
682     }
683 }
684 
685 
686 static void
687 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
688 {
689     nxt_req_app_link_t  *ra;
690 
691     ra = obj;
692 
693     nxt_assert(ra->work.data == data);
694 
695     nxt_atomic_fetch_add(&ra->use_count, -1);
696 
697     nxt_router_ra_release(task, ra);
698 }
699 
700 
701 static void
702 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
703 {
704     int                 c;
705     nxt_event_engine_t  *engine;
706 
707     c = nxt_atomic_fetch_add(&ra->use_count, i);
708 
709     if (i < 0 && c == -i) {
710         engine = ra->work.data;
711 
712         if (task->thread->engine == engine) {
713             nxt_router_ra_release(task, ra);
714 
715             return;
716         }
717 
718         nxt_router_ra_inc_use(ra);
719 
720         ra->work.handler = nxt_router_ra_release_handler;
721         ra->work.task = &engine->task;
722         ra->work.next = NULL;
723 
724         nxt_debug(task, "ra stream #%uD post release to %p",
725                   ra->stream, engine);
726 
727         nxt_event_engine_post(engine, &ra->work);
728     }
729 }
730 
731 
732 nxt_inline void
733 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
734 {
735     ra->app_port = NULL;
736     ra->err_code = code;
737     ra->err_str = str;
738 }
739 
740 
741 nxt_inline void
742 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
743 {
744     nxt_queue_insert_tail(&ra->app_port->pending_requests,
745                           &ra->link_port_pending);
746     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
747 
748     nxt_router_ra_inc_use(ra);
749 
750     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
751 
752     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
753 }
754 
755 
756 nxt_inline nxt_bool_t
757 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
758 {
759     if (lnk->next != NULL) {
760         nxt_queue_remove(lnk);
761 
762         lnk->next = NULL;
763 
764         return 1;
765     }
766 
767     return 0;
768 }
769 
770 
771 nxt_inline void
772 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
773 {
774     int                 ra_use_delta;
775     nxt_req_app_link_t  *ra;
776 
777     if (rc->app_port != NULL) {
778         nxt_router_app_port_release(task, rc->app_port, 0, 1);
779 
780         rc->app_port = NULL;
781     }
782 
783     nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
784 
785     ra = rc->ra;
786 
787     if (ra != NULL) {
788         rc->ra = NULL;
789         ra->rc = NULL;
790 
791         ra_use_delta = 0;
792 
793         nxt_thread_mutex_lock(&rc->app->mutex);
794 
795         if (ra->link_app_requests.next == NULL
796             && ra->link_port_pending.next == NULL
797             && ra->link_app_pending.next == NULL)
798         {
799             ra = NULL;
800 
801         } else {
802             ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
803             ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
804             nxt_queue_chk_remove(&ra->link_app_pending);
805         }
806 
807         nxt_thread_mutex_unlock(&rc->app->mutex);
808 
809         if (ra != NULL) {
810             nxt_router_ra_use(task, ra, ra_use_delta);
811         }
812     }
813 
814     if (rc->app != NULL) {
815         nxt_router_app_use(task, rc->app, -1);
816 
817         rc->app = NULL;
818     }
819 
820     if (rc->ap != NULL) {
821         rc->ap->timer_data = NULL;
822 
823         nxt_app_http_req_done(task, rc->ap);
824 
825         rc->ap = NULL;
826     }
827 }
828 
829 
830 void
831 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
832 {
833     nxt_port_new_port_handler(task, msg);
834 
835     if (msg->u.new_port != NULL
836         && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
837     {
838         nxt_router_greet_controller(task, msg->u.new_port);
839     }
840 
841     if (msg->port_msg.stream == 0) {
842         return;
843     }
844 
845     if (msg->u.new_port == NULL
846         || msg->u.new_port->type != NXT_PROCESS_WORKER)
847     {
848         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
849     }
850 
851     nxt_port_rpc_handler(task, msg);
852 }
853 
854 
855 void
856 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
857 {
858     nxt_int_t               ret;
859     nxt_buf_t               *b;
860     nxt_router_temp_conf_t  *tmcf;
861 
862     tmcf = nxt_router_temp_conf(task);
863     if (nxt_slow_path(tmcf == NULL)) {
864         return;
865     }
866 
867     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
868               nxt_buf_used_size(msg->buf),
869               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
870 
871     tmcf->router_conf->router = nxt_router;
872     tmcf->stream = msg->port_msg.stream;
873     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
874                                        msg->port_msg.pid,
875                                        msg->port_msg.reply_port);
876 
877     if (nxt_slow_path(tmcf->port == NULL)) {
878         nxt_alert(task, "reply port not found");
879 
880         return;
881     }
882 
883     nxt_port_use(task, tmcf->port, 1);
884 
885     b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
886                                msg->buf, msg->size);
887     if (nxt_slow_path(b == NULL)) {
888         nxt_router_conf_error(task, tmcf);
889 
890         return;
891     }
892 
893     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
894 
895     if (nxt_fast_path(ret == NXT_OK)) {
896         nxt_router_conf_apply(task, tmcf, NULL);
897 
898     } else {
899         nxt_router_conf_error(task, tmcf);
900     }
901 }
902 
903 
904 static void
905 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
906     void *data)
907 {
908     union {
909         nxt_pid_t  removed_pid;
910         void       *data;
911     } u;
912 
913     u.data = data;
914 
915     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
916 }
917 
918 
919 void
920 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
921 {
922     nxt_event_engine_t  *engine;
923 
924     nxt_port_remove_pid_handler(task, msg);
925 
926     if (msg->port_msg.stream == 0) {
927         return;
928     }
929 
930     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
931     {
932         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
933                       msg->u.data);
934     }
935     nxt_queue_loop;
936 
937     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
938 
939     nxt_port_rpc_handler(task, msg);
940 }
941 
942 
943 static nxt_router_temp_conf_t *
944 nxt_router_temp_conf(nxt_task_t *task)
945 {
946     nxt_mp_t                *mp, *tmp;
947     nxt_router_conf_t       *rtcf;
948     nxt_router_temp_conf_t  *tmcf;
949 
950     mp = nxt_mp_create(1024, 128, 256, 32);
951     if (nxt_slow_path(mp == NULL)) {
952         return NULL;
953     }
954 
955     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
956     if (nxt_slow_path(rtcf == NULL)) {
957         goto fail;
958     }
959 
960     rtcf->mem_pool = mp;
961 
962     tmp = nxt_mp_create(1024, 128, 256, 32);
963     if (nxt_slow_path(tmp == NULL)) {
964         goto fail;
965     }
966 
967     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
968     if (nxt_slow_path(tmcf == NULL)) {
969         goto temp_fail;
970     }
971 
972     tmcf->mem_pool = tmp;
973     tmcf->router_conf = rtcf;
974     tmcf->count = 1;
975     tmcf->engine = task->thread->engine;
976 
977     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
978                                      sizeof(nxt_router_engine_conf_t));
979     if (nxt_slow_path(tmcf->engines == NULL)) {
980         goto temp_fail;
981     }
982 
983     nxt_queue_init(&tmcf->deleting);
984     nxt_queue_init(&tmcf->keeping);
985     nxt_queue_init(&tmcf->updating);
986     nxt_queue_init(&tmcf->pending);
987     nxt_queue_init(&tmcf->creating);
988 
989 #if (NXT_TLS)
990     nxt_queue_init(&tmcf->tls);
991 #endif
992 
993     nxt_queue_init(&tmcf->apps);
994     nxt_queue_init(&tmcf->previous);
995 
996     return tmcf;
997 
998 temp_fail:
999 
1000     nxt_mp_destroy(tmp);
1001 
1002 fail:
1003 
1004     nxt_mp_destroy(mp);
1005 
1006     return NULL;
1007 }
1008 
1009 
1010 nxt_inline nxt_bool_t
1011 nxt_router_app_can_start(nxt_app_t *app)
1012 {
1013     return app->processes + app->pending_processes < app->max_processes
1014             && app->pending_processes < app->max_pending_processes;
1015 }
1016 
1017 
1018 nxt_inline nxt_bool_t
1019 nxt_router_app_need_start(nxt_app_t *app)
1020 {
1021     return app->idle_processes + app->pending_processes
1022             < app->spare_processes;
1023 }
1024 
1025 
1026 static void
1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1028 {
1029     nxt_int_t                    ret;
1030     nxt_app_t                    *app;
1031     nxt_router_t                 *router;
1032     nxt_runtime_t                *rt;
1033     nxt_queue_link_t             *qlk;
1034     nxt_socket_conf_t            *skcf;
1035     nxt_router_conf_t            *rtcf;
1036     nxt_router_temp_conf_t       *tmcf;
1037     const nxt_event_interface_t  *interface;
1038 #if (NXT_TLS)
1039     nxt_router_tlssock_t         *tls;
1040 #endif
1041 
1042     tmcf = obj;
1043 
1044     qlk = nxt_queue_first(&tmcf->pending);
1045 
1046     if (qlk != nxt_queue_tail(&tmcf->pending)) {
1047         nxt_queue_remove(qlk);
1048         nxt_queue_insert_tail(&tmcf->creating, qlk);
1049 
1050         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1051 
1052         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1053 
1054         return;
1055     }
1056 
1057 #if (NXT_TLS)
1058     qlk = nxt_queue_first(&tmcf->tls);
1059 
1060     if (qlk != nxt_queue_tail(&tmcf->tls)) {
1061         nxt_queue_remove(qlk);
1062 
1063         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1064 
1065         nxt_router_tls_rpc_create(task, tmcf, tls);
1066         return;
1067     }
1068 #endif
1069 
1070     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1071 
1072         if (nxt_router_app_need_start(app)) {
1073             nxt_router_app_rpc_create(task, tmcf, app);
1074             return;
1075         }
1076 
1077     } nxt_queue_loop;
1078 
1079     rtcf = tmcf->router_conf;
1080 
1081     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1082         nxt_router_access_log_open(task, tmcf);
1083         return;
1084     }
1085 
1086     rt = task->thread->runtime;
1087 
1088     interface = nxt_service_get(rt->services, "engine", NULL);
1089 
1090     router = rtcf->router;
1091 
1092     ret = nxt_router_engines_create(task, router, tmcf, interface);
1093     if (nxt_slow_path(ret != NXT_OK)) {
1094         goto fail;
1095     }
1096 
1097     ret = nxt_router_threads_create(task, rt, tmcf);
1098     if (nxt_slow_path(ret != NXT_OK)) {
1099         goto fail;
1100     }
1101 
1102     nxt_router_apps_sort(task, router, tmcf);
1103 
1104     nxt_router_engines_post(router, tmcf);
1105 
1106     nxt_queue_add(&router->sockets, &tmcf->updating);
1107     nxt_queue_add(&router->sockets, &tmcf->creating);
1108 
1109     router->access_log = rtcf->access_log;
1110 
1111     nxt_router_conf_ready(task, tmcf);
1112 
1113     return;
1114 
1115 fail:
1116 
1117     nxt_router_conf_error(task, tmcf);
1118 
1119     return;
1120 }
1121 
1122 
1123 static void
1124 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1125 {
1126     nxt_joint_job_t  *job;
1127 
1128     job = obj;
1129 
1130     nxt_router_conf_ready(task, job->tmcf);
1131 }
1132 
1133 
1134 static void
1135 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1136 {
1137     nxt_debug(task, "temp conf count:%D", tmcf->count);
1138 
1139     if (--tmcf->count == 0) {
1140         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1141     }
1142 }
1143 
1144 
1145 static void
1146 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1147 {
1148     nxt_app_t          *app;
1149     nxt_queue_t        new_socket_confs;
1150     nxt_socket_t       s;
1151     nxt_router_t       *router;
1152     nxt_queue_link_t   *qlk;
1153     nxt_socket_conf_t  *skcf;
1154     nxt_router_conf_t  *rtcf;
1155 
1156     nxt_alert(task, "failed to apply new conf");
1157 
1158     for (qlk = nxt_queue_first(&tmcf->creating);
1159          qlk != nxt_queue_tail(&tmcf->creating);
1160          qlk = nxt_queue_next(qlk))
1161     {
1162         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1163         s = skcf->listen->socket;
1164 
1165         if (s != -1) {
1166             nxt_socket_close(task, s);
1167         }
1168 
1169         nxt_free(skcf->listen);
1170     }
1171 
1172     nxt_queue_init(&new_socket_confs);
1173     nxt_queue_add(&new_socket_confs, &tmcf->updating);
1174     nxt_queue_add(&new_socket_confs, &tmcf->pending);
1175     nxt_queue_add(&new_socket_confs, &tmcf->creating);
1176 
1177     rtcf = tmcf->router_conf;
1178 
1179     nxt_http_routes_cleanup(task, rtcf->routes);
1180 
1181     nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1182 
1183         if (skcf->pass != NULL) {
1184             nxt_http_pass_cleanup(task, skcf->pass);
1185         }
1186 
1187     } nxt_queue_loop;
1188 
1189     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1190 
1191         nxt_router_app_unlink(task, app);
1192 
1193     } nxt_queue_loop;
1194 
1195     router = rtcf->router;
1196 
1197     nxt_queue_add(&router->sockets, &tmcf->keeping);
1198     nxt_queue_add(&router->sockets, &tmcf->deleting);
1199 
1200     nxt_queue_add(&router->apps, &tmcf->previous);
1201 
1202     // TODO: new engines and threads
1203 
1204     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1205 
1206     nxt_mp_destroy(rtcf->mem_pool);
1207 
1208     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1209 }
1210 
1211 
1212 static void
1213 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1214     nxt_port_msg_type_t type)
1215 {
1216     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1217 
1218     nxt_port_use(task, tmcf->port, -1);
1219 
1220     tmcf->port = NULL;
1221 }
1222 
1223 
1224 static nxt_conf_map_t  nxt_router_conf[] = {
1225     {
1226         nxt_string("listeners_threads"),
1227         NXT_CONF_MAP_INT32,
1228         offsetof(nxt_router_conf_t, threads),
1229     },
1230 };
1231 
1232 
1233 static nxt_conf_map_t  nxt_router_app_conf[] = {
1234     {
1235         nxt_string("type"),
1236         NXT_CONF_MAP_STR,
1237         offsetof(nxt_router_app_conf_t, type),
1238     },
1239 
1240     {
1241         nxt_string("limits"),
1242         NXT_CONF_MAP_PTR,
1243         offsetof(nxt_router_app_conf_t, limits_value),
1244     },
1245 
1246     {
1247         nxt_string("processes"),
1248         NXT_CONF_MAP_INT32,
1249         offsetof(nxt_router_app_conf_t, processes),
1250     },
1251 
1252     {
1253         nxt_string("processes"),
1254         NXT_CONF_MAP_PTR,
1255         offsetof(nxt_router_app_conf_t, processes_value),
1256     },
1257 };
1258 
1259 
1260 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1261     {
1262         nxt_string("timeout"),
1263         NXT_CONF_MAP_MSEC,
1264         offsetof(nxt_router_app_conf_t, timeout),
1265     },
1266 
1267     {
1268         nxt_string("reschedule_timeout"),
1269         NXT_CONF_MAP_MSEC,
1270         offsetof(nxt_router_app_conf_t, res_timeout),
1271     },
1272 
1273     {
1274         nxt_string("requests"),
1275         NXT_CONF_MAP_INT32,
1276         offsetof(nxt_router_app_conf_t, requests),
1277     },
1278 };
1279 
1280 
1281 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1282     {
1283         nxt_string("spare"),
1284         NXT_CONF_MAP_INT32,
1285         offsetof(nxt_router_app_conf_t, spare_processes),
1286     },
1287 
1288     {
1289         nxt_string("max"),
1290         NXT_CONF_MAP_INT32,
1291         offsetof(nxt_router_app_conf_t, max_processes),
1292     },
1293 
1294     {
1295         nxt_string("idle_timeout"),
1296         NXT_CONF_MAP_MSEC,
1297         offsetof(nxt_router_app_conf_t, idle_timeout),
1298     },
1299 };
1300 
1301 
1302 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1303     {
1304         nxt_string("pass"),
1305         NXT_CONF_MAP_STR_COPY,
1306         offsetof(nxt_router_listener_conf_t, pass),
1307     },
1308 
1309     {
1310         nxt_string("application"),
1311         NXT_CONF_MAP_STR_COPY,
1312         offsetof(nxt_router_listener_conf_t, application),
1313     },
1314 };
1315 
1316 
1317 static nxt_conf_map_t  nxt_router_http_conf[] = {
1318     {
1319         nxt_string("header_buffer_size"),
1320         NXT_CONF_MAP_SIZE,
1321         offsetof(nxt_socket_conf_t, header_buffer_size),
1322     },
1323 
1324     {
1325         nxt_string("large_header_buffer_size"),
1326         NXT_CONF_MAP_SIZE,
1327         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1328     },
1329 
1330     {
1331         nxt_string("large_header_buffers"),
1332         NXT_CONF_MAP_SIZE,
1333         offsetof(nxt_socket_conf_t, large_header_buffers),
1334     },
1335 
1336     {
1337         nxt_string("body_buffer_size"),
1338         NXT_CONF_MAP_SIZE,
1339         offsetof(nxt_socket_conf_t, body_buffer_size),
1340     },
1341 
1342     {
1343         nxt_string("max_body_size"),
1344         NXT_CONF_MAP_SIZE,
1345         offsetof(nxt_socket_conf_t, max_body_size),
1346     },
1347 
1348     {
1349         nxt_string("idle_timeout"),
1350         NXT_CONF_MAP_MSEC,
1351         offsetof(nxt_socket_conf_t, idle_timeout),
1352     },
1353 
1354     {
1355         nxt_string("header_read_timeout"),
1356         NXT_CONF_MAP_MSEC,
1357         offsetof(nxt_socket_conf_t, header_read_timeout),
1358     },
1359 
1360     {
1361         nxt_string("body_read_timeout"),
1362         NXT_CONF_MAP_MSEC,
1363         offsetof(nxt_socket_conf_t, body_read_timeout),
1364     },
1365 
1366     {
1367         nxt_string("send_timeout"),
1368         NXT_CONF_MAP_MSEC,
1369         offsetof(nxt_socket_conf_t, send_timeout),
1370     },
1371 };
1372 
1373 
1374 static nxt_int_t
1375 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1376     u_char *start, u_char *end)
1377 {
1378     u_char                      *p;
1379     size_t                      size;
1380     nxt_mp_t                    *mp;
1381     uint32_t                    next;
1382     nxt_int_t                   ret;
1383     nxt_str_t                   name, path;
1384     nxt_app_t                   *app, *prev;
1385     nxt_router_t                *router;
1386     nxt_app_joint_t             *app_joint;
1387     nxt_conf_value_t            *conf, *http, *value;
1388     nxt_conf_value_t            *applications, *application;
1389     nxt_conf_value_t            *listeners, *listener;
1390     nxt_conf_value_t            *routes_conf;
1391     nxt_socket_conf_t           *skcf;
1392     nxt_http_routes_t           *routes;
1393     nxt_event_engine_t          *engine;
1394     nxt_app_lang_module_t       *lang;
1395     nxt_router_app_conf_t       apcf;
1396     nxt_router_access_log_t     *access_log;
1397     nxt_router_listener_conf_t  lscf;
1398 #if (NXT_TLS)
1399     nxt_router_tlssock_t        *tls;
1400 #endif
1401 
1402     static nxt_str_t  http_path = nxt_string("/settings/http");
1403     static nxt_str_t  applications_path = nxt_string("/applications");
1404     static nxt_str_t  listeners_path = nxt_string("/listeners");
1405     static nxt_str_t  routes_path = nxt_string("/routes");
1406     static nxt_str_t  access_log_path = nxt_string("/access_log");
1407 #if (NXT_TLS)
1408     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1409 #endif
1410 
1411     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1412     if (conf == NULL) {
1413         nxt_alert(task, "configuration parsing error");
1414         return NXT_ERROR;
1415     }
1416 
1417     mp = tmcf->router_conf->mem_pool;
1418 
1419     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1420                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1421     if (ret != NXT_OK) {
1422         nxt_alert(task, "root map error");
1423         return NXT_ERROR;
1424     }
1425 
1426     if (tmcf->router_conf->threads == 0) {
1427         tmcf->router_conf->threads = nxt_ncpu;
1428     }
1429 
1430     applications = nxt_conf_get_path(conf, &applications_path);
1431     if (applications == NULL) {
1432         nxt_alert(task, "no \"applications\" block");
1433         return NXT_ERROR;
1434     }
1435 
1436     router = tmcf->router_conf->router;
1437 
1438     next = 0;
1439 
1440     for ( ;; ) {
1441         application = nxt_conf_next_object_member(applications, &name, &next);
1442         if (application == NULL) {
1443             break;
1444         }
1445 
1446         nxt_debug(task, "application \"%V\"", &name);
1447 
1448         size = nxt_conf_json_length(application, NULL);
1449 
1450         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1451         if (app == NULL) {
1452             goto fail;
1453         }
1454 
1455         nxt_memzero(app, sizeof(nxt_app_t));
1456 
1457         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1458         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1459 
1460         p = nxt_conf_json_print(app->conf.start, application, NULL);
1461         app->conf.length = p - app->conf.start;
1462 
1463         nxt_assert(app->conf.length <= size);
1464 
1465         nxt_debug(task, "application conf \"%V\"", &app->conf);
1466 
1467         prev = nxt_router_app_find(&router->apps, &name);
1468 
1469         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1470             nxt_free(app);
1471 
1472             nxt_queue_remove(&prev->link);
1473             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1474             continue;
1475         }
1476 
1477         apcf.processes = 1;
1478         apcf.max_processes = 1;
1479         apcf.spare_processes = 0;
1480         apcf.timeout = 0;
1481         apcf.res_timeout = 1000;
1482         apcf.idle_timeout = 15000;
1483         apcf.requests = 0;
1484         apcf.limits_value = NULL;
1485         apcf.processes_value = NULL;
1486 
1487         app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1488         if (nxt_slow_path(app_joint == NULL)) {
1489             goto app_fail;
1490         }
1491 
1492         nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1493 
1494         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1495                                   nxt_nitems(nxt_router_app_conf), &apcf);
1496         if (ret != NXT_OK) {
1497             nxt_alert(task, "application map error");
1498             goto app_fail;
1499         }
1500 
1501         if (apcf.limits_value != NULL) {
1502 
1503             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1504                 nxt_alert(task, "application limits is not object");
1505                 goto app_fail;
1506             }
1507 
1508             ret = nxt_conf_map_object(mp, apcf.limits_value,
1509                                       nxt_router_app_limits_conf,
1510                                       nxt_nitems(nxt_router_app_limits_conf),
1511                                       &apcf);
1512             if (ret != NXT_OK) {
1513                 nxt_alert(task, "application limits map error");
1514                 goto app_fail;
1515             }
1516         }
1517 
1518         if (apcf.processes_value != NULL
1519             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1520         {
1521             ret = nxt_conf_map_object(mp, apcf.processes_value,
1522                                       nxt_router_app_processes_conf,
1523                                       nxt_nitems(nxt_router_app_processes_conf),
1524                                       &apcf);
1525             if (ret != NXT_OK) {
1526                 nxt_alert(task, "application processes map error");
1527                 goto app_fail;
1528             }
1529 
1530         } else {
1531             apcf.max_processes = apcf.processes;
1532             apcf.spare_processes = apcf.processes;
1533         }
1534 
1535         nxt_debug(task, "application type: %V", &apcf.type);
1536         nxt_debug(task, "application processes: %D", apcf.processes);
1537         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1538         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1539         nxt_debug(task, "application requests: %D", apcf.requests);
1540 
1541         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1542 
1543         if (lang == NULL) {
1544             nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1545             goto app_fail;
1546         }
1547 
1548         nxt_debug(task, "application language module: \"%s\"", lang->file);
1549 
1550         ret = nxt_thread_mutex_create(&app->mutex);
1551         if (ret != NXT_OK) {
1552             goto app_fail;
1553         }
1554 
1555         nxt_queue_init(&app->ports);
1556         nxt_queue_init(&app->spare_ports);
1557         nxt_queue_init(&app->idle_ports);
1558         nxt_queue_init(&app->requests);
1559         nxt_queue_init(&app->pending);
1560 
1561         app->name.length = name.length;
1562         nxt_memcpy(app->name.start, name.start, name.length);
1563 
1564         app->type = lang->type;
1565         app->max_processes = apcf.max_processes;
1566         app->spare_processes = apcf.spare_processes;
1567         app->max_pending_processes = apcf.spare_processes
1568                                       ? apcf.spare_processes : 1;
1569         app->timeout = apcf.timeout;
1570         app->res_timeout = apcf.res_timeout * 1000000;
1571         app->idle_timeout = apcf.idle_timeout;
1572         app->max_pending_responses = 2;
1573         app->max_requests = apcf.requests;
1574 
1575         engine = task->thread->engine;
1576 
1577         app->engine = engine;
1578 
1579         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1580         app->adjust_idle_work.task = &engine->task;
1581         app->adjust_idle_work.obj = app;
1582 
1583         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1584 
1585         nxt_router_app_use(task, app, 1);
1586 
1587         app->joint = app_joint;
1588 
1589         app_joint->use_count = 1;
1590         app_joint->app = app;
1591 
1592         app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1593         app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1594         app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1595         app_joint->idle_timer.task = &engine->task;
1596         app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1597 
1598         app_joint->free_app_work.handler = nxt_router_free_app;
1599         app_joint->free_app_work.task = &engine->task;
1600         app_joint->free_app_work.obj = app_joint;
1601     }
1602 
1603     routes_conf = nxt_conf_get_path(conf, &routes_path);
1604     if (nxt_fast_path(routes_conf != NULL)) {
1605         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1606         if (nxt_slow_path(routes == NULL)) {
1607             return NXT_ERROR;
1608         }
1609         tmcf->router_conf->routes = routes;
1610     }
1611 
1612     http = nxt_conf_get_path(conf, &http_path);
1613 #if 0
1614     if (http == NULL) {
1615         nxt_alert(task, "no \"http\" block");
1616         return NXT_ERROR;
1617     }
1618 #endif
1619 
1620     listeners = nxt_conf_get_path(conf, &listeners_path);
1621     if (listeners == NULL) {
1622         nxt_alert(task, "no \"listeners\" block");
1623         return NXT_ERROR;
1624     }
1625 
1626     next = 0;
1627 
1628     for ( ;; ) {
1629         listener = nxt_conf_next_object_member(listeners, &name, &next);
1630         if (listener == NULL) {
1631             break;
1632         }
1633 
1634         skcf = nxt_router_socket_conf(task, tmcf, &name);
1635         if (skcf == NULL) {
1636             goto fail;
1637         }
1638 
1639         nxt_memzero(&lscf, sizeof(lscf));
1640 
1641         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1642                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1643         if (ret != NXT_OK) {
1644             nxt_alert(task, "listener map error");
1645             goto fail;
1646         }
1647 
1648         nxt_debug(task, "application: %V", &lscf.application);
1649 
1650         // STUB, default values if http block is not defined.
1651         skcf->header_buffer_size = 2048;
1652         skcf->large_header_buffer_size = 8192;
1653         skcf->large_header_buffers = 4;
1654         skcf->body_buffer_size = 16 * 1024;
1655         skcf->max_body_size = 8 * 1024 * 1024;
1656         skcf->idle_timeout = 180 * 1000;
1657         skcf->header_read_timeout = 30 * 1000;
1658         skcf->body_read_timeout = 30 * 1000;
1659         skcf->send_timeout = 30 * 1000;
1660 
1661         if (http != NULL) {
1662             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1663                                       nxt_nitems(nxt_router_http_conf), skcf);
1664             if (ret != NXT_OK) {
1665                 nxt_alert(task, "http map error");
1666                 goto fail;
1667             }
1668         }
1669 
1670 #if (NXT_TLS)
1671 
1672         value = nxt_conf_get_path(listener, &certificate_path);
1673 
1674         if (value != NULL) {
1675             nxt_conf_get_string(value, &name);
1676 
1677             tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1678             if (nxt_slow_path(tls == NULL)) {
1679                 goto fail;
1680             }
1681 
1682             tls->name = name;
1683             tls->conf = skcf;
1684 
1685             nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1686         }
1687 
1688 #endif
1689 
1690         skcf->listen->handler = nxt_http_conn_init;
1691         skcf->router_conf = tmcf->router_conf;
1692         skcf->router_conf->count++;
1693 
1694         if (lscf.pass.length != 0) {
1695             skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
1696 
1697         /* COMPATIBILITY: listener application. */
1698         } else if (lscf.application.length > 0) {
1699             skcf->pass = nxt_http_pass_application(task, tmcf,
1700                                                    &lscf.application);
1701         }
1702     }
1703 
1704     value = nxt_conf_get_path(conf, &access_log_path);
1705 
1706     if (value != NULL) {
1707         nxt_conf_get_string(value, &path);
1708 
1709         access_log = router->access_log;
1710 
1711         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1712             nxt_thread_spin_lock(&router->lock);
1713             access_log->count++;
1714             nxt_thread_spin_unlock(&router->lock);
1715 
1716         } else {
1717             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1718                                     + path.length);
1719             if (access_log == NULL) {
1720                 nxt_alert(task, "failed to allocate access log structure");
1721                 goto fail;
1722             }
1723 
1724             access_log->fd = -1;
1725             access_log->handler = &nxt_router_access_log_writer;
1726             access_log->count = 1;
1727 
1728             access_log->path.length = path.length;
1729             access_log->path.start = (u_char *) access_log
1730                                      + sizeof(nxt_router_access_log_t);
1731 
1732             nxt_memcpy(access_log->path.start, path.start, path.length);
1733         }
1734 
1735         tmcf->router_conf->access_log = access_log;
1736     }
1737 
1738     nxt_http_routes_resolve(task, tmcf);
1739 
1740     nxt_queue_add(&tmcf->deleting, &router->sockets);
1741     nxt_queue_init(&router->sockets);
1742 
1743     return NXT_OK;
1744 
1745 app_fail:
1746 
1747     nxt_free(app);
1748 
1749 fail:
1750 
1751     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1752 
1753         nxt_queue_remove(&app->link);
1754         nxt_thread_mutex_destroy(&app->mutex);
1755         nxt_free(app);
1756 
1757     } nxt_queue_loop;
1758 
1759     return NXT_ERROR;
1760 }
1761 
1762 
1763 static nxt_app_t *
1764 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1765 {
1766     nxt_app_t  *app;
1767 
1768     nxt_queue_each(app, queue, nxt_app_t, link) {
1769 
1770         if (nxt_strstr_eq(name, &app->name)) {
1771             return app;
1772         }
1773 
1774     } nxt_queue_loop;
1775 
1776     return NULL;
1777 }
1778 
1779 
1780 nxt_app_t *
1781 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1782 {
1783     nxt_app_t  *app;
1784 
1785     app = nxt_router_app_find(&tmcf->apps, name);
1786 
1787     if (app == NULL) {
1788         app = nxt_router_app_find(&tmcf->previous, name);
1789     }
1790 
1791     return app;
1792 }
1793 
1794 
1795 static nxt_socket_conf_t *
1796 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1797     nxt_str_t *name)
1798 {
1799     size_t               size;
1800     nxt_int_t            ret;
1801     nxt_bool_t           wildcard;
1802     nxt_sockaddr_t       *sa;
1803     nxt_socket_conf_t    *skcf;
1804     nxt_listen_socket_t  *ls;
1805 
1806     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1807     if (nxt_slow_path(sa == NULL)) {
1808         nxt_alert(task, "invalid listener \"%V\"", name);
1809         return NULL;
1810     }
1811 
1812     sa->type = SOCK_STREAM;
1813 
1814     nxt_debug(task, "router listener: \"%*s\"",
1815               (size_t) sa->length, nxt_sockaddr_start(sa));
1816 
1817     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
1818     if (nxt_slow_path(skcf == NULL)) {
1819         return NULL;
1820     }
1821 
1822     size = nxt_sockaddr_size(sa);
1823 
1824     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1825 
1826     if (ret != NXT_OK) {
1827 
1828         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1829         if (nxt_slow_path(ls == NULL)) {
1830             return NULL;
1831         }
1832 
1833         skcf->listen = ls;
1834 
1835         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1836         nxt_memcpy(ls->sockaddr, sa, size);
1837 
1838         nxt_listen_socket_remote_size(ls);
1839 
1840         ls->socket = -1;
1841         ls->backlog = NXT_LISTEN_BACKLOG;
1842         ls->flags = NXT_NONBLOCK;
1843         ls->read_after_accept = 1;
1844     }
1845 
1846     switch (sa->u.sockaddr.sa_family) {
1847 #if (NXT_HAVE_UNIX_DOMAIN)
1848     case AF_UNIX:
1849         wildcard = 0;
1850         break;
1851 #endif
1852 #if (NXT_INET6)
1853     case AF_INET6:
1854         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1855         break;
1856 #endif
1857     case AF_INET:
1858     default:
1859         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1860         break;
1861     }
1862 
1863     if (!wildcard) {
1864         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
1865         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1866             return NULL;
1867         }
1868 
1869         nxt_memcpy(skcf->sockaddr, sa, size);
1870     }
1871 
1872     return skcf;
1873 }
1874 
1875 
1876 static nxt_int_t
1877 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1878     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1879 {
1880     nxt_router_t       *router;
1881     nxt_queue_link_t   *qlk;
1882     nxt_socket_conf_t  *skcf;
1883 
1884     router = tmcf->router_conf->router;
1885 
1886     for (qlk = nxt_queue_first(&router->sockets);
1887          qlk != nxt_queue_tail(&router->sockets);
1888          qlk = nxt_queue_next(qlk))
1889     {
1890         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1891 
1892         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1893             nskcf->listen = skcf->listen;
1894 
1895             nxt_queue_remove(qlk);
1896             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1897 
1898             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1899 
1900             return NXT_OK;
1901         }
1902     }
1903 
1904     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1905 
1906     return NXT_DECLINED;
1907 }
1908 
1909 
1910 static void
1911 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1912     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1913 {
1914     size_t            size;
1915     uint32_t          stream;
1916     nxt_int_t         ret;
1917     nxt_buf_t         *b;
1918     nxt_port_t        *main_port, *router_port;
1919     nxt_runtime_t     *rt;
1920     nxt_socket_rpc_t  *rpc;
1921 
1922     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1923     if (rpc == NULL) {
1924         goto fail;
1925     }
1926 
1927     rpc->socket_conf = skcf;
1928     rpc->temp_conf = tmcf;
1929 
1930     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1931 
1932     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1933     if (b == NULL) {
1934         goto fail;
1935     }
1936 
1937     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1938 
1939     rt = task->thread->runtime;
1940     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1941     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1942 
1943     stream = nxt_port_rpc_register_handler(task, router_port,
1944                                            nxt_router_listen_socket_ready,
1945                                            nxt_router_listen_socket_error,
1946                                            main_port->pid, rpc);
1947     if (nxt_slow_path(stream == 0)) {
1948         goto fail;
1949     }
1950 
1951     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1952                                 stream, router_port->id, b);
1953 
1954     if (nxt_slow_path(ret != NXT_OK)) {
1955         nxt_port_rpc_cancel(task, router_port, stream);
1956         goto fail;
1957     }
1958 
1959     return;
1960 
1961 fail:
1962 
1963     nxt_router_conf_error(task, tmcf);
1964 }
1965 
1966 
1967 static void
1968 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1969     void *data)
1970 {
1971     nxt_int_t         ret;
1972     nxt_socket_t      s;
1973     nxt_socket_rpc_t  *rpc;
1974 
1975     rpc = data;
1976 
1977     s = msg->fd;
1978 
1979     ret = nxt_socket_nonblocking(task, s);
1980     if (nxt_slow_path(ret != NXT_OK)) {
1981         goto fail;
1982     }
1983 
1984     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1985 
1986     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1987     if (nxt_slow_path(ret != NXT_OK)) {
1988         goto fail;
1989     }
1990 
1991     rpc->socket_conf->listen->socket = s;
1992 
1993     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1994                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1995 
1996     return;
1997 
1998 fail:
1999 
2000     nxt_socket_close(task, s);
2001 
2002     nxt_router_conf_error(task, rpc->temp_conf);
2003 }
2004 
2005 
2006 static void
2007 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2008     void *data)
2009 {
2010     nxt_socket_rpc_t        *rpc;
2011     nxt_router_temp_conf_t  *tmcf;
2012 
2013     rpc = data;
2014     tmcf = rpc->temp_conf;
2015 
2016 #if 0
2017     u_char                  *p;
2018     size_t                  size;
2019     uint8_t                 error;
2020     nxt_buf_t               *in, *out;
2021     nxt_sockaddr_t          *sa;
2022 
2023     static nxt_str_t  socket_errors[] = {
2024         nxt_string("ListenerSystem"),
2025         nxt_string("ListenerNoIPv6"),
2026         nxt_string("ListenerPort"),
2027         nxt_string("ListenerInUse"),
2028         nxt_string("ListenerNoAddress"),
2029         nxt_string("ListenerNoAccess"),
2030         nxt_string("ListenerPath"),
2031     };
2032 
2033     sa = rpc->socket_conf->listen->sockaddr;
2034 
2035     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2036 
2037     if (nxt_slow_path(in == NULL)) {
2038         return;
2039     }
2040 
2041     p = in->mem.pos;
2042 
2043     error = *p++;
2044 
2045     size = nxt_length("listen socket error: ")
2046            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2047            + sa->length + socket_errors[error].length + (in->mem.free - p);
2048 
2049     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2050     if (nxt_slow_path(out == NULL)) {
2051         return;
2052     }
2053 
2054     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2055                         "listen socket error: "
2056                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2057                         (size_t) sa->length, nxt_sockaddr_start(sa),
2058                         &socket_errors[error], in->mem.free - p, p);
2059 
2060     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2061 #endif
2062 
2063     nxt_router_conf_error(task, tmcf);
2064 }
2065 
2066 
2067 #if (NXT_TLS)
2068 
2069 static void
2070 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2071     nxt_router_tlssock_t *tls)
2072 {
2073     nxt_socket_rpc_t  *rpc;
2074 
2075     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2076     if (rpc == NULL) {
2077         nxt_router_conf_error(task, tmcf);
2078         return;
2079     }
2080 
2081     rpc->socket_conf = tls->conf;
2082     rpc->temp_conf = tmcf;
2083 
2084     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2085                        nxt_router_tls_rpc_handler, rpc);
2086 }
2087 
2088 
2089 static void
2090 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2091     void *data)
2092 {
2093     nxt_mp_t               *mp;
2094     nxt_int_t              ret;
2095     nxt_tls_conf_t         *tlscf;
2096     nxt_socket_rpc_t       *rpc;
2097     nxt_router_temp_conf_t *tmcf;
2098 
2099     nxt_debug(task, "tls rpc handler");
2100 
2101     rpc = data;
2102     tmcf = rpc->temp_conf;
2103 
2104     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2105         goto fail;
2106     }
2107 
2108     mp = tmcf->router_conf->mem_pool;
2109 
2110     tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2111     if (nxt_slow_path(tlscf == NULL)) {
2112         goto fail;
2113     }
2114 
2115     tlscf->chain_file = msg->fd;
2116 
2117     ret = task->thread->runtime->tls->server_init(task, tlscf);
2118     if (nxt_slow_path(ret != NXT_OK)) {
2119         goto fail;
2120     }
2121 
2122     rpc->socket_conf->tls = tlscf;
2123 
2124     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2125                        nxt_router_conf_apply, task, tmcf, NULL);
2126     return;
2127 
2128 fail:
2129 
2130     nxt_router_conf_error(task, tmcf);
2131 }
2132 
2133 #endif
2134 
2135 
2136 static void
2137 nxt_router_app_rpc_create(nxt_task_t *task,
2138     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2139 {
2140     size_t         size;
2141     uint32_t       stream;
2142     nxt_int_t      ret;
2143     nxt_buf_t      *b;
2144     nxt_port_t     *main_port, *router_port;
2145     nxt_runtime_t  *rt;
2146     nxt_app_rpc_t  *rpc;
2147 
2148     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2149     if (rpc == NULL) {
2150         goto fail;
2151     }
2152 
2153     rpc->app = app;
2154     rpc->temp_conf = tmcf;
2155 
2156     nxt_debug(task, "app '%V' prefork", &app->name);
2157 
2158     size = app->name.length + 1 + app->conf.length;
2159 
2160     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2161     if (nxt_slow_path(b == NULL)) {
2162         goto fail;
2163     }
2164 
2165     nxt_buf_cpystr(b, &app->name);
2166     *b->mem.free++ = '\0';
2167     nxt_buf_cpystr(b, &app->conf);
2168 
2169     rt = task->thread->runtime;
2170     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2171     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2172 
2173     stream = nxt_port_rpc_register_handler(task, router_port,
2174                                            nxt_router_app_prefork_ready,
2175                                            nxt_router_app_prefork_error,
2176                                            -1, rpc);
2177     if (nxt_slow_path(stream == 0)) {
2178         goto fail;
2179     }
2180 
2181     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2182                                 stream, router_port->id, b);
2183 
2184     if (nxt_slow_path(ret != NXT_OK)) {
2185         nxt_port_rpc_cancel(task, router_port, stream);
2186         goto fail;
2187     }
2188 
2189     app->pending_processes++;
2190 
2191     return;
2192 
2193 fail:
2194 
2195     nxt_router_conf_error(task, tmcf);
2196 }
2197 
2198 
2199 static void
2200 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2201     void *data)
2202 {
2203     nxt_app_t           *app;
2204     nxt_port_t          *port;
2205     nxt_app_rpc_t       *rpc;
2206     nxt_event_engine_t  *engine;
2207 
2208     rpc = data;
2209     app = rpc->app;
2210 
2211     port = msg->u.new_port;
2212     port->app = app;
2213 
2214     app->pending_processes--;
2215     app->processes++;
2216     app->idle_processes++;
2217 
2218     engine = task->thread->engine;
2219 
2220     nxt_queue_insert_tail(&app->ports, &port->app_link);
2221     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2222 
2223     port->idle_start = 0;
2224 
2225     nxt_port_inc_use(port);
2226 
2227     nxt_work_queue_add(&engine->fast_work_queue,
2228                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2229 }
2230 
2231 
2232 static void
2233 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2234     void *data)
2235 {
2236     nxt_app_t               *app;
2237     nxt_app_rpc_t           *rpc;
2238     nxt_router_temp_conf_t  *tmcf;
2239 
2240     rpc = data;
2241     app = rpc->app;
2242     tmcf = rpc->temp_conf;
2243 
2244     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2245             &app->name);
2246 
2247     app->pending_processes--;
2248 
2249     nxt_router_conf_error(task, tmcf);
2250 }
2251 
2252 
2253 static nxt_int_t
2254 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2255     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2256 {
2257     nxt_int_t                 ret;
2258     nxt_uint_t                n, threads;
2259     nxt_queue_link_t          *qlk;
2260     nxt_router_engine_conf_t  *recf;
2261 
2262     threads = tmcf->router_conf->threads;
2263 
2264     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2265                                      sizeof(nxt_router_engine_conf_t));
2266     if (nxt_slow_path(tmcf->engines == NULL)) {
2267         return NXT_ERROR;
2268     }
2269 
2270     n = 0;
2271 
2272     for (qlk = nxt_queue_first(&router->engines);
2273          qlk != nxt_queue_tail(&router->engines);
2274          qlk = nxt_queue_next(qlk))
2275     {
2276         recf = nxt_array_zero_add(tmcf->engines);
2277         if (nxt_slow_path(recf == NULL)) {
2278             return NXT_ERROR;
2279         }
2280 
2281         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2282 
2283         if (n < threads) {
2284             recf->action = NXT_ROUTER_ENGINE_KEEP;
2285             ret = nxt_router_engine_conf_update(tmcf, recf);
2286 
2287         } else {
2288             recf->action = NXT_ROUTER_ENGINE_DELETE;
2289             ret = nxt_router_engine_conf_delete(tmcf, recf);
2290         }
2291 
2292         if (nxt_slow_path(ret != NXT_OK)) {
2293             return ret;
2294         }
2295 
2296         n++;
2297     }
2298 
2299     tmcf->new_threads = n;
2300 
2301     while (n < threads) {
2302         recf = nxt_array_zero_add(tmcf->engines);
2303         if (nxt_slow_path(recf == NULL)) {
2304             return NXT_ERROR;
2305         }
2306 
2307         recf->action = NXT_ROUTER_ENGINE_ADD;
2308 
2309         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2310         if (nxt_slow_path(recf->engine == NULL)) {
2311             return NXT_ERROR;
2312         }
2313 
2314         ret = nxt_router_engine_conf_create(tmcf, recf);
2315         if (nxt_slow_path(ret != NXT_OK)) {
2316             return ret;
2317         }
2318 
2319         n++;
2320     }
2321 
2322     return NXT_OK;
2323 }
2324 
2325 
2326 static nxt_int_t
2327 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2328     nxt_router_engine_conf_t *recf)
2329 {
2330     nxt_int_t  ret;
2331 
2332     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2333                                           nxt_router_listen_socket_create);
2334     if (nxt_slow_path(ret != NXT_OK)) {
2335         return ret;
2336     }
2337 
2338     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2339                                           nxt_router_listen_socket_create);
2340     if (nxt_slow_path(ret != NXT_OK)) {
2341         return ret;
2342     }
2343 
2344     return ret;
2345 }
2346 
2347 
2348 static nxt_int_t
2349 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2350     nxt_router_engine_conf_t *recf)
2351 {
2352     nxt_int_t  ret;
2353 
2354     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2355                                           nxt_router_listen_socket_create);
2356     if (nxt_slow_path(ret != NXT_OK)) {
2357         return ret;
2358     }
2359 
2360     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2361                                           nxt_router_listen_socket_update);
2362     if (nxt_slow_path(ret != NXT_OK)) {
2363         return ret;
2364     }
2365 
2366     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2367     if (nxt_slow_path(ret != NXT_OK)) {
2368         return ret;
2369     }
2370 
2371     return ret;
2372 }
2373 
2374 
2375 static nxt_int_t
2376 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2377     nxt_router_engine_conf_t *recf)
2378 {
2379     nxt_int_t  ret;
2380 
2381     ret = nxt_router_engine_quit(tmcf, recf);
2382     if (nxt_slow_path(ret != NXT_OK)) {
2383         return ret;
2384     }
2385 
2386     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2387     if (nxt_slow_path(ret != NXT_OK)) {
2388         return ret;
2389     }
2390 
2391     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2392 }
2393 
2394 
2395 static nxt_int_t
2396 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2397     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2398     nxt_work_handler_t handler)
2399 {
2400     nxt_joint_job_t          *job;
2401     nxt_queue_link_t         *qlk;
2402     nxt_socket_conf_t        *skcf;
2403     nxt_socket_conf_joint_t  *joint;
2404 
2405     for (qlk = nxt_queue_first(sockets);
2406          qlk != nxt_queue_tail(sockets);
2407          qlk = nxt_queue_next(qlk))
2408     {
2409         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2410         if (nxt_slow_path(job == NULL)) {
2411             return NXT_ERROR;
2412         }
2413 
2414         job->work.next = recf->jobs;
2415         recf->jobs = &job->work;
2416 
2417         job->task = tmcf->engine->task;
2418         job->work.handler = handler;
2419         job->work.task = &job->task;
2420         job->work.obj = job;
2421         job->tmcf = tmcf;
2422 
2423         tmcf->count++;
2424 
2425         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2426                              sizeof(nxt_socket_conf_joint_t));
2427         if (nxt_slow_path(joint == NULL)) {
2428             return NXT_ERROR;
2429         }
2430 
2431         job->work.data = joint;
2432 
2433         joint->count = 1;
2434 
2435         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2436         skcf->count++;
2437         joint->socket_conf = skcf;
2438 
2439         joint->engine = recf->engine;
2440     }
2441 
2442     return NXT_OK;
2443 }
2444 
2445 
2446 static nxt_int_t
2447 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2448     nxt_router_engine_conf_t *recf)
2449 {
2450     nxt_joint_job_t  *job;
2451 
2452     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2453     if (nxt_slow_path(job == NULL)) {
2454         return NXT_ERROR;
2455     }
2456 
2457     job->work.next = recf->jobs;
2458     recf->jobs = &job->work;
2459 
2460     job->task = tmcf->engine->task;
2461     job->work.handler = nxt_router_worker_thread_quit;
2462     job->work.task = &job->task;
2463     job->work.obj = NULL;
2464     job->work.data = NULL;
2465     job->tmcf = NULL;
2466 
2467     return NXT_OK;
2468 }
2469 
2470 
2471 static nxt_int_t
2472 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2473     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2474 {
2475     nxt_joint_job_t   *job;
2476     nxt_queue_link_t  *qlk;
2477 
2478     for (qlk = nxt_queue_first(sockets);
2479          qlk != nxt_queue_tail(sockets);
2480          qlk = nxt_queue_next(qlk))
2481     {
2482         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2483         if (nxt_slow_path(job == NULL)) {
2484             return NXT_ERROR;
2485         }
2486 
2487         job->work.next = recf->jobs;
2488         recf->jobs = &job->work;
2489 
2490         job->task = tmcf->engine->task;
2491         job->work.handler = nxt_router_listen_socket_delete;
2492         job->work.task = &job->task;
2493         job->work.obj = job;
2494         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2495         job->tmcf = tmcf;
2496 
2497         tmcf->count++;
2498     }
2499 
2500     return NXT_OK;
2501 }
2502 
2503 
2504 static nxt_int_t
2505 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2506     nxt_router_temp_conf_t *tmcf)
2507 {
2508     nxt_int_t                 ret;
2509     nxt_uint_t                i, threads;
2510     nxt_router_engine_conf_t  *recf;
2511 
2512     recf = tmcf->engines->elts;
2513     threads = tmcf->router_conf->threads;
2514 
2515     for (i = tmcf->new_threads; i < threads; i++) {
2516         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2517         if (nxt_slow_path(ret != NXT_OK)) {
2518             return ret;
2519         }
2520     }
2521 
2522     return NXT_OK;
2523 }
2524 
2525 
2526 static nxt_int_t
2527 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2528     nxt_event_engine_t *engine)
2529 {
2530     nxt_int_t            ret;
2531     nxt_thread_link_t    *link;
2532     nxt_thread_handle_t  handle;
2533 
2534     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2535 
2536     if (nxt_slow_path(link == NULL)) {
2537         return NXT_ERROR;
2538     }
2539 
2540     link->start = nxt_router_thread_start;
2541     link->engine = engine;
2542     link->work.handler = nxt_router_thread_exit_handler;
2543     link->work.task = task;
2544     link->work.data = link;
2545 
2546     nxt_queue_insert_tail(&rt->engines, &engine->link);
2547 
2548     ret = nxt_thread_create(&handle, link);
2549 
2550     if (nxt_slow_path(ret != NXT_OK)) {
2551         nxt_queue_remove(&engine->link);
2552     }
2553 
2554     return ret;
2555 }
2556 
2557 
2558 static void
2559 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2560     nxt_router_temp_conf_t *tmcf)
2561 {
2562     nxt_app_t  *app;
2563 
2564     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2565 
2566         nxt_router_app_unlink(task, app);
2567 
2568     } nxt_queue_loop;
2569 
2570     nxt_queue_add(&router->apps, &tmcf->previous);
2571     nxt_queue_add(&router->apps, &tmcf->apps);
2572 }
2573 
2574 
2575 static void
2576 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2577 {
2578     nxt_uint_t                n;
2579     nxt_event_engine_t        *engine;
2580     nxt_router_engine_conf_t  *recf;
2581 
2582     recf = tmcf->engines->elts;
2583 
2584     for (n = tmcf->engines->nelts; n != 0; n--) {
2585         engine = recf->engine;
2586 
2587         switch (recf->action) {
2588 
2589         case NXT_ROUTER_ENGINE_KEEP:
2590             break;
2591 
2592         case NXT_ROUTER_ENGINE_ADD:
2593             nxt_queue_insert_tail(&router->engines, &engine->link0);
2594             break;
2595 
2596         case NXT_ROUTER_ENGINE_DELETE:
2597             nxt_queue_remove(&engine->link0);
2598             break;
2599         }
2600 
2601         nxt_router_engine_post(engine, recf->jobs);
2602 
2603         recf++;
2604     }
2605 }
2606 
2607 
2608 static void
2609 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2610 {
2611     nxt_work_t  *work, *next;
2612 
2613     for (work = jobs; work != NULL; work = next) {
2614         next = work->next;
2615         work->next = NULL;
2616 
2617         nxt_event_engine_post(engine, work);
2618     }
2619 }
2620 
2621 
2622 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2623     .rpc_error = nxt_port_rpc_handler,
2624     .mmap      = nxt_port_mmap_handler,
2625     .data      = nxt_port_rpc_handler,
2626 };
2627 
2628 
2629 static void
2630 nxt_router_thread_start(void *data)
2631 {
2632     nxt_int_t           ret;
2633     nxt_port_t          *port;
2634     nxt_task_t          *task;
2635     nxt_thread_t        *thread;
2636     nxt_thread_link_t   *link;
2637     nxt_event_engine_t  *engine;
2638 
2639     link = data;
2640     engine = link->engine;
2641     task = &engine->task;
2642 
2643     thread = nxt_thread();
2644 
2645     nxt_event_engine_thread_adopt(engine);
2646 
2647     /* STUB */
2648     thread->runtime = engine->task.thread->runtime;
2649 
2650     engine->task.thread = thread;
2651     engine->task.log = thread->log;
2652     thread->engine = engine;
2653     thread->task = &engine->task;
2654 #if 0
2655     thread->fiber = &engine->fibers->fiber;
2656 #endif
2657 
2658     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2659     if (nxt_slow_path(engine->mem_pool == NULL)) {
2660         return;
2661     }
2662 
2663     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2664                         NXT_PROCESS_ROUTER);
2665     if (nxt_slow_path(port == NULL)) {
2666         return;
2667     }
2668 
2669     ret = nxt_port_socket_init(task, port, 0);
2670     if (nxt_slow_path(ret != NXT_OK)) {
2671         nxt_port_use(task, port, -1);
2672         return;
2673     }
2674 
2675     engine->port = port;
2676 
2677     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2678 
2679     nxt_event_engine_start(engine);
2680 }
2681 
2682 
2683 static void
2684 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2685 {
2686     nxt_joint_job_t          *job;
2687     nxt_socket_conf_t        *skcf;
2688     nxt_listen_event_t       *lev;
2689     nxt_listen_socket_t      *ls;
2690     nxt_thread_spinlock_t    *lock;
2691     nxt_socket_conf_joint_t  *joint;
2692 
2693     job = obj;
2694     joint = data;
2695 
2696     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2697 
2698     skcf = joint->socket_conf;
2699     ls = skcf->listen;
2700 
2701     lev = nxt_listen_event(task, ls);
2702     if (nxt_slow_path(lev == NULL)) {
2703         nxt_router_listen_socket_release(task, skcf);
2704         return;
2705     }
2706 
2707     lev->socket.data = joint;
2708 
2709     lock = &skcf->router_conf->router->lock;
2710 
2711     nxt_thread_spin_lock(lock);
2712     ls->count++;
2713     nxt_thread_spin_unlock(lock);
2714 
2715     job->work.next = NULL;
2716     job->work.handler = nxt_router_conf_wait;
2717 
2718     nxt_event_engine_post(job->tmcf->engine, &job->work);
2719 }
2720 
2721 
2722 nxt_inline nxt_listen_event_t *
2723 nxt_router_listen_event(nxt_queue_t *listen_connections,
2724     nxt_socket_conf_t *skcf)
2725 {
2726     nxt_socket_t        fd;
2727     nxt_queue_link_t    *qlk;
2728     nxt_listen_event_t  *lev;
2729 
2730     fd = skcf->listen->socket;
2731 
2732     for (qlk = nxt_queue_first(listen_connections);
2733          qlk != nxt_queue_tail(listen_connections);
2734          qlk = nxt_queue_next(qlk))
2735     {
2736         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2737 
2738         if (fd == lev->socket.fd) {
2739             return lev;
2740         }
2741     }
2742 
2743     return NULL;
2744 }
2745 
2746 
2747 static void
2748 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2749 {
2750     nxt_joint_job_t          *job;
2751     nxt_event_engine_t       *engine;
2752     nxt_listen_event_t       *lev;
2753     nxt_socket_conf_joint_t  *joint, *old;
2754 
2755     job = obj;
2756     joint = data;
2757 
2758     engine = task->thread->engine;
2759 
2760     nxt_queue_insert_tail(&engine->joints, &joint->link);
2761 
2762     lev = nxt_router_listen_event(&engine->listen_connections,
2763                                   joint->socket_conf);
2764 
2765     old = lev->socket.data;
2766     lev->socket.data = joint;
2767     lev->listen = joint->socket_conf->listen;
2768 
2769     job->work.next = NULL;
2770     job->work.handler = nxt_router_conf_wait;
2771 
2772     nxt_event_engine_post(job->tmcf->engine, &job->work);
2773 
2774     /*
2775      * The task is allocated from configuration temporary
2776      * memory pool so it can be freed after engine post operation.
2777      */
2778 
2779     nxt_router_conf_release(&engine->task, old);
2780 }
2781 
2782 
2783 static void
2784 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2785 {
2786     nxt_joint_job_t     *job;
2787     nxt_socket_conf_t   *skcf;
2788     nxt_listen_event_t  *lev;
2789     nxt_event_engine_t  *engine;
2790 
2791     job = obj;
2792     skcf = data;
2793 
2794     engine = task->thread->engine;
2795 
2796     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2797 
2798     nxt_fd_event_delete(engine, &lev->socket);
2799 
2800     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2801               lev->socket.fd);
2802 
2803     lev->timer.handler = nxt_router_listen_socket_close;
2804     lev->timer.work_queue = &engine->fast_work_queue;
2805 
2806     nxt_timer_add(engine, &lev->timer, 0);
2807 
2808     job->work.next = NULL;
2809     job->work.handler = nxt_router_conf_wait;
2810 
2811     nxt_event_engine_post(job->tmcf->engine, &job->work);
2812 }
2813 
2814 
2815 static void
2816 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2817 {
2818     nxt_event_engine_t  *engine;
2819 
2820     nxt_debug(task, "router worker thread quit");
2821 
2822     engine = task->thread->engine;
2823 
2824     engine->shutdown = 1;
2825 
2826     if (nxt_queue_is_empty(&engine->joints)) {
2827         nxt_thread_exit(task->thread);
2828     }
2829 }
2830 
2831 
2832 static void
2833 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2834 {
2835     nxt_timer_t              *timer;
2836     nxt_listen_event_t       *lev;
2837     nxt_socket_conf_joint_t  *joint;
2838 
2839     timer = obj;
2840     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2841 
2842     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2843               lev->socket.fd);
2844 
2845     nxt_queue_remove(&lev->link);
2846 
2847     joint = lev->socket.data;
2848     lev->socket.data = NULL;
2849 
2850     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2851     task = &task->thread->engine->task;
2852 
2853     nxt_router_listen_socket_release(task, joint->socket_conf);
2854 
2855     nxt_router_listen_event_release(task, lev, joint);
2856 }
2857 
2858 
2859 static void
2860 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2861 {
2862     nxt_listen_socket_t    *ls;
2863     nxt_thread_spinlock_t  *lock;
2864 
2865     ls = skcf->listen;
2866     lock = &skcf->router_conf->router->lock;
2867 
2868     nxt_thread_spin_lock(lock);
2869 
2870     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2871               task->thread->engine, ls->count);
2872 
2873     if (--ls->count != 0) {
2874         ls = NULL;
2875     }
2876 
2877     nxt_thread_spin_unlock(lock);
2878 
2879     if (ls != NULL) {
2880         nxt_socket_close(task, ls->socket);
2881         nxt_free(ls);
2882     }
2883 }
2884 
2885 
2886 void
2887 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
2888     nxt_socket_conf_joint_t *joint)
2889 {
2890     nxt_event_engine_t  *engine;
2891 
2892     nxt_debug(task, "listen event count: %D", lev->count);
2893 
2894     if (--lev->count == 0) {
2895         nxt_free(lev);
2896     }
2897 
2898     if (joint != NULL) {
2899         nxt_router_conf_release(task, joint);
2900     }
2901 
2902     engine = task->thread->engine;
2903 
2904     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2905         nxt_thread_exit(task->thread);
2906     }
2907 }
2908 
2909 
2910 void
2911 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2912 {
2913     nxt_socket_conf_t      *skcf;
2914     nxt_router_conf_t      *rtcf;
2915     nxt_thread_spinlock_t  *lock;
2916 
2917     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2918 
2919     if (--joint->count != 0) {
2920         return;
2921     }
2922 
2923     nxt_queue_remove(&joint->link);
2924 
2925     /*
2926      * The joint content can not be safely used after the critical
2927      * section protected by the spinlock because its memory pool may
2928      * be already destroyed by another thread.
2929      */
2930     skcf = joint->socket_conf;
2931     rtcf = skcf->router_conf;
2932     lock = &rtcf->router->lock;
2933 
2934     nxt_thread_spin_lock(lock);
2935 
2936     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2937               rtcf, rtcf->count);
2938 
2939     if (--skcf->count != 0) {
2940         skcf = NULL;
2941         rtcf = NULL;
2942 
2943     } else {
2944         nxt_queue_remove(&skcf->link);
2945 
2946         if (--rtcf->count != 0) {
2947             rtcf = NULL;
2948         }
2949     }
2950 
2951     nxt_thread_spin_unlock(lock);
2952 
2953     if (skcf != NULL) {
2954         if (skcf->pass != NULL) {
2955             nxt_http_pass_cleanup(task, skcf->pass);
2956         }
2957 
2958 #if (NXT_TLS)
2959         if (skcf->tls != NULL) {
2960             task->thread->runtime->tls->server_free(task, skcf->tls);
2961         }
2962 #endif
2963     }
2964 
2965     /* TODO remove engine->port */
2966     /* TODO excude from connected ports */
2967 
2968     if (rtcf != NULL) {
2969         nxt_debug(task, "old router conf is destroyed");
2970 
2971         nxt_http_routes_cleanup(task, rtcf->routes);
2972 
2973         nxt_router_access_log_release(task, lock, rtcf->access_log);
2974 
2975         nxt_mp_thread_adopt(rtcf->mem_pool);
2976 
2977         nxt_mp_destroy(rtcf->mem_pool);
2978     }
2979 }
2980 
2981 
2982 static void
2983 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
2984     nxt_router_access_log_t *access_log)
2985 {
2986     size_t     size;
2987     u_char     *buf, *p;
2988     nxt_off_t  bytes;
2989 
2990     static nxt_time_string_t  date_cache = {
2991         (nxt_atomic_uint_t) -1,
2992         nxt_router_access_log_date,
2993         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
2994         nxt_length("31/Dec/1986:19:40:00 +0300"),
2995         NXT_THREAD_TIME_LOCAL,
2996         NXT_THREAD_TIME_SEC,
2997     };
2998 
2999     size = r->remote->address_length
3000            + 6                  /* ' - - [' */
3001            + date_cache.size
3002            + 3                  /* '] "' */
3003            + r->method->length
3004            + 1                  /* space */
3005            + r->target.length
3006            + 1                  /* space */
3007            + r->version.length
3008            + 2                  /* '" ' */
3009            + 3                  /* status */
3010            + 1                  /* space */
3011            + NXT_OFF_T_LEN
3012            + 2                  /* ' "' */
3013            + (r->referer != NULL ? r->referer->value_length : 1)
3014            + 3                  /* '" "' */
3015            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3016            + 2                  /* '"\n' */
3017     ;
3018 
3019     buf = nxt_mp_nget(r->mem_pool, size);
3020     if (nxt_slow_path(buf == NULL)) {
3021         return;
3022     }
3023 
3024     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3025                    r->remote->address_length);
3026 
3027     p = nxt_cpymem(p, " - - [", 6);
3028 
3029     p = nxt_thread_time_string(task->thread, &date_cache, p);
3030 
3031     p = nxt_cpymem(p, "] \"", 3);
3032 
3033     if (r->method->length != 0) {
3034         p = nxt_cpymem(p, r->method->start, r->method->length);
3035 
3036         if (r->target.length != 0) {
3037             *p++ = ' ';
3038             p = nxt_cpymem(p, r->target.start, r->target.length);
3039 
3040             if (r->version.length != 0) {
3041                 *p++ = ' ';
3042                 p = nxt_cpymem(p, r->version.start, r->version.length);
3043             }
3044         }
3045 
3046     } else {
3047         *p++ = '-';
3048     }
3049 
3050     p = nxt_cpymem(p, "\" ", 2);
3051 
3052     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3053 
3054     *p++ = ' ';
3055 
3056     bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto);
3057 
3058     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3059 
3060     p = nxt_cpymem(p, " \"", 2);
3061 
3062     if (r->referer != NULL) {
3063         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3064 
3065     } else {
3066         *p++ = '-';
3067     }
3068 
3069     p = nxt_cpymem(p, "\" \"", 3);
3070 
3071     if (r->user_agent != NULL) {
3072         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3073 
3074     } else {
3075         *p++ = '-';
3076     }
3077 
3078     p = nxt_cpymem(p, "\"\n", 2);
3079 
3080     nxt_fd_write(access_log->fd, buf, p - buf);
3081 }
3082 
3083 
3084 static u_char *
3085 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3086     size_t size, const char *format)
3087 {
3088     u_char  sign;
3089     time_t  gmtoff;
3090 
3091     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3092                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3093 
3094     gmtoff = nxt_timezone(tm) / 60;
3095 
3096     if (gmtoff < 0) {
3097         gmtoff = -gmtoff;
3098         sign = '-';
3099 
3100     } else {
3101         sign = '+';
3102     }
3103 
3104     return nxt_sprintf(buf, buf + size, format,
3105                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3106                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3107                        sign, gmtoff / 60, gmtoff % 60);
3108 }
3109 
3110 
3111 static void
3112 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3113 {
3114     uint32_t                 stream;
3115     nxt_int_t                ret;
3116     nxt_buf_t                *b;
3117     nxt_port_t               *main_port, *router_port;
3118     nxt_runtime_t            *rt;
3119     nxt_router_access_log_t  *access_log;
3120 
3121     access_log = tmcf->router_conf->access_log;
3122 
3123     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3124     if (nxt_slow_path(b == NULL)) {
3125         goto fail;
3126     }
3127 
3128     nxt_buf_cpystr(b, &access_log->path);
3129     *b->mem.free++ = '\0';
3130 
3131     rt = task->thread->runtime;
3132     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3133     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3134 
3135     stream = nxt_port_rpc_register_handler(task, router_port,
3136                                            nxt_router_access_log_ready,
3137                                            nxt_router_access_log_error,
3138                                            -1, tmcf);
3139     if (nxt_slow_path(stream == 0)) {
3140         goto fail;
3141     }
3142 
3143     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3144                                 stream, router_port->id, b);
3145 
3146     if (nxt_slow_path(ret != NXT_OK)) {
3147         nxt_port_rpc_cancel(task, router_port, stream);
3148         goto fail;
3149     }
3150 
3151     return;
3152 
3153 fail:
3154 
3155     nxt_router_conf_error(task, tmcf);
3156 }
3157 
3158 
3159 static void
3160 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3161     void *data)
3162 {
3163     nxt_router_temp_conf_t   *tmcf;
3164     nxt_router_access_log_t  *access_log;
3165 
3166     tmcf = data;
3167 
3168     access_log = tmcf->router_conf->access_log;
3169 
3170     access_log->fd = msg->fd;
3171 
3172     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3173                        nxt_router_conf_apply, task, tmcf, NULL);
3174 }
3175 
3176 
3177 static void
3178 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *