nxt_router.c (1294:61e9f23a566d) nxt_router.c (1321:2c7f79bf0a1f)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 234 unchanged lines hidden (view full) ---

243
244static void nxt_router_app_joint_use(nxt_task_t *task,
245 nxt_app_joint_t *app_joint, int i);
246
247static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
248 nxt_http_request_t *r);
249static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
250 void *data);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 234 unchanged lines hidden (view full) ---

243
244static void nxt_router_app_joint_use(nxt_task_t *task,
245 nxt_app_joint_t *app_joint, int i);
246
247static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
248 nxt_http_request_t *r);
249static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
250 void *data);
251static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
251
252extern const nxt_http_request_state_t nxt_http_websocket;
253
254static nxt_router_t *nxt_router;
255
256static const nxt_str_t http_prefix = nxt_string("HTTP_");
257static const nxt_str_t empty_prefix = nxt_string("");
258

--- 12 unchanged lines hidden (view full) ---

271 .new_port = nxt_router_new_port_handler,
272 .change_file = nxt_port_change_log_file_handler,
273 .mmap = nxt_port_mmap_handler,
274 .data = nxt_router_conf_data_handler,
275 .remove_pid = nxt_router_remove_pid_handler,
276 .access_log = nxt_router_access_log_reopen_handler,
277 .rpc_ready = nxt_port_rpc_handler,
278 .rpc_error = nxt_port_rpc_handler,
252
253extern const nxt_http_request_state_t nxt_http_websocket;
254
255static nxt_router_t *nxt_router;
256
257static const nxt_str_t http_prefix = nxt_string("HTTP_");
258static const nxt_str_t empty_prefix = nxt_string("");
259

--- 12 unchanged lines hidden (view full) ---

272 .new_port = nxt_router_new_port_handler,
273 .change_file = nxt_port_change_log_file_handler,
274 .mmap = nxt_port_mmap_handler,
275 .data = nxt_router_conf_data_handler,
276 .remove_pid = nxt_router_remove_pid_handler,
277 .access_log = nxt_router_access_log_reopen_handler,
278 .rpc_ready = nxt_port_rpc_handler,
279 .rpc_error = nxt_port_rpc_handler,
280 .oosm = nxt_router_oosm_handler,
279};
280
281
282nxt_int_t
283nxt_router_start(nxt_task_t *task, void *data)
284{
285 nxt_int_t ret;
286 nxt_port_t *controller_port;

--- 2456 unchanged lines hidden (view full) ---

2743 }
2744}
2745
2746
2747static nxt_port_handlers_t nxt_router_app_port_handlers = {
2748 .rpc_error = nxt_port_rpc_handler,
2749 .mmap = nxt_port_mmap_handler,
2750 .data = nxt_port_rpc_handler,
281};
282
283
284nxt_int_t
285nxt_router_start(nxt_task_t *task, void *data)
286{
287 nxt_int_t ret;
288 nxt_port_t *controller_port;

--- 2456 unchanged lines hidden (view full) ---

2745 }
2746}
2747
2748
2749static nxt_port_handlers_t nxt_router_app_port_handlers = {
2750 .rpc_error = nxt_port_rpc_handler,
2751 .mmap = nxt_port_mmap_handler,
2752 .data = nxt_port_rpc_handler,
2753 .oosm = nxt_router_oosm_handler,
2751};
2752
2753
2754static void
2755nxt_router_thread_start(void *data)
2756{
2757 nxt_int_t ret;
2758 nxt_port_t *port;

--- 2477 unchanged lines hidden (view full) ---

5236 nxt_http_request_t *r;
5237
5238 nxt_debug(task, "http app release");
5239
5240 r = nxt_timer_data(obj, nxt_http_request_t, timer);
5241
5242 nxt_mp_release(r->mem_pool);
5243}
2754};
2755
2756
2757static void
2758nxt_router_thread_start(void *data)
2759{
2760 nxt_int_t ret;
2761 nxt_port_t *port;

--- 2477 unchanged lines hidden (view full) ---

5239 nxt_http_request_t *r;
5240
5241 nxt_debug(task, "http app release");
5242
5243 r = nxt_timer_data(obj, nxt_http_request_t, timer);
5244
5245 nxt_mp_release(r->mem_pool);
5246}
5247
5248
5249static void
5250nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
5251{
5252 size_t mi;
5253 uint32_t i;
5254 nxt_bool_t ack;
5255 nxt_process_t *process;
5256 nxt_free_map_t *m;
5257 nxt_port_mmap_header_t *hdr;
5258
5259 nxt_debug(task, "oosm in %PI", msg->port_msg.pid);
5260
5261 process = nxt_runtime_process_find(task->thread->runtime,
5262 msg->port_msg.pid);
5263 if (nxt_slow_path(process == NULL)) {
5264 return;
5265 }
5266
5267 ack = 0;
5268
5269 /*
5270 * To mitigate possible racing condition (when OOSM message received
5271 * after some of the memory was already freed), need to try to find
5272 * first free segment in shared memory and send ACK if found.
5273 */
5274
5275 nxt_thread_mutex_lock(&process->incoming.mutex);
5276
5277 for (i = 0; i < process->incoming.size; i++) {
5278 hdr = process->incoming.elts[i].mmap_handler->hdr;
5279 m = hdr->free_map;
5280
5281 for (mi = 0; mi < MAX_FREE_IDX; mi++) {
5282 if (m[mi] != 0) {
5283 ack = 1;
5284
5285 nxt_debug(task, "oosm: already free #%uD %uz = 0x%08xA",
5286 i, mi, m[mi]);
5287
5288 break;
5289 }
5290 }
5291 }
5292
5293 nxt_thread_mutex_unlock(&process->incoming.mutex);
5294
5295 if (ack) {
5296 (void) nxt_port_socket_write(task, msg->port, NXT_PORT_MSG_SHM_ACK,
5297 -1, 0, 0, NULL);
5298 }
5299}