Deleted Added
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include "unit.h"
7
8#include <unistd.h>
9#include <fcntl.h>
10
11#include <uv.h>
12
13#include <nxt_unit_websocket.h>
14
15
16napi_ref Unit::constructor_;
17
18
19struct nxt_nodejs_ctx_t {
20 nxt_unit_port_id_t port_id;
21 uv_poll_t poll;
22};
23
24
25struct req_data_t {
26 napi_ref sock_ref;
27 napi_ref resp_ref;
28 napi_ref conn_ref;
29};
30
31
32Unit::Unit(napi_env env, napi_value jsthis):
33 nxt_napi(env),
34 wrapper_(wrap(jsthis, this, destroy)),
35 unit_ctx_(nullptr)
36{
37 nxt_unit_debug(NULL, "Unit::Unit()");
38}
39
40
41Unit::~Unit()
42{
43 delete_reference(wrapper_);
44
45 nxt_unit_debug(NULL, "Unit::~Unit()");
46}
47
48
49napi_value
50Unit::init(napi_env env, napi_value exports)
51{
52 nxt_napi napi(env);
53 napi_value ctor;
54
55 napi_property_descriptor unit_props[] = {
56 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
57 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
58 };
59
60 try {
61 ctor = napi.define_class("Unit", create, 2, unit_props);
62 constructor_ = napi.create_reference(ctor);
63
64 napi.set_named_property(exports, "Unit", ctor);
65 napi.set_named_property(exports, "response_send_headers",
66 response_send_headers);
67 napi.set_named_property(exports, "response_write", response_write);
68 napi.set_named_property(exports, "response_end", response_end);
69 napi.set_named_property(exports, "websocket_send_frame",
70 websocket_send_frame);
71 napi.set_named_property(exports, "websocket_set_sock",
72 websocket_set_sock);
73
74 } catch (exception &e) {
75 napi.throw_error(e);
76 return nullptr;
77 }
78
79 return exports;
80}

--- 7 unchanged lines hidden (view full) ---

88 delete obj;
89}
90
91
92napi_value
93Unit::create(napi_env env, napi_callback_info info)
94{
95 nxt_napi napi(env);
96 napi_value target, ctor, instance, jsthis;
97
98 try {
99 target = napi.get_new_target(info);
100
101 if (target != nullptr) {
102 /* Invoked as constructor: `new Unit(...)`. */
103 jsthis = napi.get_cb_info(info);
104
105 new Unit(env, jsthis);
106 napi.create_reference(jsthis);
107
108 return jsthis;
109 }
110
111 /* Invoked as plain function `Unit(...)`, turn into construct call. */
112 ctor = napi.get_reference_value(constructor_);
113 instance = napi.new_instance(ctor);
114 napi.create_reference(instance);
115
116 } catch (exception &e) {
117 napi.throw_error(e);
118 return nullptr;
119 }
120
121 return instance;

--- 18 unchanged lines hidden (view full) ---

140 } catch (exception &e) {
141 napi.throw_error(e);
142 return nullptr;
143 }
144
145 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
146
147 unit_init.data = obj;
148 unit_init.callbacks.request_handler = request_handler_cb;
149 unit_init.callbacks.websocket_handler = websocket_handler_cb;
150 unit_init.callbacks.close_handler = close_handler_cb;
151 unit_init.callbacks.add_port = add_port;
152 unit_init.callbacks.remove_port = remove_port;
153 unit_init.callbacks.quit = quit_cb;
154
155 unit_init.request_data_size = sizeof(req_data_t);
156
157 obj->unit_ctx_ = nxt_unit_init(&unit_init);
158 if (obj->unit_ctx_ == NULL) {
159 goto failed;
160 }
161
162 return nullptr;
163
164failed:

--- 6 unchanged lines hidden (view full) ---

171
172napi_value
173Unit::listen(napi_env env, napi_callback_info info)
174{
175 return nullptr;
176}
177
178
179void
180Unit::request_handler_cb(nxt_unit_request_info_t *req)
181{
182 Unit *obj;
183
184 obj = reinterpret_cast<Unit *>(req->unit->data);
185
186 obj->request_handler(req);
187}
188
189
190void
191Unit::request_handler(nxt_unit_request_info_t *req)
192{
193 napi_value socket, request, response, server_obj, emit_request;
194
195 memset(req->data, 0, sizeof(req_data_t));
196
197 try {
198 nxt_handle_scope scope(env());
199
200 server_obj = get_server_object();
201
202 socket = create_socket(server_obj, req);
203 request = create_request(server_obj, socket);
204 response = create_response(server_obj, request, req);
205
206 create_headers(req, request);
207
208 emit_request = get_named_property(server_obj, "emit_request");
209
210 nxt_async_context async_context(env(), "request_handler");
211 nxt_callback_scope async_scope(async_context);
212
213 make_callback(async_context, server_obj, emit_request, request,
214 response);
215
216 } catch (exception &e) {
217 nxt_unit_req_warn(req, "request_handler: %s", e.str);
218 }
219}
220
221
222void
223Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
224{
225 Unit *obj;
226
227 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
228
229 obj->websocket_handler(ws);
230}
231
232
233void
234Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
235{
236 napi_value frame, server_obj, process_frame, conn;
237 req_data_t *req_data;
238
239 req_data = (req_data_t *) ws->req->data;
240
241 try {
242 nxt_handle_scope scope(env());
243
244 server_obj = get_server_object();
245
246 frame = create_websocket_frame(server_obj, ws);
247
248 conn = get_reference_value(req_data->conn_ref);
249
250 process_frame = get_named_property(conn, "processFrame");
251
252 nxt_async_context async_context(env(), "websocket_handler");
253 nxt_callback_scope async_scope(async_context);
254
255 make_callback(async_context, conn, process_frame, frame);
256
257 } catch (exception &e) {
258 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
259 }
260
261 nxt_unit_websocket_done(ws);
262}
263
264
265void
266Unit::close_handler_cb(nxt_unit_request_info_t *req)
267{
268 Unit *obj;
269
270 obj = reinterpret_cast<Unit *>(req->unit->data);
271
272 obj->close_handler(req);
273}
274
275
276void
277Unit::close_handler(nxt_unit_request_info_t *req)
278{
279 napi_value conn_handle_close, conn;
280 req_data_t *req_data;
281
282 req_data = (req_data_t *) req->data;
283
284 try {
285 nxt_handle_scope scope(env());
286
287 conn = get_reference_value(req_data->conn_ref);
288
289 conn_handle_close = get_named_property(conn, "handleSocketClose");
290
291 nxt_async_context async_context(env(), "close_handler");
292 nxt_callback_scope async_scope(async_context);
293
294 make_callback(async_context, conn, conn_handle_close,
295 nxt_napi::create(0));
296
297 remove_wrap(req_data->sock_ref);
298 remove_wrap(req_data->resp_ref);
299 remove_wrap(req_data->conn_ref);
300
301 } catch (exception &e) {
302 nxt_unit_req_warn(req, "close_handler: %s", e.str);
303
304 return;
305 }
306
307 nxt_unit_request_done(req, NXT_UNIT_OK);
308}
309
310
311static void
312nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
313{
314 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
315}
316
317
318int
319Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
320{
321 int err;
322 Unit *obj;
323 uv_loop_t *loop;
324 napi_status status;
325 nxt_nodejs_ctx_t *node_ctx;
326
327 if (port->in_fd != -1) {
328 obj = reinterpret_cast<Unit *>(ctx->unit->data);
329
330 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
331 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
332 port->in_fd, strerror(errno), errno);
333 return -1;
334 }
335
336 status = napi_get_uv_event_loop(obj->env(), &loop);
337 if (status != napi_ok) {
338 nxt_unit_warn(ctx, "Failed to get uv.loop");
339 return NXT_UNIT_ERROR;
340 }
341
342 node_ctx = new nxt_nodejs_ctx_t;
343
344 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
345 if (err < 0) {
346 nxt_unit_warn(ctx, "Failed to init uv.poll");
347 return NXT_UNIT_ERROR;
348 }
349
350 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
351 if (err < 0) {
352 nxt_unit_warn(ctx, "Failed to start uv.poll");
353 return NXT_UNIT_ERROR;
354 }
355
356 ctx->data = node_ctx;
357
358 node_ctx->port_id = port->id;
359 node_ctx->poll.data = ctx;
360 }

--- 26 unchanged lines hidden (view full) ---

387 }
388 }
389
390 nxt_unit_remove_port(ctx, port_id);
391}
392
393
394void
395Unit::quit_cb(nxt_unit_ctx_t *ctx)
396{
397 Unit *obj;
398
399 obj = reinterpret_cast<Unit *>(ctx->unit->data);
400
401 obj->quit(ctx);
402}
403
404
405void
406Unit::quit(nxt_unit_ctx_t *ctx)
407{
408 napi_value server_obj, emit_close;
409
410 try {
411 nxt_handle_scope scope(env());
412
413 server_obj = get_server_object();
414
415 emit_close = get_named_property(server_obj, "emit_close");
416
417 nxt_async_context async_context(env(), "unit_quit");
418 nxt_callback_scope async_scope(async_context);
419
420 make_callback(async_context, server_obj, emit_close);
421
422 } catch (exception &e) {
423 nxt_unit_debug(ctx, "quit: %s", e.str);
424 }
425
426 nxt_unit_done(ctx);
427}
428
429
430napi_value
431Unit::get_server_object()

--- 4 unchanged lines hidden (view full) ---

436
437 return get_named_property(unit_obj, "server");
438}
439
440
441void
442Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
443{
444 void *data;
445 uint32_t i;
446 napi_value headers, raw_headers, buffer;
447 napi_status status;
448 nxt_unit_request_t *r;
449
450 r = req->request;
451
452 headers = create_object();
453
454 status = napi_create_array_with_length(env(), r->fields_count * 2,

--- 6 unchanged lines hidden (view full) ---

461 append_header(r->fields + i, headers, raw_headers, i);
462 }
463
464 set_named_property(request, "headers", headers);
465 set_named_property(request, "rawHeaders", raw_headers);
466 set_named_property(request, "httpVersion", r->version, r->version_length);
467 set_named_property(request, "method", r->method, r->method_length);
468 set_named_property(request, "url", r->target, r->target_length);
469
470 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
471
472 buffer = create_buffer((size_t) req->content_length, &data);
473 nxt_unit_request_read(req, data, req->content_length);
474
475 set_named_property(request, "_data", buffer);
476}
477
478
479inline char
480lowcase(char c)
481{
482 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
483}

--- 21 unchanged lines hidden (view full) ---

505 set_element(raw_headers, idx * 2 + 1, vstr);
506}
507
508
509napi_value
510Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
511{
512 napi_value constructor, res;
513 req_data_t *req_data;
514 nxt_unit_request_t *r;
515
516 r = req->request;
517
518 constructor = get_named_property(server_obj, "Socket");
519
520 res = new_instance(constructor);
521
522 req_data = (req_data_t *) req->data;
523 req_data->sock_ref = wrap(res, req, sock_destroy);
524
525 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
526 set_named_property(res, "localAddress", r->local, r->local_length);
527
528 return res;
529}
530
531
532napi_value
533Unit::create_request(napi_value server_obj, napi_value socket)
534{
535 napi_value constructor;
536
537 constructor = get_named_property(server_obj, "ServerRequest");
538
539 return new_instance(constructor, server_obj, socket);
540}
541
542
543napi_value
544Unit::create_response(napi_value server_obj, napi_value request,
545 nxt_unit_request_info_t *req)
546{
547 napi_value constructor, res;
548 req_data_t *req_data;
549
550 constructor = get_named_property(server_obj, "ServerResponse");
551
552 res = new_instance(constructor, request);
553
554 req_data = (req_data_t *) req->data;
555 req_data->resp_ref = wrap(res, req, resp_destroy);
556
557 return res;
558}
559
560
561napi_value
562Unit::create_websocket_frame(napi_value server_obj,
563 nxt_unit_websocket_frame_t *ws)
564{
565 void *data;
566 napi_value constructor, res, buffer;
567 uint8_t sc[2];
568
569 constructor = get_named_property(server_obj, "WebSocketFrame");
570
571 res = new_instance(constructor);
572
573 set_named_property(res, "fin", (bool) ws->header->fin);
574 set_named_property(res, "opcode", ws->header->opcode);
575 set_named_property(res, "length", (int64_t) ws->payload_len);
576
577 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
578 if (ws->payload_len >= 2) {
579 nxt_unit_websocket_read(ws, sc, 2);
580
581 set_named_property(res, "closeStatus",
582 (((uint16_t) sc[0]) << 8) | sc[1]);
583
584 } else {
585 set_named_property(res, "closeStatus", -1);
586 }
587 }
588
589 buffer = create_buffer((size_t) ws->content_length, &data);
590 nxt_unit_websocket_read(ws, data, ws->content_length);
591
592 set_named_property(res, "binaryPayload", buffer);
593
594 return res;
595}
596
597
598napi_value
599Unit::response_send_headers(napi_env env, napi_callback_info info)
600{
601 int ret;
602 char *ptr, *name_ptr;
603 bool is_array;
604 size_t argc, name_len, value_len;
605 uint32_t status_code, header_len, keys_len, array_len;
606 uint32_t keys_count, i, j;
607 uint16_t hash;
608 nxt_napi napi(env);
609 napi_value this_arg, headers, keys, name, value, array_val;
610 napi_value array_entry;
611 napi_valuetype val_type;
612 nxt_unit_field_t *f;
613 nxt_unit_request_info_t *req;
614 napi_value argv[4];
615
616 argc = 4;
617
618 try {
619 this_arg = napi.get_cb_info(info, argc, argv);
620 if (argc != 4) {
621 napi.throw_error("Wrong args count. Expected: "
622 "statusCode, headers, headers count, "
623 "headers length");
624 return nullptr;
625 }
626
627 req = napi.get_request_info(this_arg);
628 status_code = napi.get_value_uint32(argv[0]);
629 keys_count = napi.get_value_uint32(argv[2]);
630 header_len = napi.get_value_uint32(argv[3]);
631
632 /* Need to reserve extra byte for C-string 0-termination. */
633 header_len++;
634
635 headers = argv[1];
636
637 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
638 if (ret != NXT_UNIT_OK) {
639 napi.throw_error("Failed to create response");
640 return nullptr;
641 }
642
643 keys = napi.get_property_names(headers);

--- 94 unchanged lines hidden (view full) ---

738 return this_arg;
739}
740
741
742napi_value
743Unit::response_write(napi_env env, napi_callback_info info)
744{
745 int ret;
746 void *ptr;
747 size_t argc, have_buf_len;
748 uint32_t buf_len;
749 nxt_napi napi(env);
750 napi_value this_arg;
751 nxt_unit_buf_t *buf;
752 napi_valuetype buf_type;
753 nxt_unit_request_info_t *req;
754 napi_value argv[2];
755
756 argc = 2;
757
758 try {
759 this_arg = napi.get_cb_info(info, argc, argv);
760 if (argc != 2) {
761 throw exception("Wrong args count. Expected: "
762 "chunk, chunk length");
763 }
764
765 req = napi.get_request_info(this_arg);
766 buf_type = napi.type_of(argv[0]);
767 buf_len = napi.get_value_uint32(argv[1]) + 1;
768
769 buf = nxt_unit_response_buf_alloc(req, buf_len);
770 if (buf == NULL) {
771 throw exception("Failed to allocate response buffer");
772 }
773
774 if (buf_type == napi_string) {
775 /* TODO: will work only for utf8 content-type */
776
777 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
778 buf_len);
779
780 } else {
781 ptr = napi.get_buffer_info(argv[0], have_buf_len);
782
783 memcpy(buf->free, ptr, have_buf_len);
784 }
785
786 buf->free += have_buf_len;
787
788 ret = nxt_unit_buf_send(buf);
789 if (ret != NXT_UNIT_OK) {
790 throw exception("Failed to send body buf");
791 }
792 } catch (exception &e) {
793 napi.throw_error(e);
794 return nullptr;
795 }
796
797 return this_arg;
798}
799
800
801napi_value
802Unit::response_end(napi_env env, napi_callback_info info)
803{
804 nxt_napi napi(env);
805 napi_value this_arg;
806 req_data_t *req_data;
807 nxt_unit_request_info_t *req;
808
809 try {
810 this_arg = napi.get_cb_info(info);
811
812 req = napi.get_request_info(this_arg);
813
814 req_data = (req_data_t *) req->data;
815
816 napi.remove_wrap(req_data->sock_ref);
817 napi.remove_wrap(req_data->resp_ref);
818 napi.remove_wrap(req_data->conn_ref);
819
820 } catch (exception &e) {
821 napi.throw_error(e);
822 return nullptr;
823 }
824
825 nxt_unit_request_done(req, NXT_UNIT_OK);
826
827 return this_arg;
828}
829
830
831napi_value
832Unit::websocket_send_frame(napi_env env, napi_callback_info info)
833{
834 int ret, iovec_len;
835 bool fin;
836 size_t buf_len;
837 uint32_t opcode, sc;
838 nxt_napi napi(env);
839 napi_value this_arg, frame, payload;
840 nxt_unit_request_info_t *req;
841 char status_code[2];
842 struct iovec iov[2];
843
844 iovec_len = 0;
845
846 try {
847 this_arg = napi.get_cb_info(info, frame);
848
849 req = napi.get_request_info(this_arg);
850
851 opcode = napi.get_value_uint32(napi.get_named_property(frame,
852 "opcode"));
853 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
854 sc = napi.get_value_uint32(napi.get_named_property(frame,
855 "closeStatus"));
856 status_code[0] = (sc >> 8) & 0xFF;
857 status_code[1] = sc & 0xFF;
858
859 iov[iovec_len].iov_base = status_code;
860 iov[iovec_len].iov_len = 2;
861 iovec_len++;
862 }
863
864 try {
865 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
866
867 } catch (exception &e) {
868 fin = true;
869 }
870
871 payload = napi.get_named_property(frame, "binaryPayload");
872
873 if (napi.is_buffer(payload)) {
874 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
875
876 } else {
877 buf_len = 0;
878 }
879
880 } catch (exception &e) {
881 napi.throw_error(e);
882 return nullptr;
883 }
884
885 if (buf_len > 0) {
886 iov[iovec_len].iov_len = buf_len;
887 iovec_len++;
888 }
889
890 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
891 if (ret != NXT_UNIT_OK) {
892 goto failed;
893 }
894
895 return this_arg;
896
897failed:
898
899 napi.throw_error("Failed to send frame");
900
901 return nullptr;
902}
903
904
905napi_value
906Unit::websocket_set_sock(napi_env env, napi_callback_info info)
907{
908 nxt_napi napi(env);
909 napi_value this_arg, sock;
910 req_data_t *req_data;
911 nxt_unit_request_info_t *req;
912
913 try {
914 this_arg = napi.get_cb_info(info, sock);
915
916 req = napi.get_request_info(sock);
917
918 req_data = (req_data_t *) req->data;
919 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
920
921 } catch (exception &e) {
922 napi.throw_error(e);
923 return nullptr;
924 }
925
926 return this_arg;
927}
928
929
930void
931Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
932{
933 nxt_unit_request_info_t *req;
934
935 req = (nxt_unit_request_info_t *) nativeObject;
936
937 nxt_unit_warn(NULL, "conn_destroy: %p", req);
938}
939
940
941void
942Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
943{
944 nxt_unit_request_info_t *req;
945
946 req = (nxt_unit_request_info_t *) nativeObject;
947
948 nxt_unit_warn(NULL, "sock_destroy: %p", req);
949}
950
951
952void
953Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
954{
955 nxt_unit_request_info_t *req;
956
957 req = (nxt_unit_request_info_t *) nativeObject;
958
959 nxt_unit_warn(NULL, "resp_destroy: %p", req);
960}