xref: /unit/src/nxt_port.c (revision 13:3a52b2c3d3f1)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_cycle.h>
9 #include <nxt_port.h>
10 
11 
12 static void nxt_process_port_handler(nxt_task_t *task,
13     nxt_port_recv_msg_t *msg);
14 static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj,
15     void *data);
16 
17 
18 void
19 nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
20     nxt_process_port_handler_t *handlers)
21 {
22     proc->pid = nxt_pid;
23     proc->engine = thr->engine->id;
24     proc->port->handler = nxt_process_port_handler;
25     proc->port->data = handlers;
26 
27     nxt_port_write_close(proc->port);
28     nxt_port_read_enable(&thr->engine->task, proc->port);
29 }
30 
31 
32 void
33 nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
34     nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
35 {
36     nxt_uint_t          i, n;
37     nxt_process_port_t  *proc;
38 
39     proc = cycle->processes->elts;
40     n = cycle->processes->nelts;
41 
42     for (i = 0; i < n; i++) {
43         if (nxt_pid != proc[i].pid) {
44             (void) nxt_port_write(task, proc[i].port, type, fd, stream, b);
45         }
46     }
47 }
48 
49 
50 static void
51 nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
52 {
53     nxt_process_port_handler_t  *handlers;
54 
55     if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) {
56 
57         nxt_debug(task, "port %d: message type:%uD",
58                   msg->port->socket.fd, msg->type);
59 
60         handlers = msg->port->data;
61         handlers[msg->type](task, msg);
62 
63         return;
64     }
65 
66     nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
67             msg->port->socket.fd, msg->type);
68 }
69 
70 
71 void
72 nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
73 {
74     nxt_cycle_quit(task, NULL);
75 }
76 
77 
78 void
79 nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
80     nxt_process_port_t *proc)
81 {
82     nxt_buf_t                *b;
83     nxt_uint_t               i, n;
84     nxt_process_port_t       *p;
85     nxt_proc_msg_new_port_t  *new_port;
86 
87     n = cycle->processes->nelts;
88     if (n == 0) {
89         return;
90     }
91 
92     nxt_thread_log_debug("new port %d for process %PI engine %uD",
93                          proc->port->socket.fd, proc->pid, proc->engine);
94 
95     p = cycle->processes->elts;
96 
97     for (i = 0; i < n; i++) {
98 
99         if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) {
100             continue;
101         }
102 
103         b = nxt_buf_mem_alloc(p[i].port->mem_pool,
104                               sizeof(nxt_process_port_data_t), 0);
105 
106         if (nxt_slow_path(b == NULL)) {
107             continue;
108         }
109 
110         b->data = p[i].port;
111         b->completion_handler = nxt_process_new_port_buf_completion;
112         b->mem.free += sizeof(nxt_proc_msg_new_port_t);
113         new_port = (nxt_proc_msg_new_port_t *) b->mem.pos;
114 
115         new_port->pid = proc->pid;
116         new_port->engine = proc->engine;
117         new_port->max_size = p[i].port->max_size;
118         new_port->max_share = p[i].port->max_share;
119 
120         (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT,
121                               proc->port->socket.fd, 0, b);
122     }
123 }
124 
125 
126 static void
127 nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
128 {
129     nxt_buf_t   *b;
130     nxt_port_t  *port;
131 
132     b = obj;
133     port = b->data;
134 
135     /* TODO: b->mem.pos */
136 
137     nxt_buf_free(port->mem_pool, b);
138 }
139 
140 
141 void
142 nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
143 {
144     nxt_port_t               *port;
145     nxt_cycle_t              *cycle;
146     nxt_process_port_t       *proc;
147     nxt_proc_msg_new_port_t  *new_port;
148 
149     cycle = nxt_thread_cycle();
150 
151     proc = nxt_array_add(cycle->processes);
152     if (nxt_slow_path(proc == NULL)) {
153         return;
154     }
155 
156     port = nxt_port_alloc(task);
157     if (nxt_slow_path(port == NULL)) {
158         return;
159     }
160 
161     proc->port = port;
162 
163     new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos;
164     msg->buf->mem.pos = msg->buf->mem.free;
165 
166     nxt_debug(task, "new port %d received for process %PI engine %uD",
167               msg->fd, new_port->pid, new_port->engine);
168 
169     proc->pid = new_port->pid;
170     proc->engine = new_port->engine;
171     port->pair[1] = msg->fd;
172     port->max_size = new_port->max_size;
173     port->max_share = new_port->max_share;
174 
175     /* A read port is not passed at all. */
176     nxt_port_write_enable(task, port);
177 }
178 
179 
180 void
181 nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
182     nxt_uint_t slot, nxt_fd_t fd)
183 {
184     nxt_buf_t           *b;
185     nxt_uint_t          i, n;
186     nxt_process_port_t  *p;
187 
188     n = cycle->processes->nelts;
189     if (n == 0) {
190         return;
191     }
192 
193     nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd);
194 
195     p = cycle->processes->elts;
196 
197     /* p[0] is master process. */
198 
199     for (i = 1; i < n; i++) {
200         b = nxt_buf_mem_alloc(p[i].port->mem_pool,
201                               sizeof(nxt_process_port_data_t), 0);
202 
203         if (nxt_slow_path(b == NULL)) {
204             continue;
205         }
206 
207         *(nxt_uint_t *) b->mem.pos = slot;
208         b->mem.free += sizeof(nxt_uint_t);
209 
210         (void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE,
211                               fd, 0, b);
212     }
213 }
214 
215 
216 void
217 nxt_process_port_change_log_file_handler(nxt_task_t *task,
218     nxt_port_recv_msg_t *msg)
219 {
220     nxt_buf_t    *b;
221     nxt_uint_t   slot;
222     nxt_file_t   *log_file;
223     nxt_cycle_t  *cycle;
224 
225     cycle = nxt_thread_cycle();
226 
227     b = msg->buf;
228     slot = *(nxt_uint_t *) b->mem.pos;
229 
230     log_file = nxt_list_elt(cycle->log_files, slot);
231 
232     nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
233 
234     /*
235      * The old log file descriptor must be closed at the moment when no
236      * other threads use it.  dup2() allows to use the old file descriptor
237      * for new log file.  This change is performed atomically in the kernel.
238      */
239     if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
240 
241         if (slot == 0) {
242             (void) nxt_file_stderr(log_file);
243         }
244     }
245 }
246 
247 
248 void
249 nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
250 {
251     nxt_buf_t  *b;
252 
253     b = msg->buf;
254 
255     nxt_debug(task, "data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
256 
257     b->mem.pos = b->mem.free;
258 }
259 
260 
261 void
262 nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
263 {
264     nxt_debug(task, "port empty handler");
265 }
266