nxt_port.c (1547:cbcd76704c90) nxt_port.c (1555:1d84b9e4b459)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_router.h>
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10#include <nxt_router.h>
11#include <nxt_port_queue.h>
11
12
13static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
14
15static nxt_atomic_uint_t nxt_port_last_id = 1;
16
17
18static void

--- 44 unchanged lines hidden (view full) ---

63 port->mem_pool = mp;
64 port->use_count = 1;
65
66 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
67
68 nxt_queue_init(&port->messages);
69 nxt_thread_mutex_create(&port->write_mutex);
70
12
13
14static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
15
16static nxt_atomic_uint_t nxt_port_last_id = 1;
17
18
19static void

--- 44 unchanged lines hidden (view full) ---

64 port->mem_pool = mp;
65 port->use_count = 1;
66
67 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
68
69 nxt_queue_init(&port->messages);
70 nxt_thread_mutex_create(&port->write_mutex);
71
72 port->queue_fd = -1;
73
71 } else {
72 nxt_mp_destroy(mp);
73 }
74
75 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
76
77 return port;
78}

--- 15 unchanged lines hidden (view full) ---

94 if (port->pair[1] != -1) {
95 nxt_fd_close(port->pair[1]);
96 port->pair[1] = -1;
97
98 if (port->app != NULL) {
99 nxt_router_app_port_close(task, port);
100 }
101 }
74 } else {
75 nxt_mp_destroy(mp);
76 }
77
78 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
79
80 return port;
81}

--- 15 unchanged lines hidden (view full) ---

97 if (port->pair[1] != -1) {
98 nxt_fd_close(port->pair[1]);
99 port->pair[1] = -1;
100
101 if (port->app != NULL) {
102 nxt_router_app_port_close(task, port);
103 }
104 }
105
106 if (port->queue_fd != -1) {
107 nxt_fd_close(port->queue_fd);
108 port->queue_fd = -1;
109 }
110
111 if (port->queue != NULL) {
112 nxt_mem_munmap(port->queue, sizeof(nxt_port_queue_t));
113 port->queue = NULL;
114 }
102}
103
104
105static void
106nxt_port_release(nxt_task_t *task, nxt_port_t *port)
107{
108 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
109 port->id, port->type);

--- 61 unchanged lines hidden (view full) ---

171
172void
173nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
174{
175 nxt_runtime_quit(task, 0);
176}
177
178
115}
116
117
118static void
119nxt_port_release(nxt_task_t *task, nxt_port_t *port)
120{
121 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
122 port->id, port->type);

--- 61 unchanged lines hidden (view full) ---

184
185void
186nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
187{
188 nxt_runtime_quit(task, 0);
189}
190
191
192/* TODO join with process_ready and move to nxt_main_process.c */
179nxt_inline void
180nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
181 nxt_port_t *new_port, uint32_t stream)
182{
183 nxt_port_t *port;
184 nxt_process_t *process;
185
186 nxt_debug(task, "new port %d for process %PI",

--- 35 unchanged lines hidden (view full) ---

222 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
223
224 msg->id = new_port->id;
225 msg->pid = new_port->pid;
226 msg->max_size = port->max_size;
227 msg->max_share = port->max_share;
228 msg->type = new_port->type;
229
193nxt_inline void
194nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
195 nxt_port_t *new_port, uint32_t stream)
196{
197 nxt_port_t *port;
198 nxt_process_t *process;
199
200 nxt_debug(task, "new port %d for process %PI",

--- 35 unchanged lines hidden (view full) ---

236 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
237
238 msg->id = new_port->id;
239 msg->pid = new_port->pid;
240 msg->max_size = port->max_size;
241 msg->max_share = port->max_share;
242 msg->type = new_port->type;
243
230 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
231 new_port->pair[1], stream, 0, b);
244 return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
245 new_port->pair[1], new_port->queue_fd,
246 stream, 0, b);
232}
233
234
235void
236nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
237{
238 nxt_port_t *port;
239 nxt_runtime_t *rt;

--- 34 unchanged lines hidden (view full) ---

274
275 port->socket.task = task;
276
277 nxt_port_write_enable(task, port);
278
279 msg->u.new_port = port;
280}
281
247}
248
249
250void
251nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
252{
253 nxt_port_t *port;
254 nxt_runtime_t *rt;

--- 34 unchanged lines hidden (view full) ---

289
290 port->socket.task = task;
291
292 nxt_port_write_enable(task, port);
293
294 msg->u.new_port = port;
295}
296
282
297/* TODO move to nxt_main_process.c */
283void
284nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
285{
286 nxt_port_t *port;
287 nxt_process_t *process;
288 nxt_runtime_t *rt;
289
290 rt = task->thread->runtime;

--- 8 unchanged lines hidden (view full) ---

299 process->state = NXT_PROCESS_STATE_READY;
300
301 nxt_assert(!nxt_queue_is_empty(&process->ports));
302
303 port = nxt_process_port_first(process);
304
305 nxt_debug(task, "process %PI ready", msg->port_msg.pid);
306
298void
299nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
300{
301 nxt_port_t *port;
302 nxt_process_t *process;
303 nxt_runtime_t *rt;
304
305 rt = task->thread->runtime;

--- 8 unchanged lines hidden (view full) ---

314 process->state = NXT_PROCESS_STATE_READY;
315
316 nxt_assert(!nxt_queue_is_empty(&process->ports));
317
318 port = nxt_process_port_first(process);
319
320 nxt_debug(task, "process %PI ready", msg->port_msg.pid);
321
322 if (msg->fd != -1) {
323 port->queue_fd = msg->fd;
324 port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
325 PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd,
326 0);
327 }
328
307 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
308}
309
310
311void
312nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
313{
314 nxt_runtime_t *rt;

--- 229 unchanged lines hidden ---
329 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
330}
331
332
333void
334nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
335{
336 nxt_runtime_t *rt;

--- 229 unchanged lines hidden ---