1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8 #include <nxt_runtime.h>
9 #include <nxt_port.h>
10 #include <nxt_router.h>
11 #include <nxt_app_queue.h>
12 #include <nxt_port_queue.h>
13
14
15 static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
16 nxt_pid_t pid);
17 static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
18
19 static nxt_atomic_uint_t nxt_port_last_id = 1;
20
21
22 static void
nxt_port_mp_cleanup(nxt_task_t * task,void * obj,void * data)23 nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
24 {
25 nxt_mp_t *mp;
26 nxt_port_t *port;
27
28 port = obj;
29 mp = data;
30
31 nxt_assert(port->pair[0] == -1);
32 nxt_assert(port->pair[1] == -1);
33
34 nxt_assert(port->use_count == 0);
35 nxt_assert(port->app_link.next == NULL);
36 nxt_assert(port->idle_link.next == NULL);
37
38 nxt_assert(nxt_queue_is_empty(&port->messages));
39 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
40 nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
41
42 nxt_thread_mutex_destroy(&port->write_mutex);
43
44 nxt_mp_free(mp, port);
45 }
46
47
48 nxt_port_t *
nxt_port_new(nxt_task_t * task,nxt_port_id_t id,nxt_pid_t pid,nxt_process_type_t type)49 nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
50 nxt_process_type_t type)
51 {
52 nxt_mp_t *mp;
53 nxt_port_t *port;
54
55 mp = nxt_mp_create(1024, 128, 256, 32);
56
57 if (nxt_slow_path(mp == NULL)) {
58 return NULL;
59 }
60
61 port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
62
63 if (nxt_fast_path(port != NULL)) {
64 port->id = id;
65 port->pid = pid;
66 port->type = type;
67 port->mem_pool = mp;
68 port->use_count = 1;
69
70 nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
71
72 nxt_queue_init(&port->messages);
73 nxt_thread_mutex_create(&port->write_mutex);
74
75 port->queue_fd = -1;
76
77 } else {
78 nxt_mp_destroy(mp);
79 }
80
81 nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
82
83 return port;
84 }
85
86
87 void
nxt_port_close(nxt_task_t * task,nxt_port_t * port)88 nxt_port_close(nxt_task_t *task, nxt_port_t *port)
89 {
90 size_t size;
91
92 nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
93 port->id, port->type);
94
95 if (port->pair[0] != -1) {
96 nxt_port_rpc_close(task, port);
97
98 nxt_fd_close(port->pair[0]);
99 port->pair[0] = -1;
100 }
101
102 if (port->pair[1] != -1) {
103 nxt_fd_close(port->pair[1]);
104 port->pair[1] = -1;
105
106 if (port->app != NULL) {
107 nxt_router_app_port_close(task, port);
108 }
109 }
110
111 if (port->queue_fd != -1) {
112 nxt_fd_close(port->queue_fd);
113 port->queue_fd = -1;
114 }
115
116 if (port->queue != NULL) {
117 size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
118 : sizeof(nxt_port_queue_t);
119 nxt_mem_munmap(port->queue, size);
120
121 port->queue = NULL;
122 }
123 }
124
125
126 static void
nxt_port_release(nxt_task_t * task,nxt_port_t * port)127 nxt_port_release(nxt_task_t *task, nxt_port_t *port)
128 {
129 nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
130 port->id, port->type);
131
132 port->app = NULL;
133
134 if (port->link.next != NULL) {
135 nxt_assert(port->process != NULL);
136
137 nxt_process_port_remove(port);
138
139 nxt_process_use(task, port->process, -1);
140 }
141
142 nxt_mp_release(port->mem_pool);
143 }
144
145
146 nxt_port_id_t
nxt_port_get_next_id(void)147 nxt_port_get_next_id(void)
148 {
149 return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
150 }
151
152
153 void
nxt_port_reset_next_id(void)154 nxt_port_reset_next_id(void)
155 {
156 nxt_port_last_id = 1;
157 }
158
159
160 void
nxt_port_enable(nxt_task_t * task,nxt_port_t * port,const nxt_port_handlers_t * handlers)161 nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
162 const nxt_port_handlers_t *handlers)
163 {
164 port->pid = nxt_pid;
165 port->handler = nxt_port_handler;
166 port->data = (nxt_port_handler_t *) (handlers);
167
168 nxt_port_read_enable(task, port);
169 }
170
171
172 static void
nxt_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)173 nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
174 {
175 nxt_port_handler_t *handlers;
176
177 if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
178
179 nxt_debug(task, "port %d: message type:%uD fds:%d,%d",
180 msg->port->socket.fd, msg->port_msg.type,
181 msg->fd[0], msg->fd[1]);
182
183 handlers = msg->port->data;
184 handlers[msg->port_msg.type](task, msg);
185
186 return;
187 }
188
189 nxt_alert(task, "port %d: unknown message type:%uD",
190 msg->port->socket.fd, msg->port_msg.type);
191 }
192
193
194 void
nxt_port_quit_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)195 nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
196 {
197 nxt_runtime_quit(task, 0);
198 }
199
200
201 /* TODO join with process_ready and move to nxt_main_process.c */
202 nxt_inline void
nxt_port_send_new_port(nxt_task_t * task,nxt_runtime_t * rt,nxt_port_t * new_port,uint32_t stream)203 nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
204 nxt_port_t *new_port, uint32_t stream)
205 {
206 nxt_port_t *port;
207 nxt_process_t *process;
208
209 nxt_debug(task, "new port %d for process %PI",
210 new_port->pair[1], new_port->pid);
211
212 nxt_runtime_process_each(rt, process) {
213
214 if (process->pid == new_port->pid || process->pid == nxt_pid) {
215 continue;
216 }
217
218 port = nxt_process_port_first(process);
219
220 if (nxt_proc_send_matrix[port->type][new_port->type]) {
221 (void) nxt_port_send_port(task, port, new_port, stream);
222 }
223
224 } nxt_runtime_process_loop;
225 }
226
227
228 nxt_int_t
nxt_port_send_port(nxt_task_t * task,nxt_port_t * port,nxt_port_t * new_port,uint32_t stream)229 nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
230 uint32_t stream)
231 {
232 nxt_buf_t *b;
233 nxt_port_msg_new_port_t *msg;
234
235 b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
236 sizeof(nxt_port_data_t));
237 if (nxt_slow_path(b == NULL)) {
238 return NXT_ERROR;
239 }
240
241 nxt_debug(task, "send port %FD to process %PI",
242 new_port->pair[1], port->pid);
243
244 b->mem.free += sizeof(nxt_port_msg_new_port_t);
245 msg = (nxt_port_msg_new_port_t *) b->mem.pos;
246
247 msg->id = new_port->id;
248 msg->pid = new_port->pid;
249 msg->max_size = port->max_size;
250 msg->max_share = port->max_share;
251 msg->type = new_port->type;
252
253 return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
254 new_port->pair[1], new_port->queue_fd,
255 stream, 0, b);
256 }
257
258
259 void
nxt_port_new_port_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)260 nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
261 {
262 nxt_port_t *port;
263 nxt_runtime_t *rt;
264 nxt_port_msg_new_port_t *new_port_msg;
265
266 rt = task->thread->runtime;
267
268 new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
269
270 /* TODO check b size and make plain */
271
272 nxt_debug(task, "new port %d received for process %PI:%d",
273 msg->fd[0], new_port_msg->pid, new_port_msg->id);
274
275 port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
276 if (port != NULL) {
277 nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
278 new_port_msg->id);
279
280 msg->u.new_port = port;
281
282 nxt_fd_close(msg->fd[0]);
283 msg->fd[0] = -1;
284 return;
285 }
286
287 port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
288 new_port_msg->id,
289 new_port_msg->type);
290 if (nxt_slow_path(port == NULL)) {
291 return;
292 }
293
294 nxt_fd_nonblocking(task, msg->fd[0]);
295
296 port->pair[0] = -1;
297 port->pair[1] = msg->fd[0];
298 port->max_size = new_port_msg->max_size;
299 port->max_share = new_port_msg->max_share;
300
301 port->socket.task = task;
302
303 nxt_port_write_enable(task, port);
304
305 msg->u.new_port = port;
306 }
307
308 /* TODO move to nxt_main_process.c */
309 void
nxt_port_process_ready_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)310 nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
311 {
312 nxt_port_t *port;
313 nxt_process_t *process;
314 nxt_runtime_t *rt;
315
316 rt = task->thread->runtime;
317
318 process = nxt_runtime_process_find(rt, msg->port_msg.pid);
319 if (nxt_slow_path(process == NULL)) {
320 return;
321 }
322
323 nxt_assert(process->state != NXT_PROCESS_STATE_READY);
324
325 process->state = NXT_PROCESS_STATE_READY;
326
327 nxt_assert(!nxt_queue_is_empty(&process->ports));
328
329 port = nxt_process_port_first(process);
330
331 nxt_debug(task, "process %PI ready", msg->port_msg.pid);
332
333 if (msg->fd[0] != -1) {
334 port->queue_fd = msg->fd[0];
335 port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
336 PROT_READ | PROT_WRITE, MAP_SHARED,
337 msg->fd[0], 0);
338 }
339
340 nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
341 }
342
343
344 void
nxt_port_mmap_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)345 nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
346 {
347 nxt_runtime_t *rt;
348 nxt_process_t *process;
349
350 rt = task->thread->runtime;
351
352 if (nxt_slow_path(msg->fd[0] == -1)) {
353 nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
354
355 return;
356 }
357
358 process = nxt_runtime_process_find(rt, msg->port_msg.pid);
359 if (nxt_slow_path(process == NULL)) {
360 nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
361 msg->port_msg.pid);
362
363 goto fail_close;
364 }
365
366 nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
367
368 fail_close:
369
370 nxt_fd_close(msg->fd[0]);
371 }
372
373
374 void
nxt_port_change_log_file(nxt_task_t * task,nxt_runtime_t * rt,nxt_uint_t slot,nxt_fd_t fd)375 nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
376 nxt_fd_t fd)
377 {
378 nxt_buf_t *b;
379 nxt_port_t *port;
380 nxt_process_t *process;
381
382 nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
383
384 nxt_runtime_process_each(rt, process) {
385
386 if (nxt_pid == process->pid) {
387 continue;
388 }
389
390 port = nxt_process_port_first(process);
391
392 b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
393 sizeof(nxt_uint_t), 0);
394 if (nxt_slow_path(b == NULL)) {
395 continue;
396 }
397
398 b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
399
400 (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
401 fd, 0, 0, b);
402
403 } nxt_runtime_process_loop;
404 }
405
406
407 void
nxt_port_change_log_file_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)408 nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
409 {
410 nxt_buf_t *b;
411 nxt_uint_t slot;
412 nxt_file_t *log_file;
413 nxt_runtime_t *rt;
414
415 rt = task->thread->runtime;
416
417 b = msg->buf;
418 slot = *(nxt_uint_t *) b->mem.pos;
419
420 log_file = nxt_list_elt(rt->log_files, slot);
421
422 nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
423
424 /*
425 * The old log file descriptor must be closed at the moment when no
426 * other threads use it. dup2() allows to use the old file descriptor
427 * for new log file. This change is performed atomically in the kernel.
428 */
429 if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
430 if (slot == 0) {
431 (void) nxt_file_stderr(log_file);
432 }
433 }
434 }
435
436
437 void
nxt_port_data_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)438 nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
439 {
440 size_t dump_size;
441 nxt_buf_t *b;
442
443 b = msg->buf;
444 dump_size = b->mem.free - b->mem.pos;
445
446 if (dump_size > 300) {
447 dump_size = 300;
448 }
449
450 nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
451 }
452
453
454 void
nxt_port_remove_notify_others(nxt_task_t * task,nxt_process_t * process)455 nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
456 {
457 nxt_pid_t pid;
458 nxt_buf_t *buf;
459 nxt_port_t *port;
460 nxt_runtime_t *rt;
461 nxt_process_t *p;
462 nxt_process_type_t ptype;
463
464 pid = process->pid;
465
466 ptype = nxt_process_type(process);
467
468 rt = task->thread->runtime;
469
470 nxt_runtime_process_each(rt, p) {
471
472 if (p->pid == nxt_pid
473 || p->pid == pid
474 || nxt_queue_is_empty(&p->ports))
475 {
476 continue;
477 }
478
479 port = nxt_process_port_first(p);
480
481 if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
482 continue;
483 }
484
485 buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
486 sizeof(pid));
487
488 if (nxt_slow_path(buf == NULL)) {
489 continue;
490 }
491
492 buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
493
494 nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
495 process->stream, 0, buf);
496
497 } nxt_runtime_process_loop;
498 }
499
500
501 void
nxt_port_remove_pid_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)502 nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
503 {
504 nxt_pid_t pid;
505 nxt_buf_t *buf;
506
507 buf = msg->buf;
508
509 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
510
511 nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
512
513 nxt_port_remove_pid(task, msg, pid);
514 }
515
516
517 static void
nxt_port_remove_pid(nxt_task_t * task,nxt_port_recv_msg_t * msg,nxt_pid_t pid)518 nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
519 nxt_pid_t pid)
520 {
521 nxt_runtime_t *rt;
522 nxt_process_t *process;
523
524 msg->u.removed_pid = pid;
525
526 nxt_debug(task, "port remove pid %PI handler", pid);
527
528 rt = task->thread->runtime;
529
530 nxt_port_rpc_remove_peer(task, msg->port, pid);
531
532 process = nxt_runtime_process_find(rt, pid);
533
534 if (process) {
535 nxt_process_close_ports(task, process);
536 }
537 }
538
539
540 void
nxt_port_empty_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)541 nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
542 {
543 nxt_debug(task, "port empty handler");
544 }
545
546
547 typedef struct {
548 nxt_work_t work;
549 nxt_port_t *port;
550 nxt_port_post_handler_t handler;
551 } nxt_port_work_t;
552
553
554 static void
nxt_port_post_handler(nxt_task_t * task,void * obj,void * data)555 nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
556 {
557 nxt_port_t *port;
558 nxt_port_work_t *pw;
559 nxt_port_post_handler_t handler;
560
561 pw = obj;
562 port = pw->port;
563 handler = pw->handler;
564
565 nxt_free(pw);
566
567 handler(task, port, data);
568
569 nxt_port_use(task, port, -1);
570 }
571
572
573 nxt_int_t
nxt_port_post(nxt_task_t * task,nxt_port_t * port,nxt_port_post_handler_t handler,void * data)574 nxt_port_post(nxt_task_t *task, nxt_port_t *port,
575 nxt_port_post_handler_t handler, void *data)
576 {
577 nxt_port_work_t *pw;
578
579 if (task->thread->engine == port->engine) {
580 handler(task, port, data);
581
582 return NXT_OK;
583 }
584
585 pw = nxt_zalloc(sizeof(nxt_port_work_t));
586
587 if (nxt_slow_path(pw == NULL)) {
588 return NXT_ERROR;
589 }
590
591 nxt_atomic_fetch_add(&port->use_count, 1);
592
593 pw->work.handler = nxt_port_post_handler;
594 pw->work.task = &port->engine->task;
595 pw->work.obj = pw;
596 pw->work.data = data;
597
598 pw->port = port;
599 pw->handler = handler;
600
601 nxt_event_engine_post(port->engine, &pw->work);
602
603 return NXT_OK;
604 }
605
606
607 static void
nxt_port_release_handler(nxt_task_t * task,nxt_port_t * port,void * data)608 nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
609 {
610 /* no op */
611 }
612
613
614 void
nxt_port_use(nxt_task_t * task,nxt_port_t * port,int i)615 nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
616 {
617 int c;
618
619 c = nxt_atomic_fetch_add(&port->use_count, i);
620
621 if (i < 0 && c == -i) {
622
623 if (port->engine == NULL || task->thread->engine == port->engine) {
624 nxt_port_release(task, port);
625
626 return;
627 }
628
629 nxt_port_post(task, port, nxt_port_release_handler, NULL);
630 }
631 }
632