1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 napi_ref Unit::constructor_; 17 18 19 struct nxt_nodejs_ctx_t { 20 nxt_unit_port_id_t port_id; 21 uv_poll_t poll; 22 }; 23 24 25 struct req_data_t { 26 napi_ref sock_ref; 27 napi_ref resp_ref; 28 napi_ref conn_ref; 29 }; 30 31 32 Unit::Unit(napi_env env, napi_value jsthis): 33 nxt_napi(env), 34 wrapper_(wrap(jsthis, this, destroy)), 35 unit_ctx_(nullptr) 36 { 37 nxt_unit_debug(NULL, "Unit::Unit()"); 38 } 39 40 41 Unit::~Unit() 42 { 43 delete_reference(wrapper_); 44 45 nxt_unit_debug(NULL, "Unit::~Unit()"); 46 } 47 48 49 napi_value 50 Unit::init(napi_env env, napi_value exports) 51 { 52 nxt_napi napi(env); 53 napi_value ctor; 54 55 napi_property_descriptor unit_props[] = { 56 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 57 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 58 }; 59 60 try { 61 ctor = napi.define_class("Unit", create, 2, unit_props); 62 constructor_ = napi.create_reference(ctor); 63 64 napi.set_named_property(exports, "Unit", ctor); 65 napi.set_named_property(exports, "response_send_headers", 66 response_send_headers); 67 napi.set_named_property(exports, "response_write", response_write); 68 napi.set_named_property(exports, "response_end", response_end); 69 napi.set_named_property(exports, "websocket_send_frame", 70 websocket_send_frame); 71 napi.set_named_property(exports, "websocket_set_sock", 72 websocket_set_sock); 73 74 } catch (exception &e) { 75 napi.throw_error(e); 76 return nullptr; 77 } 78 79 return exports; 80 } 81 82 83 void 84 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 85 { 86 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 87 88 delete obj; 89 } 90 91 92 napi_value 93 Unit::create(napi_env env, napi_callback_info info) 94 { 95 nxt_napi napi(env); 96 napi_value target, ctor, instance, jsthis; 97 98 try { 99 target = napi.get_new_target(info); 100 101 if (target != nullptr) { 102 /* Invoked as constructor: `new Unit(...)`. */ 103 jsthis = napi.get_cb_info(info); 104 105 new Unit(env, jsthis); 106 napi.create_reference(jsthis); 107 108 return jsthis; 109 } 110 111 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 112 ctor = napi.get_reference_value(constructor_); 113 instance = napi.new_instance(ctor); 114 napi.create_reference(instance); 115 116 } catch (exception &e) { 117 napi.throw_error(e); 118 return nullptr; 119 } 120 121 return instance; 122 } 123 124 125 napi_value 126 Unit::create_server(napi_env env, napi_callback_info info) 127 { 128 Unit *obj; 129 size_t argc; 130 nxt_napi napi(env); 131 napi_value jsthis, argv; 132 nxt_unit_init_t unit_init; 133 134 argc = 1; 135 136 try { 137 jsthis = napi.get_cb_info(info, argc, &argv); 138 obj = (Unit *) napi.unwrap(jsthis); 139 140 } catch (exception &e) { 141 napi.throw_error(e); 142 return nullptr; 143 } 144 145 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 146 147 unit_init.data = obj; 148 unit_init.callbacks.request_handler = request_handler_cb; 149 unit_init.callbacks.websocket_handler = websocket_handler_cb; 150 unit_init.callbacks.close_handler = close_handler_cb; 151 unit_init.callbacks.add_port = add_port; 152 unit_init.callbacks.remove_port = remove_port; 153 unit_init.callbacks.quit = quit_cb; 154 155 unit_init.request_data_size = sizeof(req_data_t); 156 157 obj->unit_ctx_ = nxt_unit_init(&unit_init); 158 if (obj->unit_ctx_ == NULL) { 159 goto failed; 160 } 161 162 return nullptr; 163 164 failed: 165 166 napi_throw_error(env, NULL, "Failed to create Unit object"); 167 168 return nullptr; 169 } 170 171 172 napi_value 173 Unit::listen(napi_env env, napi_callback_info info) 174 { 175 return nullptr; 176 } 177 178 179 void 180 Unit::request_handler_cb(nxt_unit_request_info_t *req) 181 { 182 Unit *obj; 183 184 obj = reinterpret_cast<Unit *>(req->unit->data); 185 186 obj->request_handler(req); 187 } 188 189 190 void 191 Unit::request_handler(nxt_unit_request_info_t *req) 192 { 193 napi_value socket, request, response, server_obj, emit_request; 194 195 memset(req->data, 0, sizeof(req_data_t)); 196 197 try { 198 nxt_handle_scope scope(env()); 199 200 server_obj = get_server_object(); 201 202 socket = create_socket(server_obj, req); 203 request = create_request(server_obj, socket); 204 response = create_response(server_obj, request, req); 205 206 create_headers(req, request); 207 208 emit_request = get_named_property(server_obj, "emit_request"); 209 210 nxt_async_context async_context(env(), "request_handler"); 211 nxt_callback_scope async_scope(async_context); 212 213 make_callback(async_context, server_obj, emit_request, request, 214 response); 215 216 } catch (exception &e) { 217 nxt_unit_req_warn(req, "request_handler: %s", e.str); 218 } 219 } 220 221 222 void 223 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 224 { 225 Unit *obj; 226 227 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 228 229 obj->websocket_handler(ws); 230 } 231 232 233 void 234 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 235 { 236 napi_value frame, server_obj, process_frame, conn; 237 req_data_t *req_data; 238 239 req_data = (req_data_t *) ws->req->data; 240 241 try { 242 nxt_handle_scope scope(env()); 243 244 server_obj = get_server_object(); 245 246 frame = create_websocket_frame(server_obj, ws); 247 248 conn = get_reference_value(req_data->conn_ref); 249 250 process_frame = get_named_property(conn, "processFrame"); 251 252 nxt_async_context async_context(env(), "websocket_handler"); 253 nxt_callback_scope async_scope(async_context); 254 255 make_callback(async_context, conn, process_frame, frame); 256 257 } catch (exception &e) { 258 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 259 } 260 261 nxt_unit_websocket_done(ws); 262 } 263 264 265 void 266 Unit::close_handler_cb(nxt_unit_request_info_t *req) 267 { 268 Unit *obj; 269 270 obj = reinterpret_cast<Unit *>(req->unit->data); 271 272 obj->close_handler(req); 273 } 274 275 276 void 277 Unit::close_handler(nxt_unit_request_info_t *req) 278 { 279 napi_value conn_handle_close, conn; 280 req_data_t *req_data; 281 282 req_data = (req_data_t *) req->data; 283 284 try { 285 nxt_handle_scope scope(env()); 286 287 conn = get_reference_value(req_data->conn_ref); 288 289 conn_handle_close = get_named_property(conn, "handleSocketClose"); 290 291 nxt_async_context async_context(env(), "close_handler"); 292 nxt_callback_scope async_scope(async_context); 293 294 make_callback(async_context, conn, conn_handle_close, 295 nxt_napi::create(0)); 296 297 remove_wrap(req_data->sock_ref); 298 remove_wrap(req_data->resp_ref); 299 remove_wrap(req_data->conn_ref); 300 301 } catch (exception &e) { 302 nxt_unit_req_warn(req, "close_handler: %s", e.str); 303 304 return; 305 } 306 307 nxt_unit_request_done(req, NXT_UNIT_OK); 308 } 309 310 311 static void 312 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 313 { 314 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 315 } 316 317 318 int 319 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 320 { 321 int err; 322 Unit *obj; 323 uv_loop_t *loop; 324 napi_status status; 325 nxt_nodejs_ctx_t *node_ctx; 326 327 if (port->in_fd != -1) { 328 obj = reinterpret_cast<Unit *>(ctx->unit->data); 329 330 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 331 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 332 port->in_fd, strerror(errno), errno); 333 return -1; 334 } 335 336 status = napi_get_uv_event_loop(obj->env(), &loop); 337 if (status != napi_ok) { 338 nxt_unit_warn(ctx, "Failed to get uv.loop"); 339 return NXT_UNIT_ERROR; 340 } 341 342 node_ctx = new nxt_nodejs_ctx_t; 343 344 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 345 if (err < 0) { 346 nxt_unit_warn(ctx, "Failed to init uv.poll"); 347 return NXT_UNIT_ERROR; 348 } 349 350 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 351 if (err < 0) { 352 nxt_unit_warn(ctx, "Failed to start uv.poll"); 353 return NXT_UNIT_ERROR; 354 } 355 356 ctx->data = node_ctx; 357 358 node_ctx->port_id = port->id; 359 node_ctx->poll.data = ctx; 360 } 361 362 return nxt_unit_add_port(ctx, port); 363 } 364 365 366 inline bool 367 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 368 { 369 return p1.pid == p2.pid && p1.id == p2.id; 370 } 371 372 373 void 374 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 375 { 376 nxt_nodejs_ctx_t *node_ctx; 377 378 if (ctx->data != NULL) { 379 node_ctx = (nxt_nodejs_ctx_t *) ctx->data; 380 381 if (node_ctx->port_id == *port_id) { 382 uv_poll_stop(&node_ctx->poll); 383 384 delete node_ctx; 385 386 ctx->data = NULL; 387 } 388 } 389 390 nxt_unit_remove_port(ctx, port_id); 391 } 392 393 394 void 395 Unit::quit_cb(nxt_unit_ctx_t *ctx) 396 { 397 Unit *obj; 398 399 obj = reinterpret_cast<Unit *>(ctx->unit->data); 400 401 obj->quit(ctx); 402 } 403 404 405 void 406 Unit::quit(nxt_unit_ctx_t *ctx) 407 { 408 napi_value server_obj, emit_close; 409 410 try { 411 nxt_handle_scope scope(env()); 412 413 server_obj = get_server_object(); 414 415 emit_close = get_named_property(server_obj, "emit_close"); 416 417 nxt_async_context async_context(env(), "unit_quit"); 418 nxt_callback_scope async_scope(async_context); 419 420 make_callback(async_context, server_obj, emit_close); 421 422 } catch (exception &e) { 423 nxt_unit_debug(ctx, "quit: %s", e.str); 424 } 425 426 nxt_unit_done(ctx); 427 } 428 429 430 napi_value 431 Unit::get_server_object() 432 { 433 napi_value unit_obj; 434 435 unit_obj = get_reference_value(wrapper_); 436 437 return get_named_property(unit_obj, "server"); 438 } 439 440 441 void 442 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 443 { 444 void *data; 445 uint32_t i; 446 napi_value headers, raw_headers, buffer; 447 napi_status status; 448 nxt_unit_request_t *r; 449 450 r = req->request; 451 452 headers = create_object(); 453 454 status = napi_create_array_with_length(env(), r->fields_count * 2, 455 &raw_headers); 456 if (status != napi_ok) { 457 throw exception("Failed to create array"); 458 } 459 460 for (i = 0; i < r->fields_count; i++) { 461 append_header(r->fields + i, headers, raw_headers, i); 462 } 463 464 set_named_property(request, "headers", headers); 465 set_named_property(request, "rawHeaders", raw_headers); 466 set_named_property(request, "httpVersion", r->version, r->version_length); 467 set_named_property(request, "method", r->method, r->method_length); 468 set_named_property(request, "url", r->target, r->target_length); 469 470 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 471 472 buffer = create_buffer((size_t) req->content_length, &data); 473 nxt_unit_request_read(req, data, req->content_length); 474 475 set_named_property(request, "_data", buffer); 476 } 477 478 479 inline char 480 lowcase(char c) 481 { 482 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 483 } 484 485 486 inline void 487 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 488 napi_value raw_headers, uint32_t idx) 489 { 490 char *name; 491 uint8_t i; 492 napi_value str, vstr; 493 494 name = (char *) nxt_unit_sptr_get(&f->name); 495 496 str = create_string_latin1(name, f->name_length); 497 498 for (i = 0; i < f->name_length; i++) { 499 name[i] = lowcase(name[i]); 500 } 501 502 vstr = set_named_property(headers, name, f->value, f->value_length); 503 504 set_element(raw_headers, idx * 2, str); 505 set_element(raw_headers, idx * 2 + 1, vstr); 506 } 507 508 509 napi_value 510 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 511 { 512 napi_value constructor, res; 513 req_data_t *req_data; 514 nxt_unit_request_t *r; 515 516 r = req->request; 517 518 constructor = get_named_property(server_obj, "Socket"); 519 520 res = new_instance(constructor); 521 522 req_data = (req_data_t *) req->data; 523 req_data->sock_ref = wrap(res, req, sock_destroy); 524 525 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 526 set_named_property(res, "localAddress", r->local, r->local_length); 527 528 return res; 529 } 530 531 532 napi_value 533 Unit::create_request(napi_value server_obj, napi_value socket) 534 { 535 napi_value constructor; 536 537 constructor = get_named_property(server_obj, "ServerRequest"); 538 539 return new_instance(constructor, server_obj, socket); 540 } 541 542 543 napi_value 544 Unit::create_response(napi_value server_obj, napi_value request, 545 nxt_unit_request_info_t *req) 546 { 547 napi_value constructor, res; 548 req_data_t *req_data; 549 550 constructor = get_named_property(server_obj, "ServerResponse"); 551 552 res = new_instance(constructor, request); 553 554 req_data = (req_data_t *) req->data; 555 req_data->resp_ref = wrap(res, req, resp_destroy); 556 557 return res; 558 } 559 560 561 napi_value 562 Unit::create_websocket_frame(napi_value server_obj, 563 nxt_unit_websocket_frame_t *ws) 564 { 565 void *data; 566 napi_value constructor, res, buffer; 567 uint8_t sc[2]; 568 569 constructor = get_named_property(server_obj, "WebSocketFrame"); 570 571 res = new_instance(constructor); 572 573 set_named_property(res, "fin", (bool) ws->header->fin); 574 set_named_property(res, "opcode", ws->header->opcode); 575 set_named_property(res, "length", (int64_t) ws->payload_len); 576 577 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 578 if (ws->payload_len >= 2) { 579 nxt_unit_websocket_read(ws, sc, 2); 580 581 set_named_property(res, "closeStatus", 582 (((uint16_t) sc[0]) << 8) | sc[1]); 583 584 } else { 585 set_named_property(res, "closeStatus", -1); 586 } 587 } 588 589 buffer = create_buffer((size_t) ws->content_length, &data); 590 nxt_unit_websocket_read(ws, data, ws->content_length); 591 592 set_named_property(res, "binaryPayload", buffer); 593 594 return res; 595 } 596 597 598 napi_value 599 Unit::response_send_headers(napi_env env, napi_callback_info info) 600 { 601 int ret; 602 char *ptr, *name_ptr; 603 bool is_array; 604 size_t argc, name_len, value_len; 605 uint32_t status_code, header_len, keys_len, array_len; 606 uint32_t keys_count, i, j; 607 uint16_t hash; 608 nxt_napi napi(env); 609 napi_value this_arg, headers, keys, name, value, array_val; 610 napi_value array_entry; 611 napi_valuetype val_type; 612 nxt_unit_field_t *f; 613 nxt_unit_request_info_t *req; 614 napi_value argv[4]; 615 616 argc = 4; 617 618 try { 619 this_arg = napi.get_cb_info(info, argc, argv); 620 if (argc != 4) { 621 napi.throw_error("Wrong args count. Expected: " 622 "statusCode, headers, headers count, " 623 "headers length"); 624 return nullptr; 625 } 626 627 req = napi.get_request_info(this_arg); 628 status_code = napi.get_value_uint32(argv[0]); 629 keys_count = napi.get_value_uint32(argv[2]); 630 header_len = napi.get_value_uint32(argv[3]); 631 632 /* Need to reserve extra byte for C-string 0-termination. */ 633 header_len++; 634 635 headers = argv[1]; 636 637 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 638 if (ret != NXT_UNIT_OK) { 639 napi.throw_error("Failed to create response"); 640 return nullptr; 641 } 642 643 keys = napi.get_property_names(headers); 644 keys_len = napi.get_array_length(keys); 645 646 ptr = req->response_buf->free; 647 648 for (i = 0; i < keys_len; i++) { 649 name = napi.get_element(keys, i); 650 651 array_entry = napi.get_property(headers, name); 652 653 name = napi.get_element(array_entry, 0); 654 value = napi.get_element(array_entry, 1); 655 656 name_len = napi.get_value_string_latin1(name, ptr, header_len); 657 name_ptr = ptr; 658 659 ptr += name_len; 660 header_len -= name_len; 661 662 hash = nxt_unit_field_hash(name_ptr, name_len); 663 664 is_array = napi.is_array(value); 665 666 if (is_array) { 667 array_len = napi.get_array_length(value); 668 669 for (j = 0; j < array_len; j++) { 670 array_val = napi.get_element(value, j); 671 672 val_type = napi.type_of(array_val); 673 674 if (val_type != napi_string) { 675 array_val = napi.coerce_to_string(array_val); 676 } 677 678 value_len = napi.get_value_string_latin1(array_val, ptr, 679 header_len); 680 681 f = req->response->fields + req->response->fields_count; 682 f->skip = 0; 683 684 nxt_unit_sptr_set(&f->name, name_ptr); 685 686 f->name_length = name_len; 687 f->hash = hash; 688 689 nxt_unit_sptr_set(&f->value, ptr); 690 f->value_length = (uint32_t) value_len; 691 692 ptr += value_len; 693 header_len -= value_len; 694 695 req->response->fields_count++; 696 } 697 698 } else { 699 val_type = napi.type_of(value); 700 701 if (val_type != napi_string) { 702 value = napi.coerce_to_string(value); 703 } 704 705 value_len = napi.get_value_string_latin1(value, ptr, header_len); 706 707 f = req->response->fields + req->response->fields_count; 708 f->skip = 0; 709 710 nxt_unit_sptr_set(&f->name, name_ptr); 711 712 f->name_length = name_len; 713 f->hash = hash; 714 715 nxt_unit_sptr_set(&f->value, ptr); 716 f->value_length = (uint32_t) value_len; 717 718 ptr += value_len; 719 header_len -= value_len; 720 721 req->response->fields_count++; 722 } 723 } 724 725 } catch (exception &e) { 726 napi.throw_error(e); 727 return nullptr; 728 } 729 730 req->response_buf->free = ptr; 731 732 ret = nxt_unit_response_send(req); 733 if (ret != NXT_UNIT_OK) { 734 napi.throw_error("Failed to send response"); 735 return nullptr; 736 } 737 738 return this_arg; 739 } 740 741 742 napi_value 743 Unit::response_write(napi_env env, napi_callback_info info) 744 { 745 int ret; 746 void *ptr; 747 size_t argc, have_buf_len; 748 uint32_t buf_len; 749 nxt_napi napi(env); 750 napi_value this_arg; 751 nxt_unit_buf_t *buf; 752 napi_valuetype buf_type; 753 nxt_unit_request_info_t *req; 754 napi_value argv[2]; 755 756 argc = 2; 757 758 try { 759 this_arg = napi.get_cb_info(info, argc, argv); 760 if (argc != 2) { 761 throw exception("Wrong args count. Expected: " 762 "chunk, chunk length"); 763 } 764 765 req = napi.get_request_info(this_arg); 766 buf_type = napi.type_of(argv[0]); 767 buf_len = napi.get_value_uint32(argv[1]) + 1; 768 769 buf = nxt_unit_response_buf_alloc(req, buf_len); 770 if (buf == NULL) { 771 throw exception("Failed to allocate response buffer"); 772 } 773 774 if (buf_type == napi_string) { 775 /* TODO: will work only for utf8 content-type */ 776 777 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 778 buf_len); 779 780 } else { 781 ptr = napi.get_buffer_info(argv[0], have_buf_len); 782 783 memcpy(buf->free, ptr, have_buf_len); 784 } 785 786 buf->free += have_buf_len; 787 788 ret = nxt_unit_buf_send(buf); 789 if (ret != NXT_UNIT_OK) { 790 throw exception("Failed to send body buf"); 791 } 792 } catch (exception &e) { 793 napi.throw_error(e); 794 return nullptr; 795 } 796 797 return this_arg; 798 } 799 800 801 napi_value 802 Unit::response_end(napi_env env, napi_callback_info info) 803 { 804 nxt_napi napi(env); 805 napi_value this_arg; 806 req_data_t *req_data; 807 nxt_unit_request_info_t *req; 808 809 try { 810 this_arg = napi.get_cb_info(info); 811 812 req = napi.get_request_info(this_arg); 813 814 req_data = (req_data_t *) req->data; 815 816 napi.remove_wrap(req_data->sock_ref); 817 napi.remove_wrap(req_data->resp_ref); 818 napi.remove_wrap(req_data->conn_ref); 819 820 } catch (exception &e) { 821 napi.throw_error(e); 822 return nullptr; 823 } 824 825 nxt_unit_request_done(req, NXT_UNIT_OK); 826 827 return this_arg; 828 } 829 830 831 napi_value 832 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 833 { 834 int ret, iovec_len; 835 bool fin; 836 size_t buf_len; 837 uint32_t opcode, sc; 838 nxt_napi napi(env); 839 napi_value this_arg, frame, payload; 840 nxt_unit_request_info_t *req; 841 char status_code[2]; 842 struct iovec iov[2]; 843 844 iovec_len = 0; 845 846 try { 847 this_arg = napi.get_cb_info(info, frame); 848 849 req = napi.get_request_info(this_arg); 850 851 opcode = napi.get_value_uint32(napi.get_named_property(frame, 852 "opcode")); 853 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 854 sc = napi.get_value_uint32(napi.get_named_property(frame, 855 "closeStatus")); 856 status_code[0] = (sc >> 8) & 0xFF; 857 status_code[1] = sc & 0xFF; 858 859 iov[iovec_len].iov_base = status_code; 860 iov[iovec_len].iov_len = 2; 861 iovec_len++; 862 } 863 864 try { 865 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 866 867 } catch (exception &e) { 868 fin = true; 869 } 870 871 payload = napi.get_named_property(frame, "binaryPayload"); 872 873 if (napi.is_buffer(payload)) { 874 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 875 876 } else { 877 buf_len = 0; 878 } 879 880 } catch (exception &e) { 881 napi.throw_error(e); 882 return nullptr; 883 } 884 885 if (buf_len > 0) { 886 iov[iovec_len].iov_len = buf_len; 887 iovec_len++; 888 } 889 890 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 891 if (ret != NXT_UNIT_OK) { 892 goto failed; 893 } 894 895 return this_arg; 896 897 failed: 898 899 napi.throw_error("Failed to send frame"); 900 901 return nullptr; 902 } 903 904 905 napi_value 906 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 907 { 908 nxt_napi napi(env); 909 napi_value this_arg, sock; 910 req_data_t *req_data; 911 nxt_unit_request_info_t *req; 912 913 try { 914 this_arg = napi.get_cb_info(info, sock); 915 916 req = napi.get_request_info(sock); 917 918 req_data = (req_data_t *) req->data; 919 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 920 921 } catch (exception &e) { 922 napi.throw_error(e); 923 return nullptr; 924 } 925 926 return this_arg; 927 } 928 929 930 void 931 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) 932 { 933 nxt_unit_request_info_t *req; 934 935 req = (nxt_unit_request_info_t *) nativeObject; 936 937 nxt_unit_warn(NULL, "conn_destroy: %p", req); 938 } 939 940 941 void 942 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) 943 { 944 nxt_unit_request_info_t *req; 945 946 req = (nxt_unit_request_info_t *) nativeObject; 947 948 nxt_unit_warn(NULL, "sock_destroy: %p", req); 949 } 950 951 952 void 953 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) 954 { 955 nxt_unit_request_info_t *req; 956 957 req = (nxt_unit_request_info_t *) nativeObject; 958 959 nxt_unit_warn(NULL, "resp_destroy: %p", req); 960 } 961