1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 14 napi_ref Unit::constructor_; 15 16 17 struct nxt_nodejs_ctx_t { 18 nxt_unit_port_id_t port_id; 19 uv_poll_t poll; 20 }; 21 22 23 Unit::Unit(napi_env env, napi_value jsthis): 24 nxt_napi(env), 25 wrapper_(wrap(jsthis, this, destroy)), 26 unit_ctx_(nullptr) 27 { 28 } 29 30 31 Unit::~Unit() 32 { 33 delete_reference(wrapper_); 34 } 35 36 37 napi_value 38 Unit::init(napi_env env, napi_value exports) 39 { 40 nxt_napi napi(env); 41 napi_value cons; 42 43 napi_property_descriptor properties[] = { 44 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 45 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 46 { "_read", 0, _read, 0, 0, 0, napi_default, 0 } 47 }; 48 49 try { 50 cons = napi.define_class("Unit", create, 3, properties); 51 constructor_ = napi.create_reference(cons); 52 53 napi.set_named_property(exports, "Unit", cons); 54 napi.set_named_property(exports, "unit_response_headers", 55 response_send_headers); 56 napi.set_named_property(exports, "unit_response_write", response_write); 57 napi.set_named_property(exports, "unit_response_end", response_end); 58 59 } catch (exception &e) { 60 napi.throw_error(e); 61 return nullptr; 62 } 63 64 return exports; 65 } 66 67 68 void 69 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 70 { 71 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 72 73 delete obj; 74 } 75 76 77 napi_value 78 Unit::create(napi_env env, napi_callback_info info) 79 { 80 nxt_napi napi(env); 81 napi_value target, cons, instance, jsthis; 82 83 try { 84 target = napi.get_new_target(info); 85 86 if (target != nullptr) { 87 /* Invoked as constructor: `new Unit(...)`. */ 88 jsthis = napi.get_cb_info(info); 89 90 new Unit(env, jsthis); 91 napi.create_reference(jsthis); 92 93 return jsthis; 94 } 95 96 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 97 cons = napi.get_reference_value(constructor_); 98 instance = napi.new_instance(cons); 99 napi.create_reference(instance); 100 101 } catch (exception &e) { 102 napi.throw_error(e); 103 return nullptr; 104 } 105 106 return instance; 107 } 108 109 110 napi_value 111 Unit::create_server(napi_env env, napi_callback_info info) 112 { 113 Unit *obj; 114 size_t argc; 115 nxt_napi napi(env); 116 napi_value jsthis, argv; 117 nxt_unit_init_t unit_init; 118 119 argc = 1; 120 121 try { 122 jsthis = napi.get_cb_info(info, argc, &argv); 123 obj = (Unit *) napi.unwrap(jsthis); 124 125 } catch (exception &e) { 126 napi.throw_error(e); 127 return nullptr; 128 } 129 130 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 131 132 unit_init.data = obj; 133 unit_init.callbacks.request_handler = request_handler; 134 unit_init.callbacks.add_port = add_port; 135 unit_init.callbacks.remove_port = remove_port; 136 unit_init.callbacks.quit = quit; 137 138 obj->unit_ctx_ = nxt_unit_init(&unit_init); 139 if (obj->unit_ctx_ == NULL) { 140 goto failed; 141 } 142 143 return nullptr; 144 145 failed: 146 147 napi_throw_error(env, NULL, "Failed to create Unit object"); 148 149 return nullptr; 150 } 151 152 153 napi_value 154 Unit::listen(napi_env env, napi_callback_info info) 155 { 156 return nullptr; 157 } 158 159 160 napi_value 161 Unit::_read(napi_env env, napi_callback_info info) 162 { 163 void *data; 164 size_t argc; 165 nxt_napi napi(env); 166 napi_value buffer, argv; 167 nxt_unit_request_info_t *req; 168 169 argc = 1; 170 171 try { 172 napi.get_cb_info(info, argc, &argv); 173 174 req = napi.get_request_info(argv); 175 buffer = napi.create_buffer((size_t) req->content_length, &data); 176 177 } catch (exception &e) { 178 napi.throw_error(e); 179 return nullptr; 180 } 181 182 nxt_unit_request_read(req, data, req->content_length); 183 184 return buffer; 185 } 186 187 188 void 189 Unit::request_handler(nxt_unit_request_info_t *req) 190 { 191 Unit *obj; 192 napi_value socket, request, response, server_obj; 193 napi_value emit_events; 194 napi_value events_args[3]; 195 196 obj = reinterpret_cast<Unit *>(req->unit->data); 197 198 try { 199 nxt_handle_scope scope(obj->env()); 200 201 server_obj = obj->get_server_object(); 202 203 socket = obj->create_socket(server_obj, req); 204 request = obj->create_request(server_obj, socket); 205 response = obj->create_response(server_obj, socket, request, req); 206 207 obj->create_headers(req, request); 208 209 emit_events = obj->get_named_property(server_obj, "emit_events"); 210 211 events_args[0] = server_obj; 212 events_args[1] = request; 213 events_args[2] = response; 214 215 nxt_async_context async_context(obj->env(), "unit_request_handler"); 216 nxt_callback_scope async_scope(async_context); 217 218 obj->make_callback(async_context, server_obj, emit_events, 219 3, events_args); 220 221 } catch (exception &e) { 222 obj->throw_error(e); 223 } 224 } 225 226 227 void 228 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 229 { 230 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 231 } 232 233 234 int 235 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 236 { 237 int err; 238 Unit *obj; 239 uv_loop_t *loop; 240 napi_status status; 241 nxt_nodejs_ctx_t *node_ctx; 242 243 if (port->in_fd != -1) { 244 obj = reinterpret_cast<Unit *>(ctx->unit->data); 245 246 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 247 obj->throw_error("Failed to upgrade read" 248 " file descriptor to O_NONBLOCK"); 249 return -1; 250 } 251 252 status = napi_get_uv_event_loop(obj->env(), &loop); 253 if (status != napi_ok) { 254 obj->throw_error("Failed to get uv.loop"); 255 return NXT_UNIT_ERROR; 256 } 257 258 node_ctx = new nxt_nodejs_ctx_t; 259 260 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 261 if (err < 0) { 262 obj->throw_error("Failed to init uv.poll"); 263 return NXT_UNIT_ERROR; 264 } 265 266 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 267 if (err < 0) { 268 obj->throw_error("Failed to start uv.poll"); 269 return NXT_UNIT_ERROR; 270 } 271 272 ctx->data = node_ctx; 273 274 node_ctx->port_id = port->id; 275 node_ctx->poll.data = ctx; 276 } 277 278 return nxt_unit_add_port(ctx, port); 279 } 280 281 282 inline bool 283 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 284 { 285 return p1.pid == p2.pid && p1.id == p2.id; 286 } 287 288 289 void 290 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 291 { 292 nxt_nodejs_ctx_t *node_ctx; 293 294 if (ctx->data != NULL) { 295 node_ctx = (nxt_nodejs_ctx_t *) ctx->data; 296 297 if (node_ctx->port_id == *port_id) { 298 uv_poll_stop(&node_ctx->poll); 299 300 delete node_ctx; 301 302 ctx->data = NULL; 303 } 304 } 305 306 nxt_unit_remove_port(ctx, port_id); 307 } 308 309 310 void 311 Unit::quit(nxt_unit_ctx_t *ctx) 312 { 313 Unit *obj; 314 napi_value server_obj, emit_close; 315 316 obj = reinterpret_cast<Unit *>(ctx->unit->data); 317 318 try { 319 nxt_handle_scope scope(obj->env()); 320 321 server_obj = obj->get_server_object(); 322 323 emit_close = obj->get_named_property(server_obj, "emit_close"); 324 325 nxt_async_context async_context(obj->env(), "unit_quit"); 326 nxt_callback_scope async_scope(async_context); 327 328 obj->make_callback(async_context, server_obj, emit_close, 0, NULL); 329 330 } catch (exception &e) { 331 obj->throw_error(e); 332 } 333 334 nxt_unit_done(ctx); 335 } 336 337 338 napi_value 339 Unit::get_server_object() 340 { 341 napi_value unit_obj; 342 343 unit_obj = get_reference_value(wrapper_); 344 345 return get_named_property(unit_obj, "server"); 346 } 347 348 349 void 350 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 351 { 352 uint32_t i; 353 napi_value headers, raw_headers; 354 napi_status status; 355 nxt_unit_request_t *r; 356 357 r = req->request; 358 359 headers = create_object(); 360 361 status = napi_create_array_with_length(env(), r->fields_count * 2, 362 &raw_headers); 363 if (status != napi_ok) { 364 throw exception("Failed to create array"); 365 } 366 367 for (i = 0; i < r->fields_count; i++) { 368 append_header(r->fields + i, headers, raw_headers, i); 369 } 370 371 set_named_property(request, "headers", headers); 372 set_named_property(request, "rawHeaders", raw_headers); 373 set_named_property(request, "httpVersion", r->version, r->version_length); 374 set_named_property(request, "method", r->method, r->method_length); 375 set_named_property(request, "url", r->target, r->target_length); 376 } 377 378 379 inline void 380 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 381 napi_value raw_headers, uint32_t idx) 382 { 383 const char *name; 384 napi_value str, vstr; 385 386 name = (const char *) nxt_unit_sptr_get(&f->name); 387 388 vstr = set_named_property(headers, name, f->value, f->value_length); 389 str = create_string_latin1(name, f->name_length); 390 391 set_element(raw_headers, idx * 2, str); 392 set_element(raw_headers, idx * 2 + 1, vstr); 393 } 394 395 396 napi_value 397 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 398 { 399 napi_value constructor, res; 400 nxt_unit_request_t *r; 401 402 r = req->request; 403 404 constructor = get_named_property(server_obj, "socket"); 405 406 res = new_instance(constructor); 407 408 set_named_property(res, "req_pointer", (intptr_t) req); 409 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 410 set_named_property(res, "localAddress", r->local, r->local_length); 411 412 return res; 413 } 414 415 416 napi_value 417 Unit::create_request(napi_value server_obj, napi_value socket) 418 { 419 napi_value constructor, return_val; 420 421 constructor = get_named_property(server_obj, "request"); 422 423 return_val = new_instance(constructor, server_obj); 424 425 set_named_property(return_val, "socket", socket); 426 set_named_property(return_val, "connection", socket); 427 428 return return_val; 429 } 430 431 432 napi_value 433 Unit::create_response(napi_value server_obj, napi_value socket, 434 napi_value request, nxt_unit_request_info_t *req) 435 { 436 napi_value constructor, return_val; 437 438 constructor = get_named_property(server_obj, "response"); 439 440 return_val = new_instance(constructor, request); 441 442 set_named_property(return_val, "socket", socket); 443 set_named_property(return_val, "connection", socket); 444 set_named_property(return_val, "_req_point", (intptr_t) req); 445 446 return return_val; 447 } 448 449 450 napi_value 451 Unit::response_send_headers(napi_env env, napi_callback_info info) 452 { 453 int ret; 454 char *ptr, *name_ptr; 455 bool is_array; 456 size_t argc, name_len, value_len; 457 uint32_t status_code, header_len, keys_len, array_len; 458 uint32_t keys_count, i, j; 459 uint16_t hash; 460 nxt_napi napi(env); 461 napi_value this_arg, headers, keys, name, value, array_val; 462 napi_value req_num, array_entry; 463 napi_valuetype val_type; 464 nxt_unit_field_t *f; 465 nxt_unit_request_info_t *req; 466 napi_value argv[5]; 467 468 argc = 5; 469 470 try { 471 this_arg = napi.get_cb_info(info, argc, argv); 472 if (argc != 5) { 473 napi.throw_error("Wrong args count. Expected: " 474 "statusCode, headers, headers count, " 475 "headers length"); 476 return nullptr; 477 } 478 479 req_num = napi.get_named_property(argv[0], "_req_point"); 480 481 req = napi.get_request_info(req_num); 482 483 status_code = napi.get_value_uint32(argv[1]); 484 keys_count = napi.get_value_uint32(argv[3]); 485 header_len = napi.get_value_uint32(argv[4]); 486 487 /* Need to reserve extra byte for C-string 0-termination. */ 488 header_len++; 489 490 headers = argv[2]; 491 492 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 493 if (ret != NXT_UNIT_OK) { 494 napi.throw_error("Failed to create response"); 495 return nullptr; 496 } 497 498 keys = napi.get_property_names(headers); 499 keys_len = napi.get_array_length(keys); 500 501 ptr = req->response_buf->free; 502 503 for (i = 0; i < keys_len; i++) { 504 name = napi.get_element(keys, i); 505 506 array_entry = napi.get_property(headers, name); 507 508 name = napi.get_element(array_entry, 0); 509 value = napi.get_element(array_entry, 1); 510 511 name_len = napi.get_value_string_latin1(name, ptr, header_len); 512 name_ptr = ptr; 513 514 ptr += name_len; 515 header_len -= name_len; 516 517 hash = nxt_unit_field_hash(name_ptr, name_len); 518 519 is_array = napi.is_array(value); 520 521 if (is_array) { 522 array_len = napi.get_array_length(value); 523 524 for (j = 0; j < array_len; j++) { 525 array_val = napi.get_element(value, j); 526 527 val_type = napi.type_of(array_val); 528 529 if (val_type != napi_string) { 530 array_val = napi.coerce_to_string(array_val); 531 } 532 533 value_len = napi.get_value_string_latin1(array_val, ptr, 534 header_len); 535 536 f = req->response->fields + req->response->fields_count; 537 f->skip = 0; 538 539 nxt_unit_sptr_set(&f->name, name_ptr); 540 541 f->name_length = name_len; 542 f->hash = hash; 543 544 nxt_unit_sptr_set(&f->value, ptr); 545 f->value_length = (uint32_t) value_len; 546 547 ptr += value_len; 548 header_len -= value_len; 549 550 req->response->fields_count++; 551 } 552 553 } else { 554 val_type = napi.type_of(value); 555 556 if (val_type != napi_string) { 557 value = napi.coerce_to_string(value); 558 } 559 560 value_len = napi.get_value_string_latin1(value, ptr, header_len); 561 562 f = req->response->fields + req->response->fields_count; 563 f->skip = 0; 564 565 nxt_unit_sptr_set(&f->name, name_ptr); 566 567 f->name_length = name_len; 568 f->hash = hash; 569 570 nxt_unit_sptr_set(&f->value, ptr); 571 f->value_length = (uint32_t) value_len; 572 573 ptr += value_len; 574 header_len -= value_len; 575 576 req->response->fields_count++; 577 } 578 } 579 580 } catch (exception &e) { 581 napi.throw_error(e); 582 return nullptr; 583 } 584 585 req->response_buf->free = ptr; 586 587 ret = nxt_unit_response_send(req); 588 if (ret != NXT_UNIT_OK) { 589 napi.throw_error("Failed to send response"); 590 return nullptr; 591 } 592 593 return this_arg; 594 } 595 596 597 napi_value 598 Unit::response_write(napi_env env, napi_callback_info info) 599 { 600 int ret; 601 char *ptr; 602 size_t argc, have_buf_len; 603 uint32_t buf_len; 604 nxt_napi napi(env); 605 napi_value this_arg, req_num; 606 napi_status status; 607 nxt_unit_buf_t *buf; 608 napi_valuetype buf_type; 609 nxt_unit_request_info_t *req; 610 napi_value argv[3]; 611 612 argc = 3; 613 614 try { 615 this_arg = napi.get_cb_info(info, argc, argv); 616 if (argc != 3) { 617 throw exception("Wrong args count. Expected: " 618 "chunk, chunk length"); 619 } 620 621 req_num = napi.get_named_property(argv[0], "_req_point"); 622 req = napi.get_request_info(req_num); 623 624 buf_len = napi.get_value_uint32(argv[2]); 625 626 buf_type = napi.type_of(argv[1]); 627 628 } catch (exception &e) { 629 napi.throw_error(e); 630 return nullptr; 631 } 632 633 buf_len++; 634 635 buf = nxt_unit_response_buf_alloc(req, buf_len); 636 if (buf == NULL) { 637 goto failed; 638 } 639 640 if (buf_type == napi_string) { 641 /* TODO: will work only for utf8 content-type */ 642 643 status = napi_get_value_string_utf8(env, argv[1], buf->free, 644 buf_len, &have_buf_len); 645 646 } else { 647 status = napi_get_buffer_info(env, argv[1], (void **) &ptr, 648 &have_buf_len); 649 650 memcpy(buf->free, ptr, have_buf_len); 651 } 652 653 if (status != napi_ok) { 654 goto failed; 655 } 656 657 buf->free += have_buf_len; 658 659 ret = nxt_unit_buf_send(buf); 660 if (ret != NXT_UNIT_OK) { 661 goto failed; 662 } 663 664 return this_arg; 665 666 failed: 667 668 napi.throw_error("Failed to write body"); 669 670 return nullptr; 671 } 672 673 674 napi_value 675 Unit::response_end(napi_env env, napi_callback_info info) 676 { 677 size_t argc; 678 nxt_napi napi(env); 679 napi_value resp, this_arg, req_num; 680 nxt_unit_request_info_t *req; 681 682 argc = 1; 683 684 try { 685 this_arg = napi.get_cb_info(info, argc, &resp); 686 687 req_num = napi.get_named_property(resp, "_req_point"); 688 req = napi.get_request_info(req_num); 689 690 } catch (exception &e) { 691 napi.throw_error(e); 692 return nullptr; 693 } 694 695 nxt_unit_request_done(req, NXT_UNIT_OK); 696 697 return this_arg; 698 } 699