1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 static void delete_port_data(uv_handle_t* handle); 17 18 napi_ref Unit::constructor_; 19 20 21 struct nxt_nodejs_ctx_t { 22 nxt_unit_port_id_t port_id; 23 uv_poll_t poll; 24 }; 25 26 27 struct req_data_t { 28 napi_ref sock_ref; 29 napi_ref resp_ref; 30 napi_ref conn_ref; 31 }; 32 33 34 Unit::Unit(napi_env env, napi_value jsthis): 35 nxt_napi(env), 36 wrapper_(wrap(jsthis, this, destroy)), 37 unit_ctx_(nullptr) 38 { 39 nxt_unit_debug(NULL, "Unit::Unit()"); 40 } 41 42 43 Unit::~Unit() 44 { 45 delete_reference(wrapper_); 46 47 nxt_unit_debug(NULL, "Unit::~Unit()"); 48 } 49 50 51 napi_value 52 Unit::init(napi_env env, napi_value exports) 53 { 54 nxt_napi napi(env); 55 napi_value ctor; 56 57 napi_property_descriptor unit_props[] = { 58 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 59 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 60 }; 61 62 try { 63 ctor = napi.define_class("Unit", create, 2, unit_props); 64 constructor_ = napi.create_reference(ctor); 65 66 napi.set_named_property(exports, "Unit", ctor); 67 napi.set_named_property(exports, "response_send_headers", 68 response_send_headers); 69 napi.set_named_property(exports, "response_write", response_write); 70 napi.set_named_property(exports, "response_end", response_end); 71 napi.set_named_property(exports, "websocket_send_frame", 72 websocket_send_frame); 73 napi.set_named_property(exports, "websocket_set_sock", 74 websocket_set_sock); 75 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 76 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 77 78 } catch (exception &e) { 79 napi.throw_error(e); 80 return nullptr; 81 } 82 83 return exports; 84 } 85 86 87 void 88 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 89 { 90 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 91 92 delete obj; 93 } 94 95 96 napi_value 97 Unit::create(napi_env env, napi_callback_info info) 98 { 99 nxt_napi napi(env); 100 napi_value target, ctor, instance, jsthis; 101 102 try { 103 target = napi.get_new_target(info); 104 105 if (target != nullptr) { 106 /* Invoked as constructor: `new Unit(...)`. */ 107 jsthis = napi.get_cb_info(info); 108 109 new Unit(env, jsthis); 110 napi.create_reference(jsthis); 111 112 return jsthis; 113 } 114 115 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 116 ctor = napi.get_reference_value(constructor_); 117 instance = napi.new_instance(ctor); 118 napi.create_reference(instance); 119 120 } catch (exception &e) { 121 napi.throw_error(e); 122 return nullptr; 123 } 124 125 return instance; 126 } 127 128 129 napi_value 130 Unit::create_server(napi_env env, napi_callback_info info) 131 { 132 Unit *obj; 133 size_t argc; 134 nxt_napi napi(env); 135 napi_value jsthis, argv; 136 nxt_unit_init_t unit_init; 137 138 argc = 1; 139 140 try { 141 jsthis = napi.get_cb_info(info, argc, &argv); 142 obj = (Unit *) napi.unwrap(jsthis); 143 144 } catch (exception &e) { 145 napi.throw_error(e); 146 return nullptr; 147 } 148 149 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 150 151 unit_init.data = obj; 152 unit_init.callbacks.request_handler = request_handler_cb; 153 unit_init.callbacks.websocket_handler = websocket_handler_cb; 154 unit_init.callbacks.close_handler = close_handler_cb; 155 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 156 unit_init.callbacks.add_port = add_port; 157 unit_init.callbacks.remove_port = remove_port; 158 unit_init.callbacks.quit = quit_cb; 159 160 unit_init.request_data_size = sizeof(req_data_t); 161 162 obj->unit_ctx_ = nxt_unit_init(&unit_init); 163 if (obj->unit_ctx_ == NULL) { 164 goto failed; 165 } 166 167 return nullptr; 168 169 failed: 170 171 napi_throw_error(env, NULL, "Failed to create Unit object"); 172 173 return nullptr; 174 } 175 176 177 napi_value 178 Unit::listen(napi_env env, napi_callback_info info) 179 { 180 return nullptr; 181 } 182 183 184 void 185 Unit::request_handler_cb(nxt_unit_request_info_t *req) 186 { 187 Unit *obj; 188 189 obj = reinterpret_cast<Unit *>(req->unit->data); 190 191 obj->request_handler(req); 192 } 193 194 195 void 196 Unit::request_handler(nxt_unit_request_info_t *req) 197 { 198 napi_value socket, request, response, server_obj, emit_request; 199 200 memset(req->data, 0, sizeof(req_data_t)); 201 202 try { 203 nxt_handle_scope scope(env()); 204 205 server_obj = get_server_object(); 206 207 socket = create_socket(server_obj, req); 208 request = create_request(server_obj, socket); 209 response = create_response(server_obj, request, req); 210 211 create_headers(req, request); 212 213 emit_request = get_named_property(server_obj, "emit_request"); 214 215 nxt_async_context async_context(env(), "request_handler"); 216 nxt_callback_scope async_scope(async_context); 217 218 make_callback(async_context, server_obj, emit_request, request, 219 response); 220 221 } catch (exception &e) { 222 nxt_unit_req_warn(req, "request_handler: %s", e.str); 223 } 224 } 225 226 227 void 228 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 229 { 230 Unit *obj; 231 232 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 233 234 obj->websocket_handler(ws); 235 } 236 237 238 void 239 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 240 { 241 napi_value frame, server_obj, process_frame, conn; 242 req_data_t *req_data; 243 244 req_data = (req_data_t *) ws->req->data; 245 246 try { 247 nxt_handle_scope scope(env()); 248 249 server_obj = get_server_object(); 250 251 frame = create_websocket_frame(server_obj, ws); 252 253 conn = get_reference_value(req_data->conn_ref); 254 255 process_frame = get_named_property(conn, "processFrame"); 256 257 nxt_async_context async_context(env(), "websocket_handler"); 258 nxt_callback_scope async_scope(async_context); 259 260 make_callback(async_context, conn, process_frame, frame); 261 262 } catch (exception &e) { 263 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 264 } 265 266 nxt_unit_websocket_done(ws); 267 } 268 269 270 void 271 Unit::close_handler_cb(nxt_unit_request_info_t *req) 272 { 273 Unit *obj; 274 275 obj = reinterpret_cast<Unit *>(req->unit->data); 276 277 obj->close_handler(req); 278 } 279 280 281 void 282 Unit::close_handler(nxt_unit_request_info_t *req) 283 { 284 napi_value conn_handle_close, conn; 285 req_data_t *req_data; 286 287 req_data = (req_data_t *) req->data; 288 289 try { 290 nxt_handle_scope scope(env()); 291 292 conn = get_reference_value(req_data->conn_ref); 293 294 conn_handle_close = get_named_property(conn, "handleSocketClose"); 295 296 nxt_async_context async_context(env(), "close_handler"); 297 nxt_callback_scope async_scope(async_context); 298 299 make_callback(async_context, conn, conn_handle_close, 300 nxt_napi::create(0)); 301 302 remove_wrap(req_data->sock_ref); 303 remove_wrap(req_data->resp_ref); 304 remove_wrap(req_data->conn_ref); 305 306 } catch (exception &e) { 307 nxt_unit_req_warn(req, "close_handler: %s", e.str); 308 309 return; 310 } 311 312 nxt_unit_request_done(req, NXT_UNIT_OK); 313 } 314 315 316 void 317 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 318 { 319 Unit *obj; 320 321 obj = reinterpret_cast<Unit *>(ctx->unit->data); 322 323 obj->shm_ack_handler(ctx); 324 } 325 326 327 void 328 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 329 { 330 napi_value server_obj, emit_drain; 331 332 try { 333 nxt_handle_scope scope(env()); 334 335 server_obj = get_server_object(); 336 337 emit_drain = get_named_property(server_obj, "emit_drain"); 338 339 nxt_async_context async_context(env(), "shm_ack_handler"); 340 nxt_callback_scope async_scope(async_context); 341 342 make_callback(async_context, server_obj, emit_drain); 343 344 } catch (exception &e) { 345 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 346 } 347 } 348 349 350 static void 351 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 352 { 353 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 354 } 355 356 357 int 358 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 359 { 360 int err; 361 Unit *obj; 362 uv_loop_t *loop; 363 napi_status status; 364 nxt_nodejs_ctx_t *node_ctx; 365 366 if (port->in_fd != -1) { 367 obj = reinterpret_cast<Unit *>(ctx->unit->data); 368 369 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 370 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 371 port->in_fd, strerror(errno), errno); 372 return -1; 373 } 374 375 status = napi_get_uv_event_loop(obj->env(), &loop); 376 if (status != napi_ok) { 377 nxt_unit_warn(ctx, "Failed to get uv.loop"); 378 return NXT_UNIT_ERROR; 379 } 380 381 node_ctx = new nxt_nodejs_ctx_t; 382 383 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 384 if (err < 0) { 385 nxt_unit_warn(ctx, "Failed to init uv.poll"); 386 return NXT_UNIT_ERROR; 387 } 388 389 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 390 if (err < 0) { 391 nxt_unit_warn(ctx, "Failed to start uv.poll"); 392 return NXT_UNIT_ERROR; 393 } 394 395 ctx->data = node_ctx; 396 397 node_ctx->port_id = port->id; 398 node_ctx->poll.data = ctx; 399 } 400 401 return nxt_unit_add_port(ctx, port); 402 } 403 404 405 inline bool 406 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 407 { 408 return p1.pid == p2.pid && p1.id == p2.id; 409 } 410 411 412 void 413 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 414 { 415 nxt_nodejs_ctx_t *node_ctx; 416 417 if (ctx->data != NULL) { 418 node_ctx = (nxt_nodejs_ctx_t *) ctx->data; 419 420 if (node_ctx->port_id == *port_id) { 421 uv_poll_stop(&node_ctx->poll); 422 423 node_ctx->poll.data = node_ctx; 424 uv_close((uv_handle_t *) &node_ctx->poll, delete_port_data); 425 426 ctx->data = NULL; 427 } 428 } 429 430 nxt_unit_remove_port(ctx, port_id); 431 } 432 433 434 static void 435 delete_port_data(uv_handle_t* handle) 436 { 437 nxt_nodejs_ctx_t *node_ctx; 438 439 node_ctx = (nxt_nodejs_ctx_t *) handle->data; 440 441 delete node_ctx; 442 } 443 444 445 void 446 Unit::quit_cb(nxt_unit_ctx_t *ctx) 447 { 448 Unit *obj; 449 450 obj = reinterpret_cast<Unit *>(ctx->unit->data); 451 452 obj->quit(ctx); 453 } 454 455 456 void 457 Unit::quit(nxt_unit_ctx_t *ctx) 458 { 459 napi_value server_obj, emit_close; 460 461 try { 462 nxt_handle_scope scope(env()); 463 464 server_obj = get_server_object(); 465 466 emit_close = get_named_property(server_obj, "emit_close"); 467 468 nxt_async_context async_context(env(), "unit_quit"); 469 nxt_callback_scope async_scope(async_context); 470 471 make_callback(async_context, server_obj, emit_close); 472 473 } catch (exception &e) { 474 nxt_unit_debug(ctx, "quit: %s", e.str); 475 } 476 477 nxt_unit_done(ctx); 478 } 479 480 481 napi_value 482 Unit::get_server_object() 483 { 484 napi_value unit_obj; 485 486 unit_obj = get_reference_value(wrapper_); 487 488 return get_named_property(unit_obj, "server"); 489 } 490 491 492 void 493 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 494 { 495 void *data; 496 uint32_t i; 497 napi_value headers, raw_headers, buffer; 498 napi_status status; 499 nxt_unit_request_t *r; 500 501 r = req->request; 502 503 headers = create_object(); 504 505 status = napi_create_array_with_length(env(), r->fields_count * 2, 506 &raw_headers); 507 if (status != napi_ok) { 508 throw exception("Failed to create array"); 509 } 510 511 for (i = 0; i < r->fields_count; i++) { 512 append_header(r->fields + i, headers, raw_headers, i); 513 } 514 515 set_named_property(request, "headers", headers); 516 set_named_property(request, "rawHeaders", raw_headers); 517 set_named_property(request, "httpVersion", r->version, r->version_length); 518 set_named_property(request, "method", r->method, r->method_length); 519 set_named_property(request, "url", r->target, r->target_length); 520 521 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 522 523 buffer = create_buffer((size_t) req->content_length, &data); 524 nxt_unit_request_read(req, data, req->content_length); 525 526 set_named_property(request, "_data", buffer); 527 } 528 529 530 inline char 531 lowcase(char c) 532 { 533 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 534 } 535 536 537 inline void 538 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 539 napi_value raw_headers, uint32_t idx) 540 { 541 char *name; 542 uint8_t i; 543 napi_value str, vstr; 544 545 name = (char *) nxt_unit_sptr_get(&f->name); 546 547 str = create_string_latin1(name, f->name_length); 548 549 for (i = 0; i < f->name_length; i++) { 550 name[i] = lowcase(name[i]); 551 } 552 553 vstr = set_named_property(headers, name, f->value, f->value_length); 554 555 set_element(raw_headers, idx * 2, str); 556 set_element(raw_headers, idx * 2 + 1, vstr); 557 } 558 559 560 napi_value 561 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 562 { 563 napi_value constructor, res; 564 req_data_t *req_data; 565 nxt_unit_request_t *r; 566 567 r = req->request; 568 569 constructor = get_named_property(server_obj, "Socket"); 570 571 res = new_instance(constructor); 572 573 req_data = (req_data_t *) req->data; 574 req_data->sock_ref = wrap(res, req, sock_destroy); 575 576 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 577 set_named_property(res, "localAddress", r->local, r->local_length); 578 579 return res; 580 } 581 582 583 napi_value 584 Unit::create_request(napi_value server_obj, napi_value socket) 585 { 586 napi_value constructor; 587 588 constructor = get_named_property(server_obj, "ServerRequest"); 589 590 return new_instance(constructor, server_obj, socket); 591 } 592 593 594 napi_value 595 Unit::create_response(napi_value server_obj, napi_value request, 596 nxt_unit_request_info_t *req) 597 { 598 napi_value constructor, res; 599 req_data_t *req_data; 600 601 constructor = get_named_property(server_obj, "ServerResponse"); 602 603 res = new_instance(constructor, request); 604 605 req_data = (req_data_t *) req->data; 606 req_data->resp_ref = wrap(res, req, resp_destroy); 607 608 return res; 609 } 610 611 612 napi_value 613 Unit::create_websocket_frame(napi_value server_obj, 614 nxt_unit_websocket_frame_t *ws) 615 { 616 void *data; 617 napi_value constructor, res, buffer; 618 uint8_t sc[2]; 619 620 constructor = get_named_property(server_obj, "WebSocketFrame"); 621 622 res = new_instance(constructor); 623 624 set_named_property(res, "fin", (bool) ws->header->fin); 625 set_named_property(res, "opcode", ws->header->opcode); 626 set_named_property(res, "length", (int64_t) ws->payload_len); 627 628 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 629 if (ws->payload_len >= 2) { 630 nxt_unit_websocket_read(ws, sc, 2); 631 632 set_named_property(res, "closeStatus", 633 (((uint16_t) sc[0]) << 8) | sc[1]); 634 635 } else { 636 set_named_property(res, "closeStatus", -1); 637 } 638 } 639 640 buffer = create_buffer((size_t) ws->content_length, &data); 641 nxt_unit_websocket_read(ws, data, ws->content_length); 642 643 set_named_property(res, "binaryPayload", buffer); 644 645 return res; 646 } 647 648 649 napi_value 650 Unit::response_send_headers(napi_env env, napi_callback_info info) 651 { 652 int ret; 653 char *ptr, *name_ptr; 654 bool is_array; 655 size_t argc, name_len, value_len; 656 uint32_t status_code, header_len, keys_len, array_len; 657 uint32_t keys_count, i, j; 658 uint16_t hash; 659 nxt_napi napi(env); 660 napi_value this_arg, headers, keys, name, value, array_val; 661 napi_value array_entry; 662 napi_valuetype val_type; 663 nxt_unit_field_t *f; 664 nxt_unit_request_info_t *req; 665 napi_value argv[4]; 666 667 argc = 4; 668 669 try { 670 this_arg = napi.get_cb_info(info, argc, argv); 671 if (argc != 4) { 672 napi.throw_error("Wrong args count. Expected: " 673 "statusCode, headers, headers count, " 674 "headers length"); 675 return nullptr; 676 } 677 678 req = napi.get_request_info(this_arg); 679 status_code = napi.get_value_uint32(argv[0]); 680 keys_count = napi.get_value_uint32(argv[2]); 681 header_len = napi.get_value_uint32(argv[3]); 682 683 headers = argv[1]; 684 685 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 686 if (ret != NXT_UNIT_OK) { 687 napi.throw_error("Failed to create response"); 688 return nullptr; 689 } 690 691 /* 692 * Each name and value are 0-terminated by libunit. 693 * Need to add extra 2 bytes for each header. 694 */ 695 header_len += keys_count * 2; 696 697 keys = napi.get_property_names(headers); 698 keys_len = napi.get_array_length(keys); 699 700 ptr = req->response_buf->free; 701 702 for (i = 0; i < keys_len; i++) { 703 name = napi.get_element(keys, i); 704 705 array_entry = napi.get_property(headers, name); 706 707 name = napi.get_element(array_entry, 0); 708 value = napi.get_element(array_entry, 1); 709 710 name_len = napi.get_value_string_latin1(name, ptr, header_len); 711 name_ptr = ptr; 712 713 ptr += name_len + 1; 714 header_len -= name_len + 1; 715 716 hash = nxt_unit_field_hash(name_ptr, name_len); 717 718 is_array = napi.is_array(value); 719 720 if (is_array) { 721 array_len = napi.get_array_length(value); 722 723 for (j = 0; j < array_len; j++) { 724 array_val = napi.get_element(value, j); 725 726 val_type = napi.type_of(array_val); 727 728 if (val_type != napi_string) { 729 array_val = napi.coerce_to_string(array_val); 730 } 731 732 value_len = napi.get_value_string_latin1(array_val, ptr, 733 header_len); 734 735 f = req->response->fields + req->response->fields_count; 736 f->skip = 0; 737 738 nxt_unit_sptr_set(&f->name, name_ptr); 739 740 f->name_length = name_len; 741 f->hash = hash; 742 743 nxt_unit_sptr_set(&f->value, ptr); 744 f->value_length = (uint32_t) value_len; 745 746 ptr += value_len + 1; 747 header_len -= value_len + 1; 748 749 req->response->fields_count++; 750 } 751 752 } else { 753 val_type = napi.type_of(value); 754 755 if (val_type != napi_string) { 756 value = napi.coerce_to_string(value); 757 } 758 759 value_len = napi.get_value_string_latin1(value, ptr, header_len); 760 761 f = req->response->fields + req->response->fields_count; 762 f->skip = 0; 763 764 nxt_unit_sptr_set(&f->name, name_ptr); 765 766 f->name_length = name_len; 767 f->hash = hash; 768 769 nxt_unit_sptr_set(&f->value, ptr); 770 f->value_length = (uint32_t) value_len; 771 772 ptr += value_len + 1; 773 header_len -= value_len + 1; 774 775 req->response->fields_count++; 776 } 777 } 778 779 } catch (exception &e) { 780 napi.throw_error(e); 781 return nullptr; 782 } 783 784 req->response_buf->free = ptr; 785 786 ret = nxt_unit_response_send(req); 787 if (ret != NXT_UNIT_OK) { 788 napi.throw_error("Failed to send response"); 789 return nullptr; 790 } 791 792 return this_arg; 793 } 794 795 796 napi_value 797 Unit::response_write(napi_env env, napi_callback_info info) 798 { 799 int ret; 800 void *ptr; 801 size_t argc, have_buf_len; 802 ssize_t res_len; 803 uint32_t buf_start, buf_len; 804 nxt_napi napi(env); 805 napi_value this_arg; 806 nxt_unit_buf_t *buf; 807 napi_valuetype buf_type; 808 nxt_unit_request_info_t *req; 809 napi_value argv[3]; 810 811 argc = 3; 812 813 try { 814 this_arg = napi.get_cb_info(info, argc, argv); 815 if (argc != 3) { 816 throw exception("Wrong args count. Expected: " 817 "chunk, start, length"); 818 } 819 820 req = napi.get_request_info(this_arg); 821 buf_type = napi.type_of(argv[0]); 822 buf_start = napi.get_value_uint32(argv[1]); 823 buf_len = napi.get_value_uint32(argv[2]) + 1; 824 825 if (buf_type == napi_string) { 826 /* TODO: will work only for utf8 content-type */ 827 828 if (req->response_buf != NULL 829 && req->response_buf->end >= req->response_buf->free + buf_len) 830 { 831 buf = req->response_buf; 832 833 } else { 834 buf = nxt_unit_response_buf_alloc(req, buf_len); 835 if (buf == NULL) { 836 throw exception("Failed to allocate response buffer"); 837 } 838 } 839 840 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 841 buf_len); 842 843 buf->free += have_buf_len; 844 845 ret = nxt_unit_buf_send(buf); 846 if (ret == NXT_UNIT_OK) { 847 res_len = have_buf_len; 848 } 849 850 } else { 851 ptr = napi.get_buffer_info(argv[0], have_buf_len); 852 853 if (buf_start > 0) { 854 ptr = ((uint8_t *) ptr) + buf_start; 855 have_buf_len -= buf_start; 856 } 857 858 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 859 860 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 861 } 862 863 if (ret != NXT_UNIT_OK) { 864 throw exception("Failed to send body buf"); 865 } 866 } catch (exception &e) { 867 napi.throw_error(e); 868 return nullptr; 869 } 870 871 return napi.create((int64_t) res_len); 872 } 873 874 875 napi_value 876 Unit::response_end(napi_env env, napi_callback_info info) 877 { 878 nxt_napi napi(env); 879 napi_value this_arg; 880 req_data_t *req_data; 881 nxt_unit_request_info_t *req; 882 883 try { 884 this_arg = napi.get_cb_info(info); 885 886 req = napi.get_request_info(this_arg); 887 888 req_data = (req_data_t *) req->data; 889 890 napi.remove_wrap(req_data->sock_ref); 891 napi.remove_wrap(req_data->resp_ref); 892 napi.remove_wrap(req_data->conn_ref); 893 894 } catch (exception &e) { 895 napi.throw_error(e); 896 return nullptr; 897 } 898 899 nxt_unit_request_done(req, NXT_UNIT_OK); 900 901 return this_arg; 902 } 903 904 905 napi_value 906 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 907 { 908 int ret, iovec_len; 909 bool fin; 910 size_t buf_len; 911 uint32_t opcode, sc; 912 nxt_napi napi(env); 913 napi_value this_arg, frame, payload; 914 nxt_unit_request_info_t *req; 915 char status_code[2]; 916 struct iovec iov[2]; 917 918 iovec_len = 0; 919 920 try { 921 this_arg = napi.get_cb_info(info, frame); 922 923 req = napi.get_request_info(this_arg); 924 925 opcode = napi.get_value_uint32(napi.get_named_property(frame, 926 "opcode")); 927 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 928 sc = napi.get_value_uint32(napi.get_named_property(frame, 929 "closeStatus")); 930 status_code[0] = (sc >> 8) & 0xFF; 931 status_code[1] = sc & 0xFF; 932 933 iov[iovec_len].iov_base = status_code; 934 iov[iovec_len].iov_len = 2; 935 iovec_len++; 936 } 937 938 try { 939 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 940 941 } catch (exception &e) { 942 fin = true; 943 } 944 945 payload = napi.get_named_property(frame, "binaryPayload"); 946 947 if (napi.is_buffer(payload)) { 948 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 949 950 } else { 951 buf_len = 0; 952 } 953 954 } catch (exception &e) { 955 napi.throw_error(e); 956 return nullptr; 957 } 958 959 if (buf_len > 0) { 960 iov[iovec_len].iov_len = buf_len; 961 iovec_len++; 962 } 963 964 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 965 if (ret != NXT_UNIT_OK) { 966 goto failed; 967 } 968 969 return this_arg; 970 971 failed: 972 973 napi.throw_error("Failed to send frame"); 974 975 return nullptr; 976 } 977 978 979 napi_value 980 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 981 { 982 nxt_napi napi(env); 983 napi_value this_arg, sock; 984 req_data_t *req_data; 985 nxt_unit_request_info_t *req; 986 987 try { 988 this_arg = napi.get_cb_info(info, sock); 989 990 req = napi.get_request_info(sock); 991 992 req_data = (req_data_t *) req->data; 993 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 994 995 } catch (exception &e) { 996 napi.throw_error(e); 997 return nullptr; 998 } 999 1000 return this_arg; 1001 } 1002 1003 1004 void 1005 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1006 { 1007 nxt_unit_request_info_t *req; 1008 1009 req = (nxt_unit_request_info_t *) nativeObject; 1010 1011 nxt_unit_warn(NULL, "conn_destroy: %p", req); 1012 } 1013 1014 1015 void 1016 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1017 { 1018 nxt_unit_request_info_t *req; 1019 1020 req = (nxt_unit_request_info_t *) nativeObject; 1021 1022 nxt_unit_warn(NULL, "sock_destroy: %p", req); 1023 } 1024 1025 1026 void 1027 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) 1028 { 1029 nxt_unit_request_info_t *req; 1030 1031 req = (nxt_unit_request_info_t *) nativeObject; 1032 1033 nxt_unit_warn(NULL, "resp_destroy: %p", req); 1034 } 1035