1190Smax.romanov@nginx.com
2190Smax.romanov@nginx.com /*
3190Smax.romanov@nginx.com * Copyright (C) Max Romanov
4190Smax.romanov@nginx.com * Copyright (C) NGINX, Inc.
5190Smax.romanov@nginx.com */
6190Smax.romanov@nginx.com
7190Smax.romanov@nginx.com #include <nxt_main.h>
8190Smax.romanov@nginx.com #include <nxt_port_rpc.h>
9190Smax.romanov@nginx.com
10190Smax.romanov@nginx.com
111487Smax.romanov@nginx.com static volatile uint32_t *nxt_stream_ident;
12318Smax.romanov@nginx.com
13190Smax.romanov@nginx.com typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
14190Smax.romanov@nginx.com
15190Smax.romanov@nginx.com struct nxt_port_rpc_reg_s {
16190Smax.romanov@nginx.com uint32_t stream;
17190Smax.romanov@nginx.com
18190Smax.romanov@nginx.com nxt_pid_t peer;
19190Smax.romanov@nginx.com nxt_queue_link_t link;
20318Smax.romanov@nginx.com nxt_bool_t link_first;
21190Smax.romanov@nginx.com
22190Smax.romanov@nginx.com nxt_port_rpc_handler_t ready_handler;
23190Smax.romanov@nginx.com nxt_port_rpc_handler_t error_handler;
24190Smax.romanov@nginx.com void *data;
25190Smax.romanov@nginx.com };
26190Smax.romanov@nginx.com
27190Smax.romanov@nginx.com
28425Smax.romanov@nginx.com static void
29425Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30425Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg);
31425Smax.romanov@nginx.com
32425Smax.romanov@nginx.com
331487Smax.romanov@nginx.com nxt_int_t
nxt_port_rpc_init(void)341487Smax.romanov@nginx.com nxt_port_rpc_init(void)
351487Smax.romanov@nginx.com {
361487Smax.romanov@nginx.com void *p;
371487Smax.romanov@nginx.com
381487Smax.romanov@nginx.com if (nxt_stream_ident != NULL) {
391487Smax.romanov@nginx.com return NXT_OK;
401487Smax.romanov@nginx.com }
411487Smax.romanov@nginx.com
421487Smax.romanov@nginx.com p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE,
431487Smax.romanov@nginx.com MAP_ANON | MAP_SHARED, -1, 0);
441487Smax.romanov@nginx.com
451487Smax.romanov@nginx.com if (nxt_slow_path(p == MAP_FAILED)) {
461487Smax.romanov@nginx.com return NXT_ERROR;
471487Smax.romanov@nginx.com }
481487Smax.romanov@nginx.com
491487Smax.romanov@nginx.com nxt_stream_ident = p;
501487Smax.romanov@nginx.com *nxt_stream_ident = 1;
511487Smax.romanov@nginx.com
521487Smax.romanov@nginx.com return NXT_OK;
531487Smax.romanov@nginx.com }
541487Smax.romanov@nginx.com
551487Smax.romanov@nginx.com
56190Smax.romanov@nginx.com static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t * lhq,void * data)57190Smax.romanov@nginx.com nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
58190Smax.romanov@nginx.com {
59190Smax.romanov@nginx.com return NXT_OK;
60190Smax.romanov@nginx.com }
61190Smax.romanov@nginx.com
62190Smax.romanov@nginx.com
63190Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = {
64190Smax.romanov@nginx.com NXT_LVLHSH_DEFAULT,
65190Smax.romanov@nginx.com nxt_rpc_reg_test,
66190Smax.romanov@nginx.com nxt_lvlhsh_alloc,
67190Smax.romanov@nginx.com nxt_lvlhsh_free,
68190Smax.romanov@nginx.com };
69190Smax.romanov@nginx.com
70190Smax.romanov@nginx.com
71190Smax.romanov@nginx.com nxt_inline void
nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t * lhq,uint32_t * stream)72190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
73190Smax.romanov@nginx.com {
74190Smax.romanov@nginx.com lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
75190Smax.romanov@nginx.com lhq->key.length = sizeof(*stream);
76190Smax.romanov@nginx.com lhq->key.start = (u_char *) stream;
77190Smax.romanov@nginx.com lhq->proto = &lvlhsh_rpc_reg_proto;
78190Smax.romanov@nginx.com }
79190Smax.romanov@nginx.com
80190Smax.romanov@nginx.com
81190Smax.romanov@nginx.com nxt_inline void
nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t * lhq,nxt_pid_t * peer)82190Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
83190Smax.romanov@nginx.com {
84190Smax.romanov@nginx.com lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
85190Smax.romanov@nginx.com lhq->key.length = sizeof(*peer);
86190Smax.romanov@nginx.com lhq->key.start = (u_char *) peer;
87190Smax.romanov@nginx.com lhq->proto = &lvlhsh_rpc_reg_proto;
88190Smax.romanov@nginx.com }
89190Smax.romanov@nginx.com
90190Smax.romanov@nginx.com
91190Smax.romanov@nginx.com uint32_t
nxt_port_rpc_register_handler(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,nxt_pid_t peer,void * data)92190Smax.romanov@nginx.com nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
93190Smax.romanov@nginx.com nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
94190Smax.romanov@nginx.com nxt_pid_t peer, void *data)
95190Smax.romanov@nginx.com {
96318Smax.romanov@nginx.com void *ex;
97318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
98318Smax.romanov@nginx.com
99318Smax.romanov@nginx.com ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
100318Smax.romanov@nginx.com error_handler, 0);
101318Smax.romanov@nginx.com
102318Smax.romanov@nginx.com if (ex == NULL) {
103318Smax.romanov@nginx.com return 0;
104318Smax.romanov@nginx.com }
105318Smax.romanov@nginx.com
106318Smax.romanov@nginx.com if (peer != -1) {
107318Smax.romanov@nginx.com nxt_port_rpc_ex_set_peer(task, port, ex, peer);
108318Smax.romanov@nginx.com }
109318Smax.romanov@nginx.com
110318Smax.romanov@nginx.com reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
111318Smax.romanov@nginx.com
112318Smax.romanov@nginx.com nxt_assert(reg->data == ex);
113318Smax.romanov@nginx.com
114318Smax.romanov@nginx.com reg->data = data;
115318Smax.romanov@nginx.com
116318Smax.romanov@nginx.com return reg->stream;
117318Smax.romanov@nginx.com }
118318Smax.romanov@nginx.com
119318Smax.romanov@nginx.com
120318Smax.romanov@nginx.com void *
nxt_port_rpc_register_handler_ex(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,size_t ex_size)121318Smax.romanov@nginx.com nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
122318Smax.romanov@nginx.com nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
123318Smax.romanov@nginx.com size_t ex_size)
124318Smax.romanov@nginx.com {
125190Smax.romanov@nginx.com uint32_t stream;
126190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
127190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
128190Smax.romanov@nginx.com
129277Sigor@sysoev.ru nxt_assert(port->pair[0] != -1);
130190Smax.romanov@nginx.com
1311487Smax.romanov@nginx.com stream = nxt_atomic_fetch_add(nxt_stream_ident, 1);
132190Smax.romanov@nginx.com
133318Smax.romanov@nginx.com reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
134190Smax.romanov@nginx.com
135190Smax.romanov@nginx.com if (nxt_slow_path(reg == NULL)) {
136318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
137190Smax.romanov@nginx.com
138318Smax.romanov@nginx.com return NULL;
139190Smax.romanov@nginx.com }
140190Smax.romanov@nginx.com
141190Smax.romanov@nginx.com reg->stream = stream;
142318Smax.romanov@nginx.com reg->peer = -1;
143190Smax.romanov@nginx.com reg->ready_handler = ready_handler;
144190Smax.romanov@nginx.com reg->error_handler = error_handler;
145318Smax.romanov@nginx.com reg->data = reg + 1;
146190Smax.romanov@nginx.com
147190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream);
148190Smax.romanov@nginx.com lhq.replace = 0;
149190Smax.romanov@nginx.com lhq.value = reg;
150190Smax.romanov@nginx.com lhq.pool = port->mem_pool;
151190Smax.romanov@nginx.com
152190Smax.romanov@nginx.com switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
153190Smax.romanov@nginx.com
154190Smax.romanov@nginx.com case NXT_OK:
155190Smax.romanov@nginx.com break;
156190Smax.romanov@nginx.com
157190Smax.romanov@nginx.com default:
158318Smax.romanov@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
159318Smax.romanov@nginx.com "reg ", stream);
160190Smax.romanov@nginx.com
161190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg);
162190Smax.romanov@nginx.com
163318Smax.romanov@nginx.com return NULL;
164318Smax.romanov@nginx.com }
165318Smax.romanov@nginx.com
166318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD registered", stream);
167318Smax.romanov@nginx.com
168582Smax.romanov@nginx.com nxt_port_inc_use(port);
169582Smax.romanov@nginx.com
170318Smax.romanov@nginx.com return reg->data;
171318Smax.romanov@nginx.com }
172318Smax.romanov@nginx.com
173318Smax.romanov@nginx.com
174318Smax.romanov@nginx.com uint32_t
nxt_port_rpc_ex_stream(void * ex)175318Smax.romanov@nginx.com nxt_port_rpc_ex_stream(void *ex)
176318Smax.romanov@nginx.com {
177318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
178318Smax.romanov@nginx.com
179318Smax.romanov@nginx.com reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
180318Smax.romanov@nginx.com
181318Smax.romanov@nginx.com nxt_assert(reg->data == ex);
182318Smax.romanov@nginx.com
183318Smax.romanov@nginx.com return reg->stream;
184318Smax.romanov@nginx.com }
185318Smax.romanov@nginx.com
186318Smax.romanov@nginx.com
187318Smax.romanov@nginx.com void
nxt_port_rpc_ex_set_peer(nxt_task_t * task,nxt_port_t * port,void * ex,nxt_pid_t peer)188318Smax.romanov@nginx.com nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
189318Smax.romanov@nginx.com void *ex, nxt_pid_t peer)
190318Smax.romanov@nginx.com {
191318Smax.romanov@nginx.com nxt_int_t ret;
192318Smax.romanov@nginx.com nxt_queue_link_t *peer_link;
193318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
194318Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
195318Smax.romanov@nginx.com
196318Smax.romanov@nginx.com reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
197318Smax.romanov@nginx.com
198318Smax.romanov@nginx.com nxt_assert(reg->data == ex);
199318Smax.romanov@nginx.com
200425Smax.romanov@nginx.com if (nxt_slow_path(peer == reg->peer)) {
201425Smax.romanov@nginx.com return;
202425Smax.romanov@nginx.com }
203318Smax.romanov@nginx.com
204425Smax.romanov@nginx.com if (reg->peer != -1) {
205425Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(task, port, reg);
206425Smax.romanov@nginx.com
207425Smax.romanov@nginx.com reg->peer = -1;
208425Smax.romanov@nginx.com }
209425Smax.romanov@nginx.com
210425Smax.romanov@nginx.com if (peer == -1) {
211318Smax.romanov@nginx.com return;
212190Smax.romanov@nginx.com }
213190Smax.romanov@nginx.com
214318Smax.romanov@nginx.com reg->peer = peer;
215318Smax.romanov@nginx.com
216318Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(&lhq, &peer);
217318Smax.romanov@nginx.com lhq.replace = 0;
218318Smax.romanov@nginx.com lhq.value = ®->link;
219318Smax.romanov@nginx.com lhq.pool = port->mem_pool;
220318Smax.romanov@nginx.com
221318Smax.romanov@nginx.com ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
222318Smax.romanov@nginx.com
223318Smax.romanov@nginx.com switch (ret) {
224318Smax.romanov@nginx.com
225318Smax.romanov@nginx.com case NXT_OK:
226318Smax.romanov@nginx.com reg->link_first = 1;
227318Smax.romanov@nginx.com nxt_queue_self(®->link);
228318Smax.romanov@nginx.com
229318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
230318Smax.romanov@nginx.com reg->stream, reg->peer, reg->link.next);
231318Smax.romanov@nginx.com break;
232318Smax.romanov@nginx.com
233318Smax.romanov@nginx.com case NXT_DECLINED:
234318Smax.romanov@nginx.com reg->link_first = 0;
235318Smax.romanov@nginx.com peer_link = lhq.value;
236318Smax.romanov@nginx.com nxt_queue_insert_after(peer_link, ®->link);
237318Smax.romanov@nginx.com
238318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
239318Smax.romanov@nginx.com reg->stream, reg->peer, reg->link.next);
240318Smax.romanov@nginx.com break;
241318Smax.romanov@nginx.com
242318Smax.romanov@nginx.com default:
243494Spluknet@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add "
244318Smax.romanov@nginx.com "peer for stream #%uD (%d)", reg->stream, ret);
245318Smax.romanov@nginx.com
246318Smax.romanov@nginx.com reg->peer = -1;
247318Smax.romanov@nginx.com break;
248318Smax.romanov@nginx.com }
249318Smax.romanov@nginx.com
250318Smax.romanov@nginx.com }
251318Smax.romanov@nginx.com
252318Smax.romanov@nginx.com
253318Smax.romanov@nginx.com static void
nxt_port_rpc_remove_from_peers(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_reg_t * reg)254318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
255318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg)
256318Smax.romanov@nginx.com {
257318Smax.romanov@nginx.com uint32_t stream;
258318Smax.romanov@nginx.com nxt_int_t ret;
259318Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
260318Smax.romanov@nginx.com nxt_port_rpc_reg_t *r;
261318Smax.romanov@nginx.com
262318Smax.romanov@nginx.com stream = reg->stream;
263318Smax.romanov@nginx.com
264318Smax.romanov@nginx.com if (reg->link_first != 0) {
265318Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(&lhq, ®->peer);
266204Smax.romanov@nginx.com lhq.pool = port->mem_pool;
267190Smax.romanov@nginx.com
268318Smax.romanov@nginx.com if (reg->link.next == ®->link) {
269318Smax.romanov@nginx.com nxt_assert(reg->link.prev == ®->link);
270318Smax.romanov@nginx.com
271318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
272318Smax.romanov@nginx.com "registration (%p)", stream, reg->peer, reg->link.next);
273190Smax.romanov@nginx.com
274318Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
275613Svbart@nginx.com
276318Smax.romanov@nginx.com } else {
277318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
278318Smax.romanov@nginx.com "registration (%p)", stream, reg->peer, reg->link.next);
279318Smax.romanov@nginx.com
280318Smax.romanov@nginx.com lhq.replace = 1;
281318Smax.romanov@nginx.com lhq.value = reg->link.next;
282190Smax.romanov@nginx.com
283318Smax.romanov@nginx.com r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
284318Smax.romanov@nginx.com r->link_first = 1;
285318Smax.romanov@nginx.com
286318Smax.romanov@nginx.com nxt_queue_remove(®->link);
287190Smax.romanov@nginx.com
288318Smax.romanov@nginx.com ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
289204Smax.romanov@nginx.com }
290613Svbart@nginx.com
291318Smax.romanov@nginx.com } else {
292318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD remove pid %PI "
293318Smax.romanov@nginx.com "registration (%p)", stream, reg->peer, reg->link.next);
294318Smax.romanov@nginx.com
295318Smax.romanov@nginx.com nxt_queue_remove(®->link);
296318Smax.romanov@nginx.com ret = NXT_OK;
297190Smax.romanov@nginx.com }
298190Smax.romanov@nginx.com
299318Smax.romanov@nginx.com if (nxt_slow_path(ret != NXT_OK)) {
300318Smax.romanov@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
301318Smax.romanov@nginx.com " to delete peer %PI (%d)", stream, reg->peer, ret);
302318Smax.romanov@nginx.com }
303190Smax.romanov@nginx.com }
304190Smax.romanov@nginx.com
305190Smax.romanov@nginx.com
306190Smax.romanov@nginx.com void
nxt_port_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)307190Smax.romanov@nginx.com nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
308190Smax.romanov@nginx.com {
309190Smax.romanov@nginx.com uint8_t last;
310190Smax.romanov@nginx.com uint32_t stream;
311190Smax.romanov@nginx.com nxt_int_t ret;
312190Smax.romanov@nginx.com nxt_port_t *port;
313190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
314190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
315190Smax.romanov@nginx.com nxt_port_msg_type_t type;
316190Smax.romanov@nginx.com
317190Smax.romanov@nginx.com stream = msg->port_msg.stream;
318190Smax.romanov@nginx.com port = msg->port;
319190Smax.romanov@nginx.com last = msg->port_msg.last;
320190Smax.romanov@nginx.com type = msg->port_msg.type;
321190Smax.romanov@nginx.com
322190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream);
323190Smax.romanov@nginx.com lhq.pool = port->mem_pool;
324190Smax.romanov@nginx.com
325190Smax.romanov@nginx.com if (last != 0) {
326190Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
327277Sigor@sysoev.ru
328190Smax.romanov@nginx.com } else {
329190Smax.romanov@nginx.com ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
330190Smax.romanov@nginx.com }
331190Smax.romanov@nginx.com
332190Smax.romanov@nginx.com if (ret != NXT_OK) {
333318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD no handler found", stream);
334190Smax.romanov@nginx.com
335190Smax.romanov@nginx.com return;
336190Smax.romanov@nginx.com }
337190Smax.romanov@nginx.com
338318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
339318Smax.romanov@nginx.com (last ? "last " : ""), type);
340318Smax.romanov@nginx.com
341190Smax.romanov@nginx.com reg = lhq.value;
342190Smax.romanov@nginx.com
343190Smax.romanov@nginx.com if (type == _NXT_PORT_MSG_RPC_ERROR) {
344190Smax.romanov@nginx.com reg->error_handler(task, msg, reg->data);
345277Sigor@sysoev.ru
346190Smax.romanov@nginx.com } else {
347190Smax.romanov@nginx.com reg->ready_handler(task, msg, reg->data);
348190Smax.romanov@nginx.com }
349190Smax.romanov@nginx.com
350190Smax.romanov@nginx.com if (last == 0) {
351190Smax.romanov@nginx.com return;
352190Smax.romanov@nginx.com }
353190Smax.romanov@nginx.com
354204Smax.romanov@nginx.com if (reg->peer != -1) {
355318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(task, port, reg);
356318Smax.romanov@nginx.com }
357190Smax.romanov@nginx.com
358318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD free registration", stream);
359190Smax.romanov@nginx.com
360190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg);
361582Smax.romanov@nginx.com
362582Smax.romanov@nginx.com nxt_port_use(task, port, -1);
363190Smax.romanov@nginx.com }
364190Smax.romanov@nginx.com
365190Smax.romanov@nginx.com
366190Smax.romanov@nginx.com void
nxt_port_rpc_remove_peer(nxt_task_t * task,nxt_port_t * port,nxt_pid_t peer)367190Smax.romanov@nginx.com nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
368190Smax.romanov@nginx.com {
369190Smax.romanov@nginx.com uint8_t last;
370190Smax.romanov@nginx.com uint32_t stream;
371190Smax.romanov@nginx.com nxt_int_t ret;
372190Smax.romanov@nginx.com nxt_buf_t buf;
373318Smax.romanov@nginx.com nxt_queue_link_t *peer_link, *next_link;
374190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
375190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
376190Smax.romanov@nginx.com nxt_port_recv_msg_t msg;
377190Smax.romanov@nginx.com
378190Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(&lhq, &peer);
379190Smax.romanov@nginx.com lhq.pool = port->mem_pool;
380190Smax.romanov@nginx.com
381190Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
382190Smax.romanov@nginx.com
383190Smax.romanov@nginx.com if (nxt_slow_path(ret != NXT_OK)) {
384318Smax.romanov@nginx.com nxt_debug(task, "rpc: no reg found for peer %PI", peer);
385190Smax.romanov@nginx.com
386190Smax.romanov@nginx.com return;
387190Smax.romanov@nginx.com }
388190Smax.romanov@nginx.com
389190Smax.romanov@nginx.com nxt_memzero(&msg, sizeof(msg));
390190Smax.romanov@nginx.com nxt_memzero(&buf, sizeof(buf));
391190Smax.romanov@nginx.com
3921558Smax.romanov@nginx.com msg.fd[0] = -1;
3931558Smax.romanov@nginx.com msg.fd[1] = -1;
394190Smax.romanov@nginx.com msg.buf = &buf;
395190Smax.romanov@nginx.com msg.port = port;
396*1996St.nateldemoura@f5.com msg.u.removed_pid = peer;
397*1996St.nateldemoura@f5.com msg.port_msg.pid = nxt_pid;
398190Smax.romanov@nginx.com msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
399190Smax.romanov@nginx.com
400190Smax.romanov@nginx.com peer_link = lhq.value;
401190Smax.romanov@nginx.com last = 0;
402190Smax.romanov@nginx.com
403190Smax.romanov@nginx.com while (last == 0) {
404190Smax.romanov@nginx.com
405190Smax.romanov@nginx.com reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
406190Smax.romanov@nginx.com
407318Smax.romanov@nginx.com nxt_assert(reg->peer == peer);
408318Smax.romanov@nginx.com
409318Smax.romanov@nginx.com stream = reg->stream;
410190Smax.romanov@nginx.com
411318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD trigger error", stream);
412318Smax.romanov@nginx.com
413318Smax.romanov@nginx.com msg.port_msg.stream = stream;
414425Smax.romanov@nginx.com msg.port_msg.last = 1;
415190Smax.romanov@nginx.com
416190Smax.romanov@nginx.com if (peer_link == peer_link->next) {
417318Smax.romanov@nginx.com nxt_assert(peer_link->prev == peer_link);
418318Smax.romanov@nginx.com
419190Smax.romanov@nginx.com last = 1;
420190Smax.romanov@nginx.com
421190Smax.romanov@nginx.com } else {
422318Smax.romanov@nginx.com nxt_assert(peer_link->next->prev == peer_link);
423318Smax.romanov@nginx.com nxt_assert(peer_link->prev->next == peer_link);
424318Smax.romanov@nginx.com
425318Smax.romanov@nginx.com next_link = peer_link->next;
426318Smax.romanov@nginx.com nxt_queue_remove(peer_link);
427318Smax.romanov@nginx.com
428318Smax.romanov@nginx.com peer_link = next_link;
429190Smax.romanov@nginx.com }
430190Smax.romanov@nginx.com
431425Smax.romanov@nginx.com reg->peer = -1;
432425Smax.romanov@nginx.com
433425Smax.romanov@nginx.com reg->error_handler(task, &msg, reg->data);
434425Smax.romanov@nginx.com
435425Smax.romanov@nginx.com /* Reset 'last' flag to preserve rpc handler. */
436425Smax.romanov@nginx.com if (msg.port_msg.last == 0) {
437425Smax.romanov@nginx.com continue;
438425Smax.romanov@nginx.com }
439425Smax.romanov@nginx.com
440425Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream);
441425Smax.romanov@nginx.com lhq.pool = port->mem_pool;
442425Smax.romanov@nginx.com
443425Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
444425Smax.romanov@nginx.com
445425Smax.romanov@nginx.com if (nxt_slow_path(ret != NXT_OK)) {
446425Smax.romanov@nginx.com nxt_log_error(NXT_LOG_ERR, task->log,
447425Smax.romanov@nginx.com "rpc: stream #%uD failed to delete handler", stream);
448425Smax.romanov@nginx.com
449425Smax.romanov@nginx.com return;
450425Smax.romanov@nginx.com }
451425Smax.romanov@nginx.com
452190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg);
453582Smax.romanov@nginx.com
454582Smax.romanov@nginx.com nxt_port_use(task, port, -1);
455190Smax.romanov@nginx.com }
456190Smax.romanov@nginx.com }
457190Smax.romanov@nginx.com
458190Smax.romanov@nginx.com
459190Smax.romanov@nginx.com void
nxt_port_rpc_cancel(nxt_task_t * task,nxt_port_t * port,uint32_t stream)460190Smax.romanov@nginx.com nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
461190Smax.romanov@nginx.com {
462190Smax.romanov@nginx.com nxt_int_t ret;
463190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
464190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq;
465190Smax.romanov@nginx.com
466190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream);
467190Smax.romanov@nginx.com lhq.pool = port->mem_pool;
468190Smax.romanov@nginx.com
469190Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
470190Smax.romanov@nginx.com
471190Smax.romanov@nginx.com if (ret != NXT_OK) {
472318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD no handler found", stream);
473190Smax.romanov@nginx.com
474190Smax.romanov@nginx.com return;
475190Smax.romanov@nginx.com }
476190Smax.romanov@nginx.com
477190Smax.romanov@nginx.com reg = lhq.value;
478190Smax.romanov@nginx.com
479204Smax.romanov@nginx.com if (reg->peer != -1) {
480318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(task, port, reg);
481318Smax.romanov@nginx.com }
482190Smax.romanov@nginx.com
483318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
484190Smax.romanov@nginx.com
485190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg);
486582Smax.romanov@nginx.com
487582Smax.romanov@nginx.com nxt_port_use(task, port, -1);
488190Smax.romanov@nginx.com }
489583Smax.romanov@nginx.com
490583Smax.romanov@nginx.com static nxt_buf_t nxt_port_close_dummy_buf;
491583Smax.romanov@nginx.com
492583Smax.romanov@nginx.com void
nxt_port_rpc_close(nxt_task_t * task,nxt_port_t * port)493583Smax.romanov@nginx.com nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
494583Smax.romanov@nginx.com {
495583Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg;
496583Smax.romanov@nginx.com nxt_port_recv_msg_t msg;
497583Smax.romanov@nginx.com
498583Smax.romanov@nginx.com for ( ;; ) {
499596Sigor@sysoev.ru reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto);
500583Smax.romanov@nginx.com if (reg == NULL) {
501583Smax.romanov@nginx.com return;
502583Smax.romanov@nginx.com }
503583Smax.romanov@nginx.com
5041558Smax.romanov@nginx.com msg.fd[0] = -1;
5051558Smax.romanov@nginx.com msg.fd[1] = -1;
506583Smax.romanov@nginx.com msg.buf = &nxt_port_close_dummy_buf;
507583Smax.romanov@nginx.com msg.port = port;
508583Smax.romanov@nginx.com msg.port_msg.stream = reg->stream;
509583Smax.romanov@nginx.com msg.port_msg.pid = nxt_pid;
510583Smax.romanov@nginx.com msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
511583Smax.romanov@nginx.com msg.port_msg.last = 1;
512583Smax.romanov@nginx.com msg.port_msg.mmap = 0;
513583Smax.romanov@nginx.com msg.port_msg.nf = 0;
514583Smax.romanov@nginx.com msg.port_msg.mf = 0;
515956Smax.romanov@nginx.com msg.size = 0;
516583Smax.romanov@nginx.com msg.cancelled = 0;
517583Smax.romanov@nginx.com msg.u.data = NULL;
518583Smax.romanov@nginx.com
519583Smax.romanov@nginx.com nxt_port_rpc_handler(task, &msg);
520583Smax.romanov@nginx.com }
521583Smax.romanov@nginx.com }
522