1 2 /* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7 #include <nxt_main.h> 8 #include <nxt_port_rpc.h> 9 10 11 static nxt_atomic_t nxt_stream_ident = 1; 12 13 typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t; 14 15 struct nxt_port_rpc_reg_s { 16 uint32_t stream; 17 18 nxt_pid_t peer; 19 nxt_queue_link_t link; 20 nxt_bool_t link_first; 21 22 nxt_port_rpc_handler_t ready_handler; 23 nxt_port_rpc_handler_t error_handler; 24 void *data; 25 }; 26 27 28 static void 29 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 30 nxt_port_rpc_reg_t *reg); 31 32 33 static nxt_int_t 34 nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) 35 { 36 return NXT_OK; 37 } 38 39 40 static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = { 41 NXT_LVLHSH_DEFAULT, 42 nxt_rpc_reg_test, 43 nxt_lvlhsh_alloc, 44 nxt_lvlhsh_free, 45 }; 46 47 48 nxt_inline void 49 nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream) 50 { 51 lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 52 lhq->key.length = sizeof(*stream); 53 lhq->key.start = (u_char *) stream; 54 lhq->proto = &lvlhsh_rpc_reg_proto; 55 } 56 57 58 nxt_inline void 59 nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer) 60 { 61 lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer)); 62 lhq->key.length = sizeof(*peer); 63 lhq->key.start = (u_char *) peer; 64 lhq->proto = &lvlhsh_rpc_reg_proto; 65 } 66 67 68 uint32_t 69 nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, 70 nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, 71 nxt_pid_t peer, void *data) 72 { 73 void *ex; 74 nxt_port_rpc_reg_t *reg; 75 76 ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler, 77 error_handler, 0); 78 79 if (ex == NULL) { 80 return 0; 81 } 82 83 if (peer != -1) { 84 nxt_port_rpc_ex_set_peer(task, port, ex, peer); 85 } 86 87 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 88 89 nxt_assert(reg->data == ex); 90 91 reg->data = data; 92 93 return reg->stream; 94 } 95 96 97 void * 98 nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, 99 nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, 100 size_t ex_size) 101 { 102 uint32_t stream; 103 nxt_port_rpc_reg_t *reg; 104 nxt_lvlhsh_query_t lhq; 105 106 nxt_assert(port->pair[0] != -1); 107 108 stream = 109 (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff; 110 111 reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size); 112 113 if (nxt_slow_path(reg == NULL)) { 114 nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream); 115 116 return NULL; 117 } 118 119 reg->stream = stream; 120 reg->peer = -1; 121 reg->ready_handler = ready_handler; 122 reg->error_handler = error_handler; 123 reg->data = reg + 1; 124 125 nxt_port_rpc_lhq_stream(&lhq, &stream); 126 lhq.replace = 0; 127 lhq.value = reg; 128 lhq.pool = port->mem_pool; 129 130 switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) { 131 132 case NXT_OK: 133 break; 134 135 default: 136 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " 137 "reg ", stream); 138 139 nxt_mp_free(port->mem_pool, reg); 140 141 return NULL; 142 } 143 144 nxt_debug(task, "rpc: stream #%uD registered", stream); 145 146 return reg->data; 147 } 148 149 150 uint32_t 151 nxt_port_rpc_ex_stream(void *ex) 152 { 153 nxt_port_rpc_reg_t *reg; 154 155 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 156 157 nxt_assert(reg->data == ex); 158 159 return reg->stream; 160 } 161 162 163 void 164 nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, 165 void *ex, nxt_pid_t peer) 166 { 167 nxt_int_t ret; 168 nxt_queue_link_t *peer_link; 169 nxt_port_rpc_reg_t *reg; 170 nxt_lvlhsh_query_t lhq; 171 172 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 173 174 nxt_assert(reg->data == ex); 175 176 if (nxt_slow_path(peer == reg->peer)) { 177 return; 178 } 179 180 if (reg->peer != -1) { 181 nxt_port_rpc_remove_from_peers(task, port, reg); 182 183 reg->peer = -1; 184 } 185 186 if (peer == -1) { 187 return; 188 } 189 190 reg->peer = peer; 191 192 nxt_port_rpc_lhq_peer(&lhq, &peer); 193 lhq.replace = 0; 194 lhq.value = ®->link; 195 lhq.pool = port->mem_pool; 196 197 ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); 198 199 switch (ret) { 200 201 case NXT_OK: 202 reg->link_first = 1; 203 nxt_queue_self(®->link); 204 205 nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)", 206 reg->stream, reg->peer, reg->link.next); 207 break; 208 209 case NXT_DECLINED: 210 reg->link_first = 0; 211 peer_link = lhq.value; 212 nxt_queue_insert_after(peer_link, ®->link); 213 214 nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)", 215 reg->stream, reg->peer, reg->link.next); 216 break; 217 218 default: 219 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add " 220 "peer for stream #%uD (%d)", reg->stream, ret); 221 222 reg->peer = -1; 223 break; 224 } 225 226 } 227 228 229 static void 230 nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 231 nxt_port_rpc_reg_t *reg) 232 { 233 uint32_t stream; 234 nxt_int_t ret; 235 nxt_lvlhsh_query_t lhq; 236 nxt_port_rpc_reg_t *r; 237 238 stream = reg->stream; 239 240 if (reg->link_first != 0) { 241 nxt_port_rpc_lhq_peer(&lhq, ®->peer); 242 lhq.pool = port->mem_pool; 243 244 if (reg->link.next == ®->link) { 245 nxt_assert(reg->link.prev == ®->link); 246 247 nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI " 248 "registration (%p)", stream, reg->peer, reg->link.next); 249 250 ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); 251 } else { 252 nxt_debug(task, "rpc: stream #%uD remove first pid %PI " 253 "registration (%p)", stream, reg->peer, reg->link.next); 254 255 lhq.replace = 1; 256 lhq.value = reg->link.next; 257 258 r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link); 259 r->link_first = 1; 260 261 nxt_queue_remove(®->link); 262 263 ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); 264 } 265 } else { 266 nxt_debug(task, "rpc: stream #%uD remove pid %PI " 267 "registration (%p)", stream, reg->peer, reg->link.next); 268 269 nxt_queue_remove(®->link); 270 ret = NXT_OK; 271 } 272 273 if (nxt_slow_path(ret != NXT_OK)) { 274 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed" 275 " to delete peer %PI (%d)", stream, reg->peer, ret); 276 } 277 } 278 279 280 void 281 nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 282 { 283 uint8_t last; 284 uint32_t stream; 285 nxt_int_t ret; 286 nxt_port_t *port; 287 nxt_port_rpc_reg_t *reg; 288 nxt_lvlhsh_query_t lhq; 289 nxt_port_msg_type_t type; 290 291 stream = msg->port_msg.stream; 292 port = msg->port; 293 last = msg->port_msg.last; 294 type = msg->port_msg.type; 295 296 nxt_port_rpc_lhq_stream(&lhq, &stream); 297 lhq.pool = port->mem_pool; 298 299 if (last != 0) { 300 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 301 302 } else { 303 ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq); 304 } 305 306 if (ret != NXT_OK) { 307 nxt_debug(task, "rpc: stream #%uD no handler found", stream); 308 309 return; 310 } 311 312 nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream, 313 (last ? "last " : ""), type); 314 315 reg = lhq.value; 316 317 if (type == _NXT_PORT_MSG_RPC_ERROR) { 318 reg->error_handler(task, msg, reg->data); 319 320 } else { 321 reg->ready_handler(task, msg, reg->data); 322 } 323 324 if (last == 0) { 325 return; 326 } 327 328 if (reg->peer != -1) { 329 nxt_port_rpc_remove_from_peers(task, port, reg); 330 } 331 332 nxt_debug(task, "rpc: stream #%uD free registration", stream); 333 334 nxt_mp_free(port->mem_pool, reg); 335 } 336 337 338 void 339 nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) 340 { 341 uint8_t last; 342 uint32_t stream; 343 nxt_int_t ret; 344 nxt_buf_t buf; 345 nxt_queue_link_t *peer_link, *next_link; 346 nxt_port_rpc_reg_t *reg; 347 nxt_lvlhsh_query_t lhq; 348 nxt_port_recv_msg_t msg; 349 350 nxt_port_rpc_lhq_peer(&lhq, &peer); 351 lhq.pool = port->mem_pool; 352 353 ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); 354 355 if (nxt_slow_path(ret != NXT_OK)) { 356 nxt_debug(task, "rpc: no reg found for peer %PI", peer); 357 358 return; 359 } 360 361 nxt_memzero(&msg, sizeof(msg)); 362 nxt_memzero(&buf, sizeof(buf)); 363 364 msg.fd = -1; 365 msg.buf = &buf; 366 msg.port = port; 367 368 msg.port_msg.pid = peer; 369 msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; 370 371 peer_link = lhq.value; 372 last = 0; 373 374 while (last == 0) { 375 376 reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); 377 378 nxt_assert(reg->peer == peer); 379 380 stream = reg->stream; 381 382 nxt_debug(task, "rpc: stream #%uD trigger error", stream); 383 384 msg.port_msg.stream = stream; 385 msg.port_msg.last = 1; 386 387 if (peer_link == peer_link->next) { 388 nxt_assert(peer_link->prev == peer_link); 389 390 last = 1; 391 392 } else { 393 nxt_assert(peer_link->next->prev == peer_link); 394 nxt_assert(peer_link->prev->next == peer_link); 395 396 next_link = peer_link->next; 397 nxt_queue_remove(peer_link); 398 399 peer_link = next_link; 400 } 401 402 reg->peer = -1; 403 404 reg->error_handler(task, &msg, reg->data); 405 406 /* Reset 'last' flag to preserve rpc handler. */ 407 if (msg.port_msg.last == 0) { 408 continue; 409 } 410 411 nxt_port_rpc_lhq_stream(&lhq, &stream); 412 lhq.pool = port->mem_pool; 413 414 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 415 416 if (nxt_slow_path(ret != NXT_OK)) { 417 nxt_log_error(NXT_LOG_ERR, task->log, 418 "rpc: stream #%uD failed to delete handler", stream); 419 420 return; 421 } 422 423 nxt_mp_free(port->mem_pool, reg); 424 } 425 } 426 427 428 void 429 nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) 430 { 431 nxt_int_t ret; 432 nxt_port_rpc_reg_t *reg; 433 nxt_lvlhsh_query_t lhq; 434 435 nxt_port_rpc_lhq_stream(&lhq, &stream); 436 lhq.pool = port->mem_pool; 437 438 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 439 440 if (ret != NXT_OK) { 441 nxt_debug(task, "rpc: stream #%uD no handler found", stream); 442 443 return; 444 } 445 446 reg = lhq.value; 447 448 if (reg->peer != -1) { 449 nxt_port_rpc_remove_from_peers(task, port, reg); 450 } 451 452 nxt_debug(task, "rpc: stream #%uD cancel registration", stream); 453 454 nxt_mp_free(port->mem_pool, reg); 455 } 456