xref: /unit/go/port.go (revision 1547:cbcd76704c90)
1/*
2 * Copyright (C) Max Romanov
3 * Copyright (C) NGINX, Inc.
4 */
5
6package unit
7
8/*
9#include "nxt_cgo_lib.h"
10*/
11import "C"
12
13import (
14	"net"
15	"os"
16	"sync"
17	"unsafe"
18)
19
// port_key identifies a port by the pair (process id, port id). It is a
// comparable value type and is used as the key of the port registry map.
type port_key struct {
	pid, id int
}
24
25type port struct {
26	key port_key
27	rcv *net.UnixConn
28	snd *net.UnixConn
29}
30
31type port_registry struct {
32	sync.RWMutex
33	m map[port_key]*port
34}
35
36var port_registry_ port_registry
37
38func find_port(key port_key) *port {
39	port_registry_.RLock()
40	res := port_registry_.m[key]
41	port_registry_.RUnlock()
42
43	return res
44}
45
46func add_port(p *port) {
47
48	port_registry_.Lock()
49	if port_registry_.m == nil {
50		port_registry_.m = make(map[port_key]*port)
51	}
52
53	old := port_registry_.m[p.key]
54
55	if old == nil {
56		port_registry_.m[p.key] = p
57	}
58
59	port_registry_.Unlock()
60}
61
62func (p *port) Close() {
63	if p.rcv != nil {
64		p.rcv.Close()
65	}
66
67	if p.snd != nil {
68		p.snd.Close()
69	}
70}
71
72func getUnixConn(fd int) *net.UnixConn {
73	if fd < 0 {
74		return nil
75	}
76
77	f := os.NewFile(uintptr(fd), "sock")
78	defer f.Close()
79
80	c, err := net.FileConn(f)
81	if err != nil {
82		nxt_go_warn("FileConn error %s", err)
83		return nil
84	}
85
86	uc, ok := c.(*net.UnixConn)
87	if !ok {
88		nxt_go_warn("Not a Unix-domain socket %d", fd)
89		return nil
90	}
91
92	return uc
93}
94
95//export nxt_go_add_port
96func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) {
97	p := &port{
98		key: port_key{
99			pid: int(pid),
100			id:  int(id),
101		},
102		rcv: getUnixConn(int(rcv)),
103		snd: getUnixConn(int(snd)),
104	}
105
106	add_port(p)
107
108	if id == 65535 {
109		go func(ctx C.uintptr_t) {
110			C.nxt_cgo_unit_run_shared(ctx);
111		}(ctx)
112	}
113}
114
115//export nxt_go_remove_port
116func nxt_go_remove_port(pid C.int, id C.int) {
117	key := port_key{
118		pid: int(pid),
119		id:  int(id),
120	}
121
122	port_registry_.Lock()
123	if port_registry_.m != nil {
124		delete(port_registry_.m, key)
125	}
126
127	port_registry_.Unlock()
128}
129
130//export nxt_go_port_send
131func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
132	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
133
134	key := port_key{
135		pid: int(pid),
136		id:  int(id),
137	}
138
139	p := find_port(key)
140
141	if p == nil {
142		nxt_go_warn("port %d:%d not found", pid, id)
143		return 0
144	}
145
146	n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
147		GoBytes(oob, oob_size), nil)
148
149	if err != nil {
150		nxt_go_warn("write result %d (%d), %s", n, oobn, err)
151
152		n = -1
153	}
154
155	return C.ssize_t(n)
156}
157
158//export nxt_go_port_recv
159func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
160	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
161
162	key := port_key{
163		pid: int(pid),
164		id:  int(id),
165	}
166
167	p := find_port(key)
168
169	if p == nil {
170		nxt_go_warn("port %d:%d not found", pid, id)
171		return 0
172	}
173
174	n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
175		GoBytes(oob, oob_size))
176
177	if err != nil {
178		nxt_go_warn("read result %d (%d), %s", n, oobn, err)
179
180		n = -1
181	}
182
183	return C.ssize_t(n)
184}
185