1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 napi_ref Unit::constructor_; 17 18 19 struct nxt_nodejs_ctx_t { 20 nxt_unit_port_id_t port_id; 21 uv_poll_t poll; 22 }; 23 24 25 struct req_data_t { 26 napi_ref sock_ref; 27 napi_ref resp_ref; 28 napi_ref conn_ref; 29 }; 30 31 32 Unit::Unit(napi_env env, napi_value jsthis): 33 nxt_napi(env), 34 wrapper_(wrap(jsthis, this, destroy)), 35 unit_ctx_(nullptr) 36 { 37 nxt_unit_debug(NULL, "Unit::Unit()"); 38 } 39 40 41 Unit::~Unit() 42 { 43 delete_reference(wrapper_); 44 45 nxt_unit_debug(NULL, "Unit::~Unit()"); 46 } 47 48 49 napi_value 50 Unit::init(napi_env env, napi_value exports) 51 { 52 nxt_napi napi(env); 53 napi_value ctor; 54 55 napi_property_descriptor unit_props[] = { 56 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 57 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 58 }; 59 60 try { 61 ctor = napi.define_class("Unit", create, 2, unit_props); 62 constructor_ = napi.create_reference(ctor); 63 64 napi.set_named_property(exports, "Unit", ctor); 65 napi.set_named_property(exports, "response_send_headers", 66 response_send_headers); 67 napi.set_named_property(exports, "response_write", response_write); 68 napi.set_named_property(exports, "response_end", response_end); 69 napi.set_named_property(exports, "websocket_send_frame", 70 websocket_send_frame); 71 napi.set_named_property(exports, "websocket_set_sock", 72 websocket_set_sock); 73 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 74 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 75 76 } catch (exception &e) { 77 napi.throw_error(e); 78 return nullptr; 79 } 80 81 return exports; 82 } 83 84 85 void 86 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 87 { 88 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 89 90 delete obj; 91 } 92 93 94 napi_value 95 Unit::create(napi_env env, napi_callback_info info) 96 { 97 nxt_napi napi(env); 98 napi_value target, ctor, instance, jsthis; 99 100 try { 101 target = napi.get_new_target(info); 102 103 if (target != nullptr) { 104 /* Invoked as constructor: `new Unit(...)`. */ 105 jsthis = napi.get_cb_info(info); 106 107 new Unit(env, jsthis); 108 napi.create_reference(jsthis); 109 110 return jsthis; 111 } 112 113 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 114 ctor = napi.get_reference_value(constructor_); 115 instance = napi.new_instance(ctor); 116 napi.create_reference(instance); 117 118 } catch (exception &e) { 119 napi.throw_error(e); 120 return nullptr; 121 } 122 123 return instance; 124 } 125 126 127 napi_value 128 Unit::create_server(napi_env env, napi_callback_info info) 129 { 130 Unit *obj; 131 size_t argc; 132 nxt_napi napi(env); 133 napi_value jsthis, argv; 134 nxt_unit_init_t unit_init; 135 136 argc = 1; 137 138 try { 139 jsthis = napi.get_cb_info(info, argc, &argv); 140 obj = (Unit *) napi.unwrap(jsthis); 141 142 } catch (exception &e) { 143 napi.throw_error(e); 144 return nullptr; 145 } 146 147 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 148 149 unit_init.data = obj; 150 unit_init.callbacks.request_handler = request_handler_cb; 151 unit_init.callbacks.websocket_handler = websocket_handler_cb; 152 unit_init.callbacks.close_handler = close_handler_cb; 153 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 154 unit_init.callbacks.add_port = add_port; 155 unit_init.callbacks.remove_port = remove_port; 156 unit_init.callbacks.quit = quit_cb; 157 158 unit_init.request_data_size = sizeof(req_data_t); 159 160 obj->unit_ctx_ = nxt_unit_init(&unit_init); 161 if (obj->unit_ctx_ == NULL) { 162 goto failed; 163 } 164 165 return nullptr; 166 167 failed: 168 169 napi_throw_error(env, NULL, "Failed to create Unit object"); 170 171 return nullptr; 172 } 173 174 175 napi_value 176 Unit::listen(napi_env env, napi_callback_info info) 177 { 178 return nullptr; 179 } 180 181 182 void 183 Unit::request_handler_cb(nxt_unit_request_info_t *req) 184 { 185 Unit *obj; 186 187 obj = reinterpret_cast<Unit *>(req->unit->data); 188 189 obj->request_handler(req); 190 } 191 192 193 void 194 Unit::request_handler(nxt_unit_request_info_t *req) 195 { 196 napi_value socket, request, response, server_obj, emit_request; 197 198 memset(req->data, 0, sizeof(req_data_t)); 199 200 try { 201 nxt_handle_scope scope(env()); 202 203 server_obj = get_server_object(); 204 205 socket = create_socket(server_obj, req); 206 request = create_request(server_obj, socket); 207 response = create_response(server_obj, request, req); 208 209 create_headers(req, request); 210 211 emit_request = get_named_property(server_obj, "emit_request"); 212 213 nxt_async_context async_context(env(), "request_handler"); 214 nxt_callback_scope async_scope(async_context); 215 216 make_callback(async_context, server_obj, emit_request, request, 217 response); 218 219 } catch (exception &e) { 220 nxt_unit_req_warn(req, "request_handler: %s", e.str); 221 } 222 } 223 224 225 void 226 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 227 { 228 Unit *obj; 229 230 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 231 232 obj->websocket_handler(ws); 233 } 234 235 236 void 237 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 238 { 239 napi_value frame, server_obj, process_frame, conn; 240 req_data_t *req_data; 241 242 req_data = (req_data_t *) ws->req->data; 243 244 try { 245 nxt_handle_scope scope(env()); 246 247 server_obj = get_server_object(); 248 249 frame = create_websocket_frame(server_obj, ws); 250 251 conn = get_reference_value(req_data->conn_ref); 252 253 process_frame = get_named_property(conn, "processFrame"); 254 255 nxt_async_context async_context(env(), "websocket_handler"); 256 nxt_callback_scope async_scope(async_context); 257 258 make_callback(async_context, conn, process_frame, frame); 259 260 } catch (exception &e) { 261 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 262 } 263 264 nxt_unit_websocket_done(ws); 265 } 266 267 268 void 269 Unit::close_handler_cb(nxt_unit_request_info_t *req) 270 { 271 Unit *obj; 272 273 obj = reinterpret_cast<Unit *>(req->unit->data); 274 275 obj->close_handler(req); 276 } 277 278 279 void 280 Unit::close_handler(nxt_unit_request_info_t *req) 281 { 282 napi_value conn_handle_close, conn; 283 req_data_t *req_data; 284 285 req_data = (req_data_t *) req->data; 286 287 try { 288 nxt_handle_scope scope(env()); 289 290 conn = get_reference_value(req_data->conn_ref); 291 292 conn_handle_close = get_named_property(conn, "handleSocketClose"); 293 294 nxt_async_context async_context(env(), "close_handler"); 295 nxt_callback_scope async_scope(async_context); 296 297 make_callback(async_context, conn, conn_handle_close, 298 nxt_napi::create(0)); 299 300 remove_wrap(req_data->sock_ref); 301 remove_wrap(req_data->resp_ref); 302 remove_wrap(req_data->conn_ref); 303 304 } catch (exception &e) { 305 nxt_unit_req_warn(req, "close_handler: %s", e.str); 306 307 return; 308 } 309 310 nxt_unit_request_done(req, NXT_UNIT_OK); 311 } 312 313 314 void 315 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 316 { 317 Unit *obj; 318 319 obj = reinterpret_cast<Unit *>(ctx->unit->data); 320 321 obj->shm_ack_handler(ctx); 322 } 323 324 325 void 326 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 327 { 328 napi_value server_obj, emit_drain; 329 330 try { 331 nxt_handle_scope scope(env()); 332 333 server_obj = get_server_object(); 334 335 emit_drain = get_named_property(server_obj, "emit_drain"); 336 337 nxt_async_context async_context(env(), "shm_ack_handler"); 338 nxt_callback_scope async_scope(async_context); 339 340 make_callback(async_context, server_obj, emit_drain); 341 342 } catch (exception &e) { 343 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 344 } 345 } 346 347 348 static void 349 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 350 { 351 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 352 } 353 354 355 int 356 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 357 { 358 int err; 359 Unit *obj; 360 uv_loop_t *loop; 361 napi_status status; 362 nxt_nodejs_ctx_t *node_ctx; 363 364 if (port->in_fd != -1) { 365 obj = reinterpret_cast<Unit *>(ctx->unit->data); 366 367 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 368 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 369 port->in_fd, strerror(errno), errno); 370 return -1; 371 } 372 373 status = napi_get_uv_event_loop(obj->env(), &loop); 374 if (status != napi_ok) { 375 nxt_unit_warn(ctx, "Failed to get uv.loop"); 376 return NXT_UNIT_ERROR; 377 } 378 379 node_ctx = new nxt_nodejs_ctx_t; 380 381 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 382 if (err < 0) { 383 nxt_unit_warn(ctx, "Failed to init uv.poll"); 384 return NXT_UNIT_ERROR; 385 } 386 387 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 388 if (err < 0) { 389 nxt_unit_warn(ctx, "Failed to start uv.poll"); 390 return NXT_UNIT_ERROR; 391 } 392 393 ctx->data = node_ctx; 394 395 node_ctx->port_id = port->id; 396 node_ctx->poll.data = ctx; 397 } 398 399 return nxt_unit_add_port(ctx, port); 400 } 401 402 403 inline bool 404 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 405 { 406 return p1.pid == p2.pid && p1.id == p2.id; 407 } 408 409 410 void 411 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 412 { 413 nxt_nodejs_ctx_t *node_ctx; 414 415 if (ctx->data != NULL) { 416 node_ctx = (nxt_nodejs_ctx_t *) ctx->data; 417 418 if (node_ctx->port_id == *port_id) { 419 uv_poll_stop(&node_ctx->poll); 420 421 delete node_ctx; 422 423 ctx->data = NULL; 424 } 425 } 426 427 nxt_unit_remove_port(ctx, port_id); 428 } 429 430 431 void 432 Unit::quit_cb(nxt_unit_ctx_t *ctx) 433 { 434 Unit *obj; 435 436 obj = reinterpret_cast<Unit *>(ctx->unit->data); 437 438 obj->quit(ctx); 439 } 440 441 442 void 443 Unit::quit(nxt_unit_ctx_t *ctx) 444 { 445 napi_value server_obj, emit_close; 446 447 try { 448 nxt_handle_scope scope(env()); 449 450 server_obj = get_server_object(); 451 452 emit_close = get_named_property(server_obj, "emit_close"); 453 454 nxt_async_context async_context(env(), "unit_quit"); 455 nxt_callback_scope async_scope(async_context); 456 457 make_callback(async_context, server_obj, emit_close); 458 459 } catch (exception &e) { 460 nxt_unit_debug(ctx, "quit: %s", e.str); 461 } 462 463 nxt_unit_done(ctx); 464 } 465 466 467 napi_value 468 Unit::get_server_object() 469 { 470 napi_value unit_obj; 471 472 unit_obj = get_reference_value(wrapper_); 473 474 return get_named_property(unit_obj, "server"); 475 } 476 477 478 void 479 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 480 { 481 void *data; 482 uint32_t i; 483 napi_value headers, raw_headers, buffer; 484 napi_status status; 485 nxt_unit_request_t *r; 486 487 r = req->request; 488 489 headers = create_object(); 490 491 status = napi_create_array_with_length(env(), r->fields_count * 2, 492 &raw_headers); 493 if (status != napi_ok) { 494 throw exception("Failed to create array"); 495 } 496 497 for (i = 0; i < r->fields_count; i++) { 498 append_header(r->fields + i, headers, raw_headers, i); 499 } 500 501 set_named_property(request, "headers", headers); 502 set_named_property(request, "rawHeaders", raw_headers); 503 set_named_property(request, "httpVersion", r->version, r->version_length); 504 set_named_property(request, "method", r->method, r->method_length); 505 set_named_property(request, "url", r->target, r->target_length); 506 507 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 508 509 buffer = create_buffer((size_t) req->content_length, &data); 510 nxt_unit_request_read(req, data, req->content_length); 511 512 set_named_property(request, "_data", buffer); 513 } 514 515 516 inline char 517 lowcase(char c) 518 { 519 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 520 } 521 522 523 inline void 524 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 525 napi_value raw_headers, uint32_t idx) 526 { 527 char *name; 528 uint8_t i; 529 napi_value str, vstr; 530 531 name = (char *) nxt_unit_sptr_get(&f->name); 532 533 str = create_string_latin1(name, f->name_length); 534 535 for (i = 0; i < f->name_length; i++) { 536 name[i] = lowcase(name[i]); 537 } 538 539 vstr = set_named_property(headers, name, f->value, f->value_length); 540 541 set_element(raw_headers, idx * 2, str); 542 set_element(raw_headers, idx * 2 + 1, vstr); 543 } 544 545 546 napi_value 547 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 548 { 549 napi_value constructor, res; 550 req_data_t *req_data; 551 nxt_unit_request_t *r; 552 553 r = req->request; 554 555 constructor = get_named_property(server_obj, "Socket"); 556 557 res = new_instance(constructor); 558 559 req_data = (req_data_t *) req->data; 560 req_data->sock_ref = wrap(res, req, sock_destroy); 561 562 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 563 set_named_property(res, "localAddress", r->local, r->local_length); 564 565 return res; 566 } 567 568 569 napi_value 570 Unit::create_request(napi_value server_obj, napi_value socket) 571 { 572 napi_value constructor; 573 574 constructor = get_named_property(server_obj, "ServerRequest"); 575 576 return new_instance(constructor, server_obj, socket); 577 } 578 579 580 napi_value 581 Unit::create_response(napi_value server_obj, napi_value request, 582 nxt_unit_request_info_t *req) 583 { 584 napi_value constructor, res; 585 req_data_t *req_data; 586 587 constructor = get_named_property(server_obj, "ServerResponse"); 588 589 res = new_instance(constructor, request); 590 591 req_data = (req_data_t *) req->data; 592 req_data->resp_ref = wrap(res, req, resp_destroy); 593 594 return res; 595 } 596 597 598 napi_value 599 Unit::create_websocket_frame(napi_value server_obj, 600 nxt_unit_websocket_frame_t *ws) 601 { 602 void *data; 603 napi_value constructor, res, buffer; 604 uint8_t sc[2]; 605 606 constructor = get_named_property(server_obj, "WebSocketFrame"); 607 608 res = new_instance(constructor); 609 610 set_named_property(res, "fin", (bool) ws->header->fin); 611 set_named_property(res, "opcode", ws->header->opcode); 612 set_named_property(res, "length", (int64_t) ws->payload_len); 613 614 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 615 if (ws->payload_len >= 2) { 616 nxt_unit_websocket_read(ws, sc, 2); 617 618 set_named_property(res, "closeStatus", 619 (((uint16_t) sc[0]) << 8) | sc[1]); 620 621 } else { 622 set_named_property(res, "closeStatus", -1); 623 } 624 } 625 626 buffer = create_buffer((size_t) ws->content_length, &data); 627 nxt_unit_websocket_read(ws, data, ws->content_length); 628 629 set_named_property(res, "binaryPayload", buffer); 630 631 return res; 632 } 633 634 635 napi_value 636 Unit::response_send_headers(napi_env env, napi_callback_info info) 637 { 638 int ret; 639 char *ptr, *name_ptr; 640 bool is_array; 641 size_t argc, name_len, value_len; 642 uint32_t status_code, header_len, keys_len, array_len; 643 uint32_t keys_count, i, j; 644 uint16_t hash; 645 nxt_napi napi(env); 646 napi_value this_arg, headers, keys, name, value, array_val; 647 napi_value array_entry; 648 napi_valuetype val_type; 649 nxt_unit_field_t *f; 650 nxt_unit_request_info_t *req; 651 napi_value argv[4]; 652 653 argc = 4; 654 655 try { 656 this_arg = napi.get_cb_info(info, argc, argv); 657 if (argc != 4) { 658 napi.throw_error("Wrong args count. Expected: " 659 "statusCode, headers, headers count, " 660 "headers length"); 661 return nullptr; 662 } 663 664 req = napi.get_request_info(this_arg); 665 status_code = napi.get_value_uint32(argv[0]); 666 keys_count = napi.get_value_uint32(argv[2]); 667 header_len = napi.get_value_uint32(argv[3]); 668 669 headers = argv[1]; 670 671 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 672 if (ret != NXT_UNIT_OK) { 673 napi.throw_error("Failed to create response"); 674 return nullptr; 675 } 676 677 /* 678 * Each name and value are 0-terminated by libunit. 679 * Need to add extra 2 bytes for each header. 680 */ 681 header_len += keys_count * 2; 682 683 keys = napi.get_property_names(headers); 684 keys_len = napi.get_array_length(keys); 685 686 ptr = req->response_buf->free; 687 688 for (i = 0; i < keys_len; i++) { 689 name = napi.get_element(keys, i); 690 691 array_entry = napi.get_property(headers, name); 692 693 name = napi.get_element(array_entry, 0); 694 value = napi.get_element(array_entry, 1); 695 696 name_len = napi.get_value_string_latin1(name, ptr, header_len); 697 name_ptr = ptr; 698 699 ptr += name_len + 1; 700 header_len -= name_len + 1; 701 702 hash = nxt_unit_field_hash(name_ptr, name_len); 703 704 is_array = napi.is_array(value); 705 706 if (is_array) { 707 array_len = napi.get_array_length(value); 708 709 for (j = 0; j < array_len; j++) { 710 array_val = napi.get_element(value, j); 711 712 val_type = napi.type_of(array_val); 713 714 if (val_type != napi_string) { 715 array_val = napi.coerce_to_string(array_val); 716 } 717 718 value_len = napi.get_value_string_latin1(array_val, ptr, 719 header_len); 720 721 f = req->response->fields + req->response->fields_count; 722 f->skip = 0; 723 724 nxt_unit_sptr_set(&f->name, name_ptr); 725 726 f->name_length = name_len; 727 f->hash = hash; 728 729 nxt_unit_sptr_set(&f->value, ptr); 730 f->value_length = (uint32_t) value_len; 731 732 ptr += value_len + 1; 733 header_len -= value_len + 1; 734 735 req->response->fields_count++; 736 } 737 738 } else { 739 val_type = napi.type_of(value); 740 741 if (val_type != napi_string) { 742 value = napi.coerce_to_string(value); 743 } 744 745 value_len = napi.get_value_string_latin1(value, ptr, header_len); 746 747 f = req->response->fields + req->response->fields_count; 748 f->skip = 0; 749 750 nxt_unit_sptr_set(&f->name, name_ptr); 751 752 f->name_length = name_len; 753 f->hash = hash; 754 755 nxt_unit_sptr_set(&f->value, ptr); 756 f->value_length = (uint32_t) value_len; 757 758 ptr += value_len + 1; 759 header_len -= value_len + 1; 760 761 req->response->fields_count++; 762 } 763 } 764 765 } catch (exception &e) { 766 napi.throw_error(e); 767 return nullptr; 768 } 769 770 req->response_buf->free = ptr; 771 772 ret = nxt_unit_response_send(req); 773 if (ret != NXT_UNIT_OK) { 774 napi.throw_error("Failed to send response"); 775 return nullptr; 776 } 777 778 return this_arg; 779 } 780 781 782 napi_value 783 Unit::response_write(napi_env env, napi_callback_info info) 784 { 785 int ret; 786 void *ptr; 787 size_t argc, have_buf_len; 788 ssize_t res_len; 789 uint32_t buf_start, buf_len; 790 nxt_napi napi(env); 791 napi_value this_arg; 792 nxt_unit_buf_t *buf; 793 napi_valuetype buf_type; 794 nxt_unit_request_info_t *req; 795 napi_value argv[3]; 796 797 argc = 3; 798 799 try { 800 this_arg = napi.get_cb_info(info, argc, argv); 801 if (argc != 3) { 802 throw exception("Wrong args count. Expected: " 803 "chunk, start, length"); 804 } 805 806 req = napi.get_request_info(this_arg); 807 buf_type = napi.type_of(argv[0]); 808 buf_start = napi.get_value_uint32(argv[1]); 809 buf_len = napi.get_value_uint32(argv[2]) + 1; 810 811 if (buf_type == napi_string) { 812 /* TODO: will work only for utf8 content-type */ 813 814 if (req->response_buf != NULL 815 && (req->response_buf->end - req->response_buf->free) 816 >= buf_len) 817 { 818 buf = req->response_buf; 819 820 } else { 821 buf = nxt_unit_response_buf_alloc(req, buf_len); 822 if (buf == NULL) { 823 throw exception("Failed to allocate response buffer"); 824 } 825 } 826 827 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 828 buf_len); 829 830 buf->free += have_buf_len; 831 832 ret = nxt_unit_buf_send(buf); 833 if (ret == NXT_UNIT_OK) { 834 res_len = have_buf_len; 835 } 836 837 } else { 838 ptr = napi.get_buffer_info(argv[0], have_buf_len); 839 840 if (buf_start > 0) { 841 ptr = ((uint8_t *) ptr) + buf_start; 842 have_buf_len -= buf_start; 843 } 844 845 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 846 847 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 848 } 849 850 if (ret != NXT_UNIT_OK) { 851 throw exception("Failed to send body buf"); 852 } 853 } catch (exception &e) { 854 napi.throw_error(e); 855 return nullptr; 856 } 857 858 return napi.create((int64_t) res_len); 859 } 860 861 862 napi_value 863 Unit::response_end(napi_env env, napi_callback_info info) 864 { 865 nxt_napi napi(env); 866 napi_value this_arg; 867 req_data_t *req_data; 868 nxt_unit_request_info_t *req; 869 870 try { 871 this_arg = napi.get_cb_info(info); 872 873 req = napi.get_request_info(this_arg); 874 875 req_data = (req_data_t *) req->data; 876 877 napi.remove_wrap(req_data->sock_ref); 878 napi.remove_wrap(req_data->resp_ref); 879 napi.remove_wrap(req_data->conn_ref); 880 881 } catch (exception &e) { 882 napi.throw_error(e); 883 return nullptr; 884 } 885 886 nxt_unit_request_done(req, NXT_UNIT_OK); 887 888 return this_arg; 889 } 890 891 892 napi_value 893 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 894 { 895 int ret, iovec_len; 896 bool fin; 897 size_t buf_len; 898 uint32_t opcode, sc; 899 nxt_napi napi(env); 900 napi_value this_arg, frame, payload; 901 nxt_unit_request_info_t *req; 902 char status_code[2]; 903 struct iovec iov[2]; 904 905 iovec_len = 0; 906 907 try { 908 this_arg = napi.get_cb_info(info, frame); 909 910 req = napi.get_request_info(this_arg); 911 912 opcode = napi.get_value_uint32(napi.get_named_property(frame, 913 "opcode")); 914 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 915 sc = napi.get_value_uint32(napi.get_named_property(frame, 916 "closeStatus")); 917 status_code[0] = (sc >> 8) & 0xFF; 918 status_code[1] = sc & 0xFF; 919 920 iov[iovec_len].iov_base = status_code; 921 iov[iovec_len].iov_len = 2; 922 iovec_len++; 923 } 924 925 try { 926 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 927 928 } catch (exception &e) { 929 fin = true; 930 } 931 932 payload = napi.get_named_property(frame, "binaryPayload"); 933 934 if (napi.is_buffer(payload)) { 935 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 936 937 } else { 938 buf_len = 0; 939 } 940 941 } catch (exception &e) { 942 napi.throw_error(e); 943 return nullptr; 944 } 945 946 if (buf_len > 0) { 947 iov[iovec_len].iov_len = buf_len; 948 iovec_len++; 949 } 950 951 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 952 if (ret != NXT_UNIT_OK) { 953 goto failed; 954 } 955 956 return this_arg; 957 958 failed: 959 960 napi.throw_error("Failed to send frame"); 961 962 return nullptr; 963 } 964 965 966 napi_value 967 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 968 { 969 nxt_napi napi(env); 970 napi_value this_arg, sock; 971 req_data_t *req_data; 972 nxt_unit_request_info_t *req; 973 974 try { 975 this_arg = napi.get_cb_info(info, sock); 976 977 req = napi.get_request_info(sock); 978 979 req_data = (req_data_t *) req->data; 980 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 981 982 } catch (exception &e) { 983 napi.throw_error(e); 984 return nullptr; 985 } 986 987 return this_arg; 988 } 989 990 991 void 992 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) 993 { 994 nxt_unit_request_info_t *req; 995 996 req = (nxt_unit_request_info_t *) nativeObject; 997 998 nxt_unit_warn(NULL, "conn_destroy: %p", req); 999 } 1000 1001 1002 void 1003 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1004 { 1005 nxt_unit_request_info_t *req; 1006 1007 req = (nxt_unit_request_info_t *) nativeObject; 1008 1009 nxt_unit_warn(NULL, "sock_destroy: %p", req); 1010 } 1011 1012 1013 void 1014 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1015 { 1016 nxt_unit_request_info_t *req; 1017 1018 req = (nxt_unit_request_info_t *) nativeObject; 1019 1020 nxt_unit_warn(NULL, "resp_destroy: %p", req); 1021 } 1022