xref: /unit/src/nxt_h1proto.c (revision 2133:46433e3cef45)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 #include <nxt_h1proto.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 /*
16  * nxt_http_conn_ and nxt_h1p_conn_ prefixes are used for connection handlers.
17  * nxt_h1p_idle_ prefix is used for idle connection handlers.
18  * nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
19  */
20 
21 #if (NXT_TLS)
22 static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
23 static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
24 #endif
25 static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
26 static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
27 static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
28 static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
29     void *data);
30 static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
31     nxt_http_request_t *r);
32 static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task,
33     nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf);
34 static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
35     uintptr_t data);
36 static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field,
37     uintptr_t data);
38 static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field,
39     uintptr_t data);
40 static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field,
41     uintptr_t data);
42 static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
43     uintptr_t data);
44 static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
45 static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
46     void *data);
47 static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
48 static void nxt_h1p_request_header_send(nxt_task_t *task,
49     nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
50 static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
51     nxt_buf_t *out);
52 static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
53     nxt_buf_t *out);
54 static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
55     nxt_http_proto_t proto);
56 static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
57     nxt_buf_t *last);
58 static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data);
59 static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
60     void *data);
61 static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj,
62     void *data);
63 nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
64     nxt_http_request_t *r);
65 static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
66     nxt_socket_conf_joint_t *joint);
67 static void nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data);
68 static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data);
69 static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data);
70 static nxt_msec_t nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data);
71 static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
72     nxt_conn_t *c);
73 static void nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data);
74 static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
75 static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
76 static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
77 static void nxt_h1p_idle_response_error(nxt_task_t *task, void *obj,
78     void *data);
79 static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
80     void *data);
81 static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
82     uintptr_t data);
83 static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
84 static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
85 static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
86 static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
87 static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
88 
89 static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
90 static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
91 static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
92 static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
93 static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
94 static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
95 static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
96 static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
97     void *data);
98 static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
99     nxt_buf_mem_t *bm);
100 static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
101 static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
102 static void nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, nxt_buf_t *out);
103 static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
104 static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
105 static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
106 static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
107 static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
108 static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
109 static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
110 static nxt_int_t nxt_h1p_peer_transfer_encoding(void *ctx,
111     nxt_http_field_t *field, uintptr_t data);
112 
113 #if (NXT_TLS)
114 static const nxt_conn_state_t  nxt_http_idle_state;
115 static const nxt_conn_state_t  nxt_h1p_shutdown_state;
116 #endif
117 static const nxt_conn_state_t  nxt_h1p_idle_state;
118 static const nxt_conn_state_t  nxt_h1p_header_parse_state;
119 static const nxt_conn_state_t  nxt_h1p_read_body_state;
120 static const nxt_conn_state_t  nxt_h1p_request_send_state;
121 static const nxt_conn_state_t  nxt_h1p_timeout_response_state;
122 static const nxt_conn_state_t  nxt_h1p_keepalive_state;
123 static const nxt_conn_state_t  nxt_h1p_close_state;
124 static const nxt_conn_state_t  nxt_h1p_peer_connect_state;
125 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state;
126 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state;
127 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state;
128 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state;
129 static const nxt_conn_state_t  nxt_h1p_peer_read_state;
130 static const nxt_conn_state_t  nxt_h1p_peer_close_state;
131 
132 
133 const nxt_http_proto_table_t  nxt_http_proto[3] = {
134     /* NXT_HTTP_PROTO_H1 */
135     {
136         .body_read        = nxt_h1p_request_body_read,
137         .local_addr       = nxt_h1p_request_local_addr,
138         .header_send      = nxt_h1p_request_header_send,
139         .send             = nxt_h1p_request_send,
140         .body_bytes_sent  = nxt_h1p_request_body_bytes_sent,
141         .discard          = nxt_h1p_request_discard,
142         .close            = nxt_h1p_request_close,
143 
144         .peer_connect     = nxt_h1p_peer_connect,
145         .peer_header_send = nxt_h1p_peer_header_send,
146         .peer_header_read = nxt_h1p_peer_header_read,
147         .peer_read        = nxt_h1p_peer_read,
148         .peer_close       = nxt_h1p_peer_close,
149 
150         .ws_frame_start   = nxt_h1p_websocket_frame_start,
151     },
152     /* NXT_HTTP_PROTO_H2      */
153     /* NXT_HTTP_PROTO_DEVNULL */
154 };
155 
156 
157 static nxt_lvlhsh_t                    nxt_h1p_fields_hash;
158 
159 static nxt_http_field_proc_t           nxt_h1p_fields[] = {
160     { nxt_string("Connection"),        &nxt_h1p_connection, 0 },
161     { nxt_string("Upgrade"),           &nxt_h1p_upgrade, 0 },
162     { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
163     { nxt_string("Sec-WebSocket-Version"),
164                                        &nxt_h1p_websocket_version, 0 },
165     { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
166 
167     { nxt_string("Host"),              &nxt_http_request_host, 0 },
168     { nxt_string("Cookie"),            &nxt_http_request_field,
169         offsetof(nxt_http_request_t, cookie) },
170     { nxt_string("Referer"),           &nxt_http_request_field,
171         offsetof(nxt_http_request_t, referer) },
172     { nxt_string("User-Agent"),        &nxt_http_request_field,
173         offsetof(nxt_http_request_t, user_agent) },
174     { nxt_string("Content-Type"),      &nxt_http_request_field,
175         offsetof(nxt_http_request_t, content_type) },
176     { nxt_string("Content-Length"),    &nxt_http_request_content_length, 0 },
177     { nxt_string("Authorization"),     &nxt_http_request_field,
178         offsetof(nxt_http_request_t, authorization) },
179 };
180 
181 
182 static nxt_lvlhsh_t                    nxt_h1p_peer_fields_hash;
183 
184 static nxt_http_field_proc_t           nxt_h1p_peer_fields[] = {
185     { nxt_string("Connection"),        &nxt_http_proxy_skip, 0 },
186     { nxt_string("Transfer-Encoding"), &nxt_h1p_peer_transfer_encoding, 0 },
187     { nxt_string("Server"),            &nxt_http_proxy_skip, 0 },
188     { nxt_string("Date"),              &nxt_http_proxy_date, 0 },
189     { nxt_string("Content-Length"),    &nxt_http_proxy_content_length, 0 },
190 };
191 
192 
193 nxt_int_t
nxt_h1p_init(nxt_task_t * task)194 nxt_h1p_init(nxt_task_t *task)
195 {
196     nxt_int_t  ret;
197 
198     ret = nxt_http_fields_hash(&nxt_h1p_fields_hash,
199                                nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
200 
201     if (nxt_fast_path(ret == NXT_OK)) {
202         ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
203                                    nxt_h1p_peer_fields,
204                                    nxt_nitems(nxt_h1p_peer_fields));
205     }
206 
207     return ret;
208 }
209 
210 
211 void
nxt_http_conn_init(nxt_task_t * task,void * obj,void * data)212 nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
213 {
214     nxt_conn_t               *c;
215     nxt_socket_conf_t        *skcf;
216     nxt_event_engine_t       *engine;
217     nxt_listen_event_t       *lev;
218     nxt_socket_conf_joint_t  *joint;
219 
220     c = obj;
221     lev = data;
222 
223     nxt_debug(task, "http conn init");
224 
225     joint = lev->socket.data;
226     skcf = joint->socket_conf;
227     c->local = skcf->sockaddr;
228 
229     engine = task->thread->engine;
230     c->read_work_queue = &engine->fast_work_queue;
231     c->write_work_queue = &engine->fast_work_queue;
232 
233     c->read_state = &nxt_h1p_idle_state;
234 
235 #if (NXT_TLS)
236     if (skcf->tls != NULL) {
237         c->read_state = &nxt_http_idle_state;
238     }
239 #endif
240 
241     nxt_conn_read(engine, c);
242 }
243 
244 
245 #if (NXT_TLS)
246 
247 static const nxt_conn_state_t  nxt_http_idle_state
248     nxt_aligned(64) =
249 {
250     .ready_handler = nxt_http_conn_test,
251     .close_handler = nxt_h1p_conn_close,
252     .error_handler = nxt_h1p_conn_error,
253 
254     .io_read_handler = nxt_http_idle_io_read_handler,
255 
256     .timer_handler = nxt_h1p_idle_timeout,
257     .timer_value = nxt_h1p_conn_timer_value,
258     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
259 };
260 
261 
262 static ssize_t
nxt_http_idle_io_read_handler(nxt_task_t * task,nxt_conn_t * c)263 nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
264 {
265     size_t                   size;
266     ssize_t                  n;
267     nxt_buf_t                *b;
268     nxt_socket_conf_joint_t  *joint;
269 
270     joint = c->listen->socket.data;
271 
272     if (nxt_slow_path(joint == NULL)) {
273         /*
274          * Listening socket had been closed while
275          * connection was in keep-alive state.
276          */
277         c->read_state = &nxt_h1p_idle_close_state;
278         return 0;
279     }
280 
281     size = joint->socket_conf->header_buffer_size;
282 
283     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
284     if (nxt_slow_path(b == NULL)) {
285         c->socket.error = NXT_ENOMEM;
286         return NXT_ERROR;
287     }
288 
289     /*
290      * 1 byte is enough to distinguish between SSLv3/TLS and plain HTTP.
291      * 11 bytes are enough to log supported SSLv3/TLS version.
292      * 16 bytes are just for more optimized kernel copy-out operation.
293      */
294     n = c->io->recv(c, b->mem.pos, 16, MSG_PEEK);
295 
296     if (n > 0) {
297         c->read = b;
298 
299     } else {
300         c->read = NULL;
301         nxt_event_engine_buf_mem_free(task->thread->engine, b);
302     }
303 
304     return n;
305 }
306 
307 
308 static void
nxt_http_conn_test(nxt_task_t * task,void * obj,void * data)309 nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
310 {
311     u_char                   *p;
312     nxt_buf_t                *b;
313     nxt_conn_t               *c;
314     nxt_tls_conf_t           *tls;
315     nxt_event_engine_t       *engine;
316     nxt_socket_conf_joint_t  *joint;
317 
318     c = obj;
319 
320     nxt_debug(task, "h1p conn https test");
321 
322     engine = task->thread->engine;
323     b = c->read;
324     p = b->mem.pos;
325 
326     c->read_state = &nxt_h1p_idle_state;
327 
328     if (p[0] != 0x16) {
329         b->mem.free = b->mem.pos;
330 
331         nxt_conn_read(engine, c);
332         return;
333     }
334 
335     /* SSLv3/TLS ClientHello message. */
336 
337 #if (NXT_DEBUG)
338     if (nxt_buf_mem_used_size(&b->mem) >= 11) {
339         u_char      major, minor;
340         const char  *protocol;
341 
342         major = p[9];
343         minor = p[10];
344 
345         if (major == 3) {
346             if (minor == 0) {
347                 protocol = "SSLv";
348 
349             } else {
350                 protocol = "TLSv";
351                 major -= 2;
352                 minor -= 1;
353             }
354 
355             nxt_debug(task, "SSL/TLS: %s%ud.%ud", protocol, major, minor);
356         }
357     }
358 #endif
359 
360     c->read = NULL;
361     nxt_event_engine_buf_mem_free(engine, b);
362 
363     joint = c->listen->socket.data;
364 
365     if (nxt_slow_path(joint == NULL)) {
366         /*
367          * Listening socket had been closed while
368          * connection was in keep-alive state.
369          */
370         nxt_h1p_closing(task, c);
371         return;
372     }
373 
374     tls = joint->socket_conf->tls;
375 
376     tls->conn_init(task, tls, c);
377 }
378 
379 #endif
380 
381 
382 static const nxt_conn_state_t  nxt_h1p_idle_state
383     nxt_aligned(64) =
384 {
385     .ready_handler = nxt_h1p_conn_proto_init,
386     .close_handler = nxt_h1p_conn_close,
387     .error_handler = nxt_h1p_conn_error,
388 
389     .io_read_handler = nxt_h1p_idle_io_read_handler,
390 
391     .timer_handler = nxt_h1p_idle_timeout,
392     .timer_value = nxt_h1p_conn_timer_value,
393     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
394     .timer_autoreset = 1,
395 };
396 
397 
398 static ssize_t
nxt_h1p_idle_io_read_handler(nxt_task_t * task,nxt_conn_t * c)399 nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
400 {
401     size_t                   size;
402     ssize_t                  n;
403     nxt_buf_t                *b;
404     nxt_socket_conf_joint_t  *joint;
405 
406     joint = c->listen->socket.data;
407 
408     if (nxt_slow_path(joint == NULL)) {
409         /*
410          * Listening socket had been closed while
411          * connection was in keep-alive state.
412          */
413         c->read_state = &nxt_h1p_idle_close_state;
414         return 0;
415     }
416 
417     b = c->read;
418 
419     if (b == NULL) {
420         size = joint->socket_conf->header_buffer_size;
421 
422         b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
423         if (nxt_slow_path(b == NULL)) {
424             c->socket.error = NXT_ENOMEM;
425             return NXT_ERROR;
426         }
427     }
428 
429     n = c->io->recvbuf(c, b);
430 
431     if (n > 0) {
432         c->read = b;
433 
434     } else {
435         c->read = NULL;
436         nxt_event_engine_buf_mem_free(task->thread->engine, b);
437     }
438 
439     return n;
440 }
441 
442 
443 static void
nxt_h1p_conn_proto_init(nxt_task_t * task,void * obj,void * data)444 nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
445 {
446     nxt_conn_t     *c;
447     nxt_h1proto_t  *h1p;
448 
449     c = obj;
450 
451     nxt_debug(task, "h1p conn proto init");
452 
453     h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
454     if (nxt_slow_path(h1p == NULL)) {
455         nxt_h1p_closing(task, c);
456         return;
457     }
458 
459     c->socket.data = h1p;
460     h1p->conn = c;
461 
462     nxt_h1p_conn_request_init(task, c, h1p);
463 }
464 
465 
466 static void
nxt_h1p_conn_request_init(nxt_task_t * task,void * obj,void * data)467 nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
468 {
469     nxt_int_t                ret;
470     nxt_conn_t               *c;
471     nxt_h1proto_t            *h1p;
472     nxt_socket_conf_t        *skcf;
473     nxt_http_request_t       *r;
474     nxt_socket_conf_joint_t  *joint;
475 
476     c = obj;
477     h1p = data;
478 
479     nxt_debug(task, "h1p conn request init");
480 
481     nxt_queue_remove(&c->link);
482 
483     r = nxt_http_request_create(task);
484 
485     if (nxt_fast_path(r != NULL)) {
486         h1p->request = r;
487         r->proto.h1 = h1p;
488 
489         /* r->protocol = NXT_HTTP_PROTO_H1 is done by zeroing. */
490         r->remote = c->remote;
491 
492 #if (NXT_TLS)
493         r->tls = (c->u.tls != NULL);
494 #endif
495 
496         r->task = c->task;
497         task = &r->task;
498         c->socket.task = task;
499         c->read_timer.task = task;
500         c->write_timer.task = task;
501 
502         ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
503 
504         if (nxt_fast_path(ret == NXT_OK)) {
505             joint = c->listen->socket.data;
506             joint->count++;
507 
508             r->conf = joint;
509             skcf = joint->socket_conf;
510 
511             if (c->local == NULL) {
512                 c->local = skcf->sockaddr;
513             }
514 
515             h1p->parser.discard_unsafe_fields = skcf->discard_unsafe_fields;
516 
517             nxt_h1p_conn_request_header_parse(task, c, h1p);
518             return;
519         }
520 
521         /*
522          * The request is very incomplete here,
523          * so "internal server error" useless here.
524          */
525         nxt_mp_release(r->mem_pool);
526     }
527 
528     nxt_h1p_closing(task, c);
529 }
530 
531 
532 static const nxt_conn_state_t  nxt_h1p_header_parse_state
533     nxt_aligned(64) =
534 {
535     .ready_handler = nxt_h1p_conn_request_header_parse,
536     .close_handler = nxt_h1p_conn_request_error,
537     .error_handler = nxt_h1p_conn_request_error,
538 
539     .timer_handler = nxt_h1p_conn_request_timeout,
540     .timer_value = nxt_h1p_conn_request_timer_value,
541     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
542 };
543 
544 
545 static void
nxt_h1p_conn_request_header_parse(nxt_task_t * task,void * obj,void * data)546 nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
547 {
548     nxt_int_t           ret;
549     nxt_conn_t          *c;
550     nxt_h1proto_t       *h1p;
551     nxt_http_status_t   status;
552     nxt_http_request_t  *r;
553 
554     c = obj;
555     h1p = data;
556 
557     nxt_debug(task, "h1p conn header parse");
558 
559     ret = nxt_http_parse_request(&h1p->parser, &c->read->mem);
560 
561     ret = nxt_expect(NXT_DONE, ret);
562 
563     if (ret != NXT_AGAIN) {
564         nxt_timer_disable(task->thread->engine, &c->read_timer);
565     }
566 
567     r = h1p->request;
568 
569     switch (ret) {
570 
571     case NXT_DONE:
572         /*
573          * By default the keepalive mode is disabled in HTTP/1.0 and
574          * enabled in HTTP/1.1.  The mode can be overridden later by
575          * the "Connection" field processed in nxt_h1p_connection().
576          */
577         h1p->keepalive = (h1p->parser.version.s.minor != '0');
578 
579         ret = nxt_h1p_header_process(task, h1p, r);
580 
581         if (nxt_fast_path(ret == NXT_OK)) {
582 
583 #if (NXT_TLS)
584             if (c->u.tls == NULL && r->conf->socket_conf->tls != NULL) {
585                 status = NXT_HTTP_TO_HTTPS;
586                 goto error;
587             }
588 #endif
589 
590             r->state->ready_handler(task, r, NULL);
591             return;
592         }
593 
594         status = ret;
595         goto error;
596 
597     case NXT_AGAIN:
598         status = nxt_h1p_header_buffer_test(task, h1p, c, r->conf->socket_conf);
599 
600         if (nxt_fast_path(status == NXT_OK)) {
601             c->read_state = &nxt_h1p_header_parse_state;
602 
603             nxt_conn_read(task->thread->engine, c);
604             return;
605         }
606 
607         break;
608 
609     case NXT_HTTP_PARSE_INVALID:
610         status = NXT_HTTP_BAD_REQUEST;
611         break;
612 
613     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
614         status = NXT_HTTP_VERSION_NOT_SUPPORTED;
615         break;
616 
617     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
618         status = NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
619         break;
620 
621     default:
622     case NXT_ERROR:
623         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
624         break;
625     }
626 
627     (void) nxt_h1p_header_process(task, h1p, r);
628 
629 error:
630 
631     h1p->keepalive = 0;
632 
633     nxt_http_request_error(task, r, status);
634 }
635 
636 
637 static nxt_int_t
nxt_h1p_header_process(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_http_request_t * r)638 nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
639     nxt_http_request_t *r)
640 {
641     u_char     *m;
642     nxt_int_t  ret;
643 
644     r->target.start = h1p->parser.target_start;
645     r->target.length = h1p->parser.target_end - h1p->parser.target_start;
646 
647     if (h1p->parser.version.ui64 != 0) {
648         r->version.start = h1p->parser.version.str;
649         r->version.length = sizeof(h1p->parser.version.str);
650     }
651 
652     r->method = &h1p->parser.method;
653     r->path = &h1p->parser.path;
654     r->args = &h1p->parser.args;
655 
656     r->fields = h1p->parser.fields;
657 
658     ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
659     if (nxt_slow_path(ret != NXT_OK)) {
660         return ret;
661     }
662 
663     if (h1p->connection_upgrade && h1p->upgrade_websocket) {
664         m = h1p->parser.method.start;
665 
666         if (nxt_slow_path(h1p->parser.method.length != 3
667                           || m[0] != 'G'
668                           || m[1] != 'E'
669                           || m[2] != 'T'))
670         {
671             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method");
672 
673             return NXT_HTTP_BAD_REQUEST;
674         }
675 
676         if (nxt_slow_path(h1p->parser.version.s.minor != '1')) {
677             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version");
678 
679             return NXT_HTTP_BAD_REQUEST;
680         }
681 
682         if (nxt_slow_path(h1p->websocket_key == NULL)) {
683             nxt_log(task, NXT_LOG_INFO,
684                     "h1p upgrade: bad or absent websocket key");
685 
686             return NXT_HTTP_BAD_REQUEST;
687         }
688 
689         if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
690             nxt_log(task, NXT_LOG_INFO,
691                     "h1p upgrade: bad or absent websocket version");
692 
693             return NXT_HTTP_UPGRADE_REQUIRED;
694         }
695 
696         r->websocket_handshake = 1;
697     }
698 
699     return ret;
700 }
701 
702 
703 static nxt_int_t
nxt_h1p_header_buffer_test(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_conn_t * c,nxt_socket_conf_t * skcf)704 nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
705     nxt_socket_conf_t *skcf)
706 {
707     size_t     size, used;
708     nxt_buf_t  *in, *b;
709 
710     in = c->read;
711 
712     if (nxt_buf_mem_free_size(&in->mem) == 0) {
713         size = skcf->large_header_buffer_size;
714         used = nxt_buf_mem_used_size(&in->mem);
715 
716         if (size <= used || h1p->nbuffers >= skcf->large_header_buffers) {
717             return NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
718         }
719 
720         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
721         if (nxt_slow_path(b == NULL)) {
722             return NXT_HTTP_INTERNAL_SERVER_ERROR;
723         }
724 
725         b->mem.free = nxt_cpymem(b->mem.pos, in->mem.pos, used);
726 
727         in->next = h1p->buffers;
728         h1p->buffers = in;
729         h1p->nbuffers++;
730 
731         c->read = b;
732     }
733 
734     return NXT_OK;
735 }
736 
737 
738 static nxt_int_t
nxt_h1p_connection(void * ctx,nxt_http_field_t * field,uintptr_t data)739 nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
740 {
741     nxt_http_request_t  *r;
742 
743     r = ctx;
744     field->hopbyhop = 1;
745 
746     if (field->value_length == 5
747         && nxt_memcasecmp(field->value, "close", 5) == 0)
748     {
749         r->proto.h1->keepalive = 0;
750 
751     } else if (field->value_length == 10
752                && nxt_memcasecmp(field->value, "keep-alive", 10) == 0)
753     {
754         r->proto.h1->keepalive = 1;
755 
756     } else if (field->value_length == 7
757                && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
758     {
759         r->proto.h1->connection_upgrade = 1;
760     }
761 
762     return NXT_OK;
763 }
764 
765 
766 static nxt_int_t
nxt_h1p_upgrade(void * ctx,nxt_http_field_t * field,uintptr_t data)767 nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
768 {
769     nxt_http_request_t  *r;
770 
771     r = ctx;
772 
773     if (field->value_length == 9
774         && nxt_memcasecmp(field->value, "websocket", 9) == 0)
775     {
776         r->proto.h1->upgrade_websocket = 1;
777     }
778 
779     return NXT_OK;
780 }
781 
782 
783 static nxt_int_t
nxt_h1p_websocket_key(void * ctx,nxt_http_field_t * field,uintptr_t data)784 nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data)
785 {
786     nxt_http_request_t  *r;
787 
788     r = ctx;
789 
790     if (field->value_length == 24) {
791         r->proto.h1->websocket_key = field;
792     }
793 
794     return NXT_OK;
795 }
796 
797 
798 static nxt_int_t
nxt_h1p_websocket_version(void * ctx,nxt_http_field_t * field,uintptr_t data)799 nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data)
800 {
801     nxt_http_request_t  *r;
802 
803     r = ctx;
804 
805     if (field->value_length == 2
806         && field->value[0] == '1' && field->value[1] == '3')
807     {
808         r->proto.h1->websocket_version_ok = 1;
809     }
810 
811     return NXT_OK;
812 }
813 
814 
815 static nxt_int_t
nxt_h1p_transfer_encoding(void * ctx,nxt_http_field_t * field,uintptr_t data)816 nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
817 {
818     nxt_http_te_t       te;
819     nxt_http_request_t  *r;
820 
821     r = ctx;
822     field->skip = 1;
823     field->hopbyhop = 1;
824 
825     if (field->value_length == 7
826         && nxt_memcmp(field->value, "chunked", 7) == 0)
827     {
828         te = NXT_HTTP_TE_CHUNKED;
829 
830     } else {
831         te = NXT_HTTP_TE_UNSUPPORTED;
832     }
833 
834     r->proto.h1->transfer_encoding = te;
835 
836     return NXT_OK;
837 }
838 
839 
840 static void
nxt_h1p_request_body_read(nxt_task_t * task,nxt_http_request_t * r)841 nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
842 {
843     size_t             size, body_length, body_buffer_size, body_rest;
844     ssize_t            res;
845     nxt_str_t          *tmp_path, tmp_name;
846     nxt_buf_t          *in, *b;
847     nxt_conn_t         *c;
848     nxt_h1proto_t      *h1p;
849     nxt_http_status_t  status;
850 
851     static const nxt_str_t tmp_name_pattern = nxt_string("/req-XXXXXXXX");
852 
853     h1p = r->proto.h1;
854 
855     nxt_debug(task, "h1p request body read %O te:%d",
856               r->content_length_n, h1p->transfer_encoding);
857 
858     switch (h1p->transfer_encoding) {
859 
860     case NXT_HTTP_TE_CHUNKED:
861         status = NXT_HTTP_LENGTH_REQUIRED;
862         goto error;
863 
864     case NXT_HTTP_TE_UNSUPPORTED:
865         status = NXT_HTTP_NOT_IMPLEMENTED;
866         goto error;
867 
868     default:
869     case NXT_HTTP_TE_NONE:
870         break;
871     }
872 
873     if (r->content_length_n == -1 || r->content_length_n == 0) {
874         goto ready;
875     }
876 
877     body_length = (size_t) r->content_length_n;
878 
879     body_buffer_size = nxt_min(r->conf->socket_conf->body_buffer_size,
880                                body_length);
881 
882     if (body_length > body_buffer_size) {
883         tmp_path = &r->conf->socket_conf->body_temp_path;
884 
885         tmp_name.length = tmp_path->length + tmp_name_pattern.length;
886 
887         b = nxt_buf_file_alloc(r->mem_pool,
888                                body_buffer_size + sizeof(nxt_file_t)
889                                + tmp_name.length + 1, 0);
890 
891     } else {
892         /* This initialization required for CentOS 6, gcc 4.4.7. */
893         tmp_path = NULL;
894         tmp_name.length = 0;
895 
896         b = nxt_buf_mem_alloc(r->mem_pool, body_buffer_size, 0);
897     }
898 
899     if (nxt_slow_path(b == NULL)) {
900         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
901         goto error;
902     }
903 
904     r->body = b;
905 
906     if (body_length > body_buffer_size) {
907         tmp_name.start = nxt_pointer_to(b->mem.start, sizeof(nxt_file_t));
908 
909         memcpy(tmp_name.start, tmp_path->start, tmp_path->length);
910         memcpy(tmp_name.start + tmp_path->length, tmp_name_pattern.start,
911                tmp_name_pattern.length);
912         tmp_name.start[tmp_name.length] = '\0';
913 
914         b->file = (nxt_file_t *) b->mem.start;
915         nxt_memzero(b->file, sizeof(nxt_file_t));
916         b->file->fd = -1;
917         b->file->size = body_length;
918 
919         b->mem.start += sizeof(nxt_file_t) + tmp_name.length + 1;
920         b->mem.pos = b->mem.start;
921         b->mem.free = b->mem.start;
922 
923         b->file->fd = mkstemp((char *) tmp_name.start);
924         if (nxt_slow_path(b->file->fd == -1)) {
925             nxt_alert(task, "mkstemp(%s) failed %E", tmp_name.start, nxt_errno);
926 
927             status = NXT_HTTP_INTERNAL_SERVER_ERROR;
928             goto error;
929         }
930 
931         nxt_debug(task, "create body tmp file \"%V\", %d",
932                   &tmp_name, b->file->fd);
933 
934         unlink((char *) tmp_name.start);
935     }
936 
937     body_rest = body_length;
938 
939     in = h1p->conn->read;
940 
941     size = nxt_buf_mem_used_size(&in->mem);
942 
943     if (size != 0) {
944         size = nxt_min(size, body_length);
945 
946         if (nxt_buf_is_file(b)) {
947             res = nxt_fd_write(b->file->fd, in->mem.pos, size);
948             if (nxt_slow_path(res < (ssize_t) size)) {
949                 status = NXT_HTTP_INTERNAL_SERVER_ERROR;
950                 goto error;
951             }
952 
953             b->file_end += size;
954 
955         } else {
956             size = nxt_min(body_buffer_size, size);
957             b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
958         }
959 
960         in->mem.pos += size;
961         body_rest -= size;
962     }
963 
964     nxt_debug(task, "h1p body rest: %uz", body_rest);
965 
966     if (body_rest != 0) {
967         in->next = h1p->buffers;
968         h1p->buffers = in;
969         h1p->nbuffers++;
970 
971         c = h1p->conn;
972         c->read = b;
973         c->read_state = &nxt_h1p_read_body_state;
974 
975         nxt_conn_read(task->thread->engine, c);
976         return;
977     }
978 
979     if (nxt_buf_is_file(b)) {
980         b->mem.start = NULL;
981         b->mem.end = NULL;
982         b->mem.pos = NULL;
983         b->mem.free = NULL;
984     }
985 
986 ready:
987 
988     r->state->ready_handler(task, r, NULL);
989 
990     return;
991 
992 error:
993 
994     h1p->keepalive = 0;
995 
996     nxt_http_request_error(task, r, status);
997 }
998 
999 
1000 static const nxt_conn_state_t  nxt_h1p_read_body_state
1001     nxt_aligned(64) =
1002 {
1003     .ready_handler = nxt_h1p_conn_request_body_read,
1004     .close_handler = nxt_h1p_conn_request_error,
1005     .error_handler = nxt_h1p_conn_request_error,
1006 
1007     .timer_handler = nxt_h1p_conn_request_timeout,
1008     .timer_value = nxt_h1p_conn_request_timer_value,
1009     .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
1010     .timer_autoreset = 1,
1011 };
1012 
1013 
1014 static void
nxt_h1p_conn_request_body_read(nxt_task_t * task,void * obj,void * data)1015 nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
1016 {
1017     size_t              size, body_rest;
1018     ssize_t             res;
1019     nxt_buf_t           *b;
1020     nxt_conn_t          *c;
1021     nxt_h1proto_t       *h1p;
1022     nxt_http_request_t  *r;
1023     nxt_event_engine_t  *engine;
1024 
1025     c = obj;
1026     h1p = data;
1027 
1028     nxt_debug(task, "h1p conn request body read");
1029 
1030     r = h1p->request;
1031 
1032     engine = task->thread->engine;
1033 
1034     b = c->read;
1035 
1036     if (nxt_buf_is_file(b)) {
1037         body_rest = b->file->size - b->file_end;
1038 
1039         size = nxt_buf_mem_used_size(&b->mem);
1040         size = nxt_min(size, body_rest);
1041 
1042         res = nxt_fd_write(b->file->fd, b->mem.pos, size);
1043         if (nxt_slow_path(res < (ssize_t) size)) {
1044             nxt_h1p_request_error(task, h1p, r);
1045             return;
1046         }
1047 
1048         b->file_end += size;
1049         body_rest -= res;
1050 
1051         b->mem.pos += size;
1052 
1053         if (b->mem.pos == b->mem.free) {
1054             if (body_rest >= (size_t) nxt_buf_mem_size(&b->mem)) {
1055                 b->mem.free = b->mem.start;
1056 
1057             } else {
1058                 /* This required to avoid reading next request. */
1059                 b->mem.free = b->mem.end - body_rest;
1060             }
1061 
1062             b->mem.pos = b->mem.free;
1063         }
1064 
1065     } else {
1066         body_rest = nxt_buf_mem_free_size(&c->read->mem);
1067     }
1068 
1069     nxt_debug(task, "h1p body rest: %uz", body_rest);
1070 
1071     if (body_rest != 0) {
1072         nxt_conn_read(engine, c);
1073 
1074     } else {
1075         if (nxt_buf_is_file(b)) {
1076             b->mem.start = NULL;
1077             b->mem.end = NULL;
1078             b->mem.pos = NULL;
1079             b->mem.free = NULL;
1080         }
1081 
1082         c->read = NULL;
1083 
1084         r->state->ready_handler(task, r, NULL);
1085     }
1086 }
1087 
1088 
1089 static void
nxt_h1p_request_local_addr(nxt_task_t * task,nxt_http_request_t * r)1090 nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r)
1091 {
1092     r->local = nxt_conn_local_addr(task, r->proto.h1->conn);
1093 }
1094 
1095 
1096 #define NXT_HTTP_LAST_INFORMATIONAL                                           \
1097     (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1)
1098 
1099 static const nxt_str_t  nxt_http_informational[] = {
1100     nxt_string("HTTP/1.1 100 Continue\r\n"),
1101     nxt_string("HTTP/1.1 101 Switching Protocols\r\n"),
1102 };
1103 
1104 
1105 #define NXT_HTTP_LAST_SUCCESS                                                 \
1106     (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1)
1107 
1108 static const nxt_str_t  nxt_http_success[] = {
1109     nxt_string("HTTP/1.1 200 OK\r\n"),
1110     nxt_string("HTTP/1.1 201 Created\r\n"),
1111     nxt_string("HTTP/1.1 202 Accepted\r\n"),
1112     nxt_string("HTTP/1.1 203 Non-Authoritative Information\r\n"),
1113     nxt_string("HTTP/1.1 204 No Content\r\n"),
1114     nxt_string("HTTP/1.1 205 Reset Content\r\n"),
1115     nxt_string("HTTP/1.1 206 Partial Content\r\n"),
1116 };
1117 
1118 
1119 #define NXT_HTTP_LAST_REDIRECTION                                             \
1120     (NXT_HTTP_MULTIPLE_CHOICES + nxt_nitems(nxt_http_redirection) - 1)
1121 
1122 static const nxt_str_t  nxt_http_redirection[] = {
1123     nxt_string("HTTP/1.1 300 Multiple Choices\r\n"),
1124     nxt_string("HTTP/1.1 301 Moved Permanently\r\n"),
1125     nxt_string("HTTP/1.1 302 Found\r\n"),
1126     nxt_string("HTTP/1.1 303 See Other\r\n"),
1127     nxt_string("HTTP/1.1 304 Not Modified\r\n"),
1128     nxt_string("HTTP/1.1 307 Temporary Redirect\r\n"),
1129     nxt_string("HTTP/1.1 308 Permanent Redirect\r\n"),
1130 };
1131 
1132 
1133 #define NXT_HTTP_LAST_CLIENT_ERROR                                            \
1134     (NXT_HTTP_BAD_REQUEST + nxt_nitems(nxt_http_client_error) - 1)
1135 
1136 static const nxt_str_t  nxt_http_client_error[] = {
1137     nxt_string("HTTP/1.1 400 Bad Request\r\n"),
1138     nxt_string("HTTP/1.1 401 Unauthorized\r\n"),
1139     nxt_string("HTTP/1.1 402 Payment Required\r\n"),
1140     nxt_string("HTTP/1.1 403 Forbidden\r\n"),
1141     nxt_string("HTTP/1.1 404 Not Found\r\n"),
1142     nxt_string("HTTP/1.1 405 Method Not Allowed\r\n"),
1143     nxt_string("HTTP/1.1 406 Not Acceptable\r\n"),
1144     nxt_string("HTTP/1.1 407 Proxy Authentication Required\r\n"),
1145     nxt_string("HTTP/1.1 408 Request Timeout\r\n"),
1146     nxt_string("HTTP/1.1 409 Conflict\r\n"),
1147     nxt_string("HTTP/1.1 410 Gone\r\n"),
1148     nxt_string("HTTP/1.1 411 Length Required\r\n"),
1149     nxt_string("HTTP/1.1 412 Precondition Failed\r\n"),
1150     nxt_string("HTTP/1.1 413 Payload Too Large\r\n"),
1151     nxt_string("HTTP/1.1 414 URI Too Long\r\n"),
1152     nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"),
1153     nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"),
1154     nxt_string("HTTP/1.1 417 Expectation Failed\r\n"),
1155     nxt_string("HTTP/1.1 418 I'm a teapot\r\n"),
1156     nxt_string("HTTP/1.1 419 \r\n"),
1157     nxt_string("HTTP/1.1 420 \r\n"),
1158     nxt_string("HTTP/1.1 421 Misdirected Request\r\n"),
1159     nxt_string("HTTP/1.1 422 Unprocessable Entity\r\n"),
1160     nxt_string("HTTP/1.1 423 Locked\r\n"),
1161     nxt_string("HTTP/1.1 424 Failed Dependency\r\n"),
1162     nxt_string("HTTP/1.1 425 \r\n"),
1163     nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
1164     nxt_string("HTTP/1.1 427 \r\n"),
1165     nxt_string("HTTP/1.1 428 \r\n"),
1166     nxt_string("HTTP/1.1 429 \r\n"),
1167     nxt_string("HTTP/1.1 430 \r\n"),
1168     nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"),
1169 };
1170 
1171 
1172 #define NXT_HTTP_LAST_NGINX_ERROR                                             \
1173     (NXT_HTTP_TO_HTTPS + nxt_nitems(nxt_http_nginx_error) - 1)
1174 
1175 static const nxt_str_t  nxt_http_nginx_error[] = {
1176     nxt_string("HTTP/1.1 400 "
1177                "The plain HTTP request was sent to HTTPS port\r\n"),
1178 };
1179 
1180 
1181 #define NXT_HTTP_LAST_SERVER_ERROR                                            \
1182     (NXT_HTTP_INTERNAL_SERVER_ERROR + nxt_nitems(nxt_http_server_error) - 1)
1183 
1184 static const nxt_str_t  nxt_http_server_error[] = {
1185     nxt_string("HTTP/1.1 500 Internal Server Error\r\n"),
1186     nxt_string("HTTP/1.1 501 Not Implemented\r\n"),
1187     nxt_string("HTTP/1.1 502 Bad Gateway\r\n"),
1188     nxt_string("HTTP/1.1 503 Service Unavailable\r\n"),
1189     nxt_string("HTTP/1.1 504 Gateway Timeout\r\n"),
1190     nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
1191 };
1192 
1193 
1194 #define UNKNOWN_STATUS_LENGTH  nxt_length("HTTP/1.1 999 \r\n")
1195 
1196 static void
nxt_h1p_request_header_send(nxt_task_t * task,nxt_http_request_t * r,nxt_work_handler_t body_handler,void * data)1197 nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
1198     nxt_work_handler_t body_handler, void *data)
1199 {
1200     u_char              *p;
1201     size_t              size;
1202     nxt_buf_t           *header;
1203     nxt_str_t           unknown_status;
1204     nxt_int_t           conn;
1205     nxt_uint_t          n;
1206     nxt_bool_t          http11;
1207     nxt_conn_t          *c;
1208     nxt_h1proto_t       *h1p;
1209     const nxt_str_t     *status;
1210     nxt_http_field_t    *field;
1211     u_char              buf[UNKNOWN_STATUS_LENGTH];
1212 
1213     static const char   chunked[] = "Transfer-Encoding: chunked\r\n";
1214     static const char   websocket_version[] = "Sec-WebSocket-Version: 13\r\n";
1215 
1216     static const nxt_str_t  connection[3] = {
1217         nxt_string("Connection: close\r\n"),
1218         nxt_string("Connection: keep-alive\r\n"),
1219         nxt_string("Upgrade: websocket\r\n"
1220                    "Connection: Upgrade\r\n"
1221                    "Sec-WebSocket-Accept: "),
1222     };
1223 
1224     nxt_debug(task, "h1p request header send");
1225 
1226     r->header_sent = 1;
1227     h1p = r->proto.h1;
1228     n = r->status;
1229 
1230     if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) {
1231         status = &nxt_http_informational[n - NXT_HTTP_CONTINUE];
1232 
1233     } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
1234         status = &nxt_http_success[n - NXT_HTTP_OK];
1235 
1236     } else if (n >= NXT_HTTP_MULTIPLE_CHOICES
1237                && n <= NXT_HTTP_LAST_REDIRECTION)
1238     {
1239         status = &nxt_http_redirection[n - NXT_HTTP_MULTIPLE_CHOICES];
1240 
1241     } else if (n >= NXT_HTTP_BAD_REQUEST && n <= NXT_HTTP_LAST_CLIENT_ERROR) {
1242         status = &nxt_http_client_error[n - NXT_HTTP_BAD_REQUEST];
1243 
1244     } else if (n >= NXT_HTTP_TO_HTTPS && n <= NXT_HTTP_LAST_NGINX_ERROR) {
1245         status = &nxt_http_nginx_error[n - NXT_HTTP_TO_HTTPS];
1246 
1247     } else if (n >= NXT_HTTP_INTERNAL_SERVER_ERROR
1248                && n <= NXT_HTTP_LAST_SERVER_ERROR)
1249     {
1250         status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR];
1251 
1252     } else if (n <= NXT_HTTP_STATUS_MAX) {
1253         (void) nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
1254                            "HTTP/1.1 %03d \r\n", n);
1255 
1256         unknown_status.length = UNKNOWN_STATUS_LENGTH;
1257         unknown_status.start = buf;
1258         status = &unknown_status;
1259 
1260     } else {
1261         status = &nxt_http_server_error[0];
1262     }
1263 
1264     size = status->length;
1265     /* Trailing CRLF at the end of header. */
1266     size += nxt_length("\r\n");
1267 
1268     conn = -1;
1269 
1270     if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) {
1271         h1p->websocket = 1;
1272         h1p->keepalive = 0;
1273         conn = 2;
1274         size += NXT_WEBSOCKET_ACCEPT_SIZE + 2;
1275 
1276     } else {
1277         http11 = (h1p->parser.version.s.minor != '0');
1278 
1279         if (r->resp.content_length == NULL || r->resp.content_length->skip) {
1280 
1281             if (http11) {
1282                 if (n != NXT_HTTP_NOT_MODIFIED
1283                     && n != NXT_HTTP_NO_CONTENT
1284                     && body_handler != NULL
1285                     && !h1p->websocket)
1286                 {
1287                     h1p->chunked = 1;
1288                     size += nxt_length(chunked);
1289                     /* Trailing CRLF will be added by the first chunk header. */
1290                     size -= nxt_length("\r\n");
1291                 }
1292 
1293             } else {
1294                 h1p->keepalive = 0;
1295             }
1296         }
1297 
1298         if (http11 ^ h1p->keepalive) {
1299             conn = h1p->keepalive;
1300         }
1301     }
1302 
1303     if (conn >= 0) {
1304         size += connection[conn].length;
1305     }
1306 
1307     nxt_list_each(field, r->resp.fields) {
1308 
1309         if (!field->skip) {
1310             size += field->name_length + field->value_length;
1311             size += nxt_length(": \r\n");
1312         }
1313 
1314     } nxt_list_loop;
1315 
1316     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1317         size += nxt_length(websocket_version);
1318     }
1319 
1320     header = nxt_http_buf_mem(task, r, size);
1321     if (nxt_slow_path(header == NULL)) {
1322         nxt_h1p_request_error(task, h1p, r);
1323         return;
1324     }
1325 
1326     p = nxt_cpymem(header->mem.free, status->start, status->length);
1327 
1328     nxt_list_each(field, r->resp.fields) {
1329 
1330         if (!field->skip) {
1331             p = nxt_cpymem(p, field->name, field->name_length);
1332             *p++ = ':'; *p++ = ' ';
1333             p = nxt_cpymem(p, field->value, field->value_length);
1334             *p++ = '\r'; *p++ = '\n';
1335         }
1336 
1337     } nxt_list_loop;
1338 
1339     if (conn >= 0) {
1340         p = nxt_cpymem(p, connection[conn].start, connection[conn].length);
1341     }
1342 
1343     if (h1p->websocket) {
1344         nxt_websocket_accept(p, h1p->websocket_key->value);
1345         p += NXT_WEBSOCKET_ACCEPT_SIZE;
1346 
1347         *p++ = '\r'; *p++ = '\n';
1348     }
1349 
1350     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1351         p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version));
1352     }
1353 
1354     if (h1p->chunked) {
1355         p = nxt_cpymem(p, chunked, nxt_length(chunked));
1356         /* Trailing CRLF will be added by the first chunk header. */
1357 
1358     } else {
1359         *p++ = '\r'; *p++ = '\n';
1360     }
1361 
1362     header->mem.free = p;
1363 
1364     h1p->header_size = nxt_buf_mem_used_size(&header->mem);
1365 
1366     c = h1p->conn;
1367 
1368     c->write = header;
1369     h1p->conn_write_tail = &header->next;
1370     c->write_state = &nxt_h1p_request_send_state;
1371 
1372     if (body_handler != NULL) {
1373         /*
1374          * The body handler will run before c->io->write() handler,
1375          * because the latter was inqueued by nxt_conn_write()
1376          * in engine->write_work_queue.
1377          */
1378         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1379                            body_handler, task, r, data);
1380 
1381     } else {
1382         header->next = nxt_http_buf_last(r);
1383     }
1384 
1385     nxt_conn_write(task->thread->engine, c);
1386 
1387     if (h1p->websocket) {
1388         nxt_h1p_websocket_first_frame_start(task, r, c->read);
1389     }
1390 }
1391 
1392 
1393 void
nxt_h1p_complete_buffers(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_bool_t all)1394 nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_bool_t all)
1395 {
1396     size_t            size;
1397     nxt_buf_t         *b, *in, *next;
1398     nxt_conn_t        *c;
1399 
1400     nxt_debug(task, "h1p complete buffers");
1401 
1402     b = h1p->buffers;
1403     c = h1p->conn;
1404     in = c->read;
1405 
1406     if (b != NULL) {
1407         if (in == NULL) {
1408             /* A request with large body. */
1409             in = b;
1410             c->read = in;
1411 
1412             b = in->next;
1413             in->next = NULL;
1414         }
1415 
1416         while (b != NULL) {
1417             next = b->next;
1418             b->next = NULL;
1419 
1420             b->completion_handler(task, b, b->parent);
1421 
1422             b = next;
1423         }
1424 
1425         h1p->buffers = NULL;
1426         h1p->nbuffers = 0;
1427     }
1428 
1429     if (in != NULL) {
1430         size = nxt_buf_mem_used_size(&in->mem);
1431 
1432         if (size == 0 || all) {
1433             in->completion_handler(task, in, in->parent);
1434 
1435             c->read = NULL;
1436         }
1437     }
1438 }
1439 
1440 
1441 static const nxt_conn_state_t  nxt_h1p_request_send_state
1442     nxt_aligned(64) =
1443 {
1444     .ready_handler = nxt_h1p_conn_sent,
1445     .error_handler = nxt_h1p_conn_request_error,
1446 
1447     .timer_handler = nxt_h1p_conn_request_send_timeout,
1448     .timer_value = nxt_h1p_conn_request_timer_value,
1449     .timer_data = offsetof(nxt_socket_conf_t, send_timeout),
1450     .timer_autoreset = 1,
1451 };
1452 
1453 
1454 static void
nxt_h1p_request_send(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * out)1455 nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1456 {
1457     nxt_conn_t     *c;
1458     nxt_h1proto_t  *h1p;
1459 
1460     nxt_debug(task, "h1p request send");
1461 
1462     h1p = r->proto.h1;
1463     c = h1p->conn;
1464 
1465     if (h1p->chunked) {
1466         out = nxt_h1p_chunk_create(task, r, out);
1467         if (nxt_slow_path(out == NULL)) {
1468             nxt_h1p_request_error(task, h1p, r);
1469             return;
1470         }
1471     }
1472 
1473     if (c->write == NULL) {
1474         c->write = out;
1475         c->write_state = &nxt_h1p_request_send_state;
1476 
1477         nxt_conn_write(task->thread->engine, c);
1478 
1479     } else {
1480         *h1p->conn_write_tail = out;
1481     }
1482 
1483     while (out->next != NULL) {
1484         out = out->next;
1485     }
1486 
1487     h1p->conn_write_tail = &out->next;
1488 }
1489 
1490 
1491 static nxt_buf_t *
nxt_h1p_chunk_create(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * out)1492 nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1493 {
1494     nxt_off_t          size;
1495     nxt_buf_t          *b, **prev, *header, *tail;
1496 
1497     const size_t       chunk_size = 2 * nxt_length("\r\n") + NXT_OFF_T_HEXLEN;
1498     static const char  tail_chunk[] = "\r\n0\r\n\r\n";
1499 
1500     size = 0;
1501     prev = &out;
1502 
1503     for (b = out; b != NULL; b = b->next) {
1504 
1505         if (nxt_buf_is_last(b)) {
1506             tail = nxt_http_buf_mem(task, r, sizeof(tail_chunk));
1507             if (nxt_slow_path(tail == NULL)) {
1508                 return NULL;
1509             }
1510 
1511             *prev = tail;
1512             tail->next = b;
1513             /*
1514              * The tail_chunk size with trailing zero is 8 bytes, so
1515              * memcpy may be inlined with just single 8 byte move operation.
1516              */
1517             nxt_memcpy(tail->mem.free, tail_chunk, sizeof(tail_chunk));
1518             tail->mem.free += nxt_length(tail_chunk);
1519 
1520             break;
1521         }
1522 
1523         size += nxt_buf_used_size(b);
1524         prev = &b->next;
1525     }
1526 
1527     if (size == 0) {
1528         return out;
1529     }
1530 
1531     header = nxt_http_buf_mem(task, r, chunk_size);
1532     if (nxt_slow_path(header == NULL)) {
1533         return NULL;
1534     }
1535 
1536     header->next = out;
1537     header->mem.free = nxt_sprintf(header->mem.free, header->mem.end,
1538                                    "\r\n%xO\r\n", size);
1539     return header;
1540 }
1541 
1542 
1543 static nxt_off_t
nxt_h1p_request_body_bytes_sent(nxt_task_t * task,nxt_http_proto_t proto)1544 nxt_h1p_request_body_bytes_sent(nxt_task_t *task, nxt_http_proto_t proto)
1545 {
1546     nxt_off_t      sent;
1547     nxt_h1proto_t  *h1p;
1548 
1549     h1p = proto.h1;
1550 
1551     sent = h1p->conn->sent - h1p->header_size;
1552 
1553     return (sent > 0) ? sent : 0;
1554 }
1555 
1556 
1557 static void
nxt_h1p_request_discard(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * last)1558 nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
1559     nxt_buf_t *last)
1560 {
1561     nxt_buf_t         *b;
1562     nxt_conn_t        *c;
1563     nxt_h1proto_t     *h1p;
1564     nxt_work_queue_t  *wq;
1565 
1566     nxt_debug(task, "h1p request discard");
1567 
1568     h1p = r->proto.h1;
1569     h1p->keepalive = 0;
1570 
1571     c = h1p->conn;
1572     b = c->write;
1573     c->write = NULL;
1574 
1575     wq = &task->thread->engine->fast_work_queue;
1576 
1577     nxt_sendbuf_drain(task, wq, b);
1578     nxt_sendbuf_drain(task, wq, last);
1579 }
1580 
1581 
1582 static void
nxt_h1p_conn_request_error(nxt_task_t * task,void * obj,void * data)1583 nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
1584 {
1585     nxt_h1proto_t       *h1p;
1586     nxt_http_request_t  *r;
1587 
1588     h1p = data;
1589 
1590     nxt_debug(task, "h1p conn request error");
1591 
1592     r = h1p->request;
1593 
1594     if (nxt_slow_path(r == NULL)) {
1595         nxt_h1p_shutdown(task, h1p->conn);
1596         return;
1597     }
1598 
1599     if (r->fields == NULL) {
1600         (void) nxt_h1p_header_process(task, h1p, r);
1601     }
1602 
1603     if (r->status == 0) {
1604         r->status = NXT_HTTP_BAD_REQUEST;
1605     }
1606 
1607     nxt_h1p_request_error(task, h1p, r);
1608 }
1609 
1610 
1611 static void
nxt_h1p_conn_request_timeout(nxt_task_t * task,void * obj,void * data)1612 nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
1613 {
1614     nxt_conn_t          *c;
1615     nxt_timer_t         *timer;
1616     nxt_h1proto_t       *h1p;
1617     nxt_http_request_t  *r;
1618 
1619     timer = obj;
1620 
1621     nxt_debug(task, "h1p conn request timeout");
1622 
1623     c = nxt_read_timer_conn(timer);
1624     c->block_read = 1;
1625     /*
1626      * Disable SO_LINGER off during socket closing
1627      * to send "408 Request Timeout" error response.
1628      */
1629     c->socket.timedout = 0;
1630 
1631     h1p = c->socket.data;
1632     h1p->keepalive = 0;
1633     r = h1p->request;
1634 
1635     if (r->fields == NULL) {
1636         (void) nxt_h1p_header_process(task, h1p, r);
1637     }
1638 
1639     nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT);
1640 }
1641 
1642 
1643 static void
nxt_h1p_conn_request_send_timeout(nxt_task_t * task,void * obj,void * data)1644 nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data)
1645 {
1646     nxt_conn_t     *c;
1647     nxt_timer_t    *timer;
1648     nxt_h1proto_t  *h1p;
1649 
1650     timer = obj;
1651 
1652     nxt_debug(task, "h1p conn request send timeout");
1653 
1654     c = nxt_write_timer_conn(timer);
1655     c->block_write = 1;
1656     h1p = c->socket.data;
1657 
1658     nxt_h1p_request_error(task, h1p, h1p->request);
1659 }
1660 
1661 
1662 nxt_msec_t
nxt_h1p_conn_request_timer_value(nxt_conn_t * c,uintptr_t data)1663 nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data)
1664 {
1665     nxt_h1proto_t  *h1p;
1666 
1667     h1p = c->socket.data;
1668 
1669     return nxt_value_at(nxt_msec_t, h1p->request->conf->socket_conf, data);
1670 }
1671 
1672 
1673 nxt_inline void
nxt_h1p_request_error(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_http_request_t * r)1674 nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
1675     nxt_http_request_t *r)
1676 {
1677     h1p->keepalive = 0;
1678 
1679     r->state->error_handler(task, r, h1p);
1680 }
1681 
1682 
1683 static void
nxt_h1p_request_close(nxt_task_t * task,nxt_http_proto_t proto,nxt_socket_conf_joint_t * joint)1684 nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
1685     nxt_socket_conf_joint_t *joint)
1686 {
1687     nxt_conn_t     *c;
1688     nxt_h1proto_t  *h1p;
1689 
1690     nxt_debug(task, "h1p request close");
1691 
1692     h1p = proto.h1;
1693     h1p->keepalive &= !h1p->request->inconsistent;
1694     h1p->request = NULL;
1695 
1696     nxt_router_conf_release(task, joint);
1697 
1698     c = h1p->conn;
1699     task = &c->task;
1700     c->socket.task = task;
1701     c->read_timer.task = task;
1702     c->write_timer.task = task;
1703 
1704     if (h1p->keepalive) {
1705         nxt_h1p_keepalive(task, h1p, c);
1706 
1707     } else {
1708         nxt_h1p_shutdown(task, c);
1709     }
1710 }
1711 
1712 
1713 static void
nxt_h1p_conn_sent(nxt_task_t * task,void * obj,void * data)1714 nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data)
1715 {
1716     nxt_conn_t          *c;
1717     nxt_event_engine_t  *engine;
1718 
1719     c = obj;
1720 
1721     nxt_debug(task, "h1p conn sent");
1722 
1723     engine = task->thread->engine;
1724 
1725     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
1726 
1727     if (c->write != NULL) {
1728         nxt_conn_write(engine, c);
1729     }
1730 }
1731 
1732 
1733 static void
nxt_h1p_conn_close(nxt_task_t * task,void * obj,void * data)1734 nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
1735 {
1736     nxt_conn_t  *c;
1737 
1738     c = obj;
1739 
1740     nxt_debug(task, "h1p conn close");
1741 
1742     nxt_queue_remove(&c->link);
1743 
1744     nxt_h1p_shutdown(task, c);
1745 }
1746 
1747 
1748 static void
nxt_h1p_conn_error(nxt_task_t * task,void * obj,void * data)1749 nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data)
1750 {
1751     nxt_conn_t  *c;
1752 
1753     c = obj;
1754 
1755     nxt_debug(task, "h1p conn error");
1756 
1757     nxt_queue_remove(&c->link);
1758 
1759     nxt_h1p_shutdown(task, c);
1760 }
1761 
1762 
1763 static nxt_msec_t
nxt_h1p_conn_timer_value(nxt_conn_t * c,uintptr_t data)1764 nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data)
1765 {
1766     nxt_socket_conf_joint_t  *joint;
1767 
1768     joint = c->listen->socket.data;
1769 
1770     if (nxt_fast_path(joint != NULL)) {
1771         return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1772     }
1773 
1774     /*
1775      * Listening socket had been closed while
1776      * connection was in keep-alive state.
1777      */
1778     return 1;
1779 }
1780 
1781 
1782 static void
nxt_h1p_keepalive(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_conn_t * c)1783 nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
1784 {
1785     size_t              size;
1786     nxt_buf_t           *in;
1787     nxt_event_engine_t  *engine;
1788 
1789     nxt_debug(task, "h1p keepalive");
1790 
1791     if (!c->tcp_nodelay) {
1792         nxt_conn_tcp_nodelay_on(task, c);
1793     }
1794 
1795     nxt_h1p_complete_buffers(task, h1p, 0);
1796 
1797     in = c->read;
1798 
1799     nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn));
1800 
1801     c->sent = 0;
1802 
1803     engine = task->thread->engine;
1804     nxt_queue_insert_head(&engine->idle_connections, &c->link);
1805 
1806     if (in == NULL) {
1807         c->read_state = &nxt_h1p_keepalive_state;
1808 
1809         nxt_conn_read(engine, c);
1810 
1811     } else {
1812         size = nxt_buf_mem_used_size(&in->mem);
1813 
1814         nxt_debug(task, "h1p pipelining");
1815 
1816         nxt_memmove(in->mem.start, in->mem.pos, size);
1817 
1818         in->mem.pos = in->mem.start;
1819         in->mem.free = in->mem.start + size;
1820 
1821         nxt_h1p_conn_request_init(task, c, c->socket.data);
1822     }
1823 }
1824 
1825 
1826 static const nxt_conn_state_t  nxt_h1p_keepalive_state
1827     nxt_aligned(64) =
1828 {
1829     .ready_handler = nxt_h1p_conn_request_init,
1830     .close_handler = nxt_h1p_conn_close,
1831     .error_handler = nxt_h1p_conn_error,
1832 
1833     .io_read_handler = nxt_h1p_idle_io_read_handler,
1834 
1835     .timer_handler = nxt_h1p_idle_timeout,
1836     .timer_value = nxt_h1p_conn_timer_value,
1837     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
1838     .timer_autoreset = 1,
1839 };
1840 
1841 
1842 const nxt_conn_state_t  nxt_h1p_idle_close_state
1843     nxt_aligned(64) =
1844 {
1845     .close_handler = nxt_h1p_idle_close,
1846 };
1847 
1848 
1849 static void
nxt_h1p_idle_close(nxt_task_t * task,void * obj,void * data)1850 nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data)
1851 {
1852     nxt_conn_t  *c;
1853 
1854     c = obj;
1855 
1856     nxt_debug(task, "h1p idle close");
1857 
1858     nxt_queue_remove(&c->link);
1859 
1860     nxt_h1p_idle_response(task, c);
1861 }
1862 
1863 
1864 static void
nxt_h1p_idle_timeout(nxt_task_t * task,void * obj,void * data)1865 nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data)
1866 {
1867     nxt_conn_t   *c;
1868     nxt_timer_t  *timer;
1869 
1870     timer = obj;
1871 
1872     nxt_debug(task, "h1p idle timeout");
1873 
1874     c = nxt_read_timer_conn(timer);
1875     c->block_read = 1;
1876 
1877     nxt_queue_remove(&c->link);
1878 
1879     nxt_h1p_idle_response(task, c);
1880 }
1881 
1882 
1883 #define NXT_H1P_IDLE_TIMEOUT                                                  \
1884     "HTTP/1.1 408 Request Timeout\r\n"                                        \
1885     "Server: " NXT_SERVER "\r\n"                                              \
1886     "Connection: close\r\n"                                                   \
1887     "Content-Length: 0\r\n"                                                   \
1888     "Date: "
1889 
1890 
1891 static void
nxt_h1p_idle_response(nxt_task_t * task,nxt_conn_t * c)1892 nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
1893 {
1894     u_char     *p;
1895     size_t     size;
1896     nxt_buf_t  *out, *last;
1897 
1898     size = nxt_length(NXT_H1P_IDLE_TIMEOUT)
1899            + nxt_http_date_cache.size
1900            + nxt_length("\r\n\r\n");
1901 
1902     out = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1903     if (nxt_slow_path(out == NULL)) {
1904         goto fail;
1905     }
1906 
1907     p = nxt_cpymem(out->mem.free, NXT_H1P_IDLE_TIMEOUT,
1908                    nxt_length(NXT_H1P_IDLE_TIMEOUT));
1909 
1910     p = nxt_thread_time_string(task->thread, &nxt_http_date_cache, p);
1911 
1912     out->mem.free = nxt_cpymem(p, "\r\n\r\n", 4);
1913 
1914     last = nxt_mp_zget(c->mem_pool, NXT_BUF_SYNC_SIZE);
1915     if (nxt_slow_path(last == NULL)) {
1916         goto fail;
1917     }
1918 
1919     out->next = last;
1920     nxt_buf_set_sync(last);
1921     nxt_buf_set_last(last);
1922 
1923     last->completion_handler = nxt_h1p_idle_response_sent;
1924     last->parent = c;
1925 
1926     c->write = out;
1927     c->write_state = &nxt_h1p_timeout_response_state;
1928 
1929     nxt_conn_write(task->thread->engine, c);
1930     return;
1931 
1932 fail:
1933 
1934     nxt_h1p_shutdown(task, c);
1935 }
1936 
1937 
1938 static const nxt_conn_state_t  nxt_h1p_timeout_response_state
1939     nxt_aligned(64) =
1940 {
1941     .ready_handler = nxt_h1p_conn_sent,
1942     .error_handler = nxt_h1p_idle_response_error,
1943 
1944     .timer_handler = nxt_h1p_idle_response_timeout,
1945     .timer_value = nxt_h1p_idle_response_timer_value,
1946 };
1947 
1948 
1949 static void
nxt_h1p_idle_response_sent(nxt_task_t * task,void * obj,void * data)1950 nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data)
1951 {
1952     nxt_conn_t  *c;
1953 
1954     c = data;
1955 
1956     nxt_debug(task, "h1p idle timeout response sent");
1957 
1958     nxt_h1p_shutdown(task, c);
1959 }
1960 
1961 
1962 static void
nxt_h1p_idle_response_error(nxt_task_t * task,void * obj,void * data)1963 nxt_h1p_idle_response_error(nxt_task_t *task, void *obj, void *data)
1964 {
1965     nxt_conn_t  *c;
1966 
1967     c = obj;
1968 
1969     nxt_debug(task, "h1p response error");
1970 
1971     nxt_h1p_shutdown(task, c);
1972 }
1973 
1974 
1975 static void
nxt_h1p_idle_response_timeout(nxt_task_t * task,void * obj,void * data)1976 nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, void *data)
1977 {
1978     nxt_conn_t   *c;
1979     nxt_timer_t  *timer;
1980 
1981     timer = obj;
1982 
1983     nxt_debug(task, "h1p idle timeout response timeout");
1984 
1985     c = nxt_read_timer_conn(timer);
1986     c->block_write = 1;
1987 
1988     nxt_h1p_shutdown(task, c);
1989 }
1990 
1991 
1992 static nxt_msec_t
nxt_h1p_idle_response_timer_value(nxt_conn_t * c,uintptr_t data)1993 nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data)
1994 {
1995     return 10 * 1000;
1996 }
1997 
1998 
1999 static void
nxt_h1p_shutdown(nxt_task_t * task,nxt_conn_t * c)2000 nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
2001 {
2002     nxt_timer_t    *timer;
2003     nxt_h1proto_t  *h1p;
2004 
2005     nxt_debug(task, "h1p shutdown");
2006 
2007     h1p = c->socket.data;
2008 
2009     if (h1p != NULL) {
2010         nxt_h1p_complete_buffers(task, h1p, 1);
2011 
2012         if (nxt_slow_path(h1p->websocket_timer != NULL)) {
2013             timer = &h1p->websocket_timer->timer;
2014 
2015             if (timer->handler != nxt_h1p_conn_ws_shutdown) {
2016                 timer->handler = nxt_h1p_conn_ws_shutdown;
2017                 nxt_timer_add(task->thread->engine, timer, 0);
2018 
2019             } else {
2020                 nxt_debug(task, "h1p already scheduled ws shutdown");
2021             }
2022 
2023             return;
2024         }
2025     }
2026 
2027     nxt_h1p_closing(task, c);
2028 }
2029 
2030 
2031 static void
nxt_h1p_conn_ws_shutdown(nxt_task_t * task,void * obj,void * data)2032 nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
2033 {
2034     nxt_timer_t                *timer;
2035     nxt_h1p_websocket_timer_t  *ws_timer;
2036 
2037     nxt_debug(task, "h1p conn ws shutdown");
2038 
2039     timer = obj;
2040     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
2041 
2042     nxt_h1p_closing(task, ws_timer->h1p->conn);
2043 }
2044 
2045 
2046 static void
nxt_h1p_closing(nxt_task_t * task,nxt_conn_t * c)2047 nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
2048 {
2049     nxt_debug(task, "h1p closing");
2050 
2051     c->socket.data = NULL;
2052 
2053 #if (NXT_TLS)
2054 
2055     if (c->u.tls != NULL) {
2056         c->write_state = &nxt_h1p_shutdown_state;
2057 
2058         c->io->shutdown(task, c, NULL);
2059         return;
2060     }
2061 
2062 #endif
2063 
2064     nxt_h1p_conn_closing(task, c, NULL);
2065 }
2066 
2067 
2068 #if (NXT_TLS)
2069 
2070 static const nxt_conn_state_t  nxt_h1p_shutdown_state
2071     nxt_aligned(64) =
2072 {
2073     .ready_handler = nxt_h1p_conn_closing,
2074     .close_handler = nxt_h1p_conn_closing,
2075     .error_handler = nxt_h1p_conn_closing,
2076 };
2077 
2078 #endif
2079 
2080 
2081 static void
nxt_h1p_conn_closing(nxt_task_t * task,void * obj,void * data)2082 nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
2083 {
2084     nxt_conn_t  *c;
2085 
2086     c = obj;
2087 
2088     nxt_debug(task, "h1p conn closing");
2089 
2090     c->write_state = &nxt_h1p_close_state;
2091 
2092     nxt_conn_close(task->thread->engine, c);
2093 }
2094 
2095 
2096 static const nxt_conn_state_t  nxt_h1p_close_state
2097     nxt_aligned(64) =
2098 {
2099     .ready_handler = nxt_h1p_conn_free,
2100 };
2101 
2102 
2103 static void
nxt_h1p_conn_free(nxt_task_t * task,void * obj,void * data)2104 nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
2105 {
2106     nxt_conn_t          *c;
2107     nxt_listen_event_t  *lev;
2108     nxt_event_engine_t  *engine;
2109 
2110     c = obj;
2111 
2112     nxt_debug(task, "h1p conn free");
2113 
2114     engine = task->thread->engine;
2115 
2116     nxt_sockaddr_cache_free(engine, c);
2117 
2118     lev = c->listen;
2119 
2120     nxt_conn_free(task, c);
2121 
2122     nxt_router_listen_event_release(&engine->task, lev, NULL);
2123 }
2124 
2125 
2126 static void
nxt_h1p_peer_connect(nxt_task_t * task,nxt_http_peer_t * peer)2127 nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
2128 {
2129     nxt_mp_t            *mp;
2130     nxt_int_t           ret;
2131     nxt_conn_t          *c, *client;
2132     nxt_h1proto_t       *h1p;
2133     nxt_fd_event_t      *socket;
2134     nxt_work_queue_t    *wq;
2135     nxt_http_request_t  *r;
2136 
2137     nxt_debug(task, "h1p peer connect");
2138 
2139     peer->status = NXT_HTTP_UNSET;
2140     r = peer->request;
2141 
2142     mp = nxt_mp_create(1024, 128, 256, 32);
2143 
2144     if (nxt_slow_path(mp == NULL)) {
2145         goto fail;
2146     }
2147 
2148     h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
2149     if (nxt_slow_path(h1p == NULL)) {
2150         goto fail;
2151     }
2152 
2153     ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
2154     if (nxt_slow_path(ret != NXT_OK)) {
2155         goto fail;
2156     }
2157 
2158     c = nxt_conn_create(mp, task);
2159     if (nxt_slow_path(c == NULL)) {
2160         goto fail;
2161     }
2162 
2163     c->mem_pool = mp;
2164     h1p->conn = c;
2165 
2166     peer->proto.h1 = h1p;
2167     h1p->request = r;
2168 
2169     c->socket.data = peer;
2170     c->remote = peer->server->sockaddr;
2171 
2172     c->socket.write_ready = 1;
2173     c->write_state = &nxt_h1p_peer_connect_state;
2174 
2175     /*
2176      * TODO: queues should be implemented via client proto interface.
2177      */
2178     client = r->proto.h1->conn;
2179 
2180     socket = &client->socket;
2181     wq = socket->read_work_queue;
2182     c->read_work_queue = wq;
2183     c->socket.read_work_queue = wq;
2184     c->read_timer.work_queue = wq;
2185 
2186     wq = socket->write_work_queue;
2187     c->write_work_queue = wq;
2188     c->socket.write_work_queue = wq;
2189     c->write_timer.work_queue = wq;
2190     /* TODO END */
2191 
2192     nxt_conn_connect(task->thread->engine, c);
2193 
2194     return;
2195 
2196 fail:
2197 
2198     peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2199 
2200     r->state->error_handler(task, r, peer);
2201 }
2202 
2203 
2204 static const nxt_conn_state_t  nxt_h1p_peer_connect_state
2205     nxt_aligned(64) =
2206 {
2207     .ready_handler = nxt_h1p_peer_connected,
2208     .close_handler = nxt_h1p_peer_refused,
2209     .error_handler = nxt_h1p_peer_error,
2210 
2211     .timer_handler = nxt_h1p_peer_send_timeout,
2212     .timer_value = nxt_h1p_peer_timer_value,
2213     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2214 };
2215 
2216 
2217 static void
nxt_h1p_peer_connected(nxt_task_t * task,void * obj,void * data)2218 nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
2219 {
2220     nxt_http_peer_t     *peer;
2221     nxt_http_request_t  *r;
2222 
2223     peer = data;
2224 
2225     nxt_debug(task, "h1p peer connected");
2226 
2227     r = peer->request;
2228     r->state->ready_handler(task, r, peer);
2229 }
2230 
2231 
2232 static void
nxt_h1p_peer_refused(nxt_task_t * task,void * obj,void * data)2233 nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
2234 {
2235     nxt_http_peer_t     *peer;
2236     nxt_http_request_t  *r;
2237 
2238     peer = data;
2239 
2240     nxt_debug(task, "h1p peer refused");
2241 
2242     //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
2243     peer->status = NXT_HTTP_BAD_GATEWAY;
2244 
2245     r = peer->request;
2246     r->state->error_handler(task, r, peer);
2247 }
2248 
2249 
2250 static void
nxt_h1p_peer_header_send(nxt_task_t * task,nxt_http_peer_t * peer)2251 nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
2252 {
2253     u_char              *p;
2254     size_t              size;
2255     nxt_buf_t           *header, *body;
2256     nxt_conn_t          *c;
2257     nxt_http_field_t    *field;
2258     nxt_http_request_t  *r;
2259 
2260     nxt_debug(task, "h1p peer header send");
2261 
2262     r = peer->request;
2263 
2264     size = r->method->length + sizeof(" ") + r->target.length
2265            + sizeof(" HTTP/1.1\r\n")
2266            + sizeof("Connection: close\r\n")
2267            + sizeof("\r\n");
2268 
2269     nxt_list_each(field, r->fields) {
2270 
2271         if (!field->hopbyhop) {
2272             size += field->name_length + field->value_length;
2273             size += nxt_length(": \r\n");
2274         }
2275 
2276     } nxt_list_loop;
2277 
2278     header = nxt_http_buf_mem(task, r, size);
2279     if (nxt_slow_path(header == NULL)) {
2280         r->state->error_handler(task, r, peer);
2281         return;
2282     }
2283 
2284     p = header->mem.free;
2285 
2286     p = nxt_cpymem(p, r->method->start, r->method->length);
2287     *p++ = ' ';
2288     p = nxt_cpymem(p, r->target.start, r->target.length);
2289     p = nxt_cpymem(p, " HTTP/1.1\r\n", 11);
2290     p = nxt_cpymem(p, "Connection: close\r\n", 19);
2291 
2292     nxt_list_each(field, r->fields) {
2293 
2294         if (!field->hopbyhop) {
2295             p = nxt_cpymem(p, field->name, field->name_length);
2296             *p++ = ':'; *p++ = ' ';
2297             p = nxt_cpymem(p, field->value, field->value_length);
2298             *p++ = '\r'; *p++ = '\n';
2299         }
2300 
2301     } nxt_list_loop;
2302 
2303     *p++ = '\r'; *p++ = '\n';
2304     header->mem.free = p;
2305     size = p - header->mem.pos;
2306 
2307     c = peer->proto.h1->conn;
2308     c->write = header;
2309     c->write_state = &nxt_h1p_peer_header_send_state;
2310 
2311     if (r->body != NULL) {
2312         if (nxt_buf_is_file(r->body)) {
2313             body = nxt_buf_file_alloc(r->mem_pool, 0, 0);
2314 
2315         } else {
2316             body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
2317         }
2318 
2319         if (nxt_slow_path(body == NULL)) {
2320             r->state->error_handler(task, r, peer);
2321             return;
2322         }
2323 
2324         header->next = body;
2325 
2326         if (nxt_buf_is_file(r->body)) {
2327             body->file = r->body->file;
2328             body->file_end = r->body->file_end;
2329 
2330         } else {
2331             body->mem = r->body->mem;
2332         }
2333 
2334         size += nxt_buf_used_size(body);
2335 
2336 //        nxt_mp_retain(r->mem_pool);
2337     }
2338 
2339     if (size > 16384) {
2340         /* Use proxy_send_timeout instead of proxy_timeout. */
2341         c->write_state = &nxt_h1p_peer_header_body_send_state;
2342     }
2343 
2344     nxt_conn_write(task->thread->engine, c);
2345 }
2346 
2347 
2348 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state
2349     nxt_aligned(64) =
2350 {
2351     .ready_handler = nxt_h1p_peer_header_sent,
2352     .error_handler = nxt_h1p_peer_error,
2353 
2354     .timer_handler = nxt_h1p_peer_send_timeout,
2355     .timer_value = nxt_h1p_peer_timer_value,
2356     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2357 };
2358 
2359 
2360 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state
2361     nxt_aligned(64) =
2362 {
2363     .ready_handler = nxt_h1p_peer_header_sent,
2364     .error_handler = nxt_h1p_peer_error,
2365 
2366     .timer_handler = nxt_h1p_peer_send_timeout,
2367     .timer_value = nxt_h1p_peer_timer_value,
2368     .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
2369     .timer_autoreset = 1,
2370 };
2371 
2372 
2373 static void
nxt_h1p_peer_header_sent(nxt_task_t * task,void * obj,void * data)2374 nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
2375 {
2376     nxt_conn_t          *c;
2377     nxt_http_peer_t     *peer;
2378     nxt_http_request_t  *r;
2379     nxt_event_engine_t  *engine;
2380 
2381     c = obj;
2382     peer = data;
2383 
2384     nxt_debug(task, "h1p peer header sent");
2385 
2386     engine = task->thread->engine;
2387 
2388     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
2389 
2390     if (c->write != NULL) {
2391         nxt_conn_write(engine, c);
2392         return;
2393     }
2394 
2395     r = peer->request;
2396     r->state->ready_handler(task, r, peer);
2397 }
2398 
2399 
2400 static void
nxt_h1p_peer_header_read(nxt_task_t * task,nxt_http_peer_t * peer)2401 nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
2402 {
2403     nxt_conn_t  *c;
2404 
2405     nxt_debug(task, "h1p peer header read");
2406 
2407     c = peer->proto.h1->conn;
2408 
2409     if (c->write_timer.enabled) {
2410         c->read_state = &nxt_h1p_peer_header_read_state;
2411 
2412     } else {
2413         c->read_state = &nxt_h1p_peer_header_read_timer_state;
2414     }
2415 
2416     nxt_conn_read(task->thread->engine, c);
2417 }
2418 
2419 
2420 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state
2421     nxt_aligned(64) =
2422 {
2423     .ready_handler = nxt_h1p_peer_header_read_done,
2424     .close_handler = nxt_h1p_peer_closed,
2425     .error_handler = nxt_h1p_peer_error,
2426 
2427     .io_read_handler = nxt_h1p_peer_io_read_handler,
2428 };
2429 
2430 
2431 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state
2432     nxt_aligned(64) =
2433 {
2434     .ready_handler = nxt_h1p_peer_header_read_done,
2435     .close_handler = nxt_h1p_peer_closed,
2436     .error_handler = nxt_h1p_peer_error,
2437 
2438     .io_read_handler = nxt_h1p_peer_io_read_handler,
2439 
2440     .timer_handler = nxt_h1p_peer_read_timeout,
2441     .timer_value = nxt_h1p_peer_timer_value,
2442     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2443 };
2444 
2445 
2446 static ssize_t
nxt_h1p_peer_io_read_handler(nxt_task_t * task,nxt_conn_t * c)2447 nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
2448 {
2449     size_t              size;
2450     ssize_t             n;
2451     nxt_buf_t           *b;
2452     nxt_http_peer_t     *peer;
2453     nxt_socket_conf_t   *skcf;
2454     nxt_http_request_t  *r;
2455 
2456     peer = c->socket.data;
2457     r = peer->request;
2458     b = c->read;
2459 
2460     if (b == NULL) {
2461         skcf = r->conf->socket_conf;
2462 
2463         size = (peer->header_received) ? skcf->proxy_buffer_size
2464                                        : skcf->proxy_header_buffer_size;
2465 
2466         nxt_debug(task, "h1p peer io read: %z", size);
2467 
2468         b = nxt_http_proxy_buf_mem_alloc(task, r, size);
2469         if (nxt_slow_path(b == NULL)) {
2470             c->socket.error = NXT_ENOMEM;
2471             return NXT_ERROR;
2472         }
2473     }
2474 
2475     n = c->io->recvbuf(c, b);
2476 
2477     if (n > 0) {
2478         c->read = b;
2479 
2480     } else {
2481         c->read = NULL;
2482         nxt_http_proxy_buf_mem_free(task, r, b);
2483     }
2484 
2485     return n;
2486 }
2487 
2488 
2489 static void
nxt_h1p_peer_header_read_done(nxt_task_t * task,void * obj,void * data)2490 nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
2491 {
2492     nxt_int_t           ret;
2493     nxt_buf_t           *b;
2494     nxt_conn_t          *c;
2495     nxt_h1proto_t       *h1p;
2496     nxt_http_peer_t     *peer;
2497     nxt_http_request_t  *r;
2498     nxt_event_engine_t  *engine;
2499 
2500     c = obj;
2501     peer = data;
2502 
2503     nxt_debug(task, "h1p peer header read done");
2504 
2505     b = c->read;
2506 
2507     ret = nxt_h1p_peer_header_parse(peer, &b->mem);
2508 
2509     r = peer->request;
2510 
2511     ret = nxt_expect(NXT_DONE, ret);
2512 
2513     if (ret != NXT_AGAIN) {
2514         engine = task->thread->engine;
2515         nxt_timer_disable(engine, &c->write_timer);
2516         nxt_timer_disable(engine, &c->read_timer);
2517     }
2518 
2519     switch (ret) {
2520 
2521     case NXT_DONE:
2522         peer->fields = peer->proto.h1->parser.fields;
2523 
2524         ret = nxt_http_fields_process(peer->fields,
2525                                       &nxt_h1p_peer_fields_hash, r);
2526         if (nxt_slow_path(ret != NXT_OK)) {
2527             peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2528             break;
2529         }
2530 
2531         c->read = NULL;
2532 
2533         peer->header_received = 1;
2534 
2535         h1p = peer->proto.h1;
2536 
2537         if (h1p->chunked) {
2538             if (r->resp.content_length != NULL) {
2539                 peer->status = NXT_HTTP_BAD_GATEWAY;
2540                 break;
2541             }
2542 
2543             h1p->chunked_parse.mem_pool = c->mem_pool;
2544 
2545         } else if (r->resp.content_length_n > 0) {
2546             h1p->remainder = r->resp.content_length_n;
2547         }
2548 
2549         if (nxt_buf_mem_used_size(&b->mem) != 0) {
2550             nxt_h1p_peer_body_process(task, peer, b);
2551             return;
2552         }
2553 
2554         r->state->ready_handler(task, r, peer);
2555         return;
2556 
2557     case NXT_AGAIN:
2558         if (nxt_buf_mem_free_size(&b->mem) != 0) {
2559             nxt_conn_read(task->thread->engine, c);
2560             return;
2561         }
2562 
2563         /* Fall through. */
2564 
2565     default:
2566     case NXT_ERROR:
2567     case NXT_HTTP_PARSE_INVALID:
2568     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
2569     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
2570         peer->status = NXT_HTTP_BAD_GATEWAY;
2571         break;
2572     }
2573 
2574     nxt_http_proxy_buf_mem_free(task, r, b);
2575 
2576     r->state->error_handler(task, r, peer);
2577 }
2578 
2579 
2580 static nxt_int_t
nxt_h1p_peer_header_parse(nxt_http_peer_t * peer,nxt_buf_mem_t * bm)2581 nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
2582 {
2583     u_char     *p;
2584     size_t     length;
2585     nxt_int_t  status;
2586 
2587     if (peer->status < 0) {
2588         length = nxt_buf_mem_used_size(bm);
2589 
2590         if (nxt_slow_path(length < 12)) {
2591             return NXT_AGAIN;
2592         }
2593 
2594         p = bm->pos;
2595 
2596         if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
2597                           || (p[7] != '0' && p[7] != '1')))
2598         {
2599             return NXT_ERROR;
2600         }
2601 
2602         status = nxt_int_parse(&p[9], 3);
2603 
2604         if (nxt_slow_path(status < 0)) {
2605             return NXT_ERROR;
2606         }
2607 
2608         p += 12;
2609         length -= 12;
2610 
2611         p = nxt_memchr(p, '\n', length);
2612 
2613         if (nxt_slow_path(p == NULL)) {
2614             return NXT_AGAIN;
2615         }
2616 
2617         bm->pos = p + 1;
2618         peer->status = status;
2619     }
2620 
2621     return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
2622 }
2623 
2624 
2625 static void
nxt_h1p_peer_read(nxt_task_t * task,nxt_http_peer_t * peer)2626 nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
2627 {
2628     nxt_conn_t  *c;
2629 
2630     nxt_debug(task, "h1p peer read");
2631 
2632     c = peer->proto.h1->conn;
2633     c->read_state = &nxt_h1p_peer_read_state;
2634 
2635     nxt_conn_read(task->thread->engine, c);
2636 }
2637 
2638 
2639 static const nxt_conn_state_t  nxt_h1p_peer_read_state
2640     nxt_aligned(64) =
2641 {
2642     .ready_handler = nxt_h1p_peer_read_done,
2643     .close_handler = nxt_h1p_peer_closed,
2644     .error_handler = nxt_h1p_peer_error,
2645 
2646     .io_read_handler = nxt_h1p_peer_io_read_handler,
2647 
2648     .timer_handler = nxt_h1p_peer_read_timeout,
2649     .timer_value = nxt_h1p_peer_timer_value,
2650     .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
2651     .timer_autoreset = 1,
2652 };
2653 
2654 
2655 static void
nxt_h1p_peer_read_done(nxt_task_t * task,void * obj,void * data)2656 nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
2657 {
2658     nxt_buf_t        *out;
2659     nxt_conn_t       *c;
2660     nxt_http_peer_t  *peer;
2661 
2662     c = obj;
2663     peer = data;
2664 
2665     nxt_debug(task, "h1p peer read done");
2666 
2667     out = c->read;
2668     c->read = NULL;
2669 
2670     nxt_h1p_peer_body_process(task, peer, out);
2671 }
2672 
2673 
2674 static void
nxt_h1p_peer_body_process(nxt_task_t * task,nxt_http_peer_t * peer,nxt_buf_t * out)2675 nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer,
2676     nxt_buf_t *out)
2677 {
2678     size_t              length;
2679     nxt_h1proto_t       *h1p;
2680     nxt_http_request_t  *r;
2681 
2682     h1p = peer->proto.h1;
2683 
2684     if (h1p->chunked) {
2685         out = nxt_http_chunk_parse(task, &h1p->chunked_parse, out);
2686 
2687         if (h1p->chunked_parse.chunk_error || h1p->chunked_parse.error) {
2688             peer->status = NXT_HTTP_BAD_GATEWAY;
2689             r = peer->request;
2690             r->state->error_handler(task, r, peer);
2691             return;
2692         }
2693 
2694         if (h1p->chunked_parse.last) {
2695             nxt_buf_chain_add(&out, nxt_http_buf_last(peer->request));
2696             peer->closed = 1;
2697         }
2698 
2699     } else if (h1p->remainder > 0) {
2700         length = nxt_buf_chain_length(out);
2701         h1p->remainder -= length;
2702     }
2703 
2704     peer->body = out;
2705 
2706     r = peer->request;
2707     r->state->ready_handler(task, r, peer);
2708 }
2709 
2710 
2711 static void
nxt_h1p_peer_closed(nxt_task_t * task,void * obj,void * data)2712 nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
2713 {
2714     nxt_http_peer_t     *peer;
2715     nxt_http_request_t  *r;
2716 
2717     peer = data;
2718 
2719     nxt_debug(task, "h1p peer closed");
2720 
2721     r = peer->request;
2722 
2723     if (peer->header_received) {
2724         peer->body = nxt_http_buf_last(r);
2725         peer->closed = 1;
2726         r->inconsistent = (peer->proto.h1->remainder != 0);
2727 
2728         r->state->ready_handler(task, r, peer);
2729 
2730     } else {
2731         peer->status = NXT_HTTP_BAD_GATEWAY;
2732 
2733         r->state->error_handler(task, r, peer);
2734     }
2735 }
2736 
2737 
2738 static void
nxt_h1p_peer_error(nxt_task_t * task,void * obj,void * data)2739 nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
2740 {
2741     nxt_http_peer_t     *peer;
2742     nxt_http_request_t  *r;
2743 
2744     peer = data;
2745 
2746     nxt_debug(task, "h1p peer error");
2747 
2748     peer->status = NXT_HTTP_BAD_GATEWAY;
2749 
2750     r = peer->request;
2751     r->state->error_handler(task, r, peer);
2752 }
2753 
2754 
2755 static void
nxt_h1p_peer_send_timeout(nxt_task_t * task,void * obj,void * data)2756 nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
2757 {
2758     nxt_conn_t          *c;
2759     nxt_timer_t         *timer;
2760     nxt_http_peer_t     *peer;
2761     nxt_http_request_t  *r;
2762 
2763     timer = obj;
2764 
2765     nxt_debug(task, "h1p peer send timeout");
2766 
2767     c = nxt_write_timer_conn(timer);
2768     c->block_write = 1;
2769     c->block_read = 1;
2770 
2771     peer = c->socket.data;
2772     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2773 
2774     r = peer->request;
2775     r->state->error_handler(task, r, peer);
2776 }
2777 
2778 
2779 static void
nxt_h1p_peer_read_timeout(nxt_task_t * task,void * obj,void * data)2780 nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
2781 {
2782     nxt_conn_t          *c;
2783     nxt_timer_t         *timer;
2784     nxt_http_peer_t     *peer;
2785     nxt_http_request_t  *r;
2786 
2787     timer = obj;
2788 
2789     nxt_debug(task, "h1p peer read timeout");
2790 
2791     c = nxt_read_timer_conn(timer);
2792     c->block_write = 1;
2793     c->block_read = 1;
2794 
2795     peer = c->socket.data;
2796     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2797 
2798     r = peer->request;
2799     r->state->error_handler(task, r, peer);
2800 }
2801 
2802 
2803 static nxt_msec_t
nxt_h1p_peer_timer_value(nxt_conn_t * c,uintptr_t data)2804 nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
2805 {
2806     nxt_http_peer_t  *peer;
2807 
2808     peer = c->socket.data;
2809 
2810     return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
2811 }
2812 
2813 
2814 static void
nxt_h1p_peer_close(nxt_task_t * task,nxt_http_peer_t * peer)2815 nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
2816 {
2817     nxt_conn_t  *c;
2818 
2819     nxt_debug(task, "h1p peer close");
2820 
2821     peer->closed = 1;
2822 
2823     c = peer->proto.h1->conn;
2824     task = &c->task;
2825     c->socket.task = task;
2826     c->read_timer.task = task;
2827     c->write_timer.task = task;
2828 
2829     if (c->socket.fd != -1) {
2830         c->write_state = &nxt_h1p_peer_close_state;
2831 
2832         nxt_conn_close(task->thread->engine, c);
2833 
2834     } else {
2835         nxt_h1p_peer_free(task, c, NULL);
2836     }
2837 }
2838 
2839 
2840 static const nxt_conn_state_t  nxt_h1p_peer_close_state
2841     nxt_aligned(64) =
2842 {
2843     .ready_handler = nxt_h1p_peer_free,
2844 };
2845 
2846 
2847 static void
nxt_h1p_peer_free(nxt_task_t * task,void * obj,void * data)2848 nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
2849 {
2850     nxt_conn_t  *c;
2851 
2852     c = obj;
2853 
2854     nxt_debug(task, "h1p peer free");
2855 
2856     nxt_conn_free(task, c);
2857 }
2858 
2859 
2860 static nxt_int_t
nxt_h1p_peer_transfer_encoding(void * ctx,nxt_http_field_t * field,uintptr_t data)2861 nxt_h1p_peer_transfer_encoding(void *ctx, nxt_http_field_t *field,
2862     uintptr_t data)
2863 {
2864     nxt_http_request_t  *r;
2865 
2866     r = ctx;
2867     field->skip = 1;
2868 
2869     if (field->value_length == 7
2870         && nxt_memcmp(field->value, "chunked", 7) == 0)
2871     {
2872         r->peer->proto.h1->chunked = 1;
2873     }
2874 
2875     return NXT_OK;
2876 }
2877