1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 #include <nxt_unit_websocket.h> 14 15 16 napi_ref Unit::constructor_; 17 18 19 struct port_data_t { 20 port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); 21 22 void process_port_msg(); 23 void stop(); 24 25 template<typename T> 26 static port_data_t *get(T *handle); 27 28 static void read_callback(uv_poll_t *handle, int status, int events); 29 static void timer_callback(uv_timer_t *handle); 30 static void delete_data(uv_handle_t* handle); 31 32 nxt_unit_ctx_t *ctx; 33 nxt_unit_port_t *port; 34 uv_poll_t poll; 35 uv_timer_t timer; 36 int ref_count; 37 bool scheduled; 38 bool stopped; 39 }; 40 41 42 struct req_data_t { 43 napi_ref sock_ref; 44 napi_ref req_ref; 45 napi_ref resp_ref; 46 napi_ref conn_ref; 47 }; 48 49 50 port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : 51 ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) 52 { 53 timer.type = UV_UNKNOWN_HANDLE; 54 } 55 56 57 void 58 port_data_t::process_port_msg() 59 { 60 int rc, err; 61 62 rc = nxt_unit_process_port_msg(ctx, port); 63 64 if (rc != NXT_UNIT_OK) { 65 return; 66 } 67 68 if (timer.type == UV_UNKNOWN_HANDLE) { 69 err = uv_timer_init(poll.loop, &timer); 70 if (err < 0) { 71 nxt_unit_warn(ctx, "Failed to init uv.poll"); 72 return; 73 } 74 75 ref_count++; 76 timer.data = this; 77 } 78 79 if (!scheduled && !stopped) { 80 uv_timer_start(&timer, timer_callback, 0, 0); 81 82 scheduled = true; 83 } 84 } 85 86 87 void 88 port_data_t::stop() 89 { 90 stopped = true; 91 92 uv_poll_stop(&poll); 93 94 uv_close((uv_handle_t *) &poll, delete_data); 95 96 if (timer.type == UV_UNKNOWN_HANDLE) { 97 return; 98 } 99 100 uv_timer_stop(&timer); 101 102 uv_close((uv_handle_t *) &timer, delete_data); 103 } 104 105 106 template<typename T> 107 port_data_t * 108 port_data_t::get(T *handle) 109 { 110 return (port_data_t *) handle->data; 111 } 112 113 114 void 115 port_data_t::read_callback(uv_poll_t *handle, int status, int events) 116 { 117 get(handle)->process_port_msg(); 118 } 119 120 121 void 122 port_data_t::timer_callback(uv_timer_t *handle) 123 { 124 port_data_t *data; 125 126 data = get(handle); 127 128 data->scheduled = false; 129 if (data->stopped) { 130 return; 131 } 132 133 data->process_port_msg(); 134 } 135 136 137 void 138 port_data_t::delete_data(uv_handle_t* handle) 139 { 140 port_data_t *data; 141 142 data = get(handle); 143 144 if (--data->ref_count <= 0) { 145 delete data; 146 } 147 } 148 149 150 Unit::Unit(napi_env env, napi_value jsthis): 151 nxt_napi(env), 152 wrapper_(wrap(jsthis, this, destroy)), 153 unit_ctx_(nullptr) 154 { 155 nxt_unit_debug(NULL, "Unit::Unit()"); 156 } 157 158 159 Unit::~Unit() 160 { 161 delete_reference(wrapper_); 162 163 nxt_unit_debug(NULL, "Unit::~Unit()"); 164 } 165 166 167 napi_value 168 Unit::init(napi_env env, napi_value exports) 169 { 170 nxt_napi napi(env); 171 napi_value ctor; 172 173 napi_property_descriptor unit_props[] = { 174 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 175 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 176 }; 177 178 try { 179 ctor = napi.define_class("Unit", create, 2, unit_props); 180 constructor_ = napi.create_reference(ctor); 181 182 napi.set_named_property(exports, "Unit", ctor); 183 napi.set_named_property(exports, "request_read", request_read); 184 napi.set_named_property(exports, "response_send_headers", 185 response_send_headers); 186 napi.set_named_property(exports, "response_write", response_write); 187 napi.set_named_property(exports, "response_end", response_end); 188 napi.set_named_property(exports, "websocket_send_frame", 189 websocket_send_frame); 190 napi.set_named_property(exports, "websocket_set_sock", 191 websocket_set_sock); 192 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 193 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 194 195 } catch (exception &e) { 196 napi.throw_error(e); 197 return nullptr; 198 } 199 200 return exports; 201 } 202 203 204 void 205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 206 { 207 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 208 209 delete obj; 210 } 211 212 213 napi_value 214 Unit::create(napi_env env, napi_callback_info info) 215 { 216 nxt_napi napi(env); 217 napi_value target, ctor, instance, jsthis; 218 219 try { 220 target = napi.get_new_target(info); 221 222 if (target != nullptr) { 223 /* Invoked as constructor: `new Unit(...)`. */ 224 jsthis = napi.get_cb_info(info); 225 226 new Unit(env, jsthis); 227 napi.create_reference(jsthis); 228 229 return jsthis; 230 } 231 232 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 233 ctor = napi.get_reference_value(constructor_); 234 instance = napi.new_instance(ctor); 235 napi.create_reference(instance); 236 237 } catch (exception &e) { 238 napi.throw_error(e); 239 return nullptr; 240 } 241 242 return instance; 243 } 244 245 246 napi_value 247 Unit::create_server(napi_env env, napi_callback_info info) 248 { 249 Unit *obj; 250 size_t argc; 251 nxt_napi napi(env); 252 napi_value jsthis, argv; 253 nxt_unit_init_t unit_init; 254 255 argc = 1; 256 257 try { 258 jsthis = napi.get_cb_info(info, argc, &argv); 259 obj = (Unit *) napi.unwrap(jsthis); 260 261 } catch (exception &e) { 262 napi.throw_error(e); 263 return nullptr; 264 } 265 266 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 267 268 unit_init.data = obj; 269 unit_init.callbacks.request_handler = request_handler_cb; 270 unit_init.callbacks.websocket_handler = websocket_handler_cb; 271 unit_init.callbacks.close_handler = close_handler_cb; 272 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 273 unit_init.callbacks.add_port = add_port; 274 unit_init.callbacks.remove_port = remove_port; 275 unit_init.callbacks.quit = quit_cb; 276 277 unit_init.request_data_size = sizeof(req_data_t); 278 279 obj->unit_ctx_ = nxt_unit_init(&unit_init); 280 if (obj->unit_ctx_ == NULL) { 281 goto failed; 282 } 283 284 return nullptr; 285 286 failed: 287 288 napi_throw_error(env, NULL, "Failed to create Unit object"); 289 290 return nullptr; 291 } 292 293 294 napi_value 295 Unit::listen(napi_env env, napi_callback_info info) 296 { 297 return nullptr; 298 } 299 300 301 void 302 Unit::request_handler_cb(nxt_unit_request_info_t *req) 303 { 304 Unit *obj; 305 306 obj = reinterpret_cast<Unit *>(req->unit->data); 307 308 obj->request_handler(req); 309 } 310 311 312 void 313 Unit::request_handler(nxt_unit_request_info_t *req) 314 { 315 napi_value socket, request, response, server_obj, emit_request; 316 317 memset(req->data, 0, sizeof(req_data_t)); 318 319 try { 320 nxt_handle_scope scope(env()); 321 322 server_obj = get_server_object(); 323 324 socket = create_socket(server_obj, req); 325 request = create_request(server_obj, socket, req); 326 response = create_response(server_obj, request, req); 327 328 create_headers(req, request); 329 330 emit_request = get_named_property(server_obj, "emit_request"); 331 332 nxt_async_context async_context(env(), "request_handler"); 333 nxt_callback_scope async_scope(async_context); 334 335 make_callback(async_context, server_obj, emit_request, request, 336 response); 337 338 } catch (exception &e) { 339 nxt_unit_req_warn(req, "request_handler: %s", e.str); 340 } 341 } 342 343 344 void 345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 346 { 347 Unit *obj; 348 349 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 350 351 obj->websocket_handler(ws); 352 } 353 354 355 void 356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 357 { 358 napi_value frame, server_obj, process_frame, conn; 359 req_data_t *req_data; 360 361 req_data = (req_data_t *) ws->req->data; 362 363 try { 364 nxt_handle_scope scope(env()); 365 366 server_obj = get_server_object(); 367 368 frame = create_websocket_frame(server_obj, ws); 369 370 conn = get_reference_value(req_data->conn_ref); 371 372 process_frame = get_named_property(conn, "processFrame"); 373 374 nxt_async_context async_context(env(), "websocket_handler"); 375 nxt_callback_scope async_scope(async_context); 376 377 make_callback(async_context, conn, process_frame, frame); 378 379 } catch (exception &e) { 380 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 381 } 382 383 nxt_unit_websocket_done(ws); 384 } 385 386 387 void 388 Unit::close_handler_cb(nxt_unit_request_info_t *req) 389 { 390 Unit *obj; 391 392 obj = reinterpret_cast<Unit *>(req->unit->data); 393 394 obj->close_handler(req); 395 } 396 397 398 void 399 Unit::close_handler(nxt_unit_request_info_t *req) 400 { 401 napi_value conn_handle_close, conn; 402 req_data_t *req_data; 403 404 req_data = (req_data_t *) req->data; 405 406 try { 407 nxt_handle_scope scope(env()); 408 409 conn = get_reference_value(req_data->conn_ref); 410 411 conn_handle_close = get_named_property(conn, "handleSocketClose"); 412 413 nxt_async_context async_context(env(), "close_handler"); 414 nxt_callback_scope async_scope(async_context); 415 416 make_callback(async_context, conn, conn_handle_close, 417 nxt_napi::create(0)); 418 419 remove_wrap(req_data->sock_ref); 420 remove_wrap(req_data->req_ref); 421 remove_wrap(req_data->resp_ref); 422 remove_wrap(req_data->conn_ref); 423 424 } catch (exception &e) { 425 nxt_unit_req_warn(req, "close_handler: %s", e.str); 426 427 nxt_unit_request_done(req, NXT_UNIT_ERROR); 428 429 return; 430 } 431 432 nxt_unit_request_done(req, NXT_UNIT_OK); 433 } 434 435 436 void 437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 438 { 439 Unit *obj; 440 441 obj = reinterpret_cast<Unit *>(ctx->unit->data); 442 443 obj->shm_ack_handler(ctx); 444 } 445 446 447 void 448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 449 { 450 napi_value server_obj, emit_drain; 451 452 try { 453 nxt_handle_scope scope(env()); 454 455 server_obj = get_server_object(); 456 457 emit_drain = get_named_property(server_obj, "emit_drain"); 458 459 nxt_async_context async_context(env(), "shm_ack_handler"); 460 nxt_callback_scope async_scope(async_context); 461 462 make_callback(async_context, server_obj, emit_drain); 463 464 } catch (exception &e) { 465 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 466 } 467 } 468 469 470 int 471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 472 { 473 int err; 474 Unit *obj; 475 uv_loop_t *loop; 476 port_data_t *data; 477 napi_status status; 478 479 if (port->in_fd != -1) { 480 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 481 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 482 port->in_fd, strerror(errno), errno); 483 return -1; 484 } 485 486 obj = reinterpret_cast<Unit *>(ctx->unit->data); 487 488 status = napi_get_uv_event_loop(obj->env(), &loop); 489 if (status != napi_ok) { 490 nxt_unit_warn(ctx, "Failed to get uv.loop"); 491 return NXT_UNIT_ERROR; 492 } 493 494 data = new port_data_t(ctx, port); 495 496 err = uv_poll_init(loop, &data->poll, port->in_fd); 497 if (err < 0) { 498 nxt_unit_warn(ctx, "Failed to init uv.poll"); 499 delete data; 500 return NXT_UNIT_ERROR; 501 } 502 503 err = uv_poll_start(&data->poll, UV_READABLE, 504 port_data_t::read_callback); 505 if (err < 0) { 506 nxt_unit_warn(ctx, "Failed to start uv.poll"); 507 delete data; 508 return NXT_UNIT_ERROR; 509 } 510 511 port->data = data; 512 513 data->ref_count++; 514 data->poll.data = data; 515 } 516 517 return NXT_UNIT_OK; 518 } 519 520 521 void 522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 523 { 524 port_data_t *data; 525 526 if (port->data != NULL && ctx != NULL) { 527 data = (port_data_t *) port->data; 528 529 data->stop(); 530 } 531 } 532 533 534 void 535 Unit::quit_cb(nxt_unit_ctx_t *ctx) 536 { 537 Unit *obj; 538 539 obj = reinterpret_cast<Unit *>(ctx->unit->data); 540 541 obj->quit(ctx); 542 } 543 544 545 void 546 Unit::quit(nxt_unit_ctx_t *ctx) 547 { 548 napi_value server_obj, emit_close; 549 550 try { 551 nxt_handle_scope scope(env()); 552 553 server_obj = get_server_object(); 554 555 emit_close = get_named_property(server_obj, "emit_close"); 556 557 nxt_async_context async_context(env(), "unit_quit"); 558 nxt_callback_scope async_scope(async_context); 559 560 make_callback(async_context, server_obj, emit_close); 561 562 } catch (exception &e) { 563 nxt_unit_debug(ctx, "quit: %s", e.str); 564 } 565 566 nxt_unit_done(ctx); 567 } 568 569 570 napi_value 571 Unit::get_server_object() 572 { 573 napi_value unit_obj; 574 575 unit_obj = get_reference_value(wrapper_); 576 577 return get_named_property(unit_obj, "server"); 578 } 579 580 581 void 582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 583 { 584 uint32_t i; 585 napi_value headers, raw_headers; 586 napi_status status; 587 nxt_unit_request_t *r; 588 589 r = req->request; 590 591 headers = create_object(); 592 593 status = napi_create_array_with_length(env(), r->fields_count * 2, 594 &raw_headers); 595 if (status != napi_ok) { 596 throw exception("Failed to create array"); 597 } 598 599 for (i = 0; i < r->fields_count; i++) { 600 append_header(r->fields + i, headers, raw_headers, i); 601 } 602 603 set_named_property(request, "headers", headers); 604 set_named_property(request, "rawHeaders", raw_headers); 605 set_named_property(request, "httpVersion", r->version, r->version_length); 606 set_named_property(request, "method", r->method, r->method_length); 607 set_named_property(request, "url", r->target, r->target_length); 608 609 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 610 } 611 612 613 inline char 614 lowcase(char c) 615 { 616 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 617 } 618 619 620 inline void 621 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 622 napi_value raw_headers, uint32_t idx) 623 { 624 char *name; 625 uint8_t i; 626 napi_value str, vstr; 627 628 name = (char *) nxt_unit_sptr_get(&f->name); 629 630 str = create_string_latin1(name, f->name_length); 631 632 for (i = 0; i < f->name_length; i++) { 633 name[i] = lowcase(name[i]); 634 } 635 636 vstr = set_named_property(headers, name, f->value, f->value_length); 637 638 set_element(raw_headers, idx * 2, str); 639 set_element(raw_headers, idx * 2 + 1, vstr); 640 } 641 642 643 napi_value 644 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 645 { 646 napi_value constructor, res; 647 req_data_t *req_data; 648 nxt_unit_request_t *r; 649 650 r = req->request; 651 652 constructor = get_named_property(server_obj, "Socket"); 653 654 res = new_instance(constructor); 655 656 req_data = (req_data_t *) req->data; 657 req_data->sock_ref = wrap(res, req, sock_destroy); 658 659 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 660 set_named_property(res, "localAddress", r->local, r->local_length); 661 662 return res; 663 } 664 665 666 napi_value 667 Unit::create_request(napi_value server_obj, napi_value socket, 668 nxt_unit_request_info_t *req) 669 { 670 napi_value constructor, res; 671 req_data_t *req_data; 672 673 constructor = get_named_property(server_obj, "ServerRequest"); 674 675 res = new_instance(constructor, server_obj, socket); 676 677 req_data = (req_data_t *) req->data; 678 req_data->req_ref = wrap(res, req, req_destroy); 679 680 return res; 681 } 682 683 684 napi_value 685 Unit::create_response(napi_value server_obj, napi_value request, 686 nxt_unit_request_info_t *req) 687 { 688 napi_value constructor, res; 689 req_data_t *req_data; 690 691 constructor = get_named_property(server_obj, "ServerResponse"); 692 693 res = new_instance(constructor, request); 694 695 req_data = (req_data_t *) req->data; 696 req_data->resp_ref = wrap(res, req, resp_destroy); 697 698 return res; 699 } 700 701 702 napi_value 703 Unit::create_websocket_frame(napi_value server_obj, 704 nxt_unit_websocket_frame_t *ws) 705 { 706 void *data; 707 napi_value constructor, res, buffer; 708 uint8_t sc[2]; 709 710 constructor = get_named_property(server_obj, "WebSocketFrame"); 711 712 res = new_instance(constructor); 713 714 set_named_property(res, "fin", (bool) ws->header->fin); 715 set_named_property(res, "opcode", ws->header->opcode); 716 set_named_property(res, "length", (int64_t) ws->payload_len); 717 718 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 719 if (ws->payload_len >= 2) { 720 nxt_unit_websocket_read(ws, sc, 2); 721 722 set_named_property(res, "closeStatus", 723 (((uint16_t) sc[0]) << 8) | sc[1]); 724 725 } else { 726 set_named_property(res, "closeStatus", -1); 727 } 728 } 729 730 buffer = create_buffer((size_t) ws->content_length, &data); 731 nxt_unit_websocket_read(ws, data, ws->content_length); 732 733 set_named_property(res, "binaryPayload", buffer); 734 735 return res; 736 } 737 738 739 napi_value 740 Unit::request_read(napi_env env, napi_callback_info info) 741 { 742 void *data; 743 uint32_t wm; 744 nxt_napi napi(env); 745 napi_value this_arg, argv, buffer; 746 nxt_unit_request_info_t *req; 747 748 try { 749 this_arg = napi.get_cb_info(info, argv); 750 751 try { 752 req = napi.get_request_info(this_arg); 753 754 } catch (exception &e) { 755 return nullptr; 756 } 757 758 if (req->content_length == 0) { 759 return nullptr; 760 } 761 762 wm = napi.get_value_uint32(argv); 763 764 if (wm > req->content_length) { 765 wm = req->content_length; 766 } 767 768 buffer = napi.create_buffer((size_t) wm, &data); 769 nxt_unit_request_read(req, data, wm); 770 771 } catch (exception &e) { 772 napi.throw_error(e); 773 return nullptr; 774 } 775 776 return buffer; 777 } 778 779 780 napi_value 781 Unit::response_send_headers(napi_env env, napi_callback_info info) 782 { 783 int ret; 784 char *ptr, *name_ptr; 785 bool is_array; 786 size_t argc, name_len, value_len; 787 uint32_t status_code, header_len, keys_len, array_len; 788 uint32_t keys_count, i, j; 789 uint16_t hash; 790 nxt_napi napi(env); 791 napi_value this_arg, headers, keys, name, value, array_val; 792 napi_value array_entry; 793 napi_valuetype val_type; 794 nxt_unit_field_t *f; 795 nxt_unit_request_info_t *req; 796 napi_value argv[4]; 797 798 argc = 4; 799 800 try { 801 this_arg = napi.get_cb_info(info, argc, argv); 802 if (argc != 4) { 803 napi.throw_error("Wrong args count. Expected: " 804 "statusCode, headers, headers count, " 805 "headers length"); 806 return nullptr; 807 } 808 809 req = napi.get_request_info(this_arg); 810 status_code = napi.get_value_uint32(argv[0]); 811 keys_count = napi.get_value_uint32(argv[2]); 812 header_len = napi.get_value_uint32(argv[3]); 813 814 headers = argv[1]; 815 816 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 817 if (ret != NXT_UNIT_OK) { 818 napi.throw_error("Failed to create response"); 819 return nullptr; 820 } 821 822 /* 823 * Each name and value are 0-terminated by libunit. 824 * Need to add extra 2 bytes for each header. 825 */ 826 header_len += keys_count * 2; 827 828 keys = napi.get_property_names(headers); 829 keys_len = napi.get_array_length(keys); 830 831 ptr = req->response_buf->free; 832 833 for (i = 0; i < keys_len; i++) { 834 name = napi.get_element(keys, i); 835 836 array_entry = napi.get_property(headers, name); 837 838 name = napi.get_element(array_entry, 0); 839 value = napi.get_element(array_entry, 1); 840 841 name_len = napi.get_value_string_latin1(name, ptr, header_len); 842 name_ptr = ptr; 843 844 ptr += name_len + 1; 845 header_len -= name_len + 1; 846 847 hash = nxt_unit_field_hash(name_ptr, name_len); 848 849 is_array = napi.is_array(value); 850 851 if (is_array) { 852 array_len = napi.get_array_length(value); 853 854 for (j = 0; j < array_len; j++) { 855 array_val = napi.get_element(value, j); 856 857 val_type = napi.type_of(array_val); 858 859 if (val_type != napi_string) { 860 array_val = napi.coerce_to_string(array_val); 861 } 862 863 value_len = napi.get_value_string_latin1(array_val, ptr, 864 header_len); 865 866 f = req->response->fields + req->response->fields_count; 867 f->skip = 0; 868 869 nxt_unit_sptr_set(&f->name, name_ptr); 870 871 f->name_length = name_len; 872 f->hash = hash; 873 874 nxt_unit_sptr_set(&f->value, ptr); 875 f->value_length = (uint32_t) value_len; 876 877 ptr += value_len + 1; 878 header_len -= value_len + 1; 879 880 req->response->fields_count++; 881 } 882 883 } else { 884 val_type = napi.type_of(value); 885 886 if (val_type != napi_string) { 887 value = napi.coerce_to_string(value); 888 } 889 890 value_len = napi.get_value_string_latin1(value, ptr, header_len); 891 892 f = req->response->fields + req->response->fields_count; 893 f->skip = 0; 894 895 nxt_unit_sptr_set(&f->name, name_ptr); 896 897 f->name_length = name_len; 898 f->hash = hash; 899 900 nxt_unit_sptr_set(&f->value, ptr); 901 f->value_length = (uint32_t) value_len; 902 903 ptr += value_len + 1; 904 header_len -= value_len + 1; 905 906 req->response->fields_count++; 907 } 908 } 909 910 } catch (exception &e) { 911 napi.throw_error(e); 912 return nullptr; 913 } 914 915 req->response_buf->free = ptr; 916 917 ret = nxt_unit_response_send(req); 918 if (ret != NXT_UNIT_OK) { 919 napi.throw_error("Failed to send response"); 920 return nullptr; 921 } 922 923 return this_arg; 924 } 925 926 927 napi_value 928 Unit::response_write(napi_env env, napi_callback_info info) 929 { 930 int ret; 931 void *ptr; 932 size_t argc, have_buf_len; 933 ssize_t res_len; 934 uint32_t buf_start, buf_len; 935 nxt_napi napi(env); 936 napi_value this_arg; 937 nxt_unit_buf_t *buf; 938 napi_valuetype buf_type; 939 nxt_unit_request_info_t *req; 940 napi_value argv[3]; 941 942 argc = 3; 943 944 try { 945 this_arg = napi.get_cb_info(info, argc, argv); 946 if (argc != 3) { 947 throw exception("Wrong args count. Expected: " 948 "chunk, start, length"); 949 } 950 951 req = napi.get_request_info(this_arg); 952 buf_type = napi.type_of(argv[0]); 953 buf_start = napi.get_value_uint32(argv[1]); 954 buf_len = napi.get_value_uint32(argv[2]) + 1; 955 956 if (buf_type == napi_string) { 957 /* TODO: will work only for utf8 content-type */ 958 959 if (req->response_buf != NULL 960 && req->response_buf->end >= req->response_buf->free + buf_len) 961 { 962 buf = req->response_buf; 963 964 } else { 965 buf = nxt_unit_response_buf_alloc(req, buf_len); 966 if (buf == NULL) { 967 throw exception("Failed to allocate response buffer"); 968 } 969 } 970 971 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 972 buf_len); 973 974 buf->free += have_buf_len; 975 976 ret = nxt_unit_buf_send(buf); 977 if (ret == NXT_UNIT_OK) { 978 res_len = have_buf_len; 979 } 980 981 } else { 982 ptr = napi.get_buffer_info(argv[0], have_buf_len); 983 984 if (buf_start > 0) { 985 ptr = ((uint8_t *) ptr) + buf_start; 986 have_buf_len -= buf_start; 987 } 988 989 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 990 991 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 992 } 993 994 if (ret != NXT_UNIT_OK) { 995 throw exception("Failed to send body buf"); 996 } 997 } catch (exception &e) { 998 napi.throw_error(e); 999 return nullptr; 1000 } 1001 1002 return napi.create((int64_t) res_len); 1003 } 1004 1005 1006 napi_value 1007 Unit::response_end(napi_env env, napi_callback_info info) 1008 { 1009 nxt_napi napi(env); 1010 napi_value this_arg; 1011 req_data_t *req_data; 1012 nxt_unit_request_info_t *req; 1013 1014 try { 1015 this_arg = napi.get_cb_info(info); 1016 1017 req = napi.get_request_info(this_arg); 1018 1019 req_data = (req_data_t *) req->data; 1020 1021 napi.remove_wrap(req_data->sock_ref); 1022 napi.remove_wrap(req_data->req_ref); 1023 napi.remove_wrap(req_data->resp_ref); 1024 napi.remove_wrap(req_data->conn_ref); 1025 1026 } catch (exception &e) { 1027 napi.throw_error(e); 1028 return nullptr; 1029 } 1030 1031 nxt_unit_request_done(req, NXT_UNIT_OK); 1032 1033 return this_arg; 1034 } 1035 1036 1037 napi_value 1038 Unit::websocket_send_frame(napi_env env, napi_callback_info info) 1039 { 1040 int ret, iovec_len; 1041 bool fin; 1042 size_t buf_len; 1043 uint32_t opcode, sc; 1044 nxt_napi napi(env); 1045 napi_value this_arg, frame, payload; 1046 nxt_unit_request_info_t *req; 1047 char status_code[2]; 1048 struct iovec iov[2]; 1049 1050 iovec_len = 0; 1051 1052 try { 1053 this_arg = napi.get_cb_info(info, frame); 1054 1055 req = napi.get_request_info(this_arg); 1056 1057 opcode = napi.get_value_uint32(napi.get_named_property(frame, 1058 "opcode")); 1059 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 1060 sc = napi.get_value_uint32(napi.get_named_property(frame, 1061 "closeStatus")); 1062 status_code[0] = (sc >> 8) & 0xFF; 1063 status_code[1] = sc & 0xFF; 1064 1065 iov[iovec_len].iov_base = status_code; 1066 iov[iovec_len].iov_len = 2; 1067 iovec_len++; 1068 } 1069 1070 try { 1071 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 1072 1073 } catch (exception &e) { 1074 fin = true; 1075 } 1076 1077 payload = napi.get_named_property(frame, "binaryPayload"); 1078 1079 if (napi.is_buffer(payload)) { 1080 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 1081 1082 } else { 1083 buf_len = 0; 1084 } 1085 1086 } catch (exception &e) { 1087 napi.throw_error(e); 1088 return nullptr; 1089 } 1090 1091 if (buf_len > 0) { 1092 iov[iovec_len].iov_len = buf_len; 1093 iovec_len++; 1094 } 1095 1096 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 1097 if (ret != NXT_UNIT_OK) { 1098 goto failed; 1099 } 1100 1101 return this_arg; 1102 1103 failed: 1104 1105 napi.throw_error("Failed to send frame"); 1106 1107 return nullptr; 1108 } 1109 1110 1111 napi_value 1112 Unit::websocket_set_sock(napi_env env, napi_callback_info info) 1113 { 1114 nxt_napi napi(env); 1115 napi_value this_arg, sock; 1116 req_data_t *req_data; 1117 nxt_unit_request_info_t *req; 1118 1119 try { 1120 this_arg = napi.get_cb_info(info, sock); 1121 1122 req = napi.get_request_info(sock); 1123 1124 req_data = (req_data_t *) req->data; 1125 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 1126 1127 } catch (exception &e) { 1128 napi.throw_error(e); 1129 return nullptr; 1130 } 1131 1132 return this_arg; 1133 } 1134 1135 1136 void 1137 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) 1138 { 1139 nxt_unit_req_debug(NULL, "conn_destroy: %p", r); 1140 } 1141 1142 1143 void 1144 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) 1145 { 1146 nxt_unit_req_debug(NULL, "sock_destroy: %p", r); 1147 } 1148 1149 1150 void 1151 Unit::req_destroy(napi_env env, void *r, void *finalize_hint) 1152 { 1153 nxt_unit_req_debug(NULL, "req_destroy: %p", r); 1154 } 1155 1156 1157 void 1158 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) 1159 { 1160 nxt_unit_req_debug(NULL, "resp_destroy: %p", r); 1161 } 1162