xref: /unit/src/nxt_h1proto_websocket.c (revision 1956:f4c32c2d595d)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_h1proto.h>
10 #include <nxt_websocket.h>
11 #include <nxt_websocket_header.h>
12 
13 typedef struct {
14     uint16_t   code;
15     uint8_t    args;
16     nxt_str_t  desc;
17 } nxt_ws_error_t;
18 
19 static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
20 static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
21     void *data);
22 static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
23     nxt_h1proto_t *h1p);
24 static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
25     nxt_h1proto_t *h1p);
26 static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
27     nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
28 static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
29 static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c);
30 static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
31 static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
32     void *data);
33 static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
34     const nxt_ws_error_t *err, ...);
35 static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
36 static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
37 
38 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state;
39 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state;
40 
41 static const nxt_ws_error_t  nxt_ws_err_out_of_memory = {
42     NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
43     0, nxt_string("Out of memory") };
44 static const nxt_ws_error_t  nxt_ws_err_too_big = {
45     NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
46     1, nxt_string("Message too big: %uL bytes") };
47 static const nxt_ws_error_t  nxt_ws_err_invalid_close_code = {
48     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
49     1, nxt_string("Close code %ud is not valid") };
50 static const nxt_ws_error_t  nxt_ws_err_going_away = {
51     NXT_WEBSOCKET_CR_GOING_AWAY,
52     0, nxt_string("Remote peer is going away") };
53 static const nxt_ws_error_t  nxt_ws_err_not_masked = {
54     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
55     0, nxt_string("Not masked client frame") };
56 static const nxt_ws_error_t  nxt_ws_err_ctrl_fragmented = {
57     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
58     0, nxt_string("Fragmented control frame") };
59 static const nxt_ws_error_t  nxt_ws_err_ctrl_too_big = {
60     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
61     1, nxt_string("Control frame too big: %uL bytes") };
62 static const nxt_ws_error_t  nxt_ws_err_invalid_close_len = {
63     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
64     0, nxt_string("Close frame payload length cannot be 1") };
65 static const nxt_ws_error_t  nxt_ws_err_invalid_opcode = {
66     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
67     1, nxt_string("Unrecognized opcode %ud") };
68 static const nxt_ws_error_t  nxt_ws_err_cont_expected = {
69     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
70     1, nxt_string("Continuation expected, but %ud opcode received") };
71 
72 void
nxt_h1p_websocket_first_frame_start(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * ws_frame)73 nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
74     nxt_buf_t *ws_frame)
75 {
76     nxt_conn_t            *c;
77     nxt_timer_t           *timer;
78     nxt_h1proto_t         *h1p;
79     nxt_websocket_conf_t  *websocket_conf;
80 
81     nxt_debug(task, "h1p ws first frame start");
82 
83     h1p = r->proto.h1;
84     c = h1p->conn;
85 
86     if (!c->tcp_nodelay) {
87         nxt_conn_tcp_nodelay_on(task, c);
88     }
89 
90     websocket_conf = &r->conf->socket_conf->websocket_conf;
91 
92     if (nxt_slow_path(websocket_conf->keepalive_interval != 0)) {
93         h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
94                                            sizeof(nxt_h1p_websocket_timer_t));
95         if (nxt_slow_path(h1p->websocket_timer == NULL)) {
96             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
97             return;
98         }
99 
100         h1p->websocket_timer->keepalive_interval =
101             websocket_conf->keepalive_interval;
102         h1p->websocket_timer->h1p = h1p;
103 
104         timer = &h1p->websocket_timer->timer;
105         timer->task = &c->task;
106         timer->work_queue = &task->thread->engine->fast_work_queue;
107         timer->log = &c->log;
108         timer->bias = NXT_TIMER_DEFAULT_BIAS;
109         timer->handler = nxt_h1p_conn_ws_keepalive;
110     }
111 
112     nxt_h1p_websocket_frame_start(task, r, ws_frame);
113 }
114 
115 
116 void
nxt_h1p_websocket_frame_start(nxt_task_t * task,nxt_http_request_t * r,nxt_buf_t * ws_frame)117 nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
118     nxt_buf_t *ws_frame)
119 {
120     size_t         size;
121     nxt_buf_t      *in;
122     nxt_conn_t     *c;
123     nxt_h1proto_t  *h1p;
124 
125     nxt_debug(task, "h1p ws frame start");
126 
127     h1p = r->proto.h1;
128 
129     if (nxt_slow_path(h1p->websocket_closed)) {
130         return;
131     }
132 
133     c = h1p->conn;
134     c->read = ws_frame;
135 
136     nxt_h1p_complete_buffers(task, h1p, 0);
137 
138     in = c->read;
139     c->read_state = &nxt_h1p_read_ws_frame_header_state;
140 
141     if (in == NULL) {
142         nxt_conn_read(task->thread->engine, c);
143         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
144 
145     } else {
146         size = nxt_buf_mem_used_size(&in->mem);
147 
148         nxt_debug(task, "h1p read client ws frame");
149 
150         nxt_memmove(in->mem.start, in->mem.pos, size);
151 
152         in->mem.pos = in->mem.start;
153         in->mem.free = in->mem.start + size;
154 
155         nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
156     }
157 }
158 
159 
160 static void
nxt_h1p_conn_ws_keepalive(nxt_task_t * task,void * obj,void * data)161 nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
162 {
163     nxt_buf_t                  *out;
164     nxt_timer_t                *timer;
165     nxt_h1proto_t              *h1p;
166     nxt_http_request_t         *r;
167     nxt_websocket_header_t     *wsh;
168     nxt_h1p_websocket_timer_t  *ws_timer;
169 
170     nxt_debug(task, "h1p conn ws keepalive");
171 
172     timer = obj;
173     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
174     h1p = ws_timer->h1p;
175 
176     r = h1p->request;
177     if (nxt_slow_path(r == NULL)) {
178         return;
179     }
180 
181     out = nxt_http_buf_mem(task, r, 2);
182     if (nxt_slow_path(out == NULL)) {
183         nxt_http_request_error_handler(task, r, r->proto.any);
184         return;
185     }
186 
187     out->mem.start[0] = 0;
188     out->mem.start[1] = 0;
189 
190     wsh = (nxt_websocket_header_t *) out->mem.start;
191     out->mem.free = nxt_websocket_frame_init(wsh, 0);
192 
193     wsh->fin = 1;
194     wsh->opcode = NXT_WEBSOCKET_OP_PING;
195 
196     nxt_http_request_send(task, r, out);
197 }
198 
199 
200 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state
201     nxt_aligned(64) =
202 {
203     .ready_handler = nxt_h1p_conn_ws_frame_header_read,
204     .close_handler = nxt_h1p_conn_ws_error,
205     .error_handler = nxt_h1p_conn_ws_error,
206 
207     .io_read_handler = nxt_h1p_ws_io_read_handler,
208 
209     .timer_handler = nxt_h1p_conn_ws_timeout,
210     .timer_value = nxt_h1p_conn_request_timer_value,
211     .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
212     .timer_autoreset = 1,
213 };
214 
215 
216 static void
nxt_h1p_conn_ws_frame_header_read(nxt_task_t * task,void * obj,void * data)217 nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
218 {
219     size_t                  size, hsize, frame_size, max_frame_size;
220     uint64_t                payload_len;
221     nxt_conn_t              *c;
222     nxt_h1proto_t           *h1p;
223     nxt_http_request_t      *r;
224     nxt_event_engine_t      *engine;
225     nxt_websocket_header_t  *wsh;
226 
227     c = obj;
228     h1p = data;
229 
230     nxt_h1p_conn_ws_keepalive_disable(task, h1p);
231 
232     size = nxt_buf_mem_used_size(&c->read->mem);
233 
234     engine = task->thread->engine;
235 
236     if (size < 2) {
237         nxt_debug(task, "h1p conn ws frame header read %z", size);
238 
239         nxt_conn_read(engine, c);
240         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
241 
242         return;
243     }
244 
245     wsh = (nxt_websocket_header_t *) c->read->mem.pos;
246 
247     hsize = nxt_websocket_frame_header_size(wsh);
248 
249     if (size < hsize) {
250         nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
251 
252         nxt_conn_read(engine, c);
253         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
254 
255         return;
256     }
257 
258     r = h1p->request;
259     if (nxt_slow_path(r == NULL)) {
260         return;
261     }
262 
263     r->ws_frame = c->read;
264 
265     if (nxt_slow_path(wsh->mask == 0)) {
266         hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
267         return;
268     }
269 
270     if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
271         if (nxt_slow_path(wsh->fin == 0)) {
272             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
273             return;
274         }
275 
276         if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
277                           && wsh->opcode != NXT_WEBSOCKET_OP_PONG
278                           && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
279         {
280             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
281                                   wsh->opcode);
282             return;
283         }
284 
285         if (nxt_slow_path(wsh->payload_len > 125)) {
286             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
287                                   nxt_websocket_frame_payload_len(wsh));
288             return;
289         }
290 
291         if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
292                           && wsh->payload_len == 1))
293         {
294             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
295             return;
296         }
297 
298     } else {
299         if (h1p->websocket_cont_expected) {
300             if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
301                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
302                                       wsh->opcode);
303                 return;
304             }
305 
306         } else {
307             if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
308                               && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
309             {
310                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
311                                       wsh->opcode);
312                 return;
313             }
314         }
315 
316         h1p->websocket_cont_expected = !wsh->fin;
317     }
318 
319     max_frame_size = r->conf->socket_conf->websocket_conf.max_frame_size;
320 
321     payload_len = nxt_websocket_frame_payload_len(wsh);
322 
323     if (nxt_slow_path(hsize > max_frame_size
324                       || payload_len > (max_frame_size - hsize)))
325     {
326         hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
327         return;
328     }
329 
330     c->read_state = &nxt_h1p_read_ws_frame_payload_state;
331 
332     frame_size = payload_len + hsize;
333 
334     nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
335 
336     if (frame_size <= size) {
337         nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
338 
339         return;
340     }
341 
342     if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
343         c->read->mem.end = c->read->mem.start + frame_size;
344 
345     } else {
346         nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
347 
348         c->read->next = b;
349         c->read = b;
350     }
351 
352     nxt_conn_read(engine, c);
353     nxt_h1p_conn_ws_keepalive_enable(task, h1p);
354 }
355 
356 
357 static void
nxt_h1p_conn_ws_keepalive_disable(nxt_task_t * task,nxt_h1proto_t * h1p)358 nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
359 {
360     nxt_timer_t  *timer;
361 
362     if (h1p->websocket_timer == NULL) {
363         return;
364     }
365 
366     timer = &h1p->websocket_timer->timer;
367 
368     if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
369         nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
370         return;
371     }
372 
373     nxt_timer_disable(task->thread->engine, timer);
374 }
375 
376 
377 static void
nxt_h1p_conn_ws_keepalive_enable(nxt_task_t * task,nxt_h1proto_t * h1p)378 nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
379 {
380     nxt_timer_t  *timer;
381 
382     if (h1p->websocket_timer == NULL) {
383         return;
384     }
385 
386     timer = &h1p->websocket_timer->timer;
387 
388     if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
389         nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
390         return;
391     }
392 
393     nxt_timer_add(task->thread->engine, timer,
394                   h1p->websocket_timer->keepalive_interval);
395 }
396 
397 
398 static void
nxt_h1p_conn_ws_frame_process(nxt_task_t * task,nxt_conn_t * c,nxt_h1proto_t * h1p,nxt_websocket_header_t * wsh)399 nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
400     nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
401 {
402     size_t              hsize;
403     uint8_t             *p, *mask;
404     uint16_t            code;
405     nxt_http_request_t  *r;
406 
407     r = h1p->request;
408 
409     c->read = NULL;
410 
411     if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
412         nxt_h1p_conn_ws_pong(task, r, NULL);
413         return;
414     }
415 
416     if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
417         if (wsh->payload_len >= 2) {
418             hsize = nxt_websocket_frame_header_size(wsh);
419             mask = nxt_pointer_to(wsh, hsize - 4);
420             p = nxt_pointer_to(wsh, hsize);
421 
422             code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
423 
424             if (nxt_slow_path(code < 1000 || code >= 5000
425                               || (code > 1003 && code < 1007)
426                               || (code > 1014 && code < 3000)))
427             {
428                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
429                                       code);
430                 return;
431             }
432         }
433 
434         h1p->websocket_closed = 1;
435     }
436 
437     r->state->ready_handler(task, r, NULL);
438 }
439 
440 
441 static void
nxt_h1p_conn_ws_error(nxt_task_t * task,void * obj,void * data)442 nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
443 {
444     nxt_h1proto_t       *h1p;
445     nxt_http_request_t  *r;
446 
447     h1p = data;
448 
449     nxt_debug(task, "h1p conn ws error");
450 
451     r = h1p->request;
452 
453     h1p->keepalive = 0;
454 
455     if (nxt_fast_path(r != NULL)) {
456         r->state->error_handler(task, r, h1p);
457     }
458 }
459 
460 
461 static ssize_t
nxt_h1p_ws_io_read_handler(nxt_task_t * task,nxt_conn_t * c)462 nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c)
463 {
464     size_t     size;
465     ssize_t    n;
466     nxt_buf_t  *b;
467 
468     b = c->read;
469 
470     if (b == NULL) {
471         /* Enough for control frame. */
472         size = 10 + 125;
473 
474         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
475         if (nxt_slow_path(b == NULL)) {
476             c->socket.error = NXT_ENOMEM;
477             return NXT_ERROR;
478         }
479     }
480 
481     n = c->io->recvbuf(c, b);
482 
483     if (n > 0) {
484         c->read = b;
485 
486     } else {
487         c->read = NULL;
488         nxt_mp_free(c->mem_pool, b);
489     }
490 
491     return n;
492 }
493 
494 
495 static void
nxt_h1p_conn_ws_timeout(nxt_task_t * task,void * obj,void * data)496 nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
497 {
498     nxt_conn_t          *c;
499     nxt_timer_t         *timer;
500     nxt_h1proto_t       *h1p;
501     nxt_http_request_t  *r;
502 
503     timer = obj;
504 
505     nxt_debug(task, "h1p conn ws timeout");
506 
507     c = nxt_read_timer_conn(timer);
508     c->block_read = 1;
509     /*
510      * Disable SO_LINGER off during socket closing
511      * to send "408 Request Timeout" error response.
512      */
513     c->socket.timedout = 0;
514 
515     h1p = c->socket.data;
516     h1p->keepalive = 0;
517 
518     r = h1p->request;
519     if (nxt_slow_path(r == NULL)) {
520         return;
521     }
522 
523     hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
524 }
525 
526 
527 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state
528     nxt_aligned(64) =
529 {
530     .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
531     .close_handler = nxt_h1p_conn_ws_error,
532     .error_handler = nxt_h1p_conn_ws_error,
533 
534     .timer_handler = nxt_h1p_conn_ws_timeout,
535     .timer_value = nxt_h1p_conn_request_timer_value,
536     .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
537     .timer_autoreset = 1,
538 };
539 
540 
541 static void
nxt_h1p_conn_ws_frame_payload_read(nxt_task_t * task,void * obj,void * data)542 nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
543 {
544     nxt_conn_t              *c;
545     nxt_h1proto_t           *h1p;
546     nxt_http_request_t      *r;
547     nxt_event_engine_t      *engine;
548     nxt_websocket_header_t  *wsh;
549 
550     c = obj;
551     h1p = data;
552 
553     nxt_h1p_conn_ws_keepalive_disable(task, h1p);
554 
555     nxt_debug(task, "h1p conn ws frame read");
556 
557     if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
558         r = h1p->request;
559         if (nxt_slow_path(r == NULL)) {
560             return;
561         }
562 
563         wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
564 
565         nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
566 
567         return;
568     }
569 
570     engine = task->thread->engine;
571 
572     nxt_conn_read(engine, c);
573     nxt_h1p_conn_ws_keepalive_enable(task, h1p);
574 }
575 
576 
577 static void
hxt_h1p_send_ws_error(nxt_task_t * task,nxt_http_request_t * r,const nxt_ws_error_t * err,...)578 hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
579     const nxt_ws_error_t *err, ...)
580 {
581     u_char                  *p;
582     va_list                 args;
583     nxt_buf_t               *out;
584     nxt_str_t               desc;
585     nxt_websocket_header_t  *wsh;
586     u_char                  buf[125];
587 
588     if (nxt_slow_path(err->args)) {
589         va_start(args, err);
590         p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
591                          args);
592         va_end(args);
593 
594         desc.start = buf;
595         desc.length = p - buf;
596 
597     } else {
598         desc = err->desc;
599     }
600 
601     nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
602 
603     out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
604     if (nxt_slow_path(out == NULL)) {
605         nxt_http_request_error_handler(task, r, r->proto.any);
606         return;
607     }
608 
609     out->mem.start[0] = 0;
610     out->mem.start[1] = 0;
611 
612     wsh = (nxt_websocket_header_t *) out->mem.start;
613     p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
614 
615     wsh->fin = 1;
616     wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
617 
618     *p++ = (err->code >> 8) & 0xFF;
619     *p++ = err->code & 0xFF;
620 
621     out->mem.free = nxt_cpymem(p, desc.start, desc.length);
622     out->next = nxt_http_buf_last(r);
623 
624     if (out->next != NULL) {
625         out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
626     }
627 
628     nxt_http_request_send(task, r, out);
629 }
630 
631 
632 static void
nxt_h1p_conn_ws_error_sent(nxt_task_t * task,void * obj,void * data)633 nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
634 {
635     nxt_http_request_t  *r;
636 
637     r = data;
638 
639     nxt_debug(task, "h1p conn ws error sent");
640 
641     r->state->error_handler(task, r, r->proto.any);
642 }
643 
644 
645 static void
nxt_h1p_conn_ws_pong(nxt_task_t * task,void * obj,void * data)646 nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
647 {
648     uint8_t                 payload_len, i;
649     nxt_buf_t               *b, *out, *next;
650     nxt_http_request_t      *r;
651     nxt_websocket_header_t  *wsh;
652     uint8_t                 mask[4];
653 
654     nxt_debug(task, "h1p conn ws pong");
655 
656     r = obj;
657     b = r->ws_frame;
658 
659     wsh = (nxt_websocket_header_t *) b->mem.pos;
660     payload_len = wsh->payload_len;
661 
662     b->mem.pos += 2;
663 
664     nxt_memcpy(mask, b->mem.pos, 4);
665 
666     b->mem.pos += 4;
667 
668     out = nxt_http_buf_mem(task, r, 2 + payload_len);
669     if (nxt_slow_path(out == NULL)) {
670         nxt_http_request_error_handler(task, r, r->proto.any);
671         return;
672     }
673 
674     out->mem.start[0] = 0;
675     out->mem.start[1] = 0;
676 
677     wsh = (nxt_websocket_header_t *) out->mem.start;
678     out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
679 
680     wsh->fin = 1;
681     wsh->opcode = NXT_WEBSOCKET_OP_PONG;
682 
683     for (i = 0; i < payload_len; i++) {
684         while (nxt_buf_mem_used_size(&b->mem) == 0) {
685             next = b->next;
686             b->next = NULL;
687 
688             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
689                                b->completion_handler, task, b, b->parent);
690 
691             b = next;
692         }
693 
694         *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
695     }
696 
697     r->ws_frame = b;
698 
699     nxt_http_request_send(task, r, out);
700 
701     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
702 }
703