nxt_router.c (317:94010c8bd7bc) nxt_router.c (318:c2442f5e054d)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10
11
12typedef struct {
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>
9#include <nxt_conf.h>
10
11
12typedef struct {
13 nxt_str_t type;
14 uint32_t workers;
13 nxt_str_t type;
14 uint32_t workers;
15 nxt_msec_t timeout;
16 uint32_t requests;
17 nxt_conf_value_t *limits_value;
15} nxt_router_app_conf_t;
16
17
18typedef struct {
19 nxt_str_t application;
20} nxt_router_listener_conf_t;
21
22
23typedef struct nxt_req_app_link_s nxt_req_app_link_t;
24typedef struct nxt_start_worker_s nxt_start_worker_t;
25
26struct nxt_start_worker_s {
27 nxt_app_t *app;
28 nxt_req_app_link_t *ra;
29
30 nxt_work_t work;
31};
32
33
18} nxt_router_app_conf_t;
19
20
21typedef struct {
22 nxt_str_t application;
23} nxt_router_listener_conf_t;
24
25
26typedef struct nxt_req_app_link_s nxt_req_app_link_t;
27typedef struct nxt_start_worker_s nxt_start_worker_t;
28
29struct nxt_start_worker_s {
30 nxt_app_t *app;
31 nxt_req_app_link_t *ra;
32
33 nxt_work_t work;
34};
35
36
37typedef struct {
38 uint32_t stream;
39 nxt_conn_t *conn;
40 nxt_port_t *app_port;
41 nxt_req_app_link_t *ra;
42
43 nxt_queue_link_t link; /* for nxt_conn_t.requests */
44} nxt_req_conn_link_t;
45
46
34struct nxt_req_app_link_s {
47struct nxt_req_app_link_s {
35 nxt_req_id_t req_id;
48 uint32_t stream;
36 nxt_port_t *app_port;
49 nxt_port_t *app_port;
50 nxt_pid_t app_pid;
37 nxt_port_t *reply_port;
38 nxt_app_parse_ctx_t *ap;
39 nxt_req_conn_link_t *rc;
40
41 nxt_queue_link_t link; /* for nxt_app_t.requests */
42
43 nxt_mp_t *mem_pool;
44 nxt_work_t work;
45};
46
47
48typedef struct {
49 nxt_socket_conf_t *socket_conf;
50 nxt_router_temp_conf_t *temp_conf;
51} nxt_socket_rpc_t;
52
53
51 nxt_port_t *reply_port;
52 nxt_app_parse_ctx_t *ap;
53 nxt_req_conn_link_t *rc;
54
55 nxt_queue_link_t link; /* for nxt_app_t.requests */
56
57 nxt_mp_t *mem_pool;
58 nxt_work_t work;
59};
60
61
62typedef struct {
63 nxt_socket_conf_t *socket_conf;
64 nxt_router_temp_conf_t *temp_conf;
65} nxt_socket_rpc_t;
66
67
68typedef struct {
69 nxt_mp_t *mem_pool;
70 nxt_port_recv_msg_t msg;
71 nxt_work_t work;
72} nxt_remove_pid_msg_t;
73
74
75static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
76 void *data);
77static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
78 void *data);
79
54static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
55static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
56static void nxt_router_conf_ready(nxt_task_t *task,
57 nxt_router_temp_conf_t *tmcf);
58static void nxt_router_conf_error(nxt_task_t *task,
59 nxt_router_temp_conf_t *tmcf);
60static void nxt_router_conf_send(nxt_task_t *task,
61 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 38 unchanged lines hidden (view full) ---

100 nxt_event_engine_t *engine);
101static void nxt_router_apps_sort(nxt_router_t *router,
102 nxt_router_temp_conf_t *tmcf);
103
104static void nxt_router_engines_post(nxt_router_t *router,
105 nxt_router_temp_conf_t *tmcf);
106static void nxt_router_engine_post(nxt_event_engine_t *engine,
107 nxt_work_t *jobs);
80static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
81static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
82static void nxt_router_conf_ready(nxt_task_t *task,
83 nxt_router_temp_conf_t *tmcf);
84static void nxt_router_conf_error(nxt_task_t *task,
85 nxt_router_temp_conf_t *tmcf);
86static void nxt_router_conf_send(nxt_task_t *task,
87 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 38 unchanged lines hidden (view full) ---

126 nxt_event_engine_t *engine);
127static void nxt_router_apps_sort(nxt_router_t *router,
128 nxt_router_temp_conf_t *tmcf);
129
130static void nxt_router_engines_post(nxt_router_t *router,
131 nxt_router_temp_conf_t *tmcf);
132static void nxt_router_engine_post(nxt_event_engine_t *engine,
133 nxt_work_t *jobs);
108static void nxt_router_app_data_handler(nxt_task_t *task,
109 nxt_port_recv_msg_t *msg);
110
111static void nxt_router_thread_start(void *data);
112static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
113 void *data);
114static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
115 void *data);
116static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
117 void *data);

--- 6 unchanged lines hidden (view full) ---

124static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
125 void *data);
126static void nxt_router_conf_release(nxt_task_t *task,
127 nxt_socket_conf_joint_t *joint);
128
129static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
130 void *data);
131static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
134
135static void nxt_router_thread_start(void *data);
136static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
137 void *data);
138static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
139 void *data);
140static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
141 void *data);

--- 6 unchanged lines hidden (view full) ---

148static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
149 void *data);
150static void nxt_router_conf_release(nxt_task_t *task,
151 nxt_socket_conf_joint_t *joint);
152
153static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
154 void *data);
155static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
132static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id);
156static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream);
133static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
134 void *data);
135
136static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
137static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
138 void *data);
139static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
140 void *data);

--- 7 unchanged lines hidden (view full) ---

148 nxt_app_wmsg_t *wmsg);
149static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
150 nxt_app_wmsg_t *wmsg);
151static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
152static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
153static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
154static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
155static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
157static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
158 void *data);
159
160static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
161static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
162 void *data);
163static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
164 void *data);

--- 7 unchanged lines hidden (view full) ---

172 nxt_app_wmsg_t *wmsg);
173static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
174 nxt_app_wmsg_t *wmsg);
175static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
176static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
177static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
178static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
179static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
180static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
156static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
157
158static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
159 const char* fmt, ...);
160
161static nxt_router_t *nxt_router;
162
163

--- 44 unchanged lines hidden (view full) ---

208
209 if (nxt_slow_path(sw == NULL)) {
210 return NULL;
211 }
212
213 sw->app = app;
214 sw->ra = ra;
215
181static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
182
183static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
184 const char* fmt, ...);
185
186static nxt_router_t *nxt_router;
187
188

--- 44 unchanged lines hidden (view full) ---

233
234 if (nxt_slow_path(sw == NULL)) {
235 return NULL;
236 }
237
238 sw->app = app;
239 sw->ra = ra;
240
216 nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
217 ra->req_id, &app->name, app);
241 nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw,
242 ra->stream, &app->name, app);
218
219 rt = task->thread->runtime;
220 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
221
222 sw->work.handler = nxt_router_send_sw_request;
223 sw->work.task = &main_port->engine->task;
224 sw->work.obj = sw;
225 sw->work.data = task->thread->engine;

--- 17 unchanged lines hidden (view full) ---

243nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
244{
245 nxt_debug(task, "sw %p release", sw);
246
247 nxt_free(sw);
248}
249
250
243
244 rt = task->thread->runtime;
245 main_port = rt->port_by_type[NXT_PROCESS_MAIN];
246
247 sw->work.handler = nxt_router_send_sw_request;
248 sw->work.task = &main_port->engine->task;
249 sw->work.obj = sw;
250 sw->work.data = task->thread->engine;

--- 17 unchanged lines hidden (view full) ---

268nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
269{
270 nxt_debug(task, "sw %p release", sw);
271
272 nxt_free(sw);
273}
274
275
276nxt_inline void
277nxt_router_rc_unlink(nxt_req_conn_link_t *rc)
278{
279 nxt_queue_remove(&rc->link);
280
281 if (rc->ra != NULL) {
282 rc->ra->rc = NULL;
283 rc->ra = NULL;
284 }
285
286 rc->conn = NULL;
287}
288
289
251static nxt_req_app_link_t *
252nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
253{
254 nxt_mp_t *mp;
290static nxt_req_app_link_t *
291nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
292{
293 nxt_mp_t *mp;
294 nxt_event_engine_t *engine;
255 nxt_req_app_link_t *ra;
256
257 mp = rc->conn->mem_pool;
295 nxt_req_app_link_t *ra;
296
297 mp = rc->conn->mem_pool;
298 engine = task->thread->engine;
258
259 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
260
261 if (nxt_slow_path(ra == NULL)) {
262 return NULL;
263 }
264
299
300 ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
301
302 if (nxt_slow_path(ra == NULL)) {
303 return NULL;
304 }
305
265 nxt_debug(task, "ra #%uxD create", rc->req_id);
306 nxt_debug(task, "ra stream #%uD create", rc->stream);
266
267 nxt_memzero(ra, sizeof(nxt_req_app_link_t));
268
307
308 nxt_memzero(ra, sizeof(nxt_req_app_link_t));
309
269 ra->req_id = rc->req_id;
270 ra->app_port = NULL;
310 ra->stream = rc->stream;
311 ra->app_pid = -1;
271 ra->rc = rc;
312 ra->rc = rc;
313 rc->ra = ra;
314 ra->reply_port = engine->port;
272
273 ra->mem_pool = mp;
274
275 ra->work.handler = NULL;
315
316 ra->mem_pool = mp;
317
318 ra->work.handler = NULL;
276 ra->work.task = &task->thread->engine->task;
319 ra->work.task = &engine->task;
277 ra->work.obj = ra;
320 ra->work.obj = ra;
278 ra->work.data = task->thread->engine;
321 ra->work.data = engine;
279
280 return ra;
281}
282
283
284static void
285nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
286{
322
323 return ra;
324}
325
326
327static void
328nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
329{
330 nxt_port_t *app_port;
287 nxt_req_app_link_t *ra;
288 nxt_event_engine_t *engine;
289
290 ra = obj;
291 engine = data;
292
331 nxt_req_app_link_t *ra;
332 nxt_event_engine_t *engine;
333
334 ra = obj;
335 engine = data;
336
337 if (ra->app_port != NULL) {
338
339 app_port = ra->app_port;
340 ra->app_port = NULL;
341
342 if (task->thread->engine != engine) {
343 ra->app_pid = app_port->pid;
344 }
345
346 nxt_router_app_release_port(task, app_port, app_port->app);
347
348#if 0
349 /* Uncomment to hold app port until complete response received. */
350 if (ra->rc != NULL) {
351 ra->rc->app_port = ra->app_port;
352
353 } else {
354 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
355 }
356#endif
357 }
358
293 if (task->thread->engine != engine) {
294 ra->work.handler = nxt_router_ra_release;
295 ra->work.task = &engine->task;
296 ra->work.next = NULL;
297
359 if (task->thread->engine != engine) {
360 ra->work.handler = nxt_router_ra_release;
361 ra->work.task = &engine->task;
362 ra->work.next = NULL;
363
298 nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine);
364 nxt_debug(task, "ra stream #%uD post release to %p",
365 ra->stream, engine);
299
300 nxt_event_engine_post(engine, &ra->work);
301
302 return;
303 }
304
366
367 nxt_event_engine_post(engine, &ra->work);
368
369 return;
370 }
371
305 nxt_debug(task, "ra #%uxD release", ra->req_id);
372 if (ra->rc != NULL && ra->app_pid != -1) {
373 nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid);
374 }
306
375
307 if (ra->app_port != NULL) {
376 nxt_debug(task, "ra stream #%uD release", ra->stream);
308
377
309 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
378 nxt_mp_release(ra->mem_pool, ra);
379}
310
380
311#if 0
312 /* Uncomment to hold app port until complete response received. */
313 if (ra->rc->conn != NULL) {
314 ra->rc->app_port = ra->app_port;
315
381
316 } else {
317 nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
318 }
319#endif
382static void
383nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
384{
385 nxt_conn_t *c;
386 nxt_req_app_link_t *ra;
387 nxt_event_engine_t *engine;
388
389 ra = obj;
390 engine = data;
391
392 if (task->thread->engine != engine) {
393 ra->work.handler = nxt_router_ra_abort;
394 ra->work.task = &engine->task;
395 ra->work.next = NULL;
396
397 nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
398
399 nxt_event_engine_post(engine, &ra->work);
400
401 return;
320 }
321
402 }
403
404 nxt_debug(task, "ra stream #%uD abort", ra->stream);
405
406 if (ra->rc != NULL) {
407 c = ra->rc->conn;
408
409 nxt_router_gen_error(task, c, 500,
410 "Failed to start application worker");
411 }
412
322 nxt_mp_release(ra->mem_pool, ra);
323}
324
325
326void
327nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
328{
329 nxt_port_new_port_handler(task, msg);

--- 49 unchanged lines hidden (view full) ---

379 nxt_router_conf_error(task, tmcf);
380 }
381}
382
383
384void
385nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
386{
413 nxt_mp_release(ra->mem_pool, ra);
414}
415
416
417void
418nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
419{
420 nxt_port_new_port_handler(task, msg);

--- 49 unchanged lines hidden (view full) ---

470 nxt_router_conf_error(task, tmcf);
471 }
472}
473
474
475void
476nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
477{
478 nxt_mp_t *mp;
479 nxt_buf_t *buf;
480 nxt_event_engine_t *engine;
481 nxt_remove_pid_msg_t *rp;
482
387 nxt_port_remove_pid_handler(task, msg);
388
389 if (msg->port_msg.stream == 0) {
390 return;
391 }
392
483 nxt_port_remove_pid_handler(task, msg);
484
485 if (msg->port_msg.stream == 0) {
486 return;
487 }
488
489 mp = nxt_mp_create(1024, 128, 256, 32);
490
491 buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0);
492 buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos,
493 nxt_buf_used_size(msg->buf));
494
495 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
496 {
497 rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t));
498
499 rp->mem_pool = mp;
500
501 rp->msg.fd = msg->fd;
502 rp->msg.buf = buf;
503 rp->msg.port = engine->port;
504 rp->msg.port_msg = msg->port_msg;
505 rp->msg.size = msg->size;
506 rp->msg.new_port = NULL;
507
508 rp->work.handler = nxt_router_worker_remove_pid_handler;
509 rp->work.task = &engine->task;
510 rp->work.obj = rp;
511 rp->work.data = task->thread->engine;
512 rp->work.next = NULL;
513
514 nxt_event_engine_post(engine, &rp->work);
515 }
516 nxt_queue_loop;
517
393 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
394
395 nxt_port_rpc_handler(task, msg);
396}
397
398
518 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
519
520 nxt_port_rpc_handler(task, msg);
521}
522
523
524static void
525nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
526{
527 nxt_event_engine_t *engine;
528 nxt_remove_pid_msg_t *rp;
529
530 rp = obj;
531
532 nxt_port_remove_pid_handler(task, &rp->msg);
533
534 engine = rp->work.data;
535
536 rp->work.handler = nxt_router_worker_remove_pid_done;
537 rp->work.task = &engine->task;
538 rp->work.next = NULL;
539
540 nxt_event_engine_post(engine, &rp->work);
541}
542
543
544static void
545nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data)
546{
547 nxt_remove_pid_msg_t *rp;
548
549 rp = obj;
550
551 nxt_mp_release(rp->mem_pool, rp);
552}
553
554
399static nxt_router_temp_conf_t *
400nxt_router_temp_conf(nxt_task_t *task)
401{
402 nxt_mp_t *mp, *tmp;
403 nxt_router_conf_t *rtcf;
404 nxt_router_temp_conf_t *tmcf;
405
406 mp = nxt_mp_create(1024, 128, 256, 32);

--- 195 unchanged lines hidden (view full) ---

602 offsetof(nxt_router_app_conf_t, type),
603 },
604
605 {
606 nxt_string("workers"),
607 NXT_CONF_MAP_INT32,
608 offsetof(nxt_router_app_conf_t, workers),
609 },
555static nxt_router_temp_conf_t *
556nxt_router_temp_conf(nxt_task_t *task)
557{
558 nxt_mp_t *mp, *tmp;
559 nxt_router_conf_t *rtcf;
560 nxt_router_temp_conf_t *tmcf;
561
562 mp = nxt_mp_create(1024, 128, 256, 32);

--- 195 unchanged lines hidden (view full) ---

758 offsetof(nxt_router_app_conf_t, type),
759 },
760
761 {
762 nxt_string("workers"),
763 NXT_CONF_MAP_INT32,
764 offsetof(nxt_router_app_conf_t, workers),
765 },
766
767 {
768 nxt_string("limits"),
769 NXT_CONF_MAP_PTR,
770 offsetof(nxt_router_app_conf_t, limits_value),
771 },
610};
611
612
772};
773
774
775static nxt_conf_map_t nxt_router_app_limits_conf[] = {
776 {
777 nxt_string("timeout"),
778 NXT_CONF_MAP_MSEC,
779 offsetof(nxt_router_app_conf_t, timeout),
780 },
781
782 {
783 nxt_string("requests"),
784 NXT_CONF_MAP_INT32,
785 offsetof(nxt_router_app_conf_t, requests),
786 },
787};
788
789
613static nxt_conf_map_t nxt_router_listener_conf[] = {
614 {
615 nxt_string("application"),
616 NXT_CONF_MAP_STR,
617 offsetof(nxt_router_listener_conf_t, application),
618 },
619};
620

--- 128 unchanged lines hidden (view full) ---

749 nxt_free(app);
750
751 nxt_queue_remove(&prev->link);
752 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
753 continue;
754 }
755
756 apcf.workers = 1;
790static nxt_conf_map_t nxt_router_listener_conf[] = {
791 {
792 nxt_string("application"),
793 NXT_CONF_MAP_STR,
794 offsetof(nxt_router_listener_conf_t, application),
795 },
796};
797

--- 128 unchanged lines hidden (view full) ---

926 nxt_free(app);
927
928 nxt_queue_remove(&prev->link);
929 nxt_queue_insert_tail(&tmcf->previous, &prev->link);
930 continue;
931 }
932
933 apcf.workers = 1;
934 apcf.timeout = 0;
935 apcf.requests = 0;
936 apcf.limits_value = NULL;
757
758 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
759 nxt_nitems(nxt_router_app_conf), &apcf);
760 if (ret != NXT_OK) {
761 nxt_log(task, NXT_LOG_CRIT, "application map error");
762 goto app_fail;
763 }
764
937
938 ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
939 nxt_nitems(nxt_router_app_conf), &apcf);
940 if (ret != NXT_OK) {
941 nxt_log(task, NXT_LOG_CRIT, "application map error");
942 goto app_fail;
943 }
944
945 if (apcf.limits_value != NULL) {
946
947 if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
948 nxt_log(task, NXT_LOG_CRIT, "application limits is not object");
949 goto app_fail;
950 }
951
952 ret = nxt_conf_map_object(mp, apcf.limits_value,
953 nxt_router_app_limits_conf,
954 nxt_nitems(nxt_router_app_limits_conf),
955 &apcf);
956 if (ret != NXT_OK) {
957 nxt_log(task, NXT_LOG_CRIT, "application limits map error");
958 goto app_fail;
959 }
960 }
961
765 nxt_debug(task, "application type: %V", &apcf.type);
766 nxt_debug(task, "application workers: %D", apcf.workers);
962 nxt_debug(task, "application type: %V", &apcf.type);
963 nxt_debug(task, "application workers: %D", apcf.workers);
964 nxt_debug(task, "application timeout: %D", apcf.timeout);
965 nxt_debug(task, "application requests: %D", apcf.requests);
767
768 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
769
770 if (lang == NULL) {
771 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
772 &apcf.type);
773 goto app_fail;
774 }

--- 22 unchanged lines hidden (view full) ---

797 nxt_queue_init(&app->ports);
798 nxt_queue_init(&app->requests);
799
800 app->name.length = name.length;
801 nxt_memcpy(app->name.start, name.start, name.length);
802
803 app->type = type;
804 app->max_workers = apcf.workers;
966
967 lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
968
969 if (lang == NULL) {
970 nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
971 &apcf.type);
972 goto app_fail;
973 }

--- 22 unchanged lines hidden (view full) ---

996 nxt_queue_init(&app->ports);
997 nxt_queue_init(&app->requests);
998
999 app->name.length = name.length;
1000 nxt_memcpy(app->name.start, name.start, name.length);
1001
1002 app->type = type;
1003 app->max_workers = apcf.workers;
1004 app->timeout = apcf.timeout;
805 app->live = 1;
806 app->prepare_msg = nxt_app_prepare_msg[type];
807
808 nxt_queue_insert_tail(&tmcf->apps, &app->link);
809 }
810
811 http = nxt_conf_get_path(conf, &http_path);
812#if 0

--- 771 unchanged lines hidden (view full) ---

1584
1585
1586static nxt_port_handler_t nxt_router_app_port_handlers[] = {
1587 NULL, /* NXT_PORT_MSG_QUIT */
1588 NULL, /* NXT_PORT_MSG_NEW_PORT */
1589 NULL, /* NXT_PORT_MSG_CHANGE_FILE */
1590 /* TODO: remove mmap_handler from app ports */
1591 nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */
1005 app->live = 1;
1006 app->prepare_msg = nxt_app_prepare_msg[type];
1007
1008 nxt_queue_insert_tail(&tmcf->apps, &app->link);
1009 }
1010
1011 http = nxt_conf_get_path(conf, &http_path);
1012#if 0

--- 771 unchanged lines hidden (view full) ---

1784
1785
1786static nxt_port_handler_t nxt_router_app_port_handlers[] = {
1787 NULL, /* NXT_PORT_MSG_QUIT */
1788 NULL, /* NXT_PORT_MSG_NEW_PORT */
1789 NULL, /* NXT_PORT_MSG_CHANGE_FILE */
1790 /* TODO: remove mmap_handler from app ports */
1791 nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */
1592 nxt_router_app_data_handler,
1792 nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */
1593 NULL, /* NXT_PORT_MSG_REMOVE_PID */
1594 NULL, /* NXT_PORT_MSG_READY */
1595 NULL, /* NXT_PORT_MSG_START_WORKER */
1596 NULL, /* NXT_PORT_MSG_SOCKET */
1597 NULL, /* NXT_PORT_MSG_MODULES */
1598 NULL, /* NXT_PORT_MSG_CONF_STORE */
1599 nxt_port_rpc_handler,
1600 nxt_port_rpc_handler,

--- 402 unchanged lines hidden (view full) ---

2003{
2004 .ready_handler = nxt_router_conn_ready,
2005 .close_handler = nxt_router_conn_close,
2006 .error_handler = nxt_router_conn_error,
2007};
2008
2009
2010static void
1793 NULL, /* NXT_PORT_MSG_REMOVE_PID */
1794 NULL, /* NXT_PORT_MSG_READY */
1795 NULL, /* NXT_PORT_MSG_START_WORKER */
1796 NULL, /* NXT_PORT_MSG_SOCKET */
1797 NULL, /* NXT_PORT_MSG_MODULES */
1798 NULL, /* NXT_PORT_MSG_CONF_STORE */
1799 nxt_port_rpc_handler,
1800 nxt_port_rpc_handler,

--- 402 unchanged lines hidden (view full) ---

2203{
2204 .ready_handler = nxt_router_conn_ready,
2205 .close_handler = nxt_router_conn_close,
2206 .error_handler = nxt_router_conn_error,
2207};
2208
2209
2210static void
2011nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
2211nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2212 void *data)
2012{
2013 size_t dump_size;
2014 nxt_buf_t *b, *last;
2015 nxt_conn_t *c;
2016 nxt_req_conn_link_t *rc;
2213{
2214 size_t dump_size;
2215 nxt_buf_t *b, *last;
2216 nxt_conn_t *c;
2217 nxt_req_conn_link_t *rc;
2017 nxt_event_engine_t *engine;
2018
2019 b = msg->buf;
2218
2219 b = msg->buf;
2020 engine = task->thread->engine;
2220 rc = data;
2021
2221
2022 rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
2023 if (nxt_slow_path(rc == NULL)) {
2024 nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
2025
2026 return;
2027 }
2028
2029 c = rc->conn;
2030
2031 dump_size = nxt_buf_used_size(b);
2032
2033 if (dump_size > 300) {
2034 dump_size = 300;
2035 }
2036

--- 16 unchanged lines hidden (view full) ---

2053 nxt_buf_chain_add(&b, last);
2054
2055 if (rc->app_port != NULL) {
2056 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
2057
2058 rc->app_port = NULL;
2059 }
2060
2222 c = rc->conn;
2223
2224 dump_size = nxt_buf_used_size(b);
2225
2226 if (dump_size > 300) {
2227 dump_size = 300;
2228 }
2229

--- 16 unchanged lines hidden (view full) ---

2246 nxt_buf_chain_add(&b, last);
2247
2248 if (rc->app_port != NULL) {
2249 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
2250
2251 rc->app_port = NULL;
2252 }
2253
2061 rc->conn = NULL;
2254 nxt_router_rc_unlink(rc);
2062 }
2063
2064 if (b == NULL) {
2065 return;
2066 }
2067
2068 if (msg->buf == b) {
2069 /* Disable instant buffer completion/re-using by port. */

--- 9 unchanged lines hidden (view full) ---

2079 } else {
2080 nxt_debug(task, "router data attach out bufs to existing chain");
2081
2082 nxt_buf_chain_add(&c->write, b);
2083 }
2084}
2085
2086
2255 }
2256
2257 if (b == NULL) {
2258 return;
2259 }
2260
2261 if (msg->buf == b) {
2262 /* Disable instant buffer completion/re-using by port. */

--- 9 unchanged lines hidden (view full) ---

2272 } else {
2273 nxt_debug(task, "router data attach out bufs to existing chain");
2274
2275 nxt_buf_chain_add(&c->write, b);
2276 }
2277}
2278
2279
2280static void
2281nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2282 void *data)
2283{
2284 nxt_req_conn_link_t *rc;
2285
2286 rc = data;
2287
2288 nxt_router_gen_error(task, rc->conn, 500,
2289 "Application terminated unexpectedly");
2290
2291 nxt_router_rc_unlink(rc);
2292}
2293
2294
2087nxt_inline const char *
2088nxt_router_text_by_code(int code)
2089{
2090 switch (code) {
2091 case 400: return "Bad request";
2092 case 404: return "Not found";
2093 case 403: return "Forbidden";
2094 case 408: return "Request Timeout";

--- 47 unchanged lines hidden (view full) ---

2142
2143
2144
2145static void
2146nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2147 const char* fmt, ...)
2148{
2149 va_list args;
2295nxt_inline const char *
2296nxt_router_text_by_code(int code)
2297{
2298 switch (code) {
2299 case 400: return "Bad request";
2300 case 404: return "Not found";
2301 case 403: return "Forbidden";
2302 case 408: return "Request Timeout";

--- 47 unchanged lines hidden (view full) ---

2350
2351
2352
2353static void
2354nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
2355 const char* fmt, ...)
2356{
2357 va_list args;
2358 nxt_mp_t *mp;
2150 nxt_buf_t *b;
2151
2359 nxt_buf_t *b;
2360
2361 /* TODO: fix when called from main thread */
2362 /* TODO: fix when called in the middle of response */
2363
2364 mp = nxt_mp_create(1024, 128, 256, 32);
2365
2152 va_start(args, fmt);
2366 va_start(args, fmt);
2153 b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
2367 b = nxt_router_get_error_buf(task, mp, code, fmt, args);
2154 va_end(args);
2155
2368 va_end(args);
2369
2156 if (c->socket.data != NULL) {
2157 nxt_mp_free(c->mem_pool, c->socket.data);
2158 c->socket.data = NULL;
2159 }
2160
2161 if (c->socket.fd == -1) {
2370 if (c->socket.fd == -1) {
2162 nxt_mp_release(c->mem_pool, b->next);
2163 nxt_mp_release(c->mem_pool, b);
2371 nxt_mp_release(mp, b->next);
2372 nxt_mp_release(mp, b);
2164 return;
2165 }
2166
2167 if (c->write == NULL) {
2168 c->write = b;
2169 c->write_state = &nxt_router_conn_write_state;
2170
2171 nxt_conn_write(task->thread->engine, c);

--- 27 unchanged lines hidden (view full) ---

2199
2200 nxt_router_sw_release(task, sw);
2201}
2202
2203
2204static void
2205nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2206{
2373 return;
2374 }
2375
2376 if (c->write == NULL) {
2377 c->write = b;
2378 c->write_state = &nxt_router_conn_write_state;
2379
2380 nxt_conn_write(task->thread->engine, c);

--- 27 unchanged lines hidden (view full) ---

2408
2409 nxt_router_sw_release(task, sw);
2410}
2411
2412
2413static void
2414nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
2415{
2416 nxt_app_t *app;
2417 nxt_queue_link_t *lnk;
2418 nxt_req_app_link_t *ra;
2207 nxt_start_worker_t *sw;
2208
2209 sw = data;
2210
2211 nxt_assert(sw != NULL);
2419 nxt_start_worker_t *sw;
2420
2421 sw = data;
2422
2423 nxt_assert(sw != NULL);
2424 nxt_assert(sw->app != NULL);
2212 nxt_assert(sw->app->pending_workers != 0);
2213
2425 nxt_assert(sw->app->pending_workers != 0);
2426
2427 app = sw->app;
2428
2214 sw->app->pending_workers--;
2215
2216 nxt_debug(task, "sw %p error, failed to start app '%V'",
2429 sw->app->pending_workers--;
2430
2431 nxt_debug(task, "sw %p error, failed to start app '%V'",
2217 sw, &sw->app->name);
2432 sw, &app->name);
2218
2433
2434 if (!nxt_queue_is_empty(&app->requests)) {
2435 lnk = nxt_queue_last(&app->requests);
2436 nxt_queue_remove(lnk);
2437
2438 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2439
2440 nxt_debug(task, "app '%V' %p abort next stream #%uD",
2441 &app->name, app, ra->stream);
2442
2443 nxt_router_ra_abort(task, ra, ra->work.data);
2444 }
2445
2219 nxt_router_sw_release(task, sw);
2220}
2221
2222
2223static void
2224nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2225{
2226 size_t size;

--- 5 unchanged lines hidden (view full) ---

2232 nxt_start_worker_t *sw;
2233 nxt_req_app_link_t *ra;
2234
2235 sw = obj;
2236 app = sw->app;
2237
2238 if (nxt_queue_is_empty(&app->requests)) {
2239 ra = sw->ra;
2446 nxt_router_sw_release(task, sw);
2447}
2448
2449
2450static void
2451nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
2452{
2453 size_t size;

--- 5 unchanged lines hidden (view full) ---

2459 nxt_start_worker_t *sw;
2460 nxt_req_app_link_t *ra;
2461
2462 sw = obj;
2463 app = sw->app;
2464
2465 if (nxt_queue_is_empty(&app->requests)) {
2466 ra = sw->ra;
2240 app_port = nxt_router_app_get_port(app, ra->req_id);
2467 app_port = nxt_router_app_get_port(app, ra->stream);
2241
2242 if (app_port != NULL) {
2468
2469 if (app_port != NULL) {
2243 nxt_debug(task, "app '%V' %p process request #%uxD",
2244 &app->name, app, ra->req_id);
2470 nxt_debug(task, "app '%V' %p process stream #%uD",
2471 &app->name, app, ra->stream);
2245
2246 ra->app_port = app_port;
2247
2248 nxt_router_process_http_request_mp(task, ra, app_port);
2249
2250 nxt_router_ra_release(task, ra, ra->work.data);
2251 nxt_router_sw_release(task, sw);
2252

--- 72 unchanged lines hidden (view full) ---

2325 nxt_router_sw_create(task, app, ra);
2326 }
2327
2328 return 0;
2329}
2330
2331
2332static nxt_port_t *
2472
2473 ra->app_port = app_port;
2474
2475 nxt_router_process_http_request_mp(task, ra, app_port);
2476
2477 nxt_router_ra_release(task, ra, ra->work.data);
2478 nxt_router_sw_release(task, sw);
2479

--- 72 unchanged lines hidden (view full) ---

2552 nxt_router_sw_create(task, app, ra);
2553 }
2554
2555 return 0;
2556}
2557
2558
2559static nxt_port_t *
2333nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id)
2560nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
2334{
2335 nxt_port_t *port;
2336 nxt_queue_link_t *lnk;
2337
2338 port = NULL;
2339
2340 nxt_thread_mutex_lock(&app->mutex);
2341
2342 if (!nxt_queue_is_empty(&app->ports)) {
2343 lnk = nxt_queue_first(&app->ports);
2344 nxt_queue_remove(lnk);
2345
2346 lnk->next = NULL;
2347
2348 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2349
2561{
2562 nxt_port_t *port;
2563 nxt_queue_link_t *lnk;
2564
2565 port = NULL;
2566
2567 nxt_thread_mutex_lock(&app->mutex);
2568
2569 if (!nxt_queue_is_empty(&app->ports)) {
2570 lnk = nxt_queue_first(&app->ports);
2571 nxt_queue_remove(lnk);
2572
2573 lnk->next = NULL;
2574
2575 port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
2576
2350 port->app_req_id = req_id;
2577 port->app_stream = stream;
2351 }
2352
2353 nxt_thread_mutex_unlock(&app->mutex);
2354
2355 return port;
2356}
2357
2358

--- 31 unchanged lines hidden (view full) ---

2390 }
2391
2392 if (!nxt_queue_is_empty(&app->requests)) {
2393 lnk = nxt_queue_first(&app->requests);
2394 nxt_queue_remove(lnk);
2395
2396 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2397
2578 }
2579
2580 nxt_thread_mutex_unlock(&app->mutex);
2581
2582 return port;
2583}
2584
2585

--- 31 unchanged lines hidden (view full) ---

2617 }
2618
2619 if (!nxt_queue_is_empty(&app->requests)) {
2620 lnk = nxt_queue_first(&app->requests);
2621 nxt_queue_remove(lnk);
2622
2623 ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
2624
2398 nxt_debug(task, "app '%V' %p process next request #%uxD",
2399 &app->name, app, ra->req_id);
2625 nxt_debug(task, "app '%V' %p process next stream #%uD",
2626 &app->name, app, ra->stream);
2400
2401 ra->app_port = port;
2627
2628 ra->app_port = port;
2402 port->app_req_id = ra->req_id;
2629 port->app_stream = ra->stream;
2403
2404 nxt_router_process_http_request_mp(task, ra, port);
2405
2406 nxt_router_ra_release(task, ra, ra->work.data);
2407
2408 return;
2409 }
2410
2630
2631 nxt_router_process_http_request_mp(task, ra, port);
2632
2633 nxt_router_ra_release(task, ra, ra->work.data);
2634
2635 return;
2636 }
2637
2411 port->app_req_id = 0;
2638 port->app_stream = 0;
2412
2413 if (port->pair[1] == -1) {
2414 nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
2415 &app->name, app, port->pid);
2416
2417 app->workers--;
2418 nxt_router_app_free(task, app);
2419

--- 27 unchanged lines hidden (view full) ---

2447
2448nxt_bool_t
2449nxt_router_app_remove_port(nxt_port_t *port)
2450{
2451 nxt_app_t *app;
2452 nxt_bool_t busy;
2453
2454 app = port->app;
2639
2640 if (port->pair[1] == -1) {
2641 nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
2642 &app->name, app, port->pid);
2643
2644 app->workers--;
2645 nxt_router_app_free(task, app);
2646

--- 27 unchanged lines hidden (view full) ---

2674
2675nxt_bool_t
2676nxt_router_app_remove_port(nxt_port_t *port)
2677{
2678 nxt_app_t *app;
2679 nxt_bool_t busy;
2680
2681 app = port->app;
2455 busy = port->app_req_id != 0;
2682 busy = port->app_stream != 0;
2456
2457 if (app == NULL) {
2458 nxt_thread_log_debug("port %p app remove, no app", port);
2459
2460 nxt_assert(port->app_link.next == NULL);
2461
2462 return 1;
2463 }

--- 14 unchanged lines hidden (view full) ---

2478 &app->name, app);
2479
2480 app->workers--;
2481 nxt_router_app_free(&port->engine->task, app);
2482
2483 return 1;
2484 }
2485
2683
2684 if (app == NULL) {
2685 nxt_thread_log_debug("port %p app remove, no app", port);
2686
2687 nxt_assert(port->app_link.next == NULL);
2688
2689 return 1;
2690 }

--- 14 unchanged lines hidden (view full) ---

2705 &app->name, app);
2706
2707 app->workers--;
2708 nxt_router_app_free(&port->engine->task, app);
2709
2710 return 1;
2711 }
2712
2486 nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD",
2487 port, &app->name, app, port->app_req_id);
2713 nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, "
2714 "app stream #%uD", port, &app->name, app,
2715 port->app_stream);
2488
2489 return 0;
2490}
2491
2492
2493static nxt_int_t
2494nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2495{
2496 nxt_app_t *app;
2497 nxt_conn_t *c;
2498 nxt_port_t *port;
2716
2717 return 0;
2718}
2719
2720
2721static nxt_int_t
2722nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
2723{
2724 nxt_app_t *app;
2725 nxt_conn_t *c;
2726 nxt_port_t *port;
2727 nxt_event_engine_t *engine;
2499 nxt_start_worker_t *sw;
2500 nxt_socket_conf_joint_t *joint;
2501
2502 port = NULL;
2503 c = ra->rc->conn;
2504
2505 joint = c->listen->socket.data;
2506 app = joint->socket_conf->application;
2507
2508 if (app == NULL) {
2509 nxt_router_gen_error(task, c, 500,
2510 "Application is NULL in socket_conf");
2511 return NXT_ERROR;
2512 }
2513
2728 nxt_start_worker_t *sw;
2729 nxt_socket_conf_joint_t *joint;
2730
2731 port = NULL;
2732 c = ra->rc->conn;
2733
2734 joint = c->listen->socket.data;
2735 app = joint->socket_conf->application;
2736
2737 if (app == NULL) {
2738 nxt_router_gen_error(task, c, 500,
2739 "Application is NULL in socket_conf");
2740 return NXT_ERROR;
2741 }
2742
2743 engine = task->thread->engine;
2514
2744
2515 port = nxt_router_app_get_port(app, ra->req_id);
2745 nxt_timer_disable(engine, &c->read_timer);
2516
2746
2747 if (app->timeout != 0) {
2748 c->read_timer.handler = nxt_router_app_timeout;
2749 nxt_timer_add(engine, &c->read_timer, app->timeout);
2750 }
2751
2752 port = nxt_router_app_get_port(app, ra->stream);
2753
2517 if (port != NULL) {
2518 nxt_debug(task, "already have port for app '%V'", &app->name);
2519
2520 ra->app_port = port;
2521 return NXT_OK;
2522 }
2523
2524 sw = nxt_router_sw_create(task, app, ra);

--- 210 unchanged lines hidden (view full) ---

2735
2736static void
2737nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
2738 nxt_app_parse_ctx_t *ap)
2739{
2740 nxt_mp_t *port_mp;
2741 nxt_int_t res;
2742 nxt_port_t *port;
2754 if (port != NULL) {
2755 nxt_debug(task, "already have port for app '%V'", &app->name);
2756
2757 ra->app_port = port;
2758 return NXT_OK;
2759 }
2760
2761 sw = nxt_router_sw_create(task, app, ra);

--- 210 unchanged lines hidden (view full) ---

2972
2973static void
2974nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
2975 nxt_app_parse_ctx_t *ap)
2976{
2977 nxt_mp_t *port_mp;
2978 nxt_int_t res;
2979 nxt_port_t *port;
2743 nxt_req_id_t req_id;
2744 nxt_event_engine_t *engine;
2745 nxt_req_app_link_t *ra;
2746 nxt_req_conn_link_t *rc;
2747
2748 engine = task->thread->engine;
2749
2980 nxt_event_engine_t *engine;
2981 nxt_req_app_link_t *ra;
2982 nxt_req_conn_link_t *rc;
2983
2984 engine = task->thread->engine;
2985
2750 do {
2751 req_id = nxt_random(&task->thread->random);
2752 } while (nxt_event_engine_request_find(engine, req_id) != NULL);
2986 rc = nxt_port_rpc_register_handler_ex(task, engine->port,
2987 nxt_router_response_ready_handler,
2988 nxt_router_response_error_handler,
2989 sizeof(nxt_req_conn_link_t));
2753
2990
2754 rc = nxt_conn_request_add(c, req_id);
2755
2756 if (nxt_slow_path(rc == NULL)) {
2757 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2758 "req->conn link");
2759
2760 return;
2761 }
2762
2991 if (nxt_slow_path(rc == NULL)) {
2992 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2993 "req->conn link");
2994
2995 return;
2996 }
2997
2763 nxt_event_engine_request_add(engine, rc);
2998 rc->stream = nxt_port_rpc_ex_stream(rc);
2999 rc->conn = c;
2764
3000
2765 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
2766 req_id, c, engine);
3001 nxt_queue_insert_tail(&c->requests, &rc->link);
2767
3002
3003 nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
3004 rc->stream, c, engine);
3005
2768 c->socket.data = NULL;
2769
2770 ra = nxt_router_ra_create(task, rc);
2771
2772 ra->ap = ap;
3006 c->socket.data = NULL;
3007
3008 ra = nxt_router_ra_create(task, rc);
3009
3010 ra->ap = ap;
2773 ra->reply_port = engine->port;
2774
2775 res = nxt_router_app_port(task, ra);
2776
2777 if (res != NXT_OK) {
2778 return;
2779 }
2780
2781 port = ra->app_port;
2782
2783 if (nxt_slow_path(port == NULL)) {
3011
3012 res = nxt_router_app_port(task, ra);
3013
3014 if (res != NXT_OK) {
3015 return;
3016 }
3017
3018 port = ra->app_port;
3019
3020 if (nxt_slow_path(port == NULL)) {
2784 nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
3021 nxt_router_gen_error(task, c, 500, "Application port not found");
2785 return;
2786 }
2787
3022 return;
3023 }
3024
3025 nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
3026
2788 port_mp = port->mem_pool;
2789 port->mem_pool = c->mem_pool;
2790
2791 nxt_router_process_http_request_mp(task, ra, port);
2792
2793 port->mem_pool = port_mp;
2794
3027 port_mp = port->mem_pool;
3028 port->mem_pool = c->mem_pool;
3029
3030 nxt_router_process_http_request_mp(task, ra, port);
3031
3032 port->mem_pool = port_mp;
3033
2795
2796 nxt_router_ra_release(task, ra, ra->work.data);
2797}
2798
2799
2800static void
2801nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
2802 nxt_port_t *port)
2803{
2804 nxt_int_t res;
2805 nxt_port_t *c_port, *reply_port;
2806 nxt_conn_t *c;
2807 nxt_app_wmsg_t wmsg;
2808 nxt_app_parse_ctx_t *ap;
2809
3034 nxt_router_ra_release(task, ra, ra->work.data);
3035}
3036
3037
3038static void
3039nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
3040 nxt_port_t *port)
3041{
3042 nxt_int_t res;
3043 nxt_port_t *c_port, *reply_port;
3044 nxt_conn_t *c;
3045 nxt_app_wmsg_t wmsg;
3046 nxt_app_parse_ctx_t *ap;
3047
3048 /* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
3049
3050 nxt_assert(ra->rc != NULL);
3051
2810 reply_port = ra->reply_port;
2811 ap = ra->ap;
2812 c = ra->rc->conn;
2813
2814 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
2815 reply_port->id);
2816 if (nxt_slow_path(c_port != reply_port)) {
2817 res = nxt_port_send_port(task, port, reply_port, 0);

--- 5 unchanged lines hidden (view full) ---

2823 }
2824
2825 nxt_process_connected_port_add(port->process, reply_port);
2826 }
2827
2828 wmsg.port = port;
2829 wmsg.write = NULL;
2830 wmsg.buf = &wmsg.write;
3052 reply_port = ra->reply_port;
3053 ap = ra->ap;
3054 c = ra->rc->conn;
3055
3056 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
3057 reply_port->id);
3058 if (nxt_slow_path(c_port != reply_port)) {
3059 res = nxt_port_send_port(task, port, reply_port, 0);

--- 5 unchanged lines hidden (view full) ---

3065 }
3066
3067 nxt_process_connected_port_add(port->process, reply_port);
3068 }
3069
3070 wmsg.port = port;
3071 wmsg.write = NULL;
3072 wmsg.buf = &wmsg.write;
2831 wmsg.stream = ra->req_id;
3073 wmsg.stream = ra->stream;
2832
2833 res = port->app->prepare_msg(task, &ap->r, &wmsg);
2834
2835 if (nxt_slow_path(res != NXT_OK)) {
2836 nxt_router_gen_error(task, c, 500,
2837 "Failed to prepare message for application");
2838 return;
2839 }
2840
2841 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
2842 nxt_buf_used_size(wmsg.write),
2843 wmsg.port->socket.fd);
2844
2845 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
3074
3075 res = port->app->prepare_msg(task, &ap->r, &wmsg);
3076
3077 if (nxt_slow_path(res != NXT_OK)) {
3078 nxt_router_gen_error(task, c, 500,
3079 "Failed to prepare message for application");
3080 return;
3081 }
3082
3083 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
3084 nxt_buf_used_size(wmsg.write),
3085 wmsg.port->socket.fd);
3086
3087 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
2846 -1, ra->req_id, reply_port->id, wmsg.write);
3088 -1, ra->stream, reply_port->id, wmsg.write);
2847
2848 if (nxt_slow_path(res != NXT_OK)) {
2849 nxt_router_gen_error(task, c, 500,
2850 "Failed to send message to application");
2851 return;
2852 }
2853}
2854

--- 357 unchanged lines hidden (view full) ---

3212 nxt_socket_conf_joint_t *joint;
3213
3214 c = obj;
3215
3216 nxt_debug(task, "router conn close done");
3217
3218 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3219
3089
3090 if (nxt_slow_path(res != NXT_OK)) {
3091 nxt_router_gen_error(task, c, 500,
3092 "Failed to send message to application");
3093 return;
3094 }
3095}
3096

--- 357 unchanged lines hidden (view full) ---

3454 nxt_socket_conf_joint_t *joint;
3455
3456 c = obj;
3457
3458 nxt_debug(task, "router conn close done");
3459
3460 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
3461
3220 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
3462 nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
3221
3222 if (rc->app_port != NULL) {
3223 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
3224
3225 rc->app_port = NULL;
3226 }
3227
3463
3464 if (rc->app_port != NULL) {
3465 nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
3466
3467 rc->app_port = NULL;
3468 }
3469
3228 rc->conn = NULL;
3470 nxt_router_rc_unlink(rc);
3229
3471
3230 nxt_event_engine_request_remove(task->thread->engine, rc);
3472 nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
3231
3232 } nxt_queue_loop;
3233
3234 nxt_queue_remove(&c->link);
3235
3236 joint = c->listen->socket.data;
3237
3238 task = &task->thread->engine->task;

--- 37 unchanged lines hidden (view full) ---

3276 nxt_router_gen_error(task, c, 408, "Read header timeout");
3277
3278 } else {
3279 nxt_router_gen_error(task, c, 408, "Read body timeout");
3280 }
3281}
3282
3283
3473
3474 } nxt_queue_loop;
3475
3476 nxt_queue_remove(&c->link);
3477
3478 joint = c->listen->socket.data;
3479
3480 task = &task->thread->engine->task;

--- 37 unchanged lines hidden (view full) ---

3518 nxt_router_gen_error(task, c, 408, "Read header timeout");
3519
3520 } else {
3521 nxt_router_gen_error(task, c, 408, "Read body timeout");
3522 }
3523}
3524
3525
3526static void
3527nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
3528{
3529 nxt_conn_t *c;
3530 nxt_timer_t *timer;
3531
3532 timer = obj;
3533
3534 nxt_debug(task, "router app timeout");
3535
3536 c = nxt_read_timer_conn(timer);
3537
3538 nxt_router_gen_error(task, c, 408, "Application timeout");
3539}
3540
3541
3284static nxt_msec_t
3285nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3286{
3287 nxt_socket_conf_joint_t *joint;
3288
3289 joint = c->listen->socket.data;
3290
3291 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3292}
3542static nxt_msec_t
3543nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
3544{
3545 nxt_socket_conf_joint_t *joint;
3546
3547 joint = c->listen->socket.data;
3548
3549 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
3550}