1
2 /*
3 * Copyright (C) NGINX, Inc.
4 */
5
6 #include "unit.h"
7
8 #include <unistd.h>
9 #include <fcntl.h>
10
11 #include <uv.h>
12
13 #include <nxt_unit_websocket.h>
14
15
/*
 * Reference to the JS "Unit" constructor.  It is assigned in Unit::init()
 * and read back in Unit::create() when Unit is called without `new`.
 */
napi_ref  Unit::constructor_;
17
18
/*
 * Per-port state tying a libunit port to libuv handles.
 *
 * An instance is allocated in Unit::add_port() for a port with a valid
 * in_fd.  Up to two libuv handles (poll and timer) point back at it through
 * their data field; ref_count is incremented for each such handle and
 * decremented in delete_data(), which deletes the object when the count
 * drops to zero.
 */
struct port_data_t {
    port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);

    /* Calls nxt_unit_process_port_msg() and arms the timer on success. */
    void process_port_msg();
    /* Sets `stopped` and closes the poll handle and, if used, the timer. */
    void stop();

    /* Returns the port_data_t stored in a libuv handle's data field. */
    template<typename T>
    static port_data_t *get(T *handle);

    /* libuv callbacks; each one locates the instance through get(). */
    static void read_callback(uv_poll_t *handle, int status, int events);
    static void timer_callback(uv_timer_t *handle);
    static void delete_data(uv_handle_t* handle);

    nxt_unit_ctx_t   *ctx;       /* passed to nxt_unit_process_port_msg() */
    nxt_unit_port_t  *port;      /* passed to nxt_unit_process_port_msg() */
    uv_poll_t        poll;       /* started for UV_READABLE in add_port() */
    uv_timer_t       timer;      /* type is UV_UNKNOWN_HANDLE until inited */
    int              ref_count;  /* handles whose data points at this */
    bool             scheduled;  /* set on timer start, cleared when it fires */
    bool             stopped;    /* set by stop() */
};
40
41
/*
 * References to the JS objects wrapped around one libunit request.
 *
 * The structure lives in req->data: its size is passed to libunit as
 * request_data_size in Unit::create_server() and it is zeroed at the start
 * of Unit::request_handler().
 */
struct req_data_t {
    napi_ref  sock_ref;   /* assigned in Unit::create_socket() */
    napi_ref  req_ref;    /* assigned in Unit::create_request() */
    napi_ref  resp_ref;   /* assigned in Unit::create_response() */
    napi_ref  conn_ref;   /* assigned in Unit::websocket_set_sock() */
};
48
49
/*
 * Stores the context/port pair with a zero reference count and both flags
 * cleared.  The timer is tagged UV_UNKNOWN_HANDLE so process_port_msg() and
 * stop() can tell that it has not been initialized yet.
 */
port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
    ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
{
    timer.type = UV_UNKNOWN_HANDLE;
}
55
56
57 void
process_port_msg()58 port_data_t::process_port_msg()
59 {
60 int rc, err;
61
62 rc = nxt_unit_process_port_msg(ctx, port);
63
64 if (rc != NXT_UNIT_OK) {
65 return;
66 }
67
68 if (timer.type == UV_UNKNOWN_HANDLE) {
69 err = uv_timer_init(poll.loop, &timer);
70 if (err < 0) {
71 nxt_unit_warn(ctx, "Failed to init uv.poll");
72 return;
73 }
74
75 ref_count++;
76 timer.data = this;
77 }
78
79 if (!scheduled && !stopped) {
80 uv_timer_start(&timer, timer_callback, 0, 0);
81
82 scheduled = true;
83 }
84 }
85
86
87 void
stop()88 port_data_t::stop()
89 {
90 stopped = true;
91
92 uv_poll_stop(&poll);
93
94 uv_close((uv_handle_t *) &poll, delete_data);
95
96 if (timer.type == UV_UNKNOWN_HANDLE) {
97 return;
98 }
99
100 uv_timer_stop(&timer);
101
102 uv_close((uv_handle_t *) &timer, delete_data);
103 }
104
105
106 template<typename T>
107 port_data_t *
get(T * handle)108 port_data_t::get(T *handle)
109 {
110 return (port_data_t *) handle->data;
111 }
112
113
114 void
read_callback(uv_poll_t * handle,int status,int events)115 port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116 {
117 get(handle)->process_port_msg();
118 }
119
120
121 void
timer_callback(uv_timer_t * handle)122 port_data_t::timer_callback(uv_timer_t *handle)
123 {
124 port_data_t *data;
125
126 data = get(handle);
127
128 data->scheduled = false;
129 if (data->stopped) {
130 return;
131 }
132
133 data->process_port_msg();
134 }
135
136
137 void
delete_data(uv_handle_t * handle)138 port_data_t::delete_data(uv_handle_t* handle)
139 {
140 port_data_t *data;
141
142 data = get(handle);
143
144 if (--data->ref_count <= 0) {
145 delete data;
146 }
147 }
148
149
/*
 * Binds this native object to the JS object `jsthis` via wrap(), with
 * destroy() registered as the callback, and starts with no libunit context.
 */
Unit::Unit(napi_env env, napi_value jsthis):
    nxt_napi(env),
    wrapper_(wrap(jsthis, this, destroy)),
    unit_ctx_(nullptr)
{
    nxt_unit_debug(NULL, "Unit::Unit()");
}
157
158
/*
 * Releases the reference obtained from wrap() in the constructor.
 */
Unit::~Unit()
{
    delete_reference(wrapper_);

    nxt_unit_debug(NULL, "Unit::~Unit()");
}
165
166
167 napi_value
init(napi_env env,napi_value exports)168 Unit::init(napi_env env, napi_value exports)
169 {
170 nxt_napi napi(env);
171 napi_value ctor;
172
173 napi_property_descriptor unit_props[] = {
174 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
175 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
176 };
177
178 try {
179 ctor = napi.define_class("Unit", create, 2, unit_props);
180 constructor_ = napi.create_reference(ctor);
181
182 napi.set_named_property(exports, "Unit", ctor);
183 napi.set_named_property(exports, "request_read", request_read);
184 napi.set_named_property(exports, "response_send_headers",
185 response_send_headers);
186 napi.set_named_property(exports, "response_write", response_write);
187 napi.set_named_property(exports, "response_end", response_end);
188 napi.set_named_property(exports, "websocket_send_frame",
189 websocket_send_frame);
190 napi.set_named_property(exports, "websocket_set_sock",
191 websocket_set_sock);
192 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
193 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
194
195 } catch (exception &e) {
196 napi.throw_error(e);
197 return nullptr;
198 }
199
200 return exports;
201 }
202
203
204 void
destroy(napi_env env,void * nativeObject,void * finalize_hint)205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
206 {
207 Unit *obj = reinterpret_cast<Unit *>(nativeObject);
208
209 delete obj;
210 }
211
212
213 napi_value
create(napi_env env,napi_callback_info info)214 Unit::create(napi_env env, napi_callback_info info)
215 {
216 nxt_napi napi(env);
217 napi_value target, ctor, instance, jsthis;
218
219 try {
220 target = napi.get_new_target(info);
221
222 if (target != nullptr) {
223 /* Invoked as constructor: `new Unit(...)`. */
224 jsthis = napi.get_cb_info(info);
225
226 new Unit(env, jsthis);
227 napi.create_reference(jsthis);
228
229 return jsthis;
230 }
231
232 /* Invoked as plain function `Unit(...)`, turn into construct call. */
233 ctor = napi.get_reference_value(constructor_);
234 instance = napi.new_instance(ctor);
235 napi.create_reference(instance);
236
237 } catch (exception &e) {
238 napi.throw_error(e);
239 return nullptr;
240 }
241
242 return instance;
243 }
244
245
246 napi_value
create_server(napi_env env,napi_callback_info info)247 Unit::create_server(napi_env env, napi_callback_info info)
248 {
249 Unit *obj;
250 size_t argc;
251 nxt_napi napi(env);
252 napi_value jsthis, argv;
253 nxt_unit_init_t unit_init;
254
255 argc = 1;
256
257 try {
258 jsthis = napi.get_cb_info(info, argc, &argv);
259 obj = (Unit *) napi.unwrap(jsthis);
260
261 } catch (exception &e) {
262 napi.throw_error(e);
263 return nullptr;
264 }
265
266 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
267
268 unit_init.data = obj;
269 unit_init.callbacks.request_handler = request_handler_cb;
270 unit_init.callbacks.websocket_handler = websocket_handler_cb;
271 unit_init.callbacks.close_handler = close_handler_cb;
272 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
273 unit_init.callbacks.add_port = add_port;
274 unit_init.callbacks.remove_port = remove_port;
275 unit_init.callbacks.quit = quit_cb;
276
277 unit_init.request_data_size = sizeof(req_data_t);
278
279 obj->unit_ctx_ = nxt_unit_init(&unit_init);
280 if (obj->unit_ctx_ == NULL) {
281 goto failed;
282 }
283
284 return nullptr;
285
286 failed:
287
288 napi_throw_error(env, NULL, "Failed to create Unit object");
289
290 return nullptr;
291 }
292
293
/*
 * JS method `listen`.  The body does nothing: the arguments are ignored
 * and nullptr is returned.
 */
napi_value
Unit::listen(napi_env env, napi_callback_info info)
{
    return nullptr;
}
299
300
301 void
request_handler_cb(nxt_unit_request_info_t * req)302 Unit::request_handler_cb(nxt_unit_request_info_t *req)
303 {
304 Unit *obj;
305
306 obj = reinterpret_cast<Unit *>(req->unit->data);
307
308 obj->request_handler(req);
309 }
310
311
312 void
request_handler(nxt_unit_request_info_t * req)313 Unit::request_handler(nxt_unit_request_info_t *req)
314 {
315 napi_value socket, request, response, server_obj, emit_request;
316
317 memset(req->data, 0, sizeof(req_data_t));
318
319 try {
320 nxt_handle_scope scope(env());
321
322 server_obj = get_server_object();
323
324 socket = create_socket(server_obj, req);
325 request = create_request(server_obj, socket, req);
326 response = create_response(server_obj, request, req);
327
328 create_headers(req, request);
329
330 emit_request = get_named_property(server_obj, "emit_request");
331
332 nxt_async_context async_context(env(), "request_handler");
333 nxt_callback_scope async_scope(async_context);
334
335 make_callback(async_context, server_obj, emit_request, request,
336 response);
337
338 } catch (exception &e) {
339 nxt_unit_req_warn(req, "request_handler: %s", e.str);
340 }
341 }
342
343
344 void
websocket_handler_cb(nxt_unit_websocket_frame_t * ws)345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
346 {
347 Unit *obj;
348
349 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
350
351 obj->websocket_handler(ws);
352 }
353
354
355 void
websocket_handler(nxt_unit_websocket_frame_t * ws)356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
357 {
358 napi_value frame, server_obj, process_frame, conn;
359 req_data_t *req_data;
360
361 req_data = (req_data_t *) ws->req->data;
362
363 try {
364 nxt_handle_scope scope(env());
365
366 server_obj = get_server_object();
367
368 frame = create_websocket_frame(server_obj, ws);
369
370 conn = get_reference_value(req_data->conn_ref);
371
372 process_frame = get_named_property(conn, "processFrame");
373
374 nxt_async_context async_context(env(), "websocket_handler");
375 nxt_callback_scope async_scope(async_context);
376
377 make_callback(async_context, conn, process_frame, frame);
378
379 } catch (exception &e) {
380 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
381 }
382
383 nxt_unit_websocket_done(ws);
384 }
385
386
387 void
close_handler_cb(nxt_unit_request_info_t * req)388 Unit::close_handler_cb(nxt_unit_request_info_t *req)
389 {
390 Unit *obj;
391
392 obj = reinterpret_cast<Unit *>(req->unit->data);
393
394 obj->close_handler(req);
395 }
396
397
398 void
close_handler(nxt_unit_request_info_t * req)399 Unit::close_handler(nxt_unit_request_info_t *req)
400 {
401 napi_value conn_handle_close, conn;
402 req_data_t *req_data;
403
404 req_data = (req_data_t *) req->data;
405
406 try {
407 nxt_handle_scope scope(env());
408
409 conn = get_reference_value(req_data->conn_ref);
410
411 conn_handle_close = get_named_property(conn, "handleSocketClose");
412
413 nxt_async_context async_context(env(), "close_handler");
414 nxt_callback_scope async_scope(async_context);
415
416 make_callback(async_context, conn, conn_handle_close,
417 nxt_napi::create(0));
418
419 remove_wrap(req_data->sock_ref);
420 remove_wrap(req_data->req_ref);
421 remove_wrap(req_data->resp_ref);
422 remove_wrap(req_data->conn_ref);
423
424 } catch (exception &e) {
425 nxt_unit_req_warn(req, "close_handler: %s", e.str);
426
427 nxt_unit_request_done(req, NXT_UNIT_ERROR);
428
429 return;
430 }
431
432 nxt_unit_request_done(req, NXT_UNIT_OK);
433 }
434
435
436 void
shm_ack_handler_cb(nxt_unit_ctx_t * ctx)437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
438 {
439 Unit *obj;
440
441 obj = reinterpret_cast<Unit *>(ctx->unit->data);
442
443 obj->shm_ack_handler(ctx);
444 }
445
446
447 void
shm_ack_handler(nxt_unit_ctx_t * ctx)448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
449 {
450 napi_value server_obj, emit_drain;
451
452 try {
453 nxt_handle_scope scope(env());
454
455 server_obj = get_server_object();
456
457 emit_drain = get_named_property(server_obj, "emit_drain");
458
459 nxt_async_context async_context(env(), "shm_ack_handler");
460 nxt_callback_scope async_scope(async_context);
461
462 make_callback(async_context, server_obj, emit_drain);
463
464 } catch (exception &e) {
465 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466 }
467 }
468
469
470 int
add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472 {
473 int err;
474 Unit *obj;
475 uv_loop_t *loop;
476 port_data_t *data;
477 napi_status status;
478
479 if (port->in_fd != -1) {
480 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482 port->in_fd, strerror(errno), errno);
483 return -1;
484 }
485
486 obj = reinterpret_cast<Unit *>(ctx->unit->data);
487
488 status = napi_get_uv_event_loop(obj->env(), &loop);
489 if (status != napi_ok) {
490 nxt_unit_warn(ctx, "Failed to get uv.loop");
491 return NXT_UNIT_ERROR;
492 }
493
494 data = new port_data_t(ctx, port);
495
496 err = uv_poll_init(loop, &data->poll, port->in_fd);
497 if (err < 0) {
498 nxt_unit_warn(ctx, "Failed to init uv.poll");
499 delete data;
500 return NXT_UNIT_ERROR;
501 }
502
503 err = uv_poll_start(&data->poll, UV_READABLE,
504 port_data_t::read_callback);
505 if (err < 0) {
506 nxt_unit_warn(ctx, "Failed to start uv.poll");
507 delete data;
508 return NXT_UNIT_ERROR;
509 }
510
511 port->data = data;
512
513 data->ref_count++;
514 data->poll.data = data;
515 }
516
517 return NXT_UNIT_OK;
518 }
519
520
521 void
remove_port(nxt_unit_t * unit,nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
523 {
524 port_data_t *data;
525
526 if (port->data != NULL && ctx != NULL) {
527 data = (port_data_t *) port->data;
528
529 data->stop();
530 }
531 }
532
533
534 void
quit_cb(nxt_unit_ctx_t * ctx)535 Unit::quit_cb(nxt_unit_ctx_t *ctx)
536 {
537 Unit *obj;
538
539 obj = reinterpret_cast<Unit *>(ctx->unit->data);
540
541 obj->quit(ctx);
542 }
543
544
545 void
quit(nxt_unit_ctx_t * ctx)546 Unit::quit(nxt_unit_ctx_t *ctx)
547 {
548 napi_value server_obj, emit_close;
549
550 try {
551 nxt_handle_scope scope(env());
552
553 server_obj = get_server_object();
554
555 emit_close = get_named_property(server_obj, "emit_close");
556
557 nxt_async_context async_context(env(), "unit_quit");
558 nxt_callback_scope async_scope(async_context);
559
560 make_callback(async_context, server_obj, emit_close);
561
562 } catch (exception &e) {
563 nxt_unit_debug(ctx, "quit: %s", e.str);
564 }
565
566 nxt_unit_done(ctx);
567 }
568
569
570 napi_value
get_server_object()571 Unit::get_server_object()
572 {
573 napi_value unit_obj;
574
575 unit_obj = get_reference_value(wrapper_);
576
577 return get_named_property(unit_obj, "server");
578 }
579
580
581 void
create_headers(nxt_unit_request_info_t * req,napi_value request)582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
583 {
584 char *p;
585 uint32_t i;
586 napi_value headers, raw_headers;
587 napi_status status;
588 nxt_unit_request_t *r;
589
590 r = req->request;
591
592 headers = create_object();
593
594 status = napi_create_array_with_length(env(), r->fields_count * 2,
595 &raw_headers);
596 if (status != napi_ok) {
597 throw exception("Failed to create array");
598 }
599
600 for (i = 0; i < r->fields_count; i++) {
601 append_header(r->fields + i, headers, raw_headers, i);
602 }
603
604 set_named_property(request, "headers", headers);
605 set_named_property(request, "rawHeaders", raw_headers);
606
607 // trim the "HTTP/" protocol prefix
608 p = (char *) nxt_unit_sptr_get(&r->version);
609 p += 5;
610
611 set_named_property(request, "httpVersion", create_string_latin1(p, 3));
612 set_named_property(request, "method", r->method, r->method_length);
613 set_named_property(request, "url", r->target, r->target_length);
614
615 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
616 }
617
618
/*
 * Converts an ASCII upper-case letter to lower case; any other character
 * is returned unchanged.
 */
inline char
lowcase(char c)
{
    if (c < 'A' || c > 'Z') {
        return c;
    }

    return (char) (c + ('a' - 'A'));
}
624
625
626 inline void
append_header(nxt_unit_field_t * f,napi_value headers,napi_value raw_headers,uint32_t idx)627 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
628 napi_value raw_headers, uint32_t idx)
629 {
630 char *name;
631 uint8_t i;
632 napi_value str, vstr;
633
634 name = (char *) nxt_unit_sptr_get(&f->name);
635
636 str = create_string_latin1(name, f->name_length);
637
638 for (i = 0; i < f->name_length; i++) {
639 name[i] = lowcase(name[i]);
640 }
641
642 vstr = set_named_property(headers, name, f->value, f->value_length);
643
644 set_element(raw_headers, idx * 2, str);
645 set_element(raw_headers, idx * 2 + 1, vstr);
646 }
647
648
649 napi_value
create_socket(napi_value server_obj,nxt_unit_request_info_t * req)650 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
651 {
652 napi_value constructor, res;
653 req_data_t *req_data;
654 nxt_unit_request_t *r;
655
656 r = req->request;
657
658 constructor = get_named_property(server_obj, "Socket");
659
660 res = new_instance(constructor);
661
662 req_data = (req_data_t *) req->data;
663 req_data->sock_ref = wrap(res, req, sock_destroy);
664
665 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
666 set_named_property(res, "localAddress", r->local_addr,
667 r->local_addr_length);
668
669 return res;
670 }
671
672
673 napi_value
create_request(napi_value server_obj,napi_value socket,nxt_unit_request_info_t * req)674 Unit::create_request(napi_value server_obj, napi_value socket,
675 nxt_unit_request_info_t *req)
676 {
677 napi_value constructor, res;
678 req_data_t *req_data;
679
680 constructor = get_named_property(server_obj, "ServerRequest");
681
682 res = new_instance(constructor, server_obj, socket);
683
684 req_data = (req_data_t *) req->data;
685 req_data->req_ref = wrap(res, req, req_destroy);
686
687 return res;
688 }
689
690
691 napi_value
create_response(napi_value server_obj,napi_value request,nxt_unit_request_info_t * req)692 Unit::create_response(napi_value server_obj, napi_value request,
693 nxt_unit_request_info_t *req)
694 {
695 napi_value constructor, res;
696 req_data_t *req_data;
697
698 constructor = get_named_property(server_obj, "ServerResponse");
699
700 res = new_instance(constructor, request);
701
702 req_data = (req_data_t *) req->data;
703 req_data->resp_ref = wrap(res, req, resp_destroy);
704
705 return res;
706 }
707
708
709 napi_value
create_websocket_frame(napi_value server_obj,nxt_unit_websocket_frame_t * ws)710 Unit::create_websocket_frame(napi_value server_obj,
711 nxt_unit_websocket_frame_t *ws)
712 {
713 void *data;
714 napi_value constructor, res, buffer;
715 uint8_t sc[2];
716
717 constructor = get_named_property(server_obj, "WebSocketFrame");
718
719 res = new_instance(constructor);
720
721 set_named_property(res, "fin", (bool) ws->header->fin);
722 set_named_property(res, "opcode", ws->header->opcode);
723 set_named_property(res, "length", (int64_t) ws->payload_len);
724
725 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
726 if (ws->payload_len >= 2) {
727 nxt_unit_websocket_read(ws, sc, 2);
728
729 set_named_property(res, "closeStatus",
730 (((uint16_t) sc[0]) << 8) | sc[1]);
731
732 } else {
733 set_named_property(res, "closeStatus", -1);
734 }
735 }
736
737 buffer = create_buffer((size_t) ws->content_length, &data);
738 nxt_unit_websocket_read(ws, data, ws->content_length);
739
740 set_named_property(res, "binaryPayload", buffer);
741
742 return res;
743 }
744
745
746 napi_value
request_read(napi_env env,napi_callback_info info)747 Unit::request_read(napi_env env, napi_callback_info info)
748 {
749 void *data;
750 uint32_t wm;
751 nxt_napi napi(env);
752 napi_value this_arg, argv, buffer;
753 nxt_unit_request_info_t *req;
754
755 try {
756 this_arg = napi.get_cb_info(info, argv);
757
758 try {
759 req = napi.get_request_info(this_arg);
760
761 } catch (exception &e) {
762 return nullptr;
763 }
764
765 if (req->content_length == 0) {
766 return nullptr;
767 }
768
769 wm = napi.get_value_uint32(argv);
770
771 if (wm > req->content_length) {
772 wm = req->content_length;
773 }
774
775 buffer = napi.create_buffer((size_t) wm, &data);
776 nxt_unit_request_read(req, data, wm);
777
778 } catch (exception &e) {
779 napi.throw_error(e);
780 return nullptr;
781 }
782
783 return buffer;
784 }
785
786
/*
 * JS function `response_send_headers(statusCode, headers, headers count,
 * headers length)`: initializes the libunit response, copies every header
 * into the response buffer and sends it.
 *
 * Each property of `headers` is read as an array-like entry whose element
 * 0 is the header name and element 1 is the value.  If the value is an
 * array, one response field is written per element, all sharing the same
 * name.  Values that are not strings are coerced to strings.
 *
 * Returns `this` on success.  On failure a JS error is thrown and nullptr
 * is returned.
 *
 * NOTE(review): the number of fields written is not checked here against
 * the `headers count` passed to nxt_unit_response_init(), and header_len
 * is decremented without an underflow check; presumably the JS caller
 * computes both exactly -- confirm.
 */
napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info)
{
    int                      ret;
    char                     *ptr, *name_ptr;
    bool                     is_array;
    size_t                   argc, name_len, value_len;
    uint32_t                 status_code, header_len, keys_len, array_len;
    uint32_t                 keys_count, i, j;
    uint16_t                 hash;
    nxt_napi                 napi(env);
    napi_value               this_arg, headers, keys, name, value, array_val;
    napi_value               array_entry;
    napi_valuetype           val_type;
    nxt_unit_field_t         *f;
    nxt_unit_request_info_t  *req;
    napi_value               argv[4];

    argc = 4;

    try {
        this_arg = napi.get_cb_info(info, argc, argv);
        if (argc != 4) {
            napi.throw_error("Wrong args count. Expected: "
                             "statusCode, headers, headers count, "
                             "headers length");
            return nullptr;
        }

        req = napi.get_request_info(this_arg);
        status_code = napi.get_value_uint32(argv[0]);
        keys_count = napi.get_value_uint32(argv[2]);
        header_len = napi.get_value_uint32(argv[3]);

        headers = argv[1];

        ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
        if (ret != NXT_UNIT_OK) {
            napi.throw_error("Failed to create response");
            return nullptr;
        }

        /*
         * Each name and value are 0-terminated by libunit.
         * Need to add extra 2 bytes for each header.
         */
        header_len += keys_count * 2;

        keys = napi.get_property_names(headers);
        keys_len = napi.get_array_length(keys);

        /* Names and values are written back to back starting here. */
        ptr = req->response_buf->free;

        for (i = 0; i < keys_len; i++) {
            name = napi.get_element(keys, i);

            array_entry = napi.get_property(headers, name);

            /* The entry carries the name to send and its value(s). */
            name = napi.get_element(array_entry, 0);
            value = napi.get_element(array_entry, 1);

            name_len = napi.get_value_string_latin1(name, ptr, header_len);
            name_ptr = ptr;

            /* Advance past the name plus one extra byte. */
            ptr += name_len + 1;
            header_len -= name_len + 1;

            hash = nxt_unit_field_hash(name_ptr, name_len);

            is_array = napi.is_array(value);

            if (is_array) {
                array_len = napi.get_array_length(value);

                /* One field per array element, all reusing name_ptr. */
                for (j = 0; j < array_len; j++) {
                    array_val = napi.get_element(value, j);

                    val_type = napi.type_of(array_val);

                    if (val_type != napi_string) {
                        array_val = napi.coerce_to_string(array_val);
                    }

                    value_len = napi.get_value_string_latin1(array_val, ptr,
                                                             header_len);

                    f = req->response->fields + req->response->fields_count;
                    f->skip = 0;

                    nxt_unit_sptr_set(&f->name, name_ptr);

                    f->name_length = name_len;
                    f->hash = hash;

                    nxt_unit_sptr_set(&f->value, ptr);
                    f->value_length = (uint32_t) value_len;

                    ptr += value_len + 1;
                    header_len -= value_len + 1;

                    req->response->fields_count++;
                }

            } else {
                val_type = napi.type_of(value);

                if (val_type != napi_string) {
                    value = napi.coerce_to_string(value);
                }

                value_len = napi.get_value_string_latin1(value, ptr, header_len);

                f = req->response->fields + req->response->fields_count;
                f->skip = 0;

                nxt_unit_sptr_set(&f->name, name_ptr);

                f->name_length = name_len;
                f->hash = hash;

                nxt_unit_sptr_set(&f->value, ptr);
                f->value_length = (uint32_t) value_len;

                ptr += value_len + 1;
                header_len -= value_len + 1;

                req->response->fields_count++;
            }
        }

    } catch (exception &e) {
        napi.throw_error(e);
        return nullptr;
    }

    /* Record how much of the buffer the header data occupies. */
    req->response_buf->free = ptr;

    ret = nxt_unit_response_send(req);
    if (ret != NXT_UNIT_OK) {
        napi.throw_error("Failed to send response");
        return nullptr;
    }

    return this_arg;
}
932
933
934 napi_value
response_write(napi_env env,napi_callback_info info)935 Unit::response_write(napi_env env, napi_callback_info info)
936 {
937 int ret;
938 void *ptr;
939 size_t argc, have_buf_len;
940 ssize_t res_len;
941 uint32_t buf_start, buf_len;
942 nxt_napi napi(env);
943 napi_value this_arg;
944 nxt_unit_buf_t *buf;
945 napi_valuetype buf_type;
946 nxt_unit_request_info_t *req;
947 napi_value argv[3];
948
949 argc = 3;
950
951 try {
952 this_arg = napi.get_cb_info(info, argc, argv);
953 if (argc != 3) {
954 throw exception("Wrong args count. Expected: "
955 "chunk, start, length");
956 }
957
958 req = napi.get_request_info(this_arg);
959 buf_type = napi.type_of(argv[0]);
960 buf_start = napi.get_value_uint32(argv[1]);
961 buf_len = napi.get_value_uint32(argv[2]) + 1;
962
963 if (buf_type == napi_string) {
964 /* TODO: will work only for utf8 content-type */
965
966 if (req->response_buf != NULL
967 && req->response_buf->end >= req->response_buf->free + buf_len)
968 {
969 buf = req->response_buf;
970
971 } else {
972 buf = nxt_unit_response_buf_alloc(req, buf_len);
973 if (buf == NULL) {
974 throw exception("Failed to allocate response buffer");
975 }
976 }
977
978 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
979 buf_len);
980
981 buf->free += have_buf_len;
982
983 ret = nxt_unit_buf_send(buf);
984 if (ret == NXT_UNIT_OK) {
985 res_len = have_buf_len;
986 }
987
988 } else {
989 ptr = napi.get_buffer_info(argv[0], have_buf_len);
990
991 if (buf_start > 0) {
992 ptr = ((uint8_t *) ptr) + buf_start;
993 have_buf_len -= buf_start;
994 }
995
996 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
997
998 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
999 }
1000
1001 if (ret != NXT_UNIT_OK) {
1002 throw exception("Failed to send body buf");
1003 }
1004 } catch (exception &e) {
1005 napi.throw_error(e);
1006 return nullptr;
1007 }
1008
1009 return napi.create((int64_t) res_len);
1010 }
1011
1012
1013 napi_value
response_end(napi_env env,napi_callback_info info)1014 Unit::response_end(napi_env env, napi_callback_info info)
1015 {
1016 nxt_napi napi(env);
1017 napi_value this_arg;
1018 req_data_t *req_data;
1019 nxt_unit_request_info_t *req;
1020
1021 try {
1022 this_arg = napi.get_cb_info(info);
1023
1024 req = napi.get_request_info(this_arg);
1025
1026 req_data = (req_data_t *) req->data;
1027
1028 napi.remove_wrap(req_data->sock_ref);
1029 napi.remove_wrap(req_data->req_ref);
1030 napi.remove_wrap(req_data->resp_ref);
1031 napi.remove_wrap(req_data->conn_ref);
1032
1033 } catch (exception &e) {
1034 napi.throw_error(e);
1035 return nullptr;
1036 }
1037
1038 nxt_unit_request_done(req, NXT_UNIT_OK);
1039
1040 return this_arg;
1041 }
1042
1043
1044 napi_value
websocket_send_frame(napi_env env,napi_callback_info info)1045 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
1046 {
1047 int ret, iovec_len;
1048 bool fin;
1049 size_t buf_len;
1050 uint32_t opcode, sc;
1051 nxt_napi napi(env);
1052 napi_value this_arg, frame, payload;
1053 nxt_unit_request_info_t *req;
1054 char status_code[2];
1055 struct iovec iov[2];
1056
1057 iovec_len = 0;
1058
1059 try {
1060 this_arg = napi.get_cb_info(info, frame);
1061
1062 req = napi.get_request_info(this_arg);
1063
1064 opcode = napi.get_value_uint32(napi.get_named_property(frame,
1065 "opcode"));
1066 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
1067 sc = napi.get_value_uint32(napi.get_named_property(frame,
1068 "closeStatus"));
1069 status_code[0] = (sc >> 8) & 0xFF;
1070 status_code[1] = sc & 0xFF;
1071
1072 iov[iovec_len].iov_base = status_code;
1073 iov[iovec_len].iov_len = 2;
1074 iovec_len++;
1075 }
1076
1077 try {
1078 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
1079
1080 } catch (exception &e) {
1081 fin = true;
1082 }
1083
1084 payload = napi.get_named_property(frame, "binaryPayload");
1085
1086 if (napi.is_buffer(payload)) {
1087 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
1088
1089 } else {
1090 buf_len = 0;
1091 }
1092
1093 } catch (exception &e) {
1094 napi.throw_error(e);
1095 return nullptr;
1096 }
1097
1098 if (buf_len > 0) {
1099 iov[iovec_len].iov_len = buf_len;
1100 iovec_len++;
1101 }
1102
1103 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1104 if (ret != NXT_UNIT_OK) {
1105 goto failed;
1106 }
1107
1108 return this_arg;
1109
1110 failed:
1111
1112 napi.throw_error("Failed to send frame");
1113
1114 return nullptr;
1115 }
1116
1117
1118 napi_value
websocket_set_sock(napi_env env,napi_callback_info info)1119 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1120 {
1121 nxt_napi napi(env);
1122 napi_value this_arg, sock;
1123 req_data_t *req_data;
1124 nxt_unit_request_info_t *req;
1125
1126 try {
1127 this_arg = napi.get_cb_info(info, sock);
1128
1129 req = napi.get_request_info(sock);
1130
1131 req_data = (req_data_t *) req->data;
1132 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1133
1134 } catch (exception &e) {
1135 napi.throw_error(e);
1136 return nullptr;
1137 }
1138
1139 return this_arg;
1140 }
1141
1142
/*
 * Callback passed to wrap() in websocket_set_sock().  It only emits a
 * debug message with the wrapped pointer; nothing is released here.
 */
void
Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
}
1148
1149
/*
 * Callback passed to wrap() in create_socket().  It only emits a debug
 * message with the wrapped pointer; nothing is released here.
 */
void
Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
}
1155
1156
/*
 * Callback passed to wrap() in create_request().  It only emits a debug
 * message with the wrapped pointer; nothing is released here.
 */
void
Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug(NULL, "req_destroy: %p", r);
}
1162
1163
/*
 * Callback passed to wrap() in create_response().  It only emits a debug
 * message with the wrapped pointer; nothing is released here.
 */
void
Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
}
1169