xref: /unit/src/nodejs/unit-http/unit.cpp (revision 2208:26af8eadc943)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 napi_ref Unit::constructor_;
17 
18 
19 struct port_data_t {
20     port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
21 
22     void process_port_msg();
23     void stop();
24 
25     template<typename T>
26     static port_data_t *get(T *handle);
27 
28     static void read_callback(uv_poll_t *handle, int status, int events);
29     static void timer_callback(uv_timer_t *handle);
30     static void delete_data(uv_handle_t* handle);
31 
32     nxt_unit_ctx_t   *ctx;
33     nxt_unit_port_t  *port;
34     uv_poll_t        poll;
35     uv_timer_t       timer;
36     int              ref_count;
37     bool             scheduled;
38     bool             stopped;
39 };
40 
41 
42 struct req_data_t {
43     napi_ref  sock_ref;
44     napi_ref  req_ref;
45     napi_ref  resp_ref;
46     napi_ref  conn_ref;
47 };
48 
49 
port_data_t(nxt_unit_ctx_t * c,nxt_unit_port_t * p)50 port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
51     ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
52 {
53     timer.type = UV_UNKNOWN_HANDLE;
54 }
55 
56 
57 void
process_port_msg()58 port_data_t::process_port_msg()
59 {
60     int  rc, err;
61 
62     rc = nxt_unit_process_port_msg(ctx, port);
63 
64     if (rc != NXT_UNIT_OK) {
65         return;
66     }
67 
68     if (timer.type == UV_UNKNOWN_HANDLE) {
69         err = uv_timer_init(poll.loop, &timer);
70         if (err < 0) {
71             nxt_unit_warn(ctx, "Failed to init uv.poll");
72             return;
73         }
74 
75         ref_count++;
76         timer.data = this;
77     }
78 
79     if (!scheduled && !stopped) {
80         uv_timer_start(&timer, timer_callback, 0, 0);
81 
82         scheduled = true;
83     }
84 }
85 
86 
87 void
stop()88 port_data_t::stop()
89 {
90     stopped = true;
91 
92     uv_poll_stop(&poll);
93 
94     uv_close((uv_handle_t *) &poll, delete_data);
95 
96     if (timer.type == UV_UNKNOWN_HANDLE) {
97         return;
98     }
99 
100     uv_timer_stop(&timer);
101 
102     uv_close((uv_handle_t *) &timer, delete_data);
103 }
104 
105 
106 template<typename T>
107 port_data_t *
get(T * handle)108 port_data_t::get(T *handle)
109 {
110     return (port_data_t *) handle->data;
111 }
112 
113 
114 void
read_callback(uv_poll_t * handle,int status,int events)115 port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116 {
117     get(handle)->process_port_msg();
118 }
119 
120 
121 void
timer_callback(uv_timer_t * handle)122 port_data_t::timer_callback(uv_timer_t *handle)
123 {
124     port_data_t  *data;
125 
126     data = get(handle);
127 
128     data->scheduled = false;
129     if (data->stopped) {
130         return;
131     }
132 
133     data->process_port_msg();
134 }
135 
136 
137 void
delete_data(uv_handle_t * handle)138 port_data_t::delete_data(uv_handle_t* handle)
139 {
140     port_data_t  *data;
141 
142     data = get(handle);
143 
144     if (--data->ref_count <= 0) {
145         delete data;
146     }
147 }
148 
149 
Unit(napi_env env,napi_value jsthis)150 Unit::Unit(napi_env env, napi_value jsthis):
151     nxt_napi(env),
152     wrapper_(wrap(jsthis, this, destroy)),
153     unit_ctx_(nullptr)
154 {
155     nxt_unit_debug(NULL, "Unit::Unit()");
156 }
157 
158 
~Unit()159 Unit::~Unit()
160 {
161     delete_reference(wrapper_);
162 
163     nxt_unit_debug(NULL, "Unit::~Unit()");
164 }
165 
166 
167 napi_value
init(napi_env env,napi_value exports)168 Unit::init(napi_env env, napi_value exports)
169 {
170     nxt_napi    napi(env);
171     napi_value  ctor;
172 
173     napi_property_descriptor  unit_props[] = {
174         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
175         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
176     };
177 
178     try {
179         ctor = napi.define_class("Unit", create, 2, unit_props);
180         constructor_ = napi.create_reference(ctor);
181 
182         napi.set_named_property(exports, "Unit", ctor);
183         napi.set_named_property(exports, "request_read", request_read);
184         napi.set_named_property(exports, "response_send_headers",
185                                 response_send_headers);
186         napi.set_named_property(exports, "response_write", response_write);
187         napi.set_named_property(exports, "response_end", response_end);
188         napi.set_named_property(exports, "websocket_send_frame",
189                                 websocket_send_frame);
190         napi.set_named_property(exports, "websocket_set_sock",
191                                 websocket_set_sock);
192         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
193         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
194 
195     } catch (exception &e) {
196         napi.throw_error(e);
197         return nullptr;
198     }
199 
200     return exports;
201 }
202 
203 
204 void
destroy(napi_env env,void * nativeObject,void * finalize_hint)205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
206 {
207     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
208 
209     delete obj;
210 }
211 
212 
213 napi_value
create(napi_env env,napi_callback_info info)214 Unit::create(napi_env env, napi_callback_info info)
215 {
216     nxt_napi    napi(env);
217     napi_value  target, ctor, instance, jsthis;
218 
219     try {
220         target = napi.get_new_target(info);
221 
222         if (target != nullptr) {
223             /* Invoked as constructor: `new Unit(...)`. */
224             jsthis = napi.get_cb_info(info);
225 
226             new Unit(env, jsthis);
227             napi.create_reference(jsthis);
228 
229             return jsthis;
230         }
231 
232         /* Invoked as plain function `Unit(...)`, turn into construct call. */
233         ctor = napi.get_reference_value(constructor_);
234         instance = napi.new_instance(ctor);
235         napi.create_reference(instance);
236 
237     } catch (exception &e) {
238         napi.throw_error(e);
239         return nullptr;
240     }
241 
242     return instance;
243 }
244 
245 
246 napi_value
create_server(napi_env env,napi_callback_info info)247 Unit::create_server(napi_env env, napi_callback_info info)
248 {
249     Unit             *obj;
250     size_t           argc;
251     nxt_napi         napi(env);
252     napi_value       jsthis, argv;
253     nxt_unit_init_t  unit_init;
254 
255     argc = 1;
256 
257     try {
258         jsthis = napi.get_cb_info(info, argc, &argv);
259         obj = (Unit *) napi.unwrap(jsthis);
260 
261     } catch (exception &e) {
262         napi.throw_error(e);
263         return nullptr;
264     }
265 
266     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
267 
268     unit_init.data = obj;
269     unit_init.callbacks.request_handler   = request_handler_cb;
270     unit_init.callbacks.websocket_handler = websocket_handler_cb;
271     unit_init.callbacks.close_handler     = close_handler_cb;
272     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
273     unit_init.callbacks.add_port          = add_port;
274     unit_init.callbacks.remove_port       = remove_port;
275     unit_init.callbacks.quit              = quit_cb;
276 
277     unit_init.request_data_size = sizeof(req_data_t);
278 
279     obj->unit_ctx_ = nxt_unit_init(&unit_init);
280     if (obj->unit_ctx_ == NULL) {
281         goto failed;
282     }
283 
284     return nullptr;
285 
286 failed:
287 
288     napi_throw_error(env, NULL, "Failed to create Unit object");
289 
290     return nullptr;
291 }
292 
293 
294 napi_value
listen(napi_env env,napi_callback_info info)295 Unit::listen(napi_env env, napi_callback_info info)
296 {
297     return nullptr;
298 }
299 
300 
301 void
request_handler_cb(nxt_unit_request_info_t * req)302 Unit::request_handler_cb(nxt_unit_request_info_t *req)
303 {
304     Unit  *obj;
305 
306     obj = reinterpret_cast<Unit *>(req->unit->data);
307 
308     obj->request_handler(req);
309 }
310 
311 
312 void
request_handler(nxt_unit_request_info_t * req)313 Unit::request_handler(nxt_unit_request_info_t *req)
314 {
315     napi_value  socket, request, response, server_obj, emit_request;
316 
317     memset(req->data, 0, sizeof(req_data_t));
318 
319     try {
320         nxt_handle_scope  scope(env());
321 
322         server_obj = get_server_object();
323 
324         socket = create_socket(server_obj, req);
325         request = create_request(server_obj, socket, req);
326         response = create_response(server_obj, request, req);
327 
328         create_headers(req, request);
329 
330         emit_request = get_named_property(server_obj, "emit_request");
331 
332         nxt_async_context   async_context(env(), "request_handler");
333         nxt_callback_scope  async_scope(async_context);
334 
335         make_callback(async_context, server_obj, emit_request, request,
336                       response);
337 
338     } catch (exception &e) {
339         nxt_unit_req_warn(req, "request_handler: %s", e.str);
340     }
341 }
342 
343 
344 void
websocket_handler_cb(nxt_unit_websocket_frame_t * ws)345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
346 {
347     Unit  *obj;
348 
349     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
350 
351     obj->websocket_handler(ws);
352 }
353 
354 
355 void
websocket_handler(nxt_unit_websocket_frame_t * ws)356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
357 {
358     napi_value  frame, server_obj, process_frame, conn;
359     req_data_t  *req_data;
360 
361     req_data = (req_data_t *) ws->req->data;
362 
363     try {
364         nxt_handle_scope  scope(env());
365 
366         server_obj = get_server_object();
367 
368         frame = create_websocket_frame(server_obj, ws);
369 
370         conn = get_reference_value(req_data->conn_ref);
371 
372         process_frame = get_named_property(conn, "processFrame");
373 
374         nxt_async_context   async_context(env(), "websocket_handler");
375         nxt_callback_scope  async_scope(async_context);
376 
377         make_callback(async_context, conn, process_frame, frame);
378 
379     } catch (exception &e) {
380         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
381     }
382 
383     nxt_unit_websocket_done(ws);
384 }
385 
386 
387 void
close_handler_cb(nxt_unit_request_info_t * req)388 Unit::close_handler_cb(nxt_unit_request_info_t *req)
389 {
390     Unit  *obj;
391 
392     obj = reinterpret_cast<Unit *>(req->unit->data);
393 
394     obj->close_handler(req);
395 }
396 
397 
398 void
close_handler(nxt_unit_request_info_t * req)399 Unit::close_handler(nxt_unit_request_info_t *req)
400 {
401     napi_value  conn_handle_close, conn;
402     req_data_t  *req_data;
403 
404     req_data = (req_data_t *) req->data;
405 
406     try {
407         nxt_handle_scope  scope(env());
408 
409         conn = get_reference_value(req_data->conn_ref);
410 
411         conn_handle_close = get_named_property(conn, "handleSocketClose");
412 
413         nxt_async_context   async_context(env(), "close_handler");
414         nxt_callback_scope  async_scope(async_context);
415 
416         make_callback(async_context, conn, conn_handle_close,
417                       nxt_napi::create(0));
418 
419         remove_wrap(req_data->sock_ref);
420         remove_wrap(req_data->req_ref);
421         remove_wrap(req_data->resp_ref);
422         remove_wrap(req_data->conn_ref);
423 
424     } catch (exception &e) {
425         nxt_unit_req_warn(req, "close_handler: %s", e.str);
426 
427         nxt_unit_request_done(req, NXT_UNIT_ERROR);
428 
429         return;
430     }
431 
432     nxt_unit_request_done(req, NXT_UNIT_OK);
433 }
434 
435 
436 void
shm_ack_handler_cb(nxt_unit_ctx_t * ctx)437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
438 {
439     Unit  *obj;
440 
441     obj = reinterpret_cast<Unit *>(ctx->unit->data);
442 
443     obj->shm_ack_handler(ctx);
444 }
445 
446 
447 void
shm_ack_handler(nxt_unit_ctx_t * ctx)448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
449 {
450     napi_value  server_obj, emit_drain;
451 
452     try {
453         nxt_handle_scope  scope(env());
454 
455         server_obj = get_server_object();
456 
457         emit_drain = get_named_property(server_obj, "emit_drain");
458 
459         nxt_async_context   async_context(env(), "shm_ack_handler");
460         nxt_callback_scope  async_scope(async_context);
461 
462         make_callback(async_context, server_obj, emit_drain);
463 
464     } catch (exception &e) {
465         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466     }
467 }
468 
469 
470 int
add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472 {
473     int          err;
474     Unit         *obj;
475     uv_loop_t    *loop;
476     port_data_t  *data;
477     napi_status  status;
478 
479     if (port->in_fd != -1) {
480         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482                           port->in_fd, strerror(errno), errno);
483             return -1;
484         }
485 
486         obj = reinterpret_cast<Unit *>(ctx->unit->data);
487 
488         status = napi_get_uv_event_loop(obj->env(), &loop);
489         if (status != napi_ok) {
490             nxt_unit_warn(ctx, "Failed to get uv.loop");
491             return NXT_UNIT_ERROR;
492         }
493 
494         data = new port_data_t(ctx, port);
495 
496         err = uv_poll_init(loop, &data->poll, port->in_fd);
497         if (err < 0) {
498             nxt_unit_warn(ctx, "Failed to init uv.poll");
499             delete data;
500             return NXT_UNIT_ERROR;
501         }
502 
503         err = uv_poll_start(&data->poll, UV_READABLE,
504                             port_data_t::read_callback);
505         if (err < 0) {
506             nxt_unit_warn(ctx, "Failed to start uv.poll");
507             delete data;
508             return NXT_UNIT_ERROR;
509         }
510 
511         port->data = data;
512 
513         data->ref_count++;
514         data->poll.data = data;
515     }
516 
517     return NXT_UNIT_OK;
518 }
519 
520 
521 void
remove_port(nxt_unit_t * unit,nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
523 {
524     port_data_t  *data;
525 
526     if (port->data != NULL && ctx != NULL) {
527         data = (port_data_t *) port->data;
528 
529         data->stop();
530     }
531 }
532 
533 
534 void
quit_cb(nxt_unit_ctx_t * ctx)535 Unit::quit_cb(nxt_unit_ctx_t *ctx)
536 {
537     Unit  *obj;
538 
539     obj = reinterpret_cast<Unit *>(ctx->unit->data);
540 
541     obj->quit(ctx);
542 }
543 
544 
545 void
quit(nxt_unit_ctx_t * ctx)546 Unit::quit(nxt_unit_ctx_t *ctx)
547 {
548     napi_value  server_obj, emit_close;
549 
550     try {
551         nxt_handle_scope  scope(env());
552 
553         server_obj = get_server_object();
554 
555         emit_close = get_named_property(server_obj, "emit_close");
556 
557         nxt_async_context   async_context(env(), "unit_quit");
558         nxt_callback_scope  async_scope(async_context);
559 
560         make_callback(async_context, server_obj, emit_close);
561 
562     } catch (exception &e) {
563         nxt_unit_debug(ctx, "quit: %s", e.str);
564     }
565 
566     nxt_unit_done(ctx);
567 }
568 
569 
570 napi_value
get_server_object()571 Unit::get_server_object()
572 {
573     napi_value  unit_obj;
574 
575     unit_obj = get_reference_value(wrapper_);
576 
577     return get_named_property(unit_obj, "server");
578 }
579 
580 
581 void
create_headers(nxt_unit_request_info_t * req,napi_value request)582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
583 {
584     uint32_t            i;
585     napi_value          headers, raw_headers;
586     napi_status         status;
587     nxt_unit_request_t  *r;
588 
589     r = req->request;
590 
591     headers = create_object();
592 
593     status = napi_create_array_with_length(env(), r->fields_count * 2,
594                                            &raw_headers);
595     if (status != napi_ok) {
596         throw exception("Failed to create array");
597     }
598 
599     for (i = 0; i < r->fields_count; i++) {
600         append_header(r->fields + i, headers, raw_headers, i);
601     }
602 
603     set_named_property(request, "headers", headers);
604     set_named_property(request, "rawHeaders", raw_headers);
605     set_named_property(request, "httpVersion", r->version, r->version_length);
606     set_named_property(request, "method", r->method, r->method_length);
607     set_named_property(request, "url", r->target, r->target_length);
608 
609     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
610 }
611 
612 
613 inline char
lowcase(char c)614 lowcase(char c)
615 {
616     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
617 }
618 
619 
620 inline void
append_header(nxt_unit_field_t * f,napi_value headers,napi_value raw_headers,uint32_t idx)621 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
622     napi_value raw_headers, uint32_t idx)
623 {
624     char        *name;
625     uint8_t     i;
626     napi_value  str, vstr;
627 
628     name = (char *) nxt_unit_sptr_get(&f->name);
629 
630     str = create_string_latin1(name, f->name_length);
631 
632     for (i = 0; i < f->name_length; i++) {
633         name[i] = lowcase(name[i]);
634     }
635 
636     vstr = set_named_property(headers, name, f->value, f->value_length);
637 
638     set_element(raw_headers, idx * 2, str);
639     set_element(raw_headers, idx * 2 + 1, vstr);
640 }
641 
642 
643 napi_value
create_socket(napi_value server_obj,nxt_unit_request_info_t * req)644 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
645 {
646     napi_value          constructor, res;
647     req_data_t          *req_data;
648     nxt_unit_request_t  *r;
649 
650     r = req->request;
651 
652     constructor = get_named_property(server_obj, "Socket");
653 
654     res = new_instance(constructor);
655 
656     req_data = (req_data_t *) req->data;
657     req_data->sock_ref = wrap(res, req, sock_destroy);
658 
659     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
660     set_named_property(res, "localAddress", r->local_addr,
661                        r->local_addr_length);
662 
663     return res;
664 }
665 
666 
667 napi_value
create_request(napi_value server_obj,napi_value socket,nxt_unit_request_info_t * req)668 Unit::create_request(napi_value server_obj, napi_value socket,
669     nxt_unit_request_info_t *req)
670 {
671     napi_value  constructor, res;
672     req_data_t  *req_data;
673 
674     constructor = get_named_property(server_obj, "ServerRequest");
675 
676     res = new_instance(constructor, server_obj, socket);
677 
678     req_data = (req_data_t *) req->data;
679     req_data->req_ref = wrap(res, req, req_destroy);
680 
681     return res;
682 }
683 
684 
685 napi_value
create_response(napi_value server_obj,napi_value request,nxt_unit_request_info_t * req)686 Unit::create_response(napi_value server_obj, napi_value request,
687     nxt_unit_request_info_t *req)
688 {
689     napi_value  constructor, res;
690     req_data_t  *req_data;
691 
692     constructor = get_named_property(server_obj, "ServerResponse");
693 
694     res = new_instance(constructor, request);
695 
696     req_data = (req_data_t *) req->data;
697     req_data->resp_ref = wrap(res, req, resp_destroy);
698 
699     return res;
700 }
701 
702 
703 napi_value
create_websocket_frame(napi_value server_obj,nxt_unit_websocket_frame_t * ws)704 Unit::create_websocket_frame(napi_value server_obj,
705                              nxt_unit_websocket_frame_t *ws)
706 {
707     void        *data;
708     napi_value  constructor, res, buffer;
709     uint8_t     sc[2];
710 
711     constructor = get_named_property(server_obj, "WebSocketFrame");
712 
713     res = new_instance(constructor);
714 
715     set_named_property(res, "fin", (bool) ws->header->fin);
716     set_named_property(res, "opcode", ws->header->opcode);
717     set_named_property(res, "length", (int64_t) ws->payload_len);
718 
719     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
720         if (ws->payload_len >= 2) {
721             nxt_unit_websocket_read(ws, sc, 2);
722 
723             set_named_property(res, "closeStatus",
724                                (((uint16_t) sc[0]) << 8) | sc[1]);
725 
726         } else {
727             set_named_property(res, "closeStatus", -1);
728         }
729     }
730 
731     buffer = create_buffer((size_t) ws->content_length, &data);
732     nxt_unit_websocket_read(ws, data, ws->content_length);
733 
734     set_named_property(res, "binaryPayload", buffer);
735 
736     return res;
737 }
738 
739 
740 napi_value
request_read(napi_env env,napi_callback_info info)741 Unit::request_read(napi_env env, napi_callback_info info)
742 {
743     void                     *data;
744     uint32_t                 wm;
745     nxt_napi                 napi(env);
746     napi_value               this_arg, argv, buffer;
747     nxt_unit_request_info_t  *req;
748 
749     try {
750         this_arg = napi.get_cb_info(info, argv);
751 
752         try {
753             req = napi.get_request_info(this_arg);
754 
755         } catch (exception &e) {
756             return nullptr;
757         }
758 
759         if (req->content_length == 0) {
760             return nullptr;
761         }
762 
763         wm = napi.get_value_uint32(argv);
764 
765         if (wm > req->content_length) {
766             wm = req->content_length;
767         }
768 
769         buffer = napi.create_buffer((size_t) wm, &data);
770         nxt_unit_request_read(req, data, wm);
771 
772     } catch (exception &e) {
773         napi.throw_error(e);
774         return nullptr;
775     }
776 
777     return buffer;
778 }
779 
780 
781 napi_value
response_send_headers(napi_env env,napi_callback_info info)782 Unit::response_send_headers(napi_env env, napi_callback_info info)
783 {
784     int                      ret;
785     char                     *ptr, *name_ptr;
786     bool                     is_array;
787     size_t                   argc, name_len, value_len;
788     uint32_t                 status_code, header_len, keys_len, array_len;
789     uint32_t                 keys_count, i, j;
790     uint16_t                 hash;
791     nxt_napi                 napi(env);
792     napi_value               this_arg, headers, keys, name, value, array_val;
793     napi_value               array_entry;
794     napi_valuetype           val_type;
795     nxt_unit_field_t         *f;
796     nxt_unit_request_info_t  *req;
797     napi_value               argv[4];
798 
799     argc = 4;
800 
801     try {
802         this_arg = napi.get_cb_info(info, argc, argv);
803         if (argc != 4) {
804             napi.throw_error("Wrong args count. Expected: "
805                              "statusCode, headers, headers count, "
806                              "headers length");
807             return nullptr;
808         }
809 
810         req = napi.get_request_info(this_arg);
811         status_code = napi.get_value_uint32(argv[0]);
812         keys_count = napi.get_value_uint32(argv[2]);
813         header_len = napi.get_value_uint32(argv[3]);
814 
815         headers = argv[1];
816 
817         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
818         if (ret != NXT_UNIT_OK) {
819             napi.throw_error("Failed to create response");
820             return nullptr;
821         }
822 
823         /*
824          * Each name and value are 0-terminated by libunit.
825          * Need to add extra 2 bytes for each header.
826          */
827         header_len += keys_count * 2;
828 
829         keys = napi.get_property_names(headers);
830         keys_len = napi.get_array_length(keys);
831 
832         ptr = req->response_buf->free;
833 
834         for (i = 0; i < keys_len; i++) {
835             name = napi.get_element(keys, i);
836 
837             array_entry = napi.get_property(headers, name);
838 
839             name = napi.get_element(array_entry, 0);
840             value = napi.get_element(array_entry, 1);
841 
842             name_len = napi.get_value_string_latin1(name, ptr, header_len);
843             name_ptr = ptr;
844 
845             ptr += name_len + 1;
846             header_len -= name_len + 1;
847 
848             hash = nxt_unit_field_hash(name_ptr, name_len);
849 
850             is_array = napi.is_array(value);
851 
852             if (is_array) {
853                 array_len = napi.get_array_length(value);
854 
855                 for (j = 0; j < array_len; j++) {
856                     array_val = napi.get_element(value, j);
857 
858                     val_type = napi.type_of(array_val);
859 
860                     if (val_type != napi_string) {
861                         array_val = napi.coerce_to_string(array_val);
862                     }
863 
864                     value_len = napi.get_value_string_latin1(array_val, ptr,
865                                                              header_len);
866 
867                     f = req->response->fields + req->response->fields_count;
868                     f->skip = 0;
869 
870                     nxt_unit_sptr_set(&f->name, name_ptr);
871 
872                     f->name_length = name_len;
873                     f->hash = hash;
874 
875                     nxt_unit_sptr_set(&f->value, ptr);
876                     f->value_length = (uint32_t) value_len;
877 
878                     ptr += value_len + 1;
879                     header_len -= value_len + 1;
880 
881                     req->response->fields_count++;
882                 }
883 
884             } else {
885                 val_type = napi.type_of(value);
886 
887                 if (val_type != napi_string) {
888                     value = napi.coerce_to_string(value);
889                 }
890 
891                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
892 
893                 f = req->response->fields + req->response->fields_count;
894                 f->skip = 0;
895 
896                 nxt_unit_sptr_set(&f->name, name_ptr);
897 
898                 f->name_length = name_len;
899                 f->hash = hash;
900 
901                 nxt_unit_sptr_set(&f->value, ptr);
902                 f->value_length = (uint32_t) value_len;
903 
904                 ptr += value_len + 1;
905                 header_len -= value_len + 1;
906 
907                 req->response->fields_count++;
908             }
909         }
910 
911     } catch (exception &e) {
912         napi.throw_error(e);
913         return nullptr;
914     }
915 
916     req->response_buf->free = ptr;
917 
918     ret = nxt_unit_response_send(req);
919     if (ret != NXT_UNIT_OK) {
920         napi.throw_error("Failed to send response");
921         return nullptr;
922     }
923 
924     return this_arg;
925 }
926 
927 
928 napi_value
response_write(napi_env env,napi_callback_info info)929 Unit::response_write(napi_env env, napi_callback_info info)
930 {
931     int                      ret;
932     void                     *ptr;
933     size_t                   argc, have_buf_len;
934     ssize_t                  res_len;
935     uint32_t                 buf_start, buf_len;
936     nxt_napi                 napi(env);
937     napi_value               this_arg;
938     nxt_unit_buf_t           *buf;
939     napi_valuetype           buf_type;
940     nxt_unit_request_info_t  *req;
941     napi_value               argv[3];
942 
943     argc = 3;
944 
945     try {
946         this_arg = napi.get_cb_info(info, argc, argv);
947         if (argc != 3) {
948             throw exception("Wrong args count. Expected: "
949                             "chunk, start, length");
950         }
951 
952         req = napi.get_request_info(this_arg);
953         buf_type = napi.type_of(argv[0]);
954         buf_start = napi.get_value_uint32(argv[1]);
955         buf_len = napi.get_value_uint32(argv[2]) + 1;
956 
957         if (buf_type == napi_string) {
958             /* TODO: will work only for utf8 content-type */
959 
960             if (req->response_buf != NULL
961                 && req->response_buf->end >= req->response_buf->free + buf_len)
962             {
963                 buf = req->response_buf;
964 
965             } else {
966                 buf = nxt_unit_response_buf_alloc(req, buf_len);
967                 if (buf == NULL) {
968                     throw exception("Failed to allocate response buffer");
969                 }
970             }
971 
972             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
973                                                       buf_len);
974 
975             buf->free += have_buf_len;
976 
977             ret = nxt_unit_buf_send(buf);
978             if (ret == NXT_UNIT_OK) {
979                 res_len = have_buf_len;
980             }
981 
982         } else {
983             ptr = napi.get_buffer_info(argv[0], have_buf_len);
984 
985             if (buf_start > 0) {
986                 ptr = ((uint8_t *) ptr) + buf_start;
987                 have_buf_len -= buf_start;
988             }
989 
990             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
991 
992             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
993         }
994 
995         if (ret != NXT_UNIT_OK) {
996             throw exception("Failed to send body buf");
997         }
998     } catch (exception &e) {
999         napi.throw_error(e);
1000         return nullptr;
1001     }
1002 
1003     return napi.create((int64_t) res_len);
1004 }
1005 
1006 
1007 napi_value
response_end(napi_env env,napi_callback_info info)1008 Unit::response_end(napi_env env, napi_callback_info info)
1009 {
1010     nxt_napi                 napi(env);
1011     napi_value               this_arg;
1012     req_data_t               *req_data;
1013     nxt_unit_request_info_t  *req;
1014 
1015     try {
1016         this_arg = napi.get_cb_info(info);
1017 
1018         req = napi.get_request_info(this_arg);
1019 
1020         req_data = (req_data_t *) req->data;
1021 
1022         napi.remove_wrap(req_data->sock_ref);
1023         napi.remove_wrap(req_data->req_ref);
1024         napi.remove_wrap(req_data->resp_ref);
1025         napi.remove_wrap(req_data->conn_ref);
1026 
1027     } catch (exception &e) {
1028         napi.throw_error(e);
1029         return nullptr;
1030     }
1031 
1032     nxt_unit_request_done(req, NXT_UNIT_OK);
1033 
1034     return this_arg;
1035 }
1036 
1037 
1038 napi_value
websocket_send_frame(napi_env env,napi_callback_info info)1039 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
1040 {
1041     int                      ret, iovec_len;
1042     bool                     fin;
1043     size_t                   buf_len;
1044     uint32_t                 opcode, sc;
1045     nxt_napi                 napi(env);
1046     napi_value               this_arg, frame, payload;
1047     nxt_unit_request_info_t  *req;
1048     char                     status_code[2];
1049     struct iovec             iov[2];
1050 
1051     iovec_len = 0;
1052 
1053     try {
1054         this_arg = napi.get_cb_info(info, frame);
1055 
1056         req = napi.get_request_info(this_arg);
1057 
1058         opcode = napi.get_value_uint32(napi.get_named_property(frame,
1059                                                                "opcode"));
1060         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
1061             sc = napi.get_value_uint32(napi.get_named_property(frame,
1062                                                                "closeStatus"));
1063             status_code[0] = (sc >> 8) & 0xFF;
1064             status_code[1] = sc & 0xFF;
1065 
1066             iov[iovec_len].iov_base = status_code;
1067             iov[iovec_len].iov_len = 2;
1068             iovec_len++;
1069         }
1070 
1071         try {
1072             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
1073 
1074         } catch (exception &e) {
1075             fin = true;
1076         }
1077 
1078         payload = napi.get_named_property(frame, "binaryPayload");
1079 
1080         if (napi.is_buffer(payload)) {
1081             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
1082 
1083         } else {
1084             buf_len = 0;
1085         }
1086 
1087     } catch (exception &e) {
1088         napi.throw_error(e);
1089         return nullptr;
1090     }
1091 
1092     if (buf_len > 0) {
1093         iov[iovec_len].iov_len = buf_len;
1094         iovec_len++;
1095     }
1096 
1097     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1098     if (ret != NXT_UNIT_OK) {
1099         goto failed;
1100     }
1101 
1102     return this_arg;
1103 
1104 failed:
1105 
1106     napi.throw_error("Failed to send frame");
1107 
1108     return nullptr;
1109 }
1110 
1111 
1112 napi_value
websocket_set_sock(napi_env env,napi_callback_info info)1113 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1114 {
1115     nxt_napi                 napi(env);
1116     napi_value               this_arg, sock;
1117     req_data_t               *req_data;
1118     nxt_unit_request_info_t  *req;
1119 
1120     try {
1121         this_arg = napi.get_cb_info(info, sock);
1122 
1123         req = napi.get_request_info(sock);
1124 
1125         req_data = (req_data_t *) req->data;
1126         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1127 
1128     } catch (exception &e) {
1129         napi.throw_error(e);
1130         return nullptr;
1131     }
1132 
1133     return this_arg;
1134 }
1135 
1136 
1137 void
conn_destroy(napi_env env,void * r,void * finalize_hint)1138 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1139 {
1140     nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1141 }
1142 
1143 
1144 void
sock_destroy(napi_env env,void * r,void * finalize_hint)1145 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1146 {
1147     nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1148 }
1149 
1150 
1151 void
req_destroy(napi_env env,void * r,void * finalize_hint)1152 Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1153 {
1154     nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1155 }
1156 
1157 
1158 void
resp_destroy(napi_env env,void * r,void * finalize_hint)1159 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1160 {
1161     nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1162 }
1163