1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include "unit.h" 7 8#include <unistd.h> 9#include <fcntl.h> 10 11#include <uv.h> 12 13#include <nxt_unit_websocket.h> 14 15 16static void delete_port_data(uv_handle_t* handle); 17 18napi_ref Unit::constructor_; 19 20 21struct port_data_t { 22 nxt_unit_ctx_t *ctx; 23 nxt_unit_port_t *port; 24 uv_poll_t poll; 25}; 26 27 28struct req_data_t { 29 napi_ref sock_ref;
| 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include "unit.h" 7 8#include <unistd.h> 9#include <fcntl.h> 10 11#include <uv.h> 12 13#include <nxt_unit_websocket.h> 14 15 16static void delete_port_data(uv_handle_t* handle); 17 18napi_ref Unit::constructor_; 19 20 21struct port_data_t { 22 nxt_unit_ctx_t *ctx; 23 nxt_unit_port_t *port; 24 uv_poll_t poll; 25}; 26 27 28struct req_data_t { 29 napi_ref sock_ref;
|
| 30 napi_ref req_ref;
|
30 napi_ref resp_ref; 31 napi_ref conn_ref; 32}; 33 34 35Unit::Unit(napi_env env, napi_value jsthis): 36 nxt_napi(env), 37 wrapper_(wrap(jsthis, this, destroy)), 38 unit_ctx_(nullptr) 39{ 40 nxt_unit_debug(NULL, "Unit::Unit()"); 41} 42 43 44Unit::~Unit() 45{ 46 delete_reference(wrapper_); 47 48 nxt_unit_debug(NULL, "Unit::~Unit()"); 49} 50 51 52napi_value 53Unit::init(napi_env env, napi_value exports) 54{ 55 nxt_napi napi(env); 56 napi_value ctor; 57 58 napi_property_descriptor unit_props[] = { 59 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 60 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 61 }; 62 63 try { 64 ctor = napi.define_class("Unit", create, 2, unit_props); 65 constructor_ = napi.create_reference(ctor); 66 67 napi.set_named_property(exports, "Unit", ctor);
| 31 napi_ref resp_ref; 32 napi_ref conn_ref; 33}; 34 35 36Unit::Unit(napi_env env, napi_value jsthis): 37 nxt_napi(env), 38 wrapper_(wrap(jsthis, this, destroy)), 39 unit_ctx_(nullptr) 40{ 41 nxt_unit_debug(NULL, "Unit::Unit()"); 42} 43 44 45Unit::~Unit() 46{ 47 delete_reference(wrapper_); 48 49 nxt_unit_debug(NULL, "Unit::~Unit()"); 50} 51 52 53napi_value 54Unit::init(napi_env env, napi_value exports) 55{ 56 nxt_napi napi(env); 57 napi_value ctor; 58 59 napi_property_descriptor unit_props[] = { 60 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 61 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, 62 }; 63 64 try { 65 ctor = napi.define_class("Unit", create, 2, unit_props); 66 constructor_ = napi.create_reference(ctor); 67 68 napi.set_named_property(exports, "Unit", ctor);
|
| 69 napi.set_named_property(exports, "request_read", request_read);
|
68 napi.set_named_property(exports, "response_send_headers", 69 response_send_headers); 70 napi.set_named_property(exports, "response_write", response_write); 71 napi.set_named_property(exports, "response_end", response_end); 72 napi.set_named_property(exports, "websocket_send_frame", 73 websocket_send_frame); 74 napi.set_named_property(exports, "websocket_set_sock", 75 websocket_set_sock); 76 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 77 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 78 79 } catch (exception &e) { 80 napi.throw_error(e); 81 return nullptr; 82 } 83 84 return exports; 85} 86 87 88void 89Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 90{ 91 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 92 93 delete obj; 94} 95 96 97napi_value 98Unit::create(napi_env env, napi_callback_info info) 99{ 100 nxt_napi napi(env); 101 napi_value target, ctor, instance, jsthis; 102 103 try { 104 target = napi.get_new_target(info); 105 106 if (target != nullptr) { 107 /* Invoked as constructor: `new Unit(...)`. */ 108 jsthis = napi.get_cb_info(info); 109 110 new Unit(env, jsthis); 111 napi.create_reference(jsthis); 112 113 return jsthis; 114 } 115 116 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 117 ctor = napi.get_reference_value(constructor_); 118 instance = napi.new_instance(ctor); 119 napi.create_reference(instance); 120 121 } catch (exception &e) { 122 napi.throw_error(e); 123 return nullptr; 124 } 125 126 return instance; 127} 128 129 130napi_value 131Unit::create_server(napi_env env, napi_callback_info info) 132{ 133 Unit *obj; 134 size_t argc; 135 nxt_napi napi(env); 136 napi_value jsthis, argv; 137 nxt_unit_init_t unit_init; 138 139 argc = 1; 140 141 try { 142 jsthis = napi.get_cb_info(info, argc, &argv); 143 obj = (Unit *) napi.unwrap(jsthis); 144 145 } catch (exception &e) { 146 napi.throw_error(e); 147 return nullptr; 148 } 149 150 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 151 152 unit_init.data = obj; 153 unit_init.callbacks.request_handler = request_handler_cb; 154 unit_init.callbacks.websocket_handler = websocket_handler_cb; 155 unit_init.callbacks.close_handler = close_handler_cb; 156 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 157 unit_init.callbacks.add_port = add_port; 158 unit_init.callbacks.remove_port = remove_port; 159 unit_init.callbacks.quit = quit_cb; 160 161 unit_init.request_data_size = sizeof(req_data_t); 162 163 obj->unit_ctx_ = nxt_unit_init(&unit_init); 164 if (obj->unit_ctx_ == NULL) { 165 goto failed; 166 } 167 168 return nullptr; 169 170failed: 171 172 napi_throw_error(env, NULL, "Failed to create Unit object"); 173 174 return nullptr; 175} 176 177 178napi_value 179Unit::listen(napi_env env, napi_callback_info info) 180{ 181 return nullptr; 182} 183 184 185void 186Unit::request_handler_cb(nxt_unit_request_info_t *req) 187{ 188 Unit *obj; 189 190 obj = reinterpret_cast<Unit *>(req->unit->data); 191 192 obj->request_handler(req); 193} 194 195 196void 197Unit::request_handler(nxt_unit_request_info_t *req) 198{ 199 napi_value socket, request, response, server_obj, emit_request; 200 201 memset(req->data, 0, sizeof(req_data_t)); 202 203 try { 204 nxt_handle_scope scope(env()); 205 206 server_obj = get_server_object(); 207 208 socket = create_socket(server_obj, req);
| 70 napi.set_named_property(exports, "response_send_headers", 71 response_send_headers); 72 napi.set_named_property(exports, "response_write", response_write); 73 napi.set_named_property(exports, "response_end", response_end); 74 napi.set_named_property(exports, "websocket_send_frame", 75 websocket_send_frame); 76 napi.set_named_property(exports, "websocket_set_sock", 77 websocket_set_sock); 78 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min()); 79 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max()); 80 81 } catch (exception &e) { 82 napi.throw_error(e); 83 return nullptr; 84 } 85 86 return exports; 87} 88 89 90void 91Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint) 92{ 93 Unit *obj = reinterpret_cast<Unit *>(nativeObject); 94 95 delete obj; 96} 97 98 99napi_value 100Unit::create(napi_env env, napi_callback_info info) 101{ 102 nxt_napi napi(env); 103 napi_value target, ctor, instance, jsthis; 104 105 try { 106 target = napi.get_new_target(info); 107 108 if (target != nullptr) { 109 /* Invoked as constructor: `new Unit(...)`. */ 110 jsthis = napi.get_cb_info(info); 111 112 new Unit(env, jsthis); 113 napi.create_reference(jsthis); 114 115 return jsthis; 116 } 117 118 /* Invoked as plain function `Unit(...)`, turn into construct call. */ 119 ctor = napi.get_reference_value(constructor_); 120 instance = napi.new_instance(ctor); 121 napi.create_reference(instance); 122 123 } catch (exception &e) { 124 napi.throw_error(e); 125 return nullptr; 126 } 127 128 return instance; 129} 130 131 132napi_value 133Unit::create_server(napi_env env, napi_callback_info info) 134{ 135 Unit *obj; 136 size_t argc; 137 nxt_napi napi(env); 138 napi_value jsthis, argv; 139 nxt_unit_init_t unit_init; 140 141 argc = 1; 142 143 try { 144 jsthis = napi.get_cb_info(info, argc, &argv); 145 obj = (Unit *) napi.unwrap(jsthis); 146 147 } catch (exception &e) { 148 napi.throw_error(e); 149 return nullptr; 150 } 151 152 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 153 154 unit_init.data = obj; 155 unit_init.callbacks.request_handler = request_handler_cb; 156 unit_init.callbacks.websocket_handler = websocket_handler_cb; 157 unit_init.callbacks.close_handler = close_handler_cb; 158 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb; 159 unit_init.callbacks.add_port = add_port; 160 unit_init.callbacks.remove_port = remove_port; 161 unit_init.callbacks.quit = quit_cb; 162 163 unit_init.request_data_size = sizeof(req_data_t); 164 165 obj->unit_ctx_ = nxt_unit_init(&unit_init); 166 if (obj->unit_ctx_ == NULL) { 167 goto failed; 168 } 169 170 return nullptr; 171 172failed: 173 174 napi_throw_error(env, NULL, "Failed to create Unit object"); 175 176 return nullptr; 177} 178 179 180napi_value 181Unit::listen(napi_env env, napi_callback_info info) 182{ 183 return nullptr; 184} 185 186 187void 188Unit::request_handler_cb(nxt_unit_request_info_t *req) 189{ 190 Unit *obj; 191 192 obj = reinterpret_cast<Unit *>(req->unit->data); 193 194 obj->request_handler(req); 195} 196 197 198void 199Unit::request_handler(nxt_unit_request_info_t *req) 200{ 201 napi_value socket, request, response, server_obj, emit_request; 202 203 memset(req->data, 0, sizeof(req_data_t)); 204 205 try { 206 nxt_handle_scope scope(env()); 207 208 server_obj = get_server_object(); 209 210 socket = create_socket(server_obj, req);
|
209 request = create_request(server_obj, socket);
| 211 request = create_request(server_obj, socket, req);
|
210 response = create_response(server_obj, request, req); 211 212 create_headers(req, request); 213 214 emit_request = get_named_property(server_obj, "emit_request"); 215 216 nxt_async_context async_context(env(), "request_handler"); 217 nxt_callback_scope async_scope(async_context); 218 219 make_callback(async_context, server_obj, emit_request, request, 220 response); 221 222 } catch (exception &e) { 223 nxt_unit_req_warn(req, "request_handler: %s", e.str); 224 } 225} 226 227 228void 229Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 230{ 231 Unit *obj; 232 233 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 234 235 obj->websocket_handler(ws); 236} 237 238 239void 240Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 241{ 242 napi_value frame, server_obj, process_frame, conn; 243 req_data_t *req_data; 244 245 req_data = (req_data_t *) ws->req->data; 246 247 try { 248 nxt_handle_scope scope(env()); 249 250 server_obj = get_server_object(); 251 252 frame = create_websocket_frame(server_obj, ws); 253 254 conn = get_reference_value(req_data->conn_ref); 255 256 process_frame = get_named_property(conn, "processFrame"); 257 258 nxt_async_context async_context(env(), "websocket_handler"); 259 nxt_callback_scope async_scope(async_context); 260 261 make_callback(async_context, conn, process_frame, frame); 262 263 } catch (exception &e) { 264 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 265 } 266 267 nxt_unit_websocket_done(ws); 268} 269 270 271void 272Unit::close_handler_cb(nxt_unit_request_info_t *req) 273{ 274 Unit *obj; 275 276 obj = reinterpret_cast<Unit *>(req->unit->data); 277 278 obj->close_handler(req); 279} 280 281 282void 283Unit::close_handler(nxt_unit_request_info_t *req) 284{ 285 napi_value conn_handle_close, conn; 286 req_data_t *req_data; 287 288 req_data = (req_data_t *) req->data; 289 290 try { 291 nxt_handle_scope scope(env()); 292 293 conn = get_reference_value(req_data->conn_ref); 294 295 conn_handle_close = get_named_property(conn, "handleSocketClose"); 296 297 nxt_async_context async_context(env(), "close_handler"); 298 nxt_callback_scope async_scope(async_context); 299 300 make_callback(async_context, conn, conn_handle_close, 301 nxt_napi::create(0)); 302 303 remove_wrap(req_data->sock_ref);
| 212 response = create_response(server_obj, request, req); 213 214 create_headers(req, request); 215 216 emit_request = get_named_property(server_obj, "emit_request"); 217 218 nxt_async_context async_context(env(), "request_handler"); 219 nxt_callback_scope async_scope(async_context); 220 221 make_callback(async_context, server_obj, emit_request, request, 222 response); 223 224 } catch (exception &e) { 225 nxt_unit_req_warn(req, "request_handler: %s", e.str); 226 } 227} 228 229 230void 231Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 232{ 233 Unit *obj; 234 235 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 236 237 obj->websocket_handler(ws); 238} 239 240 241void 242Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) 243{ 244 napi_value frame, server_obj, process_frame, conn; 245 req_data_t *req_data; 246 247 req_data = (req_data_t *) ws->req->data; 248 249 try { 250 nxt_handle_scope scope(env()); 251 252 server_obj = get_server_object(); 253 254 frame = create_websocket_frame(server_obj, ws); 255 256 conn = get_reference_value(req_data->conn_ref); 257 258 process_frame = get_named_property(conn, "processFrame"); 259 260 nxt_async_context async_context(env(), "websocket_handler"); 261 nxt_callback_scope async_scope(async_context); 262 263 make_callback(async_context, conn, process_frame, frame); 264 265 } catch (exception &e) { 266 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); 267 } 268 269 nxt_unit_websocket_done(ws); 270} 271 272 273void 274Unit::close_handler_cb(nxt_unit_request_info_t *req) 275{ 276 Unit *obj; 277 278 obj = reinterpret_cast<Unit *>(req->unit->data); 279 280 obj->close_handler(req); 281} 282 283 284void 285Unit::close_handler(nxt_unit_request_info_t *req) 286{ 287 napi_value conn_handle_close, conn; 288 req_data_t *req_data; 289 290 req_data = (req_data_t *) req->data; 291 292 try { 293 nxt_handle_scope scope(env()); 294 295 conn = get_reference_value(req_data->conn_ref); 296 297 conn_handle_close = get_named_property(conn, "handleSocketClose"); 298 299 nxt_async_context async_context(env(), "close_handler"); 300 nxt_callback_scope async_scope(async_context); 301 302 make_callback(async_context, conn, conn_handle_close, 303 nxt_napi::create(0)); 304 305 remove_wrap(req_data->sock_ref);
|
| 306 remove_wrap(req_data->req_ref);
|
304 remove_wrap(req_data->resp_ref); 305 remove_wrap(req_data->conn_ref); 306 307 } catch (exception &e) { 308 nxt_unit_req_warn(req, "close_handler: %s", e.str); 309 310 nxt_unit_request_done(req, NXT_UNIT_ERROR); 311 312 return; 313 } 314 315 nxt_unit_request_done(req, NXT_UNIT_OK); 316} 317 318 319void 320Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 321{ 322 Unit *obj; 323 324 obj = reinterpret_cast<Unit *>(ctx->unit->data); 325 326 obj->shm_ack_handler(ctx); 327} 328 329 330void 331Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 332{ 333 napi_value server_obj, emit_drain; 334 335 try { 336 nxt_handle_scope scope(env()); 337 338 server_obj = get_server_object(); 339 340 emit_drain = get_named_property(server_obj, "emit_drain"); 341 342 nxt_async_context async_context(env(), "shm_ack_handler"); 343 nxt_callback_scope async_scope(async_context); 344 345 make_callback(async_context, server_obj, emit_drain); 346 347 } catch (exception &e) { 348 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 349 } 350} 351 352 353static void 354nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 355{ 356 port_data_t *data; 357 358 data = (port_data_t *) handle->data; 359 360 nxt_unit_process_port_msg(data->ctx, data->port); 361} 362 363 364int 365Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 366{ 367 int err; 368 Unit *obj; 369 uv_loop_t *loop; 370 port_data_t *data; 371 napi_status status; 372 373 if (port->in_fd != -1) { 374 obj = reinterpret_cast<Unit *>(ctx->unit->data); 375 376 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 377 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 378 port->in_fd, strerror(errno), errno); 379 return -1; 380 } 381 382 status = napi_get_uv_event_loop(obj->env(), &loop); 383 if (status != napi_ok) { 384 nxt_unit_warn(ctx, "Failed to get uv.loop"); 385 return NXT_UNIT_ERROR; 386 } 387 388 data = new port_data_t; 389 390 err = uv_poll_init(loop, &data->poll, port->in_fd); 391 if (err < 0) { 392 nxt_unit_warn(ctx, "Failed to init uv.poll"); 393 return NXT_UNIT_ERROR; 394 } 395 396 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); 397 if (err < 0) { 398 nxt_unit_warn(ctx, "Failed to start uv.poll"); 399 return NXT_UNIT_ERROR; 400 } 401 402 port->data = data; 403 404 data->ctx = ctx; 405 data->port = port; 406 data->poll.data = data; 407 } 408 409 return NXT_UNIT_OK; 410} 411 412 413void 414Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) 415{ 416 port_data_t *data; 417 418 if (port->data != NULL) { 419 data = (port_data_t *) port->data; 420 421 if (data->port == port) { 422 uv_poll_stop(&data->poll); 423 424 uv_close((uv_handle_t *) &data->poll, delete_port_data); 425 } 426 } 427} 428 429 430static void 431delete_port_data(uv_handle_t* handle) 432{ 433 port_data_t *data; 434 435 data = (port_data_t *) handle->data; 436 437 delete data; 438} 439 440 441void 442Unit::quit_cb(nxt_unit_ctx_t *ctx) 443{ 444 Unit *obj; 445 446 obj = reinterpret_cast<Unit *>(ctx->unit->data); 447 448 obj->quit(ctx); 449} 450 451 452void 453Unit::quit(nxt_unit_ctx_t *ctx) 454{ 455 napi_value server_obj, emit_close; 456 457 try { 458 nxt_handle_scope scope(env()); 459 460 server_obj = get_server_object(); 461 462 emit_close = get_named_property(server_obj, "emit_close"); 463 464 nxt_async_context async_context(env(), "unit_quit"); 465 nxt_callback_scope async_scope(async_context); 466 467 make_callback(async_context, server_obj, emit_close); 468 469 } catch (exception &e) { 470 nxt_unit_debug(ctx, "quit: %s", e.str); 471 } 472 473 nxt_unit_done(ctx); 474} 475 476 477napi_value 478Unit::get_server_object() 479{ 480 napi_value unit_obj; 481 482 unit_obj = get_reference_value(wrapper_); 483 484 return get_named_property(unit_obj, "server"); 485} 486 487 488void 489Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 490{
| 307 remove_wrap(req_data->resp_ref); 308 remove_wrap(req_data->conn_ref); 309 310 } catch (exception &e) { 311 nxt_unit_req_warn(req, "close_handler: %s", e.str); 312 313 nxt_unit_request_done(req, NXT_UNIT_ERROR); 314 315 return; 316 } 317 318 nxt_unit_request_done(req, NXT_UNIT_OK); 319} 320 321 322void 323Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx) 324{ 325 Unit *obj; 326 327 obj = reinterpret_cast<Unit *>(ctx->unit->data); 328 329 obj->shm_ack_handler(ctx); 330} 331 332 333void 334Unit::shm_ack_handler(nxt_unit_ctx_t *ctx) 335{ 336 napi_value server_obj, emit_drain; 337 338 try { 339 nxt_handle_scope scope(env()); 340 341 server_obj = get_server_object(); 342 343 emit_drain = get_named_property(server_obj, "emit_drain"); 344 345 nxt_async_context async_context(env(), "shm_ack_handler"); 346 nxt_callback_scope async_scope(async_context); 347 348 make_callback(async_context, server_obj, emit_drain); 349 350 } catch (exception &e) { 351 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str); 352 } 353} 354 355 356static void 357nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 358{ 359 port_data_t *data; 360 361 data = (port_data_t *) handle->data; 362 363 nxt_unit_process_port_msg(data->ctx, data->port); 364} 365 366 367int 368Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 369{ 370 int err; 371 Unit *obj; 372 uv_loop_t *loop; 373 port_data_t *data; 374 napi_status status; 375 376 if (port->in_fd != -1) { 377 obj = reinterpret_cast<Unit *>(ctx->unit->data); 378 379 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { 380 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 381 port->in_fd, strerror(errno), errno); 382 return -1; 383 } 384 385 status = napi_get_uv_event_loop(obj->env(), &loop); 386 if (status != napi_ok) { 387 nxt_unit_warn(ctx, "Failed to get uv.loop"); 388 return NXT_UNIT_ERROR; 389 } 390 391 data = new port_data_t; 392 393 err = uv_poll_init(loop, &data->poll, port->in_fd); 394 if (err < 0) { 395 nxt_unit_warn(ctx, "Failed to init uv.poll"); 396 return NXT_UNIT_ERROR; 397 } 398 399 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback); 400 if (err < 0) { 401 nxt_unit_warn(ctx, "Failed to start uv.poll"); 402 return NXT_UNIT_ERROR; 403 } 404 405 port->data = data; 406 407 data->ctx = ctx; 408 data->port = port; 409 data->poll.data = data; 410 } 411 412 return NXT_UNIT_OK; 413} 414 415 416void 417Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port) 418{ 419 port_data_t *data; 420 421 if (port->data != NULL) { 422 data = (port_data_t *) port->data; 423 424 if (data->port == port) { 425 uv_poll_stop(&data->poll); 426 427 uv_close((uv_handle_t *) &data->poll, delete_port_data); 428 } 429 } 430} 431 432 433static void 434delete_port_data(uv_handle_t* handle) 435{ 436 port_data_t *data; 437 438 data = (port_data_t *) handle->data; 439 440 delete data; 441} 442 443 444void 445Unit::quit_cb(nxt_unit_ctx_t *ctx) 446{ 447 Unit *obj; 448 449 obj = reinterpret_cast<Unit *>(ctx->unit->data); 450 451 obj->quit(ctx); 452} 453 454 455void 456Unit::quit(nxt_unit_ctx_t *ctx) 457{ 458 napi_value server_obj, emit_close; 459 460 try { 461 nxt_handle_scope scope(env()); 462 463 server_obj = get_server_object(); 464 465 emit_close = get_named_property(server_obj, "emit_close"); 466 467 nxt_async_context async_context(env(), "unit_quit"); 468 nxt_callback_scope async_scope(async_context); 469 470 make_callback(async_context, server_obj, emit_close); 471 472 } catch (exception &e) { 473 nxt_unit_debug(ctx, "quit: %s", e.str); 474 } 475 476 nxt_unit_done(ctx); 477} 478 479 480napi_value 481Unit::get_server_object() 482{ 483 napi_value unit_obj; 484 485 unit_obj = get_reference_value(wrapper_); 486 487 return get_named_property(unit_obj, "server"); 488} 489 490 491void 492Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 493{
|
491 void *data;
| |
492 uint32_t i;
| 494 uint32_t i;
|
493 napi_value headers, raw_headers, buffer;
| 495 napi_value headers, raw_headers;
|
494 napi_status status; 495 nxt_unit_request_t *r; 496 497 r = req->request; 498 499 headers = create_object(); 500 501 status = napi_create_array_with_length(env(), r->fields_count * 2, 502 &raw_headers); 503 if (status != napi_ok) { 504 throw exception("Failed to create array"); 505 } 506 507 for (i = 0; i < r->fields_count; i++) { 508 append_header(r->fields + i, headers, raw_headers, i); 509 } 510 511 set_named_property(request, "headers", headers); 512 set_named_property(request, "rawHeaders", raw_headers); 513 set_named_property(request, "httpVersion", r->version, r->version_length); 514 set_named_property(request, "method", r->method, r->method_length); 515 set_named_property(request, "url", r->target, r->target_length); 516 517 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
| 496 napi_status status; 497 nxt_unit_request_t *r; 498 499 r = req->request; 500 501 headers = create_object(); 502 503 status = napi_create_array_with_length(env(), r->fields_count * 2, 504 &raw_headers); 505 if (status != napi_ok) { 506 throw exception("Failed to create array"); 507 } 508 509 for (i = 0; i < r->fields_count; i++) { 510 append_header(r->fields + i, headers, raw_headers, i); 511 } 512 513 set_named_property(request, "headers", headers); 514 set_named_property(request, "rawHeaders", raw_headers); 515 set_named_property(request, "httpVersion", r->version, r->version_length); 516 set_named_property(request, "method", r->method, r->method_length); 517 set_named_property(request, "url", r->target, r->target_length); 518 519 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
|
518 519 buffer = create_buffer((size_t) req->content_length, &data); 520 nxt_unit_request_read(req, data, req->content_length); 521 522 set_named_property(request, "_data", buffer);
| |
523} 524 525 526inline char 527lowcase(char c) 528{ 529 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 530} 531 532 533inline void 534Unit::append_header(nxt_unit_field_t *f, napi_value headers, 535 napi_value raw_headers, uint32_t idx) 536{ 537 char *name; 538 uint8_t i; 539 napi_value str, vstr; 540 541 name = (char *) nxt_unit_sptr_get(&f->name); 542 543 str = create_string_latin1(name, f->name_length); 544 545 for (i = 0; i < f->name_length; i++) { 546 name[i] = lowcase(name[i]); 547 } 548 549 vstr = set_named_property(headers, name, f->value, f->value_length); 550 551 set_element(raw_headers, idx * 2, str); 552 set_element(raw_headers, idx * 2 + 1, vstr); 553} 554 555 556napi_value 557Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 558{ 559 napi_value constructor, res; 560 req_data_t *req_data; 561 nxt_unit_request_t *r; 562 563 r = req->request; 564 565 constructor = get_named_property(server_obj, "Socket"); 566 567 res = new_instance(constructor); 568 569 req_data = (req_data_t *) req->data; 570 req_data->sock_ref = wrap(res, req, sock_destroy); 571 572 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 573 set_named_property(res, "localAddress", r->local, r->local_length); 574 575 return res; 576} 577 578 579napi_value
| 520} 521 522 523inline char 524lowcase(char c) 525{ 526 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 527} 528 529 530inline void 531Unit::append_header(nxt_unit_field_t *f, napi_value headers, 532 napi_value raw_headers, uint32_t idx) 533{ 534 char *name; 535 uint8_t i; 536 napi_value str, vstr; 537 538 name = (char *) nxt_unit_sptr_get(&f->name); 539 540 str = create_string_latin1(name, f->name_length); 541 542 for (i = 0; i < f->name_length; i++) { 543 name[i] = lowcase(name[i]); 544 } 545 546 vstr = set_named_property(headers, name, f->value, f->value_length); 547 548 set_element(raw_headers, idx * 2, str); 549 set_element(raw_headers, idx * 2 + 1, vstr); 550} 551 552 553napi_value 554Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 555{ 556 napi_value constructor, res; 557 req_data_t *req_data; 558 nxt_unit_request_t *r; 559 560 r = req->request; 561 562 constructor = get_named_property(server_obj, "Socket"); 563 564 res = new_instance(constructor); 565 566 req_data = (req_data_t *) req->data; 567 req_data->sock_ref = wrap(res, req, sock_destroy); 568 569 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 570 set_named_property(res, "localAddress", r->local, r->local_length); 571 572 return res; 573} 574 575 576napi_value
|
580Unit::create_request(napi_value server_obj, napi_value socket)
| 577Unit::create_request(napi_value server_obj, napi_value socket, 578 nxt_unit_request_info_t *req)
|
581{
| 579{
|
582 napi_value constructor;
| 580 napi_value constructor, res; 581 req_data_t *req_data;
|
583 584 constructor = get_named_property(server_obj, "ServerRequest"); 585
| 582 583 constructor = get_named_property(server_obj, "ServerRequest"); 584
|
586 return new_instance(constructor, server_obj, socket);
| 585 res = new_instance(constructor, server_obj, socket); 586 587 req_data = (req_data_t *) req->data; 588 req_data->req_ref = wrap(res, req, req_destroy); 589 590 return res;
|
587} 588 589 590napi_value 591Unit::create_response(napi_value server_obj, napi_value request, 592 nxt_unit_request_info_t *req) 593{ 594 napi_value constructor, res; 595 req_data_t *req_data; 596 597 constructor = get_named_property(server_obj, "ServerResponse"); 598 599 res = new_instance(constructor, request); 600 601 req_data = (req_data_t *) req->data; 602 req_data->resp_ref = wrap(res, req, resp_destroy); 603 604 return res; 605} 606 607 608napi_value 609Unit::create_websocket_frame(napi_value server_obj, 610 nxt_unit_websocket_frame_t *ws) 611{ 612 void *data; 613 napi_value constructor, res, buffer; 614 uint8_t sc[2]; 615 616 constructor = get_named_property(server_obj, "WebSocketFrame"); 617 618 res = new_instance(constructor); 619 620 set_named_property(res, "fin", (bool) ws->header->fin); 621 set_named_property(res, "opcode", ws->header->opcode); 622 set_named_property(res, "length", (int64_t) ws->payload_len); 623 624 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 625 if (ws->payload_len >= 2) { 626 nxt_unit_websocket_read(ws, sc, 2); 627 628 set_named_property(res, "closeStatus", 629 (((uint16_t) sc[0]) << 8) | sc[1]); 630 631 } else { 632 set_named_property(res, "closeStatus", -1); 633 } 634 } 635 636 buffer = create_buffer((size_t) ws->content_length, &data); 637 nxt_unit_websocket_read(ws, data, ws->content_length); 638 639 set_named_property(res, "binaryPayload", buffer); 640 641 return res; 642} 643 644 645napi_value
| 591} 592 593 594napi_value 595Unit::create_response(napi_value server_obj, napi_value request, 596 nxt_unit_request_info_t *req) 597{ 598 napi_value constructor, res; 599 req_data_t *req_data; 600 601 constructor = get_named_property(server_obj, "ServerResponse"); 602 603 res = new_instance(constructor, request); 604 605 req_data = (req_data_t *) req->data; 606 req_data->resp_ref = wrap(res, req, resp_destroy); 607 608 return res; 609} 610 611 612napi_value 613Unit::create_websocket_frame(napi_value server_obj, 614 nxt_unit_websocket_frame_t *ws) 615{ 616 void *data; 617 napi_value constructor, res, buffer; 618 uint8_t sc[2]; 619 620 constructor = get_named_property(server_obj, "WebSocketFrame"); 621 622 res = new_instance(constructor); 623 624 set_named_property(res, "fin", (bool) ws->header->fin); 625 set_named_property(res, "opcode", ws->header->opcode); 626 set_named_property(res, "length", (int64_t) ws->payload_len); 627 628 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 629 if (ws->payload_len >= 2) { 630 nxt_unit_websocket_read(ws, sc, 2); 631 632 set_named_property(res, "closeStatus", 633 (((uint16_t) sc[0]) << 8) | sc[1]); 634 635 } else { 636 set_named_property(res, "closeStatus", -1); 637 } 638 } 639 640 buffer = create_buffer((size_t) ws->content_length, &data); 641 nxt_unit_websocket_read(ws, data, ws->content_length); 642 643 set_named_property(res, "binaryPayload", buffer); 644 645 return res; 646} 647 648 649napi_value
|
| 650Unit::request_read(napi_env env, napi_callback_info info) 651{ 652 void *data; 653 uint32_t wm; 654 nxt_napi napi(env); 655 napi_value this_arg, argv, buffer; 656 nxt_unit_request_info_t *req; 657 658 try { 659 this_arg = napi.get_cb_info(info, argv); 660 661 try { 662 req = napi.get_request_info(this_arg); 663 664 } catch (exception &e) { 665 return nullptr; 666 } 667 668 if (req->content_length == 0) { 669 return nullptr; 670 } 671 672 wm = napi.get_value_uint32(argv); 673 674 if (wm > req->content_length) { 675 wm = req->content_length; 676 } 677 678 buffer = napi.create_buffer((size_t) wm, &data); 679 nxt_unit_request_read(req, data, wm); 680 681 } catch (exception &e) { 682 napi.throw_error(e); 683 return nullptr; 684 } 685 686 return buffer; 687} 688 689 690napi_value
|
646Unit::response_send_headers(napi_env env, napi_callback_info info) 647{ 648 int ret; 649 char *ptr, *name_ptr; 650 bool is_array; 651 size_t argc, name_len, value_len; 652 uint32_t status_code, header_len, keys_len, array_len; 653 uint32_t keys_count, i, j; 654 uint16_t hash; 655 nxt_napi napi(env); 656 napi_value this_arg, headers, keys, name, value, array_val; 657 napi_value array_entry; 658 napi_valuetype val_type; 659 nxt_unit_field_t *f; 660 nxt_unit_request_info_t *req; 661 napi_value argv[4]; 662 663 argc = 4; 664 665 try { 666 this_arg = napi.get_cb_info(info, argc, argv); 667 if (argc != 4) { 668 napi.throw_error("Wrong args count. Expected: " 669 "statusCode, headers, headers count, " 670 "headers length"); 671 return nullptr; 672 } 673 674 req = napi.get_request_info(this_arg); 675 status_code = napi.get_value_uint32(argv[0]); 676 keys_count = napi.get_value_uint32(argv[2]); 677 header_len = napi.get_value_uint32(argv[3]); 678 679 headers = argv[1]; 680 681 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 682 if (ret != NXT_UNIT_OK) { 683 napi.throw_error("Failed to create response"); 684 return nullptr; 685 } 686 687 /* 688 * Each name and value are 0-terminated by libunit. 689 * Need to add extra 2 bytes for each header. 690 */ 691 header_len += keys_count * 2; 692 693 keys = napi.get_property_names(headers); 694 keys_len = napi.get_array_length(keys); 695 696 ptr = req->response_buf->free; 697 698 for (i = 0; i < keys_len; i++) { 699 name = napi.get_element(keys, i); 700 701 array_entry = napi.get_property(headers, name); 702 703 name = napi.get_element(array_entry, 0); 704 value = napi.get_element(array_entry, 1); 705 706 name_len = napi.get_value_string_latin1(name, ptr, header_len); 707 name_ptr = ptr; 708 709 ptr += name_len + 1; 710 header_len -= name_len + 1; 711 712 hash = nxt_unit_field_hash(name_ptr, name_len); 713 714 is_array = napi.is_array(value); 715 716 if (is_array) { 717 array_len = napi.get_array_length(value); 718 719 for (j = 0; j < array_len; j++) { 720 array_val = napi.get_element(value, j); 721 722 val_type = napi.type_of(array_val); 723 724 if (val_type != napi_string) { 725 array_val = napi.coerce_to_string(array_val); 726 } 727 728 value_len = napi.get_value_string_latin1(array_val, ptr, 729 header_len); 730 731 f = req->response->fields + req->response->fields_count; 732 f->skip = 0; 733 734 nxt_unit_sptr_set(&f->name, name_ptr); 735 736 f->name_length = name_len; 737 f->hash = hash; 738 739 nxt_unit_sptr_set(&f->value, ptr); 740 f->value_length = (uint32_t) value_len; 741 742 ptr += value_len + 1; 743 header_len -= value_len + 1; 744 745 req->response->fields_count++; 746 } 747 748 } else { 749 val_type = napi.type_of(value); 750 751 if (val_type != napi_string) { 752 value = napi.coerce_to_string(value); 753 } 754 755 value_len = napi.get_value_string_latin1(value, ptr, header_len); 756 757 f = req->response->fields + req->response->fields_count; 758 f->skip = 0; 759 760 nxt_unit_sptr_set(&f->name, name_ptr); 761 762 f->name_length = name_len; 763 f->hash = hash; 764 765 nxt_unit_sptr_set(&f->value, ptr); 766 f->value_length = (uint32_t) value_len; 767 768 ptr += value_len + 1; 769 header_len -= value_len + 1; 770 771 req->response->fields_count++; 772 } 773 } 774 775 } catch (exception &e) { 776 napi.throw_error(e); 777 return nullptr; 778 } 779 780 req->response_buf->free = ptr; 781 782 ret = nxt_unit_response_send(req); 783 if (ret != NXT_UNIT_OK) { 784 napi.throw_error("Failed to send response"); 785 return nullptr; 786 } 787 788 return this_arg; 789} 790 791 792napi_value 793Unit::response_write(napi_env env, napi_callback_info info) 794{ 795 int ret; 796 void *ptr; 797 size_t argc, have_buf_len; 798 ssize_t res_len; 799 uint32_t buf_start, buf_len; 800 nxt_napi napi(env); 801 napi_value this_arg; 802 nxt_unit_buf_t *buf; 803 napi_valuetype buf_type; 804 nxt_unit_request_info_t *req; 805 napi_value argv[3]; 806 807 argc = 3; 808 809 try { 810 this_arg = napi.get_cb_info(info, argc, argv); 811 if (argc != 3) { 812 throw exception("Wrong args count. Expected: " 813 "chunk, start, length"); 814 } 815 816 req = napi.get_request_info(this_arg); 817 buf_type = napi.type_of(argv[0]); 818 buf_start = napi.get_value_uint32(argv[1]); 819 buf_len = napi.get_value_uint32(argv[2]) + 1; 820 821 if (buf_type == napi_string) { 822 /* TODO: will work only for utf8 content-type */ 823 824 if (req->response_buf != NULL 825 && req->response_buf->end >= req->response_buf->free + buf_len) 826 { 827 buf = req->response_buf; 828 829 } else { 830 buf = nxt_unit_response_buf_alloc(req, buf_len); 831 if (buf == NULL) { 832 throw exception("Failed to allocate response buffer"); 833 } 834 } 835 836 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 837 buf_len); 838 839 buf->free += have_buf_len; 840 841 ret = nxt_unit_buf_send(buf); 842 if (ret == NXT_UNIT_OK) { 843 res_len = have_buf_len; 844 } 845 846 } else { 847 ptr = napi.get_buffer_info(argv[0], have_buf_len); 848 849 if (buf_start > 0) { 850 ptr = ((uint8_t *) ptr) + buf_start; 851 have_buf_len -= buf_start; 852 } 853 854 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 855 856 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 857 } 858 859 if (ret != NXT_UNIT_OK) { 860 throw exception("Failed to send body buf"); 861 } 862 } catch (exception &e) { 863 napi.throw_error(e); 864 return nullptr; 865 } 866 867 return napi.create((int64_t) res_len); 868} 869 870 871napi_value 872Unit::response_end(napi_env env, napi_callback_info info) 873{ 874 nxt_napi napi(env); 875 napi_value this_arg; 876 req_data_t *req_data; 877 nxt_unit_request_info_t *req; 878 879 try { 880 this_arg = napi.get_cb_info(info); 881 882 req = napi.get_request_info(this_arg); 883 884 req_data = (req_data_t *) req->data; 885 886 napi.remove_wrap(req_data->sock_ref);
| 691Unit::response_send_headers(napi_env env, napi_callback_info info) 692{ 693 int ret; 694 char *ptr, *name_ptr; 695 bool is_array; 696 size_t argc, name_len, value_len; 697 uint32_t status_code, header_len, keys_len, array_len; 698 uint32_t keys_count, i, j; 699 uint16_t hash; 700 nxt_napi napi(env); 701 napi_value this_arg, headers, keys, name, value, array_val; 702 napi_value array_entry; 703 napi_valuetype val_type; 704 nxt_unit_field_t *f; 705 nxt_unit_request_info_t *req; 706 napi_value argv[4]; 707 708 argc = 4; 709 710 try { 711 this_arg = napi.get_cb_info(info, argc, argv); 712 if (argc != 4) { 713 napi.throw_error("Wrong args count. Expected: " 714 "statusCode, headers, headers count, " 715 "headers length"); 716 return nullptr; 717 } 718 719 req = napi.get_request_info(this_arg); 720 status_code = napi.get_value_uint32(argv[0]); 721 keys_count = napi.get_value_uint32(argv[2]); 722 header_len = napi.get_value_uint32(argv[3]); 723 724 headers = argv[1]; 725 726 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 727 if (ret != NXT_UNIT_OK) { 728 napi.throw_error("Failed to create response"); 729 return nullptr; 730 } 731 732 /* 733 * Each name and value are 0-terminated by libunit. 734 * Need to add extra 2 bytes for each header. 735 */ 736 header_len += keys_count * 2; 737 738 keys = napi.get_property_names(headers); 739 keys_len = napi.get_array_length(keys); 740 741 ptr = req->response_buf->free; 742 743 for (i = 0; i < keys_len; i++) { 744 name = napi.get_element(keys, i); 745 746 array_entry = napi.get_property(headers, name); 747 748 name = napi.get_element(array_entry, 0); 749 value = napi.get_element(array_entry, 1); 750 751 name_len = napi.get_value_string_latin1(name, ptr, header_len); 752 name_ptr = ptr; 753 754 ptr += name_len + 1; 755 header_len -= name_len + 1; 756 757 hash = nxt_unit_field_hash(name_ptr, name_len); 758 759 is_array = napi.is_array(value); 760 761 if (is_array) { 762 array_len = napi.get_array_length(value); 763 764 for (j = 0; j < array_len; j++) { 765 array_val = napi.get_element(value, j); 766 767 val_type = napi.type_of(array_val); 768 769 if (val_type != napi_string) { 770 array_val = napi.coerce_to_string(array_val); 771 } 772 773 value_len = napi.get_value_string_latin1(array_val, ptr, 774 header_len); 775 776 f = req->response->fields + req->response->fields_count; 777 f->skip = 0; 778 779 nxt_unit_sptr_set(&f->name, name_ptr); 780 781 f->name_length = name_len; 782 f->hash = hash; 783 784 nxt_unit_sptr_set(&f->value, ptr); 785 f->value_length = (uint32_t) value_len; 786 787 ptr += value_len + 1; 788 header_len -= value_len + 1; 789 790 req->response->fields_count++; 791 } 792 793 } else { 794 val_type = napi.type_of(value); 795 796 if (val_type != napi_string) { 797 value = napi.coerce_to_string(value); 798 } 799 800 value_len = napi.get_value_string_latin1(value, ptr, header_len); 801 802 f = req->response->fields + req->response->fields_count; 803 f->skip = 0; 804 805 nxt_unit_sptr_set(&f->name, name_ptr); 806 807 f->name_length = name_len; 808 f->hash = hash; 809 810 nxt_unit_sptr_set(&f->value, ptr); 811 f->value_length = (uint32_t) value_len; 812 813 ptr += value_len + 1; 814 header_len -= value_len + 1; 815 816 req->response->fields_count++; 817 } 818 } 819 820 } catch (exception &e) { 821 napi.throw_error(e); 822 return nullptr; 823 } 824 825 req->response_buf->free = ptr; 826 827 ret = nxt_unit_response_send(req); 828 if (ret != NXT_UNIT_OK) { 829 napi.throw_error("Failed to send response"); 830 return nullptr; 831 } 832 833 return this_arg; 834} 835 836 837napi_value 838Unit::response_write(napi_env env, napi_callback_info info) 839{ 840 int ret; 841 void *ptr; 842 size_t argc, have_buf_len; 843 ssize_t res_len; 844 uint32_t buf_start, buf_len; 845 nxt_napi napi(env); 846 napi_value this_arg; 847 nxt_unit_buf_t *buf; 848 napi_valuetype buf_type; 849 nxt_unit_request_info_t *req; 850 napi_value argv[3]; 851 852 argc = 3; 853 854 try { 855 this_arg = napi.get_cb_info(info, argc, argv); 856 if (argc != 3) { 857 throw exception("Wrong args count. Expected: " 858 "chunk, start, length"); 859 } 860 861 req = napi.get_request_info(this_arg); 862 buf_type = napi.type_of(argv[0]); 863 buf_start = napi.get_value_uint32(argv[1]); 864 buf_len = napi.get_value_uint32(argv[2]) + 1; 865 866 if (buf_type == napi_string) { 867 /* TODO: will work only for utf8 content-type */ 868 869 if (req->response_buf != NULL 870 && req->response_buf->end >= req->response_buf->free + buf_len) 871 { 872 buf = req->response_buf; 873 874 } else { 875 buf = nxt_unit_response_buf_alloc(req, buf_len); 876 if (buf == NULL) { 877 throw exception("Failed to allocate response buffer"); 878 } 879 } 880 881 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 882 buf_len); 883 884 buf->free += have_buf_len; 885 886 ret = nxt_unit_buf_send(buf); 887 if (ret == NXT_UNIT_OK) { 888 res_len = have_buf_len; 889 } 890 891 } else { 892 ptr = napi.get_buffer_info(argv[0], have_buf_len); 893 894 if (buf_start > 0) { 895 ptr = ((uint8_t *) ptr) + buf_start; 896 have_buf_len -= buf_start; 897 } 898 899 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0); 900 901 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK; 902 } 903 904 if (ret != NXT_UNIT_OK) { 905 throw exception("Failed to send body buf"); 906 } 907 } catch (exception &e) { 908 napi.throw_error(e); 909 return nullptr; 910 } 911 912 return napi.create((int64_t) res_len); 913} 914 915 916napi_value 917Unit::response_end(napi_env env, napi_callback_info info) 918{ 919 nxt_napi napi(env); 920 napi_value this_arg; 921 req_data_t *req_data; 922 nxt_unit_request_info_t *req; 923 924 try { 925 this_arg = napi.get_cb_info(info); 926 927 req = napi.get_request_info(this_arg); 928 929 req_data = (req_data_t *) req->data; 930 931 napi.remove_wrap(req_data->sock_ref);
|
| 932 napi.remove_wrap(req_data->req_ref);
|
887 napi.remove_wrap(req_data->resp_ref); 888 napi.remove_wrap(req_data->conn_ref); 889 890 } catch (exception &e) { 891 napi.throw_error(e); 892 return nullptr; 893 } 894 895 nxt_unit_request_done(req, NXT_UNIT_OK); 896 897 return this_arg; 898} 899 900 901napi_value 902Unit::websocket_send_frame(napi_env env, napi_callback_info info) 903{ 904 int ret, iovec_len; 905 bool fin; 906 size_t buf_len; 907 uint32_t opcode, sc; 908 nxt_napi napi(env); 909 napi_value this_arg, frame, payload; 910 nxt_unit_request_info_t *req; 911 char status_code[2]; 912 struct iovec iov[2]; 913 914 iovec_len = 0; 915 916 try { 917 this_arg = napi.get_cb_info(info, frame); 918 919 req = napi.get_request_info(this_arg); 920 921 opcode = napi.get_value_uint32(napi.get_named_property(frame, 922 "opcode")); 923 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 924 sc = napi.get_value_uint32(napi.get_named_property(frame, 925 "closeStatus")); 926 status_code[0] = (sc >> 8) & 0xFF; 927 status_code[1] = sc & 0xFF; 928 929 iov[iovec_len].iov_base = status_code; 930 iov[iovec_len].iov_len = 2; 931 iovec_len++; 932 } 933 934 try { 935 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 936 937 } catch (exception &e) { 938 fin = true; 939 } 940 941 payload = napi.get_named_property(frame, "binaryPayload"); 942 943 if (napi.is_buffer(payload)) { 944 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 945 946 } else { 947 buf_len = 0; 948 } 949 950 } catch (exception &e) { 951 napi.throw_error(e); 952 return nullptr; 953 } 954 955 if (buf_len > 0) { 956 iov[iovec_len].iov_len = buf_len; 957 iovec_len++; 958 } 959 960 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 961 if (ret != NXT_UNIT_OK) { 962 goto failed; 963 } 964 965 return this_arg; 966 967failed: 968 969 napi.throw_error("Failed to send frame"); 970 971 return nullptr; 972} 973 974 975napi_value 976Unit::websocket_set_sock(napi_env env, napi_callback_info info) 977{ 978 nxt_napi napi(env); 979 napi_value this_arg, sock; 980 req_data_t *req_data; 981 nxt_unit_request_info_t *req; 982 983 try { 984 this_arg = napi.get_cb_info(info, sock); 985 986 req = napi.get_request_info(sock); 987 988 req_data = (req_data_t *) req->data; 989 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 990 991 } catch (exception &e) { 992 napi.throw_error(e); 993 return nullptr; 994 } 995 996 return this_arg; 997} 998 999 1000void 1001Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) 1002{ 1003 nxt_unit_req_debug(NULL, "conn_destroy: %p", r); 1004} 1005 1006 1007void 1008Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) 1009{ 1010 nxt_unit_req_debug(NULL, "sock_destroy: %p", r); 1011} 1012 1013 1014void
| 933 napi.remove_wrap(req_data->resp_ref); 934 napi.remove_wrap(req_data->conn_ref); 935 936 } catch (exception &e) { 937 napi.throw_error(e); 938 return nullptr; 939 } 940 941 nxt_unit_request_done(req, NXT_UNIT_OK); 942 943 return this_arg; 944} 945 946 947napi_value 948Unit::websocket_send_frame(napi_env env, napi_callback_info info) 949{ 950 int ret, iovec_len; 951 bool fin; 952 size_t buf_len; 953 uint32_t opcode, sc; 954 nxt_napi napi(env); 955 napi_value this_arg, frame, payload; 956 nxt_unit_request_info_t *req; 957 char status_code[2]; 958 struct iovec iov[2]; 959 960 iovec_len = 0; 961 962 try { 963 this_arg = napi.get_cb_info(info, frame); 964 965 req = napi.get_request_info(this_arg); 966 967 opcode = napi.get_value_uint32(napi.get_named_property(frame, 968 "opcode")); 969 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 970 sc = napi.get_value_uint32(napi.get_named_property(frame, 971 "closeStatus")); 972 status_code[0] = (sc >> 8) & 0xFF; 973 status_code[1] = sc & 0xFF; 974 975 iov[iovec_len].iov_base = status_code; 976 iov[iovec_len].iov_len = 2; 977 iovec_len++; 978 } 979 980 try { 981 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 982 983 } catch (exception &e) { 984 fin = true; 985 } 986 987 payload = napi.get_named_property(frame, "binaryPayload"); 988 989 if (napi.is_buffer(payload)) { 990 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 991 992 } else { 993 buf_len = 0; 994 } 995 996 } catch (exception &e) { 997 napi.throw_error(e); 998 return nullptr; 999 } 1000 1001 if (buf_len > 0) { 1002 iov[iovec_len].iov_len = buf_len; 1003 iovec_len++; 1004 } 1005 1006 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); 1007 if (ret != NXT_UNIT_OK) { 1008 goto failed; 1009 } 1010 1011 return this_arg; 1012 1013failed: 1014 1015 napi.throw_error("Failed to send frame"); 1016 1017 return nullptr; 1018} 1019 1020 1021napi_value 1022Unit::websocket_set_sock(napi_env env, napi_callback_info info) 1023{ 1024 nxt_napi napi(env); 1025 napi_value this_arg, sock; 1026 req_data_t *req_data; 1027 nxt_unit_request_info_t *req; 1028 1029 try { 1030 this_arg = napi.get_cb_info(info, sock); 1031 1032 req = napi.get_request_info(sock); 1033 1034 req_data = (req_data_t *) req->data; 1035 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 1036 1037 } catch (exception &e) { 1038 napi.throw_error(e); 1039 return nullptr; 1040 } 1041 1042 return this_arg; 1043} 1044 1045 1046void 1047Unit::conn_destroy(napi_env env, void *r, void *finalize_hint) 1048{ 1049 nxt_unit_req_debug(NULL, "conn_destroy: %p", r); 1050} 1051 1052 1053void 1054Unit::sock_destroy(napi_env env, void *r, void *finalize_hint) 1055{ 1056 nxt_unit_req_debug(NULL, "sock_destroy: %p", r); 1057} 1058 1059 1060void
|
| 1061Unit::req_destroy(napi_env env, void *r, void *finalize_hint) 1062{ 1063 nxt_unit_req_debug(NULL, "req_destroy: %p", r); 1064} 1065 1066 1067void
|
1015Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) 1016{ 1017 nxt_unit_req_debug(NULL, "resp_destroy: %p", r); 1018}
| 1068Unit::resp_destroy(napi_env env, void *r, void *finalize_hint) 1069{ 1070 nxt_unit_req_debug(NULL, "resp_destroy: %p", r); 1071}
|