xref: /unit/go/port.go (revision 1996:35873fa78fed)
1/*
2 * Copyright (C) Max Romanov
3 * Copyright (C) NGINX, Inc.
4 */
5
6package unit
7
8/*
9#include "nxt_cgo_lib.h"
10*/
11import "C"
12
13import (
14	"io"
15	"net"
16	"os"
17	"sync"
18	"unsafe"
19)
20
21type port_key struct {
22	pid int
23	id  int
24}
25
26type port struct {
27	key port_key
28	rcv *net.UnixConn
29	snd *net.UnixConn
30}
31
32type port_registry struct {
33	sync.RWMutex
34	m map[port_key]*port
35}
36
37var port_registry_ port_registry
38
39func find_port(key port_key) *port {
40	port_registry_.RLock()
41	res := port_registry_.m[key]
42	port_registry_.RUnlock()
43
44	return res
45}
46
47func add_port(p *port) {
48
49	port_registry_.Lock()
50	if port_registry_.m == nil {
51		port_registry_.m = make(map[port_key]*port)
52	}
53
54	old := port_registry_.m[p.key]
55
56	if old == nil {
57		port_registry_.m[p.key] = p
58	}
59
60	port_registry_.Unlock()
61}
62
63func (p *port) Close() {
64	if p.rcv != nil {
65		p.rcv.Close()
66	}
67
68	if p.snd != nil {
69		p.snd.Close()
70	}
71}
72
73func getUnixConn(fd int) *net.UnixConn {
74	if fd < 0 {
75		return nil
76	}
77
78	f := os.NewFile(uintptr(fd), "sock")
79	defer f.Close()
80
81	c, err := net.FileConn(f)
82	if err != nil {
83		nxt_go_alert("FileConn error %s", err)
84		return nil
85	}
86
87	uc, ok := c.(*net.UnixConn)
88	if !ok {
89		nxt_go_alert("Not a Unix-domain socket %d", fd)
90		return nil
91	}
92
93	return uc
94}
95
96//export nxt_go_add_port
97func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int {
98
99	new_port := &port{
100		key: port_key{
101			pid: int(p.id.pid),
102			id:  int(p.id.id),
103		},
104		rcv: getUnixConn(int(p.in_fd)),
105		snd: getUnixConn(int(p.out_fd)),
106	}
107
108	add_port(new_port)
109
110	p.in_fd = -1
111	p.out_fd = -1
112
113	return C.NXT_UNIT_OK
114}
115
116//export nxt_go_ready
117func nxt_go_ready(ctx *C.nxt_unit_ctx_t) C.int {
118	go func(ctx *C.nxt_unit_ctx_t) {
119		C.nxt_unit_run_shared(ctx)
120	}(ctx)
121
122	return C.NXT_UNIT_OK
123}
124
125//export nxt_go_remove_port
126func nxt_go_remove_port(unit *C.nxt_unit_t, ctx *C.nxt_unit_ctx_t,
127	p *C.nxt_unit_port_t) {
128
129	key := port_key{
130		pid: int(p.id.pid),
131		id:  int(p.id.id),
132	}
133
134	port_registry_.Lock()
135	if port_registry_.m != nil {
136		delete(port_registry_.m, key)
137	}
138
139	port_registry_.Unlock()
140}
141
142//export nxt_go_port_send
143func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
144	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
145
146	key := port_key{
147		pid: int(pid),
148		id:  int(id),
149	}
150
151	p := find_port(key)
152
153	if p == nil {
154		nxt_go_alert("port %d:%d not found", pid, id)
155		return 0
156	}
157
158	n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
159		GoBytes(oob, oob_size), nil)
160
161	if err != nil {
162		nxt_go_warn("write result %d (%d), %s", n, oobn, err)
163
164		n = -1
165	}
166
167	return C.ssize_t(n)
168}
169
170//export nxt_go_port_recv
171func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
172	oob unsafe.Pointer, oob_size *C.size_t) C.ssize_t {
173
174	key := port_key{
175		pid: int(pid),
176		id:  int(id),
177	}
178
179	p := find_port(key)
180
181	if p == nil {
182		nxt_go_alert("port %d:%d not found", pid, id)
183		return 0
184	}
185
186	n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
187		GoBytes(oob, C.int(*oob_size)))
188
189	if err != nil {
190		if nerr, ok := err.(*net.OpError); ok {
191			if nerr.Err == io.EOF {
192				return 0
193			}
194		}
195
196		nxt_go_warn("read result %d (%d), %s", n, oobn, err)
197
198		n = -1
199
200	} else {
201		*oob_size = C.size_t(oobn)
202	}
203
204	return C.ssize_t(n)
205}
206