xref: /unit/go/port.go (revision 1316:5b767c6bfd0a)
1/*
2 * Copyright (C) Max Romanov
3 * Copyright (C) NGINX, Inc.
4 */
5
6package unit
7
8/*
9#include "nxt_cgo_lib.h"
10*/
11import "C"
12
13import (
14	"net"
15	"os"
16	"sync"
17	"unsafe"
18)
19
// port_key is the (pid, id) pair under which a port is stored in the
// registry. Both fields are plain ints, so the struct is comparable and
// usable as a map key.
type port_key struct {
	pid, id int
}
24
25type port struct {
26	key port_key
27	rcv *net.UnixConn
28	snd *net.UnixConn
29}
30
31type port_registry struct {
32	sync.RWMutex
33	m map[port_key]*port
34}
35
36var port_registry_ port_registry
37
38func find_port(key port_key) *port {
39	port_registry_.RLock()
40	res := port_registry_.m[key]
41	port_registry_.RUnlock()
42
43	return res
44}
45
46func add_port(p *port) {
47
48	port_registry_.Lock()
49	if port_registry_.m == nil {
50		port_registry_.m = make(map[port_key]*port)
51	}
52
53	port_registry_.m[p.key] = p
54
55	port_registry_.Unlock()
56}
57
58func (p *port) Close() {
59	if p.rcv != nil {
60		p.rcv.Close()
61	}
62
63	if p.snd != nil {
64		p.snd.Close()
65	}
66}
67
68func getUnixConn(fd int) *net.UnixConn {
69	if fd < 0 {
70		return nil
71	}
72
73	f := os.NewFile(uintptr(fd), "sock")
74	defer f.Close()
75
76	c, err := net.FileConn(f)
77	if err != nil {
78		nxt_go_warn("FileConn error %s", err)
79		return nil
80	}
81
82	uc, ok := c.(*net.UnixConn)
83	if !ok {
84		nxt_go_warn("Not a Unix-domain socket %d", fd)
85		return nil
86	}
87
88	return uc
89}
90
91//export nxt_go_add_port
92func nxt_go_add_port(pid C.int, id C.int, rcv C.int, snd C.int) {
93	p := &port{
94		key: port_key{
95			pid: int(pid),
96			id:  int(id),
97		},
98		rcv: getUnixConn(int(rcv)),
99		snd: getUnixConn(int(snd)),
100	}
101
102	add_port(p)
103}
104
105//export nxt_go_remove_port
106func nxt_go_remove_port(pid C.int, id C.int) {
107	key := port_key{
108		pid: int(pid),
109		id:  int(id),
110	}
111
112	port_registry_.Lock()
113	if port_registry_.m != nil {
114		delete(port_registry_.m, key)
115	}
116
117	port_registry_.Unlock()
118}
119
120//export nxt_go_port_send
121func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
122	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
123
124	key := port_key{
125		pid: int(pid),
126		id:  int(id),
127	}
128
129	p := find_port(key)
130
131	if p == nil {
132		nxt_go_warn("port %d:%d not found", pid, id)
133		return 0
134	}
135
136	n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
137		GoBytes(oob, oob_size), nil)
138
139	if err != nil {
140		nxt_go_warn("write result %d (%d), %s", n, oobn, err)
141	}
142
143	return C.ssize_t(n)
144}
145
146//export nxt_go_port_recv
147func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
148	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
149
150	key := port_key{
151		pid: int(pid),
152		id:  int(id),
153	}
154
155	p := find_port(key)
156
157	if p == nil {
158		nxt_go_warn("port %d:%d not found", pid, id)
159		return 0
160	}
161
162	n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
163		GoBytes(oob, oob_size))
164
165	if err != nil {
166		nxt_go_warn("read result %d (%d), %s", n, oobn, err)
167	}
168
169	return C.ssize_t(n)
170}
171