nxt_router.c (119:22bc18e61479) nxt_router.c (122:d18727e877c6)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 774 unchanged lines hidden (view full) ---

783 return NXT_OK;
784}
785
786
787static nxt_int_t
788nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
789 nxt_event_engine_t *engine)
790{
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 774 unchanged lines hidden (view full) ---

783 return NXT_OK;
784}
785
786
787static nxt_int_t
788nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
789 nxt_event_engine_t *engine)
790{
791 nxt_mp_t *mp;
792 nxt_int_t ret;
793 nxt_port_t *port;
794 nxt_process_t *process;
795 nxt_thread_link_t *link;
796 nxt_thread_handle_t handle;
797
798 link = nxt_zalloc(sizeof(nxt_thread_link_t));
799

--- 19 unchanged lines hidden (view full) ---

819 return NXT_ERROR;
820 }
821
822 ret = nxt_port_socket_init(task, port, 0);
823 if (nxt_slow_path(ret != NXT_OK)) {
824 return ret;
825 }
826
791 nxt_int_t ret;
792 nxt_port_t *port;
793 nxt_process_t *process;
794 nxt_thread_link_t *link;
795 nxt_thread_handle_t handle;
796
797 link = nxt_zalloc(sizeof(nxt_thread_link_t));
798

--- 19 unchanged lines hidden (view full) ---

818 return NXT_ERROR;
819 }
820
821 ret = nxt_port_socket_init(task, port, 0);
822 if (nxt_slow_path(ret != NXT_OK)) {
823 return ret;
824 }
825
827 mp = nxt_mp_create(1024, 128, 256, 32);
828 if (nxt_slow_path(mp == NULL)) {
829 return NXT_ERROR;
830 }
831
832 port->mem_pool = mp;
833 port->engine = 0;
834 port->type = NXT_PROCESS_ROUTER;
835
836 engine->port = port;
837
838 nxt_runtime_port_add(rt, port);
839
840 ret = nxt_thread_create(&handle, link);

--- 595 unchanged lines hidden (view full) ---

1436 nxt_conn_read(task->thread->engine, c);
1437}
1438
1439
1440static void
1441nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
1442 nxt_app_parse_ctx_t *ap)
1443{
826 port->engine = 0;
827 port->type = NXT_PROCESS_ROUTER;
828
829 engine->port = port;
830
831 nxt_runtime_port_add(rt, port);
832
833 ret = nxt_thread_create(&handle, link);

--- 595 unchanged lines hidden (view full) ---

1429 nxt_conn_read(task->thread->engine, c);
1430}
1431
1432
1433static void
1434nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
1435 nxt_app_parse_ctx_t *ap)
1436{
1437 nxt_mp_t *port_mp;
1438 nxt_int_t res;
1444 nxt_port_t *port, *c_port;
1445 nxt_req_id_t req_id;
1446 nxt_app_wmsg_t wmsg;
1447 nxt_event_engine_t *engine;
1448 nxt_req_conn_link_t *rc;
1449
1450 if (nxt_slow_path(nxt_app == NULL)) {
1451 // 500 Application not found

--- 9 unchanged lines hidden (view full) ---

1461
1462 engine = task->thread->engine;
1463
1464 do {
1465 req_id = nxt_random(&nxt_random_data);
1466 } while (nxt_event_engine_request_find(engine, req_id) != NULL);
1467
1468 rc = nxt_conn_request_add(c, req_id);
1439 nxt_port_t *port, *c_port;
1440 nxt_req_id_t req_id;
1441 nxt_app_wmsg_t wmsg;
1442 nxt_event_engine_t *engine;
1443 nxt_req_conn_link_t *rc;
1444
1445 if (nxt_slow_path(nxt_app == NULL)) {
1446 // 500 Application not found

--- 9 unchanged lines hidden (view full) ---

1456
1457 engine = task->thread->engine;
1458
1459 do {
1460 req_id = nxt_random(&nxt_random_data);
1461 } while (nxt_event_engine_request_find(engine, req_id) != NULL);
1462
1463 rc = nxt_conn_request_add(c, req_id);
1464
1469 if (nxt_slow_path(rc == NULL)) {
1470 // 500 Failed to allocate req->conn link
1471 nxt_log_alert(task->log, "failed to allocate req->conn link");
1472 }
1473
1474 nxt_event_engine_request_add(engine, rc);
1475
1476 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
1477 req_id, c, engine);
1478
1465 if (nxt_slow_path(rc == NULL)) {
1466 // 500 Failed to allocate req->conn link
1467 nxt_log_alert(task->log, "failed to allocate req->conn link");
1468 }
1469
1470 nxt_event_engine_request_add(engine, rc);
1471
1472 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
1473 req_id, c, engine);
1474
1475 port_mp = port->mem_pool;
1476 port->mem_pool = c->mem_pool;
1477
1479 c_port = nxt_process_connected_port_find(port->process,
1480 engine->port->pid,
1481 engine->port->id);
1482 if (nxt_slow_path(c_port != engine->port)) {
1478 c_port = nxt_process_connected_port_find(port->process,
1479 engine->port->pid,
1480 engine->port->id);
1481 if (nxt_slow_path(c_port != engine->port)) {
1483 (void) nxt_port_send_port(task, port, engine->port);
1482 res = nxt_port_send_port(task, port, engine->port);
1483
1484 if (nxt_slow_path(res != NXT_OK)) {
1485 // 500 Failed to send reply port
1486 nxt_log_alert(task->log, "failed to send reply port to application");
1487 }
1488
1484 nxt_process_connected_port_add(port->process, engine->port);
1485 }
1486
1487 wmsg.port = port;
1488 wmsg.write = NULL;
1489 wmsg.buf = &wmsg.write;
1490 wmsg.stream = req_id;
1491
1489 nxt_process_connected_port_add(port->process, engine->port);
1490 }
1491
1492 wmsg.port = port;
1493 wmsg.write = NULL;
1494 wmsg.buf = &wmsg.write;
1495 wmsg.stream = req_id;
1496
1492 (void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
1497 res = nxt_app->prepare_msg(task, &ap->r, &wmsg);
1493
1498
1499 if (nxt_slow_path(res != NXT_OK)) {
1500 // 500 Failed to prepare message
1501 nxt_log_alert(task->log, "failed to prepare message for application");
1502 }
1503
1494 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
1495 nxt_buf_used_size(wmsg.write),
1496 wmsg.port->socket.fd);
1497
1504 nxt_debug(task, "about to send %d bytes buffer to worker port %d",
1505 nxt_buf_used_size(wmsg.write),
1506 wmsg.port->socket.fd);
1507
1498 (void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
1508 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
1499 -1, req_id, engine->port->id, wmsg.write);
1509 -1, req_id, engine->port->id, wmsg.write);
1510
1511 if (nxt_slow_path(res != NXT_OK)) {
1512 // 500 Failed to send message
1513 nxt_log_alert(task->log, "failed to send message to application");
1514 }
1515
1516 port->mem_pool = port_mp;
1500}
1501
1502
1503static const nxt_conn_state_t nxt_router_conn_close_state
1504 nxt_aligned(64) =
1505{
1506 .ready_handler = nxt_router_conn_free,
1507};

--- 84 unchanged lines hidden (view full) ---

1592 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
1593
1594 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
1595
1596 nxt_event_engine_request_remove(task->thread->engine, rc);
1597
1598 } nxt_queue_loop;
1599
1517}
1518
1519
1520static const nxt_conn_state_t nxt_router_conn_close_state
1521 nxt_aligned(64) =
1522{
1523 .ready_handler = nxt_router_conn_free,
1524};

--- 84 unchanged lines hidden (view full) ---

1609 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
1610
1611 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
1612
1613 nxt_event_engine_request_remove(task->thread->engine, rc);
1614
1615 } nxt_queue_loop;
1616
1600 nxt_mp_destroy(c->mem_pool);
1617 nxt_queue_remove(&c->link);
1618
1619 nxt_mp_release(c->mem_pool, c);
1601}
1602
1603
1604static void
1605nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
1606{
1607 nxt_conn_t *c;
1608

--- 37 unchanged lines hidden ---
1620}
1621
1622
1623static void
1624nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
1625{
1626 nxt_conn_t *c;
1627

--- 37 unchanged lines hidden ---