1
2 /*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8 #include <nxt_port_rpc.h>
9
10
11 static volatile uint32_t *nxt_stream_ident;
12
13 typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
14
15 struct nxt_port_rpc_reg_s {
16 uint32_t stream;
17
18 nxt_pid_t peer;
19 nxt_queue_link_t link;
20 nxt_bool_t link_first;
21
22 nxt_port_rpc_handler_t ready_handler;
23 nxt_port_rpc_handler_t error_handler;
24 void *data;
25 };
26
27
28 static void
29 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30 nxt_port_rpc_reg_t *reg);
31
32
33 nxt_int_t
nxt_port_rpc_init(void)34 nxt_port_rpc_init(void)
35 {
36 void *p;
37
38 if (nxt_stream_ident != NULL) {
39 return NXT_OK;
40 }
41
42 p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE,
43 MAP_ANON | MAP_SHARED, -1, 0);
44
45 if (nxt_slow_path(p == MAP_FAILED)) {
46 return NXT_ERROR;
47 }
48
49 nxt_stream_ident = p;
50 *nxt_stream_ident = 1;
51
52 return NXT_OK;
53 }
54
55
56 static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t * lhq,void * data)57 nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
58 {
59 return NXT_OK;
60 }
61
62
63 static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = {
64 NXT_LVLHSH_DEFAULT,
65 nxt_rpc_reg_test,
66 nxt_lvlhsh_alloc,
67 nxt_lvlhsh_free,
68 };
69
70
71 nxt_inline void
nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t * lhq,uint32_t * stream)72 nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
73 {
74 lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
75 lhq->key.length = sizeof(*stream);
76 lhq->key.start = (u_char *) stream;
77 lhq->proto = &lvlhsh_rpc_reg_proto;
78 }
79
80
81 nxt_inline void
nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t * lhq,nxt_pid_t * peer)82 nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
83 {
84 lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
85 lhq->key.length = sizeof(*peer);
86 lhq->key.start = (u_char *) peer;
87 lhq->proto = &lvlhsh_rpc_reg_proto;
88 }
89
90
91 uint32_t
nxt_port_rpc_register_handler(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,nxt_pid_t peer,void * data)92 nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
93 nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
94 nxt_pid_t peer, void *data)
95 {
96 void *ex;
97 nxt_port_rpc_reg_t *reg;
98
99 ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
100 error_handler, 0);
101
102 if (ex == NULL) {
103 return 0;
104 }
105
106 if (peer != -1) {
107 nxt_port_rpc_ex_set_peer(task, port, ex, peer);
108 }
109
110 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
111
112 nxt_assert(reg->data == ex);
113
114 reg->data = data;
115
116 return reg->stream;
117 }
118
119
120 void *
nxt_port_rpc_register_handler_ex(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_handler_t ready_handler,nxt_port_rpc_handler_t error_handler,size_t ex_size)121 nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
122 nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
123 size_t ex_size)
124 {
125 uint32_t stream;
126 nxt_port_rpc_reg_t *reg;
127 nxt_lvlhsh_query_t lhq;
128
129 nxt_assert(port->pair[0] != -1);
130
131 stream = nxt_atomic_fetch_add(nxt_stream_ident, 1);
132
133 reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
134
135 if (nxt_slow_path(reg == NULL)) {
136 nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
137
138 return NULL;
139 }
140
141 reg->stream = stream;
142 reg->peer = -1;
143 reg->ready_handler = ready_handler;
144 reg->error_handler = error_handler;
145 reg->data = reg + 1;
146
147 nxt_port_rpc_lhq_stream(&lhq, &stream);
148 lhq.replace = 0;
149 lhq.value = reg;
150 lhq.pool = port->mem_pool;
151
152 switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
153
154 case NXT_OK:
155 break;
156
157 default:
158 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
159 "reg ", stream);
160
161 nxt_mp_free(port->mem_pool, reg);
162
163 return NULL;
164 }
165
166 nxt_debug(task, "rpc: stream #%uD registered", stream);
167
168 nxt_port_inc_use(port);
169
170 return reg->data;
171 }
172
173
174 uint32_t
nxt_port_rpc_ex_stream(void * ex)175 nxt_port_rpc_ex_stream(void *ex)
176 {
177 nxt_port_rpc_reg_t *reg;
178
179 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
180
181 nxt_assert(reg->data == ex);
182
183 return reg->stream;
184 }
185
186
187 void
nxt_port_rpc_ex_set_peer(nxt_task_t * task,nxt_port_t * port,void * ex,nxt_pid_t peer)188 nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
189 void *ex, nxt_pid_t peer)
190 {
191 nxt_int_t ret;
192 nxt_queue_link_t *peer_link;
193 nxt_port_rpc_reg_t *reg;
194 nxt_lvlhsh_query_t lhq;
195
196 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
197
198 nxt_assert(reg->data == ex);
199
200 if (nxt_slow_path(peer == reg->peer)) {
201 return;
202 }
203
204 if (reg->peer != -1) {
205 nxt_port_rpc_remove_from_peers(task, port, reg);
206
207 reg->peer = -1;
208 }
209
210 if (peer == -1) {
211 return;
212 }
213
214 reg->peer = peer;
215
216 nxt_port_rpc_lhq_peer(&lhq, &peer);
217 lhq.replace = 0;
218 lhq.value = ®->link;
219 lhq.pool = port->mem_pool;
220
221 ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
222
223 switch (ret) {
224
225 case NXT_OK:
226 reg->link_first = 1;
227 nxt_queue_self(®->link);
228
229 nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
230 reg->stream, reg->peer, reg->link.next);
231 break;
232
233 case NXT_DECLINED:
234 reg->link_first = 0;
235 peer_link = lhq.value;
236 nxt_queue_insert_after(peer_link, ®->link);
237
238 nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
239 reg->stream, reg->peer, reg->link.next);
240 break;
241
242 default:
243 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add "
244 "peer for stream #%uD (%d)", reg->stream, ret);
245
246 reg->peer = -1;
247 break;
248 }
249
250 }
251
252
253 static void
nxt_port_rpc_remove_from_peers(nxt_task_t * task,nxt_port_t * port,nxt_port_rpc_reg_t * reg)254 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
255 nxt_port_rpc_reg_t *reg)
256 {
257 uint32_t stream;
258 nxt_int_t ret;
259 nxt_lvlhsh_query_t lhq;
260 nxt_port_rpc_reg_t *r;
261
262 stream = reg->stream;
263
264 if (reg->link_first != 0) {
265 nxt_port_rpc_lhq_peer(&lhq, ®->peer);
266 lhq.pool = port->mem_pool;
267
268 if (reg->link.next == ®->link) {
269 nxt_assert(reg->link.prev == ®->link);
270
271 nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
272 "registration (%p)", stream, reg->peer, reg->link.next);
273
274 ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
275
276 } else {
277 nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
278 "registration (%p)", stream, reg->peer, reg->link.next);
279
280 lhq.replace = 1;
281 lhq.value = reg->link.next;
282
283 r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
284 r->link_first = 1;
285
286 nxt_queue_remove(®->link);
287
288 ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
289 }
290
291 } else {
292 nxt_debug(task, "rpc: stream #%uD remove pid %PI "
293 "registration (%p)", stream, reg->peer, reg->link.next);
294
295 nxt_queue_remove(®->link);
296 ret = NXT_OK;
297 }
298
299 if (nxt_slow_path(ret != NXT_OK)) {
300 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
301 " to delete peer %PI (%d)", stream, reg->peer, ret);
302 }
303 }
304
305
306 void
nxt_port_rpc_handler(nxt_task_t * task,nxt_port_recv_msg_t * msg)307 nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
308 {
309 uint8_t last;
310 uint32_t stream;
311 nxt_int_t ret;
312 nxt_port_t *port;
313 nxt_port_rpc_reg_t *reg;
314 nxt_lvlhsh_query_t lhq;
315 nxt_port_msg_type_t type;
316
317 stream = msg->port_msg.stream;
318 port = msg->port;
319 last = msg->port_msg.last;
320 type = msg->port_msg.type;
321
322 nxt_port_rpc_lhq_stream(&lhq, &stream);
323 lhq.pool = port->mem_pool;
324
325 if (last != 0) {
326 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
327
328 } else {
329 ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
330 }
331
332 if (ret != NXT_OK) {
333 nxt_debug(task, "rpc: stream #%uD no handler found", stream);
334
335 return;
336 }
337
338 nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
339 (last ? "last " : ""), type);
340
341 reg = lhq.value;
342
343 if (type == _NXT_PORT_MSG_RPC_ERROR) {
344 reg->error_handler(task, msg, reg->data);
345
346 } else {
347 reg->ready_handler(task, msg, reg->data);
348 }
349
350 if (last == 0) {
351 return;
352 }
353
354 if (reg->peer != -1) {
355 nxt_port_rpc_remove_from_peers(task, port, reg);
356 }
357
358 nxt_debug(task, "rpc: stream #%uD free registration", stream);
359
360 nxt_mp_free(port->mem_pool, reg);
361
362 nxt_port_use(task, port, -1);
363 }
364
365
366 void
nxt_port_rpc_remove_peer(nxt_task_t * task,nxt_port_t * port,nxt_pid_t peer)367 nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
368 {
369 uint8_t last;
370 uint32_t stream;
371 nxt_int_t ret;
372 nxt_buf_t buf;
373 nxt_queue_link_t *peer_link, *next_link;
374 nxt_port_rpc_reg_t *reg;
375 nxt_lvlhsh_query_t lhq;
376 nxt_port_recv_msg_t msg;
377
378 nxt_port_rpc_lhq_peer(&lhq, &peer);
379 lhq.pool = port->mem_pool;
380
381 ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
382
383 if (nxt_slow_path(ret != NXT_OK)) {
384 nxt_debug(task, "rpc: no reg found for peer %PI", peer);
385
386 return;
387 }
388
389 nxt_memzero(&msg, sizeof(msg));
390 nxt_memzero(&buf, sizeof(buf));
391
392 msg.fd[0] = -1;
393 msg.fd[1] = -1;
394 msg.buf = &buf;
395 msg.port = port;
396 msg.u.removed_pid = peer;
397 msg.port_msg.pid = nxt_pid;
398 msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
399
400 peer_link = lhq.value;
401 last = 0;
402
403 while (last == 0) {
404
405 reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
406
407 nxt_assert(reg->peer == peer);
408
409 stream = reg->stream;
410
411 nxt_debug(task, "rpc: stream #%uD trigger error", stream);
412
413 msg.port_msg.stream = stream;
414 msg.port_msg.last = 1;
415
416 if (peer_link == peer_link->next) {
417 nxt_assert(peer_link->prev == peer_link);
418
419 last = 1;
420
421 } else {
422 nxt_assert(peer_link->next->prev == peer_link);
423 nxt_assert(peer_link->prev->next == peer_link);
424
425 next_link = peer_link->next;
426 nxt_queue_remove(peer_link);
427
428 peer_link = next_link;
429 }
430
431 reg->peer = -1;
432
433 reg->error_handler(task, &msg, reg->data);
434
435 /* Reset 'last' flag to preserve rpc handler. */
436 if (msg.port_msg.last == 0) {
437 continue;
438 }
439
440 nxt_port_rpc_lhq_stream(&lhq, &stream);
441 lhq.pool = port->mem_pool;
442
443 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
444
445 if (nxt_slow_path(ret != NXT_OK)) {
446 nxt_log_error(NXT_LOG_ERR, task->log,
447 "rpc: stream #%uD failed to delete handler", stream);
448
449 return;
450 }
451
452 nxt_mp_free(port->mem_pool, reg);
453
454 nxt_port_use(task, port, -1);
455 }
456 }
457
458
459 void
nxt_port_rpc_cancel(nxt_task_t * task,nxt_port_t * port,uint32_t stream)460 nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
461 {
462 nxt_int_t ret;
463 nxt_port_rpc_reg_t *reg;
464 nxt_lvlhsh_query_t lhq;
465
466 nxt_port_rpc_lhq_stream(&lhq, &stream);
467 lhq.pool = port->mem_pool;
468
469 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
470
471 if (ret != NXT_OK) {
472 nxt_debug(task, "rpc: stream #%uD no handler found", stream);
473
474 return;
475 }
476
477 reg = lhq.value;
478
479 if (reg->peer != -1) {
480 nxt_port_rpc_remove_from_peers(task, port, reg);
481 }
482
483 nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
484
485 nxt_mp_free(port->mem_pool, reg);
486
487 nxt_port_use(task, port, -1);
488 }
489
490 static nxt_buf_t nxt_port_close_dummy_buf;
491
492 void
nxt_port_rpc_close(nxt_task_t * task,nxt_port_t * port)493 nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
494 {
495 nxt_port_rpc_reg_t *reg;
496 nxt_port_recv_msg_t msg;
497
498 for ( ;; ) {
499 reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto);
500 if (reg == NULL) {
501 return;
502 }
503
504 msg.fd[0] = -1;
505 msg.fd[1] = -1;
506 msg.buf = &nxt_port_close_dummy_buf;
507 msg.port = port;
508 msg.port_msg.stream = reg->stream;
509 msg.port_msg.pid = nxt_pid;
510 msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
511 msg.port_msg.last = 1;
512 msg.port_msg.mmap = 0;
513 msg.port_msg.nf = 0;
514 msg.port_msg.mf = 0;
515 msg.size = 0;
516 msg.cancelled = 0;
517 msg.u.data = NULL;
518
519 nxt_port_rpc_handler(task, &msg);
520 }
521 }
522