Deleted
Added
unit.cpp (1038:77fb332f214a) | unit.cpp (1132:9ac5b5f33ed9) |
---|---|
1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include "unit.h" 7 8#include <unistd.h> 9#include <fcntl.h> 10 11#include <uv.h> 12 | 1 2/* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6#include "unit.h" 7 8#include <unistd.h> 9#include <fcntl.h> 10 11#include <uv.h> 12 |
13#include <nxt_unit_websocket.h> |
|
13 | 14 |
15 |
|
14napi_ref Unit::constructor_; 15 16 17struct nxt_nodejs_ctx_t { 18 nxt_unit_port_id_t port_id; 19 uv_poll_t poll; 20}; 21 22 | 16napi_ref Unit::constructor_; 17 18 19struct nxt_nodejs_ctx_t { 20 nxt_unit_port_id_t port_id; 21 uv_poll_t poll; 22}; 23 24 |
25struct req_data_t { 26 napi_ref sock_ref; 27 napi_ref resp_ref; 28 napi_ref conn_ref; 29}; 30 31 |
|
23Unit::Unit(napi_env env, napi_value jsthis): 24 nxt_napi(env), 25 wrapper_(wrap(jsthis, this, destroy)), 26 unit_ctx_(nullptr) 27{ | 32Unit::Unit(napi_env env, napi_value jsthis): 33 nxt_napi(env), 34 wrapper_(wrap(jsthis, this, destroy)), 35 unit_ctx_(nullptr) 36{ |
37 nxt_unit_debug(NULL, "Unit::Unit()"); |
|
28} 29 30 31Unit::~Unit() 32{ 33 delete_reference(wrapper_); | 38} 39 40 41Unit::~Unit() 42{ 43 delete_reference(wrapper_); |
44 45 nxt_unit_debug(NULL, "Unit::~Unit()"); |
|
34} 35 36 37napi_value 38Unit::init(napi_env env, napi_value exports) 39{ 40 nxt_napi napi(env); | 46} 47 48 49napi_value 50Unit::init(napi_env env, napi_value exports) 51{ 52 nxt_napi napi(env); |
41 napi_value cons; | 53 napi_value ctor; |
42 | 54 |
43 napi_property_descriptor properties[] = { | 55 napi_property_descriptor unit_props[] = { |
44 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 45 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, | 56 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, 57 { "listen", 0, listen, 0, 0, 0, napi_default, 0 }, |
46 { "_read", 0, _read, 0, 0, 0, napi_default, 0 } | |
47 }; 48 49 try { | 58 }; 59 60 try { |
50 cons = napi.define_class("Unit", create, 3, properties); 51 constructor_ = napi.create_reference(cons); | 61 ctor = napi.define_class("Unit", create, 2, unit_props); 62 constructor_ = napi.create_reference(ctor); |
52 | 63 |
53 napi.set_named_property(exports, "Unit", cons); 54 napi.set_named_property(exports, "unit_response_headers", | 64 napi.set_named_property(exports, "Unit", ctor); 65 napi.set_named_property(exports, "response_send_headers", |
55 response_send_headers); | 66 response_send_headers); |
56 napi.set_named_property(exports, "unit_response_write", response_write); 57 napi.set_named_property(exports, "unit_response_end", response_end); | 67 napi.set_named_property(exports, "response_write", response_write); 68 napi.set_named_property(exports, "response_end", response_end); 69 napi.set_named_property(exports, "websocket_send_frame", 70 websocket_send_frame); 71 napi.set_named_property(exports, "websocket_set_sock", 72 websocket_set_sock); |
58 59 } catch (exception &e) { 60 napi.throw_error(e); 61 return nullptr; 62 } 63 64 return exports; 65} --- 7 unchanged lines hidden (view full) --- 73 delete obj; 74} 75 76 77napi_value 78Unit::create(napi_env env, napi_callback_info info) 79{ 80 nxt_napi napi(env); | 73 74 } catch (exception &e) { 75 napi.throw_error(e); 76 return nullptr; 77 } 78 79 return exports; 80} --- 7 unchanged lines hidden (view full) --- 88 delete obj; 89} 90 91 92napi_value 93Unit::create(napi_env env, napi_callback_info info) 94{ 95 nxt_napi napi(env); |
81 napi_value target, cons, instance, jsthis; | 96 napi_value target, ctor, instance, jsthis; |
82 83 try { 84 target = napi.get_new_target(info); 85 86 if (target != nullptr) { 87 /* Invoked as constructor: `new Unit(...)`. */ 88 jsthis = napi.get_cb_info(info); 89 90 new Unit(env, jsthis); 91 napi.create_reference(jsthis); 92 93 return jsthis; 94 } 95 96 /* Invoked as plain function `Unit(...)`, turn into construct call. */ | 97 98 try { 99 target = napi.get_new_target(info); 100 101 if (target != nullptr) { 102 /* Invoked as constructor: `new Unit(...)`. */ 103 jsthis = napi.get_cb_info(info); 104 105 new Unit(env, jsthis); 106 napi.create_reference(jsthis); 107 108 return jsthis; 109 } 110 111 /* Invoked as plain function `Unit(...)`, turn into construct call. */ |
97 cons = napi.get_reference_value(constructor_); 98 instance = napi.new_instance(cons); | 112 ctor = napi.get_reference_value(constructor_); 113 instance = napi.new_instance(ctor); |
99 napi.create_reference(instance); 100 101 } catch (exception &e) { 102 napi.throw_error(e); 103 return nullptr; 104 } 105 106 return instance; --- 18 unchanged lines hidden (view full) --- 125 } catch (exception &e) { 126 napi.throw_error(e); 127 return nullptr; 128 } 129 130 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 131 132 unit_init.data = obj; | 114 napi.create_reference(instance); 115 116 } catch (exception &e) { 117 napi.throw_error(e); 118 return nullptr; 119 } 120 121 return instance; --- 18 unchanged lines hidden (view full) --- 140 } catch (exception &e) { 141 napi.throw_error(e); 142 return nullptr; 143 } 144 145 memset(&unit_init, 0, sizeof(nxt_unit_init_t)); 146 147 unit_init.data = obj; |
133 unit_init.callbacks.request_handler = request_handler; 134 unit_init.callbacks.add_port = add_port; 135 unit_init.callbacks.remove_port = remove_port; 136 unit_init.callbacks.quit = quit; | 148 unit_init.callbacks.request_handler = request_handler_cb; 149 unit_init.callbacks.websocket_handler = websocket_handler_cb; 150 unit_init.callbacks.close_handler = close_handler_cb; 151 unit_init.callbacks.add_port = add_port; 152 unit_init.callbacks.remove_port = remove_port; 153 unit_init.callbacks.quit = quit_cb; |
137 | 154 |
155 unit_init.request_data_size = sizeof(req_data_t); 156 |
|
138 obj->unit_ctx_ = nxt_unit_init(&unit_init); 139 if (obj->unit_ctx_ == NULL) { 140 goto failed; 141 } 142 143 return nullptr; 144 145failed: --- 6 unchanged lines hidden (view full) --- 152 153napi_value 154Unit::listen(napi_env env, napi_callback_info info) 155{ 156 return nullptr; 157} 158 159 | 157 obj->unit_ctx_ = nxt_unit_init(&unit_init); 158 if (obj->unit_ctx_ == NULL) { 159 goto failed; 160 } 161 162 return nullptr; 163 164failed: --- 6 unchanged lines hidden (view full) --- 171 172napi_value 173Unit::listen(napi_env env, napi_callback_info info) 174{ 175 return nullptr; 176} 177 178 |
160napi_value 161Unit::_read(napi_env env, napi_callback_info info) | 179void 180Unit::request_handler_cb(nxt_unit_request_info_t *req) |
162{ | 181{ |
163 void *data; 164 size_t argc; 165 nxt_napi napi(env); 166 napi_value buffer, argv; 167 nxt_unit_request_info_t *req; | 182 Unit *obj; |
168 | 183 |
169 argc = 1; | 184 obj = reinterpret_cast<Unit *>(req->unit->data); |
170 | 185 |
186 obj->request_handler(req); 187} 188 189 190void 191Unit::request_handler(nxt_unit_request_info_t *req) 192{ 193 napi_value socket, request, response, server_obj, emit_request; 194 195 memset(req->data, 0, sizeof(req_data_t)); 196 |
|
171 try { | 197 try { |
172 napi.get_cb_info(info, argc, &argv); | 198 nxt_handle_scope scope(env()); |
173 | 199 |
174 req = napi.get_request_info(argv); 175 buffer = napi.create_buffer((size_t) req->content_length, &data); | 200 server_obj = get_server_object(); |
176 | 201 |
202 socket = create_socket(server_obj, req); 203 request = create_request(server_obj, socket); 204 response = create_response(server_obj, request, req); 205 206 create_headers(req, request); 207 208 emit_request = get_named_property(server_obj, "emit_request"); 209 210 nxt_async_context async_context(env(), "request_handler"); 211 nxt_callback_scope async_scope(async_context); 212 213 make_callback(async_context, server_obj, emit_request, request, 214 response); 215 |
|
177 } catch (exception &e) { | 216 } catch (exception &e) { |
178 napi.throw_error(e); 179 return nullptr; | 217 nxt_unit_req_warn(req, "request_handler: %s", e.str); |
180 } | 218 } |
219} |
|
181 | 220 |
182 nxt_unit_request_read(req, data, req->content_length); | |
183 | 221 |
184 return buffer; | 222void 223Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws) 224{ 225 Unit *obj; 226 227 obj = reinterpret_cast<Unit *>(ws->req->unit->data); 228 229 obj->websocket_handler(ws); |
185} 186 187 188void | 230} 231 232 233void |
189Unit::request_handler(nxt_unit_request_info_t *req) | 234Unit::websocket_handler(nxt_unit_websocket_frame_t *ws) |
190{ | 235{ |
191 Unit *obj; 192 napi_value socket, request, response, server_obj; 193 napi_value emit_events; 194 napi_value events_args[3]; | 236 napi_value frame, server_obj, process_frame, conn; 237 req_data_t *req_data; |
195 | 238 |
196 obj = reinterpret_cast<Unit *>(req->unit->data); | 239 req_data = (req_data_t *) ws->req->data; |
197 198 try { | 240 241 try { |
199 nxt_handle_scope scope(obj->env()); | 242 nxt_handle_scope scope(env()); |
200 | 243 |
201 server_obj = obj->get_server_object(); | 244 server_obj = get_server_object(); |
202 | 245 |
203 socket = obj->create_socket(server_obj, req); 204 request = obj->create_request(server_obj, socket); 205 response = obj->create_response(server_obj, socket, request, req); | 246 frame = create_websocket_frame(server_obj, ws); |
206 | 247 |
207 obj->create_headers(req, request); | 248 conn = get_reference_value(req_data->conn_ref); |
208 | 249 |
209 emit_events = obj->get_named_property(server_obj, "emit_events"); | 250 process_frame = get_named_property(conn, "processFrame"); |
210 | 251 |
211 events_args[0] = server_obj; 212 events_args[1] = request; 213 events_args[2] = response; 214 215 nxt_async_context async_context(obj->env(), "unit_request_handler"); | 252 nxt_async_context async_context(env(), "websocket_handler"); |
216 nxt_callback_scope async_scope(async_context); 217 | 253 nxt_callback_scope async_scope(async_context); 254 |
218 obj->make_callback(async_context, server_obj, emit_events, 219 3, events_args); | 255 make_callback(async_context, conn, process_frame, frame); |
220 221 } catch (exception &e) { | 256 257 } catch (exception &e) { |
222 obj->throw_error(e); | 258 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str); |
223 } | 259 } |
260 261 nxt_unit_websocket_done(ws); |
|
224} 225 226 227void | 262} 263 264 265void |
266Unit::close_handler_cb(nxt_unit_request_info_t *req) 267{ 268 Unit *obj; 269 270 obj = reinterpret_cast<Unit *>(req->unit->data); 271 272 obj->close_handler(req); 273} 274 275 276void 277Unit::close_handler(nxt_unit_request_info_t *req) 278{ 279 napi_value conn_handle_close, conn; 280 req_data_t *req_data; 281 282 req_data = (req_data_t *) req->data; 283 284 try { 285 nxt_handle_scope scope(env()); 286 287 conn = get_reference_value(req_data->conn_ref); 288 289 conn_handle_close = get_named_property(conn, "handleSocketClose"); 290 291 nxt_async_context async_context(env(), "close_handler"); 292 nxt_callback_scope async_scope(async_context); 293 294 make_callback(async_context, conn, conn_handle_close, 295 nxt_napi::create(0)); 296 297 remove_wrap(req_data->sock_ref); 298 remove_wrap(req_data->resp_ref); 299 remove_wrap(req_data->conn_ref); 300 301 } catch (exception &e) { 302 nxt_unit_req_warn(req, "close_handler: %s", e.str); 303 304 return; 305 } 306 307 nxt_unit_request_done(req, NXT_UNIT_OK); 308} 309 310 311static void |
|
228nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 229{ 230 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 231} 232 233 234int 235Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 236{ 237 int err; 238 Unit *obj; 239 uv_loop_t *loop; 240 napi_status status; 241 nxt_nodejs_ctx_t *node_ctx; 242 243 if (port->in_fd != -1) { 244 obj = reinterpret_cast<Unit *>(ctx->unit->data); 245 246 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { | 312nxt_uv_read_callback(uv_poll_t *handle, int status, int events) 313{ 314 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); 315} 316 317 318int 319Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port) 320{ 321 int err; 322 Unit *obj; 323 uv_loop_t *loop; 324 napi_status status; 325 nxt_nodejs_ctx_t *node_ctx; 326 327 if (port->in_fd != -1) { 328 obj = reinterpret_cast<Unit *>(ctx->unit->data); 329 330 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { |
247 obj->throw_error("Failed to upgrade read" 248 " file descriptor to O_NONBLOCK"); | 331 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)", 332 port->in_fd, strerror(errno), errno); |
249 return -1; 250 } 251 252 status = napi_get_uv_event_loop(obj->env(), &loop); 253 if (status != napi_ok) { | 333 return -1; 334 } 335 336 status = napi_get_uv_event_loop(obj->env(), &loop); 337 if (status != napi_ok) { |
254 obj->throw_error("Failed to get uv.loop"); | 338 nxt_unit_warn(ctx, "Failed to get uv.loop"); |
255 return NXT_UNIT_ERROR; 256 } 257 258 node_ctx = new nxt_nodejs_ctx_t; 259 260 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 261 if (err < 0) { | 339 return NXT_UNIT_ERROR; 340 } 341 342 node_ctx = new nxt_nodejs_ctx_t; 343 344 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); 345 if (err < 0) { |
262 obj->throw_error("Failed to init uv.poll"); | 346 nxt_unit_warn(ctx, "Failed to init uv.poll"); |
263 return NXT_UNIT_ERROR; 264 } 265 266 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 267 if (err < 0) { | 347 return NXT_UNIT_ERROR; 348 } 349 350 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); 351 if (err < 0) { |
268 obj->throw_error("Failed to start uv.poll"); | 352 nxt_unit_warn(ctx, "Failed to start uv.poll"); |
269 return NXT_UNIT_ERROR; 270 } 271 272 ctx->data = node_ctx; 273 274 node_ctx->port_id = port->id; 275 node_ctx->poll.data = ctx; 276 } --- 26 unchanged lines hidden (view full) --- 303 } 304 } 305 306 nxt_unit_remove_port(ctx, port_id); 307} 308 309 310void | 353 return NXT_UNIT_ERROR; 354 } 355 356 ctx->data = node_ctx; 357 358 node_ctx->port_id = port->id; 359 node_ctx->poll.data = ctx; 360 } --- 26 unchanged lines hidden (view full) --- 387 } 388 } 389 390 nxt_unit_remove_port(ctx, port_id); 391} 392 393 394void |
311Unit::quit(nxt_unit_ctx_t *ctx) | 395Unit::quit_cb(nxt_unit_ctx_t *ctx) |
312{ | 396{ |
313 Unit *obj; 314 napi_value server_obj, emit_close; | 397 Unit *obj; |
315 316 obj = reinterpret_cast<Unit *>(ctx->unit->data); 317 | 398 399 obj = reinterpret_cast<Unit *>(ctx->unit->data); 400 |
401 obj->quit(ctx); 402} 403 404 405void 406Unit::quit(nxt_unit_ctx_t *ctx) 407{ 408 napi_value server_obj, emit_close; 409 |
|
318 try { | 410 try { |
319 nxt_handle_scope scope(obj->env()); | 411 nxt_handle_scope scope(env()); |
320 | 412 |
321 server_obj = obj->get_server_object(); | 413 server_obj = get_server_object(); |
322 | 414 |
323 emit_close = obj->get_named_property(server_obj, "emit_close"); | 415 emit_close = get_named_property(server_obj, "emit_close"); |
324 | 416 |
325 nxt_async_context async_context(obj->env(), "unit_quit"); | 417 nxt_async_context async_context(env(), "unit_quit"); |
326 nxt_callback_scope async_scope(async_context); 327 | 418 nxt_callback_scope async_scope(async_context); 419 |
328 obj->make_callback(async_context, server_obj, emit_close, 0, NULL); | 420 make_callback(async_context, server_obj, emit_close); |
329 330 } catch (exception &e) { | 421 422 } catch (exception &e) { |
331 obj->throw_error(e); | 423 nxt_unit_debug(ctx, "quit: %s", e.str); |
332 } 333 334 nxt_unit_done(ctx); 335} 336 337 338napi_value 339Unit::get_server_object() --- 4 unchanged lines hidden (view full) --- 344 345 return get_named_property(unit_obj, "server"); 346} 347 348 349void 350Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 351{ | 424 } 425 426 nxt_unit_done(ctx); 427} 428 429 430napi_value 431Unit::get_server_object() --- 4 unchanged lines hidden (view full) --- 436 437 return get_named_property(unit_obj, "server"); 438} 439 440 441void 442Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) 443{ |
444 void *data; |
|
352 uint32_t i; | 445 uint32_t i; |
353 napi_value headers, raw_headers; | 446 napi_value headers, raw_headers, buffer; |
354 napi_status status; 355 nxt_unit_request_t *r; 356 357 r = req->request; 358 359 headers = create_object(); 360 361 status = napi_create_array_with_length(env(), r->fields_count * 2, --- 6 unchanged lines hidden (view full) --- 368 append_header(r->fields + i, headers, raw_headers, i); 369 } 370 371 set_named_property(request, "headers", headers); 372 set_named_property(request, "rawHeaders", raw_headers); 373 set_named_property(request, "httpVersion", r->version, r->version_length); 374 set_named_property(request, "method", r->method, r->method_length); 375 set_named_property(request, "url", r->target, r->target_length); | 447 napi_status status; 448 nxt_unit_request_t *r; 449 450 r = req->request; 451 452 headers = create_object(); 453 454 status = napi_create_array_with_length(env(), r->fields_count * 2, --- 6 unchanged lines hidden (view full) --- 461 append_header(r->fields + i, headers, raw_headers, i); 462 } 463 464 set_named_property(request, "headers", headers); 465 set_named_property(request, "rawHeaders", raw_headers); 466 set_named_property(request, "httpVersion", r->version, r->version_length); 467 set_named_property(request, "method", r->method, r->method_length); 468 set_named_property(request, "url", r->target, r->target_length); |
469 470 set_named_property(request, "_websocket_handshake", r->websocket_handshake); 471 472 buffer = create_buffer((size_t) req->content_length, &data); 473 nxt_unit_request_read(req, data, req->content_length); 474 475 set_named_property(request, "_data", buffer); |
|
376} 377 378 379inline char 380lowcase(char c) 381{ 382 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 383} --- 21 unchanged lines hidden (view full) --- 405 set_element(raw_headers, idx * 2 + 1, vstr); 406} 407 408 409napi_value 410Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 411{ 412 napi_value constructor, res; | 476} 477 478 479inline char 480lowcase(char c) 481{ 482 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c; 483} --- 21 unchanged lines hidden (view full) --- 505 set_element(raw_headers, idx * 2 + 1, vstr); 506} 507 508 509napi_value 510Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) 511{ 512 napi_value constructor, res; |
513 req_data_t *req_data; |
|
413 nxt_unit_request_t *r; 414 415 r = req->request; 416 | 514 nxt_unit_request_t *r; 515 516 r = req->request; 517 |
417 constructor = get_named_property(server_obj, "socket"); | 518 constructor = get_named_property(server_obj, "Socket"); |
418 419 res = new_instance(constructor); 420 | 519 520 res = new_instance(constructor); 521 |
421 set_named_property(res, "req_pointer", (intptr_t) req); | 522 req_data = (req_data_t *) req->data; 523 req_data->sock_ref = wrap(res, req, sock_destroy); 524 |
422 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 423 set_named_property(res, "localAddress", r->local, r->local_length); 424 425 return res; 426} 427 428 429napi_value 430Unit::create_request(napi_value server_obj, napi_value socket) 431{ | 525 set_named_property(res, "remoteAddress", r->remote, r->remote_length); 526 set_named_property(res, "localAddress", r->local, r->local_length); 527 528 return res; 529} 530 531 532napi_value 533Unit::create_request(napi_value server_obj, napi_value socket) 534{ |
432 napi_value constructor, return_val; | 535 napi_value constructor; |
433 | 536 |
434 constructor = get_named_property(server_obj, "request"); | 537 constructor = get_named_property(server_obj, "ServerRequest"); |
435 | 538 |
436 return_val = new_instance(constructor, server_obj); | 539 return new_instance(constructor, server_obj, socket); 540} |
437 | 541 |
438 set_named_property(return_val, "socket", socket); 439 set_named_property(return_val, "connection", socket); | |
440 | 542 |
441 return return_val; | 543napi_value 544Unit::create_response(napi_value server_obj, napi_value request, 545 nxt_unit_request_info_t *req) 546{ 547 napi_value constructor, res; 548 req_data_t *req_data; 549 550 constructor = get_named_property(server_obj, "ServerResponse"); 551 552 res = new_instance(constructor, request); 553 554 req_data = (req_data_t *) req->data; 555 req_data->resp_ref = wrap(res, req, resp_destroy); 556 557 return res; |
442} 443 444 445napi_value | 558} 559 560 561napi_value |
446Unit::create_response(napi_value server_obj, napi_value socket, 447 napi_value request, nxt_unit_request_info_t *req) | 562Unit::create_websocket_frame(napi_value server_obj, 563 nxt_unit_websocket_frame_t *ws) |
448{ | 564{ |
449 napi_value constructor, return_val; | 565 void *data; 566 napi_value constructor, res, buffer; 567 uint8_t sc[2]; |
450 | 568 |
451 constructor = get_named_property(server_obj, "response"); | 569 constructor = get_named_property(server_obj, "WebSocketFrame"); |
452 | 570 |
453 return_val = new_instance(constructor, request); | 571 res = new_instance(constructor); |
454 | 572 |
455 set_named_property(return_val, "socket", socket); 456 set_named_property(return_val, "connection", socket); 457 set_named_property(return_val, "_req_point", (intptr_t) req); | 573 set_named_property(res, "fin", (bool) ws->header->fin); 574 set_named_property(res, "opcode", ws->header->opcode); 575 set_named_property(res, "length", (int64_t) ws->payload_len); |
458 | 576 |
459 return return_val; | 577 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) { 578 if (ws->payload_len >= 2) { 579 nxt_unit_websocket_read(ws, sc, 2); 580 581 set_named_property(res, "closeStatus", 582 (((uint16_t) sc[0]) << 8) | sc[1]); 583 584 } else { 585 set_named_property(res, "closeStatus", -1); 586 } 587 } 588 589 buffer = create_buffer((size_t) ws->content_length, &data); 590 nxt_unit_websocket_read(ws, data, ws->content_length); 591 592 set_named_property(res, "binaryPayload", buffer); 593 594 return res; |
460} 461 462 463napi_value 464Unit::response_send_headers(napi_env env, napi_callback_info info) 465{ 466 int ret; 467 char *ptr, *name_ptr; 468 bool is_array; 469 size_t argc, name_len, value_len; 470 uint32_t status_code, header_len, keys_len, array_len; 471 uint32_t keys_count, i, j; 472 uint16_t hash; 473 nxt_napi napi(env); 474 napi_value this_arg, headers, keys, name, value, array_val; | 595} 596 597 598napi_value 599Unit::response_send_headers(napi_env env, napi_callback_info info) 600{ 601 int ret; 602 char *ptr, *name_ptr; 603 bool is_array; 604 size_t argc, name_len, value_len; 605 uint32_t status_code, header_len, keys_len, array_len; 606 uint32_t keys_count, i, j; 607 uint16_t hash; 608 nxt_napi napi(env); 609 napi_value this_arg, headers, keys, name, value, array_val; |
475 napi_value req_num, array_entry; | 610 napi_value array_entry; |
476 napi_valuetype val_type; 477 nxt_unit_field_t *f; 478 nxt_unit_request_info_t *req; | 611 napi_valuetype val_type; 612 nxt_unit_field_t *f; 613 nxt_unit_request_info_t *req; |
479 napi_value argv[5]; | 614 napi_value argv[4]; |
480 | 615 |
481 argc = 5; | 616 argc = 4; |
482 483 try { 484 this_arg = napi.get_cb_info(info, argc, argv); | 617 618 try { 619 this_arg = napi.get_cb_info(info, argc, argv); |
485 if (argc != 5) { | 620 if (argc != 4) { |
486 napi.throw_error("Wrong args count. Expected: " 487 "statusCode, headers, headers count, " 488 "headers length"); 489 return nullptr; 490 } 491 | 621 napi.throw_error("Wrong args count. Expected: " 622 "statusCode, headers, headers count, " 623 "headers length"); 624 return nullptr; 625 } 626 |
492 req_num = napi.get_named_property(argv[0], "_req_point"); | 627 req = napi.get_request_info(this_arg); 628 status_code = napi.get_value_uint32(argv[0]); 629 keys_count = napi.get_value_uint32(argv[2]); 630 header_len = napi.get_value_uint32(argv[3]); |
493 | 631 |
494 req = napi.get_request_info(req_num); 495 496 status_code = napi.get_value_uint32(argv[1]); 497 keys_count = napi.get_value_uint32(argv[3]); 498 header_len = napi.get_value_uint32(argv[4]); 499 | |
500 /* Need to reserve extra byte for C-string 0-termination. */ 501 header_len++; 502 | 632 /* Need to reserve extra byte for C-string 0-termination. */ 633 header_len++; 634 |
503 headers = argv[2]; | 635 headers = argv[1]; |
504 505 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 506 if (ret != NXT_UNIT_OK) { 507 napi.throw_error("Failed to create response"); 508 return nullptr; 509 } 510 511 keys = napi.get_property_names(headers); --- 94 unchanged lines hidden (view full) --- 606 return this_arg; 607} 608 609 610napi_value 611Unit::response_write(napi_env env, napi_callback_info info) 612{ 613 int ret; | 636 637 ret = nxt_unit_response_init(req, status_code, keys_count, header_len); 638 if (ret != NXT_UNIT_OK) { 639 napi.throw_error("Failed to create response"); 640 return nullptr; 641 } 642 643 keys = napi.get_property_names(headers); --- 94 unchanged lines hidden (view full) --- 738 return this_arg; 739} 740 741 742napi_value 743Unit::response_write(napi_env env, napi_callback_info info) 744{ 745 int ret; |
614 char *ptr; | 746 void *ptr; |
615 size_t argc, have_buf_len; 616 uint32_t buf_len; 617 nxt_napi napi(env); | 747 size_t argc, have_buf_len; 748 uint32_t buf_len; 749 nxt_napi napi(env); |
618 napi_value this_arg, req_num; 619 napi_status status; | 750 napi_value this_arg; |
620 nxt_unit_buf_t *buf; 621 napi_valuetype buf_type; 622 nxt_unit_request_info_t *req; | 751 nxt_unit_buf_t *buf; 752 napi_valuetype buf_type; 753 nxt_unit_request_info_t *req; |
623 napi_value argv[3]; | 754 napi_value argv[2]; |
624 | 755 |
625 argc = 3; | 756 argc = 2; |
626 627 try { 628 this_arg = napi.get_cb_info(info, argc, argv); | 757 758 try { 759 this_arg = napi.get_cb_info(info, argc, argv); |
629 if (argc != 3) { | 760 if (argc != 2) { |
630 throw exception("Wrong args count. Expected: " 631 "chunk, chunk length"); 632 } 633 | 761 throw exception("Wrong args count. Expected: " 762 "chunk, chunk length"); 763 } 764 |
634 req_num = napi.get_named_property(argv[0], "_req_point"); 635 req = napi.get_request_info(req_num); | 765 req = napi.get_request_info(this_arg); 766 buf_type = napi.type_of(argv[0]); 767 buf_len = napi.get_value_uint32(argv[1]) + 1; |
636 | 768 |
637 buf_len = napi.get_value_uint32(argv[2]); | 769 buf = nxt_unit_response_buf_alloc(req, buf_len); 770 if (buf == NULL) { 771 throw exception("Failed to allocate response buffer"); 772 } |
638 | 773 |
639 buf_type = napi.type_of(argv[1]); | 774 if (buf_type == napi_string) { 775 /* TODO: will work only for utf8 content-type */ |
640 | 776 |
777 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free, 778 buf_len); 779 780 } else { 781 ptr = napi.get_buffer_info(argv[0], have_buf_len); 782 783 memcpy(buf->free, ptr, have_buf_len); 784 } 785 786 buf->free += have_buf_len; 787 788 ret = nxt_unit_buf_send(buf); 789 if (ret != NXT_UNIT_OK) { 790 throw exception("Failed to send body buf"); 791 } |
|
641 } catch (exception &e) { 642 napi.throw_error(e); 643 return nullptr; 644 } 645 | 792 } catch (exception &e) { 793 napi.throw_error(e); 794 return nullptr; 795 } 796 |
646 buf_len++; | 797 return this_arg; 798} |
647 | 799 |
648 buf = nxt_unit_response_buf_alloc(req, buf_len); 649 if (buf == NULL) { 650 goto failed; | 800 801napi_value 802Unit::response_end(napi_env env, napi_callback_info info) 803{ 804 nxt_napi napi(env); 805 napi_value this_arg; 806 req_data_t *req_data; 807 nxt_unit_request_info_t *req; 808 809 try { 810 this_arg = napi.get_cb_info(info); 811 812 req = napi.get_request_info(this_arg); 813 814 req_data = (req_data_t *) req->data; 815 816 napi.remove_wrap(req_data->sock_ref); 817 napi.remove_wrap(req_data->resp_ref); 818 napi.remove_wrap(req_data->conn_ref); 819 820 } catch (exception &e) { 821 napi.throw_error(e); 822 return nullptr; |
651 } 652 | 823 } 824 |
653 if (buf_type == napi_string) { 654 /* TODO: will work only for utf8 content-type */ | 825 nxt_unit_request_done(req, NXT_UNIT_OK); |
655 | 826 |
656 status = napi_get_value_string_utf8(env, argv[1], buf->free, 657 buf_len, &have_buf_len); | 827 return this_arg; 828} |
658 | 829 |
659 } else { 660 status = napi_get_buffer_info(env, argv[1], (void **) &ptr, 661 &have_buf_len); | |
662 | 830 |
663 memcpy(buf->free, ptr, have_buf_len); | 831napi_value 832Unit::websocket_send_frame(napi_env env, napi_callback_info info) 833{ 834 int ret, iovec_len; 835 bool fin; 836 size_t buf_len; 837 uint32_t opcode, sc; 838 nxt_napi napi(env); 839 napi_value this_arg, frame, payload; 840 nxt_unit_request_info_t *req; 841 char status_code[2]; 842 struct iovec iov[2]; 843 844 iovec_len = 0; 845 846 try { 847 this_arg = napi.get_cb_info(info, frame); 848 849 req = napi.get_request_info(this_arg); 850 851 opcode = napi.get_value_uint32(napi.get_named_property(frame, 852 "opcode")); 853 if (opcode == NXT_WEBSOCKET_OP_CLOSE) { 854 sc = napi.get_value_uint32(napi.get_named_property(frame, 855 "closeStatus")); 856 status_code[0] = (sc >> 8) & 0xFF; 857 status_code[1] = sc & 0xFF; 858 859 iov[iovec_len].iov_base = status_code; 860 iov[iovec_len].iov_len = 2; 861 iovec_len++; 862 } 863 864 try { 865 fin = napi.get_value_bool(napi.get_named_property(frame, "fin")); 866 867 } catch (exception &e) { 868 fin = true; 869 } 870 871 payload = napi.get_named_property(frame, "binaryPayload"); 872 873 if (napi.is_buffer(payload)) { 874 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len); 875 876 } else { 877 buf_len = 0; 878 } 879 880 } catch (exception &e) { 881 napi.throw_error(e); 882 return nullptr; |
664 } 665 | 883 } 884 |
666 if (status != napi_ok) { 667 goto failed; | 885 if (buf_len > 0) { 886 iov[iovec_len].iov_len = buf_len; 887 iovec_len++; |
668 } 669 | 888 } 889 |
670 buf->free += have_buf_len; 671 672 ret = nxt_unit_buf_send(buf); | 890 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len); |
673 if (ret != NXT_UNIT_OK) { 674 goto failed; 675 } 676 677 return this_arg; 678 679failed: 680 | 891 if (ret != NXT_UNIT_OK) { 892 goto failed; 893 } 894 895 return this_arg; 896 897failed: 898 |
681 napi.throw_error("Failed to write body"); | 899 napi.throw_error("Failed to send frame"); |
682 683 return nullptr; 684} 685 686 687napi_value | 900 901 return nullptr; 902} 903 904 905napi_value |
688Unit::response_end(napi_env env, napi_callback_info info) | 906Unit::websocket_set_sock(napi_env env, napi_callback_info info) |
689{ | 907{ |
690 size_t argc; | |
691 nxt_napi napi(env); | 908 nxt_napi napi(env); |
692 napi_value resp, this_arg, req_num; | 909 napi_value this_arg, sock; 910 req_data_t *req_data; |
693 nxt_unit_request_info_t *req; 694 | 911 nxt_unit_request_info_t *req; 912 |
695 argc = 1; 696 | |
697 try { | 913 try { |
698 this_arg = napi.get_cb_info(info, argc, &resp); | 914 this_arg = napi.get_cb_info(info, sock); |
699 | 915 |
700 req_num = napi.get_named_property(resp, "_req_point"); 701 req = napi.get_request_info(req_num); | 916 req = napi.get_request_info(sock); |
702 | 917 |
918 req_data = (req_data_t *) req->data; 919 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy); 920 |
|
703 } catch (exception &e) { 704 napi.throw_error(e); 705 return nullptr; 706 } 707 | 921 } catch (exception &e) { 922 napi.throw_error(e); 923 return nullptr; 924 } 925 |
708 nxt_unit_request_done(req, NXT_UNIT_OK); 709 | |
710 return this_arg; 711} | 926 return this_arg; 927} |
928 929 930void 931Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint) 932{ 933 nxt_unit_request_info_t *req; 934 935 req = (nxt_unit_request_info_t *) nativeObject; 936 937 nxt_unit_warn(NULL, "conn_destroy: %p", req); 938} 939 940 941void 942Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint) 943{ 944 nxt_unit_request_info_t *req; 945 946 req = (nxt_unit_request_info_t *) nativeObject; 947 948 nxt_unit_warn(NULL, "sock_destroy: %p", req); 949} 950 951 952void 953Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint) 954{ 955 nxt_unit_request_info_t *req; 956 957 req = (nxt_unit_request_info_t *) nativeObject; 958 959 nxt_unit_warn(NULL, "resp_destroy: %p", req); 960} |
|