xref: /unit/src/nxt_port_rpc.c (revision 613)
1190Smax.romanov@nginx.com 
2190Smax.romanov@nginx.com /*
3190Smax.romanov@nginx.com  * Copyright (C) Max Romanov
4190Smax.romanov@nginx.com  * Copyright (C) NGINX, Inc.
5190Smax.romanov@nginx.com  */
6190Smax.romanov@nginx.com 
7190Smax.romanov@nginx.com #include <nxt_main.h>
8190Smax.romanov@nginx.com #include <nxt_port_rpc.h>
9190Smax.romanov@nginx.com 
10190Smax.romanov@nginx.com 
11318Smax.romanov@nginx.com static nxt_atomic_t  nxt_stream_ident = 1;
12318Smax.romanov@nginx.com 
13190Smax.romanov@nginx.com typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
14190Smax.romanov@nginx.com 
15190Smax.romanov@nginx.com struct nxt_port_rpc_reg_s {
16190Smax.romanov@nginx.com     uint32_t                stream;
17190Smax.romanov@nginx.com 
18190Smax.romanov@nginx.com     nxt_pid_t               peer;
19190Smax.romanov@nginx.com     nxt_queue_link_t        link;
20318Smax.romanov@nginx.com     nxt_bool_t              link_first;
21190Smax.romanov@nginx.com 
22190Smax.romanov@nginx.com     nxt_port_rpc_handler_t  ready_handler;
23190Smax.romanov@nginx.com     nxt_port_rpc_handler_t  error_handler;
24190Smax.romanov@nginx.com     void                    *data;
25190Smax.romanov@nginx.com };
26190Smax.romanov@nginx.com 
27190Smax.romanov@nginx.com 
28425Smax.romanov@nginx.com static void
29425Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30425Smax.romanov@nginx.com     nxt_port_rpc_reg_t *reg);
31425Smax.romanov@nginx.com 
32425Smax.romanov@nginx.com 
33190Smax.romanov@nginx.com static nxt_int_t
34190Smax.romanov@nginx.com nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
35190Smax.romanov@nginx.com {
36190Smax.romanov@nginx.com     return NXT_OK;
37190Smax.romanov@nginx.com }
38190Smax.romanov@nginx.com 
39190Smax.romanov@nginx.com 
40190Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t  lvlhsh_rpc_reg_proto  nxt_aligned(64) = {
41190Smax.romanov@nginx.com     NXT_LVLHSH_DEFAULT,
42190Smax.romanov@nginx.com     nxt_rpc_reg_test,
43190Smax.romanov@nginx.com     nxt_lvlhsh_alloc,
44190Smax.romanov@nginx.com     nxt_lvlhsh_free,
45190Smax.romanov@nginx.com };
46190Smax.romanov@nginx.com 
47190Smax.romanov@nginx.com 
48190Smax.romanov@nginx.com nxt_inline void
49190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
50190Smax.romanov@nginx.com {
51190Smax.romanov@nginx.com     lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
52190Smax.romanov@nginx.com     lhq->key.length = sizeof(*stream);
53190Smax.romanov@nginx.com     lhq->key.start = (u_char *) stream;
54190Smax.romanov@nginx.com     lhq->proto = &lvlhsh_rpc_reg_proto;
55190Smax.romanov@nginx.com }
56190Smax.romanov@nginx.com 
57190Smax.romanov@nginx.com 
58190Smax.romanov@nginx.com nxt_inline void
59190Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
60190Smax.romanov@nginx.com {
61190Smax.romanov@nginx.com     lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
62190Smax.romanov@nginx.com     lhq->key.length = sizeof(*peer);
63190Smax.romanov@nginx.com     lhq->key.start = (u_char *) peer;
64190Smax.romanov@nginx.com     lhq->proto = &lvlhsh_rpc_reg_proto;
65190Smax.romanov@nginx.com }
66190Smax.romanov@nginx.com 
67190Smax.romanov@nginx.com 
68190Smax.romanov@nginx.com uint32_t
69190Smax.romanov@nginx.com nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
70190Smax.romanov@nginx.com     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
71190Smax.romanov@nginx.com     nxt_pid_t peer, void *data)
72190Smax.romanov@nginx.com {
73318Smax.romanov@nginx.com     void                *ex;
74318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
75318Smax.romanov@nginx.com 
76318Smax.romanov@nginx.com     ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
77318Smax.romanov@nginx.com                                           error_handler, 0);
78318Smax.romanov@nginx.com 
79318Smax.romanov@nginx.com     if (ex == NULL) {
80318Smax.romanov@nginx.com         return 0;
81318Smax.romanov@nginx.com     }
82318Smax.romanov@nginx.com 
83318Smax.romanov@nginx.com     if (peer != -1) {
84318Smax.romanov@nginx.com         nxt_port_rpc_ex_set_peer(task, port, ex, peer);
85318Smax.romanov@nginx.com     }
86318Smax.romanov@nginx.com 
87318Smax.romanov@nginx.com     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
88318Smax.romanov@nginx.com 
89318Smax.romanov@nginx.com     nxt_assert(reg->data == ex);
90318Smax.romanov@nginx.com 
91318Smax.romanov@nginx.com     reg->data = data;
92318Smax.romanov@nginx.com 
93318Smax.romanov@nginx.com     return reg->stream;
94318Smax.romanov@nginx.com }
95318Smax.romanov@nginx.com 
96318Smax.romanov@nginx.com 
97318Smax.romanov@nginx.com void *
98318Smax.romanov@nginx.com nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
99318Smax.romanov@nginx.com     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
100318Smax.romanov@nginx.com     size_t ex_size)
101318Smax.romanov@nginx.com {
102190Smax.romanov@nginx.com     uint32_t            stream;
103190Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
104190Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
105190Smax.romanov@nginx.com 
106277Sigor@sysoev.ru     nxt_assert(port->pair[0] != -1);
107190Smax.romanov@nginx.com 
108318Smax.romanov@nginx.com     stream =
109611Svbart@nginx.com         (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3FFFFFFF;
110190Smax.romanov@nginx.com 
111318Smax.romanov@nginx.com     reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
112190Smax.romanov@nginx.com 
113190Smax.romanov@nginx.com     if (nxt_slow_path(reg == NULL)) {
114318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
115190Smax.romanov@nginx.com 
116318Smax.romanov@nginx.com         return NULL;
117190Smax.romanov@nginx.com     }
118190Smax.romanov@nginx.com 
119190Smax.romanov@nginx.com     reg->stream = stream;
120318Smax.romanov@nginx.com     reg->peer = -1;
121190Smax.romanov@nginx.com     reg->ready_handler = ready_handler;
122190Smax.romanov@nginx.com     reg->error_handler = error_handler;
123318Smax.romanov@nginx.com     reg->data = reg + 1;
124190Smax.romanov@nginx.com 
125190Smax.romanov@nginx.com     nxt_port_rpc_lhq_stream(&lhq, &stream);
126190Smax.romanov@nginx.com     lhq.replace = 0;
127190Smax.romanov@nginx.com     lhq.value = reg;
128190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
129190Smax.romanov@nginx.com 
130190Smax.romanov@nginx.com     switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
131190Smax.romanov@nginx.com 
132190Smax.romanov@nginx.com     case NXT_OK:
133190Smax.romanov@nginx.com         break;
134190Smax.romanov@nginx.com 
135190Smax.romanov@nginx.com     default:
136318Smax.romanov@nginx.com         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
137318Smax.romanov@nginx.com                       "reg ", stream);
138190Smax.romanov@nginx.com 
139190Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, reg);
140190Smax.romanov@nginx.com 
141318Smax.romanov@nginx.com         return NULL;
142318Smax.romanov@nginx.com     }
143318Smax.romanov@nginx.com 
144318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD registered", stream);
145318Smax.romanov@nginx.com 
146582Smax.romanov@nginx.com     nxt_port_inc_use(port);
147582Smax.romanov@nginx.com 
148318Smax.romanov@nginx.com     return reg->data;
149318Smax.romanov@nginx.com }
150318Smax.romanov@nginx.com 
151318Smax.romanov@nginx.com 
152318Smax.romanov@nginx.com uint32_t
153318Smax.romanov@nginx.com nxt_port_rpc_ex_stream(void *ex)
154318Smax.romanov@nginx.com {
155318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
156318Smax.romanov@nginx.com 
157318Smax.romanov@nginx.com     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
158318Smax.romanov@nginx.com 
159318Smax.romanov@nginx.com     nxt_assert(reg->data == ex);
160318Smax.romanov@nginx.com 
161318Smax.romanov@nginx.com     return reg->stream;
162318Smax.romanov@nginx.com }
163318Smax.romanov@nginx.com 
164318Smax.romanov@nginx.com 
165318Smax.romanov@nginx.com void
166318Smax.romanov@nginx.com nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
167318Smax.romanov@nginx.com     void *ex, nxt_pid_t peer)
168318Smax.romanov@nginx.com {
169318Smax.romanov@nginx.com     nxt_int_t           ret;
170318Smax.romanov@nginx.com     nxt_queue_link_t    *peer_link;
171318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
172318Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
173318Smax.romanov@nginx.com 
174318Smax.romanov@nginx.com     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
175318Smax.romanov@nginx.com 
176318Smax.romanov@nginx.com     nxt_assert(reg->data == ex);
177318Smax.romanov@nginx.com 
178425Smax.romanov@nginx.com     if (nxt_slow_path(peer == reg->peer)) {
179425Smax.romanov@nginx.com         return;
180425Smax.romanov@nginx.com     }
181318Smax.romanov@nginx.com 
182425Smax.romanov@nginx.com     if (reg->peer != -1) {
183425Smax.romanov@nginx.com         nxt_port_rpc_remove_from_peers(task, port, reg);
184425Smax.romanov@nginx.com 
185425Smax.romanov@nginx.com         reg->peer = -1;
186425Smax.romanov@nginx.com     }
187425Smax.romanov@nginx.com 
188425Smax.romanov@nginx.com     if (peer == -1) {
189318Smax.romanov@nginx.com         return;
190190Smax.romanov@nginx.com     }
191190Smax.romanov@nginx.com 
192318Smax.romanov@nginx.com     reg->peer = peer;
193318Smax.romanov@nginx.com 
194318Smax.romanov@nginx.com     nxt_port_rpc_lhq_peer(&lhq, &peer);
195318Smax.romanov@nginx.com     lhq.replace = 0;
196318Smax.romanov@nginx.com     lhq.value = &reg->link;
197318Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
198318Smax.romanov@nginx.com 
199318Smax.romanov@nginx.com     ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
200318Smax.romanov@nginx.com 
201318Smax.romanov@nginx.com     switch (ret) {
202318Smax.romanov@nginx.com 
203318Smax.romanov@nginx.com     case NXT_OK:
204318Smax.romanov@nginx.com         reg->link_first = 1;
205318Smax.romanov@nginx.com         nxt_queue_self(&reg->link);
206318Smax.romanov@nginx.com 
207318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
208318Smax.romanov@nginx.com                   reg->stream, reg->peer, reg->link.next);
209318Smax.romanov@nginx.com         break;
210318Smax.romanov@nginx.com 
211318Smax.romanov@nginx.com     case NXT_DECLINED:
212318Smax.romanov@nginx.com         reg->link_first = 0;
213318Smax.romanov@nginx.com         peer_link = lhq.value;
214318Smax.romanov@nginx.com         nxt_queue_insert_after(peer_link, &reg->link);
215318Smax.romanov@nginx.com 
216318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
217318Smax.romanov@nginx.com                   reg->stream, reg->peer, reg->link.next);
218318Smax.romanov@nginx.com         break;
219318Smax.romanov@nginx.com 
220318Smax.romanov@nginx.com     default:
221494Spluknet@nginx.com         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add "
222318Smax.romanov@nginx.com                       "peer for stream #%uD (%d)", reg->stream, ret);
223318Smax.romanov@nginx.com 
224318Smax.romanov@nginx.com         reg->peer = -1;
225318Smax.romanov@nginx.com         break;
226318Smax.romanov@nginx.com     }
227318Smax.romanov@nginx.com 
228318Smax.romanov@nginx.com }
229318Smax.romanov@nginx.com 
230318Smax.romanov@nginx.com 
231318Smax.romanov@nginx.com static void
232318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
233318Smax.romanov@nginx.com     nxt_port_rpc_reg_t *reg)
234318Smax.romanov@nginx.com {
235318Smax.romanov@nginx.com     uint32_t            stream;
236318Smax.romanov@nginx.com     nxt_int_t           ret;
237318Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
238318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *r;
239318Smax.romanov@nginx.com 
240318Smax.romanov@nginx.com     stream = reg->stream;
241318Smax.romanov@nginx.com 
242318Smax.romanov@nginx.com     if (reg->link_first != 0) {
243318Smax.romanov@nginx.com         nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
244204Smax.romanov@nginx.com         lhq.pool = port->mem_pool;
245190Smax.romanov@nginx.com 
246318Smax.romanov@nginx.com         if (reg->link.next == &reg->link) {
247318Smax.romanov@nginx.com             nxt_assert(reg->link.prev == &reg->link);
248318Smax.romanov@nginx.com 
249318Smax.romanov@nginx.com             nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
250318Smax.romanov@nginx.com                       "registration (%p)", stream, reg->peer, reg->link.next);
251190Smax.romanov@nginx.com 
252318Smax.romanov@nginx.com             ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
253*613Svbart@nginx.com 
254318Smax.romanov@nginx.com         } else {
255318Smax.romanov@nginx.com             nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
256318Smax.romanov@nginx.com                       "registration (%p)", stream, reg->peer, reg->link.next);
257318Smax.romanov@nginx.com 
258318Smax.romanov@nginx.com             lhq.replace = 1;
259318Smax.romanov@nginx.com             lhq.value = reg->link.next;
260190Smax.romanov@nginx.com 
261318Smax.romanov@nginx.com             r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
262318Smax.romanov@nginx.com             r->link_first = 1;
263318Smax.romanov@nginx.com 
264318Smax.romanov@nginx.com             nxt_queue_remove(&reg->link);
265190Smax.romanov@nginx.com 
266318Smax.romanov@nginx.com             ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
267204Smax.romanov@nginx.com         }
268*613Svbart@nginx.com 
269318Smax.romanov@nginx.com     } else {
270318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD remove pid %PI "
271318Smax.romanov@nginx.com                   "registration (%p)", stream, reg->peer, reg->link.next);
272318Smax.romanov@nginx.com 
273318Smax.romanov@nginx.com         nxt_queue_remove(&reg->link);
274318Smax.romanov@nginx.com         ret = NXT_OK;
275190Smax.romanov@nginx.com     }
276190Smax.romanov@nginx.com 
277318Smax.romanov@nginx.com     if (nxt_slow_path(ret != NXT_OK)) {
278318Smax.romanov@nginx.com         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
279318Smax.romanov@nginx.com                       " to delete peer %PI (%d)", stream, reg->peer, ret);
280318Smax.romanov@nginx.com     }
281190Smax.romanov@nginx.com }
282190Smax.romanov@nginx.com 
283190Smax.romanov@nginx.com 
284190Smax.romanov@nginx.com void
285190Smax.romanov@nginx.com nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
286190Smax.romanov@nginx.com {
287190Smax.romanov@nginx.com     uint8_t              last;
288190Smax.romanov@nginx.com     uint32_t             stream;
289190Smax.romanov@nginx.com     nxt_int_t            ret;
290190Smax.romanov@nginx.com     nxt_port_t           *port;
291190Smax.romanov@nginx.com     nxt_port_rpc_reg_t   *reg;
292190Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
293190Smax.romanov@nginx.com     nxt_port_msg_type_t  type;
294190Smax.romanov@nginx.com 
295190Smax.romanov@nginx.com     stream = msg->port_msg.stream;
296190Smax.romanov@nginx.com     port = msg->port;
297190Smax.romanov@nginx.com     last = msg->port_msg.last;
298190Smax.romanov@nginx.com     type = msg->port_msg.type;
299190Smax.romanov@nginx.com 
300190Smax.romanov@nginx.com     nxt_port_rpc_lhq_stream(&lhq, &stream);
301190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
302190Smax.romanov@nginx.com 
303190Smax.romanov@nginx.com     if (last != 0) {
304190Smax.romanov@nginx.com         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
305277Sigor@sysoev.ru 
306190Smax.romanov@nginx.com     } else {
307190Smax.romanov@nginx.com         ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
308190Smax.romanov@nginx.com     }
309190Smax.romanov@nginx.com 
310190Smax.romanov@nginx.com     if (ret != NXT_OK) {
311318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
312190Smax.romanov@nginx.com 
313190Smax.romanov@nginx.com         return;
314190Smax.romanov@nginx.com     }
315190Smax.romanov@nginx.com 
316318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
317318Smax.romanov@nginx.com                     (last ? "last " : ""), type);
318318Smax.romanov@nginx.com 
319190Smax.romanov@nginx.com     reg = lhq.value;
320190Smax.romanov@nginx.com 
321190Smax.romanov@nginx.com     if (type == _NXT_PORT_MSG_RPC_ERROR) {
322190Smax.romanov@nginx.com         reg->error_handler(task, msg, reg->data);
323277Sigor@sysoev.ru 
324190Smax.romanov@nginx.com     } else {
325190Smax.romanov@nginx.com         reg->ready_handler(task, msg, reg->data);
326190Smax.romanov@nginx.com     }
327190Smax.romanov@nginx.com 
328190Smax.romanov@nginx.com     if (last == 0) {
329190Smax.romanov@nginx.com         return;
330190Smax.romanov@nginx.com     }
331190Smax.romanov@nginx.com 
332204Smax.romanov@nginx.com     if (reg->peer != -1) {
333318Smax.romanov@nginx.com         nxt_port_rpc_remove_from_peers(task, port, reg);
334318Smax.romanov@nginx.com     }
335190Smax.romanov@nginx.com 
336318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD free registration", stream);
337190Smax.romanov@nginx.com 
338190Smax.romanov@nginx.com     nxt_mp_free(port->mem_pool, reg);
339582Smax.romanov@nginx.com 
340582Smax.romanov@nginx.com     nxt_port_use(task, port, -1);
341190Smax.romanov@nginx.com }
342190Smax.romanov@nginx.com 
343190Smax.romanov@nginx.com 
344190Smax.romanov@nginx.com void
345190Smax.romanov@nginx.com nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
346190Smax.romanov@nginx.com {
347190Smax.romanov@nginx.com     uint8_t              last;
348190Smax.romanov@nginx.com     uint32_t             stream;
349190Smax.romanov@nginx.com     nxt_int_t            ret;
350190Smax.romanov@nginx.com     nxt_buf_t            buf;
351318Smax.romanov@nginx.com     nxt_queue_link_t     *peer_link, *next_link;
352190Smax.romanov@nginx.com     nxt_port_rpc_reg_t   *reg;
353190Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
354190Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
355190Smax.romanov@nginx.com 
356190Smax.romanov@nginx.com     nxt_port_rpc_lhq_peer(&lhq, &peer);
357190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
358190Smax.romanov@nginx.com 
359190Smax.romanov@nginx.com     ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
360190Smax.romanov@nginx.com 
361190Smax.romanov@nginx.com     if (nxt_slow_path(ret != NXT_OK)) {
362318Smax.romanov@nginx.com         nxt_debug(task, "rpc: no reg found for peer %PI", peer);
363190Smax.romanov@nginx.com 
364190Smax.romanov@nginx.com         return;
365190Smax.romanov@nginx.com     }
366190Smax.romanov@nginx.com 
367190Smax.romanov@nginx.com     nxt_memzero(&msg, sizeof(msg));
368190Smax.romanov@nginx.com     nxt_memzero(&buf, sizeof(buf));
369190Smax.romanov@nginx.com 
370190Smax.romanov@nginx.com     msg.fd = -1;
371190Smax.romanov@nginx.com     msg.buf = &buf;
372190Smax.romanov@nginx.com     msg.port = port;
373190Smax.romanov@nginx.com 
374190Smax.romanov@nginx.com     msg.port_msg.pid = peer;
375190Smax.romanov@nginx.com     msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
376190Smax.romanov@nginx.com 
377190Smax.romanov@nginx.com     peer_link = lhq.value;
378190Smax.romanov@nginx.com     last = 0;
379190Smax.romanov@nginx.com 
380190Smax.romanov@nginx.com     while (last == 0) {
381190Smax.romanov@nginx.com 
382190Smax.romanov@nginx.com         reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
383190Smax.romanov@nginx.com 
384318Smax.romanov@nginx.com         nxt_assert(reg->peer == peer);
385318Smax.romanov@nginx.com 
386318Smax.romanov@nginx.com         stream = reg->stream;
387190Smax.romanov@nginx.com 
388318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD trigger error", stream);
389318Smax.romanov@nginx.com 
390318Smax.romanov@nginx.com         msg.port_msg.stream = stream;
391425Smax.romanov@nginx.com         msg.port_msg.last = 1;
392190Smax.romanov@nginx.com 
393190Smax.romanov@nginx.com         if (peer_link == peer_link->next) {
394318Smax.romanov@nginx.com             nxt_assert(peer_link->prev == peer_link);
395318Smax.romanov@nginx.com 
396190Smax.romanov@nginx.com             last = 1;
397190Smax.romanov@nginx.com 
398190Smax.romanov@nginx.com         } else {
399318Smax.romanov@nginx.com             nxt_assert(peer_link->next->prev == peer_link);
400318Smax.romanov@nginx.com             nxt_assert(peer_link->prev->next == peer_link);
401318Smax.romanov@nginx.com 
402318Smax.romanov@nginx.com             next_link = peer_link->next;
403318Smax.romanov@nginx.com             nxt_queue_remove(peer_link);
404318Smax.romanov@nginx.com 
405318Smax.romanov@nginx.com             peer_link = next_link;
406190Smax.romanov@nginx.com         }
407190Smax.romanov@nginx.com 
408425Smax.romanov@nginx.com         reg->peer = -1;
409425Smax.romanov@nginx.com 
410425Smax.romanov@nginx.com         reg->error_handler(task, &msg, reg->data);
411425Smax.romanov@nginx.com 
412425Smax.romanov@nginx.com         /* Reset 'last' flag to preserve rpc handler. */
413425Smax.romanov@nginx.com         if (msg.port_msg.last == 0) {
414425Smax.romanov@nginx.com             continue;
415425Smax.romanov@nginx.com         }
416425Smax.romanov@nginx.com 
417425Smax.romanov@nginx.com         nxt_port_rpc_lhq_stream(&lhq, &stream);
418425Smax.romanov@nginx.com         lhq.pool = port->mem_pool;
419425Smax.romanov@nginx.com 
420425Smax.romanov@nginx.com         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
421425Smax.romanov@nginx.com 
422425Smax.romanov@nginx.com         if (nxt_slow_path(ret != NXT_OK)) {
423425Smax.romanov@nginx.com             nxt_log_error(NXT_LOG_ERR, task->log,
424425Smax.romanov@nginx.com                           "rpc: stream #%uD failed to delete handler", stream);
425425Smax.romanov@nginx.com 
426425Smax.romanov@nginx.com             return;
427425Smax.romanov@nginx.com         }
428425Smax.romanov@nginx.com 
429190Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, reg);
430582Smax.romanov@nginx.com 
431582Smax.romanov@nginx.com         nxt_port_use(task, port, -1);
432190Smax.romanov@nginx.com     }
433190Smax.romanov@nginx.com }
434190Smax.romanov@nginx.com 
435190Smax.romanov@nginx.com 
436190Smax.romanov@nginx.com void
437190Smax.romanov@nginx.com nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
438190Smax.romanov@nginx.com {
439190Smax.romanov@nginx.com     nxt_int_t           ret;
440190Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
441190Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
442190Smax.romanov@nginx.com 
443190Smax.romanov@nginx.com     nxt_port_rpc_lhq_stream(&lhq, &stream);
444190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
445190Smax.romanov@nginx.com 
446190Smax.romanov@nginx.com     ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
447190Smax.romanov@nginx.com 
448190Smax.romanov@nginx.com     if (ret != NXT_OK) {
449318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
450190Smax.romanov@nginx.com 
451190Smax.romanov@nginx.com         return;
452190Smax.romanov@nginx.com     }
453190Smax.romanov@nginx.com 
454190Smax.romanov@nginx.com     reg = lhq.value;
455190Smax.romanov@nginx.com 
456204Smax.romanov@nginx.com     if (reg->peer != -1) {
457318Smax.romanov@nginx.com         nxt_port_rpc_remove_from_peers(task, port, reg);
458318Smax.romanov@nginx.com     }
459190Smax.romanov@nginx.com 
460318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
461190Smax.romanov@nginx.com 
462190Smax.romanov@nginx.com     nxt_mp_free(port->mem_pool, reg);
463582Smax.romanov@nginx.com 
464582Smax.romanov@nginx.com     nxt_port_use(task, port, -1);
465190Smax.romanov@nginx.com }
466583Smax.romanov@nginx.com 
467583Smax.romanov@nginx.com static nxt_buf_t  nxt_port_close_dummy_buf;
468583Smax.romanov@nginx.com 
469583Smax.romanov@nginx.com void
470583Smax.romanov@nginx.com nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
471583Smax.romanov@nginx.com {
472583Smax.romanov@nginx.com     nxt_port_rpc_reg_t   *reg;
473583Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
474583Smax.romanov@nginx.com 
475583Smax.romanov@nginx.com     for ( ;; ) {
476596Sigor@sysoev.ru         reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto);
477583Smax.romanov@nginx.com         if (reg == NULL) {
478583Smax.romanov@nginx.com             return;
479583Smax.romanov@nginx.com         }
480583Smax.romanov@nginx.com 
481583Smax.romanov@nginx.com         msg.fd = -1;
482583Smax.romanov@nginx.com         msg.buf = &nxt_port_close_dummy_buf;
483583Smax.romanov@nginx.com         msg.port = port;
484583Smax.romanov@nginx.com         msg.port_msg.stream = reg->stream;
485583Smax.romanov@nginx.com         msg.port_msg.pid = nxt_pid;
486583Smax.romanov@nginx.com         msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
487583Smax.romanov@nginx.com         msg.port_msg.last = 1;
488583Smax.romanov@nginx.com         msg.port_msg.mmap = 0;
489583Smax.romanov@nginx.com         msg.port_msg.nf = 0;
490583Smax.romanov@nginx.com         msg.port_msg.mf = 0;
491583Smax.romanov@nginx.com         msg.port_msg.tracking = 0;
492583Smax.romanov@nginx.com         msg.size = sizeof(msg.port_msg);
493583Smax.romanov@nginx.com         msg.cancelled = 0;
494583Smax.romanov@nginx.com         msg.u.data = NULL;
495583Smax.romanov@nginx.com 
496583Smax.romanov@nginx.com         nxt_port_rpc_handler(task, &msg);
497583Smax.romanov@nginx.com     }
498583Smax.romanov@nginx.com }
499