1/* 2 * Copyright (C) Max Romanov 3 * Copyright (C) NGINX, Inc. 4 */ 5 6package unit 7 8/* 9#include "nxt_cgo_lib.h" 10*/ 11import "C" 12 13import ( 14 "net" 15 "os" 16 "sync" 17 "unsafe" 18) 19 20type port_key struct { 21 pid int 22 id int 23} 24 25type port struct { 26 key port_key 27 rcv *net.UnixConn 28 snd *net.UnixConn 29} 30 31type port_registry struct { 32 sync.RWMutex 33 m map[port_key]*port 34} 35 36var port_registry_ port_registry 37 38func find_port(key port_key) *port { 39 port_registry_.RLock() 40 res := port_registry_.m[key] 41 port_registry_.RUnlock() 42 43 return res 44} 45 46func add_port(p *port) { 47 48 port_registry_.Lock() 49 if port_registry_.m == nil { 50 port_registry_.m = make(map[port_key]*port) 51 } 52 53 old := port_registry_.m[p.key] 54 55 if old == nil { 56 port_registry_.m[p.key] = p 57 } 58 59 port_registry_.Unlock() 60} 61 62func (p *port) Close() { 63 if p.rcv != nil { 64 p.rcv.Close() 65 } 66 67 if p.snd != nil { 68 p.snd.Close() 69 } 70} 71 72func getUnixConn(fd int) *net.UnixConn { 73 if fd < 0 { 74 return nil 75 } 76 77 f := os.NewFile(uintptr(fd), "sock") 78 defer f.Close() 79 80 c, err := net.FileConn(f) 81 if err != nil { 82 nxt_go_warn("FileConn error %s", err) 83 return nil 84 } 85 86 uc, ok := c.(*net.UnixConn) 87 if !ok { 88 nxt_go_warn("Not a Unix-domain socket %d", fd) 89 return nil 90 } 91 92 return uc 93} 94 95//export nxt_go_add_port 96func nxt_go_add_port(ctx C.uintptr_t, pid C.int, id C.int, rcv C.int, snd C.int) { 97 p := &port{ 98 key: port_key{ 99 pid: int(pid), 100 id: int(id), 101 }, 102 rcv: getUnixConn(int(rcv)), 103 snd: getUnixConn(int(snd)), 104 } 105 106 add_port(p) 107 108 if id == 65535 { 109 go func(ctx C.uintptr_t) { 110 C.nxt_cgo_unit_run_shared(ctx); 111 }(ctx) 112 } 113} 114 115//export nxt_go_remove_port 116func nxt_go_remove_port(pid C.int, id C.int) { 117 key := port_key{ 118 pid: int(pid), 119 id: int(id), 120 } 121 122 port_registry_.Lock() 123 if port_registry_.m != nil { 124 delete(port_registry_.m, key) 125 } 126 127 port_registry_.Unlock() 128} 129 130//export nxt_go_port_send 131func nxt_go_port_send(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, 132 oob unsafe.Pointer, oob_size C.int) C.ssize_t { 133 134 key := port_key{ 135 pid: int(pid), 136 id: int(id), 137 } 138 139 p := find_port(key) 140 141 if p == nil { 142 nxt_go_warn("port %d:%d not found", pid, id) 143 return 0 144 } 145 146 n, oobn, err := p.snd.WriteMsgUnix(GoBytes(buf, buf_size), 147 GoBytes(oob, oob_size), nil) 148 149 if err != nil { 150 nxt_go_warn("write result %d (%d), %s", n, oobn, err) 151 152 n = -1 153 } 154 155 return C.ssize_t(n) 156} 157 158//export nxt_go_port_recv 159func nxt_go_port_recv(pid C.int, id C.int, buf unsafe.Pointer, buf_size C.int, 160 oob unsafe.Pointer, oob_size C.int) C.ssize_t { 161 162 key := port_key{ 163 pid: int(pid), 164 id: int(id), 165 } 166 167 p := find_port(key) 168 169 if p == nil { 170 nxt_go_warn("port %d:%d not found", pid, id) 171 return 0 172 } 173 174 n, oobn, _, _, err := p.rcv.ReadMsgUnix(GoBytes(buf, buf_size), 175 GoBytes(oob, oob_size)) 176 177 if err != nil { 178 nxt_go_warn("read result %d (%d), %s", n, oobn, err) 179 180 n = -1 181 } 182 183 return C.ssize_t(n) 184} 185