nxt_port.c (88:c6879c7b5bdf) nxt_port.c (122:d18727e877c6)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10
11
12static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_runtime.h>
9#include <nxt_port.h>
10
11
12static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
13static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj,
14 void *data);
15
16
17void
18nxt_port_create(nxt_task_t *task, nxt_port_t *port,
19 nxt_port_handler_t *handlers)
20{
21 port->pid = nxt_pid;
22 port->engine = task->thread->engine->id;

--- 74 unchanged lines hidden (view full) ---

97 }
98 nxt_runtime_process_loop;
99}
100
101
102nxt_int_t
103nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port)
104{
13
14
15void
16nxt_port_create(nxt_task_t *task, nxt_port_t *port,
17 nxt_port_handler_t *handlers)
18{
19 port->pid = nxt_pid;
20 port->engine = task->thread->engine->id;

--- 74 unchanged lines hidden (view full) ---

95 }
96 nxt_runtime_process_loop;
97}
98
99
100nxt_int_t
101nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port)
102{
105 nxt_mp_t *mp;
106 nxt_buf_t *b;
107 nxt_port_msg_new_port_t *msg;
108
103 nxt_buf_t *b;
104 nxt_port_msg_new_port_t *msg;
105
109 mp = port->mem_pool;
110
111 b = nxt_buf_mem_alloc(mp, sizeof(nxt_port_data_t), 0);
106 b = nxt_buf_mem_ts_alloc(task, port->mem_pool, sizeof(nxt_port_data_t));
112 if (nxt_slow_path(b == NULL)) {
113 return NXT_ERROR;
114 }
115
116 nxt_debug(task, "send port %FD to process %PI",
117 new_port->pair[1], port->pid);
118
107 if (nxt_slow_path(b == NULL)) {
108 return NXT_ERROR;
109 }
110
111 nxt_debug(task, "send port %FD to process %PI",
112 new_port->pair[1], port->pid);
113
119 b->data = mp;
120 b->completion_handler = nxt_port_new_port_buf_completion;
121 b->mem.free += sizeof(nxt_port_msg_new_port_t);
122 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
123
124 msg->id = new_port->id;
125 msg->pid = new_port->pid;
126 msg->engine = new_port->engine;
127 msg->max_size = port->max_size;
128 msg->max_share = port->max_share;
129 msg->type = new_port->type;
130
131 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
132 new_port->pair[1], 0, 0, b);
133}
134
135
114 b->mem.free += sizeof(nxt_port_msg_new_port_t);
115 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
116
117 msg->id = new_port->id;
118 msg->pid = new_port->pid;
119 msg->engine = new_port->engine;
120 msg->max_size = port->max_size;
121 msg->max_share = port->max_share;
122 msg->type = new_port->type;
123
124 return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
125 new_port->pair[1], 0, 0, b);
126}
127
128
136static void
137nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
138{
139 nxt_mp_t *mp;
140 nxt_buf_t *b;
141
142 b = obj;
143 mp = b->data;
144
145 nxt_buf_free(mp, b);
146}
147
148
149void
150nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
151{
152 nxt_mp_t *mp;
153 nxt_port_t *port;
154 nxt_process_t *process;
155 nxt_runtime_t *rt;
156 nxt_port_msg_new_port_t *new_port_msg;

--- 162 unchanged lines hidden ---
129void
130nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
131{
132 nxt_mp_t *mp;
133 nxt_port_t *port;
134 nxt_process_t *process;
135 nxt_runtime_t *rt;
136 nxt_port_msg_new_port_t *new_port_msg;

--- 162 unchanged lines hidden ---