Deleted
Added
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 774 unchanged lines hidden (view full) --- 783 return NXT_OK; 784} 785 786 787static nxt_int_t 788nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt, 789 nxt_event_engine_t *engine) 790{ |
791 nxt_int_t ret; 792 nxt_port_t *port; 793 nxt_process_t *process; 794 nxt_thread_link_t *link; 795 nxt_thread_handle_t handle; 796 797 link = nxt_zalloc(sizeof(nxt_thread_link_t)); 798 --- 19 unchanged lines hidden (view full) --- 818 return NXT_ERROR; 819 } 820 821 ret = nxt_port_socket_init(task, port, 0); 822 if (nxt_slow_path(ret != NXT_OK)) { 823 return ret; 824 } 825 |
826 port->engine = 0; 827 port->type = NXT_PROCESS_ROUTER; 828 829 engine->port = port; 830 831 nxt_runtime_port_add(rt, port); 832 833 ret = nxt_thread_create(&handle, link); --- 595 unchanged lines hidden (view full) --- 1429 nxt_conn_read(task->thread->engine, c); 1430} 1431 1432 1433static void 1434nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, 1435 nxt_app_parse_ctx_t *ap) 1436{ |
1437 nxt_mp_t *port_mp; 1438 nxt_int_t res; |
1439 nxt_port_t *port, *c_port; 1440 nxt_req_id_t req_id; 1441 nxt_app_wmsg_t wmsg; 1442 nxt_event_engine_t *engine; 1443 nxt_req_conn_link_t *rc; 1444 1445 if (nxt_slow_path(nxt_app == NULL)) { 1446 // 500 Application not found --- 9 unchanged lines hidden (view full) --- 1456 1457 engine = task->thread->engine; 1458 1459 do { 1460 req_id = nxt_random(&nxt_random_data); 1461 } while (nxt_event_engine_request_find(engine, req_id) != NULL); 1462 1463 rc = nxt_conn_request_add(c, req_id); |
1464 |
1465 if (nxt_slow_path(rc == NULL)) { 1466 // 500 Failed to allocate req->conn link 1467 nxt_log_alert(task->log, "failed to allocate req->conn link"); 1468 } 1469 1470 nxt_event_engine_request_add(engine, rc); 1471 1472 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 1473 req_id, c, engine); 1474 |
1475 port_mp = port->mem_pool; 1476 port->mem_pool = c->mem_pool; 1477 |
1478 c_port = nxt_process_connected_port_find(port->process, 1479 engine->port->pid, 1480 engine->port->id); 1481 if (nxt_slow_path(c_port != engine->port)) { |
1482 res = nxt_port_send_port(task, port, engine->port); 1483 1484 if (nxt_slow_path(res != NXT_OK)) { 1485 // 500 Failed to send reply port 1486 nxt_log_alert(task->log, "failed to send reply port to application"); 1487 } 1488 |
1489 nxt_process_connected_port_add(port->process, engine->port); 1490 } 1491 1492 wmsg.port = port; 1493 wmsg.write = NULL; 1494 wmsg.buf = &wmsg.write; 1495 wmsg.stream = req_id; 1496 |
1497 res = nxt_app->prepare_msg(task, &ap->r, &wmsg); |
1498 |
1499 if (nxt_slow_path(res != NXT_OK)) { 1500 // 500 Failed to prepare message 1501 nxt_log_alert(task->log, "failed to prepare message for application"); 1502 } 1503 |
1504 nxt_debug(task, "about to send %d bytes buffer to worker port %d", 1505 nxt_buf_used_size(wmsg.write), 1506 wmsg.port->socket.fd); 1507 |
1508 res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, |
1509 -1, req_id, engine->port->id, wmsg.write); |
1510 1511 if (nxt_slow_path(res != NXT_OK)) { 1512 // 500 Failed to send message 1513 nxt_log_alert(task->log, "failed to send message to application"); 1514 } 1515 1516 port->mem_pool = port_mp; |
1517} 1518 1519 1520static const nxt_conn_state_t nxt_router_conn_close_state 1521 nxt_aligned(64) = 1522{ 1523 .ready_handler = nxt_router_conn_free, 1524}; --- 84 unchanged lines hidden (view full) --- 1609 nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) { 1610 1611 nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); 1612 1613 nxt_event_engine_request_remove(task->thread->engine, rc); 1614 1615 } nxt_queue_loop; 1616 |
1617 nxt_queue_remove(&c->link); 1618 1619 nxt_mp_release(c->mem_pool, c); |
1620} 1621 1622 1623static void 1624nxt_router_conn_error(nxt_task_t *task, void *obj, void *data) 1625{ 1626 nxt_conn_t *c; 1627 --- 37 unchanged lines hidden --- |