Deleted
Added
1 2/* 3 * Copyright (C) Max Romanov 4 * Copyright (C) NGINX, Inc. 5 */ 6 7#include <nxt_main.h> 8#include <nxt_port_rpc.h> --- 11 unchanged lines hidden (view full) --- 20 nxt_bool_t link_first; 21 22 nxt_port_rpc_handler_t ready_handler; 23 nxt_port_rpc_handler_t error_handler; 24 void *data; 25}; 26 27 |
28static void 29nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port, 30 nxt_port_rpc_reg_t *reg); 31 32 |
33static nxt_int_t 34nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) 35{ 36 return NXT_OK; 37} 38 39 40static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = { --- 127 unchanged lines hidden (view full) --- 168 nxt_queue_link_t *peer_link; 169 nxt_port_rpc_reg_t *reg; 170 nxt_lvlhsh_query_t lhq; 171 172 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t)); 173 174 nxt_assert(reg->data == ex); 175 |
176 if (nxt_slow_path(peer == reg->peer)) { 177 return; 178 } |
179 |
180 if (reg->peer != -1) { 181 nxt_port_rpc_remove_from_peers(task, port, reg); 182 183 reg->peer = -1; 184 } 185 186 if (peer == -1) { |
187 return; 188 } 189 190 reg->peer = peer; 191 192 nxt_port_rpc_lhq_peer(&lhq, &peer); 193 lhq.replace = 0; 194 lhq.value = ®->link; --- 171 unchanged lines hidden (view full) --- 366 nxt_memzero(&buf, sizeof(buf)); 367 368 msg.fd = -1; 369 msg.buf = &buf; 370 msg.port = port; 371 372 msg.port_msg.pid = peer; 373 msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; |
374 375 peer_link = lhq.value; 376 last = 0; 377 378 while (last == 0) { 379 380 reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link); 381 382 nxt_assert(reg->peer == peer); 383 384 stream = reg->stream; 385 386 nxt_debug(task, "rpc: stream #%uD trigger error", stream); 387 388 msg.port_msg.stream = stream; |
389 msg.port_msg.last = 1; |
390 |
391 if (peer_link == peer_link->next) { 392 nxt_assert(peer_link->prev == peer_link); 393 394 last = 1; 395 396 } else { 397 nxt_assert(peer_link->next->prev == peer_link); 398 nxt_assert(peer_link->prev->next == peer_link); 399 400 next_link = peer_link->next; 401 nxt_queue_remove(peer_link); 402 403 peer_link = next_link; 404 } 405 |
406 reg->peer = -1; 407 408 reg->error_handler(task, &msg, reg->data); 409 410 /* Reset 'last' flag to preserve rpc handler. */ 411 if (msg.port_msg.last == 0) { 412 continue; 413 } 414 415 nxt_port_rpc_lhq_stream(&lhq, &stream); 416 lhq.pool = port->mem_pool; 417 418 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq); 419 420 if (nxt_slow_path(ret != NXT_OK)) { 421 nxt_log_error(NXT_LOG_ERR, task->log, 422 "rpc: stream #%uD failed to delete handler", stream); 423 424 return; 425 } 426 |
427 nxt_mp_free(port->mem_pool, reg); 428 } 429} 430 431 432void 433nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream) 434{ --- 25 unchanged lines hidden --- |