nxt_h1proto.c (1269:41331471eee7) nxt_h1proto.c (1270:9efa309be18b)
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_router.h>
8#include <nxt_http.h>

--- 31 unchanged lines hidden (view full) ---

40 uintptr_t data);
41static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
42 uintptr_t data);
43static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
44static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
45 void *data);
46static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
47static void nxt_h1p_request_header_send(nxt_task_t *task,
1
2/*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7#include <nxt_router.h>
8#include <nxt_http.h>

--- 31 unchanged lines hidden (view full) ---

40 uintptr_t data);
41static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
42 uintptr_t data);
43static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
44static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
45 void *data);
46static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
47static void nxt_h1p_request_header_send(nxt_task_t *task,
48 nxt_http_request_t *r, nxt_work_handler_t body_handler);
48 nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
49static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
50 nxt_buf_t *out);
51static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
52 nxt_buf_t *out);
53static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
54 nxt_http_proto_t proto);
55static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
56 nxt_buf_t *last);

--- 16 unchanged lines hidden (view full) ---

73static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
74static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
75static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
76static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
77 void *data);
78static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
79 uintptr_t data);
80static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
49static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
50 nxt_buf_t *out);
51static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
52 nxt_buf_t *out);
53static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
54 nxt_http_proto_t proto);
55static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
56 nxt_buf_t *last);

--- 16 unchanged lines hidden (view full) ---

73static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
74static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
75static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
76static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
77 void *data);
78static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
79 uintptr_t data);
80static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
81static void nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c);
81static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
82static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
83static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
84static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
85
82static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
83static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
84static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
85
86static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
87static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
88static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
89static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
90static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
91static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
92static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
93static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
94 void *data);
95static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
96 nxt_buf_mem_t *bm);
97static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
98static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
99static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
100static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
101static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
102static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
103static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
104static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
105static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
106
86#if (NXT_TLS)
87static const nxt_conn_state_t nxt_http_idle_state;
88static const nxt_conn_state_t nxt_h1p_shutdown_state;
89#endif
90static const nxt_conn_state_t nxt_h1p_idle_state;
91static const nxt_conn_state_t nxt_h1p_header_parse_state;
92static const nxt_conn_state_t nxt_h1p_read_body_state;
93static const nxt_conn_state_t nxt_h1p_request_send_state;
94static const nxt_conn_state_t nxt_h1p_timeout_response_state;
95static const nxt_conn_state_t nxt_h1p_keepalive_state;
96static const nxt_conn_state_t nxt_h1p_close_state;
107#if (NXT_TLS)
108static const nxt_conn_state_t nxt_http_idle_state;
109static const nxt_conn_state_t nxt_h1p_shutdown_state;
110#endif
111static const nxt_conn_state_t nxt_h1p_idle_state;
112static const nxt_conn_state_t nxt_h1p_header_parse_state;
113static const nxt_conn_state_t nxt_h1p_read_body_state;
114static const nxt_conn_state_t nxt_h1p_request_send_state;
115static const nxt_conn_state_t nxt_h1p_timeout_response_state;
116static const nxt_conn_state_t nxt_h1p_keepalive_state;
117static const nxt_conn_state_t nxt_h1p_close_state;
118static const nxt_conn_state_t nxt_h1p_peer_connect_state;
119static const nxt_conn_state_t nxt_h1p_peer_header_send_state;
120static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state;
121static const nxt_conn_state_t nxt_h1p_peer_header_read_state;
122static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state;
123static const nxt_conn_state_t nxt_h1p_peer_read_state;
124static const nxt_conn_state_t nxt_h1p_peer_close_state;
97
98
99const nxt_http_proto_table_t nxt_http_proto[3] = {
100 /* NXT_HTTP_PROTO_H1 */
101 {
102 .body_read = nxt_h1p_request_body_read,
103 .local_addr = nxt_h1p_request_local_addr,
104 .header_send = nxt_h1p_request_header_send,
105 .send = nxt_h1p_request_send,
106 .body_bytes_sent = nxt_h1p_request_body_bytes_sent,
107 .discard = nxt_h1p_request_discard,
108 .close = nxt_h1p_request_close,
125
126
127const nxt_http_proto_table_t nxt_http_proto[3] = {
128 /* NXT_HTTP_PROTO_H1 */
129 {
130 .body_read = nxt_h1p_request_body_read,
131 .local_addr = nxt_h1p_request_local_addr,
132 .header_send = nxt_h1p_request_header_send,
133 .send = nxt_h1p_request_send,
134 .body_bytes_sent = nxt_h1p_request_body_bytes_sent,
135 .discard = nxt_h1p_request_discard,
136 .close = nxt_h1p_request_close,
137
138 .peer_connect = nxt_h1p_peer_connect,
139 .peer_header_send = nxt_h1p_peer_header_send,
140 .peer_header_read = nxt_h1p_peer_header_read,
141 .peer_read = nxt_h1p_peer_read,
142 .peer_close = nxt_h1p_peer_close,
143
109 .ws_frame_start = nxt_h1p_websocket_frame_start,
110 },
111 /* NXT_HTTP_PROTO_H2 */
112 /* NXT_HTTP_PROTO_DEVNULL */
113};
114
115
144 .ws_frame_start = nxt_h1p_websocket_frame_start,
145 },
146 /* NXT_HTTP_PROTO_H2 */
147 /* NXT_HTTP_PROTO_DEVNULL */
148};
149
150
116static nxt_lvlhsh_t nxt_h1p_fields_hash;
151static nxt_lvlhsh_t nxt_h1p_fields_hash;
117
152
118static nxt_http_field_proc_t nxt_h1p_fields[] = {
153static nxt_http_field_proc_t nxt_h1p_fields[] = {
119 { nxt_string("Connection"), &nxt_h1p_connection, 0 },
120 { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 },
121 { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
122 { nxt_string("Sec-WebSocket-Version"),
123 &nxt_h1p_websocket_version, 0 },
124 { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
125
126 { nxt_string("Host"), &nxt_http_request_host, 0 },

--- 4 unchanged lines hidden (view full) ---

131 { nxt_string("User-Agent"), &nxt_http_request_field,
132 offsetof(nxt_http_request_t, user_agent) },
133 { nxt_string("Content-Type"), &nxt_http_request_field,
134 offsetof(nxt_http_request_t, content_type) },
135 { nxt_string("Content-Length"), &nxt_http_request_content_length, 0 },
136};
137
138
154 { nxt_string("Connection"), &nxt_h1p_connection, 0 },
155 { nxt_string("Upgrade"), &nxt_h1p_upgrade, 0 },
156 { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
157 { nxt_string("Sec-WebSocket-Version"),
158 &nxt_h1p_websocket_version, 0 },
159 { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
160
161 { nxt_string("Host"), &nxt_http_request_host, 0 },

--- 4 unchanged lines hidden (view full) ---

166 { nxt_string("User-Agent"), &nxt_http_request_field,
167 offsetof(nxt_http_request_t, user_agent) },
168 { nxt_string("Content-Type"), &nxt_http_request_field,
169 offsetof(nxt_http_request_t, content_type) },
170 { nxt_string("Content-Length"), &nxt_http_request_content_length, 0 },
171};
172
173
174static nxt_lvlhsh_t nxt_h1p_peer_fields_hash;
175
176static nxt_http_field_proc_t nxt_h1p_peer_fields[] = {
177 { nxt_string("Connection"), &nxt_http_proxy_skip, 0 },
178 { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
179 { nxt_string("Server"), &nxt_http_proxy_skip, 0 },
180 { nxt_string("Date"), &nxt_http_proxy_date, 0 },
181 { nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 },
182};
183
184
139nxt_int_t
140nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
141{
185nxt_int_t
186nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
187{
142 return nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
143 nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
188 nxt_int_t ret;
189
190 ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
191 nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
192
193 if (nxt_fast_path(ret == NXT_OK)) {
194 ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
195 rt->mem_pool, nxt_h1p_peer_fields,
196 nxt_nitems(nxt_h1p_peer_fields));
197 }
198
199 return ret;
144}
145
146
147void
148nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
149{
150 nxt_conn_t *c;
151 nxt_socket_conf_t *skcf;

--- 146 unchanged lines hidden (view full) ---

298
299 joint = c->listen->socket.data;
300
301 if (nxt_slow_path(joint == NULL)) {
302 /*
303 * Listening socket had been closed while
304 * connection was in keep-alive state.
305 */
200}
201
202
203void
204nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
205{
206 nxt_conn_t *c;
207 nxt_socket_conf_t *skcf;

--- 146 unchanged lines hidden (view full) ---

354
355 joint = c->listen->socket.data;
356
357 if (nxt_slow_path(joint == NULL)) {
358 /*
359 * Listening socket had been closed while
360 * connection was in keep-alive state.
361 */
306 nxt_h1p_shutdown(task, c);
362 nxt_h1p_closing(task, c);
307 return;
308 }
309
310 tls = joint->socket_conf->tls;
311
312 tls->conn_init(task, tls, c);
313}
314

--- 68 unchanged lines hidden (view full) ---

383 nxt_h1proto_t *h1p;
384
385 c = obj;
386
387 nxt_debug(task, "h1p conn proto init");
388
389 h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
390 if (nxt_slow_path(h1p == NULL)) {
363 return;
364 }
365
366 tls = joint->socket_conf->tls;
367
368 tls->conn_init(task, tls, c);
369}
370

--- 68 unchanged lines hidden (view full) ---

439 nxt_h1proto_t *h1p;
440
441 c = obj;
442
443 nxt_debug(task, "h1p conn proto init");
444
445 h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
446 if (nxt_slow_path(h1p == NULL)) {
391 nxt_h1p_shutdown(task, c);
447 nxt_h1p_closing(task, c);
392 return;
393 }
394
395 c->socket.data = h1p;
396 h1p->conn = c;
397
398 nxt_h1p_conn_request_init(task, c, h1p);
399}

--- 47 unchanged lines hidden (view full) ---

447
448 /*
449 * The request is very incomplete here,
450 * so "internal server error" useless here.
451 */
452 nxt_mp_release(r->mem_pool);
453 }
454
448 return;
449 }
450
451 c->socket.data = h1p;
452 h1p->conn = c;
453
454 nxt_h1p_conn_request_init(task, c, h1p);
455}

--- 47 unchanged lines hidden (view full) ---

503
504 /*
505 * The request is very incomplete here,
506 * so "internal server error" useless here.
507 */
508 nxt_mp_release(r->mem_pool);
509 }
510
455 nxt_h1p_shutdown(task, c);
511 nxt_h1p_closing(task, c);
456}
457
458
459static const nxt_conn_state_t nxt_h1p_header_parse_state
460 nxt_aligned(64) =
461{
462 .ready_handler = nxt_h1p_conn_request_header_parse,
463 .close_handler = nxt_h1p_conn_request_error,

--- 199 unchanged lines hidden (view full) ---

663
664
665static nxt_int_t
666nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
667{
668 nxt_http_request_t *r;
669
670 r = ctx;
512}
513
514
515static const nxt_conn_state_t nxt_h1p_header_parse_state
516 nxt_aligned(64) =
517{
518 .ready_handler = nxt_h1p_conn_request_header_parse,
519 .close_handler = nxt_h1p_conn_request_error,

--- 199 unchanged lines hidden (view full) ---

719
720
721static nxt_int_t
722nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
723{
724 nxt_http_request_t *r;
725
726 r = ctx;
727 field->hopbyhop = 1;
671
672 if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
673 r->proto.h1->keepalive = 0;
674
675 } else if (field->value_length == 7
676 && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
677 {
678 r->proto.h1->connection_upgrade = 1;

--- 54 unchanged lines hidden (view full) ---

733
734static nxt_int_t
735nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
736{
737 nxt_http_te_t te;
738 nxt_http_request_t *r;
739
740 r = ctx;
728
729 if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
730 r->proto.h1->keepalive = 0;
731
732 } else if (field->value_length == 7
733 && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
734 {
735 r->proto.h1->connection_upgrade = 1;

--- 54 unchanged lines hidden (view full) ---

790
791static nxt_int_t
792nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
793{
794 nxt_http_te_t te;
795 nxt_http_request_t *r;
796
797 r = ctx;
798 field->skip = 1;
799 field->hopbyhop = 1;
741
742 if (field->value_length == 7
743 && nxt_memcmp(field->value, "chunked", 7) == 0)
744 {
745 te = NXT_HTTP_TE_CHUNKED;
746
747 } else {
748 te = NXT_HTTP_TE_UNSUPPORTED;

--- 249 unchanged lines hidden (view full) ---

998 nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
999};
1000
1001
1002#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 65536\r\n")
1003
1004static void
1005nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
800
801 if (field->value_length == 7
802 && nxt_memcmp(field->value, "chunked", 7) == 0)
803 {
804 te = NXT_HTTP_TE_CHUNKED;
805
806 } else {
807 te = NXT_HTTP_TE_UNSUPPORTED;

--- 249 unchanged lines hidden (view full) ---

1057 nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
1058};
1059
1060
1061#define UNKNOWN_STATUS_LENGTH nxt_length("HTTP/1.1 65536\r\n")
1062
1063static void
1064nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
1006 nxt_work_handler_t body_handler)
1065 nxt_work_handler_t body_handler, void *data)
1007{
1008 u_char *p;
1009 size_t size;
1010 nxt_buf_t *header;
1011 nxt_str_t unknown_status;
1012 nxt_int_t conn;
1013 nxt_uint_t n;
1014 nxt_bool_t http11;

--- 160 unchanged lines hidden (view full) ---

1175
1176 if (body_handler != NULL) {
1177 /*
1178 * The body handler will run before c->io->write() handler,
1179 * because the latter was inqueued by nxt_conn_write()
1180 * in engine->write_work_queue.
1181 */
1182 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1066{
1067 u_char *p;
1068 size_t size;
1069 nxt_buf_t *header;
1070 nxt_str_t unknown_status;
1071 nxt_int_t conn;
1072 nxt_uint_t n;
1073 nxt_bool_t http11;

--- 160 unchanged lines hidden (view full) ---

1234
1235 if (body_handler != NULL) {
1236 /*
1237 * The body handler will run before c->io->write() handler,
1238 * because the latter was inqueued by nxt_conn_write()
1239 * in engine->write_work_queue.
1240 */
1241 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1183 body_handler, task, r, NULL);
1242 body_handler, task, r, data);
1184
1185 } else {
1186 header->next = nxt_http_buf_last(r);
1187 }
1188
1189 nxt_conn_write(task->thread->engine, c);
1190
1191 if (h1p->websocket) {

--- 587 unchanged lines hidden (view full) ---

1779 timer->handler = nxt_h1p_conn_ws_shutdown;
1780 nxt_timer_add(task->thread->engine, timer, 0);
1781
1782 } else {
1783 nxt_debug(task, "h1p already scheduled ws shutdown");
1784 }
1785
1786 } else {
1243
1244 } else {
1245 header->next = nxt_http_buf_last(r);
1246 }
1247
1248 nxt_conn_write(task->thread->engine, c);
1249
1250 if (h1p->websocket) {

--- 587 unchanged lines hidden (view full) ---

1838 timer->handler = nxt_h1p_conn_ws_shutdown;
1839 nxt_timer_add(task->thread->engine, timer, 0);
1840
1841 } else {
1842 nxt_debug(task, "h1p already scheduled ws shutdown");
1843 }
1844
1845 } else {
1787 nxt_h1p_shutdown_(task, c);
1846 nxt_h1p_closing(task, c);
1788 }
1789}
1790
1791
1792static void
1847 }
1848}
1849
1850
1851static void
1793nxt_h1p_shutdown_(nxt_task_t *task, nxt_conn_t *c)
1852nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
1794{
1853{
1854 nxt_timer_t *timer;
1855 nxt_h1p_websocket_timer_t *ws_timer;
1856
1857 nxt_debug(task, "h1p conn ws shutdown");
1858
1859 timer = obj;
1860 ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
1861
1862 nxt_h1p_closing(task, ws_timer->h1p->conn);
1863}
1864
1865
1866static void
1867nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
1868{
1869 nxt_debug(task, "h1p closing");
1870
1795 c->socket.data = NULL;
1796
1797#if (NXT_TLS)
1798
1799 if (c->u.tls != NULL) {
1800 c->write_state = &nxt_h1p_shutdown_state;
1801
1802 c->io->shutdown(task, c, NULL);

--- 15 unchanged lines hidden (view full) ---

1818 .close_handler = nxt_h1p_conn_closing,
1819 .error_handler = nxt_h1p_conn_closing,
1820};
1821
1822#endif
1823
1824
1825static void
1871 c->socket.data = NULL;
1872
1873#if (NXT_TLS)
1874
1875 if (c->u.tls != NULL) {
1876 c->write_state = &nxt_h1p_shutdown_state;
1877
1878 c->io->shutdown(task, c, NULL);

--- 15 unchanged lines hidden (view full) ---

1894 .close_handler = nxt_h1p_conn_closing,
1895 .error_handler = nxt_h1p_conn_closing,
1896};
1897
1898#endif
1899
1900
1901static void
1826nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
1827{
1828 nxt_timer_t *timer;
1829 nxt_h1p_websocket_timer_t *ws_timer;
1830
1831 nxt_debug(task, "h1p conn ws shutdown");
1832
1833 timer = obj;
1834 ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
1835
1836 nxt_h1p_shutdown_(task, ws_timer->h1p->conn);
1837}
1838
1839
1840static void
1841nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
1842{
1843 nxt_conn_t *c;
1844
1845 c = obj;
1846
1847 nxt_debug(task, "h1p conn closing");
1848

--- 28 unchanged lines hidden (view full) ---

1877 nxt_sockaddr_cache_free(engine, c);
1878
1879 lev = c->listen;
1880
1881 nxt_conn_free(task, c);
1882
1883 nxt_router_listen_event_release(&engine->task, lev, NULL);
1884}
1902nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
1903{
1904 nxt_conn_t *c;
1905
1906 c = obj;
1907
1908 nxt_debug(task, "h1p conn closing");
1909

--- 28 unchanged lines hidden (view full) ---

1938 nxt_sockaddr_cache_free(engine, c);
1939
1940 lev = c->listen;
1941
1942 nxt_conn_free(task, c);
1943
1944 nxt_router_listen_event_release(&engine->task, lev, NULL);
1945}
1946
1947
1948static void
1949nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
1950{
1951 nxt_mp_t *mp;
1952 nxt_int_t ret;
1953 nxt_conn_t *c, *client;
1954 nxt_h1proto_t *h1p;
1955 nxt_fd_event_t *socket;
1956 nxt_work_queue_t *wq;
1957 nxt_http_request_t *r;
1958
1959 nxt_debug(task, "h1p peer connect");
1960
1961 peer->status = NXT_HTTP_UNSET;
1962 r = peer->request;
1963
1964 mp = nxt_mp_create(1024, 128, 256, 32);
1965
1966 if (nxt_slow_path(mp == NULL)) {
1967 goto fail;
1968 }
1969
1970 h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
1971 if (nxt_slow_path(h1p == NULL)) {
1972 goto fail;
1973 }
1974
1975 ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
1976 if (nxt_slow_path(ret != NXT_OK)) {
1977 goto fail;
1978 }
1979
1980 c = nxt_conn_create(mp, task);
1981 if (nxt_slow_path(c == NULL)) {
1982 goto fail;
1983 }
1984
1985 c->mem_pool = mp;
1986 h1p->conn = c;
1987
1988 peer->proto.h1 = h1p;
1989 h1p->request = r;
1990
1991 c->socket.task = task;
1992 c->read_timer.task = task;
1993 c->write_timer.task = task;
1994 c->socket.data = peer;
1995 c->remote = peer->sockaddr;
1996
1997 c->socket.write_ready = 1;
1998 c->write_state = &nxt_h1p_peer_connect_state;
1999
2000 /*
2001 * TODO: queues should be implemented via client proto interface.
2002 */
2003 client = r->proto.h1->conn;
2004
2005 socket = &client->socket;
2006 wq = socket->read_work_queue;
2007 c->read_work_queue = wq;
2008 c->socket.read_work_queue = wq;
2009 c->read_timer.work_queue = wq;
2010
2011 wq = socket->write_work_queue;
2012 c->write_work_queue = wq;
2013 c->socket.write_work_queue = wq;
2014 c->write_timer.work_queue = wq;
2015 /* TODO END */
2016
2017 nxt_conn_connect(task->thread->engine, c);
2018
2019 return;
2020
2021fail:
2022
2023 peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2024
2025 r->state->error_handler(task, r, peer);
2026}
2027
2028
2029static const nxt_conn_state_t nxt_h1p_peer_connect_state
2030 nxt_aligned(64) =
2031{
2032 .ready_handler = nxt_h1p_peer_connected,
2033 .close_handler = nxt_h1p_peer_refused,
2034 .error_handler = nxt_h1p_peer_error,
2035
2036 .timer_handler = nxt_h1p_peer_send_timeout,
2037 .timer_value = nxt_h1p_peer_timer_value,
2038 .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2039};
2040
2041
2042static void
2043nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
2044{
2045 nxt_http_peer_t *peer;
2046 nxt_http_request_t *r;
2047
2048 peer = data;
2049
2050 nxt_debug(task, "h1p peer connected");
2051
2052 r = peer->request;
2053 r->state->ready_handler(task, r, peer);
2054}
2055
2056
2057static void
2058nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
2059{
2060 nxt_http_peer_t *peer;
2061 nxt_http_request_t *r;
2062
2063 peer = data;
2064
2065 nxt_debug(task, "h1p peer refused");
2066
2067 //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
2068 peer->status = NXT_HTTP_BAD_GATEWAY;
2069
2070 r = peer->request;
2071 r->state->error_handler(task, r, peer);
2072}
2073
2074
2075static void
2076nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
2077{
2078 u_char *p;
2079 size_t size;
2080 nxt_buf_t *header, *body;
2081 nxt_conn_t *c;
2082 nxt_http_field_t *field;
2083 nxt_http_request_t *r;
2084
2085 nxt_debug(task, "h1p peer header send");
2086
2087 r = peer->request;
2088
2089 size = r->method->length + sizeof(" ") + r->target.length
2090 + sizeof(" HTTP/1.0\r\n")
2091 + sizeof("\r\n");
2092
2093 nxt_list_each(field, r->fields) {
2094
2095 if (!field->hopbyhop) {
2096 size += field->name_length + field->value_length;
2097 size += nxt_length(": \r\n");
2098 }
2099
2100 } nxt_list_loop;
2101
2102 header = nxt_http_buf_mem(task, r, size);
2103 if (nxt_slow_path(header == NULL)) {
2104 r->state->error_handler(task, r, peer);
2105 return;
2106 }
2107
2108 p = header->mem.free;
2109
2110 p = nxt_cpymem(p, r->method->start, r->method->length);
2111 *p++ = ' ';
2112 p = nxt_cpymem(p, r->target.start, r->target.length);
2113 p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
2114
2115 nxt_list_each(field, r->fields) {
2116
2117 if (!field->hopbyhop) {
2118 p = nxt_cpymem(p, field->name, field->name_length);
2119 *p++ = ':'; *p++ = ' ';
2120 p = nxt_cpymem(p, field->value, field->value_length);
2121 *p++ = '\r'; *p++ = '\n';
2122 }
2123
2124 } nxt_list_loop;
2125
2126 *p++ = '\r'; *p++ = '\n';
2127 header->mem.free = p;
2128 size = p - header->mem.pos;
2129
2130 c = peer->proto.h1->conn;
2131 c->write = header;
2132 c->write_state = &nxt_h1p_peer_header_send_state;
2133
2134 if (r->body != NULL) {
2135 body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
2136 if (nxt_slow_path(body == NULL)) {
2137 r->state->error_handler(task, r, peer);
2138 return;
2139 }
2140
2141 header->next = body;
2142
2143 body->mem = r->body->mem;
2144 size += nxt_buf_mem_used_size(&body->mem);
2145
2146// nxt_mp_retain(r->mem_pool);
2147 }
2148
2149 if (size > 16384) {
2150 /* Use proxy_send_timeout instead of proxy_timeout. */
2151 c->write_state = &nxt_h1p_peer_header_body_send_state;
2152 }
2153
2154 nxt_conn_write(task->thread->engine, c);
2155}
2156
2157
2158static const nxt_conn_state_t nxt_h1p_peer_header_send_state
2159 nxt_aligned(64) =
2160{
2161 .ready_handler = nxt_h1p_peer_header_sent,
2162 .error_handler = nxt_h1p_peer_error,
2163
2164 .timer_handler = nxt_h1p_peer_send_timeout,
2165 .timer_value = nxt_h1p_peer_timer_value,
2166 .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2167};
2168
2169
2170static const nxt_conn_state_t nxt_h1p_peer_header_body_send_state
2171 nxt_aligned(64) =
2172{
2173 .ready_handler = nxt_h1p_peer_header_sent,
2174 .error_handler = nxt_h1p_peer_error,
2175
2176 .timer_handler = nxt_h1p_peer_send_timeout,
2177 .timer_value = nxt_h1p_peer_timer_value,
2178 .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
2179 .timer_autoreset = 1,
2180};
2181
2182
2183static void
2184nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
2185{
2186 nxt_conn_t *c;
2187 nxt_http_peer_t *peer;
2188 nxt_http_request_t *r;
2189 nxt_event_engine_t *engine;
2190
2191 c = obj;
2192 peer = data;
2193
2194 nxt_debug(task, "h1p peer header sent");
2195
2196 engine = task->thread->engine;
2197
2198 c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
2199
2200 if (c->write == NULL) {
2201 r = peer->request;
2202 r->state->ready_handler(task, r, peer);
2203 return;
2204 }
2205
2206 nxt_conn_write(engine, c);
2207}
2208
2209
2210static void
2211nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
2212{
2213 nxt_conn_t *c;
2214
2215 nxt_debug(task, "h1p peer header read");
2216
2217 c = peer->proto.h1->conn;
2218
2219 if (c->write_timer.enabled) {
2220 c->read_state = &nxt_h1p_peer_header_read_state;
2221
2222 } else {
2223 c->read_state = &nxt_h1p_peer_header_read_timer_state;
2224 }
2225
2226 nxt_conn_read(task->thread->engine, c);
2227}
2228
2229
2230static const nxt_conn_state_t nxt_h1p_peer_header_read_state
2231 nxt_aligned(64) =
2232{
2233 .ready_handler = nxt_h1p_peer_header_read_done,
2234 .close_handler = nxt_h1p_peer_closed,
2235 .error_handler = nxt_h1p_peer_error,
2236
2237 .io_read_handler = nxt_h1p_peer_io_read_handler,
2238};
2239
2240
2241static const nxt_conn_state_t nxt_h1p_peer_header_read_timer_state
2242 nxt_aligned(64) =
2243{
2244 .ready_handler = nxt_h1p_peer_header_read_done,
2245 .close_handler = nxt_h1p_peer_closed,
2246 .error_handler = nxt_h1p_peer_error,
2247
2248 .io_read_handler = nxt_h1p_peer_io_read_handler,
2249
2250 .timer_handler = nxt_h1p_peer_read_timeout,
2251 .timer_value = nxt_h1p_peer_timer_value,
2252 .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2253};
2254
2255
2256static ssize_t
2257nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
2258{
2259 size_t size;
2260 ssize_t n;
2261 nxt_buf_t *b;
2262 nxt_http_peer_t *peer;
2263 nxt_socket_conf_t *skcf;
2264 nxt_http_request_t *r;
2265
2266 peer = c->socket.data;
2267 r = peer->request;
2268 b = c->read;
2269
2270 if (b == NULL) {
2271 skcf = r->conf->socket_conf;
2272
2273 size = (peer->header_received) ? skcf->proxy_buffer_size
2274 : skcf->proxy_header_buffer_size;
2275
2276 nxt_debug(task, "h1p peer io read: %z", size);
2277
2278 b = nxt_http_proxy_buf_mem_alloc(task, r, size);
2279 if (nxt_slow_path(b == NULL)) {
2280 c->socket.error = NXT_ENOMEM;
2281 return NXT_ERROR;
2282 }
2283 }
2284
2285 n = c->io->recvbuf(c, b);
2286
2287 if (n > 0) {
2288 c->read = b;
2289
2290 } else {
2291 c->read = NULL;
2292 nxt_http_proxy_buf_mem_free(task, r, b);
2293 }
2294
2295 return n;
2296}
2297
2298
2299static void
2300nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
2301{
2302 nxt_int_t ret;
2303 nxt_buf_t *b;
2304 nxt_conn_t *c;
2305 nxt_http_peer_t *peer;
2306 nxt_http_request_t *r;
2307 nxt_event_engine_t *engine;
2308
2309 c = obj;
2310 peer = data;
2311
2312 nxt_debug(task, "h1p peer header read done");
2313
2314 b = c->read;
2315
2316 ret = nxt_h1p_peer_header_parse(peer, &b->mem);
2317
2318 r = peer->request;
2319
2320 ret = nxt_expect(NXT_DONE, ret);
2321
2322 if (ret != NXT_AGAIN) {
2323 engine = task->thread->engine;
2324 nxt_timer_disable(engine, &c->write_timer);
2325 nxt_timer_disable(engine, &c->read_timer);
2326 }
2327
2328 switch (ret) {
2329
2330 case NXT_DONE:
2331 peer->fields = peer->proto.h1->parser.fields;
2332
2333 ret = nxt_http_fields_process(peer->fields,
2334 &nxt_h1p_peer_fields_hash, r);
2335 if (nxt_slow_path(ret != NXT_OK)) {
2336 peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2337 break;
2338 }
2339
2340 c->read = NULL;
2341
2342 if (nxt_buf_mem_used_size(&b->mem) != 0) {
2343 peer->body = b;
2344 }
2345
2346 peer->header_received = 1;
2347
2348 r->state->ready_handler(task, r, peer);
2349 return;
2350
2351 case NXT_AGAIN:
2352 if (nxt_buf_mem_free_size(&b->mem) != 0) {
2353 nxt_conn_read(task->thread->engine, c);
2354 return;
2355 }
2356
2357 /* Fall through. */
2358
2359 default:
2360 case NXT_ERROR:
2361 case NXT_HTTP_PARSE_INVALID:
2362 case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
2363 case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
2364 peer->status = NXT_HTTP_BAD_GATEWAY;
2365 break;
2366 }
2367
2368 nxt_http_proxy_buf_mem_free(task, r, b);
2369
2370 r->state->error_handler(task, r, peer);
2371}
2372
2373
2374static nxt_int_t
2375nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
2376{
2377 u_char *p;
2378 size_t length;
2379 nxt_int_t status;
2380
2381 if (peer->status < 0) {
2382 length = nxt_buf_mem_used_size(bm);
2383
2384 if (nxt_slow_path(length < 12)) {
2385 return NXT_AGAIN;
2386 }
2387
2388 p = bm->pos;
2389
2390 if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
2391 || (p[7] != '0' && p[7] != '1')))
2392 {
2393 return NXT_ERROR;
2394 }
2395
2396 status = nxt_int_parse(&p[9], 3);
2397
2398 if (nxt_slow_path(status < 0)) {
2399 return NXT_ERROR;
2400 }
2401
2402 p += 12;
2403 length -= 12;
2404
2405 p = nxt_memchr(p, '\n', length);
2406
2407 if (nxt_slow_path(p == NULL)) {
2408 return NXT_AGAIN;
2409 }
2410
2411 bm->pos = p + 1;
2412 peer->status = status;
2413 }
2414
2415 return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
2416}
2417
2418
2419static void
2420nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
2421{
2422 nxt_conn_t *c;
2423
2424 nxt_debug(task, "h1p peer read");
2425
2426 c = peer->proto.h1->conn;
2427 c->read_state = &nxt_h1p_peer_read_state;
2428
2429 nxt_conn_read(task->thread->engine, c);
2430}
2431
2432
2433static const nxt_conn_state_t nxt_h1p_peer_read_state
2434 nxt_aligned(64) =
2435{
2436 .ready_handler = nxt_h1p_peer_read_done,
2437 .close_handler = nxt_h1p_peer_closed,
2438 .error_handler = nxt_h1p_peer_error,
2439
2440 .io_read_handler = nxt_h1p_peer_io_read_handler,
2441
2442 .timer_handler = nxt_h1p_peer_read_timeout,
2443 .timer_value = nxt_h1p_peer_timer_value,
2444 .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
2445 .timer_autoreset = 1,
2446};
2447
2448
2449static void
2450nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
2451{
2452 nxt_conn_t *c;
2453 nxt_http_peer_t *peer;
2454 nxt_http_request_t *r;
2455
2456 c = obj;
2457 peer = data;
2458
2459 nxt_debug(task, "h1p peer read done");
2460
2461 peer->body = c->read;
2462 c->read = NULL;
2463
2464 r = peer->request;
2465 r->state->ready_handler(task, r, peer);
2466}
2467
2468
2469static void
2470nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
2471{
2472 nxt_http_peer_t *peer;
2473 nxt_http_request_t *r;
2474
2475 peer = data;
2476
2477 nxt_debug(task, "h1p peer closed");
2478
2479 r = peer->request;
2480
2481 if (peer->header_received) {
2482 peer->body = nxt_http_buf_last(r);
2483
2484 peer->closed = 1;
2485
2486 r->state->ready_handler(task, r, peer);
2487
2488 } else {
2489 peer->status = NXT_HTTP_BAD_GATEWAY;
2490
2491 r->state->error_handler(task, r, peer);
2492 }
2493}
2494
2495
2496static void
2497nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
2498{
2499 nxt_http_peer_t *peer;
2500 nxt_http_request_t *r;
2501
2502 peer = data;
2503
2504 nxt_debug(task, "h1p peer error");
2505
2506 peer->status = NXT_HTTP_BAD_GATEWAY;
2507
2508 r = peer->request;
2509 r->state->error_handler(task, r, peer);
2510}
2511
2512
2513static void
2514nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
2515{
2516 nxt_conn_t *c;
2517 nxt_timer_t *timer;
2518 nxt_http_peer_t *peer;
2519 nxt_http_request_t *r;
2520
2521 timer = obj;
2522
2523 nxt_debug(task, "h1p peer send timeout");
2524
2525 c = nxt_write_timer_conn(timer);
2526 c->block_write = 1;
2527 c->block_read = 1;
2528
2529 peer = c->socket.data;
2530 peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2531
2532 r = peer->request;
2533 r->state->error_handler(task, r, peer);
2534}
2535
2536
2537static void
2538nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
2539{
2540 nxt_conn_t *c;
2541 nxt_timer_t *timer;
2542 nxt_http_peer_t *peer;
2543 nxt_http_request_t *r;
2544
2545 timer = obj;
2546
2547 nxt_debug(task, "h1p peer read timeout");
2548
2549 c = nxt_read_timer_conn(timer);
2550 c->block_write = 1;
2551 c->block_read = 1;
2552
2553 peer = c->socket.data;
2554 peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2555
2556 r = peer->request;
2557 r->state->error_handler(task, r, peer);
2558}
2559
2560
2561static nxt_msec_t
2562nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
2563{
2564 nxt_http_peer_t *peer;
2565
2566 peer = c->socket.data;
2567
2568 return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
2569}
2570
2571
2572static void
2573nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
2574{
2575 nxt_conn_t *c;
2576
2577 nxt_debug(task, "h1p peer close");
2578
2579 peer->closed = 1;
2580
2581 c = peer->proto.h1->conn;
2582 task = &c->task;
2583 c->socket.task = task;
2584 c->read_timer.task = task;
2585 c->write_timer.task = task;
2586
2587 if (c->socket.fd != -1) {
2588 c->write_state = &nxt_h1p_peer_close_state;
2589
2590 nxt_conn_close(task->thread->engine, c);
2591
2592 } else {
2593 nxt_h1p_peer_free(task, c, NULL);
2594 }
2595}
2596
2597
2598static const nxt_conn_state_t nxt_h1p_peer_close_state
2599 nxt_aligned(64) =
2600{
2601 .ready_handler = nxt_h1p_peer_free,
2602};
2603
2604
2605static void
2606nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
2607{
2608 nxt_conn_t *c;
2609
2610 c = obj;
2611
2612 nxt_debug(task, "h1p peer free");
2613
2614 nxt_conn_free(task, c);
2615}