xref: /unit/src/nxt_port_rpc.c (revision 494:7c83ddcc1c42)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_port_rpc.h>
9 
10 
11 static nxt_atomic_t  nxt_stream_ident = 1;
12 
13 typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
14 
15 struct nxt_port_rpc_reg_s {
16     uint32_t                stream;
17 
18     nxt_pid_t               peer;
19     nxt_queue_link_t        link;
20     nxt_bool_t              link_first;
21 
22     nxt_port_rpc_handler_t  ready_handler;
23     nxt_port_rpc_handler_t  error_handler;
24     void                    *data;
25 };
26 
27 
28 static void
29 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30     nxt_port_rpc_reg_t *reg);
31 
32 
33 static nxt_int_t
34 nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
35 {
36     return NXT_OK;
37 }
38 
39 
40 static const nxt_lvlhsh_proto_t  lvlhsh_rpc_reg_proto  nxt_aligned(64) = {
41     NXT_LVLHSH_DEFAULT,
42     nxt_rpc_reg_test,
43     nxt_lvlhsh_alloc,
44     nxt_lvlhsh_free,
45 };
46 
47 
48 nxt_inline void
49 nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
50 {
51     lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
52     lhq->key.length = sizeof(*stream);
53     lhq->key.start = (u_char *) stream;
54     lhq->proto = &lvlhsh_rpc_reg_proto;
55 }
56 
57 
58 nxt_inline void
59 nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
60 {
61     lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
62     lhq->key.length = sizeof(*peer);
63     lhq->key.start = (u_char *) peer;
64     lhq->proto = &lvlhsh_rpc_reg_proto;
65 }
66 
67 
68 uint32_t
69 nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
70     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
71     nxt_pid_t peer, void *data)
72 {
73     void                *ex;
74     nxt_port_rpc_reg_t  *reg;
75 
76     ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
77                                           error_handler, 0);
78 
79     if (ex == NULL) {
80         return 0;
81     }
82 
83     if (peer != -1) {
84         nxt_port_rpc_ex_set_peer(task, port, ex, peer);
85     }
86 
87     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
88 
89     nxt_assert(reg->data == ex);
90 
91     reg->data = data;
92 
93     return reg->stream;
94 }
95 
96 
97 void *
98 nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
99     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
100     size_t ex_size)
101 {
102     uint32_t            stream;
103     nxt_port_rpc_reg_t  *reg;
104     nxt_lvlhsh_query_t  lhq;
105 
106     nxt_assert(port->pair[0] != -1);
107 
108     stream =
109         (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff;
110 
111     reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
112 
113     if (nxt_slow_path(reg == NULL)) {
114         nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
115 
116         return NULL;
117     }
118 
119     reg->stream = stream;
120     reg->peer = -1;
121     reg->ready_handler = ready_handler;
122     reg->error_handler = error_handler;
123     reg->data = reg + 1;
124 
125     nxt_port_rpc_lhq_stream(&lhq, &stream);
126     lhq.replace = 0;
127     lhq.value = reg;
128     lhq.pool = port->mem_pool;
129 
130     switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
131 
132     case NXT_OK:
133         break;
134 
135     default:
136         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
137                       "reg ", stream);
138 
139         nxt_mp_free(port->mem_pool, reg);
140 
141         return NULL;
142     }
143 
144     nxt_debug(task, "rpc: stream #%uD registered", stream);
145 
146     return reg->data;
147 }
148 
149 
150 uint32_t
151 nxt_port_rpc_ex_stream(void *ex)
152 {
153     nxt_port_rpc_reg_t  *reg;
154 
155     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
156 
157     nxt_assert(reg->data == ex);
158 
159     return reg->stream;
160 }
161 
162 
163 void
164 nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
165     void *ex, nxt_pid_t peer)
166 {
167     nxt_int_t           ret;
168     nxt_queue_link_t    *peer_link;
169     nxt_port_rpc_reg_t  *reg;
170     nxt_lvlhsh_query_t  lhq;
171 
172     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
173 
174     nxt_assert(reg->data == ex);
175 
176     if (nxt_slow_path(peer == reg->peer)) {
177         return;
178     }
179 
180     if (reg->peer != -1) {
181         nxt_port_rpc_remove_from_peers(task, port, reg);
182 
183         reg->peer = -1;
184     }
185 
186     if (peer == -1) {
187         return;
188     }
189 
190     reg->peer = peer;
191 
192     nxt_port_rpc_lhq_peer(&lhq, &peer);
193     lhq.replace = 0;
194     lhq.value = &reg->link;
195     lhq.pool = port->mem_pool;
196 
197     ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
198 
199     switch (ret) {
200 
201     case NXT_OK:
202         reg->link_first = 1;
203         nxt_queue_self(&reg->link);
204 
205         nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
206                   reg->stream, reg->peer, reg->link.next);
207         break;
208 
209     case NXT_DECLINED:
210         reg->link_first = 0;
211         peer_link = lhq.value;
212         nxt_queue_insert_after(peer_link, &reg->link);
213 
214         nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
215                   reg->stream, reg->peer, reg->link.next);
216         break;
217 
218     default:
219         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add "
220                       "peer for stream #%uD (%d)", reg->stream, ret);
221 
222         reg->peer = -1;
223         break;
224     }
225 
226 }
227 
228 
229 static void
230 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
231     nxt_port_rpc_reg_t *reg)
232 {
233     uint32_t            stream;
234     nxt_int_t           ret;
235     nxt_lvlhsh_query_t  lhq;
236     nxt_port_rpc_reg_t  *r;
237 
238     stream = reg->stream;
239 
240     if (reg->link_first != 0) {
241         nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
242         lhq.pool = port->mem_pool;
243 
244         if (reg->link.next == &reg->link) {
245             nxt_assert(reg->link.prev == &reg->link);
246 
247             nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
248                       "registration (%p)", stream, reg->peer, reg->link.next);
249 
250             ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
251         } else {
252             nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
253                       "registration (%p)", stream, reg->peer, reg->link.next);
254 
255             lhq.replace = 1;
256             lhq.value = reg->link.next;
257 
258             r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
259             r->link_first = 1;
260 
261             nxt_queue_remove(&reg->link);
262 
263             ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
264         }
265     } else {
266         nxt_debug(task, "rpc: stream #%uD remove pid %PI "
267                   "registration (%p)", stream, reg->peer, reg->link.next);
268 
269         nxt_queue_remove(&reg->link);
270         ret = NXT_OK;
271     }
272 
273     if (nxt_slow_path(ret != NXT_OK)) {
274         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
275                       " to delete peer %PI (%d)", stream, reg->peer, ret);
276     }
277 }
278 
279 
280 void
281 nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
282 {
283     uint8_t              last;
284     uint32_t             stream;
285     nxt_int_t            ret;
286     nxt_port_t           *port;
287     nxt_port_rpc_reg_t   *reg;
288     nxt_lvlhsh_query_t   lhq;
289     nxt_port_msg_type_t  type;
290 
291     stream = msg->port_msg.stream;
292     port = msg->port;
293     last = msg->port_msg.last;
294     type = msg->port_msg.type;
295 
296     nxt_port_rpc_lhq_stream(&lhq, &stream);
297     lhq.pool = port->mem_pool;
298 
299     if (last != 0) {
300         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
301 
302     } else {
303         ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
304     }
305 
306     if (ret != NXT_OK) {
307         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
308 
309         return;
310     }
311 
312     nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
313                     (last ? "last " : ""), type);
314 
315     reg = lhq.value;
316 
317     if (type == _NXT_PORT_MSG_RPC_ERROR) {
318         reg->error_handler(task, msg, reg->data);
319 
320     } else {
321         reg->ready_handler(task, msg, reg->data);
322     }
323 
324     if (last == 0) {
325         return;
326     }
327 
328     if (reg->peer != -1) {
329         nxt_port_rpc_remove_from_peers(task, port, reg);
330     }
331 
332     nxt_debug(task, "rpc: stream #%uD free registration", stream);
333 
334     nxt_mp_free(port->mem_pool, reg);
335 }
336 
337 
338 void
339 nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
340 {
341     uint8_t              last;
342     uint32_t             stream;
343     nxt_int_t            ret;
344     nxt_buf_t            buf;
345     nxt_queue_link_t     *peer_link, *next_link;
346     nxt_port_rpc_reg_t   *reg;
347     nxt_lvlhsh_query_t   lhq;
348     nxt_port_recv_msg_t  msg;
349 
350     nxt_port_rpc_lhq_peer(&lhq, &peer);
351     lhq.pool = port->mem_pool;
352 
353     ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
354 
355     if (nxt_slow_path(ret != NXT_OK)) {
356         nxt_debug(task, "rpc: no reg found for peer %PI", peer);
357 
358         return;
359     }
360 
361     nxt_memzero(&msg, sizeof(msg));
362     nxt_memzero(&buf, sizeof(buf));
363 
364     msg.fd = -1;
365     msg.buf = &buf;
366     msg.port = port;
367 
368     msg.port_msg.pid = peer;
369     msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
370 
371     peer_link = lhq.value;
372     last = 0;
373 
374     while (last == 0) {
375 
376         reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
377 
378         nxt_assert(reg->peer == peer);
379 
380         stream = reg->stream;
381 
382         nxt_debug(task, "rpc: stream #%uD trigger error", stream);
383 
384         msg.port_msg.stream = stream;
385         msg.port_msg.last = 1;
386 
387         if (peer_link == peer_link->next) {
388             nxt_assert(peer_link->prev == peer_link);
389 
390             last = 1;
391 
392         } else {
393             nxt_assert(peer_link->next->prev == peer_link);
394             nxt_assert(peer_link->prev->next == peer_link);
395 
396             next_link = peer_link->next;
397             nxt_queue_remove(peer_link);
398 
399             peer_link = next_link;
400         }
401 
402         reg->peer = -1;
403 
404         reg->error_handler(task, &msg, reg->data);
405 
406         /* Reset 'last' flag to preserve rpc handler. */
407         if (msg.port_msg.last == 0) {
408             continue;
409         }
410 
411         nxt_port_rpc_lhq_stream(&lhq, &stream);
412         lhq.pool = port->mem_pool;
413 
414         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
415 
416         if (nxt_slow_path(ret != NXT_OK)) {
417             nxt_log_error(NXT_LOG_ERR, task->log,
418                           "rpc: stream #%uD failed to delete handler", stream);
419 
420             return;
421         }
422 
423         nxt_mp_free(port->mem_pool, reg);
424     }
425 }
426 
427 
428 void
429 nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
430 {
431     nxt_int_t           ret;
432     nxt_port_rpc_reg_t  *reg;
433     nxt_lvlhsh_query_t  lhq;
434 
435     nxt_port_rpc_lhq_stream(&lhq, &stream);
436     lhq.pool = port->mem_pool;
437 
438     ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
439 
440     if (ret != NXT_OK) {
441         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
442 
443         return;
444     }
445 
446     reg = lhq.value;
447 
448     if (reg->peer != -1) {
449         nxt_port_rpc_remove_from_peers(task, port, reg);
450     }
451 
452     nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
453 
454     nxt_mp_free(port->mem_pool, reg);
455 }
456