xref: /unit/src/nxt_h1proto_websocket.c (revision 1155:8f70f87d0254)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_h1proto.h>
10 #include <nxt_websocket.h>
11 #include <nxt_websocket_header.h>
12 
13 typedef struct {
14     uint16_t   code;
15     uint8_t    args;
16     nxt_str_t  desc;
17 } nxt_ws_error_t;
18 
19 static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
20 static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
21     void *data);
22 static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
23     nxt_h1proto_t *h1p);
24 static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
25     nxt_h1proto_t *h1p);
26 static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
27     nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
28 static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
29 static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c);
30 static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
31 static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
32     void *data);
33 static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
34     const nxt_ws_error_t *err, ...);
35 static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
36 static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
37 
38 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state;
39 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state;
40 
41 static const nxt_ws_error_t  nxt_ws_err_out_of_memory = {
42     NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
43     0, nxt_string("Out of memory") };
44 static const nxt_ws_error_t  nxt_ws_err_too_big = {
45     NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
46     1, nxt_string("Message too big: %uL bytes") };
47 static const nxt_ws_error_t  nxt_ws_err_invalid_close_code = {
48     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
49     1, nxt_string("Close code %ud is not valid") };
50 static const nxt_ws_error_t  nxt_ws_err_going_away = {
51     NXT_WEBSOCKET_CR_GOING_AWAY,
52     0, nxt_string("Remote peer is going away") };
53 static const nxt_ws_error_t  nxt_ws_err_not_masked = {
54     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
55     0, nxt_string("Not masked client frame") };
56 static const nxt_ws_error_t  nxt_ws_err_ctrl_fragmented = {
57     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
58     0, nxt_string("Fragmented control frame") };
59 static const nxt_ws_error_t  nxt_ws_err_ctrl_too_big = {
60     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
61     1, nxt_string("Control frame too big: %uL bytes") };
62 static const nxt_ws_error_t  nxt_ws_err_invalid_close_len = {
63     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
64     0, nxt_string("Close frame payload length cannot be 1") };
65 static const nxt_ws_error_t  nxt_ws_err_invalid_opcode = {
66     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
67     1, nxt_string("Unrecognized opcode %ud") };
68 static const nxt_ws_error_t  nxt_ws_err_cont_expected = {
69     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
70     1, nxt_string("Continuation expected, but %ud opcode received") };
71 
72 void
73 nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
74     nxt_buf_t *ws_frame)
75 {
76     nxt_conn_t               *c;
77     nxt_timer_t              *timer;
78     nxt_h1proto_t            *h1p;
79     nxt_socket_conf_joint_t  *joint;
80 
81     nxt_debug(task, "h1p ws first frame start");
82 
83     h1p = r->proto.h1;
84     c = h1p->conn;
85 
86     if (!c->tcp_nodelay) {
87         nxt_conn_tcp_nodelay_on(task, c);
88     }
89 
90     joint = c->listen->socket.data;
91 
92     if (nxt_slow_path(joint != NULL
93         && joint->socket_conf->websocket_conf.keepalive_interval != 0))
94     {
95         h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
96                                            sizeof(nxt_h1p_websocket_timer_t));
97         if (nxt_slow_path(h1p->websocket_timer == NULL)) {
98             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
99             return;
100         }
101 
102         h1p->websocket_timer->keepalive_interval =
103             joint->socket_conf->websocket_conf.keepalive_interval;
104         h1p->websocket_timer->h1p = h1p;
105 
106         timer = &h1p->websocket_timer->timer;
107         timer->task = &c->task;
108         timer->work_queue = &task->thread->engine->fast_work_queue;
109         timer->log = &c->log;
110         timer->bias = NXT_TIMER_DEFAULT_BIAS;
111         timer->handler = nxt_h1p_conn_ws_keepalive;
112     }
113 
114     nxt_h1p_websocket_frame_start(task, r, ws_frame);
115 }
116 
117 
118 void
119 nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
120     nxt_buf_t *ws_frame)
121 {
122     size_t         size;
123     nxt_buf_t      *in;
124     nxt_conn_t     *c;
125     nxt_h1proto_t  *h1p;
126 
127     nxt_debug(task, "h1p ws frame start");
128 
129     h1p = r->proto.h1;
130 
131     if (nxt_slow_path(h1p->websocket_closed)) {
132         return;
133     }
134 
135     c = h1p->conn;
136     c->read = ws_frame;
137 
138     nxt_h1p_complete_buffers(task, h1p);
139 
140     in = c->read;
141     c->read_state = &nxt_h1p_read_ws_frame_header_state;
142 
143     if (in == NULL) {
144         nxt_conn_read(task->thread->engine, c);
145         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
146 
147     } else {
148         size = nxt_buf_mem_used_size(&in->mem);
149 
150         nxt_debug(task, "h1p read client ws frame");
151 
152         nxt_memmove(in->mem.start, in->mem.pos, size);
153 
154         in->mem.pos = in->mem.start;
155         in->mem.free = in->mem.start + size;
156 
157         nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
158     }
159 }
160 
161 
162 static void
163 nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
164 {
165     nxt_buf_t                  *out;
166     nxt_timer_t                *timer;
167     nxt_h1proto_t              *h1p;
168     nxt_http_request_t         *r;
169     nxt_websocket_header_t     *wsh;
170     nxt_h1p_websocket_timer_t  *ws_timer;
171 
172     nxt_debug(task, "h1p conn ws keepalive");
173 
174     timer = obj;
175     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
176     h1p = ws_timer->h1p;
177 
178     r = h1p->request;
179     if (nxt_slow_path(r == NULL)) {
180         return;
181     }
182 
183     out = nxt_http_buf_mem(task, r, 2);
184     if (nxt_slow_path(out == NULL)) {
185         nxt_http_request_error_handler(task, r, r->proto.any);
186         return;
187     }
188 
189     out->mem.start[0] = 0;
190     out->mem.start[1] = 0;
191 
192     wsh = (nxt_websocket_header_t *) out->mem.start;
193     out->mem.free = nxt_websocket_frame_init(wsh, 0);
194 
195     wsh->fin = 1;
196     wsh->opcode = NXT_WEBSOCKET_OP_PING;
197 
198     nxt_http_request_send(task, r, out);
199 }
200 
201 
202 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state
203     nxt_aligned(64) =
204 {
205     .ready_handler = nxt_h1p_conn_ws_frame_header_read,
206     .close_handler = nxt_h1p_conn_ws_error,
207     .error_handler = nxt_h1p_conn_ws_error,
208 
209     .io_read_handler = nxt_h1p_ws_io_read_handler,
210 
211     .timer_handler = nxt_h1p_conn_ws_timeout,
212     .timer_value = nxt_h1p_conn_request_timer_value,
213     .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
214     .timer_autoreset = 1,
215 };
216 
217 
218 static void
219 nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
220 {
221     size_t                   size, hsize, frame_size, max_frame_size;
222     uint64_t                 payload_len;
223     nxt_conn_t               *c;
224     nxt_h1proto_t            *h1p;
225     nxt_http_request_t       *r;
226     nxt_event_engine_t       *engine;
227     nxt_websocket_header_t   *wsh;
228     nxt_socket_conf_joint_t  *joint;
229 
230     c = obj;
231     h1p = data;
232 
233     nxt_h1p_conn_ws_keepalive_disable(task, h1p);
234 
235     size = nxt_buf_mem_used_size(&c->read->mem);
236 
237     engine = task->thread->engine;
238 
239     if (size < 2) {
240         nxt_debug(task, "h1p conn ws frame header read %z", size);
241 
242         nxt_conn_read(engine, c);
243         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
244 
245         return;
246     }
247 
248     wsh = (nxt_websocket_header_t *) c->read->mem.pos;
249 
250     hsize = nxt_websocket_frame_header_size(wsh);
251 
252     if (size < hsize) {
253         nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
254 
255         nxt_conn_read(engine, c);
256         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
257 
258         return;
259     }
260 
261     r = h1p->request;
262     if (nxt_slow_path(r == NULL)) {
263         return;
264     }
265 
266     r->ws_frame = c->read;
267 
268     joint = c->listen->socket.data;
269 
270     if (nxt_slow_path(joint == NULL)) {
271         /*
272          * Listening socket had been closed while
273          * connection was in keep-alive state.
274          */
275         c->read_state = &nxt_h1p_idle_close_state;
276         return;
277     }
278 
279     if (nxt_slow_path(wsh->mask == 0)) {
280         hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
281         return;
282     }
283 
284     if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
285         if (nxt_slow_path(wsh->fin == 0)) {
286             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
287             return;
288         }
289 
290         if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
291                           && wsh->opcode != NXT_WEBSOCKET_OP_PONG
292                           && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
293         {
294             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
295                                   wsh->opcode);
296             return;
297         }
298 
299         if (nxt_slow_path(wsh->payload_len > 125)) {
300             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
301                                   nxt_websocket_frame_payload_len(wsh));
302             return;
303         }
304 
305         if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
306                           && wsh->payload_len == 1))
307         {
308             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
309             return;
310         }
311 
312     } else {
313         if (h1p->websocket_cont_expected) {
314             if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
315                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
316                                       wsh->opcode);
317                 return;
318             }
319 
320         } else {
321             if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
322                               && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
323             {
324                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
325                                       wsh->opcode);
326                 return;
327             }
328         }
329 
330         h1p->websocket_cont_expected = !wsh->fin;
331     }
332 
333     max_frame_size = joint->socket_conf->websocket_conf.max_frame_size;
334 
335     payload_len = nxt_websocket_frame_payload_len(wsh);
336 
337     if (nxt_slow_path(hsize > max_frame_size
338                       || payload_len > (max_frame_size - hsize)))
339     {
340         hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
341         return;
342     }
343 
344     c->read_state = &nxt_h1p_read_ws_frame_payload_state;
345 
346     frame_size = payload_len + hsize;
347 
348     nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
349 
350     if (frame_size <= size) {
351         nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
352 
353         return;
354     }
355 
356     if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
357         c->read->mem.end = c->read->mem.start + frame_size;
358 
359     } else {
360         nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
361 
362         c->read->next = b;
363         c->read = b;
364     }
365 
366     nxt_conn_read(engine, c);
367     nxt_h1p_conn_ws_keepalive_enable(task, h1p);
368 }
369 
370 
371 static void
372 nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
373 {
374     nxt_timer_t  *timer;
375 
376     if (h1p->websocket_timer == NULL) {
377         return;
378     }
379 
380     timer = &h1p->websocket_timer->timer;
381 
382     if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
383         nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
384         return;
385     }
386 
387     nxt_timer_disable(task->thread->engine, timer);
388 }
389 
390 
391 static void
392 nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
393 {
394     nxt_timer_t  *timer;
395 
396     if (h1p->websocket_timer == NULL) {
397         return;
398     }
399 
400     timer = &h1p->websocket_timer->timer;
401 
402     if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
403         nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
404         return;
405     }
406 
407     nxt_timer_add(task->thread->engine, timer,
408                   h1p->websocket_timer->keepalive_interval);
409 }
410 
411 
412 static void
413 nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
414     nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
415 {
416     size_t              hsize;
417     uint8_t             *p, *mask;
418     uint16_t            code;
419     nxt_http_request_t  *r;
420 
421     r = h1p->request;
422 
423     c->read = NULL;
424 
425     if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
426         nxt_h1p_conn_ws_pong(task, r, NULL);
427         return;
428     }
429 
430     if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
431         if (wsh->payload_len >= 2) {
432             hsize = nxt_websocket_frame_header_size(wsh);
433             mask = nxt_pointer_to(wsh, hsize - 4);
434             p = nxt_pointer_to(wsh, hsize);
435 
436             code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
437 
438             if (nxt_slow_path(code < 1000 || code >= 5000
439                               || (code > 1003 && code < 1007)
440                               || (code > 1014 && code < 3000)))
441             {
442                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
443                                       code);
444                 return;
445             }
446         }
447 
448         h1p->websocket_closed = 1;
449     }
450 
451     r->state->ready_handler(task, r, NULL);
452 }
453 
454 
455 static void
456 nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
457 {
458     nxt_h1proto_t       *h1p;
459     nxt_http_request_t  *r;
460 
461     h1p = data;
462 
463     nxt_debug(task, "h1p conn ws error");
464 
465     r = h1p->request;
466 
467     h1p->keepalive = 0;
468 
469     if (nxt_fast_path(r != NULL)) {
470         r->state->error_handler(task, r, h1p);
471     }
472 }
473 
474 
475 static ssize_t
476 nxt_h1p_ws_io_read_handler(nxt_conn_t *c)
477 {
478     size_t     size;
479     ssize_t    n;
480     nxt_buf_t  *b;
481 
482     b = c->read;
483 
484     if (b == NULL) {
485         /* Enough for control frame. */
486         size = 10 + 125;
487 
488         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
489         if (nxt_slow_path(b == NULL)) {
490             c->socket.error = NXT_ENOMEM;
491             return NXT_ERROR;
492         }
493     }
494 
495     n = c->io->recvbuf(c, b);
496 
497     if (n > 0) {
498         c->read = b;
499 
500     } else {
501         c->read = NULL;
502         nxt_mp_free(c->mem_pool, b);
503     }
504 
505     return n;
506 }
507 
508 
509 static void
510 nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
511 {
512     nxt_conn_t          *c;
513     nxt_timer_t         *timer;
514     nxt_h1proto_t       *h1p;
515     nxt_http_request_t  *r;
516 
517     timer = obj;
518 
519     nxt_debug(task, "h1p conn ws timeout");
520 
521     c = nxt_read_timer_conn(timer);
522     c->block_read = 1;
523     /*
524      * Disable SO_LINGER off during socket closing
525      * to send "408 Request Timeout" error response.
526      */
527     c->socket.timedout = 0;
528 
529     h1p = c->socket.data;
530     h1p->keepalive = 0;
531 
532     r = h1p->request;
533     if (nxt_slow_path(r == NULL)) {
534         return;
535     }
536 
537     hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
538 }
539 
540 
541 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state
542     nxt_aligned(64) =
543 {
544     .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
545     .close_handler = nxt_h1p_conn_ws_error,
546     .error_handler = nxt_h1p_conn_ws_error,
547 
548     .timer_handler = nxt_h1p_conn_ws_timeout,
549     .timer_value = nxt_h1p_conn_request_timer_value,
550     .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
551     .timer_autoreset = 1,
552 };
553 
554 
555 static void
556 nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
557 {
558     nxt_conn_t              *c;
559     nxt_h1proto_t           *h1p;
560     nxt_http_request_t      *r;
561     nxt_event_engine_t      *engine;
562     nxt_websocket_header_t  *wsh;
563 
564     c = obj;
565     h1p = data;
566 
567     nxt_h1p_conn_ws_keepalive_disable(task, h1p);
568 
569     nxt_debug(task, "h1p conn ws frame read");
570 
571     if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
572         r = h1p->request;
573         if (nxt_slow_path(r == NULL)) {
574             return;
575         }
576 
577         wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
578 
579         nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
580 
581         return;
582     }
583 
584     engine = task->thread->engine;
585 
586     nxt_conn_read(engine, c);
587     nxt_h1p_conn_ws_keepalive_enable(task, h1p);
588 }
589 
590 
591 static void
592 hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
593     const nxt_ws_error_t *err, ...)
594 {
595     u_char                  *p;
596     va_list                 args;
597     nxt_buf_t               *out;
598     nxt_str_t               desc;
599     nxt_websocket_header_t  *wsh;
600     u_char                  buf[125];
601 
602     if (nxt_slow_path(err->args)) {
603         va_start(args, err);
604         p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
605                          args);
606         va_end(args);
607 
608         desc.start = buf;
609         desc.length = p - buf;
610 
611     } else {
612         desc = err->desc;
613     }
614 
615     nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
616 
617     out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
618     if (nxt_slow_path(out == NULL)) {
619         nxt_http_request_error_handler(task, r, r->proto.any);
620         return;
621     }
622 
623     out->mem.start[0] = 0;
624     out->mem.start[1] = 0;
625 
626     wsh = (nxt_websocket_header_t *) out->mem.start;
627     p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
628 
629     wsh->fin = 1;
630     wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
631 
632     *p++ = (err->code >> 8) & 0xFF;
633     *p++ = err->code & 0xFF;
634 
635     out->mem.free = nxt_cpymem(p, desc.start, desc.length);
636     out->next = nxt_http_buf_last(r);
637 
638     if (out->next != NULL) {
639         out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
640     }
641 
642     nxt_http_request_send(task, r, out);
643 }
644 
645 
646 static void
647 nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
648 {
649     nxt_http_request_t  *r;
650 
651     r = data;
652 
653     nxt_debug(task, "h1p conn ws error sent");
654 
655     r->state->error_handler(task, r, r->proto.any);
656 }
657 
658 
659 static void
660 nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
661 {
662     uint8_t                 payload_len, i;
663     nxt_buf_t               *b, *out, *next;
664     nxt_http_request_t      *r;
665     nxt_websocket_header_t  *wsh;
666     uint8_t                 mask[4];
667 
668     nxt_debug(task, "h1p conn ws pong");
669 
670     r = obj;
671     b = r->ws_frame;
672 
673     wsh = (nxt_websocket_header_t *) b->mem.pos;
674     payload_len = wsh->payload_len;
675 
676     b->mem.pos += 2;
677 
678     nxt_memcpy(mask, b->mem.pos, 4);
679 
680     b->mem.pos += 4;
681 
682     out = nxt_http_buf_mem(task, r, 2 + payload_len);
683     if (nxt_slow_path(out == NULL)) {
684         nxt_http_request_error_handler(task, r, r->proto.any);
685         return;
686     }
687 
688     out->mem.start[0] = 0;
689     out->mem.start[1] = 0;
690 
691     wsh = (nxt_websocket_header_t *) out->mem.start;
692     out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
693 
694     wsh->fin = 1;
695     wsh->opcode = NXT_WEBSOCKET_OP_PONG;
696 
697     for (i = 0; i < payload_len; i++) {
698         while (nxt_buf_mem_used_size(&b->mem) == 0) {
699             next = b->next;
700 
701             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
702                                b->completion_handler, task, b, b->parent);
703 
704             b = next;
705         }
706 
707         *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
708     }
709 
710     r->ws_frame = b;
711 
712     nxt_http_request_send(task, r, out);
713 
714     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
715 }
716