1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 static void delete_port_data(uv_handle_t* handle); 17 18 napi_ref Unit::constructor_; 19 20 21 struct port_data_t { 22 nxt_unit_ctx_t *ctx; 23 nxt_unit_port_t *port; 24 uv_poll_t poll; 25 }; 26 27 28 struct req_data_t { 29 napi_ref sock_ref; 30 napi_ref req_ref; 31 napi_ref resp_ref; 32 napi_ref conn_ref; 33 }; 34 35 36 Unit::Unit(napi_env env, napi_value jsthis): 37 nxt_napi(env), 38 wrapper_(wrap(jsthis, this, destroy)), 39 unit_ctx_(nullptr) 40 { 41 nxt_unit_debug(NULL, "Unit::Unit()"); 42 } 43 44 45 Unit::~Unit() 46 { 47 delete_reference(wrapper_); 48 49 nxt_unit_debug(NULL, "Unit::~Unit()"); 50 } 51 52 53 napi_value 54 Unit::init(napi_env env, napi_value exports) 55 { 56 nxt_napi napi(env); 57 napi_value ctor; 58 59 napi_property_descriptor unit_props[] = { 60 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 61 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 62 }; 63 64 try { 65 ctor = napi.define_class("Unit", create, 2, unit_props); 66 constructor_ = napi.create_reference(ctor); 67 68 napi.set_named_property(exports, "Unit", ctor); 69 napi.set_named_property(exports, "request_read", request_read); 70 napi.set_named_property(exports, "response_send_headers", 71 response_send_headers); 72 napi.set_named_property(exports, "response_write", response_write); 73 napi.set_named_property(exports, "response_end", response_end); 74 napi.set_named_property(exports, "websocket_send_frame", 75 websocket_send_frame); 76 napi.set_named_property(exports, "websocket_set_sock", 77 websocket_set_sock); 78 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 79 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 80 81 } catch (exception &e) { 82 napi.throw_error(e); 83 return nullptr; 84 } 85 86 return exports; 87 } 88 89 90 void 91 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 92 { 93 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 94 95 delete obj; 96 } 97 98 99 napi_value 100 Unit::create(napi_env env, napi_callback_info info) 101 { 102 nxt_napi napi(env); 103 napi_value target, ctor, instance, jsthis; 104 105 try { 106 target = napi.get_new_target(info); 107 108 if (target != nullptr) { 109 /* Invoked as constructor: `new Unit(...)`. */ 110 jsthis = napi.get_cb_info(info); 111 112 new Unit(env, jsthis); 113 napi.create_reference(jsthis); 114 115 return jsthis; 116 } 117 118 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 119 ctor = napi.get_reference_value(constructor_); 120 instance = napi.new_instance(ctor); 121 napi.create_reference(instance); 122 123 } catch (exception &e) { 124 napi.throw_error(e); 125 return nullptr; 126 } 127 128 return instance; 129 } 130 131 132 napi_value 133 Unit::create_server(napi_env env, napi_callback_info info) 134 { 135 Unit *obj; 136 size_t argc; 137 nxt_napi napi(env); 138 napi_value jsthis, argv; 139 nxt_unit_init_t unit_init; 140 141 argc = 1; 142 143 try { 144 jsthis = napi.get_cb_info(info, argc, &argv); 145 obj = (Unit *) napi.unwrap(jsthis); 146 147 } catch (exception &e) { 148 napi.throw_error(e); 149 return nullptr; 150 } 151 152 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 153 154 unit_init.data = obj; 155 unit_init.callbacks.request_handler = request_handler_cb; 156 unit_init.callbacks.websocket_handler = websocket_handler_cb; 157 unit_init.callbacks.close_handler = close_handler_cb; 158 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 159 unit_init.callbacks.add_port = add_port; 160 unit_init.callbacks.remove_port = remove_port; 161 unit_init.callbacks.quit = quit_cb; 162 163 unit_init.request_data_size = sizeof(req_data_t); 164 165 obj->unit_ctx_ = nxt_unit_init(&unit_init); 166 if (obj->unit_ctx_ == NULL) { 167 goto failed; 168 } 169 170 return nullptr; 171 172 failed: 173 174 napi_throw_error(env, NULL, "Failed to create Unit object"); 175 176 return nullptr; 177 } 178 179 180 napi_value 181 Unit::listen(napi_env env, napi_callback_info info) 182 { 183 return nullptr; 184 } 185 186 187 void 188 Unit::request_handler_cb(nxt_unit_request_info_t *req) 189 { 190 Unit *obj; 191 192 obj = reinterpret_cast<Unit *>(req->unit->data); 193 194 obj->request_handler(req); 195 } 196 197 198 void 199 Unit::request_handler(nxt_unit_request_info_t *req) 200 { 201 napi_value socket, request, response, server_obj, emit_request; 202 203 memset(req->data, 0, sizeof(req_data_t)); 204 205 try { 206 nxt_handle_scope scope(env()); 207 208 server_obj = get_server_object(); 209 210 socket = create_socket(server_obj, req); 211 request = create_request(server_obj, socket, req); 212 response = create_response(server_obj, request, req); 213 214 create_headers(req, request); 215 216 emit_request = get_named_property(server_obj, "emit_request"); 217 218 nxt_async_context async_context(env(), "request_handler"); 219 nxt_callback_scope async_scope(async_context); 220 221 make_callback(async_context, server_obj, emit_request, request, 222 response); 223 224 } catch (exception &e) { 225 nxt_unit_req_warn(req, "request_handler: %s", e.str); 226 } 227 } 228 229 230 void 231 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 232 { 233 Unit *obj; 234 235 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 236 237 obj->websocket_handler(ws); 238 } 239 240 241 void 242 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 243 { 244 napi_value frame, server_obj, process_frame, conn; 245 req_data_t *req_data; 246 247 req_data = (req_data_t *) ws->req->data; 248 249 try { 250 nxt_handle_scope scope(env()); 251 252 server_obj = get_server_object(); 253 254 frame = create_websocket_frame(server_obj, ws); 255 256 conn = get_reference_value(req_data->conn_ref); 257 258 process_frame = get_named_property(conn, "processFrame"); 259 260 nxt_async_context async_context(env(), "websocket_handler"); 261 nxt_callback_scope async_scope(async_context); 262 263 make_callback(async_context, conn, process_frame, frame); 264 265 } catch (exception &e) { 266 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 267 } 268 269 nxt_unit_websocket_done(ws); 270 } 271 272 273 void 274 Unit::close_handler_cb(nxt_unit_request_info_t *req) 275 { 276 Unit *obj; 277 278 obj = reinterpret_cast<Unit *>(req->unit->data); 279 280 obj->close_handler(req); 281 } 282 283 284 void 285 Unit::close_handler(nxt_unit_request_info_t *req) 286 { 287 napi_value conn_handle_close, conn; 288 req_data_t *req_data; 289 290 req_data = (req_data_t *) req->data; 291 292 try { 293 nxt_handle_scope scope(env()); 294 295 conn = get_reference_value(req_data->conn_ref); 296 297 conn_handle_close = get_named_property(conn, "handleSocketClose"); 298 299 nxt_async_context async_context(env(), "close_handler"); 300 nxt_callback_scope async_scope(async_context); 301 302 make_callback(async_context, conn, conn_handle_close, 303 nxt_napi::create(0)); 304 305 remove_wrap(req_data->sock_ref); 306 remove_wrap(req_data->req_ref); 307 remove_wrap(req_data->resp_ref); 308 remove_wrap(req_data->conn_ref); 309 310 } catch (exception &e) { 311 nxt_unit_req_warn(req, "close_handler: %s", e.str); 312 313 nxt_unit_request_done(req, NXT_UNIT_ERROR); 314 315 return; 316 } 317 318 nxt_unit_request_done(req, NXT_UNIT_OK); 319 } 320 321 322 void 323 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 324 { 325 Unit *obj; 326 327 obj = reinterpret_cast<Unit *>(ctx->unit->data); 328 329 obj->shm_ack_handler(ctx); 330 } 331 332 333 void 334 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 335 { 336 napi_value server_obj, emit_drain; 337 338 try { 339 nxt_handle_scope scope(env()); 340 341 server_obj = get_server_object(); 342 343 emit_drain = get_named_property(server_obj, "emit_drain"); 344 345 nxt_async_context async_context(env(), "shm_ack_handler"); 346 nxt_callback_scope async_scope(async_context); 347 348 make_callback(async_context, server_obj, emit_drain); 349 350 } catch (exception &e) { 351 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 352 } 353 } 354 355 356 static void 357 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 358 { 359 port_data_t *data; 360 361 data = (port_data_t *) handle->data; 362 363 nxt_unit_process_port_msg(data->ctx, data->port); 364 } 365 366 367 int 368 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 369 { 370 int err; 371 Unit *obj; 372 uv_loop_t *loop; 373 port_data_t *data; 374 napi_status status; 375 376 if (port->in_fd != -1) { 377 obj = reinterpret_cast<Unit *>(ctx->unit->data); 378 379 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 380 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 381 port->in_fd, strerror(errno), errno); 382 return -1; 383 } 384 385 status = napi_get_uv_event_loop(obj->env(), &loop); 386 if (status != napi_ok) { 387 nxt_unit_warn(ctx, "Failed to get uv.loop"); 388 return NXT_UNIT_ERROR; 389 } 390 391 data = new port_data_t; 392 393 err = uv_poll_init(loop, &data->poll, port->in_fd); 394 if (err < 0) { 395 nxt_unit_warn(ctx, "Failed to init uv.poll"); 396 return NXT_UNIT_ERROR; 397 } 398 399 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); 400 if (err < 0) { 401 nxt_unit_warn(ctx, "Failed to start uv.poll"); 402 return NXT_UNIT_ERROR; 403 } 404 405 port->data = data; 406 407 data->ctx = ctx; 408 data->port = port; 409 data->poll.data = data; 410 } 411 412 return NXT_UNIT_OK; 413 } 414 415 416 void 417 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) 418 { 419 port_data_t *data; 420 421 if (port->data != NULL) { 422 data = (port_data_t *) port->data; 423 424 if (data->port == port) { 425 uv_poll_stop(&data->poll); 426 427 uv_close((uv_handle_t *) &data->poll, delete_port_data); 428 } 429 } 430 } 431 432 433 static void 434 delete_port_data(uv_handle_t* handle) 435 { 436 port_data_t *data; 437 438 data = (port_data_t *) handle->data; 439 440 delete data; 441 } 442 443 444 void 445 Unit::quit_cb(nxt_unit_ctx_t *ctx) 446 { 447 Unit *obj; 448 449 obj = reinterpret_cast<Unit *>(ctx->unit->data); 450 451 obj->quit(ctx); 452 } 453 454 455 void 456 Unit::quit(nxt_unit_ctx_t *ctx) 457 { 458 napi_value server_obj, emit_close; 459 460 try { 461 nxt_handle_scope scope(env()); 462 463 server_obj = get_server_object(); 464 465 emit_close = get_named_property(server_obj, "emit_close"); 466 467 nxt_async_context async_context(env(), "unit_quit"); 468 nxt_callback_scope async_scope(async_context); 469 470 make_callback(async_context, server_obj, emit_close); 471 472 } catch (exception &e) { 473 nxt_unit_debug(ctx, "quit: %s", e.str); 474 } 475 476 nxt_unit_done(ctx); 477 } 478 479 480 napi_value 481 Unit::get_server_object() 482 { 483 napi_value unit_obj; 484 485 unit_obj = get_reference_value(wrapper_); 486 487 return get_named_property(unit_obj, "server"); 488 } 489 490 491 void 492 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 493 { 494 uint32_t i; 495 napi_value headers, raw_headers; 496 napi_status status; 497 nxt_unit_request_t *r; 498 499 r = req->request; 500 501 headers = create_object(); 502 503 status = napi_create_array_with_length(env(), r->fields_count * 2, 504 &raw_headers); 505 if (status != napi_ok) { 506 throw exception("Failed to create array"); 507 } 508 509 for (i = 0; i < r->fields_count; i++) { 510 append_header(r->fields + i, headers, raw_headers, i); 511 } 512 513 set_named_property(request, "headers", headers); 514 set_named_property(request, "rawHeaders", raw_headers); 515 set_named_property(request, "httpVersion", r->version, r->version_length); 516 set_named_property(request, "method", r->method, r->method_length); 517 set_named_property(request, "url", r->target, r->target_length); 518 519 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 520 } 521 522 523 inline char 524 lowcase(char c) 525 { 526 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 527 } 528 529 530 inline void 531 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 532 napi_value raw_headers, uint32_t idx) 533 { 534 char *name; 535 uint8_t i; 536 napi_value str, vstr; 537 538 name = (char *) nxt_unit_sptr_get(&f->name); 539 540 str = create_string_latin1(name, f->name_length); 541 542 for (i = 0; i < f->name_length; i++) { 543 name[i] = lowcase(name[i]); 544 } 545 546 vstr = set_named_property(headers, name, f->value, f->value_length); 547 548 set_element(raw_headers, idx * 2, str); 549 set_element(raw_headers, idx * 2 + 1, vstr); 550 } 551 552 553 napi_value 554 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 555 { 556 napi_value constructor, res; 557 req_data_t *req_data; 558 nxt_unit_request_t *r; 559 560 r = req->request; 561 562 constructor = get_named_property(server_obj, "Socket"); 563 564 res = new_instance(constructor); 565 566 req_data = (req_data_t *) req->data; 567 req_data->sock_ref = wrap(res, req, sock_destroy); 568 569 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 570 set_named_property(res, "localAddress", r->local, r->local_length); 571 572 return res; 573 } 574 575 576 napi_value 577 Unit::create_request(napi_value server_obj, napi_value socket, 578 nxt_unit_request_info_t *req) 579 { 580 napi_value constructor, res; 581 req_data_t *req_data; 582 583 constructor = get_named_property(server_obj, "ServerRequest"); 584 585 res = new_instance(constructor, server_obj, socket); 586 587 req_data = (req_data_t *) req->data; 588 req_data->req_ref = wrap(res, req, req_destroy); 589 590 return res; 591 } 592 593 594 napi_value 595 Unit::create_response(napi_value server_obj, napi_value request, 596 nxt_unit_request_info_t *req) 597 { 598 napi_value constructor, res; 599 req_data_t *req_data; 600 601 constructor = get_named_property(server_obj, "ServerResponse"); 602 603 res = new_instance(constructor, request); 604 605 req_data = (req_data_t *) req->data; 606 req_data->resp_ref = wrap(res, req, resp_destroy); 607 608 return res; 609 } 610 611 612 napi_value 613 Unit::create_websocket_frame(napi_value server_obj, 614 nxt_unit_websocket_frame_t *ws) 615 { 616 void *data; 617 napi_value constructor, res, buffer; 618 uint8_t sc[2]; 619 620 constructor = get_named_property(server_obj, "WebSocketFrame"); 621 622 res = new_instance(constructor); 623 624 set_named_property(res, "fin", (bool) ws->header->fin); 625 set_named_property(res, "opcode", ws->header->opcode); 626 set_named_property(res, "length", (int64_t) ws->payload_len); 627 628 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 629 if (ws->payload_len >= 2) { 630 nxt_unit_websocket_read(ws, sc, 2); 631 632 set_named_property(res, "closeStatus", 633 (((uint16_t) sc[0]) << 8) | sc[1]); 634 635 } else { 636 set_named_property(res, "closeStatus", -1); 637 } 638 } 639 640 buffer = create_buffer((size_t) ws->content_length, &data); 641 nxt_unit_websocket_read(ws, data, ws->content_length); 642 643 set_named_property(res, "binaryPayload", buffer); 644 645 return res; 646 } 647 648 649 napi_value 650 Unit::request_read(napi_env env, napi_callback_info info) 651 { 652 void *data; 653 uint32_t wm; 654 nxt_napi napi(env); 655 napi_value this_arg, argv, buffer; 656 nxt_unit_request_info_t *req; 657 658 try { 659 this_arg = napi.get_cb_info(info, argv); 660 661 try { 662 req = napi.get_request_info(this_arg); 663 664 } catch (exception &e) { 665 return nullptr; 666 } 667 668 if (req->content_length == 0) { 669 return nullptr; 670 } 671 672 wm = napi.get_value_uint32(argv); 673 674 if (wm > req->content_length) { 675 wm = req->content_length; 676 } 677 678 buffer = napi.create_buffer((size_t) wm, &data); 679 nxt_unit_request_read(req, data, wm); 680 681 } catch (exception &e) { 682 napi.throw_error(e); 683 return nullptr; 684 } 685 686 return buffer; 687 } 688 689 690 napi_value 691 Unit::response_send_headers(napi_env env, napi_callback_info info) 692 { 693 int ret; 694 char *ptr, *name_ptr; 695 bool is_array; 696 size_t argc, name_len, value_len; 697 uint32_t status_code, header_len, keys_len, array_len; 698 uint32_t keys_count, i, j; 699 uint16_t hash; 700 nxt_napi napi(env); 701 napi_value this_arg, headers, keys, name, value, array_val; 702 napi_value array_entry; 703 napi_valuetype val_type; 704 nxt_unit_field_t *f; 705 nxt_unit_request_info_t *req; 706 napi_value argv[4]; 707 708 argc = 4; 709 710 try { 711 this_arg = napi.get_cb_info(info, argc, argv); 712 if (argc != 4) { 713 napi.throw_error("Wrong args count. Expected: " 714 "statusCode, headers, headers count, " 715 "headers length"); 716 return nullptr; 717 } 718 719 req = napi.get_request_info(this_arg); 720 status_code = napi.get_value_uint32(argv[0]); 721 keys_count = napi.get_value_uint32(argv[2]); 722 header_len = napi.get_value_uint32(argv[3]); 723 724 headers = argv[1]; 725 726 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 727 if (ret != NXT_UNIT_OK) { 728 napi.throw_error("Failed to create response"); 729 return nullptr; 730 } 731 732 /* 733 * Each name and value are 0-terminated by libunit. 734 * Need to add extra 2 bytes for each header. 735 */ 736 header_len += keys_count * 2; 737 738 keys = napi.get_property_names(headers); 739 keys_len = napi.get_array_length(keys); 740 741 ptr = req->response_buf->free; 742 743 for (i = 0; i < keys_len; i++) { 744 name = napi.get_element(keys, i); 745 746 array_entry = napi.get_property(headers, name); 747 748 name = napi.get_element(array_entry, 0); 749 value = napi.get_element(array_entry, 1); 750 751 name_len = napi.get_value_string_latin1(name, ptr, header_len); 752 name_ptr = ptr; 753 754 ptr += name_len + 1; 755 header_len -= name_len + 1; 756 757 hash = nxt_unit_field_hash(name_ptr, name_len); 758 759 is_array = napi.is_array(value); 760 761 if (is_array) { 762 array_len = napi.get_array_length(value); 763 764 for (j = 0; j < array_len; j++) { 765 array_val = napi.get_element(value, j); 766 767 val_type = napi.type_of(array_val); 768 769 if (val_type != napi_string) { 770 array_val = napi.coerce_to_string(array_val); 771 } 772 773 value_len = napi.get_value_string_latin1(array_val, ptr, 774 header_len); 775 776 f = req->response->fields + req->response->fields_count; 777 f->skip = 0; 778 779 nxt_unit_sptr_set(&f->name, name_ptr); 780 781 f->name_length = name_len; 782 f->hash = hash; 783 784 nxt_unit_sptr_set(&f->value, ptr); 785 f->value_length = (uint32_t) value_len; 786 787 ptr += value_len + 1; 788 header_len -= value_len + 1; 789 790 req->response->fields_count++; 791 } 792 793 } else { 794 val_type = napi.type_of(value); 795 796 if (val_type != napi_string) { 797 value = napi.coerce_to_string(value); 798 } 799 800 value_len = napi.get_value_string_latin1(value, ptr, header_len); 801 802 f = req->response->fields + req->response->fields_count; 803 f->skip = 0; 804 805 nxt_unit_sptr_set(&f->name, name_ptr); 806 807 f->name_length = name_len; 808 f->hash = hash; 809 810 nxt_unit_sptr_set(&f->value, ptr); 811 f->value_length = (uint32_t) value_len; 812 813 ptr += value_len + 1; 814 header_len -= value_len + 1; 815 816 req->response->fields_count++; 817 } 818 } 819 820 } catch (exception &e) { 821 napi.throw_error(e); 822 return nullptr; 823 } 824 825 req->response_buf->free = ptr; 826 827 ret = nxt_unit_response_send(req); 828 if (ret != NXT_UNIT_OK) { 829 napi.throw_error("Failed to send response"); 830 return nullptr; 831 } 832 833 return this_arg; 834 } 835 836 837 napi_value 838 Unit::response_write(napi_env env, napi_callback_info info) 839 { 840 int ret; 841 void *ptr; 842 size_t argc, have_buf_len; 843 ssize_t res_len; 844 uint32_t buf_start, buf_len; 845 nxt_napi napi(env); 846 napi_value this_arg; 847 nxt_unit_buf_t *buf; 848 napi_valuetype buf_type; 849 nxt_unit_request_info_t *req; 850 napi_value argv[3]; 851 852 argc = 3; 853 854 try { 855 this_arg = napi.get_cb_info(info, argc, argv); 856 if (argc != 3) { 857 throw exception("Wrong args count. Expected: " 858 "chunk, start, length"); 859 } 860 861 req = napi.get_request_info(this_arg); 862 buf_type = napi.type_of(argv[0]); 863 buf_start = napi.get_value_uint32(argv[1]); 864 buf_len = napi.get_value_uint32(argv[2]) + 1; 865 866 if (buf_type == napi_string) { 867 /* TODO: will work only for utf8 content-type */ 868 869 if (req->response_buf != NULL 870 && req->response_buf->end >= req->response_buf->free + buf_len) 871 { 872 buf = req->response_buf; 873 874 } else { 875 buf = nxt_unit_response_buf_alloc(req, buf_len); 876 if (buf == NULL) { 877 throw exception("Failed to allocate response buffer"); 878 } 879 } 880 881 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 882 buf_len); 883 884 buf->free += have_buf_len; 885 886 ret = nxt_unit_buf_send(buf); 887 if (ret == NXT_UNIT_OK) { 888 res_len = have_buf_len; 889 } 890 891 } else { 892 ptr = napi.get_buffer_info(argv[0], have_buf_len); 893 894 if (buf_start > 0) { 895 ptr = ((uint8_t *) ptr) + buf_start; 896 have_buf_len -= buf_start; 897 } 898 899 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 900 901 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 902 } 903 904 if (ret != NXT_UNIT_OK) { 905 throw exception("Failed to send body buf"); 906 } 907 } catch (exception &e) { 908 napi.throw_error(e); 909 return nullptr; 910 } 911 912 return napi.create((int64_t) res_len); 913 } 914 915 916 napi_value 917 Unit::response_end(napi_env env, napi_callback_info info) 918 { 919 nxt_napi napi(env); 920 napi_value this_arg; 921 req_data_t *req_data; 922 nxt_unit_request_info_t *req; 923 924 try { 925 this_arg = napi.get_cb_info(info); 926 927 req = napi.get_request_info(this_arg); 928 929 req_data = (req_data_t *) req->data; 930 931 napi.remove_wrap(req_data->sock_ref); 932 napi.remove_wrap(req_data->req_ref); 933 napi.remove_wrap(req_data->resp_ref); 934 napi.remove_wrap(req_data->conn_ref); 935 936 } catch (exception &e) { 937 napi.throw_error(e); 938 return nullptr; 939 } 940 941 nxt_unit_request_done(req, NXT_UNIT_OK); 942 943 return this_arg; 944 } 945 946 947 napi_value 948 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 949 { 950 int ret, iovec_len; 951 bool fin; 952 size_t buf_len; 953 uint32_t opcode, sc; 954 nxt_napi napi(env); 955 napi_value this_arg, frame, payload; 956 nxt_unit_request_info_t *req; 957 char status_code[2]; 958 struct iovec iov[2]; 959 960 iovec_len = 0; 961 962 try { 963 this_arg = napi.get_cb_info(info, frame); 964 965 req = napi.get_request_info(this_arg); 966 967 opcode = napi.get_value_uint32(napi.get_named_property(frame, 968 "opcode")); 969 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 970 sc = napi.get_value_uint32(napi.get_named_property(frame, 971 "closeStatus")); 972 status_code[0] = (sc >> 8) & 0xFF; 973 status_code[1] = sc & 0xFF; 974 975 iov[iovec_len].iov_base = status_code; 976 iov[iovec_len].iov_len = 2; 977 iovec_len++; 978 } 979 980 try { 981 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 982 983 } catch (exception &e) { 984 fin = true; 985 } 986 987 payload = napi.get_named_property(frame, "binaryPayload"); 988 989 if (napi.is_buffer(payload)) { 990 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 991 992 } else { 993 buf_len = 0; 994 } 995 996 } catch (exception &e) { 997 napi.throw_error(e); 998 return nullptr; 999 } 1000 1001 if (buf_len > 0) { 1002 iov[iovec_len].iov_len = buf_len; 1003 iovec_len++; 1004 } 1005 1006 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 1007 if (ret != NXT_UNIT_OK) { 1008 goto failed; 1009 } 1010 1011 return this_arg; 1012 1013 failed: 1014 1015 napi.throw_error("Failed to send frame"); 1016 1017 return nullptr; 1018 } 1019 1020 1021 napi_value 1022 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 1023 { 1024 nxt_napi napi(env); 1025 napi_value this_arg, sock; 1026 req_data_t *req_data; 1027 nxt_unit_request_info_t *req; 1028 1029 try { 1030 this_arg = napi.get_cb_info(info, sock); 1031 1032 req = napi.get_request_info(sock); 1033 1034 req_data = (req_data_t *) req->data; 1035 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 1036 1037 } catch (exception &e) { 1038 napi.throw_error(e); 1039 return nullptr; 1040 } 1041 1042 return this_arg; 1043 } 1044 1045 1046 void 1047 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) 1048 { 1049 nxt_unit_req_debug(NULL, "conn_destroy: %p", r); 1050 } 1051 1052 1053 void 1054 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) 1055 { 1056 nxt_unit_req_debug(NULL, "sock_destroy: %p", r); 1057 } 1058 1059 1060 void 1061 Unit::req_destroy(napi_env env, void *r, void *finalize_hint) 1062 { 1063 nxt_unit_req_debug(NULL, "req_destroy: %p", r); 1064 } 1065 1066 1067 void 1068 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) 1069 { 1070 nxt_unit_req_debug(NULL, "resp_destroy: %p", r); 1071 } 1072