Deleted
Added
nxt_port.c (1997:a8a3f1d243ee) | nxt_port.c (1998:c8790d2a89bb) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> 9#include <nxt_port.h> 10#include <nxt_router.h> 11#include <nxt_app_queue.h> 12#include <nxt_port_queue.h> 13 14 | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> 9#include <nxt_port.h> 10#include <nxt_router.h> 11#include <nxt_app_queue.h> 12#include <nxt_port_queue.h> 13 14 |
15static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, 16 nxt_pid_t pid); |
|
15static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 16 17static nxt_atomic_uint_t nxt_port_last_id = 1; 18 19 20static void 21nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) 22{ --- 246 unchanged lines hidden (view full) --- 269 nxt_debug(task, "new port %d received for process %PI:%d", 270 msg->fd[0], new_port_msg->pid, new_port_msg->id); 271 272 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); 273 if (port != NULL) { 274 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, 275 new_port_msg->id); 276 | 17static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 18 19static nxt_atomic_uint_t nxt_port_last_id = 1; 20 21 22static void 23nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data) 24{ --- 246 unchanged lines hidden (view full) --- 271 nxt_debug(task, "new port %d received for process %PI:%d", 272 msg->fd[0], new_port_msg->pid, new_port_msg->id); 273 274 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); 275 if (port != NULL) { 276 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, 277 new_port_msg->id); 278 |
279 msg->u.new_port = port; 280 |
|
277 nxt_fd_close(msg->fd[0]); 278 msg->fd[0] = -1; 279 return; 280 } 281 282 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, 283 new_port_msg->id, 284 new_port_msg->type); --- 94 unchanged lines hidden (view full) --- 379 nxt_runtime_process_each(rt, process) { 380 381 if (nxt_pid == process->pid) { 382 continue; 383 } 384 385 port = nxt_process_port_first(process); 386 | 281 nxt_fd_close(msg->fd[0]); 282 msg->fd[0] = -1; 283 return; 284 } 285 286 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid, 287 new_port_msg->id, 288 new_port_msg->type); --- 94 unchanged lines hidden (view full) --- 383 nxt_runtime_process_each(rt, process) { 384 385 if (nxt_pid == process->pid) { 386 continue; 387 } 388 389 port = nxt_process_port_first(process); 390 |
387 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 388 sizeof(nxt_port_data_t)); | 391 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool, 392 sizeof(nxt_uint_t), 0); |
389 if (nxt_slow_path(b == NULL)) { 390 continue; 391 } 392 | 393 if (nxt_slow_path(b == NULL)) { 394 continue; 395 } 396 |
393 *(nxt_uint_t *) b->mem.pos = slot; 394 b->mem.free += sizeof(nxt_uint_t); | 397 b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t)); |
395 396 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, 397 fd, 0, 0, b); 398 399 } nxt_runtime_process_loop; 400} 401 402 --- 89 unchanged lines hidden (view full) --- 492 493 } nxt_runtime_process_loop; 494} 495 496 497void 498nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 499{ | 398 399 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE, 400 fd, 0, 0, b); 401 402 } nxt_runtime_process_loop; 403} 404 405 --- 89 unchanged lines hidden (view full) --- 495 496 } nxt_runtime_process_loop; 497} 498 499 500void 501nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 502{ |
500 nxt_buf_t *buf; 501 nxt_pid_t pid; 502 nxt_runtime_t *rt; 503 nxt_process_t *process; | 503 nxt_pid_t pid; 504 nxt_buf_t *buf; |
504 505 buf = msg->buf; 506 507 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); 508 | 505 506 buf = msg->buf; 507 508 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid)); 509 |
509 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid)); | 510 nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t)); |
510 | 511 |
512 nxt_port_remove_pid(task, msg, pid); 513} 514 515 516static void 517nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg, 518 nxt_pid_t pid) 519{ 520 nxt_runtime_t *rt; 521 nxt_process_t *process; 522 |
|
511 msg->u.removed_pid = pid; 512 513 nxt_debug(task, "port remove pid %PI handler", pid); 514 515 rt = task->thread->runtime; 516 517 nxt_port_rpc_remove_peer(task, msg->port, pid); 518 --- 100 unchanged lines hidden --- | 523 msg->u.removed_pid = pid; 524 525 nxt_debug(task, "port remove pid %PI handler", pid); 526 527 rt = task->thread->runtime; 528 529 nxt_port_rpc_remove_peer(task, msg->port, pid); 530 --- 100 unchanged lines hidden --- |