nxt_port.c (1997:a8a3f1d243ee) nxt_port.c (1998:c8790d2a89bb)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_router.h>
11#include <nxt_app_queue.h>
12#include <nxt_port_queue.h>
13
14
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_router.h>
11#include <nxt_app_queue.h>
12#include <nxt_port_queue.h>
13
14
15static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
16 nxt_pid_t pid);
15static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
16
17static nxt_atomic_uint_t nxt_port_last_id = 1;
18
19
20static void
21nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
22{

--- 246 unchanged lines hidden (view full) ---

269 nxt_debug(task, "new port %d received for process %PI:%d",
270 msg->fd[0], new_port_msg->pid, new_port_msg->id);
271
272 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
273 if (port != NULL) {
274 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
275 new_port_msg->id);
276
17static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
18
19static nxt_atomic_uint_t nxt_port_last_id = 1;
20
21
22static void
23nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
24{

--- 246 unchanged lines hidden (view full) ---

271 nxt_debug(task, "new port %d received for process %PI:%d",
272 msg->fd[0], new_port_msg->pid, new_port_msg->id);
273
274 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
275 if (port != NULL) {
276 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
277 new_port_msg->id);
278
279 msg->u.new_port = port;
280
277 nxt_fd_close(msg->fd[0]);
278 msg->fd[0] = -1;
279 return;
280 }
281
282 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
283 new_port_msg->id,
284 new_port_msg->type);

--- 94 unchanged lines hidden (view full) ---

379 nxt_runtime_process_each(rt, process) {
380
381 if (nxt_pid == process->pid) {
382 continue;
383 }
384
385 port = nxt_process_port_first(process);
386
281 nxt_fd_close(msg->fd[0]);
282 msg->fd[0] = -1;
283 return;
284 }
285
286 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
287 new_port_msg->id,
288 new_port_msg->type);

--- 94 unchanged lines hidden (view full) ---

383 nxt_runtime_process_each(rt, process) {
384
385 if (nxt_pid == process->pid) {
386 continue;
387 }
388
389 port = nxt_process_port_first(process);
390
387 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
388 sizeof(nxt_port_data_t));
391 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
392 sizeof(nxt_uint_t), 0);
389 if (nxt_slow_path(b == NULL)) {
390 continue;
391 }
392
393 if (nxt_slow_path(b == NULL)) {
394 continue;
395 }
396
393 *(nxt_uint_t *) b->mem.pos = slot;
394 b->mem.free += sizeof(nxt_uint_t);
397 b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
395
396 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
397 fd, 0, 0, b);
398
399 } nxt_runtime_process_loop;
400}
401
402

--- 89 unchanged lines hidden (view full) ---

492
493 } nxt_runtime_process_loop;
494}
495
496
497void
498nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
499{
398
399 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
400 fd, 0, 0, b);
401
402 } nxt_runtime_process_loop;
403}
404
405

--- 89 unchanged lines hidden (view full) ---

495
496 } nxt_runtime_process_loop;
497}
498
499
500void
501nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
502{
500 nxt_buf_t *buf;
501 nxt_pid_t pid;
502 nxt_runtime_t *rt;
503 nxt_process_t *process;
503 nxt_pid_t pid;
504 nxt_buf_t *buf;
504
505 buf = msg->buf;
506
507 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
508
505
506 buf = msg->buf;
507
508 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
509
509 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
510 nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
510
511
512 nxt_port_remove_pid(task, msg, pid);
513}
514
515
516static void
517nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
518 nxt_pid_t pid)
519{
520 nxt_runtime_t *rt;
521 nxt_process_t *process;
522
511 msg->u.removed_pid = pid;
512
513 nxt_debug(task, "port remove pid %PI handler", pid);
514
515 rt = task->thread->runtime;
516
517 nxt_port_rpc_remove_peer(task, msg->port, pid);
518

--- 100 unchanged lines hidden ---
523 msg->u.removed_pid = pid;
524
525 nxt_debug(task, "port remove pid %PI handler", pid);
526
527 rt = task->thread->runtime;
528
529 nxt_port_rpc_remove_peer(task, msg->port, pid);
530

--- 100 unchanged lines hidden ---