nxt_router.c (160:bd2c565d412a) nxt_router.c (163:e4d237f57e43)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 85 unchanged lines hidden (view full) ---

94 void *data);
95static void nxt_router_listen_socket_release(nxt_task_t *task,
96 nxt_socket_conf_joint_t *joint);
97static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
98 void *data);
99static void nxt_router_conf_release(nxt_task_t *task,
100 nxt_socket_conf_joint_t *joint);
101
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 85 unchanged lines hidden (view full) ---

94 void *data);
95static void nxt_router_listen_socket_release(nxt_task_t *task,
96 nxt_socket_conf_joint_t *joint);
97static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
98 void *data);
99static void nxt_router_conf_release(nxt_task_t *task,
100 nxt_socket_conf_joint_t *joint);
101
102static nxt_bool_t nxt_router_app_free(nxt_app_t *app);
102static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
103static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
104 void *data);
105
106static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
107 nxt_start_worker_t *sw);
108static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task,
109 nxt_router_t *router, uint32_t id);

--- 74 unchanged lines hidden (view full) ---

184 return;
185 }
186
187 sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream);
188
189 if (nxt_fast_path(sw != NULL)) {
190 msg->new_port->app = sw->app;
191
103static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
104static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
105 void *data);
106
107static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
108 nxt_start_worker_t *sw);
109static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task,
110 nxt_router_t *router, uint32_t id);

--- 74 unchanged lines hidden (view full) ---

185 return;
186 }
187
188 sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream);
189
190 if (nxt_fast_path(sw != NULL)) {
191 msg->new_port->app = sw->app;
192
193 sw->app->workers++;
194
195 nxt_assert(sw->app->pending_workers != 0);
196
197 sw->app->pending_workers--;
198
192 nxt_router_app_release_port(task, msg->new_port, sw->app);
193
194 sw->work.handler = nxt_router_sw_release;
195
196 nxt_debug(task, "post sw #%uxD release to %p", sw->stream,
197 sw->work.data);
198
199 nxt_event_engine_post(sw->work.data, &sw->work);

--- 549 unchanged lines hidden (view full) ---

749
750 return app;
751}
752
753
754static nxt_socket_conf_t *
755nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
756{
199 nxt_router_app_release_port(task, msg->new_port, sw->app);
200
201 sw->work.handler = nxt_router_sw_release;
202
203 nxt_debug(task, "post sw #%uxD release to %p", sw->stream,
204 sw->work.data);
205
206 nxt_event_engine_post(sw->work.data, &sw->work);

--- 549 unchanged lines hidden (view full) ---

756
757 return app;
758}
759
760
761static nxt_socket_conf_t *
762nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
763{
757 nxt_socket_conf_t *conf;
764 nxt_socket_conf_t *skcf;
758
765
759 conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
760 if (nxt_slow_path(conf == NULL)) {
766 skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
767 if (nxt_slow_path(skcf == NULL)) {
761 return NULL;
762 }
763
768 return NULL;
769 }
770
764 conf->sockaddr = sa;
771 skcf->sockaddr = sa;
765
772
766 conf->listen.sockaddr = sa;
767 conf->listen.socklen = sa->socklen;
768 conf->listen.address_length = sa->length;
773 skcf->listen.sockaddr = sa;
774 skcf->listen.socklen = sa->socklen;
775 skcf->listen.address_length = sa->length;
769
776
770 conf->listen.socket = -1;
771 conf->listen.backlog = NXT_LISTEN_BACKLOG;
772 conf->listen.flags = NXT_NONBLOCK;
773 conf->listen.read_after_accept = 1;
777 skcf->listen.socket = -1;
778 skcf->listen.backlog = NXT_LISTEN_BACKLOG;
779 skcf->listen.flags = NXT_NONBLOCK;
780 skcf->listen.read_after_accept = 1;
774
781
775 return conf;
782 return skcf;
776}
777
778
779static void
780nxt_router_listen_sockets_sort(nxt_router_t *router,
781 nxt_router_temp_conf_t *tmcf)
782{
783 nxt_queue_link_t *nqlk, *oqlk, *next;

--- 390 unchanged lines hidden (view full) ---

1174
1175 return ret;
1176}
1177
1178
1179static void
1180nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1181{
783}
784
785
786static void
787nxt_router_listen_sockets_sort(nxt_router_t *router,
788 nxt_router_temp_conf_t *tmcf)
789{
790 nxt_queue_link_t *nqlk, *oqlk, *next;

--- 390 unchanged lines hidden (view full) ---

1181
1182 return ret;
1183}
1184
1185
1186static void
1187nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
1188{
1182 nxt_app_t *app;
1189 nxt_app_t *app;
1190 nxt_port_t *port;
1183
1184 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1185
1186 nxt_queue_remove(&app->link);
1187
1191
1192 nxt_queue_each(app, &router->apps, nxt_app_t, link) {
1193
1194 nxt_queue_remove(&app->link);
1195
1188 // TODO RELEASE APP
1189#if 0
1190 nxt_thread_mutex_destroy(&app->mutex);
1191 nxt_free(app);
1192#endif
1196 app->live = 0;
1197
1198 if (nxt_router_app_free(app) != 0) {
1199 continue;
1200 }
1201
1202 if (nxt_queue_is_empty(&app->requests)) {
1203
1204 do {
1205 port = nxt_router_app_get_port(app);
1206 if (port == NULL) {
1207 break;
1208 }
1209
1210 nxt_port_socket_write(&port->engine->task, port,
1211 NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
1212 } while (1);
1213
1214 }
1215
1193 } nxt_queue_loop;
1194
1195 nxt_queue_add(&router->apps, &tmcf->previous);
1196 nxt_queue_add(&router->apps, &tmcf->apps);
1197}
1198
1199
1200static void

--- 60 unchanged lines hidden (view full) ---

1261 engine->task.thread = thread;
1262 engine->task.log = thread->log;
1263 thread->engine = engine;
1264 thread->task = &engine->task;
1265 thread->fiber = &engine->fibers->fiber;
1266
1267 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
1268
1216 } nxt_queue_loop;
1217
1218 nxt_queue_add(&router->apps, &tmcf->previous);
1219 nxt_queue_add(&router->apps, &tmcf->apps);
1220}
1221
1222
1223static void

--- 60 unchanged lines hidden (view full) ---

1284 engine->task.thread = thread;
1285 engine->task.log = thread->log;
1286 thread->engine = engine;
1287 thread->task = &engine->task;
1288 thread->fiber = &engine->fibers->fiber;
1289
1290 engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
1291
1269 port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t));
1292 port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER);
1270 if (nxt_slow_path(port == NULL)) {
1271 return;
1272 }
1273
1293 if (nxt_slow_path(port == NULL)) {
1294 return;
1295 }
1296
1274 port->id = nxt_port_get_next_id();
1275 port->pid = nxt_pid;
1276
1277 ret = nxt_port_socket_init(task, port, 0);
1278 if (nxt_slow_path(ret != NXT_OK)) {
1297 ret = nxt_port_socket_init(task, port, 0);
1298 if (nxt_slow_path(ret != NXT_OK)) {
1299 nxt_mp_release(port->mem_pool, port);
1279 return;
1280 }
1281
1300 return;
1301 }
1302
1282 port->type = NXT_PROCESS_ROUTER;
1283
1284 engine->port = port;
1285
1286 nxt_port_enable(task, port, nxt_router_app_port_handlers);
1287
1288 nxt_event_engine_start(engine);
1289}
1290
1291

--- 94 unchanged lines hidden (view full) ---

1386 skcf = data;
1387
1388 engine = task->thread->engine;
1389
1390 listen = nxt_router_listen_event(&engine->listen_connections, skcf);
1391
1392 nxt_fd_event_delete(engine, &listen->socket);
1393
1303 engine->port = port;
1304
1305 nxt_port_enable(task, port, nxt_router_app_port_handlers);
1306
1307 nxt_event_engine_start(engine);
1308}
1309
1310

--- 94 unchanged lines hidden (view full) ---

1405 skcf = data;
1406
1407 engine = task->thread->engine;
1408
1409 listen = nxt_router_listen_event(&engine->listen_connections, skcf);
1410
1411 nxt_fd_event_delete(engine, &listen->socket);
1412
1413 nxt_debug(task, "engine %p: listen socket delete: %d", engine,
1414 listen->socket.fd);
1415
1394 listen->timer.handler = nxt_router_listen_socket_close;
1395 listen->timer.work_queue = &engine->fast_work_queue;
1396
1397 nxt_timer_add(engine, &listen->timer, 0);
1398
1399 job->work.next = NULL;
1400 job->work.handler = nxt_router_conf_wait;
1401

--- 7 unchanged lines hidden (view full) ---

1409 nxt_timer_t *timer;
1410 nxt_listen_event_t *listen;
1411 nxt_socket_conf_joint_t *joint;
1412
1413 timer = obj;
1414 listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
1415 joint = listen->socket.data;
1416
1416 listen->timer.handler = nxt_router_listen_socket_close;
1417 listen->timer.work_queue = &engine->fast_work_queue;
1418
1419 nxt_timer_add(engine, &listen->timer, 0);
1420
1421 job->work.next = NULL;
1422 job->work.handler = nxt_router_conf_wait;
1423

--- 7 unchanged lines hidden (view full) ---

1431 nxt_timer_t *timer;
1432 nxt_listen_event_t *listen;
1433 nxt_socket_conf_joint_t *joint;
1434
1435 timer = obj;
1436 listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
1437 joint = listen->socket.data;
1438
1439 nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
1440 listen->socket.fd);
1441
1417 nxt_queue_remove(&listen->link);
1418
1419 /* 'task' refers to listen->task and we cannot use after nxt_free() */
1420 task = &task->thread->engine->task;
1421
1422 nxt_free(listen);
1423
1424 nxt_router_listen_socket_release(task, joint);

--- 9 unchanged lines hidden (view full) ---

1434 nxt_thread_spinlock_t *lock;
1435
1436 skcf = joint->socket_conf;
1437 rtsk = skcf->socket;
1438 lock = &skcf->router_conf->router->lock;
1439
1440 nxt_thread_spin_lock(lock);
1441
1442 nxt_queue_remove(&listen->link);
1443
1444 /* 'task' refers to listen->task and we cannot use after nxt_free() */
1445 task = &task->thread->engine->task;
1446
1447 nxt_free(listen);
1448
1449 nxt_router_listen_socket_release(task, joint);

--- 9 unchanged lines hidden (view full) ---

1459 nxt_thread_spinlock_t *lock;
1460
1461 skcf = joint->socket_conf;
1462 rtsk = skcf->socket;
1463 lock = &skcf->router_conf->router->lock;
1464
1465 nxt_thread_spin_lock(lock);
1466
1467 nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
1468 task->thread->engine, rtsk->count);
1469
1442 if (--rtsk->count != 0) {
1443 rtsk = NULL;
1444 }
1445
1446 nxt_thread_spin_unlock(lock);
1447
1448 if (rtsk != NULL) {
1449 nxt_socket_close(task, rtsk->fd);

--- 8 unchanged lines hidden (view full) ---

1458static void
1459nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
1460{
1461 nxt_bool_t exit;
1462 nxt_socket_conf_t *skcf;
1463 nxt_router_conf_t *rtcf;
1464 nxt_thread_spinlock_t *lock;
1465
1470 if (--rtsk->count != 0) {
1471 rtsk = NULL;
1472 }
1473
1474 nxt_thread_spin_unlock(lock);
1475
1476 if (rtsk != NULL) {
1477 nxt_socket_close(task, rtsk->fd);

--- 8 unchanged lines hidden (view full) ---

1486static void
1487nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
1488{
1489 nxt_bool_t exit;
1490 nxt_socket_conf_t *skcf;
1491 nxt_router_conf_t *rtcf;
1492 nxt_thread_spinlock_t *lock;
1493
1466 nxt_debug(task, "conf joint count: %D", joint->count);
1494 nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
1467
1468 if (--joint->count != 0) {
1469 return;
1470 }
1471
1472 nxt_queue_remove(&joint->link);
1473
1474 skcf = joint->socket_conf;
1475 rtcf = skcf->router_conf;
1476 lock = &rtcf->router->lock;
1477
1478 nxt_thread_spin_lock(lock);
1479
1495
1496 if (--joint->count != 0) {
1497 return;
1498 }
1499
1500 nxt_queue_remove(&joint->link);
1501
1502 skcf = joint->socket_conf;
1503 rtcf = skcf->router_conf;
1504 lock = &rtcf->router->lock;
1505
1506 nxt_thread_spin_lock(lock);
1507
1508 nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
1509 rtcf, rtcf->count);
1510
1480 if (--skcf->count != 0) {
1481 rtcf = NULL;
1482
1483 } else {
1484 nxt_queue_remove(&skcf->link);
1485
1486 if (--rtcf->count != 0) {
1487 rtcf = NULL;

--- 38 unchanged lines hidden (view full) ---

1526 engine = link->engine;
1527
1528 nxt_queue_remove(&engine->link);
1529
1530 port = engine->port;
1531
1532 // TODO notify all apps
1533
1511 if (--skcf->count != 0) {
1512 rtcf = NULL;
1513
1514 } else {
1515 nxt_queue_remove(&skcf->link);
1516
1517 if (--rtcf->count != 0) {
1518 rtcf = NULL;

--- 38 unchanged lines hidden (view full) ---

1557 engine = link->engine;
1558
1559 nxt_queue_remove(&engine->link);
1560
1561 port = engine->port;
1562
1563 // TODO notify all apps
1564
1534 if (port->pair[0] != -1) {
1535 nxt_fd_close(port->pair[0]);
1536 }
1565 nxt_mp_thread_adopt(port->mem_pool);
1566 nxt_port_release(port);
1537
1567
1538 if (port->pair[1] != -1) {
1539 nxt_fd_close(port->pair[1]);
1540 }
1541
1542 if (port->mem_pool) {
1543 nxt_mp_destroy(port->mem_pool);
1544 }
1545
1568 nxt_mp_thread_adopt(engine->mem_pool);
1546 nxt_mp_destroy(engine->mem_pool);
1547
1548 nxt_event_engine_free(engine);
1549
1550 nxt_free(link);
1551}
1552
1553

--- 124 unchanged lines hidden (view full) ---

1678 case 400: return "Bad request";
1679 case 404: return "Not found";
1680 case 403: return "Forbidden";
1681 case 500:
1682 default: return "Internal server error";
1683 }
1684}
1685
1569 nxt_mp_destroy(engine->mem_pool);
1570
1571 nxt_event_engine_free(engine);
1572
1573 nxt_free(link);
1574}
1575
1576

--- 124 unchanged lines hidden (view full) ---

1701 case 400: return "Bad request";
1702 case 404: return "Not found";
1703 case 403: return "Forbidden";
1704 case 500:
1705 default: return "Internal server error";
1706 }
1707}
1708
1686static void
1687nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
1688 const char* fmt, ...)
1709
1710static nxt_buf_t *
1711nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
1712 const char* fmt, va_list args)
1689{
1713{
1690 va_list args;
1691 nxt_buf_t *b, *last;
1692 const char *msg;
1714 nxt_buf_t *b, *last;
1715 const char *msg;
1693
1716
1694 b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0);
1717 b = nxt_buf_mem_ts_alloc(task, mp, 16384);
1695 if (nxt_slow_path(b == NULL)) {
1718 if (nxt_slow_path(b == NULL)) {
1696 /* TODO pogorevaTb */
1719 return NULL;
1697 }
1698
1699 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
1700 "HTTP/1.0 %d %s\r\n"
1701 "Content-Type: text/plain\r\n"
1702 "Connection: close\r\n\r\n",
1703 code, nxt_router_text_by_code(code));
1704
1705 msg = (const char *) b->mem.free;
1706
1720 }
1721
1722 b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
1723 "HTTP/1.0 %d %s\r\n"
1724 "Content-Type: text/plain\r\n"
1725 "Connection: close\r\n\r\n",
1726 code, nxt_router_text_by_code(code));
1727
1728 msg = (const char *) b->mem.free;
1729
1707 va_start(args, fmt);
1708 b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
1730 b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
1709 va_end(args);
1710
1711 nxt_log_alert(task->log, "error %d: %s", code, msg);
1712
1731
1732 nxt_log_alert(task->log, "error %d: %s", code, msg);
1733
1713 last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
1734 last = nxt_buf_mem_ts_alloc(task, mp, 0);
1735
1714 if (nxt_slow_path(last == NULL)) {
1736 if (nxt_slow_path(last == NULL)) {
1715 /* TODO pogorevaTb */
1737 nxt_mp_release(mp, b);
1738 return NULL;
1716 }
1717
1739 }
1740
1741 nxt_buf_set_sync(last);
1742 nxt_buf_set_last(last);
1743
1718 nxt_buf_chain_add(&b, last);
1719
1744 nxt_buf_chain_add(&b, last);
1745
1746 return b;
1747}
1748
1749
1750
1751static void
1752nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
1753 const char* fmt, ...)
1754{
1755 va_list args;
1756 nxt_buf_t *b;
1757
1758 va_start(args, fmt);
1759 b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
1760 va_end(args);
1761
1720 if (c->write == NULL) {
1721 c->write = b;
1722 c->write_state = &nxt_router_conn_write_state;
1723
1724 nxt_conn_write(task->thread->engine, c);
1725 } else {
1726 nxt_debug(task, "router data attach out bufs to existing chain");
1727

--- 9 unchanged lines hidden (view full) ---

1737 nxt_app_t *app;
1738 nxt_port_t *port;
1739 nxt_runtime_t *rt;
1740 nxt_start_worker_t *sw;
1741
1742 sw = obj;
1743 app = sw->app;
1744
1762 if (c->write == NULL) {
1763 c->write = b;
1764 c->write_state = &nxt_router_conn_write_state;
1765
1766 nxt_conn_write(task->thread->engine, c);
1767 } else {
1768 nxt_debug(task, "router data attach out bufs to existing chain");
1769

--- 9 unchanged lines hidden (view full) ---

1779 nxt_app_t *app;
1780 nxt_port_t *port;
1781 nxt_runtime_t *rt;
1782 nxt_start_worker_t *sw;
1783
1784 sw = obj;
1785 app = sw->app;
1786
1787 if (app->workers + app->pending_workers >= app->max_workers) {
1788 sw->work.handler = nxt_router_sw_release;
1789
1790 nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD "
1791 "release to %p", sw->stream, sw->work.data);
1792
1793 nxt_event_engine_post(sw->work.data, &sw->work);
1794
1795 return;
1796 }
1797
1798 app->pending_workers++;
1799
1745 nxt_debug(task, "send sw #%uD", sw->stream);
1746
1747 nxt_router_sw_add(task, nxt_router, sw);
1748 nxt_queue_insert_tail(&app->requests, &sw->rc->app_link);
1749
1750 rt = task->thread->runtime;
1751 port = rt->port_by_type[NXT_PROCESS_MASTER];
1752
1753 b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0);
1754
1755 nxt_buf_cpystr(b, &app->conf);
1756
1757 nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b);
1758}
1759
1760
1800 nxt_debug(task, "send sw #%uD", sw->stream);
1801
1802 nxt_router_sw_add(task, nxt_router, sw);
1803 nxt_queue_insert_tail(&app->requests, &sw->rc->app_link);
1804
1805 rt = task->thread->runtime;
1806 port = rt->port_by_type[NXT_PROCESS_MASTER];
1807
1808 b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0);
1809
1810 nxt_buf_cpystr(b, &app->conf);
1811
1812 nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b);
1813}
1814
1815
1816static nxt_bool_t
1817nxt_router_app_free(nxt_app_t *app)
1818{
1819 if (app->live == 0 && app->workers == 0 &&
1820 app->pending_workers == 0 &&
1821 nxt_queue_is_empty(&app->requests)) {
1822
1823 nxt_thread_mutex_destroy(&app->mutex);
1824 nxt_free(app);
1825
1826 return 1;
1827 }
1828
1829 return 0;
1830}
1831
1832
1761static nxt_port_t *
1762nxt_router_app_get_port(nxt_app_t *app)
1763{
1764 nxt_port_t *port;
1765 nxt_queue_link_t *lnk;
1766
1767 port = NULL;
1768

--- 15 unchanged lines hidden (view full) ---

1784
1785
1786static void
1787nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
1788{
1789 nxt_app_t *app;
1790 nxt_port_t *port;
1791 nxt_work_t *work;
1833static nxt_port_t *
1834nxt_router_app_get_port(nxt_app_t *app)
1835{
1836 nxt_port_t *port;
1837 nxt_queue_link_t *lnk;
1838
1839 port = NULL;
1840

--- 15 unchanged lines hidden (view full) ---

1856
1857
1858static void
1859nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
1860{
1861 nxt_app_t *app;
1862 nxt_port_t *port;
1863 nxt_work_t *work;
1864 nxt_process_t *process;
1792 nxt_queue_link_t *lnk;
1793 nxt_req_conn_link_t *rc;
1794
1795 port = obj;
1796 app = data;
1797
1798 nxt_assert(app != NULL);
1799 nxt_assert(app == port->app);
1800 nxt_assert(port->app_link.next == NULL);
1801
1802
1803 if (task->thread->engine != port->engine) {
1865 nxt_queue_link_t *lnk;
1866 nxt_req_conn_link_t *rc;
1867
1868 port = obj;
1869 app = data;
1870
1871 nxt_assert(app != NULL);
1872 nxt_assert(app == port->app);
1873 nxt_assert(port->app_link.next == NULL);
1874
1875
1876 if (task->thread->engine != port->engine) {
1804 work = (nxt_work_t *) (port + 1);
1877 work = &port->work;
1805
1806 nxt_debug(task, "post release port to engine %p", port->engine);
1807
1808 work->next = NULL;
1809 work->handler = nxt_router_app_release_port;
1810 work->task = port->socket.task;
1811 work->obj = port;
1812 work->data = app;

--- 4 unchanged lines hidden (view full) ---

1817 }
1818
1819 if (!nxt_queue_is_empty(&app->requests)) {
1820 lnk = nxt_queue_first(&app->requests);
1821 nxt_queue_remove(lnk);
1822
1823 rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
1824
1878
1879 nxt_debug(task, "post release port to engine %p", port->engine);
1880
1881 work->next = NULL;
1882 work->handler = nxt_router_app_release_port;
1883 work->task = port->socket.task;
1884 work->obj = port;
1885 work->data = app;

--- 4 unchanged lines hidden (view full) ---

1890 }
1891
1892 if (!nxt_queue_is_empty(&app->requests)) {
1893 lnk = nxt_queue_first(&app->requests);
1894 nxt_queue_remove(lnk);
1895
1896 rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
1897
1825 nxt_debug(task, "process request #%uxD", rc->req_id);
1898 nxt_debug(task, "app '%V' process next request #%uxD",
1899 &app->name, rc->req_id);
1826
1827 rc->app_port = port;
1828
1829 nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool);
1830
1831 return;
1832 }
1833
1900
1901 rc->app_port = port;
1902
1903 nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool);
1904
1905 return;
1906 }
1907
1834 nxt_debug(task, "app requests queue is empty");
1908 if (port->pair[1] == -1) {
1909 nxt_debug(task, "app '%V' port already closed (pid %PI dead?)",
1910 &app->name, port->pid);
1835
1911
1912 app->workers--;
1913 nxt_router_app_free(app);
1914
1915 port->app = NULL;
1916 process = port->process;
1917
1918 nxt_port_release(port);
1919
1920 if (nxt_queue_is_empty(&process->ports)) {
1921 nxt_runtime_process_destroy(task->thread->runtime, process);
1922 }
1923
1924 return;
1925 }
1926
1927 if (!app->live) {
1928 nxt_debug(task, "app '%V' is not alive, send QUIT to port",
1929 &app->name);
1930
1931 nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
1932 -1, 0, 0, NULL);
1933
1934 return;
1935 }
1936
1937 nxt_debug(task, "app '%V' requests queue is empty, keep the port",
1938 &app->name);
1939
1836 nxt_thread_mutex_lock(&app->mutex);
1837
1838 nxt_queue_insert_head(&app->ports, &port->app_link);
1839
1840 nxt_thread_mutex_unlock(&app->mutex);
1841}
1842
1843
1940 nxt_thread_mutex_lock(&app->mutex);
1941
1942 nxt_queue_insert_head(&app->ports, &port->app_link);
1943
1944 nxt_thread_mutex_unlock(&app->mutex);
1945}
1946
1947
1844void
1948nxt_bool_t
1845nxt_router_app_remove_port(nxt_port_t *port)
1846{
1949nxt_router_app_remove_port(nxt_port_t *port)
1950{
1847 nxt_app_t *app;
1951 nxt_app_t *app;
1952 nxt_bool_t busy;
1848
1953
1849 if (port->app_link.next == NULL) {
1850 return;
1851 }
1852
1853 app = port->app;
1954 app = port->app;
1955 busy = 1;
1854
1956
1855#if (NXT_DEBUG)
1856 if (nxt_slow_path(app == NULL)) {
1857 nxt_abort();
1957 if (app == NULL) {
1958 nxt_assert(port->app_link.next == NULL);
1959
1960 return 1;
1858 }
1961 }
1859#endif
1860
1861 nxt_thread_mutex_lock(&app->mutex);
1862
1962
1963 nxt_thread_mutex_lock(&app->mutex);
1964
1863 nxt_queue_remove(&port->app_link);
1864 port->app_link.next = NULL;
1965 if (port->app_link.next != NULL) {
1865
1966
1967 nxt_queue_remove(&port->app_link);
1968 port->app_link.next = NULL;
1969
1970 busy = 0;
1971 }
1972
1866 nxt_thread_mutex_unlock(&app->mutex);
1973 nxt_thread_mutex_unlock(&app->mutex);
1974
1975 if (busy == 0) {
1976
1977 app->workers--;
1978 nxt_router_app_free(app);
1979
1980 return 1;
1981 }
1982
1983 return 0;
1867}
1868
1869
1870nxt_inline nxt_int_t
1871nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
1872{
1873 nxt_app_t *app;
1874 nxt_conn_t *c;

--- 14 unchanged lines hidden (view full) ---

1889 "Application is NULL in socket_conf");
1890 return NXT_ERROR;
1891 }
1892
1893
1894 port = nxt_router_app_get_port(app);
1895
1896 if (port != NULL) {
1984}
1985
1986
1987nxt_inline nxt_int_t
1988nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
1989{
1990 nxt_app_t *app;
1991 nxt_conn_t *c;

--- 14 unchanged lines hidden (view full) ---

2006 "Application is NULL in socket_conf");
2007 return NXT_ERROR;
2008 }
2009
2010
2011 port = nxt_router_app_get_port(app);
2012
2013 if (port != NULL) {
2014 nxt_debug(task, "already have port for app '%V'", &app->name);
2015
1897 rc->app_port = port;
1898 return NXT_OK;
1899 }
1900
2016 rc->app_port = port;
2017 return NXT_OK;
2018 }
2019
1901
1902 sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
1903
1904 if (nxt_slow_path(sw == NULL)) {
1905 nxt_router_gen_error(task, rc->conn, 500,
1906 "Failed to allocate start worker struct");
1907 return NXT_ERROR;
1908 }
1909

--- 154 unchanged lines hidden (view full) ---

2064
2065 if (nxt_slow_path(rc == NULL)) {
2066 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2067 "req->conn link");
2068
2069 return;
2070 }
2071
2020 sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
2021
2022 if (nxt_slow_path(sw == NULL)) {
2023 nxt_router_gen_error(task, rc->conn, 500,
2024 "Failed to allocate start worker struct");
2025 return NXT_ERROR;
2026 }
2027

--- 154 unchanged lines hidden (view full) ---

2182
2183 if (nxt_slow_path(rc == NULL)) {
2184 nxt_router_gen_error(task, c, 500, "Failed to allocate "
2185 "req->conn link");
2186
2187 return;
2188 }
2189
2190 rc->ap = ap;
2191
2072 nxt_event_engine_request_add(engine, rc);
2073
2074 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
2075 req_id, c, engine);
2076
2077 rc->reply_port = engine->port;
2078
2079 res = nxt_router_app_port(task, rc);

--- 19 unchanged lines hidden (view full) ---

2099 port = rc->app_port;
2100
2101 if (nxt_slow_path(port == NULL)) {
2102 nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
2103 return;
2104 }
2105
2106 reply_port = rc->reply_port;
2192 nxt_event_engine_request_add(engine, rc);
2193
2194 nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
2195 req_id, c, engine);
2196
2197 rc->reply_port = engine->port;
2198
2199 res = nxt_router_app_port(task, rc);

--- 19 unchanged lines hidden (view full) ---

2219 port = rc->app_port;
2220
2221 if (nxt_slow_path(port == NULL)) {
2222 nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
2223 return;
2224 }
2225
2226 reply_port = rc->reply_port;
2107 ap = rc->conn->socket.data;
2227 ap = rc->ap;
2108
2109 port_mp = port->mem_pool;
2110 port->mem_pool = mp;
2111
2112 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
2113 reply_port->id);
2114 if (nxt_slow_path(c_port != reply_port)) {
2115 res = nxt_port_send_port(task, port, reply_port, 0);

--- 263 unchanged lines hidden ---
2228
2229 port_mp = port->mem_pool;
2230 port->mem_pool = mp;
2231
2232 c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
2233 reply_port->id);
2234 if (nxt_slow_path(c_port != reply_port)) {
2235 res = nxt_port_send_port(task, port, reply_port, 0);

--- 263 unchanged lines hidden ---