xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1132:9ac5b5f33ed9)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 napi_ref Unit::constructor_;
17 
18 
19 struct nxt_nodejs_ctx_t {
20     nxt_unit_port_id_t  port_id;
21     uv_poll_t           poll;
22 };
23 
24 
25 struct req_data_t {
26     napi_ref  sock_ref;
27     napi_ref  resp_ref;
28     napi_ref  conn_ref;
29 };
30 
31 
32 Unit::Unit(napi_env env, napi_value jsthis):
33     nxt_napi(env),
34     wrapper_(wrap(jsthis, this, destroy)),
35     unit_ctx_(nullptr)
36 {
37     nxt_unit_debug(NULL, "Unit::Unit()");
38 }
39 
40 
41 Unit::~Unit()
42 {
43     delete_reference(wrapper_);
44 
45     nxt_unit_debug(NULL, "Unit::~Unit()");
46 }
47 
48 
49 napi_value
50 Unit::init(napi_env env, napi_value exports)
51 {
52     nxt_napi    napi(env);
53     napi_value  ctor;
54 
55     napi_property_descriptor  unit_props[] = {
56         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
57         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
58     };
59 
60     try {
61         ctor = napi.define_class("Unit", create, 2, unit_props);
62         constructor_ = napi.create_reference(ctor);
63 
64         napi.set_named_property(exports, "Unit", ctor);
65         napi.set_named_property(exports, "response_send_headers",
66                                 response_send_headers);
67         napi.set_named_property(exports, "response_write", response_write);
68         napi.set_named_property(exports, "response_end", response_end);
69         napi.set_named_property(exports, "websocket_send_frame",
70                                 websocket_send_frame);
71         napi.set_named_property(exports, "websocket_set_sock",
72                                 websocket_set_sock);
73 
74     } catch (exception &e) {
75         napi.throw_error(e);
76         return nullptr;
77     }
78 
79     return exports;
80 }
81 
82 
83 void
84 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
85 {
86     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
87 
88     delete obj;
89 }
90 
91 
92 napi_value
93 Unit::create(napi_env env, napi_callback_info info)
94 {
95     nxt_napi    napi(env);
96     napi_value  target, ctor, instance, jsthis;
97 
98     try {
99         target = napi.get_new_target(info);
100 
101         if (target != nullptr) {
102             /* Invoked as constructor: `new Unit(...)`. */
103             jsthis = napi.get_cb_info(info);
104 
105             new Unit(env, jsthis);
106             napi.create_reference(jsthis);
107 
108             return jsthis;
109         }
110 
111         /* Invoked as plain function `Unit(...)`, turn into construct call. */
112         ctor = napi.get_reference_value(constructor_);
113         instance = napi.new_instance(ctor);
114         napi.create_reference(instance);
115 
116     } catch (exception &e) {
117         napi.throw_error(e);
118         return nullptr;
119     }
120 
121     return instance;
122 }
123 
124 
125 napi_value
126 Unit::create_server(napi_env env, napi_callback_info info)
127 {
128     Unit             *obj;
129     size_t           argc;
130     nxt_napi         napi(env);
131     napi_value       jsthis, argv;
132     nxt_unit_init_t  unit_init;
133 
134     argc = 1;
135 
136     try {
137         jsthis = napi.get_cb_info(info, argc, &argv);
138         obj = (Unit *) napi.unwrap(jsthis);
139 
140     } catch (exception &e) {
141         napi.throw_error(e);
142         return nullptr;
143     }
144 
145     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
146 
147     unit_init.data = obj;
148     unit_init.callbacks.request_handler   = request_handler_cb;
149     unit_init.callbacks.websocket_handler = websocket_handler_cb;
150     unit_init.callbacks.close_handler     = close_handler_cb;
151     unit_init.callbacks.add_port          = add_port;
152     unit_init.callbacks.remove_port       = remove_port;
153     unit_init.callbacks.quit              = quit_cb;
154 
155     unit_init.request_data_size = sizeof(req_data_t);
156 
157     obj->unit_ctx_ = nxt_unit_init(&unit_init);
158     if (obj->unit_ctx_ == NULL) {
159         goto failed;
160     }
161 
162     return nullptr;
163 
164 failed:
165 
166     napi_throw_error(env, NULL, "Failed to create Unit object");
167 
168     return nullptr;
169 }
170 
171 
172 napi_value
173 Unit::listen(napi_env env, napi_callback_info info)
174 {
175     return nullptr;
176 }
177 
178 
179 void
180 Unit::request_handler_cb(nxt_unit_request_info_t *req)
181 {
182     Unit  *obj;
183 
184     obj = reinterpret_cast<Unit *>(req->unit->data);
185 
186     obj->request_handler(req);
187 }
188 
189 
190 void
191 Unit::request_handler(nxt_unit_request_info_t *req)
192 {
193     napi_value  socket, request, response, server_obj, emit_request;
194 
195     memset(req->data, 0, sizeof(req_data_t));
196 
197     try {
198         nxt_handle_scope  scope(env());
199 
200         server_obj = get_server_object();
201 
202         socket = create_socket(server_obj, req);
203         request = create_request(server_obj, socket);
204         response = create_response(server_obj, request, req);
205 
206         create_headers(req, request);
207 
208         emit_request = get_named_property(server_obj, "emit_request");
209 
210         nxt_async_context   async_context(env(), "request_handler");
211         nxt_callback_scope  async_scope(async_context);
212 
213         make_callback(async_context, server_obj, emit_request, request,
214                       response);
215 
216     } catch (exception &e) {
217         nxt_unit_req_warn(req, "request_handler: %s", e.str);
218     }
219 }
220 
221 
222 void
223 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
224 {
225     Unit  *obj;
226 
227     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
228 
229     obj->websocket_handler(ws);
230 }
231 
232 
233 void
234 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
235 {
236     napi_value  frame, server_obj, process_frame, conn;
237     req_data_t  *req_data;
238 
239     req_data = (req_data_t *) ws->req->data;
240 
241     try {
242         nxt_handle_scope  scope(env());
243 
244         server_obj = get_server_object();
245 
246         frame = create_websocket_frame(server_obj, ws);
247 
248         conn = get_reference_value(req_data->conn_ref);
249 
250         process_frame = get_named_property(conn, "processFrame");
251 
252         nxt_async_context   async_context(env(), "websocket_handler");
253         nxt_callback_scope  async_scope(async_context);
254 
255         make_callback(async_context, conn, process_frame, frame);
256 
257     } catch (exception &e) {
258         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
259     }
260 
261     nxt_unit_websocket_done(ws);
262 }
263 
264 
265 void
266 Unit::close_handler_cb(nxt_unit_request_info_t *req)
267 {
268     Unit  *obj;
269 
270     obj = reinterpret_cast<Unit *>(req->unit->data);
271 
272     obj->close_handler(req);
273 }
274 
275 
276 void
277 Unit::close_handler(nxt_unit_request_info_t *req)
278 {
279     napi_value  conn_handle_close, conn;
280     req_data_t  *req_data;
281 
282     req_data = (req_data_t *) req->data;
283 
284     try {
285         nxt_handle_scope  scope(env());
286 
287         conn = get_reference_value(req_data->conn_ref);
288 
289         conn_handle_close = get_named_property(conn, "handleSocketClose");
290 
291         nxt_async_context   async_context(env(), "close_handler");
292         nxt_callback_scope  async_scope(async_context);
293 
294         make_callback(async_context, conn, conn_handle_close,
295                       nxt_napi::create(0));
296 
297         remove_wrap(req_data->sock_ref);
298         remove_wrap(req_data->resp_ref);
299         remove_wrap(req_data->conn_ref);
300 
301     } catch (exception &e) {
302         nxt_unit_req_warn(req, "close_handler: %s", e.str);
303 
304         return;
305     }
306 
307     nxt_unit_request_done(req, NXT_UNIT_OK);
308 }
309 
310 
311 static void
312 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
313 {
314     nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
315 }
316 
317 
318 int
319 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
320 {
321     int               err;
322     Unit              *obj;
323     uv_loop_t         *loop;
324     napi_status       status;
325     nxt_nodejs_ctx_t  *node_ctx;
326 
327     if (port->in_fd != -1) {
328         obj = reinterpret_cast<Unit *>(ctx->unit->data);
329 
330         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
331             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
332                           port->in_fd, strerror(errno), errno);
333             return -1;
334         }
335 
336         status = napi_get_uv_event_loop(obj->env(), &loop);
337         if (status != napi_ok) {
338             nxt_unit_warn(ctx, "Failed to get uv.loop");
339             return NXT_UNIT_ERROR;
340         }
341 
342         node_ctx = new nxt_nodejs_ctx_t;
343 
344         err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
345         if (err < 0) {
346             nxt_unit_warn(ctx, "Failed to init uv.poll");
347             return NXT_UNIT_ERROR;
348         }
349 
350         err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
351         if (err < 0) {
352             nxt_unit_warn(ctx, "Failed to start uv.poll");
353             return NXT_UNIT_ERROR;
354         }
355 
356         ctx->data = node_ctx;
357 
358         node_ctx->port_id = port->id;
359         node_ctx->poll.data = ctx;
360     }
361 
362     return nxt_unit_add_port(ctx, port);
363 }
364 
365 
366 inline bool
367 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
368 {
369     return p1.pid == p2.pid && p1.id == p2.id;
370 }
371 
372 
373 void
374 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
375 {
376     nxt_nodejs_ctx_t  *node_ctx;
377 
378     if (ctx->data != NULL) {
379         node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
380 
381         if (node_ctx->port_id == *port_id) {
382             uv_poll_stop(&node_ctx->poll);
383 
384             delete node_ctx;
385 
386             ctx->data = NULL;
387         }
388     }
389 
390     nxt_unit_remove_port(ctx, port_id);
391 }
392 
393 
394 void
395 Unit::quit_cb(nxt_unit_ctx_t *ctx)
396 {
397     Unit  *obj;
398 
399     obj = reinterpret_cast<Unit *>(ctx->unit->data);
400 
401     obj->quit(ctx);
402 }
403 
404 
405 void
406 Unit::quit(nxt_unit_ctx_t *ctx)
407 {
408     napi_value  server_obj, emit_close;
409 
410     try {
411         nxt_handle_scope  scope(env());
412 
413         server_obj = get_server_object();
414 
415         emit_close = get_named_property(server_obj, "emit_close");
416 
417         nxt_async_context   async_context(env(), "unit_quit");
418         nxt_callback_scope  async_scope(async_context);
419 
420         make_callback(async_context, server_obj, emit_close);
421 
422     } catch (exception &e) {
423         nxt_unit_debug(ctx, "quit: %s", e.str);
424     }
425 
426     nxt_unit_done(ctx);
427 }
428 
429 
430 napi_value
431 Unit::get_server_object()
432 {
433     napi_value  unit_obj;
434 
435     unit_obj = get_reference_value(wrapper_);
436 
437     return get_named_property(unit_obj, "server");
438 }
439 
440 
441 void
442 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
443 {
444     void                *data;
445     uint32_t            i;
446     napi_value          headers, raw_headers, buffer;
447     napi_status         status;
448     nxt_unit_request_t  *r;
449 
450     r = req->request;
451 
452     headers = create_object();
453 
454     status = napi_create_array_with_length(env(), r->fields_count * 2,
455                                            &raw_headers);
456     if (status != napi_ok) {
457         throw exception("Failed to create array");
458     }
459 
460     for (i = 0; i < r->fields_count; i++) {
461         append_header(r->fields + i, headers, raw_headers, i);
462     }
463 
464     set_named_property(request, "headers", headers);
465     set_named_property(request, "rawHeaders", raw_headers);
466     set_named_property(request, "httpVersion", r->version, r->version_length);
467     set_named_property(request, "method", r->method, r->method_length);
468     set_named_property(request, "url", r->target, r->target_length);
469 
470     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
471 
472     buffer = create_buffer((size_t) req->content_length, &data);
473     nxt_unit_request_read(req, data, req->content_length);
474 
475     set_named_property(request, "_data", buffer);
476 }
477 
478 
479 inline char
480 lowcase(char c)
481 {
482     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
483 }
484 
485 
486 inline void
487 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
488     napi_value raw_headers, uint32_t idx)
489 {
490     char        *name;
491     uint8_t     i;
492     napi_value  str, vstr;
493 
494     name = (char *) nxt_unit_sptr_get(&f->name);
495 
496     str = create_string_latin1(name, f->name_length);
497 
498     for (i = 0; i < f->name_length; i++) {
499         name[i] = lowcase(name[i]);
500     }
501 
502     vstr = set_named_property(headers, name, f->value, f->value_length);
503 
504     set_element(raw_headers, idx * 2, str);
505     set_element(raw_headers, idx * 2 + 1, vstr);
506 }
507 
508 
509 napi_value
510 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
511 {
512     napi_value          constructor, res;
513     req_data_t          *req_data;
514     nxt_unit_request_t  *r;
515 
516     r = req->request;
517 
518     constructor = get_named_property(server_obj, "Socket");
519 
520     res = new_instance(constructor);
521 
522     req_data = (req_data_t *) req->data;
523     req_data->sock_ref = wrap(res, req, sock_destroy);
524 
525     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
526     set_named_property(res, "localAddress", r->local, r->local_length);
527 
528     return res;
529 }
530 
531 
532 napi_value
533 Unit::create_request(napi_value server_obj, napi_value socket)
534 {
535     napi_value  constructor;
536 
537     constructor = get_named_property(server_obj, "ServerRequest");
538 
539     return new_instance(constructor, server_obj, socket);
540 }
541 
542 
543 napi_value
544 Unit::create_response(napi_value server_obj, napi_value request,
545     nxt_unit_request_info_t *req)
546 {
547     napi_value  constructor, res;
548     req_data_t  *req_data;
549 
550     constructor = get_named_property(server_obj, "ServerResponse");
551 
552     res = new_instance(constructor, request);
553 
554     req_data = (req_data_t *) req->data;
555     req_data->resp_ref = wrap(res, req, resp_destroy);
556 
557     return res;
558 }
559 
560 
561 napi_value
562 Unit::create_websocket_frame(napi_value server_obj,
563                              nxt_unit_websocket_frame_t *ws)
564 {
565     void        *data;
566     napi_value  constructor, res, buffer;
567     uint8_t     sc[2];
568 
569     constructor = get_named_property(server_obj, "WebSocketFrame");
570 
571     res = new_instance(constructor);
572 
573     set_named_property(res, "fin", (bool) ws->header->fin);
574     set_named_property(res, "opcode", ws->header->opcode);
575     set_named_property(res, "length", (int64_t) ws->payload_len);
576 
577     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
578         if (ws->payload_len >= 2) {
579             nxt_unit_websocket_read(ws, sc, 2);
580 
581             set_named_property(res, "closeStatus",
582                                (((uint16_t) sc[0]) << 8) | sc[1]);
583 
584         } else {
585             set_named_property(res, "closeStatus", -1);
586         }
587     }
588 
589     buffer = create_buffer((size_t) ws->content_length, &data);
590     nxt_unit_websocket_read(ws, data, ws->content_length);
591 
592     set_named_property(res, "binaryPayload", buffer);
593 
594     return res;
595 }
596 
597 
598 napi_value
599 Unit::response_send_headers(napi_env env, napi_callback_info info)
600 {
601     int                      ret;
602     char                     *ptr, *name_ptr;
603     bool                     is_array;
604     size_t                   argc, name_len, value_len;
605     uint32_t                 status_code, header_len, keys_len, array_len;
606     uint32_t                 keys_count, i, j;
607     uint16_t                 hash;
608     nxt_napi                 napi(env);
609     napi_value               this_arg, headers, keys, name, value, array_val;
610     napi_value               array_entry;
611     napi_valuetype           val_type;
612     nxt_unit_field_t         *f;
613     nxt_unit_request_info_t  *req;
614     napi_value               argv[4];
615 
616     argc = 4;
617 
618     try {
619         this_arg = napi.get_cb_info(info, argc, argv);
620         if (argc != 4) {
621             napi.throw_error("Wrong args count. Expected: "
622                              "statusCode, headers, headers count, "
623                              "headers length");
624             return nullptr;
625         }
626 
627         req = napi.get_request_info(this_arg);
628         status_code = napi.get_value_uint32(argv[0]);
629         keys_count = napi.get_value_uint32(argv[2]);
630         header_len = napi.get_value_uint32(argv[3]);
631 
632         /* Need to reserve extra byte for C-string 0-termination. */
633         header_len++;
634 
635         headers = argv[1];
636 
637         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
638         if (ret != NXT_UNIT_OK) {
639             napi.throw_error("Failed to create response");
640             return nullptr;
641         }
642 
643         keys = napi.get_property_names(headers);
644         keys_len = napi.get_array_length(keys);
645 
646         ptr = req->response_buf->free;
647 
648         for (i = 0; i < keys_len; i++) {
649             name = napi.get_element(keys, i);
650 
651             array_entry = napi.get_property(headers, name);
652 
653             name = napi.get_element(array_entry, 0);
654             value = napi.get_element(array_entry, 1);
655 
656             name_len = napi.get_value_string_latin1(name, ptr, header_len);
657             name_ptr = ptr;
658 
659             ptr += name_len;
660             header_len -= name_len;
661 
662             hash = nxt_unit_field_hash(name_ptr, name_len);
663 
664             is_array = napi.is_array(value);
665 
666             if (is_array) {
667                 array_len = napi.get_array_length(value);
668 
669                 for (j = 0; j < array_len; j++) {
670                     array_val = napi.get_element(value, j);
671 
672                     val_type = napi.type_of(array_val);
673 
674                     if (val_type != napi_string) {
675                         array_val = napi.coerce_to_string(array_val);
676                     }
677 
678                     value_len = napi.get_value_string_latin1(array_val, ptr,
679                                                              header_len);
680 
681                     f = req->response->fields + req->response->fields_count;
682                     f->skip = 0;
683 
684                     nxt_unit_sptr_set(&f->name, name_ptr);
685 
686                     f->name_length = name_len;
687                     f->hash = hash;
688 
689                     nxt_unit_sptr_set(&f->value, ptr);
690                     f->value_length = (uint32_t) value_len;
691 
692                     ptr += value_len;
693                     header_len -= value_len;
694 
695                     req->response->fields_count++;
696                 }
697 
698             } else {
699                 val_type = napi.type_of(value);
700 
701                 if (val_type != napi_string) {
702                     value = napi.coerce_to_string(value);
703                 }
704 
705                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
706 
707                 f = req->response->fields + req->response->fields_count;
708                 f->skip = 0;
709 
710                 nxt_unit_sptr_set(&f->name, name_ptr);
711 
712                 f->name_length = name_len;
713                 f->hash = hash;
714 
715                 nxt_unit_sptr_set(&f->value, ptr);
716                 f->value_length = (uint32_t) value_len;
717 
718                 ptr += value_len;
719                 header_len -= value_len;
720 
721                 req->response->fields_count++;
722             }
723         }
724 
725     } catch (exception &e) {
726         napi.throw_error(e);
727         return nullptr;
728     }
729 
730     req->response_buf->free = ptr;
731 
732     ret = nxt_unit_response_send(req);
733     if (ret != NXT_UNIT_OK) {
734         napi.throw_error("Failed to send response");
735         return nullptr;
736     }
737 
738     return this_arg;
739 }
740 
741 
742 napi_value
743 Unit::response_write(napi_env env, napi_callback_info info)
744 {
745     int                      ret;
746     void                     *ptr;
747     size_t                   argc, have_buf_len;
748     uint32_t                 buf_len;
749     nxt_napi                 napi(env);
750     napi_value               this_arg;
751     nxt_unit_buf_t           *buf;
752     napi_valuetype           buf_type;
753     nxt_unit_request_info_t  *req;
754     napi_value               argv[2];
755 
756     argc = 2;
757 
758     try {
759         this_arg = napi.get_cb_info(info, argc, argv);
760         if (argc != 2) {
761             throw exception("Wrong args count. Expected: "
762                             "chunk, chunk length");
763         }
764 
765         req = napi.get_request_info(this_arg);
766         buf_type = napi.type_of(argv[0]);
767         buf_len = napi.get_value_uint32(argv[1]) + 1;
768 
769         buf = nxt_unit_response_buf_alloc(req, buf_len);
770         if (buf == NULL) {
771             throw exception("Failed to allocate response buffer");
772         }
773 
774         if (buf_type == napi_string) {
775             /* TODO: will work only for utf8 content-type */
776 
777             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
778                                                       buf_len);
779 
780         } else {
781             ptr = napi.get_buffer_info(argv[0], have_buf_len);
782 
783             memcpy(buf->free, ptr, have_buf_len);
784         }
785 
786         buf->free += have_buf_len;
787 
788         ret = nxt_unit_buf_send(buf);
789         if (ret != NXT_UNIT_OK) {
790             throw exception("Failed to send body buf");
791         }
792     } catch (exception &e) {
793         napi.throw_error(e);
794         return nullptr;
795     }
796 
797     return this_arg;
798 }
799 
800 
801 napi_value
802 Unit::response_end(napi_env env, napi_callback_info info)
803 {
804     nxt_napi                 napi(env);
805     napi_value               this_arg;
806     req_data_t               *req_data;
807     nxt_unit_request_info_t  *req;
808 
809     try {
810         this_arg = napi.get_cb_info(info);
811 
812         req = napi.get_request_info(this_arg);
813 
814         req_data = (req_data_t *) req->data;
815 
816         napi.remove_wrap(req_data->sock_ref);
817         napi.remove_wrap(req_data->resp_ref);
818         napi.remove_wrap(req_data->conn_ref);
819 
820     } catch (exception &e) {
821         napi.throw_error(e);
822         return nullptr;
823     }
824 
825     nxt_unit_request_done(req, NXT_UNIT_OK);
826 
827     return this_arg;
828 }
829 
830 
831 napi_value
832 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
833 {
834     int                      ret, iovec_len;
835     bool                     fin;
836     size_t                   buf_len;
837     uint32_t                 opcode, sc;
838     nxt_napi                 napi(env);
839     napi_value               this_arg, frame, payload;
840     nxt_unit_request_info_t  *req;
841     char                     status_code[2];
842     struct iovec             iov[2];
843 
844     iovec_len = 0;
845 
846     try {
847         this_arg = napi.get_cb_info(info, frame);
848 
849         req = napi.get_request_info(this_arg);
850 
851         opcode = napi.get_value_uint32(napi.get_named_property(frame,
852                                                                "opcode"));
853         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
854             sc = napi.get_value_uint32(napi.get_named_property(frame,
855                                                                "closeStatus"));
856             status_code[0] = (sc >> 8) & 0xFF;
857             status_code[1] = sc & 0xFF;
858 
859             iov[iovec_len].iov_base = status_code;
860             iov[iovec_len].iov_len = 2;
861             iovec_len++;
862         }
863 
864         try {
865             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
866 
867         } catch (exception &e) {
868             fin = true;
869         }
870 
871         payload = napi.get_named_property(frame, "binaryPayload");
872 
873         if (napi.is_buffer(payload)) {
874             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
875 
876         } else {
877             buf_len = 0;
878         }
879 
880     } catch (exception &e) {
881         napi.throw_error(e);
882         return nullptr;
883     }
884 
885     if (buf_len > 0) {
886         iov[iovec_len].iov_len = buf_len;
887         iovec_len++;
888     }
889 
890     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
891     if (ret != NXT_UNIT_OK) {
892         goto failed;
893     }
894 
895     return this_arg;
896 
897 failed:
898 
899     napi.throw_error("Failed to send frame");
900 
901     return nullptr;
902 }
903 
904 
905 napi_value
906 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
907 {
908     nxt_napi                 napi(env);
909     napi_value               this_arg, sock;
910     req_data_t               *req_data;
911     nxt_unit_request_info_t  *req;
912 
913     try {
914         this_arg = napi.get_cb_info(info, sock);
915 
916         req = napi.get_request_info(sock);
917 
918         req_data = (req_data_t *) req->data;
919         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
920 
921     } catch (exception &e) {
922         napi.throw_error(e);
923         return nullptr;
924     }
925 
926     return this_arg;
927 }
928 
929 
930 void
931 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
932 {
933     nxt_unit_request_info_t  *req;
934 
935     req = (nxt_unit_request_info_t *) nativeObject;
936 
937     nxt_unit_warn(NULL, "conn_destroy: %p", req);
938 }
939 
940 
941 void
942 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
943 {
944     nxt_unit_request_info_t  *req;
945 
946     req = (nxt_unit_request_info_t *) nativeObject;
947 
948     nxt_unit_warn(NULL, "sock_destroy: %p", req);
949 }
950 
951 
952 void
953 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
954 {
955     nxt_unit_request_info_t  *req;
956 
957     req = (nxt_unit_request_info_t *) nativeObject;
958 
959     nxt_unit_warn(NULL, "resp_destroy: %p", req);
960 }
961