1190Smax.romanov@nginx.com 2190Smax.romanov@nginx.com /* 3190Smax.romanov@nginx.com * Copyright (C) Max Romanov 4190Smax.romanov@nginx.com * Copyright (C) NGINX, Inc. 5190Smax.romanov@nginx.com */ 6190Smax.romanov@nginx.com 7190Smax.romanov@nginx.com #include <nxt_main.h> 8190Smax.romanov@nginx.com #include <nxt_port_rpc.h> 9190Smax.romanov@nginx.com 10190Smax.romanov@nginx.com 11318Smax.romanov@nginx.com static nxt_atomic_t nxt_stream_ident = 1; 12318Smax.romanov@nginx.com 13190Smax.romanov@nginx.com typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t; 14190Smax.romanov@nginx.com 15190Smax.romanov@nginx.com struct nxt_port_rpc_reg_s { 16190Smax.romanov@nginx.com uint32_t stream; 17190Smax.romanov@nginx.com 18190Smax.romanov@nginx.com nxt_pid_t peer; 19190Smax.romanov@nginx.com nxt_queue_link_t link; 20318Smax.romanov@nginx.com nxt_bool_t link_first; 21190Smax.romanov@nginx.com 22190Smax.romanov@nginx.com nxt_port_rpc_handler_t ready_handler; 23190Smax.romanov@nginx.com nxt_port_rpc_handler_t error_handler; 24190Smax.romanov@nginx.com void *data; 25190Smax.romanov@nginx.com }; 26190Smax.romanov@nginx.com 27190Smax.romanov@nginx.com 28425Smax.romanov@nginx.com static void 29425Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 30425Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg); 31425Smax.romanov@nginx.com 32425Smax.romanov@nginx.com 33190Smax.romanov@nginx.com static nxt_int_t 34190Smax.romanov@nginx.com nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) 35190Smax.romanov@nginx.com { 36190Smax.romanov@nginx.com return NXT_OK; 37190Smax.romanov@nginx.com } 38190Smax.romanov@nginx.com 39190Smax.romanov@nginx.com 40190Smax.romanov@nginx.com static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = { 41190Smax.romanov@nginx.com NXT_LVLHSH_DEFAULT, 42190Smax.romanov@nginx.com nxt_rpc_reg_test, 43190Smax.romanov@nginx.com nxt_lvlhsh_alloc, 44190Smax.romanov@nginx.com nxt_lvlhsh_free, 45190Smax.romanov@nginx.com }; 46190Smax.romanov@nginx.com 47190Smax.romanov@nginx.com 48190Smax.romanov@nginx.com nxt_inline void 49190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream) 50190Smax.romanov@nginx.com { 51190Smax.romanov@nginx.com lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream)); 52190Smax.romanov@nginx.com lhq->key.length = sizeof(*stream); 53190Smax.romanov@nginx.com lhq->key.start = (u_char *) stream; 54190Smax.romanov@nginx.com lhq->proto = &lvlhsh_rpc_reg_proto; 55190Smax.romanov@nginx.com } 56190Smax.romanov@nginx.com 57190Smax.romanov@nginx.com 58190Smax.romanov@nginx.com nxt_inline void 59190Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer) 60190Smax.romanov@nginx.com { 61190Smax.romanov@nginx.com lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer)); 62190Smax.romanov@nginx.com lhq->key.length = sizeof(*peer); 63190Smax.romanov@nginx.com lhq->key.start = (u_char *) peer; 64190Smax.romanov@nginx.com lhq->proto = &lvlhsh_rpc_reg_proto; 65190Smax.romanov@nginx.com } 66190Smax.romanov@nginx.com 67190Smax.romanov@nginx.com 68190Smax.romanov@nginx.com uint32_t 69190Smax.romanov@nginx.com nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port, 70190Smax.romanov@nginx.com nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, 71190Smax.romanov@nginx.com nxt_pid_t peer, void *data) 72190Smax.romanov@nginx.com { 73318Smax.romanov@nginx.com void *ex; 74318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 75318Smax.romanov@nginx.com 76318Smax.romanov@nginx.com ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler, 77318Smax.romanov@nginx.com error_handler, 0); 78318Smax.romanov@nginx.com 79318Smax.romanov@nginx.com if (ex == NULL) { 80318Smax.romanov@nginx.com return 0; 81318Smax.romanov@nginx.com } 82318Smax.romanov@nginx.com 83318Smax.romanov@nginx.com if (peer != -1) { 84318Smax.romanov@nginx.com nxt_port_rpc_ex_set_peer(task, port, ex, peer); 85318Smax.romanov@nginx.com } 86318Smax.romanov@nginx.com 87318Smax.romanov@nginx.com reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 88318Smax.romanov@nginx.com 89318Smax.romanov@nginx.com nxt_assert(reg->data == ex); 90318Smax.romanov@nginx.com 91318Smax.romanov@nginx.com reg->data = data; 92318Smax.romanov@nginx.com 93318Smax.romanov@nginx.com return reg->stream; 94318Smax.romanov@nginx.com } 95318Smax.romanov@nginx.com 96318Smax.romanov@nginx.com 97318Smax.romanov@nginx.com void * 98318Smax.romanov@nginx.com nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port, 99318Smax.romanov@nginx.com nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler, 100318Smax.romanov@nginx.com size_t ex_size) 101318Smax.romanov@nginx.com { 102190Smax.romanov@nginx.com uint32_t stream; 103190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 104190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 105190Smax.romanov@nginx.com 106277Sigor@sysoev.ru nxt_assert(port->pair[0] != -1); 107190Smax.romanov@nginx.com 108318Smax.romanov@nginx.com stream = 109611Svbart@nginx.com (uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3FFFFFFF; 110190Smax.romanov@nginx.com 111318Smax.romanov@nginx.com reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size); 112190Smax.romanov@nginx.com 113190Smax.romanov@nginx.com if (nxt_slow_path(reg == NULL)) { 114318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream); 115190Smax.romanov@nginx.com 116318Smax.romanov@nginx.com return NULL; 117190Smax.romanov@nginx.com } 118190Smax.romanov@nginx.com 119190Smax.romanov@nginx.com reg->stream = stream; 120318Smax.romanov@nginx.com reg->peer = -1; 121190Smax.romanov@nginx.com reg->ready_handler = ready_handler; 122190Smax.romanov@nginx.com reg->error_handler = error_handler; 123318Smax.romanov@nginx.com reg->data = reg + 1; 124190Smax.romanov@nginx.com 125190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream); 126190Smax.romanov@nginx.com lhq.replace = 0; 127190Smax.romanov@nginx.com lhq.value = reg; 128190Smax.romanov@nginx.com lhq.pool = port->mem_pool; 129190Smax.romanov@nginx.com 130190Smax.romanov@nginx.com switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) { 131190Smax.romanov@nginx.com 132190Smax.romanov@nginx.com case NXT_OK: 133190Smax.romanov@nginx.com break; 134190Smax.romanov@nginx.com 135190Smax.romanov@nginx.com default: 136318Smax.romanov@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add " 137318Smax.romanov@nginx.com "reg ", stream); 138190Smax.romanov@nginx.com 139190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg); 140190Smax.romanov@nginx.com 141318Smax.romanov@nginx.com return NULL; 142318Smax.romanov@nginx.com } 143318Smax.romanov@nginx.com 144318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD registered", stream); 145318Smax.romanov@nginx.com 146582Smax.romanov@nginx.com nxt_port_inc_use(port); 147582Smax.romanov@nginx.com 148318Smax.romanov@nginx.com return reg->data; 149318Smax.romanov@nginx.com } 150318Smax.romanov@nginx.com 151318Smax.romanov@nginx.com 152318Smax.romanov@nginx.com uint32_t 153318Smax.romanov@nginx.com nxt_port_rpc_ex_stream(void *ex) 154318Smax.romanov@nginx.com { 155318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 156318Smax.romanov@nginx.com 157318Smax.romanov@nginx.com reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 158318Smax.romanov@nginx.com 159318Smax.romanov@nginx.com nxt_assert(reg->data == ex); 160318Smax.romanov@nginx.com 161318Smax.romanov@nginx.com return reg->stream; 162318Smax.romanov@nginx.com } 163318Smax.romanov@nginx.com 164318Smax.romanov@nginx.com 165318Smax.romanov@nginx.com void 166318Smax.romanov@nginx.com nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port, 167318Smax.romanov@nginx.com void *ex, nxt_pid_t peer) 168318Smax.romanov@nginx.com { 169318Smax.romanov@nginx.com nxt_int_t ret; 170318Smax.romanov@nginx.com nxt_queue_link_t *peer_link; 171318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 172318Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 173318Smax.romanov@nginx.com 174318Smax.romanov@nginx.com reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 175318Smax.romanov@nginx.com 176318Smax.romanov@nginx.com nxt_assert(reg->data == ex); 177318Smax.romanov@nginx.com 178425Smax.romanov@nginx.com if (nxt_slow_path(peer == reg->peer)) { 179425Smax.romanov@nginx.com return; 180425Smax.romanov@nginx.com } 181318Smax.romanov@nginx.com 182425Smax.romanov@nginx.com if (reg->peer != -1) { 183425Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(task, port, reg); 184425Smax.romanov@nginx.com 185425Smax.romanov@nginx.com reg->peer = -1; 186425Smax.romanov@nginx.com } 187425Smax.romanov@nginx.com 188425Smax.romanov@nginx.com if (peer == -1) { 189318Smax.romanov@nginx.com return; 190190Smax.romanov@nginx.com } 191190Smax.romanov@nginx.com 192318Smax.romanov@nginx.com reg->peer = peer; 193318Smax.romanov@nginx.com 194318Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(&lhq, &peer); 195318Smax.romanov@nginx.com lhq.replace = 0; 196318Smax.romanov@nginx.com lhq.value = ®->link; 197318Smax.romanov@nginx.com lhq.pool = port->mem_pool; 198318Smax.romanov@nginx.com 199318Smax.romanov@nginx.com ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); 200318Smax.romanov@nginx.com 201318Smax.romanov@nginx.com switch (ret) { 202318Smax.romanov@nginx.com 203318Smax.romanov@nginx.com case NXT_OK: 204318Smax.romanov@nginx.com reg->link_first = 1; 205318Smax.romanov@nginx.com nxt_queue_self(®->link); 206318Smax.romanov@nginx.com 207318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)", 208318Smax.romanov@nginx.com reg->stream, reg->peer, reg->link.next); 209318Smax.romanov@nginx.com break; 210318Smax.romanov@nginx.com 211318Smax.romanov@nginx.com case NXT_DECLINED: 212318Smax.romanov@nginx.com reg->link_first = 0; 213318Smax.romanov@nginx.com peer_link = lhq.value; 214318Smax.romanov@nginx.com nxt_queue_insert_after(peer_link, ®->link); 215318Smax.romanov@nginx.com 216318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)", 217318Smax.romanov@nginx.com reg->stream, reg->peer, reg->link.next); 218318Smax.romanov@nginx.com break; 219318Smax.romanov@nginx.com 220318Smax.romanov@nginx.com default: 221494Spluknet@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add " 222318Smax.romanov@nginx.com "peer for stream #%uD (%d)", reg->stream, ret); 223318Smax.romanov@nginx.com 224318Smax.romanov@nginx.com reg->peer = -1; 225318Smax.romanov@nginx.com break; 226318Smax.romanov@nginx.com } 227318Smax.romanov@nginx.com 228318Smax.romanov@nginx.com } 229318Smax.romanov@nginx.com 230318Smax.romanov@nginx.com 231318Smax.romanov@nginx.com static void 232318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 233318Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg) 234318Smax.romanov@nginx.com { 235318Smax.romanov@nginx.com uint32_t stream; 236318Smax.romanov@nginx.com nxt_int_t ret; 237318Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 238318Smax.romanov@nginx.com nxt_port_rpc_reg_t *r; 239318Smax.romanov@nginx.com 240318Smax.romanov@nginx.com stream = reg->stream; 241318Smax.romanov@nginx.com 242318Smax.romanov@nginx.com if (reg->link_first != 0) { 243318Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(&lhq, ®->peer); 244204Smax.romanov@nginx.com lhq.pool = port->mem_pool; 245190Smax.romanov@nginx.com 246318Smax.romanov@nginx.com if (reg->link.next == ®->link) { 247318Smax.romanov@nginx.com nxt_assert(reg->link.prev == ®->link); 248318Smax.romanov@nginx.com 249318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI " 250318Smax.romanov@nginx.com "registration (%p)", stream, reg->peer, reg->link.next); 251190Smax.romanov@nginx.com 252318Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); 253*613Svbart@nginx.com 254318Smax.romanov@nginx.com } else { 255318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD remove first pid %PI " 256318Smax.romanov@nginx.com "registration (%p)", stream, reg->peer, reg->link.next); 257318Smax.romanov@nginx.com 258318Smax.romanov@nginx.com lhq.replace = 1; 259318Smax.romanov@nginx.com lhq.value = reg->link.next; 260190Smax.romanov@nginx.com 261318Smax.romanov@nginx.com r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link); 262318Smax.romanov@nginx.com r->link_first = 1; 263318Smax.romanov@nginx.com 264318Smax.romanov@nginx.com nxt_queue_remove(®->link); 265190Smax.romanov@nginx.com 266318Smax.romanov@nginx.com ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq); 267204Smax.romanov@nginx.com } 268*613Svbart@nginx.com 269318Smax.romanov@nginx.com } else { 270318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD remove pid %PI " 271318Smax.romanov@nginx.com "registration (%p)", stream, reg->peer, reg->link.next); 272318Smax.romanov@nginx.com 273318Smax.romanov@nginx.com nxt_queue_remove(®->link); 274318Smax.romanov@nginx.com ret = NXT_OK; 275190Smax.romanov@nginx.com } 276190Smax.romanov@nginx.com 277318Smax.romanov@nginx.com if (nxt_slow_path(ret != NXT_OK)) { 278318Smax.romanov@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed" 279318Smax.romanov@nginx.com " to delete peer %PI (%d)", stream, reg->peer, ret); 280318Smax.romanov@nginx.com } 281190Smax.romanov@nginx.com } 282190Smax.romanov@nginx.com 283190Smax.romanov@nginx.com 284190Smax.romanov@nginx.com void 285190Smax.romanov@nginx.com nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) 286190Smax.romanov@nginx.com { 287190Smax.romanov@nginx.com uint8_t last; 288190Smax.romanov@nginx.com uint32_t stream; 289190Smax.romanov@nginx.com nxt_int_t ret; 290190Smax.romanov@nginx.com nxt_port_t *port; 291190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 292190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 293190Smax.romanov@nginx.com nxt_port_msg_type_t type; 294190Smax.romanov@nginx.com 295190Smax.romanov@nginx.com stream = msg->port_msg.stream; 296190Smax.romanov@nginx.com port = msg->port; 297190Smax.romanov@nginx.com last = msg->port_msg.last; 298190Smax.romanov@nginx.com type = msg->port_msg.type; 299190Smax.romanov@nginx.com 300190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream); 301190Smax.romanov@nginx.com lhq.pool = port->mem_pool; 302190Smax.romanov@nginx.com 303190Smax.romanov@nginx.com if (last != 0) { 304190Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 305277Sigor@sysoev.ru 306190Smax.romanov@nginx.com } else { 307190Smax.romanov@nginx.com ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq); 308190Smax.romanov@nginx.com } 309190Smax.romanov@nginx.com 310190Smax.romanov@nginx.com if (ret != NXT_OK) { 311318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD no handler found", stream); 312190Smax.romanov@nginx.com 313190Smax.romanov@nginx.com return; 314190Smax.romanov@nginx.com } 315190Smax.romanov@nginx.com 316318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream, 317318Smax.romanov@nginx.com (last ? "last " : ""), type); 318318Smax.romanov@nginx.com 319190Smax.romanov@nginx.com reg = lhq.value; 320190Smax.romanov@nginx.com 321190Smax.romanov@nginx.com if (type == _NXT_PORT_MSG_RPC_ERROR) { 322190Smax.romanov@nginx.com reg->error_handler(task, msg, reg->data); 323277Sigor@sysoev.ru 324190Smax.romanov@nginx.com } else { 325190Smax.romanov@nginx.com reg->ready_handler(task, msg, reg->data); 326190Smax.romanov@nginx.com } 327190Smax.romanov@nginx.com 328190Smax.romanov@nginx.com if (last == 0) { 329190Smax.romanov@nginx.com return; 330190Smax.romanov@nginx.com } 331190Smax.romanov@nginx.com 332204Smax.romanov@nginx.com if (reg->peer != -1) { 333318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(task, port, reg); 334318Smax.romanov@nginx.com } 335190Smax.romanov@nginx.com 336318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD free registration", stream); 337190Smax.romanov@nginx.com 338190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg); 339582Smax.romanov@nginx.com 340582Smax.romanov@nginx.com nxt_port_use(task, port, -1); 341190Smax.romanov@nginx.com } 342190Smax.romanov@nginx.com 343190Smax.romanov@nginx.com 344190Smax.romanov@nginx.com void 345190Smax.romanov@nginx.com nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer) 346190Smax.romanov@nginx.com { 347190Smax.romanov@nginx.com uint8_t last; 348190Smax.romanov@nginx.com uint32_t stream; 349190Smax.romanov@nginx.com nxt_int_t ret; 350190Smax.romanov@nginx.com nxt_buf_t buf; 351318Smax.romanov@nginx.com nxt_queue_link_t *peer_link, *next_link; 352190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 353190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 354190Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 355190Smax.romanov@nginx.com 356190Smax.romanov@nginx.com nxt_port_rpc_lhq_peer(&lhq, &peer); 357190Smax.romanov@nginx.com lhq.pool = port->mem_pool; 358190Smax.romanov@nginx.com 359190Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); 360190Smax.romanov@nginx.com 361190Smax.romanov@nginx.com if (nxt_slow_path(ret != NXT_OK)) { 362318Smax.romanov@nginx.com nxt_debug(task, "rpc: no reg found for peer %PI", peer); 363190Smax.romanov@nginx.com 364190Smax.romanov@nginx.com return; 365190Smax.romanov@nginx.com } 366190Smax.romanov@nginx.com 367190Smax.romanov@nginx.com nxt_memzero(&msg, sizeof(msg)); 368190Smax.romanov@nginx.com nxt_memzero(&buf, sizeof(buf)); 369190Smax.romanov@nginx.com 370190Smax.romanov@nginx.com msg.fd = -1; 371190Smax.romanov@nginx.com msg.buf = &buf; 372190Smax.romanov@nginx.com msg.port = port; 373190Smax.romanov@nginx.com 374190Smax.romanov@nginx.com msg.port_msg.pid = peer; 375190Smax.romanov@nginx.com msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; 376190Smax.romanov@nginx.com 377190Smax.romanov@nginx.com peer_link = lhq.value; 378190Smax.romanov@nginx.com last = 0; 379190Smax.romanov@nginx.com 380190Smax.romanov@nginx.com while (last == 0) { 381190Smax.romanov@nginx.com 382190Smax.romanov@nginx.com reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); 383190Smax.romanov@nginx.com 384318Smax.romanov@nginx.com nxt_assert(reg->peer == peer); 385318Smax.romanov@nginx.com 386318Smax.romanov@nginx.com stream = reg->stream; 387190Smax.romanov@nginx.com 388318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD trigger error", stream); 389318Smax.romanov@nginx.com 390318Smax.romanov@nginx.com msg.port_msg.stream = stream; 391425Smax.romanov@nginx.com msg.port_msg.last = 1; 392190Smax.romanov@nginx.com 393190Smax.romanov@nginx.com if (peer_link == peer_link->next) { 394318Smax.romanov@nginx.com nxt_assert(peer_link->prev == peer_link); 395318Smax.romanov@nginx.com 396190Smax.romanov@nginx.com last = 1; 397190Smax.romanov@nginx.com 398190Smax.romanov@nginx.com } else { 399318Smax.romanov@nginx.com nxt_assert(peer_link->next->prev == peer_link); 400318Smax.romanov@nginx.com nxt_assert(peer_link->prev->next == peer_link); 401318Smax.romanov@nginx.com 402318Smax.romanov@nginx.com next_link = peer_link->next; 403318Smax.romanov@nginx.com nxt_queue_remove(peer_link); 404318Smax.romanov@nginx.com 405318Smax.romanov@nginx.com peer_link = next_link; 406190Smax.romanov@nginx.com } 407190Smax.romanov@nginx.com 408425Smax.romanov@nginx.com reg->peer = -1; 409425Smax.romanov@nginx.com 410425Smax.romanov@nginx.com reg->error_handler(task, &msg, reg->data); 411425Smax.romanov@nginx.com 412425Smax.romanov@nginx.com /* Reset 'last' flag to preserve rpc handler. */ 413425Smax.romanov@nginx.com if (msg.port_msg.last == 0) { 414425Smax.romanov@nginx.com continue; 415425Smax.romanov@nginx.com } 416425Smax.romanov@nginx.com 417425Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream); 418425Smax.romanov@nginx.com lhq.pool = port->mem_pool; 419425Smax.romanov@nginx.com 420425Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 421425Smax.romanov@nginx.com 422425Smax.romanov@nginx.com if (nxt_slow_path(ret != NXT_OK)) { 423425Smax.romanov@nginx.com nxt_log_error(NXT_LOG_ERR, task->log, 424425Smax.romanov@nginx.com "rpc: stream #%uD failed to delete handler", stream); 425425Smax.romanov@nginx.com 426425Smax.romanov@nginx.com return; 427425Smax.romanov@nginx.com } 428425Smax.romanov@nginx.com 429190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg); 430582Smax.romanov@nginx.com 431582Smax.romanov@nginx.com nxt_port_use(task, port, -1); 432190Smax.romanov@nginx.com } 433190Smax.romanov@nginx.com } 434190Smax.romanov@nginx.com 435190Smax.romanov@nginx.com 436190Smax.romanov@nginx.com void 437190Smax.romanov@nginx.com nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) 438190Smax.romanov@nginx.com { 439190Smax.romanov@nginx.com nxt_int_t ret; 440190Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 441190Smax.romanov@nginx.com nxt_lvlhsh_query_t lhq; 442190Smax.romanov@nginx.com 443190Smax.romanov@nginx.com nxt_port_rpc_lhq_stream(&lhq, &stream); 444190Smax.romanov@nginx.com lhq.pool = port->mem_pool; 445190Smax.romanov@nginx.com 446190Smax.romanov@nginx.com ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 447190Smax.romanov@nginx.com 448190Smax.romanov@nginx.com if (ret != NXT_OK) { 449318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD no handler found", stream); 450190Smax.romanov@nginx.com 451190Smax.romanov@nginx.com return; 452190Smax.romanov@nginx.com } 453190Smax.romanov@nginx.com 454190Smax.romanov@nginx.com reg = lhq.value; 455190Smax.romanov@nginx.com 456204Smax.romanov@nginx.com if (reg->peer != -1) { 457318Smax.romanov@nginx.com nxt_port_rpc_remove_from_peers(task, port, reg); 458318Smax.romanov@nginx.com } 459190Smax.romanov@nginx.com 460318Smax.romanov@nginx.com nxt_debug(task, "rpc: stream #%uD cancel registration", stream); 461190Smax.romanov@nginx.com 462190Smax.romanov@nginx.com nxt_mp_free(port->mem_pool, reg); 463582Smax.romanov@nginx.com 464582Smax.romanov@nginx.com nxt_port_use(task, port, -1); 465190Smax.romanov@nginx.com } 466583Smax.romanov@nginx.com 467583Smax.romanov@nginx.com static nxt_buf_t nxt_port_close_dummy_buf; 468583Smax.romanov@nginx.com 469583Smax.romanov@nginx.com void 470583Smax.romanov@nginx.com nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port) 471583Smax.romanov@nginx.com { 472583Smax.romanov@nginx.com nxt_port_rpc_reg_t *reg; 473583Smax.romanov@nginx.com nxt_port_recv_msg_t msg; 474583Smax.romanov@nginx.com 475583Smax.romanov@nginx.com for ( ;; ) { 476596Sigor@sysoev.ru reg = nxt_lvlhsh_peek(&port->rpc_streams, &lvlhsh_rpc_reg_proto); 477583Smax.romanov@nginx.com if (reg == NULL) { 478583Smax.romanov@nginx.com return; 479583Smax.romanov@nginx.com } 480583Smax.romanov@nginx.com 481583Smax.romanov@nginx.com msg.fd = -1; 482583Smax.romanov@nginx.com msg.buf = &nxt_port_close_dummy_buf; 483583Smax.romanov@nginx.com msg.port = port; 484583Smax.romanov@nginx.com msg.port_msg.stream = reg->stream; 485583Smax.romanov@nginx.com msg.port_msg.pid = nxt_pid; 486583Smax.romanov@nginx.com msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR; 487583Smax.romanov@nginx.com msg.port_msg.last = 1; 488583Smax.romanov@nginx.com msg.port_msg.mmap = 0; 489583Smax.romanov@nginx.com msg.port_msg.nf = 0; 490583Smax.romanov@nginx.com msg.port_msg.mf = 0; 491583Smax.romanov@nginx.com msg.port_msg.tracking = 0; 492583Smax.romanov@nginx.com msg.size = sizeof(msg.port_msg); 493583Smax.romanov@nginx.com msg.cancelled = 0; 494583Smax.romanov@nginx.com msg.u.data = NULL; 495583Smax.romanov@nginx.com 496583Smax.romanov@nginx.com nxt_port_rpc_handler(task, &msg); 497583Smax.romanov@nginx.com } 498583Smax.romanov@nginx.com } 499