Deleted
Added
nxt_port.c (1547:cbcd76704c90) | nxt_port.c (1555:1d84b9e4b459) |
---|---|
1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> 9#include <nxt_port.h> 10#include <nxt_router.h> | 1 2/* 3 * Copyright (C) Igor Sysoev 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_runtime.h> 9#include <nxt_port.h> 10#include <nxt_router.h> |
11#include <nxt_port_queue.h> |
|
11 12 13static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 14 15static nxt_atomic_uint_t nxt_port_last_id = 1; 16 17 18static void --- 44 unchanged lines hidden (view full) --- 63 port->mem_pool = mp; 64 port->use_count = 1; 65 66 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 67 68 nxt_queue_init(&port->messages); 69 nxt_thread_mutex_create(&port->write_mutex); 70 | 12 13 14static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); 15 16static nxt_atomic_uint_t nxt_port_last_id = 1; 17 18 19static void --- 44 unchanged lines hidden (view full) --- 64 port->mem_pool = mp; 65 port->use_count = 1; 66 67 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp); 68 69 nxt_queue_init(&port->messages); 70 nxt_thread_mutex_create(&port->write_mutex); 71 |
72 port->queue_fd = -1; 73 |
|
71 } else { 72 nxt_mp_destroy(mp); 73 } 74 75 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 76 77 return port; 78} --- 15 unchanged lines hidden (view full) --- 94 if (port->pair[1] != -1) { 95 nxt_fd_close(port->pair[1]); 96 port->pair[1] = -1; 97 98 if (port->app != NULL) { 99 nxt_router_app_port_close(task, port); 100 } 101 } | 74 } else { 75 nxt_mp_destroy(mp); 76 } 77 78 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type); 79 80 return port; 81} --- 15 unchanged lines hidden (view full) --- 97 if (port->pair[1] != -1) { 98 nxt_fd_close(port->pair[1]); 99 port->pair[1] = -1; 100 101 if (port->app != NULL) { 102 nxt_router_app_port_close(task, port); 103 } 104 } |
105 106 if (port->queue_fd != -1) { 107 nxt_fd_close(port->queue_fd); 108 port->queue_fd = -1; 109 } 110 111 if (port->queue != NULL) { 112 nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t)); 113 port->queue = NULL; 114 } |
|
102} 103 104 105static void 106nxt_port_release(nxt_task_t *task, nxt_port_t *port) 107{ 108 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, 109 port->id, port->type); --- 61 unchanged lines hidden (view full) --- 171 172void 173nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 174{ 175 nxt_runtime_quit(task, 0); 176} 177 178 | 115} 116 117 118static void 119nxt_port_release(nxt_task_t *task, nxt_port_t *port) 120{ 121 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid, 122 port->id, port->type); --- 61 unchanged lines hidden (view full) --- 184 185void 186nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 187{ 188 nxt_runtime_quit(task, 0); 189} 190 191 |
192/* TODO join with process_ready and move to nxt_main_process.c */ |
|
179nxt_inline void 180nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, 181 nxt_port_t *new_port, uint32_t stream) 182{ 183 nxt_port_t *port; 184 nxt_process_t *process; 185 186 nxt_debug(task, "new port %d for process %PI", --- 35 unchanged lines hidden (view full) --- 222 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 223 224 msg->id = new_port->id; 225 msg->pid = new_port->pid; 226 msg->max_size = port->max_size; 227 msg->max_share = port->max_share; 228 msg->type = new_port->type; 229 | 193nxt_inline void 194nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, 195 nxt_port_t *new_port, uint32_t stream) 196{ 197 nxt_port_t *port; 198 nxt_process_t *process; 199 200 nxt_debug(task, "new port %d for process %PI", --- 35 unchanged lines hidden (view full) --- 236 msg = (nxt_port_msg_new_port_t *) b->mem.pos; 237 238 msg->id = new_port->id; 239 msg->pid = new_port->pid; 240 msg->max_size = port->max_size; 241 msg->max_share = port->max_share; 242 msg->type = new_port->type; 243 |
230 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, 231 new_port->pair[1], stream, 0, b); | 244 return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT, 245 new_port->pair[1], new_port->queue_fd, 246 stream, 0, b); |
232} 233 234 235void 236nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 237{ 238 nxt_port_t *port; 239 nxt_runtime_t *rt; --- 34 unchanged lines hidden (view full) --- 274 275 port->socket.task = task; 276 277 nxt_port_write_enable(task, port); 278 279 msg->u.new_port = port; 280} 281 | 247} 248 249 250void 251nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 252{ 253 nxt_port_t *port; 254 nxt_runtime_t *rt; --- 34 unchanged lines hidden (view full) --- 289 290 port->socket.task = task; 291 292 nxt_port_write_enable(task, port); 293 294 msg->u.new_port = port; 295} 296 |
282 | 297/* TODO move to nxt_main_process.c */ |
283void 284nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 285{ 286 nxt_port_t *port; 287 nxt_process_t *process; 288 nxt_runtime_t *rt; 289 290 rt = task->thread->runtime; --- 8 unchanged lines hidden (view full) --- 299 process->state = NXT_PROCESS_STATE_READY; 300 301 nxt_assert(!nxt_queue_is_empty(&process->ports)); 302 303 port = nxt_process_port_first(process); 304 305 nxt_debug(task, "process %PI ready", msg->port_msg.pid); 306 | 298void 299nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 300{ 301 nxt_port_t *port; 302 nxt_process_t *process; 303 nxt_runtime_t *rt; 304 305 rt = task->thread->runtime; --- 8 unchanged lines hidden (view full) --- 314 process->state = NXT_PROCESS_STATE_READY; 315 316 nxt_assert(!nxt_queue_is_empty(&process->ports)); 317 318 port = nxt_process_port_first(process); 319 320 nxt_debug(task, "process %PI ready", msg->port_msg.pid); 321 |
322 if (msg->fd != -1) { 323 port->queue_fd = msg->fd; 324 port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), 325 PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd, 326 0); 327 } 328 |
|
307 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); 308} 309 310 311void 312nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 313{ 314 nxt_runtime_t *rt; --- 229 unchanged lines hidden --- | 329 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); 330} 331 332 333void 334nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 335{ 336 nxt_runtime_t *rt; --- 229 unchanged lines hidden --- |