1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 14 napi_ref Unit::constructor_; 15 16 17 struct nxt_nodejs_ctx_t { 18 nxt_unit_port_id_t port_id; 19 uv_poll_t poll; 20 }; 21 22 23 Unit::Unit(napi_env env, napi_value jsthis): 24 nxt_napi(env), 25 wrapper_(wrap(jsthis, this, destroy)), 26 unit_ctx_(nullptr) 27 { 28 } 29 30 31 Unit::~Unit() 32 { 33 delete_reference(wrapper_); 34 } 35 36 37 napi_value 38 Unit::init(napi_env env, napi_value exports) 39 { 40 nxt_napi napi(env); 41 napi_value cons; 42 43 napi_property_descriptor properties[] = { 44 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 45 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 46 { "_read", 0, _read, 0, 0, 0, napi_default, 0 } 47 }; 48 49 try { 50 cons = napi.define_class("Unit", create, 3, properties); 51 constructor_ = napi.create_reference(cons); 52 53 napi.set_named_property(exports, "Unit", cons); 54 napi.set_named_property(exports, "unit_response_headers", 55 response_send_headers); 56 napi.set_named_property(exports, "unit_response_write", response_write); 57 napi.set_named_property(exports, "unit_response_end", response_end); 58 59 } catch (exception &e) { 60 napi.throw_error(e); 61 return nullptr; 62 } 63 64 return exports; 65 } 66 67 68 void 69 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 70 { 71 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 72 73 delete obj; 74 } 75 76 77 napi_value 78 Unit::create(napi_env env, napi_callback_info info) 79 { 80 nxt_napi napi(env); 81 napi_value target, cons, instance, jsthis; 82 83 try { 84 target = napi.get_new_target(info); 85 86 if (target != nullptr) { 87 /* Invoked as constructor: `new Unit(...)`. */ 88 jsthis = napi.get_cb_info(info); 89 90 new Unit(env, jsthis); 91 napi.create_reference(jsthis); 92 93 return jsthis; 94 } 95 96 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 97 cons = napi.get_reference_value(constructor_); 98 instance = napi.new_instance(cons); 99 napi.create_reference(instance); 100 101 } catch (exception &e) { 102 napi.throw_error(e); 103 return nullptr; 104 } 105 106 return instance; 107 } 108 109 110 napi_value 111 Unit::create_server(napi_env env, napi_callback_info info) 112 { 113 Unit *obj; 114 size_t argc; 115 nxt_napi napi(env); 116 napi_value jsthis, argv; 117 nxt_unit_init_t unit_init; 118 119 argc = 1; 120 121 try { 122 jsthis = napi.get_cb_info(info, argc, &argv); 123 obj = (Unit *) napi.unwrap(jsthis); 124 125 } catch (exception &e) { 126 napi.throw_error(e); 127 return nullptr; 128 } 129 130 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 131 132 unit_init.data = obj; 133 unit_init.callbacks.request_handler = request_handler; 134 unit_init.callbacks.add_port = add_port; 135 unit_init.callbacks.remove_port = remove_port; 136 unit_init.callbacks.quit = quit; 137 138 obj->unit_ctx_ = nxt_unit_init(&unit_init); 139 if (obj->unit_ctx_ == NULL) { 140 goto failed; 141 } 142 143 return nullptr; 144 145 failed: 146 147 napi_throw_error(env, NULL, "Failed to create Unit object"); 148 149 return nullptr; 150 } 151 152 153 napi_value 154 Unit::listen(napi_env env, napi_callback_info info) 155 { 156 return nullptr; 157 } 158 159 160 napi_value 161 Unit::_read(napi_env env, napi_callback_info info) 162 { 163 void *data; 164 size_t argc; 165 nxt_napi napi(env); 166 napi_value buffer, argv; 167 nxt_unit_request_info_t *req; 168 169 argc = 1; 170 171 try { 172 napi.get_cb_info(info, argc, &argv); 173 174 req = napi.get_request_info(argv); 175 buffer = napi.create_buffer((size_t) req->content_length, &data); 176 177 } catch (exception &e) { 178 napi.throw_error(e); 179 return nullptr; 180 } 181 182 nxt_unit_request_read(req, data, req->content_length); 183 184 return buffer; 185 } 186 187 188 void 189 Unit::request_handler(nxt_unit_request_info_t *req) 190 { 191 Unit *obj; 192 napi_value socket, request, response, server_obj; 193 napi_value emit_events; 194 napi_value events_args[3]; 195 196 obj = reinterpret_cast<Unit *>(req->unit->data); 197 198 try { 199 nxt_handle_scope scope(obj->env()); 200 201 server_obj = obj->get_server_object(); 202 203 socket = obj->create_socket(server_obj, req); 204 request = obj->create_request(server_obj, socket); 205 response = obj->create_response(server_obj, socket, request, req); 206 207 obj->create_headers(req, request); 208 209 emit_events = obj->get_named_property(server_obj, "emit_events"); 210 211 events_args[0] = server_obj; 212 events_args[1] = request; 213 events_args[2] = response; 214 215 nxt_async_context async_context(obj->env(), "unit_request_handler"); 216 nxt_callback_scope async_scope(async_context); 217 218 obj->make_callback(async_context, server_obj, emit_events, 219 3, events_args); 220 221 } catch (exception &e) { 222 obj->throw_error(e); 223 } 224 } 225 226 227 void 228 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 229 { 230 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 231 } 232 233 234 int 235 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 236 { 237 int err; 238 Unit *obj; 239 uv_loop_t *loop; 240 napi_status status; 241 nxt_nodejs_ctx_t *node_ctx; 242 243 if (port->in_fd != -1) { 244 obj = reinterpret_cast<Unit *>(ctx->unit->data); 245 246 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 247 obj->throw_error("Failed to upgrade read" 248 " file descriptor to O_NONBLOCK"); 249 return -1; 250 } 251 252 status = napi_get_uv_event_loop(obj->env(), &loop); 253 if (status != napi_ok) { 254 obj->throw_error("Failed to get uv.loop"); 255 return NXT_UNIT_ERROR; 256 } 257 258 node_ctx = new nxt_nodejs_ctx_t; 259 260 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 261 if (err < 0) { 262 obj->throw_error("Failed to init uv.poll"); 263 return NXT_UNIT_ERROR; 264 } 265 266 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 267 if (err < 0) { 268 obj->throw_error("Failed to start uv.poll"); 269 return NXT_UNIT_ERROR; 270 } 271 272 ctx->data = node_ctx; 273 274 node_ctx->port_id = port->id; 275 node_ctx->poll.data = ctx; 276 } 277 278 return nxt_unit_add_port(ctx, port); 279 } 280 281 282 inline bool 283 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 284 { 285 return p1.pid == p2.pid && p1.id == p2.id; 286 } 287 288 289 void 290 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 291 { 292 nxt_nodejs_ctx_t *node_ctx; 293 294 if (ctx->data != NULL) { 295 node_ctx = (nxt_nodejs_ctx_t *) ctx->data; 296 297 if (node_ctx->port_id == *port_id) { 298 uv_poll_stop(&node_ctx->poll); 299 300 delete node_ctx; 301 302 ctx->data = NULL; 303 } 304 } 305 306 nxt_unit_remove_port(ctx, port_id); 307 } 308 309 310 void 311 Unit::quit(nxt_unit_ctx_t *ctx) 312 { 313 Unit *obj; 314 napi_value server_obj, emit_close; 315 316 obj = reinterpret_cast<Unit *>(ctx->unit->data); 317 318 try { 319 nxt_handle_scope scope(obj->env()); 320 321 server_obj = obj->get_server_object(); 322 323 emit_close = obj->get_named_property(server_obj, "emit_close"); 324 325 nxt_async_context async_context(obj->env(), "unit_quit"); 326 nxt_callback_scope async_scope(async_context); 327 328 obj->make_callback(async_context, server_obj, emit_close, 0, NULL); 329 330 } catch (exception &e) { 331 obj->throw_error(e); 332 } 333 334 nxt_unit_done(ctx); 335 } 336 337 338 napi_value 339 Unit::get_server_object() 340 { 341 napi_value unit_obj; 342 343 unit_obj = get_reference_value(wrapper_); 344 345 return get_named_property(unit_obj, "server"); 346 } 347 348 349 void 350 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 351 { 352 uint32_t i; 353 napi_value headers, raw_headers; 354 napi_status status; 355 nxt_unit_request_t *r; 356 357 r = req->request; 358 359 headers = create_object(); 360 361 status = napi_create_array_with_length(env(), r->fields_count * 2, 362 &raw_headers); 363 if (status != napi_ok) { 364 throw exception("Failed to create array"); 365 } 366 367 for (i = 0; i < r->fields_count; i++) { 368 append_header(r->fields + i, headers, raw_headers, i); 369 } 370 371 set_named_property(request, "headers", headers); 372 set_named_property(request, "rawHeaders", raw_headers); 373 set_named_property(request, "httpVersion", r->version, r->version_length); 374 set_named_property(request, "method", r->method, r->method_length); 375 set_named_property(request, "url", r->target, r->target_length); 376 } 377 378 379 inline char 380 lowcase(char c) 381 { 382 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 383 } 384 385 386 inline void 387 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 388 napi_value raw_headers, uint32_t idx) 389 { 390 char *name; 391 uint8_t i; 392 napi_value str, vstr; 393 394 name = (char *) nxt_unit_sptr_get(&f->name); 395 396 str = create_string_latin1(name, f->name_length); 397 398 for (i = 0; i < f->name_length; i++) { 399 name[i] = lowcase(name[i]); 400 } 401 402 vstr = set_named_property(headers, name, f->value, f->value_length); 403 404 set_element(raw_headers, idx * 2, str); 405 set_element(raw_headers, idx * 2 + 1, vstr); 406 } 407 408 409 napi_value 410 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 411 { 412 napi_value constructor, res; 413 nxt_unit_request_t *r; 414 415 r = req->request; 416 417 constructor = get_named_property(server_obj, "socket"); 418 419 res = new_instance(constructor); 420 421 set_named_property(res, "req_pointer", (intptr_t) req); 422 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 423 set_named_property(res, "localAddress", r->local, r->local_length); 424 425 return res; 426 } 427 428 429 napi_value 430 Unit::create_request(napi_value server_obj, napi_value socket) 431 { 432 napi_value constructor, return_val; 433 434 constructor = get_named_property(server_obj, "request"); 435 436 return_val = new_instance(constructor, server_obj); 437 438 set_named_property(return_val, "socket", socket); 439 set_named_property(return_val, "connection", socket); 440 441 return return_val; 442 } 443 444 445 napi_value 446 Unit::create_response(napi_value server_obj, napi_value socket, 447 napi_value request, nxt_unit_request_info_t *req) 448 { 449 napi_value constructor, return_val; 450 451 constructor = get_named_property(server_obj, "response"); 452 453 return_val = new_instance(constructor, request); 454 455 set_named_property(return_val, "socket", socket); 456 set_named_property(return_val, "connection", socket); 457 set_named_property(return_val, "_req_point", (intptr_t) req); 458 459 return return_val; 460 } 461 462 463 napi_value 464 Unit::response_send_headers(napi_env env, napi_callback_info info) 465 { 466 int ret; 467 char *ptr, *name_ptr; 468 bool is_array; 469 size_t argc, name_len, value_len; 470 uint32_t status_code, header_len, keys_len, array_len; 471 uint32_t keys_count, i, j; 472 uint16_t hash; 473 nxt_napi napi(env); 474 napi_value this_arg, headers, keys, name, value, array_val; 475 napi_value req_num, array_entry; 476 napi_valuetype val_type; 477 nxt_unit_field_t *f; 478 nxt_unit_request_info_t *req; 479 napi_value argv[5]; 480 481 argc = 5; 482 483 try { 484 this_arg = napi.get_cb_info(info, argc, argv); 485 if (argc != 5) { 486 napi.throw_error("Wrong args count. Expected: " 487 "statusCode, headers, headers count, " 488 "headers length"); 489 return nullptr; 490 } 491 492 req_num = napi.get_named_property(argv[0], "_req_point"); 493 494 req = napi.get_request_info(req_num); 495 496 status_code = napi.get_value_uint32(argv[1]); 497 keys_count = napi.get_value_uint32(argv[3]); 498 header_len = napi.get_value_uint32(argv[4]); 499 500 /* Need to reserve extra byte for C-string 0-termination. */ 501 header_len++; 502 503 headers = argv[2]; 504 505 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 506 if (ret != NXT_UNIT_OK) { 507 napi.throw_error("Failed to create response"); 508 return nullptr; 509 } 510 511 keys = napi.get_property_names(headers); 512 keys_len = napi.get_array_length(keys); 513 514 ptr = req->response_buf->free; 515 516 for (i = 0; i < keys_len; i++) { 517 name = napi.get_element(keys, i); 518 519 array_entry = napi.get_property(headers, name); 520 521 name = napi.get_element(array_entry, 0); 522 value = napi.get_element(array_entry, 1); 523 524 name_len = napi.get_value_string_latin1(name, ptr, header_len); 525 name_ptr = ptr; 526 527 ptr += name_len; 528 header_len -= name_len; 529 530 hash = nxt_unit_field_hash(name_ptr, name_len); 531 532 is_array = napi.is_array(value); 533 534 if (is_array) { 535 array_len = napi.get_array_length(value); 536 537 for (j = 0; j < array_len; j++) { 538 array_val = napi.get_element(value, j); 539 540 val_type = napi.type_of(array_val); 541 542 if (val_type != napi_string) { 543 array_val = napi.coerce_to_string(array_val); 544 } 545 546 value_len = napi.get_value_string_latin1(array_val, ptr, 547 header_len); 548 549 f = req->response->fields + req->response->fields_count; 550 f->skip = 0; 551 552 nxt_unit_sptr_set(&f->name, name_ptr); 553 554 f->name_length = name_len; 555 f->hash = hash; 556 557 nxt_unit_sptr_set(&f->value, ptr); 558 f->value_length = (uint32_t) value_len; 559 560 ptr += value_len; 561 header_len -= value_len; 562 563 req->response->fields_count++; 564 } 565 566 } else { 567 val_type = napi.type_of(value); 568 569 if (val_type != napi_string) { 570 value = napi.coerce_to_string(value); 571 } 572 573 value_len = napi.get_value_string_latin1(value, ptr, header_len); 574 575 f = req->response->fields + req->response->fields_count; 576 f->skip = 0; 577 578 nxt_unit_sptr_set(&f->name, name_ptr); 579 580 f->name_length = name_len; 581 f->hash = hash; 582 583 nxt_unit_sptr_set(&f->value, ptr); 584 f->value_length = (uint32_t) value_len; 585 586 ptr += value_len; 587 header_len -= value_len; 588 589 req->response->fields_count++; 590 } 591 } 592 593 } catch (exception &e) { 594 napi.throw_error(e); 595 return nullptr; 596 } 597 598 req->response_buf->free = ptr; 599 600 ret = nxt_unit_response_send(req); 601 if (ret != NXT_UNIT_OK) { 602 napi.throw_error("Failed to send response"); 603 return nullptr; 604 } 605 606 return this_arg; 607 } 608 609 610 napi_value 611 Unit::response_write(napi_env env, napi_callback_info info) 612 { 613 int ret; 614 char *ptr; 615 size_t argc, have_buf_len; 616 uint32_t buf_len; 617 nxt_napi napi(env); 618 napi_value this_arg, req_num; 619 napi_status status; 620 nxt_unit_buf_t *buf; 621 napi_valuetype buf_type; 622 nxt_unit_request_info_t *req; 623 napi_value argv[3]; 624 625 argc = 3; 626 627 try { 628 this_arg = napi.get_cb_info(info, argc, argv); 629 if (argc != 3) { 630 throw exception("Wrong args count. Expected: " 631 "chunk, chunk length"); 632 } 633 634 req_num = napi.get_named_property(argv[0], "_req_point"); 635 req = napi.get_request_info(req_num); 636 637 buf_len = napi.get_value_uint32(argv[2]); 638 639 buf_type = napi.type_of(argv[1]); 640 641 } catch (exception &e) { 642 napi.throw_error(e); 643 return nullptr; 644 } 645 646 buf_len++; 647 648 buf = nxt_unit_response_buf_alloc(req, buf_len); 649 if (buf == NULL) { 650 goto failed; 651 } 652 653 if (buf_type == napi_string) { 654 /* TODO: will work only for utf8 content-type */ 655 656 status = napi_get_value_string_utf8(env, argv[1], buf->free, 657 buf_len, &have_buf_len); 658 659 } else { 660 status = napi_get_buffer_info(env, argv[1], (void **) &ptr, 661 &have_buf_len); 662 663 memcpy(buf->free, ptr, have_buf_len); 664 } 665 666 if (status != napi_ok) { 667 goto failed; 668 } 669 670 buf->free += have_buf_len; 671 672 ret = nxt_unit_buf_send(buf); 673 if (ret != NXT_UNIT_OK) { 674 goto failed; 675 } 676 677 return this_arg; 678 679 failed: 680 681 napi.throw_error("Failed to write body"); 682 683 return nullptr; 684 } 685 686 687 napi_value 688 Unit::response_end(napi_env env, napi_callback_info info) 689 { 690 size_t argc; 691 nxt_napi napi(env); 692 napi_value resp, this_arg, req_num; 693 nxt_unit_request_info_t *req; 694 695 argc = 1; 696 697 try { 698 this_arg = napi.get_cb_info(info, argc, &resp); 699 700 req_num = napi.get_named_property(resp, "_req_point"); 701 req = napi.get_request_info(req_num); 702 703 } catch (exception &e) { 704 napi.throw_error(e); 705 return nullptr; 706 } 707 708 nxt_unit_request_done(req, NXT_UNIT_OK); 709 710 return this_arg; 711 } 712