Deleted
Added
nxt_router.c (160:bd2c565d412a) | nxt_router.c (163:e4d237f57e43) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 85 unchanged lines hidden (view full) --- 94 void *data); 95static void nxt_router_listen_socket_release(nxt_task_t *task, 96 nxt_socket_conf_joint_t *joint); 97static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 98 void *data); 99static void nxt_router_conf_release(nxt_task_t *task, 100 nxt_socket_conf_joint_t *joint); 101 | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) Valentin V. Bartenev 5 * Copyright (C) NGINX, Inc. 6 */ 7 8#include <nxt_router.h> --- 85 unchanged lines hidden (view full) --- 94 void *data); 95static void nxt_router_listen_socket_release(nxt_task_t *task, 96 nxt_socket_conf_joint_t *joint); 97static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, 98 void *data); 99static void nxt_router_conf_release(nxt_task_t *task, 100 nxt_socket_conf_joint_t *joint); 101 |
102static nxt_bool_t nxt_router_app_free(nxt_app_t *app); |
|
102static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); 103static void nxt_router_app_release_port(nxt_task_t *task, void *obj, 104 void *data); 105 106static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, 107 nxt_start_worker_t *sw); 108static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task, 109 nxt_router_t *router, uint32_t id); --- 74 unchanged lines hidden (view full) --- 184 return; 185 } 186 187 sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream); 188 189 if (nxt_fast_path(sw != NULL)) { 190 msg->new_port->app = sw->app; 191 | 103static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app); 104static void nxt_router_app_release_port(nxt_task_t *task, void *obj, 105 void *data); 106 107static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router, 108 nxt_start_worker_t *sw); 109static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task, 110 nxt_router_t *router, uint32_t id); --- 74 unchanged lines hidden (view full) --- 185 return; 186 } 187 188 sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream); 189 190 if (nxt_fast_path(sw != NULL)) { 191 msg->new_port->app = sw->app; 192 |
193 sw->app->workers++; 194 195 nxt_assert(sw->app->pending_workers != 0); 196 197 sw->app->pending_workers--; 198 |
|
192 nxt_router_app_release_port(task, msg->new_port, sw->app); 193 194 sw->work.handler = nxt_router_sw_release; 195 196 nxt_debug(task, "post sw #%uxD release to %p", sw->stream, 197 sw->work.data); 198 199 nxt_event_engine_post(sw->work.data, &sw->work); --- 549 unchanged lines hidden (view full) --- 749 750 return app; 751} 752 753 754static nxt_socket_conf_t * 755nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 756{ | 199 nxt_router_app_release_port(task, msg->new_port, sw->app); 200 201 sw->work.handler = nxt_router_sw_release; 202 203 nxt_debug(task, "post sw #%uxD release to %p", sw->stream, 204 sw->work.data); 205 206 nxt_event_engine_post(sw->work.data, &sw->work); --- 549 unchanged lines hidden (view full) --- 756 757 return app; 758} 759 760 761static nxt_socket_conf_t * 762nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa) 763{ |
757 nxt_socket_conf_t *conf; | 764 nxt_socket_conf_t *skcf; |
758 | 765 |
759 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 760 if (nxt_slow_path(conf == NULL)) { | 766 skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t)); 767 if (nxt_slow_path(skcf == NULL)) { |
761 return NULL; 762 } 763 | 768 return NULL; 769 } 770 |
764 conf->sockaddr = sa; | 771 skcf->sockaddr = sa; |
765 | 772 |
766 conf->listen.sockaddr = sa; 767 conf->listen.socklen = sa->socklen; 768 conf->listen.address_length = sa->length; | 773 skcf->listen.sockaddr = sa; 774 skcf->listen.socklen = sa->socklen; 775 skcf->listen.address_length = sa->length; |
769 | 776 |
770 conf->listen.socket = -1; 771 conf->listen.backlog = NXT_LISTEN_BACKLOG; 772 conf->listen.flags = NXT_NONBLOCK; 773 conf->listen.read_after_accept = 1; | 777 skcf->listen.socket = -1; 778 skcf->listen.backlog = NXT_LISTEN_BACKLOG; 779 skcf->listen.flags = NXT_NONBLOCK; 780 skcf->listen.read_after_accept = 1; |
774 | 781 |
775 return conf; | 782 return skcf; |
776} 777 778 779static void 780nxt_router_listen_sockets_sort(nxt_router_t *router, 781 nxt_router_temp_conf_t *tmcf) 782{ 783 nxt_queue_link_t *nqlk, *oqlk, *next; --- 390 unchanged lines hidden (view full) --- 1174 1175 return ret; 1176} 1177 1178 1179static void 1180nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 1181{ | 783} 784 785 786static void 787nxt_router_listen_sockets_sort(nxt_router_t *router, 788 nxt_router_temp_conf_t *tmcf) 789{ 790 nxt_queue_link_t *nqlk, *oqlk, *next; --- 390 unchanged lines hidden (view full) --- 1181 1182 return ret; 1183} 1184 1185 1186static void 1187nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) 1188{ |
1182 nxt_app_t *app; | 1189 nxt_app_t *app; 1190 nxt_port_t *port; |
1183 1184 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 1185 1186 nxt_queue_remove(&app->link); 1187 | 1191 1192 nxt_queue_each(app, &router->apps, nxt_app_t, link) { 1193 1194 nxt_queue_remove(&app->link); 1195 |
1188 // TODO RELEASE APP 1189#if 0 1190 nxt_thread_mutex_destroy(&app->mutex); 1191 nxt_free(app); 1192#endif | 1196 app->live = 0; 1197 1198 if (nxt_router_app_free(app) != 0) { 1199 continue; 1200 } 1201 1202 if (nxt_queue_is_empty(&app->requests)) { 1203 1204 do { 1205 port = nxt_router_app_get_port(app); 1206 if (port == NULL) { 1207 break; 1208 } 1209 1210 nxt_port_socket_write(&port->engine->task, port, 1211 NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); 1212 } while (1); 1213 1214 } 1215 |
1193 } nxt_queue_loop; 1194 1195 nxt_queue_add(&router->apps, &tmcf->previous); 1196 nxt_queue_add(&router->apps, &tmcf->apps); 1197} 1198 1199 1200static void --- 60 unchanged lines hidden (view full) --- 1261 engine->task.thread = thread; 1262 engine->task.log = thread->log; 1263 thread->engine = engine; 1264 thread->task = &engine->task; 1265 thread->fiber = &engine->fibers->fiber; 1266 1267 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 1268 | 1216 } nxt_queue_loop; 1217 1218 nxt_queue_add(&router->apps, &tmcf->previous); 1219 nxt_queue_add(&router->apps, &tmcf->apps); 1220} 1221 1222 1223static void --- 60 unchanged lines hidden (view full) --- 1284 engine->task.thread = thread; 1285 engine->task.log = thread->log; 1286 thread->engine = engine; 1287 thread->task = &engine->task; 1288 thread->fiber = &engine->fibers->fiber; 1289 1290 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); 1291 |
1269 port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t)); | 1292 port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER); |
1270 if (nxt_slow_path(port == NULL)) { 1271 return; 1272 } 1273 | 1293 if (nxt_slow_path(port == NULL)) { 1294 return; 1295 } 1296 |
1274 port->id = nxt_port_get_next_id(); 1275 port->pid = nxt_pid; 1276 | |
1277 ret = nxt_port_socket_init(task, port, 0); 1278 if (nxt_slow_path(ret != NXT_OK)) { | 1297 ret = nxt_port_socket_init(task, port, 0); 1298 if (nxt_slow_path(ret != NXT_OK)) { |
1299 nxt_mp_release(port->mem_pool, port); |
|
1279 return; 1280 } 1281 | 1300 return; 1301 } 1302 |
1282 port->type = NXT_PROCESS_ROUTER; 1283 | |
1284 engine->port = port; 1285 1286 nxt_port_enable(task, port, nxt_router_app_port_handlers); 1287 1288 nxt_event_engine_start(engine); 1289} 1290 1291 --- 94 unchanged lines hidden (view full) --- 1386 skcf = data; 1387 1388 engine = task->thread->engine; 1389 1390 listen = nxt_router_listen_event(&engine->listen_connections, skcf); 1391 1392 nxt_fd_event_delete(engine, &listen->socket); 1393 | 1303 engine->port = port; 1304 1305 nxt_port_enable(task, port, nxt_router_app_port_handlers); 1306 1307 nxt_event_engine_start(engine); 1308} 1309 1310 --- 94 unchanged lines hidden (view full) --- 1405 skcf = data; 1406 1407 engine = task->thread->engine; 1408 1409 listen = nxt_router_listen_event(&engine->listen_connections, skcf); 1410 1411 nxt_fd_event_delete(engine, &listen->socket); 1412 |
1413 nxt_debug(task, "engine %p: listen socket delete: %d", engine, 1414 listen->socket.fd); 1415 |
|
1394 listen->timer.handler = nxt_router_listen_socket_close; 1395 listen->timer.work_queue = &engine->fast_work_queue; 1396 1397 nxt_timer_add(engine, &listen->timer, 0); 1398 1399 job->work.next = NULL; 1400 job->work.handler = nxt_router_conf_wait; 1401 --- 7 unchanged lines hidden (view full) --- 1409 nxt_timer_t *timer; 1410 nxt_listen_event_t *listen; 1411 nxt_socket_conf_joint_t *joint; 1412 1413 timer = obj; 1414 listen = nxt_timer_data(timer, nxt_listen_event_t, timer); 1415 joint = listen->socket.data; 1416 | 1416 listen->timer.handler = nxt_router_listen_socket_close; 1417 listen->timer.work_queue = &engine->fast_work_queue; 1418 1419 nxt_timer_add(engine, &listen->timer, 0); 1420 1421 job->work.next = NULL; 1422 job->work.handler = nxt_router_conf_wait; 1423 --- 7 unchanged lines hidden (view full) --- 1431 nxt_timer_t *timer; 1432 nxt_listen_event_t *listen; 1433 nxt_socket_conf_joint_t *joint; 1434 1435 timer = obj; 1436 listen = nxt_timer_data(timer, nxt_listen_event_t, timer); 1437 joint = listen->socket.data; 1438 |
1439 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine, 1440 listen->socket.fd); 1441 |
|
1417 nxt_queue_remove(&listen->link); 1418 1419 /* 'task' refers to listen->task and we cannot use after nxt_free() */ 1420 task = &task->thread->engine->task; 1421 1422 nxt_free(listen); 1423 1424 nxt_router_listen_socket_release(task, joint); --- 9 unchanged lines hidden (view full) --- 1434 nxt_thread_spinlock_t *lock; 1435 1436 skcf = joint->socket_conf; 1437 rtsk = skcf->socket; 1438 lock = &skcf->router_conf->router->lock; 1439 1440 nxt_thread_spin_lock(lock); 1441 | 1442 nxt_queue_remove(&listen->link); 1443 1444 /* 'task' refers to listen->task and we cannot use after nxt_free() */ 1445 task = &task->thread->engine->task; 1446 1447 nxt_free(listen); 1448 1449 nxt_router_listen_socket_release(task, joint); --- 9 unchanged lines hidden (view full) --- 1459 nxt_thread_spinlock_t *lock; 1460 1461 skcf = joint->socket_conf; 1462 rtsk = skcf->socket; 1463 lock = &skcf->router_conf->router->lock; 1464 1465 nxt_thread_spin_lock(lock); 1466 |
1467 nxt_debug(task, "engine %p: listen socket release: rtsk->count %D", 1468 task->thread->engine, rtsk->count); 1469 |
|
1442 if (--rtsk->count != 0) { 1443 rtsk = NULL; 1444 } 1445 1446 nxt_thread_spin_unlock(lock); 1447 1448 if (rtsk != NULL) { 1449 nxt_socket_close(task, rtsk->fd); --- 8 unchanged lines hidden (view full) --- 1458static void 1459nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 1460{ 1461 nxt_bool_t exit; 1462 nxt_socket_conf_t *skcf; 1463 nxt_router_conf_t *rtcf; 1464 nxt_thread_spinlock_t *lock; 1465 | 1470 if (--rtsk->count != 0) { 1471 rtsk = NULL; 1472 } 1473 1474 nxt_thread_spin_unlock(lock); 1475 1476 if (rtsk != NULL) { 1477 nxt_socket_close(task, rtsk->fd); --- 8 unchanged lines hidden (view full) --- 1486static void 1487nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint) 1488{ 1489 nxt_bool_t exit; 1490 nxt_socket_conf_t *skcf; 1491 nxt_router_conf_t *rtcf; 1492 nxt_thread_spinlock_t *lock; 1493 |
1466 nxt_debug(task, "conf joint count: %D", joint->count); | 1494 nxt_debug(task, "conf joint %p count: %D", joint, joint->count); |
1467 1468 if (--joint->count != 0) { 1469 return; 1470 } 1471 1472 nxt_queue_remove(&joint->link); 1473 1474 skcf = joint->socket_conf; 1475 rtcf = skcf->router_conf; 1476 lock = &rtcf->router->lock; 1477 1478 nxt_thread_spin_lock(lock); 1479 | 1495 1496 if (--joint->count != 0) { 1497 return; 1498 } 1499 1500 nxt_queue_remove(&joint->link); 1501 1502 skcf = joint->socket_conf; 1503 rtcf = skcf->router_conf; 1504 lock = &rtcf->router->lock; 1505 1506 nxt_thread_spin_lock(lock); 1507 |
1508 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count, 1509 rtcf, rtcf->count); 1510 |
|
1480 if (--skcf->count != 0) { 1481 rtcf = NULL; 1482 1483 } else { 1484 nxt_queue_remove(&skcf->link); 1485 1486 if (--rtcf->count != 0) { 1487 rtcf = NULL; --- 38 unchanged lines hidden (view full) --- 1526 engine = link->engine; 1527 1528 nxt_queue_remove(&engine->link); 1529 1530 port = engine->port; 1531 1532 // TODO notify all apps 1533 | 1511 if (--skcf->count != 0) { 1512 rtcf = NULL; 1513 1514 } else { 1515 nxt_queue_remove(&skcf->link); 1516 1517 if (--rtcf->count != 0) { 1518 rtcf = NULL; --- 38 unchanged lines hidden (view full) --- 1557 engine = link->engine; 1558 1559 nxt_queue_remove(&engine->link); 1560 1561 port = engine->port; 1562 1563 // TODO notify all apps 1564 |
1534 if (port->pair[0] != -1) { 1535 nxt_fd_close(port->pair[0]); 1536 } | 1565 nxt_mp_thread_adopt(port->mem_pool); 1566 nxt_port_release(port); |
1537 | 1567 |
1538 if (port->pair[1] != -1) { 1539 nxt_fd_close(port->pair[1]); 1540 } 1541 1542 if (port->mem_pool) { 1543 nxt_mp_destroy(port->mem_pool); 1544 } 1545 | 1568 nxt_mp_thread_adopt(engine->mem_pool); |
1546 nxt_mp_destroy(engine->mem_pool); 1547 1548 nxt_event_engine_free(engine); 1549 1550 nxt_free(link); 1551} 1552 1553 --- 124 unchanged lines hidden (view full) --- 1678 case 400: return "Bad request"; 1679 case 404: return "Not found"; 1680 case 403: return "Forbidden"; 1681 case 500: 1682 default: return "Internal server error"; 1683 } 1684} 1685 | 1569 nxt_mp_destroy(engine->mem_pool); 1570 1571 nxt_event_engine_free(engine); 1572 1573 nxt_free(link); 1574} 1575 1576 --- 124 unchanged lines hidden (view full) --- 1701 case 400: return "Bad request"; 1702 case 404: return "Not found"; 1703 case 403: return "Forbidden"; 1704 case 500: 1705 default: return "Internal server error"; 1706 } 1707} 1708 |
1686static void 1687nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 1688 const char* fmt, ...) | 1709 1710static nxt_buf_t * 1711nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code, 1712 const char* fmt, va_list args) |
1689{ | 1713{ |
1690 va_list args; 1691 nxt_buf_t *b, *last; 1692 const char *msg; | 1714 nxt_buf_t *b, *last; 1715 const char *msg; |
1693 | 1716 |
1694 b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0); | 1717 b = nxt_buf_mem_ts_alloc(task, mp, 16384); |
1695 if (nxt_slow_path(b == NULL)) { | 1718 if (nxt_slow_path(b == NULL)) { |
1696 /* TODO pogorevaTb */ | 1719 return NULL; |
1697 } 1698 1699 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, 1700 "HTTP/1.0 %d %s\r\n" 1701 "Content-Type: text/plain\r\n" 1702 "Connection: close\r\n\r\n", 1703 code, nxt_router_text_by_code(code)); 1704 1705 msg = (const char *) b->mem.free; 1706 | 1720 } 1721 1722 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end, 1723 "HTTP/1.0 %d %s\r\n" 1724 "Content-Type: text/plain\r\n" 1725 "Connection: close\r\n\r\n", 1726 code, nxt_router_text_by_code(code)); 1727 1728 msg = (const char *) b->mem.free; 1729 |
1707 va_start(args, fmt); | |
1708 b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); | 1730 b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); |
1709 va_end(args); | |
1710 1711 nxt_log_alert(task->log, "error %d: %s", code, msg); 1712 | 1731 1732 nxt_log_alert(task->log, "error %d: %s", code, msg); 1733 |
1713 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST); | 1734 last = nxt_buf_mem_ts_alloc(task, mp, 0); 1735 |
1714 if (nxt_slow_path(last == NULL)) { | 1736 if (nxt_slow_path(last == NULL)) { |
1715 /* TODO pogorevaTb */ | 1737 nxt_mp_release(mp, b); 1738 return NULL; |
1716 } 1717 | 1739 } 1740 |
1741 nxt_buf_set_sync(last); 1742 nxt_buf_set_last(last); 1743 |
|
1718 nxt_buf_chain_add(&b, last); 1719 | 1744 nxt_buf_chain_add(&b, last); 1745 |
1746 return b; 1747} 1748 1749 1750 1751static void 1752nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code, 1753 const char* fmt, ...) 1754{ 1755 va_list args; 1756 nxt_buf_t *b; 1757 1758 va_start(args, fmt); 1759 b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); 1760 va_end(args); 1761 |
|
1720 if (c->write == NULL) { 1721 c->write = b; 1722 c->write_state = &nxt_router_conn_write_state; 1723 1724 nxt_conn_write(task->thread->engine, c); 1725 } else { 1726 nxt_debug(task, "router data attach out bufs to existing chain"); 1727 --- 9 unchanged lines hidden (view full) --- 1737 nxt_app_t *app; 1738 nxt_port_t *port; 1739 nxt_runtime_t *rt; 1740 nxt_start_worker_t *sw; 1741 1742 sw = obj; 1743 app = sw->app; 1744 | 1762 if (c->write == NULL) { 1763 c->write = b; 1764 c->write_state = &nxt_router_conn_write_state; 1765 1766 nxt_conn_write(task->thread->engine, c); 1767 } else { 1768 nxt_debug(task, "router data attach out bufs to existing chain"); 1769 --- 9 unchanged lines hidden (view full) --- 1779 nxt_app_t *app; 1780 nxt_port_t *port; 1781 nxt_runtime_t *rt; 1782 nxt_start_worker_t *sw; 1783 1784 sw = obj; 1785 app = sw->app; 1786 |
1787 if (app->workers + app->pending_workers >= app->max_workers) { 1788 sw->work.handler = nxt_router_sw_release; 1789 1790 nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD " 1791 "release to %p", sw->stream, sw->work.data); 1792 1793 nxt_event_engine_post(sw->work.data, &sw->work); 1794 1795 return; 1796 } 1797 1798 app->pending_workers++; 1799 |
|
1745 nxt_debug(task, "send sw #%uD", sw->stream); 1746 1747 nxt_router_sw_add(task, nxt_router, sw); 1748 nxt_queue_insert_tail(&app->requests, &sw->rc->app_link); 1749 1750 rt = task->thread->runtime; 1751 port = rt->port_by_type[NXT_PROCESS_MASTER]; 1752 1753 b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0); 1754 1755 nxt_buf_cpystr(b, &app->conf); 1756 1757 nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b); 1758} 1759 1760 | 1800 nxt_debug(task, "send sw #%uD", sw->stream); 1801 1802 nxt_router_sw_add(task, nxt_router, sw); 1803 nxt_queue_insert_tail(&app->requests, &sw->rc->app_link); 1804 1805 rt = task->thread->runtime; 1806 port = rt->port_by_type[NXT_PROCESS_MASTER]; 1807 1808 b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0); 1809 1810 nxt_buf_cpystr(b, &app->conf); 1811 1812 nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b); 1813} 1814 1815 |
1816static nxt_bool_t 1817nxt_router_app_free(nxt_app_t *app) 1818{ 1819 if (app->live == 0 && app->workers == 0 && 1820 app->pending_workers == 0 && 1821 nxt_queue_is_empty(&app->requests)) { 1822 1823 nxt_thread_mutex_destroy(&app->mutex); 1824 nxt_free(app); 1825 1826 return 1; 1827 } 1828 1829 return 0; 1830} 1831 1832 |
|
1761static nxt_port_t * 1762nxt_router_app_get_port(nxt_app_t *app) 1763{ 1764 nxt_port_t *port; 1765 nxt_queue_link_t *lnk; 1766 1767 port = NULL; 1768 --- 15 unchanged lines hidden (view full) --- 1784 1785 1786static void 1787nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) 1788{ 1789 nxt_app_t *app; 1790 nxt_port_t *port; 1791 nxt_work_t *work; | 1833static nxt_port_t * 1834nxt_router_app_get_port(nxt_app_t *app) 1835{ 1836 nxt_port_t *port; 1837 nxt_queue_link_t *lnk; 1838 1839 port = NULL; 1840 --- 15 unchanged lines hidden (view full) --- 1856 1857 1858static void 1859nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data) 1860{ 1861 nxt_app_t *app; 1862 nxt_port_t *port; 1863 nxt_work_t *work; |
1864 nxt_process_t *process; |
|
1792 nxt_queue_link_t *lnk; 1793 nxt_req_conn_link_t *rc; 1794 1795 port = obj; 1796 app = data; 1797 1798 nxt_assert(app != NULL); 1799 nxt_assert(app == port->app); 1800 nxt_assert(port->app_link.next == NULL); 1801 1802 1803 if (task->thread->engine != port->engine) { | 1865 nxt_queue_link_t *lnk; 1866 nxt_req_conn_link_t *rc; 1867 1868 port = obj; 1869 app = data; 1870 1871 nxt_assert(app != NULL); 1872 nxt_assert(app == port->app); 1873 nxt_assert(port->app_link.next == NULL); 1874 1875 1876 if (task->thread->engine != port->engine) { |
1804 work = (nxt_work_t *) (port + 1); | 1877 work = &port->work; |
1805 1806 nxt_debug(task, "post release port to engine %p", port->engine); 1807 1808 work->next = NULL; 1809 work->handler = nxt_router_app_release_port; 1810 work->task = port->socket.task; 1811 work->obj = port; 1812 work->data = app; --- 4 unchanged lines hidden (view full) --- 1817 } 1818 1819 if (!nxt_queue_is_empty(&app->requests)) { 1820 lnk = nxt_queue_first(&app->requests); 1821 nxt_queue_remove(lnk); 1822 1823 rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); 1824 | 1878 1879 nxt_debug(task, "post release port to engine %p", port->engine); 1880 1881 work->next = NULL; 1882 work->handler = nxt_router_app_release_port; 1883 work->task = port->socket.task; 1884 work->obj = port; 1885 work->data = app; --- 4 unchanged lines hidden (view full) --- 1890 } 1891 1892 if (!nxt_queue_is_empty(&app->requests)) { 1893 lnk = nxt_queue_first(&app->requests); 1894 nxt_queue_remove(lnk); 1895 1896 rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link); 1897 |
1825 nxt_debug(task, "process request #%uxD", rc->req_id); | 1898 nxt_debug(task, "app '%V' process next request #%uxD", 1899 &app->name, rc->req_id); |
1826 1827 rc->app_port = port; 1828 1829 nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool); 1830 1831 return; 1832 } 1833 | 1900 1901 rc->app_port = port; 1902 1903 nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool); 1904 1905 return; 1906 } 1907 |
1834 nxt_debug(task, "app requests queue is empty"); | 1908 if (port->pair[1] == -1) { 1909 nxt_debug(task, "app '%V' port already closed (pid %PI dead?)", 1910 &app->name, port->pid); |
1835 | 1911 |
1912 app->workers--; 1913 nxt_router_app_free(app); 1914 1915 port->app = NULL; 1916 process = port->process; 1917 1918 nxt_port_release(port); 1919 1920 if (nxt_queue_is_empty(&process->ports)) { 1921 nxt_runtime_process_destroy(task->thread->runtime, process); 1922 } 1923 1924 return; 1925 } 1926 1927 if (!app->live) { 1928 nxt_debug(task, "app '%V' is not alive, send QUIT to port", 1929 &app->name); 1930 1931 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, 1932 -1, 0, 0, NULL); 1933 1934 return; 1935 } 1936 1937 nxt_debug(task, "app '%V' requests queue is empty, keep the port", 1938 &app->name); 1939 |
|
1836 nxt_thread_mutex_lock(&app->mutex); 1837 1838 nxt_queue_insert_head(&app->ports, &port->app_link); 1839 1840 nxt_thread_mutex_unlock(&app->mutex); 1841} 1842 1843 | 1940 nxt_thread_mutex_lock(&app->mutex); 1941 1942 nxt_queue_insert_head(&app->ports, &port->app_link); 1943 1944 nxt_thread_mutex_unlock(&app->mutex); 1945} 1946 1947 |
1844void | 1948nxt_bool_t |
1845nxt_router_app_remove_port(nxt_port_t *port) 1846{ | 1949nxt_router_app_remove_port(nxt_port_t *port) 1950{ |
1847 nxt_app_t *app; | 1951 nxt_app_t *app; 1952 nxt_bool_t busy; |
1848 | 1953 |
1849 if (port->app_link.next == NULL) { 1850 return; 1851 } 1852 | |
1853 app = port->app; | 1954 app = port->app; |
1955 busy = 1; |
|
1854 | 1956 |
1855#if (NXT_DEBUG) 1856 if (nxt_slow_path(app == NULL)) { 1857 nxt_abort(); | 1957 if (app == NULL) { 1958 nxt_assert(port->app_link.next == NULL); 1959 1960 return 1; |
1858 } | 1961 } |
1859#endif | |
1860 1861 nxt_thread_mutex_lock(&app->mutex); 1862 | 1962 1963 nxt_thread_mutex_lock(&app->mutex); 1964 |
1863 nxt_queue_remove(&port->app_link); 1864 port->app_link.next = NULL; | 1965 if (port->app_link.next != NULL) { |
1865 | 1966 |
1967 nxt_queue_remove(&port->app_link); 1968 port->app_link.next = NULL; 1969 1970 busy = 0; 1971 } 1972 |
|
1866 nxt_thread_mutex_unlock(&app->mutex); | 1973 nxt_thread_mutex_unlock(&app->mutex); |
1974 1975 if (busy == 0) { 1976 1977 app->workers--; 1978 nxt_router_app_free(app); 1979 1980 return 1; 1981 } 1982 1983 return 0; |
|
1867} 1868 1869 1870nxt_inline nxt_int_t 1871nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) 1872{ 1873 nxt_app_t *app; 1874 nxt_conn_t *c; --- 14 unchanged lines hidden (view full) --- 1889 "Application is NULL in socket_conf"); 1890 return NXT_ERROR; 1891 } 1892 1893 1894 port = nxt_router_app_get_port(app); 1895 1896 if (port != NULL) { | 1984} 1985 1986 1987nxt_inline nxt_int_t 1988nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc) 1989{ 1990 nxt_app_t *app; 1991 nxt_conn_t *c; --- 14 unchanged lines hidden (view full) --- 2006 "Application is NULL in socket_conf"); 2007 return NXT_ERROR; 2008 } 2009 2010 2011 port = nxt_router_app_get_port(app); 2012 2013 if (port != NULL) { |
2014 nxt_debug(task, "already have port for app '%V'", &app->name); 2015 |
|
1897 rc->app_port = port; 1898 return NXT_OK; 1899 } 1900 | 2016 rc->app_port = port; 2017 return NXT_OK; 2018 } 2019 |
1901 | |
1902 sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); 1903 1904 if (nxt_slow_path(sw == NULL)) { 1905 nxt_router_gen_error(task, rc->conn, 500, 1906 "Failed to allocate start worker struct"); 1907 return NXT_ERROR; 1908 } 1909 --- 154 unchanged lines hidden (view full) --- 2064 2065 if (nxt_slow_path(rc == NULL)) { 2066 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2067 "req->conn link"); 2068 2069 return; 2070 } 2071 | 2020 sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t)); 2021 2022 if (nxt_slow_path(sw == NULL)) { 2023 nxt_router_gen_error(task, rc->conn, 500, 2024 "Failed to allocate start worker struct"); 2025 return NXT_ERROR; 2026 } 2027 --- 154 unchanged lines hidden (view full) --- 2182 2183 if (nxt_slow_path(rc == NULL)) { 2184 nxt_router_gen_error(task, c, 500, "Failed to allocate " 2185 "req->conn link"); 2186 2187 return; 2188 } 2189 |
2190 rc->ap = ap; 2191 |
|
2072 nxt_event_engine_request_add(engine, rc); 2073 2074 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 2075 req_id, c, engine); 2076 2077 rc->reply_port = engine->port; 2078 2079 res = nxt_router_app_port(task, rc); --- 19 unchanged lines hidden (view full) --- 2099 port = rc->app_port; 2100 2101 if (nxt_slow_path(port == NULL)) { 2102 nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); 2103 return; 2104 } 2105 2106 reply_port = rc->reply_port; | 2192 nxt_event_engine_request_add(engine, rc); 2193 2194 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", 2195 req_id, c, engine); 2196 2197 rc->reply_port = engine->port; 2198 2199 res = nxt_router_app_port(task, rc); --- 19 unchanged lines hidden (view full) --- 2219 port = rc->app_port; 2220 2221 if (nxt_slow_path(port == NULL)) { 2222 nxt_router_gen_error(task, rc->conn, 500, "Application port not found"); 2223 return; 2224 } 2225 2226 reply_port = rc->reply_port; |
2107 ap = rc->conn->socket.data; | 2227 ap = rc->ap; |
2108 2109 port_mp = port->mem_pool; 2110 port->mem_pool = mp; 2111 2112 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 2113 reply_port->id); 2114 if (nxt_slow_path(c_port != reply_port)) { 2115 res = nxt_port_send_port(task, port, reply_port, 0); --- 263 unchanged lines hidden --- | 2228 2229 port_mp = port->mem_pool; 2230 port->mem_pool = mp; 2231 2232 c_port = nxt_process_connected_port_find(port->process, reply_port->pid, 2233 reply_port->id); 2234 if (nxt_slow_path(c_port != reply_port)) { 2235 res = nxt_port_send_port(task, port, reply_port, 0); --- 263 unchanged lines hidden --- |