nxt_router.c (1545:78836321a126) nxt_router.c (1546:06017e6e3a5f)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 243 unchanged lines hidden (view full) ---

252
253static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
254 nxt_http_request_t *r);
255static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
256 void *data);
257static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
258static void nxt_router_get_port_handler(nxt_task_t *task,
259 nxt_port_recv_msg_t *msg);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 243 unchanged lines hidden (view full) ---

252
253static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
254 nxt_http_request_t *r);
255static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
256 void *data);
257static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
258static void nxt_router_get_port_handler(nxt_task_t *task,
259 nxt_port_recv_msg_t *msg);
260static void nxt_router_get_mmap_handler(nxt_task_t *task,
261 nxt_port_recv_msg_t *msg);
260
261extern const nxt_http_request_state_t nxt_http_websocket;
262
263static nxt_router_t *nxt_router;
264
265static const nxt_str_t http_prefix = nxt_string("HTTP_");
266static const nxt_str_t empty_prefix = nxt_string("");
267

--- 8 unchanged lines hidden (view full) ---

276
277
278static const nxt_port_handlers_t nxt_router_process_port_handlers = {
279 .quit = nxt_signal_quit_handler,
280 .new_port = nxt_router_new_port_handler,
281 .get_port = nxt_router_get_port_handler,
282 .change_file = nxt_port_change_log_file_handler,
283 .mmap = nxt_port_mmap_handler,
262
263extern const nxt_http_request_state_t nxt_http_websocket;
264
265static nxt_router_t *nxt_router;
266
267static const nxt_str_t http_prefix = nxt_string("HTTP_");
268static const nxt_str_t empty_prefix = nxt_string("");
269

--- 8 unchanged lines hidden (view full) ---

278
279
280static const nxt_port_handlers_t nxt_router_process_port_handlers = {
281 .quit = nxt_signal_quit_handler,
282 .new_port = nxt_router_new_port_handler,
283 .get_port = nxt_router_get_port_handler,
284 .change_file = nxt_port_change_log_file_handler,
285 .mmap = nxt_port_mmap_handler,
286 .get_mmap = nxt_router_get_mmap_handler,
284 .data = nxt_router_conf_data_handler,
285 .remove_pid = nxt_router_remove_pid_handler,
286 .access_log = nxt_router_access_log_reopen_handler,
287 .rpc_ready = nxt_port_rpc_handler,
288 .rpc_error = nxt_port_rpc_handler,
289 .oosm = nxt_router_oosm_handler,
290};
291

--- 4711 unchanged lines hidden (view full) ---

5003 req_app_link->msg_info.completion_handler = buf->completion_handler;
5004
5005 for (; buf; buf = buf->next) {
5006 buf->completion_handler = nxt_router_dummy_buf_completion;
5007 }
5008
5009 buf = req_app_link->msg_info.buf;
5010
287 .data = nxt_router_conf_data_handler,
288 .remove_pid = nxt_router_remove_pid_handler,
289 .access_log = nxt_router_access_log_reopen_handler,
290 .rpc_ready = nxt_port_rpc_handler,
291 .rpc_error = nxt_port_rpc_handler,
292 .oosm = nxt_router_oosm_handler,
293};
294

--- 4711 unchanged lines hidden (view full) ---

5006 req_app_link->msg_info.completion_handler = buf->completion_handler;
5007
5008 for (; buf; buf = buf->next) {
5009 buf->completion_handler = nxt_router_dummy_buf_completion;
5010 }
5011
5012 buf = req_app_link->msg_info.buf;
5013
5011 res = nxt_port_mmap_get_tracking(task, port,
5014 res = nxt_port_mmap_get_tracking(task, &port->process->outgoing,
5012 &req_app_link->msg_info.tracking,
5013 req_app_link->stream);
5014 if (nxt_slow_path(res != NXT_OK)) {
5015 nxt_request_app_link_error(task, port->app, req_app_link,
5016 "Failed to get tracking area");
5017 goto release_port;
5018 }
5019

--- 113 unchanged lines hidden (view full) ---

5133
5134 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5135 nxt_alert(task, "headers to big to fit in shared memory (%d)",
5136 (int) req_size);
5137
5138 return NULL;
5139 }
5140
5015 &req_app_link->msg_info.tracking,
5016 req_app_link->stream);
5017 if (nxt_slow_path(res != NXT_OK)) {
5018 nxt_request_app_link_error(task, port->app, req_app_link,
5019 "Failed to get tracking area");
5020 goto release_port;
5021 }
5022

--- 113 unchanged lines hidden (view full) ---

5136
5137 if (nxt_slow_path(req_size > PORT_MMAP_DATA_SIZE)) {
5138 nxt_alert(task, "headers to big to fit in shared memory (%d)",
5139 (int) req_size);
5140
5141 return NULL;
5142 }
5143
5141 out = nxt_port_mmap_get_buf(task, port,
5144 out = nxt_port_mmap_get_buf(task, &port->process->outgoing,
5142 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5143 if (nxt_slow_path(out == NULL)) {
5144 return NULL;
5145 }
5146
5147 req = (nxt_unit_request_t *) out->mem.free;
5148 out->mem.free += req_size;
5149

--- 165 unchanged lines hidden (view full) ---

5315 for (b = r->body; b != NULL; b = b->next) {
5316 size = nxt_buf_mem_used_size(&b->mem);
5317 pos = b->mem.pos;
5318
5319 while (size > 0) {
5320 if (buf == NULL) {
5321 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5322
5145 nxt_min(req_size + content_length, PORT_MMAP_DATA_SIZE));
5146 if (nxt_slow_path(out == NULL)) {
5147 return NULL;
5148 }
5149
5150 req = (nxt_unit_request_t *) out->mem.free;
5151 out->mem.free += req_size;
5152

--- 165 unchanged lines hidden (view full) ---

5318 for (b = r->body; b != NULL; b = b->next) {
5319 size = nxt_buf_mem_used_size(&b->mem);
5320 pos = b->mem.pos;
5321
5322 while (size > 0) {
5323 if (buf == NULL) {
5324 free_size = nxt_min(size, PORT_MMAP_DATA_SIZE);
5325
5323 buf = nxt_port_mmap_get_buf(task, port, free_size);
5326 buf = nxt_port_mmap_get_buf(task, &port->process->outgoing,
5327 free_size);
5324 if (nxt_slow_path(buf == NULL)) {
5325 while (out != NULL) {
5326 buf = out->next;
5327 out->next = NULL;
5328 out->completion_handler(task, out, out->parent);
5329 out = buf;
5330 }
5331 return NULL;

--- 219 unchanged lines hidden (view full) ---

5551 if (ack) {
5552 (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
5553 -1, 0, 0, NULL);
5554 }
5555}
5556
5557
5558static void
5328 if (nxt_slow_path(buf == NULL)) {
5329 while (out != NULL) {
5330 buf = out->next;
5331 out->next = NULL;
5332 out->completion_handler(task, out, out->parent);
5333 out = buf;
5334 }
5335 return NULL;

--- 219 unchanged lines hidden (view full) ---

5555 if (ack) {
5556 (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
5557 -1, 0, 0, NULL);
5558 }
5559}
5560
5561
5562static void
5563nxt_router_get_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5564{
5565 nxt_fd_t fd;
5566 nxt_port_t *port;
5567 nxt_runtime_t *rt;
5568 nxt_port_mmaps_t *mmaps;
5569 nxt_port_msg_get_mmap_t *get_mmap_msg;
5570 nxt_port_mmap_handler_t *mmap_handler;
5571
5572 rt = task->thread->runtime;
5573
5574 port = nxt_runtime_port_find(rt, msg->port_msg.pid,
5575 msg->port_msg.reply_port);
5576 if (nxt_slow_path(port == NULL)) {
5577 nxt_alert(task, "get_mmap_handler: reply_port %PI:%d not found",
5578 msg->port_msg.pid, msg->port_msg.reply_port);
5579
5580 return;
5581 }
5582
5583 if (nxt_slow_path(nxt_buf_used_size(msg->buf)
5584 < (int) sizeof(nxt_port_msg_get_mmap_t)))
5585 {
5586 nxt_alert(task, "get_mmap_handler: message buffer too small (%d)",
5587 (int) nxt_buf_used_size(msg->buf));
5588
5589 return;
5590 }
5591
5592 get_mmap_msg = (nxt_port_msg_get_mmap_t *) msg->buf->mem.pos;
5593
5594 nxt_assert(port->type == NXT_PROCESS_APP);
5595
5596 mmaps = &port->process->outgoing;
5597 nxt_thread_mutex_lock(&mmaps->mutex);
5598
5599 if (nxt_slow_path(get_mmap_msg->id >= mmaps->size)) {
5600 nxt_thread_mutex_unlock(&mmaps->mutex);
5601
5602 nxt_alert(task, "get_mmap_handler: mmap id is too big (%d)",
5603 (int) get_mmap_msg->id);
5604
5605 return;
5606 }
5607
5608 mmap_handler = mmaps->elts[get_mmap_msg->id].mmap_handler;
5609
5610 fd = mmap_handler->fd;
5611
5612 nxt_thread_mutex_unlock(&mmaps->mutex);
5613
5614 nxt_debug(task, "get mmap %PI:%d found",
5615 msg->port_msg.pid, (int) get_mmap_msg->id);
5616
5617 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
5618}
5619
5620
5621static void
5559nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5560{
5561 nxt_port_t *port, *reply_port;
5562 nxt_runtime_t *rt;
5563 nxt_port_msg_get_port_t *get_port_msg;
5564
5565 rt = task->thread->runtime;
5566

--- 33 unchanged lines hidden ---
5622nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5623{
5624 nxt_port_t *port, *reply_port;
5625 nxt_runtime_t *rt;
5626 nxt_port_msg_get_port_t *get_port_msg;
5627
5628 rt = task->thread->runtime;
5629

--- 33 unchanged lines hidden ---