1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 static void delete_port_data(uv_handle_t* handle); 17 18 napi_ref Unit::constructor_; 19 20 21 struct port_data_t { 22 nxt_unit_ctx_t *ctx; 23 nxt_unit_port_t *port; 24 uv_poll_t poll; 25 }; 26 27 28 struct req_data_t { 29 napi_ref sock_ref; 30 napi_ref resp_ref; 31 napi_ref conn_ref; 32 }; 33 34 35 Unit::Unit(napi_env env, napi_value jsthis): 36 nxt_napi(env), 37 wrapper_(wrap(jsthis, this, destroy)), 38 unit_ctx_(nullptr) 39 { 40 nxt_unit_debug(NULL, "Unit::Unit()"); 41 } 42 43 44 Unit::~Unit() 45 { 46 delete_reference(wrapper_); 47 48 nxt_unit_debug(NULL, "Unit::~Unit()"); 49 } 50 51 52 napi_value 53 Unit::init(napi_env env, napi_value exports) 54 { 55 nxt_napi napi(env); 56 napi_value ctor; 57 58 napi_property_descriptor unit_props[] = { 59 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 60 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 61 }; 62 63 try { 64 ctor = napi.define_class("Unit", create, 2, unit_props); 65 constructor_ = napi.create_reference(ctor); 66 67 napi.set_named_property(exports, "Unit", ctor); 68 napi.set_named_property(exports, "response_send_headers", 69 response_send_headers); 70 napi.set_named_property(exports, "response_write", response_write); 71 napi.set_named_property(exports, "response_end", response_end); 72 napi.set_named_property(exports, "websocket_send_frame", 73 websocket_send_frame); 74 napi.set_named_property(exports, "websocket_set_sock", 75 websocket_set_sock); 76 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 77 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 78 79 } catch (exception &e) { 80 napi.throw_error(e); 81 return nullptr; 82 } 83 84 return exports; 85 } 86 87 88 void 89 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 90 { 91 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 92 93 delete obj; 94 } 95 96 97 napi_value 98 Unit::create(napi_env env, napi_callback_info info) 99 { 100 nxt_napi napi(env); 101 napi_value target, ctor, instance, jsthis; 102 103 try { 104 target = napi.get_new_target(info); 105 106 if (target != nullptr) { 107 /* Invoked as constructor: `new Unit(...)`. */ 108 jsthis = napi.get_cb_info(info); 109 110 new Unit(env, jsthis); 111 napi.create_reference(jsthis); 112 113 return jsthis; 114 } 115 116 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 117 ctor = napi.get_reference_value(constructor_); 118 instance = napi.new_instance(ctor); 119 napi.create_reference(instance); 120 121 } catch (exception &e) { 122 napi.throw_error(e); 123 return nullptr; 124 } 125 126 return instance; 127 } 128 129 130 napi_value 131 Unit::create_server(napi_env env, napi_callback_info info) 132 { 133 Unit *obj; 134 size_t argc; 135 nxt_napi napi(env); 136 napi_value jsthis, argv; 137 nxt_unit_init_t unit_init; 138 139 argc = 1; 140 141 try { 142 jsthis = napi.get_cb_info(info, argc, &argv); 143 obj = (Unit *) napi.unwrap(jsthis); 144 145 } catch (exception &e) { 146 napi.throw_error(e); 147 return nullptr; 148 } 149 150 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 151 152 unit_init.data = obj; 153 unit_init.callbacks.request_handler = request_handler_cb; 154 unit_init.callbacks.websocket_handler = websocket_handler_cb; 155 unit_init.callbacks.close_handler = close_handler_cb; 156 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 157 unit_init.callbacks.add_port = add_port; 158 unit_init.callbacks.remove_port = remove_port; 159 unit_init.callbacks.quit = quit_cb; 160 161 unit_init.request_data_size = sizeof(req_data_t); 162 163 obj->unit_ctx_ = nxt_unit_init(&unit_init); 164 if (obj->unit_ctx_ == NULL) { 165 goto failed; 166 } 167 168 return nullptr; 169 170 failed: 171 172 napi_throw_error(env, NULL, "Failed to create Unit object"); 173 174 return nullptr; 175 } 176 177 178 napi_value 179 Unit::listen(napi_env env, napi_callback_info info) 180 { 181 return nullptr; 182 } 183 184 185 void 186 Unit::request_handler_cb(nxt_unit_request_info_t *req) 187 { 188 Unit *obj; 189 190 obj = reinterpret_cast<Unit *>(req->unit->data); 191 192 obj->request_handler(req); 193 } 194 195 196 void 197 Unit::request_handler(nxt_unit_request_info_t *req) 198 { 199 napi_value socket, request, response, server_obj, emit_request; 200 201 memset(req->data, 0, sizeof(req_data_t)); 202 203 try { 204 nxt_handle_scope scope(env()); 205 206 server_obj = get_server_object(); 207 208 socket = create_socket(server_obj, req); 209 request = create_request(server_obj, socket); 210 response = create_response(server_obj, request, req); 211 212 create_headers(req, request); 213 214 emit_request = get_named_property(server_obj, "emit_request"); 215 216 nxt_async_context async_context(env(), "request_handler"); 217 nxt_callback_scope async_scope(async_context); 218 219 make_callback(async_context, server_obj, emit_request, request, 220 response); 221 222 } catch (exception &e) { 223 nxt_unit_req_warn(req, "request_handler: %s", e.str); 224 } 225 } 226 227 228 void 229 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 230 { 231 Unit *obj; 232 233 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 234 235 obj->websocket_handler(ws); 236 } 237 238 239 void 240 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 241 { 242 napi_value frame, server_obj, process_frame, conn; 243 req_data_t *req_data; 244 245 req_data = (req_data_t *) ws->req->data; 246 247 try { 248 nxt_handle_scope scope(env()); 249 250 server_obj = get_server_object(); 251 252 frame = create_websocket_frame(server_obj, ws); 253 254 conn = get_reference_value(req_data->conn_ref); 255 256 process_frame = get_named_property(conn, "processFrame"); 257 258 nxt_async_context async_context(env(), "websocket_handler"); 259 nxt_callback_scope async_scope(async_context); 260 261 make_callback(async_context, conn, process_frame, frame); 262 263 } catch (exception &e) { 264 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 265 } 266 267 nxt_unit_websocket_done(ws); 268 } 269 270 271 void 272 Unit::close_handler_cb(nxt_unit_request_info_t *req) 273 { 274 Unit *obj; 275 276 obj = reinterpret_cast<Unit *>(req->unit->data); 277 278 obj->close_handler(req); 279 } 280 281 282 void 283 Unit::close_handler(nxt_unit_request_info_t *req) 284 { 285 napi_value conn_handle_close, conn; 286 req_data_t *req_data; 287 288 req_data = (req_data_t *) req->data; 289 290 try { 291 nxt_handle_scope scope(env()); 292 293 conn = get_reference_value(req_data->conn_ref); 294 295 conn_handle_close = get_named_property(conn, "handleSocketClose"); 296 297 nxt_async_context async_context(env(), "close_handler"); 298 nxt_callback_scope async_scope(async_context); 299 300 make_callback(async_context, conn, conn_handle_close, 301 nxt_napi::create(0)); 302 303 remove_wrap(req_data->sock_ref); 304 remove_wrap(req_data->resp_ref); 305 remove_wrap(req_data->conn_ref); 306 307 } catch (exception &e) { 308 nxt_unit_req_warn(req, "close_handler: %s", e.str); 309 310 nxt_unit_request_done(req, NXT_UNIT_ERROR); 311 312 return; 313 } 314 315 nxt_unit_request_done(req, NXT_UNIT_OK); 316 } 317 318 319 void 320 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 321 { 322 Unit *obj; 323 324 obj = reinterpret_cast<Unit *>(ctx->unit->data); 325 326 obj->shm_ack_handler(ctx); 327 } 328 329 330 void 331 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 332 { 333 napi_value server_obj, emit_drain; 334 335 try { 336 nxt_handle_scope scope(env()); 337 338 server_obj = get_server_object(); 339 340 emit_drain = get_named_property(server_obj, "emit_drain"); 341 342 nxt_async_context async_context(env(), "shm_ack_handler"); 343 nxt_callback_scope async_scope(async_context); 344 345 make_callback(async_context, server_obj, emit_drain); 346 347 } catch (exception &e) { 348 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 349 } 350 } 351 352 353 static void 354 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 355 { 356 port_data_t *data; 357 358 data = (port_data_t *) handle->data; 359 360 nxt_unit_process_port_msg(data->ctx, data->port); 361 } 362 363 364 int 365 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 366 { 367 int err; 368 Unit *obj; 369 uv_loop_t *loop; 370 port_data_t *data; 371 napi_status status; 372 373 if (port->in_fd != -1) { 374 obj = reinterpret_cast<Unit *>(ctx->unit->data); 375 376 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 377 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 378 port->in_fd, strerror(errno), errno); 379 return -1; 380 } 381 382 status = napi_get_uv_event_loop(obj->env(), &loop); 383 if (status != napi_ok) { 384 nxt_unit_warn(ctx, "Failed to get uv.loop"); 385 return NXT_UNIT_ERROR; 386 } 387 388 data = new port_data_t; 389 390 err = uv_poll_init(loop, &data->poll, port->in_fd); 391 if (err < 0) { 392 nxt_unit_warn(ctx, "Failed to init uv.poll"); 393 return NXT_UNIT_ERROR; 394 } 395 396 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); 397 if (err < 0) { 398 nxt_unit_warn(ctx, "Failed to start uv.poll"); 399 return NXT_UNIT_ERROR; 400 } 401 402 port->data = data; 403 404 data->ctx = ctx; 405 data->port = port; 406 data->poll.data = data; 407 } 408 409 return NXT_UNIT_OK; 410 } 411 412 413 void 414 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) 415 { 416 port_data_t *data; 417 418 if (port->data != NULL) { 419 data = (port_data_t *) port->data; 420 421 if (data->port == port) { 422 uv_poll_stop(&data->poll); 423 424 uv_close((uv_handle_t *) &data->poll, delete_port_data); 425 } 426 } 427 } 428 429 430 static void 431 delete_port_data(uv_handle_t* handle) 432 { 433 port_data_t *data; 434 435 data = (port_data_t *) handle->data; 436 437 delete data; 438 } 439 440 441 void 442 Unit::quit_cb(nxt_unit_ctx_t *ctx) 443 { 444 Unit *obj; 445 446 obj = reinterpret_cast<Unit *>(ctx->unit->data); 447 448 obj->quit(ctx); 449 } 450 451 452 void 453 Unit::quit(nxt_unit_ctx_t *ctx) 454 { 455 napi_value server_obj, emit_close; 456 457 try { 458 nxt_handle_scope scope(env()); 459 460 server_obj = get_server_object(); 461 462 emit_close = get_named_property(server_obj, "emit_close"); 463 464 nxt_async_context async_context(env(), "unit_quit"); 465 nxt_callback_scope async_scope(async_context); 466 467 make_callback(async_context, server_obj, emit_close); 468 469 } catch (exception &e) { 470 nxt_unit_debug(ctx, "quit: %s", e.str); 471 } 472 473 nxt_unit_done(ctx); 474 } 475 476 477 napi_value 478 Unit::get_server_object() 479 { 480 napi_value unit_obj; 481 482 unit_obj = get_reference_value(wrapper_); 483 484 return get_named_property(unit_obj, "server"); 485 } 486 487 488 void 489 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 490 { 491 void *data; 492 uint32_t i; 493 napi_value headers, raw_headers, buffer; 494 napi_status status; 495 nxt_unit_request_t *r; 496 497 r = req->request; 498 499 headers = create_object(); 500 501 status = napi_create_array_with_length(env(), r->fields_count * 2, 502 &raw_headers); 503 if (status != napi_ok) { 504 throw exception("Failed to create array"); 505 } 506 507 for (i = 0; i < r->fields_count; i++) { 508 append_header(r->fields + i, headers, raw_headers, i); 509 } 510 511 set_named_property(request, "headers", headers); 512 set_named_property(request, "rawHeaders", raw_headers); 513 set_named_property(request, "httpVersion", r->version, r->version_length); 514 set_named_property(request, "method", r->method, r->method_length); 515 set_named_property(request, "url", r->target, r->target_length); 516 517 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 518 519 buffer = create_buffer((size_t) req->content_length, &data); 520 nxt_unit_request_read(req, data, req->content_length); 521 522 set_named_property(request, "_data", buffer); 523 } 524 525 526 inline char 527 lowcase(char c) 528 { 529 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 530 } 531 532 533 inline void 534 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 535 napi_value raw_headers, uint32_t idx) 536 { 537 char *name; 538 uint8_t i; 539 napi_value str, vstr; 540 541 name = (char *) nxt_unit_sptr_get(&f->name); 542 543 str = create_string_latin1(name, f->name_length); 544 545 for (i = 0; i < f->name_length; i++) { 546 name[i] = lowcase(name[i]); 547 } 548 549 vstr = set_named_property(headers, name, f->value, f->value_length); 550 551 set_element(raw_headers, idx * 2, str); 552 set_element(raw_headers, idx * 2 + 1, vstr); 553 } 554 555 556 napi_value 557 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 558 { 559 napi_value constructor, res; 560 req_data_t *req_data; 561 nxt_unit_request_t *r; 562 563 r = req->request; 564 565 constructor = get_named_property(server_obj, "Socket"); 566 567 res = new_instance(constructor); 568 569 req_data = (req_data_t *) req->data; 570 req_data->sock_ref = wrap(res, req, sock_destroy); 571 572 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 573 set_named_property(res, "localAddress", r->local, r->local_length); 574 575 return res; 576 } 577 578 579 napi_value 580 Unit::create_request(napi_value server_obj, napi_value socket) 581 { 582 napi_value constructor; 583 584 constructor = get_named_property(server_obj, "ServerRequest"); 585 586 return new_instance(constructor, server_obj, socket); 587 } 588 589 590 napi_value 591 Unit::create_response(napi_value server_obj, napi_value request, 592 nxt_unit_request_info_t *req) 593 { 594 napi_value constructor, res; 595 req_data_t *req_data; 596 597 constructor = get_named_property(server_obj, "ServerResponse"); 598 599 res = new_instance(constructor, request); 600 601 req_data = (req_data_t *) req->data; 602 req_data->resp_ref = wrap(res, req, resp_destroy); 603 604 return res; 605 } 606 607 608 napi_value 609 Unit::create_websocket_frame(napi_value server_obj, 610 nxt_unit_websocket_frame_t *ws) 611 { 612 void *data; 613 napi_value constructor, res, buffer; 614 uint8_t sc[2]; 615 616 constructor = get_named_property(server_obj, "WebSocketFrame"); 617 618 res = new_instance(constructor); 619 620 set_named_property(res, "fin", (bool) ws->header->fin); 621 set_named_property(res, "opcode", ws->header->opcode); 622 set_named_property(res, "length", (int64_t) ws->payload_len); 623 624 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 625 if (ws->payload_len >= 2) { 626 nxt_unit_websocket_read(ws, sc, 2); 627 628 set_named_property(res, "closeStatus", 629 (((uint16_t) sc[0]) << 8) | sc[1]); 630 631 } else { 632 set_named_property(res, "closeStatus", -1); 633 } 634 } 635 636 buffer = create_buffer((size_t) ws->content_length, &data); 637 nxt_unit_websocket_read(ws, data, ws->content_length); 638 639 set_named_property(res, "binaryPayload", buffer); 640 641 return res; 642 } 643 644 645 napi_value 646 Unit::response_send_headers(napi_env env, napi_callback_info info) 647 { 648 int ret; 649 char *ptr, *name_ptr; 650 bool is_array; 651 size_t argc, name_len, value_len; 652 uint32_t status_code, header_len, keys_len, array_len; 653 uint32_t keys_count, i, j; 654 uint16_t hash; 655 nxt_napi napi(env); 656 napi_value this_arg, headers, keys, name, value, array_val; 657 napi_value array_entry; 658 napi_valuetype val_type; 659 nxt_unit_field_t *f; 660 nxt_unit_request_info_t *req; 661 napi_value argv[4]; 662 663 argc = 4; 664 665 try { 666 this_arg = napi.get_cb_info(info, argc, argv); 667 if (argc != 4) { 668 napi.throw_error("Wrong args count. Expected: " 669 "statusCode, headers, headers count, " 670 "headers length"); 671 return nullptr; 672 } 673 674 req = napi.get_request_info(this_arg); 675 status_code = napi.get_value_uint32(argv[0]); 676 keys_count = napi.get_value_uint32(argv[2]); 677 header_len = napi.get_value_uint32(argv[3]); 678 679 headers = argv[1]; 680 681 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 682 if (ret != NXT_UNIT_OK) { 683 napi.throw_error("Failed to create response"); 684 return nullptr; 685 } 686 687 /* 688 * Each name and value are 0-terminated by libunit. 689 * Need to add extra 2 bytes for each header. 690 */ 691 header_len += keys_count * 2; 692 693 keys = napi.get_property_names(headers); 694 keys_len = napi.get_array_length(keys); 695 696 ptr = req->response_buf->free; 697 698 for (i = 0; i < keys_len; i++) { 699 name = napi.get_element(keys, i); 700 701 array_entry = napi.get_property(headers, name); 702 703 name = napi.get_element(array_entry, 0); 704 value = napi.get_element(array_entry, 1); 705 706 name_len = napi.get_value_string_latin1(name, ptr, header_len); 707 name_ptr = ptr; 708 709 ptr += name_len + 1; 710 header_len -= name_len + 1; 711 712 hash = nxt_unit_field_hash(name_ptr, name_len); 713 714 is_array = napi.is_array(value); 715 716 if (is_array) { 717 array_len = napi.get_array_length(value); 718 719 for (j = 0; j < array_len; j++) { 720 array_val = napi.get_element(value, j); 721 722 val_type = napi.type_of(array_val); 723 724 if (val_type != napi_string) { 725 array_val = napi.coerce_to_string(array_val); 726 } 727 728 value_len = napi.get_value_string_latin1(array_val, ptr, 729 header_len); 730 731 f = req->response->fields + req->response->fields_count; 732 f->skip = 0; 733 734 nxt_unit_sptr_set(&f->name, name_ptr); 735 736 f->name_length = name_len; 737 f->hash = hash; 738 739 nxt_unit_sptr_set(&f->value, ptr); 740 f->value_length = (uint32_t) value_len; 741 742 ptr += value_len + 1; 743 header_len -= value_len + 1; 744 745 req->response->fields_count++; 746 } 747 748 } else { 749 val_type = napi.type_of(value); 750 751 if (val_type != napi_string) { 752 value = napi.coerce_to_string(value); 753 } 754 755 value_len = napi.get_value_string_latin1(value, ptr, header_len); 756 757 f = req->response->fields + req->response->fields_count; 758 f->skip = 0; 759 760 nxt_unit_sptr_set(&f->name, name_ptr); 761 762 f->name_length = name_len; 763 f->hash = hash; 764 765 nxt_unit_sptr_set(&f->value, ptr); 766 f->value_length = (uint32_t) value_len; 767 768 ptr += value_len + 1; 769 header_len -= value_len + 1; 770 771 req->response->fields_count++; 772 } 773 } 774 775 } catch (exception &e) { 776 napi.throw_error(e); 777 return nullptr; 778 } 779 780 req->response_buf->free = ptr; 781 782 ret = nxt_unit_response_send(req); 783 if (ret != NXT_UNIT_OK) { 784 napi.throw_error("Failed to send response"); 785 return nullptr; 786 } 787 788 return this_arg; 789 } 790 791 792 napi_value 793 Unit::response_write(napi_env env, napi_callback_info info) 794 { 795 int ret; 796 void *ptr; 797 size_t argc, have_buf_len; 798 ssize_t res_len; 799 uint32_t buf_start, buf_len; 800 nxt_napi napi(env); 801 napi_value this_arg; 802 nxt_unit_buf_t *buf; 803 napi_valuetype buf_type; 804 nxt_unit_request_info_t *req; 805 napi_value argv[3]; 806 807 argc = 3; 808 809 try { 810 this_arg = napi.get_cb_info(info, argc, argv); 811 if (argc != 3) { 812 throw exception("Wrong args count. Expected: " 813 "chunk, start, length"); 814 } 815 816 req = napi.get_request_info(this_arg); 817 buf_type = napi.type_of(argv[0]); 818 buf_start = napi.get_value_uint32(argv[1]); 819 buf_len = napi.get_value_uint32(argv[2]) + 1; 820 821 if (buf_type == napi_string) { 822 /* TODO: will work only for utf8 content-type */ 823 824 if (req->response_buf != NULL 825 && req->response_buf->end >= req->response_buf->free + buf_len) 826 { 827 buf = req->response_buf; 828 829 } else { 830 buf = nxt_unit_response_buf_alloc(req, buf_len); 831 if (buf == NULL) { 832 throw exception("Failed to allocate response buffer"); 833 } 834 } 835 836 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 837 buf_len); 838 839 buf->free += have_buf_len; 840 841 ret = nxt_unit_buf_send(buf); 842 if (ret == NXT_UNIT_OK) { 843 res_len = have_buf_len; 844 } 845 846 } else { 847 ptr = napi.get_buffer_info(argv[0], have_buf_len); 848 849 if (buf_start > 0) { 850 ptr = ((uint8_t *) ptr) + buf_start; 851 have_buf_len -= buf_start; 852 } 853 854 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 855 856 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 857 } 858 859 if (ret != NXT_UNIT_OK) { 860 throw exception("Failed to send body buf"); 861 } 862 } catch (exception &e) { 863 napi.throw_error(e); 864 return nullptr; 865 } 866 867 return napi.create((int64_t) res_len); 868 } 869 870 871 napi_value 872 Unit::response_end(napi_env env, napi_callback_info info) 873 { 874 nxt_napi napi(env); 875 napi_value this_arg; 876 req_data_t *req_data; 877 nxt_unit_request_info_t *req; 878 879 try { 880 this_arg = napi.get_cb_info(info); 881 882 req = napi.get_request_info(this_arg); 883 884 req_data = (req_data_t *) req->data; 885 886 napi.remove_wrap(req_data->sock_ref); 887 napi.remove_wrap(req_data->resp_ref); 888 napi.remove_wrap(req_data->conn_ref); 889 890 } catch (exception &e) { 891 napi.throw_error(e); 892 return nullptr; 893 } 894 895 nxt_unit_request_done(req, NXT_UNIT_OK); 896 897 return this_arg; 898 } 899 900 901 napi_value 902 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 903 { 904 int ret, iovec_len; 905 bool fin; 906 size_t buf_len; 907 uint32_t opcode, sc; 908 nxt_napi napi(env); 909 napi_value this_arg, frame, payload; 910 nxt_unit_request_info_t *req; 911 char status_code[2]; 912 struct iovec iov[2]; 913 914 iovec_len = 0; 915 916 try { 917 this_arg = napi.get_cb_info(info, frame); 918 919 req = napi.get_request_info(this_arg); 920 921 opcode = napi.get_value_uint32(napi.get_named_property(frame, 922 "opcode")); 923 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 924 sc = napi.get_value_uint32(napi.get_named_property(frame, 925 "closeStatus")); 926 status_code[0] = (sc >> 8) & 0xFF; 927 status_code[1] = sc & 0xFF; 928 929 iov[iovec_len].iov_base = status_code; 930 iov[iovec_len].iov_len = 2; 931 iovec_len++; 932 } 933 934 try { 935 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 936 937 } catch (exception &e) { 938 fin = true; 939 } 940 941 payload = napi.get_named_property(frame, "binaryPayload"); 942 943 if (napi.is_buffer(payload)) { 944 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 945 946 } else { 947 buf_len = 0; 948 } 949 950 } catch (exception &e) { 951 napi.throw_error(e); 952 return nullptr; 953 } 954 955 if (buf_len > 0) { 956 iov[iovec_len].iov_len = buf_len; 957 iovec_len++; 958 } 959 960 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 961 if (ret != NXT_UNIT_OK) { 962 goto failed; 963 } 964 965 return this_arg; 966 967 failed: 968 969 napi.throw_error("Failed to send frame"); 970 971 return nullptr; 972 } 973 974 975 napi_value 976 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 977 { 978 nxt_napi napi(env); 979 napi_value this_arg, sock; 980 req_data_t *req_data; 981 nxt_unit_request_info_t *req; 982 983 try { 984 this_arg = napi.get_cb_info(info, sock); 985 986 req = napi.get_request_info(sock); 987 988 req_data = (req_data_t *) req->data; 989 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 990 991 } catch (exception &e) { 992 napi.throw_error(e); 993 return nullptr; 994 } 995 996 return this_arg; 997 } 998 999 1000 void 1001 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) 1002 { 1003 nxt_unit_req_debug((nxt_unit_request_info_t *) r, "conn_destroy: %p", r); 1004 } 1005 1006 1007 void 1008 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) 1009 { 1010 nxt_unit_req_debug((nxt_unit_request_info_t *) r, "sock_destroy: %p", r); 1011 } 1012 1013 1014 void 1015 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) 1016 { 1017 nxt_unit_req_debug((nxt_unit_request_info_t *) r, "resp_destroy: %p", r); 1018 } 1019