nxt_router.c (61:50517325136d) nxt_router.c (62:5e1efcc7b740)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 57 unchanged lines hidden (view full) ---

66
67static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
68static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
69 void *data);
70static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
71static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
73static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) Valentin V. Bartenev
5 * Copyright (C) NGINX, Inc.
6 */
7
8#include <nxt_router.h>

--- 57 unchanged lines hidden (view full) ---

66
67static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
68static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
69 void *data);
70static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
71static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
72static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
73static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
74static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c,
75 uintptr_t data);
74static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
76
77
78nxt_int_t
79nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
80{
81 nxt_int_t ret;
82 nxt_router_t *router;
83 nxt_router_temp_conf_t *tmcf;

--- 118 unchanged lines hidden (view full) ---

202
203 mp = tmcf->conf->mem_pool;
204
205 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
206 skcf = nxt_router_socket_conf(task, mp, sa);
207
208 skcf->listen.handler = nxt_router_conn_init;
209 skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
75
76
77nxt_int_t
78nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
79{
80 nxt_int_t ret;
81 nxt_router_t *router;
82 nxt_router_temp_conf_t *tmcf;

--- 118 unchanged lines hidden (view full) ---

201
202 mp = tmcf->conf->mem_pool;
203
204 sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
205 skcf = nxt_router_socket_conf(task, mp, sa);
206
207 skcf->listen.handler = nxt_router_conn_init;
208 skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
210 + sizeof(nxt_event_conn_proxy_t)
211 + sizeof(nxt_event_conn_t)
209 + sizeof(nxt_conn_proxy_t)
210 + sizeof(nxt_conn_t)
212 + 4 * sizeof(nxt_buf_t);
213
214 skcf->header_buffer_size = 2048;
215 skcf->large_header_buffer_size = 8192;
216 skcf->header_read_timeout = 5000;
217
218 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
219
220 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
221 skcf = nxt_router_socket_conf(task, mp, sa);
222
223 skcf->listen.handler = nxt_stream_connection_init;
224 skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
211 + 4 * sizeof(nxt_buf_t);
212
213 skcf->header_buffer_size = 2048;
214 skcf->large_header_buffer_size = 8192;
215 skcf->header_read_timeout = 5000;
216
217 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
218
219 sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
220 skcf = nxt_router_socket_conf(task, mp, sa);
221
222 skcf->listen.handler = nxt_stream_connection_init;
223 skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
225 + sizeof(nxt_event_conn_proxy_t)
226 + sizeof(nxt_event_conn_t)
224 + sizeof(nxt_conn_proxy_t)
225 + sizeof(nxt_conn_t)
227 + 4 * sizeof(nxt_buf_t);
228
229 skcf->header_read_timeout = 5000;
230
231 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
232
233 return NXT_OK;
234}

--- 641 unchanged lines hidden (view full) ---

876 nxt_event_engine_free(engine);
877
878 nxt_free(link);
879
880 // TODO: free port
881}
882
883
226 + 4 * sizeof(nxt_buf_t);
227
228 skcf->header_read_timeout = 5000;
229
230 nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
231
232 return NXT_OK;
233}

--- 641 unchanged lines hidden (view full) ---

875 nxt_event_engine_free(engine);
876
877 nxt_free(link);
878
879 // TODO: free port
880}
881
882
884static const nxt_event_conn_state_t nxt_router_conn_read_state
883static const nxt_conn_state_t nxt_router_conn_read_state
885 nxt_aligned(64) =
886{
887 .ready_handler = nxt_router_conn_http_header_parse,
888 .close_handler = nxt_router_conn_close,
889 .error_handler = nxt_router_conn_error,
890
891 .timer_handler = nxt_router_conn_timeout,
892 .timer_value = nxt_router_conn_timeout_value,
893 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
894};
895
896
897static void
898nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
899{
900 size_t size;
884 nxt_aligned(64) =
885{
886 .ready_handler = nxt_router_conn_http_header_parse,
887 .close_handler = nxt_router_conn_close,
888 .error_handler = nxt_router_conn_error,
889
890 .timer_handler = nxt_router_conn_timeout,
891 .timer_value = nxt_router_conn_timeout_value,
892 .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
893};
894
895
896static void
897nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
898{
899 size_t size;
901 nxt_event_conn_t *c;
900 nxt_conn_t *c;
902 nxt_event_engine_t *engine;
903 nxt_socket_conf_joint_t *joint;
904
905 c = obj;
906 joint = data;
907
908 nxt_debug(task, "router conn init");
909

--- 5 unchanged lines hidden (view full) ---

915 c->socket.data = NULL;
916
917 engine = task->thread->engine;
918 c->read_work_queue = &engine->fast_work_queue;
919 c->write_work_queue = &engine->fast_work_queue;
920
921 c->read_state = &nxt_router_conn_read_state;
922
901 nxt_event_engine_t *engine;
902 nxt_socket_conf_joint_t *joint;
903
904 c = obj;
905 joint = data;
906
907 nxt_debug(task, "router conn init");
908

--- 5 unchanged lines hidden (view full) ---

914 c->socket.data = NULL;
915
916 engine = task->thread->engine;
917 c->read_work_queue = &engine->fast_work_queue;
918 c->write_work_queue = &engine->fast_work_queue;
919
920 c->read_state = &nxt_router_conn_read_state;
921
923 nxt_event_conn_read(engine, c);
922 nxt_conn_read(engine, c);
924}
925
926
923}
924
925
927static const nxt_event_conn_state_t nxt_router_conn_write_state
926static const nxt_conn_state_t nxt_router_conn_write_state
928 nxt_aligned(64) =
929{
930 .ready_handler = nxt_router_conn_close,
931 .close_handler = nxt_router_conn_close,
932 .error_handler = nxt_router_conn_error,
933};
934
935
936static void
937nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
938{
939 size_t size;
940 nxt_int_t ret;
941 nxt_buf_t *b;
927 nxt_aligned(64) =
928{
929 .ready_handler = nxt_router_conn_close,
930 .close_handler = nxt_router_conn_close,
931 .error_handler = nxt_router_conn_error,
932};
933
934
935static void
936nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
937{
938 size_t size;
939 nxt_int_t ret;
940 nxt_buf_t *b;
942 nxt_event_conn_t *c;
941 nxt_conn_t *c;
943 nxt_socket_conf_joint_t *joint;
944 nxt_http_request_parse_t *rp;
945
946 c = obj;
947 rp = data;
948
949 nxt_debug(task, "router conn http header parse");
950

--- 40 unchanged lines hidden (view full) ---

991
992 size = c->read->mem.free - c->read->mem.pos;
993 nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
994
995 b->mem.free += size;
996 c->read = b;
997 }
998
942 nxt_socket_conf_joint_t *joint;
943 nxt_http_request_parse_t *rp;
944
945 c = obj;
946 rp = data;
947
948 nxt_debug(task, "router conn http header parse");
949

--- 40 unchanged lines hidden (view full) ---

990
991 size = c->read->mem.free - c->read->mem.pos;
992 nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
993
994 b->mem.free += size;
995 c->read = b;
996 }
997
999 nxt_event_conn_read(task->thread->engine, c);
998 nxt_conn_read(task->thread->engine, c);
1000 return;
1001 }
1002
1003 c->write = c->read;
1004 c->write->mem.pos = c->write->mem.start;
1005 c->write_state = &nxt_router_conn_write_state;
1006
999 return;
1000 }
1001
1002 c->write = c->read;
1003 c->write->mem.pos = c->write->mem.start;
1004 c->write_state = &nxt_router_conn_write_state;
1005
1007 nxt_event_conn_write(task->thread->engine, c);
1006 nxt_conn_write(task->thread->engine, c);
1008}
1009
1010
1007}
1008
1009
1011static const nxt_event_conn_state_t nxt_router_conn_close_state
1010static const nxt_conn_state_t nxt_router_conn_close_state
1012 nxt_aligned(64) =
1013{
1014 .ready_handler = nxt_router_conn_free,
1015};
1016
1017
1018static void
1019nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
1020{
1011 nxt_aligned(64) =
1012{
1013 .ready_handler = nxt_router_conn_free,
1014};
1015
1016
1017static void
1018nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
1019{
1021 nxt_event_conn_t *c;
1020 nxt_conn_t *c;
1022
1023 c = obj;
1024
1025 nxt_debug(task, "router conn close");
1026
1027 c->write_state = &nxt_router_conn_close_state;
1028
1021
1022 c = obj;
1023
1024 nxt_debug(task, "router conn close");
1025
1026 c->write_state = &nxt_router_conn_close_state;
1027
1029 nxt_event_conn_close(task->thread->engine, c);
1028 nxt_conn_close(task->thread->engine, c);
1030}
1031
1032
1033static void
1034nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
1035{
1029}
1030
1031
1032static void
1033nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
1034{
1036 nxt_event_conn_t *c;
1035 nxt_conn_t *c;
1037 nxt_socket_conf_joint_t *joint;
1038
1039 c = obj;
1040
1041 nxt_debug(task, "router conn close done");
1042
1043 joint = c->listen->socket.data;
1044 nxt_router_conf_release(task, joint);
1045
1046 nxt_mem_pool_destroy(c->mem_pool);
1047}
1048
1049
1050static void
1051nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
1052{
1036 nxt_socket_conf_joint_t *joint;
1037
1038 c = obj;
1039
1040 nxt_debug(task, "router conn close done");
1041
1042 joint = c->listen->socket.data;
1043 nxt_router_conf_release(task, joint);
1044
1045 nxt_mem_pool_destroy(c->mem_pool);
1046}
1047
1048
1049static void
1050nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
1051{
1053 nxt_event_conn_t *c;
1052 nxt_conn_t *c;
1054
1055 c = obj;
1056
1057 nxt_debug(task, "router conn error");
1058
1059 c->write_state = &nxt_router_conn_close_state;
1060
1053
1054 c = obj;
1055
1056 nxt_debug(task, "router conn error");
1057
1058 c->write_state = &nxt_router_conn_close_state;
1059
1061 nxt_event_conn_close(task->thread->engine, c);
1060 nxt_conn_close(task->thread->engine, c);
1062}
1063
1064
1065static void
1066nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
1067{
1061}
1062
1063
1064static void
1065nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
1066{
1068 nxt_timer_t *timer;
1069 nxt_event_conn_t *c;
1067 nxt_conn_t *c;
1068 nxt_timer_t *timer;
1070
1071 timer = obj;
1072
1073 nxt_debug(task, "router conn timeout");
1074
1069
1070 timer = obj;
1071
1072 nxt_debug(task, "router conn timeout");
1073
1075 c = nxt_event_read_timer_conn(timer);
1074 c = nxt_read_timer_conn(timer);
1076
1077 c->write_state = &nxt_router_conn_close_state;
1078
1075
1076 c->write_state = &nxt_router_conn_close_state;
1077
1079 nxt_event_conn_close(task->thread->engine, c);
1078 nxt_conn_close(task->thread->engine, c);
1080}
1081
1082
1083static nxt_msec_t
1079}
1080
1081
1082static nxt_msec_t
1084nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
1083nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
1085{
1086 nxt_socket_conf_joint_t *joint;
1087
1088 joint = c->listen->socket.data;
1089
1090 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1091}
1084{
1085 nxt_socket_conf_joint_t *joint;
1086
1087 joint = c->listen->socket.data;
1088
1089 return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1090}