Deleted
Added
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include "unit.h" 7 8#include <unistd.h> 9#include <fcntl.h> 10 11#include <uv.h> 12 13#include <nxt_unit_websocket.h> 14 15 |
16napi_ref Unit::constructor_; 17 18 19struct port_data_t { |
20 port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p); 21 22 void process_port_msg(); 23 void stop(); 24 25 template<typename T> 26 static port_data_t *get(T *handle); 27 28 static void read_callback(uv_poll_t *handle, int status, int events); 29 static void timer_callback(uv_timer_t *handle); 30 static void delete_data(uv_handle_t* handle); 31 32 nxt_unit_ctx_t *ctx; 33 nxt_unit_port_t *port; 34 uv_poll_t poll; 35 uv_timer_t timer; 36 int ref_count; 37 bool scheduled; 38 bool stopped; |
39}; 40 41 42struct req_data_t { 43 napi_ref sock_ref; 44 napi_ref req_ref; 45 napi_ref resp_ref; 46 napi_ref conn_ref; 47}; 48 49 |
50port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) : 51 ctx(c), port(p), ref_count(0), scheduled(false), stopped(false) 52{ 53 timer.type = UV_UNKNOWN_HANDLE; 54} 55 56 57void 58port_data_t::process_port_msg() 59{ 60 int rc, err; 61 62 rc = nxt_unit_process_port_msg(ctx, port); 63 64 if (rc != NXT_UNIT_OK) { 65 return; 66 } 67 68 if (timer.type == UV_UNKNOWN_HANDLE) { 69 err = uv_timer_init(poll.loop, &timer); 70 if (err < 0) { 71 nxt_unit_warn(ctx, "Failed to init uv.poll"); 72 return; 73 } 74 75 ref_count++; 76 timer.data = this; 77 } 78 79 if (!scheduled && !stopped) { 80 uv_timer_start(&timer, timer_callback, 0, 0); 81 82 scheduled = true; 83 } 84} 85 86 87void 88port_data_t::stop() 89{ 90 stopped = true; 91 92 uv_poll_stop(&poll); 93 94 uv_close((uv_handle_t *) &poll, delete_data); 95 96 if (timer.type == UV_UNKNOWN_HANDLE) { 97 return; 98 } 99 100 uv_timer_stop(&timer); 101 102 uv_close((uv_handle_t *) &timer, delete_data); 103} 104 105 106template<typename T> 107port_data_t * 108port_data_t::get(T *handle) 109{ 110 return (port_data_t *) handle->data; 111} 112 113 114void 115port_data_t::read_callback(uv_poll_t *handle, int status, int events) 116{ 117 get(handle)->process_port_msg(); 118} 119 120 121void 122port_data_t::timer_callback(uv_timer_t *handle) 123{ 124 port_data_t *data; 125 126 data = get(handle); 127 128 data->scheduled = false; 129 if (data->stopped) { 130 return; 131 } 132 133 data->process_port_msg(); 134} 135 136 137void 138port_data_t::delete_data(uv_handle_t* handle) 139{ 140 port_data_t *data; 141 142 data = get(handle); 143 144 if (--data->ref_count <= 0) { 145 delete data; 146 } 147} 148 149 |
150Unit::Unit(napi_env env, napi_value jsthis): 151 nxt_napi(env), 152 wrapper_(wrap(jsthis, this, destroy)), 153 unit_ctx_(nullptr) 154{ 155 nxt_unit_debug(NULL, "Unit::Unit()"); 156} 157 --- 304 unchanged lines hidden (view full) --- 462 make_callback(async_context, server_obj, emit_drain); 463 464 } catch (exception &e) { 465 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 466 } 467} 468 469 |
470int 471Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 472{ |
473 int err; 474 Unit *obj; 475 uv_loop_t *loop; 476 port_data_t *data; 477 napi_status status; |
478 479 if (port->in_fd != -1) { |
480 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 481 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 482 port->in_fd, strerror(errno), errno); 483 return -1; 484 } 485 |
486 obj = reinterpret_cast<Unit *>(ctx->unit->data); 487 |
488 status = napi_get_uv_event_loop(obj->env(), &loop); 489 if (status != napi_ok) { 490 nxt_unit_warn(ctx, "Failed to get uv.loop"); 491 return NXT_UNIT_ERROR; 492 } 493 |
494 data = new port_data_t(ctx, port); |
495 496 err = uv_poll_init(loop, &data->poll, port->in_fd); 497 if (err < 0) { 498 nxt_unit_warn(ctx, "Failed to init uv.poll"); |
499 delete data; |
500 return NXT_UNIT_ERROR; 501 } 502 |
503 err = uv_poll_start(&data->poll, UV_READABLE, 504 port_data_t::read_callback); |
505 if (err < 0) { 506 nxt_unit_warn(ctx, "Failed to start uv.poll"); |
507 delete data; |
508 return NXT_UNIT_ERROR; 509 } 510 511 port->data = data; 512 |
513 data->ref_count++; |
514 data->poll.data = data; 515 } 516 517 return NXT_UNIT_OK; 518} 519 520 521void 522Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) 523{ 524 port_data_t *data; 525 526 if (port->data != NULL) { 527 data = (port_data_t *) port->data; 528 |
529 data->stop(); |
530 } 531} 532 533 |
534void 535Unit::quit_cb(nxt_unit_ctx_t *ctx) 536{ 537 Unit *obj; 538 539 obj = reinterpret_cast<Unit *>(ctx->unit->data); 540 541 obj->quit(ctx); --- 620 unchanged lines hidden --- |