1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "unit.h"
7
8 #include <unistd.h>
9 #include <fcntl.h>
10
11 #include <uv.h>
12
13 #include <nxt_unit_websocket.h>
14
15
16 napi_ref Unit::constructor_;
17
18
19 struct port_data_t {
20 port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
21
22 void process_port_msg();
23 void stop();
24
25 template<typename T>
26 static port_data_t *get(T *handle);
27
28 static void read_callback(uv_poll_t *handle, int status, int events);
29 static void timer_callback(uv_timer_t *handle);
30 static void delete_data(uv_handle_t* handle);
31
32 nxt_unit_ctx_t *ctx;
33 nxt_unit_port_t *port;
34 uv_poll_t poll;
35 uv_timer_t timer;
36 int ref_count;
37 bool scheduled;
38 bool stopped;
39 };
40
41
42 struct req_data_t {
43 napi_ref sock_ref;
44 napi_ref req_ref;
45 napi_ref resp_ref;
46 napi_ref conn_ref;
47 };
48
49
port_data_t(nxt_unit_ctx_t * c,nxt_unit_port_t * p)50 port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
51 ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
52 {
53 timer.type = UV_UNKNOWN_HANDLE;
54 }
55
56
57 void
process_port_msg()58 port_data_t::process_port_msg()
59 {
60 int rc, err;
61
62 rc = nxt_unit_process_port_msg(ctx, port);
63
64 if (rc != NXT_UNIT_OK) {
65 return;
66 }
67
68 if (timer.type == UV_UNKNOWN_HANDLE) {
69 err = uv_timer_init(poll.loop, &timer);
70 if (err < 0) {
71 nxt_unit_warn(ctx, "Failed to init uv.poll");
72 return;
73 }
74
75 ref_count++;
76 timer.data = this;
77 }
78
79 if (!scheduled && !stopped) {
80 uv_timer_start(&timer, timer_callback, 0, 0);
81
82 scheduled = true;
83 }
84 }
85
86
87 void
stop()88 port_data_t::stop()
89 {
90 stopped = true;
91
92 uv_poll_stop(&poll);
93
94 uv_close((uv_handle_t *) &poll, delete_data);
95
96 if (timer.type == UV_UNKNOWN_HANDLE) {
97 return;
98 }
99
100 uv_timer_stop(&timer);
101
102 uv_close((uv_handle_t *) &timer, delete_data);
103 }
104
105
106 template<typename T>
107 port_data_t *
get(T * handle)108 port_data_t::get(T *handle)
109 {
110 return (port_data_t *) handle->data;
111 }
112
113
114 void
read_callback(uv_poll_t * handle,int status,int events)115 port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116 {
117 get(handle)->process_port_msg();
118 }
119
120
121 void
timer_callback(uv_timer_t * handle)122 port_data_t::timer_callback(uv_timer_t *handle)
123 {
124 port_data_t *data;
125
126 data = get(handle);
127
128 data->scheduled = false;
129 if (data->stopped) {
130 return;
131 }
132
133 data->process_port_msg();
134 }
135
136
137 void
delete_data(uv_handle_t * handle)138 port_data_t::delete_data(uv_handle_t* handle)
139 {
140 port_data_t *data;
141
142 data = get(handle);
143
144 if (--data->ref_count <= 0) {
145 delete data;
146 }
147 }
148
149
Unit(napi_env env,napi_value jsthis)150 Unit::Unit(napi_env env, napi_value jsthis):
151 nxt_napi(env),
152 wrapper_(wrap(jsthis, this, destroy)),
153 unit_ctx_(nullptr)
154 {
155 nxt_unit_debug(NULL, "Unit::Unit()");
156 }
157
158
~Unit()159 Unit::~Unit()
160 {
161 delete_reference(wrapper_);
162
163 nxt_unit_debug(NULL, "Unit::~Unit()");
164 }
165
166
167 napi_value
init(napi_env env,napi_value exports)168 Unit::init(napi_env env, napi_value exports)
169 {
170 nxt_napi napi(env);
171 napi_value ctor;
172
173 napi_property_descriptor unit_props[] = {
174 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
175 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
176 };
177
178 try {
179 ctor = napi.define_class("Unit", create, 2, unit_props);
180 constructor_ = napi.create_reference(ctor);
181
182 napi.set_named_property(exports, "Unit", ctor);
183 napi.set_named_property(exports, "request_read", request_read);
184 napi.set_named_property(exports, "response_send_headers",
185 response_send_headers);
186 napi.set_named_property(exports, "response_write", response_write);
187 napi.set_named_property(exports, "response_end", response_end);
188 napi.set_named_property(exports, "websocket_send_frame",
189 websocket_send_frame);
190 napi.set_named_property(exports, "websocket_set_sock",
191 websocket_set_sock);
192 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
193 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
194
195 } catch (exception &e) {
196 napi.throw_error(e);
197 return nullptr;
198 }
199
200 return exports;
201 }
202
203
204 void
destroy(napi_env env,void * nativeObject,void * finalize_hint)205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
206 {
207 Unit *obj = reinterpret_cast<Unit *>(nativeObject);
208
209 delete obj;
210 }
211
212
213 napi_value
create(napi_env env,napi_callback_info info)214 Unit::create(napi_env env, napi_callback_info info)
215 {
216 nxt_napi napi(env);
217 napi_value target, ctor, instance, jsthis;
218
219 try {
220 target = napi.get_new_target(info);
221
222 if (target != nullptr) {
223 /* Invoked as constructor: `new Unit(...)`. */
224 jsthis = napi.get_cb_info(info);
225
226 new Unit(env, jsthis);
227 napi.create_reference(jsthis);
228
229 return jsthis;
230 }
231
232 /* Invoked as plain function `Unit(...)`, turn into construct call. */
233 ctor = napi.get_reference_value(constructor_);
234 instance = napi.new_instance(ctor);
235 napi.create_reference(instance);
236
237 } catch (exception &e) {
238 napi.throw_error(e);
239 return nullptr;
240 }
241
242 return instance;
243 }
244
245
246 napi_value
create_server(napi_env env,napi_callback_info info)247 Unit::create_server(napi_env env, napi_callback_info info)
248 {
249 Unit *obj;
250 size_t argc;
251 nxt_napi napi(env);
252 napi_value jsthis, argv;
253 nxt_unit_init_t unit_init;
254
255 argc = 1;
256
257 try {
258 jsthis = napi.get_cb_info(info, argc, &argv);
259 obj = (Unit *) napi.unwrap(jsthis);
260
261 } catch (exception &e) {
262 napi.throw_error(e);
263 return nullptr;
264 }
265
266 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
267
268 unit_init.data = obj;
269 unit_init.callbacks.request_handler = request_handler_cb;
270 unit_init.callbacks.websocket_handler = websocket_handler_cb;
271 unit_init.callbacks.close_handler = close_handler_cb;
272 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
273 unit_init.callbacks.add_port = add_port;
274 unit_init.callbacks.remove_port = remove_port;
275 unit_init.callbacks.quit = quit_cb;
276
277 unit_init.request_data_size = sizeof(req_data_t);
278
279 obj->unit_ctx_ = nxt_unit_init(&unit_init);
280 if (obj->unit_ctx_ == NULL) {
281 goto failed;
282 }
283
284 return nullptr;
285
286 failed:
287
288 napi_throw_error(env, NULL, "Failed to create Unit object");
289
290 return nullptr;
291 }
292
293
294 napi_value
listen(napi_env env,napi_callback_info info)295 Unit::listen(napi_env env, napi_callback_info info)
296 {
297 return nullptr;
298 }
299
300
301 void
request_handler_cb(nxt_unit_request_info_t * req)302 Unit::request_handler_cb(nxt_unit_request_info_t *req)
303 {
304 Unit *obj;
305
306 obj = reinterpret_cast<Unit *>(req->unit->data);
307
308 obj->request_handler(req);
309 }
310
311
312 void
request_handler(nxt_unit_request_info_t * req)313 Unit::request_handler(nxt_unit_request_info_t *req)
314 {
315 napi_value socket, request, response, server_obj, emit_request;
316
317 memset(req->data, 0, sizeof(req_data_t));
318
319 try {
320 nxt_handle_scope scope(env());
321
322 server_obj = get_server_object();
323
324 socket = create_socket(server_obj, req);
325 request = create_request(server_obj, socket, req);
326 response = create_response(server_obj, request, req);
327
328 create_headers(req, request);
329
330 emit_request = get_named_property(server_obj, "emit_request");
331
332 nxt_async_context async_context(env(), "request_handler");
333 nxt_callback_scope async_scope(async_context);
334
335 make_callback(async_context, server_obj, emit_request, request,
336 response);
337
338 } catch (exception &e) {
339 nxt_unit_req_warn(req, "request_handler: %s", e.str);
340 }
341 }
342
343
344 void
websocket_handler_cb(nxt_unit_websocket_frame_t * ws)345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
346 {
347 Unit *obj;
348
349 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
350
351 obj->websocket_handler(ws);
352 }
353
354
355 void
websocket_handler(nxt_unit_websocket_frame_t * ws)356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
357 {
358 napi_value frame, server_obj, process_frame, conn;
359 req_data_t *req_data;
360
361 req_data = (req_data_t *) ws->req->data;
362
363 try {
364 nxt_handle_scope scope(env());
365
366 server_obj = get_server_object();
367
368 frame = create_websocket_frame(server_obj, ws);
369
370 conn = get_reference_value(req_data->conn_ref);
371
372 process_frame = get_named_property(conn, "processFrame");
373
374 nxt_async_context async_context(env(), "websocket_handler");
375 nxt_callback_scope async_scope(async_context);
376
377 make_callback(async_context, conn, process_frame, frame);
378
379 } catch (exception &e) {
380 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
381 }
382
383 nxt_unit_websocket_done(ws);
384 }
385
386
387 void
close_handler_cb(nxt_unit_request_info_t * req)388 Unit::close_handler_cb(nxt_unit_request_info_t *req)
389 {
390 Unit *obj;
391
392 obj = reinterpret_cast<Unit *>(req->unit->data);
393
394 obj->close_handler(req);
395 }
396
397
398 void
close_handler(nxt_unit_request_info_t * req)399 Unit::close_handler(nxt_unit_request_info_t *req)
400 {
401 napi_value conn_handle_close, conn;
402 req_data_t *req_data;
403
404 req_data = (req_data_t *) req->data;
405
406 try {
407 nxt_handle_scope scope(env());
408
409 conn = get_reference_value(req_data->conn_ref);
410
411 conn_handle_close = get_named_property(conn, "handleSocketClose");
412
413 nxt_async_context async_context(env(), "close_handler");
414 nxt_callback_scope async_scope(async_context);
415
416 make_callback(async_context, conn, conn_handle_close,
417 nxt_napi::create(0));
418
419 remove_wrap(req_data->sock_ref);
420 remove_wrap(req_data->req_ref);
421 remove_wrap(req_data->resp_ref);
422 remove_wrap(req_data->conn_ref);
423
424 } catch (exception &e) {
425 nxt_unit_req_warn(req, "close_handler: %s", e.str);
426
427 nxt_unit_request_done(req, NXT_UNIT_ERROR);
428
429 return;
430 }
431
432 nxt_unit_request_done(req, NXT_UNIT_OK);
433 }
434
435
436 void
shm_ack_handler_cb(nxt_unit_ctx_t * ctx)437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
438 {
439 Unit *obj;
440
441 obj = reinterpret_cast<Unit *>(ctx->unit->data);
442
443 obj->shm_ack_handler(ctx);
444 }
445
446
447 void
shm_ack_handler(nxt_unit_ctx_t * ctx)448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
449 {
450 napi_value server_obj, emit_drain;
451
452 try {
453 nxt_handle_scope scope(env());
454
455 server_obj = get_server_object();
456
457 emit_drain = get_named_property(server_obj, "emit_drain");
458
459 nxt_async_context async_context(env(), "shm_ack_handler");
460 nxt_callback_scope async_scope(async_context);
461
462 make_callback(async_context, server_obj, emit_drain);
463
464 } catch (exception &e) {
465 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466 }
467 }
468
469
470 int
add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472 {
473 int err;
474 Unit *obj;
475 uv_loop_t *loop;
476 port_data_t *data;
477 napi_status status;
478
479 if (port->in_fd != -1) {
480 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482 port->in_fd, strerror(errno), errno);
483 return -1;
484 }
485
486 obj = reinterpret_cast<Unit *>(ctx->unit->data);
487
488 status = napi_get_uv_event_loop(obj->env(), &loop);
489 if (status != napi_ok) {
490 nxt_unit_warn(ctx, "Failed to get uv.loop");
491 return NXT_UNIT_ERROR;
492 }
493
494 data = new port_data_t(ctx, port);
495
496 err = uv_poll_init(loop, &data->poll, port->in_fd);
497 if (err < 0) {
498 nxt_unit_warn(ctx, "Failed to init uv.poll");
499 delete data;
500 return NXT_UNIT_ERROR;
501 }
502
503 err = uv_poll_start(&data->poll, UV_READABLE,
504 port_data_t::read_callback);
505 if (err < 0) {
506 nxt_unit_warn(ctx, "Failed to start uv.poll");
507 delete data;
508 return NXT_UNIT_ERROR;
509 }
510
511 port->data = data;
512
513 data->ref_count++;
514 data->poll.data = data;
515 }
516
517 return NXT_UNIT_OK;
518 }
519
520
521 void
remove_port(nxt_unit_t * unit,nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
523 {
524 port_data_t *data;
525
526 if (port->data != NULL && ctx != NULL) {
527 data = (port_data_t *) port->data;
528
529 data->stop();
530 }
531 }
532
533
534 void
quit_cb(nxt_unit_ctx_t * ctx)535 Unit::quit_cb(nxt_unit_ctx_t *ctx)
536 {
537 Unit *obj;
538
539 obj = reinterpret_cast<Unit *>(ctx->unit->data);
540
541 obj->quit(ctx);
542 }
543
544
545 void
quit(nxt_unit_ctx_t * ctx)546 Unit::quit(nxt_unit_ctx_t *ctx)
547 {
548 napi_value server_obj, emit_close;
549
550 try {
551 nxt_handle_scope scope(env());
552
553 server_obj = get_server_object();
554
555 emit_close = get_named_property(server_obj, "emit_close");
556
557 nxt_async_context async_context(env(), "unit_quit");
558 nxt_callback_scope async_scope(async_context);
559
560 make_callback(async_context, server_obj, emit_close);
561
562 } catch (exception &e) {
563 nxt_unit_debug(ctx, "quit: %s", e.str);
564 }
565
566 nxt_unit_done(ctx);
567 }
568
569
570 napi_value
get_server_object()571 Unit::get_server_object()
572 {
573 napi_value unit_obj;
574
575 unit_obj = get_reference_value(wrapper_);
576
577 return get_named_property(unit_obj, "server");
578 }
579
580
581 void
create_headers(nxt_unit_request_info_t * req,napi_value request)582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
583 {
584 uint32_t i;
585 napi_value headers, raw_headers;
586 napi_status status;
587 nxt_unit_request_t *r;
588
589 r = req->request;
590
591 headers = create_object();
592
593 status = napi_create_array_with_length(env(), r->fields_count * 2,
594 &raw_headers);
595 if (status != napi_ok) {
596 throw exception("Failed to create array");
597 }
598
599 for (i = 0; i < r->fields_count; i++) {
600 append_header(r->fields + i, headers, raw_headers, i);
601 }
602
603 set_named_property(request, "headers", headers);
604 set_named_property(request, "rawHeaders", raw_headers);
605 set_named_property(request, "httpVersion", r->version, r->version_length);
606 set_named_property(request, "method", r->method, r->method_length);
607 set_named_property(request, "url", r->target, r->target_length);
608
609 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
610 }
611
612
613 inline char
lowcase(char c)614 lowcase(char c)
615 {
616 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
617 }
618
619
620 inline void
append_header(nxt_unit_field_t * f,napi_value headers,napi_value raw_headers,uint32_t idx)621 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
622 napi_value raw_headers, uint32_t idx)
623 {
624 char *name;
625 uint8_t i;
626 napi_value str, vstr;
627
628 name = (char *) nxt_unit_sptr_get(&f->name);
629
630 str = create_string_latin1(name, f->name_length);
631
632 for (i = 0; i < f->name_length; i++) {
633 name[i] = lowcase(name[i]);
634 }
635
636 vstr = set_named_property(headers, name, f->value, f->value_length);
637
638 set_element(raw_headers, idx * 2, str);
639 set_element(raw_headers, idx * 2 + 1, vstr);
640 }
641
642
643 napi_value
create_socket(napi_value server_obj,nxt_unit_request_info_t * req)644 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
645 {
646 napi_value constructor, res;
647 req_data_t *req_data;
648 nxt_unit_request_t *r;
649
650 r = req->request;
651
652 constructor = get_named_property(server_obj, "Socket");
653
654 res = new_instance(constructor);
655
656 req_data = (req_data_t *) req->data;
657 req_data->sock_ref = wrap(res, req, sock_destroy);
658
659 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
660 set_named_property(res, "localAddress", r->local, r->local_length);
661
662 return res;
663 }
664
665
666 napi_value
create_request(napi_value server_obj,napi_value socket,nxt_unit_request_info_t * req)667 Unit::create_request(napi_value server_obj, napi_value socket,
668 nxt_unit_request_info_t *req)
669 {
670 napi_value constructor, res;
671 req_data_t *req_data;
672
673 constructor = get_named_property(server_obj, "ServerRequest");
674
675 res = new_instance(constructor, server_obj, socket);
676
677 req_data = (req_data_t *) req->data;
678 req_data->req_ref = wrap(res, req, req_destroy);
679
680 return res;
681 }
682
683
684 napi_value
create_response(napi_value server_obj,napi_value request,nxt_unit_request_info_t * req)685 Unit::create_response(napi_value server_obj, napi_value request,
686 nxt_unit_request_info_t *req)
687 {
688 napi_value constructor, res;
689 req_data_t *req_data;
690
691 constructor = get_named_property(server_obj, "ServerResponse");
692
693 res = new_instance(constructor, request);
694
695 req_data = (req_data_t *) req->data;
696 req_data->resp_ref = wrap(res, req, resp_destroy);
697
698 return res;
699 }
700
701
702 napi_value
create_websocket_frame(napi_value server_obj,nxt_unit_websocket_frame_t * ws)703 Unit::create_websocket_frame(napi_value server_obj,
704 nxt_unit_websocket_frame_t *ws)
705 {
706 void *data;
707 napi_value constructor, res, buffer;
708 uint8_t sc[2];
709
710 constructor = get_named_property(server_obj, "WebSocketFrame");
711
712 res = new_instance(constructor);
713
714 set_named_property(res, "fin", (bool) ws->header->fin);
715 set_named_property(res, "opcode", ws->header->opcode);
716 set_named_property(res, "length", (int64_t) ws->payload_len);
717
718 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
719 if (ws->payload_len >= 2) {
720 nxt_unit_websocket_read(ws, sc, 2);
721
722 set_named_property(res, "closeStatus",
723 (((uint16_t) sc[0]) << 8) | sc[1]);
724
725 } else {
726 set_named_property(res, "closeStatus", -1);
727 }
728 }
729
730 buffer = create_buffer((size_t) ws->content_length, &data);
731 nxt_unit_websocket_read(ws, data, ws->content_length);
732
733 set_named_property(res, "binaryPayload", buffer);
734
735 return res;
736 }
737
738
739 napi_value
request_read(napi_env env,napi_callback_info info)740 Unit::request_read(napi_env env, napi_callback_info info)
741 {
742 void *data;
743 uint32_t wm;
744 nxt_napi napi(env);
745 napi_value this_arg, argv, buffer;
746 nxt_unit_request_info_t *req;
747
748 try {
749 this_arg = napi.get_cb_info(info, argv);
750
751 try {
752 req = napi.get_request_info(this_arg);
753
754 } catch (exception &e) {
755 return nullptr;
756 }
757
758 if (req->content_length == 0) {
759 return nullptr;
760 }
761
762 wm = napi.get_value_uint32(argv);
763
764 if (wm > req->content_length) {
765 wm = req->content_length;
766 }
767
768 buffer = napi.create_buffer((size_t) wm, &data);
769 nxt_unit_request_read(req, data, wm);
770
771 } catch (exception &e) {
772 napi.throw_error(e);
773 return nullptr;
774 }
775
776 return buffer;
777 }
778
779
780 napi_value
response_send_headers(napi_env env,napi_callback_info info)781 Unit::response_send_headers(napi_env env, napi_callback_info info)
782 {
783 int ret;
784 char *ptr, *name_ptr;
785 bool is_array;
786 size_t argc, name_len, value_len;
787 uint32_t status_code, header_len, keys_len, array_len;
788 uint32_t keys_count, i, j;
789 uint16_t hash;
790 nxt_napi napi(env);
791 napi_value this_arg, headers, keys, name, value, array_val;
792 napi_value array_entry;
793 napi_valuetype val_type;
794 nxt_unit_field_t *f;
795 nxt_unit_request_info_t *req;
796 napi_value argv[4];
797
798 argc = 4;
799
800 try {
801 this_arg = napi.get_cb_info(info, argc, argv);
802 if (argc != 4) {
803 napi.throw_error("Wrong args count. Expected: "
804 "statusCode, headers, headers count, "
805 "headers length");
806 return nullptr;
807 }
808
809 req = napi.get_request_info(this_arg);
810 status_code = napi.get_value_uint32(argv[0]);
811 keys_count = napi.get_value_uint32(argv[2]);
812 header_len = napi.get_value_uint32(argv[3]);
813
814 headers = argv[1];
815
816 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
817 if (ret != NXT_UNIT_OK) {
818 napi.throw_error("Failed to create response");
819 return nullptr;
820 }
821
822 /*
823 * Each name and value are 0-terminated by libunit.
824 * Need to add extra 2 bytes for each header.
825 */
826 header_len += keys_count * 2;
827
828 keys = napi.get_property_names(headers);
829 keys_len = napi.get_array_length(keys);
830
831 ptr = req->response_buf->free;
832
833 for (i = 0; i < keys_len; i++) {
834 name = napi.get_element(keys, i);
835
836 array_entry = napi.get_property(headers, name);
837
838 name = napi.get_element(array_entry, 0);
839 value = napi.get_element(array_entry, 1);
840
841 name_len = napi.get_value_string_latin1(name, ptr, header_len);
842 name_ptr = ptr;
843
844 ptr += name_len + 1;
845 header_len -= name_len + 1;
846
847 hash = nxt_unit_field_hash(name_ptr, name_len);
848
849 is_array = napi.is_array(value);
850
851 if (is_array) {
852 array_len = napi.get_array_length(value);
853
854 for (j = 0; j < array_len; j++) {
855 array_val = napi.get_element(value, j);
856
857 val_type = napi.type_of(array_val);
858
859 if (val_type != napi_string) {
860 array_val = napi.coerce_to_string(array_val);
861 }
862
863 value_len = napi.get_value_string_latin1(array_val, ptr,
864 header_len);
865
866 f = req->response->fields + req->response->fields_count;
867 f->skip = 0;
868
869 nxt_unit_sptr_set(&f->name, name_ptr);
870
871 f->name_length = name_len;
872 f->hash = hash;
873
874 nxt_unit_sptr_set(&f->value, ptr);
875 f->value_length = (uint32_t) value_len;
876
877 ptr += value_len + 1;
878 header_len -= value_len + 1;
879
880 req->response->fields_count++;
881 }
882
883 } else {
884 val_type = napi.type_of(value);
885
886 if (val_type != napi_string) {
887 value = napi.coerce_to_string(value);
888 }
889
890 value_len = napi.get_value_string_latin1(value, ptr, header_len);
891
892 f = req->response->fields + req->response->fields_count;
893 f->skip = 0;
894
895 nxt_unit_sptr_set(&f->name, name_ptr);
896
897 f->name_length = name_len;
898 f->hash = hash;
899
900 nxt_unit_sptr_set(&f->value, ptr);
901 f->value_length = (uint32_t) value_len;
902
903 ptr += value_len + 1;
904 header_len -= value_len + 1;
905
906 req->response->fields_count++;
907 }
908 }
909
910 } catch (exception &e) {
911 napi.throw_error(e);
912 return nullptr;
913 }
914
915 req->response_buf->free = ptr;
916
917 ret = nxt_unit_response_send(req);
918 if (ret != NXT_UNIT_OK) {
919 napi.throw_error("Failed to send response");
920 return nullptr;
921 }
922
923 return this_arg;
924 }
925
926
927 napi_value
response_write(napi_env env,napi_callback_info info)928 Unit::response_write(napi_env env, napi_callback_info info)
929 {
930 int ret;
931 void *ptr;
932 size_t argc, have_buf_len;
933 ssize_t res_len;
934 uint32_t buf_start, buf_len;
935 nxt_napi napi(env);
936 napi_value this_arg;
937 nxt_unit_buf_t *buf;
938 napi_valuetype buf_type;
939 nxt_unit_request_info_t *req;
940 napi_value argv[3];
941
942 argc = 3;
943
944 try {
945 this_arg = napi.get_cb_info(info, argc, argv);
946 if (argc != 3) {
947 throw exception("Wrong args count. Expected: "
948 "chunk, start, length");
949 }
950
951 req = napi.get_request_info(this_arg);
952 buf_type = napi.type_of(argv[0]);
953 buf_start = napi.get_value_uint32(argv[1]);
954 buf_len = napi.get_value_uint32(argv[2]) + 1;
955
956 if (buf_type == napi_string) {
957 /* TODO: will work only for utf8 content-type */
958
959 if (req->response_buf != NULL
960 && req->response_buf->end >= req->response_buf->free + buf_len)
961 {
962 buf = req->response_buf;
963
964 } else {
965 buf = nxt_unit_response_buf_alloc(req, buf_len);
966 if (buf == NULL) {
967 throw exception("Failed to allocate response buffer");
968 }
969 }
970
971 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
972 buf_len);
973
974 buf->free += have_buf_len;
975
976 ret = nxt_unit_buf_send(buf);
977 if (ret == NXT_UNIT_OK) {
978 res_len = have_buf_len;
979 }
980
981 } else {
982 ptr = napi.get_buffer_info(argv[0], have_buf_len);
983
984 if (buf_start > 0) {
985 ptr = ((uint8_t *) ptr) + buf_start;
986 have_buf_len -= buf_start;
987 }
988
989 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
990
991 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
992 }
993
994 if (ret != NXT_UNIT_OK) {
995 throw exception("Failed to send body buf");
996 }
997 } catch (exception &e) {
998 napi.throw_error(e);
999 return nullptr;
1000 }
1001
1002 return napi.create((int64_t) res_len);
1003 }
1004
1005
1006 napi_value
response_end(napi_env env,napi_callback_info info)1007 Unit::response_end(napi_env env, napi_callback_info info)
1008 {
1009 nxt_napi napi(env);
1010 napi_value this_arg;
1011 req_data_t *req_data;
1012 nxt_unit_request_info_t *req;
1013
1014 try {
1015 this_arg = napi.get_cb_info(info);
1016
1017 req = napi.get_request_info(this_arg);
1018
1019 req_data = (req_data_t *) req->data;
1020
1021 napi.remove_wrap(req_data->sock_ref);
1022 napi.remove_wrap(req_data->req_ref);
1023 napi.remove_wrap(req_data->resp_ref);
1024 napi.remove_wrap(req_data->conn_ref);
1025
1026 } catch (exception &e) {
1027 napi.throw_error(e);
1028 return nullptr;
1029 }
1030
1031 nxt_unit_request_done(req, NXT_UNIT_OK);
1032
1033 return this_arg;
1034 }
1035
1036
1037 napi_value
websocket_send_frame(napi_env env,napi_callback_info info)1038 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
1039 {
1040 int ret, iovec_len;
1041 bool fin;
1042 size_t buf_len;
1043 uint32_t opcode, sc;
1044 nxt_napi napi(env);
1045 napi_value this_arg, frame, payload;
1046 nxt_unit_request_info_t *req;
1047 char status_code[2];
1048 struct iovec iov[2];
1049
1050 iovec_len = 0;
1051
1052 try {
1053 this_arg = napi.get_cb_info(info, frame);
1054
1055 req = napi.get_request_info(this_arg);
1056
1057 opcode = napi.get_value_uint32(napi.get_named_property(frame,
1058 "opcode"));
1059 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
1060 sc = napi.get_value_uint32(napi.get_named_property(frame,
1061 "closeStatus"));
1062 status_code[0] = (sc >> 8) & 0xFF;
1063 status_code[1] = sc & 0xFF;
1064
1065 iov[iovec_len].iov_base = status_code;
1066 iov[iovec_len].iov_len = 2;
1067 iovec_len++;
1068 }
1069
1070 try {
1071 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
1072
1073 } catch (exception &e) {
1074 fin = true;
1075 }
1076
1077 payload = napi.get_named_property(frame, "binaryPayload");
1078
1079 if (napi.is_buffer(payload)) {
1080 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
1081
1082 } else {
1083 buf_len = 0;
1084 }
1085
1086 } catch (exception &e) {
1087 napi.throw_error(e);
1088 return nullptr;
1089 }
1090
1091 if (buf_len > 0) {
1092 iov[iovec_len].iov_len = buf_len;
1093 iovec_len++;
1094 }
1095
1096 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1097 if (ret != NXT_UNIT_OK) {
1098 goto failed;
1099 }
1100
1101 return this_arg;
1102
1103 failed:
1104
1105 napi.throw_error("Failed to send frame");
1106
1107 return nullptr;
1108 }
1109
1110
1111 napi_value
websocket_set_sock(napi_env env,napi_callback_info info)1112 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1113 {
1114 nxt_napi napi(env);
1115 napi_value this_arg, sock;
1116 req_data_t *req_data;
1117 nxt_unit_request_info_t *req;
1118
1119 try {
1120 this_arg = napi.get_cb_info(info, sock);
1121
1122 req = napi.get_request_info(sock);
1123
1124 req_data = (req_data_t *) req->data;
1125 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1126
1127 } catch (exception &e) {
1128 napi.throw_error(e);
1129 return nullptr;
1130 }
1131
1132 return this_arg;
1133 }
1134
1135
1136 void
conn_destroy(napi_env env,void * r,void * finalize_hint)1137 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1138 {
1139 nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1140 }
1141
1142
1143 void
sock_destroy(napi_env env,void * r,void * finalize_hint)1144 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1145 {
1146 nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1147 }
1148
1149
1150 void
req_destroy(napi_env env,void * r,void * finalize_hint)1151 Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1152 {
1153 nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1154 }
1155
1156
1157 void
resp_destroy(napi_env env,void * r,void * finalize_hint)1158 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1159 {
1160 nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1161 }
1162