xref: /unit/src/nxt_h1proto.c (revision 1401:c88f739aac1c)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_upstream.h>
10 #include <nxt_h1proto.h>
11 #include <nxt_websocket.h>
12 #include <nxt_websocket_header.h>
13 
14 
15 /*
16  * nxt_http_conn_ and nxt_h1p_conn_ prefixes are used for connection handlers.
17  * nxt_h1p_idle_ prefix is used for idle connection handlers.
18  * nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
19  */
20 
/*
 * Forward declarations of the handlers of client connections.
 * The first two are needed only in builds with NXT_TLS enabled.
 */
#if (NXT_TLS)
static ssize_t nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_http_conn_test(nxt_task_t *task, void *obj, void *data);
#endif
static ssize_t nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj,
    void *data);
static nxt_int_t nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
    nxt_http_request_t *r);
static nxt_int_t nxt_h1p_header_buffer_test(nxt_task_t *task,
    nxt_h1proto_t *h1p, nxt_conn_t *c, nxt_socket_conf_t *skcf);
/* Request header field handlers; they are registered in nxt_h1p_fields[]. */
static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
    uintptr_t data);
static nxt_int_t nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field,
    uintptr_t data);
static nxt_int_t nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field,
    uintptr_t data);
static nxt_int_t nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field,
    uintptr_t data);
static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
    uintptr_t data);
/*
 * Request-level methods; most of them are exported through the
 * NXT_HTTP_PROTO_H1 entry of the nxt_http_proto[] table.
 */
static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj,
    void *data);
static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_request_header_send(nxt_task_t *task,
    nxt_http_request_t *r, nxt_work_handler_t body_handler, void *data);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
    nxt_buf_t *out);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
    nxt_buf_t *out);
static nxt_off_t nxt_h1p_request_body_bytes_sent(nxt_task_t *task,
    nxt_http_proto_t proto);
static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
    nxt_buf_t *last);
/*
 * Handlers that the connection state tables install as close, error,
 * timer and timer-value callbacks, plus the remaining connection helpers.
 */
static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
    void *data);
static void nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj,
    void *data);
nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
    nxt_http_request_t *r);
static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
    nxt_socket_conf_joint_t *joint);
static void nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data);
static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
    nxt_conn_t *c);
static void nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj,
    void *data);
static nxt_msec_t nxt_h1p_idle_response_timer_value(nxt_conn_t *c,
    uintptr_t data);
static void nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data);
86 
/*
 * Forward declarations of the handlers of peer connections.
 * nxt_h1p_peer_connect(), nxt_h1p_peer_header_send(),
 * nxt_h1p_peer_header_read(), nxt_h1p_peer_read() and nxt_h1p_peer_close()
 * are the ones exported through the nxt_http_proto[] table.
 */
static void nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer);
static void nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer);
static void nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer);
static ssize_t nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj,
    void *data);
static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
    nxt_buf_mem_t *bm);
static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
107 
/*
 * Forward declarations of the connection state tables, so that a handler
 * can switch c->read_state to a state whose initializer appears after
 * the handler itself.  The first two are needed only with NXT_TLS.
 */
#if (NXT_TLS)
static const nxt_conn_state_t  nxt_http_idle_state;
static const nxt_conn_state_t  nxt_h1p_shutdown_state;
#endif
static const nxt_conn_state_t  nxt_h1p_idle_state;
static const nxt_conn_state_t  nxt_h1p_header_parse_state;
static const nxt_conn_state_t  nxt_h1p_read_body_state;
static const nxt_conn_state_t  nxt_h1p_request_send_state;
static const nxt_conn_state_t  nxt_h1p_timeout_response_state;
static const nxt_conn_state_t  nxt_h1p_keepalive_state;
static const nxt_conn_state_t  nxt_h1p_close_state;
static const nxt_conn_state_t  nxt_h1p_peer_connect_state;
static const nxt_conn_state_t  nxt_h1p_peer_header_send_state;
static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state;
static const nxt_conn_state_t  nxt_h1p_peer_header_read_state;
static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state;
static const nxt_conn_state_t  nxt_h1p_peer_read_state;
static const nxt_conn_state_t  nxt_h1p_peer_close_state;
126 
127 
/*
 * Table of protocol methods, one element per protocol in the order named
 * by the comments below.  Only the HTTP/1 element is filled in here; the
 * other two elements have no initializer and are therefore zero-filled.
 */
const nxt_http_proto_table_t  nxt_http_proto[3] = {
    /* NXT_HTTP_PROTO_H1 */
    {
        /* Methods operating on a client request. */
        .body_read        = nxt_h1p_request_body_read,
        .local_addr       = nxt_h1p_request_local_addr,
        .header_send      = nxt_h1p_request_header_send,
        .send             = nxt_h1p_request_send,
        .body_bytes_sent  = nxt_h1p_request_body_bytes_sent,
        .discard          = nxt_h1p_request_discard,
        .close            = nxt_h1p_request_close,

        /* Methods operating on a peer connection. */
        .peer_connect     = nxt_h1p_peer_connect,
        .peer_header_send = nxt_h1p_peer_header_send,
        .peer_header_read = nxt_h1p_peer_header_read,
        .peer_read        = nxt_h1p_peer_read,
        .peer_close       = nxt_h1p_peer_close,

        /* Not declared in this file. */
        .ws_frame_start   = nxt_h1p_websocket_frame_start,
    },
    /* NXT_HTTP_PROTO_H2      */
    /* NXT_HTTP_PROTO_DEVNULL */
};
150 
151 
/* Lookup hash built from nxt_h1p_fields[] by nxt_h1p_init(). */
static nxt_lvlhsh_t                    nxt_h1p_fields_hash;

/*
 * Request header fields that get special processing.  Each entry gives
 * the field name, its handler and one extra value; for the entries using
 * nxt_http_request_field that value is the offset of the destination
 * member inside nxt_http_request_t.
 * NOTE(review): the extra value is presumably what the handlers receive
 * as their "data" argument -- confirm in nxt_http_fields_process().
 */
static nxt_http_field_proc_t           nxt_h1p_fields[] = {
    /* Fields handled by HTTP/1-specific handlers in this file. */
    { nxt_string("Connection"),        &nxt_h1p_connection, 0 },
    { nxt_string("Upgrade"),           &nxt_h1p_upgrade, 0 },
    { nxt_string("Sec-WebSocket-Key"), &nxt_h1p_websocket_key, 0 },
    { nxt_string("Sec-WebSocket-Version"),
                                       &nxt_h1p_websocket_version, 0 },
    { nxt_string("Transfer-Encoding"), &nxt_h1p_transfer_encoding, 0 },

    /* Fields handled by nxt_http_request_* handlers defined elsewhere. */
    { nxt_string("Host"),              &nxt_http_request_host, 0 },
    { nxt_string("Cookie"),            &nxt_http_request_field,
        offsetof(nxt_http_request_t, cookie) },
    { nxt_string("Referer"),           &nxt_http_request_field,
        offsetof(nxt_http_request_t, referer) },
    { nxt_string("User-Agent"),        &nxt_http_request_field,
        offsetof(nxt_http_request_t, user_agent) },
    { nxt_string("Content-Type"),      &nxt_http_request_field,
        offsetof(nxt_http_request_t, content_type) },
    { nxt_string("Content-Length"),    &nxt_http_request_content_length, 0 },
};
173 
174 
/* Lookup hash built from nxt_h1p_peer_fields[] by nxt_h1p_init(). */
static nxt_lvlhsh_t                    nxt_h1p_peer_fields_hash;

/*
 * Header fields with dedicated nxt_http_proxy_* handlers, which are
 * defined outside this file.
 * NOTE(review): presumably applied to the header read from a peer --
 * confirm where nxt_h1p_peer_fields_hash is used.
 */
static nxt_http_field_proc_t           nxt_h1p_peer_fields[] = {
    { nxt_string("Connection"),        &nxt_http_proxy_skip, 0 },
    { nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
    { nxt_string("Server"),            &nxt_http_proxy_skip, 0 },
    { nxt_string("Date"),              &nxt_http_proxy_date, 0 },
    { nxt_string("Content-Length"),    &nxt_http_proxy_content_length, 0 },
};
184 
185 
186 nxt_int_t
187 nxt_h1p_init(nxt_task_t *task, nxt_runtime_t *rt)
188 {
189     nxt_int_t  ret;
190 
191     ret = nxt_http_fields_hash(&nxt_h1p_fields_hash, rt->mem_pool,
192                                nxt_h1p_fields, nxt_nitems(nxt_h1p_fields));
193 
194     if (nxt_fast_path(ret == NXT_OK)) {
195         ret = nxt_http_fields_hash(&nxt_h1p_peer_fields_hash,
196                                    rt->mem_pool, nxt_h1p_peer_fields,
197                                    nxt_nitems(nxt_h1p_peer_fields));
198     }
199 
200     return ret;
201 }
202 
203 
204 void
205 nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
206 {
207     nxt_conn_t               *c;
208     nxt_socket_conf_t        *skcf;
209     nxt_event_engine_t       *engine;
210     nxt_listen_event_t       *lev;
211     nxt_socket_conf_joint_t  *joint;
212 
213     c = obj;
214     lev = data;
215 
216     nxt_debug(task, "http conn init");
217 
218     joint = lev->socket.data;
219     skcf = joint->socket_conf;
220     c->local = skcf->sockaddr;
221 
222     engine = task->thread->engine;
223     c->read_work_queue = &engine->fast_work_queue;
224     c->write_work_queue = &engine->fast_work_queue;
225 
226     c->read_state = &nxt_h1p_idle_state;
227 
228 #if (NXT_TLS)
229     if (skcf->tls != NULL) {
230         c->read_state = &nxt_http_idle_state;
231     }
232 #endif
233 
234     nxt_conn_read(engine, c);
235 }
236 
237 
238 #if (NXT_TLS)
239 
/*
 * Initial read state of a connection whose listening socket has TLS
 * configured (selected in nxt_http_conn_init()).  The io_read_handler only
 * peeks at the first bytes; nxt_http_conn_test() then inspects them.
 */
static const nxt_conn_state_t  nxt_http_idle_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_http_conn_test,
    .close_handler = nxt_h1p_conn_close,
    .error_handler = nxt_h1p_conn_error,

    .io_read_handler = nxt_http_idle_io_read_handler,

    /* The timer value is read from the idle_timeout socket setting. */
    .timer_handler = nxt_h1p_idle_timeout,
    .timer_value = nxt_h1p_conn_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
};
253 
254 
255 static ssize_t
256 nxt_http_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
257 {
258     size_t                   size;
259     ssize_t                  n;
260     nxt_buf_t                *b;
261     nxt_socket_conf_joint_t  *joint;
262 
263     joint = c->listen->socket.data;
264 
265     if (nxt_slow_path(joint == NULL)) {
266         /*
267          * Listening socket had been closed while
268          * connection was in keep-alive state.
269          */
270         c->read_state = &nxt_h1p_idle_close_state;
271         return 0;
272     }
273 
274     size = joint->socket_conf->header_buffer_size;
275 
276     b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
277     if (nxt_slow_path(b == NULL)) {
278         c->socket.error = NXT_ENOMEM;
279         return NXT_ERROR;
280     }
281 
282     /*
283      * 1 byte is enough to distinguish between SSLv3/TLS and plain HTTP.
284      * 11 bytes are enough to log supported SSLv3/TLS version.
285      * 16 bytes are just for more optimized kernel copy-out operation.
286      */
287     n = c->io->recv(c, b->mem.pos, 16, MSG_PEEK);
288 
289     if (n > 0) {
290         c->read = b;
291 
292     } else {
293         c->read = NULL;
294         nxt_event_engine_buf_mem_free(task->thread->engine, b);
295     }
296 
297     return n;
298 }
299 
300 
301 static void
302 nxt_http_conn_test(nxt_task_t *task, void *obj, void *data)
303 {
304     u_char                   *p;
305     nxt_buf_t                *b;
306     nxt_conn_t               *c;
307     nxt_tls_conf_t           *tls;
308     nxt_event_engine_t       *engine;
309     nxt_socket_conf_joint_t  *joint;
310 
311     c = obj;
312 
313     nxt_debug(task, "h1p conn https test");
314 
315     engine = task->thread->engine;
316     b = c->read;
317     p = b->mem.pos;
318 
319     c->read_state = &nxt_h1p_idle_state;
320 
321     if (p[0] != 0x16) {
322         b->mem.free = b->mem.pos;
323 
324         nxt_conn_read(engine, c);
325         return;
326     }
327 
328     /* SSLv3/TLS ClientHello message. */
329 
330 #if (NXT_DEBUG)
331     if (nxt_buf_mem_used_size(&b->mem) >= 11) {
332         u_char      major, minor;
333         const char  *protocol;
334 
335         major = p[9];
336         minor = p[10];
337 
338         if (major == 3) {
339             if (minor == 0) {
340                 protocol = "SSLv";
341 
342             } else {
343                 protocol = "TLSv";
344                 major -= 2;
345                 minor -= 1;
346             }
347 
348             nxt_debug(task, "SSL/TLS: %s%ud.%ud", protocol, major, minor);
349         }
350     }
351 #endif
352 
353     c->read = NULL;
354     nxt_event_engine_buf_mem_free(engine, b);
355 
356     joint = c->listen->socket.data;
357 
358     if (nxt_slow_path(joint == NULL)) {
359         /*
360          * Listening socket had been closed while
361          * connection was in keep-alive state.
362          */
363         nxt_h1p_closing(task, c);
364         return;
365     }
366 
367     tls = joint->socket_conf->tls;
368 
369     tls->conn_init(task, tls, c);
370 }
371 
372 #endif
373 
374 
/*
 * Read state of an idle HTTP/1 connection.  It is selected in
 * nxt_http_conn_init() and, in NXT_TLS builds, in nxt_http_conn_test().
 * Once data arrives the per-connection protocol object is created by
 * nxt_h1p_conn_proto_init().
 */
static const nxt_conn_state_t  nxt_h1p_idle_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_proto_init,
    .close_handler = nxt_h1p_conn_close,
    .error_handler = nxt_h1p_conn_error,

    .io_read_handler = nxt_h1p_idle_io_read_handler,

    /* The timer value is read from the idle_timeout socket setting. */
    .timer_handler = nxt_h1p_idle_timeout,
    .timer_value = nxt_h1p_conn_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
    .timer_autoreset = 1,
};
389 
390 
391 static ssize_t
392 nxt_h1p_idle_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
393 {
394     size_t                   size;
395     ssize_t                  n;
396     nxt_buf_t                *b;
397     nxt_socket_conf_joint_t  *joint;
398 
399     joint = c->listen->socket.data;
400 
401     if (nxt_slow_path(joint == NULL)) {
402         /*
403          * Listening socket had been closed while
404          * connection was in keep-alive state.
405          */
406         c->read_state = &nxt_h1p_idle_close_state;
407         return 0;
408     }
409 
410     b = c->read;
411 
412     if (b == NULL) {
413         size = joint->socket_conf->header_buffer_size;
414 
415         b = nxt_event_engine_buf_mem_alloc(task->thread->engine, size);
416         if (nxt_slow_path(b == NULL)) {
417             c->socket.error = NXT_ENOMEM;
418             return NXT_ERROR;
419         }
420     }
421 
422     n = c->io->recvbuf(c, b);
423 
424     if (n > 0) {
425         c->read = b;
426 
427     } else {
428         c->read = NULL;
429         nxt_event_engine_buf_mem_free(task->thread->engine, b);
430     }
431 
432     return n;
433 }
434 
435 
436 static void
437 nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
438 {
439     nxt_conn_t     *c;
440     nxt_h1proto_t  *h1p;
441 
442     c = obj;
443 
444     nxt_debug(task, "h1p conn proto init");
445 
446     h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
447     if (nxt_slow_path(h1p == NULL)) {
448         nxt_h1p_closing(task, c);
449         return;
450     }
451 
452     c->socket.data = h1p;
453     h1p->conn = c;
454 
455     nxt_h1p_conn_request_init(task, c, h1p);
456 }
457 
458 
459 static void
460 nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
461 {
462     nxt_int_t                ret;
463     nxt_conn_t               *c;
464     nxt_h1proto_t            *h1p;
465     nxt_http_request_t       *r;
466     nxt_socket_conf_joint_t  *joint;
467 
468     c = obj;
469     h1p = data;
470 
471     nxt_debug(task, "h1p conn request init");
472 
473     r = nxt_http_request_create(task);
474 
475     if (nxt_fast_path(r != NULL)) {
476         h1p->request = r;
477         r->proto.h1 = h1p;
478 
479         /* r->protocol = NXT_HTTP_PROTO_H1 is done by zeroing. */
480         r->remote = c->remote;
481 
482 #if (NXT_TLS)
483         r->tls = c->u.tls;
484 #endif
485 
486         r->task = c->task;
487         task = &r->task;
488         c->socket.task = task;
489         c->read_timer.task = task;
490         c->write_timer.task = task;
491 
492         ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
493 
494         if (nxt_fast_path(ret == NXT_OK)) {
495             joint = c->listen->socket.data;
496             joint->count++;
497 
498             r->conf = joint;
499             c->local = joint->socket_conf->sockaddr;
500 
501             nxt_h1p_conn_request_header_parse(task, c, h1p);
502             return;
503         }
504 
505         /*
506          * The request is very incomplete here,
507          * so "internal server error" useless here.
508          */
509         nxt_mp_release(r->mem_pool);
510     }
511 
512     nxt_h1p_closing(task, c);
513 }
514 
515 
/*
 * Read state used while a request header is still incomplete; it is
 * selected by nxt_h1p_conn_request_header_parse() on NXT_AGAIN, and the
 * same function is called again when more data has been read.
 */
static const nxt_conn_state_t  nxt_h1p_header_parse_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_request_header_parse,
    .close_handler = nxt_h1p_conn_request_error,
    .error_handler = nxt_h1p_conn_request_error,

    /* The timer value is read from the header_read_timeout setting. */
    .timer_handler = nxt_h1p_conn_request_timeout,
    .timer_value = nxt_h1p_conn_request_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
};
527 
528 
/*
 * Parses the request header accumulated in c->read and acts on the result.
 *
 * obj is the connection and data is its nxt_h1proto_t.  The function is
 * called directly from nxt_h1p_conn_request_init() and again as the ready
 * handler of nxt_h1p_header_parse_state when more data has been read.
 *
 * A complete, valid header is passed to the ready handler of the request
 * state; an incomplete one schedules another read; anything else ends in
 * nxt_http_request_error() with keep-alive switched off.
 */
static void
nxt_h1p_conn_request_header_parse(nxt_task_t *task, void *obj, void *data)
{
    nxt_int_t           ret;
    nxt_conn_t          *c;
    nxt_h1proto_t       *h1p;
    nxt_http_status_t   status;
    nxt_http_request_t  *r;

    c = obj;
    h1p = data;

    nxt_debug(task, "h1p conn header parse");

    ret = nxt_http_parse_request(&h1p->parser, &c->read->mem);

    /* NOTE(review): nxt_expect() is presumably only a branch hint. */
    ret = nxt_expect(NXT_DONE, ret);

    /* The read timer stays armed only while more header data is awaited. */
    if (ret != NXT_AGAIN) {
        nxt_timer_disable(task->thread->engine, &c->read_timer);
    }

    r = h1p->request;

    switch (ret) {

    case NXT_DONE:
        /*
         * By default the keepalive mode is disabled in HTTP/1.0 and
         * enabled in HTTP/1.1.  The mode can be overridden later by
         * the "Connection" field processed in nxt_h1p_connection().
         */
        h1p->keepalive = (h1p->parser.version.s.minor != '0');

        ret = nxt_h1p_header_process(task, h1p, r);

        if (nxt_fast_path(ret == NXT_OK)) {

#if (NXT_TLS)
            /* A plain connection arrived on a socket configured for TLS. */
            if (c->u.tls == NULL && r->conf->socket_conf->tls != NULL) {
                status = NXT_HTTP_TO_HTTPS;
                goto error;
            }
#endif

            r->state->ready_handler(task, r, NULL);
            return;
        }

        /* A failed nxt_h1p_header_process() result is used as the status. */
        status = ret;
        goto error;

    case NXT_AGAIN:
        /*
         * The header is incomplete: make sure the read buffer has room,
         * switching to a large header buffer if needed, and read more.
         */
        status = nxt_h1p_header_buffer_test(task, h1p, c, r->conf->socket_conf);

        if (nxt_fast_path(status == NXT_OK)) {
            c->read_state = &nxt_h1p_header_parse_state;

            nxt_conn_read(task->thread->engine, c);
            return;
        }

        break;

    case NXT_HTTP_PARSE_INVALID:
        status = NXT_HTTP_BAD_REQUEST;
        break;

    case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
        status = NXT_HTTP_VERSION_NOT_SUPPORTED;
        break;

    case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
        status = NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
        break;

    default:
    case NXT_ERROR:
        status = NXT_HTTP_INTERNAL_SERVER_ERROR;
        break;
    }

    /*
     * Copy whatever has been parsed so far into the request before the
     * error is reported; the result is deliberately ignored.
     */
    (void) nxt_h1p_header_process(task, h1p, r);

error:

    h1p->keepalive = 0;

    nxt_http_request_error(task, r, status);
}
619 
620 
621 static nxt_int_t
622 nxt_h1p_header_process(nxt_task_t *task, nxt_h1proto_t *h1p,
623     nxt_http_request_t *r)
624 {
625     u_char     *m;
626     nxt_int_t  ret;
627 
628     r->target.start = h1p->parser.target_start;
629     r->target.length = h1p->parser.target_end - h1p->parser.target_start;
630 
631     if (h1p->parser.version.ui64 != 0) {
632         r->version.start = h1p->parser.version.str;
633         r->version.length = sizeof(h1p->parser.version.str);
634     }
635 
636     r->method = &h1p->parser.method;
637     r->path = &h1p->parser.path;
638     r->args = &h1p->parser.args;
639 
640     r->fields = h1p->parser.fields;
641 
642     ret = nxt_http_fields_process(r->fields, &nxt_h1p_fields_hash, r);
643     if (nxt_slow_path(ret != NXT_OK)) {
644         return ret;
645     }
646 
647     if (h1p->connection_upgrade && h1p->upgrade_websocket) {
648         m = h1p->parser.method.start;
649 
650         if (nxt_slow_path(h1p->parser.method.length != 3
651                           || m[0] != 'G'
652                           || m[1] != 'E'
653                           || m[2] != 'T'))
654         {
655             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad method");
656 
657             return NXT_HTTP_BAD_REQUEST;
658         }
659 
660         if (nxt_slow_path(h1p->parser.version.s.minor != '1')) {
661             nxt_log(task, NXT_LOG_INFO, "h1p upgrade: bad protocol version");
662 
663             return NXT_HTTP_BAD_REQUEST;
664         }
665 
666         if (nxt_slow_path(h1p->websocket_key == NULL)) {
667             nxt_log(task, NXT_LOG_INFO,
668                     "h1p upgrade: bad or absent websocket key");
669 
670             return NXT_HTTP_BAD_REQUEST;
671         }
672 
673         if (nxt_slow_path(h1p->websocket_version_ok == 0)) {
674             nxt_log(task, NXT_LOG_INFO,
675                     "h1p upgrade: bad or absent websocket version");
676 
677             return NXT_HTTP_UPGRADE_REQUIRED;
678         }
679 
680         r->websocket_handshake = 1;
681     }
682 
683     return ret;
684 }
685 
686 
687 static nxt_int_t
688 nxt_h1p_header_buffer_test(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c,
689     nxt_socket_conf_t *skcf)
690 {
691     size_t     size, used;
692     nxt_buf_t  *in, *b;
693 
694     in = c->read;
695 
696     if (nxt_buf_mem_free_size(&in->mem) == 0) {
697         size = skcf->large_header_buffer_size;
698         used = nxt_buf_mem_used_size(&in->mem);
699 
700         if (size <= used || h1p->nbuffers >= skcf->large_header_buffers) {
701             return NXT_HTTP_REQUEST_HEADER_FIELDS_TOO_LARGE;
702         }
703 
704         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
705         if (nxt_slow_path(b == NULL)) {
706             return NXT_HTTP_INTERNAL_SERVER_ERROR;
707         }
708 
709         b->mem.free = nxt_cpymem(b->mem.pos, in->mem.pos, used);
710 
711         in->next = h1p->buffers;
712         h1p->buffers = in;
713         h1p->nbuffers++;
714 
715         c->read = b;
716     }
717 
718     return NXT_OK;
719 }
720 
721 
722 static nxt_int_t
723 nxt_h1p_connection(void *ctx, nxt_http_field_t *field, uintptr_t data)
724 {
725     nxt_http_request_t  *r;
726 
727     r = ctx;
728     field->hopbyhop = 1;
729 
730     if (field->value_length == 5 && nxt_memcmp(field->value, "close", 5) == 0) {
731         r->proto.h1->keepalive = 0;
732 
733     } else if (field->value_length == 7
734                && nxt_memcasecmp(field->value, "upgrade", 7) == 0)
735     {
736         r->proto.h1->connection_upgrade = 1;
737     }
738 
739     return NXT_OK;
740 }
741 
742 
743 static nxt_int_t
744 nxt_h1p_upgrade(void *ctx, nxt_http_field_t *field, uintptr_t data)
745 {
746     nxt_http_request_t  *r;
747 
748     r = ctx;
749 
750     if (field->value_length == 9
751         && nxt_memcasecmp(field->value, "websocket", 9) == 0)
752     {
753         r->proto.h1->upgrade_websocket = 1;
754     }
755 
756     return NXT_OK;
757 }
758 
759 
760 static nxt_int_t
761 nxt_h1p_websocket_key(void *ctx, nxt_http_field_t *field, uintptr_t data)
762 {
763     nxt_http_request_t  *r;
764 
765     r = ctx;
766 
767     if (field->value_length == 24) {
768         r->proto.h1->websocket_key = field;
769     }
770 
771     return NXT_OK;
772 }
773 
774 
775 static nxt_int_t
776 nxt_h1p_websocket_version(void *ctx, nxt_http_field_t *field, uintptr_t data)
777 {
778     nxt_http_request_t  *r;
779 
780     r = ctx;
781 
782     if (field->value_length == 2
783         && field->value[0] == '1' && field->value[1] == '3')
784     {
785         r->proto.h1->websocket_version_ok = 1;
786     }
787 
788     return NXT_OK;
789 }
790 
791 
792 static nxt_int_t
793 nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
794 {
795     nxt_http_te_t       te;
796     nxt_http_request_t  *r;
797 
798     r = ctx;
799     field->skip = 1;
800     field->hopbyhop = 1;
801 
802     if (field->value_length == 7
803         && nxt_memcmp(field->value, "chunked", 7) == 0)
804     {
805         te = NXT_HTTP_TE_CHUNKED;
806 
807     } else {
808         te = NXT_HTTP_TE_UNSUPPORTED;
809     }
810 
811     r->proto.h1->transfer_encoding = te;
812 
813     return NXT_OK;
814 }
815 
816 
817 static void
818 nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
819 {
820     size_t             size, body_length;
821     nxt_buf_t          *in, *b;
822     nxt_conn_t         *c;
823     nxt_h1proto_t      *h1p;
824     nxt_http_status_t  status;
825 
826     h1p = r->proto.h1;
827 
828     nxt_debug(task, "h1p request body read %O te:%d",
829               r->content_length_n, h1p->transfer_encoding);
830 
831     switch (h1p->transfer_encoding) {
832 
833     case NXT_HTTP_TE_CHUNKED:
834         status = NXT_HTTP_LENGTH_REQUIRED;
835         goto error;
836 
837     case NXT_HTTP_TE_UNSUPPORTED:
838         status = NXT_HTTP_NOT_IMPLEMENTED;
839         goto error;
840 
841     default:
842     case NXT_HTTP_TE_NONE:
843         break;
844     }
845 
846     if (r->content_length_n == -1 || r->content_length_n == 0) {
847         goto ready;
848     }
849 
850     body_length = (size_t) r->content_length_n;
851 
852     b = r->body;
853 
854     if (b == NULL) {
855         b = nxt_buf_mem_alloc(r->mem_pool, body_length, 0);
856         if (nxt_slow_path(b == NULL)) {
857             status = NXT_HTTP_INTERNAL_SERVER_ERROR;
858             goto error;
859         }
860 
861         r->body = b;
862     }
863 
864     in = h1p->conn->read;
865 
866     size = nxt_buf_mem_used_size(&in->mem);
867 
868     if (size != 0) {
869         if (size > body_length) {
870             size = body_length;
871         }
872 
873         b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
874         in->mem.pos += size;
875     }
876 
877     size = nxt_buf_mem_free_size(&b->mem);
878 
879     nxt_debug(task, "h1p body rest: %uz", size);
880 
881     if (size != 0) {
882         in->next = h1p->buffers;
883         h1p->buffers = in;
884         h1p->nbuffers++;
885 
886         c = h1p->conn;
887         c->read = b;
888         c->read_state = &nxt_h1p_read_body_state;
889 
890         nxt_conn_read(task->thread->engine, c);
891         return;
892     }
893 
894 ready:
895 
896     r->state->ready_handler(task, r, NULL);
897 
898     return;
899 
900 error:
901 
902     h1p->keepalive = 0;
903 
904     nxt_http_request_error(task, r, status);
905 }
906 
907 
/*
 * Read state used while the request body is being read into the body
 * buffer; it is selected by nxt_h1p_request_body_read().
 */
static const nxt_conn_state_t  nxt_h1p_read_body_state
    nxt_aligned(64) =
{
    .ready_handler = nxt_h1p_conn_request_body_read,
    .close_handler = nxt_h1p_conn_request_error,
    .error_handler = nxt_h1p_conn_request_error,

    /* The timer value is read from the body_read_timeout setting. */
    .timer_handler = nxt_h1p_conn_request_timeout,
    .timer_value = nxt_h1p_conn_request_timer_value,
    .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
    .timer_autoreset = 1,
};
920 
921 
922 static void
923 nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
924 {
925     size_t              size;
926     nxt_conn_t          *c;
927     nxt_h1proto_t       *h1p;
928     nxt_http_request_t  *r;
929     nxt_event_engine_t  *engine;
930 
931     c = obj;
932     h1p = data;
933 
934     nxt_debug(task, "h1p conn request body read");
935 
936     size = nxt_buf_mem_free_size(&c->read->mem);
937 
938     nxt_debug(task, "h1p body rest: %uz", size);
939 
940     engine = task->thread->engine;
941 
942     if (size != 0) {
943         nxt_conn_read(engine, c);
944 
945     } else {
946         c->read = NULL;
947         r = h1p->request;
948 
949         r->state->ready_handler(task, r, NULL);
950     }
951 }
952 
953 
954 static void
955 nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r)
956 {
957     r->local = nxt_conn_local_addr(task, r->proto.h1->conn);
958 }
959 
960 
/*
 * Highest status code that has an entry in nxt_http_informational[],
 * given that the first entry corresponds to NXT_HTTP_CONTINUE.
 */
#define NXT_HTTP_LAST_INFORMATIONAL                                           \
    (NXT_HTTP_CONTINUE + nxt_nitems(nxt_http_informational) - 1)

/*
 * Status lines of the 1xx responses, in status code order.
 * NOTE(review): presumably indexed by (status - NXT_HTTP_CONTINUE);
 * the lookup code is not in this part of the file.
 */
static const nxt_str_t  nxt_http_informational[] = {
    nxt_string("HTTP/1.1 100 Continue\r\n"),
    nxt_string("HTTP/1.1 101 Switching Protocols\r\n"),
};
968 
969 
/*
 * Highest status code that has an entry in nxt_http_success[],
 * given that the first entry corresponds to NXT_HTTP_OK.
 */
#define NXT_HTTP_LAST_SUCCESS                                                 \
    (NXT_HTTP_OK + nxt_nitems(nxt_http_success) - 1)

/*
 * Status lines of the 2xx responses, in status code order.
 * NOTE(review): presumably indexed by (status - NXT_HTTP_OK);
 * the lookup code is not in this part of the file.
 */
static const nxt_str_t  nxt_http_success[] = {
    nxt_string("HTTP/1.1 200 OK\r\n"),
    nxt_string("HTTP/1.1 201 Created\r\n"),
    nxt_string("HTTP/1.1 202 Accepted\r\n"),
    nxt_string("HTTP/1.1 203 Non-Authoritative Information\r\n"),
    nxt_string("HTTP/1.1 204 No Content\r\n"),
    nxt_string("HTTP/1.1 205 Reset Content\r\n"),
    nxt_string("HTTP/1.1 206 Partial Content\r\n"),
};
982 
983 
/*
 * Highest status code that has an entry in nxt_http_redirection[],
 * given that the first entry corresponds to NXT_HTTP_MULTIPLE_CHOICES.
 */
#define NXT_HTTP_LAST_REDIRECTION                                             \
    (NXT_HTTP_MULTIPLE_CHOICES + nxt_nitems(nxt_http_redirection) - 1)

/*
 * Status lines of the 3xx responses, in status code order (300-304).
 * NOTE(review): presumably indexed by (status - NXT_HTTP_MULTIPLE_CHOICES);
 * the lookup code is not in this part of the file.
 */
static const nxt_str_t  nxt_http_redirection[] = {
    nxt_string("HTTP/1.1 300 Multiple Choices\r\n"),
    nxt_string("HTTP/1.1 301 Moved Permanently\r\n"),
    nxt_string("HTTP/1.1 302 Found\r\n"),
    nxt_string("HTTP/1.1 303 See Other\r\n"),
    nxt_string("HTTP/1.1 304 Not Modified\r\n"),
};
994 
995 
996 #define NXT_HTTP_LAST_CLIENT_ERROR                                            \
997     (NXT_HTTP_BAD_REQUEST + nxt_nitems(nxt_http_client_error) - 1)
998 
999 static const nxt_str_t  nxt_http_client_error[] = {
1000     nxt_string("HTTP/1.1 400 Bad Request\r\n"),
1001     nxt_string("HTTP/1.1 401 Unauthorized\r\n"),
1002     nxt_string("HTTP/1.1 402 Payment Required\r\n"),
1003     nxt_string("HTTP/1.1 403 Forbidden\r\n"),
1004     nxt_string("HTTP/1.1 404 Not Found\r\n"),
1005     nxt_string("HTTP/1.1 405 Method Not Allowed\r\n"),
1006     nxt_string("HTTP/1.1 406 Not Acceptable\r\n"),
1007     nxt_string("HTTP/1.1 407 Proxy Authentication Required\r\n"),
1008     nxt_string("HTTP/1.1 408 Request Timeout\r\n"),
1009     nxt_string("HTTP/1.1 409 Conflict\r\n"),
1010     nxt_string("HTTP/1.1 410 Gone\r\n"),
1011     nxt_string("HTTP/1.1 411 Length Required\r\n"),
1012     nxt_string("HTTP/1.1 412 Precondition Failed\r\n"),
1013     nxt_string("HTTP/1.1 413 Payload Too Large\r\n"),
1014     nxt_string("HTTP/1.1 414 URI Too Long\r\n"),
1015     nxt_string("HTTP/1.1 415 Unsupported Media Type\r\n"),
1016     nxt_string("HTTP/1.1 416 Range Not Satisfiable\r\n"),
1017     nxt_string("HTTP/1.1 417 Expectation Failed\r\n"),
1018     nxt_string("HTTP/1.1 418\r\n"),
1019     nxt_string("HTTP/1.1 419\r\n"),
1020     nxt_string("HTTP/1.1 420\r\n"),
1021     nxt_string("HTTP/1.1 421\r\n"),
1022     nxt_string("HTTP/1.1 422\r\n"),
1023     nxt_string("HTTP/1.1 423\r\n"),
1024     nxt_string("HTTP/1.1 424\r\n"),
1025     nxt_string("HTTP/1.1 425\r\n"),
1026     nxt_string("HTTP/1.1 426 Upgrade Required\r\n"),
1027     nxt_string("HTTP/1.1 427\r\n"),
1028     nxt_string("HTTP/1.1 428\r\n"),
1029     nxt_string("HTTP/1.1 429\r\n"),
1030     nxt_string("HTTP/1.1 430\r\n"),
1031     nxt_string("HTTP/1.1 431 Request Header Fields Too Large\r\n"),
1032 };
1033 
1034 
1035 #define NXT_HTTP_LAST_NGINX_ERROR                                             \
1036     (NXT_HTTP_TO_HTTPS + nxt_nitems(nxt_http_nginx_error) - 1)
1037 
1038 static const nxt_str_t  nxt_http_nginx_error[] = {
1039     nxt_string("HTTP/1.1 400 "
1040                "The plain HTTP request was sent to HTTPS port\r\n"),
1041 };
1042 
1043 
1044 #define NXT_HTTP_LAST_SERVER_ERROR                                            \
1045     (NXT_HTTP_INTERNAL_SERVER_ERROR + nxt_nitems(nxt_http_server_error) - 1)
1046 
1047 static const nxt_str_t  nxt_http_server_error[] = {
1048     nxt_string("HTTP/1.1 500 Internal Server Error\r\n"),
1049     nxt_string("HTTP/1.1 501 Not Implemented\r\n"),
1050     nxt_string("HTTP/1.1 502 Bad Gateway\r\n"),
1051     nxt_string("HTTP/1.1 503 Service Unavailable\r\n"),
1052     nxt_string("HTTP/1.1 504 Gateway Timeout\r\n"),
1053     nxt_string("HTTP/1.1 505 HTTP Version Not Supported\r\n"),
1054 };
1055 
1056 
1057 #define UNKNOWN_STATUS_LENGTH  nxt_length("HTTP/1.1 65536\r\n")
1058 
1059 static void
1060 nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r,
1061     nxt_work_handler_t body_handler, void *data)
1062 {
1063     u_char              *p;
1064     size_t              size;
1065     nxt_buf_t           *header;
1066     nxt_str_t           unknown_status;
1067     nxt_int_t           conn;
1068     nxt_uint_t          n;
1069     nxt_bool_t          http11;
1070     nxt_conn_t          *c;
1071     nxt_h1proto_t       *h1p;
1072     const nxt_str_t     *status;
1073     nxt_http_field_t    *field;
1074     u_char              buf[UNKNOWN_STATUS_LENGTH];
1075 
1076     static const char   chunked[] = "Transfer-Encoding: chunked\r\n";
1077     static const char   websocket_version[] = "Sec-WebSocket-Version: 13\r\n";
1078 
1079     static const nxt_str_t  connection[3] = {
1080         nxt_string("Connection: close\r\n"),
1081         nxt_string("Connection: keep-alive\r\n"),
1082         nxt_string("Upgrade: websocket\r\n"
1083                    "Connection: Upgrade\r\n"
1084                    "Sec-WebSocket-Accept: "),
1085     };
1086 
1087     nxt_debug(task, "h1p request header send");
1088 
1089     r->header_sent = 1;
1090     h1p = r->proto.h1;
1091     n = r->status;
1092 
1093     if (n >= NXT_HTTP_CONTINUE && n <= NXT_HTTP_LAST_INFORMATIONAL) {
1094         status = &nxt_http_informational[n - NXT_HTTP_CONTINUE];
1095 
1096     } else if (n >= NXT_HTTP_OK && n <= NXT_HTTP_LAST_SUCCESS) {
1097         status = &nxt_http_success[n - NXT_HTTP_OK];
1098 
1099     } else if (n >= NXT_HTTP_MULTIPLE_CHOICES
1100                && n <= NXT_HTTP_LAST_REDIRECTION)
1101     {
1102         status = &nxt_http_redirection[n - NXT_HTTP_MULTIPLE_CHOICES];
1103 
1104     } else if (n >= NXT_HTTP_BAD_REQUEST && n <= NXT_HTTP_LAST_CLIENT_ERROR) {
1105         status = &nxt_http_client_error[n - NXT_HTTP_BAD_REQUEST];
1106 
1107     } else if (n >= NXT_HTTP_TO_HTTPS && n <= NXT_HTTP_LAST_NGINX_ERROR) {
1108         status = &nxt_http_nginx_error[n - NXT_HTTP_TO_HTTPS];
1109 
1110     } else if (n >= NXT_HTTP_INTERNAL_SERVER_ERROR
1111                && n <= NXT_HTTP_LAST_SERVER_ERROR)
1112     {
1113         status = &nxt_http_server_error[n - NXT_HTTP_INTERNAL_SERVER_ERROR];
1114 
1115     } else {
1116         p = nxt_sprintf(buf, buf + UNKNOWN_STATUS_LENGTH,
1117                         "HTTP/1.1 %03d\r\n", n);
1118 
1119         unknown_status.length = p - buf;
1120         unknown_status.start = buf;
1121         status = &unknown_status;
1122     }
1123 
1124     size = status->length;
1125     /* Trailing CRLF at the end of header. */
1126     size += nxt_length("\r\n");
1127 
1128     conn = -1;
1129 
1130     if (r->websocket_handshake && n == NXT_HTTP_SWITCHING_PROTOCOLS) {
1131         h1p->websocket = 1;
1132         h1p->keepalive = 0;
1133         conn = 2;
1134         size += NXT_WEBSOCKET_ACCEPT_SIZE + 2;
1135 
1136     } else {
1137         http11 = (h1p->parser.version.s.minor != '0');
1138 
1139         if (r->resp.content_length == NULL || r->resp.content_length->skip) {
1140 
1141             if (http11) {
1142                 if (n != NXT_HTTP_NOT_MODIFIED
1143                     && n != NXT_HTTP_NO_CONTENT
1144                     && body_handler != NULL
1145                     && !h1p->websocket)
1146                 {
1147                     h1p->chunked = 1;
1148                     size += nxt_length(chunked);
1149                     /* Trailing CRLF will be added by the first chunk header. */
1150                     size -= nxt_length("\r\n");
1151                 }
1152 
1153             } else {
1154                 h1p->keepalive = 0;
1155             }
1156         }
1157 
1158         if (http11 ^ h1p->keepalive) {
1159             conn = h1p->keepalive;
1160         }
1161     }
1162 
1163     if (conn >= 0) {
1164         size += connection[conn].length;
1165     }
1166 
1167     nxt_list_each(field, r->resp.fields) {
1168 
1169         if (!field->skip) {
1170             size += field->name_length + field->value_length;
1171             size += nxt_length(": \r\n");
1172         }
1173 
1174     } nxt_list_loop;
1175 
1176     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1177         size += nxt_length(websocket_version);
1178     }
1179 
1180     header = nxt_http_buf_mem(task, r, size);
1181     if (nxt_slow_path(header == NULL)) {
1182         nxt_h1p_request_error(task, h1p, r);
1183         return;
1184     }
1185 
1186     p = nxt_cpymem(header->mem.free, status->start, status->length);
1187 
1188     nxt_list_each(field, r->resp.fields) {
1189 
1190         if (!field->skip) {
1191             p = nxt_cpymem(p, field->name, field->name_length);
1192             *p++ = ':'; *p++ = ' ';
1193             p = nxt_cpymem(p, field->value, field->value_length);
1194             *p++ = '\r'; *p++ = '\n';
1195         }
1196 
1197     } nxt_list_loop;
1198 
1199     if (conn >= 0) {
1200         p = nxt_cpymem(p, connection[conn].start, connection[conn].length);
1201     }
1202 
1203     if (h1p->websocket) {
1204         nxt_websocket_accept(p, h1p->websocket_key->value);
1205         p += NXT_WEBSOCKET_ACCEPT_SIZE;
1206 
1207         *p++ = '\r'; *p++ = '\n';
1208     }
1209 
1210     if (nxt_slow_path(n == NXT_HTTP_UPGRADE_REQUIRED)) {
1211         p = nxt_cpymem(p, websocket_version, nxt_length(websocket_version));
1212     }
1213 
1214     if (h1p->chunked) {
1215         p = nxt_cpymem(p, chunked, nxt_length(chunked));
1216         /* Trailing CRLF will be added by the first chunk header. */
1217 
1218     } else {
1219         *p++ = '\r'; *p++ = '\n';
1220     }
1221 
1222     header->mem.free = p;
1223 
1224     h1p->header_size = nxt_buf_mem_used_size(&header->mem);
1225 
1226     c = h1p->conn;
1227 
1228     c->write = header;
1229     h1p->conn_write_tail = &header->next;
1230     c->write_state = &nxt_h1p_request_send_state;
1231 
1232     if (body_handler != NULL) {
1233         /*
1234          * The body handler will run before c->io->write() handler,
1235          * because the latter was inqueued by nxt_conn_write()
1236          * in engine->write_work_queue.
1237          */
1238         nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1239                            body_handler, task, r, data);
1240 
1241     } else {
1242         header->next = nxt_http_buf_last(r);
1243     }
1244 
1245     nxt_conn_write(task->thread->engine, c);
1246 
1247     if (h1p->websocket) {
1248         nxt_h1p_websocket_first_frame_start(task, r, c->read);
1249     }
1250 }
1251 
1252 
1253 void
1254 nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
1255 {
1256     size_t         size;
1257     nxt_buf_t      *b, *in, *next;
1258     nxt_conn_t     *c;
1259 
1260     nxt_debug(task, "h1p complete buffers");
1261 
1262     b = h1p->buffers;
1263     c = h1p->conn;
1264     in = c->read;
1265 
1266     if (b != NULL) {
1267         if (in == NULL) {
1268             /* A request with large body. */
1269             in = b;
1270             c->read = in;
1271 
1272             b = in->next;
1273             in->next = NULL;
1274         }
1275 
1276         while (b != NULL) {
1277             next = b->next;
1278             b->next = NULL;
1279 
1280             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1281                                b->completion_handler, task, b, b->parent);
1282 
1283             b = next;
1284         }
1285 
1286         h1p->buffers = NULL;
1287         h1p->nbuffers = 0;
1288     }
1289 
1290     if (in != NULL) {
1291         size = nxt_buf_mem_used_size(&in->mem);
1292 
1293         if (size == 0) {
1294             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
1295                                in->completion_handler, task, in, in->parent);
1296 
1297             c->read = NULL;
1298         }
1299     }
1300 }
1301 
1302 
1303 static const nxt_conn_state_t  nxt_h1p_request_send_state
1304     nxt_aligned(64) =
1305 {
1306     .ready_handler = nxt_h1p_conn_sent,
1307     .error_handler = nxt_h1p_conn_request_error,
1308 
1309     .timer_handler = nxt_h1p_conn_request_send_timeout,
1310     .timer_value = nxt_h1p_conn_request_timer_value,
1311     .timer_data = offsetof(nxt_socket_conf_t, send_timeout),
1312     .timer_autoreset = 1,
1313 };
1314 
1315 
1316 static void
1317 nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1318 {
1319     nxt_conn_t     *c;
1320     nxt_h1proto_t  *h1p;
1321 
1322     nxt_debug(task, "h1p request send");
1323 
1324     h1p = r->proto.h1;
1325     c = h1p->conn;
1326 
1327     if (h1p->chunked) {
1328         out = nxt_h1p_chunk_create(task, r, out);
1329         if (nxt_slow_path(out == NULL)) {
1330             nxt_h1p_request_error(task, h1p, r);
1331             return;
1332         }
1333     }
1334 
1335     if (c->write == NULL) {
1336         c->write = out;
1337         c->write_state = &nxt_h1p_request_send_state;
1338 
1339         nxt_conn_write(task->thread->engine, c);
1340 
1341     } else {
1342         *h1p->conn_write_tail = out;
1343     }
1344 
1345     while (out->next != NULL) {
1346         out = out->next;
1347     }
1348 
1349     h1p->conn_write_tail = &out->next;
1350 }
1351 
1352 
1353 static nxt_buf_t *
1354 nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
1355 {
1356     nxt_off_t          size;
1357     nxt_buf_t          *b, **prev, *header, *tail;
1358 
1359     const size_t       chunk_size = 2 * nxt_length("\r\n") + NXT_OFF_T_HEXLEN;
1360     static const char  tail_chunk[] = "\r\n0\r\n\r\n";
1361 
1362     size = 0;
1363     prev = &out;
1364 
1365     for (b = out; b != NULL; b = b->next) {
1366 
1367         if (nxt_buf_is_last(b)) {
1368             tail = nxt_http_buf_mem(task, r, sizeof(tail_chunk));
1369             if (nxt_slow_path(tail == NULL)) {
1370                 return NULL;
1371             }
1372 
1373             *prev = tail;
1374             tail->next = b;
1375             /*
1376              * The tail_chunk size with trailing zero is 8 bytes, so
1377              * memcpy may be inlined with just single 8 byte move operation.
1378              */
1379             nxt_memcpy(tail->mem.free, tail_chunk, sizeof(tail_chunk));
1380             tail->mem.free += nxt_length(tail_chunk);
1381 
1382             break;
1383         }
1384 
1385         size += nxt_buf_used_size(b);
1386         prev = &b->next;
1387     }
1388 
1389     if (size == 0) {
1390         return out;
1391     }
1392 
1393     header = nxt_http_buf_mem(task, r, chunk_size);
1394     if (nxt_slow_path(header == NULL)) {
1395         return NULL;
1396     }
1397 
1398     header->next = out;
1399     header->mem.free = nxt_sprintf(header->mem.free, header->mem.end,
1400                                    "\r\n%xO\r\n", size);
1401     return header;
1402 }
1403 
1404 
1405 static nxt_off_t
1406 nxt_h1p_request_body_bytes_sent(nxt_task_t *task, nxt_http_proto_t proto)
1407 {
1408     nxt_off_t      sent;
1409     nxt_h1proto_t  *h1p;
1410 
1411     h1p = proto.h1;
1412 
1413     sent = h1p->conn->sent - h1p->header_size;
1414 
1415     return (sent > 0) ? sent : 0;
1416 }
1417 
1418 
1419 static void
1420 nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
1421     nxt_buf_t *last)
1422 {
1423     nxt_buf_t         *b;
1424     nxt_conn_t        *c;
1425     nxt_h1proto_t     *h1p;
1426     nxt_work_queue_t  *wq;
1427 
1428     nxt_debug(task, "h1p request discard");
1429 
1430     h1p = r->proto.h1;
1431     h1p->keepalive = 0;
1432 
1433     c = h1p->conn;
1434     b = c->write;
1435     c->write = NULL;
1436 
1437     wq = &task->thread->engine->fast_work_queue;
1438 
1439     nxt_sendbuf_drain(task, wq, b);
1440     nxt_sendbuf_drain(task, wq, last);
1441 }
1442 
1443 
1444 static void
1445 nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
1446 {
1447     nxt_h1proto_t       *h1p;
1448     nxt_http_request_t  *r;
1449 
1450     h1p = data;
1451 
1452     nxt_debug(task, "h1p conn request error");
1453 
1454     r = h1p->request;
1455 
1456     if (nxt_slow_path(r == NULL)) {
1457         nxt_h1p_shutdown(task, h1p->conn);
1458         return;
1459     }
1460 
1461     if (r->fields == NULL) {
1462         (void) nxt_h1p_header_process(task, h1p, r);
1463     }
1464 
1465     if (r->status == 0) {
1466         r->status = NXT_HTTP_BAD_REQUEST;
1467     }
1468 
1469     nxt_h1p_request_error(task, h1p, r);
1470 }
1471 
1472 
1473 static void
1474 nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
1475 {
1476     nxt_conn_t          *c;
1477     nxt_timer_t         *timer;
1478     nxt_h1proto_t       *h1p;
1479     nxt_http_request_t  *r;
1480 
1481     timer = obj;
1482 
1483     nxt_debug(task, "h1p conn request timeout");
1484 
1485     c = nxt_read_timer_conn(timer);
1486     c->block_read = 1;
1487     /*
1488      * Disable SO_LINGER off during socket closing
1489      * to send "408 Request Timeout" error response.
1490      */
1491     c->socket.timedout = 0;
1492 
1493     h1p = c->socket.data;
1494     h1p->keepalive = 0;
1495     r = h1p->request;
1496 
1497     if (r->fields == NULL) {
1498         (void) nxt_h1p_header_process(task, h1p, r);
1499     }
1500 
1501     nxt_http_request_error(task, r, NXT_HTTP_REQUEST_TIMEOUT);
1502 }
1503 
1504 
1505 static void
1506 nxt_h1p_conn_request_send_timeout(nxt_task_t *task, void *obj, void *data)
1507 {
1508     nxt_conn_t     *c;
1509     nxt_timer_t    *timer;
1510     nxt_h1proto_t  *h1p;
1511 
1512     timer = obj;
1513 
1514     nxt_debug(task, "h1p conn request send timeout");
1515 
1516     c = nxt_write_timer_conn(timer);
1517     c->block_write = 1;
1518     h1p = c->socket.data;
1519 
1520     nxt_h1p_request_error(task, h1p, h1p->request);
1521 }
1522 
1523 
1524 nxt_msec_t
1525 nxt_h1p_conn_request_timer_value(nxt_conn_t *c, uintptr_t data)
1526 {
1527     nxt_h1proto_t  *h1p;
1528 
1529     h1p = c->socket.data;
1530 
1531     return nxt_value_at(nxt_msec_t, h1p->request->conf->socket_conf, data);
1532 }
1533 
1534 
1535 nxt_inline void
1536 nxt_h1p_request_error(nxt_task_t *task, nxt_h1proto_t *h1p,
1537     nxt_http_request_t *r)
1538 {
1539     h1p->keepalive = 0;
1540 
1541     r->state->error_handler(task, r, h1p);
1542 }
1543 
1544 
1545 static void
1546 nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto,
1547     nxt_socket_conf_joint_t *joint)
1548 {
1549     nxt_conn_t     *c;
1550     nxt_h1proto_t  *h1p;
1551 
1552     nxt_debug(task, "h1p request close");
1553 
1554     h1p = proto.h1;
1555     h1p->keepalive &= !h1p->request->inconsistent;
1556     h1p->request = NULL;
1557 
1558     nxt_router_conf_release(task, joint);
1559 
1560     c = h1p->conn;
1561     task = &c->task;
1562     c->socket.task = task;
1563     c->read_timer.task = task;
1564     c->write_timer.task = task;
1565 
1566     if (h1p->keepalive) {
1567         nxt_h1p_keepalive(task, h1p, c);
1568 
1569     } else {
1570         nxt_h1p_shutdown(task, c);
1571     }
1572 }
1573 
1574 
1575 static void
1576 nxt_h1p_conn_sent(nxt_task_t *task, void *obj, void *data)
1577 {
1578     nxt_conn_t          *c;
1579     nxt_event_engine_t  *engine;
1580 
1581     c = obj;
1582 
1583     nxt_debug(task, "h1p conn sent");
1584 
1585     engine = task->thread->engine;
1586 
1587     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
1588 
1589     if (c->write != NULL) {
1590         nxt_conn_write(engine, c);
1591     }
1592 }
1593 
1594 
1595 static void
1596 nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
1597 {
1598     nxt_conn_t  *c;
1599 
1600     c = obj;
1601 
1602     nxt_debug(task, "h1p conn close");
1603 
1604     nxt_h1p_shutdown(task, c);
1605 }
1606 
1607 
1608 static void
1609 nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data)
1610 {
1611     nxt_conn_t  *c;
1612 
1613     c = obj;
1614 
1615     nxt_debug(task, "h1p conn error");
1616 
1617     nxt_h1p_shutdown(task, c);
1618 }
1619 
1620 
1621 static nxt_msec_t
1622 nxt_h1p_conn_timer_value(nxt_conn_t *c, uintptr_t data)
1623 {
1624     nxt_socket_conf_joint_t  *joint;
1625 
1626     joint = c->listen->socket.data;
1627 
1628     return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
1629 }
1630 
1631 
1632 static void
1633 nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
1634 {
1635     size_t     size;
1636     nxt_buf_t  *in;
1637 
1638     nxt_debug(task, "h1p keepalive");
1639 
1640     if (!c->tcp_nodelay) {
1641         nxt_conn_tcp_nodelay_on(task, c);
1642     }
1643 
1644     nxt_h1p_complete_buffers(task, h1p);
1645 
1646     in = c->read;
1647 
1648     nxt_memzero(h1p, offsetof(nxt_h1proto_t, conn));
1649 
1650     c->sent = 0;
1651 
1652     if (in == NULL) {
1653         c->read_state = &nxt_h1p_keepalive_state;
1654 
1655         nxt_conn_read(task->thread->engine, c);
1656 
1657     } else {
1658         size = nxt_buf_mem_used_size(&in->mem);
1659 
1660         nxt_debug(task, "h1p pipelining");
1661 
1662         nxt_memmove(in->mem.start, in->mem.pos, size);
1663 
1664         in->mem.pos = in->mem.start;
1665         in->mem.free = in->mem.start + size;
1666 
1667         nxt_h1p_conn_request_init(task, c, c->socket.data);
1668     }
1669 }
1670 
1671 
1672 static const nxt_conn_state_t  nxt_h1p_keepalive_state
1673     nxt_aligned(64) =
1674 {
1675     .ready_handler = nxt_h1p_conn_request_init,
1676     .close_handler = nxt_h1p_conn_close,
1677     .error_handler = nxt_h1p_conn_error,
1678 
1679     .io_read_handler = nxt_h1p_idle_io_read_handler,
1680 
1681     .timer_handler = nxt_h1p_idle_timeout,
1682     .timer_value = nxt_h1p_conn_timer_value,
1683     .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
1684     .timer_autoreset = 1,
1685 };
1686 
1687 
1688 const nxt_conn_state_t  nxt_h1p_idle_close_state
1689     nxt_aligned(64) =
1690 {
1691     .close_handler = nxt_h1p_idle_close,
1692 };
1693 
1694 
1695 static void
1696 nxt_h1p_idle_close(nxt_task_t *task, void *obj, void *data)
1697 {
1698     nxt_conn_t  *c;
1699 
1700     c = obj;
1701 
1702     nxt_debug(task, "h1p idle close");
1703 
1704     nxt_h1p_idle_response(task, c);
1705 }
1706 
1707 
1708 static void
1709 nxt_h1p_idle_timeout(nxt_task_t *task, void *obj, void *data)
1710 {
1711     nxt_conn_t   *c;
1712     nxt_timer_t  *timer;
1713 
1714     timer = obj;
1715 
1716     nxt_debug(task, "h1p idle timeout");
1717 
1718     c = nxt_read_timer_conn(timer);
1719     c->block_read = 1;
1720 
1721     nxt_h1p_idle_response(task, c);
1722 }
1723 
1724 
1725 #define NXT_H1P_IDLE_TIMEOUT                                                  \
1726      "HTTP/1.1 408 Request Timeout\r\n"                                       \
1727      "Server: " NXT_SERVER "\r\n"                                             \
1728      "Connection: close\r\n"                                                  \
1729      "Content-Length: 0\r\n"                                                  \
1730      "Date: "
1731 
1732 
1733 static void
1734 nxt_h1p_idle_response(nxt_task_t *task, nxt_conn_t *c)
1735 {
1736     u_char         *p;
1737     size_t         size;
1738     nxt_buf_t      *out, *last;
1739     nxt_h1proto_t  *h1p;
1740 
1741     size = nxt_length(NXT_H1P_IDLE_TIMEOUT)
1742            + nxt_http_date_cache.size
1743            + nxt_length("\r\n\r\n");
1744 
1745     out = nxt_buf_mem_alloc(c->mem_pool, size, 0);
1746     if (nxt_slow_path(out == NULL)) {
1747         goto fail;
1748     }
1749 
1750     p = nxt_cpymem(out->mem.free, NXT_H1P_IDLE_TIMEOUT,
1751                    nxt_length(NXT_H1P_IDLE_TIMEOUT));
1752 
1753     p = nxt_thread_time_string(task->thread, &nxt_http_date_cache, p);
1754 
1755     out->mem.free = nxt_cpymem(p, "\r\n\r\n", 4);
1756 
1757     last = nxt_mp_zget(c->mem_pool, NXT_BUF_SYNC_SIZE);
1758     if (nxt_slow_path(last == NULL)) {
1759         goto fail;
1760     }
1761 
1762     out->next = last;
1763     nxt_buf_set_sync(last);
1764     nxt_buf_set_last(last);
1765 
1766     last->completion_handler = nxt_h1p_idle_response_sent;
1767     last->parent = c;
1768 
1769     h1p = c->socket.data;
1770     h1p->conn_write_tail = &last->next;
1771 
1772     c->write = out;
1773     c->write_state = &nxt_h1p_timeout_response_state;
1774 
1775     nxt_conn_write(task->thread->engine, c);
1776     return;
1777 
1778 fail:
1779 
1780     nxt_h1p_shutdown(task, c);
1781 }
1782 
1783 
1784 static const nxt_conn_state_t  nxt_h1p_timeout_response_state
1785     nxt_aligned(64) =
1786 {
1787     .ready_handler = nxt_h1p_conn_sent,
1788     .error_handler = nxt_h1p_conn_error,
1789 
1790     .timer_handler = nxt_h1p_idle_response_timeout,
1791     .timer_value = nxt_h1p_idle_response_timer_value,
1792 };
1793 
1794 
1795 static void
1796 nxt_h1p_idle_response_sent(nxt_task_t *task, void *obj, void *data)
1797 {
1798     nxt_conn_t  *c;
1799 
1800     c = data;
1801 
1802     nxt_debug(task, "h1p idle timeout response sent");
1803 
1804     nxt_h1p_shutdown(task, c);
1805 }
1806 
1807 
1808 static void
1809 nxt_h1p_idle_response_timeout(nxt_task_t *task, void *obj, void *data)
1810 {
1811     nxt_conn_t   *c;
1812     nxt_timer_t  *timer;
1813 
1814     timer = obj;
1815 
1816     nxt_debug(task, "h1p idle timeout response timeout");
1817 
1818     c = nxt_read_timer_conn(timer);
1819     c->block_write = 1;
1820 
1821     nxt_h1p_shutdown(task, c);
1822 }
1823 
1824 
1825 static nxt_msec_t
1826 nxt_h1p_idle_response_timer_value(nxt_conn_t *c, uintptr_t data)
1827 {
1828     return 10 * 1000;
1829 }
1830 
1831 
1832 static void
1833 nxt_h1p_shutdown(nxt_task_t *task, nxt_conn_t *c)
1834 {
1835     nxt_timer_t    *timer;
1836     nxt_h1proto_t  *h1p;
1837 
1838     nxt_debug(task, "h1p shutdown");
1839 
1840     h1p = c->socket.data;
1841 
1842     if (nxt_slow_path(h1p != NULL && h1p->websocket_timer != NULL)) {
1843         timer = &h1p->websocket_timer->timer;
1844 
1845         if (timer->handler != nxt_h1p_conn_ws_shutdown) {
1846             timer->handler = nxt_h1p_conn_ws_shutdown;
1847             nxt_timer_add(task->thread->engine, timer, 0);
1848 
1849         } else {
1850             nxt_debug(task, "h1p already scheduled ws shutdown");
1851         }
1852 
1853     } else {
1854         nxt_h1p_closing(task, c);
1855     }
1856 }
1857 
1858 
1859 static void
1860 nxt_h1p_conn_ws_shutdown(nxt_task_t *task, void *obj, void *data)
1861 {
1862     nxt_timer_t                *timer;
1863     nxt_h1p_websocket_timer_t  *ws_timer;
1864 
1865     nxt_debug(task, "h1p conn ws shutdown");
1866 
1867     timer = obj;
1868     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
1869 
1870     nxt_h1p_closing(task, ws_timer->h1p->conn);
1871 }
1872 
1873 
1874 static void
1875 nxt_h1p_closing(nxt_task_t *task, nxt_conn_t *c)
1876 {
1877     nxt_debug(task, "h1p closing");
1878 
1879     c->socket.data = NULL;
1880 
1881 #if (NXT_TLS)
1882 
1883     if (c->u.tls != NULL) {
1884         c->write_state = &nxt_h1p_shutdown_state;
1885 
1886         c->io->shutdown(task, c, NULL);
1887         return;
1888     }
1889 
1890 #endif
1891 
1892     nxt_h1p_conn_closing(task, c, NULL);
1893 }
1894 
1895 
#if (NXT_TLS)

/*
 * Connection write state used during TLS shutdown: whatever the outcome
 * is, the connection proceeds to nxt_h1p_conn_closing().
 */
static const nxt_conn_state_t  nxt_h1p_shutdown_state
    nxt_aligned(64) =
{
    .error_handler = nxt_h1p_conn_closing,
    .close_handler = nxt_h1p_conn_closing,
    .ready_handler = nxt_h1p_conn_closing,
};

#endif
1907 
1908 
1909 static void
1910 nxt_h1p_conn_closing(nxt_task_t *task, void *obj, void *data)
1911 {
1912     nxt_conn_t  *c;
1913 
1914     c = obj;
1915 
1916     nxt_debug(task, "h1p conn closing");
1917 
1918     c->write_state = &nxt_h1p_close_state;
1919 
1920     nxt_conn_close(task->thread->engine, c);
1921 }
1922 
1923 
1924 static const nxt_conn_state_t  nxt_h1p_close_state
1925     nxt_aligned(64) =
1926 {
1927     .ready_handler = nxt_h1p_conn_free,
1928 };
1929 
1930 
1931 static void
1932 nxt_h1p_conn_free(nxt_task_t *task, void *obj, void *data)
1933 {
1934     nxt_conn_t          *c;
1935     nxt_listen_event_t  *lev;
1936     nxt_event_engine_t  *engine;
1937 
1938     c = obj;
1939 
1940     nxt_debug(task, "h1p conn free");
1941 
1942     nxt_queue_remove(&c->link);
1943 
1944     engine = task->thread->engine;
1945 
1946     nxt_sockaddr_cache_free(engine, c);
1947 
1948     lev = c->listen;
1949 
1950     nxt_conn_free(task, c);
1951 
1952     nxt_router_listen_event_release(&engine->task, lev, NULL);
1953 }
1954 
1955 
1956 static void
1957 nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
1958 {
1959     nxt_mp_t            *mp;
1960     nxt_int_t           ret;
1961     nxt_conn_t          *c, *client;
1962     nxt_h1proto_t       *h1p;
1963     nxt_fd_event_t      *socket;
1964     nxt_work_queue_t    *wq;
1965     nxt_http_request_t  *r;
1966 
1967     nxt_debug(task, "h1p peer connect");
1968 
1969     peer->status = NXT_HTTP_UNSET;
1970     r = peer->request;
1971 
1972     mp = nxt_mp_create(1024, 128, 256, 32);
1973 
1974     if (nxt_slow_path(mp == NULL)) {
1975         goto fail;
1976     }
1977 
1978     h1p = nxt_mp_zalloc(mp, sizeof(nxt_h1proto_t));
1979     if (nxt_slow_path(h1p == NULL)) {
1980         goto fail;
1981     }
1982 
1983     ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
1984     if (nxt_slow_path(ret != NXT_OK)) {
1985         goto fail;
1986     }
1987 
1988     c = nxt_conn_create(mp, task);
1989     if (nxt_slow_path(c == NULL)) {
1990         goto fail;
1991     }
1992 
1993     c->mem_pool = mp;
1994     h1p->conn = c;
1995 
1996     peer->proto.h1 = h1p;
1997     h1p->request = r;
1998 
1999     c->socket.task = task;
2000     c->read_timer.task = task;
2001     c->write_timer.task = task;
2002     c->socket.data = peer;
2003     c->remote = peer->server->sockaddr;
2004 
2005     c->socket.write_ready = 1;
2006     c->write_state = &nxt_h1p_peer_connect_state;
2007 
2008     /*
2009      * TODO: queues should be implemented via client proto interface.
2010      */
2011     client = r->proto.h1->conn;
2012 
2013     socket = &client->socket;
2014     wq = socket->read_work_queue;
2015     c->read_work_queue = wq;
2016     c->socket.read_work_queue = wq;
2017     c->read_timer.work_queue = wq;
2018 
2019     wq = socket->write_work_queue;
2020     c->write_work_queue = wq;
2021     c->socket.write_work_queue = wq;
2022     c->write_timer.work_queue = wq;
2023     /* TODO END */
2024 
2025     nxt_conn_connect(task->thread->engine, c);
2026 
2027     return;
2028 
2029 fail:
2030 
2031     peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2032 
2033     r->state->error_handler(task, r, peer);
2034 }
2035 
2036 
2037 static const nxt_conn_state_t  nxt_h1p_peer_connect_state
2038     nxt_aligned(64) =
2039 {
2040     .ready_handler = nxt_h1p_peer_connected,
2041     .close_handler = nxt_h1p_peer_refused,
2042     .error_handler = nxt_h1p_peer_error,
2043 
2044     .timer_handler = nxt_h1p_peer_send_timeout,
2045     .timer_value = nxt_h1p_peer_timer_value,
2046     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2047 };
2048 
2049 
2050 static void
2051 nxt_h1p_peer_connected(nxt_task_t *task, void *obj, void *data)
2052 {
2053     nxt_http_peer_t     *peer;
2054     nxt_http_request_t  *r;
2055 
2056     peer = data;
2057 
2058     nxt_debug(task, "h1p peer connected");
2059 
2060     r = peer->request;
2061     r->state->ready_handler(task, r, peer);
2062 }
2063 
2064 
2065 static void
2066 nxt_h1p_peer_refused(nxt_task_t *task, void *obj, void *data)
2067 {
2068     nxt_http_peer_t     *peer;
2069     nxt_http_request_t  *r;
2070 
2071     peer = data;
2072 
2073     nxt_debug(task, "h1p peer refused");
2074 
2075     //peer->status = NXT_HTTP_SERVICE_UNAVAILABLE;
2076     peer->status = NXT_HTTP_BAD_GATEWAY;
2077 
2078     r = peer->request;
2079     r->state->error_handler(task, r, peer);
2080 }
2081 
2082 
2083 static void
2084 nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
2085 {
2086     u_char              *p;
2087     size_t              size;
2088     nxt_buf_t           *header, *body;
2089     nxt_conn_t          *c;
2090     nxt_http_field_t    *field;
2091     nxt_http_request_t  *r;
2092 
2093     nxt_debug(task, "h1p peer header send");
2094 
2095     r = peer->request;
2096 
2097     size = r->method->length + sizeof(" ") + r->target.length
2098            + sizeof(" HTTP/1.0\r\n")
2099            + sizeof("\r\n");
2100 
2101     nxt_list_each(field, r->fields) {
2102 
2103         if (!field->hopbyhop) {
2104             size += field->name_length + field->value_length;
2105             size += nxt_length(": \r\n");
2106         }
2107 
2108     } nxt_list_loop;
2109 
2110     header = nxt_http_buf_mem(task, r, size);
2111     if (nxt_slow_path(header == NULL)) {
2112         r->state->error_handler(task, r, peer);
2113         return;
2114     }
2115 
2116     p = header->mem.free;
2117 
2118     p = nxt_cpymem(p, r->method->start, r->method->length);
2119     *p++ = ' ';
2120     p = nxt_cpymem(p, r->target.start, r->target.length);
2121     p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
2122 
2123     nxt_list_each(field, r->fields) {
2124 
2125         if (!field->hopbyhop) {
2126             p = nxt_cpymem(p, field->name, field->name_length);
2127             *p++ = ':'; *p++ = ' ';
2128             p = nxt_cpymem(p, field->value, field->value_length);
2129             *p++ = '\r'; *p++ = '\n';
2130         }
2131 
2132     } nxt_list_loop;
2133 
2134     *p++ = '\r'; *p++ = '\n';
2135     header->mem.free = p;
2136     size = p - header->mem.pos;
2137 
2138     c = peer->proto.h1->conn;
2139     c->write = header;
2140     c->write_state = &nxt_h1p_peer_header_send_state;
2141 
2142     if (r->body != NULL) {
2143         body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
2144         if (nxt_slow_path(body == NULL)) {
2145             r->state->error_handler(task, r, peer);
2146             return;
2147         }
2148 
2149         header->next = body;
2150 
2151         body->mem = r->body->mem;
2152         size += nxt_buf_mem_used_size(&body->mem);
2153 
2154 //        nxt_mp_retain(r->mem_pool);
2155     }
2156 
2157     if (size > 16384) {
2158         /* Use proxy_send_timeout instead of proxy_timeout. */
2159         c->write_state = &nxt_h1p_peer_header_body_send_state;
2160     }
2161 
2162     nxt_conn_write(task->thread->engine, c);
2163 }
2164 
2165 
2166 static const nxt_conn_state_t  nxt_h1p_peer_header_send_state
2167     nxt_aligned(64) =
2168 {
2169     .ready_handler = nxt_h1p_peer_header_sent,
2170     .error_handler = nxt_h1p_peer_error,
2171 
2172     .timer_handler = nxt_h1p_peer_send_timeout,
2173     .timer_value = nxt_h1p_peer_timer_value,
2174     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2175 };
2176 
2177 
2178 static const nxt_conn_state_t  nxt_h1p_peer_header_body_send_state
2179     nxt_aligned(64) =
2180 {
2181     .ready_handler = nxt_h1p_peer_header_sent,
2182     .error_handler = nxt_h1p_peer_error,
2183 
2184     .timer_handler = nxt_h1p_peer_send_timeout,
2185     .timer_value = nxt_h1p_peer_timer_value,
2186     .timer_data = offsetof(nxt_socket_conf_t, proxy_send_timeout),
2187     .timer_autoreset = 1,
2188 };
2189 
2190 
2191 static void
2192 nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
2193 {
2194     nxt_conn_t          *c;
2195     nxt_http_peer_t     *peer;
2196     nxt_http_request_t  *r;
2197     nxt_event_engine_t  *engine;
2198 
2199     c = obj;
2200     peer = data;
2201 
2202     nxt_debug(task, "h1p peer header sent");
2203 
2204     engine = task->thread->engine;
2205 
2206     c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
2207 
2208     if (c->write == NULL) {
2209         r = peer->request;
2210         r->state->ready_handler(task, r, peer);
2211         return;
2212     }
2213 
2214     nxt_conn_write(engine, c);
2215 }
2216 
2217 
2218 static void
2219 nxt_h1p_peer_header_read(nxt_task_t *task, nxt_http_peer_t *peer)
2220 {
2221     nxt_conn_t  *c;
2222 
2223     nxt_debug(task, "h1p peer header read");
2224 
2225     c = peer->proto.h1->conn;
2226 
2227     if (c->write_timer.enabled) {
2228         c->read_state = &nxt_h1p_peer_header_read_state;
2229 
2230     } else {
2231         c->read_state = &nxt_h1p_peer_header_read_timer_state;
2232     }
2233 
2234     nxt_conn_read(task->thread->engine, c);
2235 }
2236 
2237 
2238 static const nxt_conn_state_t  nxt_h1p_peer_header_read_state
2239     nxt_aligned(64) =
2240 {
2241     .ready_handler = nxt_h1p_peer_header_read_done,
2242     .close_handler = nxt_h1p_peer_closed,
2243     .error_handler = nxt_h1p_peer_error,
2244 
2245     .io_read_handler = nxt_h1p_peer_io_read_handler,
2246 };
2247 
2248 
2249 static const nxt_conn_state_t  nxt_h1p_peer_header_read_timer_state
2250     nxt_aligned(64) =
2251 {
2252     .ready_handler = nxt_h1p_peer_header_read_done,
2253     .close_handler = nxt_h1p_peer_closed,
2254     .error_handler = nxt_h1p_peer_error,
2255 
2256     .io_read_handler = nxt_h1p_peer_io_read_handler,
2257 
2258     .timer_handler = nxt_h1p_peer_read_timeout,
2259     .timer_value = nxt_h1p_peer_timer_value,
2260     .timer_data = offsetof(nxt_socket_conf_t, proxy_timeout),
2261 };
2262 
2263 
2264 static ssize_t
2265 nxt_h1p_peer_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
2266 {
2267     size_t              size;
2268     ssize_t             n;
2269     nxt_buf_t           *b;
2270     nxt_http_peer_t     *peer;
2271     nxt_socket_conf_t   *skcf;
2272     nxt_http_request_t  *r;
2273 
2274     peer = c->socket.data;
2275     r = peer->request;
2276     b = c->read;
2277 
2278     if (b == NULL) {
2279         skcf = r->conf->socket_conf;
2280 
2281         size = (peer->header_received) ? skcf->proxy_buffer_size
2282                                        : skcf->proxy_header_buffer_size;
2283 
2284         nxt_debug(task, "h1p peer io read: %z", size);
2285 
2286         b = nxt_http_proxy_buf_mem_alloc(task, r, size);
2287         if (nxt_slow_path(b == NULL)) {
2288             c->socket.error = NXT_ENOMEM;
2289             return NXT_ERROR;
2290         }
2291     }
2292 
2293     n = c->io->recvbuf(c, b);
2294 
2295     if (n > 0) {
2296         c->read = b;
2297 
2298     } else {
2299         c->read = NULL;
2300         nxt_http_proxy_buf_mem_free(task, r, b);
2301     }
2302 
2303     return n;
2304 }
2305 
2306 
2307 static void
2308 nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
2309 {
2310     nxt_int_t           ret;
2311     nxt_buf_t           *b;
2312     nxt_conn_t          *c;
2313     nxt_http_peer_t     *peer;
2314     nxt_http_request_t  *r;
2315     nxt_event_engine_t  *engine;
2316 
2317     c = obj;
2318     peer = data;
2319 
2320     nxt_debug(task, "h1p peer header read done");
2321 
2322     b = c->read;
2323 
2324     ret = nxt_h1p_peer_header_parse(peer, &b->mem);
2325 
2326     r = peer->request;
2327 
2328     ret = nxt_expect(NXT_DONE, ret);
2329 
2330     if (ret != NXT_AGAIN) {
2331         engine = task->thread->engine;
2332         nxt_timer_disable(engine, &c->write_timer);
2333         nxt_timer_disable(engine, &c->read_timer);
2334     }
2335 
2336     switch (ret) {
2337 
2338     case NXT_DONE:
2339         peer->fields = peer->proto.h1->parser.fields;
2340 
2341         ret = nxt_http_fields_process(peer->fields,
2342                                       &nxt_h1p_peer_fields_hash, r);
2343         if (nxt_slow_path(ret != NXT_OK)) {
2344             peer->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
2345             break;
2346         }
2347 
2348         c->read = NULL;
2349 
2350         if (nxt_buf_mem_used_size(&b->mem) != 0) {
2351             peer->body = b;
2352         }
2353 
2354         peer->header_received = 1;
2355 
2356         r->state->ready_handler(task, r, peer);
2357         return;
2358 
2359     case NXT_AGAIN:
2360         if (nxt_buf_mem_free_size(&b->mem) != 0) {
2361             nxt_conn_read(task->thread->engine, c);
2362             return;
2363         }
2364 
2365         /* Fall through. */
2366 
2367     default:
2368     case NXT_ERROR:
2369     case NXT_HTTP_PARSE_INVALID:
2370     case NXT_HTTP_PARSE_UNSUPPORTED_VERSION:
2371     case NXT_HTTP_PARSE_TOO_LARGE_FIELD:
2372         peer->status = NXT_HTTP_BAD_GATEWAY;
2373         break;
2374     }
2375 
2376     nxt_http_proxy_buf_mem_free(task, r, b);
2377 
2378     r->state->error_handler(task, r, peer);
2379 }
2380 
2381 
2382 static nxt_int_t
2383 nxt_h1p_peer_header_parse(nxt_http_peer_t *peer, nxt_buf_mem_t *bm)
2384 {
2385     u_char     *p;
2386     size_t     length;
2387     nxt_int_t  status;
2388 
2389     if (peer->status < 0) {
2390         length = nxt_buf_mem_used_size(bm);
2391 
2392         if (nxt_slow_path(length < 12)) {
2393             return NXT_AGAIN;
2394         }
2395 
2396         p = bm->pos;
2397 
2398         if (nxt_slow_path(nxt_memcmp(p, "HTTP/1.", 7) != 0
2399                           || (p[7] != '0' && p[7] != '1')))
2400         {
2401             return NXT_ERROR;
2402         }
2403 
2404         status = nxt_int_parse(&p[9], 3);
2405 
2406         if (nxt_slow_path(status < 0)) {
2407             return NXT_ERROR;
2408         }
2409 
2410         p += 12;
2411         length -= 12;
2412 
2413         p = nxt_memchr(p, '\n', length);
2414 
2415         if (nxt_slow_path(p == NULL)) {
2416             return NXT_AGAIN;
2417         }
2418 
2419         bm->pos = p + 1;
2420         peer->status = status;
2421     }
2422 
2423     return nxt_http_parse_fields(&peer->proto.h1->parser, bm);
2424 }
2425 
2426 
2427 static void
2428 nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer)
2429 {
2430     nxt_conn_t  *c;
2431 
2432     nxt_debug(task, "h1p peer read");
2433 
2434     c = peer->proto.h1->conn;
2435     c->read_state = &nxt_h1p_peer_read_state;
2436 
2437     nxt_conn_read(task->thread->engine, c);
2438 }
2439 
2440 
2441 static const nxt_conn_state_t  nxt_h1p_peer_read_state
2442     nxt_aligned(64) =
2443 {
2444     .ready_handler = nxt_h1p_peer_read_done,
2445     .close_handler = nxt_h1p_peer_closed,
2446     .error_handler = nxt_h1p_peer_error,
2447 
2448     .io_read_handler = nxt_h1p_peer_io_read_handler,
2449 
2450     .timer_handler = nxt_h1p_peer_read_timeout,
2451     .timer_value = nxt_h1p_peer_timer_value,
2452     .timer_data = offsetof(nxt_socket_conf_t, proxy_read_timeout),
2453     .timer_autoreset = 1,
2454 };
2455 
2456 
2457 static void
2458 nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
2459 {
2460     nxt_conn_t          *c;
2461     nxt_http_peer_t     *peer;
2462     nxt_http_request_t  *r;
2463 
2464     c = obj;
2465     peer = data;
2466 
2467     nxt_debug(task, "h1p peer read done");
2468 
2469     peer->body = c->read;
2470     c->read = NULL;
2471 
2472     r = peer->request;
2473     r->state->ready_handler(task, r, peer);
2474 }
2475 
2476 
2477 static void
2478 nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
2479 {
2480     nxt_http_peer_t     *peer;
2481     nxt_http_request_t  *r;
2482 
2483     peer = data;
2484 
2485     nxt_debug(task, "h1p peer closed");
2486 
2487     r = peer->request;
2488 
2489     if (peer->header_received) {
2490         peer->body = nxt_http_buf_last(r);
2491 
2492         peer->closed = 1;
2493 
2494         r->state->ready_handler(task, r, peer);
2495 
2496     } else {
2497         peer->status = NXT_HTTP_BAD_GATEWAY;
2498 
2499         r->state->error_handler(task, r, peer);
2500     }
2501 }
2502 
2503 
2504 static void
2505 nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data)
2506 {
2507     nxt_http_peer_t     *peer;
2508     nxt_http_request_t  *r;
2509 
2510     peer = data;
2511 
2512     nxt_debug(task, "h1p peer error");
2513 
2514     peer->status = NXT_HTTP_BAD_GATEWAY;
2515 
2516     r = peer->request;
2517     r->state->error_handler(task, r, peer);
2518 }
2519 
2520 
2521 static void
2522 nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data)
2523 {
2524     nxt_conn_t          *c;
2525     nxt_timer_t         *timer;
2526     nxt_http_peer_t     *peer;
2527     nxt_http_request_t  *r;
2528 
2529     timer = obj;
2530 
2531     nxt_debug(task, "h1p peer send timeout");
2532 
2533     c = nxt_write_timer_conn(timer);
2534     c->block_write = 1;
2535     c->block_read = 1;
2536 
2537     peer = c->socket.data;
2538     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2539 
2540     r = peer->request;
2541     r->state->error_handler(task, r, peer);
2542 }
2543 
2544 
2545 static void
2546 nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data)
2547 {
2548     nxt_conn_t          *c;
2549     nxt_timer_t         *timer;
2550     nxt_http_peer_t     *peer;
2551     nxt_http_request_t  *r;
2552 
2553     timer = obj;
2554 
2555     nxt_debug(task, "h1p peer read timeout");
2556 
2557     c = nxt_read_timer_conn(timer);
2558     c->block_write = 1;
2559     c->block_read = 1;
2560 
2561     peer = c->socket.data;
2562     peer->status = NXT_HTTP_GATEWAY_TIMEOUT;
2563 
2564     r = peer->request;
2565     r->state->error_handler(task, r, peer);
2566 }
2567 
2568 
2569 static nxt_msec_t
2570 nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data)
2571 {
2572     nxt_http_peer_t  *peer;
2573 
2574     peer = c->socket.data;
2575 
2576     return nxt_value_at(nxt_msec_t, peer->request->conf->socket_conf, data);
2577 }
2578 
2579 
2580 static void
2581 nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer)
2582 {
2583     nxt_conn_t  *c;
2584 
2585     nxt_debug(task, "h1p peer close");
2586 
2587     peer->closed = 1;
2588 
2589     c = peer->proto.h1->conn;
2590     task = &c->task;
2591     c->socket.task = task;
2592     c->read_timer.task = task;
2593     c->write_timer.task = task;
2594 
2595     if (c->socket.fd != -1) {
2596         c->write_state = &nxt_h1p_peer_close_state;
2597 
2598         nxt_conn_close(task->thread->engine, c);
2599 
2600     } else {
2601         nxt_h1p_peer_free(task, c, NULL);
2602     }
2603 }
2604 
2605 
2606 static const nxt_conn_state_t  nxt_h1p_peer_close_state
2607     nxt_aligned(64) =
2608 {
2609     .ready_handler = nxt_h1p_peer_free,
2610 };
2611 
2612 
2613 static void
2614 nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
2615 {
2616     nxt_conn_t  *c;
2617 
2618     c = obj;
2619 
2620     nxt_debug(task, "h1p peer free");
2621 
2622     nxt_conn_free(task, c);
2623 }
2624