1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "unit.h" 7 8 #include <unistd.h> 9 #include <fcntl.h> 10 11 #include <uv.h> 12 13 14 napi_ref Unit::constructor_; 15 16 17 struct nxt_nodejs_ctx_t { 18 nxt_unit_port_id_t port_id; 19 uv_poll_t poll; 20 }; 21 22 23 Unit::Unit(napi_env env, napi_value jsthis): 24 nxt_napi(env), 25 wrapper_(wrap(jsthis, this, destroy)), 26 unit_ctx_(nullptr) 27 { 28 } 29 30 31 Unit::~Unit() 32 { 33 delete_reference(wrapper_); 34 } 35 36 37 napi_value 38 Unit::init(napi_env env, napi_value exports) 39 { 40 nxt_napi napi(env); 41 napi_value cons; 42 43 napi_property_descriptor properties[] = { 44 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 45 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 46 { "_read", 0, _read, 0, 0, 0, napi_default, 0 } 47 }; 48 49 try { 50 cons = napi.define_class("Unit", create, 3, properties); 51 constructor_ = napi.create_reference(cons); 52 53 napi.set_named_property(exports, "Unit", cons); 54 napi.set_named_property(exports, "unit_response_headers", 55 response_send_headers); 56 napi.set_named_property(exports, "unit_response_write", response_write); 57 napi.set_named_property(exports, "unit_response_end", response_end); 58 59 } catch (exception &e) { 60 napi.throw_error(e); 61 return nullptr; 62 } 63 64 return exports; 65 } 66 67 68 void 69 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 70 { 71 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 72 73 delete obj; 74 } 75 76 77 napi_value 78 Unit::create(napi_env env, napi_callback_info info) 79 { 80 Unit *obj; 81 nxt_napi napi(env); 82 napi_ref ref; 83 napi_value target, cons, instance, jsthis; 84 85 try { 86 target = napi.get_new_target(info); 87 88 if (target != nullptr) { 89 /* Invoked as constructor: `new Unit(...)`. */ 90 jsthis = napi.get_cb_info(info); 91 92 obj = new Unit(env, jsthis); 93 94 ref = napi.create_reference(jsthis); 95 96 return jsthis; 97 } 98 99 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 100 cons = napi.get_reference_value(constructor_); 101 instance = napi.new_instance(cons); 102 ref = napi.create_reference(instance); 103 104 } catch (exception &e) { 105 napi.throw_error(e); 106 return nullptr; 107 } 108 109 return instance; 110 } 111 112 113 napi_value 114 Unit::create_server(napi_env env, napi_callback_info info) 115 { 116 Unit *obj; 117 size_t argc; 118 nxt_napi napi(env); 119 napi_value jsthis, argv; 120 nxt_unit_init_t unit_init; 121 122 argc = 1; 123 124 try { 125 jsthis = napi.get_cb_info(info, argc, &argv); 126 obj = (Unit *) napi.unwrap(jsthis); 127 128 } catch (exception &e) { 129 napi.throw_error(e); 130 return nullptr; 131 } 132 133 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 134 135 unit_init.data = obj; 136 unit_init.callbacks.request_handler = request_handler; 137 unit_init.callbacks.add_port = add_port; 138 unit_init.callbacks.remove_port = remove_port; 139 unit_init.callbacks.quit = quit; 140 141 obj->unit_ctx_ = nxt_unit_init(&unit_init); 142 if (obj->unit_ctx_ == NULL) { 143 goto failed; 144 } 145 146 return nullptr; 147 148 failed: 149 150 napi_throw_error(env, NULL, "Failed to create Unit object"); 151 152 return nullptr; 153 } 154 155 156 napi_value 157 Unit::listen(napi_env env, napi_callback_info info) 158 { 159 return nullptr; 160 } 161 162 163 napi_value 164 Unit::_read(napi_env env, napi_callback_info info) 165 { 166 void *data; 167 size_t argc; 168 nxt_napi napi(env); 169 napi_value jsthis, buffer, argv; 170 nxt_unit_request_info_t *req; 171 172 argc = 1; 173 174 try { 175 jsthis = napi.get_cb_info(info, argc, &argv); 176 177 req = napi.get_request_info(argv); 178 buffer = napi.create_buffer((size_t) req->content_length, &data); 179 180 } catch (exception &e) { 181 napi.throw_error(e); 182 return nullptr; 183 } 184 185 nxt_unit_request_read(req, data, req->content_length); 186 187 return buffer; 188 } 189 190 191 void 192 Unit::request_handler(nxt_unit_request_info_t *req) 193 { 194 Unit *obj; 195 napi_value socket, request, response, server_obj; 196 napi_value emit_events; 197 napi_value events_args[3]; 198 199 obj = reinterpret_cast<Unit *>(req->unit->data); 200 201 try { 202 nxt_handle_scope scope(obj->env()); 203 204 server_obj = obj->get_server_object(); 205 206 socket = obj->create_socket(server_obj, req); 207 request = obj->create_request(server_obj, socket); 208 response = obj->create_response(server_obj, socket, request, req); 209 210 obj->create_headers(req, request); 211 212 emit_events = obj->get_named_property(server_obj, "emit_events"); 213 214 events_args[0] = server_obj; 215 events_args[1] = request; 216 events_args[2] = response; 217 218 nxt_async_context async_context(obj->env(), "unit_request_handler"); 219 nxt_callback_scope async_scope(async_context); 220 221 obj->make_callback(async_context, server_obj, emit_events, 222 3, events_args); 223 224 } catch (exception &e) { 225 obj->throw_error(e); 226 } 227 } 228 229 230 void 231 nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 232 { 233 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 234 } 235 236 237 int 238 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 239 { 240 int err; 241 Unit *obj; 242 uv_loop_t *loop; 243 napi_status status; 244 nxt_nodejs_ctx_t *node_ctx; 245 246 if (port->in_fd != -1) { 247 obj = reinterpret_cast<Unit *>(ctx->unit->data); 248 249 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 250 obj->throw_error("Failed to upgrade read" 251 " file descriptor to O_NONBLOCK"); 252 return -1; 253 } 254 255 status = napi_get_uv_event_loop(obj->env(), &loop); 256 if (status != napi_ok) { 257 obj->throw_error("Failed to get uv.loop"); 258 return NXT_UNIT_ERROR; 259 } 260 261 node_ctx = new nxt_nodejs_ctx_t; 262 263 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 264 if (err < 0) { 265 obj->throw_error("Failed to init uv.poll"); 266 return NXT_UNIT_ERROR; 267 } 268 269 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 270 if (err < 0) { 271 obj->throw_error("Failed to start uv.poll"); 272 return NXT_UNIT_ERROR; 273 } 274 275 ctx->data = node_ctx; 276 277 node_ctx->port_id = port->id; 278 node_ctx->poll.data = ctx; 279 } 280 281 return nxt_unit_add_port(ctx, port); 282 } 283 284 285 inline bool 286 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2) 287 { 288 return p1.pid == p2.pid && p1.id == p2.id; 289 } 290 291 292 void 293 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) 294 { 295 nxt_nodejs_ctx_t *node_ctx; 296 297 if (ctx->data != NULL) { 298 node_ctx = (nxt_nodejs_ctx_t *) ctx->data; 299 300 if (node_ctx->port_id == *port_id) { 301 uv_poll_stop(&node_ctx->poll); 302 303 delete node_ctx; 304 305 ctx->data = NULL; 306 } 307 } 308 309 nxt_unit_remove_port(ctx, port_id); 310 } 311 312 313 void 314 Unit::quit(nxt_unit_ctx_t *ctx) 315 { 316 nxt_unit_done(ctx); 317 } 318 319 320 napi_value 321 Unit::get_server_object() 322 { 323 napi_value unit_obj; 324 325 unit_obj = get_reference_value(wrapper_); 326 327 return get_named_property(unit_obj, "server"); 328 } 329 330 331 void 332 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 333 { 334 uint32_t i; 335 napi_value headers, raw_headers; 336 napi_status status; 337 nxt_unit_request_t *r; 338 339 r = req->request; 340 341 headers = create_object(); 342 343 status = napi_create_array_with_length(env(), r->fields_count * 2, 344 &raw_headers); 345 if (status != napi_ok) { 346 throw exception("Failed to create array"); 347 } 348 349 for (i = 0; i < r->fields_count; i++) { 350 append_header(r->fields + i, headers, raw_headers, i); 351 } 352 353 set_named_property(request, "headers", headers); 354 set_named_property(request, "rawHeaders", raw_headers); 355 set_named_property(request, "httpVersion", r->version, r->version_length); 356 set_named_property(request, "method", r->method, r->method_length); 357 set_named_property(request, "url", r->target, r->target_length); 358 } 359 360 361 inline void 362 Unit::append_header(nxt_unit_field_t *f, napi_value headers, 363 napi_value raw_headers, uint32_t idx) 364 { 365 const char *name; 366 napi_value str, vstr; 367 368 name = (const char *) nxt_unit_sptr_get(&f->name); 369 370 vstr = set_named_property(headers, name, f->value, f->value_length); 371 str = create_string_latin1(name, f->name_length); 372 373 set_element(raw_headers, idx * 2, str); 374 set_element(raw_headers, idx * 2 + 1, vstr); 375 } 376 377 378 napi_value 379 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 380 { 381 napi_value constructor, return_val; 382 383 constructor = get_named_property(server_obj, "socket"); 384 385 return_val = new_instance(constructor); 386 387 set_named_property(return_val, "req_pointer", (intptr_t) req); 388 389 return return_val; 390 } 391 392 393 napi_value 394 Unit::create_request(napi_value server_obj, napi_value socket) 395 { 396 napi_value constructor, return_val; 397 398 constructor = get_named_property(server_obj, "request"); 399 400 return_val = new_instance(constructor, server_obj); 401 402 set_named_property(return_val, "socket", socket); 403 404 return return_val; 405 } 406 407 408 napi_value 409 Unit::create_response(napi_value server_obj, napi_value socket, 410 napi_value request, nxt_unit_request_info_t *req) 411 { 412 napi_value constructor, return_val; 413 414 constructor = get_named_property(server_obj, "response"); 415 416 return_val = new_instance(constructor, request); 417 418 set_named_property(return_val, "socket", socket); 419 set_named_property(return_val, "_req_point", (intptr_t) req); 420 421 return return_val; 422 } 423 424 425 napi_value 426 Unit::response_send_headers(napi_env env, napi_callback_info info) 427 { 428 int ret; 429 char *ptr, *name_ptr; 430 bool is_array; 431 size_t argc, name_len, value_len; 432 uint32_t status_code, header_len, keys_len, array_len; 433 uint32_t keys_count, i, j; 434 uint16_t hash; 435 nxt_napi napi(env); 436 napi_value this_arg, headers, keys, name, value, array_val; 437 napi_value req_num, array_entry; 438 napi_valuetype val_type; 439 nxt_unit_field_t *f; 440 nxt_unit_request_info_t *req; 441 napi_value argv[5]; 442 443 argc = 5; 444 445 try { 446 this_arg = napi.get_cb_info(info, argc, argv); 447 if (argc != 5) { 448 napi.throw_error("Wrong args count. Expected: " 449 "statusCode, headers, headers count, " 450 "headers length"); 451 return nullptr; 452 } 453 454 req_num = napi.get_named_property(argv[0], "_req_point"); 455 456 req = napi.get_request_info(req_num); 457 458 status_code = napi.get_value_uint32(argv[1]); 459 keys_count = napi.get_value_uint32(argv[3]); 460 header_len = napi.get_value_uint32(argv[4]); 461 462 /* Need to reserve extra byte for C-string 0-termination. */ 463 header_len++; 464 465 headers = argv[2]; 466 467 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 468 if (ret != NXT_UNIT_OK) { 469 napi.throw_error("Failed to create response"); 470 return nullptr; 471 } 472 473 keys = napi.get_property_names(headers); 474 keys_len = napi.get_array_length(keys); 475 476 ptr = req->response_buf->free; 477 478 for (i = 0; i < keys_len; i++) { 479 name = napi.get_element(keys, i); 480 481 array_entry = napi.get_property(headers, name); 482 483 name = napi.get_element(array_entry, 0); 484 value = napi.get_element(array_entry, 1); 485 486 name_len = napi.get_value_string_latin1(name, ptr, header_len); 487 name_ptr = ptr; 488 489 ptr += name_len; 490 header_len -= name_len; 491 492 hash = nxt_unit_field_hash(name_ptr, name_len); 493 494 is_array = napi.is_array(value); 495 496 if (is_array) { 497 array_len = napi.get_array_length(value); 498 499 for (j = 0; j < array_len; j++) { 500 array_val = napi.get_element(value, j); 501 502 val_type = napi.type_of(array_val); 503 504 if (val_type != napi_string) { 505 array_val = napi.coerce_to_string(array_val); 506 } 507 508 value_len = napi.get_value_string_latin1(array_val, ptr, 509 header_len); 510 511 f = req->response->fields + req->response->fields_count; 512 f->skip = 0; 513 514 nxt_unit_sptr_set(&f->name, name_ptr); 515 516 f->name_length = name_len; 517 f->hash = hash; 518 519 nxt_unit_sptr_set(&f->value, ptr); 520 f->value_length = (uint32_t) value_len; 521 522 ptr += value_len; 523 header_len -= value_len; 524 525 req->response->fields_count++; 526 } 527 528 } else { 529 val_type = napi.type_of(value); 530 531 if (val_type != napi_string) { 532 value = napi.coerce_to_string(value); 533 } 534 535 value_len = napi.get_value_string_latin1(value, ptr, header_len); 536 537 f = req->response->fields + req->response->fields_count; 538 f->skip = 0; 539 540 nxt_unit_sptr_set(&f->name, name_ptr); 541 542 f->name_length = name_len; 543 f->hash = hash; 544 545 nxt_unit_sptr_set(&f->value, ptr); 546 f->value_length = (uint32_t) value_len; 547 548 ptr += value_len; 549 header_len -= value_len; 550 551 req->response->fields_count++; 552 } 553 } 554 555 } catch (exception &e) { 556 napi.throw_error(e); 557 return nullptr; 558 } 559 560 req->response_buf->free = ptr; 561 562 ret = nxt_unit_response_send(req); 563 if (ret != NXT_UNIT_OK) { 564 napi.throw_error("Failed to send response"); 565 return nullptr; 566 } 567 568 return this_arg; 569 } 570 571 572 napi_value 573 Unit::response_write(napi_env env, napi_callback_info info) 574 { 575 int ret; 576 char *ptr; 577 size_t argc, have_buf_len; 578 uint32_t buf_len; 579 nxt_napi napi(env); 580 napi_value this_arg, req_num; 581 napi_status status; 582 nxt_unit_buf_t *buf; 583 napi_valuetype buf_type; 584 nxt_unit_request_info_t *req; 585 napi_value argv[3]; 586 587 argc = 3; 588 589 try { 590 this_arg = napi.get_cb_info(info, argc, argv); 591 if (argc != 3) { 592 throw exception("Wrong args count. Expected: " 593 "chunk, chunk length"); 594 } 595 596 req_num = napi.get_named_property(argv[0], "_req_point"); 597 req = napi.get_request_info(req_num); 598 599 buf_len = napi.get_value_uint32(argv[2]); 600 601 buf_type = napi.type_of(argv[1]); 602 603 } catch (exception &e) { 604 napi.throw_error(e); 605 return nullptr; 606 } 607 608 buf_len++; 609 610 buf = nxt_unit_response_buf_alloc(req, buf_len); 611 if (buf == NULL) { 612 goto failed; 613 } 614 615 if (buf_type == napi_string) { 616 /* TODO: will work only for utf8 content-type */ 617 618 status = napi_get_value_string_utf8(env, argv[1], buf->free, 619 buf_len, &have_buf_len); 620 621 } else { 622 status = napi_get_buffer_info(env, argv[1], (void **) &ptr, 623 &have_buf_len); 624 625 memcpy(buf->free, ptr, have_buf_len); 626 } 627 628 if (status != napi_ok) { 629 goto failed; 630 } 631 632 buf->free += have_buf_len; 633 634 ret = nxt_unit_buf_send(buf); 635 if (ret != NXT_UNIT_OK) { 636 goto failed; 637 } 638 639 return this_arg; 640 641 failed: 642 643 napi.throw_error("Failed to write body"); 644 645 return nullptr; 646 } 647 648 649 napi_value 650 Unit::response_end(napi_env env, napi_callback_info info) 651 { 652 size_t argc; 653 nxt_napi napi(env); 654 napi_value resp, this_arg, req_num; 655 nxt_unit_request_info_t *req; 656 657 argc = 1; 658 659 try { 660 this_arg = napi.get_cb_info(info, argc, &resp); 661 662 req_num = napi.get_named_property(resp, "_req_point"); 663 req = napi.get_request_info(req_num); 664 665 } catch (exception &e) { 666 napi.throw_error(e); 667 return nullptr; 668 } 669 670 nxt_unit_request_done(req, NXT_UNIT_OK); 671 672 return this_arg; 673 } 674