1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 static void delete_port_data(uv_handle_t* handle); 17 18 napi_ref Unit::constructor_; 19 20 21 struct port_data_t { 22 nxt_unit_ctx_t *ctx; 23 nxt_unit_port_id_t port_id; 24 uv_poll_t poll; 25 }; 26 27 28 struct req_data_t { 29 napi_ref sock_ref; 30 napi_ref resp_ref; 31 napi_ref conn_ref; 32 }; 33 34 35 Unit::Unit(napi_env env, napi_value jsthis): 36 nxt_napi(env), 37 wrapper_(wrap(jsthis, this, destroy)), 38 unit_ctx_(nullptr) 39 { 40 nxt_unit_debug(NULL, "Unit::Unit()"); 41 } 42 43 44 Unit::~Unit() 45 { 46 delete_reference(wrapper_); 47 48 nxt_unit_debug(NULL, "Unit::~Unit()"); 49 } 50 51 52 napi_value 53 Unit::init(napi_env env, napi_value exports) 54 { 55 nxt_napi napi(env); 56 napi_value ctor; 57 58 napi_property_descriptor unit_props[] = { 59 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 60 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 61 }; 62 63 try { 64 ctor = napi.define_class("Unit", create, 2, unit_props); 65 constructor_ = napi.create_reference(ctor); 66 67 napi.set_named_property(exports, "Unit", ctor); 68 napi.set_named_property(exports, "response_send_headers", 69 response_send_headers); 70 napi.set_named_property(exports, "response_write", response_write); 71 napi.set_named_property(exports, "response_end", response_end); 72 napi.set_named_property(exports, "websocket_send_frame", 73 websocket_send_frame); 74 napi.set_named_property(exports, "websocket_set_sock", 75 websocket_set_sock); 76 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 77 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 78 79 } catch (exception &e) { 80 napi.throw_error(e); 81 return nullptr; 82 } 83 84 return exports; 85 } 86 87 88 void 89 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 90 { 91 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 92 93 delete obj; 94 } 95 96 97 napi_value 98 Unit::create(napi_env env, napi_callback_info info) 99 { 100 nxt_napi napi(env); 101 napi_value target, ctor, instance, jsthis; 102 103 try { 104 target = napi.get_new_target(info); 105 106 if (target != nullptr) { 107 /* Invoked as constructor: `new Unit(...)`. */ 108 jsthis = napi.get_cb_info(info); 109 110 new Unit(env, jsthis); 111 napi.create_reference(jsthis); 112 113 return jsthis; 114 } 115 116 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 117 ctor = napi.get_reference_value(constructor_); 118 instance = napi.new_instance(ctor); 119 napi.create_reference(instance); 120 121 } catch (exception &e) { 122 napi.throw_error(e); 123 return nullptr; 124 } 125 126 return instance; 127 } 128 129 130 napi_value 131 Unit::create_server(napi_env env, napi_callback_info info) 132 { 133 Unit *obj; 134 size_t argc; 135 nxt_napi napi(env); 136 napi_value jsthis, argv; 137 nxt_unit_init_t unit_init; 138 139 argc = 1; 140 141 try { 142 jsthis = napi.get_cb_info(info, argc, &argv); 143 obj = (Unit *) napi.unwrap(jsthis); 144 145 } catch (exception &e) { 146 napi.throw_error(e); 147 return nullptr; 148 } 149 150 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 151 152 unit_init.data = obj; 153 unit_init.callbacks.request_handler = request_handler_cb; 154 unit_init.callbacks.websocket_handler = websocket_handler_cb; 155 unit_init.callbacks.close_handler = close_handler_cb; 156 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 157 unit_init.callbacks.add_port = add_port; 158 unit_init.callbacks.remove_port = remove_port; 159 unit_init.callbacks.quit = quit_cb; 160 161 unit_init.request_data_size = sizeof(req_data_t); 162 163 obj->unit_ctx_ = nxt_unit_init(&unit_init); 164 if (obj->unit_ctx_ == NULL) { 165 goto failed; 166 } 167 168 return nullptr; 169 170 failed: 171 172 napi_throw_error(env, NULL, "Failed to create Unit object"); 173 174 return nullptr; 175 } 176 177 178 napi_value 179 Unit::listen(napi_env env, napi_callback_info info) 180 { 181 return nullptr; 182 } 183 184 185 void 186 Unit::request_handler_cb(nxt_unit_request_info_t *req) 187 { 188 Unit *obj; 189 190 obj = reinterpret_cast<Unit *>(req->unit->data); 191 192 obj->request_handler(req); 193 } 194 195 196 void 197 Unit::request_handler(nxt_unit_request_info_t *req) 198 { 199 napi_value socket, request, response, server_obj, emit_request; 200 201 memset(req->data, 0, sizeof(req_data_t)); 202 203 try { 204 nxt_handle_scope scope(env()); 205 206 server_obj = get_server_object(); 207 208 socket = create_socket(server_obj, req); 209 request = create_request(server_obj, socket); 210 response = create_response(server_obj, request, req); 211 212 create_headers(req, request); 213 214 emit_request = get_named_property(server_obj, "emit_request"); 215 216 nxt_async_context async_context(env(), "request_handler"); 217 nxt_callback_scope async_scope(async_context); 218 219 make_callback(async_context, server_obj, emit_request, request, 220 response); 221 222 } catch (exception &e) { 223 nxt_unit_req_warn(req, "request_handler: %s", e.str); 224 } 225 } 226 227 228 void 229 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 230 { 231 Unit *obj; 232 233 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 234 235 obj->websocket_handler(ws); 236 } 237 238 239 void 240 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 241 { 242 napi_value frame, server_obj, process_frame, conn; 243 req_data_t *req_data; 244 245 req_data = (req_data_t *) ws->req->data; 246 247 try { 248 nxt_handle_scope scope(env()); 249 250 server_obj = get_server_object(); 251 252 frame = create_websocket_frame(server_obj, ws); 253 254 conn = get_reference_value(req_data->conn_ref); 255 256 process_frame = get_named_property(conn, "processFrame"); 257 258 nxt_async_context async_context(env(), "websocket_handler"); 259 nxt_callback_scope async_scope(async_context); 260 261 make_callback(async_context, conn, process_frame, frame); 262 263 } catch (exception &e) { 264 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 265 } 266 267 nxt_unit_websocket_done(ws); 268 } 269 270 271 void 272 Unit::close_handler_cb(nxt_unit_request_info_t *req) 273 { 274 Unit *obj; 275 276 obj = reinterpret_cast<Unit *>(req->unit->data); 277 278 obj->close_handler(req); 279 } 280 281 282 void 283 Unit::close_handler(nxt_unit_request_info_t *req) 284 { 285 napi_value conn_handle_close, conn; 286 req_data_t *req_data; 287 288 req_data = (req_data_t *) req->data; 289 290 try { 291 nxt_handle_scope scope(env()); 292 293 conn = get_reference_value(req_data->conn_ref); 294 295 conn_handle_close = get_named_property(conn, "handleSocketClose"); 296 297 nxt_async_context async_context(env(), "close_handler"); 298 nxt_callback_scope async_scope(async_context); 299 300 make_callback(async_context, conn, conn_handle_close, 301 nxt_napi::create(0)); 302 303 remove_wrap(req_data->sock_ref); 304 remove_wrap(req_data->resp_ref); 305 remove_wrap(req_data->conn_ref); 306 307 } catch (exception &e) { 308 nxt_unit_req_warn(req, "close_handler: %s", e.str); 309 310 return; 311 } 312 313 nxt_unit_request_done(req, NXT_UNIT_OK); 314 } 315 316 317 void 318 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 319 { 320 Unit *obj; 321 322 obj = reinterpret_cast<Unit *>(ctx->unit->data); 323 324 obj->shm_ack_handler(ctx); 325 } 326 327 328 void 329 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 330 { 331 napi_value server_obj, emit_drain; 332 333 try { 334 nxt_handle_scope scope(env()); 335 336 server_obj = get_server_object(); 337 338 emit_drain = get_named_property(server_obj, "emit_drain"); 339 340 nxt_async_context async_context(env(), "shm_ack_handler"); 341 nxt_callback_scope async_scope(async_context); 342 343 make_callback(async_context, server_obj, emit_drain); 344 345 } catch (exception &e) { 346 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 347 } 348 } 349 350 351 static void 352 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 353 { 354 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 355 } 356 357 358 int 359 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 360 { 361 int err; 362 Unit *obj; 363 uv_loop_t *loop; 364 port_data_t *data; 365 napi_status status; 366 367 if (port->in_fd != -1) { 368 obj = reinterpret_cast<Unit *>(ctx->unit->data); 369 370 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 371 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 372 port->in_fd, strerror(errno), errno); 373 return -1; 374 } 375 376 status = napi_get_uv_event_loop(obj->env(), &loop); 377 if (status != napi_ok) { 378 nxt_unit_warn(ctx, "Failed to get uv.loop"); 379 return NXT_UNIT_ERROR; 380 } 381 382 data = new port_data_t; 383 384 err = uv_poll_init(loop, &data->poll, port->in_fd); 385 if (err < 0) { 386 nxt_unit_warn(ctx, "Failed to init uv.poll"); 387 return NXT_UNIT_ERROR; 388 } 389 390 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); 391 if (err < 0) { 392 nxt_unit_warn(ctx, "Failed to start uv.poll"); 393 return NXT_UNIT_ERROR; 394 } 395 396 port->data = data; 397 398 data->ctx = ctx; 399 data->port_id = port->id; 400 data->poll.data = ctx; 401 } 402 403 return NXT_UNIT_OK; 404 } 405 406 407 inline bool 408 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 409 { 410 return p1.pid == p2.pid && p1.id == p2.id; 411 } 412 413 414 void 415 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) 416 { 417 port_data_t *data; 418 419 if (port->data != NULL) { 420 data = (port_data_t *) port->data; 421 422 if (data->port_id == port->id) { 423 uv_poll_stop(&data->poll); 424 425 data->poll.data = data; 426 uv_close((uv_handle_t *) &data->poll, delete_port_data); 427 } 428 } 429 } 430 431 432 static void 433 delete_port_data(uv_handle_t* handle) 434 { 435 port_data_t *data; 436 437 data = (port_data_t *) handle->data; 438 439 delete data; 440 } 441 442 443 void 444 Unit::quit_cb(nxt_unit_ctx_t *ctx) 445 { 446 Unit *obj; 447 448 obj = reinterpret_cast<Unit *>(ctx->unit->data); 449 450 obj->quit(ctx); 451 } 452 453 454 void 455 Unit::quit(nxt_unit_ctx_t *ctx) 456 { 457 napi_value server_obj, emit_close; 458 459 try { 460 nxt_handle_scope scope(env()); 461 462 server_obj = get_server_object(); 463 464 emit_close = get_named_property(server_obj, "emit_close"); 465 466 nxt_async_context async_context(env(), "unit_quit"); 467 nxt_callback_scope async_scope(async_context); 468 469 make_callback(async_context, server_obj, emit_close); 470 471 } catch (exception &e) { 472 nxt_unit_debug(ctx, "quit: %s", e.str); 473 } 474 475 nxt_unit_done(ctx); 476 } 477 478 479 napi_value 480 Unit::get_server_object() 481 { 482 napi_value unit_obj; 483 484 unit_obj = get_reference_value(wrapper_); 485 486 return get_named_property(unit_obj, "server"); 487 } 488 489 490 void 491 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 492 { 493 void *data; 494 uint32_t i; 495 napi_value headers, raw_headers, buffer; 496 napi_status status; 497 nxt_unit_request_t *r; 498 499 r = req->request; 500 501 headers = create_object(); 502 503 status = napi_create_array_with_length(env(), r->fields_count * 2, 504 &raw_headers); 505 if (status != napi_ok) { 506 throw exception("Failed to create array"); 507 } 508 509 for (i = 0; i < r->fields_count; i++) { 510 append_header(r->fields + i, headers, raw_headers, i); 511 } 512 513 set_named_property(request, "headers", headers); 514 set_named_property(request, "rawHeaders", raw_headers); 515 set_named_property(request, "httpVersion", r->version, r->version_length); 516 set_named_property(request, "method", r->method, r->method_length); 517 set_named_property(request, "url", r->target, r->target_length); 518 519 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 520 521 buffer = create_buffer((size_t) req->content_length, &data); 522 nxt_unit_request_read(req, data, req->content_length); 523 524 set_named_property(request, "_data", buffer); 525 } 526 527 528 inline char 529 lowcase(char c) 530 { 531 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 532 } 533 534 535 inline void 536 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 537 napi_value raw_headers, uint32_t idx) 538 { 539 char *name; 540 uint8_t i; 541 napi_value str, vstr; 542 543 name = (char *) nxt_unit_sptr_get(&f->name); 544 545 str = create_string_latin1(name, f->name_length); 546 547 for (i = 0; i < f->name_length; i++) { 548 name[i] = lowcase(name[i]); 549 } 550 551 vstr = set_named_property(headers, name, f->value, f->value_length); 552 553 set_element(raw_headers, idx * 2, str); 554 set_element(raw_headers, idx * 2 + 1, vstr); 555 } 556 557 558 napi_value 559 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 560 { 561 napi_value constructor, res; 562 req_data_t *req_data; 563 nxt_unit_request_t *r; 564 565 r = req->request; 566 567 constructor = get_named_property(server_obj, "Socket"); 568 569 res = new_instance(constructor); 570 571 req_data = (req_data_t *) req->data; 572 req_data->sock_ref = wrap(res, req, sock_destroy); 573 574 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 575 set_named_property(res, "localAddress", r->local, r->local_length); 576 577 return res; 578 } 579 580 581 napi_value 582 Unit::create_request(napi_value server_obj, napi_value socket) 583 { 584 napi_value constructor; 585 586 constructor = get_named_property(server_obj, "ServerRequest"); 587 588 return new_instance(constructor, server_obj, socket); 589 } 590 591 592 napi_value 593 Unit::create_response(napi_value server_obj, napi_value request, 594 nxt_unit_request_info_t *req) 595 { 596 napi_value constructor, res; 597 req_data_t *req_data; 598 599 constructor = get_named_property(server_obj, "ServerResponse"); 600 601 res = new_instance(constructor, request); 602 603 req_data = (req_data_t *) req->data; 604 req_data->resp_ref = wrap(res, req, resp_destroy); 605 606 return res; 607 } 608 609 610 napi_value 611 Unit::create_websocket_frame(napi_value server_obj, 612 nxt_unit_websocket_frame_t *ws) 613 { 614 void *data; 615 napi_value constructor, res, buffer; 616 uint8_t sc[2]; 617 618 constructor = get_named_property(server_obj, "WebSocketFrame"); 619 620 res = new_instance(constructor); 621 622 set_named_property(res, "fin", (bool) ws->header->fin); 623 set_named_property(res, "opcode", ws->header->opcode); 624 set_named_property(res, "length", (int64_t) ws->payload_len); 625 626 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 627 if (ws->payload_len >= 2) { 628 nxt_unit_websocket_read(ws, sc, 2); 629 630 set_named_property(res, "closeStatus", 631 (((uint16_t) sc[0]) << 8) | sc[1]); 632 633 } else { 634 set_named_property(res, "closeStatus", -1); 635 } 636 } 637 638 buffer = create_buffer((size_t) ws->content_length, &data); 639 nxt_unit_websocket_read(ws, data, ws->content_length); 640 641 set_named_property(res, "binaryPayload", buffer); 642 643 return res; 644 } 645 646 647 napi_value 648 Unit::response_send_headers(napi_env env, napi_callback_info info) 649 { 650 int ret; 651 char *ptr, *name_ptr; 652 bool is_array; 653 size_t argc, name_len, value_len; 654 uint32_t status_code, header_len, keys_len, array_len; 655 uint32_t keys_count, i, j; 656 uint16_t hash; 657 nxt_napi napi(env); 658 napi_value this_arg, headers, keys, name, value, array_val; 659 napi_value array_entry; 660 napi_valuetype val_type; 661 nxt_unit_field_t *f; 662 nxt_unit_request_info_t *req; 663 napi_value argv[4]; 664 665 argc = 4; 666 667 try { 668 this_arg = napi.get_cb_info(info, argc, argv); 669 if (argc != 4) { 670 napi.throw_error("Wrong args count. Expected: " 671 "statusCode, headers, headers count, " 672 "headers length"); 673 return nullptr; 674 } 675 676 req = napi.get_request_info(this_arg); 677 status_code = napi.get_value_uint32(argv[0]); 678 keys_count = napi.get_value_uint32(argv[2]); 679 header_len = napi.get_value_uint32(argv[3]); 680 681 headers = argv[1]; 682 683 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 684 if (ret != NXT_UNIT_OK) { 685 napi.throw_error("Failed to create response"); 686 return nullptr; 687 } 688 689 /* 690 * Each name and value are 0-terminated by libunit. 691 * Need to add extra 2 bytes for each header. 692 */ 693 header_len += keys_count * 2; 694 695 keys = napi.get_property_names(headers); 696 keys_len = napi.get_array_length(keys); 697 698 ptr = req->response_buf->free; 699 700 for (i = 0; i < keys_len; i++) { 701 name = napi.get_element(keys, i); 702 703 array_entry = napi.get_property(headers, name); 704 705 name = napi.get_element(array_entry, 0); 706 value = napi.get_element(array_entry, 1); 707 708 name_len = napi.get_value_string_latin1(name, ptr, header_len); 709 name_ptr = ptr; 710 711 ptr += name_len + 1; 712 header_len -= name_len + 1; 713 714 hash = nxt_unit_field_hash(name_ptr, name_len); 715 716 is_array = napi.is_array(value); 717 718 if (is_array) { 719 array_len = napi.get_array_length(value); 720 721 for (j = 0; j < array_len; j++) { 722 array_val = napi.get_element(value, j); 723 724 val_type = napi.type_of(array_val); 725 726 if (val_type != napi_string) { 727 array_val = napi.coerce_to_string(array_val); 728 } 729 730 value_len = napi.get_value_string_latin1(array_val, ptr, 731 header_len); 732 733 f = req->response->fields + req->response->fields_count; 734 f->skip = 0; 735 736 nxt_unit_sptr_set(&f->name, name_ptr); 737 738 f->name_length = name_len; 739 f->hash = hash; 740 741 nxt_unit_sptr_set(&f->value, ptr); 742 f->value_length = (uint32_t) value_len; 743 744 ptr += value_len + 1; 745 header_len -= value_len + 1; 746 747 req->response->fields_count++; 748 } 749 750 } else { 751 val_type = napi.type_of(value); 752 753 if (val_type != napi_string) { 754 value = napi.coerce_to_string(value); 755 } 756 757 value_len = napi.get_value_string_latin1(value, ptr, header_len); 758 759 f = req->response->fields + req->response->fields_count; 760 f->skip = 0; 761 762 nxt_unit_sptr_set(&f->name, name_ptr); 763 764 f->name_length = name_len; 765 f->hash = hash; 766 767 nxt_unit_sptr_set(&f->value, ptr); 768 f->value_length = (uint32_t) value_len; 769 770 ptr += value_len + 1; 771 header_len -= value_len + 1; 772 773 req->response->fields_count++; 774 } 775 } 776 777 } catch (exception &e) { 778 napi.throw_error(e); 779 return nullptr; 780 } 781 782 req->response_buf->free = ptr; 783 784 ret = nxt_unit_response_send(req); 785 if (ret != NXT_UNIT_OK) { 786 napi.throw_error("Failed to send response"); 787 return nullptr; 788 } 789 790 return this_arg; 791 } 792 793 794 napi_value 795 Unit::response_write(napi_env env, napi_callback_info info) 796 { 797 int ret; 798 void *ptr; 799 size_t argc, have_buf_len; 800 ssize_t res_len; 801 uint32_t buf_start, buf_len; 802 nxt_napi napi(env); 803 napi_value this_arg; 804 nxt_unit_buf_t *buf; 805 napi_valuetype buf_type; 806 nxt_unit_request_info_t *req; 807 napi_value argv[3]; 808 809 argc = 3; 810 811 try { 812 this_arg = napi.get_cb_info(info, argc, argv); 813 if (argc != 3) { 814 throw exception("Wrong args count. Expected: " 815 "chunk, start, length"); 816 } 817 818 req = napi.get_request_info(this_arg); 819 buf_type = napi.type_of(argv[0]); 820 buf_start = napi.get_value_uint32(argv[1]); 821 buf_len = napi.get_value_uint32(argv[2]) + 1; 822 823 if (buf_type == napi_string) { 824 /* TODO: will work only for utf8 content-type */ 825 826 if (req->response_buf != NULL 827 && req->response_buf->end >= req->response_buf->free + buf_len) 828 { 829 buf = req->response_buf; 830 831 } else { 832 buf = nxt_unit_response_buf_alloc(req, buf_len); 833 if (buf == NULL) { 834 throw exception("Failed to allocate response buffer"); 835 } 836 } 837 838 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 839 buf_len); 840 841 buf->free += have_buf_len; 842 843 ret = nxt_unit_buf_send(buf); 844 if (ret == NXT_UNIT_OK) { 845 res_len = have_buf_len; 846 } 847 848 } else { 849 ptr = napi.get_buffer_info(argv[0], have_buf_len); 850 851 if (buf_start > 0) { 852 ptr = ((uint8_t *) ptr) + buf_start; 853 have_buf_len -= buf_start; 854 } 855 856 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 857 858 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 859 } 860 861 if (ret != NXT_UNIT_OK) { 862 throw exception("Failed to send body buf"); 863 } 864 } catch (exception &e) { 865 napi.throw_error(e); 866 return nullptr; 867 } 868 869 return napi.create((int64_t) res_len); 870 } 871 872 873 napi_value 874 Unit::response_end(napi_env env, napi_callback_info info) 875 { 876 nxt_napi napi(env); 877 napi_value this_arg; 878 req_data_t *req_data; 879 nxt_unit_request_info_t *req; 880 881 try { 882 this_arg = napi.get_cb_info(info); 883 884 req = napi.get_request_info(this_arg); 885 886 req_data = (req_data_t *) req->data; 887 888 napi.remove_wrap(req_data->sock_ref); 889 napi.remove_wrap(req_data->resp_ref); 890 napi.remove_wrap(req_data->conn_ref); 891 892 } catch (exception &e) { 893 napi.throw_error(e); 894 return nullptr; 895 } 896 897 nxt_unit_request_done(req, NXT_UNIT_OK); 898 899 return this_arg; 900 } 901 902 903 napi_value 904 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 905 { 906 int ret, iovec_len; 907 bool fin; 908 size_t buf_len; 909 uint32_t opcode, sc; 910 nxt_napi napi(env); 911 napi_value this_arg, frame, payload; 912 nxt_unit_request_info_t *req; 913 char status_code[2]; 914 struct iovec iov[2]; 915 916 iovec_len = 0; 917 918 try { 919 this_arg = napi.get_cb_info(info, frame); 920 921 req = napi.get_request_info(this_arg); 922 923 opcode = napi.get_value_uint32(napi.get_named_property(frame, 924 "opcode")); 925 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 926 sc = napi.get_value_uint32(napi.get_named_property(frame, 927 "closeStatus")); 928 status_code[0] = (sc >> 8) & 0xFF; 929 status_code[1] = sc & 0xFF; 930 931 iov[iovec_len].iov_base = status_code; 932 iov[iovec_len].iov_len = 2; 933 iovec_len++; 934 } 935 936 try { 937 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 938 939 } catch (exception &e) { 940 fin = true; 941 } 942 943 payload = napi.get_named_property(frame, "binaryPayload"); 944 945 if (napi.is_buffer(payload)) { 946 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 947 948 } else { 949 buf_len = 0; 950 } 951 952 } catch (exception &e) { 953 napi.throw_error(e); 954 return nullptr; 955 } 956 957 if (buf_len > 0) { 958 iov[iovec_len].iov_len = buf_len; 959 iovec_len++; 960 } 961 962 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 963 if (ret != NXT_UNIT_OK) { 964 goto failed; 965 } 966 967 return this_arg; 968 969 failed: 970 971 napi.throw_error("Failed to send frame"); 972 973 return nullptr; 974 } 975 976 977 napi_value 978 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 979 { 980 nxt_napi napi(env); 981 napi_value this_arg, sock; 982 req_data_t *req_data; 983 nxt_unit_request_info_t *req; 984 985 try { 986 this_arg = napi.get_cb_info(info, sock); 987 988 req = napi.get_request_info(sock); 989 990 req_data = (req_data_t *) req->data; 991 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 992 993 } catch (exception &e) { 994 napi.throw_error(e); 995 return nullptr; 996 } 997 998 return this_arg; 999 } 1000 1001 1002 void 1003 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1004 { 1005 nxt_unit_request_info_t *req; 1006 1007 req = (nxt_unit_request_info_t *) nativeObject; 1008 1009 nxt_unit_warn(NULL, "conn_destroy: %p", req); 1010 } 1011 1012 1013 void 1014 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1015 { 1016 nxt_unit_request_info_t *req; 1017 1018 req = (nxt_unit_request_info_t *) nativeObject; 1019 1020 nxt_unit_warn(NULL, "sock_destroy: %p", req); 1021 } 1022 1023 1024 void 1025 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1026 { 1027 nxt_unit_request_info_t *req; 1028 1029 req = (nxt_unit_request_info_t *) nativeObject; 1030 1031 nxt_unit_warn(NULL, "resp_destroy: %p", req); 1032 } 1033