nxt_port_rpc.c (318:c2442f5e054d) nxt_port_rpc.c (425:1da949cf0a34)
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_port_rpc.h>

--- 11 unchanged lines hidden (view full) ---

20 nxt_bool_t link_first;
21
22 nxt_port_rpc_handler_t ready_handler;
23 nxt_port_rpc_handler_t error_handler;
24 void *data;
25};
26
27
1
2/*
3 * Copyright (C) Max Romanov
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_main.h>
8#include <nxt_port_rpc.h>

--- 11 unchanged lines hidden (view full) ---

20 nxt_bool_t link_first;
21
22 nxt_port_rpc_handler_t ready_handler;
23 nxt_port_rpc_handler_t error_handler;
24 void *data;
25};
26
27
28static void
29nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
30 nxt_port_rpc_reg_t *reg);
31
32
28static nxt_int_t
29nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
30{
31 return NXT_OK;
32}
33
34
35static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = {

--- 127 unchanged lines hidden (view full) ---

163 nxt_queue_link_t *peer_link;
164 nxt_port_rpc_reg_t *reg;
165 nxt_lvlhsh_query_t lhq;
166
167 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
168
169 nxt_assert(reg->data == ex);
170
33static nxt_int_t
34nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
35{
36 return NXT_OK;
37}
38
39
40static const nxt_lvlhsh_proto_t lvlhsh_rpc_reg_proto nxt_aligned(64) = {

--- 127 unchanged lines hidden (view full) ---

168 nxt_queue_link_t *peer_link;
169 nxt_port_rpc_reg_t *reg;
170 nxt_lvlhsh_query_t lhq;
171
172 reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
173
174 nxt_assert(reg->data == ex);
175
171 if (peer == -1 || reg->peer != -1) {
172 nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
173 "change peer %PI->%PI", reg->stream, reg->peer, peer);
176 if (nxt_slow_path(peer == reg->peer)) {
177 return;
178 }
174
179
180 if (reg->peer != -1) {
181 nxt_port_rpc_remove_from_peers(task, port, reg);
182
183 reg->peer = -1;
184 }
185
186 if (peer == -1) {
175 return;
176 }
177
178 reg->peer = peer;
179
180 nxt_port_rpc_lhq_peer(&lhq, &peer);
181 lhq.replace = 0;
182 lhq.value = &reg->link;

--- 171 unchanged lines hidden (view full) ---

354 nxt_memzero(&buf, sizeof(buf));
355
356 msg.fd = -1;
357 msg.buf = &buf;
358 msg.port = port;
359
360 msg.port_msg.pid = peer;
361 msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
187 return;
188 }
189
190 reg->peer = peer;
191
192 nxt_port_rpc_lhq_peer(&lhq, &peer);
193 lhq.replace = 0;
194 lhq.value = &reg->link;

--- 171 unchanged lines hidden (view full) ---

366 nxt_memzero(&buf, sizeof(buf));
367
368 msg.fd = -1;
369 msg.buf = &buf;
370 msg.port = port;
371
372 msg.port_msg.pid = peer;
373 msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
362 msg.port_msg.last = 1;
363
364 peer_link = lhq.value;
365 last = 0;
366
367 while (last == 0) {
368
369 reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
370
371 nxt_assert(reg->peer == peer);
372
373 stream = reg->stream;
374
375 nxt_debug(task, "rpc: stream #%uD trigger error", stream);
376
377 msg.port_msg.stream = stream;
374
375 peer_link = lhq.value;
376 last = 0;
377
378 while (last == 0) {
379
380 reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
381
382 nxt_assert(reg->peer == peer);
383
384 stream = reg->stream;
385
386 nxt_debug(task, "rpc: stream #%uD trigger error", stream);
387
388 msg.port_msg.stream = stream;
389 msg.port_msg.last = 1;
378
390
379 reg->error_handler(task, &msg, reg->data);
380
381 nxt_port_rpc_lhq_stream(&lhq, &stream);
382 lhq.pool = port->mem_pool;
383
384 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
385
386 if (nxt_slow_path(ret != NXT_OK)) {
387 nxt_log_error(NXT_LOG_ERR, task->log,
388 "rpc: stream #%uD failed to delete handler", stream);
389
390 return;
391 }
392
393 if (peer_link == peer_link->next) {
394 nxt_assert(peer_link->prev == peer_link);
395
396 last = 1;
397
398 } else {
399 nxt_assert(peer_link->next->prev == peer_link);
400 nxt_assert(peer_link->prev->next == peer_link);
401
402 next_link = peer_link->next;
403 nxt_queue_remove(peer_link);
404
405 peer_link = next_link;
406 }
407
391 if (peer_link == peer_link->next) {
392 nxt_assert(peer_link->prev == peer_link);
393
394 last = 1;
395
396 } else {
397 nxt_assert(peer_link->next->prev == peer_link);
398 nxt_assert(peer_link->prev->next == peer_link);
399
400 next_link = peer_link->next;
401 nxt_queue_remove(peer_link);
402
403 peer_link = next_link;
404 }
405
406 reg->peer = -1;
407
408 reg->error_handler(task, &msg, reg->data);
409
410 /* Reset 'last' flag to preserve rpc handler. */
411 if (msg.port_msg.last == 0) {
412 continue;
413 }
414
415 nxt_port_rpc_lhq_stream(&lhq, &stream);
416 lhq.pool = port->mem_pool;
417
418 ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
419
420 if (nxt_slow_path(ret != NXT_OK)) {
421 nxt_log_error(NXT_LOG_ERR, task->log,
422 "rpc: stream #%uD failed to delete handler", stream);
423
424 return;
425 }
426
408 nxt_mp_free(port->mem_pool, reg);
409 }
410}
411
412
413void
414nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
415{

--- 25 unchanged lines hidden ---
427 nxt_mp_free(port->mem_pool, reg);
428 }
429}
430
431
432void
433nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
434{

--- 25 unchanged lines hidden ---