1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "unit.h"
7
8 #include <unistd.h>
9 #include <fcntl.h>
10
11 #include <uv.h>
12
13 #include <nxt_unit_websocket.h>
14
15
16 napi_ref Unit::constructor_;
17
18
19 struct port_data_t {
20 port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
21
22 void process_port_msg();
23 void stop();
24
25 template<typename T>
26 static port_data_t *get(T *handle);
27
28 static void read_callback(uv_poll_t *handle, int status, int events);
29 static void timer_callback(uv_timer_t *handle);
30 static void delete_data(uv_handle_t* handle);
31
32 nxt_unit_ctx_t *ctx;
33 nxt_unit_port_t *port;
34 uv_poll_t poll;
35 uv_timer_t timer;
36 int ref_count;
37 bool scheduled;
38 bool stopped;
39 };
40
41
42 struct req_data_t {
43 napi_ref sock_ref;
44 napi_ref req_ref;
45 napi_ref resp_ref;
46 napi_ref conn_ref;
47 };
48
49
port_data_t(nxt_unit_ctx_t * c,nxt_unit_port_t * p)50 port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
51 ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
52 {
53 timer.type = UV_UNKNOWN_HANDLE;
54 }
55
56
57 void
process_port_msg()58 port_data_t::process_port_msg()
59 {
60 int rc, err;
61
62 rc = nxt_unit_process_port_msg(ctx, port);
63
64 if (rc != NXT_UNIT_OK) {
65 return;
66 }
67
68 if (timer.type == UV_UNKNOWN_HANDLE) {
69 err = uv_timer_init(poll.loop, &timer);
70 if (err < 0) {
71 nxt_unit_warn(ctx, "Failed to init uv.poll");
72 return;
73 }
74
75 ref_count++;
76 timer.data = this;
77 }
78
79 if (!scheduled && !stopped) {
80 uv_timer_start(&timer, timer_callback, 0, 0);
81
82 scheduled = true;
83 }
84 }
85
86
87 void
stop()88 port_data_t::stop()
89 {
90 stopped = true;
91
92 uv_poll_stop(&poll);
93
94 uv_close((uv_handle_t *) &poll, delete_data);
95
96 if (timer.type == UV_UNKNOWN_HANDLE) {
97 return;
98 }
99
100 uv_timer_stop(&timer);
101
102 uv_close((uv_handle_t *) &timer, delete_data);
103 }
104
105
106 template<typename T>
107 port_data_t *
get(T * handle)108 port_data_t::get(T *handle)
109 {
110 return (port_data_t *) handle->data;
111 }
112
113
114 void
read_callback(uv_poll_t * handle,int status,int events)115 port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116 {
117 get(handle)->process_port_msg();
118 }
119
120
121 void
timer_callback(uv_timer_t * handle)122 port_data_t::timer_callback(uv_timer_t *handle)
123 {
124 port_data_t *data;
125
126 data = get(handle);
127
128 data->scheduled = false;
129 if (data->stopped) {
130 return;
131 }
132
133 data->process_port_msg();
134 }
135
136
137 void
delete_data(uv_handle_t * handle)138 port_data_t::delete_data(uv_handle_t* handle)
139 {
140 port_data_t *data;
141
142 data = get(handle);
143
144 if (--data->ref_count <= 0) {
145 delete data;
146 }
147 }
148
149
Unit(napi_env env,napi_value jsthis)150 Unit::Unit(napi_env env, napi_value jsthis):
151 nxt_napi(env),
152 wrapper_(wrap(jsthis, this, destroy)),
153 unit_ctx_(nullptr)
154 {
155 nxt_unit_debug(NULL, "Unit::Unit()");
156 }
157
158
~Unit()159 Unit::~Unit()
160 {
161 delete_reference(wrapper_);
162
163 nxt_unit_debug(NULL, "Unit::~Unit()");
164 }
165
166
167 napi_value
init(napi_env env,napi_value exports)168 Unit::init(napi_env env, napi_value exports)
169 {
170 nxt_napi napi(env);
171 napi_value ctor;
172
173 napi_property_descriptor unit_props[] = {
174 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
175 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
176 };
177
178 try {
179 ctor = napi.define_class("Unit", create, 2, unit_props);
180 constructor_ = napi.create_reference(ctor);
181
182 napi.set_named_property(exports, "Unit", ctor);
183 napi.set_named_property(exports, "request_read", request_read);
184 napi.set_named_property(exports, "response_send_headers",
185 response_send_headers);
186 napi.set_named_property(exports, "response_write", response_write);
187 napi.set_named_property(exports, "response_end", response_end);
188 napi.set_named_property(exports, "websocket_send_frame",
189 websocket_send_frame);
190 napi.set_named_property(exports, "websocket_set_sock",
191 websocket_set_sock);
192 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
193 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
194
195 } catch (exception &e) {
196 napi.throw_error(e);
197 return nullptr;
198 }
199
200 return exports;
201 }
202
203
204 void
destroy(napi_env env,void * nativeObject,void * finalize_hint)205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
206 {
207 Unit *obj = reinterpret_cast<Unit *>(nativeObject);
208
209 delete obj;
210 }
211
212
213 napi_value
create(napi_env env,napi_callback_info info)214 Unit::create(napi_env env, napi_callback_info info)
215 {
216 nxt_napi napi(env);
217 napi_value target, ctor, instance, jsthis;
218
219 try {
220 target = napi.get_new_target(info);
221
222 if (target != nullptr) {
223 /* Invoked as constructor: `new Unit(...)`. */
224 jsthis = napi.get_cb_info(info);
225
226 new Unit(env, jsthis);
227 napi.create_reference(jsthis);
228
229 return jsthis;
230 }
231
232 /* Invoked as plain function `Unit(...)`, turn into construct call. */
233 ctor = napi.get_reference_value(constructor_);
234 instance = napi.new_instance(ctor);
235 napi.create_reference(instance);
236
237 } catch (exception &e) {
238 napi.throw_error(e);
239 return nullptr;
240 }
241
242 return instance;
243 }
244
245
246 napi_value
create_server(napi_env env,napi_callback_info info)247 Unit::create_server(napi_env env, napi_callback_info info)
248 {
249 Unit *obj;
250 size_t argc;
251 nxt_napi napi(env);
252 napi_value jsthis, argv;
253 nxt_unit_init_t unit_init;
254
255 argc = 1;
256
257 try {
258 jsthis = napi.get_cb_info(info, argc, &argv);
259 obj = (Unit *) napi.unwrap(jsthis);
260
261 } catch (exception &e) {
262 napi.throw_error(e);
263 return nullptr;
264 }
265
266 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
267
268 unit_init.data = obj;
269 unit_init.callbacks.request_handler = request_handler_cb;
270 unit_init.callbacks.websocket_handler = websocket_handler_cb;
271 unit_init.callbacks.close_handler = close_handler_cb;
272 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
273 unit_init.callbacks.add_port = add_port;
274 unit_init.callbacks.remove_port = remove_port;
275 unit_init.callbacks.quit = quit_cb;
276
277 unit_init.request_data_size = sizeof(req_data_t);
278
279 obj->unit_ctx_ = nxt_unit_init(&unit_init);
280 if (obj->unit_ctx_ == NULL) {
281 goto failed;
282 }
283
284 return nullptr;
285
286 failed:
287
288 napi_throw_error(env, NULL, "Failed to create Unit object");
289
290 return nullptr;
291 }
292
293
294 napi_value
listen(napi_env env,napi_callback_info info)295 Unit::listen(napi_env env, napi_callback_info info)
296 {
297 return nullptr;
298 }
299
300
301 void
request_handler_cb(nxt_unit_request_info_t * req)302 Unit::request_handler_cb(nxt_unit_request_info_t *req)
303 {
304 Unit *obj;
305
306 obj = reinterpret_cast<Unit *>(req->unit->data);
307
308 obj->request_handler(req);
309 }
310
311
312 void
request_handler(nxt_unit_request_info_t * req)313 Unit::request_handler(nxt_unit_request_info_t *req)
314 {
315 napi_value socket, request, response, server_obj, emit_request;
316
317 memset(req->data, 0, sizeof(req_data_t));
318
319 try {
320 nxt_handle_scope scope(env());
321
322 server_obj = get_server_object();
323
324 socket = create_socket(server_obj, req);
325 request = create_request(server_obj, socket, req);
326 response = create_response(server_obj, request, req);
327
328 create_headers(req, request);
329
330 emit_request = get_named_property(server_obj, "emit_request");
331
332 nxt_async_context async_context(env(), "request_handler");
333 nxt_callback_scope async_scope(async_context);
334
335 make_callback(async_context, server_obj, emit_request, request,
336 response);
337
338 } catch (exception &e) {
339 nxt_unit_req_warn(req, "request_handler: %s", e.str);
340 }
341 }
342
343
344 void
websocket_handler_cb(nxt_unit_websocket_frame_t * ws)345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
346 {
347 Unit *obj;
348
349 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
350
351 obj->websocket_handler(ws);
352 }
353
354
355 void
websocket_handler(nxt_unit_websocket_frame_t * ws)356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
357 {
358 napi_value frame, server_obj, process_frame, conn;
359 req_data_t *req_data;
360
361 req_data = (req_data_t *) ws->req->data;
362
363 try {
364 nxt_handle_scope scope(env());
365
366 server_obj = get_server_object();
367
368 frame = create_websocket_frame(server_obj, ws);
369
370 conn = get_reference_value(req_data->conn_ref);
371
372 process_frame = get_named_property(conn, "processFrame");
373
374 nxt_async_context async_context(env(), "websocket_handler");
375 nxt_callback_scope async_scope(async_context);
376
377 make_callback(async_context, conn, process_frame, frame);
378
379 } catch (exception &e) {
380 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
381 }
382
383 nxt_unit_websocket_done(ws);
384 }
385
386
387 void
close_handler_cb(nxt_unit_request_info_t * req)388 Unit::close_handler_cb(nxt_unit_request_info_t *req)
389 {
390 Unit *obj;
391
392 obj = reinterpret_cast<Unit *>(req->unit->data);
393
394 obj->close_handler(req);
395 }
396
397
398 void
close_handler(nxt_unit_request_info_t * req)399 Unit::close_handler(nxt_unit_request_info_t *req)
400 {
401 napi_value conn_handle_close, conn;
402 req_data_t *req_data;
403
404 req_data = (req_data_t *) req->data;
405
406 try {
407 nxt_handle_scope scope(env());
408
409 conn = get_reference_value(req_data->conn_ref);
410
411 conn_handle_close = get_named_property(conn, "handleSocketClose");
412
413 nxt_async_context async_context(env(), "close_handler");
414 nxt_callback_scope async_scope(async_context);
415
416 make_callback(async_context, conn, conn_handle_close,
417 nxt_napi::create(0));
418
419 remove_wrap(req_data->sock_ref);
420 remove_wrap(req_data->req_ref);
421 remove_wrap(req_data->resp_ref);
422 remove_wrap(req_data->conn_ref);
423
424 } catch (exception &e) {
425 nxt_unit_req_warn(req, "close_handler: %s", e.str);
426
427 nxt_unit_request_done(req, NXT_UNIT_ERROR);
428
429 return;
430 }
431
432 nxt_unit_request_done(req, NXT_UNIT_OK);
433 }
434
435
436 void
shm_ack_handler_cb(nxt_unit_ctx_t * ctx)437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
438 {
439 Unit *obj;
440
441 obj = reinterpret_cast<Unit *>(ctx->unit->data);
442
443 obj->shm_ack_handler(ctx);
444 }
445
446
447 void
shm_ack_handler(nxt_unit_ctx_t * ctx)448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
449 {
450 napi_value server_obj, emit_drain;
451
452 try {
453 nxt_handle_scope scope(env());
454
455 server_obj = get_server_object();
456
457 emit_drain = get_named_property(server_obj, "emit_drain");
458
459 nxt_async_context async_context(env(), "shm_ack_handler");
460 nxt_callback_scope async_scope(async_context);
461
462 make_callback(async_context, server_obj, emit_drain);
463
464 } catch (exception &e) {
465 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466 }
467 }
468
469
470 int
add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472 {
473 int err;
474 Unit *obj;
475 uv_loop_t *loop;
476 port_data_t *data;
477 napi_status status;
478
479 if (port->in_fd != -1) {
480 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482 port->in_fd, strerror(errno), errno);
483 return -1;
484 }
485
486 obj = reinterpret_cast<Unit *>(ctx->unit->data);
487
488 status = napi_get_uv_event_loop(obj->env(), &loop);
489 if (status != napi_ok) {
490 nxt_unit_warn(ctx, "Failed to get uv.loop");
491 return NXT_UNIT_ERROR;
492 }
493
494 data = new port_data_t(ctx, port);
495
496 err = uv_poll_init(loop, &data->poll, port->in_fd);
497 if (err < 0) {
498 nxt_unit_warn(ctx, "Failed to init uv.poll");
499 delete data;
500 return NXT_UNIT_ERROR;
501 }
502
503 err = uv_poll_start(&data->poll, UV_READABLE,
504 port_data_t::read_callback);
505 if (err < 0) {
506 nxt_unit_warn(ctx, "Failed to start uv.poll");
507 delete data;
508 return NXT_UNIT_ERROR;
509 }
510
511 port->data = data;
512
513 data->ref_count++;
514 data->poll.data = data;
515 }
516
517 return NXT_UNIT_OK;
518 }
519
520
521 void
remove_port(nxt_unit_t * unit,nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
523 {
524 port_data_t *data;
525
526 if (port->data != NULL && ctx != NULL) {
527 data = (port_data_t *) port->data;
528
529 data->stop();
530 }
531 }
532
533
534 void
quit_cb(nxt_unit_ctx_t * ctx)535 Unit::quit_cb(nxt_unit_ctx_t *ctx)
536 {
537 Unit *obj;
538
539 obj = reinterpret_cast<Unit *>(ctx->unit->data);
540
541 obj->quit(ctx);
542 }
543
544
545 void
quit(nxt_unit_ctx_t * ctx)546 Unit::quit(nxt_unit_ctx_t *ctx)
547 {
548 napi_value server_obj, emit_close;
549
550 try {
551 nxt_handle_scope scope(env());
552
553 server_obj = get_server_object();
554
555 emit_close = get_named_property(server_obj, "emit_close");
556
557 nxt_async_context async_context(env(), "unit_quit");
558 nxt_callback_scope async_scope(async_context);
559
560 make_callback(async_context, server_obj, emit_close);
561
562 } catch (exception &e) {
563 nxt_unit_debug(ctx, "quit: %s", e.str);
564 }
565
566 nxt_unit_done(ctx);
567 }
568
569
570 napi_value
get_server_object()571 Unit::get_server_object()
572 {
573 napi_value unit_obj;
574
575 unit_obj = get_reference_value(wrapper_);
576
577 return get_named_property(unit_obj, "server");
578 }
579
580
581 void
create_headers(nxt_unit_request_info_t * req,napi_value request)582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
583 {
584 uint32_t i;
585 napi_value headers, raw_headers;
586 napi_status status;
587 nxt_unit_request_t *r;
588
589 r = req->request;
590
591 headers = create_object();
592
593 status = napi_create_array_with_length(env(), r->fields_count * 2,
594 &raw_headers);
595 if (status != napi_ok) {
596 throw exception("Failed to create array");
597 }
598
599 for (i = 0; i < r->fields_count; i++) {
600 append_header(r->fields + i, headers, raw_headers, i);
601 }
602
603 set_named_property(request, "headers", headers);
604 set_named_property(request, "rawHeaders", raw_headers);
605 set_named_property(request, "httpVersion", r->version, r->version_length);
606 set_named_property(request, "method", r->method, r->method_length);
607 set_named_property(request, "url", r->target, r->target_length);
608
609 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
610 }
611
612
613 inline char
lowcase(char c)614 lowcase(char c)
615 {
616 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
617 }
618
619
620 inline void
append_header(nxt_unit_field_t * f,napi_value headers,napi_value raw_headers,uint32_t idx)621 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
622 napi_value raw_headers, uint32_t idx)
623 {
624 char *name;
625 uint8_t i;
626 napi_value str, vstr;
627
628 name = (char *) nxt_unit_sptr_get(&f->name);
629
630 str = create_string_latin1(name, f->name_length);
631
632 for (i = 0; i < f->name_length; i++) {
633 name[i] = lowcase(name[i]);
634 }
635
636 vstr = set_named_property(headers, name, f->value, f->value_length);
637
638 set_element(raw_headers, idx * 2, str);
639 set_element(raw_headers, idx * 2 + 1, vstr);
640 }
641
642
643 napi_value
create_socket(napi_value server_obj,nxt_unit_request_info_t * req)644 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
645 {
646 napi_value constructor, res;
647 req_data_t *req_data;
648 nxt_unit_request_t *r;
649
650 r = req->request;
651
652 constructor = get_named_property(server_obj, "Socket");
653
654 res = new_instance(constructor);
655
656 req_data = (req_data_t *) req->data;
657 req_data->sock_ref = wrap(res, req, sock_destroy);
658
659 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
660 set_named_property(res, "localAddress", r->local_addr,
661 r->local_addr_length);
662
663 return res;
664 }
665
666
667 napi_value
create_request(napi_value server_obj,napi_value socket,nxt_unit_request_info_t * req)668 Unit::create_request(napi_value server_obj, napi_value socket,
669 nxt_unit_request_info_t *req)
670 {
671 napi_value constructor, res;
672 req_data_t *req_data;
673
674 constructor = get_named_property(server_obj, "ServerRequest");
675
676 res = new_instance(constructor, server_obj, socket);
677
678 req_data = (req_data_t *) req->data;
679 req_data->req_ref = wrap(res, req, req_destroy);
680
681 return res;
682 }
683
684
685 napi_value
create_response(napi_value server_obj,napi_value request,nxt_unit_request_info_t * req)686 Unit::create_response(napi_value server_obj, napi_value request,
687 nxt_unit_request_info_t *req)
688 {
689 napi_value constructor, res;
690 req_data_t *req_data;
691
692 constructor = get_named_property(server_obj, "ServerResponse");
693
694 res = new_instance(constructor, request);
695
696 req_data = (req_data_t *) req->data;
697 req_data->resp_ref = wrap(res, req, resp_destroy);
698
699 return res;
700 }
701
702
703 napi_value
create_websocket_frame(napi_value server_obj,nxt_unit_websocket_frame_t * ws)704 Unit::create_websocket_frame(napi_value server_obj,
705 nxt_unit_websocket_frame_t *ws)
706 {
707 void *data;
708 napi_value constructor, res, buffer;
709 uint8_t sc[2];
710
711 constructor = get_named_property(server_obj, "WebSocketFrame");
712
713 res = new_instance(constructor);
714
715 set_named_property(res, "fin", (bool) ws->header->fin);
716 set_named_property(res, "opcode", ws->header->opcode);
717 set_named_property(res, "length", (int64_t) ws->payload_len);
718
719 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
720 if (ws->payload_len >= 2) {
721 nxt_unit_websocket_read(ws, sc, 2);
722
723 set_named_property(res, "closeStatus",
724 (((uint16_t) sc[0]) << 8) | sc[1]);
725
726 } else {
727 set_named_property(res, "closeStatus", -1);
728 }
729 }
730
731 buffer = create_buffer((size_t) ws->content_length, &data);
732 nxt_unit_websocket_read(ws, data, ws->content_length);
733
734 set_named_property(res, "binaryPayload", buffer);
735
736 return res;
737 }
738
739
740 napi_value
request_read(napi_env env,napi_callback_info info)741 Unit::request_read(napi_env env, napi_callback_info info)
742 {
743 void *data;
744 uint32_t wm;
745 nxt_napi napi(env);
746 napi_value this_arg, argv, buffer;
747 nxt_unit_request_info_t *req;
748
749 try {
750 this_arg = napi.get_cb_info(info, argv);
751
752 try {
753 req = napi.get_request_info(this_arg);
754
755 } catch (exception &e) {
756 return nullptr;
757 }
758
759 if (req->content_length == 0) {
760 return nullptr;
761 }
762
763 wm = napi.get_value_uint32(argv);
764
765 if (wm > req->content_length) {
766 wm = req->content_length;
767 }
768
769 buffer = napi.create_buffer((size_t) wm, &data);
770 nxt_unit_request_read(req, data, wm);
771
772 } catch (exception &e) {
773 napi.throw_error(e);
774 return nullptr;
775 }
776
777 return buffer;
778 }
779
780
781 napi_value
response_send_headers(napi_env env,napi_callback_info info)782 Unit::response_send_headers(napi_env env, napi_callback_info info)
783 {
784 int ret;
785 char *ptr, *name_ptr;
786 bool is_array;
787 size_t argc, name_len, value_len;
788 uint32_t status_code, header_len, keys_len, array_len;
789 uint32_t keys_count, i, j;
790 uint16_t hash;
791 nxt_napi napi(env);
792 napi_value this_arg, headers, keys, name, value, array_val;
793 napi_value array_entry;
794 napi_valuetype val_type;
795 nxt_unit_field_t *f;
796 nxt_unit_request_info_t *req;
797 napi_value argv[4];
798
799 argc = 4;
800
801 try {
802 this_arg = napi.get_cb_info(info, argc, argv);
803 if (argc != 4) {
804 napi.throw_error("Wrong args count. Expected: "
805 "statusCode, headers, headers count, "
806 "headers length");
807 return nullptr;
808 }
809
810 req = napi.get_request_info(this_arg);
811 status_code = napi.get_value_uint32(argv[0]);
812 keys_count = napi.get_value_uint32(argv[2]);
813 header_len = napi.get_value_uint32(argv[3]);
814
815 headers = argv[1];
816
817 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
818 if (ret != NXT_UNIT_OK) {
819 napi.throw_error("Failed to create response");
820 return nullptr;
821 }
822
823 /*
824 * Each name and value are 0-terminated by libunit.
825 * Need to add extra 2 bytes for each header.
826 */
827 header_len += keys_count * 2;
828
829 keys = napi.get_property_names(headers);
830 keys_len = napi.get_array_length(keys);
831
832 ptr = req->response_buf->free;
833
834 for (i = 0; i < keys_len; i++) {
835 name = napi.get_element(keys, i);
836
837 array_entry = napi.get_property(headers, name);
838
839 name = napi.get_element(array_entry, 0);
840 value = napi.get_element(array_entry, 1);
841
842 name_len = napi.get_value_string_latin1(name, ptr, header_len);
843 name_ptr = ptr;
844
845 ptr += name_len + 1;
846 header_len -= name_len + 1;
847
848 hash = nxt_unit_field_hash(name_ptr, name_len);
849
850 is_array = napi.is_array(value);
851
852 if (is_array) {
853 array_len = napi.get_array_length(value);
854
855 for (j = 0; j < array_len; j++) {
856 array_val = napi.get_element(value, j);
857
858 val_type = napi.type_of(array_val);
859
860 if (val_type != napi_string) {
861 array_val = napi.coerce_to_string(array_val);
862 }
863
864 value_len = napi.get_value_string_latin1(array_val, ptr,
865 header_len);
866
867 f = req->response->fields + req->response->fields_count;
868 f->skip = 0;
869
870 nxt_unit_sptr_set(&f->name, name_ptr);
871
872 f->name_length = name_len;
873 f->hash = hash;
874
875 nxt_unit_sptr_set(&f->value, ptr);
876 f->value_length = (uint32_t) value_len;
877
878 ptr += value_len + 1;
879 header_len -= value_len + 1;
880
881 req->response->fields_count++;
882 }
883
884 } else {
885 val_type = napi.type_of(value);
886
887 if (val_type != napi_string) {
888 value = napi.coerce_to_string(value);
889 }
890
891 value_len = napi.get_value_string_latin1(value, ptr, header_len);
892
893 f = req->response->fields + req->response->fields_count;
894 f->skip = 0;
895
896 nxt_unit_sptr_set(&f->name, name_ptr);
897
898 f->name_length = name_len;
899 f->hash = hash;
900
901 nxt_unit_sptr_set(&f->value, ptr);
902 f->value_length = (uint32_t) value_len;
903
904 ptr += value_len + 1;
905 header_len -= value_len + 1;
906
907 req->response->fields_count++;
908 }
909 }
910
911 } catch (exception &e) {
912 napi.throw_error(e);
913 return nullptr;
914 }
915
916 req->response_buf->free = ptr;
917
918 ret = nxt_unit_response_send(req);
919 if (ret != NXT_UNIT_OK) {
920 napi.throw_error("Failed to send response");
921 return nullptr;
922 }
923
924 return this_arg;
925 }
926
927
928 napi_value
response_write(napi_env env,napi_callback_info info)929 Unit::response_write(napi_env env, napi_callback_info info)
930 {
931 int ret;
932 void *ptr;
933 size_t argc, have_buf_len;
934 ssize_t res_len;
935 uint32_t buf_start, buf_len;
936 nxt_napi napi(env);
937 napi_value this_arg;
938 nxt_unit_buf_t *buf;
939 napi_valuetype buf_type;
940 nxt_unit_request_info_t *req;
941 napi_value argv[3];
942
943 argc = 3;
944
945 try {
946 this_arg = napi.get_cb_info(info, argc, argv);
947 if (argc != 3) {
948 throw exception("Wrong args count. Expected: "
949 "chunk, start, length");
950 }
951
952 req = napi.get_request_info(this_arg);
953 buf_type = napi.type_of(argv[0]);
954 buf_start = napi.get_value_uint32(argv[1]);
955 buf_len = napi.get_value_uint32(argv[2]) + 1;
956
957 if (buf_type == napi_string) {
958 /* TODO: will work only for utf8 content-type */
959
960 if (req->response_buf != NULL
961 && req->response_buf->end >= req->response_buf->free + buf_len)
962 {
963 buf = req->response_buf;
964
965 } else {
966 buf = nxt_unit_response_buf_alloc(req, buf_len);
967 if (buf == NULL) {
968 throw exception("Failed to allocate response buffer");
969 }
970 }
971
972 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
973 buf_len);
974
975 buf->free += have_buf_len;
976
977 ret = nxt_unit_buf_send(buf);
978 if (ret == NXT_UNIT_OK) {
979 res_len = have_buf_len;
980 }
981
982 } else {
983 ptr = napi.get_buffer_info(argv[0], have_buf_len);
984
985 if (buf_start > 0) {
986 ptr = ((uint8_t *) ptr) + buf_start;
987 have_buf_len -= buf_start;
988 }
989
990 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
991
992 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
993 }
994
995 if (ret != NXT_UNIT_OK) {
996 throw exception("Failed to send body buf");
997 }
998 } catch (exception &e) {
999 napi.throw_error(e);
1000 return nullptr;
1001 }
1002
1003 return napi.create((int64_t) res_len);
1004 }
1005
1006
1007 napi_value
response_end(napi_env env,napi_callback_info info)1008 Unit::response_end(napi_env env, napi_callback_info info)
1009 {
1010 nxt_napi napi(env);
1011 napi_value this_arg;
1012 req_data_t *req_data;
1013 nxt_unit_request_info_t *req;
1014
1015 try {
1016 this_arg = napi.get_cb_info(info);
1017
1018 req = napi.get_request_info(this_arg);
1019
1020 req_data = (req_data_t *) req->data;
1021
1022 napi.remove_wrap(req_data->sock_ref);
1023 napi.remove_wrap(req_data->req_ref);
1024 napi.remove_wrap(req_data->resp_ref);
1025 napi.remove_wrap(req_data->conn_ref);
1026
1027 } catch (exception &e) {
1028 napi.throw_error(e);
1029 return nullptr;
1030 }
1031
1032 nxt_unit_request_done(req, NXT_UNIT_OK);
1033
1034 return this_arg;
1035 }
1036
1037
1038 napi_value
websocket_send_frame(napi_env env,napi_callback_info info)1039 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
1040 {
1041 int ret, iovec_len;
1042 bool fin;
1043 size_t buf_len;
1044 uint32_t opcode, sc;
1045 nxt_napi napi(env);
1046 napi_value this_arg, frame, payload;
1047 nxt_unit_request_info_t *req;
1048 char status_code[2];
1049 struct iovec iov[2];
1050
1051 iovec_len = 0;
1052
1053 try {
1054 this_arg = napi.get_cb_info(info, frame);
1055
1056 req = napi.get_request_info(this_arg);
1057
1058 opcode = napi.get_value_uint32(napi.get_named_property(frame,
1059 "opcode"));
1060 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
1061 sc = napi.get_value_uint32(napi.get_named_property(frame,
1062 "closeStatus"));
1063 status_code[0] = (sc >> 8) & 0xFF;
1064 status_code[1] = sc & 0xFF;
1065
1066 iov[iovec_len].iov_base = status_code;
1067 iov[iovec_len].iov_len = 2;
1068 iovec_len++;
1069 }
1070
1071 try {
1072 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
1073
1074 } catch (exception &e) {
1075 fin = true;
1076 }
1077
1078 payload = napi.get_named_property(frame, "binaryPayload");
1079
1080 if (napi.is_buffer(payload)) {
1081 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
1082
1083 } else {
1084 buf_len = 0;
1085 }
1086
1087 } catch (exception &e) {
1088 napi.throw_error(e);
1089 return nullptr;
1090 }
1091
1092 if (buf_len > 0) {
1093 iov[iovec_len].iov_len = buf_len;
1094 iovec_len++;
1095 }
1096
1097 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1098 if (ret != NXT_UNIT_OK) {
1099 goto failed;
1100 }
1101
1102 return this_arg;
1103
1104 failed:
1105
1106 napi.throw_error("Failed to send frame");
1107
1108 return nullptr;
1109 }
1110
1111
1112 napi_value
websocket_set_sock(napi_env env,napi_callback_info info)1113 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1114 {
1115 nxt_napi napi(env);
1116 napi_value this_arg, sock;
1117 req_data_t *req_data;
1118 nxt_unit_request_info_t *req;
1119
1120 try {
1121 this_arg = napi.get_cb_info(info, sock);
1122
1123 req = napi.get_request_info(sock);
1124
1125 req_data = (req_data_t *) req->data;
1126 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1127
1128 } catch (exception &e) {
1129 napi.throw_error(e);
1130 return nullptr;
1131 }
1132
1133 return this_arg;
1134 }
1135
1136
1137 void
conn_destroy(napi_env env,void * r,void * finalize_hint)1138 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1139 {
1140 nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1141 }
1142
1143
1144 void
sock_destroy(napi_env env,void * r,void * finalize_hint)1145 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1146 {
1147 nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1148 }
1149
1150
1151 void
req_destroy(napi_env env,void * r,void * finalize_hint)1152 Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1153 {
1154 nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1155 }
1156
1157
1158 void
resp_destroy(napi_env env,void * r,void * finalize_hint)1159 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1160 {
1161 nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1162 }
1163