xref: /unit/src/nxt_port_rpc.c (revision 2126:8542c8141a13)
1190Smax.romanov@nginx.com 
2190Smax.romanov@nginx.com /*
3190Smax.romanov@nginx.com  * Copyright (C) Max Romanov
4190Smax.romanov@nginx.com  * Copyright (C) NGINX, Inc.
5190Smax.romanov@nginx.com  */
6190Smax.romanov@nginx.com 
7190Smax.romanov@nginx.com #include <nxt_main.h>
8190Smax.romanov@nginx.com #include <nxt_port_rpc.h>
9190Smax.romanov@nginx.com 
10190Smax.romanov@nginx.com 
111487Smax.romanov@nginx.com static volatile uint32_t  *nxt_stream_ident;
12318Smax.romanov@nginx.com 
13190Smax.romanov@nginx.com typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
14190Smax.romanov@nginx.com 
15190Smax.romanov@nginx.com struct nxt_port_rpc_reg_s {
16190Smax.romanov@nginx.com     uint32_t                stream;
17190Smax.romanov@nginx.com 
18190Smax.romanov@nginx.com     nxt_pid_t               peer;
19190Smax.romanov@nginx.com     nxt_queue_link_t        link;
20318Smax.romanov@nginx.com     nxt_bool_t              link_first;
21190Smax.romanov@nginx.com 
22190Smax.romanov@nginx.com     nxt_port_rpc_handler_t  ready_handler;
23190Smax.romanov@nginx.com     nxt_port_rpc_handler_t  error_handler;
24190Smax.romanov@nginx.com     void                    *data;
25190Smax.romanov@nginx.com };
26190Smax.romanov@nginx.com 
27190Smax.romanov@nginx.com 
28425Smax.romanov@nginx.com static void
29425Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30425Smax.romanov@nginx.com     nxt_port_rpc_reg_t *reg);
31425Smax.romanov@nginx.com 
32425Smax.romanov@nginx.com 
331487Smax.romanov@nginx.com nxt_int_t
nxt_port_rpc_init(void)341487Smax.romanov@nginx.com nxt_port_rpc_init(void)
351487Smax.romanov@nginx.com {
361487Smax.romanov@nginx.com     void  *p;
371487Smax.romanov@nginx.com 
381487Smax.romanov@nginx.com     if (nxt_stream_ident != NULL) {
391487Smax.romanov@nginx.com         return NXT_OK;
401487Smax.romanov@nginx.com     }
411487Smax.romanov@nginx.com 
421487Smax.romanov@nginx.com     p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE,
431487Smax.romanov@nginx.com                      MAP_ANON | MAP_SHARED, -1, 0);
441487Smax.romanov@nginx.com 
451487Smax.romanov@nginx.com     if (nxt_slow_path(p == MAP_FAILED)) {
461487Smax.romanov@nginx.com         return NXT_ERROR;
471487Smax.romanov@nginx.com     }
481487Smax.romanov@nginx.com 
491487Smax.romanov@nginx.com     nxt_stream_ident = p;
501487Smax.romanov@nginx.com     *nxt_stream_ident = 1;
511487Smax.romanov@nginx.com 
521487Smax.romanov@nginx.com     return NXT_OK;
531487Smax.romanov@nginx.com }
541487Smax.romanov@nginx.com 
551487Smax.romanov@nginx.com 
56190Smax.romanov@nginx.com static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t * lhq,void * data)57190Smax.romanov@nginx.com nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
58190Smax.romanov@nginx.com {
59190Smax.romanov@nginx.com     return NXT_OK;
60190Smax.romanov@nginx.com }
61190Smax.romanov@nginx.com 
62190Smax.romanov@nginx.com 
63190Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t  lvlhsh_rpc_reg_proto  nxt_aligned(64) = {
64190Smax.romanov@nginx.com     NXT_LVLHSH_DEFAULT,
65190Smax.romanov@nginx.com     nxt_rpc_reg_test,
66190Smax.romanov@nginx.com     nxt_lvlhsh_alloc,
67190Smax.romanov@nginx.com     nxt_lvlhsh_free,
68190Smax.romanov@nginx.com };
69190Smax.romanov@nginx.com 
70190Smax.romanov@nginx.com 
71190Smax.romanov@nginx.com nxt_inline void
nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t * lhq,uint32_t * stream)72190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
73190Smax.romanov@nginx.com {
74190Smax.romanov@nginx.com     lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
75190Smax.romanov@nginx.com     lhq->key.length = sizeof(*stream);
76190Smax.romanov@nginx.com     lhq->key.start = (u_char *) stream;
77190Smax.romanov@nginx.com     lhq->proto = &lvlhsh_rpc_reg_proto;
78190Smax.romanov@nginx.com }
79190Smax.romanov@nginx.com 
80190Smax.romanov@nginx.com 
81190Smax.romanov@nginx.com nxt_inline void
nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t * lhq,nxt_pid_t * peer)82190Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
83190Smax.romanov@nginx.com {
84190Smax.romanov@nginx.com     lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
85190Smax.romanov@nginx.com     lhq->key.length = sizeof(*peer);
86190Smax.romanov@nginx.com     lhq->key.start = (u_char *) peer;
87190Smax.romanov@nginx.com     lhq->proto = &lvlhsh_rpc_reg_proto;
88190Smax.romanov@nginx.com }
89190Smax.romanov@nginx.com 
90190Smax.romanov@nginx.com 
91190Smax.romanov@nginx.com uint32_t
nxt_port_rpc_register_handler(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,nxt_pid_t peer,void * data)92190Smax.romanov@nginx.com nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
93190Smax.romanov@nginx.com     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
94190Smax.romanov@nginx.com     nxt_pid_t peer, void *data)
95190Smax.romanov@nginx.com {
96318Smax.romanov@nginx.com     void                *ex;
97318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
98318Smax.romanov@nginx.com 
99318Smax.romanov@nginx.com     ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
100318Smax.romanov@nginx.com                                           error_handler, 0);
101318Smax.romanov@nginx.com 
102318Smax.romanov@nginx.com     if (ex == NULL) {
103318Smax.romanov@nginx.com         return 0;
104318Smax.romanov@nginx.com     }
105318Smax.romanov@nginx.com 
106318Smax.romanov@nginx.com     if (peer != -1) {
107318Smax.romanov@nginx.com         nxt_port_rpc_ex_set_peer(task, port, ex, peer);
108318Smax.romanov@nginx.com     }
109318Smax.romanov@nginx.com 
110318Smax.romanov@nginx.com     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
111318Smax.romanov@nginx.com 
112318Smax.romanov@nginx.com     nxt_assert(reg->data == ex);
113318Smax.romanov@nginx.com 
114318Smax.romanov@nginx.com     reg->data = data;
115318Smax.romanov@nginx.com 
116318Smax.romanov@nginx.com     return reg->stream;
117318Smax.romanov@nginx.com }
118318Smax.romanov@nginx.com 
119318Smax.romanov@nginx.com 
120318Smax.romanov@nginx.com void *
nxt_port_rpc_register_handler_ex(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,size_t ex_size)121318Smax.romanov@nginx.com nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
122318Smax.romanov@nginx.com     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
123318Smax.romanov@nginx.com     size_t ex_size)
124318Smax.romanov@nginx.com {
125190Smax.romanov@nginx.com     uint32_t            stream;
126190Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
127190Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
128190Smax.romanov@nginx.com 
129277Sigor@sysoev.ru     nxt_assert(port->pair[0] != -1);
130190Smax.romanov@nginx.com 
1311487Smax.romanov@nginx.com     stream = nxt_atomic_fetch_add(nxt_stream_ident, 1);
132190Smax.romanov@nginx.com 
133318Smax.romanov@nginx.com     reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
134190Smax.romanov@nginx.com 
135190Smax.romanov@nginx.com     if (nxt_slow_path(reg == NULL)) {
136318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
137190Smax.romanov@nginx.com 
138318Smax.romanov@nginx.com         return NULL;
139190Smax.romanov@nginx.com     }
140190Smax.romanov@nginx.com 
141190Smax.romanov@nginx.com     reg->stream = stream;
142318Smax.romanov@nginx.com     reg->peer = -1;
143190Smax.romanov@nginx.com     reg->ready_handler = ready_handler;
144190Smax.romanov@nginx.com     reg->error_handler = error_handler;
145318Smax.romanov@nginx.com     reg->data = reg + 1;
146190Smax.romanov@nginx.com 
147190Smax.romanov@nginx.com     nxt_port_rpc_lhq_stream(&lhq, &stream);
148190Smax.romanov@nginx.com     lhq.replace = 0;
149190Smax.romanov@nginx.com     lhq.value = reg;
150190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
151190Smax.romanov@nginx.com 
152190Smax.romanov@nginx.com     switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
153190Smax.romanov@nginx.com 
154190Smax.romanov@nginx.com     case NXT_OK:
155190Smax.romanov@nginx.com         break;
156190Smax.romanov@nginx.com 
157190Smax.romanov@nginx.com     default:
158318Smax.romanov@nginx.com         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
159318Smax.romanov@nginx.com                       "reg ", stream);
160190Smax.romanov@nginx.com 
161190Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, reg);
162190Smax.romanov@nginx.com 
163318Smax.romanov@nginx.com         return NULL;
164318Smax.romanov@nginx.com     }
165318Smax.romanov@nginx.com 
166318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD registered", stream);
167318Smax.romanov@nginx.com 
168582Smax.romanov@nginx.com     nxt_port_inc_use(port);
169582Smax.romanov@nginx.com 
170318Smax.romanov@nginx.com     return reg->data;
171318Smax.romanov@nginx.com }
172318Smax.romanov@nginx.com 
173318Smax.romanov@nginx.com 
174318Smax.romanov@nginx.com uint32_t
nxt_port_rpc_ex_stream(void * ex)175318Smax.romanov@nginx.com nxt_port_rpc_ex_stream(void *ex)
176318Smax.romanov@nginx.com {
177318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
178318Smax.romanov@nginx.com 
179318Smax.romanov@nginx.com     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
180318Smax.romanov@nginx.com 
181318Smax.romanov@nginx.com     nxt_assert(reg->data == ex);
182318Smax.romanov@nginx.com 
183318Smax.romanov@nginx.com     return reg->stream;
184318Smax.romanov@nginx.com }
185318Smax.romanov@nginx.com 
186318Smax.romanov@nginx.com 
187318Smax.romanov@nginx.com void
nxt_port_rpc_ex_set_peer(nxt_task_t * task,nxt_port_t * port,void * ex,nxt_pid_t peer)188318Smax.romanov@nginx.com nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
189318Smax.romanov@nginx.com     void *ex, nxt_pid_t peer)
190318Smax.romanov@nginx.com {
191318Smax.romanov@nginx.com     nxt_int_t           ret;
192318Smax.romanov@nginx.com     nxt_queue_link_t    *peer_link;
193318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
194318Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
195318Smax.romanov@nginx.com 
196318Smax.romanov@nginx.com     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
197318Smax.romanov@nginx.com 
198318Smax.romanov@nginx.com     nxt_assert(reg->data == ex);
199318Smax.romanov@nginx.com 
200425Smax.romanov@nginx.com     if (nxt_slow_path(peer == reg->peer)) {
201425Smax.romanov@nginx.com         return;
202425Smax.romanov@nginx.com     }
203318Smax.romanov@nginx.com 
204425Smax.romanov@nginx.com     if (reg->peer != -1) {
205425Smax.romanov@nginx.com         nxt_port_rpc_remove_from_peers(task, port, reg);
206425Smax.romanov@nginx.com 
207425Smax.romanov@nginx.com         reg->peer = -1;
208425Smax.romanov@nginx.com     }
209425Smax.romanov@nginx.com 
210425Smax.romanov@nginx.com     if (peer == -1) {
211318Smax.romanov@nginx.com         return;
212190Smax.romanov@nginx.com     }
213190Smax.romanov@nginx.com 
214318Smax.romanov@nginx.com     reg->peer = peer;
215318Smax.romanov@nginx.com 
216318Smax.romanov@nginx.com     nxt_port_rpc_lhq_peer(&lhq, &peer);
217318Smax.romanov@nginx.com     lhq.replace = 0;
218318Smax.romanov@nginx.com     lhq.value = &reg->link;
219318Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
220318Smax.romanov@nginx.com 
221318Smax.romanov@nginx.com     ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
222318Smax.romanov@nginx.com 
223318Smax.romanov@nginx.com     switch (ret) {
224318Smax.romanov@nginx.com 
225318Smax.romanov@nginx.com     case NXT_OK:
226318Smax.romanov@nginx.com         reg->link_first = 1;
227318Smax.romanov@nginx.com         nxt_queue_self(&reg->link);
228318Smax.romanov@nginx.com 
229318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
230318Smax.romanov@nginx.com                   reg->stream, reg->peer, reg->link.next);
231318Smax.romanov@nginx.com         break;
232318Smax.romanov@nginx.com 
233318Smax.romanov@nginx.com     case NXT_DECLINED:
234318Smax.romanov@nginx.com         reg->link_first = 0;
235318Smax.romanov@nginx.com         peer_link = lhq.value;
236318Smax.romanov@nginx.com         nxt_queue_insert_after(peer_link, &reg->link);
237318Smax.romanov@nginx.com 
238318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
239318Smax.romanov@nginx.com                   reg->stream, reg->peer, reg->link.next);
240318Smax.romanov@nginx.com         break;
241318Smax.romanov@nginx.com 
242318Smax.romanov@nginx.com     default:
243494Spluknet@nginx.com         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add "
244318Smax.romanov@nginx.com                       "peer for stream #%uD (%d)", reg->stream, ret);
245318Smax.romanov@nginx.com 
246318Smax.romanov@nginx.com         reg->peer = -1;
247318Smax.romanov@nginx.com         break;
248318Smax.romanov@nginx.com     }
249318Smax.romanov@nginx.com 
250318Smax.romanov@nginx.com }
251318Smax.romanov@nginx.com 
252318Smax.romanov@nginx.com 
253318Smax.romanov@nginx.com static void
nxt_port_rpc_remove_from_peers(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_reg_t * reg)254318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
255318Smax.romanov@nginx.com     nxt_port_rpc_reg_t *reg)
256318Smax.romanov@nginx.com {
257318Smax.romanov@nginx.com     uint32_t            stream;
258318Smax.romanov@nginx.com     nxt_int_t           ret;
259318Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
260318Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *r;
261318Smax.romanov@nginx.com 
262318Smax.romanov@nginx.com     stream = reg->stream;
263318Smax.romanov@nginx.com 
264318Smax.romanov@nginx.com     if (reg->link_first != 0) {
265318Smax.romanov@nginx.com         nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
266204Smax.romanov@nginx.com         lhq.pool = port->mem_pool;
267190Smax.romanov@nginx.com 
268318Smax.romanov@nginx.com         if (reg->link.next == &reg->link) {
269318Smax.romanov@nginx.com             nxt_assert(reg->link.prev == &reg->link);
270318Smax.romanov@nginx.com 
271318Smax.romanov@nginx.com             nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
272318Smax.romanov@nginx.com                       "registration (%p)", stream, reg->peer, reg->link.next);
273190Smax.romanov@nginx.com 
274318Smax.romanov@nginx.com             ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
275613Svbart@nginx.com 
276318Smax.romanov@nginx.com         } else {
277318Smax.romanov@nginx.com             nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
278318Smax.romanov@nginx.com                       "registration (%p)", stream, reg->peer, reg->link.next);
279318Smax.romanov@nginx.com 
280318Smax.romanov@nginx.com             lhq.replace = 1;
281318Smax.romanov@nginx.com             lhq.value = reg->link.next;
282190Smax.romanov@nginx.com 
283318Smax.romanov@nginx.com             r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
284318Smax.romanov@nginx.com             r->link_first = 1;
285318Smax.romanov@nginx.com 
286318Smax.romanov@nginx.com             nxt_queue_remove(&reg->link);
287190Smax.romanov@nginx.com 
288318Smax.romanov@nginx.com             ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
289204Smax.romanov@nginx.com         }
290613Svbart@nginx.com 
291318Smax.romanov@nginx.com     } else {
292318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD remove pid %PI "
293318Smax.romanov@nginx.com                   "registration (%p)", stream, reg->peer, reg->link.next);
294318Smax.romanov@nginx.com 
295318Smax.romanov@nginx.com         nxt_queue_remove(&reg->link);
296318Smax.romanov@nginx.com         ret = NXT_OK;
297190Smax.romanov@nginx.com     }
298190Smax.romanov@nginx.com 
299318Smax.romanov@nginx.com     if (nxt_slow_path(ret != NXT_OK)) {
300318Smax.romanov@nginx.com         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
301318Smax.romanov@nginx.com                       " to delete peer %PI (%d)", stream, reg->peer, ret);
302318Smax.romanov@nginx.com     }
303190Smax.romanov@nginx.com }
304190Smax.romanov@nginx.com 
305190Smax.romanov@nginx.com 
306190Smax.romanov@nginx.com void
nxt_port_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)307190Smax.romanov@nginx.com nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
308190Smax.romanov@nginx.com {
309190Smax.romanov@nginx.com     uint8_t              last;
310190Smax.romanov@nginx.com     uint32_t             stream;
311190Smax.romanov@nginx.com     nxt_int_t            ret;
312190Smax.romanov@nginx.com     nxt_port_t           *port;
313190Smax.romanov@nginx.com     nxt_port_rpc_reg_t   *reg;
314190Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
315190Smax.romanov@nginx.com     nxt_port_msg_type_t  type;
316190Smax.romanov@nginx.com 
317190Smax.romanov@nginx.com     stream = msg->port_msg.stream;
318190Smax.romanov@nginx.com     port = msg->port;
319190Smax.romanov@nginx.com     last = msg->port_msg.last;
320190Smax.romanov@nginx.com     type = msg->port_msg.type;
321190Smax.romanov@nginx.com 
322190Smax.romanov@nginx.com     nxt_port_rpc_lhq_stream(&lhq, &stream);
323190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
324190Smax.romanov@nginx.com 
325190Smax.romanov@nginx.com     if (last != 0) {
326190Smax.romanov@nginx.com         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
327277Sigor@sysoev.ru 
328190Smax.romanov@nginx.com     } else {
329190Smax.romanov@nginx.com         ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
330190Smax.romanov@nginx.com     }
331190Smax.romanov@nginx.com 
332190Smax.romanov@nginx.com     if (ret != NXT_OK) {
333318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
334190Smax.romanov@nginx.com 
335190Smax.romanov@nginx.com         return;
336190Smax.romanov@nginx.com     }
337190Smax.romanov@nginx.com 
338318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
339318Smax.romanov@nginx.com                     (last ? "last " : ""), type);
340318Smax.romanov@nginx.com 
341190Smax.romanov@nginx.com     reg = lhq.value;
342190Smax.romanov@nginx.com 
343190Smax.romanov@nginx.com     if (type == _NXT_PORT_MSG_RPC_ERROR) {
344190Smax.romanov@nginx.com         reg->error_handler(task, msg, reg->data);
345277Sigor@sysoev.ru 
346190Smax.romanov@nginx.com     } else {
347190Smax.romanov@nginx.com         reg->ready_handler(task, msg, reg->data);
348190Smax.romanov@nginx.com     }
349190Smax.romanov@nginx.com 
350190Smax.romanov@nginx.com     if (last == 0) {
351190Smax.romanov@nginx.com         return;
352190Smax.romanov@nginx.com     }
353190Smax.romanov@nginx.com 
354204Smax.romanov@nginx.com     if (reg->peer != -1) {
355318Smax.romanov@nginx.com         nxt_port_rpc_remove_from_peers(task, port, reg);
356318Smax.romanov@nginx.com     }
357190Smax.romanov@nginx.com 
358318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD free registration", stream);
359190Smax.romanov@nginx.com 
360190Smax.romanov@nginx.com     nxt_mp_free(port->mem_pool, reg);
361582Smax.romanov@nginx.com 
362582Smax.romanov@nginx.com     nxt_port_use(task, port, -1);
363190Smax.romanov@nginx.com }
364190Smax.romanov@nginx.com 
365190Smax.romanov@nginx.com 
366190Smax.romanov@nginx.com void
nxt_port_rpc_remove_peer(nxt_task_t * task,nxt_port_t * port,nxt_pid_t peer)367190Smax.romanov@nginx.com nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
368190Smax.romanov@nginx.com {
369190Smax.romanov@nginx.com     uint8_t              last;
370190Smax.romanov@nginx.com     uint32_t             stream;
371190Smax.romanov@nginx.com     nxt_int_t            ret;
372190Smax.romanov@nginx.com     nxt_buf_t            buf;
373318Smax.romanov@nginx.com     nxt_queue_link_t     *peer_link, *next_link;
374190Smax.romanov@nginx.com     nxt_port_rpc_reg_t   *reg;
375190Smax.romanov@nginx.com     nxt_lvlhsh_query_t   lhq;
376190Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
377190Smax.romanov@nginx.com 
378190Smax.romanov@nginx.com     nxt_port_rpc_lhq_peer(&lhq, &peer);
379190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
380190Smax.romanov@nginx.com 
381190Smax.romanov@nginx.com     ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
382190Smax.romanov@nginx.com 
383190Smax.romanov@nginx.com     if (nxt_slow_path(ret != NXT_OK)) {
384318Smax.romanov@nginx.com         nxt_debug(task, "rpc: no reg found for peer %PI", peer);
385190Smax.romanov@nginx.com 
386190Smax.romanov@nginx.com         return;
387190Smax.romanov@nginx.com     }
388190Smax.romanov@nginx.com 
389190Smax.romanov@nginx.com     nxt_memzero(&msg, sizeof(msg));
390190Smax.romanov@nginx.com     nxt_memzero(&buf, sizeof(buf));
391190Smax.romanov@nginx.com 
3921558Smax.romanov@nginx.com     msg.fd[0] = -1;
3931558Smax.romanov@nginx.com     msg.fd[1] = -1;
394190Smax.romanov@nginx.com     msg.buf = &buf;
395190Smax.romanov@nginx.com     msg.port = port;
396*1996St.nateldemoura@f5.com     msg.u.removed_pid = peer;
397*1996St.nateldemoura@f5.com     msg.port_msg.pid = nxt_pid;
398190Smax.romanov@nginx.com     msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
399190Smax.romanov@nginx.com 
400190Smax.romanov@nginx.com     peer_link = lhq.value;
401190Smax.romanov@nginx.com     last = 0;
402190Smax.romanov@nginx.com 
403190Smax.romanov@nginx.com     while (last == 0) {
404190Smax.romanov@nginx.com 
405190Smax.romanov@nginx.com         reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
406190Smax.romanov@nginx.com 
407318Smax.romanov@nginx.com         nxt_assert(reg->peer == peer);
408318Smax.romanov@nginx.com 
409318Smax.romanov@nginx.com         stream = reg->stream;
410190Smax.romanov@nginx.com 
411318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD trigger error", stream);
412318Smax.romanov@nginx.com 
413318Smax.romanov@nginx.com         msg.port_msg.stream = stream;
414425Smax.romanov@nginx.com         msg.port_msg.last = 1;
415190Smax.romanov@nginx.com 
416190Smax.romanov@nginx.com         if (peer_link == peer_link->next) {
417318Smax.romanov@nginx.com             nxt_assert(peer_link->prev == peer_link);
418318Smax.romanov@nginx.com 
419190Smax.romanov@nginx.com             last = 1;
420190Smax.romanov@nginx.com 
421190Smax.romanov@nginx.com         } else {
422318Smax.romanov@nginx.com             nxt_assert(peer_link->next->prev == peer_link);
423318Smax.romanov@nginx.com             nxt_assert(peer_link->prev->next == peer_link);
424318Smax.romanov@nginx.com 
425318Smax.romanov@nginx.com             next_link = peer_link->next;
426318Smax.romanov@nginx.com             nxt_queue_remove(peer_link);
427318Smax.romanov@nginx.com 
428318Smax.romanov@nginx.com             peer_link = next_link;
429190Smax.romanov@nginx.com         }
430190Smax.romanov@nginx.com 
431425Smax.romanov@nginx.com         reg->peer = -1;
432425Smax.romanov@nginx.com 
433425Smax.romanov@nginx.com         reg->error_handler(task, &msg, reg->data);
434425Smax.romanov@nginx.com 
435425Smax.romanov@nginx.com         /* Reset 'last' flag to preserve rpc handler. */
436425Smax.romanov@nginx.com         if (msg.port_msg.last == 0) {
437425Smax.romanov@nginx.com             continue;
438425Smax.romanov@nginx.com         }
439425Smax.romanov@nginx.com 
440425Smax.romanov@nginx.com         nxt_port_rpc_lhq_stream(&lhq, &stream);
441425Smax.romanov@nginx.com         lhq.pool = port->mem_pool;
442425Smax.romanov@nginx.com 
443425Smax.romanov@nginx.com         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
444425Smax.romanov@nginx.com 
445425Smax.romanov@nginx.com         if (nxt_slow_path(ret != NXT_OK)) {
446425Smax.romanov@nginx.com             nxt_log_error(NXT_LOG_ERR, task->log,
447425Smax.romanov@nginx.com                           "rpc: stream #%uD failed to delete handler", stream);
448425Smax.romanov@nginx.com 
449425Smax.romanov@nginx.com             return;
450425Smax.romanov@nginx.com         }
451425Smax.romanov@nginx.com 
452190Smax.romanov@nginx.com         nxt_mp_free(port->mem_pool, reg);
453582Smax.romanov@nginx.com 
454582Smax.romanov@nginx.com         nxt_port_use(task, port, -1);
455190Smax.romanov@nginx.com     }
456190Smax.romanov@nginx.com }
457190Smax.romanov@nginx.com 
458190Smax.romanov@nginx.com 
459190Smax.romanov@nginx.com void
nxt_port_rpc_cancel(nxt_task_t * task,nxt_port_t * port,uint32_t stream)460190Smax.romanov@nginx.com nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
461190Smax.romanov@nginx.com {
462190Smax.romanov@nginx.com     nxt_int_t           ret;
463190Smax.romanov@nginx.com     nxt_port_rpc_reg_t  *reg;
464190Smax.romanov@nginx.com     nxt_lvlhsh_query_t  lhq;
465190Smax.romanov@nginx.com 
466190Smax.romanov@nginx.com     nxt_port_rpc_lhq_stream(&lhq, &stream);
467190Smax.romanov@nginx.com     lhq.pool = port->mem_pool;
468190Smax.romanov@nginx.com 
469190Smax.romanov@nginx.com     ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
470190Smax.romanov@nginx.com 
471190Smax.romanov@nginx.com     if (ret != NXT_OK) {
472318Smax.romanov@nginx.com         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
473190Smax.romanov@nginx.com 
474190Smax.romanov@nginx.com         return;
475190Smax.romanov@nginx.com     }
476190Smax.romanov@nginx.com 
477190Smax.romanov@nginx.com     reg = lhq.value;
478190Smax.romanov@nginx.com 
479204Smax.romanov@nginx.com     if (reg->peer != -1) {
480318Smax.romanov@nginx.com         nxt_port_rpc_remove_from_peers(task, port, reg);
481318Smax.romanov@nginx.com     }
482190Smax.romanov@nginx.com 
483318Smax.romanov@nginx.com     nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
484190Smax.romanov@nginx.com 
485190Smax.romanov@nginx.com     nxt_mp_free(port->mem_pool, reg);
486582Smax.romanov@nginx.com 
487582Smax.romanov@nginx.com     nxt_port_use(task, port, -1);
488190Smax.romanov@nginx.com }
489583Smax.romanov@nginx.com 
490583Smax.romanov@nginx.com static nxt_buf_t  nxt_port_close_dummy_buf;
491583Smax.romanov@nginx.com 
492583Smax.romanov@nginx.com void
nxt_port_rpc_close(nxt_task_t * task,nxt_port_t * port)493583Smax.romanov@nginx.com nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
494583Smax.romanov@nginx.com {
495583Smax.romanov@nginx.com     nxt_port_rpc_reg_t   *reg;
496583Smax.romanov@nginx.com     nxt_port_recv_msg_t  msg;
497583Smax.romanov@nginx.com 
498583Smax.romanov@nginx.com     for ( ;; ) {
499596Sigor@sysoev.ru         reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto);
500583Smax.romanov@nginx.com         if (reg == NULL) {
501583Smax.romanov@nginx.com             return;
502583Smax.romanov@nginx.com         }
503583Smax.romanov@nginx.com 
5041558Smax.romanov@nginx.com         msg.fd[0] = -1;
5051558Smax.romanov@nginx.com         msg.fd[1] = -1;
506583Smax.romanov@nginx.com         msg.buf = &nxt_port_close_dummy_buf;
507583Smax.romanov@nginx.com         msg.port = port;
508583Smax.romanov@nginx.com         msg.port_msg.stream = reg->stream;
509583Smax.romanov@nginx.com         msg.port_msg.pid = nxt_pid;
510583Smax.romanov@nginx.com         msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
511583Smax.romanov@nginx.com         msg.port_msg.last = 1;
512583Smax.romanov@nginx.com         msg.port_msg.mmap = 0;
513583Smax.romanov@nginx.com         msg.port_msg.nf = 0;
514583Smax.romanov@nginx.com         msg.port_msg.mf = 0;
515956Smax.romanov@nginx.com         msg.size = 0;
516583Smax.romanov@nginx.com         msg.cancelled = 0;
517583Smax.romanov@nginx.com         msg.u.data = NULL;
518583Smax.romanov@nginx.com 
519583Smax.romanov@nginx.com         nxt_port_rpc_handler(task, &msg);
520583Smax.romanov@nginx.com     }
521583Smax.romanov@nginx.com }
522