xref: /unit/src/nxt_port.c (revision 141)
111Sigor@sysoev.ru 
211Sigor@sysoev.ru /*
311Sigor@sysoev.ru  * Copyright (C) Igor Sysoev
411Sigor@sysoev.ru  * Copyright (C) NGINX, Inc.
511Sigor@sysoev.ru  */
611Sigor@sysoev.ru 
711Sigor@sysoev.ru #include <nxt_main.h>
820Sigor@sysoev.ru #include <nxt_runtime.h>
911Sigor@sysoev.ru #include <nxt_port.h>
1011Sigor@sysoev.ru 
1111Sigor@sysoev.ru 
1214Sigor@sysoev.ru static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
1311Sigor@sysoev.ru 
14*141Smax.romanov@nginx.com static nxt_atomic_uint_t nxt_port_last_id;
15*141Smax.romanov@nginx.com 
16*141Smax.romanov@nginx.com nxt_port_id_t
17*141Smax.romanov@nginx.com nxt_port_get_next_id()
18*141Smax.romanov@nginx.com {
19*141Smax.romanov@nginx.com     return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
20*141Smax.romanov@nginx.com }
21*141Smax.romanov@nginx.com 
2211Sigor@sysoev.ru 
2311Sigor@sysoev.ru void
24*141Smax.romanov@nginx.com nxt_port_reset_next_id()
25*141Smax.romanov@nginx.com {
26*141Smax.romanov@nginx.com     nxt_port_last_id = 1;
27*141Smax.romanov@nginx.com }
28*141Smax.romanov@nginx.com 
29*141Smax.romanov@nginx.com 
30*141Smax.romanov@nginx.com void
31*141Smax.romanov@nginx.com nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
3214Sigor@sysoev.ru     nxt_port_handler_t *handlers)
3311Sigor@sysoev.ru {
3414Sigor@sysoev.ru     port->pid = nxt_pid;
35*141Smax.romanov@nginx.com     port->engine = task->thread->engine;
3614Sigor@sysoev.ru     port->handler = nxt_port_handler;
3714Sigor@sysoev.ru     port->data = handlers;
3811Sigor@sysoev.ru 
3977Smax.romanov@nginx.com     nxt_port_read_enable(task, port);
4011Sigor@sysoev.ru }
4111Sigor@sysoev.ru 
4211Sigor@sysoev.ru 
4311Sigor@sysoev.ru void
4420Sigor@sysoev.ru nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
4511Sigor@sysoev.ru     nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
4611Sigor@sysoev.ru {
4720Sigor@sysoev.ru     nxt_port_t     *port;
4820Sigor@sysoev.ru     nxt_process_t  *process;
4920Sigor@sysoev.ru 
5042Smax.romanov@nginx.com     nxt_runtime_process_each(rt, process)
5142Smax.romanov@nginx.com     {
5242Smax.romanov@nginx.com         if (nxt_pid != process->pid) {
5342Smax.romanov@nginx.com             nxt_process_port_each(process, port) {
5411Sigor@sysoev.ru 
5542Smax.romanov@nginx.com                 (void) nxt_port_socket_write(task, port, type,
5642Smax.romanov@nginx.com                                              fd, stream, 0, b);
5711Sigor@sysoev.ru 
5842Smax.romanov@nginx.com             } nxt_process_port_loop;
5911Sigor@sysoev.ru         }
6011Sigor@sysoev.ru     }
6142Smax.romanov@nginx.com     nxt_runtime_process_loop;
6211Sigor@sysoev.ru }
6311Sigor@sysoev.ru 
6411Sigor@sysoev.ru 
6511Sigor@sysoev.ru static void
6614Sigor@sysoev.ru nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
6711Sigor@sysoev.ru {
6814Sigor@sysoev.ru     nxt_port_handler_t  *handlers;
6911Sigor@sysoev.ru 
70125Smax.romanov@nginx.com     if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
7111Sigor@sysoev.ru 
7211Sigor@sysoev.ru         nxt_debug(task, "port %d: message type:%uD",
7342Smax.romanov@nginx.com                   msg->port->socket.fd, msg->port_msg.type);
7411Sigor@sysoev.ru 
7511Sigor@sysoev.ru         handlers = msg->port->data;
7642Smax.romanov@nginx.com         handlers[msg->port_msg.type](task, msg);
7711Sigor@sysoev.ru 
7811Sigor@sysoev.ru         return;
7911Sigor@sysoev.ru     }
8011Sigor@sysoev.ru 
8111Sigor@sysoev.ru     nxt_log(task, NXT_LOG_CRIT, "port %d: unknown message type:%uD",
8242Smax.romanov@nginx.com             msg->port->socket.fd, msg->port_msg.type);
8311Sigor@sysoev.ru }
8411Sigor@sysoev.ru 
8511Sigor@sysoev.ru 
8611Sigor@sysoev.ru void
8714Sigor@sysoev.ru nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
8811Sigor@sysoev.ru {
8920Sigor@sysoev.ru     nxt_runtime_quit(task);
9011Sigor@sysoev.ru }
9111Sigor@sysoev.ru 
9211Sigor@sysoev.ru 
9311Sigor@sysoev.ru void
9420Sigor@sysoev.ru nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
95*141Smax.romanov@nginx.com     nxt_port_t *new_port, uint32_t stream)
9611Sigor@sysoev.ru {
97*141Smax.romanov@nginx.com     nxt_port_t     *port;
9881Smax.romanov@nginx.com     nxt_process_t  *process;
9911Sigor@sysoev.ru 
100*141Smax.romanov@nginx.com     nxt_debug(task, "new port %d for process %PI",
101*141Smax.romanov@nginx.com               new_port->pair[1], new_port->pid);
10211Sigor@sysoev.ru 
10342Smax.romanov@nginx.com     nxt_runtime_process_each(rt, process)
10442Smax.romanov@nginx.com     {
10542Smax.romanov@nginx.com         if (process->pid == new_port->pid || process->pid == nxt_pid) {
10611Sigor@sysoev.ru             continue;
10711Sigor@sysoev.ru         }
10811Sigor@sysoev.ru 
109*141Smax.romanov@nginx.com         port = nxt_process_port_first(process);
110*141Smax.romanov@nginx.com 
111*141Smax.romanov@nginx.com         if (port->type == NXT_PROCESS_MASTER ||
112*141Smax.romanov@nginx.com             port->type == NXT_PROCESS_CONTROLLER ||
113*141Smax.romanov@nginx.com             port->type == NXT_PROCESS_ROUTER) {
114*141Smax.romanov@nginx.com 
115*141Smax.romanov@nginx.com             (void) nxt_port_send_port(task, port, new_port, stream);
116*141Smax.romanov@nginx.com         }
11711Sigor@sysoev.ru     }
11842Smax.romanov@nginx.com     nxt_runtime_process_loop;
11911Sigor@sysoev.ru }
12011Sigor@sysoev.ru 
12111Sigor@sysoev.ru 
12281Smax.romanov@nginx.com nxt_int_t
123*141Smax.romanov@nginx.com nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
124*141Smax.romanov@nginx.com     uint32_t stream)
12581Smax.romanov@nginx.com {
12681Smax.romanov@nginx.com     nxt_buf_t                *b;
12781Smax.romanov@nginx.com     nxt_port_msg_new_port_t  *msg;
12881Smax.romanov@nginx.com 
129122Smax.romanov@nginx.com     b = nxt_buf_mem_ts_alloc(task, port->mem_pool, sizeof(nxt_port_data_t));
13081Smax.romanov@nginx.com     if (nxt_slow_path(b == NULL)) {
13181Smax.romanov@nginx.com         return NXT_ERROR;
13281Smax.romanov@nginx.com     }
13381Smax.romanov@nginx.com 
13481Smax.romanov@nginx.com     nxt_debug(task, "send port %FD to process %PI",
13581Smax.romanov@nginx.com               new_port->pair[1], port->pid);
13681Smax.romanov@nginx.com 
13781Smax.romanov@nginx.com     b->mem.free += sizeof(nxt_port_msg_new_port_t);
13881Smax.romanov@nginx.com     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
13981Smax.romanov@nginx.com 
14081Smax.romanov@nginx.com     msg->id = new_port->id;
14181Smax.romanov@nginx.com     msg->pid = new_port->pid;
14281Smax.romanov@nginx.com     msg->max_size = port->max_size;
14381Smax.romanov@nginx.com     msg->max_share = port->max_share;
14481Smax.romanov@nginx.com     msg->type = new_port->type;
14581Smax.romanov@nginx.com 
14681Smax.romanov@nginx.com     return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
147*141Smax.romanov@nginx.com                                  new_port->pair[1], stream, 0, b);
14881Smax.romanov@nginx.com }
14981Smax.romanov@nginx.com 
15081Smax.romanov@nginx.com 
15111Sigor@sysoev.ru void
15214Sigor@sysoev.ru nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
15311Sigor@sysoev.ru {
15465Sigor@sysoev.ru     nxt_mp_t                 *mp;
15511Sigor@sysoev.ru     nxt_port_t               *port;
15620Sigor@sysoev.ru     nxt_process_t            *process;
15720Sigor@sysoev.ru     nxt_runtime_t            *rt;
15814Sigor@sysoev.ru     nxt_port_msg_new_port_t  *new_port_msg;
15911Sigor@sysoev.ru 
16020Sigor@sysoev.ru     rt = task->thread->runtime;
16111Sigor@sysoev.ru 
16242Smax.romanov@nginx.com     new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
16342Smax.romanov@nginx.com     msg->buf->mem.pos = msg->buf->mem.free;
16442Smax.romanov@nginx.com 
165*141Smax.romanov@nginx.com     nxt_debug(task, "new port %d received for process %PI:%d",
166*141Smax.romanov@nginx.com               msg->fd, new_port_msg->pid, new_port_msg->id);
167*141Smax.romanov@nginx.com 
168*141Smax.romanov@nginx.com     port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
169*141Smax.romanov@nginx.com     if (port != NULL) {
170*141Smax.romanov@nginx.com         nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
171*141Smax.romanov@nginx.com               new_port_msg->id);
172*141Smax.romanov@nginx.com 
173*141Smax.romanov@nginx.com         nxt_fd_close(msg->fd);
174*141Smax.romanov@nginx.com         msg->fd = -1;
175*141Smax.romanov@nginx.com         return;
176*141Smax.romanov@nginx.com     }
177*141Smax.romanov@nginx.com 
17842Smax.romanov@nginx.com     process = nxt_runtime_process_get(rt, new_port_msg->pid);
17920Sigor@sysoev.ru     if (nxt_slow_path(process == NULL)) {
18020Sigor@sysoev.ru         return;
18120Sigor@sysoev.ru     }
18220Sigor@sysoev.ru 
183*141Smax.romanov@nginx.com     port = nxt_process_port_new(rt, process, new_port_msg->id,
184*141Smax.romanov@nginx.com                                 new_port_msg->type);
18511Sigor@sysoev.ru     if (nxt_slow_path(port == NULL)) {
18611Sigor@sysoev.ru         return;
18711Sigor@sysoev.ru     }
18811Sigor@sysoev.ru 
18965Sigor@sysoev.ru     mp = nxt_mp_create(1024, 128, 256, 32);
19014Sigor@sysoev.ru     if (nxt_slow_path(mp == NULL)) {
19114Sigor@sysoev.ru         return;
19214Sigor@sysoev.ru     }
19311Sigor@sysoev.ru 
19414Sigor@sysoev.ru     port->mem_pool = mp;
19514Sigor@sysoev.ru 
19614Sigor@sysoev.ru     port->pair[0] = -1;
19711Sigor@sysoev.ru     port->pair[1] = msg->fd;
19814Sigor@sysoev.ru     port->max_size = new_port_msg->max_size;
19914Sigor@sysoev.ru     port->max_share = new_port_msg->max_share;
20011Sigor@sysoev.ru 
20114Sigor@sysoev.ru     nxt_queue_init(&port->messages);
20214Sigor@sysoev.ru 
20314Sigor@sysoev.ru     port->socket.task = task;
20414Sigor@sysoev.ru 
20542Smax.romanov@nginx.com     nxt_runtime_port_add(rt, port);
20642Smax.romanov@nginx.com 
20711Sigor@sysoev.ru     nxt_port_write_enable(task, port);
208*141Smax.romanov@nginx.com 
209*141Smax.romanov@nginx.com     msg->new_port = port;
210*141Smax.romanov@nginx.com }
211*141Smax.romanov@nginx.com 
212*141Smax.romanov@nginx.com 
213*141Smax.romanov@nginx.com void
214*141Smax.romanov@nginx.com nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
215*141Smax.romanov@nginx.com {
216*141Smax.romanov@nginx.com     nxt_port_t     *port;
217*141Smax.romanov@nginx.com     nxt_process_t  *process;
218*141Smax.romanov@nginx.com     nxt_runtime_t  *rt;
219*141Smax.romanov@nginx.com 
220*141Smax.romanov@nginx.com     rt = task->thread->runtime;
221*141Smax.romanov@nginx.com 
222*141Smax.romanov@nginx.com     process = nxt_runtime_process_get(rt, msg->port_msg.pid);
223*141Smax.romanov@nginx.com     if (nxt_slow_path(process == NULL)) {
224*141Smax.romanov@nginx.com         return;
225*141Smax.romanov@nginx.com     }
226*141Smax.romanov@nginx.com 
227*141Smax.romanov@nginx.com     process->ready = 1;
228*141Smax.romanov@nginx.com 
229*141Smax.romanov@nginx.com     port = nxt_process_port_first(process);
230*141Smax.romanov@nginx.com     if (nxt_slow_path(port == NULL)) {
231*141Smax.romanov@nginx.com         return;
232*141Smax.romanov@nginx.com     }
233*141Smax.romanov@nginx.com 
234*141Smax.romanov@nginx.com     nxt_debug(task, "process %PI ready", msg->port_msg.pid);
235*141Smax.romanov@nginx.com 
236*141Smax.romanov@nginx.com     if (nxt_runtime_is_master(rt)) {
237*141Smax.romanov@nginx.com         nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
238*141Smax.romanov@nginx.com     }
23911Sigor@sysoev.ru }
24011Sigor@sysoev.ru 
24111Sigor@sysoev.ru 
24211Sigor@sysoev.ru void
24342Smax.romanov@nginx.com nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
24442Smax.romanov@nginx.com {
24542Smax.romanov@nginx.com     nxt_runtime_t  *rt;
24642Smax.romanov@nginx.com     nxt_process_t  *process;
24742Smax.romanov@nginx.com 
24842Smax.romanov@nginx.com     rt = task->thread->runtime;
24942Smax.romanov@nginx.com 
25042Smax.romanov@nginx.com     if (nxt_slow_path(msg->fd == -1)) {
25142Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
25242Smax.romanov@nginx.com 
25342Smax.romanov@nginx.com         return;
25442Smax.romanov@nginx.com     }
25542Smax.romanov@nginx.com 
25642Smax.romanov@nginx.com     process = nxt_runtime_process_get(rt, msg->port_msg.pid);
25742Smax.romanov@nginx.com     if (nxt_slow_path(process == NULL)) {
25842Smax.romanov@nginx.com         nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
25942Smax.romanov@nginx.com                 msg->port_msg.pid);
26042Smax.romanov@nginx.com 
26142Smax.romanov@nginx.com         goto fail_close;
26242Smax.romanov@nginx.com     }
26342Smax.romanov@nginx.com 
26442Smax.romanov@nginx.com     nxt_port_incoming_port_mmap(task, process, msg->fd);
26542Smax.romanov@nginx.com 
26642Smax.romanov@nginx.com fail_close:
26742Smax.romanov@nginx.com 
26842Smax.romanov@nginx.com     close(msg->fd);
26942Smax.romanov@nginx.com }
27042Smax.romanov@nginx.com 
27142Smax.romanov@nginx.com 
27242Smax.romanov@nginx.com void
27320Sigor@sysoev.ru nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
27420Sigor@sysoev.ru     nxt_fd_t fd)
27511Sigor@sysoev.ru {
27620Sigor@sysoev.ru     nxt_buf_t      *b;
27720Sigor@sysoev.ru     nxt_port_t     *port;
27820Sigor@sysoev.ru     nxt_process_t  *process;
27911Sigor@sysoev.ru 
28014Sigor@sysoev.ru     nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
28111Sigor@sysoev.ru 
28242Smax.romanov@nginx.com     nxt_runtime_process_each(rt, process)
28342Smax.romanov@nginx.com     {
28442Smax.romanov@nginx.com         if (nxt_pid == process->pid) {
28542Smax.romanov@nginx.com             continue;
28642Smax.romanov@nginx.com         }
28711Sigor@sysoev.ru 
28842Smax.romanov@nginx.com         port = nxt_process_port_first(process);
28911Sigor@sysoev.ru 
29020Sigor@sysoev.ru         b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
29111Sigor@sysoev.ru         if (nxt_slow_path(b == NULL)) {
29211Sigor@sysoev.ru             continue;
29311Sigor@sysoev.ru         }
29411Sigor@sysoev.ru 
29511Sigor@sysoev.ru         *(nxt_uint_t *) b->mem.pos = slot;
29611Sigor@sysoev.ru         b->mem.free += sizeof(nxt_uint_t);
29711Sigor@sysoev.ru 
29820Sigor@sysoev.ru         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
29942Smax.romanov@nginx.com                                      fd, 0, 0, b);
30011Sigor@sysoev.ru     }
30142Smax.romanov@nginx.com     nxt_runtime_process_loop;
30211Sigor@sysoev.ru }
30311Sigor@sysoev.ru 
30411Sigor@sysoev.ru 
30511Sigor@sysoev.ru void
30614Sigor@sysoev.ru nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
30711Sigor@sysoev.ru {
30820Sigor@sysoev.ru     nxt_buf_t      *b;
30920Sigor@sysoev.ru     nxt_uint_t     slot;
31020Sigor@sysoev.ru     nxt_file_t     *log_file;
31120Sigor@sysoev.ru     nxt_runtime_t  *rt;
31211Sigor@sysoev.ru 
31320Sigor@sysoev.ru     rt = task->thread->runtime;
31411Sigor@sysoev.ru 
31511Sigor@sysoev.ru     b = msg->buf;
31611Sigor@sysoev.ru     slot = *(nxt_uint_t *) b->mem.pos;
31711Sigor@sysoev.ru 
31820Sigor@sysoev.ru     log_file = nxt_list_elt(rt->log_files, slot);
31911Sigor@sysoev.ru 
32011Sigor@sysoev.ru     nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);
32111Sigor@sysoev.ru 
32211Sigor@sysoev.ru     /*
32311Sigor@sysoev.ru      * The old log file descriptor must be closed at the moment when no
32411Sigor@sysoev.ru      * other threads use it.  dup2() allows to use the old file descriptor
32511Sigor@sysoev.ru      * for new log file.  This change is performed atomically in the kernel.
32611Sigor@sysoev.ru      */
32711Sigor@sysoev.ru     if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) {
32811Sigor@sysoev.ru 
32911Sigor@sysoev.ru         if (slot == 0) {
33011Sigor@sysoev.ru             (void) nxt_file_stderr(log_file);
33111Sigor@sysoev.ru         }
33211Sigor@sysoev.ru     }
33311Sigor@sysoev.ru }
33411Sigor@sysoev.ru 
33511Sigor@sysoev.ru 
33611Sigor@sysoev.ru void
33714Sigor@sysoev.ru nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
33811Sigor@sysoev.ru {
33942Smax.romanov@nginx.com     size_t     dump_size;
34011Sigor@sysoev.ru     nxt_buf_t  *b;
34111Sigor@sysoev.ru 
34211Sigor@sysoev.ru     b = msg->buf;
34342Smax.romanov@nginx.com     dump_size = b->mem.free - b->mem.pos;
34411Sigor@sysoev.ru 
34542Smax.romanov@nginx.com     if (dump_size > 300) {
34642Smax.romanov@nginx.com         dump_size = 300;
34742Smax.romanov@nginx.com     }
34842Smax.romanov@nginx.com 
34942Smax.romanov@nginx.com     nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
35011Sigor@sysoev.ru 
35111Sigor@sysoev.ru     b->mem.pos = b->mem.free;
35211Sigor@sysoev.ru }
35311Sigor@sysoev.ru 
35411Sigor@sysoev.ru 
35511Sigor@sysoev.ru void
356125Smax.romanov@nginx.com nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
357125Smax.romanov@nginx.com {
358125Smax.romanov@nginx.com     nxt_pid_t           pid;
359125Smax.romanov@nginx.com     nxt_runtime_t       *rt;
360125Smax.romanov@nginx.com     nxt_process_t       *process;
361125Smax.romanov@nginx.com 
362125Smax.romanov@nginx.com     nxt_debug(task, "port remove pid handler");
363125Smax.romanov@nginx.com 
364125Smax.romanov@nginx.com     rt = task->thread->runtime;
365125Smax.romanov@nginx.com     pid = msg->port_msg.stream;
366125Smax.romanov@nginx.com 
367125Smax.romanov@nginx.com     process = nxt_runtime_process_find(rt, pid);
368125Smax.romanov@nginx.com 
369125Smax.romanov@nginx.com     if (process) {
370125Smax.romanov@nginx.com         nxt_runtime_process_remove(rt, process);
371125Smax.romanov@nginx.com     }
372125Smax.romanov@nginx.com }
373125Smax.romanov@nginx.com 
374125Smax.romanov@nginx.com 
375125Smax.romanov@nginx.com void
37614Sigor@sysoev.ru nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
37711Sigor@sysoev.ru {
37811Sigor@sysoev.ru     nxt_debug(task, "port empty handler");
37911Sigor@sysoev.ru }
380