xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1766:9ec17030b67e)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 static void delete_port_data(uv_handle_t* handle);
17 
18 napi_ref Unit::constructor_;
19 
20 
21 struct port_data_t {
22     nxt_unit_ctx_t      *ctx;
23     nxt_unit_port_t     *port;
24     uv_poll_t           poll;
25 };
26 
27 
28 struct req_data_t {
29     napi_ref  sock_ref;
30     napi_ref  req_ref;
31     napi_ref  resp_ref;
32     napi_ref  conn_ref;
33 };
34 
35 
36 Unit::Unit(napi_env env, napi_value jsthis):
37     nxt_napi(env),
38     wrapper_(wrap(jsthis, this, destroy)),
39     unit_ctx_(nullptr)
40 {
41     nxt_unit_debug(NULL, "Unit::Unit()");
42 }
43 
44 
45 Unit::~Unit()
46 {
47     delete_reference(wrapper_);
48 
49     nxt_unit_debug(NULL, "Unit::~Unit()");
50 }
51 
52 
53 napi_value
54 Unit::init(napi_env env, napi_value exports)
55 {
56     nxt_napi    napi(env);
57     napi_value  ctor;
58 
59     napi_property_descriptor  unit_props[] = {
60         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
61         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
62     };
63 
64     try {
65         ctor = napi.define_class("Unit", create, 2, unit_props);
66         constructor_ = napi.create_reference(ctor);
67 
68         napi.set_named_property(exports, "Unit", ctor);
69         napi.set_named_property(exports, "request_read", request_read);
70         napi.set_named_property(exports, "response_send_headers",
71                                 response_send_headers);
72         napi.set_named_property(exports, "response_write", response_write);
73         napi.set_named_property(exports, "response_end", response_end);
74         napi.set_named_property(exports, "websocket_send_frame",
75                                 websocket_send_frame);
76         napi.set_named_property(exports, "websocket_set_sock",
77                                 websocket_set_sock);
78         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
79         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
80 
81     } catch (exception &e) {
82         napi.throw_error(e);
83         return nullptr;
84     }
85 
86     return exports;
87 }
88 
89 
90 void
91 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
92 {
93     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
94 
95     delete obj;
96 }
97 
98 
99 napi_value
100 Unit::create(napi_env env, napi_callback_info info)
101 {
102     nxt_napi    napi(env);
103     napi_value  target, ctor, instance, jsthis;
104 
105     try {
106         target = napi.get_new_target(info);
107 
108         if (target != nullptr) {
109             /* Invoked as constructor: `new Unit(...)`. */
110             jsthis = napi.get_cb_info(info);
111 
112             new Unit(env, jsthis);
113             napi.create_reference(jsthis);
114 
115             return jsthis;
116         }
117 
118         /* Invoked as plain function `Unit(...)`, turn into construct call. */
119         ctor = napi.get_reference_value(constructor_);
120         instance = napi.new_instance(ctor);
121         napi.create_reference(instance);
122 
123     } catch (exception &e) {
124         napi.throw_error(e);
125         return nullptr;
126     }
127 
128     return instance;
129 }
130 
131 
132 napi_value
133 Unit::create_server(napi_env env, napi_callback_info info)
134 {
135     Unit             *obj;
136     size_t           argc;
137     nxt_napi         napi(env);
138     napi_value       jsthis, argv;
139     nxt_unit_init_t  unit_init;
140 
141     argc = 1;
142 
143     try {
144         jsthis = napi.get_cb_info(info, argc, &argv);
145         obj = (Unit *) napi.unwrap(jsthis);
146 
147     } catch (exception &e) {
148         napi.throw_error(e);
149         return nullptr;
150     }
151 
152     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
153 
154     unit_init.data = obj;
155     unit_init.callbacks.request_handler   = request_handler_cb;
156     unit_init.callbacks.websocket_handler = websocket_handler_cb;
157     unit_init.callbacks.close_handler     = close_handler_cb;
158     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
159     unit_init.callbacks.add_port          = add_port;
160     unit_init.callbacks.remove_port       = remove_port;
161     unit_init.callbacks.quit              = quit_cb;
162 
163     unit_init.request_data_size = sizeof(req_data_t);
164 
165     obj->unit_ctx_ = nxt_unit_init(&unit_init);
166     if (obj->unit_ctx_ == NULL) {
167         goto failed;
168     }
169 
170     return nullptr;
171 
172 failed:
173 
174     napi_throw_error(env, NULL, "Failed to create Unit object");
175 
176     return nullptr;
177 }
178 
179 
180 napi_value
181 Unit::listen(napi_env env, napi_callback_info info)
182 {
183     return nullptr;
184 }
185 
186 
187 void
188 Unit::request_handler_cb(nxt_unit_request_info_t *req)
189 {
190     Unit  *obj;
191 
192     obj = reinterpret_cast<Unit *>(req->unit->data);
193 
194     obj->request_handler(req);
195 }
196 
197 
198 void
199 Unit::request_handler(nxt_unit_request_info_t *req)
200 {
201     napi_value  socket, request, response, server_obj, emit_request;
202 
203     memset(req->data, 0, sizeof(req_data_t));
204 
205     try {
206         nxt_handle_scope  scope(env());
207 
208         server_obj = get_server_object();
209 
210         socket = create_socket(server_obj, req);
211         request = create_request(server_obj, socket, req);
212         response = create_response(server_obj, request, req);
213 
214         create_headers(req, request);
215 
216         emit_request = get_named_property(server_obj, "emit_request");
217 
218         nxt_async_context   async_context(env(), "request_handler");
219         nxt_callback_scope  async_scope(async_context);
220 
221         make_callback(async_context, server_obj, emit_request, request,
222                       response);
223 
224     } catch (exception &e) {
225         nxt_unit_req_warn(req, "request_handler: %s", e.str);
226     }
227 }
228 
229 
230 void
231 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
232 {
233     Unit  *obj;
234 
235     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
236 
237     obj->websocket_handler(ws);
238 }
239 
240 
241 void
242 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
243 {
244     napi_value  frame, server_obj, process_frame, conn;
245     req_data_t  *req_data;
246 
247     req_data = (req_data_t *) ws->req->data;
248 
249     try {
250         nxt_handle_scope  scope(env());
251 
252         server_obj = get_server_object();
253 
254         frame = create_websocket_frame(server_obj, ws);
255 
256         conn = get_reference_value(req_data->conn_ref);
257 
258         process_frame = get_named_property(conn, "processFrame");
259 
260         nxt_async_context   async_context(env(), "websocket_handler");
261         nxt_callback_scope  async_scope(async_context);
262 
263         make_callback(async_context, conn, process_frame, frame);
264 
265     } catch (exception &e) {
266         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
267     }
268 
269     nxt_unit_websocket_done(ws);
270 }
271 
272 
273 void
274 Unit::close_handler_cb(nxt_unit_request_info_t *req)
275 {
276     Unit  *obj;
277 
278     obj = reinterpret_cast<Unit *>(req->unit->data);
279 
280     obj->close_handler(req);
281 }
282 
283 
284 void
285 Unit::close_handler(nxt_unit_request_info_t *req)
286 {
287     napi_value  conn_handle_close, conn;
288     req_data_t  *req_data;
289 
290     req_data = (req_data_t *) req->data;
291 
292     try {
293         nxt_handle_scope  scope(env());
294 
295         conn = get_reference_value(req_data->conn_ref);
296 
297         conn_handle_close = get_named_property(conn, "handleSocketClose");
298 
299         nxt_async_context   async_context(env(), "close_handler");
300         nxt_callback_scope  async_scope(async_context);
301 
302         make_callback(async_context, conn, conn_handle_close,
303                       nxt_napi::create(0));
304 
305         remove_wrap(req_data->sock_ref);
306         remove_wrap(req_data->req_ref);
307         remove_wrap(req_data->resp_ref);
308         remove_wrap(req_data->conn_ref);
309 
310     } catch (exception &e) {
311         nxt_unit_req_warn(req, "close_handler: %s", e.str);
312 
313         nxt_unit_request_done(req, NXT_UNIT_ERROR);
314 
315         return;
316     }
317 
318     nxt_unit_request_done(req, NXT_UNIT_OK);
319 }
320 
321 
322 void
323 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
324 {
325     Unit  *obj;
326 
327     obj = reinterpret_cast<Unit *>(ctx->unit->data);
328 
329     obj->shm_ack_handler(ctx);
330 }
331 
332 
333 void
334 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
335 {
336     napi_value  server_obj, emit_drain;
337 
338     try {
339         nxt_handle_scope  scope(env());
340 
341         server_obj = get_server_object();
342 
343         emit_drain = get_named_property(server_obj, "emit_drain");
344 
345         nxt_async_context   async_context(env(), "shm_ack_handler");
346         nxt_callback_scope  async_scope(async_context);
347 
348         make_callback(async_context, server_obj, emit_drain);
349 
350     } catch (exception &e) {
351         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
352     }
353 }
354 
355 
356 static void
357 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
358 {
359     port_data_t  *data;
360 
361     data = (port_data_t *) handle->data;
362 
363     nxt_unit_process_port_msg(data->ctx, data->port);
364 }
365 
366 
367 int
368 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
369 {
370     int               err;
371     Unit              *obj;
372     uv_loop_t         *loop;
373     port_data_t       *data;
374     napi_status       status;
375 
376     if (port->in_fd != -1) {
377         obj = reinterpret_cast<Unit *>(ctx->unit->data);
378 
379         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
380             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
381                           port->in_fd, strerror(errno), errno);
382             return -1;
383         }
384 
385         status = napi_get_uv_event_loop(obj->env(), &loop);
386         if (status != napi_ok) {
387             nxt_unit_warn(ctx, "Failed to get uv.loop");
388             return NXT_UNIT_ERROR;
389         }
390 
391         data = new port_data_t;
392 
393         err = uv_poll_init(loop, &data->poll, port->in_fd);
394         if (err < 0) {
395             nxt_unit_warn(ctx, "Failed to init uv.poll");
396             return NXT_UNIT_ERROR;
397         }
398 
399         err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
400         if (err < 0) {
401             nxt_unit_warn(ctx, "Failed to start uv.poll");
402             return NXT_UNIT_ERROR;
403         }
404 
405         port->data = data;
406 
407         data->ctx = ctx;
408         data->port = port;
409         data->poll.data = data;
410     }
411 
412     return NXT_UNIT_OK;
413 }
414 
415 
416 void
417 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
418 {
419     port_data_t  *data;
420 
421     if (port->data != NULL) {
422         data = (port_data_t *) port->data;
423 
424         if (data->port == port) {
425             uv_poll_stop(&data->poll);
426 
427             uv_close((uv_handle_t *) &data->poll, delete_port_data);
428         }
429     }
430 }
431 
432 
433 static void
434 delete_port_data(uv_handle_t* handle)
435 {
436     port_data_t  *data;
437 
438     data = (port_data_t *) handle->data;
439 
440     delete data;
441 }
442 
443 
444 void
445 Unit::quit_cb(nxt_unit_ctx_t *ctx)
446 {
447     Unit  *obj;
448 
449     obj = reinterpret_cast<Unit *>(ctx->unit->data);
450 
451     obj->quit(ctx);
452 }
453 
454 
455 void
456 Unit::quit(nxt_unit_ctx_t *ctx)
457 {
458     napi_value  server_obj, emit_close;
459 
460     try {
461         nxt_handle_scope  scope(env());
462 
463         server_obj = get_server_object();
464 
465         emit_close = get_named_property(server_obj, "emit_close");
466 
467         nxt_async_context   async_context(env(), "unit_quit");
468         nxt_callback_scope  async_scope(async_context);
469 
470         make_callback(async_context, server_obj, emit_close);
471 
472     } catch (exception &e) {
473         nxt_unit_debug(ctx, "quit: %s", e.str);
474     }
475 
476     nxt_unit_done(ctx);
477 }
478 
479 
480 napi_value
481 Unit::get_server_object()
482 {
483     napi_value  unit_obj;
484 
485     unit_obj = get_reference_value(wrapper_);
486 
487     return get_named_property(unit_obj, "server");
488 }
489 
490 
491 void
492 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
493 {
494     uint32_t            i;
495     napi_value          headers, raw_headers;
496     napi_status         status;
497     nxt_unit_request_t  *r;
498 
499     r = req->request;
500 
501     headers = create_object();
502 
503     status = napi_create_array_with_length(env(), r->fields_count * 2,
504                                            &raw_headers);
505     if (status != napi_ok) {
506         throw exception("Failed to create array");
507     }
508 
509     for (i = 0; i < r->fields_count; i++) {
510         append_header(r->fields + i, headers, raw_headers, i);
511     }
512 
513     set_named_property(request, "headers", headers);
514     set_named_property(request, "rawHeaders", raw_headers);
515     set_named_property(request, "httpVersion", r->version, r->version_length);
516     set_named_property(request, "method", r->method, r->method_length);
517     set_named_property(request, "url", r->target, r->target_length);
518 
519     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
520 }
521 
522 
523 inline char
524 lowcase(char c)
525 {
526     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
527 }
528 
529 
530 inline void
531 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
532     napi_value raw_headers, uint32_t idx)
533 {
534     char        *name;
535     uint8_t     i;
536     napi_value  str, vstr;
537 
538     name = (char *) nxt_unit_sptr_get(&f->name);
539 
540     str = create_string_latin1(name, f->name_length);
541 
542     for (i = 0; i < f->name_length; i++) {
543         name[i] = lowcase(name[i]);
544     }
545 
546     vstr = set_named_property(headers, name, f->value, f->value_length);
547 
548     set_element(raw_headers, idx * 2, str);
549     set_element(raw_headers, idx * 2 + 1, vstr);
550 }
551 
552 
553 napi_value
554 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
555 {
556     napi_value          constructor, res;
557     req_data_t          *req_data;
558     nxt_unit_request_t  *r;
559 
560     r = req->request;
561 
562     constructor = get_named_property(server_obj, "Socket");
563 
564     res = new_instance(constructor);
565 
566     req_data = (req_data_t *) req->data;
567     req_data->sock_ref = wrap(res, req, sock_destroy);
568 
569     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
570     set_named_property(res, "localAddress", r->local, r->local_length);
571 
572     return res;
573 }
574 
575 
576 napi_value
577 Unit::create_request(napi_value server_obj, napi_value socket,
578     nxt_unit_request_info_t *req)
579 {
580     napi_value  constructor, res;
581     req_data_t  *req_data;
582 
583     constructor = get_named_property(server_obj, "ServerRequest");
584 
585     res = new_instance(constructor, server_obj, socket);
586 
587     req_data = (req_data_t *) req->data;
588     req_data->req_ref = wrap(res, req, req_destroy);
589 
590     return res;
591 }
592 
593 
594 napi_value
595 Unit::create_response(napi_value server_obj, napi_value request,
596     nxt_unit_request_info_t *req)
597 {
598     napi_value  constructor, res;
599     req_data_t  *req_data;
600 
601     constructor = get_named_property(server_obj, "ServerResponse");
602 
603     res = new_instance(constructor, request);
604 
605     req_data = (req_data_t *) req->data;
606     req_data->resp_ref = wrap(res, req, resp_destroy);
607 
608     return res;
609 }
610 
611 
612 napi_value
613 Unit::create_websocket_frame(napi_value server_obj,
614                              nxt_unit_websocket_frame_t *ws)
615 {
616     void        *data;
617     napi_value  constructor, res, buffer;
618     uint8_t     sc[2];
619 
620     constructor = get_named_property(server_obj, "WebSocketFrame");
621 
622     res = new_instance(constructor);
623 
624     set_named_property(res, "fin", (bool) ws->header->fin);
625     set_named_property(res, "opcode", ws->header->opcode);
626     set_named_property(res, "length", (int64_t) ws->payload_len);
627 
628     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
629         if (ws->payload_len >= 2) {
630             nxt_unit_websocket_read(ws, sc, 2);
631 
632             set_named_property(res, "closeStatus",
633                                (((uint16_t) sc[0]) << 8) | sc[1]);
634 
635         } else {
636             set_named_property(res, "closeStatus", -1);
637         }
638     }
639 
640     buffer = create_buffer((size_t) ws->content_length, &data);
641     nxt_unit_websocket_read(ws, data, ws->content_length);
642 
643     set_named_property(res, "binaryPayload", buffer);
644 
645     return res;
646 }
647 
648 
649 napi_value
650 Unit::request_read(napi_env env, napi_callback_info info)
651 {
652     void                     *data;
653     uint32_t                 wm;
654     nxt_napi                 napi(env);
655     napi_value               this_arg, argv, buffer;
656     nxt_unit_request_info_t  *req;
657 
658     try {
659         this_arg = napi.get_cb_info(info, argv);
660 
661         try {
662             req = napi.get_request_info(this_arg);
663 
664         } catch (exception &e) {
665             return nullptr;
666         }
667 
668         if (req->content_length == 0) {
669             return nullptr;
670         }
671 
672         wm = napi.get_value_uint32(argv);
673 
674         if (wm > req->content_length) {
675             wm = req->content_length;
676         }
677 
678         buffer = napi.create_buffer((size_t) wm, &data);
679         nxt_unit_request_read(req, data, wm);
680 
681     } catch (exception &e) {
682         napi.throw_error(e);
683         return nullptr;
684     }
685 
686     return buffer;
687 }
688 
689 
690 napi_value
691 Unit::response_send_headers(napi_env env, napi_callback_info info)
692 {
693     int                      ret;
694     char                     *ptr, *name_ptr;
695     bool                     is_array;
696     size_t                   argc, name_len, value_len;
697     uint32_t                 status_code, header_len, keys_len, array_len;
698     uint32_t                 keys_count, i, j;
699     uint16_t                 hash;
700     nxt_napi                 napi(env);
701     napi_value               this_arg, headers, keys, name, value, array_val;
702     napi_value               array_entry;
703     napi_valuetype           val_type;
704     nxt_unit_field_t         *f;
705     nxt_unit_request_info_t  *req;
706     napi_value               argv[4];
707 
708     argc = 4;
709 
710     try {
711         this_arg = napi.get_cb_info(info, argc, argv);
712         if (argc != 4) {
713             napi.throw_error("Wrong args count. Expected: "
714                              "statusCode, headers, headers count, "
715                              "headers length");
716             return nullptr;
717         }
718 
719         req = napi.get_request_info(this_arg);
720         status_code = napi.get_value_uint32(argv[0]);
721         keys_count = napi.get_value_uint32(argv[2]);
722         header_len = napi.get_value_uint32(argv[3]);
723 
724         headers = argv[1];
725 
726         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
727         if (ret != NXT_UNIT_OK) {
728             napi.throw_error("Failed to create response");
729             return nullptr;
730         }
731 
732         /*
733          * Each name and value are 0-terminated by libunit.
734          * Need to add extra 2 bytes for each header.
735          */
736         header_len += keys_count * 2;
737 
738         keys = napi.get_property_names(headers);
739         keys_len = napi.get_array_length(keys);
740 
741         ptr = req->response_buf->free;
742 
743         for (i = 0; i < keys_len; i++) {
744             name = napi.get_element(keys, i);
745 
746             array_entry = napi.get_property(headers, name);
747 
748             name = napi.get_element(array_entry, 0);
749             value = napi.get_element(array_entry, 1);
750 
751             name_len = napi.get_value_string_latin1(name, ptr, header_len);
752             name_ptr = ptr;
753 
754             ptr += name_len + 1;
755             header_len -= name_len + 1;
756 
757             hash = nxt_unit_field_hash(name_ptr, name_len);
758 
759             is_array = napi.is_array(value);
760 
761             if (is_array) {
762                 array_len = napi.get_array_length(value);
763 
764                 for (j = 0; j < array_len; j++) {
765                     array_val = napi.get_element(value, j);
766 
767                     val_type = napi.type_of(array_val);
768 
769                     if (val_type != napi_string) {
770                         array_val = napi.coerce_to_string(array_val);
771                     }
772 
773                     value_len = napi.get_value_string_latin1(array_val, ptr,
774                                                              header_len);
775 
776                     f = req->response->fields + req->response->fields_count;
777                     f->skip = 0;
778 
779                     nxt_unit_sptr_set(&f->name, name_ptr);
780 
781                     f->name_length = name_len;
782                     f->hash = hash;
783 
784                     nxt_unit_sptr_set(&f->value, ptr);
785                     f->value_length = (uint32_t) value_len;
786 
787                     ptr += value_len + 1;
788                     header_len -= value_len + 1;
789 
790                     req->response->fields_count++;
791                 }
792 
793             } else {
794                 val_type = napi.type_of(value);
795 
796                 if (val_type != napi_string) {
797                     value = napi.coerce_to_string(value);
798                 }
799 
800                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
801 
802                 f = req->response->fields + req->response->fields_count;
803                 f->skip = 0;
804 
805                 nxt_unit_sptr_set(&f->name, name_ptr);
806 
807                 f->name_length = name_len;
808                 f->hash = hash;
809 
810                 nxt_unit_sptr_set(&f->value, ptr);
811                 f->value_length = (uint32_t) value_len;
812 
813                 ptr += value_len + 1;
814                 header_len -= value_len + 1;
815 
816                 req->response->fields_count++;
817             }
818         }
819 
820     } catch (exception &e) {
821         napi.throw_error(e);
822         return nullptr;
823     }
824 
825     req->response_buf->free = ptr;
826 
827     ret = nxt_unit_response_send(req);
828     if (ret != NXT_UNIT_OK) {
829         napi.throw_error("Failed to send response");
830         return nullptr;
831     }
832 
833     return this_arg;
834 }
835 
836 
837 napi_value
838 Unit::response_write(napi_env env, napi_callback_info info)
839 {
840     int                      ret;
841     void                     *ptr;
842     size_t                   argc, have_buf_len;
843     ssize_t                  res_len;
844     uint32_t                 buf_start, buf_len;
845     nxt_napi                 napi(env);
846     napi_value               this_arg;
847     nxt_unit_buf_t           *buf;
848     napi_valuetype           buf_type;
849     nxt_unit_request_info_t  *req;
850     napi_value               argv[3];
851 
852     argc = 3;
853 
854     try {
855         this_arg = napi.get_cb_info(info, argc, argv);
856         if (argc != 3) {
857             throw exception("Wrong args count. Expected: "
858                             "chunk, start, length");
859         }
860 
861         req = napi.get_request_info(this_arg);
862         buf_type = napi.type_of(argv[0]);
863         buf_start = napi.get_value_uint32(argv[1]);
864         buf_len = napi.get_value_uint32(argv[2]) + 1;
865 
866         if (buf_type == napi_string) {
867             /* TODO: will work only for utf8 content-type */
868 
869             if (req->response_buf != NULL
870                 && req->response_buf->end >= req->response_buf->free + buf_len)
871             {
872                 buf = req->response_buf;
873 
874             } else {
875                 buf = nxt_unit_response_buf_alloc(req, buf_len);
876                 if (buf == NULL) {
877                     throw exception("Failed to allocate response buffer");
878                 }
879             }
880 
881             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
882                                                       buf_len);
883 
884             buf->free += have_buf_len;
885 
886             ret = nxt_unit_buf_send(buf);
887             if (ret == NXT_UNIT_OK) {
888                 res_len = have_buf_len;
889             }
890 
891         } else {
892             ptr = napi.get_buffer_info(argv[0], have_buf_len);
893 
894             if (buf_start > 0) {
895                 ptr = ((uint8_t *) ptr) + buf_start;
896                 have_buf_len -= buf_start;
897             }
898 
899             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
900 
901             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
902         }
903 
904         if (ret != NXT_UNIT_OK) {
905             throw exception("Failed to send body buf");
906         }
907     } catch (exception &e) {
908         napi.throw_error(e);
909         return nullptr;
910     }
911 
912     return napi.create((int64_t) res_len);
913 }
914 
915 
916 napi_value
917 Unit::response_end(napi_env env, napi_callback_info info)
918 {
919     nxt_napi                 napi(env);
920     napi_value               this_arg;
921     req_data_t               *req_data;
922     nxt_unit_request_info_t  *req;
923 
924     try {
925         this_arg = napi.get_cb_info(info);
926 
927         req = napi.get_request_info(this_arg);
928 
929         req_data = (req_data_t *) req->data;
930 
931         napi.remove_wrap(req_data->sock_ref);
932         napi.remove_wrap(req_data->req_ref);
933         napi.remove_wrap(req_data->resp_ref);
934         napi.remove_wrap(req_data->conn_ref);
935 
936     } catch (exception &e) {
937         napi.throw_error(e);
938         return nullptr;
939     }
940 
941     nxt_unit_request_done(req, NXT_UNIT_OK);
942 
943     return this_arg;
944 }
945 
946 
947 napi_value
948 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
949 {
950     int                      ret, iovec_len;
951     bool                     fin;
952     size_t                   buf_len;
953     uint32_t                 opcode, sc;
954     nxt_napi                 napi(env);
955     napi_value               this_arg, frame, payload;
956     nxt_unit_request_info_t  *req;
957     char                     status_code[2];
958     struct iovec             iov[2];
959 
960     iovec_len = 0;
961 
962     try {
963         this_arg = napi.get_cb_info(info, frame);
964 
965         req = napi.get_request_info(this_arg);
966 
967         opcode = napi.get_value_uint32(napi.get_named_property(frame,
968                                                                "opcode"));
969         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
970             sc = napi.get_value_uint32(napi.get_named_property(frame,
971                                                                "closeStatus"));
972             status_code[0] = (sc >> 8) & 0xFF;
973             status_code[1] = sc & 0xFF;
974 
975             iov[iovec_len].iov_base = status_code;
976             iov[iovec_len].iov_len = 2;
977             iovec_len++;
978         }
979 
980         try {
981             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
982 
983         } catch (exception &e) {
984             fin = true;
985         }
986 
987         payload = napi.get_named_property(frame, "binaryPayload");
988 
989         if (napi.is_buffer(payload)) {
990             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
991 
992         } else {
993             buf_len = 0;
994         }
995 
996     } catch (exception &e) {
997         napi.throw_error(e);
998         return nullptr;
999     }
1000 
1001     if (buf_len > 0) {
1002         iov[iovec_len].iov_len = buf_len;
1003         iovec_len++;
1004     }
1005 
1006     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1007     if (ret != NXT_UNIT_OK) {
1008         goto failed;
1009     }
1010 
1011     return this_arg;
1012 
1013 failed:
1014 
1015     napi.throw_error("Failed to send frame");
1016 
1017     return nullptr;
1018 }
1019 
1020 
1021 napi_value
1022 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1023 {
1024     nxt_napi                 napi(env);
1025     napi_value               this_arg, sock;
1026     req_data_t               *req_data;
1027     nxt_unit_request_info_t  *req;
1028 
1029     try {
1030         this_arg = napi.get_cb_info(info, sock);
1031 
1032         req = napi.get_request_info(sock);
1033 
1034         req_data = (req_data_t *) req->data;
1035         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1036 
1037     } catch (exception &e) {
1038         napi.throw_error(e);
1039         return nullptr;
1040     }
1041 
1042     return this_arg;
1043 }
1044 
1045 
1046 void
1047 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1048 {
1049     nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1050 }
1051 
1052 
1053 void
1054 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1055 {
1056     nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1057 }
1058 
1059 
1060 void
1061 Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1062 {
1063     nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1064 }
1065 
1066 
1067 void
1068 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1069 {
1070     nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1071 }
1072