xref: /unit/src/nxt_port.c (revision 20:4dc92b438f58)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 
11 
12 static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
13 static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj,
14     void *data);
15 
16 
17 void
18 nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
19     nxt_port_handler_t *handlers)
20 {
21     port->pid = nxt_pid;
22     port->engine = thread->engine->id;
23     port->handler = nxt_port_handler;
24     port->data = handlers;
25 
26     nxt_port_write_close(port);
27     nxt_port_read_enable(&thread->engine->task, port);
28 }
29 
30 
31 void
32 nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
33     nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
34 {
35     nxt_uint_t     i, n, nprocesses, nports;
36     nxt_port_t     *port;
37     nxt_process_t  *process;
38 
39     process = rt->processes->elts;
40     nprocesses = rt->processes->nelts;
41 
42     for (i = 0; i < nprocesses; i++) {
43 
44         if (nxt_pid != process[i].pid) {
45             port = process[i].ports->elts;
46             nports = process[i].ports->nelts;
47 
48             for (n = 0; n < nports; n++) {
49                 (void) nxt_port_socket_write(task, &port[n], type,
50                                              fd, stream, b);
51             }
52         }
53     }
54 }
55 
56 
57 static void
58 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
59 {
60     nxt_port_handler_t  *handlers;
61 
62     if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) {
63 
64         nxt_debug(task, "port %d: message type:%uD",
65                   msg->port->socket.fd, msg->type);
66 
67         handlers = msg->port->data;
68         handlers[msg->type](task, msg);
69 
70         return;
71     }
72 
73     nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
74             msg->port->socket.fd, msg->type);
75 }
76 
77 
78 void
79 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
80 {
81     nxt_runtime_quit(task);
82 }
83 
84 
85 void
86 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
87     nxt_port_t *new_port)
88 {
89     nxt_buf_t                *b;
90     nxt_uint_t               i, n;
91     nxt_port_t               *port;
92     nxt_process_t            *process;
93     nxt_port_msg_new_port_t  *msg;
94 
95     n = rt->processes->nelts;
96     if (n == 0) {
97         return;
98     }
99 
100     nxt_debug(task, "new port %d for process %PI engine %uD",
101               new_port->socket.fd, new_port->pid, new_port->engine);
102 
103     process = rt->processes->elts;
104 
105     for (i = 0; i < n; i++) {
106 
107         if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) {
108             continue;
109         }
110 
111         port = process[i].ports->elts;
112 
113         b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
114 
115         if (nxt_slow_path(b == NULL)) {
116             continue;
117         }
118 
119         b->data = port;
120         b->completion_handler = nxt_port_new_port_buf_completion;
121         b->mem.free += sizeof(nxt_port_msg_new_port_t);
122         msg = (nxt_port_msg_new_port_t *) b->mem.pos;
123 
124         msg->pid = new_port->pid;
125         msg->engine = new_port->engine;
126         msg->max_size = port->max_size;
127         msg->max_share = port->max_share;
128 
129         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
130                                      new_port->socket.fd, 0, b);
131     }
132 }
133 
134 
135 static void
136 nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
137 {
138     nxt_buf_t   *b;
139     nxt_port_t  *port;
140 
141     b = obj;
142     port = b->data;
143 
144     /* TODO: b->mem.pos */
145 
146     nxt_buf_free(port->mem_pool, b);
147 }
148 
149 
150 void
151 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
152 {
153     nxt_port_t               *port;
154     nxt_process_t            *process;
155     nxt_runtime_t            *rt;
156     nxt_mem_pool_t           *mp;
157     nxt_port_msg_new_port_t  *new_port_msg;
158 
159     rt = task->thread->runtime;
160 
161     process = nxt_runtime_new_process(rt);
162     if (nxt_slow_path(process == NULL)) {
163         return;
164     }
165 
166     port = nxt_array_zero_add(process->ports);
167     if (nxt_slow_path(port == NULL)) {
168         return;
169     }
170 
171     mp = nxt_mem_pool_create(1024);
172     if (nxt_slow_path(mp == NULL)) {
173         return;
174     }
175 
176     port->mem_pool = mp;
177 
178     new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
179     msg->buf->mem.pos = msg->buf->mem.free;
180 
181     nxt_debug(task, "new port %d received for process %PI engine %uD",
182               msg->fd, new_port_msg->pid, new_port_msg->engine);
183 
184     process->pid = new_port_msg->pid;
185 
186     port->pid = new_port_msg->pid;
187     port->engine = new_port_msg->engine;
188     port->pair[0] = -1;
189     port->pair[1] = msg->fd;
190     port->max_size = new_port_msg->max_size;
191     port->max_share = new_port_msg->max_share;
192 
193     nxt_queue_init(&port->messages);
194 
195     port->socket.task = task;
196 
197     nxt_port_write_enable(task, port);
198 }
199 
200 
201 void
202 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
203     nxt_fd_t fd)
204 {
205     nxt_buf_t      *b;
206     nxt_uint_t     i, n;
207     nxt_port_t     *port;
208     nxt_process_t  *process;
209 
210     n = rt->processes->nelts;
211     if (n == 0) {
212         return;
213     }
214 
215     nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
216 
217     process = rt->processes->elts;
218 
219     /* process[0] is master process. */
220 
221     for (i = 1; i < n; i++) {
222         port = process[i].ports->elts;
223 
224         b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
225         if (nxt_slow_path(b == NULL)) {
226             continue;
227         }
228 
229         *(nxt_uint_t *) b->mem.pos = slot;
230         b->mem.free += sizeof(nxt_uint_t);
231 
232         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
233                                      fd, 0, b);
234     }
235 }
236 
237 
238 void
239 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
240 {
241     nxt_buf_t      *b;
242     nxt_uint_t     slot;
243     nxt_file_t     *log_file;
244     nxt_runtime_t  *rt;
245 
246     rt = task->thread->runtime;
247 
248     b = msg->buf;
249     slot = *(nxt_uint_t *) b->mem.pos;
250 
251     log_file = nxt_list_elt(rt->log_files, slot);
252 
253     nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
254 
255     /*
256      * The old log file descriptor must be closed at the moment when no
257      * other threads use it.  dup2() allows to use the old file descriptor
258      * for new log file.  This change is performed atomically in the kernel.
259      */
260     if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
261 
262         if (slot == 0) {
263             (void) nxt_file_stderr(log_file);
264         }
265     }
266 }
267 
268 
269 void
270 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
271 {
272     nxt_buf_t  *b;
273 
274     b = msg->buf;
275 
276     nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
277 
278     b->mem.pos = b->mem.free;
279 }
280 
281 
282 void
283 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
284 {
285     nxt_debug(task, "port empty handler");
286 }
287