xref: /unit/src/nxt_port_rpc.c (revision 2126:8542c8141a13)
1 
2 /*
3  * Copyright (C) Max Romanov
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 #include <nxt_port_rpc.h>
9 
10 
11 static volatile uint32_t  *nxt_stream_ident;
12 
13 typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
14 
15 struct nxt_port_rpc_reg_s {
16     uint32_t                stream;
17 
18     nxt_pid_t               peer;
19     nxt_queue_link_t        link;
20     nxt_bool_t              link_first;
21 
22     nxt_port_rpc_handler_t  ready_handler;
23     nxt_port_rpc_handler_t  error_handler;
24     void                    *data;
25 };
26 
27 
28 static void
29 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30     nxt_port_rpc_reg_t *reg);
31 
32 
33 nxt_int_t
nxt_port_rpc_init(void)34 nxt_port_rpc_init(void)
35 {
36     void  *p;
37 
38     if (nxt_stream_ident != NULL) {
39         return NXT_OK;
40     }
41 
42     p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE,
43                      MAP_ANON | MAP_SHARED, -1, 0);
44 
45     if (nxt_slow_path(p == MAP_FAILED)) {
46         return NXT_ERROR;
47     }
48 
49     nxt_stream_ident = p;
50     *nxt_stream_ident = 1;
51 
52     return NXT_OK;
53 }
54 
55 
56 static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t * lhq,void * data)57 nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
58 {
59     return NXT_OK;
60 }
61 
62 
63 static const nxt_lvlhsh_proto_t  lvlhsh_rpc_reg_proto  nxt_aligned(64) = {
64     NXT_LVLHSH_DEFAULT,
65     nxt_rpc_reg_test,
66     nxt_lvlhsh_alloc,
67     nxt_lvlhsh_free,
68 };
69 
70 
71 nxt_inline void
nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t * lhq,uint32_t * stream)72 nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
73 {
74     lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
75     lhq->key.length = sizeof(*stream);
76     lhq->key.start = (u_char *) stream;
77     lhq->proto = &lvlhsh_rpc_reg_proto;
78 }
79 
80 
81 nxt_inline void
nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t * lhq,nxt_pid_t * peer)82 nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
83 {
84     lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
85     lhq->key.length = sizeof(*peer);
86     lhq->key.start = (u_char *) peer;
87     lhq->proto = &lvlhsh_rpc_reg_proto;
88 }
89 
90 
91 uint32_t
nxt_port_rpc_register_handler(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,nxt_pid_t peer,void * data)92 nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
93     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
94     nxt_pid_t peer, void *data)
95 {
96     void                *ex;
97     nxt_port_rpc_reg_t  *reg;
98 
99     ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
100                                           error_handler, 0);
101 
102     if (ex == NULL) {
103         return 0;
104     }
105 
106     if (peer != -1) {
107         nxt_port_rpc_ex_set_peer(task, port, ex, peer);
108     }
109 
110     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
111 
112     nxt_assert(reg->data == ex);
113 
114     reg->data = data;
115 
116     return reg->stream;
117 }
118 
119 
120 void *
nxt_port_rpc_register_handler_ex(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,size_t ex_size)121 nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
122     nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
123     size_t ex_size)
124 {
125     uint32_t            stream;
126     nxt_port_rpc_reg_t  *reg;
127     nxt_lvlhsh_query_t  lhq;
128 
129     nxt_assert(port->pair[0] != -1);
130 
131     stream = nxt_atomic_fetch_add(nxt_stream_ident, 1);
132 
133     reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
134 
135     if (nxt_slow_path(reg == NULL)) {
136         nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
137 
138         return NULL;
139     }
140 
141     reg->stream = stream;
142     reg->peer = -1;
143     reg->ready_handler = ready_handler;
144     reg->error_handler = error_handler;
145     reg->data = reg + 1;
146 
147     nxt_port_rpc_lhq_stream(&lhq, &stream);
148     lhq.replace = 0;
149     lhq.value = reg;
150     lhq.pool = port->mem_pool;
151 
152     switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
153 
154     case NXT_OK:
155         break;
156 
157     default:
158         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
159                       "reg ", stream);
160 
161         nxt_mp_free(port->mem_pool, reg);
162 
163         return NULL;
164     }
165 
166     nxt_debug(task, "rpc: stream #%uD registered", stream);
167 
168     nxt_port_inc_use(port);
169 
170     return reg->data;
171 }
172 
173 
174 uint32_t
nxt_port_rpc_ex_stream(void * ex)175 nxt_port_rpc_ex_stream(void *ex)
176 {
177     nxt_port_rpc_reg_t  *reg;
178 
179     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
180 
181     nxt_assert(reg->data == ex);
182 
183     return reg->stream;
184 }
185 
186 
187 void
nxt_port_rpc_ex_set_peer(nxt_task_t * task,nxt_port_t * port,void * ex,nxt_pid_t peer)188 nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
189     void *ex, nxt_pid_t peer)
190 {
191     nxt_int_t           ret;
192     nxt_queue_link_t    *peer_link;
193     nxt_port_rpc_reg_t  *reg;
194     nxt_lvlhsh_query_t  lhq;
195 
196     reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
197 
198     nxt_assert(reg->data == ex);
199 
200     if (nxt_slow_path(peer == reg->peer)) {
201         return;
202     }
203 
204     if (reg->peer != -1) {
205         nxt_port_rpc_remove_from_peers(task, port, reg);
206 
207         reg->peer = -1;
208     }
209 
210     if (peer == -1) {
211         return;
212     }
213 
214     reg->peer = peer;
215 
216     nxt_port_rpc_lhq_peer(&lhq, &peer);
217     lhq.replace = 0;
218     lhq.value = &reg->link;
219     lhq.pool = port->mem_pool;
220 
221     ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
222 
223     switch (ret) {
224 
225     case NXT_OK:
226         reg->link_first = 1;
227         nxt_queue_self(&reg->link);
228 
229         nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
230                   reg->stream, reg->peer, reg->link.next);
231         break;
232 
233     case NXT_DECLINED:
234         reg->link_first = 0;
235         peer_link = lhq.value;
236         nxt_queue_insert_after(peer_link, &reg->link);
237 
238         nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
239                   reg->stream, reg->peer, reg->link.next);
240         break;
241 
242     default:
243         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add "
244                       "peer for stream #%uD (%d)", reg->stream, ret);
245 
246         reg->peer = -1;
247         break;
248     }
249 
250 }
251 
252 
253 static void
nxt_port_rpc_remove_from_peers(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_reg_t * reg)254 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
255     nxt_port_rpc_reg_t *reg)
256 {
257     uint32_t            stream;
258     nxt_int_t           ret;
259     nxt_lvlhsh_query_t  lhq;
260     nxt_port_rpc_reg_t  *r;
261 
262     stream = reg->stream;
263 
264     if (reg->link_first != 0) {
265         nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
266         lhq.pool = port->mem_pool;
267 
268         if (reg->link.next == &reg->link) {
269             nxt_assert(reg->link.prev == &reg->link);
270 
271             nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
272                       "registration (%p)", stream, reg->peer, reg->link.next);
273 
274             ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
275 
276         } else {
277             nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
278                       "registration (%p)", stream, reg->peer, reg->link.next);
279 
280             lhq.replace = 1;
281             lhq.value = reg->link.next;
282 
283             r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
284             r->link_first = 1;
285 
286             nxt_queue_remove(&reg->link);
287 
288             ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
289         }
290 
291     } else {
292         nxt_debug(task, "rpc: stream #%uD remove pid %PI "
293                   "registration (%p)", stream, reg->peer, reg->link.next);
294 
295         nxt_queue_remove(&reg->link);
296         ret = NXT_OK;
297     }
298 
299     if (nxt_slow_path(ret != NXT_OK)) {
300         nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
301                       " to delete peer %PI (%d)", stream, reg->peer, ret);
302     }
303 }
304 
305 
306 void
nxt_port_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)307 nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
308 {
309     uint8_t              last;
310     uint32_t             stream;
311     nxt_int_t            ret;
312     nxt_port_t           *port;
313     nxt_port_rpc_reg_t   *reg;
314     nxt_lvlhsh_query_t   lhq;
315     nxt_port_msg_type_t  type;
316 
317     stream = msg->port_msg.stream;
318     port = msg->port;
319     last = msg->port_msg.last;
320     type = msg->port_msg.type;
321 
322     nxt_port_rpc_lhq_stream(&lhq, &stream);
323     lhq.pool = port->mem_pool;
324 
325     if (last != 0) {
326         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
327 
328     } else {
329         ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
330     }
331 
332     if (ret != NXT_OK) {
333         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
334 
335         return;
336     }
337 
338     nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
339                     (last ? "last " : ""), type);
340 
341     reg = lhq.value;
342 
343     if (type == _NXT_PORT_MSG_RPC_ERROR) {
344         reg->error_handler(task, msg, reg->data);
345 
346     } else {
347         reg->ready_handler(task, msg, reg->data);
348     }
349 
350     if (last == 0) {
351         return;
352     }
353 
354     if (reg->peer != -1) {
355         nxt_port_rpc_remove_from_peers(task, port, reg);
356     }
357 
358     nxt_debug(task, "rpc: stream #%uD free registration", stream);
359 
360     nxt_mp_free(port->mem_pool, reg);
361 
362     nxt_port_use(task, port, -1);
363 }
364 
365 
366 void
nxt_port_rpc_remove_peer(nxt_task_t * task,nxt_port_t * port,nxt_pid_t peer)367 nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
368 {
369     uint8_t              last;
370     uint32_t             stream;
371     nxt_int_t            ret;
372     nxt_buf_t            buf;
373     nxt_queue_link_t     *peer_link, *next_link;
374     nxt_port_rpc_reg_t   *reg;
375     nxt_lvlhsh_query_t   lhq;
376     nxt_port_recv_msg_t  msg;
377 
378     nxt_port_rpc_lhq_peer(&lhq, &peer);
379     lhq.pool = port->mem_pool;
380 
381     ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
382 
383     if (nxt_slow_path(ret != NXT_OK)) {
384         nxt_debug(task, "rpc: no reg found for peer %PI", peer);
385 
386         return;
387     }
388 
389     nxt_memzero(&msg, sizeof(msg));
390     nxt_memzero(&buf, sizeof(buf));
391 
392     msg.fd[0] = -1;
393     msg.fd[1] = -1;
394     msg.buf = &buf;
395     msg.port = port;
396     msg.u.removed_pid = peer;
397     msg.port_msg.pid = nxt_pid;
398     msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
399 
400     peer_link = lhq.value;
401     last = 0;
402 
403     while (last == 0) {
404 
405         reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
406 
407         nxt_assert(reg->peer == peer);
408 
409         stream = reg->stream;
410 
411         nxt_debug(task, "rpc: stream #%uD trigger error", stream);
412 
413         msg.port_msg.stream = stream;
414         msg.port_msg.last = 1;
415 
416         if (peer_link == peer_link->next) {
417             nxt_assert(peer_link->prev == peer_link);
418 
419             last = 1;
420 
421         } else {
422             nxt_assert(peer_link->next->prev == peer_link);
423             nxt_assert(peer_link->prev->next == peer_link);
424 
425             next_link = peer_link->next;
426             nxt_queue_remove(peer_link);
427 
428             peer_link = next_link;
429         }
430 
431         reg->peer = -1;
432 
433         reg->error_handler(task, &msg, reg->data);
434 
435         /* Reset 'last' flag to preserve rpc handler. */
436         if (msg.port_msg.last == 0) {
437             continue;
438         }
439 
440         nxt_port_rpc_lhq_stream(&lhq, &stream);
441         lhq.pool = port->mem_pool;
442 
443         ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
444 
445         if (nxt_slow_path(ret != NXT_OK)) {
446             nxt_log_error(NXT_LOG_ERR, task->log,
447                           "rpc: stream #%uD failed to delete handler", stream);
448 
449             return;
450         }
451 
452         nxt_mp_free(port->mem_pool, reg);
453 
454         nxt_port_use(task, port, -1);
455     }
456 }
457 
458 
459 void
nxt_port_rpc_cancel(nxt_task_t * task,nxt_port_t * port,uint32_t stream)460 nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
461 {
462     nxt_int_t           ret;
463     nxt_port_rpc_reg_t  *reg;
464     nxt_lvlhsh_query_t  lhq;
465 
466     nxt_port_rpc_lhq_stream(&lhq, &stream);
467     lhq.pool = port->mem_pool;
468 
469     ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
470 
471     if (ret != NXT_OK) {
472         nxt_debug(task, "rpc: stream #%uD no handler found", stream);
473 
474         return;
475     }
476 
477     reg = lhq.value;
478 
479     if (reg->peer != -1) {
480         nxt_port_rpc_remove_from_peers(task, port, reg);
481     }
482 
483     nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
484 
485     nxt_mp_free(port->mem_pool, reg);
486 
487     nxt_port_use(task, port, -1);
488 }
489 
490 static nxt_buf_t  nxt_port_close_dummy_buf;
491 
492 void
nxt_port_rpc_close(nxt_task_t * task,nxt_port_t * port)493 nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
494 {
495     nxt_port_rpc_reg_t   *reg;
496     nxt_port_recv_msg_t  msg;
497 
498     for ( ;; ) {
499         reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto);
500         if (reg == NULL) {
501             return;
502         }
503 
504         msg.fd[0] = -1;
505         msg.fd[1] = -1;
506         msg.buf = &nxt_port_close_dummy_buf;
507         msg.port = port;
508         msg.port_msg.stream = reg->stream;
509         msg.port_msg.pid = nxt_pid;
510         msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
511         msg.port_msg.last = 1;
512         msg.port_msg.mmap = 0;
513         msg.port_msg.nf = 0;
514         msg.port_msg.mf = 0;
515         msg.size = 0;
516         msg.cancelled = 0;
517         msg.u.data = NULL;
518 
519         nxt_port_rpc_handler(task, &msg);
520     }
521 }
522