xref: /unit/src/nxt_router.c (revision 977:4f9268f27b57)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) Valentin V. Bartenev
5  * Copyright (C) NGINX, Inc.
6  */
7 
8 #include <nxt_router.h>
9 #include <nxt_conf.h>
10 #if (NXT_TLS)
11 #include <nxt_cert.h>
12 #endif
13 #include <nxt_http.h>
14 #include <nxt_port_memory_int.h>
15 #include <nxt_unit_request.h>
16 #include <nxt_unit_response.h>
17 
18 
19 typedef struct {
20     nxt_str_t         type;
21     uint32_t          processes;
22     uint32_t          max_processes;
23     uint32_t          spare_processes;
24     nxt_msec_t        timeout;
25     nxt_msec_t        res_timeout;
26     nxt_msec_t        idle_timeout;
27     uint32_t          requests;
28     nxt_conf_value_t  *limits_value;
29     nxt_conf_value_t  *processes_value;
30 } nxt_router_app_conf_t;
31 
32 
33 typedef struct {
34     nxt_str_t         pass;
35     nxt_str_t         application;
36 } nxt_router_listener_conf_t;
37 
38 
39 #if (NXT_TLS)
40 
41 typedef struct {
42     nxt_str_t          name;
43     nxt_socket_conf_t  *conf;
44 
45     nxt_queue_link_t   link;  /* for nxt_socket_conf_t.tls */
46 } nxt_router_tlssock_t;
47 
48 #endif
49 
50 
51 typedef struct nxt_msg_info_s {
52     nxt_buf_t                 *buf;
53     nxt_port_mmap_tracking_t  tracking;
54     nxt_work_handler_t        completion_handler;
55 } nxt_msg_info_t;
56 
57 
58 typedef struct nxt_req_app_link_s nxt_req_app_link_t;
59 
60 
61 typedef struct {
62     uint32_t                 stream;
63     nxt_app_t                *app;
64     nxt_port_t               *app_port;
65     nxt_app_parse_ctx_t      *ap;
66     nxt_msg_info_t           msg_info;
67     nxt_req_app_link_t       *ra;
68 
69     nxt_queue_link_t         link;     /* for nxt_conn_t.requests */
70 } nxt_req_conn_link_t;
71 
72 
73 struct nxt_req_app_link_s {
74     uint32_t             stream;
75     nxt_atomic_t         use_count;
76     nxt_port_t           *app_port;
77     nxt_port_t           *reply_port;
78     nxt_app_parse_ctx_t  *ap;
79     nxt_msg_info_t       msg_info;
80     nxt_req_conn_link_t  *rc;
81 
82     nxt_nsec_t           res_time;
83 
84     nxt_queue_link_t     link_app_requests; /* for nxt_app_t.requests */
85     nxt_queue_link_t     link_port_pending; /* for nxt_port_t.pending_requests */
86     nxt_queue_link_t     link_app_pending;  /* for nxt_app_t.pending */
87 
88     nxt_mp_t             *mem_pool;
89     nxt_work_t           work;
90 
91     int                  err_code;
92     const char           *err_str;
93 };
94 
95 
96 typedef struct {
97     nxt_socket_conf_t       *socket_conf;
98     nxt_router_temp_conf_t  *temp_conf;
99 } nxt_socket_rpc_t;
100 
101 
102 typedef struct {
103     nxt_app_t               *app;
104     nxt_router_temp_conf_t  *temp_conf;
105 } nxt_app_rpc_t;
106 
107 
108 struct nxt_port_select_state_s {
109     nxt_app_t           *app;
110     nxt_req_app_link_t  *ra;
111 
112     nxt_port_t          *failed_port;
113     int                 failed_port_use_delta;
114 
115     uint8_t             start_process;    /* 1 bit */
116     nxt_req_app_link_t  *shared_ra;
117     nxt_port_t          *port;
118 };
119 
120 typedef struct nxt_port_select_state_s nxt_port_select_state_t;
121 
122 static void nxt_router_greet_controller(nxt_task_t *task,
123     nxt_port_t *controller_port);
124 
125 static void nxt_router_port_select(nxt_task_t *task,
126     nxt_port_select_state_t *state);
127 
128 static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
129     nxt_port_select_state_t *state);
130 
131 static nxt_int_t nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app);
132 
133 nxt_inline void
134 nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
135 {
136     nxt_atomic_fetch_add(&ra->use_count, 1);
137 }
138 
139 nxt_inline void
140 nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
141 {
142 #if (NXT_DEBUG)
143     int  c;
144 
145     c = nxt_atomic_fetch_add(&ra->use_count, -1);
146 
147     nxt_assert(c > 1);
148 #else
149     (void) nxt_atomic_fetch_add(&ra->use_count, -1);
150 #endif
151 }
152 
153 static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
154 
155 static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
156 static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
157 static void nxt_router_conf_ready(nxt_task_t *task,
158     nxt_router_temp_conf_t *tmcf);
159 static void nxt_router_conf_error(nxt_task_t *task,
160     nxt_router_temp_conf_t *tmcf);
161 static void nxt_router_conf_send(nxt_task_t *task,
162     nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
163 
164 static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
165     nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
166 static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
167 static void nxt_router_listen_socket_rpc_create(nxt_task_t *task,
168     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf);
169 static void nxt_router_listen_socket_ready(nxt_task_t *task,
170     nxt_port_recv_msg_t *msg, void *data);
171 static void nxt_router_listen_socket_error(nxt_task_t *task,
172     nxt_port_recv_msg_t *msg, void *data);
173 #if (NXT_TLS)
174 static void nxt_router_tls_rpc_create(nxt_task_t *task,
175     nxt_router_temp_conf_t *tmcf, nxt_router_tlssock_t *tls);
176 static void nxt_router_tls_rpc_handler(nxt_task_t *task,
177     nxt_port_recv_msg_t *msg, void *data);
178 #endif
179 static void nxt_router_app_rpc_create(nxt_task_t *task,
180     nxt_router_temp_conf_t *tmcf, nxt_app_t *app);
181 static void nxt_router_app_prefork_ready(nxt_task_t *task,
182     nxt_port_recv_msg_t *msg, void *data);
183 static void nxt_router_app_prefork_error(nxt_task_t *task,
184     nxt_port_recv_msg_t *msg, void *data);
185 static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
186     nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
187 static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
188     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
189 
190 static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
191     nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
192     const nxt_event_interface_t *interface);
193 static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
194     nxt_router_engine_conf_t *recf);
195 static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
196     nxt_router_engine_conf_t *recf);
197 static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
198     nxt_router_engine_conf_t *recf);
199 static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
200     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
201     nxt_work_handler_t handler);
202 static nxt_int_t nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
203     nxt_router_engine_conf_t *recf);
204 static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
205     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
206 
207 static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
208     nxt_router_temp_conf_t *tmcf);
209 static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
210     nxt_event_engine_t *engine);
211 static void nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
212     nxt_router_temp_conf_t *tmcf);
213 
214 static void nxt_router_engines_post(nxt_router_t *router,
215     nxt_router_temp_conf_t *tmcf);
216 static void nxt_router_engine_post(nxt_event_engine_t *engine,
217     nxt_work_t *jobs);
218 
219 static void nxt_router_thread_start(void *data);
220 static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
221     void *data);
222 static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
223     void *data);
224 static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
225     void *data);
226 static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
227     void *data);
228 static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
229     void *data);
230 static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
231     void *data);
232 static void nxt_router_listen_socket_release(nxt_task_t *task,
233     nxt_socket_conf_t *skcf);
234 
235 static void nxt_router_access_log_writer(nxt_task_t *task,
236     nxt_http_request_t *r, nxt_router_access_log_t *access_log);
237 static u_char *nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now,
238     struct tm *tm, size_t size, const char *format);
239 static void nxt_router_access_log_open(nxt_task_t *task,
240     nxt_router_temp_conf_t *tmcf);
241 static void nxt_router_access_log_ready(nxt_task_t *task,
242     nxt_port_recv_msg_t *msg, void *data);
243 static void nxt_router_access_log_error(nxt_task_t *task,
244     nxt_port_recv_msg_t *msg, void *data);
245 static void nxt_router_access_log_release(nxt_task_t *task,
246     nxt_thread_spinlock_t *lock, nxt_router_access_log_t *access_log);
247 static void nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj,
248     void *data);
249 static void nxt_router_access_log_reopen_ready(nxt_task_t *task,
250     nxt_port_recv_msg_t *msg, void *data);
251 static void nxt_router_access_log_reopen_error(nxt_task_t *task,
252     nxt_port_recv_msg_t *msg, void *data);
253 
254 static void nxt_router_app_port_ready(nxt_task_t *task,
255     nxt_port_recv_msg_t *msg, void *data);
256 static void nxt_router_app_port_error(nxt_task_t *task,
257     nxt_port_recv_msg_t *msg, void *data);
258 
259 static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
260 static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
261     uint32_t request_failed, uint32_t got_response);
262 static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
263     nxt_req_app_link_t *ra);
264 
265 static void nxt_router_app_prepare_request(nxt_task_t *task,
266     nxt_req_app_link_t *ra);
267 static nxt_buf_t *nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
268     nxt_port_t *port, const nxt_str_t *prefix);
269 
270 static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
271 static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
272     void *data);
273 static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
274     void *data);
275 static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
276     void *data);
277 static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
278 
279 static const nxt_http_request_state_t  nxt_http_request_send_state;
280 static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
281 
282 static void nxt_router_app_joint_use(nxt_task_t *task,
283     nxt_app_joint_t *app_joint, int i);
284 
285 static nxt_router_t  *nxt_router;
286 
287 static const nxt_str_t http_prefix = nxt_string("HTTP_");
288 static const nxt_str_t empty_prefix = nxt_string("");
289 
290 static const nxt_str_t  *nxt_app_msg_prefix[] = {
291     &empty_prefix,
292     &http_prefix,
293     &http_prefix,
294     &http_prefix,
295     &http_prefix,
296     &empty_prefix,
297 };
298 
299 
300 nxt_port_handlers_t  nxt_router_process_port_handlers = {
301     .quit         = nxt_worker_process_quit_handler,
302     .new_port     = nxt_router_new_port_handler,
303     .change_file  = nxt_port_change_log_file_handler,
304     .mmap         = nxt_port_mmap_handler,
305     .data         = nxt_router_conf_data_handler,
306     .remove_pid   = nxt_router_remove_pid_handler,
307     .access_log   = nxt_router_access_log_reopen_handler,
308     .rpc_ready    = nxt_port_rpc_handler,
309     .rpc_error    = nxt_port_rpc_handler,
310 };
311 
312 
313 nxt_int_t
314 nxt_router_start(nxt_task_t *task, void *data)
315 {
316     nxt_int_t      ret;
317     nxt_port_t     *controller_port;
318     nxt_router_t   *router;
319     nxt_runtime_t  *rt;
320 
321     rt = task->thread->runtime;
322 
323 #if (NXT_TLS)
324     rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
325     if (nxt_slow_path(rt->tls == NULL)) {
326         return NXT_ERROR;
327     }
328 
329     ret = rt->tls->library_init(task);
330     if (nxt_slow_path(ret != NXT_OK)) {
331         return ret;
332     }
333 #endif
334 
335     ret = nxt_http_init(task, rt);
336     if (nxt_slow_path(ret != NXT_OK)) {
337         return ret;
338     }
339 
340     router = nxt_zalloc(sizeof(nxt_router_t));
341     if (nxt_slow_path(router == NULL)) {
342         return NXT_ERROR;
343     }
344 
345     nxt_queue_init(&router->engines);
346     nxt_queue_init(&router->sockets);
347     nxt_queue_init(&router->apps);
348 
349     nxt_router = router;
350 
351     controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
352     if (controller_port != NULL) {
353         nxt_router_greet_controller(task, controller_port);
354     }
355 
356     return NXT_OK;
357 }
358 
359 
360 static void
361 nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port)
362 {
363     nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY,
364                           -1, 0, 0, NULL);
365 }
366 
367 
368 static void
369 nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
370     void *data)
371 {
372     size_t         size;
373     uint32_t       stream;
374     nxt_mp_t       *mp;
375     nxt_int_t      ret;
376     nxt_app_t      *app;
377     nxt_buf_t      *b;
378     nxt_port_t     *main_port;
379     nxt_runtime_t  *rt;
380 
381     app = data;
382 
383     rt = task->thread->runtime;
384     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
385 
386     nxt_debug(task, "app '%V' %p start process", &app->name, app);
387 
388     size = app->name.length + 1 + app->conf.length;
389 
390     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
391 
392     if (nxt_slow_path(b == NULL)) {
393         goto failed;
394     }
395 
396     nxt_buf_cpystr(b, &app->name);
397     *b->mem.free++ = '\0';
398     nxt_buf_cpystr(b, &app->conf);
399 
400     nxt_router_app_joint_use(task, app->joint, 1);
401 
402     stream = nxt_port_rpc_register_handler(task, port,
403                                            nxt_router_app_port_ready,
404                                            nxt_router_app_port_error,
405                                            -1, app->joint);
406 
407     if (nxt_slow_path(stream == 0)) {
408         nxt_router_app_joint_use(task, app->joint, -1);
409 
410         goto failed;
411     }
412 
413     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
414                                 stream, port->id, b);
415 
416     if (nxt_slow_path(ret != NXT_OK)) {
417         nxt_port_rpc_cancel(task, port, stream);
418 
419         nxt_router_app_joint_use(task, app->joint, -1);
420 
421         goto failed;
422     }
423 
424     nxt_router_app_use(task, app, -1);
425 
426     return;
427 
428 failed:
429 
430     if (b != NULL) {
431         mp = b->data;
432         nxt_mp_free(mp, b);
433         nxt_mp_release(mp);
434     }
435 
436     nxt_thread_mutex_lock(&app->mutex);
437 
438     app->pending_processes--;
439 
440     nxt_thread_mutex_unlock(&app->mutex);
441 
442     nxt_router_app_use(task, app, -1);
443 }
444 
445 
446 static void
447 nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
448 {
449     app_joint->use_count += i;
450 
451     if (app_joint->use_count == 0) {
452         nxt_assert(app_joint->app == NULL);
453 
454         nxt_free(app_joint);
455     }
456 }
457 
458 
459 static nxt_int_t
460 nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
461 {
462     nxt_int_t      res;
463     nxt_port_t     *router_port;
464     nxt_runtime_t  *rt;
465 
466     rt = task->thread->runtime;
467     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
468 
469     nxt_router_app_use(task, app, 1);
470 
471     res = nxt_port_post(task, router_port, nxt_router_start_app_process_handler,
472                         app);
473 
474     if (res == NXT_OK) {
475         return res;
476     }
477 
478     nxt_thread_mutex_lock(&app->mutex);
479 
480     app->pending_processes--;
481 
482     nxt_thread_mutex_unlock(&app->mutex);
483 
484     nxt_router_app_use(task, app, -1);
485 
486     return NXT_ERROR;
487 }
488 
489 
490 nxt_inline void
491 nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
492     nxt_req_conn_link_t *rc)
493 {
494     nxt_event_engine_t  *engine;
495 
496     engine = task->thread->engine;
497 
498     nxt_memzero(ra, sizeof(nxt_req_app_link_t));
499 
500     ra->stream = rc->stream;
501     ra->use_count = 1;
502     ra->rc = rc;
503     rc->ra = ra;
504     ra->reply_port = engine->port;
505     ra->ap = rc->ap;
506 
507     ra->work.handler = NULL;
508     ra->work.task = &engine->task;
509     ra->work.obj = ra;
510     ra->work.data = engine;
511 }
512 
513 
514 nxt_inline nxt_req_app_link_t *
515 nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
516 {
517     nxt_mp_t            *mp;
518     nxt_req_app_link_t  *ra;
519 
520     if (ra_src->mem_pool != NULL) {
521         return ra_src;
522     }
523 
524     mp = ra_src->ap->mem_pool;
525 
526     ra = nxt_mp_alloc(mp, sizeof(nxt_req_app_link_t));
527 
528     if (nxt_slow_path(ra == NULL)) {
529 
530         ra_src->rc->ra = NULL;
531         ra_src->rc = NULL;
532 
533         return NULL;
534     }
535 
536     nxt_mp_retain(mp);
537 
538     nxt_router_ra_init(task, ra, ra_src->rc);
539 
540     ra->mem_pool = mp;
541 
542     return ra;
543 }
544 
545 
546 nxt_inline nxt_bool_t
547 nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
548     uint32_t stream)
549 {
550     nxt_buf_t   *b, *next;
551     nxt_bool_t  cancelled;
552 
553     if (msg_info->buf == NULL) {
554         return 0;
555     }
556 
557     cancelled = nxt_port_mmap_tracking_cancel(task, &msg_info->tracking,
558                                               stream);
559 
560     if (cancelled) {
561         nxt_debug(task, "stream #%uD: cancelled by router", stream);
562     }
563 
564     for (b = msg_info->buf; b != NULL; b = next) {
565         next = b->next;
566 
567         b->completion_handler = msg_info->completion_handler;
568 
569         if (b->is_port_mmap_sent) {
570             b->is_port_mmap_sent = cancelled == 0;
571             b->completion_handler(task, b, b->parent);
572         }
573     }
574 
575     msg_info->buf = NULL;
576 
577     return cancelled;
578 }
579 
580 
581 static void
582 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
583 
584 
585 static void
586 nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
587 {
588     nxt_req_app_link_t  *ra;
589 
590     ra = obj;
591 
592     nxt_router_ra_update_peer(task, ra);
593 
594     nxt_router_ra_use(task, ra, -1);
595 }
596 
597 
598 static void
599 nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
600 {
601     nxt_event_engine_t   *engine;
602     nxt_req_conn_link_t  *rc;
603 
604     engine = ra->work.data;
605 
606     if (task->thread->engine != engine) {
607         nxt_router_ra_inc_use(ra);
608 
609         ra->work.handler = nxt_router_ra_update_peer_handler;
610         ra->work.task = &engine->task;
611         ra->work.next = NULL;
612 
613         nxt_debug(task, "ra stream #%uD post update peer to %p",
614                   ra->stream, engine);
615 
616         nxt_event_engine_post(engine, &ra->work);
617 
618         return;
619     }
620 
621     nxt_debug(task, "ra stream #%uD update peer", ra->stream);
622 
623     rc = ra->rc;
624 
625     if (rc != NULL && ra->app_port != NULL) {
626         nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
627     }
628 
629     nxt_router_ra_use(task, ra, -1);
630 }
631 
632 
633 static void
634 nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
635 {
636     nxt_mp_t                *mp;
637     nxt_req_conn_link_t     *rc;
638 
639     nxt_assert(task->thread->engine == ra->work.data);
640     nxt_assert(ra->use_count == 0);
641 
642     nxt_debug(task, "ra stream #%uD release", ra->stream);
643 
644     rc = ra->rc;
645 
646     if (rc != NULL) {
647         if (nxt_slow_path(ra->err_code != 0)) {
648             nxt_http_request_error(task, rc->ap->request, ra->err_code);
649 
650         } else {
651             rc->app_port = ra->app_port;
652             rc->msg_info = ra->msg_info;
653 
654             if (rc->app->timeout != 0) {
655                 rc->ap->timer.handler = nxt_router_app_timeout;
656                 rc->ap->timer_data = rc;
657                 nxt_timer_add(task->thread->engine, &rc->ap->timer,
658                               rc->app->timeout);
659             }
660 
661             ra->app_port = NULL;
662             ra->msg_info.buf = NULL;
663         }
664 
665         rc->ra = NULL;
666         ra->rc = NULL;
667     }
668 
669     if (ra->app_port != NULL) {
670         nxt_router_app_port_release(task, ra->app_port, 0, 1);
671 
672         ra->app_port = NULL;
673     }
674 
675     nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
676 
677     mp = ra->mem_pool;
678 
679     if (mp != NULL) {
680         nxt_mp_free(mp, ra);
681         nxt_mp_release(mp);
682     }
683 }
684 
685 
686 static void
687 nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
688 {
689     nxt_req_app_link_t  *ra;
690 
691     ra = obj;
692 
693     nxt_assert(ra->work.data == data);
694 
695     nxt_atomic_fetch_add(&ra->use_count, -1);
696 
697     nxt_router_ra_release(task, ra);
698 }
699 
700 
701 static void
702 nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
703 {
704     int                 c;
705     nxt_event_engine_t  *engine;
706 
707     c = nxt_atomic_fetch_add(&ra->use_count, i);
708 
709     if (i < 0 && c == -i) {
710         engine = ra->work.data;
711 
712         if (task->thread->engine == engine) {
713             nxt_router_ra_release(task, ra);
714 
715             return;
716         }
717 
718         nxt_router_ra_inc_use(ra);
719 
720         ra->work.handler = nxt_router_ra_release_handler;
721         ra->work.task = &engine->task;
722         ra->work.next = NULL;
723 
724         nxt_debug(task, "ra stream #%uD post release to %p",
725                   ra->stream, engine);
726 
727         nxt_event_engine_post(engine, &ra->work);
728     }
729 }
730 
731 
732 nxt_inline void
733 nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char *str)
734 {
735     ra->app_port = NULL;
736     ra->err_code = code;
737     ra->err_str = str;
738 }
739 
740 
741 nxt_inline void
742 nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
743 {
744     nxt_queue_insert_tail(&ra->app_port->pending_requests,
745                           &ra->link_port_pending);
746     nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
747 
748     nxt_router_ra_inc_use(ra);
749 
750     ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
751 
752     nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
753 }
754 
755 
756 nxt_inline nxt_bool_t
757 nxt_queue_chk_remove(nxt_queue_link_t *lnk)
758 {
759     if (lnk->next != NULL) {
760         nxt_queue_remove(lnk);
761 
762         lnk->next = NULL;
763 
764         return 1;
765     }
766 
767     return 0;
768 }
769 
770 
771 nxt_inline void
772 nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
773 {
774     int                 ra_use_delta;
775     nxt_req_app_link_t  *ra;
776 
777     if (rc->app_port != NULL) {
778         nxt_router_app_port_release(task, rc->app_port, 0, 1);
779 
780         rc->app_port = NULL;
781     }
782 
783     nxt_router_msg_cancel(task, &rc->msg_info, rc->stream);
784 
785     ra = rc->ra;
786 
787     if (ra != NULL) {
788         rc->ra = NULL;
789         ra->rc = NULL;
790 
791         ra_use_delta = 0;
792 
793         nxt_thread_mutex_lock(&rc->app->mutex);
794 
795         if (ra->link_app_requests.next == NULL
796             && ra->link_port_pending.next == NULL
797             && ra->link_app_pending.next == NULL)
798         {
799             ra = NULL;
800 
801         } else {
802             ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
803             ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
804             nxt_queue_chk_remove(&ra->link_app_pending);
805         }
806 
807         nxt_thread_mutex_unlock(&rc->app->mutex);
808 
809         if (ra != NULL) {
810             nxt_router_ra_use(task, ra, ra_use_delta);
811         }
812     }
813 
814     if (rc->app != NULL) {
815         nxt_router_app_use(task, rc->app, -1);
816 
817         rc->app = NULL;
818     }
819 
820     if (rc->ap != NULL) {
821         rc->ap->timer_data = NULL;
822 
823         nxt_app_http_req_done(task, rc->ap);
824 
825         rc->ap = NULL;
826     }
827 }
828 
829 
830 void
831 nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
832 {
833     nxt_port_new_port_handler(task, msg);
834 
835     if (msg->u.new_port != NULL
836         && msg->u.new_port->type == NXT_PROCESS_CONTROLLER)
837     {
838         nxt_router_greet_controller(task, msg->u.new_port);
839     }
840 
841     if (msg->port_msg.stream == 0) {
842         return;
843     }
844 
845     if (msg->u.new_port == NULL
846         || msg->u.new_port->type != NXT_PROCESS_WORKER)
847     {
848         msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
849     }
850 
851     nxt_port_rpc_handler(task, msg);
852 }
853 
854 
855 void
856 nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
857 {
858     nxt_int_t               ret;
859     nxt_buf_t               *b;
860     nxt_router_temp_conf_t  *tmcf;
861 
862     tmcf = nxt_router_temp_conf(task);
863     if (nxt_slow_path(tmcf == NULL)) {
864         return;
865     }
866 
867     nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s",
868               nxt_buf_used_size(msg->buf),
869               (size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos);
870 
871     tmcf->router_conf->router = nxt_router;
872     tmcf->stream = msg->port_msg.stream;
873     tmcf->port = nxt_runtime_port_find(task->thread->runtime,
874                                        msg->port_msg.pid,
875                                        msg->port_msg.reply_port);
876 
877     if (nxt_slow_path(tmcf->port == NULL)) {
878         nxt_alert(task, "reply port not found");
879 
880         return;
881     }
882 
883     nxt_port_use(task, tmcf->port, 1);
884 
885     b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool,
886                                msg->buf, msg->size);
887     if (nxt_slow_path(b == NULL)) {
888         nxt_router_conf_error(task, tmcf);
889 
890         return;
891     }
892 
893     ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
894 
895     if (nxt_fast_path(ret == NXT_OK)) {
896         nxt_router_conf_apply(task, tmcf, NULL);
897 
898     } else {
899         nxt_router_conf_error(task, tmcf);
900     }
901 }
902 
903 
904 static void
905 nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
906     void *data)
907 {
908     union {
909         nxt_pid_t  removed_pid;
910         void       *data;
911     } u;
912 
913     u.data = data;
914 
915     nxt_port_rpc_remove_peer(task, port, u.removed_pid);
916 }
917 
918 
919 void
920 nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
921 {
922     nxt_event_engine_t  *engine;
923 
924     nxt_port_remove_pid_handler(task, msg);
925 
926     if (msg->port_msg.stream == 0) {
927         return;
928     }
929 
930     nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
931     {
932         nxt_port_post(task, engine->port, nxt_router_app_process_remove_pid,
933                       msg->u.data);
934     }
935     nxt_queue_loop;
936 
937     msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
938 
939     nxt_port_rpc_handler(task, msg);
940 }
941 
942 
943 static nxt_router_temp_conf_t *
944 nxt_router_temp_conf(nxt_task_t *task)
945 {
946     nxt_mp_t                *mp, *tmp;
947     nxt_router_conf_t       *rtcf;
948     nxt_router_temp_conf_t  *tmcf;
949 
950     mp = nxt_mp_create(1024, 128, 256, 32);
951     if (nxt_slow_path(mp == NULL)) {
952         return NULL;
953     }
954 
955     rtcf = nxt_mp_zget(mp, sizeof(nxt_router_conf_t));
956     if (nxt_slow_path(rtcf == NULL)) {
957         goto fail;
958     }
959 
960     rtcf->mem_pool = mp;
961 
962     tmp = nxt_mp_create(1024, 128, 256, 32);
963     if (nxt_slow_path(tmp == NULL)) {
964         goto fail;
965     }
966 
967     tmcf = nxt_mp_zget(tmp, sizeof(nxt_router_temp_conf_t));
968     if (nxt_slow_path(tmcf == NULL)) {
969         goto temp_fail;
970     }
971 
972     tmcf->mem_pool = tmp;
973     tmcf->router_conf = rtcf;
974     tmcf->count = 1;
975     tmcf->engine = task->thread->engine;
976 
977     tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
978                                      sizeof(nxt_router_engine_conf_t));
979     if (nxt_slow_path(tmcf->engines == NULL)) {
980         goto temp_fail;
981     }
982 
983     nxt_queue_init(&tmcf->deleting);
984     nxt_queue_init(&tmcf->keeping);
985     nxt_queue_init(&tmcf->updating);
986     nxt_queue_init(&tmcf->pending);
987     nxt_queue_init(&tmcf->creating);
988 
989 #if (NXT_TLS)
990     nxt_queue_init(&tmcf->tls);
991 #endif
992 
993     nxt_queue_init(&tmcf->apps);
994     nxt_queue_init(&tmcf->previous);
995 
996     return tmcf;
997 
998 temp_fail:
999 
1000     nxt_mp_destroy(tmp);
1001 
1002 fail:
1003 
1004     nxt_mp_destroy(mp);
1005 
1006     return NULL;
1007 }
1008 
1009 
1010 nxt_inline nxt_bool_t
1011 nxt_router_app_can_start(nxt_app_t *app)
1012 {
1013     return app->processes + app->pending_processes < app->max_processes
1014             && app->pending_processes < app->max_pending_processes;
1015 }
1016 
1017 
1018 nxt_inline nxt_bool_t
1019 nxt_router_app_need_start(nxt_app_t *app)
1020 {
1021     return app->idle_processes + app->pending_processes
1022             < app->spare_processes;
1023 }
1024 
1025 
1026 static void
1027 nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data)
1028 {
1029     nxt_int_t                    ret;
1030     nxt_app_t                    *app;
1031     nxt_router_t                 *router;
1032     nxt_runtime_t                *rt;
1033     nxt_queue_link_t             *qlk;
1034     nxt_socket_conf_t            *skcf;
1035     nxt_router_conf_t            *rtcf;
1036     nxt_router_temp_conf_t       *tmcf;
1037     const nxt_event_interface_t  *interface;
1038 #if (NXT_TLS)
1039     nxt_router_tlssock_t         *tls;
1040 #endif
1041 
1042     tmcf = obj;
1043 
1044     qlk = nxt_queue_first(&tmcf->pending);
1045 
1046     if (qlk != nxt_queue_tail(&tmcf->pending)) {
1047         nxt_queue_remove(qlk);
1048         nxt_queue_insert_tail(&tmcf->creating, qlk);
1049 
1050         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1051 
1052         nxt_router_listen_socket_rpc_create(task, tmcf, skcf);
1053 
1054         return;
1055     }
1056 
1057 #if (NXT_TLS)
1058     qlk = nxt_queue_first(&tmcf->tls);
1059 
1060     if (qlk != nxt_queue_tail(&tmcf->tls)) {
1061         nxt_queue_remove(qlk);
1062 
1063         tls = nxt_queue_link_data(qlk, nxt_router_tlssock_t, link);
1064 
1065         nxt_router_tls_rpc_create(task, tmcf, tls);
1066         return;
1067     }
1068 #endif
1069 
1070     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1071 
1072         if (nxt_router_app_need_start(app)) {
1073             nxt_router_app_rpc_create(task, tmcf, app);
1074             return;
1075         }
1076 
1077     } nxt_queue_loop;
1078 
1079     rtcf = tmcf->router_conf;
1080 
1081     if (rtcf->access_log != NULL && rtcf->access_log->fd == -1) {
1082         nxt_router_access_log_open(task, tmcf);
1083         return;
1084     }
1085 
1086     rt = task->thread->runtime;
1087 
1088     interface = nxt_service_get(rt->services, "engine", NULL);
1089 
1090     router = rtcf->router;
1091 
1092     ret = nxt_router_engines_create(task, router, tmcf, interface);
1093     if (nxt_slow_path(ret != NXT_OK)) {
1094         goto fail;
1095     }
1096 
1097     ret = nxt_router_threads_create(task, rt, tmcf);
1098     if (nxt_slow_path(ret != NXT_OK)) {
1099         goto fail;
1100     }
1101 
1102     nxt_router_apps_sort(task, router, tmcf);
1103 
1104     nxt_router_engines_post(router, tmcf);
1105 
1106     nxt_queue_add(&router->sockets, &tmcf->updating);
1107     nxt_queue_add(&router->sockets, &tmcf->creating);
1108 
1109     router->access_log = rtcf->access_log;
1110 
1111     nxt_router_conf_ready(task, tmcf);
1112 
1113     return;
1114 
1115 fail:
1116 
1117     nxt_router_conf_error(task, tmcf);
1118 
1119     return;
1120 }
1121 
1122 
1123 static void
1124 nxt_router_conf_wait(nxt_task_t *task, void *obj, void *data)
1125 {
1126     nxt_joint_job_t  *job;
1127 
1128     job = obj;
1129 
1130     nxt_router_conf_ready(task, job->tmcf);
1131 }
1132 
1133 
1134 static void
1135 nxt_router_conf_ready(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1136 {
1137     nxt_debug(task, "temp conf count:%D", tmcf->count);
1138 
1139     if (--tmcf->count == 0) {
1140         nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_READY_LAST);
1141     }
1142 }
1143 
1144 
1145 static void
1146 nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
1147 {
1148     nxt_app_t          *app;
1149     nxt_queue_t        new_socket_confs;
1150     nxt_socket_t       s;
1151     nxt_router_t       *router;
1152     nxt_queue_link_t   *qlk;
1153     nxt_socket_conf_t  *skcf;
1154     nxt_router_conf_t  *rtcf;
1155 
1156     nxt_alert(task, "failed to apply new conf");
1157 
1158     for (qlk = nxt_queue_first(&tmcf->creating);
1159          qlk != nxt_queue_tail(&tmcf->creating);
1160          qlk = nxt_queue_next(qlk))
1161     {
1162         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1163         s = skcf->listen->socket;
1164 
1165         if (s != -1) {
1166             nxt_socket_close(task, s);
1167         }
1168 
1169         nxt_free(skcf->listen);
1170     }
1171 
1172     nxt_queue_init(&new_socket_confs);
1173     nxt_queue_add(&new_socket_confs, &tmcf->updating);
1174     nxt_queue_add(&new_socket_confs, &tmcf->pending);
1175     nxt_queue_add(&new_socket_confs, &tmcf->creating);
1176 
1177     rtcf = tmcf->router_conf;
1178 
1179     nxt_http_routes_cleanup(task, rtcf->routes);
1180 
1181     nxt_queue_each(skcf, &new_socket_confs, nxt_socket_conf_t, link) {
1182 
1183         if (skcf->pass != NULL) {
1184             nxt_http_pass_cleanup(task, skcf->pass);
1185         }
1186 
1187     } nxt_queue_loop;
1188 
1189     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1190 
1191         nxt_router_app_unlink(task, app);
1192 
1193     } nxt_queue_loop;
1194 
1195     router = rtcf->router;
1196 
1197     nxt_queue_add(&router->sockets, &tmcf->keeping);
1198     nxt_queue_add(&router->sockets, &tmcf->deleting);
1199 
1200     nxt_queue_add(&router->apps, &tmcf->previous);
1201 
1202     // TODO: new engines and threads
1203 
1204     nxt_router_access_log_release(task, &router->lock, rtcf->access_log);
1205 
1206     nxt_mp_destroy(rtcf->mem_pool);
1207 
1208     nxt_router_conf_send(task, tmcf, NXT_PORT_MSG_RPC_ERROR);
1209 }
1210 
1211 
1212 static void
1213 nxt_router_conf_send(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1214     nxt_port_msg_type_t type)
1215 {
1216     nxt_port_socket_write(task, tmcf->port, type, -1, tmcf->stream, 0, NULL);
1217 
1218     nxt_port_use(task, tmcf->port, -1);
1219 
1220     tmcf->port = NULL;
1221 }
1222 
1223 
1224 static nxt_conf_map_t  nxt_router_conf[] = {
1225     {
1226         nxt_string("listeners_threads"),
1227         NXT_CONF_MAP_INT32,
1228         offsetof(nxt_router_conf_t, threads),
1229     },
1230 };
1231 
1232 
1233 static nxt_conf_map_t  nxt_router_app_conf[] = {
1234     {
1235         nxt_string("type"),
1236         NXT_CONF_MAP_STR,
1237         offsetof(nxt_router_app_conf_t, type),
1238     },
1239 
1240     {
1241         nxt_string("limits"),
1242         NXT_CONF_MAP_PTR,
1243         offsetof(nxt_router_app_conf_t, limits_value),
1244     },
1245 
1246     {
1247         nxt_string("processes"),
1248         NXT_CONF_MAP_INT32,
1249         offsetof(nxt_router_app_conf_t, processes),
1250     },
1251 
1252     {
1253         nxt_string("processes"),
1254         NXT_CONF_MAP_PTR,
1255         offsetof(nxt_router_app_conf_t, processes_value),
1256     },
1257 };
1258 
1259 
1260 static nxt_conf_map_t  nxt_router_app_limits_conf[] = {
1261     {
1262         nxt_string("timeout"),
1263         NXT_CONF_MAP_MSEC,
1264         offsetof(nxt_router_app_conf_t, timeout),
1265     },
1266 
1267     {
1268         nxt_string("reschedule_timeout"),
1269         NXT_CONF_MAP_MSEC,
1270         offsetof(nxt_router_app_conf_t, res_timeout),
1271     },
1272 
1273     {
1274         nxt_string("requests"),
1275         NXT_CONF_MAP_INT32,
1276         offsetof(nxt_router_app_conf_t, requests),
1277     },
1278 };
1279 
1280 
1281 static nxt_conf_map_t  nxt_router_app_processes_conf[] = {
1282     {
1283         nxt_string("spare"),
1284         NXT_CONF_MAP_INT32,
1285         offsetof(nxt_router_app_conf_t, spare_processes),
1286     },
1287 
1288     {
1289         nxt_string("max"),
1290         NXT_CONF_MAP_INT32,
1291         offsetof(nxt_router_app_conf_t, max_processes),
1292     },
1293 
1294     {
1295         nxt_string("idle_timeout"),
1296         NXT_CONF_MAP_MSEC,
1297         offsetof(nxt_router_app_conf_t, idle_timeout),
1298     },
1299 };
1300 
1301 
1302 static nxt_conf_map_t  nxt_router_listener_conf[] = {
1303     {
1304         nxt_string("pass"),
1305         NXT_CONF_MAP_STR_COPY,
1306         offsetof(nxt_router_listener_conf_t, pass),
1307     },
1308 
1309     {
1310         nxt_string("application"),
1311         NXT_CONF_MAP_STR_COPY,
1312         offsetof(nxt_router_listener_conf_t, application),
1313     },
1314 };
1315 
1316 
1317 static nxt_conf_map_t  nxt_router_http_conf[] = {
1318     {
1319         nxt_string("header_buffer_size"),
1320         NXT_CONF_MAP_SIZE,
1321         offsetof(nxt_socket_conf_t, header_buffer_size),
1322     },
1323 
1324     {
1325         nxt_string("large_header_buffer_size"),
1326         NXT_CONF_MAP_SIZE,
1327         offsetof(nxt_socket_conf_t, large_header_buffer_size),
1328     },
1329 
1330     {
1331         nxt_string("large_header_buffers"),
1332         NXT_CONF_MAP_SIZE,
1333         offsetof(nxt_socket_conf_t, large_header_buffers),
1334     },
1335 
1336     {
1337         nxt_string("body_buffer_size"),
1338         NXT_CONF_MAP_SIZE,
1339         offsetof(nxt_socket_conf_t, body_buffer_size),
1340     },
1341 
1342     {
1343         nxt_string("max_body_size"),
1344         NXT_CONF_MAP_SIZE,
1345         offsetof(nxt_socket_conf_t, max_body_size),
1346     },
1347 
1348     {
1349         nxt_string("idle_timeout"),
1350         NXT_CONF_MAP_MSEC,
1351         offsetof(nxt_socket_conf_t, idle_timeout),
1352     },
1353 
1354     {
1355         nxt_string("header_read_timeout"),
1356         NXT_CONF_MAP_MSEC,
1357         offsetof(nxt_socket_conf_t, header_read_timeout),
1358     },
1359 
1360     {
1361         nxt_string("body_read_timeout"),
1362         NXT_CONF_MAP_MSEC,
1363         offsetof(nxt_socket_conf_t, body_read_timeout),
1364     },
1365 
1366     {
1367         nxt_string("send_timeout"),
1368         NXT_CONF_MAP_MSEC,
1369         offsetof(nxt_socket_conf_t, send_timeout),
1370     },
1371 };
1372 
1373 
1374 static nxt_int_t
1375 nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1376     u_char *start, u_char *end)
1377 {
1378     u_char                      *p;
1379     size_t                      size;
1380     nxt_mp_t                    *mp;
1381     uint32_t                    next;
1382     nxt_int_t                   ret;
1383     nxt_str_t                   name, path;
1384     nxt_app_t                   *app, *prev;
1385     nxt_router_t                *router;
1386     nxt_app_joint_t             *app_joint;
1387     nxt_conf_value_t            *conf, *http, *value;
1388     nxt_conf_value_t            *applications, *application;
1389     nxt_conf_value_t            *listeners, *listener;
1390     nxt_conf_value_t            *routes_conf;
1391     nxt_socket_conf_t           *skcf;
1392     nxt_http_routes_t           *routes;
1393     nxt_event_engine_t          *engine;
1394     nxt_app_lang_module_t       *lang;
1395     nxt_router_app_conf_t       apcf;
1396     nxt_router_access_log_t     *access_log;
1397     nxt_router_listener_conf_t  lscf;
1398 #if (NXT_TLS)
1399     nxt_router_tlssock_t        *tls;
1400 #endif
1401 
1402     static nxt_str_t  http_path = nxt_string("/settings/http");
1403     static nxt_str_t  applications_path = nxt_string("/applications");
1404     static nxt_str_t  listeners_path = nxt_string("/listeners");
1405     static nxt_str_t  routes_path = nxt_string("/routes");
1406     static nxt_str_t  access_log_path = nxt_string("/access_log");
1407 #if (NXT_TLS)
1408     static nxt_str_t  certificate_path = nxt_string("/tls/certificate");
1409 #endif
1410 
1411     conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
1412     if (conf == NULL) {
1413         nxt_alert(task, "configuration parsing error");
1414         return NXT_ERROR;
1415     }
1416 
1417     mp = tmcf->router_conf->mem_pool;
1418 
1419     ret = nxt_conf_map_object(mp, conf, nxt_router_conf,
1420                               nxt_nitems(nxt_router_conf), tmcf->router_conf);
1421     if (ret != NXT_OK) {
1422         nxt_alert(task, "root map error");
1423         return NXT_ERROR;
1424     }
1425 
1426     if (tmcf->router_conf->threads == 0) {
1427         tmcf->router_conf->threads = nxt_ncpu;
1428     }
1429 
1430     applications = nxt_conf_get_path(conf, &applications_path);
1431     if (applications == NULL) {
1432         nxt_alert(task, "no \"applications\" block");
1433         return NXT_ERROR;
1434     }
1435 
1436     router = tmcf->router_conf->router;
1437 
1438     next = 0;
1439 
1440     for ( ;; ) {
1441         application = nxt_conf_next_object_member(applications, &name, &next);
1442         if (application == NULL) {
1443             break;
1444         }
1445 
1446         nxt_debug(task, "application \"%V\"", &name);
1447 
1448         size = nxt_conf_json_length(application, NULL);
1449 
1450         app = nxt_malloc(sizeof(nxt_app_t) + name.length + size);
1451         if (app == NULL) {
1452             goto fail;
1453         }
1454 
1455         nxt_memzero(app, sizeof(nxt_app_t));
1456 
1457         app->name.start = nxt_pointer_to(app, sizeof(nxt_app_t));
1458         app->conf.start = nxt_pointer_to(app, sizeof(nxt_app_t) + name.length);
1459 
1460         p = nxt_conf_json_print(app->conf.start, application, NULL);
1461         app->conf.length = p - app->conf.start;
1462 
1463         nxt_assert(app->conf.length <= size);
1464 
1465         nxt_debug(task, "application conf \"%V\"", &app->conf);
1466 
1467         prev = nxt_router_app_find(&router->apps, &name);
1468 
1469         if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
1470             nxt_free(app);
1471 
1472             nxt_queue_remove(&prev->link);
1473             nxt_queue_insert_tail(&tmcf->previous, &prev->link);
1474             continue;
1475         }
1476 
1477         apcf.processes = 1;
1478         apcf.max_processes = 1;
1479         apcf.spare_processes = 0;
1480         apcf.timeout = 0;
1481         apcf.res_timeout = 1000;
1482         apcf.idle_timeout = 15000;
1483         apcf.requests = 0;
1484         apcf.limits_value = NULL;
1485         apcf.processes_value = NULL;
1486 
1487         app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
1488         if (nxt_slow_path(app_joint == NULL)) {
1489             goto app_fail;
1490         }
1491 
1492         nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
1493 
1494         ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
1495                                   nxt_nitems(nxt_router_app_conf), &apcf);
1496         if (ret != NXT_OK) {
1497             nxt_alert(task, "application map error");
1498             goto app_fail;
1499         }
1500 
1501         if (apcf.limits_value != NULL) {
1502 
1503             if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
1504                 nxt_alert(task, "application limits is not object");
1505                 goto app_fail;
1506             }
1507 
1508             ret = nxt_conf_map_object(mp, apcf.limits_value,
1509                                       nxt_router_app_limits_conf,
1510                                       nxt_nitems(nxt_router_app_limits_conf),
1511                                       &apcf);
1512             if (ret != NXT_OK) {
1513                 nxt_alert(task, "application limits map error");
1514                 goto app_fail;
1515             }
1516         }
1517 
1518         if (apcf.processes_value != NULL
1519             && nxt_conf_type(apcf.processes_value) == NXT_CONF_OBJECT)
1520         {
1521             ret = nxt_conf_map_object(mp, apcf.processes_value,
1522                                       nxt_router_app_processes_conf,
1523                                       nxt_nitems(nxt_router_app_processes_conf),
1524                                       &apcf);
1525             if (ret != NXT_OK) {
1526                 nxt_alert(task, "application processes map error");
1527                 goto app_fail;
1528             }
1529 
1530         } else {
1531             apcf.max_processes = apcf.processes;
1532             apcf.spare_processes = apcf.processes;
1533         }
1534 
1535         nxt_debug(task, "application type: %V", &apcf.type);
1536         nxt_debug(task, "application processes: %D", apcf.processes);
1537         nxt_debug(task, "application request timeout: %M", apcf.timeout);
1538         nxt_debug(task, "application reschedule timeout: %M", apcf.res_timeout);
1539         nxt_debug(task, "application requests: %D", apcf.requests);
1540 
1541         lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
1542 
1543         if (lang == NULL) {
1544             nxt_alert(task, "unknown application type: \"%V\"", &apcf.type);
1545             goto app_fail;
1546         }
1547 
1548         nxt_debug(task, "application language module: \"%s\"", lang->file);
1549 
1550         ret = nxt_thread_mutex_create(&app->mutex);
1551         if (ret != NXT_OK) {
1552             goto app_fail;
1553         }
1554 
1555         nxt_queue_init(&app->ports);
1556         nxt_queue_init(&app->spare_ports);
1557         nxt_queue_init(&app->idle_ports);
1558         nxt_queue_init(&app->requests);
1559         nxt_queue_init(&app->pending);
1560 
1561         app->name.length = name.length;
1562         nxt_memcpy(app->name.start, name.start, name.length);
1563 
1564         app->type = lang->type;
1565         app->max_processes = apcf.max_processes;
1566         app->spare_processes = apcf.spare_processes;
1567         app->max_pending_processes = apcf.spare_processes
1568                                       ? apcf.spare_processes : 1;
1569         app->timeout = apcf.timeout;
1570         app->res_timeout = apcf.res_timeout * 1000000;
1571         app->idle_timeout = apcf.idle_timeout;
1572         app->max_pending_responses = 2;
1573         app->max_requests = apcf.requests;
1574 
1575         engine = task->thread->engine;
1576 
1577         app->engine = engine;
1578 
1579         app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
1580         app->adjust_idle_work.task = &engine->task;
1581         app->adjust_idle_work.obj = app;
1582 
1583         nxt_queue_insert_tail(&tmcf->apps, &app->link);
1584 
1585         nxt_router_app_use(task, app, 1);
1586 
1587         app->joint = app_joint;
1588 
1589         app_joint->use_count = 1;
1590         app_joint->app = app;
1591 
1592         app_joint->idle_timer.bias = NXT_TIMER_DEFAULT_BIAS;
1593         app_joint->idle_timer.work_queue = &engine->fast_work_queue;
1594         app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
1595         app_joint->idle_timer.task = &engine->task;
1596         app_joint->idle_timer.log = app_joint->idle_timer.task->log;
1597 
1598         app_joint->free_app_work.handler = nxt_router_free_app;
1599         app_joint->free_app_work.task = &engine->task;
1600         app_joint->free_app_work.obj = app_joint;
1601     }
1602 
1603     routes_conf = nxt_conf_get_path(conf, &routes_path);
1604     if (nxt_fast_path(routes_conf != NULL)) {
1605         routes = nxt_http_routes_create(task, tmcf, routes_conf);
1606         if (nxt_slow_path(routes == NULL)) {
1607             return NXT_ERROR;
1608         }
1609         tmcf->router_conf->routes = routes;
1610     }
1611 
1612     http = nxt_conf_get_path(conf, &http_path);
1613 #if 0
1614     if (http == NULL) {
1615         nxt_alert(task, "no \"http\" block");
1616         return NXT_ERROR;
1617     }
1618 #endif
1619 
1620     listeners = nxt_conf_get_path(conf, &listeners_path);
1621     if (listeners == NULL) {
1622         nxt_alert(task, "no \"listeners\" block");
1623         return NXT_ERROR;
1624     }
1625 
1626     next = 0;
1627 
1628     for ( ;; ) {
1629         listener = nxt_conf_next_object_member(listeners, &name, &next);
1630         if (listener == NULL) {
1631             break;
1632         }
1633 
1634         skcf = nxt_router_socket_conf(task, tmcf, &name);
1635         if (skcf == NULL) {
1636             goto fail;
1637         }
1638 
1639         nxt_memzero(&lscf, sizeof(lscf));
1640 
1641         ret = nxt_conf_map_object(mp, listener, nxt_router_listener_conf,
1642                                   nxt_nitems(nxt_router_listener_conf), &lscf);
1643         if (ret != NXT_OK) {
1644             nxt_alert(task, "listener map error");
1645             goto fail;
1646         }
1647 
1648         nxt_debug(task, "application: %V", &lscf.application);
1649 
1650         // STUB, default values if http block is not defined.
1651         skcf->header_buffer_size = 2048;
1652         skcf->large_header_buffer_size = 8192;
1653         skcf->large_header_buffers = 4;
1654         skcf->body_buffer_size = 16 * 1024;
1655         skcf->max_body_size = 8 * 1024 * 1024;
1656         skcf->idle_timeout = 180 * 1000;
1657         skcf->header_read_timeout = 30 * 1000;
1658         skcf->body_read_timeout = 30 * 1000;
1659         skcf->send_timeout = 30 * 1000;
1660 
1661         if (http != NULL) {
1662             ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
1663                                       nxt_nitems(nxt_router_http_conf), skcf);
1664             if (ret != NXT_OK) {
1665                 nxt_alert(task, "http map error");
1666                 goto fail;
1667             }
1668         }
1669 
1670 #if (NXT_TLS)
1671 
1672         value = nxt_conf_get_path(listener, &certificate_path);
1673 
1674         if (value != NULL) {
1675             nxt_conf_get_string(value, &name);
1676 
1677             tls = nxt_mp_get(mp, sizeof(nxt_router_tlssock_t));
1678             if (nxt_slow_path(tls == NULL)) {
1679                 goto fail;
1680             }
1681 
1682             tls->name = name;
1683             tls->conf = skcf;
1684 
1685             nxt_queue_insert_tail(&tmcf->tls, &tls->link);
1686         }
1687 
1688 #endif
1689 
1690         skcf->listen->handler = nxt_http_conn_init;
1691         skcf->router_conf = tmcf->router_conf;
1692         skcf->router_conf->count++;
1693 
1694         if (lscf.pass.length != 0) {
1695             skcf->pass = nxt_http_pass_create(task, tmcf, &lscf.pass);
1696 
1697         /* COMPATIBILITY: listener application. */
1698         } else if (lscf.application.length > 0) {
1699             skcf->pass = nxt_http_pass_application(task, tmcf,
1700                                                    &lscf.application);
1701         }
1702     }
1703 
1704     value = nxt_conf_get_path(conf, &access_log_path);
1705 
1706     if (value != NULL) {
1707         nxt_conf_get_string(value, &path);
1708 
1709         access_log = router->access_log;
1710 
1711         if (access_log != NULL && nxt_strstr_eq(&path, &access_log->path)) {
1712             nxt_thread_spin_lock(&router->lock);
1713             access_log->count++;
1714             nxt_thread_spin_unlock(&router->lock);
1715 
1716         } else {
1717             access_log = nxt_malloc(sizeof(nxt_router_access_log_t)
1718                                     + path.length);
1719             if (access_log == NULL) {
1720                 nxt_alert(task, "failed to allocate access log structure");
1721                 goto fail;
1722             }
1723 
1724             access_log->fd = -1;
1725             access_log->handler = &nxt_router_access_log_writer;
1726             access_log->count = 1;
1727 
1728             access_log->path.length = path.length;
1729             access_log->path.start = (u_char *) access_log
1730                                      + sizeof(nxt_router_access_log_t);
1731 
1732             nxt_memcpy(access_log->path.start, path.start, path.length);
1733         }
1734 
1735         tmcf->router_conf->access_log = access_log;
1736     }
1737 
1738     nxt_http_routes_resolve(task, tmcf);
1739 
1740     nxt_queue_add(&tmcf->deleting, &router->sockets);
1741     nxt_queue_init(&router->sockets);
1742 
1743     return NXT_OK;
1744 
1745 app_fail:
1746 
1747     nxt_free(app);
1748 
1749 fail:
1750 
1751     nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
1752 
1753         nxt_queue_remove(&app->link);
1754         nxt_thread_mutex_destroy(&app->mutex);
1755         nxt_free(app);
1756 
1757     } nxt_queue_loop;
1758 
1759     return NXT_ERROR;
1760 }
1761 
1762 
1763 static nxt_app_t *
1764 nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
1765 {
1766     nxt_app_t  *app;
1767 
1768     nxt_queue_each(app, queue, nxt_app_t, link) {
1769 
1770         if (nxt_strstr_eq(name, &app->name)) {
1771             return app;
1772         }
1773 
1774     } nxt_queue_loop;
1775 
1776     return NULL;
1777 }
1778 
1779 
1780 nxt_app_t *
1781 nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
1782 {
1783     nxt_app_t  *app;
1784 
1785     app = nxt_router_app_find(&tmcf->apps, name);
1786 
1787     if (app == NULL) {
1788         app = nxt_router_app_find(&tmcf->previous, name);
1789     }
1790 
1791     return app;
1792 }
1793 
1794 
1795 static nxt_socket_conf_t *
1796 nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
1797     nxt_str_t *name)
1798 {
1799     size_t               size;
1800     nxt_int_t            ret;
1801     nxt_bool_t           wildcard;
1802     nxt_sockaddr_t       *sa;
1803     nxt_socket_conf_t    *skcf;
1804     nxt_listen_socket_t  *ls;
1805 
1806     sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
1807     if (nxt_slow_path(sa == NULL)) {
1808         nxt_alert(task, "invalid listener \"%V\"", name);
1809         return NULL;
1810     }
1811 
1812     sa->type = SOCK_STREAM;
1813 
1814     nxt_debug(task, "router listener: \"%*s\"",
1815               (size_t) sa->length, nxt_sockaddr_start(sa));
1816 
1817     skcf = nxt_mp_zget(tmcf->router_conf->mem_pool, sizeof(nxt_socket_conf_t));
1818     if (nxt_slow_path(skcf == NULL)) {
1819         return NULL;
1820     }
1821 
1822     size = nxt_sockaddr_size(sa);
1823 
1824     ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
1825 
1826     if (ret != NXT_OK) {
1827 
1828         ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
1829         if (nxt_slow_path(ls == NULL)) {
1830             return NULL;
1831         }
1832 
1833         skcf->listen = ls;
1834 
1835         ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
1836         nxt_memcpy(ls->sockaddr, sa, size);
1837 
1838         nxt_listen_socket_remote_size(ls);
1839 
1840         ls->socket = -1;
1841         ls->backlog = NXT_LISTEN_BACKLOG;
1842         ls->flags = NXT_NONBLOCK;
1843         ls->read_after_accept = 1;
1844     }
1845 
1846     switch (sa->u.sockaddr.sa_family) {
1847 #if (NXT_HAVE_UNIX_DOMAIN)
1848     case AF_UNIX:
1849         wildcard = 0;
1850         break;
1851 #endif
1852 #if (NXT_INET6)
1853     case AF_INET6:
1854         wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
1855         break;
1856 #endif
1857     case AF_INET:
1858     default:
1859         wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
1860         break;
1861     }
1862 
1863     if (!wildcard) {
1864         skcf->sockaddr = nxt_mp_zget(tmcf->router_conf->mem_pool, size);
1865         if (nxt_slow_path(skcf->sockaddr == NULL)) {
1866             return NULL;
1867         }
1868 
1869         nxt_memcpy(skcf->sockaddr, sa, size);
1870     }
1871 
1872     return skcf;
1873 }
1874 
1875 
1876 static nxt_int_t
1877 nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
1878     nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
1879 {
1880     nxt_router_t       *router;
1881     nxt_queue_link_t   *qlk;
1882     nxt_socket_conf_t  *skcf;
1883 
1884     router = tmcf->router_conf->router;
1885 
1886     for (qlk = nxt_queue_first(&router->sockets);
1887          qlk != nxt_queue_tail(&router->sockets);
1888          qlk = nxt_queue_next(qlk))
1889     {
1890         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
1891 
1892         if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
1893             nskcf->listen = skcf->listen;
1894 
1895             nxt_queue_remove(qlk);
1896             nxt_queue_insert_tail(&tmcf->keeping, qlk);
1897 
1898             nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
1899 
1900             return NXT_OK;
1901         }
1902     }
1903 
1904     nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
1905 
1906     return NXT_DECLINED;
1907 }
1908 
1909 
1910 static void
1911 nxt_router_listen_socket_rpc_create(nxt_task_t *task,
1912     nxt_router_temp_conf_t *tmcf, nxt_socket_conf_t *skcf)
1913 {
1914     size_t            size;
1915     uint32_t          stream;
1916     nxt_int_t         ret;
1917     nxt_buf_t         *b;
1918     nxt_port_t        *main_port, *router_port;
1919     nxt_runtime_t     *rt;
1920     nxt_socket_rpc_t  *rpc;
1921 
1922     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
1923     if (rpc == NULL) {
1924         goto fail;
1925     }
1926 
1927     rpc->socket_conf = skcf;
1928     rpc->temp_conf = tmcf;
1929 
1930     size = nxt_sockaddr_size(skcf->listen->sockaddr);
1931 
1932     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
1933     if (b == NULL) {
1934         goto fail;
1935     }
1936 
1937     b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
1938 
1939     rt = task->thread->runtime;
1940     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
1941     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
1942 
1943     stream = nxt_port_rpc_register_handler(task, router_port,
1944                                            nxt_router_listen_socket_ready,
1945                                            nxt_router_listen_socket_error,
1946                                            main_port->pid, rpc);
1947     if (nxt_slow_path(stream == 0)) {
1948         goto fail;
1949     }
1950 
1951     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_SOCKET, -1,
1952                                 stream, router_port->id, b);
1953 
1954     if (nxt_slow_path(ret != NXT_OK)) {
1955         nxt_port_rpc_cancel(task, router_port, stream);
1956         goto fail;
1957     }
1958 
1959     return;
1960 
1961 fail:
1962 
1963     nxt_router_conf_error(task, tmcf);
1964 }
1965 
1966 
1967 static void
1968 nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
1969     void *data)
1970 {
1971     nxt_int_t         ret;
1972     nxt_socket_t      s;
1973     nxt_socket_rpc_t  *rpc;
1974 
1975     rpc = data;
1976 
1977     s = msg->fd;
1978 
1979     ret = nxt_socket_nonblocking(task, s);
1980     if (nxt_slow_path(ret != NXT_OK)) {
1981         goto fail;
1982     }
1983 
1984     nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
1985 
1986     ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
1987     if (nxt_slow_path(ret != NXT_OK)) {
1988         goto fail;
1989     }
1990 
1991     rpc->socket_conf->listen->socket = s;
1992 
1993     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1994                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
1995 
1996     return;
1997 
1998 fail:
1999 
2000     nxt_socket_close(task, s);
2001 
2002     nxt_router_conf_error(task, rpc->temp_conf);
2003 }
2004 
2005 
2006 static void
2007 nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2008     void *data)
2009 {
2010     nxt_socket_rpc_t        *rpc;
2011     nxt_router_temp_conf_t  *tmcf;
2012 
2013     rpc = data;
2014     tmcf = rpc->temp_conf;
2015 
2016 #if 0
2017     u_char                  *p;
2018     size_t                  size;
2019     uint8_t                 error;
2020     nxt_buf_t               *in, *out;
2021     nxt_sockaddr_t          *sa;
2022 
2023     static nxt_str_t  socket_errors[] = {
2024         nxt_string("ListenerSystem"),
2025         nxt_string("ListenerNoIPv6"),
2026         nxt_string("ListenerPort"),
2027         nxt_string("ListenerInUse"),
2028         nxt_string("ListenerNoAddress"),
2029         nxt_string("ListenerNoAccess"),
2030         nxt_string("ListenerPath"),
2031     };
2032 
2033     sa = rpc->socket_conf->listen->sockaddr;
2034 
2035     in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
2036 
2037     if (nxt_slow_path(in == NULL)) {
2038         return;
2039     }
2040 
2041     p = in->mem.pos;
2042 
2043     error = *p++;
2044 
2045     size = nxt_length("listen socket error: ")
2046            + nxt_length("{listener: \"\", code:\"\", message: \"\"}")
2047            + sa->length + socket_errors[error].length + (in->mem.free - p);
2048 
2049     out = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2050     if (nxt_slow_path(out == NULL)) {
2051         return;
2052     }
2053 
2054     out->mem.free = nxt_sprintf(out->mem.free, out->mem.end,
2055                         "listen socket error: "
2056                         "{listener: \"%*s\", code:\"%V\", message: \"%*s\"}",
2057                         (size_t) sa->length, nxt_sockaddr_start(sa),
2058                         &socket_errors[error], in->mem.free - p, p);
2059 
2060     nxt_debug(task, "%*s", out->mem.free - out->mem.pos, out->mem.pos);
2061 #endif
2062 
2063     nxt_router_conf_error(task, tmcf);
2064 }
2065 
2066 
2067 #if (NXT_TLS)
2068 
2069 static void
2070 nxt_router_tls_rpc_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
2071     nxt_router_tlssock_t *tls)
2072 {
2073     nxt_socket_rpc_t  *rpc;
2074 
2075     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_socket_rpc_t));
2076     if (rpc == NULL) {
2077         nxt_router_conf_error(task, tmcf);
2078         return;
2079     }
2080 
2081     rpc->socket_conf = tls->conf;
2082     rpc->temp_conf = tmcf;
2083 
2084     nxt_cert_store_get(task, &tls->name, tmcf->mem_pool,
2085                        nxt_router_tls_rpc_handler, rpc);
2086 }
2087 
2088 
2089 static void
2090 nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2091     void *data)
2092 {
2093     nxt_mp_t               *mp;
2094     nxt_int_t              ret;
2095     nxt_tls_conf_t         *tlscf;
2096     nxt_socket_rpc_t       *rpc;
2097     nxt_router_temp_conf_t *tmcf;
2098 
2099     nxt_debug(task, "tls rpc handler");
2100 
2101     rpc = data;
2102     tmcf = rpc->temp_conf;
2103 
2104     if (msg == NULL || msg->port_msg.type == _NXT_PORT_MSG_RPC_ERROR) {
2105         goto fail;
2106     }
2107 
2108     mp = tmcf->router_conf->mem_pool;
2109 
2110     tlscf = nxt_mp_zget(mp, sizeof(nxt_tls_conf_t));
2111     if (nxt_slow_path(tlscf == NULL)) {
2112         goto fail;
2113     }
2114 
2115     tlscf->chain_file = msg->fd;
2116 
2117     ret = task->thread->runtime->tls->server_init(task, tlscf);
2118     if (nxt_slow_path(ret != NXT_OK)) {
2119         goto fail;
2120     }
2121 
2122     rpc->socket_conf->tls = tlscf;
2123 
2124     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
2125                        nxt_router_conf_apply, task, tmcf, NULL);
2126     return;
2127 
2128 fail:
2129 
2130     nxt_router_conf_error(task, tmcf);
2131 }
2132 
2133 #endif
2134 
2135 
2136 static void
2137 nxt_router_app_rpc_create(nxt_task_t *task,
2138     nxt_router_temp_conf_t *tmcf, nxt_app_t *app)
2139 {
2140     size_t         size;
2141     uint32_t       stream;
2142     nxt_int_t      ret;
2143     nxt_buf_t      *b;
2144     nxt_port_t     *main_port, *router_port;
2145     nxt_runtime_t  *rt;
2146     nxt_app_rpc_t  *rpc;
2147 
2148     rpc = nxt_mp_alloc(tmcf->mem_pool, sizeof(nxt_app_rpc_t));
2149     if (rpc == NULL) {
2150         goto fail;
2151     }
2152 
2153     rpc->app = app;
2154     rpc->temp_conf = tmcf;
2155 
2156     nxt_debug(task, "app '%V' prefork", &app->name);
2157 
2158     size = app->name.length + 1 + app->conf.length;
2159 
2160     b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
2161     if (nxt_slow_path(b == NULL)) {
2162         goto fail;
2163     }
2164 
2165     nxt_buf_cpystr(b, &app->name);
2166     *b->mem.free++ = '\0';
2167     nxt_buf_cpystr(b, &app->conf);
2168 
2169     rt = task->thread->runtime;
2170     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
2171     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
2172 
2173     stream = nxt_port_rpc_register_handler(task, router_port,
2174                                            nxt_router_app_prefork_ready,
2175                                            nxt_router_app_prefork_error,
2176                                            -1, rpc);
2177     if (nxt_slow_path(stream == 0)) {
2178         goto fail;
2179     }
2180 
2181     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1,
2182                                 stream, router_port->id, b);
2183 
2184     if (nxt_slow_path(ret != NXT_OK)) {
2185         nxt_port_rpc_cancel(task, router_port, stream);
2186         goto fail;
2187     }
2188 
2189     app->pending_processes++;
2190 
2191     return;
2192 
2193 fail:
2194 
2195     nxt_router_conf_error(task, tmcf);
2196 }
2197 
2198 
2199 static void
2200 nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2201     void *data)
2202 {
2203     nxt_app_t           *app;
2204     nxt_port_t          *port;
2205     nxt_app_rpc_t       *rpc;
2206     nxt_event_engine_t  *engine;
2207 
2208     rpc = data;
2209     app = rpc->app;
2210 
2211     port = msg->u.new_port;
2212     port->app = app;
2213 
2214     app->pending_processes--;
2215     app->processes++;
2216     app->idle_processes++;
2217 
2218     engine = task->thread->engine;
2219 
2220     nxt_queue_insert_tail(&app->ports, &port->app_link);
2221     nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
2222 
2223     port->idle_start = 0;
2224 
2225     nxt_port_inc_use(port);
2226 
2227     nxt_work_queue_add(&engine->fast_work_queue,
2228                        nxt_router_conf_apply, task, rpc->temp_conf, NULL);
2229 }
2230 
2231 
2232 static void
2233 nxt_router_app_prefork_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2234     void *data)
2235 {
2236     nxt_app_t               *app;
2237     nxt_app_rpc_t           *rpc;
2238     nxt_router_temp_conf_t  *tmcf;
2239 
2240     rpc = data;
2241     app = rpc->app;
2242     tmcf = rpc->temp_conf;
2243 
2244     nxt_log(task, NXT_LOG_WARN, "failed to start application \"%V\"",
2245             &app->name);
2246 
2247     app->pending_processes--;
2248 
2249     nxt_router_conf_error(task, tmcf);
2250 }
2251 
2252 
2253 static nxt_int_t
2254 nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
2255     nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
2256 {
2257     nxt_int_t                 ret;
2258     nxt_uint_t                n, threads;
2259     nxt_queue_link_t          *qlk;
2260     nxt_router_engine_conf_t  *recf;
2261 
2262     threads = tmcf->router_conf->threads;
2263 
2264     tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
2265                                      sizeof(nxt_router_engine_conf_t));
2266     if (nxt_slow_path(tmcf->engines == NULL)) {
2267         return NXT_ERROR;
2268     }
2269 
2270     n = 0;
2271 
2272     for (qlk = nxt_queue_first(&router->engines);
2273          qlk != nxt_queue_tail(&router->engines);
2274          qlk = nxt_queue_next(qlk))
2275     {
2276         recf = nxt_array_zero_add(tmcf->engines);
2277         if (nxt_slow_path(recf == NULL)) {
2278             return NXT_ERROR;
2279         }
2280 
2281         recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
2282 
2283         if (n < threads) {
2284             recf->action = NXT_ROUTER_ENGINE_KEEP;
2285             ret = nxt_router_engine_conf_update(tmcf, recf);
2286 
2287         } else {
2288             recf->action = NXT_ROUTER_ENGINE_DELETE;
2289             ret = nxt_router_engine_conf_delete(tmcf, recf);
2290         }
2291 
2292         if (nxt_slow_path(ret != NXT_OK)) {
2293             return ret;
2294         }
2295 
2296         n++;
2297     }
2298 
2299     tmcf->new_threads = n;
2300 
2301     while (n < threads) {
2302         recf = nxt_array_zero_add(tmcf->engines);
2303         if (nxt_slow_path(recf == NULL)) {
2304             return NXT_ERROR;
2305         }
2306 
2307         recf->action = NXT_ROUTER_ENGINE_ADD;
2308 
2309         recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
2310         if (nxt_slow_path(recf->engine == NULL)) {
2311             return NXT_ERROR;
2312         }
2313 
2314         ret = nxt_router_engine_conf_create(tmcf, recf);
2315         if (nxt_slow_path(ret != NXT_OK)) {
2316             return ret;
2317         }
2318 
2319         n++;
2320     }
2321 
2322     return NXT_OK;
2323 }
2324 
2325 
2326 static nxt_int_t
2327 nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
2328     nxt_router_engine_conf_t *recf)
2329 {
2330     nxt_int_t  ret;
2331 
2332     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2333                                           nxt_router_listen_socket_create);
2334     if (nxt_slow_path(ret != NXT_OK)) {
2335         return ret;
2336     }
2337 
2338     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2339                                           nxt_router_listen_socket_create);
2340     if (nxt_slow_path(ret != NXT_OK)) {
2341         return ret;
2342     }
2343 
2344     return ret;
2345 }
2346 
2347 
2348 static nxt_int_t
2349 nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
2350     nxt_router_engine_conf_t *recf)
2351 {
2352     nxt_int_t  ret;
2353 
2354     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
2355                                           nxt_router_listen_socket_create);
2356     if (nxt_slow_path(ret != NXT_OK)) {
2357         return ret;
2358     }
2359 
2360     ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
2361                                           nxt_router_listen_socket_update);
2362     if (nxt_slow_path(ret != NXT_OK)) {
2363         return ret;
2364     }
2365 
2366     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2367     if (nxt_slow_path(ret != NXT_OK)) {
2368         return ret;
2369     }
2370 
2371     return ret;
2372 }
2373 
2374 
2375 static nxt_int_t
2376 nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
2377     nxt_router_engine_conf_t *recf)
2378 {
2379     nxt_int_t  ret;
2380 
2381     ret = nxt_router_engine_quit(tmcf, recf);
2382     if (nxt_slow_path(ret != NXT_OK)) {
2383         return ret;
2384     }
2385 
2386     ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
2387     if (nxt_slow_path(ret != NXT_OK)) {
2388         return ret;
2389     }
2390 
2391     return nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
2392 }
2393 
2394 
2395 static nxt_int_t
2396 nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
2397     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
2398     nxt_work_handler_t handler)
2399 {
2400     nxt_joint_job_t          *job;
2401     nxt_queue_link_t         *qlk;
2402     nxt_socket_conf_t        *skcf;
2403     nxt_socket_conf_joint_t  *joint;
2404 
2405     for (qlk = nxt_queue_first(sockets);
2406          qlk != nxt_queue_tail(sockets);
2407          qlk = nxt_queue_next(qlk))
2408     {
2409         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2410         if (nxt_slow_path(job == NULL)) {
2411             return NXT_ERROR;
2412         }
2413 
2414         job->work.next = recf->jobs;
2415         recf->jobs = &job->work;
2416 
2417         job->task = tmcf->engine->task;
2418         job->work.handler = handler;
2419         job->work.task = &job->task;
2420         job->work.obj = job;
2421         job->tmcf = tmcf;
2422 
2423         tmcf->count++;
2424 
2425         joint = nxt_mp_alloc(tmcf->router_conf->mem_pool,
2426                              sizeof(nxt_socket_conf_joint_t));
2427         if (nxt_slow_path(joint == NULL)) {
2428             return NXT_ERROR;
2429         }
2430 
2431         job->work.data = joint;
2432 
2433         joint->count = 1;
2434 
2435         skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2436         skcf->count++;
2437         joint->socket_conf = skcf;
2438 
2439         joint->engine = recf->engine;
2440     }
2441 
2442     return NXT_OK;
2443 }
2444 
2445 
2446 static nxt_int_t
2447 nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
2448     nxt_router_engine_conf_t *recf)
2449 {
2450     nxt_joint_job_t  *job;
2451 
2452     job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2453     if (nxt_slow_path(job == NULL)) {
2454         return NXT_ERROR;
2455     }
2456 
2457     job->work.next = recf->jobs;
2458     recf->jobs = &job->work;
2459 
2460     job->task = tmcf->engine->task;
2461     job->work.handler = nxt_router_worker_thread_quit;
2462     job->work.task = &job->task;
2463     job->work.obj = NULL;
2464     job->work.data = NULL;
2465     job->tmcf = NULL;
2466 
2467     return NXT_OK;
2468 }
2469 
2470 
2471 static nxt_int_t
2472 nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
2473     nxt_router_engine_conf_t *recf, nxt_queue_t *sockets)
2474 {
2475     nxt_joint_job_t   *job;
2476     nxt_queue_link_t  *qlk;
2477 
2478     for (qlk = nxt_queue_first(sockets);
2479          qlk != nxt_queue_tail(sockets);
2480          qlk = nxt_queue_next(qlk))
2481     {
2482         job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
2483         if (nxt_slow_path(job == NULL)) {
2484             return NXT_ERROR;
2485         }
2486 
2487         job->work.next = recf->jobs;
2488         recf->jobs = &job->work;
2489 
2490         job->task = tmcf->engine->task;
2491         job->work.handler = nxt_router_listen_socket_delete;
2492         job->work.task = &job->task;
2493         job->work.obj = job;
2494         job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
2495         job->tmcf = tmcf;
2496 
2497         tmcf->count++;
2498     }
2499 
2500     return NXT_OK;
2501 }
2502 
2503 
2504 static nxt_int_t
2505 nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
2506     nxt_router_temp_conf_t *tmcf)
2507 {
2508     nxt_int_t                 ret;
2509     nxt_uint_t                i, threads;
2510     nxt_router_engine_conf_t  *recf;
2511 
2512     recf = tmcf->engines->elts;
2513     threads = tmcf->router_conf->threads;
2514 
2515     for (i = tmcf->new_threads; i < threads; i++) {
2516         ret = nxt_router_thread_create(task, rt, recf[i].engine);
2517         if (nxt_slow_path(ret != NXT_OK)) {
2518             return ret;
2519         }
2520     }
2521 
2522     return NXT_OK;
2523 }
2524 
2525 
2526 static nxt_int_t
2527 nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
2528     nxt_event_engine_t *engine)
2529 {
2530     nxt_int_t            ret;
2531     nxt_thread_link_t    *link;
2532     nxt_thread_handle_t  handle;
2533 
2534     link = nxt_zalloc(sizeof(nxt_thread_link_t));
2535 
2536     if (nxt_slow_path(link == NULL)) {
2537         return NXT_ERROR;
2538     }
2539 
2540     link->start = nxt_router_thread_start;
2541     link->engine = engine;
2542     link->work.handler = nxt_router_thread_exit_handler;
2543     link->work.task = task;
2544     link->work.data = link;
2545 
2546     nxt_queue_insert_tail(&rt->engines, &engine->link);
2547 
2548     ret = nxt_thread_create(&handle, link);
2549 
2550     if (nxt_slow_path(ret != NXT_OK)) {
2551         nxt_queue_remove(&engine->link);
2552     }
2553 
2554     return ret;
2555 }
2556 
2557 
2558 static void
2559 nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
2560     nxt_router_temp_conf_t *tmcf)
2561 {
2562     nxt_app_t  *app;
2563 
2564     nxt_queue_each(app, &router->apps, nxt_app_t, link) {
2565 
2566         nxt_router_app_unlink(task, app);
2567 
2568     } nxt_queue_loop;
2569 
2570     nxt_queue_add(&router->apps, &tmcf->previous);
2571     nxt_queue_add(&router->apps, &tmcf->apps);
2572 }
2573 
2574 
2575 static void
2576 nxt_router_engines_post(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
2577 {
2578     nxt_uint_t                n;
2579     nxt_event_engine_t        *engine;
2580     nxt_router_engine_conf_t  *recf;
2581 
2582     recf = tmcf->engines->elts;
2583 
2584     for (n = tmcf->engines->nelts; n != 0; n--) {
2585         engine = recf->engine;
2586 
2587         switch (recf->action) {
2588 
2589         case NXT_ROUTER_ENGINE_KEEP:
2590             break;
2591 
2592         case NXT_ROUTER_ENGINE_ADD:
2593             nxt_queue_insert_tail(&router->engines, &engine->link0);
2594             break;
2595 
2596         case NXT_ROUTER_ENGINE_DELETE:
2597             nxt_queue_remove(&engine->link0);
2598             break;
2599         }
2600 
2601         nxt_router_engine_post(engine, recf->jobs);
2602 
2603         recf++;
2604     }
2605 }
2606 
2607 
2608 static void
2609 nxt_router_engine_post(nxt_event_engine_t *engine, nxt_work_t *jobs)
2610 {
2611     nxt_work_t  *work, *next;
2612 
2613     for (work = jobs; work != NULL; work = next) {
2614         next = work->next;
2615         work->next = NULL;
2616 
2617         nxt_event_engine_post(engine, work);
2618     }
2619 }
2620 
2621 
2622 static nxt_port_handlers_t  nxt_router_app_port_handlers = {
2623     .rpc_error = nxt_port_rpc_handler,
2624     .mmap      = nxt_port_mmap_handler,
2625     .data      = nxt_port_rpc_handler,
2626 };
2627 
2628 
2629 static void
2630 nxt_router_thread_start(void *data)
2631 {
2632     nxt_int_t           ret;
2633     nxt_port_t          *port;
2634     nxt_task_t          *task;
2635     nxt_thread_t        *thread;
2636     nxt_thread_link_t   *link;
2637     nxt_event_engine_t  *engine;
2638 
2639     link = data;
2640     engine = link->engine;
2641     task = &engine->task;
2642 
2643     thread = nxt_thread();
2644 
2645     nxt_event_engine_thread_adopt(engine);
2646 
2647     /* STUB */
2648     thread->runtime = engine->task.thread->runtime;
2649 
2650     engine->task.thread = thread;
2651     engine->task.log = thread->log;
2652     thread->engine = engine;
2653     thread->task = &engine->task;
2654 #if 0
2655     thread->fiber = &engine->fibers->fiber;
2656 #endif
2657 
2658     engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
2659     if (nxt_slow_path(engine->mem_pool == NULL)) {
2660         return;
2661     }
2662 
2663     port = nxt_port_new(task, nxt_port_get_next_id(), nxt_pid,
2664                         NXT_PROCESS_ROUTER);
2665     if (nxt_slow_path(port == NULL)) {
2666         return;
2667     }
2668 
2669     ret = nxt_port_socket_init(task, port, 0);
2670     if (nxt_slow_path(ret != NXT_OK)) {
2671         nxt_port_use(task, port, -1);
2672         return;
2673     }
2674 
2675     engine->port = port;
2676 
2677     nxt_port_enable(task, port, &nxt_router_app_port_handlers);
2678 
2679     nxt_event_engine_start(engine);
2680 }
2681 
2682 
2683 static void
2684 nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
2685 {
2686     nxt_joint_job_t          *job;
2687     nxt_socket_conf_t        *skcf;
2688     nxt_listen_event_t       *lev;
2689     nxt_listen_socket_t      *ls;
2690     nxt_thread_spinlock_t    *lock;
2691     nxt_socket_conf_joint_t  *joint;
2692 
2693     job = obj;
2694     joint = data;
2695 
2696     nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
2697 
2698     skcf = joint->socket_conf;
2699     ls = skcf->listen;
2700 
2701     lev = nxt_listen_event(task, ls);
2702     if (nxt_slow_path(lev == NULL)) {
2703         nxt_router_listen_socket_release(task, skcf);
2704         return;
2705     }
2706 
2707     lev->socket.data = joint;
2708 
2709     lock = &skcf->router_conf->router->lock;
2710 
2711     nxt_thread_spin_lock(lock);
2712     ls->count++;
2713     nxt_thread_spin_unlock(lock);
2714 
2715     job->work.next = NULL;
2716     job->work.handler = nxt_router_conf_wait;
2717 
2718     nxt_event_engine_post(job->tmcf->engine, &job->work);
2719 }
2720 
2721 
2722 nxt_inline nxt_listen_event_t *
2723 nxt_router_listen_event(nxt_queue_t *listen_connections,
2724     nxt_socket_conf_t *skcf)
2725 {
2726     nxt_socket_t        fd;
2727     nxt_queue_link_t    *qlk;
2728     nxt_listen_event_t  *lev;
2729 
2730     fd = skcf->listen->socket;
2731 
2732     for (qlk = nxt_queue_first(listen_connections);
2733          qlk != nxt_queue_tail(listen_connections);
2734          qlk = nxt_queue_next(qlk))
2735     {
2736         lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
2737 
2738         if (fd == lev->socket.fd) {
2739             return lev;
2740         }
2741     }
2742 
2743     return NULL;
2744 }
2745 
2746 
2747 static void
2748 nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
2749 {
2750     nxt_joint_job_t          *job;
2751     nxt_event_engine_t       *engine;
2752     nxt_listen_event_t       *lev;
2753     nxt_socket_conf_joint_t  *joint, *old;
2754 
2755     job = obj;
2756     joint = data;
2757 
2758     engine = task->thread->engine;
2759 
2760     nxt_queue_insert_tail(&engine->joints, &joint->link);
2761 
2762     lev = nxt_router_listen_event(&engine->listen_connections,
2763                                   joint->socket_conf);
2764 
2765     old = lev->socket.data;
2766     lev->socket.data = joint;
2767     lev->listen = joint->socket_conf->listen;
2768 
2769     job->work.next = NULL;
2770     job->work.handler = nxt_router_conf_wait;
2771 
2772     nxt_event_engine_post(job->tmcf->engine, &job->work);
2773 
2774     /*
2775      * The task is allocated from configuration temporary
2776      * memory pool so it can be freed after engine post operation.
2777      */
2778 
2779     nxt_router_conf_release(&engine->task, old);
2780 }
2781 
2782 
2783 static void
2784 nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
2785 {
2786     nxt_joint_job_t     *job;
2787     nxt_socket_conf_t   *skcf;
2788     nxt_listen_event_t  *lev;
2789     nxt_event_engine_t  *engine;
2790 
2791     job = obj;
2792     skcf = data;
2793 
2794     engine = task->thread->engine;
2795 
2796     lev = nxt_router_listen_event(&engine->listen_connections, skcf);
2797 
2798     nxt_fd_event_delete(engine, &lev->socket);
2799 
2800     nxt_debug(task, "engine %p: listen socket delete: %d", engine,
2801               lev->socket.fd);
2802 
2803     lev->timer.handler = nxt_router_listen_socket_close;
2804     lev->timer.work_queue = &engine->fast_work_queue;
2805 
2806     nxt_timer_add(engine, &lev->timer, 0);
2807 
2808     job->work.next = NULL;
2809     job->work.handler = nxt_router_conf_wait;
2810 
2811     nxt_event_engine_post(job->tmcf->engine, &job->work);
2812 }
2813 
2814 
2815 static void
2816 nxt_router_worker_thread_quit(nxt_task_t *task, void *obj, void *data)
2817 {
2818     nxt_event_engine_t  *engine;
2819 
2820     nxt_debug(task, "router worker thread quit");
2821 
2822     engine = task->thread->engine;
2823 
2824     engine->shutdown = 1;
2825 
2826     if (nxt_queue_is_empty(&engine->joints)) {
2827         nxt_thread_exit(task->thread);
2828     }
2829 }
2830 
2831 
2832 static void
2833 nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
2834 {
2835     nxt_timer_t              *timer;
2836     nxt_listen_event_t       *lev;
2837     nxt_socket_conf_joint_t  *joint;
2838 
2839     timer = obj;
2840     lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
2841 
2842     nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
2843               lev->socket.fd);
2844 
2845     nxt_queue_remove(&lev->link);
2846 
2847     joint = lev->socket.data;
2848     lev->socket.data = NULL;
2849 
2850     /* 'task' refers to lev->task and we cannot use after nxt_free() */
2851     task = &task->thread->engine->task;
2852 
2853     nxt_router_listen_socket_release(task, joint->socket_conf);
2854 
2855     nxt_router_listen_event_release(task, lev, joint);
2856 }
2857 
2858 
2859 static void
2860 nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
2861 {
2862     nxt_listen_socket_t    *ls;
2863     nxt_thread_spinlock_t  *lock;
2864 
2865     ls = skcf->listen;
2866     lock = &skcf->router_conf->router->lock;
2867 
2868     nxt_thread_spin_lock(lock);
2869 
2870     nxt_debug(task, "engine %p: listen socket release: ls->count %D",
2871               task->thread->engine, ls->count);
2872 
2873     if (--ls->count != 0) {
2874         ls = NULL;
2875     }
2876 
2877     nxt_thread_spin_unlock(lock);
2878 
2879     if (ls != NULL) {
2880         nxt_socket_close(task, ls->socket);
2881         nxt_free(ls);
2882     }
2883 }
2884 
2885 
2886 void
2887 nxt_router_listen_event_release(nxt_task_t *task, nxt_listen_event_t *lev,
2888     nxt_socket_conf_joint_t *joint)
2889 {
2890     nxt_event_engine_t  *engine;
2891 
2892     nxt_debug(task, "listen event count: %D", lev->count);
2893 
2894     if (--lev->count == 0) {
2895         nxt_free(lev);
2896     }
2897 
2898     if (joint != NULL) {
2899         nxt_router_conf_release(task, joint);
2900     }
2901 
2902     engine = task->thread->engine;
2903 
2904     if (engine->shutdown && nxt_queue_is_empty(&engine->joints)) {
2905         nxt_thread_exit(task->thread);
2906     }
2907 }
2908 
2909 
2910 void
2911 nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
2912 {
2913     nxt_socket_conf_t      *skcf;
2914     nxt_router_conf_t      *rtcf;
2915     nxt_thread_spinlock_t  *lock;
2916 
2917     nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
2918 
2919     if (--joint->count != 0) {
2920         return;
2921     }
2922 
2923     nxt_queue_remove(&joint->link);
2924 
2925     /*
2926      * The joint content can not be safely used after the critical
2927      * section protected by the spinlock because its memory pool may
2928      * be already destroyed by another thread.
2929      */
2930     skcf = joint->socket_conf;
2931     rtcf = skcf->router_conf;
2932     lock = &rtcf->router->lock;
2933 
2934     nxt_thread_spin_lock(lock);
2935 
2936     nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
2937               rtcf, rtcf->count);
2938 
2939     if (--skcf->count != 0) {
2940         skcf = NULL;
2941         rtcf = NULL;
2942 
2943     } else {
2944         nxt_queue_remove(&skcf->link);
2945 
2946         if (--rtcf->count != 0) {
2947             rtcf = NULL;
2948         }
2949     }
2950 
2951     nxt_thread_spin_unlock(lock);
2952 
2953     if (skcf != NULL) {
2954         if (skcf->pass != NULL) {
2955             nxt_http_pass_cleanup(task, skcf->pass);
2956         }
2957 
2958 #if (NXT_TLS)
2959         if (skcf->tls != NULL) {
2960             task->thread->runtime->tls->server_free(task, skcf->tls);
2961         }
2962 #endif
2963     }
2964 
2965     /* TODO remove engine->port */
2966     /* TODO excude from connected ports */
2967 
2968     if (rtcf != NULL) {
2969         nxt_debug(task, "old router conf is destroyed");
2970 
2971         nxt_http_routes_cleanup(task, rtcf->routes);
2972 
2973         nxt_router_access_log_release(task, lock, rtcf->access_log);
2974 
2975         nxt_mp_thread_adopt(rtcf->mem_pool);
2976 
2977         nxt_mp_destroy(rtcf->mem_pool);
2978     }
2979 }
2980 
2981 
2982 static void
2983 nxt_router_access_log_writer(nxt_task_t *task, nxt_http_request_t *r,
2984     nxt_router_access_log_t *access_log)
2985 {
2986     size_t     size;
2987     u_char     *buf, *p;
2988     nxt_off_t  bytes;
2989 
2990     static nxt_time_string_t  date_cache = {
2991         (nxt_atomic_uint_t) -1,
2992         nxt_router_access_log_date,
2993         "%02d/%s/%4d:%02d:%02d:%02d %c%02d%02d",
2994         nxt_length("31/Dec/1986:19:40:00 +0300"),
2995         NXT_THREAD_TIME_LOCAL,
2996         NXT_THREAD_TIME_SEC,
2997     };
2998 
2999     size = r->remote->address_length
3000            + 6                  /* ' - - [' */
3001            + date_cache.size
3002            + 3                  /* '] "' */
3003            + r->method->length
3004            + 1                  /* space */
3005            + r->target.length
3006            + 1                  /* space */
3007            + r->version.length
3008            + 2                  /* '" ' */
3009            + 3                  /* status */
3010            + 1                  /* space */
3011            + NXT_OFF_T_LEN
3012            + 2                  /* ' "' */
3013            + (r->referer != NULL ? r->referer->value_length : 1)
3014            + 3                  /* '" "' */
3015            + (r->user_agent != NULL ? r->user_agent->value_length : 1)
3016            + 2                  /* '"\n' */
3017     ;
3018 
3019     buf = nxt_mp_nget(r->mem_pool, size);
3020     if (nxt_slow_path(buf == NULL)) {
3021         return;
3022     }
3023 
3024     p = nxt_cpymem(buf, nxt_sockaddr_address(r->remote),
3025                    r->remote->address_length);
3026 
3027     p = nxt_cpymem(p, " - - [", 6);
3028 
3029     p = nxt_thread_time_string(task->thread, &date_cache, p);
3030 
3031     p = nxt_cpymem(p, "] \"", 3);
3032 
3033     if (r->method->length != 0) {
3034         p = nxt_cpymem(p, r->method->start, r->method->length);
3035 
3036         if (r->target.length != 0) {
3037             *p++ = ' ';
3038             p = nxt_cpymem(p, r->target.start, r->target.length);
3039 
3040             if (r->version.length != 0) {
3041                 *p++ = ' ';
3042                 p = nxt_cpymem(p, r->version.start, r->version.length);
3043             }
3044         }
3045 
3046     } else {
3047         *p++ = '-';
3048     }
3049 
3050     p = nxt_cpymem(p, "\" ", 2);
3051 
3052     p = nxt_sprintf(p, p + 3, "%03d", r->status);
3053 
3054     *p++ = ' ';
3055 
3056     bytes = nxt_http_proto_body_bytes_sent[r->protocol](task, r->proto);
3057 
3058     p = nxt_sprintf(p, p + NXT_OFF_T_LEN, "%O", bytes);
3059 
3060     p = nxt_cpymem(p, " \"", 2);
3061 
3062     if (r->referer != NULL) {
3063         p = nxt_cpymem(p, r->referer->value, r->referer->value_length);
3064 
3065     } else {
3066         *p++ = '-';
3067     }
3068 
3069     p = nxt_cpymem(p, "\" \"", 3);
3070 
3071     if (r->user_agent != NULL) {
3072         p = nxt_cpymem(p, r->user_agent->value, r->user_agent->value_length);
3073 
3074     } else {
3075         *p++ = '-';
3076     }
3077 
3078     p = nxt_cpymem(p, "\"\n", 2);
3079 
3080     nxt_fd_write(access_log->fd, buf, p - buf);
3081 }
3082 
3083 
3084 static u_char *
3085 nxt_router_access_log_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
3086     size_t size, const char *format)
3087 {
3088     u_char  sign;
3089     time_t  gmtoff;
3090 
3091     static const char  *month[] = { "Jan", "Feb", "Mar", "Apr", "May", "Jun",
3092                                     "Jul", "Aug", "Sep", "Oct", "Nov", "Dec" };
3093 
3094     gmtoff = nxt_timezone(tm) / 60;
3095 
3096     if (gmtoff < 0) {
3097         gmtoff = -gmtoff;
3098         sign = '-';
3099 
3100     } else {
3101         sign = '+';
3102     }
3103 
3104     return nxt_sprintf(buf, buf + size, format,
3105                        tm->tm_mday, month[tm->tm_mon], tm->tm_year + 1900,
3106                        tm->tm_hour, tm->tm_min, tm->tm_sec,
3107                        sign, gmtoff / 60, gmtoff % 60);
3108 }
3109 
3110 
3111 static void
3112 nxt_router_access_log_open(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
3113 {
3114     uint32_t                 stream;
3115     nxt_int_t                ret;
3116     nxt_buf_t                *b;
3117     nxt_port_t               *main_port, *router_port;
3118     nxt_runtime_t            *rt;
3119     nxt_router_access_log_t  *access_log;
3120 
3121     access_log = tmcf->router_conf->access_log;
3122 
3123     b = nxt_buf_mem_alloc(tmcf->mem_pool, access_log->path.length + 1, 0);
3124     if (nxt_slow_path(b == NULL)) {
3125         goto fail;
3126     }
3127 
3128     nxt_buf_cpystr(b, &access_log->path);
3129     *b->mem.free++ = '\0';
3130 
3131     rt = task->thread->runtime;
3132     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3133     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3134 
3135     stream = nxt_port_rpc_register_handler(task, router_port,
3136                                            nxt_router_access_log_ready,
3137                                            nxt_router_access_log_error,
3138                                            -1, tmcf);
3139     if (nxt_slow_path(stream == 0)) {
3140         goto fail;
3141     }
3142 
3143     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3144                                 stream, router_port->id, b);
3145 
3146     if (nxt_slow_path(ret != NXT_OK)) {
3147         nxt_port_rpc_cancel(task, router_port, stream);
3148         goto fail;
3149     }
3150 
3151     return;
3152 
3153 fail:
3154 
3155     nxt_router_conf_error(task, tmcf);
3156 }
3157 
3158 
3159 static void
3160 nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3161     void *data)
3162 {
3163     nxt_router_temp_conf_t   *tmcf;
3164     nxt_router_access_log_t  *access_log;
3165 
3166     tmcf = data;
3167 
3168     access_log = tmcf->router_conf->access_log;
3169 
3170     access_log->fd = msg->fd;
3171 
3172     nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3173                        nxt_router_conf_apply, task, tmcf, NULL);
3174 }
3175 
3176 
3177 static void
3178 nxt_router_access_log_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3179     void *data)
3180 {
3181     nxt_router_temp_conf_t  *tmcf;
3182 
3183     tmcf = data;
3184 
3185     nxt_router_conf_error(task, tmcf);
3186 }
3187 
3188 
3189 static void
3190 nxt_router_access_log_release(nxt_task_t *task, nxt_thread_spinlock_t *lock,
3191     nxt_router_access_log_t *access_log)
3192 {
3193     if (access_log == NULL) {
3194         return;
3195     }
3196 
3197     nxt_thread_spin_lock(lock);
3198 
3199     if (--access_log->count != 0) {
3200         access_log = NULL;
3201     }
3202 
3203     nxt_thread_spin_unlock(lock);
3204 
3205     if (access_log != NULL) {
3206 
3207         if (access_log->fd != -1) {
3208             nxt_fd_close(access_log->fd);
3209         }
3210 
3211         nxt_free(access_log);
3212     }
3213 }
3214 
3215 
3216 typedef struct {
3217     nxt_mp_t                 *mem_pool;
3218     nxt_router_access_log_t  *access_log;
3219 } nxt_router_access_log_reopen_t;
3220 
3221 
3222 void
3223 nxt_router_access_log_reopen_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
3224 {
3225     nxt_mp_t                        *mp;
3226     uint32_t                        stream;
3227     nxt_int_t                       ret;
3228     nxt_buf_t                       *b;
3229     nxt_port_t                      *main_port, *router_port;
3230     nxt_runtime_t                   *rt;
3231     nxt_router_access_log_t         *access_log;
3232     nxt_router_access_log_reopen_t  *reopen;
3233 
3234     access_log = nxt_router->access_log;
3235 
3236     if (access_log == NULL) {
3237         return;
3238     }
3239 
3240     mp = nxt_mp_create(1024, 128, 256, 32);
3241     if (nxt_slow_path(mp == NULL)) {
3242         return;
3243     }
3244 
3245     reopen = nxt_mp_get(mp, sizeof(nxt_router_access_log_reopen_t));
3246     if (nxt_slow_path(reopen == NULL)) {
3247         goto fail;
3248     }
3249 
3250     reopen->mem_pool = mp;
3251     reopen->access_log = access_log;
3252 
3253     b = nxt_buf_mem_alloc(mp, access_log->path.length + 1, 0);
3254     if (nxt_slow_path(b == NULL)) {
3255         goto fail;
3256     }
3257 
3258     b->completion_handler = nxt_router_access_log_reopen_completion;
3259 
3260     nxt_buf_cpystr(b, &access_log->path);
3261     *b->mem.free++ = '\0';
3262 
3263     rt = task->thread->runtime;
3264     main_port = rt->port_by_type[NXT_PROCESS_MAIN];
3265     router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
3266 
3267     stream = nxt_port_rpc_register_handler(task, router_port,
3268                                            nxt_router_access_log_reopen_ready,
3269                                            nxt_router_access_log_reopen_error,
3270                                            -1, reopen);
3271     if (nxt_slow_path(stream == 0)) {
3272         goto fail;
3273     }
3274 
3275     ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_ACCESS_LOG, -1,
3276                                 stream, router_port->id, b);
3277 
3278     if (nxt_slow_path(ret != NXT_OK)) {
3279         nxt_port_rpc_cancel(task, router_port, stream);
3280         goto fail;
3281     }
3282 
3283     nxt_mp_retain(mp);
3284 
3285     return;
3286 
3287 fail:
3288 
3289     nxt_mp_destroy(mp);
3290 }
3291 
3292 
3293 static void
3294 nxt_router_access_log_reopen_completion(nxt_task_t *task, void *obj, void *data)
3295 {
3296     nxt_mp_t   *mp;
3297     nxt_buf_t  *b;
3298 
3299     b = obj;
3300     mp = b->data;
3301 
3302     nxt_mp_release(mp);
3303 }
3304 
3305 
3306 static void
3307 nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3308     void *data)
3309 {
3310     nxt_router_access_log_t         *access_log;
3311     nxt_router_access_log_reopen_t  *reopen;
3312 
3313     reopen = data;
3314 
3315     access_log = reopen->access_log;
3316 
3317     if (access_log == nxt_router->access_log) {
3318 
3319         if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) {
3320             nxt_alert(task, "dup2(%FD, %FD) failed %E",
3321                       msg->fd, access_log->fd, nxt_errno);
3322         }
3323     }
3324 
3325     nxt_fd_close(msg->fd);
3326     nxt_mp_release(reopen->mem_pool);
3327 }
3328 
3329 
3330 static void
3331 nxt_router_access_log_reopen_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3332     void *data)
3333 {
3334     nxt_router_access_log_reopen_t  *reopen;
3335 
3336     reopen = data;
3337 
3338     nxt_mp_release(reopen->mem_pool);
3339 }
3340 
3341 
3342 static void
3343 nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
3344 {
3345     nxt_port_t           *port;
3346     nxt_thread_link_t    *link;
3347     nxt_event_engine_t   *engine;
3348     nxt_thread_handle_t  handle;
3349 
3350     handle = (nxt_thread_handle_t) obj;
3351     link = data;
3352 
3353     nxt_thread_wait(handle);
3354 
3355     engine = link->engine;
3356 
3357     nxt_queue_remove(&engine->link);
3358 
3359     port = engine->port;
3360 
3361     // TODO notify all apps
3362 
3363     port->engine = task->thread->engine;
3364     nxt_mp_thread_adopt(port->mem_pool);
3365     nxt_port_use(task, port, -1);
3366 
3367     nxt_mp_thread_adopt(engine->mem_pool);
3368     nxt_mp_destroy(engine->mem_pool);
3369 
3370     nxt_event_engine_free(engine);
3371 
3372     nxt_free(link);
3373 }
3374 
3375 
3376 static void
3377 nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3378     void *data)
3379 {
3380     size_t               dump_size;
3381     nxt_int_t            ret;
3382     nxt_buf_t            *b;
3383     nxt_http_request_t   *r;
3384     nxt_req_conn_link_t  *rc;
3385     nxt_app_parse_ctx_t  *ar;
3386     nxt_unit_response_t  *resp;
3387 
3388     b = msg->buf;
3389     rc = data;
3390 
3391     dump_size = nxt_buf_used_size(b);
3392 
3393     if (dump_size > 300) {
3394         dump_size = 300;
3395     }
3396 
3397     if (msg->size == 0) {
3398         b = NULL;
3399     }
3400 
3401     ar = rc->ap;
3402     if (nxt_slow_path(ar == NULL)) {
3403         return;
3404     }
3405 
3406     if (ar->request->error) {
3407         nxt_router_rc_unlink(task, rc);
3408         return;
3409     }
3410 
3411     if (msg->port_msg.last != 0) {
3412         nxt_debug(task, "router data create last buf");
3413 
3414         nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request));
3415 
3416         nxt_router_rc_unlink(task, rc);
3417 
3418     } else {
3419         if (rc->app != NULL && rc->app->timeout != 0) {
3420             ar->timer.handler = nxt_router_app_timeout;
3421             ar->timer_data = rc;
3422             nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
3423         }
3424     }
3425 
3426     if (b == NULL) {
3427         return;
3428     }
3429 
3430     if (msg->buf == b) {
3431         /* Disable instant buffer completion/re-using by port. */
3432         msg->buf = NULL;
3433     }
3434 
3435     r = ar->request;
3436 
3437     if (r->header_sent) {
3438         nxt_buf_chain_add(&r->out, b);
3439         nxt_http_request_send_body(task, r, NULL);
3440 
3441     } else {
3442         size_t b_size = nxt_buf_mem_used_size(&b->mem);
3443 
3444         if (nxt_slow_path(b_size < sizeof(*resp))) {
3445             goto fail;
3446         }
3447 
3448         resp = (void *) b->mem.pos;
3449         if (nxt_slow_path(b_size < sizeof(*resp)
3450               + resp->fields_count * sizeof(nxt_unit_field_t))) {
3451             goto fail;
3452         }
3453 
3454         nxt_unit_field_t  *f;
3455         nxt_http_field_t  *field;
3456 
3457         for (f = resp->fields; f < resp->fields + resp->fields_count; f++) {
3458             field = nxt_list_add(ar->resp_parser.fields);
3459 
3460             if (nxt_slow_path(field == NULL)) {
3461                 goto fail;
3462             }
3463 
3464             field->hash = f->hash;
3465             field->skip = f->skip;
3466 
3467             field->name_length = f->name_length;
3468             field->value_length = f->value_length;
3469             field->name = nxt_unit_sptr_get(&f->name);
3470             field->value = nxt_unit_sptr_get(&f->value);
3471 
3472             nxt_debug(task, "header: %*s: %*s",
3473                       (size_t) field->name_length, field->name,
3474                       (size_t) field->value_length, field->value);
3475         }
3476         r->status = resp->status;
3477 
3478 /*
3479         ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
3480         if (nxt_slow_path(ret != NXT_DONE)) {
3481             goto fail;
3482         }
3483 */
3484         r->resp.fields = ar->resp_parser.fields;
3485 
3486         ret = nxt_http_fields_process(r->resp.fields,
3487                                       &nxt_response_fields_hash, r);
3488         if (nxt_slow_path(ret != NXT_OK)) {
3489             goto fail;
3490         }
3491 
3492         if (resp->piggyback_content_length != 0) {
3493             b->mem.pos = nxt_unit_sptr_get(&resp->piggyback_content);
3494             b->mem.free = b->mem.pos + resp->piggyback_content_length;
3495 
3496         } else {
3497             b->mem.pos = b->mem.free;
3498         }
3499 
3500         if (nxt_buf_mem_used_size(&b->mem) == 0) {
3501             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
3502                                b->completion_handler, task, b, b->parent);
3503 
3504             b = b->next;
3505         }
3506 
3507         if (b != NULL) {
3508             nxt_buf_chain_add(&r->out, b);
3509         }
3510 
3511         r->state = &nxt_http_request_send_state;
3512 
3513         nxt_http_request_header_send(task, r);
3514     }
3515 
3516     return;
3517 
3518 fail:
3519 
3520     nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
3521 
3522     nxt_router_rc_unlink(task, rc);
3523 }
3524 
3525 
3526 static const nxt_http_request_state_t  nxt_http_request_send_state
3527     nxt_aligned(64) =
3528 {
3529     .ready_handler = nxt_http_request_send_body,
3530     .error_handler = nxt_http_request_error_handler,
3531 };
3532 
3533 
3534 static void
3535 nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
3536 {
3537     nxt_buf_t           *out;
3538     nxt_http_request_t  *r;
3539 
3540     r = obj;
3541 
3542     out = r->out;
3543 
3544     if (out != NULL) {
3545         r->out = NULL;
3546         nxt_http_request_send(task, r, out);
3547     }
3548 }
3549 
3550 
3551 static void
3552 nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3553     void *data)
3554 {
3555     nxt_int_t            res;
3556     nxt_port_t           *port;
3557     nxt_bool_t           cancelled;
3558     nxt_req_app_link_t   *ra;
3559     nxt_req_conn_link_t  *rc;
3560 
3561     rc = data;
3562 
3563     ra = rc->ra;
3564 
3565     if (ra != NULL) {
3566         cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
3567 
3568         if (cancelled) {
3569             nxt_router_ra_inc_use(ra);
3570 
3571             res = nxt_router_app_port(task, rc->app, ra);
3572 
3573             if (res == NXT_OK) {
3574                 port = ra->app_port;
3575 
3576                 if (nxt_slow_path(port == NULL)) {
3577                     nxt_log(task, NXT_LOG_ERR, "port is NULL in cancelled ra");
3578                     return;
3579                 }
3580 
3581                 nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
3582                                          port->pid);
3583 
3584                 nxt_router_app_prepare_request(task, ra);
3585             }
3586 
3587             msg->port_msg.last = 0;
3588 
3589             return;
3590         }
3591     }
3592 
3593     if (rc->ap != NULL) {
3594         nxt_http_request_error(task, rc->ap->request,
3595                                NXT_HTTP_SERVICE_UNAVAILABLE);
3596     }
3597 
3598     nxt_router_rc_unlink(task, rc);
3599 }
3600 
3601 
3602 static void
3603 nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3604     void *data)
3605 {
3606     nxt_app_t        *app;
3607     nxt_port_t       *port;
3608     nxt_app_joint_t  *app_joint;
3609 
3610     app_joint = data;
3611     port = msg->u.new_port;
3612 
3613     nxt_assert(app_joint != NULL);
3614     nxt_assert(port != NULL);
3615 
3616     app = app_joint->app;
3617 
3618     nxt_router_app_joint_use(task, app_joint, -1);
3619 
3620     if (nxt_slow_path(app == NULL)) {
3621         nxt_debug(task, "new port ready for released app, send QUIT");
3622 
3623         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
3624 
3625         return;
3626     }
3627 
3628     port->app = app;
3629 
3630     nxt_thread_mutex_lock(&app->mutex);
3631 
3632     nxt_assert(app->pending_processes != 0);
3633 
3634     app->pending_processes--;
3635     app->processes++;
3636 
3637     nxt_thread_mutex_unlock(&app->mutex);
3638 
3639     nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
3640               &app->name, port->pid, app->processes, app->pending_processes);
3641 
3642     nxt_router_app_port_release(task, port, 0, 0);
3643 }
3644 
3645 
3646 static void
3647 nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
3648     void *data)
3649 {
3650     nxt_app_t           *app;
3651     nxt_app_joint_t     *app_joint;
3652     nxt_queue_link_t    *lnk;
3653     nxt_req_app_link_t  *ra;
3654 
3655     app_joint = data;
3656 
3657     nxt_assert(app_joint != NULL);
3658 
3659     app = app_joint->app;
3660 
3661     nxt_router_app_joint_use(task, app_joint, -1);
3662 
3663     if (nxt_slow_path(app == NULL)) {
3664         nxt_debug(task, "start error for released app");
3665 
3666         return;
3667     }
3668 
3669     nxt_debug(task, "app '%V' %p start error", &app->name, app);
3670 
3671     nxt_thread_mutex_lock(&app->mutex);
3672 
3673     nxt_assert(app->pending_processes != 0);
3674 
3675     app->pending_processes--;
3676 
3677     if (!nxt_queue_is_empty(&app->requests)) {
3678         lnk = nxt_queue_last(&app->requests);
3679         nxt_queue_remove(lnk);
3680         lnk->next = NULL;
3681 
3682         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
3683 
3684     } else {
3685         ra = NULL;
3686     }
3687 
3688     nxt_thread_mutex_unlock(&app->mutex);
3689 
3690     if (ra != NULL) {
3691         nxt_debug(task, "app '%V' %p abort next stream #%uD",
3692                   &app->name, app, ra->stream);
3693 
3694         nxt_router_ra_error(ra, 500, "Failed to start application process");
3695         nxt_router_ra_use(task, ra, -1);
3696     }
3697 }
3698 
3699 nxt_inline nxt_port_t *
3700 nxt_router_app_get_port_for_quit(nxt_app_t *app);
3701 
3702 void
3703 nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
3704 {
3705     int  c;
3706 
3707     c = nxt_atomic_fetch_add(&app->use_count, i);
3708 
3709     if (i < 0 && c == -i) {
3710 
3711         if (task->thread->engine != app->engine) {
3712             nxt_event_engine_post(app->engine, &app->joint->free_app_work);
3713 
3714         } else {
3715             nxt_router_free_app(task, app->joint, NULL);
3716         }
3717     }
3718 }
3719 
3720 
3721 nxt_inline nxt_bool_t
3722 nxt_router_app_first_port_busy(nxt_app_t *app)
3723 {
3724     nxt_port_t        *port;
3725     nxt_queue_link_t  *lnk;
3726 
3727     lnk = nxt_queue_first(&app->ports);
3728     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
3729 
3730     return port->app_pending_responses > 0;
3731 }
3732 
3733 
3734 nxt_inline nxt_port_t *
3735 nxt_router_pop_first_port(nxt_app_t *app)
3736 {
3737     nxt_port_t        *port;
3738     nxt_queue_link_t  *lnk;
3739 
3740     lnk = nxt_queue_first(&app->ports);
3741     nxt_queue_remove(lnk);
3742 
3743     port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
3744 
3745     port->app_pending_responses++;
3746 
3747     if (nxt_queue_chk_remove(&port->idle_link)) {
3748         app->idle_processes--;
3749 
3750         if (port->idle_start == 0) {
3751             nxt_assert(app->idle_processes < app->spare_processes);
3752 
3753         } else {
3754             nxt_assert(app->idle_processes >= app->spare_processes);
3755 
3756             port->idle_start = 0;
3757         }
3758     }
3759 
3760     if ((app->max_pending_responses == 0
3761             || port->app_pending_responses < app->max_pending_responses)
3762         && (app->max_requests == 0
3763             || port->app_responses + port->app_pending_responses
3764                 < app->max_requests))
3765     {
3766         nxt_queue_insert_tail(&app->ports, lnk);
3767 
3768         nxt_port_inc_use(port);
3769 
3770     } else {
3771         lnk->next = NULL;
3772     }
3773 
3774     return port;
3775 }
3776 
3777 
3778 nxt_inline nxt_port_t *
3779 nxt_router_app_get_port_for_quit(nxt_app_t *app)
3780 {
3781     nxt_port_t  *port;
3782 
3783     port = NULL;
3784 
3785     nxt_thread_mutex_lock(&app->mutex);
3786 
3787     nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
3788 
3789         if (port->app_pending_responses > 0) {
3790             port = NULL;
3791 
3792             continue;
3793         }
3794 
3795         /* Caller is responsible to decrease port use count. */
3796         nxt_queue_chk_remove(&port->app_link);
3797 
3798         if (nxt_queue_chk_remove(&port->idle_link)) {
3799             app->idle_processes--;
3800         }
3801 
3802         port->app = NULL;
3803         app->processes--;
3804 
3805         break;
3806 
3807     } nxt_queue_loop;
3808 
3809     nxt_thread_mutex_unlock(&app->mutex);
3810 
3811     return port;
3812 }
3813 
3814 
3815 static void
3816 nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
3817 {
3818     nxt_debug(task, "app '%V' %p unlink", &app->name, app);
3819 
3820     nxt_queue_remove(&app->link);
3821 
3822     nxt_router_app_use(task, app, -1);
3823 }
3824 
3825 
3826 static void
3827 nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
3828 {
3829     nxt_req_app_link_t  *ra;
3830 
3831     ra = data;
3832 
3833 #if (NXT_DEBUG)
3834     {
3835     nxt_app_t  *app;
3836 
3837     app = obj;
3838 
3839     nxt_assert(app != NULL);
3840     nxt_assert(ra != NULL);
3841     nxt_assert(ra->app_port != NULL);
3842 
3843     nxt_debug(task, "app '%V' %p process next stream #%uD",
3844               &app->name, app, ra->stream);
3845     }
3846 #endif
3847 
3848     nxt_router_app_prepare_request(task, ra);
3849 }
3850 
3851 
3852 static void
3853 nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
3854     uint32_t request_failed, uint32_t got_response)
3855 {
3856     nxt_app_t                *app;
3857     nxt_bool_t               port_unchained;
3858     nxt_bool_t               send_quit, cancelled, adjust_idle_timer;
3859     nxt_queue_link_t         *lnk;
3860     nxt_req_app_link_t       *ra, *pending_ra, *re_ra;
3861     nxt_port_select_state_t  state;
3862 
3863     nxt_assert(port != NULL);
3864     nxt_assert(port->app != NULL);
3865 
3866     ra = NULL;
3867 
3868     app = port->app;
3869 
3870     nxt_thread_mutex_lock(&app->mutex);
3871 
3872     port->app_pending_responses -= request_failed + got_response;
3873     port->app_responses += got_response;
3874 
3875     if (port->pair[1] != -1
3876         && (app->max_pending_responses == 0
3877             || port->app_pending_responses < app->max_pending_responses)
3878         && (app->max_requests == 0
3879             || port->app_responses + port->app_pending_responses
3880                 < app->max_requests))
3881     {
3882         if (port->app_link.next == NULL) {
3883             if (port->app_pending_responses > 0) {
3884                 nxt_queue_insert_tail(&app->ports, &port->app_link);
3885 
3886             } else {
3887                 nxt_queue_insert_head(&app->ports, &port->app_link);
3888             }
3889 
3890             nxt_port_inc_use(port);
3891 
3892         } else {
3893             if (port->app_pending_responses == 0
3894                 && nxt_queue_first(&app->ports) != &port->app_link)
3895             {
3896                 nxt_queue_remove(&port->app_link);
3897                 nxt_queue_insert_head(&app->ports, &port->app_link);
3898             }
3899         }
3900     }
3901 
3902     if (!nxt_queue_is_empty(&app->ports)
3903         && !nxt_queue_is_empty(&app->requests))
3904     {
3905         lnk = nxt_queue_first(&app->requests);
3906         nxt_queue_remove(lnk);
3907         lnk->next = NULL;
3908 
3909         ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
3910 
3911         ra->app_port = nxt_router_pop_first_port(app);
3912 
3913         if (ra->app_port->app_pending_responses > 1) {
3914             nxt_router_ra_pending(task, app, ra);
3915         }
3916     }
3917 
3918     /* Pop first pending request for this port. */
3919     if ((request_failed > 0 || got_response > 0)
3920         && !nxt_queue_is_empty(&port->pending_requests))
3921     {
3922         lnk = nxt_queue_first(&port->pending_requests);
3923         nxt_queue_remove(lnk);
3924         lnk->next = NULL;
3925 
3926         pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
3927                                          link_port_pending);
3928 
3929         nxt_assert(pending_ra->link_app_pending.next != NULL);
3930 
3931         nxt_queue_remove(&pending_ra->link_app_pending);
3932         pending_ra->link_app_pending.next = NULL;
3933 
3934     } else {
3935         pending_ra = NULL;
3936     }
3937 
3938     /* Try to cancel and re-schedule first stalled request for this app. */
3939     if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
3940         lnk = nxt_queue_first(&app->pending);
3941 
3942         re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
3943 
3944         if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
3945 
3946             nxt_debug(task, "app '%V' stalled request #%uD detected",
3947                       &app->name, re_ra->stream);
3948 
3949             cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
3950                                               re_ra->stream);
3951 
3952             if (cancelled) {
3953                 nxt_router_ra_inc_use(re_ra);
3954 
3955                 state.ra = re_ra;
3956                 state.app = app;
3957 
3958                 nxt_router_port_select(task, &state);
3959 
3960                 goto re_ra_cancelled;
3961             }
3962         }
3963     }
3964 
3965     re_ra = NULL;
3966 
3967 re_ra_cancelled:
3968 
3969     send_quit = (app->max_requests > 0
3970                  && port->app_pending_responses == 0
3971                  && port->app_responses >= app->max_requests);
3972 
3973     if (send_quit) {
3974         port_unchained = nxt_queue_chk_remove(&port->app_link);
3975 
3976         port->app = NULL;
3977         app->processes--;
3978 
3979     } else {
3980         port_unchained = 0;
3981     }
3982 
3983     adjust_idle_timer = 0;
3984 
3985     if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
3986         nxt_assert(port->idle_link.next == NULL);
3987 
3988         if (app->idle_processes == app->spare_processes
3989             && app->adjust_idle_work.data == NULL)
3990         {
3991             adjust_idle_timer = 1;
3992             app->adjust_idle_work.data = app;
3993             app->adjust_idle_work.next = NULL;
3994         }
3995 
3996         if (app->idle_processes < app->spare_processes) {
3997             nxt_queue_insert_tail(&app->spare_ports, &port->idle_link);
3998 
3999         } else {
4000             nxt_queue_insert_tail(&app->idle_ports, &port->idle_link);
4001 
4002             port->idle_start = task->thread->engine->timers.now;
4003         }
4004 
4005         app->idle_processes++;
4006     }
4007 
4008     nxt_thread_mutex_unlock(&app->mutex);
4009 
4010     if (adjust_idle_timer) {
4011         nxt_router_app_use(task, app, 1);
4012         nxt_event_engine_post(app->engine, &app->adjust_idle_work);
4013     }
4014 
4015     if (pending_ra != NULL) {
4016         nxt_router_ra_use(task, pending_ra, -1);
4017     }
4018 
4019     if (re_ra != NULL) {
4020         if (nxt_router_port_post_select(task, &state) == NXT_OK) {
4021             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4022                                nxt_router_app_process_request,
4023                                &task->thread->engine->task, app, re_ra);
4024         }
4025     }
4026 
4027     if (ra != NULL) {
4028         nxt_router_ra_use(task, ra, -1);
4029 
4030         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
4031                            nxt_router_app_process_request,
4032                            &task->thread->engine->task, app, ra);
4033 
4034         goto adjust_use;
4035     }
4036 
4037     /* ? */
4038     if (port->pair[1] == -1) {
4039         nxt_debug(task, "app '%V' %p port %p already closed (pid %PI dead?)",
4040                   &app->name, app, port, port->pid);
4041 
4042         goto adjust_use;
4043     }
4044 
4045     if (send_quit) {
4046         nxt_debug(task, "app '%V' %p send QUIT to port",
4047                   &app->name, app);
4048 
4049         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
4050                               -1, 0, 0, NULL);
4051 
4052         if (port_unchained) {
4053             nxt_port_use(task, port, -1);
4054         }
4055 
4056         goto adjust_use;
4057     }
4058 
4059     nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
4060               &app->name, app);
4061 
4062 adjust_use:
4063 
4064     if (request_failed > 0 || got_response > 0) {
4065         nxt_port_use(task, port, -1);
4066     }
4067 }
4068 
4069 
4070 void
4071 nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
4072 {
4073     nxt_app_t         *app;
4074     nxt_bool_t        unchain, start_process;
4075     nxt_port_t        *idle_port;
4076     nxt_queue_link_t  *idle_lnk;
4077 
4078     app = port->app;
4079 
4080     nxt_assert(app != NULL);
4081 
4082     nxt_thread_mutex_lock(&app->mutex);
4083 
4084     unchain = nxt_queue_chk_remove(&port->app_link);
4085 
4086     if (nxt_queue_chk_remove(&port->idle_link)) {
4087         app->idle_processes--;
4088 
4089         if (port->idle_start == 0
4090             && app->idle_processes >= app->spare_processes)
4091         {
4092             nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4093 
4094             idle_lnk = nxt_queue_last(&app->idle_ports);
4095             idle_port = nxt_queue_link_data(idle_lnk, nxt_port_t, idle_link);
4096             nxt_queue_remove(idle_lnk);
4097 
4098             nxt_queue_insert_tail(&app->spare_ports, idle_lnk);
4099 
4100             idle_port->idle_start = 0;
4101         }
4102     }
4103 
4104     app->processes--;
4105 
4106     start_process = !task->thread->engine->shutdown
4107                     && nxt_router_app_can_start(app)
4108                     && (!nxt_queue_is_empty(&app->requests)
4109                         || nxt_router_app_need_start(app));
4110 
4111     if (start_process) {
4112         app->pending_processes++;
4113     }
4114 
4115     nxt_thread_mutex_unlock(&app->mutex);
4116 
4117     nxt_debug(task, "app '%V' pid %PI closed", &app->name, port->pid);
4118 
4119     if (unchain) {
4120         nxt_port_use(task, port, -1);
4121     }
4122 
4123     if (start_process) {
4124         nxt_router_start_app_process(task, app);
4125     }
4126 }
4127 
4128 
4129 static void
4130 nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
4131 {
4132     nxt_app_t           *app;
4133     nxt_bool_t          queued;
4134     nxt_port_t          *port;
4135     nxt_msec_t          timeout, threshold;
4136     nxt_queue_link_t    *lnk;
4137     nxt_event_engine_t  *engine;
4138 
4139     app = obj;
4140     queued = (data == app);
4141 
4142     nxt_debug(task, "nxt_router_adjust_idle_timer: app \"%V\", queued %b",
4143               &app->name, queued);
4144 
4145     engine = task->thread->engine;
4146 
4147     nxt_assert(app->engine == engine);
4148 
4149     threshold = engine->timers.now + app->joint->idle_timer.bias;
4150     timeout = 0;
4151 
4152     nxt_thread_mutex_lock(&app->mutex);
4153 
4154     if (queued) {
4155         app->adjust_idle_work.data = NULL;
4156     }
4157 
4158     while (app->idle_processes > app->spare_processes) {
4159 
4160         nxt_assert(!nxt_queue_is_empty(&app->idle_ports));
4161 
4162         lnk = nxt_queue_first(&app->idle_ports);
4163         port = nxt_queue_link_data(lnk, nxt_port_t, idle_link);
4164 
4165         timeout = port->idle_start + app->idle_timeout;
4166 
4167         if (timeout > threshold) {
4168             break;
4169         }
4170 
4171         nxt_queue_remove(lnk);
4172         lnk->next = NULL;
4173 
4174         nxt_queue_chk_remove(&port->app_link);
4175 
4176         app->idle_processes--;
4177         app->processes--;
4178         port->app = NULL;
4179 
4180         nxt_thread_mutex_unlock(&app->mutex);
4181 
4182         nxt_debug(task, "app '%V' send QUIT to idle port %PI",
4183                   &app->name, port->pid);
4184 
4185         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4186 
4187         nxt_port_use(task, port, -1);
4188 
4189         nxt_thread_mutex_lock(&app->mutex);
4190     }
4191 
4192     nxt_thread_mutex_unlock(&app->mutex);
4193 
4194     if (timeout > threshold) {
4195         nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
4196 
4197     } else {
4198         nxt_timer_disable(engine, &app->joint->idle_timer);
4199     }
4200 
4201     if (queued) {
4202         nxt_router_app_use(task, app, -1);
4203     }
4204 }
4205 
4206 
4207 static void
4208 nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
4209 {
4210     nxt_timer_t      *timer;
4211     nxt_app_joint_t  *app_joint;
4212 
4213     timer = obj;
4214     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4215 
4216     if (nxt_fast_path(app_joint->app != NULL)) {
4217         nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
4218     }
4219 }
4220 
4221 
4222 static void
4223 nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
4224 {
4225     nxt_timer_t      *timer;
4226     nxt_app_joint_t  *app_joint;
4227 
4228     timer = obj;
4229     app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
4230 
4231     nxt_router_app_joint_use(task, app_joint, -1);
4232 }
4233 
4234 
4235 static void
4236 nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
4237 {
4238     nxt_app_t        *app;
4239     nxt_port_t       *port;
4240     nxt_app_joint_t  *app_joint;
4241 
4242     app_joint = obj;
4243     app = app_joint->app;
4244 
4245     for ( ;; ) {
4246         port = nxt_router_app_get_port_for_quit(app);
4247         if (port == NULL) {
4248             break;
4249         }
4250 
4251         nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
4252 
4253         nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4254 
4255         nxt_port_use(task, port, -1);
4256     }
4257 
4258     nxt_assert(app->processes == 0);
4259     nxt_assert(app->idle_processes == 0);
4260     nxt_assert(nxt_queue_is_empty(&app->requests));
4261     nxt_assert(nxt_queue_is_empty(&app->ports));
4262     nxt_assert(nxt_queue_is_empty(&app->spare_ports));
4263     nxt_assert(nxt_queue_is_empty(&app->idle_ports));
4264 
4265     nxt_thread_mutex_destroy(&app->mutex);
4266     nxt_free(app);
4267 
4268     app_joint->app = NULL;
4269 
4270     if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
4271         app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
4272         nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
4273 
4274     } else {
4275         nxt_router_app_joint_use(task, app_joint, -1);
4276     }
4277 }
4278 
4279 
4280 static void
4281 nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
4282 {
4283     nxt_app_t           *app;
4284     nxt_bool_t          can_start_process;
4285     nxt_req_app_link_t  *ra;
4286 
4287     ra = state->ra;
4288     app = state->app;
4289 
4290     state->failed_port_use_delta = 0;
4291 
4292     if (nxt_queue_chk_remove(&ra->link_app_requests))
4293     {
4294         nxt_router_ra_dec_use(ra);
4295     }
4296 
4297     if (nxt_queue_chk_remove(&ra->link_port_pending))
4298     {
4299         nxt_assert(ra->link_app_pending.next != NULL);
4300 
4301         nxt_queue_remove(&ra->link_app_pending);
4302         ra->link_app_pending.next = NULL;
4303 
4304         nxt_router_ra_dec_use(ra);
4305     }
4306 
4307     state->failed_port = ra->app_port;
4308 
4309     if (ra->app_port != NULL) {
4310         state->failed_port_use_delta--;
4311 
4312         state->failed_port->app_pending_responses--;
4313 
4314         if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
4315             state->failed_port_use_delta--;
4316         }
4317 
4318         ra->app_port = NULL;
4319     }
4320 
4321     can_start_process = nxt_router_app_can_start(app);
4322 
4323     state->port = NULL;
4324     state->start_process = 0;
4325 
4326     if (nxt_queue_is_empty(&app->ports)
4327         || (can_start_process && nxt_router_app_first_port_busy(app)) )
4328     {
4329         ra = nxt_router_ra_create(task, ra);
4330 
4331         if (nxt_slow_path(ra == NULL)) {
4332             goto fail;
4333         }
4334 
4335         if (nxt_slow_path(state->failed_port != NULL)) {
4336             nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
4337 
4338         } else {
4339             nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
4340         }
4341 
4342         nxt_router_ra_inc_use(ra);
4343 
4344         nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
4345 
4346         if (can_start_process) {
4347             app->pending_processes++;
4348             state->start_process = 1;
4349         }
4350 
4351     } else {
4352         state->port = nxt_router_pop_first_port(app);
4353 
4354         if (state->port->app_pending_responses > 1) {
4355             ra = nxt_router_ra_create(task, ra);
4356 
4357             if (nxt_slow_path(ra == NULL)) {
4358                 goto fail;
4359             }
4360 
4361             ra->app_port = state->port;
4362 
4363             nxt_router_ra_pending(task, app, ra);
4364         }
4365 
4366         if (can_start_process && nxt_router_app_need_start(app)) {
4367             app->pending_processes++;
4368             state->start_process = 1;
4369         }
4370     }
4371 
4372 fail:
4373 
4374     state->shared_ra = ra;
4375 }
4376 
4377 
4378 static nxt_int_t
4379 nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
4380 {
4381     nxt_int_t           res;
4382     nxt_app_t           *app;
4383     nxt_req_app_link_t  *ra;
4384 
4385     ra = state->shared_ra;
4386     app = state->app;
4387 
4388     if (state->failed_port_use_delta != 0) {
4389         nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
4390     }
4391 
4392     if (nxt_slow_path(ra == NULL)) {
4393         if (state->port != NULL) {
4394             nxt_port_use(task, state->port, -1);
4395         }
4396 
4397         nxt_router_ra_error(state->ra, 500,
4398                             "Failed to allocate shared req<->app link");
4399         nxt_router_ra_use(task, state->ra, -1);
4400 
4401         return NXT_ERROR;
4402     }
4403 
4404     if (state->port != NULL) {
4405         nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
4406 
4407         ra->app_port = state->port;
4408 
4409         if (state->start_process) {
4410             nxt_router_start_app_process(task, app);
4411         }
4412 
4413         return NXT_OK;
4414     }
4415 
4416     if (!state->start_process) {
4417         nxt_debug(task, "app '%V' %p too many running or pending processes",
4418                   &app->name, app);
4419 
4420         return NXT_AGAIN;
4421     }
4422 
4423     res = nxt_router_start_app_process(task, app);
4424 
4425     if (nxt_slow_path(res != NXT_OK)) {
4426         nxt_router_ra_error(ra, 500, "Failed to start app process");
4427         nxt_router_ra_use(task, ra, -1);
4428 
4429         return NXT_ERROR;
4430     }
4431 
4432     return NXT_AGAIN;
4433 }
4434 
4435 
4436 static nxt_int_t
4437 nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
4438 {
4439     nxt_port_select_state_t  state;
4440 
4441     state.ra = ra;
4442     state.app = app;
4443 
4444     nxt_thread_mutex_lock(&app->mutex);
4445 
4446     nxt_router_port_select(task, &state);
4447 
4448     nxt_thread_mutex_unlock(&app->mutex);
4449 
4450     return nxt_router_port_post_select(task, &state);
4451 }
4452 
4453 
4454 void
4455 nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar,
4456     nxt_app_t *app)
4457 {
4458     nxt_int_t            res;
4459     nxt_port_t           *port;
4460     nxt_event_engine_t   *engine;
4461     nxt_http_request_t   *r;
4462     nxt_req_app_link_t   ra_local, *ra;
4463     nxt_req_conn_link_t  *rc;
4464 
4465     r = ar->request;
4466     engine = task->thread->engine;
4467 
4468     rc = nxt_port_rpc_register_handler_ex(task, engine->port,
4469                                           nxt_router_response_ready_handler,
4470                                           nxt_router_response_error_handler,
4471                                           sizeof(nxt_req_conn_link_t));
4472 
4473     if (nxt_slow_path(rc == NULL)) {
4474         nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
4475         return;
4476     }
4477 
4478     rc->stream = nxt_port_rpc_ex_stream(rc);
4479     rc->app = app;
4480 
4481     nxt_router_app_use(task, app, 1);
4482 
4483     rc->ap = ar;
4484 
4485     ra = &ra_local;
4486     nxt_router_ra_init(task, ra, rc);
4487 
4488     res = nxt_router_app_port(task, app, ra);
4489 
4490     if (res != NXT_OK) {
4491         return;
4492     }
4493 
4494     ra = rc->ra;
4495     port = ra->app_port;
4496 
4497     nxt_assert(port != NULL);
4498 
4499     nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
4500 
4501     nxt_router_app_prepare_request(task, ra);
4502 }
4503 
4504 
4505 static void
4506 nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
4507 {
4508 }
4509 
4510 
4511 static void
4512 nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
4513 {
4514     uint32_t             request_failed;
4515     nxt_buf_t            *buf;
4516     nxt_int_t            res;
4517     nxt_port_t           *port, *c_port, *reply_port;
4518     nxt_app_parse_ctx_t  *ap;
4519 
4520     nxt_assert(ra->app_port != NULL);
4521 
4522     port = ra->app_port;
4523     reply_port = ra->reply_port;
4524     ap = ra->ap;
4525 
4526     request_failed = 1;
4527 
4528     c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
4529                                              reply_port->id);
4530     if (nxt_slow_path(c_port != reply_port)) {
4531         res = nxt_port_send_port(task, port, reply_port, 0);
4532 
4533         if (nxt_slow_path(res != NXT_OK)) {
4534             nxt_router_ra_error(ra, 500,
4535                                 "Failed to send reply port to application");
4536             goto release_port;
4537         }
4538 
4539         nxt_process_connected_port_add(port->process, reply_port);
4540     }
4541 
4542     buf = nxt_router_prepare_msg(task, &ap->r, port,
4543                                  nxt_app_msg_prefix[port->app->type]);
4544 
4545     if (nxt_slow_path(buf == NULL)) {
4546         nxt_router_ra_error(ra, 500,
4547                             "Failed to prepare message for application");
4548         goto release_port;
4549     }
4550 
4551     nxt_debug(task, "about to send %O bytes buffer to app process port %d",
4552                     nxt_buf_used_size(buf),
4553                     port->socket.fd);
4554 
4555     request_failed = 0;
4556 
4557     ra->msg_info.buf = buf;
4558     ra->msg_info.completion_handler = buf->completion_handler;
4559 
4560     for (; buf; buf = buf->next) {
4561         buf->completion_handler = nxt_router_dummy_buf_completion;
4562     }
4563 
4564     buf = ra->msg_info.buf;
4565 
4566     res = nxt_port_mmap_get_tracking(task, port, &ra->msg_info.tracking,
4567                                      ra->stream);
4568     if (nxt_slow_path(res != NXT_OK)) {
4569         nxt_router_ra_error(ra, 500,
4570                             "Failed to get tracking area");
4571         goto release_port;
4572     }
4573 
4574     res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
4575                                  -1, ra->stream, reply_port->id, buf,
4576                                  &ra->msg_info.tracking);
4577 
4578     if (nxt_slow_path(res != NXT_OK)) {
4579         nxt_router_ra_error(ra, 500,
4580                             "Failed to send message to application");
4581         goto release_port;
4582     }
4583 
4584 release_port:
4585 
4586     nxt_router_app_port_release(task, port, request_failed, 0);
4587 
4588     nxt_router_ra_update_peer(task, ra);
4589 }
4590 
4591 
4592 struct nxt_fields_iter_s {
4593     nxt_list_part_t   *part;
4594     nxt_http_field_t  *field;
4595 };
4596 
4597 typedef struct nxt_fields_iter_s  nxt_fields_iter_t;
4598 
4599 
4600 static nxt_http_field_t *
4601 nxt_fields_part_first(nxt_list_part_t *part, nxt_fields_iter_t *i)
4602 {
4603     if (part == NULL) {
4604         return NULL;
4605     }
4606 
4607     while (part->nelts == 0) {
4608         part = part->next;
4609         if (part == NULL) {
4610             return NULL;
4611         }
4612     }
4613 
4614     i->part = part;
4615     i->field = nxt_list_data(i->part);
4616 
4617     return i->field;
4618 }
4619 
4620 
4621 static nxt_http_field_t *
4622 nxt_fields_first(nxt_list_t *fields, nxt_fields_iter_t *i)
4623 {
4624     return nxt_fields_part_first(nxt_list_part(fields), i);
4625 }
4626 
4627 
4628 static nxt_http_field_t *
4629 nxt_fields_next(nxt_fields_iter_t *i)
4630 {
4631     nxt_http_field_t  *end = nxt_list_data(i->part);
4632 
4633     end += i->part->nelts;
4634     i->field++;
4635 
4636     if (i->field < end) {
4637         return i->field;
4638     }
4639 
4640     return nxt_fields_part_first(i->part->next, i);
4641 }
4642 
4643 
4644 static nxt_buf_t *
4645 nxt_router_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
4646     nxt_port_t *port, const nxt_str_t *prefix)
4647 {
4648     void                      *target_pos, *query_pos;
4649     u_char                    *pos, *end, *p, c;
4650     size_t                    fields_count, req_size, size, free_size;
4651     size_t                    copy_size;
4652     nxt_buf_t                 *b, *buf, *out, **tail;
4653     nxt_http_field_t          *field, *dup;
4654     nxt_unit_field_t          *dst_field;
4655     nxt_fields_iter_t         iter, dup_iter;
4656     nxt_unit_request_t        *req;
4657     nxt_app_request_header_t  *h;
4658 
4659     h = &r->header;
4660 
4661     req_size = sizeof(nxt_unit_request_t)
4662                + h->method.length + 1
4663                + h->version.length + 1
4664                + r->remote.length + 1
4665                + r->local.length + 1
4666                + h->server_name.length + 1
4667                + h->target.length + 1
4668                + (h->path.start != h->target.start ? h->path.length + 1 : 0);
4669 
4670     fields_count = 0;
4671 
4672     nxt_list_each(field, h->fields) {
4673         fields_count++;
4674 
4675         req_size += field->name_length + prefix->length + 1
4676                     + field->value_length + 1;
4677     } nxt_list_loop;
4678 
4679     req_size += fields_count * sizeof(nxt_unit_field_t);
4680 
4681     if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
4682         nxt_alert(task, "headers to big to fit in shared memory (%d)",
4683                   (int) req_size);
4684 
4685         return NULL;
4686     }
4687 
4688     out = nxt_port_mmap_get_buf(task, port,
4689               nxt_min(req_size + r->body.preread_size, PORT_MMAP_DATA_SIZE));
4690     if (nxt_slow_path(out == NULL)) {
4691         return NULL;
4692     }
4693 
4694     req = (nxt_unit_request_t *) out->mem.free;
4695     out->mem.free += req_size;
4696 
4697     req->content_length = h->parsed_content_length;
4698 
4699     p = (u_char *) (req->fields + fields_count);
4700 
4701     nxt_debug(task, "fields_count=%d", (int) fields_count);
4702 
4703     req->method_length = h->method.length;
4704     nxt_unit_sptr_set(&req->method, p);
4705     p = nxt_cpymem(p, h->method.start, h->method.length);
4706     *p++ = '\0';
4707 
4708     req->version_length = h->version.length;
4709     nxt_unit_sptr_set(&req->version, p);
4710     p = nxt_cpymem(p, h->version.start, h->version.length);
4711     *p++ = '\0';
4712 
4713     req->remote_length = r->remote.length;
4714     nxt_unit_sptr_set(&req->remote, p);
4715     p = nxt_cpymem(p, r->remote.start, r->remote.length);
4716     *p++ = '\0';
4717 
4718     req->local_length = r->local.length;
4719     nxt_unit_sptr_set(&req->local, p);
4720     p = nxt_cpymem(p, r->local.start, r->local.length);
4721     *p++ = '\0';
4722 
4723     req->server_name_length = h->server_name.length;
4724     nxt_unit_sptr_set(&req->server_name, p);
4725     p = nxt_cpymem(p, h->server_name.start, h->server_name.length);
4726     *p++ = '\0';
4727 
4728     target_pos = p;
4729     req->target_length = h->target.length;
4730     nxt_unit_sptr_set(&req->target, p);
4731     p = nxt_cpymem(p, h->target.start, h->target.length);
4732     *p++ = '\0';
4733 
4734     req->path_length = h->path.length;
4735     if (h->path.start == h->target.start) {
4736         nxt_unit_sptr_set(&req->path, target_pos);
4737 
4738     } else {
4739         nxt_unit_sptr_set(&req->path, p);
4740         p = nxt_cpymem(p, h->path.start, h->path.length);
4741         *p++ = '\0';
4742     }
4743 
4744     req->query_length = h->query.length;
4745     if (h->query.start != NULL) {
4746         query_pos = nxt_pointer_to(target_pos,
4747                                    h->query.start - h->target.start);
4748 
4749         nxt_unit_sptr_set(&req->query, query_pos);
4750 
4751     } else {
4752         req->query.offset = 0;
4753     }
4754 
4755     req->content_length_field = NXT_UNIT_NONE_FIELD;
4756     req->content_type_field   = NXT_UNIT_NONE_FIELD;
4757     req->cookie_field         = NXT_UNIT_NONE_FIELD;
4758 
4759     dst_field = req->fields;
4760 
4761     for (field = nxt_fields_first(h->fields, &iter);
4762          field != NULL;
4763          field = nxt_fields_next(&iter))
4764     {
4765         if (field->skip) {
4766             continue;
4767         }
4768 
4769         dst_field->hash = field->hash;
4770         dst_field->skip = 0;
4771         dst_field->name_length = field->name_length + prefix->length;
4772         dst_field->value_length = field->value_length;
4773 
4774         if (field->value == h->content_length.start) {
4775             req->content_length_field = dst_field - req->fields;
4776 
4777         } else if (field->value == h->content_type.start) {
4778             req->content_type_field = dst_field - req->fields;
4779 
4780         } else if (field->value == h->cookie.start) {
4781             req->cookie_field = dst_field - req->fields;
4782         }
4783 
4784         nxt_debug(task, "add field 0x%04Xd, %d, %d, %p : %d %p",
4785                   (int) field->hash, (int) field->skip,
4786                   (int) field->name_length, field->name,
4787                   (int) field->value_length, field->value);
4788 
4789         if (prefix->length != 0) {
4790             nxt_unit_sptr_set(&dst_field->name, p);
4791             p = nxt_cpymem(p, prefix->start, prefix->length);
4792 
4793             end = field->name + field->name_length;
4794             for (pos = field->name; pos < end; pos++) {
4795                 c = *pos;
4796 
4797                 if (c >= 'a' && c <= 'z') {
4798                     *p++ = (c & ~0x20);
4799                     continue;
4800                 }
4801 
4802                 if (c == '-') {
4803                     *p++ = '_';
4804                     continue;
4805                 }
4806 
4807                 *p++ = c;
4808             }
4809 
4810         } else {
4811             nxt_unit_sptr_set(&dst_field->name, p);
4812             p = nxt_cpymem(p, field->name, field->name_length);
4813         }
4814 
4815         *p++ = '\0';
4816 
4817         nxt_unit_sptr_set(&dst_field->value, p);
4818         p = nxt_cpymem(p, field->value, field->value_length);
4819 
4820         if (prefix->length != 0) {
4821             dup_iter = iter;
4822 
4823             for (dup = nxt_fields_next(&dup_iter);
4824                  dup != NULL;
4825                  dup = nxt_fields_next(&dup_iter))
4826             {
4827                 if (dup->name_length != field->name_length
4828                     || dup->skip
4829                     || dup->hash != field->hash
4830                     || nxt_memcasecmp(dup->name, field->name, dup->name_length))
4831                 {
4832                     continue;
4833                 }
4834 
4835                 p = nxt_cpymem(p, ", ", 2);
4836                 p = nxt_cpymem(p, dup->value, dup->value_length);
4837 
4838                 dst_field->value_length += 2 + dup->value_length;
4839 
4840                 dup->skip = 1;
4841             }
4842         }
4843 
4844         *p++ = '\0';
4845 
4846         dst_field++;
4847     }
4848 
4849     req->fields_count = dst_field - req->fields;
4850 
4851     nxt_unit_sptr_set(&req->preread_content, out->mem.free);
4852 
4853     buf = out;
4854     tail = &buf->next;
4855 
4856     for (b = r->body.buf; b != NULL; b = b->next) {
4857         size = nxt_buf_mem_used_size(&b->mem);
4858         pos = b->mem.pos;
4859 
4860         while (size > 0) {
4861             if (buf == NULL) {
4862                 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
4863 
4864                 buf = nxt_port_mmap_get_buf(task, port, free_size);
4865                 if (nxt_slow_path(buf == NULL)) {
4866                     while (out != NULL) {
4867                         buf = out->next;
4868                         out->completion_handler(task, out, out->parent);
4869                         out = buf;
4870                     }
4871                     return NULL;
4872                 }
4873 
4874                 *tail = buf;
4875                 tail = &buf->next;
4876 
4877             } else {
4878                 free_size = nxt_buf_mem_free_size(&buf->mem);
4879                 if (free_size < size
4880                     && nxt_port_mmap_increase_buf(task, buf, size, 1)
4881                        == NXT_OK)
4882                 {
4883                     free_size = nxt_buf_mem_free_size(&buf->mem);
4884                 }
4885             }
4886 
4887             if (free_size > 0) {
4888                 copy_size = nxt_min(free_size, size);
4889 
4890                 buf->mem.free = nxt_cpymem(buf->mem.free, pos, copy_size);
4891 
4892                 size -= copy_size;
4893                 pos += copy_size;
4894 
4895                 if (size == 0) {
4896                     break;
4897                 }
4898             }
4899 
4900             buf = NULL;
4901         }
4902     }
4903 
4904     return out;
4905 }
4906 
4907 
4908 static void
4909 nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
4910 {
4911     nxt_app_t                *app;
4912     nxt_bool_t               cancelled, unlinked;
4913     nxt_port_t               *port;
4914     nxt_timer_t              *timer;
4915     nxt_queue_link_t         *lnk;
4916     nxt_req_app_link_t       *pending_ra;
4917     nxt_app_parse_ctx_t      *ar;
4918     nxt_req_conn_link_t      *rc;
4919     nxt_port_select_state_t  state;
4920 
4921     timer = obj;
4922 
4923     nxt_debug(task, "router app timeout");
4924 
4925     ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
4926     rc = ar->timer_data;
4927     app = rc->app;
4928 
4929     if (app == NULL) {
4930         goto generate_error;
4931     }
4932 
4933     port = NULL;
4934     pending_ra = NULL;
4935 
4936     if (rc->app_port != NULL) {
4937         port = rc->app_port;
4938         rc->app_port = NULL;
4939     }
4940 
4941     if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) {
4942         port = rc->ra->app_port;
4943         rc->ra->app_port = NULL;
4944     }
4945 
4946     if (port == NULL) {
4947         goto generate_error;
4948     }
4949 
4950     nxt_thread_mutex_lock(&app->mutex);
4951 
4952     unlinked = nxt_queue_chk_remove(&port->app_link);
4953 
4954     if (!nxt_queue_is_empty(&port->pending_requests)) {
4955         lnk = nxt_queue_first(&port->pending_requests);
4956 
4957         pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
4958                                          link_port_pending);
4959 
4960         nxt_assert(pending_ra->link_app_pending.next != NULL);
4961 
4962         nxt_debug(task, "app '%V' pending request #%uD found",
4963                   &app->name, pending_ra->stream);
4964 
4965         cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
4966                                           pending_ra->stream);
4967 
4968         if (cancelled) {
4969             nxt_router_ra_inc_use(pending_ra);
4970 
4971             state.ra = pending_ra;
4972             state.app = app;
4973 
4974             nxt_router_port_select(task, &state);
4975 
4976         } else {
4977             pending_ra = NULL;
4978         }
4979     }
4980 
4981     nxt_thread_mutex_unlock(&app->mutex);
4982 
4983     if (pending_ra != NULL
4984         && nxt_router_port_post_select(task, &state) == NXT_OK)
4985     {
4986         nxt_router_app_prepare_request(task, pending_ra);
4987     }
4988 
4989     nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
4990 
4991     nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
4992 
4993     nxt_port_use(task, port, unlinked ? -2 : -1);
4994 
4995 generate_error:
4996 
4997     nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
4998 
4999     nxt_router_rc_unlink(task, rc);
5000 }
5001