xref: /unit/src/nxt_h1proto_websocket.c (revision 1131:ec7d924d8dfb)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include <nxt_main.h>
7 #include <nxt_router.h>
8 #include <nxt_http.h>
9 #include <nxt_h1proto.h>
10 #include <nxt_websocket.h>
11 #include <nxt_websocket_header.h>
12 
13 typedef struct {
14     uint16_t   code;
15     uint8_t    args;
16     nxt_str_t  desc;
17 } nxt_ws_error_t;
18 
19 static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data);
20 static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj,
21     void *data);
22 static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task,
23     nxt_h1proto_t *h1p);
24 static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task,
25     nxt_h1proto_t *h1p);
26 static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
27     nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh);
28 static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data);
29 static ssize_t nxt_h1p_ws_io_read_handler(nxt_conn_t *c);
30 static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data);
31 static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj,
32     void *data);
33 static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
34     const nxt_ws_error_t *err, ...);
35 static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data);
36 static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data);
37 
38 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state;
39 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state;
40 
41 static const nxt_ws_error_t  nxt_ws_err_out_of_memory = {
42     NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR,
43     0, nxt_string("Out of memory") };
44 static const nxt_ws_error_t  nxt_ws_err_too_big = {
45     NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG,
46     1, nxt_string("Message too big: %uL bytes") };
47 static const nxt_ws_error_t  nxt_ws_err_invalid_close_code = {
48     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
49     1, nxt_string("Close code %ud is not valid") };
50 static const nxt_ws_error_t  nxt_ws_err_going_away = {
51     NXT_WEBSOCKET_CR_GOING_AWAY,
52     0, nxt_string("Remote peer is going away") };
53 static const nxt_ws_error_t  nxt_ws_err_not_masked = {
54     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
55     0, nxt_string("Not masked client frame") };
56 static const nxt_ws_error_t  nxt_ws_err_ctrl_fragmented = {
57     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
58     0, nxt_string("Fragmented control frame") };
59 static const nxt_ws_error_t  nxt_ws_err_ctrl_too_big = {
60     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
61     1, nxt_string("Control frame too big: %uL bytes") };
62 static const nxt_ws_error_t  nxt_ws_err_invalid_close_len = {
63     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
64     0, nxt_string("Close frame payload length cannot be 1") };
65 static const nxt_ws_error_t  nxt_ws_err_invalid_opcode = {
66     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
67     1, nxt_string("Unrecognized opcode %ud") };
68 static const nxt_ws_error_t  nxt_ws_err_cont_expected = {
69     NXT_WEBSOCKET_CR_PROTOCOL_ERROR,
70     1, nxt_string("Continuation expected, but %ud opcode received") };
71 
72 void
73 nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r,
74     nxt_buf_t *ws_frame)
75 {
76     nxt_conn_t               *c;
77     nxt_timer_t              *timer;
78     nxt_h1proto_t            *h1p;
79     nxt_socket_conf_joint_t  *joint;
80 
81     nxt_debug(task, "h1p ws first frame start");
82 
83     h1p = r->proto.h1;
84     c = h1p->conn;
85 
86     if (!c->tcp_nodelay) {
87         nxt_conn_tcp_nodelay_on(task, c);
88     }
89 
90     joint = c->listen->socket.data;
91 
92     if (nxt_slow_path(joint != NULL
93         && joint->socket_conf->websocket_conf.keepalive_interval != 0))
94     {
95         h1p->websocket_timer = nxt_mp_zget(c->mem_pool,
96                                            sizeof(nxt_h1p_websocket_timer_t));
97         if (nxt_slow_path(h1p->websocket_timer == NULL)) {
98             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory);
99             return;
100         }
101 
102         h1p->websocket_timer->keepalive_interval =
103             joint->socket_conf->websocket_conf.keepalive_interval;
104         h1p->websocket_timer->h1p = h1p;
105 
106         timer = &h1p->websocket_timer->timer;
107         timer->task = &c->task;
108         timer->work_queue = &task->thread->engine->fast_work_queue;
109         timer->log = &c->log;
110         timer->bias = NXT_TIMER_DEFAULT_BIAS;
111         timer->handler = nxt_h1p_conn_ws_keepalive;
112     }
113 
114     nxt_h1p_websocket_frame_start(task, r, ws_frame);
115 }
116 
117 
118 void
119 nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r,
120     nxt_buf_t *ws_frame)
121 {
122     size_t         size;
123     nxt_buf_t      *in;
124     nxt_conn_t     *c;
125     nxt_h1proto_t  *h1p;
126 
127     nxt_debug(task, "h1p ws frame start");
128 
129     h1p = r->proto.h1;
130 
131     if (nxt_slow_path(h1p->websocket_closed)) {
132         return;
133     }
134 
135     c = h1p->conn;
136     c->read = ws_frame;
137 
138     nxt_h1p_complete_buffers(task, h1p);
139 
140     in = c->read;
141     c->read_state = &nxt_h1p_read_ws_frame_header_state;
142 
143     if (in == NULL) {
144         nxt_conn_read(task->thread->engine, c);
145         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
146 
147     } else {
148         size = nxt_buf_mem_used_size(&in->mem);
149 
150         nxt_debug(task, "h1p read client ws frame");
151 
152         nxt_memmove(in->mem.start, in->mem.pos, size);
153 
154         in->mem.pos = in->mem.start;
155         in->mem.free = in->mem.start + size;
156 
157         nxt_h1p_conn_ws_frame_header_read(task, c, h1p);
158     }
159 }
160 
161 
162 static void
163 nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data)
164 {
165     nxt_buf_t                  *out;
166     nxt_timer_t                *timer;
167     nxt_h1proto_t              *h1p;
168     nxt_http_request_t         *r;
169     nxt_websocket_header_t     *wsh;
170     nxt_h1p_websocket_timer_t  *ws_timer;
171 
172     nxt_debug(task, "h1p conn ws keepalive");
173 
174     timer = obj;
175     ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer);
176     h1p = ws_timer->h1p;
177 
178     r = h1p->request;
179     if (nxt_slow_path(r == NULL)) {
180         return;
181     }
182 
183     out = nxt_http_buf_mem(task, r, 2);
184     if (nxt_slow_path(out == NULL)) {
185         nxt_http_request_error_handler(task, r, r->proto.any);
186         return;
187     }
188 
189     out->mem.start[0] = 0;
190     out->mem.start[1] = 0;
191 
192     wsh = (nxt_websocket_header_t *) out->mem.start;
193     out->mem.free = nxt_websocket_frame_init(wsh, 0);
194 
195     wsh->fin = 1;
196     wsh->opcode = NXT_WEBSOCKET_OP_PING;
197 
198     nxt_http_request_send(task, r, out);
199 }
200 
201 
202 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_header_state
203     nxt_aligned(64) =
204 {
205     .ready_handler = nxt_h1p_conn_ws_frame_header_read,
206     .close_handler = nxt_h1p_conn_ws_error,
207     .error_handler = nxt_h1p_conn_ws_error,
208 
209     .io_read_handler = nxt_h1p_ws_io_read_handler,
210 
211     .timer_handler = nxt_h1p_conn_ws_timeout,
212     .timer_value = nxt_h1p_conn_request_timer_value,
213     .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
214     .timer_autoreset = 1,
215 };
216 
217 
218 static void
219 nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data)
220 {
221     size_t                   size, hsize, frame_size, max_frame_size;
222     uint64_t                 payload_len;
223     nxt_conn_t               *c;
224     nxt_h1proto_t            *h1p;
225     nxt_http_request_t       *r;
226     nxt_event_engine_t       *engine;
227     nxt_websocket_header_t   *wsh;
228     nxt_socket_conf_joint_t  *joint;
229 
230     c = obj;
231     h1p = data;
232 
233     nxt_h1p_conn_ws_keepalive_disable(task, h1p);
234 
235     size = nxt_buf_mem_used_size(&c->read->mem);
236 
237     engine = task->thread->engine;
238 
239     if (size < 2) {
240         nxt_debug(task, "h1p conn ws frame header read %z", size);
241 
242         nxt_conn_read(engine, c);
243         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
244 
245         return;
246     }
247 
248     wsh = (nxt_websocket_header_t *) c->read->mem.pos;
249 
250     hsize = nxt_websocket_frame_header_size(wsh);
251 
252     if (size < hsize) {
253         nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize);
254 
255         nxt_conn_read(engine, c);
256         nxt_h1p_conn_ws_keepalive_enable(task, h1p);
257 
258         return;
259     }
260 
261     r = h1p->request;
262     if (nxt_slow_path(r == NULL)) {
263         return;
264     }
265 
266     r->ws_frame = c->read;
267 
268     joint = c->listen->socket.data;
269 
270     if (nxt_slow_path(joint == NULL)) {
271         /*
272          * Listening socket had been closed while
273          * connection was in keep-alive state.
274          */
275         c->read_state = &nxt_h1p_idle_close_state;
276         return;
277     }
278 
279     if (nxt_slow_path(wsh->mask == 0)) {
280         hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked);
281         return;
282     }
283 
284     if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) {
285         if (nxt_slow_path(wsh->fin == 0)) {
286             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented);
287             return;
288         }
289 
290         if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING
291                           && wsh->opcode != NXT_WEBSOCKET_OP_PONG
292                           && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE))
293         {
294             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
295                                   wsh->opcode);
296             return;
297         }
298 
299         if (nxt_slow_path(wsh->payload_len > 125)) {
300             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big,
301                                   nxt_websocket_frame_payload_len(wsh));
302             return;
303         }
304 
305         if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE
306                           && wsh->payload_len == 1))
307         {
308             hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len);
309             return;
310         }
311 
312     } else {
313         if (h1p->websocket_cont_expected) {
314             if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) {
315                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected,
316                                       wsh->opcode);
317                 return;
318             }
319 
320         } else {
321             if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY
322                               && wsh->opcode != NXT_WEBSOCKET_OP_TEXT))
323             {
324                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode,
325                                       wsh->opcode);
326                 return;
327             }
328         }
329 
330         h1p->websocket_cont_expected = !wsh->fin;
331     }
332 
333     max_frame_size = joint->socket_conf->websocket_conf.max_frame_size;
334 
335     payload_len = nxt_websocket_frame_payload_len(wsh);
336 
337     if (nxt_slow_path(hsize > max_frame_size
338                       || payload_len > (max_frame_size - hsize)))
339     {
340         hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len);
341         return;
342     }
343 
344     c->read_state = &nxt_h1p_read_ws_frame_payload_state;
345 
346     frame_size = payload_len + hsize;
347 
348     nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size);
349 
350     if (frame_size <= size) {
351         nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
352 
353         return;
354     }
355 
356     if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) {
357         c->read->mem.end = c->read->mem.start + frame_size;
358 
359     } else {
360         nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0);
361 
362         c->read->next = b;
363         c->read = b;
364     }
365 
366     nxt_conn_read(engine, c);
367     nxt_h1p_conn_ws_keepalive_enable(task, h1p);
368 }
369 
370 
371 static void
372 nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p)
373 {
374     nxt_timer_t  *timer;
375 
376     if (h1p->websocket_timer == NULL) {
377         return;
378     }
379 
380     timer = &h1p->websocket_timer->timer;
381 
382     if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
383         nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown");
384         return;
385     }
386 
387     nxt_timer_disable(task->thread->engine, timer);
388 }
389 
390 
391 static void
392 nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p)
393 {
394     nxt_timer_t  *timer;
395 
396     if (h1p->websocket_timer == NULL) {
397         return;
398     }
399 
400     timer = &h1p->websocket_timer->timer;
401 
402     if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) {
403         nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown");
404         return;
405     }
406 
407     nxt_timer_add(task->thread->engine, timer,
408                   h1p->websocket_timer->keepalive_interval);
409 }
410 
411 
412 static void
413 nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c,
414     nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh)
415 {
416     size_t              hsize;
417     uint8_t             *p, *mask;
418     uint16_t            code;
419     nxt_http_request_t  *r;
420     nxt_event_engine_t  *engine;
421 
422     engine = task->thread->engine;
423     r = h1p->request;
424 
425     c->read = NULL;
426 
427     if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) {
428         nxt_work_queue_add(&engine->fast_work_queue, nxt_h1p_conn_ws_pong,
429                            task, r, NULL);
430         return;
431     }
432 
433     if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) {
434         if (wsh->payload_len >= 2) {
435             hsize = nxt_websocket_frame_header_size(wsh);
436             mask = nxt_pointer_to(wsh, hsize - 4);
437             p = nxt_pointer_to(wsh, hsize);
438 
439             code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]);
440 
441             if (nxt_slow_path(code < 1000 || code >= 5000
442                               || (code > 1003 && code < 1007)
443                               || (code > 1014 && code < 3000)))
444             {
445                 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code,
446                                       code);
447                 return;
448             }
449         }
450 
451         h1p->websocket_closed = 1;
452     }
453 
454     nxt_work_queue_add(&engine->fast_work_queue, r->state->ready_handler,
455                        task, r, NULL);
456 }
457 
458 
459 static void
460 nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data)
461 {
462     nxt_h1proto_t       *h1p;
463     nxt_http_request_t  *r;
464 
465     h1p = data;
466 
467     nxt_debug(task, "h1p conn ws error");
468 
469     r = h1p->request;
470 
471     h1p->keepalive = 0;
472 
473     if (nxt_fast_path(r != NULL)) {
474         r->state->error_handler(task, r, h1p);
475     }
476 }
477 
478 
479 static ssize_t
480 nxt_h1p_ws_io_read_handler(nxt_conn_t *c)
481 {
482     size_t     size;
483     ssize_t    n;
484     nxt_buf_t  *b;
485 
486     b = c->read;
487 
488     if (b == NULL) {
489         /* Enough for control frame. */
490         size = 10 + 125;
491 
492         b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
493         if (nxt_slow_path(b == NULL)) {
494             c->socket.error = NXT_ENOMEM;
495             return NXT_ERROR;
496         }
497     }
498 
499     n = c->io->recvbuf(c, b);
500 
501     if (n > 0) {
502         c->read = b;
503 
504     } else {
505         c->read = NULL;
506         nxt_mp_free(c->mem_pool, b);
507     }
508 
509     return n;
510 }
511 
512 
513 static void
514 nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data)
515 {
516     nxt_conn_t          *c;
517     nxt_timer_t         *timer;
518     nxt_h1proto_t       *h1p;
519     nxt_http_request_t  *r;
520 
521     timer = obj;
522 
523     nxt_debug(task, "h1p conn ws timeout");
524 
525     c = nxt_read_timer_conn(timer);
526     c->block_read = 1;
527     /*
528      * Disable SO_LINGER off during socket closing
529      * to send "408 Request Timeout" error response.
530      */
531     c->socket.timedout = 0;
532 
533     h1p = c->socket.data;
534     h1p->keepalive = 0;
535 
536     r = h1p->request;
537     if (nxt_slow_path(r == NULL)) {
538         return;
539     }
540 
541     hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away);
542 }
543 
544 
545 static const nxt_conn_state_t  nxt_h1p_read_ws_frame_payload_state
546     nxt_aligned(64) =
547 {
548     .ready_handler = nxt_h1p_conn_ws_frame_payload_read,
549     .close_handler = nxt_h1p_conn_ws_error,
550     .error_handler = nxt_h1p_conn_ws_error,
551 
552     .timer_handler = nxt_h1p_conn_ws_timeout,
553     .timer_value = nxt_h1p_conn_request_timer_value,
554     .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout),
555     .timer_autoreset = 1,
556 };
557 
558 
559 static void
560 nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data)
561 {
562     nxt_conn_t              *c;
563     nxt_h1proto_t           *h1p;
564     nxt_http_request_t      *r;
565     nxt_event_engine_t      *engine;
566     nxt_websocket_header_t  *wsh;
567 
568     c = obj;
569     h1p = data;
570 
571     nxt_h1p_conn_ws_keepalive_disable(task, h1p);
572 
573     nxt_debug(task, "h1p conn ws frame read");
574 
575     if (nxt_buf_mem_free_size(&c->read->mem) == 0) {
576         r = h1p->request;
577         if (nxt_slow_path(r == NULL)) {
578             return;
579         }
580 
581         wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos;
582 
583         nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh);
584 
585         return;
586     }
587 
588     engine = task->thread->engine;
589 
590     nxt_conn_read(engine, c);
591     nxt_h1p_conn_ws_keepalive_enable(task, h1p);
592 }
593 
594 
595 static void
596 hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r,
597     const nxt_ws_error_t *err, ...)
598 {
599     u_char                  *p;
600     va_list                 args;
601     nxt_buf_t               *out;
602     nxt_str_t               desc;
603     nxt_websocket_header_t  *wsh;
604     u_char                  buf[125];
605 
606     if (nxt_slow_path(err->args)) {
607         va_start(args, err);
608         p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start,
609                          args);
610         va_end(args);
611 
612         desc.start = buf;
613         desc.length = p - buf;
614 
615     } else {
616         desc = err->desc;
617     }
618 
619     nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc);
620 
621     out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length);
622     if (nxt_slow_path(out == NULL)) {
623         nxt_http_request_error_handler(task, r, r->proto.any);
624         return;
625     }
626 
627     out->mem.start[0] = 0;
628     out->mem.start[1] = 0;
629 
630     wsh = (nxt_websocket_header_t *) out->mem.start;
631     p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length);
632 
633     wsh->fin = 1;
634     wsh->opcode = NXT_WEBSOCKET_OP_CLOSE;
635 
636     *p++ = (err->code >> 8) & 0xFF;
637     *p++ = err->code & 0xFF;
638 
639     out->mem.free = nxt_cpymem(p, desc.start, desc.length);
640     out->next = nxt_http_buf_last(r);
641 
642     if (out->next != NULL) {
643         out->next->completion_handler = nxt_h1p_conn_ws_error_sent;
644     }
645 
646     nxt_http_request_send(task, r, out);
647 }
648 
649 
650 static void
651 nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data)
652 {
653     nxt_http_request_t  *r;
654 
655     r = data;
656 
657     nxt_debug(task, "h1p conn ws error sent");
658 
659     r->state->error_handler(task, r, r->proto.any);
660 }
661 
662 
663 static void
664 nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
665 {
666     uint8_t                 payload_len, i;
667     nxt_buf_t               *b, *out, *next;
668     nxt_http_request_t      *r;
669     nxt_websocket_header_t  *wsh;
670     uint8_t                 mask[4];
671 
672     nxt_debug(task, "h1p conn ws pong");
673 
674     r = obj;
675     b = r->ws_frame;
676 
677     wsh = (nxt_websocket_header_t *) b->mem.pos;
678     payload_len = wsh->payload_len;
679 
680     b->mem.pos += 2;
681 
682     nxt_memcpy(mask, b->mem.pos, 4);
683 
684     b->mem.pos += 4;
685 
686     out = nxt_http_buf_mem(task, r, 2 + payload_len);
687     if (nxt_slow_path(out == NULL)) {
688         nxt_http_request_error_handler(task, r, r->proto.any);
689         return;
690     }
691 
692     out->mem.start[0] = 0;
693     out->mem.start[1] = 0;
694 
695     wsh = (nxt_websocket_header_t *) out->mem.start;
696     out->mem.free = nxt_websocket_frame_init(wsh, payload_len);
697 
698     wsh->fin = 1;
699     wsh->opcode = NXT_WEBSOCKET_OP_PONG;
700 
701     for (i = 0; i < payload_len; i++) {
702         while (nxt_buf_mem_used_size(&b->mem) == 0) {
703             next = b->next;
704 
705             nxt_work_queue_add(&task->thread->engine->fast_work_queue,
706                                b->completion_handler, task, b, b->parent);
707 
708             b = next;
709         }
710 
711         *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4];
712     }
713 
714     r->ws_frame = b;
715 
716     nxt_http_request_send(task, r, out);
717 
718     nxt_http_request_ws_frame_start(task, r, r->ws_frame);
719 }
720