xref: /unit/src/nxt_h1proto.c (revision 1318:f237e8c553fd)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_h1proto.h>
10 #include <nxt_websocket.h>
11 #include <nxt_websocket_header.h>
12 
13 
14 /*
15  * nxt_http_conn_ and nxt_h1p_conn_ prefixes are used for connection handlers.
16  * nxt_h1p_idle_ prefix is used for idle connection handlers.
17  * nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
18  */
19 
20 #if (NXT_TLS)
21 static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
22 static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
23 #endif
24 static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
25 static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
26 static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
27 static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
28     void *data);
29 static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
30     nxt_http_request_t *r);
31 static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task,
32     nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf);
33 static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
34     uintptr_t data);
35 static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field,
36     uintptr_t data);
37 static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field,
38     uintptr_t data);
39 static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field,
40     uintptr_t data);
41 static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
42     uintptr_t data);
43 static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
44 static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
45     void *data);
46 static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
47 static void nxt_h1p_request_header_send(nxt_task_t *task,
48     nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
49 static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
50     nxt_buf_t *out);
51 static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
52     nxt_buf_t *out);
53 static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
54     nxt_http_proto_t proto);
55 static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
56     nxt_buf_t *last);
57 static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data);
58 static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
59     void *data);
60 static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj,
61     void *data);
62 nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
63     nxt_http_request_t *r);
64 static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
65     nxt_socket_conf_joint_t *joint);
66 static void nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data);
67 static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data);
68 static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data);
69 static nxt_msec_t nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data);
70 static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
71     nxt_conn_t *c);
72 static void nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data);
73 static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
74 static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
75 static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
76 static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
77     void *data);
78 static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
79     uintptr_t data);
80 static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
81 static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
82 static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
83 static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
84 static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
85 
86 static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
87 static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
88 static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
89 static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
90 static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
91 static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
92 static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
93 static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
94     void *data);
95 static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
96     nxt_buf_mem_t *bm);
97 static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
98 static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
99 static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
100 static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
101 static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
102 static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
103 static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
104 static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
105 static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
106 
107 #if (NXT_TLS)
108 static const nxt_conn_state_t  nxt_http_idle_state;
109 static const nxt_conn_state_t  nxt_h1p_shutdown_state;
110 #endif
111 static const nxt_conn_state_t  nxt_h1p_idle_state;
112 static const nxt_conn_state_t  nxt_h1p_header_parse_state;
113 static const nxt_conn_state_t  nxt_h1p_read_body_state;
114 static const nxt_conn_state_t  nxt_h1p_request_send_state;
115 static const nxt_conn_state_t  nxt_h1p_timeout_response_state;
116 static const nxt_conn_state_t  nxt_h1p_keepalive_state;
117 static const nxt_conn_state_t  nxt_h1p_close_state;
118 static const nxt_conn_state_t  nxt_h1p_peer_connect_state;
119 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state;
120 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state;
121 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state;
122 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state;
123 static const nxt_conn_state_t  nxt_h1p_peer_read_state;
124 static const nxt_conn_state_t  nxt_h1p_peer_close_state;
125 
126 
127 const nxt_http_proto_table_t  nxt_http_proto[3] = {
128     /* NXT_HTTP_PROTO_H1 */
129     {
130         .body_read        = nxt_h1p_request_body_read,
131         .local_addr       = nxt_h1p_request_local_addr,
132         .header_send      = nxt_h1p_request_header_send,
133         .send             = nxt_h1p_request_send,
134         .body_bytes_sent  = nxt_h1p_request_body_bytes_sent,
135         .discard          = nxt_h1p_request_discard,
136         .close            = nxt_h1p_request_close,
137 
138         .peer_connect     = nxt_h1p_peer_connect,
139         .peer_header_send = nxt_h1p_peer_header_send,
140         .peer_header_read = nxt_h1p_peer_header_read,
141         .peer_read        = nxt_h1p_peer_read,
142         .peer_close       = nxt_h1p_peer_close,
143 
144         .ws_frame_start   = nxt_h1p_websocket_frame_start,
145     },
146     /* NXT_HTTP_PROTO_H2      */
147     /* NXT_HTTP_PROTO_DEVNULL */
148 };
149 
150 
151 static nxt_lvlhsh_t                    nxt_h1p_fields_hash;
152 
153 static nxt_http_field_proc_t           nxt_h1p_fields[] = {
154     { nxt_string("Connection"),        &nxt_h1p_connection, 0 },
155     { nxt_string("Upgrade"),           &nxt_h1p_upgrade, 0 },
156     { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
157     { nxt_string("Sec-WebSocket-Version"),
158                                        &nxt_h1p_websocket_version, 0 },
159     { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },
160 
161     { nxt_string("Host"),              &nxt_http_request_host, 0 },
162     { nxt_string("Cookie"),            &nxt_http_request_field,
163         offsetof(nxt_http_request_t, cookie) },
164     { nxt_string("Referer"),           &nxt_http_request_field,
165         offsetof(nxt_http_request_t, referer) },
166     { nxt_string("User-Agent"),        &nxt_http_request_field,
167         offsetof(nxt_http_request_t, user_agent) },
168     { nxt_string("Content-Type"),      &nxt_http_request_field,
169         offsetof(nxt_http_request_t, content_type) },
170     { nxt_string("Content-Length"),    &nxt_http_request_content_length, 0 },
171 };
172 
173 
174 static nxt_lvlhsh_t                    nxt_h1p_peer_fields_hash;
175 
176 static nxt_http_field_proc_t           nxt_h1p_peer_fields[] = {
177     { nxt_string("Connection"),        &nxt_http_proxy_skip, 0 },
178     { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
179     { nxt_string("Server"),            &nxt_http_proxy_skip, 0 },
180     { nxt_string("Date"),              &nxt_http_proxy_date, 0 },
181     { nxt_string("Content-Length"),    &nxt_http_proxy_content_length, 0 },
182 };
183 
184 
185 nxt_int_t
186 nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
187 {
188     nxt_int_t  ret;
189 
190     ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
191                                nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
192 
193     if (nxt_fast_path(ret == NXT_OK)) {
194         ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
195                                    rt->mem_pool, nxt_h1p_peer_fields,
196                                    nxt_nitems(nxt_h1p_peer_fields));
197     }
198 
199     return ret;
200 }
201 
202 
203 void
204 nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
205 {
206     nxt_conn_t               *c;
207     nxt_socket_conf_t        *skcf;
208     nxt_event_engine_t       *engine;
209     nxt_listen_event_t       *lev;
210     nxt_socket_conf_joint_t  *joint;
211 
212     c = obj;
213     lev = data;
214 
215     nxt_debug(task, "http conn init");
216 
217     joint = lev->socket.data;
218     skcf = joint->socket_conf;
219     c->local = skcf->sockaddr;
220 
221     engine = task->thread->engine;
222     c->read_work_queue = &engine->fast_work_queue;
223     c->write_work_queue = &engine->fast_work_queue;
224 
225     c->read_state = &nxt_h1p_idle_state;
226 
227 #if (NXT_TLS)
228     if (skcf->tls != NULL) {
229         c->read_state = &nxt_http_idle_state;
230     }
231 #endif
232 
233     nxt_conn_read(engine, c);
234 }
235 
236 
237 #if (NXT_TLS)
238 
239 static const nxt_conn_state_t  nxt_http_idle_state
240     nxt_aligned(64) =
241 {
242     .ready_handler = nxt_http_conn_test,
243     .close_handler = nxt_h1p_conn_close,
244     .error_handler = nxt_h1p_conn_error,
245 
246     .io_read_handler = nxt_http_idle_io_read_handler,
247 
248     .timer_handler = nxt_h1p_idle_timeout,
249     .timer_value = nxt_h1p_conn_timer_value,
250     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
251 };
252 
253 
254 static ssize_t
255 nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
256 {
257     size_t                   size;
258     ssize_t                  n;
259     nxt_buf_t                *b;
260     nxt_socket_conf_joint_t  *joint;
261 
262     joint = c->listen->socket.data;
263 
264     if (nxt_slow_path(joint == NULL)) {
265         /*
266          * Listening socket had been closed while
267          * connection was in keep-alive state.
268          */
269         c->read_state = &nxt_h1p_idle_close_state;
270         return 0;
271     }
272 
273     size = joint->socket_conf->header_buffer_size;
274 
275     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
276     if (nxt_slow_path(b == NULL)) {
277         c->socket.error = NXT_ENOMEM;
278         return NXT_ERROR;
279     }
280 
281     /*
282      * 1 byte is enough to distinguish between SSLv3/TLS and plain HTTP.
283      * 11 bytes are enough to log supported SSLv3/TLS version.
284      * 16 bytes are just for more optimized kernel copy-out operation.
285      */
286     n = c->io->recv(c, b->mem.pos, 16, MSG_PEEK);
287 
288     if (n > 0) {
289         c->read = b;
290 
291     } else {
292         c->read = NULL;
293         nxt_event_engine_buf_mem_free(task->thread->engine, b);
294     }
295 
296     return n;
297 }
298 
299 
300 static void
301 nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
302 {
303     u_char                   *p;
304     nxt_buf_t                *b;
305     nxt_conn_t               *c;
306     nxt_tls_conf_t           *tls;
307     nxt_event_engine_t       *engine;
308     nxt_socket_conf_joint_t  *joint;
309 
310     c = obj;
311 
312     nxt_debug(task, "h1p conn https test");
313 
314     engine = task->thread->engine;
315     b = c->read;
316     p = b->mem.pos;
317 
318     c->read_state = &nxt_h1p_idle_state;
319 
320     if (p[0] != 0x16) {
321         b->mem.free = b->mem.pos;
322 
323         nxt_conn_read(engine, c);
324         return;
325     }
326 
327     /* SSLv3/TLS ClientHello message. */
328 
329 #if (NXT_DEBUG)
330     if (nxt_buf_mem_used_size(&b->mem) >= 11) {
331         u_char      major, minor;
332         const char  *protocol;
333 
334         major = p[9];
335         minor = p[10];
336 
337         if (major == 3) {
338             if (minor == 0) {
339                 protocol = "SSLv";
340 
341             } else {
342                 protocol = "TLSv";
343                 major -= 2;
344                 minor -= 1;
345             }
346 
347             nxt_debug(task, "SSL/TLS: %s%ud.%ud", protocol, major, minor);
348         }
349     }
350 #endif
351 
352     c->read = NULL;
353     nxt_event_engine_buf_mem_free(engine, b);
354 
355     joint = c->listen->socket.data;
356 
357     if (nxt_slow_path(joint == NULL)) {
358         /*
359          * Listening socket had been closed while
360          * connection was in keep-alive state.
361          */
362         nxt_h1p_closing(task, c);
363         return;
364     }
365 
366     tls = joint->socket_conf->tls;
367 
368     tls->conn_init(task, tls, c);
369 }
370 
371 #endif
372 
373 
374 static const nxt_conn_state_t  nxt_h1p_idle_state
375     nxt_aligned(64) =
376 {
377     .ready_handler = nxt_h1p_conn_proto_init,
378     .close_handler = nxt_h1p_conn_close,
379     .error_handler = nxt_h1p_conn_error,
380 
381     .io_read_handler = nxt_h1p_idle_io_read_handler,
382 
383     .timer_handler = nxt_h1p_idle_timeout,
384     .timer_value = nxt_h1p_conn_timer_value,
385     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
386     .timer_autoreset = 1,
387 };
388 
389 
390 static ssize_t
391 nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
392 {
393     size_t                   size;
394     ssize_t                  n;
395     nxt_buf_t                *b;
396     nxt_socket_conf_joint_t  *joint;
397 
398     joint = c->listen->socket.data;
399 
400     if (nxt_slow_path(joint == NULL)) {
401         /*
402          * Listening socket had been closed while
403          * connection was in keep-alive state.
404          */
405         c->read_state = &nxt_h1p_idle_close_state;
406         return 0;
407     }
408 
409     b = c->read;
410 
411     if (b == NULL) {
412         size = joint->socket_conf->header_buffer_size;
413 
414         b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
415         if (nxt_slow_path(b == NULL)) {
416             c->socket.error = NXT_ENOMEM;
417             return NXT_ERROR;
418         }
419     }
420 
421     n = c->io->recvbuf(c, b);
422 
423     if (n > 0) {
424         c->read = b;
425 
426     } else {
427         c->read = NULL;
428         nxt_event_engine_buf_mem_free(task->thread->engine, b);
429     }
430 
431     return n;
432 }
433 
434 
435 static void
436 nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
437 {
438     nxt_conn_t     *c;
439     nxt_h1proto_t  *h1p;
440 
441     c = obj;
442 
443     nxt_debug(task, "h1p conn proto init");
444 
445     h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
446     if (nxt_slow_path(h1p == NULL)) {
447         nxt_h1p_closing(task, c);
448         return;
449     }
450 
451     c->socket.data = h1p;
452     h1p->conn = c;
453 
454     nxt_h1p_conn_request_init(task, c, h1p);
455 }
456 
457 
458 static void
459 nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
460 {
461     nxt_int_t                ret;
462     nxt_conn_t               *c;
463     nxt_h1proto_t            *h1p;
464     nxt_http_request_t       *r;
465     nxt_socket_conf_joint_t  *joint;
466 
467     c = obj;
468     h1p = data;
469 
470     nxt_debug(task, "h1p conn request init");
471 
472     r = nxt_http_request_create(task);
473 
474     if (nxt_fast_path(r != NULL)) {
475         h1p->request = r;
476         r->proto.h1 = h1p;
477 
478         /* r->protocol = NXT_HTTP_PROTO_H1 is done by zeroing. */
479         r->remote = c->remote;
480 
481 #if (NXT_TLS)
482         r->tls = c->u.tls;
483 #endif
484 
485         r->task = c->task;
486         task = &r->task;
487         c->socket.task = task;
488         c->read_timer.task = task;
489         c->write_timer.task = task;
490 
491         ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
492 
493         if (nxt_fast_path(ret == NXT_OK)) {
494             joint = c->listen->socket.data;
495             joint->count++;
496 
497             r->conf = joint;
498             c->local = joint->socket_conf->sockaddr;
499 
500             nxt_h1p_conn_request_header_parse(task, c, h1p);
501             return;
502         }
503 
504         /*
505          * The request is very incomplete here,
506          * so "internal server error" useless here.
507          */
508         nxt_mp_release(r->mem_pool);
509     }
510 
511     nxt_h1p_closing(task, c);
512 }
513 
514 
515 static const nxt_conn_state_t  nxt_h1p_header_parse_state
516     nxt_aligned(64) =
517 {
518     .ready_handler = nxt_h1p_conn_request_header_parse,
519     .close_handler = nxt_h1p_conn_request_error,
520     .error_handler = nxt_h1p_conn_request_error,
521 
522     .timer_handler = nxt_h1p_conn_request_timeout,
523     .timer_value = nxt_h1p_conn_request_timer_value,
524     .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
525 };
526 
527 
528 static void
529 nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
530 {
531     nxt_int_t           ret;
532     nxt_conn_t          *c;
533     nxt_h1proto_t       *h1p;
534     nxt_http_status_t   status;
535     nxt_http_request_t  *r;
536 
537     c = obj;
538     h1p = data;
539 
540     nxt_debug(task, "h1p conn header parse");
541 
542     ret = nxt_http_parse_request(&h1p->parser, &c->read->mem);
543 
544     ret = nxt_expect(NXT_DONE, ret);
545 
546     if (ret != NXT_AGAIN) {
547         nxt_timer_disable(task->thread->engine, &c->read_timer);
548     }
549 
550     r = h1p->request;
551 
552     switch (ret) {
553 
554     case NXT_DONE:
555         /*
556          * By default the keepalive mode is disabled in HTTP/1.0 and
557          * enabled in HTTP/1.1.  The mode can be overridden later by
558          * the "Connection" field processed in nxt_h1p_connection().
559          */
560         h1p->keepalive = (h1p->parser.version.s.minor != '0');
561 
562         ret = nxt_h1p_header_process(task, h1p, r);
563 
564         if (nxt_fast_path(ret == NXT_OK)) {
565 
566 #if (NXT_TLS)
567             if (c->u.tls == NULL && r->conf->socket_conf->tls != NULL) {
568                 status = NXT_HTTP_TO_HTTPS;
569                 goto error;
570             }
571 #endif
572 
573             r->state->ready_handler(task, r, NULL);
574             return;
575         }
576 
577         status = ret;
578         goto error;
579 
580     case NXT_AGAIN:
581         status = nxt_h1p_header_buffer_test(task, h1p, c, r->conf->socket_conf);
582 
583         if (nxt_fast_path(status == NXT_OK)) {
584             c->read_state = &nxt_h1p_header_parse_state;
585 
586             nxt_conn_read(task->thread->engine, c);
587             return;
588         }
589 
590         break;
591 
592     case NXT_HTTP_PARSE_INVALID:
593         status = NXT_HTTP_BAD_REQUEST;
594         break;
595 
596     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
597         status = NXT_HTTP_VERSION_NOT_SUPPORTED;
598         break;
599 
600     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
601         status = NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
602         break;
603 
604     default:
605     case NXT_ERROR:
606         status = NXT_HTTP_INTERNAL_SERVER_ERROR;
607         break;
608     }
609 
610     (void) nxt_h1p_header_process(task, h1p, r);
611 
612 error:
613 
614     h1p->keepalive = 0;
615 
616     nxt_http_request_error(task, r, status);
617 }
618 
619 
620 static nxt_int_t
621 nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
622     nxt_http_request_t *r)
623 {
624     u_char     *m;
625     nxt_int_t  ret;
626 
627     r->target.start = h1p->parser.target_start;
628     r->target.length = h1p->parser.target_end - h1p->parser.target_start;
629 
630     if (h1p->parser.version.ui64 != 0) {
631         r->version.start = h1p->parser.version.str;
632         r->version.length = sizeof(h1p->parser.version.str);
633     }
634 
635     r->method = &h1p->parser.method;
636     r->path = &h1p->parser.path;
637     r->args = &h1p->parser.args;
638 
639     r->fields = h1p->parser.fields;
640 
641     ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
642     if (nxt_slow_path(ret != NXT_OK)) {
643         return ret;
644     }
645 
646     if (h1p->connection_upgrade && h1p->upgrade_websocket) {
647         m = h1p->parser.method.start;
648 
649         if (nxt_slow_path(h1p->parser.method.length != 3
650                           || m[0] != 'G'
651                           || m[1] != 'E'
652                           || m[2] != 'T'))
653         {
654             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method");
655 
656             return NXT_HTTP_BAD_REQUEST;
657         }
658 
659         if (nxt_slow_path(h1p->parser.version.s.minor != '1')) {
660             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version");
661 
662             return NXT_HTTP_BAD_REQUEST;
663         }
664 
665         if (nxt_slow_path(h1p->websocket_key == NULL)) {
666             nxt_log(task, NXT_LOG_INFO,
667                     "h1p upgrade: bad or absent websocket key");
668 
669             return NXT_HTTP_BAD_REQUEST;
670         }
671 
672         if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
673             nxt_log(task, NXT_LOG_INFO,
674                     "h1p upgrade: bad or absent websocket version");
675 
676             return NXT_HTTP_UPGRADE_REQUIRED;
677         }
678 
679         r->websocket_handshake = 1;
680     }
681 
682     return ret;
683 }
684 
685 
686 static nxt_int_t
687 nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
688     nxt_socket_conf_t *skcf)
689 {
690     size_t     size, used;
691     nxt_buf_t  *in, *b;
692 
693     in = c->read;
694 
695     if (nxt_buf_mem_free_size(&in->mem) == 0) {
696         size = skcf->large_header_buffer_size;
697         used = nxt_buf_mem_used_size(&in->mem);
698 
699         if (size <= used || h1p->nbuffers >= skcf->large_header_buffers) {
700             return NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
701         }
702 
703         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
704         if (nxt_slow_path(b == NULL)) {
705             return NXT_HTTP_INTERNAL_SERVER_ERROR;
706         }
707 
708         b->mem.free = nxt_cpymem(b->mem.pos, in->mem.pos, used);
709 
710         in->next = h1p->buffers;
711         h1p->buffers = in;
712         h1p->nbuffers++;
713 
714         c->read = b;
715     }
716 
717     return NXT_OK;
718 }
719 
720 
721 static nxt_int_t
722 nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
723 {
724     nxt_http_request_t  *r;
725 
726     r = ctx;
727     field->hopbyhop = 1;
728 
729     if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
730         r->proto.h1->keepalive = 0;
731 
732     } else if (field->value_length == 7
733                && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
734     {
735         r->proto.h1->connection_upgrade = 1;
736     }
737 
738     return NXT_OK;
739 }
740 
741 
742 static nxt_int_t
743 nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
744 {
745     nxt_http_request_t  *r;
746 
747     r = ctx;
748 
749     if (field->value_length == 9
750         && nxt_memcasecmp(field->value, "websocket", 9) == 0)
751     {
752         r->proto.h1->upgrade_websocket = 1;
753     }
754 
755     return NXT_OK;
756 }
757 
758 
759 static nxt_int_t
760 nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data)
761 {
762     nxt_http_request_t  *r;
763 
764     r = ctx;
765 
766     if (field->value_length == 24) {
767         r->proto.h1->websocket_key = field;
768     }
769 
770     return NXT_OK;
771 }
772 
773 
774 static nxt_int_t
775 nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data)
776 {
777     nxt_http_request_t  *r;
778 
779     r = ctx;
780 
781     if (field->value_length == 2
782         && field->value[0] == '1' && field->value[1] == '3')
783     {
784         r->proto.h1->websocket_version_ok = 1;
785     }
786 
787     return NXT_OK;
788 }
789 
790 
791 static nxt_int_t
792 nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
793 {
794     nxt_http_te_t       te;
795     nxt_http_request_t  *r;
796 
797     r = ctx;
798     field->skip = 1;
799     field->hopbyhop = 1;
800 
801     if (field->value_length == 7
802         && nxt_memcmp(field->value, "chunked", 7) == 0)
803     {
804         te = NXT_HTTP_TE_CHUNKED;
805 
806     } else {
807         te = NXT_HTTP_TE_UNSUPPORTED;
808     }
809 
810     r->proto.h1->transfer_encoding = te;
811 
812     return NXT_OK;
813 }
814 
815 
816 static void
817 nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
818 {
819     size_t             size, body_length;
820     nxt_buf_t          *in, *b;
821     nxt_conn_t         *c;
822     nxt_h1proto_t      *h1p;
823     nxt_http_status_t  status;
824 
825     h1p = r->proto.h1;
826 
827     nxt_debug(task, "h1p request body read %O te:%d",
828               r->content_length_n, h1p->transfer_encoding);
829 
830     switch (h1p->transfer_encoding) {
831 
832     case NXT_HTTP_TE_CHUNKED:
833         status = NXT_HTTP_LENGTH_REQUIRED;
834         goto error;
835 
836     case NXT_HTTP_TE_UNSUPPORTED:
837         status = NXT_HTTP_NOT_IMPLEMENTED;
838         goto error;
839 
840     default:
841     case NXT_HTTP_TE_NONE:
842         break;
843     }
844 
845     if (r->content_length_n == -1 || r->content_length_n == 0) {
846         goto ready;
847     }
848 
849     if (r->content_length_n > (nxt_off_t) r->conf->socket_conf->max_body_size) {
850         status = NXT_HTTP_PAYLOAD_TOO_LARGE;
851         goto error;
852     }
853 
854     body_length = (size_t) r->content_length_n;
855 
856     b = r->body;
857 
858     if (b == NULL) {
859         b = nxt_buf_mem_alloc(r->mem_pool, body_length, 0);
860         if (nxt_slow_path(b == NULL)) {
861             status = NXT_HTTP_INTERNAL_SERVER_ERROR;
862             goto error;
863         }
864 
865         r->body = b;
866     }
867 
868     in = h1p->conn->read;
869 
870     size = nxt_buf_mem_used_size(&in->mem);
871 
872     if (size != 0) {
873         if (size > body_length) {
874             size = body_length;
875         }
876 
877         b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
878         in->mem.pos += size;
879     }
880 
881     size = nxt_buf_mem_free_size(&b->mem);
882 
883     nxt_debug(task, "h1p body rest: %uz", size);
884 
885     if (size != 0) {
886         in->next = h1p->buffers;
887         h1p->buffers = in;
888         h1p->nbuffers++;
889 
890         c = h1p->conn;
891         c->read = b;
892         c->read_state = &nxt_h1p_read_body_state;
893 
894         nxt_conn_read(task->thread->engine, c);
895         return;
896     }
897 
898 ready:
899 
900     r->state->ready_handler(task, r, NULL);
901 
902     return;
903 
904 error:
905 
906     h1p->keepalive = 0;
907 
908     nxt_http_request_error(task, r, status);
909 }
910 
911 
912 static const nxt_conn_state_t  nxt_h1p_read_body_state
913     nxt_aligned(64) =
914 {
915     .ready_handler = nxt_h1p_conn_request_body_read,
916     .close_handler = nxt_h1p_conn_request_error,
917     .error_handler = nxt_h1p_conn_request_error,
918 
919     .timer_handler = nxt_h1p_conn_request_timeout,
920     .timer_value = nxt_h1p_conn_request_timer_value,
921     .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
922     .timer_autoreset = 1,
923 };
924 
925 
926 static void
927 nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
928 {
929     size_t              size;
930     nxt_conn_t          *c;
931     nxt_h1proto_t       *h1p;
932     nxt_http_request_t  *r;
933     nxt_event_engine_t  *engine;
934 
935     c = obj;
936     h1p = data;
937 
938     nxt_debug(task, "h1p conn request body read");
939 
940     size = nxt_buf_mem_free_size(&c->read->mem);
941 
942     nxt_debug(task, "h1p body rest: %uz", size);
943 
944     engine = task->thread->engine;
945 
946     if (size != 0) {
947         nxt_conn_read(engine, c);
948 
949     } else {
950         c->read = NULL;
951         r = h1p->request;
952 
953         r->state->ready_handler(task, r, NULL);
954     }
955 }
956 
957 
958 static void
959 nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r)
960 {
961     r->local = nxt_conn_local_addr(task, r->proto.h1->conn);
962 }
963 
964 
965 #define NXT_HTTP_LAST_INFORMATIONAL                                           \
966     (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1)
967 
968 static const nxt_str_t  nxt_http_informational[] = {
969     nxt_string("HTTP/1.1 100 Continue\r\n"),
970     nxt_string("HTTP/1.1 101 Switching Protocols\r\n"),
971 };
972 
973 
974 #define NXT_HTTP_LAST_SUCCESS                                                 \
975     (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1)
976 
977 static const nxt_str_t  nxt_http_success[] = {
978     nxt_string("HTTP/1.1 200 OK\r\n"),
979     nxt_string("HTTP/1.1 201 Created\r\n"),
980     nxt_string("HTTP/1.1 202 Accepted\r\n"),
981     nxt_string("HTTP/1.1 203 Non-Authoritative Information\r\n"),
982     nxt_string("HTTP/1.1 204 No Content\r\n"),
983     nxt_string("HTTP/1.1 205 Reset Content\r\n"),
984     nxt_string("HTTP/1.1 206 Partial Content\r\n"),
985 };
986 
987 
988 #define NXT_HTTP_LAST_REDIRECTION                                             \
989     (NXT_HTTP_MULTIPLE_CHOICES + nxt_nitems(nxt_http_redirection) - 1)
990 
991 static const nxt_str_t  nxt_http_redirection[] = {
992     nxt_string("HTTP/1.1 300 Multiple Choices\r\n"),
993     nxt_string("HTTP/1.1 301 Moved Permanently\r\n"),
994     nxt_string("HTTP/1.1 302 Found\r\n"),
995     nxt_string("HTTP/1.1 303 See Other\r\n"),
996     nxt_string("HTTP/1.1 304 Not Modified\r\n"),
997 };
998 
999 
1000 #define NXT_HTTP_LAST_CLIENT_ERROR                                            \
1001     (NXT_HTTP_BAD_REQUEST + nxt_nitems(nxt_http_client_error) - 1)
1002 
1003 static const nxt_str_t  nxt_http_client_error[] = {
1004     nxt_string("HTTP/1.1 400 Bad Request\r\n"),
1005     nxt_string("HTTP/1.1 401 Unauthorized\r\n"),
1006     nxt_string("HTTP/1.1 402 Payment Required\r\n"),
1007     nxt_string("HTTP/1.1 403 Forbidden\r\n"),
1008     nxt_string("HTTP/1.1 404 Not Found\r\n"),
1009     nxt_string("HTTP/1.1 405 Method Not Allowed\r\n"),
1010     nxt_string("HTTP/1.1 406 Not Acceptable\r\n"),
1011     nxt_string("HTTP/1.1 407 Proxy Authentication Required\r\n"),
1012     nxt_string("HTTP/1.1 408 Request Timeout\r\n"),
1013     nxt_string("HTTP/1.1 409 Conflict\r\n"),
1014     nxt_string("HTTP/1.1 410 Gone\r\n"),
1015     nxt_string("HTTP/1.1 411 Length Required\r\n"),
1016     nxt_string("HTTP/1.1 412 Precondition Failed\r\n"),
1017     nxt_string("HTTP/1.1 413 Payload Too Large\r\n"),
1018     nxt_string("HTTP/1.1 414 URI Too Long\r\n"),
1019     nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"),
1020     nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"),
1021     nxt_string("HTTP/1.1 417 Expectation Failed\r\n"),
1022     nxt_string("HTTP/1.1 418\r\n"),
1023     nxt_string("HTTP/1.1 419\r\n"),
1024     nxt_string("HTTP/1.1 420\r\n"),
1025     nxt_string("HTTP/1.1 421\r\n"),
1026     nxt_string("HTTP/1.1 422\r\n"),
1027     nxt_string("HTTP/1.1 423\r\n"),
1028     nxt_string("HTTP/1.1 424\r\n"),
1029     nxt_string("HTTP/1.1 425\r\n"),
1030     nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
1031     nxt_string("HTTP/1.1 427\r\n"),
1032     nxt_string("HTTP/1.1 428\r\n"),
1033     nxt_string("HTTP/1.1 429\r\n"),
1034     nxt_string("HTTP/1.1 430\r\n"),
1035     nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"),
1036 };
1037 
1038 
1039 #define NXT_HTTP_LAST_NGINX_ERROR                                             \
1040     (NXT_HTTP_TO_HTTPS + nxt_nitems(nxt_http_nginx_error) - 1)
1041 
1042 static const nxt_str_t  nxt_http_nginx_error[] = {
1043     nxt_string("HTTP/1.1 400 "
1044                "The plain HTTP request was sent to HTTPS port\r\n"),
1045 };
1046 
1047 
1048 #define NXT_HTTP_LAST_SERVER_ERROR                                            \
1049     (NXT_HTTP_INTERNAL_SERVER_ERROR + nxt_nitems(nxt_http_server_error) - 1)
1050 
1051 static const nxt_str_t  nxt_http_server_error[] = {
1052     nxt_string("HTTP/1.1 500 Internal Server Error\r\n"),
1053     nxt_string("HTTP/1.1 501 Not Implemented\r\n"),
1054     nxt_string("HTTP/1.1 502 Bad Gateway\r\n"),
1055     nxt_string("HTTP/1.1 503 Service Unavailable\r\n"),
1056     nxt_string("HTTP/1.1 504 Gateway Timeout\r\n"),
1057     nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
1058 };
1059 
1060 
1061 #define UNKNOWN_STATUS_LENGTH  nxt_length("HTTP/1.1 65536\r\n")
1062 
1063 static void
1064 nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
1065     nxt_work_handler_t body_handler, void *data)
1066 {
1067     u_char              *p;
1068     size_t              size;
1069     nxt_buf_t           *header;
1070     nxt_str_t           unknown_status;
1071     nxt_int_t           conn;
1072     nxt_uint_t          n;
1073     nxt_bool_t          http11;
1074     nxt_conn_t          *c;
1075     nxt_h1proto_t       *h1p;
1076     const nxt_str_t     *status;
1077     nxt_http_field_t    *field;
1078     u_char              buf[UNKNOWN_STATUS_LENGTH];
1079 
1080     static const char   chunked[] = "Transfer-Encoding: chunked\r\n";
1081     static const char   websocket_version[] = "Sec-WebSocket-Version: 13\r\n";
1082 
1083     static const nxt_str_t  connection[3] = {
1084         nxt_string("Connection: close\r\n"),
1085         nxt_string("Connection: keep-alive\r\n"),
1086         nxt_string("Upgrade: websocket\r\n"
1087                    "Connection: Upgrade\r\n"
1088                    "Sec-WebSocket-Accept: "),
1089     };
1090 
1091     nxt_debug(task, "h1p request header send");
1092 
1093     r->header_sent = 1;
1094     h1p = r->proto.h1;
1095     n = r->status;
1096 
1097     if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) {
1098         status = &nxt_http_informational[n - NXT_HTTP_CONTINUE];
1099 
1100     } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
1101         status = &nxt_http_success[n - NXT_HTTP_OK];
1102 
1103     } else if (n >= NXT_HTTP_MULTIPLE_CHOICES
1104                && n <= NXT_HTTP_LAST_REDIRECTION)
1105     {
1106         status = &nxt_http_redirection[n - NXT_HTTP_MULTIPLE_CHOICES];
1107 
1108     } else if (n >= NXT_HTTP_BAD_REQUEST && n <= NXT_HTTP_LAST_CLIENT_ERROR) {
1109         status = &nxt_http_client_error[n - NXT_HTTP_BAD_REQUEST];
1110 
1111     } else if (n >= NXT_HTTP_TO_HTTPS && n <= NXT_HTTP_LAST_NGINX_ERROR) {
1112         status = &nxt_http_nginx_error[n - NXT_HTTP_TO_HTTPS];
1113 
1114     } else if (n >= NXT_HTTP_INTERNAL_SERVER_ERROR
1115                && n <= NXT_HTTP_LAST_SERVER_ERROR)
1116     {
1117         status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR];
1118 
1119     } else {
1120         p = nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
1121                         "HTTP/1.1 %03d\r\n", n);
1122 
1123         unknown_status.length = p - buf;
1124         unknown_status.start = buf;
1125         status = &unknown_status;
1126     }
1127 
1128     size = status->length;
1129     /* Trailing CRLF at the end of header. */
1130     size += nxt_length("\r\n");
1131 
1132     conn = -1;
1133 
1134     if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) {
1135         h1p->websocket = 1;
1136         h1p->keepalive = 0;
1137         conn = 2;
1138         size += NXT_WEBSOCKET_ACCEPT_SIZE + 2;
1139 
1140     } else {
1141         http11 = (h1p->parser.version.s.minor != '0');
1142 
1143         if (r->resp.content_length == NULL || r->resp.content_length->skip) {
1144 
1145             if (http11) {
1146                 if (n != NXT_HTTP_NOT_MODIFIED
1147                     && n != NXT_HTTP_NO_CONTENT
1148                     && body_handler != NULL
1149                     && !h1p->websocket)
1150                 {
1151                     h1p->chunked = 1;
1152                     size += nxt_length(chunked);
1153                     /* Trailing CRLF will be added by the first chunk header. */
1154                     size -= nxt_length("\r\n");
1155                 }
1156 
1157             } else {
1158                 h1p->keepalive = 0;
1159             }
1160         }
1161 
1162         if (http11 ^ h1p->keepalive) {
1163             conn = h1p->keepalive;
1164         }
1165     }
1166 
1167     if (conn >= 0) {
1168         size += connection[conn].length;
1169     }
1170 
1171     nxt_list_each(field, r->resp.fields) {
1172 
1173         if (!field->skip) {
1174             size += field->name_length + field->value_length;
1175             size += nxt_length(": \r\n");
1176         }
1177 
1178     } nxt_list_loop;
1179 
1180     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1181         size += nxt_length(websocket_version);
1182     }
1183 
1184     header = nxt_http_buf_mem(task, r, size);
1185     if (nxt_slow_path(header == NULL)) {
1186         nxt_h1p_request_error(task, h1p, r);
1187         return;
1188     }
1189 
1190     p = nxt_cpymem(header->mem.free, status->start, status->length);
1191 
1192     nxt_list_each(field, r->resp.fields) {
1193 
1194         if (!field->skip) {
1195             p = nxt_cpymem(p, field->name, field->name_length);
1196             *p++ = ':'; *p++ = ' ';
1197             p = nxt_cpymem(p, field->value, field->value_length);
1198             *p++ = '\r'; *p++ = '\n';
1199         }
1200 
1201     } nxt_list_loop;
1202 
1203     if (conn >= 0) {
1204         p = nxt_cpymem(p, connection[conn].start, connection[conn].length);
1205     }
1206 
1207     if (h1p->websocket) {
1208         nxt_websocket_accept(p, h1p->websocket_key->value);
1209         p += NXT_WEBSOCKET_ACCEPT_SIZE;
1210 
1211         *p++ = '\r'; *p++ = '\n';
1212     }
1213 
1214     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1215         p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version));
1216     }
1217 
1218     if (h1p->chunked) {
1219         p = nxt_cpymem(p, chunked, nxt_length(chunked));
1220         /* Trailing CRLF will be added by the first chunk header. */
1221 
1222     } else {
1223         *p++ = '\r'; *p++ = '\n';
1224     }
1225 
1226     header->mem.free = p;
1227 
1228     h1p->header_size = nxt_buf_mem_used_size(&header->mem);
1229 
1230     c = h1p->conn;
1231 
1232     c->write = header;
1233     h1p->conn_write_tail = &header->next;
1234     c->write_state = &nxt_h1p_request_send_state;
1235 
1236     if (body_handler != NULL) {
1237         /*
1238          * The body handler will run before c->io->write() handler,
1239          * because the latter was inqueued by nxt_conn_write()
1240          * in engine->write_work_queue.
1241          */
1242         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1243                            body_handler, task, r, data);
1244 
1245     } else {
1246         header->next = nxt_http_buf_last(r);
1247     }
1248 
1249     nxt_conn_write(task->thread->engine, c);
1250 
1251     if (h1p->websocket) {
1252         nxt_h1p_websocket_first_frame_start(task, r, c->read);
1253     }
1254 }
1255 
1256 
1257 void
1258 nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
1259 {
1260     size_t         size;
1261     nxt_buf_t      *b, *in, *next;
1262     nxt_conn_t     *c;
1263 
1264     nxt_debug(task, "h1p complete buffers");
1265 
1266     b = h1p->buffers;
1267     c = h1p->conn;
1268     in = c->read;
1269 
1270     if (b != NULL) {
1271         if (in == NULL) {
1272             /* A request with large body. */
1273             in = b;
1274             c->read = in;
1275 
1276             b = in->next;
1277             in->next = NULL;
1278         }
1279 
1280         while (b != NULL) {
1281             next = b->next;
1282             b->next = NULL;
1283 
1284             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1285                                b->completion_handler, task, b, b->parent);
1286 
1287             b = next;
1288         }
1289 
1290         h1p->buffers = NULL;
1291         h1p->nbuffers = 0;
1292     }
1293 
1294     if (in != NULL) {
1295         size = nxt_buf_mem_used_size(&in->mem);
1296 
1297         if (size == 0) {
1298             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1299                                in->completion_handler, task, in, in->parent);
1300 
1301             c->read = NULL;
1302         }
1303     }
1304 }
1305 
1306 
1307 static const nxt_conn_state_t  nxt_h1p_request_send_state
1308     nxt_aligned(64) =
1309 {
1310     .ready_handler = nxt_h1p_conn_sent,
1311     .error_handler = nxt_h1p_conn_request_error,
1312 
1313     .timer_handler = nxt_h1p_conn_request_send_timeout,
1314     .timer_value = nxt_h1p_conn_request_timer_value,
1315     .timer_data = offsetof(nxt_socket_conf_t, send_timeout),
1316     .timer_autoreset = 1,
1317 };
1318 
1319 
1320 static void
1321 nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1322 {
1323     nxt_conn_t     *c;
1324     nxt_h1proto_t  *h1p;
1325 
1326     nxt_debug(task, "h1p request send");
1327 
1328     h1p = r->proto.h1;
1329     c = h1p->conn;
1330 
1331     if (h1p->chunked) {
1332         out = nxt_h1p_chunk_create(task, r, out);
1333         if (nxt_slow_path(out == NULL)) {
1334             nxt_h1p_request_error(task, h1p, r);
1335             return;
1336         }
1337     }
1338 
1339     if (c->write == NULL) {
1340         c->write = out;
1341         c->write_state = &nxt_h1p_request_send_state;
1342 
1343         nxt_conn_write(task->thread->engine, c);
1344 
1345     } else {
1346         *h1p->conn_write_tail = out;
1347     }
1348 
1349     while (out->next != NULL) {
1350         out = out->next;
1351     }
1352 
1353     h1p->conn_write_tail = &out->next;
1354 }
1355 
1356 
1357 static nxt_buf_t *
1358 nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1359 {
1360     nxt_off_t          size;
1361     nxt_buf_t          *b, **prev, *header, *tail;
1362 
1363     const size_t       chunk_size = 2 * nxt_length("\r\n") + NXT_OFF_T_HEXLEN;
1364     static const char  tail_chunk[] = "\r\n0\r\n\r\n";
1365 
1366     size = 0;
1367     prev = &out;
1368 
1369     for (b = out; b != NULL; b = b->next) {
1370 
1371         if (nxt_buf_is_last(b)) {
1372             tail = nxt_http_buf_mem(task, r, sizeof(tail_chunk));
1373             if (nxt_slow_path(tail == NULL)) {
1374                 return NULL;
1375             }
1376 
1377             *prev = tail;
1378             tail->next = b;
1379             /*
1380              * The tail_chunk size with trailing zero is 8 bytes, so
1381              * memcpy may be inlined with just single 8 byte move operation.
1382              */
1383             nxt_memcpy(tail->mem.free, tail_chunk, sizeof(tail_chunk));
1384             tail->mem.free += nxt_length(tail_chunk);
1385 
1386             break;
1387         }
1388 
1389         size += nxt_buf_used_size(b);
1390         prev = &b->next;
1391     }
1392 
1393     if (size == 0) {
1394         return out;
1395     }
1396 
1397     header = nxt_http_buf_mem(task, r, chunk_size);
1398     if (nxt_slow_path(header == NULL)) {
1399         return NULL;
1400     }
1401 
1402     header->next = out;
1403     header->mem.free = nxt_sprintf(header->mem.free, header->mem.end,
1404                                    "\r\n%xO\r\n", size);
1405     return header;
1406 }
1407 
1408 
1409 static nxt_off_t
1410 nxt_h1p_request_body_bytes_sent(nxt_task_t *task, nxt_http_proto_t proto)
1411 {
1412     nxt_off_t      sent;
1413     nxt_h1proto_t  *h1p;
1414 
1415     h1p = proto.h1;
1416 
1417     sent = h1p->conn->sent - h1p->header_size;
1418 
1419     return (sent > 0) ? sent : 0;
1420 }
1421 
1422 
1423 static void
1424 nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
1425     nxt_buf_t *last)
1426 {
1427     nxt_buf_t         *b;
1428     nxt_conn_t        *c;
1429     nxt_h1proto_t     *h1p;
1430     nxt_work_queue_t  *wq;
1431 
1432     nxt_debug(task, "h1p request discard");
1433 
1434     h1p = r->proto.h1;
1435     h1p->keepalive = 0;
1436 
1437     c = h1p->conn;
1438     b = c->write;
1439     c->write = NULL;
1440 
1441     wq = &task->thread->engine->fast_work_queue;
1442 
1443     nxt_sendbuf_drain(task, wq, b);
1444     nxt_sendbuf_drain(task, wq, last);
1445 }
1446 
1447 
1448 static void
1449 nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
1450 {
1451     nxt_h1proto_t       *h1p;
1452     nxt_http_request_t  *r;
1453 
1454     h1p = data;
1455 
1456     nxt_debug(task, "h1p conn request error");
1457 
1458     r = h1p->request;
1459 
1460     if (nxt_slow_path(r == NULL)) {
1461         nxt_h1p_shutdown(task, h1p->conn);
1462         return;
1463     }
1464 
1465     if (r->fields == NULL) {
1466         (void) nxt_h1p_header_process(task, h1p, r);
1467     }
1468 
1469     if (r->status == 0) {
1470         r->status = NXT_HTTP_BAD_REQUEST;
1471     }
1472 
1473     nxt_h1p_request_error(task, h1p, r);
1474 }
1475 
1476 
1477 static void
1478 nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
1479 {
1480     nxt_conn_t          *c;
1481     nxt_timer_t         *timer;
1482     nxt_h1proto_t       *h1p;
1483     nxt_http_request_t  *r;
1484 
1485     timer = obj;
1486 
1487     nxt_debug(task, "h1p conn request timeout");
1488 
1489     c = nxt_read_timer_conn(timer);
1490     c->block_read = 1;
1491     /*
1492      * Disable SO_LINGER off during socket closing
1493      * to send "408 Request Timeout" error response.
1494      */
1495     c->socket.timedout = 0;
1496 
1497     h1p = c->socket.data;
1498     h1p->keepalive = 0;
1499     r = h1p->request;
1500 
1501     if (r->fields == NULL) {
1502         (void) nxt_h1p_header_process(task, h1p, r);
1503     }
1504 
1505     nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT);
1506 }
1507 
1508 
1509 static void
1510 nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data)
1511 {
1512     nxt_conn_t     *c;
1513     nxt_timer_t    *timer;
1514     nxt_h1proto_t  *h1p;
1515 
1516     timer = obj;
1517 
1518     nxt_debug(task, "h1p conn request send timeout");
1519 
1520     c = nxt_write_timer_conn(timer);
1521     c->block_write = 1;
1522     h1p = c->socket.data;
1523 
1524     nxt_h1p_request_error(task, h1p, h1p->request);
1525 }
1526 
1527 
1528 nxt_msec_t
1529 nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data)
1530 {
1531     nxt_h1proto_t  *h1p;
1532 
1533     h1p = c->socket.data;
1534 
1535     return nxt_value_at(nxt_msec_t, h1p->request->conf->socket_conf, data);
1536 }
1537 
1538 
1539 nxt_inline void
1540 nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
1541     nxt_http_request_t *r)
1542 {
1543     h1p->keepalive = 0;
1544 
1545     r->state->error_handler(task, r, h1p);
1546 }
1547 
1548 
1549 static void
1550 nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
1551     nxt_socket_conf_joint_t *joint)
1552 {
1553     nxt_conn_t     *c;
1554     nxt_h1proto_t  *h1p;
1555 
1556     nxt_debug(task, "h1p request close");
1557 
1558     h1p = proto.h1;
1559     h1p->keepalive &= !h1p->request->inconsistent;
1560     h1p->request = NULL;
1561 
1562     nxt_router_conf_release(task, joint);
1563 
1564     c = h1p->conn;
1565     task = &c->task;
1566     c->socket.task = task;
1567     c->read_timer.task = task;
1568     c->write_timer.task = task;
1569 
1570     if (h1p->keepalive) {
1571         nxt_h1p_keepalive(task, h1p, c);
1572 
1573     } else {
1574         nxt_h1p_shutdown(task, c);
1575     }
1576 }
1577 
1578 
1579 static void
1580 nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data)
1581 {
1582     nxt_conn_t          *c;
1583     nxt_event_engine_t  *engine;
1584 
1585     c = obj;
1586 
1587     nxt_debug(task, "h1p conn sent");
1588 
1589     engine = task->thread->engine;
1590 
1591     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
1592 
1593     if (c->write != NULL) {
1594         nxt_conn_write(engine, c);
1595     }
1596 }
1597 
1598 
1599 static void
1600 nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
1601 {
1602     nxt_conn_t  *c;
1603 
1604     c = obj;
1605 
1606     nxt_debug(task, "h1p conn close");
1607 
1608     nxt_h1p_shutdown(task, c);
1609 }
1610 
1611 
1612 static void
1613 nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data)
1614 {
1615     nxt_conn_t  *c;
1616 
1617     c = obj;
1618 
1619     nxt_debug(task, "h1p conn error");
1620 
1621     nxt_h1p_shutdown(task, c);
1622 }
1623 
1624 
1625 static nxt_msec_t
1626 nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data)
1627 {
1628     nxt_socket_conf_joint_t  *joint;
1629 
1630     joint = c->listen->socket.data;
1631 
1632     return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1633 }
1634 
1635 
1636 static void
1637 nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
1638 {
1639     size_t     size;
1640     nxt_buf_t  *in;
1641 
1642     nxt_debug(task, "h1p keepalive");
1643 
1644     if (!c->tcp_nodelay) {
1645         nxt_conn_tcp_nodelay_on(task, c);
1646     }
1647 
1648     nxt_h1p_complete_buffers(task, h1p);
1649 
1650     in = c->read;
1651 
1652     nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn));
1653 
1654     c->sent = 0;
1655 
1656     if (in == NULL) {
1657         c->read_state = &nxt_h1p_keepalive_state;
1658 
1659         nxt_conn_read(task->thread->engine, c);
1660 
1661     } else {
1662         size = nxt_buf_mem_used_size(&in->mem);
1663 
1664         nxt_debug(task, "h1p pipelining");
1665 
1666         nxt_memmove(in->mem.start, in->mem.pos, size);
1667 
1668         in->mem.pos = in->mem.start;
1669         in->mem.free = in->mem.start + size;
1670 
1671         nxt_h1p_conn_request_init(task, c, c->socket.data);
1672     }
1673 }
1674 
1675 
1676 static const nxt_conn_state_t  nxt_h1p_keepalive_state
1677     nxt_aligned(64) =
1678 {
1679     .ready_handler = nxt_h1p_conn_request_init,
1680     .close_handler = nxt_h1p_conn_close,
1681     .error_handler = nxt_h1p_conn_error,
1682 
1683     .io_read_handler = nxt_h1p_idle_io_read_handler,
1684 
1685     .timer_handler = nxt_h1p_idle_timeout,
1686     .timer_value = nxt_h1p_conn_timer_value,
1687     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
1688     .timer_autoreset = 1,
1689 };
1690 
1691 
1692 const nxt_conn_state_t  nxt_h1p_idle_close_state
1693     nxt_aligned(64) =
1694 {
1695     .close_handler = nxt_h1p_idle_close,
1696 };
1697 
1698 
1699 static void
1700 nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data)
1701 {
1702     nxt_conn_t  *c;
1703 
1704     c = obj;
1705 
1706     nxt_debug(task, "h1p idle close");
1707 
1708     nxt_h1p_idle_response(task, c);
1709 }
1710 
1711 
1712 static void
1713 nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data)
1714 {
1715     nxt_conn_t   *c;
1716     nxt_timer_t  *timer;
1717 
1718     timer = obj;
1719 
1720     nxt_debug(task, "h1p idle timeout");
1721 
1722     c = nxt_read_timer_conn(timer);
1723     c->block_read = 1;
1724 
1725     nxt_h1p_idle_response(task, c);
1726 }
1727 
1728 
1729 #define NXT_H1P_IDLE_TIMEOUT                                                  \
1730      "HTTP/1.1 408 Request Timeout\r\n"                                       \
1731      "Server: " NXT_SERVER "\r\n"                                             \
1732      "Connection: close\r\n"                                                  \
1733      "Content-Length: 0\r\n"                                                  \
1734      "Date: "
1735 
1736 
1737 static void
1738 nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
1739 {
1740     u_char         *p;
1741     size_t         size;
1742     nxt_buf_t      *out, *last;
1743     nxt_h1proto_t  *h1p;
1744 
1745     size = nxt_length(NXT_H1P_IDLE_TIMEOUT)
1746            + nxt_http_date_cache.size
1747            + nxt_length("\r\n\r\n");
1748 
1749     out = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1750     if (nxt_slow_path(out == NULL)) {
1751         goto fail;
1752     }
1753 
1754     p = nxt_cpymem(out->mem.free, NXT_H1P_IDLE_TIMEOUT,
1755                    nxt_length(NXT_H1P_IDLE_TIMEOUT));
1756 
1757     p = nxt_thread_time_string(task->thread, &nxt_http_date_cache, p);
1758 
1759     out->mem.free = nxt_cpymem(p, "\r\n\r\n", 4);
1760 
1761     last = nxt_mp_zget(c->mem_pool, NXT_BUF_SYNC_SIZE);
1762     if (nxt_slow_path(last == NULL)) {
1763         goto fail;
1764     }
1765 
1766     out->next = last;
1767     nxt_buf_set_sync(last);
1768     nxt_buf_set_last(last);
1769 
1770     last->completion_handler = nxt_h1p_idle_response_sent;
1771     last->parent = c;
1772 
1773     h1p = c->socket.data;
1774     h1p->conn_write_tail = &last->next;
1775 
1776     c->write = out;
1777     c->write_state = &nxt_h1p_timeout_response_state;
1778 
1779     nxt_conn_write(task->thread->engine, c);
1780     return;
1781 
1782 fail:
1783 
1784     nxt_h1p_shutdown(task, c);
1785 }
1786 
1787 
1788 static const nxt_conn_state_t  nxt_h1p_timeout_response_state
1789     nxt_aligned(64) =
1790 {
1791     .ready_handler = nxt_h1p_conn_sent,
1792     .error_handler = nxt_h1p_conn_error,
1793 
1794     .timer_handler = nxt_h1p_idle_response_timeout,
1795     .timer_value = nxt_h1p_idle_response_timer_value,
1796 };
1797 
1798 
1799 static void
1800 nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data)
1801 {
1802     nxt_conn_t  *c;
1803 
1804     c = data;
1805 
1806     nxt_debug(task, "h1p idle timeout response sent");
1807 
1808     nxt_h1p_shutdown(task, c);
1809 }
1810 
1811 
1812 static void
1813 nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, void *data)
1814 {
1815     nxt_conn_t   *c;
1816     nxt_timer_t  *timer;
1817 
1818     timer = obj;
1819 
1820     nxt_debug(task, "h1p idle timeout response timeout");
1821 
1822     c = nxt_read_timer_conn(timer);
1823     c->block_write = 1;
1824 
1825     nxt_h1p_shutdown(task, c);
1826 }
1827 
1828 
1829 static nxt_msec_t
1830 nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data)
1831 {
1832     return 10 * 1000;
1833 }
1834 
1835 
1836 static void
1837 nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
1838 {
1839     nxt_timer_t    *timer;
1840     nxt_h1proto_t  *h1p;
1841 
1842     nxt_debug(task, "h1p shutdown");
1843 
1844     h1p = c->socket.data;
1845 
1846     if (nxt_slow_path(h1p != NULL && h1p->websocket_timer != NULL)) {
1847         timer = &h1p->websocket_timer->timer;
1848 
1849         if (timer->handler != nxt_h1p_conn_ws_shutdown) {
1850             timer->handler = nxt_h1p_conn_ws_shutdown;
1851             nxt_timer_add(task->thread->engine, timer, 0);
1852 
1853         } else {
1854             nxt_debug(task, "h1p already scheduled ws shutdown");
1855         }
1856 
1857     } else {
1858         nxt_h1p_closing(task, c);
1859     }
1860 }
1861 
1862 
1863 static void
1864 nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
1865 {
1866     nxt_timer_t                *timer;
1867     nxt_h1p_websocket_timer_t  *ws_timer;
1868 
1869     nxt_debug(task, "h1p conn ws shutdown");
1870 
1871     timer = obj;
1872     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
1873 
1874     nxt_h1p_closing(task, ws_timer->h1p->conn);
1875 }
1876 
1877 
1878 static void
1879 nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
1880 {
1881     nxt_debug(task, "h1p closing");
1882 
1883     c->socket.data = NULL;
1884 
1885 #if (NXT_TLS)
1886 
1887     if (c->u.tls != NULL) {
1888         c->write_state = &nxt_h1p_shutdown_state;
1889 
1890         c->io->shutdown(task, c, NULL);
1891         return;
1892     }
1893 
1894 #endif
1895 
1896     nxt_h1p_conn_closing(task, c, NULL);
1897 }
1898 
1899 
1900 #if (NXT_TLS)
1901 
1902 static const nxt_conn_state_t  nxt_h1p_shutdown_state
1903     nxt_aligned(64) =
1904 {
1905     .ready_handler = nxt_h1p_conn_closing,
1906     .close_handler = nxt_h1p_conn_closing,
1907     .error_handler = nxt_h1p_conn_closing,
1908 };
1909 
1910 #endif
1911 
1912 
1913 static void
1914 nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
1915 {
1916     nxt_conn_t  *c;
1917 
1918     c = obj;
1919 
1920     nxt_debug(task, "h1p conn closing");
1921 
1922     c->write_state = &nxt_h1p_close_state;
1923 
1924     nxt_conn_close(task->thread->engine, c);
1925 }
1926 
1927 
1928 static const nxt_conn_state_t  nxt_h1p_close_state
1929     nxt_aligned(64) =
1930 {
1931     .ready_handler = nxt_h1p_conn_free,
1932 };
1933 
1934 
1935 static void
1936 nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
1937 {
1938     nxt_conn_t          *c;
1939     nxt_listen_event_t  *lev;
1940     nxt_event_engine_t  *engine;
1941 
1942     c = obj;
1943 
1944     nxt_debug(task, "h1p conn free");
1945 
1946     nxt_queue_remove(&c->link);
1947 
1948     engine = task->thread->engine;
1949 
1950     nxt_sockaddr_cache_free(engine, c);
1951 
1952     lev = c->listen;
1953 
1954     nxt_conn_free(task, c);
1955 
1956     nxt_router_listen_event_release(&engine->task, lev, NULL);
1957 }
1958 
1959 
1960 static void
1961 nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
1962 {
1963     nxt_mp_t            *mp;
1964     nxt_int_t           ret;
1965     nxt_conn_t          *c, *client;
1966     nxt_h1proto_t       *h1p;
1967     nxt_fd_event_t      *socket;
1968     nxt_work_queue_t    *wq;
1969     nxt_http_request_t  *r;
1970 
1971     nxt_debug(task, "h1p peer connect");
1972 
1973     peer->status = NXT_HTTP_UNSET;
1974     r = peer->request;
1975 
1976     mp = nxt_mp_create(1024, 128, 256, 32);
1977 
1978     if (nxt_slow_path(mp == NULL)) {
1979         goto fail;
1980     }
1981 
1982     h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
1983     if (nxt_slow_path(h1p == NULL)) {
1984         goto fail;
1985     }
1986 
1987     ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
1988     if (nxt_slow_path(ret != NXT_OK)) {
1989         goto fail;
1990     }
1991 
1992     c = nxt_conn_create(mp, task);
1993     if (nxt_slow_path(c == NULL)) {
1994         goto fail;
1995     }
1996 
1997     c->mem_pool = mp;
1998     h1p->conn = c;
1999 
2000     peer->proto.h1 = h1p;
2001     h1p->request = r;
2002 
2003     c->socket.task = task;
2004     c->read_timer.task = task;
2005     c->write_timer.task = task;
2006     c->socket.data = peer;
2007     c->remote = peer->sockaddr;
2008 
2009     c->socket.write_ready = 1;
2010     c->write_state = &nxt_h1p_peer_connect_state;
2011 
2012     /*
2013      * TODO: queues should be implemented via client proto interface.
2014      */
2015     client = r->proto.h1->conn;
2016 
2017     socket = &client->socket;
2018     wq = socket->read_work_queue;
2019     c->read_work_queue = wq;
2020     c->socket.read_work_queue = wq;
2021     c->read_timer.work_queue = wq;
2022 
2023     wq = socket->write_work_queue;
2024     c->write_work_queue = wq;
2025     c->socket.write_work_queue = wq;
2026     c->write_timer.work_queue = wq;
2027     /* TODO END */
2028 
2029     nxt_conn_connect(task->thread->engine, c);
2030 
2031     return;
2032 
2033 fail:
2034 
2035     peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2036 
2037     r->state->error_handler(task, r, peer);
2038 }
2039 
2040 
2041 static const nxt_conn_state_t  nxt_h1p_peer_connect_state
2042     nxt_aligned(64) =
2043 {
2044     .ready_handler = nxt_h1p_peer_connected,
2045     .close_handler = nxt_h1p_peer_refused,
2046     .error_handler = nxt_h1p_peer_error,
2047 
2048     .timer_handler = nxt_h1p_peer_send_timeout,
2049     .timer_value = nxt_h1p_peer_timer_value,
2050     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2051 };
2052 
2053 
2054 static void
2055 nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
2056 {
2057     nxt_http_peer_t     *peer;
2058     nxt_http_request_t  *r;
2059 
2060     peer = data;
2061 
2062     nxt_debug(task, "h1p peer connected");
2063 
2064     r = peer->request;
2065     r->state->ready_handler(task, r, peer);
2066 }
2067 
2068 
2069 static void
2070 nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
2071 {
2072     nxt_http_peer_t     *peer;
2073     nxt_http_request_t  *r;
2074 
2075     peer = data;
2076 
2077     nxt_debug(task, "h1p peer refused");
2078 
2079     //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
2080     peer->status = NXT_HTTP_BAD_GATEWAY;
2081 
2082     r = peer->request;
2083     r->state->error_handler(task, r, peer);
2084 }
2085 
2086 
2087 static void
2088 nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
2089 {
2090     u_char              *p;
2091     size_t              size;
2092     nxt_buf_t           *header, *body;
2093     nxt_conn_t          *c;
2094     nxt_http_field_t    *field;
2095     nxt_http_request_t  *r;
2096 
2097     nxt_debug(task, "h1p peer header send");
2098 
2099     r = peer->request;
2100 
2101     size = r->method->length + sizeof(" ") + r->target.length
2102            + sizeof(" HTTP/1.0\r\n")
2103            + sizeof("\r\n");
2104 
2105     nxt_list_each(field, r->fields) {
2106 
2107         if (!field->hopbyhop) {
2108             size += field->name_length + field->value_length;
2109             size += nxt_length(": \r\n");
2110         }
2111 
2112     } nxt_list_loop;
2113 
2114     header = nxt_http_buf_mem(task, r, size);
2115     if (nxt_slow_path(header == NULL)) {
2116         r->state->error_handler(task, r, peer);
2117         return;
2118     }
2119 
2120     p = header->mem.free;
2121 
2122     p = nxt_cpymem(p, r->method->start, r->method->length);
2123     *p++ = ' ';
2124     p = nxt_cpymem(p, r->target.start, r->target.length);
2125     p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
2126 
2127     nxt_list_each(field, r->fields) {
2128 
2129         if (!field->hopbyhop) {
2130             p = nxt_cpymem(p, field->name, field->name_length);
2131             *p++ = ':'; *p++ = ' ';
2132             p = nxt_cpymem(p, field->value, field->value_length);
2133             *p++ = '\r'; *p++ = '\n';
2134         }
2135 
2136     } nxt_list_loop;
2137 
2138     *p++ = '\r'; *p++ = '\n';
2139     header->mem.free = p;
2140     size = p - header->mem.pos;
2141 
2142     c = peer->proto.h1->conn;
2143     c->write = header;
2144     c->write_state = &nxt_h1p_peer_header_send_state;
2145 
2146     if (r->body != NULL) {
2147         body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
2148         if (nxt_slow_path(body == NULL)) {
2149             r->state->error_handler(task, r, peer);
2150             return;
2151         }
2152 
2153         header->next = body;
2154 
2155         body->mem = r->body->mem;
2156         size += nxt_buf_mem_used_size(&body->mem);
2157 
2158 //        nxt_mp_retain(r->mem_pool);
2159     }
2160 
2161     if (size > 16384) {
2162         /* Use proxy_send_timeout instead of proxy_timeout. */
2163         c->write_state = &nxt_h1p_peer_header_body_send_state;
2164     }
2165 
2166     nxt_conn_write(task->thread->engine, c);
2167 }
2168 
2169 
2170 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state
2171     nxt_aligned(64) =
2172 {
2173     .ready_handler = nxt_h1p_peer_header_sent,
2174     .error_handler = nxt_h1p_peer_error,
2175 
2176     .timer_handler = nxt_h1p_peer_send_timeout,
2177     .timer_value = nxt_h1p_peer_timer_value,
2178     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2179 };
2180 
2181 
2182 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state
2183     nxt_aligned(64) =
2184 {
2185     .ready_handler = nxt_h1p_peer_header_sent,
2186     .error_handler = nxt_h1p_peer_error,
2187 
2188     .timer_handler = nxt_h1p_peer_send_timeout,
2189     .timer_value = nxt_h1p_peer_timer_value,
2190     .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
2191     .timer_autoreset = 1,
2192 };
2193 
2194 
2195 static void
2196 nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
2197 {
2198     nxt_conn_t          *c;
2199     nxt_http_peer_t     *peer;
2200     nxt_http_request_t  *r;
2201     nxt_event_engine_t  *engine;
2202 
2203     c = obj;
2204     peer = data;
2205 
2206     nxt_debug(task, "h1p peer header sent");
2207 
2208     engine = task->thread->engine;
2209 
2210     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
2211 
2212     if (c->write == NULL) {
2213         r = peer->request;
2214         r->state->ready_handler(task, r, peer);
2215         return;
2216     }
2217 
2218     nxt_conn_write(engine, c);
2219 }
2220 
2221 
2222 static void
2223 nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
2224 {
2225     nxt_conn_t  *c;
2226 
2227     nxt_debug(task, "h1p peer header read");
2228 
2229     c = peer->proto.h1->conn;
2230 
2231     if (c->write_timer.enabled) {
2232         c->read_state = &nxt_h1p_peer_header_read_state;
2233 
2234     } else {
2235         c->read_state = &nxt_h1p_peer_header_read_timer_state;
2236     }
2237 
2238     nxt_conn_read(task->thread->engine, c);
2239 }
2240 
2241 
2242 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state
2243     nxt_aligned(64) =
2244 {
2245     .ready_handler = nxt_h1p_peer_header_read_done,
2246     .close_handler = nxt_h1p_peer_closed,
2247     .error_handler = nxt_h1p_peer_error,
2248 
2249     .io_read_handler = nxt_h1p_peer_io_read_handler,
2250 };
2251 
2252 
2253 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state
2254     nxt_aligned(64) =
2255 {
2256     .ready_handler = nxt_h1p_peer_header_read_done,
2257     .close_handler = nxt_h1p_peer_closed,
2258     .error_handler = nxt_h1p_peer_error,
2259 
2260     .io_read_handler = nxt_h1p_peer_io_read_handler,
2261 
2262     .timer_handler = nxt_h1p_peer_read_timeout,
2263     .timer_value = nxt_h1p_peer_timer_value,
2264     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2265 };
2266 
2267 
2268 static ssize_t
2269 nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
2270 {
2271     size_t              size;
2272     ssize_t             n;
2273     nxt_buf_t           *b;
2274     nxt_http_peer_t     *peer;
2275     nxt_socket_conf_t   *skcf;
2276     nxt_http_request_t  *r;
2277 
2278     peer = c->socket.data;
2279     r = peer->request;
2280     b = c->read;
2281 
2282     if (b == NULL) {
2283         skcf = r->conf->socket_conf;
2284 
2285         size = (peer->header_received) ? skcf->proxy_buffer_size
2286                                        : skcf->proxy_header_buffer_size;
2287 
2288         nxt_debug(task, "h1p peer io read: %z", size);
2289 
2290         b = nxt_http_proxy_buf_mem_alloc(task, r, size);
2291         if (nxt_slow_path(b == NULL)) {
2292             c->socket.error = NXT_ENOMEM;
2293             return NXT_ERROR;
2294         }
2295     }
2296 
2297     n = c->io->recvbuf(c, b);
2298 
2299     if (n > 0) {
2300         c->read = b;
2301 
2302     } else {
2303         c->read = NULL;
2304         nxt_http_proxy_buf_mem_free(task, r, b);
2305     }
2306 
2307     return n;
2308 }
2309 
2310 
2311 static void
2312 nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
2313 {
2314     nxt_int_t           ret;
2315     nxt_buf_t           *b;
2316     nxt_conn_t          *c;
2317     nxt_http_peer_t     *peer;
2318     nxt_http_request_t  *r;
2319     nxt_event_engine_t  *engine;
2320 
2321     c = obj;
2322     peer = data;
2323 
2324     nxt_debug(task, "h1p peer header read done");
2325 
2326     b = c->read;
2327 
2328     ret = nxt_h1p_peer_header_parse(peer, &b->mem);
2329 
2330     r = peer->request;
2331 
2332     ret = nxt_expect(NXT_DONE, ret);
2333 
2334     if (ret != NXT_AGAIN) {
2335         engine = task->thread->engine;
2336         nxt_timer_disable(engine, &c->write_timer);
2337         nxt_timer_disable(engine, &c->read_timer);
2338     }
2339 
2340     switch (ret) {
2341 
2342     case NXT_DONE:
2343         peer->fields = peer->proto.h1->parser.fields;
2344 
2345         ret = nxt_http_fields_process(peer->fields,
2346                                       &nxt_h1p_peer_fields_hash, r);
2347         if (nxt_slow_path(ret != NXT_OK)) {
2348             peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2349             break;
2350         }
2351 
2352         c->read = NULL;
2353 
2354         if (nxt_buf_mem_used_size(&b->mem) != 0) {
2355             peer->body = b;
2356         }
2357 
2358         peer->header_received = 1;
2359 
2360         r->state->ready_handler(task, r, peer);
2361         return;
2362 
2363     case NXT_AGAIN:
2364         if (nxt_buf_mem_free_size(&b->mem) != 0) {
2365             nxt_conn_read(task->thread->engine, c);
2366             return;
2367         }
2368 
2369         /* Fall through. */
2370 
2371     default:
2372     case NXT_ERROR:
2373     case NXT_HTTP_PARSE_INVALID:
2374     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
2375     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
2376         peer->status = NXT_HTTP_BAD_GATEWAY;
2377         break;
2378     }
2379 
2380     nxt_http_proxy_buf_mem_free(task, r, b);
2381 
2382     r->state->error_handler(task, r, peer);
2383 }
2384 
2385 
2386 static nxt_int_t
2387 nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
2388 {
2389     u_char     *p;
2390     size_t     length;
2391     nxt_int_t  status;
2392 
2393     if (peer->status < 0) {
2394         length = nxt_buf_mem_used_size(bm);
2395 
2396         if (nxt_slow_path(length < 12)) {
2397             return NXT_AGAIN;
2398         }
2399 
2400         p = bm->pos;
2401 
2402         if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
2403                           || (p[7] != '0' && p[7] != '1')))
2404         {
2405             return NXT_ERROR;
2406         }
2407 
2408         status = nxt_int_parse(&p[9], 3);
2409 
2410         if (nxt_slow_path(status < 0)) {
2411             return NXT_ERROR;
2412         }
2413 
2414         p += 12;
2415         length -= 12;
2416 
2417         p = nxt_memchr(p, '\n', length);
2418 
2419         if (nxt_slow_path(p == NULL)) {
2420             return NXT_AGAIN;
2421         }
2422 
2423         bm->pos = p + 1;
2424         peer->status = status;
2425     }
2426 
2427     return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
2428 }
2429 
2430 
2431 static void
2432 nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
2433 {
2434     nxt_conn_t  *c;
2435 
2436     nxt_debug(task, "h1p peer read");
2437 
2438     c = peer->proto.h1->conn;
2439     c->read_state = &nxt_h1p_peer_read_state;
2440 
2441     nxt_conn_read(task->thread->engine, c);
2442 }
2443 
2444 
2445 static const nxt_conn_state_t  nxt_h1p_peer_read_state
2446     nxt_aligned(64) =
2447 {
2448     .ready_handler = nxt_h1p_peer_read_done,
2449     .close_handler = nxt_h1p_peer_closed,
2450     .error_handler = nxt_h1p_peer_error,
2451 
2452     .io_read_handler = nxt_h1p_peer_io_read_handler,
2453 
2454     .timer_handler = nxt_h1p_peer_read_timeout,
2455     .timer_value = nxt_h1p_peer_timer_value,
2456     .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
2457     .timer_autoreset = 1,
2458 };
2459 
2460 
2461 static void
2462 nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
2463 {
2464     nxt_conn_t          *c;
2465     nxt_http_peer_t     *peer;
2466     nxt_http_request_t  *r;
2467 
2468     c = obj;
2469     peer = data;
2470 
2471     nxt_debug(task, "h1p peer read done");
2472 
2473     peer->body = c->read;
2474     c->read = NULL;
2475 
2476     r = peer->request;
2477     r->state->ready_handler(task, r, peer);
2478 }
2479 
2480 
2481 static void
2482 nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
2483 {
2484     nxt_http_peer_t     *peer;
2485     nxt_http_request_t  *r;
2486 
2487     peer = data;
2488 
2489     nxt_debug(task, "h1p peer closed");
2490 
2491     r = peer->request;
2492 
2493     if (peer->header_received) {
2494         peer->body = nxt_http_buf_last(r);
2495 
2496         peer->closed = 1;
2497 
2498         r->state->ready_handler(task, r, peer);
2499 
2500     } else {
2501         peer->status = NXT_HTTP_BAD_GATEWAY;
2502 
2503         r->state->error_handler(task, r, peer);
2504     }
2505 }
2506 
2507 
2508 static void
2509 nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
2510 {
2511     nxt_http_peer_t     *peer;
2512     nxt_http_request_t  *r;
2513 
2514     peer = data;
2515 
2516     nxt_debug(task, "h1p peer error");
2517 
2518     peer->status = NXT_HTTP_BAD_GATEWAY;
2519 
2520     r = peer->request;
2521     r->state->error_handler(task, r, peer);
2522 }
2523 
2524 
2525 static void
2526 nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
2527 {
2528     nxt_conn_t          *c;
2529     nxt_timer_t         *timer;
2530     nxt_http_peer_t     *peer;
2531     nxt_http_request_t  *r;
2532 
2533     timer = obj;
2534 
2535     nxt_debug(task, "h1p peer send timeout");
2536 
2537     c = nxt_write_timer_conn(timer);
2538     c->block_write = 1;
2539     c->block_read = 1;
2540 
2541     peer = c->socket.data;
2542     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2543 
2544     r = peer->request;
2545     r->state->error_handler(task, r, peer);
2546 }
2547 
2548 
2549 static void
2550 nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
2551 {
2552     nxt_conn_t          *c;
2553     nxt_timer_t         *timer;
2554     nxt_http_peer_t     *peer;
2555     nxt_http_request_t  *r;
2556 
2557     timer = obj;
2558 
2559     nxt_debug(task, "h1p peer read timeout");
2560 
2561     c = nxt_read_timer_conn(timer);
2562     c->block_write = 1;
2563     c->block_read = 1;
2564 
2565     peer = c->socket.data;
2566     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2567 
2568     r = peer->request;
2569     r->state->error_handler(task, r, peer);
2570 }
2571 
2572 
2573 static nxt_msec_t
2574 nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
2575 {
2576     nxt_http_peer_t  *peer;
2577 
2578     peer = c->socket.data;
2579 
2580     return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
2581 }
2582 
2583 
2584 static void
2585 nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
2586 {
2587     nxt_conn_t  *c;
2588 
2589     nxt_debug(task, "h1p peer close");
2590 
2591     peer->closed = 1;
2592 
2593     c = peer->proto.h1->conn;
2594     task = &c->task;
2595     c->socket.task = task;
2596     c->read_timer.task = task;
2597     c->write_timer.task = task;
2598 
2599     if (c->socket.fd != -1) {
2600         c->write_state = &nxt_h1p_peer_close_state;
2601 
2602         nxt_conn_close(task->thread->engine, c);
2603 
2604     } else {
2605         nxt_h1p_peer_free(task, c, NULL);
2606     }
2607 }
2608 
2609 
2610 static const nxt_conn_state_t  nxt_h1p_peer_close_state
2611     nxt_aligned(64) =
2612 {
2613     .ready_handler = nxt_h1p_peer_free,
2614 };
2615 
2616 
2617 static void
2618 nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
2619 {
2620     nxt_conn_t  *c;
2621 
2622     c = obj;
2623 
2624     nxt_debug(task, "h1p peer free");
2625 
2626     nxt_conn_free(task, c);
2627 }
2628