1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_port_rpc.h> 9 10 11 static volatile uint32_t *nxt_stream_ident; 12 13 typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t; 14 15 struct nxt_port_rpc_reg_s { 16 uint32_t stream; 17 18 nxt_pid_t peer; 19 nxt_queue_link_t link; 20 nxt_bool_t link_first; 21 22 nxt_port_rpc_handler_t ready_handler; 23 nxt_port_rpc_handler_t error_handler; 24 void *data; 25 }; 26 27 28 static void 29 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 30 nxt_port_rpc_reg_t *reg); 31 32 33 nxt_int_t 34 nxt_port_rpc_init(void) 35 { 36 void *p; 37 38 if (nxt_stream_ident != NULL) { 39 return NXT_OK; 40 } 41 42 p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE, 43 MAP_ANON | MAP_SHARED, -1, 0); 44 45 if (nxt_slow_path(p == MAP_FAILED)) { 46 return NXT_ERROR; 47 } 48 49 nxt_stream_ident = p; 50 *nxt_stream_ident = 1; 51 52 return NXT_OK; 53 } 54 55 56 static nxt_int_t 57 nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) 58 { 59 return NXT_OK; 60 } 61 62 63 static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = { 64 NXT_LVLHSH_DEFAULT, 65 nxt_rpc_reg_test, 66 nxt_lvlhsh_alloc, 67 nxt_lvlhsh_free, 68 }; 69 70 71 nxt_inline void 72 nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream) 73 { 74 lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 75 lhq->key.length = sizeof(*stream); 76 lhq->key.start = (u_char *) stream; 77 lhq->proto = &lvlhsh_rpc_reg_proto; 78 } 79 80 81 nxt_inline void 82 nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer) 83 { 84 lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer)); 85 lhq->key.length = sizeof(*peer); 86 lhq->key.start = (u_char *) peer; 87 lhq->proto = &lvlhsh_rpc_reg_proto; 88 } 89 90 91 uint32_t 92 nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, 93 nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, 94 nxt_pid_t peer, void *data) 95 { 96 void *ex; 97 nxt_port_rpc_reg_t *reg; 98 99 ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler, 100 error_handler, 0); 101 102 if (ex == NULL) { 103 return 0; 104 } 105 106 if (peer != -1) { 107 nxt_port_rpc_ex_set_peer(task, port, ex, peer); 108 } 109 110 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 111 112 nxt_assert(reg->data == ex); 113 114 reg->data = data; 115 116 return reg->stream; 117 } 118 119 120 void * 121 nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, 122 nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, 123 size_t ex_size) 124 { 125 uint32_t stream; 126 nxt_port_rpc_reg_t *reg; 127 nxt_lvlhsh_query_t lhq; 128 129 nxt_assert(port->pair[0] != -1); 130 131 stream = nxt_atomic_fetch_add(nxt_stream_ident, 1); 132 133 reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size); 134 135 if (nxt_slow_path(reg == NULL)) { 136 nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream); 137 138 return NULL; 139 } 140 141 reg->stream = stream; 142 reg->peer = -1; 143 reg->ready_handler = ready_handler; 144 reg->error_handler = error_handler; 145 reg->data = reg + 1; 146 147 nxt_port_rpc_lhq_stream(&lhq, &stream); 148 lhq.replace = 0; 149 lhq.value = reg; 150 lhq.pool = port->mem_pool; 151 152 switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) { 153 154 case NXT_OK: 155 break; 156 157 default: 158 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " 159 "reg ", stream); 160 161 nxt_mp_free(port->mem_pool, reg); 162 163 return NULL; 164 } 165 166 nxt_debug(task, "rpc: stream #%uD registered", stream); 167 168 nxt_port_inc_use(port); 169 170 return reg->data; 171 } 172 173 174 uint32_t 175 nxt_port_rpc_ex_stream(void *ex) 176 { 177 nxt_port_rpc_reg_t *reg; 178 179 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 180 181 nxt_assert(reg->data == ex); 182 183 return reg->stream; 184 } 185 186 187 void 188 nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, 189 void *ex, nxt_pid_t peer) 190 { 191 nxt_int_t ret; 192 nxt_queue_link_t *peer_link; 193 nxt_port_rpc_reg_t *reg; 194 nxt_lvlhsh_query_t lhq; 195 196 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 197 198 nxt_assert(reg->data == ex); 199 200 if (nxt_slow_path(peer == reg->peer)) { 201 return; 202 } 203 204 if (reg->peer != -1) { 205 nxt_port_rpc_remove_from_peers(task, port, reg); 206 207 reg->peer = -1; 208 } 209 210 if (peer == -1) { 211 return; 212 } 213 214 reg->peer = peer; 215 216 nxt_port_rpc_lhq_peer(&lhq, &peer); 217 lhq.replace = 0; 218 lhq.value = ®->link; 219 lhq.pool = port->mem_pool; 220 221 ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); 222 223 switch (ret) { 224 225 case NXT_OK: 226 reg->link_first = 1; 227 nxt_queue_self(®->link); 228 229 nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)", 230 reg->stream, reg->peer, reg->link.next); 231 break; 232 233 case NXT_DECLINED: 234 reg->link_first = 0; 235 peer_link = lhq.value; 236 nxt_queue_insert_after(peer_link, ®->link); 237 238 nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)", 239 reg->stream, reg->peer, reg->link.next); 240 break; 241 242 default: 243 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add " 244 "peer for stream #%uD (%d)", reg->stream, ret); 245 246 reg->peer = -1; 247 break; 248 } 249 250 } 251 252 253 static void 254 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 255 nxt_port_rpc_reg_t *reg) 256 { 257 uint32_t stream; 258 nxt_int_t ret; 259 nxt_lvlhsh_query_t lhq; 260 nxt_port_rpc_reg_t *r; 261 262 stream = reg->stream; 263 264 if (reg->link_first != 0) { 265 nxt_port_rpc_lhq_peer(&lhq, ®->peer); 266 lhq.pool = port->mem_pool; 267 268 if (reg->link.next == ®->link) { 269 nxt_assert(reg->link.prev == ®->link); 270 271 nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI " 272 "registration (%p)", stream, reg->peer, reg->link.next); 273 274 ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); 275 276 } else { 277 nxt_debug(task, "rpc: stream #%uD remove first pid %PI " 278 "registration (%p)", stream, reg->peer, reg->link.next); 279 280 lhq.replace = 1; 281 lhq.value = reg->link.next; 282 283 r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link); 284 r->link_first = 1; 285 286 nxt_queue_remove(®->link); 287 288 ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); 289 } 290 291 } else { 292 nxt_debug(task, "rpc: stream #%uD remove pid %PI " 293 "registration (%p)", stream, reg->peer, reg->link.next); 294 295 nxt_queue_remove(®->link); 296 ret = NXT_OK; 297 } 298 299 if (nxt_slow_path(ret != NXT_OK)) { 300 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed" 301 " to delete peer %PI (%d)", stream, reg->peer, ret); 302 } 303 } 304 305 306 void 307 nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 308 { 309 uint8_t last; 310 uint32_t stream; 311 nxt_int_t ret; 312 nxt_port_t *port; 313 nxt_port_rpc_reg_t *reg; 314 nxt_lvlhsh_query_t lhq; 315 nxt_port_msg_type_t type; 316 317 stream = msg->port_msg.stream; 318 port = msg->port; 319 last = msg->port_msg.last; 320 type = msg->port_msg.type; 321 322 nxt_port_rpc_lhq_stream(&lhq, &stream); 323 lhq.pool = port->mem_pool; 324 325 if (last != 0) { 326 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 327 328 } else { 329 ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq); 330 } 331 332 if (ret != NXT_OK) { 333 nxt_debug(task, "rpc: stream #%uD no handler found", stream); 334 335 return; 336 } 337 338 nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream, 339 (last ? "last " : ""), type); 340 341 reg = lhq.value; 342 343 if (type == _NXT_PORT_MSG_RPC_ERROR) { 344 reg->error_handler(task, msg, reg->data); 345 346 } else { 347 reg->ready_handler(task, msg, reg->data); 348 } 349 350 if (last == 0) { 351 return; 352 } 353 354 if (reg->peer != -1) { 355 nxt_port_rpc_remove_from_peers(task, port, reg); 356 } 357 358 nxt_debug(task, "rpc: stream #%uD free registration", stream); 359 360 nxt_mp_free(port->mem_pool, reg); 361 362 nxt_port_use(task, port, -1); 363 } 364 365 366 void 367 nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) 368 { 369 uint8_t last; 370 uint32_t stream; 371 nxt_int_t ret; 372 nxt_buf_t buf; 373 nxt_queue_link_t *peer_link, *next_link; 374 nxt_port_rpc_reg_t *reg; 375 nxt_lvlhsh_query_t lhq; 376 nxt_port_recv_msg_t msg; 377 378 nxt_port_rpc_lhq_peer(&lhq, &peer); 379 lhq.pool = port->mem_pool; 380 381 ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); 382 383 if (nxt_slow_path(ret != NXT_OK)) { 384 nxt_debug(task, "rpc: no reg found for peer %PI", peer); 385 386 return; 387 } 388 389 nxt_memzero(&msg, sizeof(msg)); 390 nxt_memzero(&buf, sizeof(buf)); 391 392 msg.fd[0] = -1; 393 msg.fd[1] = -1; 394 msg.buf = &buf; 395 msg.port = port; 396 msg.u.removed_pid = peer; 397 msg.port_msg.pid = nxt_pid; 398 msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; 399 400 peer_link = lhq.value; 401 last = 0; 402 403 while (last == 0) { 404 405 reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); 406 407 nxt_assert(reg->peer == peer); 408 409 stream = reg->stream; 410 411 nxt_debug(task, "rpc: stream #%uD trigger error", stream); 412 413 msg.port_msg.stream = stream; 414 msg.port_msg.last = 1; 415 416 if (peer_link == peer_link->next) { 417 nxt_assert(peer_link->prev == peer_link); 418 419 last = 1; 420 421 } else { 422 nxt_assert(peer_link->next->prev == peer_link); 423 nxt_assert(peer_link->prev->next == peer_link); 424 425 next_link = peer_link->next; 426 nxt_queue_remove(peer_link); 427 428 peer_link = next_link; 429 } 430 431 reg->peer = -1; 432 433 reg->error_handler(task, &msg, reg->data); 434 435 /* Reset 'last' flag to preserve rpc handler. */ 436 if (msg.port_msg.last == 0) { 437 continue; 438 } 439 440 nxt_port_rpc_lhq_stream(&lhq, &stream); 441 lhq.pool = port->mem_pool; 442 443 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 444 445 if (nxt_slow_path(ret != NXT_OK)) { 446 nxt_log_error(NXT_LOG_ERR, task->log, 447 "rpc: stream #%uD failed to delete handler", stream); 448 449 return; 450 } 451 452 nxt_mp_free(port->mem_pool, reg); 453 454 nxt_port_use(task, port, -1); 455 } 456 } 457 458 459 void 460 nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) 461 { 462 nxt_int_t ret; 463 nxt_port_rpc_reg_t *reg; 464 nxt_lvlhsh_query_t lhq; 465 466 nxt_port_rpc_lhq_stream(&lhq, &stream); 467 lhq.pool = port->mem_pool; 468 469 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 470 471 if (ret != NXT_OK) { 472 nxt_debug(task, "rpc: stream #%uD no handler found", stream); 473 474 return; 475 } 476 477 reg = lhq.value; 478 479 if (reg->peer != -1) { 480 nxt_port_rpc_remove_from_peers(task, port, reg); 481 } 482 483 nxt_debug(task, "rpc: stream #%uD cancel registration", stream); 484 485 nxt_mp_free(port->mem_pool, reg); 486 487 nxt_port_use(task, port, -1); 488 } 489 490 static nxt_buf_t nxt_port_close_dummy_buf; 491 492 void 493 nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port) 494 { 495 nxt_port_rpc_reg_t *reg; 496 nxt_port_recv_msg_t msg; 497 498 for ( ;; ) { 499 reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto); 500 if (reg == NULL) { 501 return; 502 } 503 504 msg.fd[0] = -1; 505 msg.fd[1] = -1; 506 msg.buf = &nxt_port_close_dummy_buf; 507 msg.port = port; 508 msg.port_msg.stream = reg->stream; 509 msg.port_msg.pid = nxt_pid; 510 msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 511 msg.port_msg.last = 1; 512 msg.port_msg.mmap = 0; 513 msg.port_msg.nf = 0; 514 msg.port_msg.mf = 0; 515 msg.port_msg.tracking = 0; 516 msg.size = 0; 517 msg.cancelled = 0; 518 msg.u.data = NULL; 519 520 nxt_port_rpc_handler(task, &msg); 521 } 522 } 523