Deleted
Added
nxt_port.c (342:82c2825a617a) | nxt_port.c (343:9fa845db60fb) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> --- 13 unchanged lines hidden (view full) --- 22 nxt_port_t *port; 23 24 port = obj; 25 mp = data; 26 27 nxt_assert(port->pair[0] == -1); 28 nxt_assert(port->pair[1] == -1); 29 | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> --- 13 unchanged lines hidden (view full) --- 22 nxt_port_t *port; 23 24 port = obj; 25 mp = data; 26 27 nxt_assert(port->pair[0] == -1); 28 nxt_assert(port->pair[1] == -1); 29 |
30 nxt_assert(port->app_stream == 0); | 30 nxt_assert(port->use_count == 0); |
31 nxt_assert(port->app_link.next == NULL); 32 33 nxt_assert(nxt_queue_is_empty(&port->messages)); 34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); 35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); 36 | 31 nxt_assert(port->app_link.next == NULL); 32 33 nxt_assert(nxt_queue_is_empty(&port->messages)); 34 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams)); 35 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers)); 36 |
37 nxt_thread_mutex_destroy(&port->write_mutex); 38 |
|
37 nxt_mp_free(mp, port); 38} 39 40 41nxt_port_t * 42nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, 43 nxt_process_type_t type) 44{ --- 8 unchanged lines hidden (view full) --- 53 54 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); 55 56 if (nxt_fast_path(port != NULL)) { 57 port->id = id; 58 port->pid = pid; 59 port->type = type; 60 port->mem_pool = mp; | 39 nxt_mp_free(mp, port); 40} 41 42 43nxt_port_t * 44nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid, 45 nxt_process_type_t type) 46{ --- 8 unchanged lines hidden (view full) --- 55 56 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t)); 57 58 if (nxt_fast_path(port != NULL)) { 59 port->id = id; 60 port->pid = pid; 61 port->type = type; 62 port->mem_pool = mp; |
63 port->use_count = 1; |
|
61 62 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 63 64 nxt_queue_init(&port->messages); | 64 65 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 66 67 nxt_queue_init(&port->messages); |
68 nxt_thread_mutex_create(&port->write_mutex); |
|
65 66 } else { 67 nxt_mp_destroy(mp); 68 } 69 70 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 71 72 return port; 73} 74 75 | 69 70 } else { 71 nxt_mp_destroy(mp); 72 } 73 74 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 75 76 return port; 77} 78 79 |
76nxt_bool_t 77nxt_port_release(nxt_port_t *port) | 80void 81nxt_port_close(nxt_task_t *task, nxt_port_t *port) |
78{ | 82{ |
79 nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid, 80 port->id, port->type); | 83 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid, 84 port->id, port->type); |
81 82 if (port->pair[0] != -1) { 83 nxt_fd_close(port->pair[0]); 84 port->pair[0] = -1; 85 } 86 87 if (port->pair[1] != -1) { 88 nxt_fd_close(port->pair[1]); 89 port->pair[1] = -1; | 85 86 if (port->pair[0] != -1) { 87 nxt_fd_close(port->pair[0]); 88 port->pair[0] = -1; 89 } 90 91 if (port->pair[1] != -1) { 92 nxt_fd_close(port->pair[1]); 93 port->pair[1] = -1; |
90 } | |
91 | 94 |
92 if (port->type == NXT_PROCESS_WORKER) { 93 if (nxt_router_app_remove_port(port) == 0) { 94 return 0; | 95 if (port->app != NULL) { 96 nxt_router_app_port_close(task, port); |
95 } 96 } | 97 } 98 } |
99} |
|
97 | 100 |
101 102static void 103nxt_port_release(nxt_task_t *task, nxt_port_t *port) 104{ 105 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, 106 port->id, port->type); 107 108 if (port->app != NULL) { 109 nxt_router_app_use(task, port->app, -1); 110 111 port->app = NULL; 112 } 113 |
|
98 if (port->link.next != NULL) { 99 nxt_process_port_remove(port); 100 } 101 102 nxt_mp_release(port->mem_pool, NULL); | 114 if (port->link.next != NULL) { 115 nxt_process_port_remove(port); 116 } 117 118 nxt_mp_release(port->mem_pool, NULL); |
103 104 return 1; | |
105} 106 107 108nxt_port_id_t 109nxt_port_get_next_id() 110{ 111 return nxt_atomic_fetch_add(&nxt_port_last_id, 1); 112} --- 145 unchanged lines hidden (view full) --- 258 259 port->pair[0] = -1; 260 port->pair[1] = msg->fd; 261 port->max_size = new_port_msg->max_size; 262 port->max_share = new_port_msg->max_share; 263 264 port->socket.task = task; 265 | 119} 120 121 122nxt_port_id_t 123nxt_port_get_next_id() 124{ 125 return nxt_atomic_fetch_add(&nxt_port_last_id, 1); 126} --- 145 unchanged lines hidden (view full) --- 272 273 port->pair[0] = -1; 274 port->pair[1] = msg->fd; 275 port->max_size = new_port_msg->max_size; 276 port->max_share = new_port_msg->max_share; 277 278 port->socket.task = task; 279 |
266 nxt_runtime_port_add(rt, port); | 280 nxt_runtime_port_add(task, port); |
267 | 281 |
282 nxt_port_use(task, port, -1); 283 |
|
268 nxt_port_write_enable(task, port); 269 270 msg->new_port = port; 271} 272 273 274void 275nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) --- 153 unchanged lines hidden (view full) --- 429 430 rt = task->thread->runtime; 431 432 nxt_port_rpc_remove_peer(task, msg->port, pid); 433 434 process = nxt_runtime_process_find(rt, pid); 435 436 if (process) { | 284 nxt_port_write_enable(task, port); 285 286 msg->new_port = port; 287} 288 289 290void 291nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) --- 153 unchanged lines hidden (view full) --- 445 446 rt = task->thread->runtime; 447 448 nxt_port_rpc_remove_peer(task, msg->port, pid); 449 450 process = nxt_runtime_process_find(rt, pid); 451 452 if (process) { |
437 nxt_runtime_process_remove(rt, process); | 453 nxt_runtime_process_remove(task, process); |
438 } 439} 440 441 442void 443nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 444{ 445 nxt_debug(task, "port empty handler"); 446} | 454 } 455} 456 457 458void 459nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 460{ 461 nxt_debug(task, "port empty handler"); 462} |
463 464 465typedef struct { 466 nxt_work_t work; 467 nxt_port_t *port; 468 nxt_port_post_handler_t handler; 469} nxt_port_work_t; 470 471 472static void 473nxt_port_post_handler(nxt_task_t *task, void *obj, void *data) 474{ 475 nxt_port_t *port; 476 nxt_port_work_t *pw; 477 nxt_port_post_handler_t handler; 478 479 pw = obj; 480 port = pw->port; 481 handler = pw->handler; 482 483 nxt_free(pw); 484 485 handler(task, port, data); 486 487 nxt_port_use(task, port, -1); 488} 489 490 491nxt_int_t 492nxt_port_post(nxt_task_t *task, nxt_port_t *port, 493 nxt_port_post_handler_t handler, void *data) 494{ 495 nxt_port_work_t *pw; 496 497 if (task->thread->engine == port->engine) { 498 handler(task, port, data); 499 500 return NXT_OK; 501 } 502 503 pw = nxt_zalloc(sizeof(nxt_port_work_t)); 504 505 if (nxt_slow_path(pw == NULL)) { 506 return NXT_ERROR; 507 } 508 509 nxt_atomic_fetch_add(&port->use_count, 1); 510 511 pw->work.handler = nxt_port_post_handler; 512 pw->work.task = &port->engine->task; 513 pw->work.obj = pw; 514 pw->work.data = data; 515 516 pw->port = port; 517 pw->handler = handler; 518 519 nxt_event_engine_post(port->engine, &pw->work); 520 521 return NXT_OK; 522} 523 524 525static void 526nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data) 527{ 528 /* no op */ 529} 530 531 532void 533nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i) 534{ 535 int c; 536 537 c = nxt_atomic_fetch_add(&port->use_count, i); 538 539 if (i < 0 && c == -i) { 540 541 if (task->thread->engine == port->engine) { 542 nxt_port_release(task, port); 543 544 return; 545 } 546 547 nxt_port_post(task, port, nxt_port_release_handler, NULL); 548 } 549} |
|