xref: /unit/src/nxt_port.c (revision 2229:3a230013e58a)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_router.h>
11 #include <nxt_app_queue.h>
12 #include <nxt_port_queue.h>
13 
14 
15 static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
16     nxt_pid_t pid);
17 static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
18 
19 static nxt_atomic_uint_t nxt_port_last_id = 1;
20 
21 
22 static void
nxt_port_mp_cleanup(nxt_task_t * task,void * obj,void * data)23 nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
24 {
25     nxt_mp_t    *mp;
26     nxt_port_t  *port;
27 
28     port = obj;
29     mp = data;
30 
31     nxt_assert(port->pair[0] == -1);
32     nxt_assert(port->pair[1] == -1);
33 
34     nxt_assert(port->use_count == 0);
35     nxt_assert(port->app_link.next == NULL);
36     nxt_assert(port->idle_link.next == NULL);
37 
38     nxt_assert(nxt_queue_is_empty(&port->messages));
39     nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
40     nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
41 
42     nxt_thread_mutex_destroy(&port->write_mutex);
43 
44     nxt_mp_free(mp, port);
45 }
46 
47 
48 nxt_port_t *
nxt_port_new(nxt_task_t * task,nxt_port_id_t id,nxt_pid_t pid,nxt_process_type_t type)49 nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
50     nxt_process_type_t type)
51 {
52     nxt_mp_t    *mp;
53     nxt_port_t  *port;
54 
55     mp = nxt_mp_create(1024, 128, 256, 32);
56 
57     if (nxt_slow_path(mp == NULL)) {
58         return NULL;
59     }
60 
61     port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
62 
63     if (nxt_fast_path(port != NULL)) {
64         port->id = id;
65         port->pid = pid;
66         port->type = type;
67         port->mem_pool = mp;
68         port->use_count = 1;
69 
70         nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
71 
72         nxt_queue_init(&port->messages);
73         nxt_thread_mutex_create(&port->write_mutex);
74 
75         port->queue_fd = -1;
76 
77     } else {
78         nxt_mp_destroy(mp);
79     }
80 
81     nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
82 
83     return port;
84 }
85 
86 
87 void
nxt_port_close(nxt_task_t * task,nxt_port_t * port)88 nxt_port_close(nxt_task_t *task, nxt_port_t *port)
89 {
90     size_t  size;
91 
92     nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
93               port->id, port->type);
94 
95     if (port->pair[0] != -1) {
96         nxt_port_rpc_close(task, port);
97 
98         nxt_fd_close(port->pair[0]);
99         port->pair[0] = -1;
100     }
101 
102     if (port->pair[1] != -1) {
103         nxt_fd_close(port->pair[1]);
104         port->pair[1] = -1;
105 
106         if (port->app != NULL) {
107             nxt_router_app_port_close(task, port);
108         }
109     }
110 
111     if (port->queue_fd != -1) {
112         nxt_fd_close(port->queue_fd);
113         port->queue_fd = -1;
114     }
115 
116     if (port->queue != NULL) {
117         size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
118                                                 : sizeof(nxt_port_queue_t);
119         nxt_mem_munmap(port->queue, size);
120 
121         port->queue = NULL;
122     }
123 }
124 
125 
126 static void
nxt_port_release(nxt_task_t * task,nxt_port_t * port)127 nxt_port_release(nxt_task_t *task, nxt_port_t *port)
128 {
129     nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
130               port->id, port->type);
131 
132     port->app = NULL;
133 
134     if (port->link.next != NULL) {
135         nxt_assert(port->process != NULL);
136 
137         nxt_process_port_remove(port);
138 
139         nxt_process_use(task, port->process, -1);
140     }
141 
142     nxt_mp_release(port->mem_pool);
143 }
144 
145 
146 nxt_port_id_t
nxt_port_get_next_id(void)147 nxt_port_get_next_id(void)
148 {
149     return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
150 }
151 
152 
153 void
nxt_port_reset_next_id(void)154 nxt_port_reset_next_id(void)
155 {
156     nxt_port_last_id = 1;
157 }
158 
159 
160 void
nxt_port_enable(nxt_task_t * task,nxt_port_t * port,const nxt_port_handlers_t * handlers)161 nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
162     const nxt_port_handlers_t *handlers)
163 {
164     port->pid = nxt_pid;
165     port->handler = nxt_port_handler;
166     port->data = (nxt_port_handler_t *) (handlers);
167 
168     nxt_port_read_enable(task, port);
169 }
170 
171 
172 static void
nxt_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)173 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
174 {
175     nxt_port_handler_t  *handlers;
176 
177     if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
178 
179         nxt_debug(task, "port %d: message type:%uD fds:%d,%d",
180                   msg->port->socket.fd, msg->port_msg.type,
181                   msg->fd[0], msg->fd[1]);
182 
183         handlers = msg->port->data;
184         handlers[msg->port_msg.type](task, msg);
185 
186         return;
187     }
188 
189     nxt_alert(task, "port %d: unknown message type:%uD",
190               msg->port->socket.fd, msg->port_msg.type);
191 }
192 
193 
194 void
nxt_port_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)195 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
196 {
197     nxt_runtime_quit(task, 0);
198 }
199 
200 
201 /* TODO join with process_ready and move to nxt_main_process.c */
202 nxt_inline void
nxt_port_send_new_port(nxt_task_t * task,nxt_runtime_t * rt,nxt_port_t * new_port,uint32_t stream)203 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
204     nxt_port_t *new_port, uint32_t stream)
205 {
206     nxt_port_t     *port;
207     nxt_process_t  *process;
208 
209     nxt_debug(task, "new port %d for process %PI",
210               new_port->pair[1], new_port->pid);
211 
212     nxt_runtime_process_each(rt, process) {
213 
214         if (process->pid == new_port->pid || process->pid == nxt_pid) {
215             continue;
216         }
217 
218         port = nxt_process_port_first(process);
219 
220         if (nxt_proc_send_matrix[port->type][new_port->type]) {
221             (void) nxt_port_send_port(task, port, new_port, stream);
222         }
223 
224     } nxt_runtime_process_loop;
225 }
226 
227 
228 nxt_int_t
nxt_port_send_port(nxt_task_t * task,nxt_port_t * port,nxt_port_t * new_port,uint32_t stream)229 nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
230     uint32_t stream)
231 {
232     nxt_buf_t                *b;
233     nxt_port_msg_new_port_t  *msg;
234 
235     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
236                              sizeof(nxt_port_data_t));
237     if (nxt_slow_path(b == NULL)) {
238         return NXT_ERROR;
239     }
240 
241     nxt_debug(task, "send port %FD to process %PI",
242               new_port->pair[1], port->pid);
243 
244     b->mem.free += sizeof(nxt_port_msg_new_port_t);
245     msg = (nxt_port_msg_new_port_t *) b->mem.pos;
246 
247     msg->id = new_port->id;
248     msg->pid = new_port->pid;
249     msg->max_size = port->max_size;
250     msg->max_share = port->max_share;
251     msg->type = new_port->type;
252 
253     return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
254                                   new_port->pair[1], new_port->queue_fd,
255                                   stream, 0, b);
256 }
257 
258 
259 void
nxt_port_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)260 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
261 {
262     nxt_port_t               *port;
263     nxt_runtime_t            *rt;
264     nxt_port_msg_new_port_t  *new_port_msg;
265 
266     rt = task->thread->runtime;
267 
268     new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
269 
270     /* TODO check b size and make plain */
271 
272     nxt_debug(task, "new port %d received for process %PI:%d",
273               msg->fd[0], new_port_msg->pid, new_port_msg->id);
274 
275     port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
276     if (port != NULL) {
277         nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
278               new_port_msg->id);
279 
280         msg->u.new_port = port;
281 
282         nxt_fd_close(msg->fd[0]);
283         msg->fd[0] = -1;
284         return;
285     }
286 
287     port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
288                                            new_port_msg->id,
289                                            new_port_msg->type);
290     if (nxt_slow_path(port == NULL)) {
291         return;
292     }
293 
294     nxt_fd_nonblocking(task, msg->fd[0]);
295 
296     port->pair[0] = -1;
297     port->pair[1] = msg->fd[0];
298     port->max_size = new_port_msg->max_size;
299     port->max_share = new_port_msg->max_share;
300 
301     port->socket.task = task;
302 
303     nxt_port_write_enable(task, port);
304 
305     msg->u.new_port = port;
306 }
307 
308 /* TODO move to nxt_main_process.c */
309 void
nxt_port_process_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)310 nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
311 {
312     nxt_port_t     *port;
313     nxt_process_t  *process;
314     nxt_runtime_t  *rt;
315 
316     rt = task->thread->runtime;
317 
318     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
319     if (nxt_slow_path(process == NULL)) {
320         return;
321     }
322 
323     nxt_assert(process->state != NXT_PROCESS_STATE_READY);
324 
325     process->state = NXT_PROCESS_STATE_READY;
326 
327     nxt_assert(!nxt_queue_is_empty(&process->ports));
328 
329     port = nxt_process_port_first(process);
330 
331     nxt_debug(task, "process %PI ready", msg->port_msg.pid);
332 
333     if (msg->fd[0] != -1) {
334         port->queue_fd = msg->fd[0];
335         port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
336                                    PROT_READ | PROT_WRITE, MAP_SHARED,
337                                    msg->fd[0], 0);
338     }
339 
340     nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
341 }
342 
343 
344 void
nxt_port_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)345 nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
346 {
347     nxt_runtime_t  *rt;
348     nxt_process_t  *process;
349 
350     rt = task->thread->runtime;
351 
352     if (nxt_slow_path(msg->fd[0] == -1)) {
353         nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
354 
355         return;
356     }
357 
358     process = nxt_runtime_process_find(rt, msg->port_msg.pid);
359     if (nxt_slow_path(process == NULL)) {
360         nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
361                 msg->port_msg.pid);
362 
363         goto fail_close;
364     }
365 
366     nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
367 
368 fail_close:
369 
370     nxt_fd_close(msg->fd[0]);
371 }
372 
373 
374 void
nxt_port_change_log_file(nxt_task_t * task,nxt_runtime_t * rt,nxt_uint_t slot,nxt_fd_t fd)375 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
376     nxt_fd_t fd)
377 {
378     nxt_buf_t      *b;
379     nxt_port_t     *port;
380     nxt_process_t  *process;
381 
382     nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
383 
384     nxt_runtime_process_each(rt, process) {
385 
386         if (nxt_pid == process->pid) {
387             continue;
388         }
389 
390         port = nxt_process_port_first(process);
391 
392         b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
393                               sizeof(nxt_uint_t), 0);
394         if (nxt_slow_path(b == NULL)) {
395             continue;
396         }
397 
398         b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
399 
400         (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
401                                      fd, 0, 0, b);
402 
403     } nxt_runtime_process_loop;
404 }
405 
406 
407 void
nxt_port_change_log_file_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)408 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
409 {
410     nxt_buf_t      *b;
411     nxt_uint_t     slot;
412     nxt_file_t     *log_file;
413     nxt_runtime_t  *rt;
414 
415     rt = task->thread->runtime;
416 
417     b = msg->buf;
418     slot = *(nxt_uint_t *) b->mem.pos;
419 
420     log_file = nxt_list_elt(rt->log_files, slot);
421 
422     nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
423 
424     /*
425      * The old log file descriptor must be closed at the moment when no
426      * other threads use it.  dup2() allows to use the old file descriptor
427      * for new log file.  This change is performed atomically in the kernel.
428      */
429     if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
430         if (slot == 0) {
431             (void) nxt_file_stderr(log_file);
432         }
433     }
434 }
435 
436 
437 void
nxt_port_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)438 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
439 {
440     size_t     dump_size;
441     nxt_buf_t  *b;
442 
443     b = msg->buf;
444     dump_size = b->mem.free - b->mem.pos;
445 
446     if (dump_size > 300) {
447         dump_size = 300;
448     }
449 
450     nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
451 }
452 
453 
454 void
nxt_port_remove_notify_others(nxt_task_t * task,nxt_process_t * process)455 nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
456 {
457     nxt_pid_t           pid;
458     nxt_buf_t           *buf;
459     nxt_port_t          *port;
460     nxt_runtime_t       *rt;
461     nxt_process_t       *p;
462     nxt_process_type_t  ptype;
463 
464     pid = process->pid;
465 
466     ptype = nxt_process_type(process);
467 
468     rt = task->thread->runtime;
469 
470     nxt_runtime_process_each(rt, p) {
471 
472         if (p->pid == nxt_pid
473             || p->pid == pid
474             || nxt_queue_is_empty(&p->ports))
475         {
476             continue;
477         }
478 
479         port = nxt_process_port_first(p);
480 
481         if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
482             continue;
483         }
484 
485         buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
486                                    sizeof(pid));
487 
488         if (nxt_slow_path(buf == NULL)) {
489             continue;
490         }
491 
492         buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
493 
494         nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
495                               process->stream, 0, buf);
496 
497     } nxt_runtime_process_loop;
498 }
499 
500 
501 void
nxt_port_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)502 nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
503 {
504     nxt_pid_t  pid;
505     nxt_buf_t  *buf;
506 
507     buf = msg->buf;
508 
509     nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
510 
511     nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
512 
513     nxt_port_remove_pid(task, msg, pid);
514 }
515 
516 
517 static void
nxt_port_remove_pid(nxt_task_t * task,nxt_port_recv_msg_t * msg,nxt_pid_t pid)518 nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
519     nxt_pid_t pid)
520 {
521     nxt_runtime_t  *rt;
522     nxt_process_t  *process;
523 
524     msg->u.removed_pid = pid;
525 
526     nxt_debug(task, "port remove pid %PI handler", pid);
527 
528     rt = task->thread->runtime;
529 
530     nxt_port_rpc_remove_peer(task, msg->port, pid);
531 
532     process = nxt_runtime_process_find(rt, pid);
533 
534     if (process) {
535         nxt_process_close_ports(task, process);
536     }
537 }
538 
539 
540 void
nxt_port_empty_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)541 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
542 {
543     nxt_debug(task, "port empty handler");
544 }
545 
546 
547 typedef struct {
548     nxt_work_t               work;
549     nxt_port_t               *port;
550     nxt_port_post_handler_t  handler;
551 } nxt_port_work_t;
552 
553 
554 static void
nxt_port_post_handler(nxt_task_t * task,void * obj,void * data)555 nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
556 {
557     nxt_port_t               *port;
558     nxt_port_work_t          *pw;
559     nxt_port_post_handler_t  handler;
560 
561     pw = obj;
562     port = pw->port;
563     handler = pw->handler;
564 
565     nxt_free(pw);
566 
567     handler(task, port, data);
568 
569     nxt_port_use(task, port, -1);
570 }
571 
572 
573 nxt_int_t
nxt_port_post(nxt_task_t * task,nxt_port_t * port,nxt_port_post_handler_t handler,void * data)574 nxt_port_post(nxt_task_t *task, nxt_port_t *port,
575     nxt_port_post_handler_t handler, void *data)
576 {
577     nxt_port_work_t  *pw;
578 
579     if (task->thread->engine == port->engine) {
580         handler(task, port, data);
581 
582         return NXT_OK;
583     }
584 
585     pw = nxt_zalloc(sizeof(nxt_port_work_t));
586 
587     if (nxt_slow_path(pw == NULL)) {
588         return NXT_ERROR;
589     }
590 
591     nxt_atomic_fetch_add(&port->use_count, 1);
592 
593     pw->work.handler = nxt_port_post_handler;
594     pw->work.task = &port->engine->task;
595     pw->work.obj = pw;
596     pw->work.data = data;
597 
598     pw->port = port;
599     pw->handler = handler;
600 
601     nxt_event_engine_post(port->engine, &pw->work);
602 
603     return NXT_OK;
604 }
605 
606 
607 static void
nxt_port_release_handler(nxt_task_t * task,nxt_port_t * port,void * data)608 nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
609 {
610     /* no op */
611 }
612 
613 
614 void
nxt_port_use(nxt_task_t * task,nxt_port_t * port,int i)615 nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
616 {
617     int  c;
618 
619     c = nxt_atomic_fetch_add(&port->use_count, i);
620 
621     if (i < 0 && c == -i) {
622 
623         if (port->engine == NULL || task->thread->engine == port->engine) {
624             nxt_port_release(task, port);
625 
626             return;
627         }
628 
629         nxt_port_post(task, port, nxt_port_release_handler, NULL);
630     }
631 }
632