xref: /unit/src/nxt_port.c (revision 2031:e8518399bc10)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_router.h>
11 #include <nxt_app_queue.h>
12 #include <nxt_port_queue.h>
13 
14 
15 static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
16     nxt_pid_t pid);
17 static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
18 
19 static nxt_atomic_uint_t nxt_port_last_id = 1;
20 
21 
22 static void
23 nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
24 {
25     nxt_mp_t    *mp;
26     nxt_port_t  *port;
27 
28     port = obj;
29     mp = data;
30 
31     nxt_assert(port->pair[0] == -1);
32     nxt_assert(port->pair[1] == -1);
33 
34     nxt_assert(port->use_count == 0);
35     nxt_assert(port->app_link.next == NULL);
36     nxt_assert(port->idle_link.next == NULL);
37 
38     nxt_assert(nxt_queue_is_empty(&port->messages));
39     nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
40     nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
41 
42     nxt_thread_mutex_destroy(&port->write_mutex);
43 
44     nxt_mp_free(mp, port);
45 }
46 
47 
48 nxt_port_t *
49 nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
50     nxt_process_type_t type)
51 {
52     nxt_mp_t    *mp;
53     nxt_port_t  *port;
54 
55     mp = nxt_mp_create(1024, 128, 256, 32);
56 
57     if (nxt_slow_path(mp == NULL)) {
58         return NULL;
59     }
60 
61     port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
62 
63     if (nxt_fast_path(port != NULL)) {
64         port->id = id;
65         port->pid = pid;
66         port->type = type;
67         port->mem_pool = mp;
68         port->use_count = 1;
69 
70         nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
71 
72         nxt_queue_init(&port->messages);
73         nxt_thread_mutex_create(&port->write_mutex);
74 
75         port->queue_fd = -1;
76 
77     } else {
78         nxt_mp_destroy(mp);
79     }
80 
81     nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
82 
83     return port;
84 }
85 
86 
87 void
88 nxt_port_close(nxt_task_t *task, nxt_port_t *port)
89 {
90     size_t  size;
91 
92     nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
93               port->id, port->type);
94 
95     if (port->pair[0] != -1) {
96         nxt_port_rpc_close(task, port);
97 
98         nxt_fd_close(port->pair[0]);
99         port->pair[0] = -1;
100     }
101 
102     if (port->pair[1] != -1) {
103         nxt_fd_close(port->pair[1]);
104         port->pair[1] = -1;
105 
106         if (port->app != NULL) {
107             nxt_router_app_port_close(task, port);
108         }
109     }
110 
111     if (port->queue_fd != -1) {
112         nxt_fd_close(port->queue_fd);
113         port->queue_fd = -1;
114     }
115 
116     if (port->queue != NULL) {
117         size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
118                                                 : sizeof(nxt_port_queue_t);
119         nxt_mem_munmap(port->queue, size);
120 
121         port->queue = NULL;
122     }
123 }
124 
125 
126 static void
127 nxt_port_release(nxt_task_t *task, nxt_port_t *port)
128 {
129     nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
130               port->id, port->type);
131 
132     port->app = NULL;
133 
134     if (port->link.next != NULL) {
135         nxt_assert(port->process != NULL);
136 
137         nxt_process_port_remove(port);
138 
139         nxt_process_use(task, port->process, -1);
140     }
141 
142     nxt_mp_release(port->mem_pool);
143 }
144 
145 
146 nxt_port_id_t
147 nxt_port_get_next_id()
148 {
149     return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
150 }
151 
152 
153 void
154 nxt_port_reset_next_id()
155 {
156     nxt_port_last_id = 1;
157 }
158 
159 
160 void
161 nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
162     const nxt_port_handlers_t *handlers)
163 {
164     port->pid = nxt_pid;
165     port->handler = nxt_port_handler;
166     port->data = (nxt_port_handler_t *) (handlers);
167 
168     nxt_port_read_enable(task, port);
169 }
170 
171 
172 static void
173 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
174 {
175     nxt_port_handler_t  *handlers;
176 
177     if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
178 
179         nxt_debug(task, "port %d: message type:%uD",
180                   msg->port->socket.fd, msg->port_msg.type);
181 
182         handlers = msg->port->data;
183         handlers[msg->port_msg.type](task, msg);
184 
185         return;
186     }
187 
188     nxt_alert(task, "port %d: unknown message type:%uD",
189               msg->port->socket.fd, msg->port_msg.type);
190 }
191 
192 
193 void
194 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
195 {
196     nxt_runtime_quit(task, 0);
197 }
198 
199 
200 /* TODO join with process_ready and move to nxt_main_process.c */
201 nxt_inline void
202 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
203     nxt_port_t *new_port, uint32_t stream)
204 {
205     nxt_port_t     *port;
206     nxt_process_t  *process;
207 
208     nxt_debug(task, "new port %d for process %PI",
209               new_port->pair[1], new_port->pid);
210 
211     nxt_runtime_process_each(rt, process) {
212 
213         if (process->pid == new_port->pid || process->pid == nxt_pid) {
214             continue;
215         }
216 
217         port = nxt_process_port_first(process);
218 
219         if (nxt_proc_send_matrix[port->type][new_port->type]) {
220             (void) nxt_port_send_port(task, port, new_port, stream);
221         }
222 
223     } nxt_runtime_process_loop;
224 }
225 
226 
227 nxt_int_t
228 nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
229     uint32_t stream)
230 {
231     nxt_buf_t                *b;
232     nxt_port_msg_new_port_t  *msg;
233 
234     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
235                              sizeof(nxt_port_data_t));
236     if (nxt_slow_path(b == NULL)) {
237         return NXT_ERROR;
238     }
239 
240     nxt_debug(task, "send port %FD to process %PI",
241               new_port->pair[1], port->pid);
242 
243     b->mem.free += sizeof(nxt_port_msg_new_port_t);
244     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
245 
246     msg->id = new_port->id;
247     msg->pid = new_port->pid;
248     msg->max_size = port->max_size;
249     msg->max_share = port->max_share;
250     msg->type = new_port->type;
251 
252     return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
253                                   new_port->pair[1], new_port->queue_fd,
254                                   stream, 0, b);
255 }
256 
257 
258 void
259 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
260 {
261     nxt_port_t               *port;
262     nxt_runtime_t            *rt;
263     nxt_port_msg_new_port_t  *new_port_msg;
264 
265     rt = task->thread->runtime;
266 
267     new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
268 
269     /* TODO check b size and make plain */
270 
271     nxt_debug(task, "new port %d received for process %PI:%d",
272               msg->fd[0], new_port_msg->pid, new_port_msg->id);
273 
274     port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
275     if (port != NULL) {
276         nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
277               new_port_msg->id);
278 
279         msg->u.new_port = port;
280 
281         nxt_fd_close(msg->fd[0]);
282         msg->fd[0] = -1;
283         return;
284     }
285 
286     port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
287                                            new_port_msg->id,
288                                            new_port_msg->type);
289     if (nxt_slow_path(port == NULL)) {
290         return;
291     }
292 
293     nxt_fd_nonblocking(task, msg->fd[0]);
294 
295     port->pair[0] = -1;
296     port->pair[1] = msg->fd[0];
297     port->max_size = new_port_msg->max_size;
298     port->max_share = new_port_msg->max_share;
299 
300     port->socket.task = task;
301 
302     nxt_port_write_enable(task, port);
303 
304     msg->u.new_port = port;
305 }
306 
307 /* TODO move to nxt_main_process.c */
308 void
309 nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
310 {
311     nxt_port_t     *port;
312     nxt_process_t  *process;
313     nxt_runtime_t  *rt;
314 
315     rt = task->thread->runtime;
316 
317     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
318     if (nxt_slow_path(process == NULL)) {
319         return;
320     }
321 
322     nxt_assert(process->state != NXT_PROCESS_STATE_READY);
323 
324     process->state = NXT_PROCESS_STATE_READY;
325 
326     nxt_assert(!nxt_queue_is_empty(&process->ports));
327 
328     port = nxt_process_port_first(process);
329 
330     nxt_debug(task, "process %PI ready", msg->port_msg.pid);
331 
332     if (msg->fd[0] != -1) {
333         port->queue_fd = msg->fd[0];
334         port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
335                                    PROT_READ | PROT_WRITE, MAP_SHARED,
336                                    msg->fd[0], 0);
337     }
338 
339     nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
340 }
341 
342 
343 void
344 nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
345 {
346     nxt_runtime_t  *rt;
347     nxt_process_t  *process;
348 
349     rt = task->thread->runtime;
350 
351     if (nxt_slow_path(msg->fd[0] == -1)) {
352         nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
353 
354         return;
355     }
356 
357     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
358     if (nxt_slow_path(process == NULL)) {
359         nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
360                 msg->port_msg.pid);
361 
362         goto fail_close;
363     }
364 
365     nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
366 
367 fail_close:
368 
369     nxt_fd_close(msg->fd[0]);
370 }
371 
372 
373 void
374 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
375     nxt_fd_t fd)
376 {
377     nxt_buf_t      *b;
378     nxt_port_t     *port;
379     nxt_process_t  *process;
380 
381     nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
382 
383     nxt_runtime_process_each(rt, process) {
384 
385         if (nxt_pid == process->pid) {
386             continue;
387         }
388 
389         port = nxt_process_port_first(process);
390 
391         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
392                               sizeof(nxt_uint_t), 0);
393         if (nxt_slow_path(b == NULL)) {
394             continue;
395         }
396 
397         b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
398 
399         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
400                                      fd, 0, 0, b);
401 
402     } nxt_runtime_process_loop;
403 }
404 
405 
406 void
407 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
408 {
409     nxt_buf_t      *b;
410     nxt_uint_t     slot;
411     nxt_file_t     *log_file;
412     nxt_runtime_t  *rt;
413 
414     rt = task->thread->runtime;
415 
416     b = msg->buf;
417     slot = *(nxt_uint_t *) b->mem.pos;
418 
419     log_file = nxt_list_elt(rt->log_files, slot);
420 
421     nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
422 
423     /*
424      * The old log file descriptor must be closed at the moment when no
425      * other threads use it.  dup2() allows to use the old file descriptor
426      * for new log file.  This change is performed atomically in the kernel.
427      */
428     if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
429         if (slot == 0) {
430             (void) nxt_file_stderr(log_file);
431         }
432     }
433 }
434 
435 
436 void
437 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
438 {
439     size_t     dump_size;
440     nxt_buf_t  *b;
441 
442     b = msg->buf;
443     dump_size = b->mem.free - b->mem.pos;
444 
445     if (dump_size > 300) {
446         dump_size = 300;
447     }
448 
449     nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
450 }
451 
452 
453 void
454 nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
455 {
456     nxt_pid_t           pid;
457     nxt_buf_t           *buf;
458     nxt_port_t          *port;
459     nxt_runtime_t       *rt;
460     nxt_process_t       *p;
461     nxt_process_type_t  ptype;
462 
463     pid = process->pid;
464 
465     ptype = nxt_process_type(process);
466 
467     rt = task->thread->runtime;
468 
469     nxt_runtime_process_each(rt, p) {
470 
471         if (p->pid == nxt_pid
472             || p->pid == pid
473             || nxt_queue_is_empty(&p->ports))
474         {
475             continue;
476         }
477 
478         port = nxt_process_port_first(p);
479 
480         if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
481             continue;
482         }
483 
484         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
485                                    sizeof(pid));
486 
487         if (nxt_slow_path(buf == NULL)) {
488             continue;
489         }
490 
491         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
492 
493         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
494                               process->stream, 0, buf);
495 
496     } nxt_runtime_process_loop;
497 }
498 
499 
500 void
501 nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
502 {
503     nxt_pid_t  pid;
504     nxt_buf_t  *buf;
505 
506     buf = msg->buf;
507 
508     nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
509 
510     nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
511 
512     nxt_port_remove_pid(task, msg, pid);
513 }
514 
515 
516 static void
517 nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
518     nxt_pid_t pid)
519 {
520     nxt_runtime_t  *rt;
521     nxt_process_t  *process;
522 
523     msg->u.removed_pid = pid;
524 
525     nxt_debug(task, "port remove pid %PI handler", pid);
526 
527     rt = task->thread->runtime;
528 
529     nxt_port_rpc_remove_peer(task, msg->port, pid);
530 
531     process = nxt_runtime_process_find(rt, pid);
532 
533     if (process) {
534         nxt_process_close_ports(task, process);
535     }
536 }
537 
538 
539 void
540 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
541 {
542     nxt_debug(task, "port empty handler");
543 }
544 
545 
546 typedef struct {
547     nxt_work_t               work;
548     nxt_port_t               *port;
549     nxt_port_post_handler_t  handler;
550 } nxt_port_work_t;
551 
552 
553 static void
554 nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
555 {
556     nxt_port_t               *port;
557     nxt_port_work_t          *pw;
558     nxt_port_post_handler_t  handler;
559 
560     pw = obj;
561     port = pw->port;
562     handler = pw->handler;
563 
564     nxt_free(pw);
565 
566     handler(task, port, data);
567 
568     nxt_port_use(task, port, -1);
569 }
570 
571 
572 nxt_int_t
573 nxt_port_post(nxt_task_t *task, nxt_port_t *port,
574     nxt_port_post_handler_t handler, void *data)
575 {
576     nxt_port_work_t  *pw;
577 
578     if (task->thread->engine == port->engine) {
579         handler(task, port, data);
580 
581         return NXT_OK;
582     }
583 
584     pw = nxt_zalloc(sizeof(nxt_port_work_t));
585 
586     if (nxt_slow_path(pw == NULL)) {
587         return NXT_ERROR;
588     }
589 
590     nxt_atomic_fetch_add(&port->use_count, 1);
591 
592     pw->work.handler = nxt_port_post_handler;
593     pw->work.task = &port->engine->task;
594     pw->work.obj = pw;
595     pw->work.data = data;
596 
597     pw->port = port;
598     pw->handler = handler;
599 
600     nxt_event_engine_post(port->engine, &pw->work);
601 
602     return NXT_OK;
603 }
604 
605 
606 static void
607 nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
608 {
609     /* no op */
610 }
611 
612 
613 void
614 nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
615 {
616     int  c;
617 
618     c = nxt_atomic_fetch_add(&port->use_count, i);
619 
620     if (i < 0 && c == -i) {
621 
622         if (port->engine == NULL || task->thread->engine == port->engine) {
623             nxt_port_release(task, port);
624 
625             return;
626         }
627 
628         nxt_port_post(task, port, nxt_port_release_handler, NULL);
629     }
630 }
631