xref: /unit/go/port.go (revision 1980:43553aa72111)
1/*
2 * Copyright (C) Max Romanov
3 * Copyright (C) NGINX, Inc.
4 */
5
6package unit
7
8/*
9#include "nxt_cgo_lib.h"
10*/
11import "C"
12
13import (
14	"io"
15	"net"
16	"os"
17	"sync"
18	"unsafe"
19)
20
// port_key identifies a port by its (pid, id) pair. It is a plain comparable
// value and is used as the key of the port registry map.
type port_key struct {
	// pid is filled from the pid part of the C port id.
	pid int
	// id is filled from the id part of the C port id.
	id int
}
25
26type port struct {
27	key port_key
28	rcv *net.UnixConn
29	snd *net.UnixConn
30}
31
32type port_registry struct {
33	sync.RWMutex
34	m map[port_key]*port
35}
36
37var port_registry_ port_registry
38
39func find_port(key port_key) *port {
40	port_registry_.RLock()
41	res := port_registry_.m[key]
42	port_registry_.RUnlock()
43
44	return res
45}
46
47func add_port(p *port) {
48
49	port_registry_.Lock()
50	if port_registry_.m == nil {
51		port_registry_.m = make(map[port_key]*port)
52	}
53
54	old := port_registry_.m[p.key]
55
56	if old == nil {
57		port_registry_.m[p.key] = p
58	}
59
60	port_registry_.Unlock()
61}
62
63func (p *port) Close() {
64	if p.rcv != nil {
65		p.rcv.Close()
66	}
67
68	if p.snd != nil {
69		p.snd.Close()
70	}
71}
72
73func getUnixConn(fd int) *net.UnixConn {
74	if fd < 0 {
75		return nil
76	}
77
78	f := os.NewFile(uintptr(fd), "sock")
79	defer f.Close()
80
81	c, err := net.FileConn(f)
82	if err != nil {
83		nxt_go_alert("FileConn error %s", err)
84		return nil
85	}
86
87	uc, ok := c.(*net.UnixConn)
88	if !ok {
89		nxt_go_alert("Not a Unix-domain socket %d", fd)
90		return nil
91	}
92
93	return uc
94}
95
96//export nxt_go_add_port
97func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int {
98
99	new_port := &port{
100		key: port_key{
101			pid: int(p.id.pid),
102			id:  int(p.id.id),
103		},
104		rcv: getUnixConn(int(p.in_fd)),
105		snd: getUnixConn(int(p.out_fd)),
106	}
107
108	add_port(new_port)
109
110	p.in_fd = -1
111	p.out_fd = -1
112
113	if new_port.key.id == 65535 {
114		go func(ctx *C.nxt_unit_ctx_t) {
115			C.nxt_unit_run_shared(ctx);
116		}(ctx)
117	}
118
119	return C.NXT_UNIT_OK
120}
121
122//export nxt_go_remove_port
123func nxt_go_remove_port(unit *C.nxt_unit_t, ctx *C.nxt_unit_ctx_t,
124	p *C.nxt_unit_port_t) {
125
126	key := port_key{
127		pid: int(p.id.pid),
128		id:  int(p.id.id),
129	}
130
131	port_registry_.Lock()
132	if port_registry_.m != nil {
133		delete(port_registry_.m, key)
134	}
135
136	port_registry_.Unlock()
137}
138
139//export nxt_go_port_send
140func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
141	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
142
143	key := port_key{
144		pid: int(pid),
145		id:  int(id),
146	}
147
148	p := find_port(key)
149
150	if p == nil {
151		nxt_go_alert("port %d:%d not found", pid, id)
152		return 0
153	}
154
155	n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size),
156		GoBytes(oob, oob_size), nil)
157
158	if err != nil {
159		nxt_go_warn("write result %d (%d), %s", n, oobn, err)
160
161		n = -1
162	}
163
164	return C.ssize_t(n)
165}
166
167//export nxt_go_port_recv
168func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int,
169	oob unsafe.Pointer, oob_size C.int) C.ssize_t {
170
171	key := port_key{
172		pid: int(pid),
173		id:  int(id),
174	}
175
176	p := find_port(key)
177
178	if p == nil {
179		nxt_go_alert("port %d:%d not found", pid, id)
180		return 0
181	}
182
183	n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size),
184		GoBytes(oob, oob_size))
185
186	if err != nil {
187		if nerr, ok := err.(*net.OpError); ok {
188			if nerr.Err == io.EOF {
189				return 0
190			}
191		}
192
193		nxt_go_warn("read result %d (%d), %s", n, oobn, err)
194
195		n = -1
196	}
197
198	return C.ssize_t(n)
199}
200