1/* 2 * Copyright (C) Max Romanov 3 * Copyright (C) NGINX, Inc. 4 */ 5 6package unit 7 8/* 9#include "nxt_cgo_lib.h" 10*/ 11import "C" 12 13import ( 14 "io" 15 "net" 16 "os" 17 "sync" 18 "unsafe" 19) 20 21type port_key struct { 22 pid int 23 id int 24} 25 26type port struct { 27 key port_key 28 rcv *net.UnixConn 29 snd *net.UnixConn 30} 31 32type port_registry struct { 33 sync.RWMutex 34 m map[port_key]*port 35} 36 37var port_registry_ port_registry 38 39func find_port(key port_key) *port { 40 port_registry_.RLock() 41 res := port_registry_.m[key] 42 port_registry_.RUnlock() 43 44 return res 45} 46 47func add_port(p *port) { 48 49 port_registry_.Lock() 50 if port_registry_.m == nil { 51 port_registry_.m = make(map[port_key]*port) 52 } 53 54 old := port_registry_.m[p.key] 55 56 if old == nil { 57 port_registry_.m[p.key] = p 58 } 59 60 port_registry_.Unlock() 61} 62 63func (p *port) Close() { 64 if p.rcv != nil { 65 p.rcv.Close() 66 } 67 68 if p.snd != nil { 69 p.snd.Close() 70 } 71} 72 73func getUnixConn(fd int) *net.UnixConn { 74 if fd < 0 { 75 return nil 76 } 77 78 f := os.NewFile(uintptr(fd), "sock") 79 defer f.Close() 80 81 c, err := net.FileConn(f) 82 if err != nil { 83 nxt_go_alert("FileConn error %s", err) 84 return nil 85 } 86 87 uc, ok := c.(*net.UnixConn) 88 if !ok { 89 nxt_go_alert("Not a Unix-domain socket %d", fd) 90 return nil 91 } 92 93 return uc 94} 95 96//export nxt_go_add_port 97func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int { 98 99 new_port := &port{ 100 key: port_key{ 101 pid: int(p.id.pid), 102 id: int(p.id.id), 103 }, 104 rcv: getUnixConn(int(p.in_fd)), 105 snd: getUnixConn(int(p.out_fd)), 106 } 107 108 add_port(new_port) 109 110 p.in_fd = -1 111 p.out_fd = -1 112 113 return C.NXT_UNIT_OK 114} 115 116//export nxt_go_ready 117func nxt_go_ready(ctx *C.nxt_unit_ctx_t) C.int { 118 go func(ctx *C.nxt_unit_ctx_t) { 119 C.nxt_unit_run_shared(ctx) 120 }(ctx) 121 122 return C.NXT_UNIT_OK 123} 124 125//export nxt_go_remove_port 126func nxt_go_remove_port(unit *C.nxt_unit_t, ctx *C.nxt_unit_ctx_t, 127 p *C.nxt_unit_port_t) { 128 129 key := port_key{ 130 pid: int(p.id.pid), 131 id: int(p.id.id), 132 } 133 134 port_registry_.Lock() 135 if port_registry_.m != nil { 136 delete(port_registry_.m, key) 137 } 138 139 port_registry_.Unlock() 140} 141 142//export nxt_go_port_send 143func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, 144 oob unsafe.Pointer, oob_size C.int) C.ssize_t { 145 146 key := port_key{ 147 pid: int(pid), 148 id: int(id), 149 } 150 151 p := find_port(key) 152 153 if p == nil { 154 nxt_go_alert("port %d:%d not found", pid, id) 155 return 0 156 } 157 158 n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), 159 GoBytes(oob, oob_size), nil) 160 161 if err != nil { 162 nxt_go_warn("write result %d (%d), %s", n, oobn, err) 163 164 n = -1 165 } 166 167 return C.ssize_t(n) 168} 169 170//export nxt_go_port_recv 171func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, 172 oob unsafe.Pointer, oob_size *C.size_t) C.ssize_t { 173 174 key := port_key{ 175 pid: int(pid), 176 id: int(id), 177 } 178 179 p := find_port(key) 180 181 if p == nil { 182 nxt_go_alert("port %d:%d not found", pid, id) 183 return 0 184 } 185 186 n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), 187 GoBytes(oob, C.int(*oob_size))) 188 189 if err != nil { 190 if nerr, ok := err.(*net.OpError); ok { 191 if nerr.Err == io.EOF { 192 return 0 193 } 194 } 195 196 nxt_go_warn("read result %d (%d), %s", n, oobn, err) 197 198 n = -1 199 200 } else { 201 *oob_size = C.size_t(oobn) 202 } 203 204 return C.ssize_t(n) 205} 206