xref: /unit/src/nxt_h1proto.c (revision 2381:a68b5f5bf46c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 #include <nxt_h1proto.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 /*
16  * nxt_http_conn_ and nxt_h1p_conn_ prefixes are used for connection handlers.
17  * nxt_h1p_idle_ prefix is used for idle connection handlers.
18  * nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
19  */
20 
21 #if (NXT_TLS)
22 static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
23 static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
24 #endif
25 static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
26 static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
27 static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
28 static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
29     void *data);
30 static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
31     nxt_http_request_t *r);
32 static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task,
33     nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf);
34 static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
35     uintptr_t data);
36 static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field,
37     uintptr_t data);
38 static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field,
39     uintptr_t data);
40 static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field,
41     uintptr_t data);
42 static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
43     uintptr_t data);
44 static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
45 static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
46     void *data);
47 static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
48 static void nxt_h1p_request_header_send(nxt_task_t *task,
49     nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
50 static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
51     nxt_buf_t *out);
52 static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
53     nxt_buf_t *out);
54 static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
55     nxt_http_proto_t proto);
56 static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
57     nxt_buf_t *last);
58 static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data);
59 static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
60     void *data);
61 static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj,
62     void *data);
63 nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
64     nxt_http_request_t *r);
65 static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
66     nxt_socket_conf_joint_t *joint);
67 static void nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data);
68 static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data);
69 static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data);
70 static nxt_msec_t nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data);
71 static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
72     nxt_conn_t *c);
73 static void nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data);
74 static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
75 static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
76 static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
77 static void nxt_h1p_idle_response_error(nxt_task_t *task, void *obj,
78     void *data);
79 static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
80     void *data);
81 static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
82     uintptr_t data);
83 static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
84 static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
85 static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
86 static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
87 static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
88 
89 static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
90 static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
91 static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
92 static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
93 static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
94 static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
95 static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
96 static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
97     void *data);
98 static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
99     nxt_buf_mem_t *bm);
100 static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
101 static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
102 static void nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, nxt_buf_t *out);
103 static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
104 static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
105 static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
106 static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
107 static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
108 static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
109 static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
110 static nxt_int_t nxt_h1p_peer_transfer_encoding(void *ctx,
111     nxt_http_field_t *field, uintptr_t data);
112 
113 #if (NXT_TLS)
114 static const nxt_conn_state_t  nxt_http_idle_state;
115 static const nxt_conn_state_t  nxt_h1p_shutdown_state;
116 #endif
117 static const nxt_conn_state_t  nxt_h1p_idle_state;
118 static const nxt_conn_state_t  nxt_h1p_header_parse_state;
119 static const nxt_conn_state_t  nxt_h1p_read_body_state;
120 static const nxt_conn_state_t  nxt_h1p_request_send_state;
121 static const nxt_conn_state_t  nxt_h1p_timeout_response_state;
122 static const nxt_conn_state_t  nxt_h1p_keepalive_state;
123 static const nxt_conn_state_t  nxt_h1p_close_state;
124 static const nxt_conn_state_t  nxt_h1p_peer_connect_state;
125 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state;
126 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state;
127 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state;
128 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state;
129 static const nxt_conn_state_t  nxt_h1p_peer_read_state;
130 static const nxt_conn_state_t  nxt_h1p_peer_close_state;
131 
132 
133 const nxt_http_proto_table_t  nxt_http_proto[3] = {
134     /* NXT_HTTP_PROTO_H1 */
135     {
136         .body_read        = nxt_h1p_request_body_read,
137         .local_addr       = nxt_h1p_request_local_addr,
138         .header_send      = nxt_h1p_request_header_send,
139         .send             = nxt_h1p_request_send,
140         .body_bytes_sent  = nxt_h1p_request_body_bytes_sent,
141         .discard          = nxt_h1p_request_discard,
142         .close            = nxt_h1p_request_close,
143 
144         .peer_connect     = nxt_h1p_peer_connect,
145         .peer_header_send = nxt_h1p_peer_header_send,
146         .peer_header_read = nxt_h1p_peer_header_read,
147         .peer_read        = nxt_h1p_peer_read,
148         .peer_close       = nxt_h1p_peer_close,
149 
150         .ws_frame_start   = nxt_h1p_websocket_frame_start,
151     },
152     /* NXT_HTTP_PROTO_H2      */
153     /* NXT_HTTP_PROTO_DEVNULL */
154 };
155 
156 
157 static nxt_lvlhsh_t                    nxt_h1p_fields_hash;
158 
159 static nxt_http_field_proc_t           nxt_h1p_fields[] = {
160     { nxt_string("Connection"),        &nxt_h1p_connection, 0 },
161     { nxt_string("Upgrade"),           &nxt_h1p_upgrade, 0 },
162     { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
163     { nxt_string("Sec-WebSocket-Version"),
164                                        &nxt_h1p_websocket_version, 0 },
165     { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
166 
167     { nxt_string("Host"),              &nxt_http_request_host, 0 },
168     { nxt_string("Cookie"),            &nxt_http_request_field,
169         offsetof(nxt_http_request_t, cookie) },
170     { nxt_string("Referer"),           &nxt_http_request_field,
171         offsetof(nxt_http_request_t, referer) },
172     { nxt_string("User-Agent"),        &nxt_http_request_field,
173         offsetof(nxt_http_request_t, user_agent) },
174     { nxt_string("Content-Type"),      &nxt_http_request_field,
175         offsetof(nxt_http_request_t, content_type) },
176     { nxt_string("Content-Length"),    &nxt_http_request_content_length, 0 },
177     { nxt_string("Authorization"),     &nxt_http_request_field,
178         offsetof(nxt_http_request_t, authorization) },
179 };
180 
181 
182 static nxt_lvlhsh_t                    nxt_h1p_peer_fields_hash;
183 
184 static nxt_http_field_proc_t           nxt_h1p_peer_fields[] = {
185     { nxt_string("Connection"),        &nxt_http_proxy_skip, 0 },
186     { nxt_string("Transfer-Encoding"), &nxt_h1p_peer_transfer_encoding, 0 },
187     { nxt_string("Server"),            &nxt_http_proxy_skip, 0 },
188     { nxt_string("Date"),              &nxt_http_proxy_date, 0 },
189     { nxt_string("Content-Length"),    &nxt_http_proxy_content_length, 0 },
190 };
191 
192 
193 nxt_int_t
nxt_h1p_init(nxt_task_t * task)194 nxt_h1p_init(nxt_task_t *task)
195 {
196     nxt_int_t  ret;
197 
198     ret = nxt_http_fields_hash(&nxt_h1p_fields_hash,
199                                nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
200 
201     if (nxt_fast_path(ret == NXT_OK)) {
202         ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
203                                    nxt_h1p_peer_fields,
204                                    nxt_nitems(nxt_h1p_peer_fields));
205     }
206 
207     return ret;
208 }
209 
210 
211 void
nxt_http_conn_init(nxt_task_t * task,void * obj,void * data)212 nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
213 {
214     nxt_conn_t               *c;
215     nxt_socket_conf_t        *skcf;
216     nxt_event_engine_t       *engine;
217     nxt_listen_event_t       *lev;
218     nxt_socket_conf_joint_t  *joint;
219 
220     c = obj;
221     lev = data;
222 
223     nxt_debug(task, "http conn init");
224 
225     joint = lev->socket.data;
226     skcf = joint->socket_conf;
227     c->local = skcf->sockaddr;
228 
229     engine = task->thread->engine;
230     c->read_work_queue = &engine->fast_work_queue;
231     c->write_work_queue = &engine->fast_work_queue;
232 
233     c->read_state = &nxt_h1p_idle_state;
234 
235 #if (NXT_TLS)
236     if (skcf->tls != NULL) {
237         c->read_state = &nxt_http_idle_state;
238     }
239 #endif
240 
241     nxt_conn_read(engine, c);
242 }
243 
244 
245 #if (NXT_TLS)
246 
247 static const nxt_conn_state_t  nxt_http_idle_state
248     nxt_aligned(64) =
249 {
250     .ready_handler = nxt_http_conn_test,
251     .close_handler = nxt_h1p_conn_close,
252     .error_handler = nxt_h1p_conn_error,
253 
254     .io_read_handler = nxt_http_idle_io_read_handler,
255 
256     .timer_handler = nxt_h1p_idle_timeout,
257     .timer_value = nxt_h1p_conn_timer_value,
258     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
259 };
260 
261 
262 static ssize_t
nxt_http_idle_io_read_handler(nxt_task_t * task,nxt_conn_t * c)263 nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
264 {
265     size_t                   size;
266     ssize_t                  n;
267     nxt_buf_t                *b;
268     nxt_socket_conf_joint_t  *joint;
269 
270     joint = c->listen->socket.data;
271 
272     if (nxt_slow_path(joint == NULL)) {
273         /*
274          * Listening socket had been closed while
275          * connection was in keep-alive state.
276          */
277         c->read_state = &nxt_h1p_idle_close_state;
278         return 0;
279     }
280 
281     size = joint->socket_conf->header_buffer_size;
282 
283     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
284     if (nxt_slow_path(b == NULL)) {
285         c->socket.error = NXT_ENOMEM;
286         return NXT_ERROR;
287     }
288 
289     /*
290      * 1 byte is enough to distinguish between SSLv3/TLS and plain HTTP.
291      * 11 bytes are enough to log supported SSLv3/TLS version.
292      * 16 bytes are just for more optimized kernel copy-out operation.
293      */
294     n = c->io->recv(c, b->mem.pos, 16, MSG_PEEK);
295 
296     if (n > 0) {
297         c->read = b;
298 
299     } else {
300         c->read = NULL;
301         nxt_event_engine_buf_mem_free(task->thread->engine, b);
302     }
303 
304     return n;
305 }
306 
307 
308 static void
nxt_http_conn_test(nxt_task_t * task,void * obj,void * data)309 nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
310 {
311     u_char                   *p;
312     nxt_buf_t                *b;
313     nxt_conn_t               *c;
314     nxt_tls_conf_t           *tls;
315     nxt_event_engine_t       *engine;
316     nxt_socket_conf_joint_t  *joint;
317 
318     c = obj;
319 
320     nxt_debug(task, "h1p conn https test");
321 
322     engine = task->thread->engine;
323     b = c->read;
324     p = b->mem.pos;
325 
326     c->read_state = &nxt_h1p_idle_state;
327 
328     if (p[0] != 0x16) {
329         b->mem.free = b->mem.pos;
330 
331         nxt_conn_read(engine, c);
332         return;
333     }
334 
335     /* SSLv3/TLS ClientHello message. */
336 
337 #if (NXT_DEBUG)
338     if (nxt_buf_mem_used_size(&b->mem) >= 11) {
339         u_char      major, minor;
340         const char  *protocol;
341 
342         major = p[9];
343         minor = p[10];
344 
345         if (major == 3) {
346             if (minor == 0) {
347                 protocol = "SSLv";
348 
349             } else {
350                 protocol = "TLSv";
351                 major -= 2;
352                 minor -= 1;
353             }
354 
355             nxt_debug(task, "SSL/TLS: %s%ud.%ud", protocol, major, minor);
356         }
357     }
358 #endif
359 
360     c->read = NULL;
361     nxt_event_engine_buf_mem_free(engine, b);
362 
363     joint = c->listen->socket.data;
364 
365     if (nxt_slow_path(joint == NULL)) {
366         /*
367          * Listening socket had been closed while
368          * connection was in keep-alive state.
369          */
370         nxt_h1p_closing(task, c);
371         return;
372     }
373 
374     tls = joint->socket_conf->tls;
375 
376     tls->conn_init(task, tls, c);
377 }
378 
379 #endif
380 
381 
382 static const nxt_conn_state_t  nxt_h1p_idle_state
383     nxt_aligned(64) =
384 {
385     .ready_handler = nxt_h1p_conn_proto_init,
386     .close_handler = nxt_h1p_conn_close,
387     .error_handler = nxt_h1p_conn_error,
388 
389     .io_read_handler = nxt_h1p_idle_io_read_handler,
390 
391     .timer_handler = nxt_h1p_idle_timeout,
392     .timer_value = nxt_h1p_conn_timer_value,
393     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
394     .timer_autoreset = 1,
395 };
396 
397 
398 static ssize_t
nxt_h1p_idle_io_read_handler(nxt_task_t * task,nxt_conn_t * c)399 nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
400 {
401     size_t                   size;
402     ssize_t                  n;
403     nxt_buf_t                *b;
404     nxt_socket_conf_joint_t  *joint;
405 
406     joint = c->listen->socket.data;
407 
408     if (nxt_slow_path(joint == NULL)) {
409         /*
410          * Listening socket had been closed while
411          * connection was in keep-alive state.
412          */
413         c->read_state = &nxt_h1p_idle_close_state;
414         return 0;
415     }
416 
417     b = c->read;
418 
419     if (b == NULL) {
420         size = joint->socket_conf->header_buffer_size;
421 
422         b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
423         if (nxt_slow_path(b == NULL)) {
424             c->socket.error = NXT_ENOMEM;
425             return NXT_ERROR;
426         }
427     }
428 
429     n = c->io->recvbuf(c, b);
430 
431     if (n > 0) {
432         c->read = b;
433 
434     } else {
435         c->read = NULL;
436         nxt_event_engine_buf_mem_free(task->thread->engine, b);
437     }
438 
439     return n;
440 }
441 
442 
443 static void
nxt_h1p_conn_proto_init(nxt_task_t * task,void * obj,void * data)444 nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
445 {
446     nxt_conn_t     *c;
447     nxt_h1proto_t  *h1p;
448 
449     c = obj;
450 
451     nxt_debug(task, "h1p conn proto init");
452 
453     h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
454     if (nxt_slow_path(h1p == NULL)) {
455         nxt_h1p_closing(task, c);
456         return;
457     }
458 
459     c->socket.data = h1p;
460     h1p->conn = c;
461 
462     nxt_h1p_conn_request_init(task, c, h1p);
463 }
464 
465 
466 static void
nxt_h1p_conn_request_init(nxt_task_t * task,void * obj,void * data)467 nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
468 {
469     nxt_int_t                ret;
470     nxt_conn_t               *c;
471     nxt_h1proto_t            *h1p;
472     nxt_socket_conf_t        *skcf;
473     nxt_http_request_t       *r;
474     nxt_socket_conf_joint_t  *joint;
475 
476     c = obj;
477     h1p = data;
478 
479     nxt_debug(task, "h1p conn request init");
480 
481     nxt_conn_active(task->thread->engine, c);
482 
483     r = nxt_http_request_create(task);
484 
485     if (nxt_fast_path(r != NULL)) {
486         h1p->request = r;
487         r->proto.h1 = h1p;
488 
489         /* r->protocol = NXT_HTTP_PROTO_H1 is done by zeroing. */
490         r->remote = c->remote;
491 
492 #if (NXT_TLS)
493         r->tls = (c->u.tls != NULL);
494 #endif
495 
496         r->task = c->task;
497         task = &r->task;
498         c->socket.task = task;
499         c->read_timer.task = task;
500         c->write_timer.task = task;
501 
502         ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
503 
504         if (nxt_fast_path(ret == NXT_OK)) {
505             joint = c->listen->socket.data;
506             joint->count++;
507 
508             r->conf = joint;
509             skcf = joint->socket_conf;
510             r->log_route = skcf->log_route;
511 
512             if (c->local == NULL) {
513                 c->local = skcf->sockaddr;
514             }
515 
516             h1p->parser.discard_unsafe_fields = skcf->discard_unsafe_fields;
517 
518             nxt_h1p_conn_request_header_parse(task, c, h1p);
519             return;
520         }
521 
522         /*
523          * The request is very incomplete here,
524          * so "internal server error" useless here.
525          */
526         nxt_mp_release(r->mem_pool);
527     }
528 
529     nxt_h1p_closing(task, c);
530 }
531 
532 
533 static const nxt_conn_state_t  nxt_h1p_header_parse_state
534     nxt_aligned(64) =
535 {
536     .ready_handler = nxt_h1p_conn_request_header_parse,
537     .close_handler = nxt_h1p_conn_request_error,
538     .error_handler = nxt_h1p_conn_request_error,
539 
540     .timer_handler = nxt_h1p_conn_request_timeout,
541     .timer_value = nxt_h1p_conn_request_timer_value,
542     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
543 };
544 
545 
546 static void
nxt_h1p_conn_request_header_parse(nxt_task_t * task,void * obj,void * data)547 nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
548 {
549     nxt_int_t           ret;
550     nxt_conn_t          *c;
551     nxt_h1proto_t       *h1p;
552     nxt_http_status_t   status;
553     nxt_http_request_t  *r;
554 
555     c = obj;
556     h1p = data;
557 
558     nxt_debug(task, "h1p conn header parse");
559 
560     ret = nxt_http_parse_request(&h1p->parser, &c->read->mem);
561 
562     ret = nxt_expect(NXT_DONE, ret);
563 
564     if (ret != NXT_AGAIN) {
565         nxt_timer_disable(task->thread->engine, &c->read_timer);
566     }
567 
568     r = h1p->request;
569 
570     switch (ret) {
571 
572     case NXT_DONE:
573         /*
574          * By default the keepalive mode is disabled in HTTP/1.0 and
575          * enabled in HTTP/1.1.  The mode can be overridden later by
576          * the "Connection" field processed in nxt_h1p_connection().
577          */
578         h1p->keepalive = (h1p->parser.version.s.minor != '0');
579 
580         ret = nxt_h1p_header_process(task, h1p, r);
581 
582         if (nxt_fast_path(ret == NXT_OK)) {
583 
584 #if (NXT_TLS)
585             if (c->u.tls == NULL && r->conf->socket_conf->tls != NULL) {
586                 status = NXT_HTTP_TO_HTTPS;
587                 goto error;
588             }
589 #endif
590 
591             r->state->ready_handler(task, r, NULL);
592             return;
593         }
594 
595         status = ret;
596         goto error;
597 
598     case NXT_AGAIN:
599         status = nxt_h1p_header_buffer_test(task, h1p, c, r->conf->socket_conf);
600 
601         if (nxt_fast_path(status == NXT_OK)) {
602             c->read_state = &nxt_h1p_header_parse_state;
603 
604             nxt_conn_read(task->thread->engine, c);
605             return;
606         }
607 
608         break;
609 
610     case NXT_HTTP_PARSE_INVALID:
611         status = NXT_HTTP_BAD_REQUEST;
612         break;
613 
614     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
615         status = NXT_HTTP_VERSION_NOT_SUPPORTED;
616         break;
617 
618     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
619         status = NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
620         break;
621 
622     default:
623     case NXT_ERROR:
624         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
625         break;
626     }
627 
628     (void) nxt_h1p_header_process(task, h1p, r);
629 
630 error:
631 
632     h1p->keepalive = 0;
633 
634     nxt_http_request_error(task, r, status);
635 }
636 
637 
638 static nxt_int_t
nxt_h1p_header_process(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_http_request_t * r)639 nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
640     nxt_http_request_t *r)
641 {
642     u_char     *m;
643     nxt_int_t  ret;
644 
645     r->target.start = h1p->parser.target_start;
646     r->target.length = h1p->parser.target_end - h1p->parser.target_start;
647 
648     if (h1p->parser.version.ui64 != 0) {
649         r->version.start = h1p->parser.version.str;
650         r->version.length = sizeof(h1p->parser.version.str);
651     }
652 
653     r->method = &h1p->parser.method;
654     r->path = &h1p->parser.path;
655     r->args = &h1p->parser.args;
656 
657     if (nxt_slow_path(r->log_route)) {
658         nxt_log(task, NXT_LOG_NOTICE, "http request line \"%V %V %V\"",
659                 r->method, &r->target, &r->version);
660     }
661 
662     r->fields = h1p->parser.fields;
663 
664     ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
665     if (nxt_slow_path(ret != NXT_OK)) {
666         return ret;
667     }
668 
669     if (h1p->connection_upgrade && h1p->upgrade_websocket) {
670         m = h1p->parser.method.start;
671 
672         if (nxt_slow_path(h1p->parser.method.length != 3
673                           || m[0] != 'G'
674                           || m[1] != 'E'
675                           || m[2] != 'T'))
676         {
677             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method");
678 
679             return NXT_HTTP_BAD_REQUEST;
680         }
681 
682         if (nxt_slow_path(h1p->parser.version.s.minor != '1')) {
683             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version");
684 
685             return NXT_HTTP_BAD_REQUEST;
686         }
687 
688         if (nxt_slow_path(h1p->websocket_key == NULL)) {
689             nxt_log(task, NXT_LOG_INFO,
690                     "h1p upgrade: bad or absent websocket key");
691 
692             return NXT_HTTP_BAD_REQUEST;
693         }
694 
695         if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
696             nxt_log(task, NXT_LOG_INFO,
697                     "h1p upgrade: bad or absent websocket version");
698 
699             return NXT_HTTP_UPGRADE_REQUIRED;
700         }
701 
702         r->websocket_handshake = 1;
703     }
704 
705     return ret;
706 }
707 
708 
709 static nxt_int_t
nxt_h1p_header_buffer_test(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_conn_t * c,nxt_socket_conf_t * skcf)710 nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
711     nxt_socket_conf_t *skcf)
712 {
713     size_t     size, used;
714     nxt_buf_t  *in, *b;
715 
716     in = c->read;
717 
718     if (nxt_buf_mem_free_size(&in->mem) == 0) {
719         size = skcf->large_header_buffer_size;
720         used = nxt_buf_mem_used_size(&in->mem);
721 
722         if (size <= used || h1p->nbuffers >= skcf->large_header_buffers) {
723             return NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
724         }
725 
726         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
727         if (nxt_slow_path(b == NULL)) {
728             return NXT_HTTP_INTERNAL_SERVER_ERROR;
729         }
730 
731         b->mem.free = nxt_cpymem(b->mem.pos, in->mem.pos, used);
732 
733         in->next = h1p->buffers;
734         h1p->buffers = in;
735         h1p->nbuffers++;
736 
737         c->read = b;
738     }
739 
740     return NXT_OK;
741 }
742 
743 
744 static nxt_int_t
nxt_h1p_connection(void * ctx,nxt_http_field_t * field,uintptr_t data)745 nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
746 {
747     nxt_http_request_t  *r;
748 
749     r = ctx;
750     field->hopbyhop = 1;
751 
752     if (field->value_length == 5
753         && nxt_memcasecmp(field->value, "close", 5) == 0)
754     {
755         r->proto.h1->keepalive = 0;
756 
757     } else if (field->value_length == 10
758                && nxt_memcasecmp(field->value, "keep-alive", 10) == 0)
759     {
760         r->proto.h1->keepalive = 1;
761 
762     } else if (field->value_length == 7
763                && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
764     {
765         r->proto.h1->connection_upgrade = 1;
766     }
767 
768     return NXT_OK;
769 }
770 
771 
772 static nxt_int_t
nxt_h1p_upgrade(void * ctx,nxt_http_field_t * field,uintptr_t data)773 nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
774 {
775     nxt_http_request_t  *r;
776 
777     r = ctx;
778 
779     if (field->value_length == 9
780         && nxt_memcasecmp(field->value, "websocket", 9) == 0)
781     {
782         r->proto.h1->upgrade_websocket = 1;
783     }
784 
785     return NXT_OK;
786 }
787 
788 
789 static nxt_int_t
nxt_h1p_websocket_key(void * ctx,nxt_http_field_t * field,uintptr_t data)790 nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data)
791 {
792     nxt_http_request_t  *r;
793 
794     r = ctx;
795 
796     if (field->value_length == 24) {
797         r->proto.h1->websocket_key = field;
798     }
799 
800     return NXT_OK;
801 }
802 
803 
804 static nxt_int_t
nxt_h1p_websocket_version(void * ctx,nxt_http_field_t * field,uintptr_t data)805 nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data)
806 {
807     nxt_http_request_t  *r;
808 
809     r = ctx;
810 
811     if (field->value_length == 2
812         && field->value[0] == '1' && field->value[1] == '3')
813     {
814         r->proto.h1->websocket_version_ok = 1;
815     }
816 
817     return NXT_OK;
818 }
819 
820 
821 static nxt_int_t
nxt_h1p_transfer_encoding(void * ctx,nxt_http_field_t * field,uintptr_t data)822 nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
823 {
824     nxt_http_te_t       te;
825     nxt_http_request_t  *r;
826 
827     r = ctx;
828     field->skip = 1;
829     field->hopbyhop = 1;
830 
831     if (field->value_length == 7
832         && memcmp(field->value, "chunked", 7) == 0)
833     {
834         te = NXT_HTTP_TE_CHUNKED;
835 
836     } else {
837         te = NXT_HTTP_TE_UNSUPPORTED;
838     }
839 
840     r->proto.h1->transfer_encoding = te;
841 
842     return NXT_OK;
843 }
844 
845 
846 static void
nxt_h1p_request_body_read(nxt_task_t * task,nxt_http_request_t * r)847 nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
848 {
849     size_t             size, body_length, body_buffer_size, body_rest;
850     ssize_t            res;
851     nxt_str_t          *tmp_path, tmp_name;
852     nxt_buf_t          *in, *b;
853     nxt_conn_t         *c;
854     nxt_h1proto_t      *h1p;
855     nxt_http_status_t  status;
856 
857     static const nxt_str_t tmp_name_pattern = nxt_string("/req-XXXXXXXX");
858 
859     h1p = r->proto.h1;
860 
861     nxt_debug(task, "h1p request body read %O te:%d",
862               r->content_length_n, h1p->transfer_encoding);
863 
864     switch (h1p->transfer_encoding) {
865 
866     case NXT_HTTP_TE_CHUNKED:
867         status = NXT_HTTP_LENGTH_REQUIRED;
868         goto error;
869 
870     case NXT_HTTP_TE_UNSUPPORTED:
871         status = NXT_HTTP_NOT_IMPLEMENTED;
872         goto error;
873 
874     default:
875     case NXT_HTTP_TE_NONE:
876         break;
877     }
878 
879     if (r->content_length_n == -1 || r->content_length_n == 0) {
880         goto ready;
881     }
882 
883     body_length = (size_t) r->content_length_n;
884 
885     body_buffer_size = nxt_min(r->conf->socket_conf->body_buffer_size,
886                                body_length);
887 
888     if (body_length > body_buffer_size) {
889         tmp_path = &r->conf->socket_conf->body_temp_path;
890 
891         tmp_name.length = tmp_path->length + tmp_name_pattern.length;
892 
893         b = nxt_buf_file_alloc(r->mem_pool,
894                                body_buffer_size + sizeof(nxt_file_t)
895                                + tmp_name.length + 1, 0);
896 
897     } else {
898         /* This initialization required for CentOS 6, gcc 4.4.7. */
899         tmp_path = NULL;
900         tmp_name.length = 0;
901 
902         b = nxt_buf_mem_alloc(r->mem_pool, body_buffer_size, 0);
903     }
904 
905     if (nxt_slow_path(b == NULL)) {
906         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
907         goto error;
908     }
909 
910     r->body = b;
911 
912     if (body_length > body_buffer_size) {
913         tmp_name.start = nxt_pointer_to(b->mem.start, sizeof(nxt_file_t));
914 
915         memcpy(tmp_name.start, tmp_path->start, tmp_path->length);
916         memcpy(tmp_name.start + tmp_path->length, tmp_name_pattern.start,
917                tmp_name_pattern.length);
918         tmp_name.start[tmp_name.length] = '\0';
919 
920         b->file = (nxt_file_t *) b->mem.start;
921         nxt_memzero(b->file, sizeof(nxt_file_t));
922         b->file->fd = -1;
923         b->file->size = body_length;
924 
925         b->mem.start += sizeof(nxt_file_t) + tmp_name.length + 1;
926         b->mem.pos = b->mem.start;
927         b->mem.free = b->mem.start;
928 
929         b->file->fd = mkstemp((char *) tmp_name.start);
930         if (nxt_slow_path(b->file->fd == -1)) {
931             nxt_alert(task, "mkstemp(%s) failed %E", tmp_name.start, nxt_errno);
932 
933             status = NXT_HTTP_INTERNAL_SERVER_ERROR;
934             goto error;
935         }
936 
937         nxt_debug(task, "create body tmp file \"%V\", %d",
938                   &tmp_name, b->file->fd);
939 
940         unlink((char *) tmp_name.start);
941     }
942 
943     body_rest = body_length;
944 
945     in = h1p->conn->read;
946 
947     size = nxt_buf_mem_used_size(&in->mem);
948 
949     if (size != 0) {
950         size = nxt_min(size, body_length);
951 
952         if (nxt_buf_is_file(b)) {
953             res = nxt_fd_write(b->file->fd, in->mem.pos, size);
954             if (nxt_slow_path(res < (ssize_t) size)) {
955                 status = NXT_HTTP_INTERNAL_SERVER_ERROR;
956                 goto error;
957             }
958 
959             b->file_end += size;
960 
961         } else {
962             size = nxt_min(body_buffer_size, size);
963             b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
964         }
965 
966         in->mem.pos += size;
967         body_rest -= size;
968     }
969 
970     nxt_debug(task, "h1p body rest: %uz", body_rest);
971 
972     if (body_rest != 0) {
973         in->next = h1p->buffers;
974         h1p->buffers = in;
975         h1p->nbuffers++;
976 
977         c = h1p->conn;
978         c->read = b;
979         c->read_state = &nxt_h1p_read_body_state;
980 
981         nxt_conn_read(task->thread->engine, c);
982         return;
983     }
984 
985     if (nxt_buf_is_file(b)) {
986         b->mem.start = NULL;
987         b->mem.end = NULL;
988         b->mem.pos = NULL;
989         b->mem.free = NULL;
990     }
991 
992 ready:
993 
994     r->state->ready_handler(task, r, NULL);
995 
996     return;
997 
998 error:
999 
1000     h1p->keepalive = 0;
1001 
1002     nxt_http_request_error(task, r, status);
1003 }
1004 
1005 
/*
 * Connection read state used while the remainder of a request body is
 * being read.  Close and error events share the same handler,
 * nxt_h1p_conn_request_error().  The timer value is fetched by
 * nxt_h1p_conn_request_timer_value() from the body_read_timeout member
 * of the socket configuration.
 */
static const nxt_conn_state_t  nxt_h1p_read_body_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_request_body_read,
    .close_handler = nxt_h1p_conn_request_error,
    .error_handler = nxt_h1p_conn_request_error,

    .timer_handler = nxt_h1p_conn_request_timeout,
    .timer_value = nxt_h1p_conn_request_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
    .timer_autoreset = 1,
};
1018 
1019 
1020 static void
nxt_h1p_conn_request_body_read(nxt_task_t * task,void * obj,void * data)1021 nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
1022 {
1023     size_t              size, body_rest;
1024     ssize_t             res;
1025     nxt_buf_t           *b;
1026     nxt_conn_t          *c;
1027     nxt_h1proto_t       *h1p;
1028     nxt_http_request_t  *r;
1029     nxt_event_engine_t  *engine;
1030 
1031     c = obj;
1032     h1p = data;
1033 
1034     nxt_debug(task, "h1p conn request body read");
1035 
1036     r = h1p->request;
1037 
1038     engine = task->thread->engine;
1039 
1040     b = c->read;
1041 
1042     if (nxt_buf_is_file(b)) {
1043         body_rest = b->file->size - b->file_end;
1044 
1045         size = nxt_buf_mem_used_size(&b->mem);
1046         size = nxt_min(size, body_rest);
1047 
1048         res = nxt_fd_write(b->file->fd, b->mem.pos, size);
1049         if (nxt_slow_path(res < (ssize_t) size)) {
1050             nxt_h1p_request_error(task, h1p, r);
1051             return;
1052         }
1053 
1054         b->file_end += size;
1055         body_rest -= res;
1056 
1057         b->mem.pos += size;
1058 
1059         if (b->mem.pos == b->mem.free) {
1060             if (body_rest >= (size_t) nxt_buf_mem_size(&b->mem)) {
1061                 b->mem.free = b->mem.start;
1062 
1063             } else {
1064                 /* This required to avoid reading next request. */
1065                 b->mem.free = b->mem.end - body_rest;
1066             }
1067 
1068             b->mem.pos = b->mem.free;
1069         }
1070 
1071     } else {
1072         body_rest = nxt_buf_mem_free_size(&c->read->mem);
1073     }
1074 
1075     nxt_debug(task, "h1p body rest: %uz", body_rest);
1076 
1077     if (body_rest != 0) {
1078         nxt_conn_read(engine, c);
1079 
1080     } else {
1081         if (nxt_buf_is_file(b)) {
1082             b->mem.start = NULL;
1083             b->mem.end = NULL;
1084             b->mem.pos = NULL;
1085             b->mem.free = NULL;
1086         }
1087 
1088         c->read = NULL;
1089 
1090         r->state->ready_handler(task, r, NULL);
1091     }
1092 }
1093 
1094 
1095 static void
nxt_h1p_request_local_addr(nxt_task_t * task,nxt_http_request_t * r)1096 nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r)
1097 {
1098     r->local = nxt_conn_local_addr(task, r->proto.h1->conn);
1099 }
1100 
1101 
/*
 * Status lines of the 1xx class.  The table is indexed by
 * (status - NXT_HTTP_CONTINUE), so entries must stay contiguous;
 * NXT_HTTP_LAST_INFORMATIONAL is the highest status the table covers.
 */
#define NXT_HTTP_LAST_INFORMATIONAL                                           \
    (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1)

static const nxt_str_t  nxt_http_informational[] = {
    nxt_string("HTTP/1.1 100 Continue\r\n"),
    nxt_string("HTTP/1.1 101 Switching Protocols\r\n"),
};
1109 
1110 
/*
 * Status lines of the 2xx class.  The table is indexed by
 * (status - NXT_HTTP_OK), so entries must stay contiguous;
 * NXT_HTTP_LAST_SUCCESS is the highest status the table covers.
 */
#define NXT_HTTP_LAST_SUCCESS                                                 \
    (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1)

static const nxt_str_t  nxt_http_success[] = {
    nxt_string("HTTP/1.1 200 OK\r\n"),
    nxt_string("HTTP/1.1 201 Created\r\n"),
    nxt_string("HTTP/1.1 202 Accepted\r\n"),
    nxt_string("HTTP/1.1 203 Non-Authoritative Information\r\n"),
    nxt_string("HTTP/1.1 204 No Content\r\n"),
    nxt_string("HTTP/1.1 205 Reset Content\r\n"),
    nxt_string("HTTP/1.1 206 Partial Content\r\n"),
};
1123 
1124 
1125 #define NXT_HTTP_LAST_REDIRECTION                                             \
1126     (NXT_HTTP_MULTIPLE_CHOICES + nxt_nitems(nxt_http_redirection) - 1)
1127 
1128 static const nxt_str_t  nxt_http_redirection[] = {
1129     nxt_string("HTTP/1.1 300 Multiple Choices\r\n"),
1130     nxt_string("HTTP/1.1 301 Moved Permanently\r\n"),
1131     nxt_string("HTTP/1.1 302 Found\r\n"),
1132     nxt_string("HTTP/1.1 303 See Other\r\n"),
1133     nxt_string("HTTP/1.1 304 Not Modified\r\n"),
1134     nxt_string("HTTP/1.1 307 Temporary Redirect\r\n"),
1135     nxt_string("HTTP/1.1 308 Permanent Redirect\r\n"),
1136 };
1137 
1138 
1139 #define NXT_HTTP_LAST_CLIENT_ERROR                                            \
1140     (NXT_HTTP_BAD_REQUEST + nxt_nitems(nxt_http_client_error) - 1)
1141 
1142 static const nxt_str_t  nxt_http_client_error[] = {
1143     nxt_string("HTTP/1.1 400 Bad Request\r\n"),
1144     nxt_string("HTTP/1.1 401 Unauthorized\r\n"),
1145     nxt_string("HTTP/1.1 402 Payment Required\r\n"),
1146     nxt_string("HTTP/1.1 403 Forbidden\r\n"),
1147     nxt_string("HTTP/1.1 404 Not Found\r\n"),
1148     nxt_string("HTTP/1.1 405 Method Not Allowed\r\n"),
1149     nxt_string("HTTP/1.1 406 Not Acceptable\r\n"),
1150     nxt_string("HTTP/1.1 407 Proxy Authentication Required\r\n"),
1151     nxt_string("HTTP/1.1 408 Request Timeout\r\n"),
1152     nxt_string("HTTP/1.1 409 Conflict\r\n"),
1153     nxt_string("HTTP/1.1 410 Gone\r\n"),
1154     nxt_string("HTTP/1.1 411 Length Required\r\n"),
1155     nxt_string("HTTP/1.1 412 Precondition Failed\r\n"),
1156     nxt_string("HTTP/1.1 413 Payload Too Large\r\n"),
1157     nxt_string("HTTP/1.1 414 URI Too Long\r\n"),
1158     nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"),
1159     nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"),
1160     nxt_string("HTTP/1.1 417 Expectation Failed\r\n"),
1161     nxt_string("HTTP/1.1 418 I'm a teapot\r\n"),
1162     nxt_string("HTTP/1.1 419 \r\n"),
1163     nxt_string("HTTP/1.1 420 \r\n"),
1164     nxt_string("HTTP/1.1 421 Misdirected Request\r\n"),
1165     nxt_string("HTTP/1.1 422 Unprocessable Entity\r\n"),
1166     nxt_string("HTTP/1.1 423 Locked\r\n"),
1167     nxt_string("HTTP/1.1 424 Failed Dependency\r\n"),
1168     nxt_string("HTTP/1.1 425 \r\n"),
1169     nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
1170     nxt_string("HTTP/1.1 427 \r\n"),
1171     nxt_string("HTTP/1.1 428 \r\n"),
1172     nxt_string("HTTP/1.1 429 \r\n"),
1173     nxt_string("HTTP/1.1 430 \r\n"),
1174     nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"),
1175 };
1176 
1177 
/*
 * Status lines for the internal status range that starts at
 * NXT_HTTP_TO_HTTPS.  The table is indexed by
 * (status - NXT_HTTP_TO_HTTPS); the single entry is put on the wire
 * as a 400 response with a custom reason phrase.
 */
#define NXT_HTTP_LAST_NGINX_ERROR                                             \
    (NXT_HTTP_TO_HTTPS + nxt_nitems(nxt_http_nginx_error) - 1)

static const nxt_str_t  nxt_http_nginx_error[] = {
    nxt_string("HTTP/1.1 400 "
               "The plain HTTP request was sent to HTTPS port\r\n"),
};
1185 
1186 
/*
 * Status lines of the 5xx class.  The table is indexed by
 * (status - NXT_HTTP_INTERNAL_SERVER_ERROR), so entries must stay
 * contiguous; NXT_HTTP_LAST_SERVER_ERROR is the highest status the
 * table covers.
 */
#define NXT_HTTP_LAST_SERVER_ERROR                                            \
    (NXT_HTTP_INTERNAL_SERVER_ERROR + nxt_nitems(nxt_http_server_error) - 1)

static const nxt_str_t  nxt_http_server_error[] = {
    nxt_string("HTTP/1.1 500 Internal Server Error\r\n"),
    nxt_string("HTTP/1.1 501 Not Implemented\r\n"),
    nxt_string("HTTP/1.1 502 Bad Gateway\r\n"),
    nxt_string("HTTP/1.1 503 Service Unavailable\r\n"),
    nxt_string("HTTP/1.1 504 Gateway Timeout\r\n"),
    nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
};
1198 
1199 
/* Length of a status line generated for a status missing in the tables. */
#define UNKNOWN_STATUS_LENGTH  nxt_length("HTTP/1.1 999 \r\n")

/*
 * Builds the HTTP/1 response header for request r and starts sending it.
 *
 * The function works in two passes that must stay in sync: first the
 * exact header size is computed, then a buffer of that size is allocated
 * and filled in the same order.  The header consists of the status line,
 * the non-skipped response fields, an optional "Connection" block, the
 * WebSocket accept key, an optional "Sec-WebSocket-Version" line and
 * either "Transfer-Encoding: chunked" or the terminating CRLF.
 *
 * body_handler, if not NULL, is queued on the fast work queue together
 * with data; otherwise the last buffer of the response is linked right
 * after the header.
 */
static void
nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
    nxt_work_handler_t body_handler, void *data)
{
    u_char              *p;
    size_t              size;
    nxt_buf_t           *header;
    nxt_str_t           unknown_status;
    nxt_int_t           conn;
    nxt_uint_t          n;
    nxt_bool_t          http11;
    nxt_conn_t          *c;
    nxt_h1proto_t       *h1p;
    const nxt_str_t     *status;
    nxt_http_field_t    *field;
    u_char              buf[UNKNOWN_STATUS_LENGTH];

    static const char   chunked[] = "Transfer-Encoding: chunked\r\n";
    static const char   websocket_version[] = "Sec-WebSocket-Version: 13\r\n";

    /* Indexed by "conn": 0 - close, 1 - keep-alive, 2 - WebSocket upgrade. */
    static const nxt_str_t  connection[3] = {
        nxt_string("Connection: close\r\n"),
        nxt_string("Connection: keep-alive\r\n"),
        nxt_string("Upgrade: websocket\r\n"
                   "Connection: Upgrade\r\n"
                   "Sec-WebSocket-Accept: "),
    };

    nxt_debug(task, "h1p request header send");

    r->header_sent = 1;
    h1p = r->proto.h1;
    n = r->status;

    /* Select the status line from the per-class tables. */

    if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) {
        status = &nxt_http_informational[n - NXT_HTTP_CONTINUE];

    } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
        status = &nxt_http_success[n - NXT_HTTP_OK];

    } else if (n >= NXT_HTTP_MULTIPLE_CHOICES
               && n <= NXT_HTTP_LAST_REDIRECTION)
    {
        status = &nxt_http_redirection[n - NXT_HTTP_MULTIPLE_CHOICES];

    } else if (n >= NXT_HTTP_BAD_REQUEST && n <= NXT_HTTP_LAST_CLIENT_ERROR) {
        status = &nxt_http_client_error[n - NXT_HTTP_BAD_REQUEST];

    } else if (n >= NXT_HTTP_TO_HTTPS && n <= NXT_HTTP_LAST_NGINX_ERROR) {
        status = &nxt_http_nginx_error[n - NXT_HTTP_TO_HTTPS];

    } else if (n >= NXT_HTTP_INTERNAL_SERVER_ERROR
               && n <= NXT_HTTP_LAST_SERVER_ERROR)
    {
        status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR];

    } else if (n <= NXT_HTTP_STATUS_MAX) {
        /* Not in any table: format a status line without reason phrase. */
        (void) nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
                           "HTTP/1.1 %03d \r\n", n);

        unknown_status.length = UNKNOWN_STATUS_LENGTH;
        unknown_status.start = buf;
        status = &unknown_status;

    } else {
        /* Status above NXT_HTTP_STATUS_MAX: use the 500 status line. */
        status = &nxt_http_server_error[0];
    }

    /* First pass: compute the exact size of the header. */

    size = status->length;
    /* Trailing CRLF at the end of header. */
    size += nxt_length("\r\n");

    /* -1 means that no entry of connection[] is emitted. */
    conn = -1;

    if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) {
        h1p->websocket = 1;
        h1p->keepalive = 0;
        conn = 2;
        /* The accept key and the CRLF that terminates its line. */
        size += NXT_WEBSOCKET_ACCEPT_SIZE + 2;

    } else {
        http11 = (h1p->parser.version.s.minor != '0');

        if (r->resp.content_length == NULL || r->resp.content_length->skip) {

            if (http11) {
                if (n != NXT_HTTP_NOT_MODIFIED
                    && n != NXT_HTTP_NO_CONTENT
                    && body_handler != NULL
                    && !h1p->websocket)
                {
                    h1p->chunked = 1;
                    size += nxt_length(chunked);
                    /* Trailing CRLF will be added by the first chunk header. */
                    size -= nxt_length("\r\n");
                }

            } else {
                /* No Content-Length and no chunked encoding available. */
                h1p->keepalive = 0;
            }
        }

        /*
         * A "Connection" line is emitted only when the keep-alive flag
         * differs from http11: "close" for a non-1.0 request without
         * keep-alive, "keep-alive" for a 1.0 request with keep-alive.
         */
        if (http11 ^ h1p->keepalive) {
            conn = h1p->keepalive;
        }
    }

    if (conn >= 0) {
        size += connection[conn].length;
    }

    nxt_list_each(field, r->resp.fields) {

        if (!field->skip) {
            size += field->name_length + field->value_length;
            size += nxt_length(": \r\n");
        }

    } nxt_list_loop;

    if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
        size += nxt_length(websocket_version);
    }

    header = nxt_http_buf_mem(task, r, size);
    if (nxt_slow_path(header == NULL)) {
        nxt_h1p_request_error(task, h1p, r);
        return;
    }

    /* Second pass: fill the buffer in the order accounted for above. */

    p = nxt_cpymem(header->mem.free, status->start, status->length);

    nxt_list_each(field, r->resp.fields) {

        if (!field->skip) {
            p = nxt_cpymem(p, field->name, field->name_length);
            *p++ = ':'; *p++ = ' ';
            p = nxt_cpymem(p, field->value, field->value_length);
            *p++ = '\r'; *p++ = '\n';
        }

    } nxt_list_loop;

    if (conn >= 0) {
        p = nxt_cpymem(p, connection[conn].start, connection[conn].length);
    }

    if (h1p->websocket) {
        /* Completes the "Sec-WebSocket-Accept: " line of connection[2]. */
        nxt_websocket_accept(p, h1p->websocket_key->value);
        p += NXT_WEBSOCKET_ACCEPT_SIZE;

        *p++ = '\r'; *p++ = '\n';
    }

    if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
        p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version));
    }

    if (h1p->chunked) {
        p = nxt_cpymem(p, chunked, nxt_length(chunked));
        /* Trailing CRLF will be added by the first chunk header. */

    } else {
        *p++ = '\r'; *p++ = '\n';
    }

    header->mem.free = p;

    /* Remembered so that body bytes can be told apart from header bytes. */
    h1p->header_size = nxt_buf_mem_used_size(&header->mem);

    c = h1p->conn;

    c->write = header;
    h1p->conn_write_tail = &header->next;
    c->write_state = &nxt_h1p_request_send_state;

    if (body_handler != NULL) {
        /*
         * The body handler will run before c->io->write() handler,
         * because the latter was inqueued by nxt_conn_write()
         * in engine->write_work_queue.
         */
        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           body_handler, task, r, data);

    } else {
        header->next = nxt_http_buf_last(r);
    }

    nxt_conn_write(task->thread->engine, c);

    if (h1p->websocket) {
        nxt_h1p_websocket_first_frame_start(task, r, c->read);
    }
}
1397 
1398 
1399 void
nxt_h1p_complete_buffers(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_bool_t all)1400 nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_bool_t all)
1401 {
1402     size_t            size;
1403     nxt_buf_t         *b, *in, *next;
1404     nxt_conn_t        *c;
1405 
1406     nxt_debug(task, "h1p complete buffers");
1407 
1408     b = h1p->buffers;
1409     c = h1p->conn;
1410     in = c->read;
1411 
1412     if (b != NULL) {
1413         if (in == NULL) {
1414             /* A request with large body. */
1415             in = b;
1416             c->read = in;
1417 
1418             b = in->next;
1419             in->next = NULL;
1420         }
1421 
1422         while (b != NULL) {
1423             next = b->next;
1424             b->next = NULL;
1425 
1426             b->completion_handler(task, b, b->parent);
1427 
1428             b = next;
1429         }
1430 
1431         h1p->buffers = NULL;
1432         h1p->nbuffers = 0;
1433     }
1434 
1435     if (in != NULL) {
1436         size = nxt_buf_mem_used_size(&in->mem);
1437 
1438         if (size == 0 || all) {
1439             in->completion_handler(task, in, in->parent);
1440 
1441             c->read = NULL;
1442         }
1443     }
1444 }
1445 
1446 
/*
 * Connection write state used while a response is being sent.
 * No close handler is set.  The timer value is fetched by
 * nxt_h1p_conn_request_timer_value() from the send_timeout member
 * of the socket configuration.
 */
static const nxt_conn_state_t  nxt_h1p_request_send_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_sent,
    .error_handler = nxt_h1p_conn_request_error,

    .timer_handler = nxt_h1p_conn_request_send_timeout,
    .timer_value = nxt_h1p_conn_request_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, send_timeout),
    .timer_autoreset = 1,
};
1458 
1459 
1460 static void
nxt_h1p_request_send(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * out)1461 nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1462 {
1463     nxt_conn_t     *c;
1464     nxt_h1proto_t  *h1p;
1465 
1466     nxt_debug(task, "h1p request send");
1467 
1468     h1p = r->proto.h1;
1469     c = h1p->conn;
1470 
1471     if (h1p->chunked) {
1472         out = nxt_h1p_chunk_create(task, r, out);
1473         if (nxt_slow_path(out == NULL)) {
1474             nxt_h1p_request_error(task, h1p, r);
1475             return;
1476         }
1477     }
1478 
1479     if (c->write == NULL) {
1480         c->write = out;
1481         c->write_state = &nxt_h1p_request_send_state;
1482 
1483         nxt_conn_write(task->thread->engine, c);
1484 
1485     } else {
1486         *h1p->conn_write_tail = out;
1487     }
1488 
1489     while (out->next != NULL) {
1490         out = out->next;
1491     }
1492 
1493     h1p->conn_write_tail = &out->next;
1494 }
1495 
1496 
1497 static nxt_buf_t *
nxt_h1p_chunk_create(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * out)1498 nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1499 {
1500     nxt_off_t          size;
1501     nxt_buf_t          *b, **prev, *header, *tail;
1502 
1503     const size_t       chunk_size = 2 * nxt_length("\r\n") + NXT_OFF_T_HEXLEN;
1504     static const char  tail_chunk[] = "\r\n0\r\n\r\n";
1505 
1506     size = 0;
1507     prev = &out;
1508 
1509     for (b = out; b != NULL; b = b->next) {
1510 
1511         if (nxt_buf_is_last(b)) {
1512             tail = nxt_http_buf_mem(task, r, sizeof(tail_chunk));
1513             if (nxt_slow_path(tail == NULL)) {
1514                 return NULL;
1515             }
1516 
1517             *prev = tail;
1518             tail->next = b;
1519             /*
1520              * The tail_chunk size with trailing zero is 8 bytes, so
1521              * memcpy may be inlined with just single 8 byte move operation.
1522              */
1523             nxt_memcpy(tail->mem.free, tail_chunk, sizeof(tail_chunk));
1524             tail->mem.free += nxt_length(tail_chunk);
1525 
1526             break;
1527         }
1528 
1529         size += nxt_buf_used_size(b);
1530         prev = &b->next;
1531     }
1532 
1533     if (size == 0) {
1534         return out;
1535     }
1536 
1537     header = nxt_http_buf_mem(task, r, chunk_size);
1538     if (nxt_slow_path(header == NULL)) {
1539         return NULL;
1540     }
1541 
1542     header->next = out;
1543     header->mem.free = nxt_sprintf(header->mem.free, header->mem.end,
1544                                    "\r\n%xO\r\n", size);
1545     return header;
1546 }
1547 
1548 
1549 static nxt_off_t
nxt_h1p_request_body_bytes_sent(nxt_task_t * task,nxt_http_proto_t proto)1550 nxt_h1p_request_body_bytes_sent(nxt_task_t *task, nxt_http_proto_t proto)
1551 {
1552     nxt_off_t      sent;
1553     nxt_h1proto_t  *h1p;
1554 
1555     h1p = proto.h1;
1556 
1557     sent = h1p->conn->sent - h1p->header_size;
1558 
1559     return (sent > 0) ? sent : 0;
1560 }
1561 
1562 
1563 static void
nxt_h1p_request_discard(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * last)1564 nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
1565     nxt_buf_t *last)
1566 {
1567     nxt_buf_t         *b;
1568     nxt_conn_t        *c;
1569     nxt_h1proto_t     *h1p;
1570     nxt_work_queue_t  *wq;
1571 
1572     nxt_debug(task, "h1p request discard");
1573 
1574     h1p = r->proto.h1;
1575     h1p->keepalive = 0;
1576 
1577     c = h1p->conn;
1578     b = c->write;
1579     c->write = NULL;
1580 
1581     wq = &task->thread->engine->fast_work_queue;
1582 
1583     nxt_sendbuf_drain(task, wq, b);
1584     nxt_sendbuf_drain(task, wq, last);
1585 }
1586 
1587 
1588 static void
nxt_h1p_conn_request_error(nxt_task_t * task,void * obj,void * data)1589 nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
1590 {
1591     nxt_h1proto_t       *h1p;
1592     nxt_http_request_t  *r;
1593 
1594     h1p = data;
1595 
1596     nxt_debug(task, "h1p conn request error");
1597 
1598     r = h1p->request;
1599 
1600     if (nxt_slow_path(r == NULL)) {
1601         nxt_h1p_shutdown(task, h1p->conn);
1602         return;
1603     }
1604 
1605     if (r->fields == NULL) {
1606         (void) nxt_h1p_header_process(task, h1p, r);
1607     }
1608 
1609     if (r->status == 0) {
1610         r->status = NXT_HTTP_BAD_REQUEST;
1611     }
1612 
1613     nxt_h1p_request_error(task, h1p, r);
1614 }
1615 
1616 
1617 static void
nxt_h1p_conn_request_timeout(nxt_task_t * task,void * obj,void * data)1618 nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
1619 {
1620     nxt_conn_t          *c;
1621     nxt_timer_t         *timer;
1622     nxt_h1proto_t       *h1p;
1623     nxt_http_request_t  *r;
1624 
1625     timer = obj;
1626 
1627     nxt_debug(task, "h1p conn request timeout");
1628 
1629     c = nxt_read_timer_conn(timer);
1630     c->block_read = 1;
1631     /*
1632      * Disable SO_LINGER off during socket closing
1633      * to send "408 Request Timeout" error response.
1634      */
1635     c->socket.timedout = 0;
1636 
1637     h1p = c->socket.data;
1638     h1p->keepalive = 0;
1639     r = h1p->request;
1640 
1641     if (r->fields == NULL) {
1642         (void) nxt_h1p_header_process(task, h1p, r);
1643     }
1644 
1645     nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT);
1646 }
1647 
1648 
1649 static void
nxt_h1p_conn_request_send_timeout(nxt_task_t * task,void * obj,void * data)1650 nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data)
1651 {
1652     nxt_conn_t     *c;
1653     nxt_timer_t    *timer;
1654     nxt_h1proto_t  *h1p;
1655 
1656     timer = obj;
1657 
1658     nxt_debug(task, "h1p conn request send timeout");
1659 
1660     c = nxt_write_timer_conn(timer);
1661     c->block_write = 1;
1662     h1p = c->socket.data;
1663 
1664     nxt_h1p_request_error(task, h1p, h1p->request);
1665 }
1666 
1667 
1668 nxt_msec_t
nxt_h1p_conn_request_timer_value(nxt_conn_t * c,uintptr_t data)1669 nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data)
1670 {
1671     nxt_h1proto_t  *h1p;
1672 
1673     h1p = c->socket.data;
1674 
1675     return nxt_value_at(nxt_msec_t, h1p->request->conf->socket_conf, data);
1676 }
1677 
1678 
/*
 * Fails request r: clears the keep-alive flag of the HTTP/1 protocol
 * object and invokes the error handler of the request's current state,
 * passing the request as the object and h1p as the data argument.
 */
nxt_inline void
nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
    nxt_http_request_t *r)
{
    h1p->keepalive = 0;

    r->state->error_handler(task, r, h1p);
}
1687 
1688 
1689 static void
nxt_h1p_request_close(nxt_task_t * task,nxt_http_proto_t proto,nxt_socket_conf_joint_t * joint)1690 nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
1691     nxt_socket_conf_joint_t *joint)
1692 {
1693     nxt_conn_t     *c;
1694     nxt_h1proto_t  *h1p;
1695 
1696     nxt_debug(task, "h1p request close");
1697 
1698     h1p = proto.h1;
1699     h1p->keepalive &= !h1p->request->inconsistent;
1700     h1p->request = NULL;
1701 
1702     nxt_router_conf_release(task, joint);
1703 
1704     c = h1p->conn;
1705     task = &c->task;
1706     c->socket.task = task;
1707     c->read_timer.task = task;
1708     c->write_timer.task = task;
1709 
1710     if (h1p->keepalive) {
1711         nxt_h1p_keepalive(task, h1p, c);
1712 
1713     } else {
1714         nxt_h1p_shutdown(task, c);
1715     }
1716 }
1717 
1718 
1719 static void
nxt_h1p_conn_sent(nxt_task_t * task,void * obj,void * data)1720 nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data)
1721 {
1722     nxt_conn_t          *c;
1723     nxt_event_engine_t  *engine;
1724 
1725     c = obj;
1726 
1727     nxt_debug(task, "h1p conn sent");
1728 
1729     engine = task->thread->engine;
1730 
1731     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
1732 
1733     if (c->write != NULL) {
1734         nxt_conn_write(engine, c);
1735     }
1736 }
1737 
1738 
1739 static void
nxt_h1p_conn_close(nxt_task_t * task,void * obj,void * data)1740 nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
1741 {
1742     nxt_conn_t  *c;
1743 
1744     c = obj;
1745 
1746     nxt_debug(task, "h1p conn close");
1747 
1748     nxt_conn_active(task->thread->engine, c);
1749 
1750     nxt_h1p_shutdown(task, c);
1751 }
1752 
1753 
1754 static void
nxt_h1p_conn_error(nxt_task_t * task,void * obj,void * data)1755 nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data)
1756 {
1757     nxt_conn_t  *c;
1758 
1759     c = obj;
1760 
1761     nxt_debug(task, "h1p conn error");
1762 
1763     nxt_conn_active(task->thread->engine, c);
1764 
1765     nxt_h1p_shutdown(task, c);
1766 }
1767 
1768 
1769 static nxt_msec_t
nxt_h1p_conn_timer_value(nxt_conn_t * c,uintptr_t data)1770 nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data)
1771 {
1772     nxt_socket_conf_joint_t  *joint;
1773 
1774     joint = c->listen->socket.data;
1775 
1776     if (nxt_fast_path(joint != NULL)) {
1777         return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1778     }
1779 
1780     /*
1781      * Listening socket had been closed while
1782      * connection was in keep-alive state.
1783      */
1784     return 1;
1785 }
1786 
1787 
1788 static void
nxt_h1p_keepalive(nxt_task_t * task,nxt_h1proto_t * h1p,nxt_conn_t * c)1789 nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
1790 {
1791     size_t              size;
1792     nxt_buf_t           *in;
1793     nxt_event_engine_t  *engine;
1794 
1795     nxt_debug(task, "h1p keepalive");
1796 
1797     if (!c->tcp_nodelay) {
1798         nxt_conn_tcp_nodelay_on(task, c);
1799     }
1800 
1801     nxt_h1p_complete_buffers(task, h1p, 0);
1802 
1803     in = c->read;
1804 
1805     nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn));
1806 
1807     c->sent = 0;
1808 
1809     engine = task->thread->engine;
1810 
1811     nxt_conn_idle(engine, c);
1812 
1813     if (in == NULL) {
1814         c->read_state = &nxt_h1p_keepalive_state;
1815 
1816         nxt_conn_read(engine, c);
1817 
1818     } else {
1819         size = nxt_buf_mem_used_size(&in->mem);
1820 
1821         nxt_debug(task, "h1p pipelining");
1822 
1823         nxt_memmove(in->mem.start, in->mem.pos, size);
1824 
1825         in->mem.pos = in->mem.start;
1826         in->mem.free = in->mem.start + size;
1827 
1828         nxt_h1p_conn_request_init(task, c, c->socket.data);
1829     }
1830 }
1831 
1832 
/*
 * Connection read state used while a keep-alive connection waits for
 * the next request.  Incoming data starts a new request via
 * nxt_h1p_conn_request_init().  The timer value is fetched by
 * nxt_h1p_conn_timer_value() from the idle_timeout member of the
 * socket configuration.
 */
static const nxt_conn_state_t  nxt_h1p_keepalive_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_request_init,
    .close_handler = nxt_h1p_conn_close,
    .error_handler = nxt_h1p_conn_error,

    .io_read_handler = nxt_h1p_idle_io_read_handler,

    .timer_handler = nxt_h1p_idle_timeout,
    .timer_value = nxt_h1p_conn_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
    .timer_autoreset = 1,
};
1847 
1848 
/*
 * Connection state with external linkage that defines only a close
 * handler, nxt_h1p_idle_close().
 */
const nxt_conn_state_t  nxt_h1p_idle_close_state
    nxt_aligned(64) =
{
    .close_handler = nxt_h1p_idle_close,
};
1854 
1855 
1856 static void
nxt_h1p_idle_close(nxt_task_t * task,void * obj,void * data)1857 nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data)
1858 {
1859     nxt_conn_t  *c;
1860 
1861     c = obj;
1862 
1863     nxt_debug(task, "h1p idle close");
1864 
1865     nxt_conn_active(task->thread->engine, c);
1866 
1867     nxt_h1p_idle_response(task, c);
1868 }
1869 
1870 
1871 static void
nxt_h1p_idle_timeout(nxt_task_t * task,void * obj,void * data)1872 nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data)
1873 {
1874     nxt_conn_t   *c;
1875     nxt_timer_t  *timer;
1876 
1877     timer = obj;
1878 
1879     nxt_debug(task, "h1p idle timeout");
1880 
1881     c = nxt_read_timer_conn(timer);
1882     c->block_read = 1;
1883 
1884     nxt_conn_active(task->thread->engine, c);
1885 
1886     nxt_h1p_idle_response(task, c);
1887 }
1888 
1889 
1890 #define NXT_H1P_IDLE_TIMEOUT                                                  \
1891     "HTTP/1.1 408 Request Timeout\r\n"                                        \
1892     "Server: " NXT_SERVER "\r\n"                                              \
1893     "Connection: close\r\n"                                                   \
1894     "Content-Length: 0\r\n"                                                   \
1895     "Date: "
1896 
1897 
1898 static void
nxt_h1p_idle_response(nxt_task_t * task,nxt_conn_t * c)1899 nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
1900 {
1901     u_char     *p;
1902     size_t     size;
1903     nxt_buf_t  *out, *last;
1904 
1905     size = nxt_length(NXT_H1P_IDLE_TIMEOUT)
1906            + nxt_http_date_cache.size
1907            + nxt_length("\r\n\r\n");
1908 
1909     out = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1910     if (nxt_slow_path(out == NULL)) {
1911         goto fail;
1912     }
1913 
1914     p = nxt_cpymem(out->mem.free, NXT_H1P_IDLE_TIMEOUT,
1915                    nxt_length(NXT_H1P_IDLE_TIMEOUT));
1916 
1917     p = nxt_thread_time_string(task->thread, &nxt_http_date_cache, p);
1918 
1919     out->mem.free = nxt_cpymem(p, "\r\n\r\n", 4);
1920 
1921     last = nxt_mp_zget(c->mem_pool, NXT_BUF_SYNC_SIZE);
1922     if (nxt_slow_path(last == NULL)) {
1923         goto fail;
1924     }
1925 
1926     out->next = last;
1927     nxt_buf_set_sync(last);
1928     nxt_buf_set_last(last);
1929 
1930     last->completion_handler = nxt_h1p_idle_response_sent;
1931     last->parent = c;
1932 
1933     c->write = out;
1934     c->write_state = &nxt_h1p_timeout_response_state;
1935 
1936     nxt_conn_write(task->thread->engine, c);
1937     return;
1938 
1939 fail:
1940 
1941     nxt_h1p_shutdown(task, c);
1942 }
1943 
1944 
/*
 * Connection write state used while the idle timeout response is being
 * sent.  The timer value comes from nxt_h1p_idle_response_timer_value();
 * no timer_data and no timer_autoreset are set.
 */
static const nxt_conn_state_t  nxt_h1p_timeout_response_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_sent,
    .error_handler = nxt_h1p_idle_response_error,

    .timer_handler = nxt_h1p_idle_response_timeout,
    .timer_value = nxt_h1p_idle_response_timer_value,
};
1954 
1955 
1956 static void
nxt_h1p_idle_response_sent(nxt_task_t * task,void * obj,void * data)1957 nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data)
1958 {
1959     nxt_conn_t  *c;
1960 
1961     c = data;
1962 
1963     nxt_debug(task, "h1p idle timeout response sent");
1964 
1965     nxt_h1p_shutdown(task, c);
1966 }
1967 
1968 
1969 static void
nxt_h1p_idle_response_error(nxt_task_t * task,void * obj,void * data)1970 nxt_h1p_idle_response_error(nxt_task_t *task, void *obj, void *data)
1971 {
1972     nxt_conn_t  *c;
1973 
1974     c = obj;
1975 
1976     nxt_debug(task, "h1p response error");
1977 
1978     nxt_h1p_shutdown(task, c);
1979 }
1980 
1981 
1982 static void
nxt_h1p_idle_response_timeout(nxt_task_t * task,void * obj,void * data)1983 nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, void *data)
1984 {
1985     nxt_conn_t   *c;
1986     nxt_timer_t  *timer;
1987 
1988     timer = obj;
1989 
1990     nxt_debug(task, "h1p idle timeout response timeout");
1991 
1992     c = nxt_read_timer_conn(timer);
1993     c->block_write = 1;
1994 
1995     nxt_h1p_shutdown(task, c);
1996 }
1997 
1998 
1999 static nxt_msec_t
nxt_h1p_idle_response_timer_value(nxt_conn_t * c,uintptr_t data)2000 nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data)
2001 {
2002     return 10 * 1000;
2003 }
2004 
2005 
2006 static void
nxt_h1p_shutdown(nxt_task_t * task,nxt_conn_t * c)2007 nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
2008 {
2009     nxt_timer_t    *timer;
2010     nxt_h1proto_t  *h1p;
2011 
2012     nxt_debug(task, "h1p shutdown");
2013 
2014     h1p = c->socket.data;
2015 
2016     if (h1p != NULL) {
2017         nxt_h1p_complete_buffers(task, h1p, 1);
2018 
2019         if (nxt_slow_path(h1p->websocket_timer != NULL)) {
2020             timer = &h1p->websocket_timer->timer;
2021 
2022             if (timer->handler != nxt_h1p_conn_ws_shutdown) {
2023                 timer->handler = nxt_h1p_conn_ws_shutdown;
2024                 nxt_timer_add(task->thread->engine, timer, 0);
2025 
2026             } else {
2027                 nxt_debug(task, "h1p already scheduled ws shutdown");
2028             }
2029 
2030             return;
2031         }
2032     }
2033 
2034     nxt_h1p_closing(task, c);
2035 }
2036 
2037 
2038 static void
nxt_h1p_conn_ws_shutdown(nxt_task_t * task,void * obj,void * data)2039 nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
2040 {
2041     nxt_timer_t                *timer;
2042     nxt_h1p_websocket_timer_t  *ws_timer;
2043 
2044     nxt_debug(task, "h1p conn ws shutdown");
2045 
2046     timer = obj;
2047     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
2048 
2049     nxt_h1p_closing(task, ws_timer->h1p->conn);
2050 }
2051 
2052 
2053 static void
nxt_h1p_closing(nxt_task_t * task,nxt_conn_t * c)2054 nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
2055 {
2056     nxt_debug(task, "h1p closing");
2057 
2058     c->socket.data = NULL;
2059 
2060 #if (NXT_TLS)
2061 
2062     if (c->u.tls != NULL) {
2063         c->write_state = &nxt_h1p_shutdown_state;
2064 
2065         c->io->shutdown(task, c, NULL);
2066         return;
2067     }
2068 
2069 #endif
2070 
2071     nxt_h1p_conn_closing(task, c, NULL);
2072 }
2073 
2074 
#if (NXT_TLS)

/*
 * Write state installed by nxt_h1p_closing() before the TLS shutdown:
 * every outcome continues with nxt_h1p_conn_closing().
 */
static const nxt_conn_state_t  nxt_h1p_shutdown_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_closing,
    .close_handler = nxt_h1p_conn_closing,
    .error_handler = nxt_h1p_conn_closing,
};

#endif
2086 
2087 
2088 static void
nxt_h1p_conn_closing(nxt_task_t * task,void * obj,void * data)2089 nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
2090 {
2091     nxt_conn_t  *c;
2092 
2093     c = obj;
2094 
2095     nxt_debug(task, "h1p conn closing");
2096 
2097     c->write_state = &nxt_h1p_close_state;
2098 
2099     nxt_conn_close(task->thread->engine, c);
2100 }
2101 
2102 
2103 static const nxt_conn_state_t  nxt_h1p_close_state
2104     nxt_aligned(64) =
2105 {
2106     .ready_handler = nxt_h1p_conn_free,
2107 };
2108 
2109 
2110 static void
nxt_h1p_conn_free(nxt_task_t * task,void * obj,void * data)2111 nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
2112 {
2113     nxt_conn_t          *c;
2114     nxt_listen_event_t  *lev;
2115     nxt_event_engine_t  *engine;
2116 
2117     c = obj;
2118 
2119     nxt_debug(task, "h1p conn free");
2120 
2121     engine = task->thread->engine;
2122 
2123     nxt_sockaddr_cache_free(engine, c);
2124 
2125     lev = c->listen;
2126 
2127     nxt_conn_free(task, c);
2128 
2129     nxt_router_listen_event_release(&engine->task, lev, NULL);
2130 }
2131 
2132 
2133 static void
nxt_h1p_peer_connect(nxt_task_t * task,nxt_http_peer_t * peer)2134 nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
2135 {
2136     nxt_mp_t            *mp;
2137     nxt_int_t           ret;
2138     nxt_conn_t          *c, *client;
2139     nxt_h1proto_t       *h1p;
2140     nxt_fd_event_t      *socket;
2141     nxt_work_queue_t    *wq;
2142     nxt_http_request_t  *r;
2143 
2144     nxt_debug(task, "h1p peer connect");
2145 
2146     peer->status = NXT_HTTP_UNSET;
2147     r = peer->request;
2148 
2149     mp = nxt_mp_create(1024, 128, 256, 32);
2150 
2151     if (nxt_slow_path(mp == NULL)) {
2152         goto fail;
2153     }
2154 
2155     h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
2156     if (nxt_slow_path(h1p == NULL)) {
2157         goto fail;
2158     }
2159 
2160     ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
2161     if (nxt_slow_path(ret != NXT_OK)) {
2162         goto fail;
2163     }
2164 
2165     c = nxt_conn_create(mp, task);
2166     if (nxt_slow_path(c == NULL)) {
2167         goto fail;
2168     }
2169 
2170     c->mem_pool = mp;
2171     h1p->conn = c;
2172 
2173     peer->proto.h1 = h1p;
2174     h1p->request = r;
2175 
2176     c->socket.data = peer;
2177     c->remote = peer->server->sockaddr;
2178 
2179     c->socket.write_ready = 1;
2180     c->write_state = &nxt_h1p_peer_connect_state;
2181 
2182     /*
2183      * TODO: queues should be implemented via client proto interface.
2184      */
2185     client = r->proto.h1->conn;
2186 
2187     socket = &client->socket;
2188     wq = socket->read_work_queue;
2189     c->read_work_queue = wq;
2190     c->socket.read_work_queue = wq;
2191     c->read_timer.work_queue = wq;
2192 
2193     wq = socket->write_work_queue;
2194     c->write_work_queue = wq;
2195     c->socket.write_work_queue = wq;
2196     c->write_timer.work_queue = wq;
2197     /* TODO END */
2198 
2199     nxt_conn_connect(task->thread->engine, c);
2200 
2201     return;
2202 
2203 fail:
2204 
2205     peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2206 
2207     r->state->error_handler(task, r, peer);
2208 }
2209 
2210 
2211 static const nxt_conn_state_t  nxt_h1p_peer_connect_state
2212     nxt_aligned(64) =
2213 {
2214     .ready_handler = nxt_h1p_peer_connected,
2215     .close_handler = nxt_h1p_peer_refused,
2216     .error_handler = nxt_h1p_peer_error,
2217 
2218     .timer_handler = nxt_h1p_peer_send_timeout,
2219     .timer_value = nxt_h1p_peer_timer_value,
2220     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2221 };
2222 
2223 
2224 static void
nxt_h1p_peer_connected(nxt_task_t * task,void * obj,void * data)2225 nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
2226 {
2227     nxt_http_peer_t     *peer;
2228     nxt_http_request_t  *r;
2229 
2230     peer = data;
2231 
2232     nxt_debug(task, "h1p peer connected");
2233 
2234     r = peer->request;
2235     r->state->ready_handler(task, r, peer);
2236 }
2237 
2238 
2239 static void
nxt_h1p_peer_refused(nxt_task_t * task,void * obj,void * data)2240 nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
2241 {
2242     nxt_http_peer_t     *peer;
2243     nxt_http_request_t  *r;
2244 
2245     peer = data;
2246 
2247     nxt_debug(task, "h1p peer refused");
2248 
2249     //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
2250     peer->status = NXT_HTTP_BAD_GATEWAY;
2251 
2252     r = peer->request;
2253     r->state->error_handler(task, r, peer);
2254 }
2255 
2256 
2257 static void
nxt_h1p_peer_header_send(nxt_task_t * task,nxt_http_peer_t * peer)2258 nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
2259 {
2260     u_char              *p;
2261     size_t              size;
2262     nxt_buf_t           *header, *body;
2263     nxt_conn_t          *c;
2264     nxt_http_field_t    *field;
2265     nxt_http_request_t  *r;
2266 
2267     nxt_debug(task, "h1p peer header send");
2268 
2269     r = peer->request;
2270 
2271     size = r->method->length + sizeof(" ") + r->target.length
2272            + sizeof(" HTTP/1.1\r\n")
2273            + sizeof("Connection: close\r\n")
2274            + sizeof("\r\n");
2275 
2276     nxt_list_each(field, r->fields) {
2277 
2278         if (!field->hopbyhop) {
2279             size += field->name_length + field->value_length;
2280             size += nxt_length(": \r\n");
2281         }
2282 
2283     } nxt_list_loop;
2284 
2285     header = nxt_http_buf_mem(task, r, size);
2286     if (nxt_slow_path(header == NULL)) {
2287         r->state->error_handler(task, r, peer);
2288         return;
2289     }
2290 
2291     p = header->mem.free;
2292 
2293     p = nxt_cpymem(p, r->method->start, r->method->length);
2294     *p++ = ' ';
2295     p = nxt_cpymem(p, r->target.start, r->target.length);
2296     p = nxt_cpymem(p, " HTTP/1.1\r\n", 11);
2297     p = nxt_cpymem(p, "Connection: close\r\n", 19);
2298 
2299     nxt_list_each(field, r->fields) {
2300 
2301         if (!field->hopbyhop) {
2302             p = nxt_cpymem(p, field->name, field->name_length);
2303             *p++ = ':'; *p++ = ' ';
2304             p = nxt_cpymem(p, field->value, field->value_length);
2305             *p++ = '\r'; *p++ = '\n';
2306         }
2307 
2308     } nxt_list_loop;
2309 
2310     *p++ = '\r'; *p++ = '\n';
2311     header->mem.free = p;
2312     size = p - header->mem.pos;
2313 
2314     c = peer->proto.h1->conn;
2315     c->write = header;
2316     c->write_state = &nxt_h1p_peer_header_send_state;
2317 
2318     if (r->body != NULL) {
2319         if (nxt_buf_is_file(r->body)) {
2320             body = nxt_buf_file_alloc(r->mem_pool, 0, 0);
2321 
2322         } else {
2323             body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
2324         }
2325 
2326         if (nxt_slow_path(body == NULL)) {
2327             r->state->error_handler(task, r, peer);
2328             return;
2329         }
2330 
2331         header->next = body;
2332 
2333         if (nxt_buf_is_file(r->body)) {
2334             body->file = r->body->file;
2335             body->file_end = r->body->file_end;
2336 
2337         } else {
2338             body->mem = r->body->mem;
2339         }
2340 
2341         size += nxt_buf_used_size(body);
2342 
2343 //        nxt_mp_retain(r->mem_pool);
2344     }
2345 
2346     if (size > 16384) {
2347         /* Use proxy_send_timeout instead of proxy_timeout. */
2348         c->write_state = &nxt_h1p_peer_header_body_send_state;
2349     }
2350 
2351     nxt_conn_write(task->thread->engine, c);
2352 }
2353 
2354 
2355 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state
2356     nxt_aligned(64) =
2357 {
2358     .ready_handler = nxt_h1p_peer_header_sent,
2359     .error_handler = nxt_h1p_peer_error,
2360 
2361     .timer_handler = nxt_h1p_peer_send_timeout,
2362     .timer_value = nxt_h1p_peer_timer_value,
2363     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2364 };
2365 
2366 
2367 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state
2368     nxt_aligned(64) =
2369 {
2370     .ready_handler = nxt_h1p_peer_header_sent,
2371     .error_handler = nxt_h1p_peer_error,
2372 
2373     .timer_handler = nxt_h1p_peer_send_timeout,
2374     .timer_value = nxt_h1p_peer_timer_value,
2375     .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
2376     .timer_autoreset = 1,
2377 };
2378 
2379 
2380 static void
nxt_h1p_peer_header_sent(nxt_task_t * task,void * obj,void * data)2381 nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
2382 {
2383     nxt_conn_t          *c;
2384     nxt_http_peer_t     *peer;
2385     nxt_http_request_t  *r;
2386     nxt_event_engine_t  *engine;
2387 
2388     c = obj;
2389     peer = data;
2390 
2391     nxt_debug(task, "h1p peer header sent");
2392 
2393     engine = task->thread->engine;
2394 
2395     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
2396 
2397     if (c->write != NULL) {
2398         nxt_conn_write(engine, c);
2399         return;
2400     }
2401 
2402     r = peer->request;
2403     r->state->ready_handler(task, r, peer);
2404 }
2405 
2406 
2407 static void
nxt_h1p_peer_header_read(nxt_task_t * task,nxt_http_peer_t * peer)2408 nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
2409 {
2410     nxt_conn_t  *c;
2411 
2412     nxt_debug(task, "h1p peer header read");
2413 
2414     c = peer->proto.h1->conn;
2415 
2416     if (c->write_timer.enabled) {
2417         c->read_state = &nxt_h1p_peer_header_read_state;
2418 
2419     } else {
2420         c->read_state = &nxt_h1p_peer_header_read_timer_state;
2421     }
2422 
2423     nxt_conn_read(task->thread->engine, c);
2424 }
2425 
2426 
2427 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state
2428     nxt_aligned(64) =
2429 {
2430     .ready_handler = nxt_h1p_peer_header_read_done,
2431     .close_handler = nxt_h1p_peer_closed,
2432     .error_handler = nxt_h1p_peer_error,
2433 
2434     .io_read_handler = nxt_h1p_peer_io_read_handler,
2435 };
2436 
2437 
2438 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state
2439     nxt_aligned(64) =
2440 {
2441     .ready_handler = nxt_h1p_peer_header_read_done,
2442     .close_handler = nxt_h1p_peer_closed,
2443     .error_handler = nxt_h1p_peer_error,
2444 
2445     .io_read_handler = nxt_h1p_peer_io_read_handler,
2446 
2447     .timer_handler = nxt_h1p_peer_read_timeout,
2448     .timer_value = nxt_h1p_peer_timer_value,
2449     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2450 };
2451 
2452 
2453 static ssize_t
nxt_h1p_peer_io_read_handler(nxt_task_t * task,nxt_conn_t * c)2454 nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
2455 {
2456     size_t              size;
2457     ssize_t             n;
2458     nxt_buf_t           *b;
2459     nxt_http_peer_t     *peer;
2460     nxt_socket_conf_t   *skcf;
2461     nxt_http_request_t  *r;
2462 
2463     peer = c->socket.data;
2464     r = peer->request;
2465     b = c->read;
2466 
2467     if (b == NULL) {
2468         skcf = r->conf->socket_conf;
2469 
2470         size = (peer->header_received) ? skcf->proxy_buffer_size
2471                                        : skcf->proxy_header_buffer_size;
2472 
2473         nxt_debug(task, "h1p peer io read: %z", size);
2474 
2475         b = nxt_http_proxy_buf_mem_alloc(task, r, size);
2476         if (nxt_slow_path(b == NULL)) {
2477             c->socket.error = NXT_ENOMEM;
2478             return NXT_ERROR;
2479         }
2480     }
2481 
2482     n = c->io->recvbuf(c, b);
2483 
2484     if (n > 0) {
2485         c->read = b;
2486 
2487     } else {
2488         c->read = NULL;
2489         nxt_http_proxy_buf_mem_free(task, r, b);
2490     }
2491 
2492     return n;
2493 }
2494 
2495 
2496 static void
nxt_h1p_peer_header_read_done(nxt_task_t * task,void * obj,void * data)2497 nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
2498 {
2499     nxt_int_t           ret;
2500     nxt_buf_t           *b;
2501     nxt_conn_t          *c;
2502     nxt_h1proto_t       *h1p;
2503     nxt_http_peer_t     *peer;
2504     nxt_http_request_t  *r;
2505     nxt_event_engine_t  *engine;
2506 
2507     c = obj;
2508     peer = data;
2509 
2510     nxt_debug(task, "h1p peer header read done");
2511 
2512     b = c->read;
2513 
2514     ret = nxt_h1p_peer_header_parse(peer, &b->mem);
2515 
2516     r = peer->request;
2517 
2518     ret = nxt_expect(NXT_DONE, ret);
2519 
2520     if (ret != NXT_AGAIN) {
2521         engine = task->thread->engine;
2522         nxt_timer_disable(engine, &c->write_timer);
2523         nxt_timer_disable(engine, &c->read_timer);
2524     }
2525 
2526     switch (ret) {
2527 
2528     case NXT_DONE:
2529         peer->fields = peer->proto.h1->parser.fields;
2530 
2531         ret = nxt_http_fields_process(peer->fields,
2532                                       &nxt_h1p_peer_fields_hash, r);
2533         if (nxt_slow_path(ret != NXT_OK)) {
2534             peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2535             break;
2536         }
2537 
2538         c->read = NULL;
2539 
2540         peer->header_received = 1;
2541 
2542         h1p = peer->proto.h1;
2543 
2544         if (h1p->chunked) {
2545             if (r->resp.content_length != NULL) {
2546                 peer->status = NXT_HTTP_BAD_GATEWAY;
2547                 break;
2548             }
2549 
2550             h1p->chunked_parse.mem_pool = c->mem_pool;
2551 
2552         } else if (r->resp.content_length_n > 0) {
2553             h1p->remainder = r->resp.content_length_n;
2554         }
2555 
2556         if (nxt_buf_mem_used_size(&b->mem) != 0) {
2557             nxt_h1p_peer_body_process(task, peer, b);
2558             return;
2559         }
2560 
2561         r->state->ready_handler(task, r, peer);
2562         return;
2563 
2564     case NXT_AGAIN:
2565         if (nxt_buf_mem_free_size(&b->mem) != 0) {
2566             nxt_conn_read(task->thread->engine, c);
2567             return;
2568         }
2569 
2570         /* Fall through. */
2571 
2572     default:
2573     case NXT_ERROR:
2574     case NXT_HTTP_PARSE_INVALID:
2575     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
2576     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
2577         peer->status = NXT_HTTP_BAD_GATEWAY;
2578         break;
2579     }
2580 
2581     nxt_http_proxy_buf_mem_free(task, r, b);
2582 
2583     r->state->error_handler(task, r, peer);
2584 }
2585 
2586 
2587 static nxt_int_t
nxt_h1p_peer_header_parse(nxt_http_peer_t * peer,nxt_buf_mem_t * bm)2588 nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
2589 {
2590     u_char     *p;
2591     size_t     length;
2592     nxt_int_t  status;
2593 
2594     if (peer->status < 0) {
2595         length = nxt_buf_mem_used_size(bm);
2596 
2597         if (nxt_slow_path(length < 12)) {
2598             return NXT_AGAIN;
2599         }
2600 
2601         p = bm->pos;
2602 
2603         if (nxt_slow_path(memcmp(p, "HTTP/1.", 7) != 0
2604                           || (p[7] != '0' && p[7] != '1')))
2605         {
2606             return NXT_ERROR;
2607         }
2608 
2609         status = nxt_int_parse(&p[9], 3);
2610 
2611         if (nxt_slow_path(status < 0)) {
2612             return NXT_ERROR;
2613         }
2614 
2615         p += 12;
2616         length -= 12;
2617 
2618         p = memchr(p, '\n', length);
2619 
2620         if (nxt_slow_path(p == NULL)) {
2621             return NXT_AGAIN;
2622         }
2623 
2624         bm->pos = p + 1;
2625         peer->status = status;
2626     }
2627 
2628     return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
2629 }
2630 
2631 
2632 static void
nxt_h1p_peer_read(nxt_task_t * task,nxt_http_peer_t * peer)2633 nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
2634 {
2635     nxt_conn_t  *c;
2636 
2637     nxt_debug(task, "h1p peer read");
2638 
2639     c = peer->proto.h1->conn;
2640     c->read_state = &nxt_h1p_peer_read_state;
2641 
2642     nxt_conn_read(task->thread->engine, c);
2643 }
2644 
2645 
2646 static const nxt_conn_state_t  nxt_h1p_peer_read_state
2647     nxt_aligned(64) =
2648 {
2649     .ready_handler = nxt_h1p_peer_read_done,
2650     .close_handler = nxt_h1p_peer_closed,
2651     .error_handler = nxt_h1p_peer_error,
2652 
2653     .io_read_handler = nxt_h1p_peer_io_read_handler,
2654 
2655     .timer_handler = nxt_h1p_peer_read_timeout,
2656     .timer_value = nxt_h1p_peer_timer_value,
2657     .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
2658     .timer_autoreset = 1,
2659 };
2660 
2661 
2662 static void
nxt_h1p_peer_read_done(nxt_task_t * task,void * obj,void * data)2663 nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
2664 {
2665     nxt_buf_t        *out;
2666     nxt_conn_t       *c;
2667     nxt_http_peer_t  *peer;
2668 
2669     c = obj;
2670     peer = data;
2671 
2672     nxt_debug(task, "h1p peer read done");
2673 
2674     out = c->read;
2675     c->read = NULL;
2676 
2677     nxt_h1p_peer_body_process(task, peer, out);
2678 }
2679 
2680 
2681 static void
nxt_h1p_peer_body_process(nxt_task_t * task,nxt_http_peer_t * peer,nxt_buf_t * out)2682 nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer,
2683     nxt_buf_t *out)
2684 {
2685     size_t              length;
2686     nxt_h1proto_t       *h1p;
2687     nxt_http_request_t  *r;
2688 
2689     h1p = peer->proto.h1;
2690 
2691     if (h1p->chunked) {
2692         out = nxt_http_chunk_parse(task, &h1p->chunked_parse, out);
2693 
2694         if (h1p->chunked_parse.chunk_error || h1p->chunked_parse.error) {
2695             peer->status = NXT_HTTP_BAD_GATEWAY;
2696             r = peer->request;
2697             r->state->error_handler(task, r, peer);
2698             return;
2699         }
2700 
2701         if (h1p->chunked_parse.last) {
2702             nxt_buf_chain_add(&out, nxt_http_buf_last(peer->request));
2703             peer->closed = 1;
2704         }
2705 
2706     } else if (h1p->remainder > 0) {
2707         length = nxt_buf_chain_length(out);
2708         h1p->remainder -= length;
2709     }
2710 
2711     peer->body = out;
2712 
2713     r = peer->request;
2714     r->state->ready_handler(task, r, peer);
2715 }
2716 
2717 
2718 static void
nxt_h1p_peer_closed(nxt_task_t * task,void * obj,void * data)2719 nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
2720 {
2721     nxt_http_peer_t     *peer;
2722     nxt_http_request_t  *r;
2723 
2724     peer = data;
2725 
2726     nxt_debug(task, "h1p peer closed");
2727 
2728     r = peer->request;
2729 
2730     if (peer->header_received) {
2731         peer->body = nxt_http_buf_last(r);
2732         peer->closed = 1;
2733         r->inconsistent = (peer->proto.h1->remainder != 0);
2734 
2735         r->state->ready_handler(task, r, peer);
2736 
2737     } else {
2738         peer->status = NXT_HTTP_BAD_GATEWAY;
2739 
2740         r->state->error_handler(task, r, peer);
2741     }
2742 }
2743 
2744 
2745 static void
nxt_h1p_peer_error(nxt_task_t * task,void * obj,void * data)2746 nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
2747 {
2748     nxt_http_peer_t     *peer;
2749     nxt_http_request_t  *r;
2750 
2751     peer = data;
2752 
2753     nxt_debug(task, "h1p peer error");
2754 
2755     peer->status = NXT_HTTP_BAD_GATEWAY;
2756 
2757     r = peer->request;
2758     r->state->error_handler(task, r, peer);
2759 }
2760 
2761 
2762 static void
nxt_h1p_peer_send_timeout(nxt_task_t * task,void * obj,void * data)2763 nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
2764 {
2765     nxt_conn_t          *c;
2766     nxt_timer_t         *timer;
2767     nxt_http_peer_t     *peer;
2768     nxt_http_request_t  *r;
2769 
2770     timer = obj;
2771 
2772     nxt_debug(task, "h1p peer send timeout");
2773 
2774     c = nxt_write_timer_conn(timer);
2775     c->block_write = 1;
2776     c->block_read = 1;
2777 
2778     peer = c->socket.data;
2779     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2780 
2781     r = peer->request;
2782     r->state->error_handler(task, r, peer);
2783 }
2784 
2785 
2786 static void
nxt_h1p_peer_read_timeout(nxt_task_t * task,void * obj,void * data)2787 nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
2788 {
2789     nxt_conn_t          *c;
2790     nxt_timer_t         *timer;
2791     nxt_http_peer_t     *peer;
2792     nxt_http_request_t  *r;
2793 
2794     timer = obj;
2795 
2796     nxt_debug(task, "h1p peer read timeout");
2797 
2798     c = nxt_read_timer_conn(timer);
2799     c->block_write = 1;
2800     c->block_read = 1;
2801 
2802     peer = c->socket.data;
2803     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2804 
2805     r = peer->request;
2806     r->state->error_handler(task, r, peer);
2807 }
2808 
2809 
2810 static nxt_msec_t
nxt_h1p_peer_timer_value(nxt_conn_t * c,uintptr_t data)2811 nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
2812 {
2813     nxt_http_peer_t  *peer;
2814 
2815     peer = c->socket.data;
2816 
2817     return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
2818 }
2819 
2820 
2821 static void
nxt_h1p_peer_close(nxt_task_t * task,nxt_http_peer_t * peer)2822 nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
2823 {
2824     nxt_conn_t  *c;
2825 
2826     nxt_debug(task, "h1p peer close");
2827 
2828     peer->closed = 1;
2829 
2830     c = peer->proto.h1->conn;
2831     task = &c->task;
2832     c->socket.task = task;
2833     c->read_timer.task = task;
2834     c->write_timer.task = task;
2835 
2836     if (c->socket.fd != -1) {
2837         c->write_state = &nxt_h1p_peer_close_state;
2838 
2839         nxt_conn_close(task->thread->engine, c);
2840 
2841     } else {
2842         nxt_h1p_peer_free(task, c, NULL);
2843     }
2844 }
2845 
2846 
2847 static const nxt_conn_state_t  nxt_h1p_peer_close_state
2848     nxt_aligned(64) =
2849 {
2850     .ready_handler = nxt_h1p_peer_free,
2851 };
2852 
2853 
2854 static void
nxt_h1p_peer_free(nxt_task_t * task,void * obj,void * data)2855 nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
2856 {
2857     nxt_conn_t  *c;
2858 
2859     c = obj;
2860 
2861     nxt_debug(task, "h1p peer free");
2862 
2863     nxt_conn_free(task, c);
2864 }
2865 
2866 
2867 static nxt_int_t
nxt_h1p_peer_transfer_encoding(void * ctx,nxt_http_field_t * field,uintptr_t data)2868 nxt_h1p_peer_transfer_encoding(void *ctx, nxt_http_field_t *field,
2869     uintptr_t data)
2870 {
2871     nxt_http_request_t  *r;
2872 
2873     r = ctx;
2874     field->skip = 1;
2875 
2876     if (field->value_length == 7
2877         && memcmp(field->value, "chunked", 7) == 0)
2878     {
2879         r->peer->proto.h1->chunked = 1;
2880     }
2881 
2882     return NXT_OK;
2883 }
2884