xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1257:00eea530513c)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 napi_ref Unit::constructor_;
17 
18 
19 struct nxt_nodejs_ctx_t {
20     nxt_unit_port_id_t  port_id;
21     uv_poll_t           poll;
22 };
23 
24 
25 struct req_data_t {
26     napi_ref  sock_ref;
27     napi_ref  resp_ref;
28     napi_ref  conn_ref;
29 };
30 
31 
32 Unit::Unit(napi_env env, napi_value jsthis):
33     nxt_napi(env),
34     wrapper_(wrap(jsthis, this, destroy)),
35     unit_ctx_(nullptr)
36 {
37     nxt_unit_debug(NULL, "Unit::Unit()");
38 }
39 
40 
41 Unit::~Unit()
42 {
43     delete_reference(wrapper_);
44 
45     nxt_unit_debug(NULL, "Unit::~Unit()");
46 }
47 
48 
49 napi_value
50 Unit::init(napi_env env, napi_value exports)
51 {
52     nxt_napi    napi(env);
53     napi_value  ctor;
54 
55     napi_property_descriptor  unit_props[] = {
56         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
57         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
58     };
59 
60     try {
61         ctor = napi.define_class("Unit", create, 2, unit_props);
62         constructor_ = napi.create_reference(ctor);
63 
64         napi.set_named_property(exports, "Unit", ctor);
65         napi.set_named_property(exports, "response_send_headers",
66                                 response_send_headers);
67         napi.set_named_property(exports, "response_write", response_write);
68         napi.set_named_property(exports, "response_end", response_end);
69         napi.set_named_property(exports, "websocket_send_frame",
70                                 websocket_send_frame);
71         napi.set_named_property(exports, "websocket_set_sock",
72                                 websocket_set_sock);
73 
74     } catch (exception &e) {
75         napi.throw_error(e);
76         return nullptr;
77     }
78 
79     return exports;
80 }
81 
82 
83 void
84 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
85 {
86     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
87 
88     delete obj;
89 }
90 
91 
92 napi_value
93 Unit::create(napi_env env, napi_callback_info info)
94 {
95     nxt_napi    napi(env);
96     napi_value  target, ctor, instance, jsthis;
97 
98     try {
99         target = napi.get_new_target(info);
100 
101         if (target != nullptr) {
102             /* Invoked as constructor: `new Unit(...)`. */
103             jsthis = napi.get_cb_info(info);
104 
105             new Unit(env, jsthis);
106             napi.create_reference(jsthis);
107 
108             return jsthis;
109         }
110 
111         /* Invoked as plain function `Unit(...)`, turn into construct call. */
112         ctor = napi.get_reference_value(constructor_);
113         instance = napi.new_instance(ctor);
114         napi.create_reference(instance);
115 
116     } catch (exception &e) {
117         napi.throw_error(e);
118         return nullptr;
119     }
120 
121     return instance;
122 }
123 
124 
125 napi_value
126 Unit::create_server(napi_env env, napi_callback_info info)
127 {
128     Unit             *obj;
129     size_t           argc;
130     nxt_napi         napi(env);
131     napi_value       jsthis, argv;
132     nxt_unit_init_t  unit_init;
133 
134     argc = 1;
135 
136     try {
137         jsthis = napi.get_cb_info(info, argc, &argv);
138         obj = (Unit *) napi.unwrap(jsthis);
139 
140     } catch (exception &e) {
141         napi.throw_error(e);
142         return nullptr;
143     }
144 
145     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
146 
147     unit_init.data = obj;
148     unit_init.callbacks.request_handler   = request_handler_cb;
149     unit_init.callbacks.websocket_handler = websocket_handler_cb;
150     unit_init.callbacks.close_handler     = close_handler_cb;
151     unit_init.callbacks.add_port          = add_port;
152     unit_init.callbacks.remove_port       = remove_port;
153     unit_init.callbacks.quit              = quit_cb;
154 
155     unit_init.request_data_size = sizeof(req_data_t);
156 
157     obj->unit_ctx_ = nxt_unit_init(&unit_init);
158     if (obj->unit_ctx_ == NULL) {
159         goto failed;
160     }
161 
162     return nullptr;
163 
164 failed:
165 
166     napi_throw_error(env, NULL, "Failed to create Unit object");
167 
168     return nullptr;
169 }
170 
171 
172 napi_value
173 Unit::listen(napi_env env, napi_callback_info info)
174 {
175     return nullptr;
176 }
177 
178 
179 void
180 Unit::request_handler_cb(nxt_unit_request_info_t *req)
181 {
182     Unit  *obj;
183 
184     obj = reinterpret_cast<Unit *>(req->unit->data);
185 
186     obj->request_handler(req);
187 }
188 
189 
190 void
191 Unit::request_handler(nxt_unit_request_info_t *req)
192 {
193     napi_value  socket, request, response, server_obj, emit_request;
194 
195     memset(req->data, 0, sizeof(req_data_t));
196 
197     try {
198         nxt_handle_scope  scope(env());
199 
200         server_obj = get_server_object();
201 
202         socket = create_socket(server_obj, req);
203         request = create_request(server_obj, socket);
204         response = create_response(server_obj, request, req);
205 
206         create_headers(req, request);
207 
208         emit_request = get_named_property(server_obj, "emit_request");
209 
210         nxt_async_context   async_context(env(), "request_handler");
211         nxt_callback_scope  async_scope(async_context);
212 
213         make_callback(async_context, server_obj, emit_request, request,
214                       response);
215 
216     } catch (exception &e) {
217         nxt_unit_req_warn(req, "request_handler: %s", e.str);
218     }
219 }
220 
221 
222 void
223 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
224 {
225     Unit  *obj;
226 
227     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
228 
229     obj->websocket_handler(ws);
230 }
231 
232 
233 void
234 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
235 {
236     napi_value  frame, server_obj, process_frame, conn;
237     req_data_t  *req_data;
238 
239     req_data = (req_data_t *) ws->req->data;
240 
241     try {
242         nxt_handle_scope  scope(env());
243 
244         server_obj = get_server_object();
245 
246         frame = create_websocket_frame(server_obj, ws);
247 
248         conn = get_reference_value(req_data->conn_ref);
249 
250         process_frame = get_named_property(conn, "processFrame");
251 
252         nxt_async_context   async_context(env(), "websocket_handler");
253         nxt_callback_scope  async_scope(async_context);
254 
255         make_callback(async_context, conn, process_frame, frame);
256 
257     } catch (exception &e) {
258         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
259     }
260 
261     nxt_unit_websocket_done(ws);
262 }
263 
264 
265 void
266 Unit::close_handler_cb(nxt_unit_request_info_t *req)
267 {
268     Unit  *obj;
269 
270     obj = reinterpret_cast<Unit *>(req->unit->data);
271 
272     obj->close_handler(req);
273 }
274 
275 
276 void
277 Unit::close_handler(nxt_unit_request_info_t *req)
278 {
279     napi_value  conn_handle_close, conn;
280     req_data_t  *req_data;
281 
282     req_data = (req_data_t *) req->data;
283 
284     try {
285         nxt_handle_scope  scope(env());
286 
287         conn = get_reference_value(req_data->conn_ref);
288 
289         conn_handle_close = get_named_property(conn, "handleSocketClose");
290 
291         nxt_async_context   async_context(env(), "close_handler");
292         nxt_callback_scope  async_scope(async_context);
293 
294         make_callback(async_context, conn, conn_handle_close,
295                       nxt_napi::create(0));
296 
297         remove_wrap(req_data->sock_ref);
298         remove_wrap(req_data->resp_ref);
299         remove_wrap(req_data->conn_ref);
300 
301     } catch (exception &e) {
302         nxt_unit_req_warn(req, "close_handler: %s", e.str);
303 
304         return;
305     }
306 
307     nxt_unit_request_done(req, NXT_UNIT_OK);
308 }
309 
310 
311 static void
312 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
313 {
314     nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
315 }
316 
317 
318 int
319 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
320 {
321     int               err;
322     Unit              *obj;
323     uv_loop_t         *loop;
324     napi_status       status;
325     nxt_nodejs_ctx_t  *node_ctx;
326 
327     if (port->in_fd != -1) {
328         obj = reinterpret_cast<Unit *>(ctx->unit->data);
329 
330         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
331             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
332                           port->in_fd, strerror(errno), errno);
333             return -1;
334         }
335 
336         status = napi_get_uv_event_loop(obj->env(), &loop);
337         if (status != napi_ok) {
338             nxt_unit_warn(ctx, "Failed to get uv.loop");
339             return NXT_UNIT_ERROR;
340         }
341 
342         node_ctx = new nxt_nodejs_ctx_t;
343 
344         err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
345         if (err < 0) {
346             nxt_unit_warn(ctx, "Failed to init uv.poll");
347             return NXT_UNIT_ERROR;
348         }
349 
350         err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
351         if (err < 0) {
352             nxt_unit_warn(ctx, "Failed to start uv.poll");
353             return NXT_UNIT_ERROR;
354         }
355 
356         ctx->data = node_ctx;
357 
358         node_ctx->port_id = port->id;
359         node_ctx->poll.data = ctx;
360     }
361 
362     return nxt_unit_add_port(ctx, port);
363 }
364 
365 
366 inline bool
367 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
368 {
369     return p1.pid == p2.pid && p1.id == p2.id;
370 }
371 
372 
373 void
374 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
375 {
376     nxt_nodejs_ctx_t  *node_ctx;
377 
378     if (ctx->data != NULL) {
379         node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
380 
381         if (node_ctx->port_id == *port_id) {
382             uv_poll_stop(&node_ctx->poll);
383 
384             delete node_ctx;
385 
386             ctx->data = NULL;
387         }
388     }
389 
390     nxt_unit_remove_port(ctx, port_id);
391 }
392 
393 
394 void
395 Unit::quit_cb(nxt_unit_ctx_t *ctx)
396 {
397     Unit  *obj;
398 
399     obj = reinterpret_cast<Unit *>(ctx->unit->data);
400 
401     obj->quit(ctx);
402 }
403 
404 
405 void
406 Unit::quit(nxt_unit_ctx_t *ctx)
407 {
408     napi_value  server_obj, emit_close;
409 
410     try {
411         nxt_handle_scope  scope(env());
412 
413         server_obj = get_server_object();
414 
415         emit_close = get_named_property(server_obj, "emit_close");
416 
417         nxt_async_context   async_context(env(), "unit_quit");
418         nxt_callback_scope  async_scope(async_context);
419 
420         make_callback(async_context, server_obj, emit_close);
421 
422     } catch (exception &e) {
423         nxt_unit_debug(ctx, "quit: %s", e.str);
424     }
425 
426     nxt_unit_done(ctx);
427 }
428 
429 
430 napi_value
431 Unit::get_server_object()
432 {
433     napi_value  unit_obj;
434 
435     unit_obj = get_reference_value(wrapper_);
436 
437     return get_named_property(unit_obj, "server");
438 }
439 
440 
441 void
442 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
443 {
444     void                *data;
445     uint32_t            i;
446     napi_value          headers, raw_headers, buffer;
447     napi_status         status;
448     nxt_unit_request_t  *r;
449 
450     r = req->request;
451 
452     headers = create_object();
453 
454     status = napi_create_array_with_length(env(), r->fields_count * 2,
455                                            &raw_headers);
456     if (status != napi_ok) {
457         throw exception("Failed to create array");
458     }
459 
460     for (i = 0; i < r->fields_count; i++) {
461         append_header(r->fields + i, headers, raw_headers, i);
462     }
463 
464     set_named_property(request, "headers", headers);
465     set_named_property(request, "rawHeaders", raw_headers);
466     set_named_property(request, "httpVersion", r->version, r->version_length);
467     set_named_property(request, "method", r->method, r->method_length);
468     set_named_property(request, "url", r->target, r->target_length);
469 
470     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
471 
472     buffer = create_buffer((size_t) req->content_length, &data);
473     nxt_unit_request_read(req, data, req->content_length);
474 
475     set_named_property(request, "_data", buffer);
476 }
477 
478 
479 inline char
480 lowcase(char c)
481 {
482     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
483 }
484 
485 
486 inline void
487 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
488     napi_value raw_headers, uint32_t idx)
489 {
490     char        *name;
491     uint8_t     i;
492     napi_value  str, vstr;
493 
494     name = (char *) nxt_unit_sptr_get(&f->name);
495 
496     str = create_string_latin1(name, f->name_length);
497 
498     for (i = 0; i < f->name_length; i++) {
499         name[i] = lowcase(name[i]);
500     }
501 
502     vstr = set_named_property(headers, name, f->value, f->value_length);
503 
504     set_element(raw_headers, idx * 2, str);
505     set_element(raw_headers, idx * 2 + 1, vstr);
506 }
507 
508 
509 napi_value
510 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
511 {
512     napi_value          constructor, res;
513     req_data_t          *req_data;
514     nxt_unit_request_t  *r;
515 
516     r = req->request;
517 
518     constructor = get_named_property(server_obj, "Socket");
519 
520     res = new_instance(constructor);
521 
522     req_data = (req_data_t *) req->data;
523     req_data->sock_ref = wrap(res, req, sock_destroy);
524 
525     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
526     set_named_property(res, "localAddress", r->local, r->local_length);
527 
528     return res;
529 }
530 
531 
532 napi_value
533 Unit::create_request(napi_value server_obj, napi_value socket)
534 {
535     napi_value  constructor;
536 
537     constructor = get_named_property(server_obj, "ServerRequest");
538 
539     return new_instance(constructor, server_obj, socket);
540 }
541 
542 
543 napi_value
544 Unit::create_response(napi_value server_obj, napi_value request,
545     nxt_unit_request_info_t *req)
546 {
547     napi_value  constructor, res;
548     req_data_t  *req_data;
549 
550     constructor = get_named_property(server_obj, "ServerResponse");
551 
552     res = new_instance(constructor, request);
553 
554     req_data = (req_data_t *) req->data;
555     req_data->resp_ref = wrap(res, req, resp_destroy);
556 
557     return res;
558 }
559 
560 
561 napi_value
562 Unit::create_websocket_frame(napi_value server_obj,
563                              nxt_unit_websocket_frame_t *ws)
564 {
565     void        *data;
566     napi_value  constructor, res, buffer;
567     uint8_t     sc[2];
568 
569     constructor = get_named_property(server_obj, "WebSocketFrame");
570 
571     res = new_instance(constructor);
572 
573     set_named_property(res, "fin", (bool) ws->header->fin);
574     set_named_property(res, "opcode", ws->header->opcode);
575     set_named_property(res, "length", (int64_t) ws->payload_len);
576 
577     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
578         if (ws->payload_len >= 2) {
579             nxt_unit_websocket_read(ws, sc, 2);
580 
581             set_named_property(res, "closeStatus",
582                                (((uint16_t) sc[0]) << 8) | sc[1]);
583 
584         } else {
585             set_named_property(res, "closeStatus", -1);
586         }
587     }
588 
589     buffer = create_buffer((size_t) ws->content_length, &data);
590     nxt_unit_websocket_read(ws, data, ws->content_length);
591 
592     set_named_property(res, "binaryPayload", buffer);
593 
594     return res;
595 }
596 
597 
598 napi_value
599 Unit::response_send_headers(napi_env env, napi_callback_info info)
600 {
601     int                      ret;
602     char                     *ptr, *name_ptr;
603     bool                     is_array;
604     size_t                   argc, name_len, value_len;
605     uint32_t                 status_code, header_len, keys_len, array_len;
606     uint32_t                 keys_count, i, j;
607     uint16_t                 hash;
608     nxt_napi                 napi(env);
609     napi_value               this_arg, headers, keys, name, value, array_val;
610     napi_value               array_entry;
611     napi_valuetype           val_type;
612     nxt_unit_field_t         *f;
613     nxt_unit_request_info_t  *req;
614     napi_value               argv[4];
615 
616     argc = 4;
617 
618     try {
619         this_arg = napi.get_cb_info(info, argc, argv);
620         if (argc != 4) {
621             napi.throw_error("Wrong args count. Expected: "
622                              "statusCode, headers, headers count, "
623                              "headers length");
624             return nullptr;
625         }
626 
627         req = napi.get_request_info(this_arg);
628         status_code = napi.get_value_uint32(argv[0]);
629         keys_count = napi.get_value_uint32(argv[2]);
630         header_len = napi.get_value_uint32(argv[3]);
631 
632         headers = argv[1];
633 
634         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
635         if (ret != NXT_UNIT_OK) {
636             napi.throw_error("Failed to create response");
637             return nullptr;
638         }
639 
640         /*
641          * Each name and value are 0-terminated by libunit.
642          * Need to add extra 2 bytes for each header.
643          */
644         header_len += keys_count * 2;
645 
646         keys = napi.get_property_names(headers);
647         keys_len = napi.get_array_length(keys);
648 
649         ptr = req->response_buf->free;
650 
651         for (i = 0; i < keys_len; i++) {
652             name = napi.get_element(keys, i);
653 
654             array_entry = napi.get_property(headers, name);
655 
656             name = napi.get_element(array_entry, 0);
657             value = napi.get_element(array_entry, 1);
658 
659             name_len = napi.get_value_string_latin1(name, ptr, header_len);
660             name_ptr = ptr;
661 
662             ptr += name_len + 1;
663             header_len -= name_len + 1;
664 
665             hash = nxt_unit_field_hash(name_ptr, name_len);
666 
667             is_array = napi.is_array(value);
668 
669             if (is_array) {
670                 array_len = napi.get_array_length(value);
671 
672                 for (j = 0; j < array_len; j++) {
673                     array_val = napi.get_element(value, j);
674 
675                     val_type = napi.type_of(array_val);
676 
677                     if (val_type != napi_string) {
678                         array_val = napi.coerce_to_string(array_val);
679                     }
680 
681                     value_len = napi.get_value_string_latin1(array_val, ptr,
682                                                              header_len);
683 
684                     f = req->response->fields + req->response->fields_count;
685                     f->skip = 0;
686 
687                     nxt_unit_sptr_set(&f->name, name_ptr);
688 
689                     f->name_length = name_len;
690                     f->hash = hash;
691 
692                     nxt_unit_sptr_set(&f->value, ptr);
693                     f->value_length = (uint32_t) value_len;
694 
695                     ptr += value_len + 1;
696                     header_len -= value_len + 1;
697 
698                     req->response->fields_count++;
699                 }
700 
701             } else {
702                 val_type = napi.type_of(value);
703 
704                 if (val_type != napi_string) {
705                     value = napi.coerce_to_string(value);
706                 }
707 
708                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
709 
710                 f = req->response->fields + req->response->fields_count;
711                 f->skip = 0;
712 
713                 nxt_unit_sptr_set(&f->name, name_ptr);
714 
715                 f->name_length = name_len;
716                 f->hash = hash;
717 
718                 nxt_unit_sptr_set(&f->value, ptr);
719                 f->value_length = (uint32_t) value_len;
720 
721                 ptr += value_len + 1;
722                 header_len -= value_len + 1;
723 
724                 req->response->fields_count++;
725             }
726         }
727 
728     } catch (exception &e) {
729         napi.throw_error(e);
730         return nullptr;
731     }
732 
733     req->response_buf->free = ptr;
734 
735     ret = nxt_unit_response_send(req);
736     if (ret != NXT_UNIT_OK) {
737         napi.throw_error("Failed to send response");
738         return nullptr;
739     }
740 
741     return this_arg;
742 }
743 
744 
745 napi_value
746 Unit::response_write(napi_env env, napi_callback_info info)
747 {
748     int                      ret;
749     void                     *ptr;
750     size_t                   argc, have_buf_len;
751     uint32_t                 buf_len;
752     nxt_napi                 napi(env);
753     napi_value               this_arg;
754     nxt_unit_buf_t           *buf;
755     napi_valuetype           buf_type;
756     nxt_unit_request_info_t  *req;
757     napi_value               argv[2];
758 
759     argc = 2;
760 
761     try {
762         this_arg = napi.get_cb_info(info, argc, argv);
763         if (argc != 2) {
764             throw exception("Wrong args count. Expected: "
765                             "chunk, chunk length");
766         }
767 
768         req = napi.get_request_info(this_arg);
769         buf_type = napi.type_of(argv[0]);
770         buf_len = napi.get_value_uint32(argv[1]) + 1;
771 
772         buf = nxt_unit_response_buf_alloc(req, buf_len);
773         if (buf == NULL) {
774             throw exception("Failed to allocate response buffer");
775         }
776 
777         if (buf_type == napi_string) {
778             /* TODO: will work only for utf8 content-type */
779 
780             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
781                                                       buf_len);
782 
783         } else {
784             ptr = napi.get_buffer_info(argv[0], have_buf_len);
785 
786             memcpy(buf->free, ptr, have_buf_len);
787         }
788 
789         buf->free += have_buf_len;
790 
791         ret = nxt_unit_buf_send(buf);
792         if (ret != NXT_UNIT_OK) {
793             throw exception("Failed to send body buf");
794         }
795     } catch (exception &e) {
796         napi.throw_error(e);
797         return nullptr;
798     }
799 
800     return this_arg;
801 }
802 
803 
804 napi_value
805 Unit::response_end(napi_env env, napi_callback_info info)
806 {
807     nxt_napi                 napi(env);
808     napi_value               this_arg;
809     req_data_t               *req_data;
810     nxt_unit_request_info_t  *req;
811 
812     try {
813         this_arg = napi.get_cb_info(info);
814 
815         req = napi.get_request_info(this_arg);
816 
817         req_data = (req_data_t *) req->data;
818 
819         napi.remove_wrap(req_data->sock_ref);
820         napi.remove_wrap(req_data->resp_ref);
821         napi.remove_wrap(req_data->conn_ref);
822 
823     } catch (exception &e) {
824         napi.throw_error(e);
825         return nullptr;
826     }
827 
828     nxt_unit_request_done(req, NXT_UNIT_OK);
829 
830     return this_arg;
831 }
832 
833 
834 napi_value
835 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
836 {
837     int                      ret, iovec_len;
838     bool                     fin;
839     size_t                   buf_len;
840     uint32_t                 opcode, sc;
841     nxt_napi                 napi(env);
842     napi_value               this_arg, frame, payload;
843     nxt_unit_request_info_t  *req;
844     char                     status_code[2];
845     struct iovec             iov[2];
846 
847     iovec_len = 0;
848 
849     try {
850         this_arg = napi.get_cb_info(info, frame);
851 
852         req = napi.get_request_info(this_arg);
853 
854         opcode = napi.get_value_uint32(napi.get_named_property(frame,
855                                                                "opcode"));
856         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
857             sc = napi.get_value_uint32(napi.get_named_property(frame,
858                                                                "closeStatus"));
859             status_code[0] = (sc >> 8) & 0xFF;
860             status_code[1] = sc & 0xFF;
861 
862             iov[iovec_len].iov_base = status_code;
863             iov[iovec_len].iov_len = 2;
864             iovec_len++;
865         }
866 
867         try {
868             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
869 
870         } catch (exception &e) {
871             fin = true;
872         }
873 
874         payload = napi.get_named_property(frame, "binaryPayload");
875 
876         if (napi.is_buffer(payload)) {
877             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
878 
879         } else {
880             buf_len = 0;
881         }
882 
883     } catch (exception &e) {
884         napi.throw_error(e);
885         return nullptr;
886     }
887 
888     if (buf_len > 0) {
889         iov[iovec_len].iov_len = buf_len;
890         iovec_len++;
891     }
892 
893     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
894     if (ret != NXT_UNIT_OK) {
895         goto failed;
896     }
897 
898     return this_arg;
899 
900 failed:
901 
902     napi.throw_error("Failed to send frame");
903 
904     return nullptr;
905 }
906 
907 
908 napi_value
909 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
910 {
911     nxt_napi                 napi(env);
912     napi_value               this_arg, sock;
913     req_data_t               *req_data;
914     nxt_unit_request_info_t  *req;
915 
916     try {
917         this_arg = napi.get_cb_info(info, sock);
918 
919         req = napi.get_request_info(sock);
920 
921         req_data = (req_data_t *) req->data;
922         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
923 
924     } catch (exception &e) {
925         napi.throw_error(e);
926         return nullptr;
927     }
928 
929     return this_arg;
930 }
931 
932 
933 void
934 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
935 {
936     nxt_unit_request_info_t  *req;
937 
938     req = (nxt_unit_request_info_t *) nativeObject;
939 
940     nxt_unit_warn(NULL, "conn_destroy: %p", req);
941 }
942 
943 
944 void
945 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
946 {
947     nxt_unit_request_info_t  *req;
948 
949     req = (nxt_unit_request_info_t *) nativeObject;
950 
951     nxt_unit_warn(NULL, "sock_destroy: %p", req);
952 }
953 
954 
955 void
956 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
957 {
958     nxt_unit_request_info_t  *req;
959 
960     req = (nxt_unit_request_info_t *) nativeObject;
961 
962     nxt_unit_warn(NULL, "resp_destroy: %p", req);
963 }
964