nxt_router.c (346:88444c3b5dff) nxt_router.c (347:e14011f5f005)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 48 unchanged lines hidden (view full) ---

57
58
59typedef struct {
60 nxt_socket_conf_t *socket_conf;
61 nxt_router_temp_conf_t *temp_conf;
62} nxt_socket_rpc_t;
63
64
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 48 unchanged lines hidden (view full) ---

57
58
59typedef struct {
60 nxt_socket_conf_t *socket_conf;
61 nxt_router_temp_conf_t *temp_conf;
62} nxt_socket_rpc_t;
63
64
65typedef struct {
66 nxt_mp_t *mem_pool;
67 nxt_port_recv_msg_t msg;
68 nxt_work_t work;
69} nxt_remove_pid_msg_t;
70
71
72static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
73
74static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
75 int code, const char* str);
76
65static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
66
67static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
68 int code, const char* str);
69
77static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
78 void *data);
79static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
80 void *data);
81
82static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
83static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
84static void nxt_router_conf_ready(nxt_task_t *task,
85 nxt_router_temp_conf_t *tmcf);
86static void nxt_router_conf_error(nxt_task_t *task,
87 nxt_router_temp_conf_t *tmcf);
88static void nxt_router_conf_send(nxt_task_t *task,
89 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 477 unchanged lines hidden (view full) ---

567nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
568{
569 nxt_port_new_port_handler(task, msg);
570
571 if (msg->port_msg.stream == 0) {
572 return;
573 }
574
70static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
71static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conf_ready(nxt_task_t *task,
73 nxt_router_temp_conf_t *tmcf);
74static void nxt_router_conf_error(nxt_task_t *task,
75 nxt_router_temp_conf_t *tmcf);
76static void nxt_router_conf_send(nxt_task_t *task,
77 nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);

--- 477 unchanged lines hidden (view full) ---

555nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
556{
557 nxt_port_new_port_handler(task, msg);
558
559 if (msg->port_msg.stream == 0) {
560 return;
561 }
562
575 if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
563 if (msg->u.new_port == NULL ||
564 msg->u.new_port->type != NXT_PROCESS_WORKER)
565 {
576 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
577 }
578
579 nxt_port_rpc_handler(task, msg);
580}
581
582
583void

--- 32 unchanged lines hidden (view full) ---

616 nxt_router_conf_apply(task, tmcf, NULL);
617
618 } else {
619 nxt_router_conf_error(task, tmcf);
620 }
621}
622
623
566 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
567 }
568
569 nxt_port_rpc_handler(task, msg);
570}
571
572
573void

--- 32 unchanged lines hidden (view full) ---

606 nxt_router_conf_apply(task, tmcf, NULL);
607
608 } else {
609 nxt_router_conf_error(task, tmcf);
610 }
611}
612
613
614static void
615nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
616{
617 union {
618 nxt_pid_t removed_pid;
619 void *data;
620 } u;
621
622 u.data = data;
623
624 nxt_port_rpc_remove_peer(task, port, u.removed_pid);
625}
626
627
624void
625nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
626{
628void
629nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
630{
627 nxt_mp_t *mp;
628 nxt_buf_t *buf;
629 nxt_event_engine_t *engine;
630 nxt_remove_pid_msg_t *rp;
631 nxt_event_engine_t *engine;
631
632 nxt_port_remove_pid_handler(task, msg);
633
634 if (msg->port_msg.stream == 0) {
635 return;
636 }
637
632
633 nxt_port_remove_pid_handler(task, msg);
634
635 if (msg->port_msg.stream == 0) {
636 return;
637 }
638
638 mp = nxt_mp_create(1024, 128, 256, 32);
639
640 buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0);
641 buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos,
642 nxt_buf_used_size(msg->buf));
643
644 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
645 {
639 nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
640 {
646 rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t));
647
648 rp->mem_pool = mp;
649
650 rp->msg.fd = msg->fd;
651 rp->msg.buf = buf;
652 rp->msg.port = engine->port;
653 rp->msg.port_msg = msg->port_msg;
654 rp->msg.size = msg->size;
655 rp->msg.new_port = NULL;
656
657 rp->work.handler = nxt_router_worker_remove_pid_handler;
658 rp->work.task = &engine->task;
659 rp->work.obj = rp;
660 rp->work.data = task->thread->engine;
661 rp->work.next = NULL;
662
663 nxt_event_engine_post(engine, &rp->work);
641 nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
642 msg->u.data);
664 }
665 nxt_queue_loop;
666
643 }
644 nxt_queue_loop;
645
667 nxt_mp_release(mp, NULL);
668
669 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
670
671 nxt_port_rpc_handler(task, msg);
672}
673
674
646 msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
647
648 nxt_port_rpc_handler(task, msg);
649}
650
651
675static void
676nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
677{
678 nxt_pid_t pid;
679 nxt_buf_t *buf;
680 nxt_event_engine_t *engine;
681 nxt_remove_pid_msg_t *rp;
682
683 rp = obj;
684
685 buf = rp->msg.buf;
686
687 nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
688
689 nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
690
691 nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
692
693 engine = rp->work.data;
694
695 rp->work.handler = nxt_router_worker_remove_pid_done;
696 rp->work.task = &engine->task;
697 rp->work.next = NULL;
698
699 nxt_event_engine_post(engine, &rp->work);
700}
701
702
703static void
704nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data)
705{
706 nxt_remove_pid_msg_t *rp;
707
708 rp = obj;
709
710 nxt_mp_release(rp->mem_pool, rp);
711}
712
713
714static nxt_router_temp_conf_t *
715nxt_router_temp_conf(nxt_task_t *task)
716{
717 nxt_mp_t *mp, *tmp;
718 nxt_router_conf_t *rtcf;
719 nxt_router_temp_conf_t *tmcf;
720
721 mp = nxt_mp_create(1024, 128, 256, 32);

--- 1801 unchanged lines hidden (view full) ---

2523static void
2524nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2525 void *data)
2526{
2527 nxt_app_t *app;
2528 nxt_port_t *port;
2529
2530 app = data;
652static nxt_router_temp_conf_t *
653nxt_router_temp_conf(nxt_task_t *task)
654{
655 nxt_mp_t *mp, *tmp;
656 nxt_router_conf_t *rtcf;
657 nxt_router_temp_conf_t *tmcf;
658
659 mp = nxt_mp_create(1024, 128, 256, 32);

--- 1801 unchanged lines hidden (view full) ---

2461static void
2462nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
2463 void *data)
2464{
2465 nxt_app_t *app;
2466 nxt_port_t *port;
2467
2468 app = data;
2531 port = msg->new_port;
2469 port = msg->u.new_port;
2532
2533 nxt_assert(app != NULL);
2534 nxt_assert(port != NULL);
2535
2536 port->app = app;
2537
2538 nxt_thread_mutex_lock(&app->mutex);
2539

--- 1178 unchanged lines hidden ---
2470
2471 nxt_assert(app != NULL);
2472 nxt_assert(port != NULL);
2473
2474 port->app = app;
2475
2476 nxt_thread_mutex_lock(&app->mutex);
2477

--- 1178 unchanged lines hidden ---