1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include <nxt_main.h> 7 #include <nxt_router.h> 8 #include <nxt_http.h> 9 #include <nxt_h1proto.h> 10 #include <nxt_websocket.h> 11 #include <nxt_websocket_header.h> 12 13 typedef struct { 14 uint16_t code; 15 uint8_t args; 16 nxt_str_t desc; 17 } nxt_ws_error_t; 18 19 static void nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data); 20 static void nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, 21 void *data); 22 static void nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, 23 nxt_h1proto_t *h1p); 24 static void nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, 25 nxt_h1proto_t *h1p); 26 static void nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, 27 nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh); 28 static void nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data); 29 static ssize_t nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c); 30 static void nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data); 31 static void nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, 32 void *data); 33 static void hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, 34 const nxt_ws_error_t *err, ...); 35 static void nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data); 36 static void nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data); 37 38 static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state; 39 static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state; 40 41 static const nxt_ws_error_t nxt_ws_err_out_of_memory = { 42 NXT_WEBSOCKET_CR_INTERNAL_SERVER_ERROR, 43 0, nxt_string("Out of memory") }; 44 static const nxt_ws_error_t nxt_ws_err_too_big = { 45 NXT_WEBSOCKET_CR_MESSAGE_TOO_BIG, 46 1, nxt_string("Message too big: %uL bytes") }; 47 static const nxt_ws_error_t nxt_ws_err_invalid_close_code = { 48 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 49 1, nxt_string("Close code %ud is not valid") }; 50 static const nxt_ws_error_t nxt_ws_err_going_away = { 51 NXT_WEBSOCKET_CR_GOING_AWAY, 52 0, nxt_string("Remote peer is going away") }; 53 static const nxt_ws_error_t nxt_ws_err_not_masked = { 54 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 55 0, nxt_string("Not masked client frame") }; 56 static const nxt_ws_error_t nxt_ws_err_ctrl_fragmented = { 57 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 58 0, nxt_string("Fragmented control frame") }; 59 static const nxt_ws_error_t nxt_ws_err_ctrl_too_big = { 60 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 61 1, nxt_string("Control frame too big: %uL bytes") }; 62 static const nxt_ws_error_t nxt_ws_err_invalid_close_len = { 63 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 64 0, nxt_string("Close frame payload length cannot be 1") }; 65 static const nxt_ws_error_t nxt_ws_err_invalid_opcode = { 66 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 67 1, nxt_string("Unrecognized opcode %ud") }; 68 static const nxt_ws_error_t nxt_ws_err_cont_expected = { 69 NXT_WEBSOCKET_CR_PROTOCOL_ERROR, 70 1, nxt_string("Continuation expected, but %ud opcode received") }; 71 72 void 73 nxt_h1p_websocket_first_frame_start(nxt_task_t *task, nxt_http_request_t *r, 74 nxt_buf_t *ws_frame) 75 { 76 nxt_conn_t *c; 77 nxt_timer_t *timer; 78 nxt_h1proto_t *h1p; 79 nxt_socket_conf_joint_t *joint; 80 81 nxt_debug(task, "h1p ws first frame start"); 82 83 h1p = r->proto.h1; 84 c = h1p->conn; 85 86 if (!c->tcp_nodelay) { 87 nxt_conn_tcp_nodelay_on(task, c); 88 } 89 90 joint = c->listen->socket.data; 91 92 if (nxt_slow_path(joint != NULL 93 && joint->socket_conf->websocket_conf.keepalive_interval != 0)) 94 { 95 h1p->websocket_timer = nxt_mp_zget(c->mem_pool, 96 sizeof(nxt_h1p_websocket_timer_t)); 97 if (nxt_slow_path(h1p->websocket_timer == NULL)) { 98 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_out_of_memory); 99 return; 100 } 101 102 h1p->websocket_timer->keepalive_interval = 103 joint->socket_conf->websocket_conf.keepalive_interval; 104 h1p->websocket_timer->h1p = h1p; 105 106 timer = &h1p->websocket_timer->timer; 107 timer->task = &c->task; 108 timer->work_queue = &task->thread->engine->fast_work_queue; 109 timer->log = &c->log; 110 timer->bias = NXT_TIMER_DEFAULT_BIAS; 111 timer->handler = nxt_h1p_conn_ws_keepalive; 112 } 113 114 nxt_h1p_websocket_frame_start(task, r, ws_frame); 115 } 116 117 118 void 119 nxt_h1p_websocket_frame_start(nxt_task_t *task, nxt_http_request_t *r, 120 nxt_buf_t *ws_frame) 121 { 122 size_t size; 123 nxt_buf_t *in; 124 nxt_conn_t *c; 125 nxt_h1proto_t *h1p; 126 127 nxt_debug(task, "h1p ws frame start"); 128 129 h1p = r->proto.h1; 130 131 if (nxt_slow_path(h1p->websocket_closed)) { 132 return; 133 } 134 135 c = h1p->conn; 136 c->read = ws_frame; 137 138 nxt_h1p_complete_buffers(task, h1p); 139 140 in = c->read; 141 c->read_state = &nxt_h1p_read_ws_frame_header_state; 142 143 if (in == NULL) { 144 nxt_conn_read(task->thread->engine, c); 145 nxt_h1p_conn_ws_keepalive_enable(task, h1p); 146 147 } else { 148 size = nxt_buf_mem_used_size(&in->mem); 149 150 nxt_debug(task, "h1p read client ws frame"); 151 152 nxt_memmove(in->mem.start, in->mem.pos, size); 153 154 in->mem.pos = in->mem.start; 155 in->mem.free = in->mem.start + size; 156 157 nxt_h1p_conn_ws_frame_header_read(task, c, h1p); 158 } 159 } 160 161 162 static void 163 nxt_h1p_conn_ws_keepalive(nxt_task_t *task, void *obj, void *data) 164 { 165 nxt_buf_t *out; 166 nxt_timer_t *timer; 167 nxt_h1proto_t *h1p; 168 nxt_http_request_t *r; 169 nxt_websocket_header_t *wsh; 170 nxt_h1p_websocket_timer_t *ws_timer; 171 172 nxt_debug(task, "h1p conn ws keepalive"); 173 174 timer = obj; 175 ws_timer = nxt_timer_data(timer, nxt_h1p_websocket_timer_t, timer); 176 h1p = ws_timer->h1p; 177 178 r = h1p->request; 179 if (nxt_slow_path(r == NULL)) { 180 return; 181 } 182 183 out = nxt_http_buf_mem(task, r, 2); 184 if (nxt_slow_path(out == NULL)) { 185 nxt_http_request_error_handler(task, r, r->proto.any); 186 return; 187 } 188 189 out->mem.start[0] = 0; 190 out->mem.start[1] = 0; 191 192 wsh = (nxt_websocket_header_t *) out->mem.start; 193 out->mem.free = nxt_websocket_frame_init(wsh, 0); 194 195 wsh->fin = 1; 196 wsh->opcode = NXT_WEBSOCKET_OP_PING; 197 198 nxt_http_request_send(task, r, out); 199 } 200 201 202 static const nxt_conn_state_t nxt_h1p_read_ws_frame_header_state 203 nxt_aligned(64) = 204 { 205 .ready_handler = nxt_h1p_conn_ws_frame_header_read, 206 .close_handler = nxt_h1p_conn_ws_error, 207 .error_handler = nxt_h1p_conn_ws_error, 208 209 .io_read_handler = nxt_h1p_ws_io_read_handler, 210 211 .timer_handler = nxt_h1p_conn_ws_timeout, 212 .timer_value = nxt_h1p_conn_request_timer_value, 213 .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), 214 .timer_autoreset = 1, 215 }; 216 217 218 static void 219 nxt_h1p_conn_ws_frame_header_read(nxt_task_t *task, void *obj, void *data) 220 { 221 size_t size, hsize, frame_size, max_frame_size; 222 uint64_t payload_len; 223 nxt_conn_t *c; 224 nxt_h1proto_t *h1p; 225 nxt_http_request_t *r; 226 nxt_event_engine_t *engine; 227 nxt_websocket_header_t *wsh; 228 nxt_socket_conf_joint_t *joint; 229 230 c = obj; 231 h1p = data; 232 233 nxt_h1p_conn_ws_keepalive_disable(task, h1p); 234 235 size = nxt_buf_mem_used_size(&c->read->mem); 236 237 engine = task->thread->engine; 238 239 if (size < 2) { 240 nxt_debug(task, "h1p conn ws frame header read %z", size); 241 242 nxt_conn_read(engine, c); 243 nxt_h1p_conn_ws_keepalive_enable(task, h1p); 244 245 return; 246 } 247 248 wsh = (nxt_websocket_header_t *) c->read->mem.pos; 249 250 hsize = nxt_websocket_frame_header_size(wsh); 251 252 if (size < hsize) { 253 nxt_debug(task, "h1p conn ws frame header read %z < %z", size, hsize); 254 255 nxt_conn_read(engine, c); 256 nxt_h1p_conn_ws_keepalive_enable(task, h1p); 257 258 return; 259 } 260 261 r = h1p->request; 262 if (nxt_slow_path(r == NULL)) { 263 return; 264 } 265 266 r->ws_frame = c->read; 267 268 joint = c->listen->socket.data; 269 270 if (nxt_slow_path(joint == NULL)) { 271 /* 272 * Listening socket had been closed while 273 * connection was in keep-alive state. 274 */ 275 c->read_state = &nxt_h1p_idle_close_state; 276 return; 277 } 278 279 if (nxt_slow_path(wsh->mask == 0)) { 280 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_not_masked); 281 return; 282 } 283 284 if ((wsh->opcode & NXT_WEBSOCKET_OP_CTRL) != 0) { 285 if (nxt_slow_path(wsh->fin == 0)) { 286 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_fragmented); 287 return; 288 } 289 290 if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_PING 291 && wsh->opcode != NXT_WEBSOCKET_OP_PONG 292 && wsh->opcode != NXT_WEBSOCKET_OP_CLOSE)) 293 { 294 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, 295 wsh->opcode); 296 return; 297 } 298 299 if (nxt_slow_path(wsh->payload_len > 125)) { 300 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_ctrl_too_big, 301 nxt_websocket_frame_payload_len(wsh)); 302 return; 303 } 304 305 if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE 306 && wsh->payload_len == 1)) 307 { 308 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_len); 309 return; 310 } 311 312 } else { 313 if (h1p->websocket_cont_expected) { 314 if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_CONT)) { 315 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_cont_expected, 316 wsh->opcode); 317 return; 318 } 319 320 } else { 321 if (nxt_slow_path(wsh->opcode != NXT_WEBSOCKET_OP_BINARY 322 && wsh->opcode != NXT_WEBSOCKET_OP_TEXT)) 323 { 324 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_opcode, 325 wsh->opcode); 326 return; 327 } 328 } 329 330 h1p->websocket_cont_expected = !wsh->fin; 331 } 332 333 max_frame_size = joint->socket_conf->websocket_conf.max_frame_size; 334 335 payload_len = nxt_websocket_frame_payload_len(wsh); 336 337 if (nxt_slow_path(hsize > max_frame_size 338 || payload_len > (max_frame_size - hsize))) 339 { 340 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_too_big, payload_len); 341 return; 342 } 343 344 c->read_state = &nxt_h1p_read_ws_frame_payload_state; 345 346 frame_size = payload_len + hsize; 347 348 nxt_debug(task, "h1p conn ws frame header read: %z, %z", size, frame_size); 349 350 if (frame_size <= size) { 351 nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); 352 353 return; 354 } 355 356 if (frame_size < (size_t) nxt_buf_mem_size(&c->read->mem)) { 357 c->read->mem.end = c->read->mem.start + frame_size; 358 359 } else { 360 nxt_buf_t *b = nxt_buf_mem_alloc(c->mem_pool, frame_size - size, 0); 361 362 c->read->next = b; 363 c->read = b; 364 } 365 366 nxt_conn_read(engine, c); 367 nxt_h1p_conn_ws_keepalive_enable(task, h1p); 368 } 369 370 371 static void 372 nxt_h1p_conn_ws_keepalive_disable(nxt_task_t *task, nxt_h1proto_t *h1p) 373 { 374 nxt_timer_t *timer; 375 376 if (h1p->websocket_timer == NULL) { 377 return; 378 } 379 380 timer = &h1p->websocket_timer->timer; 381 382 if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { 383 nxt_debug(task, "h1p ws keepalive disable: scheduled ws shutdown"); 384 return; 385 } 386 387 nxt_timer_disable(task->thread->engine, timer); 388 } 389 390 391 static void 392 nxt_h1p_conn_ws_keepalive_enable(nxt_task_t *task, nxt_h1proto_t *h1p) 393 { 394 nxt_timer_t *timer; 395 396 if (h1p->websocket_timer == NULL) { 397 return; 398 } 399 400 timer = &h1p->websocket_timer->timer; 401 402 if (nxt_slow_path(timer->handler != nxt_h1p_conn_ws_keepalive)) { 403 nxt_debug(task, "h1p ws keepalive enable: scheduled ws shutdown"); 404 return; 405 } 406 407 nxt_timer_add(task->thread->engine, timer, 408 h1p->websocket_timer->keepalive_interval); 409 } 410 411 412 static void 413 nxt_h1p_conn_ws_frame_process(nxt_task_t *task, nxt_conn_t *c, 414 nxt_h1proto_t *h1p, nxt_websocket_header_t *wsh) 415 { 416 size_t hsize; 417 uint8_t *p, *mask; 418 uint16_t code; 419 nxt_http_request_t *r; 420 421 r = h1p->request; 422 423 c->read = NULL; 424 425 if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_PING)) { 426 nxt_h1p_conn_ws_pong(task, r, NULL); 427 return; 428 } 429 430 if (nxt_slow_path(wsh->opcode == NXT_WEBSOCKET_OP_CLOSE)) { 431 if (wsh->payload_len >= 2) { 432 hsize = nxt_websocket_frame_header_size(wsh); 433 mask = nxt_pointer_to(wsh, hsize - 4); 434 p = nxt_pointer_to(wsh, hsize); 435 436 code = ((p[0] ^ mask[0]) << 8) + (p[1] ^ mask[1]); 437 438 if (nxt_slow_path(code < 1000 || code >= 5000 439 || (code > 1003 && code < 1007) 440 || (code > 1014 && code < 3000))) 441 { 442 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_invalid_close_code, 443 code); 444 return; 445 } 446 } 447 448 h1p->websocket_closed = 1; 449 } 450 451 r->state->ready_handler(task, r, NULL); 452 } 453 454 455 static void 456 nxt_h1p_conn_ws_error(nxt_task_t *task, void *obj, void *data) 457 { 458 nxt_h1proto_t *h1p; 459 nxt_http_request_t *r; 460 461 h1p = data; 462 463 nxt_debug(task, "h1p conn ws error"); 464 465 r = h1p->request; 466 467 h1p->keepalive = 0; 468 469 if (nxt_fast_path(r != NULL)) { 470 r->state->error_handler(task, r, h1p); 471 } 472 } 473 474 475 static ssize_t 476 nxt_h1p_ws_io_read_handler(nxt_task_t *task, nxt_conn_t *c) 477 { 478 size_t size; 479 ssize_t n; 480 nxt_buf_t *b; 481 482 b = c->read; 483 484 if (b == NULL) { 485 /* Enough for control frame. */ 486 size = 10 + 125; 487 488 b = nxt_buf_mem_alloc(c->mem_pool, size, 0); 489 if (nxt_slow_path(b == NULL)) { 490 c->socket.error = NXT_ENOMEM; 491 return NXT_ERROR; 492 } 493 } 494 495 n = c->io->recvbuf(c, b); 496 497 if (n > 0) { 498 c->read = b; 499 500 } else { 501 c->read = NULL; 502 nxt_mp_free(c->mem_pool, b); 503 } 504 505 return n; 506 } 507 508 509 static void 510 nxt_h1p_conn_ws_timeout(nxt_task_t *task, void *obj, void *data) 511 { 512 nxt_conn_t *c; 513 nxt_timer_t *timer; 514 nxt_h1proto_t *h1p; 515 nxt_http_request_t *r; 516 517 timer = obj; 518 519 nxt_debug(task, "h1p conn ws timeout"); 520 521 c = nxt_read_timer_conn(timer); 522 c->block_read = 1; 523 /* 524 * Disable SO_LINGER off during socket closing 525 * to send "408 Request Timeout" error response. 526 */ 527 c->socket.timedout = 0; 528 529 h1p = c->socket.data; 530 h1p->keepalive = 0; 531 532 r = h1p->request; 533 if (nxt_slow_path(r == NULL)) { 534 return; 535 } 536 537 hxt_h1p_send_ws_error(task, r, &nxt_ws_err_going_away); 538 } 539 540 541 static const nxt_conn_state_t nxt_h1p_read_ws_frame_payload_state 542 nxt_aligned(64) = 543 { 544 .ready_handler = nxt_h1p_conn_ws_frame_payload_read, 545 .close_handler = nxt_h1p_conn_ws_error, 546 .error_handler = nxt_h1p_conn_ws_error, 547 548 .timer_handler = nxt_h1p_conn_ws_timeout, 549 .timer_value = nxt_h1p_conn_request_timer_value, 550 .timer_data = offsetof(nxt_socket_conf_t, websocket_conf.read_timeout), 551 .timer_autoreset = 1, 552 }; 553 554 555 static void 556 nxt_h1p_conn_ws_frame_payload_read(nxt_task_t *task, void *obj, void *data) 557 { 558 nxt_conn_t *c; 559 nxt_h1proto_t *h1p; 560 nxt_http_request_t *r; 561 nxt_event_engine_t *engine; 562 nxt_websocket_header_t *wsh; 563 564 c = obj; 565 h1p = data; 566 567 nxt_h1p_conn_ws_keepalive_disable(task, h1p); 568 569 nxt_debug(task, "h1p conn ws frame read"); 570 571 if (nxt_buf_mem_free_size(&c->read->mem) == 0) { 572 r = h1p->request; 573 if (nxt_slow_path(r == NULL)) { 574 return; 575 } 576 577 wsh = (nxt_websocket_header_t *) r->ws_frame->mem.pos; 578 579 nxt_h1p_conn_ws_frame_process(task, c, h1p, wsh); 580 581 return; 582 } 583 584 engine = task->thread->engine; 585 586 nxt_conn_read(engine, c); 587 nxt_h1p_conn_ws_keepalive_enable(task, h1p); 588 } 589 590 591 static void 592 hxt_h1p_send_ws_error(nxt_task_t *task, nxt_http_request_t *r, 593 const nxt_ws_error_t *err, ...) 594 { 595 u_char *p; 596 va_list args; 597 nxt_buf_t *out; 598 nxt_str_t desc; 599 nxt_websocket_header_t *wsh; 600 u_char buf[125]; 601 602 if (nxt_slow_path(err->args)) { 603 va_start(args, err); 604 p = nxt_vsprintf(buf, buf + sizeof(buf), (char *) err->desc.start, 605 args); 606 va_end(args); 607 608 desc.start = buf; 609 desc.length = p - buf; 610 611 } else { 612 desc = err->desc; 613 } 614 615 nxt_log(task, NXT_LOG_INFO, "websocket error %d: %V", err->code, &desc); 616 617 out = nxt_http_buf_mem(task, r, 2 + sizeof(err->code) + desc.length); 618 if (nxt_slow_path(out == NULL)) { 619 nxt_http_request_error_handler(task, r, r->proto.any); 620 return; 621 } 622 623 out->mem.start[0] = 0; 624 out->mem.start[1] = 0; 625 626 wsh = (nxt_websocket_header_t *) out->mem.start; 627 p = nxt_websocket_frame_init(wsh, sizeof(err->code) + desc.length); 628 629 wsh->fin = 1; 630 wsh->opcode = NXT_WEBSOCKET_OP_CLOSE; 631 632 *p++ = (err->code >> 8) & 0xFF; 633 *p++ = err->code & 0xFF; 634 635 out->mem.free = nxt_cpymem(p, desc.start, desc.length); 636 out->next = nxt_http_buf_last(r); 637 638 if (out->next != NULL) { 639 out->next->completion_handler = nxt_h1p_conn_ws_error_sent; 640 } 641 642 nxt_http_request_send(task, r, out); 643 } 644 645 646 static void 647 nxt_h1p_conn_ws_error_sent(nxt_task_t *task, void *obj, void *data) 648 { 649 nxt_http_request_t *r; 650 651 r = data; 652 653 nxt_debug(task, "h1p conn ws error sent"); 654 655 r->state->error_handler(task, r, r->proto.any); 656 } 657 658 659 static void 660 nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data) 661 { 662 uint8_t payload_len, i; 663 nxt_buf_t *b, *out, *next; 664 nxt_http_request_t *r; 665 nxt_websocket_header_t *wsh; 666 uint8_t mask[4]; 667 668 nxt_debug(task, "h1p conn ws pong"); 669 670 r = obj; 671 b = r->ws_frame; 672 673 wsh = (nxt_websocket_header_t *) b->mem.pos; 674 payload_len = wsh->payload_len; 675 676 b->mem.pos += 2; 677 678 nxt_memcpy(mask, b->mem.pos, 4); 679 680 b->mem.pos += 4; 681 682 out = nxt_http_buf_mem(task, r, 2 + payload_len); 683 if (nxt_slow_path(out == NULL)) { 684 nxt_http_request_error_handler(task, r, r->proto.any); 685 return; 686 } 687 688 out->mem.start[0] = 0; 689 out->mem.start[1] = 0; 690 691 wsh = (nxt_websocket_header_t *) out->mem.start; 692 out->mem.free = nxt_websocket_frame_init(wsh, payload_len); 693 694 wsh->fin = 1; 695 wsh->opcode = NXT_WEBSOCKET_OP_PONG; 696 697 for (i = 0; i < payload_len; i++) { 698 while (nxt_buf_mem_used_size(&b->mem) == 0) { 699 next = b->next; 700 b->next = NULL; 701 702 nxt_work_queue_add(&task->thread->engine->fast_work_queue, 703 b->completion_handler, task, b, b->parent); 704 705 b = next; 706 } 707 708 *out->mem.free++ = *b->mem.pos++ ^ mask[i % 4]; 709 } 710 711 r->ws_frame = b; 712 713 nxt_http_request_send(task, r, out); 714 715 nxt_http_request_ws_frame_start(task, r, r->ws_frame); 716 } 717