xref: /unit/src/nxt_h1proto.c (revision 2505:450bf52c3a2c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 #include <nxt_h1proto.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 /*
16  * nxt_http_conn_ and nxt_h1p_conn_ prefixes are used for connection handlers.
17  * nxt_h1p_idle_ prefix is used for idle connection handlers.
18  * nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
19  */
20 
21 #if (NXT_TLS)
22 static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
23 static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
24 #endif
25 static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
26 static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
27 static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
28 static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
29     void *data);
30 static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
31     nxt_http_request_t *r);
32 static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task,
33     nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf);
34 static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
35     uintptr_t data);
36 static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field,
37     uintptr_t data);
38 static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field,
39     uintptr_t data);
40 static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field,
41     uintptr_t data);
42 static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
43     uintptr_t data);
44 static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
45 static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
46     void *data);
47 static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
48 static void nxt_h1p_request_header_send(nxt_task_t *task,
49     nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
50 static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
51     nxt_buf_t *out);
52 static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
53     nxt_buf_t *out);
54 static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
55     nxt_http_proto_t proto);
56 static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
57     nxt_buf_t *last);
58 static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data);
59 static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
60     void *data);
61 static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj,
62     void *data);
63 nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
64     nxt_http_request_t *r);
65 static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
66     nxt_socket_conf_joint_t *joint);
67 static void nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data);
68 static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data);
69 static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data);
70 static nxt_msec_t nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data);
71 static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
72     nxt_conn_t *c);
73 static void nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data);
74 static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
75 static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
76 static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
77 static void nxt_h1p_idle_response_error(nxt_task_t *task, void *obj,
78     void *data);
79 static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
80     void *data);
81 static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
82     uintptr_t data);
83 static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
84 static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
85 static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
86 static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
87 static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
88 
89 static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
90 static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
91 static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
92 static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
93 static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
94 static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
95 static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
96 static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
97     void *data);
98 static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
99     nxt_buf_mem_t *bm);
100 static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
101 static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
102 static void nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, nxt_buf_t *out);
103 static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
104 static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
105 static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
106 static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
107 static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
108 static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
109 static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
110 static nxt_int_t nxt_h1p_peer_transfer_encoding(void *ctx,
111     nxt_http_field_t *field, uintptr_t data);
112 
113 #if (NXT_TLS)
114 static const nxt_conn_state_t  nxt_http_idle_state;
115 static const nxt_conn_state_t  nxt_h1p_shutdown_state;
116 #endif
117 static const nxt_conn_state_t  nxt_h1p_idle_state;
118 static const nxt_conn_state_t  nxt_h1p_header_parse_state;
119 static const nxt_conn_state_t  nxt_h1p_read_body_state;
120 static const nxt_conn_state_t  nxt_h1p_request_send_state;
121 static const nxt_conn_state_t  nxt_h1p_timeout_response_state;
122 static const nxt_conn_state_t  nxt_h1p_keepalive_state;
123 static const nxt_conn_state_t  nxt_h1p_close_state;
124 static const nxt_conn_state_t  nxt_h1p_peer_connect_state;
125 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state;
126 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state;
127 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state;
128 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state;
129 static const nxt_conn_state_t  nxt_h1p_peer_read_state;
130 static const nxt_conn_state_t  nxt_h1p_peer_close_state;
131 
132 
133 const nxt_http_proto_table_t  nxt_http_proto[3] = {
134     /* NXT_HTTP_PROTO_H1 */
135     {
136         .body_read        = nxt_h1p_request_body_read,
137         .local_addr       = nxt_h1p_request_local_addr,
138         .header_send      = nxt_h1p_request_header_send,
139         .send             = nxt_h1p_request_send,
140         .body_bytes_sent  = nxt_h1p_request_body_bytes_sent,
141         .discard          = nxt_h1p_request_discard,
142         .close            = nxt_h1p_request_close,
143 
144         .peer_connect     = nxt_h1p_peer_connect,
145         .peer_header_send = nxt_h1p_peer_header_send,
146         .peer_header_read = nxt_h1p_peer_header_read,
147         .peer_read        = nxt_h1p_peer_read,
148         .peer_close       = nxt_h1p_peer_close,
149 
150         .ws_frame_start   = nxt_h1p_websocket_frame_start,
151     },
152     /* NXT_HTTP_PROTO_H2      */
153     /* NXT_HTTP_PROTO_DEVNULL */
154 };
155 
156 
157 static nxt_lvlhsh_t                    nxt_h1p_fields_hash;
158 
159 static nxt_http_field_proc_t           nxt_h1p_fields[] = {
160     { nxt_string("Connection"),        &nxt_h1p_connection, 0 },
161     { nxt_string("Upgrade"),           &nxt_h1p_upgrade, 0 },
162     { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
163     { nxt_string("Sec-WebSocket-Version"),
164                                        &nxt_h1p_websocket_version, 0 },
165     { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
166 
167     { nxt_string("Host"),              &nxt_http_request_host, 0 },
168     { nxt_string("Cookie"),            &nxt_http_request_field,
169         offsetof(nxt_http_request_t, cookie) },
170     { nxt_string("Referer"),           &nxt_http_request_field,
171         offsetof(nxt_http_request_t, referer) },
172     { nxt_string("User-Agent"),        &nxt_http_request_field,
173         offsetof(nxt_http_request_t, user_agent) },
174     { nxt_string("Content-Type"),      &nxt_http_request_field,
175         offsetof(nxt_http_request_t, content_type) },
176     { nxt_string("Content-Length"),    &nxt_http_request_content_length, 0 },
177     { nxt_string("Authorization"),     &nxt_http_request_field,
178         offsetof(nxt_http_request_t, authorization) },
179 };
180 
181 
182 static nxt_lvlhsh_t                    nxt_h1p_peer_fields_hash;
183 
184 static nxt_http_field_proc_t           nxt_h1p_peer_fields[] = {
185     { nxt_string("Connection"),        &nxt_http_proxy_skip, 0 },
186     { nxt_string("Transfer-Encoding"), &nxt_h1p_peer_transfer_encoding, 0 },
187     { nxt_string("Server"),            &nxt_http_proxy_skip, 0 },
188     { nxt_string("Date"),              &nxt_http_proxy_date, 0 },
189     { nxt_string("Content-Length"),    &nxt_http_proxy_content_length, 0 },
190 };
191 
192 
193 nxt_int_t
nxt_h1p_init(nxt_task_t * task)194 nxt_h1p_init(nxt_task_t *task)
195 {
196     nxt_int_t  ret;
197 
198     ret = nxt_http_fields_hash(&nxt_h1p_fields_hash,
199                                nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
200 
201     if (nxt_fast_path(ret == NXT_OK)) {
202         ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
203                                    nxt_h1p_peer_fields,
204                                    nxt_nitems(nxt_h1p_peer_fields));
205     }
206 
207     return ret;
208 }
209 
210 
211 void
nxt_http_conn_init(nxt_task_t * task,void * obj,void * data)212 nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
213 {
214     nxt_conn_t               *c;
215     nxt_socket_conf_t        *skcf;
216     nxt_event_engine_t       *engine;
217     nxt_listen_event_t       *lev;
218     nxt_socket_conf_joint_t  *joint;
219 
220     c = obj;
221     lev = data;
222 
223     nxt_debug(task, "http conn init");
224 
225     joint = lev->socket.data;
226     skcf = joint->socket_conf;
227     c->local = skcf->sockaddr;
228 
229     engine = task->thread->engine;
230     c->read_work_queue = &engine->fast_work_queue;
231     c->write_work_queue = &engine->fast_work_queue;
232 
233     c->read_state = &nxt_h1p_idle_state;
234 
235 #if (NXT_TLS)
236     if (skcf->tls != NULL) {
237         c->read_state = &nxt_http_idle_state;
238     }
239 #endif
240 
241     nxt_conn_read(engine, c);
242 }
243 
244 
245 #if (NXT_TLS)
246 
247 static const nxt_conn_state_t  nxt_http_idle_state
248     nxt_aligned(64) =
249 {
250     .ready_handler = nxt_http_conn_test,
251     .close_handler = nxt_h1p_conn_close,
252     .error_handler = nxt_h1p_conn_error,
253 
254     .io_read_handler = nxt_http_idle_io_read_handler,
255 
256     .timer_handler = nxt_h1p_idle_timeout,
257     .timer_value = nxt_h1p_conn_timer_value,
258     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
259 };
260 
261 
262 static ssize_t
nxt_http_idle_io_read_handler(nxt_task_t * task,nxt_conn_t * c)263 nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
264 {
265     size_t                   size;
266     ssize_t                  n;
267     nxt_buf_t                *b;
268     nxt_socket_conf_joint_t  *joint;
269 
270     joint = c->listen->socket.data;
271 
272     if (nxt_slow_path(joint == NULL)) {
273         /*
274          * Listening socket had been closed while
275          * connection was in keep-alive state.
276          */
277         c->read_state = &nxt_h1p_idle_close_state;
278         return 0;
279     }
280 
281     size = joint->socket_conf->header_buffer_size;
282 
283     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
284     if (nxt_slow_path(b == NULL)) {
285         c->socket.error = NXT_ENOMEM;
286         return NXT_ERROR;
287     }
288 
289     /*
290      * 1 byte is enough to distinguish between SSLv3/TLS and plain HTTP.
291      * 11 bytes are enough to log supported SSLv3/TLS version.
292      * 16 bytes are just for more optimized kernel copy-out operation.
293      */
294     n = c->io->recv(c, b->mem.pos, 16, MSG_PEEK);
295 
296     if (n > 0) {
297         c->read = b;
298 
299     } else {
300         c->read = NULL;
301         nxt_event_engine_buf_mem_free(task->thread->engine, b);
302     }
303 
304     return n;
305 }
306 
307 
308 static void
nxt_http_conn_test(nxt_task_t * task,void * obj,void * data)309 nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
310 {
311     u_char                   *p;
312     nxt_buf_t                *b;
313     nxt_conn_t               *c;
314     nxt_tls_conf_t           *tls;
315     nxt_event_engine_t       *engine;
316     nxt_socket_conf_joint_t  *joint;
317 
318     c = obj;
319 
320     nxt_debug(task, "h1p conn https test");
321 
322     engine = task->thread->engine;
323     b = c->read;
324     p = b->mem.pos;
325 
326     c->read_state = &nxt_h1p_idle_state;
327 
328     if (p[0] != 0x16) {
329         b->mem.free = b->mem.pos;
330 
331         nxt_conn_read(engine, c);
332         return;
333     }
334 
335     /* SSLv3/TLS ClientHello message. */
336 
337 #if (NXT_DEBUG)
338     if (nxt_buf_mem_used_size(&b->mem) >= 11) {
339         u_char      major, minor;
340         const char  *protocol;
341 
342         major = p[9];
343         minor = p[10];
344 
345         if (major == 3) {
346             if (minor == 0) {
347                 protocol = "SSLv";
348 
349             } else {
350                 protocol = "TLSv";
351                 major -= 2;
352                 minor -= 1;
353             }
354 
355             nxt_debug(task, "SSL/TLS: %s%ud.%ud", protocol, major, minor);
356         }
357     }
358 #endif
359 
360     c->read = NULL;
361     nxt_event_engine_buf_mem_free(engine, b);
362 
363     joint = c->listen->socket.data;
364 
365     if (nxt_slow_path(joint == NULL)) {
366         /*
367          * Listening socket had been closed while
368          * connection was in keep-alive state.
369          */
370         nxt_h1p_closing(task, c);
371         return;
372     }
373 
374     tls = joint->socket_conf->tls;
375 
376     tls->conn_init(task, tls, c);
377 }
378 
379 #endif
380 
381 
382 static const nxt_conn_state_t  nxt_h1p_idle_state
383     nxt_aligned(64) =
384 {
385     .ready_handler = nxt_h1p_conn_proto_init,
386     .close_handler = nxt_h1p_conn_close,
387     .error_handler = nxt_h1p_conn_error,
388 
389     .io_read_handler = nxt_h1p_idle_io_read_handler,
390 
391     .timer_handler = nxt_h1p_idle_timeout,
392     .timer_value = nxt_h1p_conn_timer_value,
393     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
394     .timer_autoreset = 1,
395 };
396 
397 
398 static ssize_t
nxt_h1p_idle_io_read_handler(nxt_task_t * task,nxt_conn_t * c)399 nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
400 {
401     size_t                   size;
402     ssize_t                  n;
403     nxt_buf_t                *b;
404     nxt_socket_conf_joint_t  *joint;
405 
406     joint = c->listen->socket.data;
407 
408     if (nxt_slow_path(joint == NULL)) {
409         /*
410          * Listening socket had been closed while
411          * connection was in keep-alive state.
412          */
413         c->read_state = &nxt_h1p_idle_close_state;
414         return 0;
415     }
416 
417     b = c->read;
418 
419     if (b == NULL) {
420         size = joint->socket_conf->header_buffer_size;
421 
422         b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
423         if (nxt_slow_path(b == NULL)) {
424             c->socket.error = NXT_ENOMEM;
425             return NXT_ERROR;
426         }
427     }
428 
429     n = c->io->recvbuf(c, b);
430 
431     if (n > 0) {
432         c->read = b;
433 
434     } else {
435         c->read = NULL;
436         nxt_event_engine_buf_mem_free(task->thread->engine, b);
437     }
438 
439     return n;
440 }
441 
442 
443 static void
nxt_h1p_conn_proto_init(nxt_task_t * task,void * obj,void * data)444 nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
445 {
446     nxt_conn_t     *c;
447     nxt_h1proto_t  *h1p;
448 
449     c = obj;
450 
451     nxt_debug(task, "h1p conn proto init");
452 
453     h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
454     if (nxt_slow_path(h1p == NULL)) {
455         nxt_h1p_closing(task, c);
456         return;
457     }
458 
459     c->socket.data = h1p;
460     h1p->conn = c;
461 
462     nxt_h1p_conn_request_init(task, c, h1p);
463 }
464 
465 
466 static void
nxt_h1p_conn_request_init(nxt_task_t * task,void * obj,void * data)467 nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
468 {
469     nxt_int_t                ret;
470     nxt_conn_t               *c;
471     nxt_h1proto_t            *h1p;
472     nxt_socket_conf_t        *skcf;
473     nxt_http_request_t       *r;
474     nxt_socket_conf_joint_t  *joint;
475 
476     c = obj;
477     h1p = data;
478 
479     nxt_debug(task, "h1p conn request init");
480 
481     nxt_conn_active(task->thread->engine, c);
482 
483     r = nxt_http_request_create(task);
484 
485     if (nxt_fast_path(r != NULL)) {
486         h1p->request = r;
487         r->proto.h1 = h1p;
488 
489         /* r->protocol = NXT_HTTP_PROTO_H1 is done by zeroing. */
490         r->remote = c->remote;
491 
492 #if (NXT_TLS)
493         r->tls = (c->u.tls != NULL);
494 #endif
495 
496         r->task = c->task;
497         task = &r->task;
498         c->socket.task = task;
499         c->read_timer.task = task;
500         c->write_timer.task = task;
501 
502         ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
503 
504         if (nxt_fast_path(ret == NXT_OK)) {
505             joint = c->listen->socket.data;
506             joint->count++;
507 
508             r->conf = joint;
509             skcf = joint->socket_conf;
510             r->log_route = skcf->log_route;
511 
512             if (c->local == NULL) {
513                 c->local = skcf->sockaddr;
514             }
515 
516             h1p->parser.discard_unsafe_fields = skcf->discard_unsafe_fields;
517 
518             nxt_h1p_conn_request_header_parse(task, c, h1p);
519             return;
520         }
521 
522         /*
523          * The request is very incomplete here,
524          * so "internal server error" useless here.
525          */
526         nxt_mp_release(r->mem_pool);
527     }
528 
529     nxt_h1p_closing(task, c);
530 }
531 
532 
533 static const nxt_conn_state_t  nxt_h1p_header_parse_state
534     nxt_aligned(64) =
535 {
536     .ready_handler = nxt_h1p_conn_request_header_parse,
537     .close_handler = nxt_h1p_conn_request_error,
538     .error_handler = nxt_h1p_conn_request_error,
539 
540     .timer_handler = nxt_h1p_conn_request_timeout,
541     .timer_value = nxt_h1p_conn_request_timer_value,
542     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
543 };
544 
545 
546 static void
nxt_h1p_conn_request_header_parse(nxt_task_t * task,void * obj,void * data)547 nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
548 {
549     nxt_int_t           ret;
550     nxt_conn_t          *c;
551     nxt_h1proto_t       *h1p;
552     nxt_http_status_t   status;
553     nxt_http_request_t  *r;
554 
555     c = obj;
556     h1p = data;
557 
558     nxt_debug(task, "h1p conn header parse");
559 
560     ret = nxt_http_parse_request(&h1p->parser, &c->read->mem);
561 
562     ret = nxt_expect(NXT_DONE, ret);
563 
564     if (ret != NXT_AGAIN) {
565         nxt_timer_disable(task->thread->engine, &c->read_timer);
566     }
567 
568     r = h1p->request;
569 
570     switch (ret) {
571 
572     case NXT_DONE:
573         /*
574          * By default the keepalive mode is disabled in HTTP/1.0 and
575          * enabled in HTTP/1.1.  The mode can be overridden later by
576          * the "Connection" field processed in nxt_h1p_connection().
577          */
578         h1p->keepalive = (h1p->parser.version.s.minor != '0');
579 
580         r->request_line.start = h1p->parser.method.start;
581         r->request_line.length = h1p->parser.request_line_end
582                                  - r->request_line.start;
583 
584         if (nxt_slow_path(r->log_route)) {
585             nxt_log(task, NXT_LOG_NOTICE, "http request line \"%V\"",
586                     &r->request_line);
587         }
588 
589         ret = nxt_h1p_header_process(task, h1p, r);
590 
591         if (nxt_fast_path(ret == NXT_OK)) {
592 
593 #if (NXT_TLS)
594             if (c->u.tls == NULL && r->conf->socket_conf->tls != NULL) {
595                 status = NXT_HTTP_TO_HTTPS;
596                 goto error;
597             }
598 #endif
599 
600             r->state->ready_handler(task, r, NULL);
601             return;
602         }
603 
604         status = ret;
605         goto error;
606 
607     case NXT_AGAIN:
608         status = nxt_h1p_header_buffer_test(task, h1p, c, r->conf->socket_conf);
609 
610         if (nxt_fast_path(status == NXT_OK)) {
611             c->read_state = &nxt_h1p_header_parse_state;
612 
613             nxt_conn_read(task->thread->engine, c);
614             return;
615         }
616 
617         break;
618 
619     case NXT_HTTP_PARSE_INVALID:
620         status = NXT_HTTP_BAD_REQUEST;
621         break;
622 
623     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
624         status = NXT_HTTP_VERSION_NOT_SUPPORTED;
625         break;
626 
627     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
628         status = NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
629         break;
630 
631     default:
632     case NXT_ERROR:
633         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
634         break;
635     }
636 
637     (void) nxt_h1p_header_process(task, h1p, r);
638 
639 error:
640 
641     h1p->keepalive = 0;
642 
643     nxt_http_request_error(task, r, status);
644 }
645 
646 
647 static nxt_int_t
nxt_h1p_header_process(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_http_request_t * r)648 nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
649     nxt_http_request_t *r)
650 {
651     u_char     *m;
652     nxt_int_t  ret;
653 
654     r->target.start = h1p->parser.target_start;
655     r->target.length = h1p->parser.target_end - h1p->parser.target_start;
656 
657     if (h1p->parser.version.ui64 != 0) {
658         r->version.start = h1p->parser.version.str;
659         r->version.length = sizeof(h1p->parser.version.str);
660     }
661 
662     r->method = &h1p->parser.method;
663     r->path = &h1p->parser.path;
664     r->args = &h1p->parser.args;
665 
666     r->fields = h1p->parser.fields;
667 
668     ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
669     if (nxt_slow_path(ret != NXT_OK)) {
670         return ret;
671     }
672 
673     if (h1p->connection_upgrade && h1p->upgrade_websocket) {
674         m = h1p->parser.method.start;
675 
676         if (nxt_slow_path(h1p->parser.method.length != 3
677                           || m[0] != 'G'
678                           || m[1] != 'E'
679                           || m[2] != 'T'))
680         {
681             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method");
682 
683             return NXT_HTTP_BAD_REQUEST;
684         }
685 
686         if (nxt_slow_path(h1p->parser.version.s.minor != '1')) {
687             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version");
688 
689             return NXT_HTTP_BAD_REQUEST;
690         }
691 
692         if (nxt_slow_path(h1p->websocket_key == NULL)) {
693             nxt_log(task, NXT_LOG_INFO,
694                     "h1p upgrade: bad or absent websocket key");
695 
696             return NXT_HTTP_BAD_REQUEST;
697         }
698 
699         if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
700             nxt_log(task, NXT_LOG_INFO,
701                     "h1p upgrade: bad or absent websocket version");
702 
703             return NXT_HTTP_UPGRADE_REQUIRED;
704         }
705 
706         r->websocket_handshake = 1;
707     }
708 
709     return ret;
710 }
711 
712 
713 static nxt_int_t
nxt_h1p_header_buffer_test(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_conn_t * c,nxt_socket_conf_t * skcf)714 nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
715     nxt_socket_conf_t *skcf)
716 {
717     size_t     size, used;
718     nxt_buf_t  *in, *b;
719 
720     in = c->read;
721 
722     if (nxt_buf_mem_free_size(&in->mem) == 0) {
723         size = skcf->large_header_buffer_size;
724         used = nxt_buf_mem_used_size(&in->mem);
725 
726         if (size <= used || h1p->nbuffers >= skcf->large_header_buffers) {
727             return NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
728         }
729 
730         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
731         if (nxt_slow_path(b == NULL)) {
732             return NXT_HTTP_INTERNAL_SERVER_ERROR;
733         }
734 
735         b->mem.free = nxt_cpymem(b->mem.pos, in->mem.pos, used);
736 
737         in->next = h1p->buffers;
738         h1p->buffers = in;
739         h1p->nbuffers++;
740 
741         c->read = b;
742     }
743 
744     return NXT_OK;
745 }
746 
747 
748 static nxt_int_t
nxt_h1p_connection(void * ctx,nxt_http_field_t * field,uintptr_t data)749 nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
750 {
751     nxt_http_request_t  *r;
752 
753     r = ctx;
754     field->hopbyhop = 1;
755 
756     if (field->value_length == 5
757         && nxt_memcasecmp(field->value, "close", 5) == 0)
758     {
759         r->proto.h1->keepalive = 0;
760 
761     } else if (field->value_length == 10
762                && nxt_memcasecmp(field->value, "keep-alive", 10) == 0)
763     {
764         r->proto.h1->keepalive = 1;
765 
766     } else if (field->value_length == 7
767                && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
768     {
769         r->proto.h1->connection_upgrade = 1;
770     }
771 
772     return NXT_OK;
773 }
774 
775 
776 static nxt_int_t
nxt_h1p_upgrade(void * ctx,nxt_http_field_t * field,uintptr_t data)777 nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
778 {
779     nxt_http_request_t  *r;
780 
781     r = ctx;
782 
783     if (field->value_length == 9
784         && nxt_memcasecmp(field->value, "websocket", 9) == 0)
785     {
786         r->proto.h1->upgrade_websocket = 1;
787     }
788 
789     return NXT_OK;
790 }
791 
792 
793 static nxt_int_t
nxt_h1p_websocket_key(void * ctx,nxt_http_field_t * field,uintptr_t data)794 nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data)
795 {
796     nxt_http_request_t  *r;
797 
798     r = ctx;
799 
800     if (field->value_length == 24) {
801         r->proto.h1->websocket_key = field;
802     }
803 
804     return NXT_OK;
805 }
806 
807 
808 static nxt_int_t
nxt_h1p_websocket_version(void * ctx,nxt_http_field_t * field,uintptr_t data)809 nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data)
810 {
811     nxt_http_request_t  *r;
812 
813     r = ctx;
814 
815     if (field->value_length == 2
816         && field->value[0] == '1' && field->value[1] == '3')
817     {
818         r->proto.h1->websocket_version_ok = 1;
819     }
820 
821     return NXT_OK;
822 }
823 
824 
825 static nxt_int_t
nxt_h1p_transfer_encoding(void * ctx,nxt_http_field_t * field,uintptr_t data)826 nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
827 {
828     nxt_http_te_t       te;
829     nxt_http_request_t  *r;
830 
831     r = ctx;
832     field->skip = 1;
833     field->hopbyhop = 1;
834 
835     if (field->value_length == 7
836         && memcmp(field->value, "chunked", 7) == 0)
837     {
838         te = NXT_HTTP_TE_CHUNKED;
839 
840     } else {
841         te = NXT_HTTP_TE_UNSUPPORTED;
842     }
843 
844     r->proto.h1->transfer_encoding = te;
845 
846     return NXT_OK;
847 }
848 
849 
850 static void
nxt_h1p_request_body_read(nxt_task_t * task,nxt_http_request_t * r)851 nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
852 {
853     size_t             size, body_length, body_buffer_size, body_rest;
854     ssize_t            res;
855     nxt_str_t          *tmp_path, tmp_name;
856     nxt_buf_t          *in, *b;
857     nxt_conn_t         *c;
858     nxt_h1proto_t      *h1p;
859     nxt_http_status_t  status;
860 
861     static const nxt_str_t tmp_name_pattern = nxt_string("/req-XXXXXXXX");
862 
863     h1p = r->proto.h1;
864 
865     nxt_debug(task, "h1p request body read %O te:%d",
866               r->content_length_n, h1p->transfer_encoding);
867 
868     switch (h1p->transfer_encoding) {
869 
870     case NXT_HTTP_TE_CHUNKED:
871         status = NXT_HTTP_LENGTH_REQUIRED;
872         goto error;
873 
874     case NXT_HTTP_TE_UNSUPPORTED:
875         status = NXT_HTTP_NOT_IMPLEMENTED;
876         goto error;
877 
878     default:
879     case NXT_HTTP_TE_NONE:
880         break;
881     }
882 
883     if (r->content_length_n == -1 || r->content_length_n == 0) {
884         goto ready;
885     }
886 
887     body_length = (size_t) r->content_length_n;
888 
889     body_buffer_size = nxt_min(r->conf->socket_conf->body_buffer_size,
890                                body_length);
891 
892     if (body_length > body_buffer_size) {
893         tmp_path = &r->conf->socket_conf->body_temp_path;
894 
895         tmp_name.length = tmp_path->length + tmp_name_pattern.length;
896 
897         b = nxt_buf_file_alloc(r->mem_pool,
898                                body_buffer_size + sizeof(nxt_file_t)
899                                + tmp_name.length + 1, 0);
900 
901     } else {
902         /* This initialization required for CentOS 6, gcc 4.4.7. */
903         tmp_path = NULL;
904         tmp_name.length = 0;
905 
906         b = nxt_buf_mem_alloc(r->mem_pool, body_buffer_size, 0);
907     }
908 
909     if (nxt_slow_path(b == NULL)) {
910         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
911         goto error;
912     }
913 
914     r->body = b;
915 
916     if (body_length > body_buffer_size) {
917         tmp_name.start = nxt_pointer_to(b->mem.start, sizeof(nxt_file_t));
918 
919         memcpy(tmp_name.start, tmp_path->start, tmp_path->length);
920         memcpy(tmp_name.start + tmp_path->length, tmp_name_pattern.start,
921                tmp_name_pattern.length);
922         tmp_name.start[tmp_name.length] = '\0';
923 
924         b->file = (nxt_file_t *) b->mem.start;
925         nxt_memzero(b->file, sizeof(nxt_file_t));
926         b->file->fd = -1;
927         b->file->size = body_length;
928 
929         b->mem.start += sizeof(nxt_file_t) + tmp_name.length + 1;
930         b->mem.pos = b->mem.start;
931         b->mem.free = b->mem.start;
932 
933         b->file->fd = mkstemp((char *) tmp_name.start);
934         if (nxt_slow_path(b->file->fd == -1)) {
935             nxt_alert(task, "mkstemp(%s) failed %E", tmp_name.start, nxt_errno);
936 
937             status = NXT_HTTP_INTERNAL_SERVER_ERROR;
938             goto error;
939         }
940 
941         nxt_debug(task, "create body tmp file \"%V\", %d",
942                   &tmp_name, b->file->fd);
943 
944         unlink((char *) tmp_name.start);
945     }
946 
947     body_rest = body_length;
948 
949     in = h1p->conn->read;
950 
951     size = nxt_buf_mem_used_size(&in->mem);
952 
953     if (size != 0) {
954         size = nxt_min(size, body_length);
955 
956         if (nxt_buf_is_file(b)) {
957             res = nxt_fd_write(b->file->fd, in->mem.pos, size);
958             if (nxt_slow_path(res < (ssize_t) size)) {
959                 status = NXT_HTTP_INTERNAL_SERVER_ERROR;
960                 goto error;
961             }
962 
963             b->file_end += size;
964 
965         } else {
966             size = nxt_min(body_buffer_size, size);
967             b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
968         }
969 
970         in->mem.pos += size;
971         body_rest -= size;
972     }
973 
974     nxt_debug(task, "h1p body rest: %uz", body_rest);
975 
976     if (body_rest != 0) {
977         in->next = h1p->buffers;
978         h1p->buffers = in;
979         h1p->nbuffers++;
980 
981         c = h1p->conn;
982         c->read = b;
983         c->read_state = &nxt_h1p_read_body_state;
984 
985         nxt_conn_read(task->thread->engine, c);
986         return;
987     }
988 
989     if (nxt_buf_is_file(b)) {
990         b->mem.start = NULL;
991         b->mem.end = NULL;
992         b->mem.pos = NULL;
993         b->mem.free = NULL;
994     }
995 
996 ready:
997 
998     r->state->ready_handler(task, r, NULL);
999 
1000     return;
1001 
1002 error:
1003 
1004     h1p->keepalive = 0;
1005 
1006     nxt_http_request_error(task, r, status);
1007 }
1008 
1009 
1010 static const nxt_conn_state_t  nxt_h1p_read_body_state
1011     nxt_aligned(64) =
1012 {
1013     .ready_handler = nxt_h1p_conn_request_body_read,
1014     .close_handler = nxt_h1p_conn_request_error,
1015     .error_handler = nxt_h1p_conn_request_error,
1016 
1017     .timer_handler = nxt_h1p_conn_request_timeout,
1018     .timer_value = nxt_h1p_conn_request_timer_value,
1019     .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
1020     .timer_autoreset = 1,
1021 };
1022 
1023 
1024 static void
nxt_h1p_conn_request_body_read(nxt_task_t * task,void * obj,void * data)1025 nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
1026 {
1027     size_t              size, body_rest;
1028     ssize_t             res;
1029     nxt_buf_t           *b;
1030     nxt_conn_t          *c;
1031     nxt_h1proto_t       *h1p;
1032     nxt_http_request_t  *r;
1033     nxt_event_engine_t  *engine;
1034 
1035     c = obj;
1036     h1p = data;
1037 
1038     nxt_debug(task, "h1p conn request body read");
1039 
1040     r = h1p->request;
1041 
1042     engine = task->thread->engine;
1043 
1044     b = c->read;
1045 
1046     if (nxt_buf_is_file(b)) {
1047         body_rest = b->file->size - b->file_end;
1048 
1049         size = nxt_buf_mem_used_size(&b->mem);
1050         size = nxt_min(size, body_rest);
1051 
1052         res = nxt_fd_write(b->file->fd, b->mem.pos, size);
1053         if (nxt_slow_path(res < (ssize_t) size)) {
1054             nxt_h1p_request_error(task, h1p, r);
1055             return;
1056         }
1057 
1058         b->file_end += size;
1059         body_rest -= res;
1060 
1061         b->mem.pos += size;
1062 
1063         if (b->mem.pos == b->mem.free) {
1064             if (body_rest >= (size_t) nxt_buf_mem_size(&b->mem)) {
1065                 b->mem.free = b->mem.start;
1066 
1067             } else {
1068                 /* This required to avoid reading next request. */
1069                 b->mem.free = b->mem.end - body_rest;
1070             }
1071 
1072             b->mem.pos = b->mem.free;
1073         }
1074 
1075     } else {
1076         body_rest = nxt_buf_mem_free_size(&c->read->mem);
1077     }
1078 
1079     nxt_debug(task, "h1p body rest: %uz", body_rest);
1080 
1081     if (body_rest != 0) {
1082         nxt_conn_read(engine, c);
1083 
1084     } else {
1085         if (nxt_buf_is_file(b)) {
1086             b->mem.start = NULL;
1087             b->mem.end = NULL;
1088             b->mem.pos = NULL;
1089             b->mem.free = NULL;
1090         }
1091 
1092         c->read = NULL;
1093 
1094         r->state->ready_handler(task, r, NULL);
1095     }
1096 }
1097 
1098 
1099 static void
nxt_h1p_request_local_addr(nxt_task_t * task,nxt_http_request_t * r)1100 nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r)
1101 {
1102     r->local = nxt_conn_local_addr(task, r->proto.h1->conn);
1103 }
1104 
1105 
1106 #define NXT_HTTP_LAST_INFORMATIONAL                                           \
1107     (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1)
1108 
1109 static const nxt_str_t  nxt_http_informational[] = {
1110     nxt_string("HTTP/1.1 100 Continue\r\n"),
1111     nxt_string("HTTP/1.1 101 Switching Protocols\r\n"),
1112 };
1113 
1114 
1115 #define NXT_HTTP_LAST_SUCCESS                                                 \
1116     (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1)
1117 
1118 static const nxt_str_t  nxt_http_success[] = {
1119     nxt_string("HTTP/1.1 200 OK\r\n"),
1120     nxt_string("HTTP/1.1 201 Created\r\n"),
1121     nxt_string("HTTP/1.1 202 Accepted\r\n"),
1122     nxt_string("HTTP/1.1 203 Non-Authoritative Information\r\n"),
1123     nxt_string("HTTP/1.1 204 No Content\r\n"),
1124     nxt_string("HTTP/1.1 205 Reset Content\r\n"),
1125     nxt_string("HTTP/1.1 206 Partial Content\r\n"),
1126 };
1127 
1128 
1129 #define NXT_HTTP_LAST_REDIRECTION                                             \
1130     (NXT_HTTP_MULTIPLE_CHOICES + nxt_nitems(nxt_http_redirection) - 1)
1131 
1132 static const nxt_str_t  nxt_http_redirection[] = {
1133     nxt_string("HTTP/1.1 300 Multiple Choices\r\n"),
1134     nxt_string("HTTP/1.1 301 Moved Permanently\r\n"),
1135     nxt_string("HTTP/1.1 302 Found\r\n"),
1136     nxt_string("HTTP/1.1 303 See Other\r\n"),
1137     nxt_string("HTTP/1.1 304 Not Modified\r\n"),
1138     nxt_string("HTTP/1.1 307 Temporary Redirect\r\n"),
1139     nxt_string("HTTP/1.1 308 Permanent Redirect\r\n"),
1140 };
1141 
1142 
1143 #define NXT_HTTP_LAST_CLIENT_ERROR                                            \
1144     (NXT_HTTP_BAD_REQUEST + nxt_nitems(nxt_http_client_error) - 1)
1145 
1146 static const nxt_str_t  nxt_http_client_error[] = {
1147     nxt_string("HTTP/1.1 400 Bad Request\r\n"),
1148     nxt_string("HTTP/1.1 401 Unauthorized\r\n"),
1149     nxt_string("HTTP/1.1 402 Payment Required\r\n"),
1150     nxt_string("HTTP/1.1 403 Forbidden\r\n"),
1151     nxt_string("HTTP/1.1 404 Not Found\r\n"),
1152     nxt_string("HTTP/1.1 405 Method Not Allowed\r\n"),
1153     nxt_string("HTTP/1.1 406 Not Acceptable\r\n"),
1154     nxt_string("HTTP/1.1 407 Proxy Authentication Required\r\n"),
1155     nxt_string("HTTP/1.1 408 Request Timeout\r\n"),
1156     nxt_string("HTTP/1.1 409 Conflict\r\n"),
1157     nxt_string("HTTP/1.1 410 Gone\r\n"),
1158     nxt_string("HTTP/1.1 411 Length Required\r\n"),
1159     nxt_string("HTTP/1.1 412 Precondition Failed\r\n"),
1160     nxt_string("HTTP/1.1 413 Payload Too Large\r\n"),
1161     nxt_string("HTTP/1.1 414 URI Too Long\r\n"),
1162     nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"),
1163     nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"),
1164     nxt_string("HTTP/1.1 417 Expectation Failed\r\n"),
1165     nxt_string("HTTP/1.1 418 I'm a teapot\r\n"),
1166     nxt_string("HTTP/1.1 419 \r\n"),
1167     nxt_string("HTTP/1.1 420 \r\n"),
1168     nxt_string("HTTP/1.1 421 Misdirected Request\r\n"),
1169     nxt_string("HTTP/1.1 422 Unprocessable Entity\r\n"),
1170     nxt_string("HTTP/1.1 423 Locked\r\n"),
1171     nxt_string("HTTP/1.1 424 Failed Dependency\r\n"),
1172     nxt_string("HTTP/1.1 425 \r\n"),
1173     nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
1174     nxt_string("HTTP/1.1 427 \r\n"),
1175     nxt_string("HTTP/1.1 428 \r\n"),
1176     nxt_string("HTTP/1.1 429 \r\n"),
1177     nxt_string("HTTP/1.1 430 \r\n"),
1178     nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"),
1179 };
1180 
1181 
1182 #define NXT_HTTP_LAST_NGINX_ERROR                                             \
1183     (NXT_HTTP_TO_HTTPS + nxt_nitems(nxt_http_nginx_error) - 1)
1184 
1185 static const nxt_str_t  nxt_http_nginx_error[] = {
1186     nxt_string("HTTP/1.1 400 "
1187                "The plain HTTP request was sent to HTTPS port\r\n"),
1188 };
1189 
1190 
1191 #define NXT_HTTP_LAST_SERVER_ERROR                                            \
1192     (NXT_HTTP_INTERNAL_SERVER_ERROR + nxt_nitems(nxt_http_server_error) - 1)
1193 
1194 static const nxt_str_t  nxt_http_server_error[] = {
1195     nxt_string("HTTP/1.1 500 Internal Server Error\r\n"),
1196     nxt_string("HTTP/1.1 501 Not Implemented\r\n"),
1197     nxt_string("HTTP/1.1 502 Bad Gateway\r\n"),
1198     nxt_string("HTTP/1.1 503 Service Unavailable\r\n"),
1199     nxt_string("HTTP/1.1 504 Gateway Timeout\r\n"),
1200     nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
1201 };
1202 
1203 
1204 #define UNKNOWN_STATUS_LENGTH  nxt_length("HTTP/1.1 999 \r\n")
1205 
1206 static void
nxt_h1p_request_header_send(nxt_task_t * task,nxt_http_request_t * r,nxt_work_handler_t body_handler,void * data)1207 nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
1208     nxt_work_handler_t body_handler, void *data)
1209 {
1210     u_char              *p;
1211     size_t              size;
1212     nxt_buf_t           *header;
1213     nxt_str_t           unknown_status;
1214     nxt_int_t           conn;
1215     nxt_uint_t          n;
1216     nxt_bool_t          http11;
1217     nxt_conn_t          *c;
1218     nxt_h1proto_t       *h1p;
1219     const nxt_str_t     *status;
1220     nxt_http_field_t    *field;
1221     u_char              buf[UNKNOWN_STATUS_LENGTH];
1222 
1223     static const char   chunked[] = "Transfer-Encoding: chunked\r\n";
1224     static const char   websocket_version[] = "Sec-WebSocket-Version: 13\r\n";
1225 
1226     static const nxt_str_t  connection[3] = {
1227         nxt_string("Connection: close\r\n"),
1228         nxt_string("Connection: keep-alive\r\n"),
1229         nxt_string("Upgrade: websocket\r\n"
1230                    "Connection: Upgrade\r\n"
1231                    "Sec-WebSocket-Accept: "),
1232     };
1233 
1234     nxt_debug(task, "h1p request header send");
1235 
1236     r->header_sent = 1;
1237     h1p = r->proto.h1;
1238     n = r->status;
1239 
1240     if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) {
1241         status = &nxt_http_informational[n - NXT_HTTP_CONTINUE];
1242 
1243     } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
1244         status = &nxt_http_success[n - NXT_HTTP_OK];
1245 
1246     } else if (n >= NXT_HTTP_MULTIPLE_CHOICES
1247                && n <= NXT_HTTP_LAST_REDIRECTION)
1248     {
1249         status = &nxt_http_redirection[n - NXT_HTTP_MULTIPLE_CHOICES];
1250 
1251     } else if (n >= NXT_HTTP_BAD_REQUEST && n <= NXT_HTTP_LAST_CLIENT_ERROR) {
1252         status = &nxt_http_client_error[n - NXT_HTTP_BAD_REQUEST];
1253 
1254     } else if (n >= NXT_HTTP_TO_HTTPS && n <= NXT_HTTP_LAST_NGINX_ERROR) {
1255         status = &nxt_http_nginx_error[n - NXT_HTTP_TO_HTTPS];
1256 
1257     } else if (n >= NXT_HTTP_INTERNAL_SERVER_ERROR
1258                && n <= NXT_HTTP_LAST_SERVER_ERROR)
1259     {
1260         status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR];
1261 
1262     } else if (n <= NXT_HTTP_STATUS_MAX) {
1263         (void) nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
1264                            "HTTP/1.1 %03d \r\n", n);
1265 
1266         unknown_status.length = UNKNOWN_STATUS_LENGTH;
1267         unknown_status.start = buf;
1268         status = &unknown_status;
1269 
1270     } else {
1271         status = &nxt_http_server_error[0];
1272     }
1273 
1274     size = status->length;
1275     /* Trailing CRLF at the end of header. */
1276     size += nxt_length("\r\n");
1277 
1278     conn = -1;
1279 
1280     if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) {
1281         h1p->websocket = 1;
1282         h1p->keepalive = 0;
1283         conn = 2;
1284         size += NXT_WEBSOCKET_ACCEPT_SIZE + 2;
1285 
1286     } else {
1287         http11 = nxt_h1p_is_http11(h1p);
1288 
1289         if (r->resp.content_length == NULL || r->resp.content_length->skip) {
1290 
1291             if (http11) {
1292                 if (n != NXT_HTTP_NOT_MODIFIED
1293                     && n != NXT_HTTP_NO_CONTENT
1294                     && body_handler != NULL
1295                     && !h1p->websocket)
1296                 {
1297                     h1p->chunked = 1;
1298                     size += nxt_length(chunked);
1299                     /* Trailing CRLF will be added by the first chunk header. */
1300                     size -= nxt_length("\r\n");
1301                 }
1302 
1303             } else {
1304                 h1p->keepalive = 0;
1305             }
1306         }
1307 
1308         if (http11 ^ h1p->keepalive) {
1309             conn = h1p->keepalive;
1310         }
1311     }
1312 
1313     if (conn >= 0) {
1314         size += connection[conn].length;
1315     }
1316 
1317     nxt_list_each(field, r->resp.fields) {
1318 
1319         if (!field->skip) {
1320             size += field->name_length + field->value_length;
1321             size += nxt_length(": \r\n");
1322         }
1323 
1324     } nxt_list_loop;
1325 
1326     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1327         size += nxt_length(websocket_version);
1328     }
1329 
1330     header = nxt_http_buf_mem(task, r, size);
1331     if (nxt_slow_path(header == NULL)) {
1332         nxt_h1p_request_error(task, h1p, r);
1333         return;
1334     }
1335 
1336     p = nxt_cpymem(header->mem.free, status->start, status->length);
1337 
1338     nxt_list_each(field, r->resp.fields) {
1339 
1340         if (!field->skip) {
1341             p = nxt_cpymem(p, field->name, field->name_length);
1342             *p++ = ':'; *p++ = ' ';
1343             p = nxt_cpymem(p, field->value, field->value_length);
1344             *p++ = '\r'; *p++ = '\n';
1345         }
1346 
1347     } nxt_list_loop;
1348 
1349     if (conn >= 0) {
1350         p = nxt_cpymem(p, connection[conn].start, connection[conn].length);
1351     }
1352 
1353     if (h1p->websocket) {
1354         nxt_websocket_accept(p, h1p->websocket_key->value);
1355         p += NXT_WEBSOCKET_ACCEPT_SIZE;
1356 
1357         *p++ = '\r'; *p++ = '\n';
1358     }
1359 
1360     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1361         p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version));
1362     }
1363 
1364     if (h1p->chunked) {
1365         p = nxt_cpymem(p, chunked, nxt_length(chunked));
1366         /* Trailing CRLF will be added by the first chunk header. */
1367 
1368     } else {
1369         *p++ = '\r'; *p++ = '\n';
1370     }
1371 
1372     header->mem.free = p;
1373 
1374     h1p->header_size = nxt_buf_mem_used_size(&header->mem);
1375 
1376     c = h1p->conn;
1377 
1378     c->write = header;
1379     h1p->conn_write_tail = &header->next;
1380     c->write_state = &nxt_h1p_request_send_state;
1381 
1382     if (body_handler != NULL) {
1383         /*
1384          * The body handler will run before c->io->write() handler,
1385          * because the latter was inqueued by nxt_conn_write()
1386          * in engine->write_work_queue.
1387          */
1388         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1389                            body_handler, task, r, data);
1390 
1391     } else {
1392         header->next = nxt_http_buf_last(r);
1393     }
1394 
1395     nxt_conn_write(task->thread->engine, c);
1396 
1397     if (h1p->websocket) {
1398         nxt_h1p_websocket_first_frame_start(task, r, c->read);
1399     }
1400 }
1401 
1402 
1403 void
nxt_h1p_complete_buffers(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_bool_t all)1404 nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_bool_t all)
1405 {
1406     size_t            size;
1407     nxt_buf_t         *b, *in, *next;
1408     nxt_conn_t        *c;
1409 
1410     nxt_debug(task, "h1p complete buffers");
1411 
1412     b = h1p->buffers;
1413     c = h1p->conn;
1414     in = c->read;
1415 
1416     if (b != NULL) {
1417         if (in == NULL) {
1418             /* A request with large body. */
1419             in = b;
1420             c->read = in;
1421 
1422             b = in->next;
1423             in->next = NULL;
1424         }
1425 
1426         while (b != NULL) {
1427             next = b->next;
1428             b->next = NULL;
1429 
1430             b->completion_handler(task, b, b->parent);
1431 
1432             b = next;
1433         }
1434 
1435         h1p->buffers = NULL;
1436         h1p->nbuffers = 0;
1437     }
1438 
1439     if (in != NULL) {
1440         size = nxt_buf_mem_used_size(&in->mem);
1441 
1442         if (size == 0 || all) {
1443             in->completion_handler(task, in, in->parent);
1444 
1445             c->read = NULL;
1446         }
1447     }
1448 }
1449 
1450 
1451 static const nxt_conn_state_t  nxt_h1p_request_send_state
1452     nxt_aligned(64) =
1453 {
1454     .ready_handler = nxt_h1p_conn_sent,
1455     .error_handler = nxt_h1p_conn_request_error,
1456 
1457     .timer_handler = nxt_h1p_conn_request_send_timeout,
1458     .timer_value = nxt_h1p_conn_request_timer_value,
1459     .timer_data = offsetof(nxt_socket_conf_t, send_timeout),
1460     .timer_autoreset = 1,
1461 };
1462 
1463 
1464 static void
nxt_h1p_request_send(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * out)1465 nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1466 {
1467     nxt_conn_t     *c;
1468     nxt_h1proto_t  *h1p;
1469 
1470     nxt_debug(task, "h1p request send");
1471 
1472     h1p = r->proto.h1;
1473     c = h1p->conn;
1474 
1475     if (h1p->chunked) {
1476         out = nxt_h1p_chunk_create(task, r, out);
1477         if (nxt_slow_path(out == NULL)) {
1478             nxt_h1p_request_error(task, h1p, r);
1479             return;
1480         }
1481     }
1482 
1483     if (c->write == NULL) {
1484         c->write = out;
1485         c->write_state = &nxt_h1p_request_send_state;
1486 
1487         nxt_conn_write(task->thread->engine, c);
1488 
1489     } else {
1490         *h1p->conn_write_tail = out;
1491     }
1492 
1493     while (out->next != NULL) {
1494         out = out->next;
1495     }
1496 
1497     h1p->conn_write_tail = &out->next;
1498 }
1499 
1500 
1501 static nxt_buf_t *
nxt_h1p_chunk_create(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * out)1502 nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1503 {
1504     nxt_off_t          size;
1505     nxt_buf_t          *b, **prev, *header, *tail;
1506 
1507     const size_t       chunk_size = 2 * nxt_length("\r\n") + NXT_OFF_T_HEXLEN;
1508     static const char  tail_chunk[] = "\r\n0\r\n\r\n";
1509 
1510     size = 0;
1511     prev = &out;
1512 
1513     for (b = out; b != NULL; b = b->next) {
1514 
1515         if (nxt_buf_is_last(b)) {
1516             tail = nxt_http_buf_mem(task, r, sizeof(tail_chunk));
1517             if (nxt_slow_path(tail == NULL)) {
1518                 return NULL;
1519             }
1520 
1521             *prev = tail;
1522             tail->next = b;
1523             /*
1524              * The tail_chunk size with trailing zero is 8 bytes, so
1525              * memcpy may be inlined with just single 8 byte move operation.
1526              */
1527             nxt_memcpy(tail->mem.free, tail_chunk, sizeof(tail_chunk));
1528             tail->mem.free += nxt_length(tail_chunk);
1529 
1530             break;
1531         }
1532 
1533         size += nxt_buf_used_size(b);
1534         prev = &b->next;
1535     }
1536 
1537     if (size == 0) {
1538         return out;
1539     }
1540 
1541     header = nxt_http_buf_mem(task, r, chunk_size);
1542     if (nxt_slow_path(header == NULL)) {
1543         return NULL;
1544     }
1545 
1546     header->next = out;
1547     header->mem.free = nxt_sprintf(header->mem.free, header->mem.end,
1548                                    "\r\n%xO\r\n", size);
1549     return header;
1550 }
1551 
1552 
1553 static nxt_off_t
nxt_h1p_request_body_bytes_sent(nxt_task_t * task,nxt_http_proto_t proto)1554 nxt_h1p_request_body_bytes_sent(nxt_task_t *task, nxt_http_proto_t proto)
1555 {
1556     nxt_off_t      sent;
1557     nxt_h1proto_t  *h1p;
1558 
1559     h1p = proto.h1;
1560 
1561     sent = h1p->conn->sent - h1p->header_size;
1562 
1563     return (sent > 0) ? sent : 0;
1564 }
1565 
1566 
1567 static void
nxt_h1p_request_discard(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * last)1568 nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
1569     nxt_buf_t *last)
1570 {
1571     nxt_buf_t         *b;
1572     nxt_conn_t        *c;
1573     nxt_h1proto_t     *h1p;
1574     nxt_work_queue_t  *wq;
1575 
1576     nxt_debug(task, "h1p request discard");
1577 
1578     h1p = r->proto.h1;
1579     h1p->keepalive = 0;
1580 
1581     c = h1p->conn;
1582     b = c->write;
1583     c->write = NULL;
1584 
1585     wq = &task->thread->engine->fast_work_queue;
1586 
1587     nxt_sendbuf_drain(task, wq, b);
1588     nxt_sendbuf_drain(task, wq, last);
1589 }
1590 
1591 
1592 static void
nxt_h1p_conn_request_error(nxt_task_t * task,void * obj,void * data)1593 nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
1594 {
1595     nxt_h1proto_t       *h1p;
1596     nxt_http_request_t  *r;
1597 
1598     h1p = data;
1599 
1600     nxt_debug(task, "h1p conn request error");
1601 
1602     r = h1p->request;
1603 
1604     if (nxt_slow_path(r == NULL)) {
1605         nxt_h1p_shutdown(task, h1p->conn);
1606         return;
1607     }
1608 
1609     if (r->fields == NULL) {
1610         (void) nxt_h1p_header_process(task, h1p, r);
1611     }
1612 
1613     if (r->status == 0) {
1614         r->status = NXT_HTTP_BAD_REQUEST;
1615     }
1616 
1617     nxt_h1p_request_error(task, h1p, r);
1618 }
1619 
1620 
1621 static void
nxt_h1p_conn_request_timeout(nxt_task_t * task,void * obj,void * data)1622 nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
1623 {
1624     nxt_conn_t          *c;
1625     nxt_timer_t         *timer;
1626     nxt_h1proto_t       *h1p;
1627     nxt_http_request_t  *r;
1628 
1629     timer = obj;
1630 
1631     nxt_debug(task, "h1p conn request timeout");
1632 
1633     c = nxt_read_timer_conn(timer);
1634     c->block_read = 1;
1635     /*
1636      * Disable SO_LINGER off during socket closing
1637      * to send "408 Request Timeout" error response.
1638      */
1639     c->socket.timedout = 0;
1640 
1641     h1p = c->socket.data;
1642     h1p->keepalive = 0;
1643     r = h1p->request;
1644 
1645     if (r->fields == NULL) {
1646         (void) nxt_h1p_header_process(task, h1p, r);
1647     }
1648 
1649     nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT);
1650 }
1651 
1652 
1653 static void
nxt_h1p_conn_request_send_timeout(nxt_task_t * task,void * obj,void * data)1654 nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data)
1655 {
1656     nxt_conn_t     *c;
1657     nxt_timer_t    *timer;
1658     nxt_h1proto_t  *h1p;
1659 
1660     timer = obj;
1661 
1662     nxt_debug(task, "h1p conn request send timeout");
1663 
1664     c = nxt_write_timer_conn(timer);
1665     c->block_write = 1;
1666     h1p = c->socket.data;
1667 
1668     nxt_h1p_request_error(task, h1p, h1p->request);
1669 }
1670 
1671 
1672 nxt_msec_t
nxt_h1p_conn_request_timer_value(nxt_conn_t * c,uintptr_t data)1673 nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data)
1674 {
1675     nxt_h1proto_t  *h1p;
1676 
1677     h1p = c->socket.data;
1678 
1679     return nxt_value_at(nxt_msec_t, h1p->request->conf->socket_conf, data);
1680 }
1681 
1682 
1683 nxt_inline void
nxt_h1p_request_error(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_http_request_t * r)1684 nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
1685     nxt_http_request_t *r)
1686 {
1687     h1p->keepalive = 0;
1688 
1689     r->state->error_handler(task, r, h1p);
1690 }
1691 
1692 
1693 static void
nxt_h1p_request_close(nxt_task_t * task,nxt_http_proto_t proto,nxt_socket_conf_joint_t * joint)1694 nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
1695     nxt_socket_conf_joint_t *joint)
1696 {
1697     nxt_conn_t     *c;
1698     nxt_h1proto_t  *h1p;
1699 
1700     nxt_debug(task, "h1p request close");
1701 
1702     h1p = proto.h1;
1703     h1p->keepalive &= !h1p->request->inconsistent;
1704     h1p->request = NULL;
1705 
1706     nxt_router_conf_release(task, joint);
1707 
1708     c = h1p->conn;
1709     task = &c->task;
1710     c->socket.task = task;
1711     c->read_timer.task = task;
1712     c->write_timer.task = task;
1713 
1714     if (h1p->keepalive) {
1715         nxt_h1p_keepalive(task, h1p, c);
1716 
1717     } else {
1718         nxt_h1p_shutdown(task, c);
1719     }
1720 }
1721 
1722 
1723 static void
nxt_h1p_conn_sent(nxt_task_t * task,void * obj,void * data)1724 nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data)
1725 {
1726     nxt_conn_t          *c;
1727     nxt_event_engine_t  *engine;
1728 
1729     c = obj;
1730 
1731     nxt_debug(task, "h1p conn sent");
1732 
1733     engine = task->thread->engine;
1734 
1735     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
1736 
1737     if (c->write != NULL) {
1738         nxt_conn_write(engine, c);
1739     }
1740 }
1741 
1742 
1743 static void
nxt_h1p_conn_close(nxt_task_t * task,void * obj,void * data)1744 nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
1745 {
1746     nxt_conn_t  *c;
1747 
1748     c = obj;
1749 
1750     nxt_debug(task, "h1p conn close");
1751 
1752     nxt_conn_active(task->thread->engine, c);
1753 
1754     nxt_h1p_shutdown(task, c);
1755 }
1756 
1757 
1758 static void
nxt_h1p_conn_error(nxt_task_t * task,void * obj,void * data)1759 nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data)
1760 {
1761     nxt_conn_t  *c;
1762 
1763     c = obj;
1764 
1765     nxt_debug(task, "h1p conn error");
1766 
1767     nxt_conn_active(task->thread->engine, c);
1768 
1769     nxt_h1p_shutdown(task, c);
1770 }
1771 
1772 
1773 static nxt_msec_t
nxt_h1p_conn_timer_value(nxt_conn_t * c,uintptr_t data)1774 nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data)
1775 {
1776     nxt_socket_conf_joint_t  *joint;
1777 
1778     joint = c->listen->socket.data;
1779 
1780     if (nxt_fast_path(joint != NULL)) {
1781         return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1782     }
1783 
1784     /*
1785      * Listening socket had been closed while
1786      * connection was in keep-alive state.
1787      */
1788     return 1;
1789 }
1790 
1791 
1792 static void
nxt_h1p_keepalive(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_conn_t * c)1793 nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
1794 {
1795     size_t              size;
1796     nxt_buf_t           *in;
1797     nxt_event_engine_t  *engine;
1798 
1799     nxt_debug(task, "h1p keepalive");
1800 
1801     if (!c->tcp_nodelay) {
1802         nxt_conn_tcp_nodelay_on(task, c);
1803     }
1804 
1805     nxt_h1p_complete_buffers(task, h1p, 0);
1806 
1807     in = c->read;
1808 
1809     nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn));
1810 
1811     c->sent = 0;
1812 
1813     engine = task->thread->engine;
1814 
1815     nxt_conn_idle(engine, c);
1816 
1817     if (in == NULL) {
1818         c->read_state = &nxt_h1p_keepalive_state;
1819 
1820         nxt_conn_read(engine, c);
1821 
1822     } else {
1823         size = nxt_buf_mem_used_size(&in->mem);
1824 
1825         nxt_debug(task, "h1p pipelining");
1826 
1827         nxt_memmove(in->mem.start, in->mem.pos, size);
1828 
1829         in->mem.pos = in->mem.start;
1830         in->mem.free = in->mem.start + size;
1831 
1832         nxt_h1p_conn_request_init(task, c, c->socket.data);
1833     }
1834 }
1835 
1836 
1837 static const nxt_conn_state_t  nxt_h1p_keepalive_state
1838     nxt_aligned(64) =
1839 {
1840     .ready_handler = nxt_h1p_conn_request_init,
1841     .close_handler = nxt_h1p_conn_close,
1842     .error_handler = nxt_h1p_conn_error,
1843 
1844     .io_read_handler = nxt_h1p_idle_io_read_handler,
1845 
1846     .timer_handler = nxt_h1p_idle_timeout,
1847     .timer_value = nxt_h1p_conn_timer_value,
1848     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
1849     .timer_autoreset = 1,
1850 };
1851 
1852 
1853 const nxt_conn_state_t  nxt_h1p_idle_close_state
1854     nxt_aligned(64) =
1855 {
1856     .close_handler = nxt_h1p_idle_close,
1857 };
1858 
1859 
1860 static void
nxt_h1p_idle_close(nxt_task_t * task,void * obj,void * data)1861 nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data)
1862 {
1863     nxt_conn_t  *c;
1864 
1865     c = obj;
1866 
1867     nxt_debug(task, "h1p idle close");
1868 
1869     nxt_conn_active(task->thread->engine, c);
1870 
1871     nxt_h1p_idle_response(task, c);
1872 }
1873 
1874 
1875 static void
nxt_h1p_idle_timeout(nxt_task_t * task,void * obj,void * data)1876 nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data)
1877 {
1878     nxt_conn_t   *c;
1879     nxt_timer_t  *timer;
1880 
1881     timer = obj;
1882 
1883     nxt_debug(task, "h1p idle timeout");
1884 
1885     c = nxt_read_timer_conn(timer);
1886     c->block_read = 1;
1887 
1888     nxt_conn_active(task->thread->engine, c);
1889 
1890     nxt_h1p_idle_response(task, c);
1891 }
1892 
1893 
1894 #define NXT_H1P_IDLE_TIMEOUT                                                  \
1895     "HTTP/1.1 408 Request Timeout\r\n"                                        \
1896     "Server: " NXT_SERVER "\r\n"                                              \
1897     "Connection: close\r\n"                                                   \
1898     "Content-Length: 0\r\n"                                                   \
1899     "Date: "
1900 
1901 
1902 static void
nxt_h1p_idle_response(nxt_task_t * task,nxt_conn_t * c)1903 nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
1904 {
1905     u_char     *p;
1906     size_t     size;
1907     nxt_buf_t  *out, *last;
1908 
1909     size = nxt_length(NXT_H1P_IDLE_TIMEOUT)
1910            + nxt_http_date_cache.size
1911            + nxt_length("\r\n\r\n");
1912 
1913     out = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1914     if (nxt_slow_path(out == NULL)) {
1915         goto fail;
1916     }
1917 
1918     p = nxt_cpymem(out->mem.free, NXT_H1P_IDLE_TIMEOUT,
1919                    nxt_length(NXT_H1P_IDLE_TIMEOUT));
1920 
1921     p = nxt_thread_time_string(task->thread, &nxt_http_date_cache, p);
1922 
1923     out->mem.free = nxt_cpymem(p, "\r\n\r\n", 4);
1924 
1925     last = nxt_mp_zget(c->mem_pool, NXT_BUF_SYNC_SIZE);
1926     if (nxt_slow_path(last == NULL)) {
1927         goto fail;
1928     }
1929 
1930     out->next = last;
1931     nxt_buf_set_sync(last);
1932     nxt_buf_set_last(last);
1933 
1934     last->completion_handler = nxt_h1p_idle_response_sent;
1935     last->parent = c;
1936 
1937     c->write = out;
1938     c->write_state = &nxt_h1p_timeout_response_state;
1939 
1940     nxt_conn_write(task->thread->engine, c);
1941     return;
1942 
1943 fail:
1944 
1945     nxt_h1p_shutdown(task, c);
1946 }
1947 
1948 
1949 static const nxt_conn_state_t  nxt_h1p_timeout_response_state
1950     nxt_aligned(64) =
1951 {
1952     .ready_handler = nxt_h1p_conn_sent,
1953     .error_handler = nxt_h1p_idle_response_error,
1954 
1955     .timer_handler = nxt_h1p_idle_response_timeout,
1956     .timer_value = nxt_h1p_idle_response_timer_value,
1957 };
1958 
1959 
1960 static void
nxt_h1p_idle_response_sent(nxt_task_t * task,void * obj,void * data)1961 nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data)
1962 {
1963     nxt_conn_t  *c;
1964 
1965     c = data;
1966 
1967     nxt_debug(task, "h1p idle timeout response sent");
1968 
1969     nxt_h1p_shutdown(task, c);
1970 }
1971 
1972 
1973 static void
nxt_h1p_idle_response_error(nxt_task_t * task,void * obj,void * data)1974 nxt_h1p_idle_response_error(nxt_task_t *task, void *obj, void *data)
1975 {
1976     nxt_conn_t  *c;
1977 
1978     c = obj;
1979 
1980     nxt_debug(task, "h1p response error");
1981 
1982     nxt_h1p_shutdown(task, c);
1983 }
1984 
1985 
1986 static void
nxt_h1p_idle_response_timeout(nxt_task_t * task,void * obj,void * data)1987 nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, void *data)
1988 {
1989     nxt_conn_t   *c;
1990     nxt_timer_t  *timer;
1991 
1992     timer = obj;
1993 
1994     nxt_debug(task, "h1p idle timeout response timeout");
1995 
1996     c = nxt_read_timer_conn(timer);
1997     c->block_write = 1;
1998 
1999     nxt_h1p_shutdown(task, c);
2000 }
2001 
2002 
2003 static nxt_msec_t
nxt_h1p_idle_response_timer_value(nxt_conn_t * c,uintptr_t data)2004 nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data)
2005 {
2006     return 10 * 1000;
2007 }
2008 
2009 
2010 static void
nxt_h1p_shutdown(nxt_task_t * task,nxt_conn_t * c)2011 nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
2012 {
2013     nxt_timer_t    *timer;
2014     nxt_h1proto_t  *h1p;
2015 
2016     nxt_debug(task, "h1p shutdown");
2017 
2018     h1p = c->socket.data;
2019 
2020     if (h1p != NULL) {
2021         nxt_h1p_complete_buffers(task, h1p, 1);
2022 
2023         if (nxt_slow_path(h1p->websocket_timer != NULL)) {
2024             timer = &h1p->websocket_timer->timer;
2025 
2026             if (timer->handler != nxt_h1p_conn_ws_shutdown) {
2027                 timer->handler = nxt_h1p_conn_ws_shutdown;
2028                 nxt_timer_add(task->thread->engine, timer, 0);
2029 
2030             } else {
2031                 nxt_debug(task, "h1p already scheduled ws shutdown");
2032             }
2033 
2034             return;
2035         }
2036     }
2037 
2038     nxt_h1p_closing(task, c);
2039 }
2040 
2041 
2042 static void
nxt_h1p_conn_ws_shutdown(nxt_task_t * task,void * obj,void * data)2043 nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
2044 {
2045     nxt_timer_t                *timer;
2046     nxt_h1p_websocket_timer_t  *ws_timer;
2047 
2048     nxt_debug(task, "h1p conn ws shutdown");
2049 
2050     timer = obj;
2051     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
2052 
2053     nxt_h1p_closing(task, ws_timer->h1p->conn);
2054 }
2055 
2056 
2057 static void
nxt_h1p_closing(nxt_task_t * task,nxt_conn_t * c)2058 nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
2059 {
2060     nxt_debug(task, "h1p closing");
2061 
2062     c->socket.data = NULL;
2063 
2064 #if (NXT_TLS)
2065 
2066     if (c->u.tls != NULL) {
2067         c->write_state = &nxt_h1p_shutdown_state;
2068 
2069         c->io->shutdown(task, c, NULL);
2070         return;
2071     }
2072 
2073 #endif
2074 
2075     nxt_h1p_conn_closing(task, c, NULL);
2076 }
2077 
2078 
2079 #if (NXT_TLS)
2080 
2081 static const nxt_conn_state_t  nxt_h1p_shutdown_state
2082     nxt_aligned(64) =
2083 {
2084     .ready_handler = nxt_h1p_conn_closing,
2085     .close_handler = nxt_h1p_conn_closing,
2086     .error_handler = nxt_h1p_conn_closing,
2087 };
2088 
2089 #endif
2090 
2091 
2092 static void
nxt_h1p_conn_closing(nxt_task_t * task,void * obj,void * data)2093 nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
2094 {
2095     nxt_conn_t  *c;
2096 
2097     c = obj;
2098 
2099     nxt_debug(task, "h1p conn closing");
2100 
2101     c->write_state = &nxt_h1p_close_state;
2102 
2103     nxt_conn_close(task->thread->engine, c);
2104 }
2105 
2106 
2107 static const nxt_conn_state_t  nxt_h1p_close_state
2108     nxt_aligned(64) =
2109 {
2110     .ready_handler = nxt_h1p_conn_free,
2111 };
2112 
2113 
2114 static void
nxt_h1p_conn_free(nxt_task_t * task,void * obj,void * data)2115 nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
2116 {
2117     nxt_conn_t          *c;
2118     nxt_listen_event_t  *lev;
2119     nxt_event_engine_t  *engine;
2120 
2121     c = obj;
2122 
2123     nxt_debug(task, "h1p conn free");
2124 
2125     engine = task->thread->engine;
2126 
2127     nxt_sockaddr_cache_free(engine, c);
2128 
2129     lev = c->listen;
2130 
2131     nxt_conn_free(task, c);
2132 
2133     nxt_router_listen_event_release(&engine->task, lev, NULL);
2134 }
2135 
2136 
2137 static void
nxt_h1p_peer_connect(nxt_task_t * task,nxt_http_peer_t * peer)2138 nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
2139 {
2140     nxt_mp_t            *mp;
2141     nxt_int_t           ret;
2142     nxt_conn_t          *c, *client;
2143     nxt_h1proto_t       *h1p;
2144     nxt_fd_event_t      *socket;
2145     nxt_work_queue_t    *wq;
2146     nxt_http_request_t  *r;
2147 
2148     nxt_debug(task, "h1p peer connect");
2149 
2150     peer->status = NXT_HTTP_UNSET;
2151     r = peer->request;
2152 
2153     mp = nxt_mp_create(1024, 128, 256, 32);
2154 
2155     if (nxt_slow_path(mp == NULL)) {
2156         goto fail;
2157     }
2158 
2159     h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
2160     if (nxt_slow_path(h1p == NULL)) {
2161         goto fail;
2162     }
2163 
2164     ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
2165     if (nxt_slow_path(ret != NXT_OK)) {
2166         goto fail;
2167     }
2168 
2169     c = nxt_conn_create(mp, task);
2170     if (nxt_slow_path(c == NULL)) {
2171         goto fail;
2172     }
2173 
2174     c->mem_pool = mp;
2175     h1p->conn = c;
2176 
2177     peer->proto.h1 = h1p;
2178     h1p->request = r;
2179 
2180     c->socket.data = peer;
2181     c->remote = peer->server->sockaddr;
2182 
2183     c->socket.write_ready = 1;
2184     c->write_state = &nxt_h1p_peer_connect_state;
2185 
2186     /*
2187      * TODO: queues should be implemented via client proto interface.
2188      */
2189     client = r->proto.h1->conn;
2190 
2191     socket = &client->socket;
2192     wq = socket->read_work_queue;
2193     c->read_work_queue = wq;
2194     c->socket.read_work_queue = wq;
2195     c->read_timer.work_queue = wq;
2196 
2197     wq = socket->write_work_queue;
2198     c->write_work_queue = wq;
2199     c->socket.write_work_queue = wq;
2200     c->write_timer.work_queue = wq;
2201     /* TODO END */
2202 
2203     nxt_conn_connect(task->thread->engine, c);
2204 
2205     return;
2206 
2207 fail:
2208 
2209     peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2210 
2211     r->state->error_handler(task, r, peer);
2212 }
2213 
2214 
2215 static const nxt_conn_state_t  nxt_h1p_peer_connect_state
2216     nxt_aligned(64) =
2217 {
2218     .ready_handler = nxt_h1p_peer_connected,
2219     .close_handler = nxt_h1p_peer_refused,
2220     .error_handler = nxt_h1p_peer_error,
2221 
2222     .timer_handler = nxt_h1p_peer_send_timeout,
2223     .timer_value = nxt_h1p_peer_timer_value,
2224     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2225 };
2226 
2227 
2228 static void
nxt_h1p_peer_connected(nxt_task_t * task,void * obj,void * data)2229 nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
2230 {
2231     nxt_http_peer_t     *peer;
2232     nxt_http_request_t  *r;
2233 
2234     peer = data;
2235 
2236     nxt_debug(task, "h1p peer connected");
2237 
2238     r = peer->request;
2239     r->state->ready_handler(task, r, peer);
2240 }
2241 
2242 
2243 static void
nxt_h1p_peer_refused(nxt_task_t * task,void * obj,void * data)2244 nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
2245 {
2246     nxt_http_peer_t     *peer;
2247     nxt_http_request_t  *r;
2248 
2249     peer = data;
2250 
2251     nxt_debug(task, "h1p peer refused");
2252 
2253     //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
2254     peer->status = NXT_HTTP_BAD_GATEWAY;
2255 
2256     r = peer->request;
2257     r->state->error_handler(task, r, peer);
2258 }
2259 
2260 
2261 static void
nxt_h1p_peer_header_send(nxt_task_t * task,nxt_http_peer_t * peer)2262 nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
2263 {
2264     u_char              *p;
2265     size_t              size;
2266     nxt_buf_t           *header, *body;
2267     nxt_conn_t          *c;
2268     nxt_http_field_t    *field;
2269     nxt_http_request_t  *r;
2270 
2271     nxt_debug(task, "h1p peer header send");
2272 
2273     r = peer->request;
2274 
2275     size = r->method->length + sizeof(" ") + r->target.length
2276            + sizeof(" HTTP/1.1\r\n")
2277            + sizeof("Connection: close\r\n")
2278            + sizeof("\r\n");
2279 
2280     nxt_list_each(field, r->fields) {
2281 
2282         if (!field->hopbyhop) {
2283             size += field->name_length + field->value_length;
2284             size += nxt_length(": \r\n");
2285         }
2286 
2287     } nxt_list_loop;
2288 
2289     header = nxt_http_buf_mem(task, r, size);
2290     if (nxt_slow_path(header == NULL)) {
2291         r->state->error_handler(task, r, peer);
2292         return;
2293     }
2294 
2295     p = header->mem.free;
2296 
2297     p = nxt_cpymem(p, r->method->start, r->method->length);
2298     *p++ = ' ';
2299     p = nxt_cpymem(p, r->target.start, r->target.length);
2300     p = nxt_cpymem(p, " HTTP/1.1\r\n", 11);
2301     p = nxt_cpymem(p, "Connection: close\r\n", 19);
2302 
2303     nxt_list_each(field, r->fields) {
2304 
2305         if (!field->hopbyhop) {
2306             p = nxt_cpymem(p, field->name, field->name_length);
2307             *p++ = ':'; *p++ = ' ';
2308             p = nxt_cpymem(p, field->value, field->value_length);
2309             *p++ = '\r'; *p++ = '\n';
2310         }
2311 
2312     } nxt_list_loop;
2313 
2314     *p++ = '\r'; *p++ = '\n';
2315     header->mem.free = p;
2316     size = p - header->mem.pos;
2317 
2318     c = peer->proto.h1->conn;
2319     c->write = header;
2320     c->write_state = &nxt_h1p_peer_header_send_state;
2321 
2322     if (r->body != NULL) {
2323         if (nxt_buf_is_file(r->body)) {
2324             body = nxt_buf_file_alloc(r->mem_pool, 0, 0);
2325 
2326         } else {
2327             body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
2328         }
2329 
2330         if (nxt_slow_path(body == NULL)) {
2331             r->state->error_handler(task, r, peer);
2332             return;
2333         }
2334 
2335         header->next = body;
2336 
2337         if (nxt_buf_is_file(r->body)) {
2338             body->file = r->body->file;
2339             body->file_end = r->body->file_end;
2340 
2341         } else {
2342             body->mem = r->body->mem;
2343         }
2344 
2345         size += nxt_buf_used_size(body);
2346 
2347 //        nxt_mp_retain(r->mem_pool);
2348     }
2349 
2350     if (size > 16384) {
2351         /* Use proxy_send_timeout instead of proxy_timeout. */
2352         c->write_state = &nxt_h1p_peer_header_body_send_state;
2353     }
2354 
2355     nxt_conn_write(task->thread->engine, c);
2356 }
2357 
2358 
2359 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state
2360     nxt_aligned(64) =
2361 {
2362     .ready_handler = nxt_h1p_peer_header_sent,
2363     .error_handler = nxt_h1p_peer_error,
2364 
2365     .timer_handler = nxt_h1p_peer_send_timeout,
2366     .timer_value = nxt_h1p_peer_timer_value,
2367     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2368 };
2369 
2370 
2371 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state
2372     nxt_aligned(64) =
2373 {
2374     .ready_handler = nxt_h1p_peer_header_sent,
2375     .error_handler = nxt_h1p_peer_error,
2376 
2377     .timer_handler = nxt_h1p_peer_send_timeout,
2378     .timer_value = nxt_h1p_peer_timer_value,
2379     .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
2380     .timer_autoreset = 1,
2381 };
2382 
2383 
2384 static void
nxt_h1p_peer_header_sent(nxt_task_t * task,void * obj,void * data)2385 nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
2386 {
2387     nxt_conn_t          *c;
2388     nxt_http_peer_t     *peer;
2389     nxt_http_request_t  *r;
2390     nxt_event_engine_t  *engine;
2391 
2392     c = obj;
2393     peer = data;
2394 
2395     nxt_debug(task, "h1p peer header sent");
2396 
2397     engine = task->thread->engine;
2398 
2399     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
2400 
2401     if (c->write != NULL) {
2402         nxt_conn_write(engine, c);
2403         return;
2404     }
2405 
2406     r = peer->request;
2407     r->state->ready_handler(task, r, peer);
2408 }
2409 
2410 
2411 static void
nxt_h1p_peer_header_read(nxt_task_t * task,nxt_http_peer_t * peer)2412 nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
2413 {
2414     nxt_conn_t  *c;
2415 
2416     nxt_debug(task, "h1p peer header read");
2417 
2418     c = peer->proto.h1->conn;
2419 
2420     if (c->write_timer.enabled) {
2421         c->read_state = &nxt_h1p_peer_header_read_state;
2422 
2423     } else {
2424         c->read_state = &nxt_h1p_peer_header_read_timer_state;
2425     }
2426 
2427     nxt_conn_read(task->thread->engine, c);
2428 }
2429 
2430 
2431 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state
2432     nxt_aligned(64) =
2433 {
2434     .ready_handler = nxt_h1p_peer_header_read_done,
2435     .close_handler = nxt_h1p_peer_closed,
2436     .error_handler = nxt_h1p_peer_error,
2437 
2438     .io_read_handler = nxt_h1p_peer_io_read_handler,
2439 };
2440 
2441 
2442 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state
2443     nxt_aligned(64) =
2444 {
2445     .ready_handler = nxt_h1p_peer_header_read_done,
2446     .close_handler = nxt_h1p_peer_closed,
2447     .error_handler = nxt_h1p_peer_error,
2448 
2449     .io_read_handler = nxt_h1p_peer_io_read_handler,
2450 
2451     .timer_handler = nxt_h1p_peer_read_timeout,
2452     .timer_value = nxt_h1p_peer_timer_value,
2453     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2454 };
2455 
2456 
2457 static ssize_t
nxt_h1p_peer_io_read_handler(nxt_task_t * task,nxt_conn_t * c)2458 nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
2459 {
2460     size_t              size;
2461     ssize_t             n;
2462     nxt_buf_t           *b;
2463     nxt_http_peer_t     *peer;
2464     nxt_socket_conf_t   *skcf;
2465     nxt_http_request_t  *r;
2466 
2467     peer = c->socket.data;
2468     r = peer->request;
2469     b = c->read;
2470 
2471     if (b == NULL) {
2472         skcf = r->conf->socket_conf;
2473 
2474         size = (peer->header_received) ? skcf->proxy_buffer_size
2475                                        : skcf->proxy_header_buffer_size;
2476 
2477         nxt_debug(task, "h1p peer io read: %z", size);
2478 
2479         b = nxt_http_proxy_buf_mem_alloc(task, r, size);
2480         if (nxt_slow_path(b == NULL)) {
2481             c->socket.error = NXT_ENOMEM;
2482             return NXT_ERROR;
2483         }
2484     }
2485 
2486     n = c->io->recvbuf(c, b);
2487 
2488     if (n > 0) {
2489         c->read = b;
2490 
2491     } else {
2492         c->read = NULL;
2493         nxt_http_proxy_buf_mem_free(task, r, b);
2494     }
2495 
2496     return n;
2497 }
2498 
2499 
2500 static void
nxt_h1p_peer_header_read_done(nxt_task_t * task,void * obj,void * data)2501 nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
2502 {
2503     nxt_int_t           ret;
2504     nxt_buf_t           *b;
2505     nxt_conn_t          *c;
2506     nxt_h1proto_t       *h1p;
2507     nxt_http_peer_t     *peer;
2508     nxt_http_request_t  *r;
2509     nxt_event_engine_t  *engine;
2510 
2511     c = obj;
2512     peer = data;
2513 
2514     nxt_debug(task, "h1p peer header read done");
2515 
2516     b = c->read;
2517 
2518     ret = nxt_h1p_peer_header_parse(peer, &b->mem);
2519 
2520     r = peer->request;
2521 
2522     ret = nxt_expect(NXT_DONE, ret);
2523 
2524     if (ret != NXT_AGAIN) {
2525         engine = task->thread->engine;
2526         nxt_timer_disable(engine, &c->write_timer);
2527         nxt_timer_disable(engine, &c->read_timer);
2528     }
2529 
2530     switch (ret) {
2531 
2532     case NXT_DONE:
2533         peer->fields = peer->proto.h1->parser.fields;
2534 
2535         ret = nxt_http_fields_process(peer->fields,
2536                                       &nxt_h1p_peer_fields_hash, r);
2537         if (nxt_slow_path(ret != NXT_OK)) {
2538             peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2539             break;
2540         }
2541 
2542         c->read = NULL;
2543 
2544         peer->header_received = 1;
2545 
2546         h1p = peer->proto.h1;
2547 
2548         if (h1p->chunked) {
2549             if (r->resp.content_length != NULL) {
2550                 peer->status = NXT_HTTP_BAD_GATEWAY;
2551                 break;
2552             }
2553 
2554             h1p->chunked_parse.mem_pool = c->mem_pool;
2555 
2556         } else if (r->resp.content_length_n > 0) {
2557             h1p->remainder = r->resp.content_length_n;
2558         }
2559 
2560         if (nxt_buf_mem_used_size(&b->mem) != 0) {
2561             nxt_h1p_peer_body_process(task, peer, b);
2562             return;
2563         }
2564 
2565         r->state->ready_handler(task, r, peer);
2566         return;
2567 
2568     case NXT_AGAIN:
2569         if (nxt_buf_mem_free_size(&b->mem) != 0) {
2570             nxt_conn_read(task->thread->engine, c);
2571             return;
2572         }
2573 
2574         /* Fall through. */
2575 
2576     default:
2577     case NXT_ERROR:
2578     case NXT_HTTP_PARSE_INVALID:
2579     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
2580     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
2581         peer->status = NXT_HTTP_BAD_GATEWAY;
2582         break;
2583     }
2584 
2585     nxt_http_proxy_buf_mem_free(task, r, b);
2586 
2587     r->state->error_handler(task, r, peer);
2588 }
2589 
2590 
2591 static nxt_int_t
nxt_h1p_peer_header_parse(nxt_http_peer_t * peer,nxt_buf_mem_t * bm)2592 nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
2593 {
2594     u_char     *p;
2595     size_t     length;
2596     nxt_int_t  status;
2597 
2598     if (peer->status < 0) {
2599         length = nxt_buf_mem_used_size(bm);
2600 
2601         if (nxt_slow_path(length < 12)) {
2602             return NXT_AGAIN;
2603         }
2604 
2605         p = bm->pos;
2606 
2607         if (nxt_slow_path(memcmp(p, "HTTP/1.", 7) != 0
2608                           || (p[7] != '0' && p[7] != '1')))
2609         {
2610             return NXT_ERROR;
2611         }
2612 
2613         status = nxt_int_parse(&p[9], 3);
2614 
2615         if (nxt_slow_path(status < 0)) {
2616             return NXT_ERROR;
2617         }
2618 
2619         p += 12;
2620         length -= 12;
2621 
2622         p = memchr(p, '\n', length);
2623 
2624         if (nxt_slow_path(p == NULL)) {
2625             return NXT_AGAIN;
2626         }
2627 
2628         bm->pos = p + 1;
2629         peer->status = status;
2630     }
2631 
2632     return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
2633 }
2634 
2635 
2636 static void
nxt_h1p_peer_read(nxt_task_t * task,nxt_http_peer_t * peer)2637 nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
2638 {
2639     nxt_conn_t  *c;
2640 
2641     nxt_debug(task, "h1p peer read");
2642 
2643     c = peer->proto.h1->conn;
2644     c->read_state = &nxt_h1p_peer_read_state;
2645 
2646     nxt_conn_read(task->thread->engine, c);
2647 }
2648 
2649 
2650 static const nxt_conn_state_t  nxt_h1p_peer_read_state
2651     nxt_aligned(64) =
2652 {
2653     .ready_handler = nxt_h1p_peer_read_done,
2654     .close_handler = nxt_h1p_peer_closed,
2655     .error_handler = nxt_h1p_peer_error,
2656 
2657     .io_read_handler = nxt_h1p_peer_io_read_handler,
2658 
2659     .timer_handler = nxt_h1p_peer_read_timeout,
2660     .timer_value = nxt_h1p_peer_timer_value,
2661     .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
2662     .timer_autoreset = 1,
2663 };
2664 
2665 
2666 static void
nxt_h1p_peer_read_done(nxt_task_t * task,void * obj,void * data)2667 nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
2668 {
2669     nxt_buf_t        *out;
2670     nxt_conn_t       *c;
2671     nxt_http_peer_t  *peer;
2672 
2673     c = obj;
2674     peer = data;
2675 
2676     nxt_debug(task, "h1p peer read done");
2677 
2678     out = c->read;
2679     c->read = NULL;
2680 
2681     nxt_h1p_peer_body_process(task, peer, out);
2682 }
2683 
2684 
2685 static void
nxt_h1p_peer_body_process(nxt_task_t * task,nxt_http_peer_t * peer,nxt_buf_t * out)2686 nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer,
2687     nxt_buf_t *out)
2688 {
2689     size_t              length;
2690     nxt_h1proto_t       *h1p;
2691     nxt_http_request_t  *r;
2692 
2693     h1p = peer->proto.h1;
2694 
2695     if (h1p->chunked) {
2696         out = nxt_http_chunk_parse(task, &h1p->chunked_parse, out);
2697 
2698         if (h1p->chunked_parse.chunk_error || h1p->chunked_parse.error) {
2699             peer->status = NXT_HTTP_BAD_GATEWAY;
2700             r = peer->request;
2701             r->state->error_handler(task, r, peer);
2702             return;
2703         }
2704 
2705         if (h1p->chunked_parse.last) {
2706             nxt_buf_chain_add(&out, nxt_http_buf_last(peer->request));
2707             peer->closed = 1;
2708         }
2709 
2710     } else if (h1p->remainder > 0) {
2711         length = nxt_buf_chain_length(out);
2712         h1p->remainder -= length;
2713     }
2714 
2715     peer->body = out;
2716 
2717     r = peer->request;
2718     r->state->ready_handler(task, r, peer);
2719 }
2720 
2721 
2722 static void
nxt_h1p_peer_closed(nxt_task_t * task,void * obj,void * data)2723 nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
2724 {
2725     nxt_http_peer_t     *peer;
2726     nxt_http_request_t  *r;
2727 
2728     peer = data;
2729 
2730     nxt_debug(task, "h1p peer closed");
2731 
2732     r = peer->request;
2733 
2734     if (peer->header_received) {
2735         peer->body = nxt_http_buf_last(r);
2736         peer->closed = 1;
2737         r->inconsistent = (peer->proto.h1->remainder != 0);
2738 
2739         r->state->ready_handler(task, r, peer);
2740 
2741     } else {
2742         peer->status = NXT_HTTP_BAD_GATEWAY;
2743 
2744         r->state->error_handler(task, r, peer);
2745     }
2746 }
2747 
2748 
2749 static void
nxt_h1p_peer_error(nxt_task_t * task,void * obj,void * data)2750 nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
2751 {
2752     nxt_http_peer_t     *peer;
2753     nxt_http_request_t  *r;
2754 
2755     peer = data;
2756 
2757     nxt_debug(task, "h1p peer error");
2758 
2759     peer->status = NXT_HTTP_BAD_GATEWAY;
2760 
2761     r = peer->request;
2762     r->state->error_handler(task, r, peer);
2763 }
2764 
2765 
2766 static void
nxt_h1p_peer_send_timeout(nxt_task_t * task,void * obj,void * data)2767 nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
2768 {
2769     nxt_conn_t          *c;
2770     nxt_timer_t         *timer;
2771     nxt_http_peer_t     *peer;
2772     nxt_http_request_t  *r;
2773 
2774     timer = obj;
2775 
2776     nxt_debug(task, "h1p peer send timeout");
2777 
2778     c = nxt_write_timer_conn(timer);
2779     c->block_write = 1;
2780     c->block_read = 1;
2781 
2782     peer = c->socket.data;
2783     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2784 
2785     r = peer->request;
2786     r->state->error_handler(task, r, peer);
2787 }
2788 
2789 
2790 static void
nxt_h1p_peer_read_timeout(nxt_task_t * task,void * obj,void * data)2791 nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
2792 {
2793     nxt_conn_t          *c;
2794     nxt_timer_t         *timer;
2795     nxt_http_peer_t     *peer;
2796     nxt_http_request_t  *r;
2797 
2798     timer = obj;
2799 
2800     nxt_debug(task, "h1p peer read timeout");
2801 
2802     c = nxt_read_timer_conn(timer);
2803     c->block_write = 1;
2804     c->block_read = 1;
2805 
2806     peer = c->socket.data;
2807     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2808 
2809     r = peer->request;
2810     r->state->error_handler(task, r, peer);
2811 }
2812 
2813 
2814 static nxt_msec_t
nxt_h1p_peer_timer_value(nxt_conn_t * c,uintptr_t data)2815 nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
2816 {
2817     nxt_http_peer_t  *peer;
2818 
2819     peer = c->socket.data;
2820 
2821     return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
2822 }
2823 
2824 
2825 static void
nxt_h1p_peer_close(nxt_task_t * task,nxt_http_peer_t * peer)2826 nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
2827 {
2828     nxt_conn_t  *c;
2829 
2830     nxt_debug(task, "h1p peer close");
2831 
2832     peer->closed = 1;
2833 
2834     c = peer->proto.h1->conn;
2835     task = &c->task;
2836     c->socket.task = task;
2837     c->read_timer.task = task;
2838     c->write_timer.task = task;
2839 
2840     if (c->socket.fd != -1) {
2841         c->write_state = &nxt_h1p_peer_close_state;
2842 
2843         nxt_conn_close(task->thread->engine, c);
2844 
2845     } else {
2846         nxt_h1p_peer_free(task, c, NULL);
2847     }
2848 }
2849 
2850 
2851 static const nxt_conn_state_t  nxt_h1p_peer_close_state
2852     nxt_aligned(64) =
2853 {
2854     .ready_handler = nxt_h1p_peer_free,
2855 };
2856 
2857 
2858 static void
nxt_h1p_peer_free(nxt_task_t * task,void * obj,void * data)2859 nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
2860 {
2861     nxt_conn_t  *c;
2862 
2863     c = obj;
2864 
2865     nxt_debug(task, "h1p peer free");
2866 
2867     nxt_conn_free(task, c);
2868 }
2869 
2870 
2871 static nxt_int_t
nxt_h1p_peer_transfer_encoding(void * ctx,nxt_http_field_t * field,uintptr_t data)2872 nxt_h1p_peer_transfer_encoding(void *ctx, nxt_http_field_t *field,
2873     uintptr_t data)
2874 {
2875     nxt_http_request_t  *r;
2876 
2877     r = ctx;
2878     field->skip = 1;
2879 
2880     if (field->value_length == 7
2881         && memcmp(field->value, "chunked", 7) == 0)
2882     {
2883         r->peer->proto.h1->chunked = 1;
2884     }
2885 
2886     return NXT_OK;
2887 }
2888