xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1547:cbcd76704c90)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 static void delete_port_data(uv_handle_t* handle);
17 
18 napi_ref Unit::constructor_;
19 
20 
21 struct port_data_t {
22     nxt_unit_ctx_t      *ctx;
23     nxt_unit_port_t     *port;
24     uv_poll_t           poll;
25 };
26 
27 
28 struct req_data_t {
29     napi_ref  sock_ref;
30     napi_ref  resp_ref;
31     napi_ref  conn_ref;
32 };
33 
34 
35 Unit::Unit(napi_env env, napi_value jsthis):
36     nxt_napi(env),
37     wrapper_(wrap(jsthis, this, destroy)),
38     unit_ctx_(nullptr)
39 {
40     nxt_unit_debug(NULL, "Unit::Unit()");
41 }
42 
43 
44 Unit::~Unit()
45 {
46     delete_reference(wrapper_);
47 
48     nxt_unit_debug(NULL, "Unit::~Unit()");
49 }
50 
51 
52 napi_value
53 Unit::init(napi_env env, napi_value exports)
54 {
55     nxt_napi    napi(env);
56     napi_value  ctor;
57 
58     napi_property_descriptor  unit_props[] = {
59         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
60         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
61     };
62 
63     try {
64         ctor = napi.define_class("Unit", create, 2, unit_props);
65         constructor_ = napi.create_reference(ctor);
66 
67         napi.set_named_property(exports, "Unit", ctor);
68         napi.set_named_property(exports, "response_send_headers",
69                                 response_send_headers);
70         napi.set_named_property(exports, "response_write", response_write);
71         napi.set_named_property(exports, "response_end", response_end);
72         napi.set_named_property(exports, "websocket_send_frame",
73                                 websocket_send_frame);
74         napi.set_named_property(exports, "websocket_set_sock",
75                                 websocket_set_sock);
76         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
77         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
78 
79     } catch (exception &e) {
80         napi.throw_error(e);
81         return nullptr;
82     }
83 
84     return exports;
85 }
86 
87 
88 void
89 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
90 {
91     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
92 
93     delete obj;
94 }
95 
96 
97 napi_value
98 Unit::create(napi_env env, napi_callback_info info)
99 {
100     nxt_napi    napi(env);
101     napi_value  target, ctor, instance, jsthis;
102 
103     try {
104         target = napi.get_new_target(info);
105 
106         if (target != nullptr) {
107             /* Invoked as constructor: `new Unit(...)`. */
108             jsthis = napi.get_cb_info(info);
109 
110             new Unit(env, jsthis);
111             napi.create_reference(jsthis);
112 
113             return jsthis;
114         }
115 
116         /* Invoked as plain function `Unit(...)`, turn into construct call. */
117         ctor = napi.get_reference_value(constructor_);
118         instance = napi.new_instance(ctor);
119         napi.create_reference(instance);
120 
121     } catch (exception &e) {
122         napi.throw_error(e);
123         return nullptr;
124     }
125 
126     return instance;
127 }
128 
129 
130 napi_value
131 Unit::create_server(napi_env env, napi_callback_info info)
132 {
133     Unit             *obj;
134     size_t           argc;
135     nxt_napi         napi(env);
136     napi_value       jsthis, argv;
137     nxt_unit_init_t  unit_init;
138 
139     argc = 1;
140 
141     try {
142         jsthis = napi.get_cb_info(info, argc, &argv);
143         obj = (Unit *) napi.unwrap(jsthis);
144 
145     } catch (exception &e) {
146         napi.throw_error(e);
147         return nullptr;
148     }
149 
150     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
151 
152     unit_init.data = obj;
153     unit_init.callbacks.request_handler   = request_handler_cb;
154     unit_init.callbacks.websocket_handler = websocket_handler_cb;
155     unit_init.callbacks.close_handler     = close_handler_cb;
156     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
157     unit_init.callbacks.add_port          = add_port;
158     unit_init.callbacks.remove_port       = remove_port;
159     unit_init.callbacks.quit              = quit_cb;
160 
161     unit_init.request_data_size = sizeof(req_data_t);
162 
163     obj->unit_ctx_ = nxt_unit_init(&unit_init);
164     if (obj->unit_ctx_ == NULL) {
165         goto failed;
166     }
167 
168     return nullptr;
169 
170 failed:
171 
172     napi_throw_error(env, NULL, "Failed to create Unit object");
173 
174     return nullptr;
175 }
176 
177 
178 napi_value
179 Unit::listen(napi_env env, napi_callback_info info)
180 {
181     return nullptr;
182 }
183 
184 
185 void
186 Unit::request_handler_cb(nxt_unit_request_info_t *req)
187 {
188     Unit  *obj;
189 
190     obj = reinterpret_cast<Unit *>(req->unit->data);
191 
192     obj->request_handler(req);
193 }
194 
195 
196 void
197 Unit::request_handler(nxt_unit_request_info_t *req)
198 {
199     napi_value  socket, request, response, server_obj, emit_request;
200 
201     memset(req->data, 0, sizeof(req_data_t));
202 
203     try {
204         nxt_handle_scope  scope(env());
205 
206         server_obj = get_server_object();
207 
208         socket = create_socket(server_obj, req);
209         request = create_request(server_obj, socket);
210         response = create_response(server_obj, request, req);
211 
212         create_headers(req, request);
213 
214         emit_request = get_named_property(server_obj, "emit_request");
215 
216         nxt_async_context   async_context(env(), "request_handler");
217         nxt_callback_scope  async_scope(async_context);
218 
219         make_callback(async_context, server_obj, emit_request, request,
220                       response);
221 
222     } catch (exception &e) {
223         nxt_unit_req_warn(req, "request_handler: %s", e.str);
224     }
225 }
226 
227 
228 void
229 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
230 {
231     Unit  *obj;
232 
233     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
234 
235     obj->websocket_handler(ws);
236 }
237 
238 
239 void
240 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
241 {
242     napi_value  frame, server_obj, process_frame, conn;
243     req_data_t  *req_data;
244 
245     req_data = (req_data_t *) ws->req->data;
246 
247     try {
248         nxt_handle_scope  scope(env());
249 
250         server_obj = get_server_object();
251 
252         frame = create_websocket_frame(server_obj, ws);
253 
254         conn = get_reference_value(req_data->conn_ref);
255 
256         process_frame = get_named_property(conn, "processFrame");
257 
258         nxt_async_context   async_context(env(), "websocket_handler");
259         nxt_callback_scope  async_scope(async_context);
260 
261         make_callback(async_context, conn, process_frame, frame);
262 
263     } catch (exception &e) {
264         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
265     }
266 
267     nxt_unit_websocket_done(ws);
268 }
269 
270 
271 void
272 Unit::close_handler_cb(nxt_unit_request_info_t *req)
273 {
274     Unit  *obj;
275 
276     obj = reinterpret_cast<Unit *>(req->unit->data);
277 
278     obj->close_handler(req);
279 }
280 
281 
282 void
283 Unit::close_handler(nxt_unit_request_info_t *req)
284 {
285     napi_value  conn_handle_close, conn;
286     req_data_t  *req_data;
287 
288     req_data = (req_data_t *) req->data;
289 
290     try {
291         nxt_handle_scope  scope(env());
292 
293         conn = get_reference_value(req_data->conn_ref);
294 
295         conn_handle_close = get_named_property(conn, "handleSocketClose");
296 
297         nxt_async_context   async_context(env(), "close_handler");
298         nxt_callback_scope  async_scope(async_context);
299 
300         make_callback(async_context, conn, conn_handle_close,
301                       nxt_napi::create(0));
302 
303         remove_wrap(req_data->sock_ref);
304         remove_wrap(req_data->resp_ref);
305         remove_wrap(req_data->conn_ref);
306 
307     } catch (exception &e) {
308         nxt_unit_req_warn(req, "close_handler: %s", e.str);
309 
310         return;
311     }
312 
313     nxt_unit_request_done(req, NXT_UNIT_OK);
314 }
315 
316 
317 void
318 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
319 {
320     Unit  *obj;
321 
322     obj = reinterpret_cast<Unit *>(ctx->unit->data);
323 
324     obj->shm_ack_handler(ctx);
325 }
326 
327 
328 void
329 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
330 {
331     napi_value  server_obj, emit_drain;
332 
333     try {
334         nxt_handle_scope  scope(env());
335 
336         server_obj = get_server_object();
337 
338         emit_drain = get_named_property(server_obj, "emit_drain");
339 
340         nxt_async_context   async_context(env(), "shm_ack_handler");
341         nxt_callback_scope  async_scope(async_context);
342 
343         make_callback(async_context, server_obj, emit_drain);
344 
345     } catch (exception &e) {
346         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
347     }
348 }
349 
350 
351 static void
352 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
353 {
354     port_data_t  *data;
355 
356     data = (port_data_t *) handle->data;
357 
358     nxt_unit_process_port_msg(data->ctx, data->port);
359 }
360 
361 
362 int
363 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
364 {
365     int               err;
366     Unit              *obj;
367     uv_loop_t         *loop;
368     port_data_t       *data;
369     napi_status       status;
370 
371     if (port->in_fd != -1) {
372         obj = reinterpret_cast<Unit *>(ctx->unit->data);
373 
374         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
375             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
376                           port->in_fd, strerror(errno), errno);
377             return -1;
378         }
379 
380         status = napi_get_uv_event_loop(obj->env(), &loop);
381         if (status != napi_ok) {
382             nxt_unit_warn(ctx, "Failed to get uv.loop");
383             return NXT_UNIT_ERROR;
384         }
385 
386         data = new port_data_t;
387 
388         err = uv_poll_init(loop, &data->poll, port->in_fd);
389         if (err < 0) {
390             nxt_unit_warn(ctx, "Failed to init uv.poll");
391             return NXT_UNIT_ERROR;
392         }
393 
394         err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
395         if (err < 0) {
396             nxt_unit_warn(ctx, "Failed to start uv.poll");
397             return NXT_UNIT_ERROR;
398         }
399 
400         port->data = data;
401 
402         data->ctx = ctx;
403         data->port = port;
404         data->poll.data = data;
405     }
406 
407     return NXT_UNIT_OK;
408 }
409 
410 
411 void
412 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
413 {
414     port_data_t  *data;
415 
416     if (port->data != NULL) {
417         data = (port_data_t *) port->data;
418 
419         if (data->port == port) {
420             uv_poll_stop(&data->poll);
421 
422             uv_close((uv_handle_t *) &data->poll, delete_port_data);
423         }
424     }
425 }
426 
427 
428 static void
429 delete_port_data(uv_handle_t* handle)
430 {
431     port_data_t  *data;
432 
433     data = (port_data_t *) handle->data;
434 
435     delete data;
436 }
437 
438 
439 void
440 Unit::quit_cb(nxt_unit_ctx_t *ctx)
441 {
442     Unit  *obj;
443 
444     obj = reinterpret_cast<Unit *>(ctx->unit->data);
445 
446     obj->quit(ctx);
447 }
448 
449 
450 void
451 Unit::quit(nxt_unit_ctx_t *ctx)
452 {
453     napi_value  server_obj, emit_close;
454 
455     try {
456         nxt_handle_scope  scope(env());
457 
458         server_obj = get_server_object();
459 
460         emit_close = get_named_property(server_obj, "emit_close");
461 
462         nxt_async_context   async_context(env(), "unit_quit");
463         nxt_callback_scope  async_scope(async_context);
464 
465         make_callback(async_context, server_obj, emit_close);
466 
467     } catch (exception &e) {
468         nxt_unit_debug(ctx, "quit: %s", e.str);
469     }
470 
471     nxt_unit_done(ctx);
472 }
473 
474 
475 napi_value
476 Unit::get_server_object()
477 {
478     napi_value  unit_obj;
479 
480     unit_obj = get_reference_value(wrapper_);
481 
482     return get_named_property(unit_obj, "server");
483 }
484 
485 
486 void
487 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
488 {
489     void                *data;
490     uint32_t            i;
491     napi_value          headers, raw_headers, buffer;
492     napi_status         status;
493     nxt_unit_request_t  *r;
494 
495     r = req->request;
496 
497     headers = create_object();
498 
499     status = napi_create_array_with_length(env(), r->fields_count * 2,
500                                            &raw_headers);
501     if (status != napi_ok) {
502         throw exception("Failed to create array");
503     }
504 
505     for (i = 0; i < r->fields_count; i++) {
506         append_header(r->fields + i, headers, raw_headers, i);
507     }
508 
509     set_named_property(request, "headers", headers);
510     set_named_property(request, "rawHeaders", raw_headers);
511     set_named_property(request, "httpVersion", r->version, r->version_length);
512     set_named_property(request, "method", r->method, r->method_length);
513     set_named_property(request, "url", r->target, r->target_length);
514 
515     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
516 
517     buffer = create_buffer((size_t) req->content_length, &data);
518     nxt_unit_request_read(req, data, req->content_length);
519 
520     set_named_property(request, "_data", buffer);
521 }
522 
523 
/*
 * Converts an ASCII upper case letter to lower case; any other character
 * is returned unchanged.
 */
inline char
lowcase(char c)
{
    if (c < 'A' || c > 'Z') {
        return c;
    }

    /* Setting bit 0x20 maps 'A'..'Z' onto 'a'..'z'. */
    return c | 0x20;
}
529 
530 
531 inline void
532 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
533     napi_value raw_headers, uint32_t idx)
534 {
535     char        *name;
536     uint8_t     i;
537     napi_value  str, vstr;
538 
539     name = (char *) nxt_unit_sptr_get(&f->name);
540 
541     str = create_string_latin1(name, f->name_length);
542 
543     for (i = 0; i < f->name_length; i++) {
544         name[i] = lowcase(name[i]);
545     }
546 
547     vstr = set_named_property(headers, name, f->value, f->value_length);
548 
549     set_element(raw_headers, idx * 2, str);
550     set_element(raw_headers, idx * 2 + 1, vstr);
551 }
552 
553 
554 napi_value
555 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
556 {
557     napi_value          constructor, res;
558     req_data_t          *req_data;
559     nxt_unit_request_t  *r;
560 
561     r = req->request;
562 
563     constructor = get_named_property(server_obj, "Socket");
564 
565     res = new_instance(constructor);
566 
567     req_data = (req_data_t *) req->data;
568     req_data->sock_ref = wrap(res, req, sock_destroy);
569 
570     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
571     set_named_property(res, "localAddress", r->local, r->local_length);
572 
573     return res;
574 }
575 
576 
577 napi_value
578 Unit::create_request(napi_value server_obj, napi_value socket)
579 {
580     napi_value  constructor;
581 
582     constructor = get_named_property(server_obj, "ServerRequest");
583 
584     return new_instance(constructor, server_obj, socket);
585 }
586 
587 
588 napi_value
589 Unit::create_response(napi_value server_obj, napi_value request,
590     nxt_unit_request_info_t *req)
591 {
592     napi_value  constructor, res;
593     req_data_t  *req_data;
594 
595     constructor = get_named_property(server_obj, "ServerResponse");
596 
597     res = new_instance(constructor, request);
598 
599     req_data = (req_data_t *) req->data;
600     req_data->resp_ref = wrap(res, req, resp_destroy);
601 
602     return res;
603 }
604 
605 
606 napi_value
607 Unit::create_websocket_frame(napi_value server_obj,
608                              nxt_unit_websocket_frame_t *ws)
609 {
610     void        *data;
611     napi_value  constructor, res, buffer;
612     uint8_t     sc[2];
613 
614     constructor = get_named_property(server_obj, "WebSocketFrame");
615 
616     res = new_instance(constructor);
617 
618     set_named_property(res, "fin", (bool) ws->header->fin);
619     set_named_property(res, "opcode", ws->header->opcode);
620     set_named_property(res, "length", (int64_t) ws->payload_len);
621 
622     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
623         if (ws->payload_len >= 2) {
624             nxt_unit_websocket_read(ws, sc, 2);
625 
626             set_named_property(res, "closeStatus",
627                                (((uint16_t) sc[0]) << 8) | sc[1]);
628 
629         } else {
630             set_named_property(res, "closeStatus", -1);
631         }
632     }
633 
634     buffer = create_buffer((size_t) ws->content_length, &data);
635     nxt_unit_websocket_read(ws, data, ws->content_length);
636 
637     set_named_property(res, "binaryPayload", buffer);
638 
639     return res;
640 }
641 
642 
643 napi_value
644 Unit::response_send_headers(napi_env env, napi_callback_info info)
645 {
646     int                      ret;
647     char                     *ptr, *name_ptr;
648     bool                     is_array;
649     size_t                   argc, name_len, value_len;
650     uint32_t                 status_code, header_len, keys_len, array_len;
651     uint32_t                 keys_count, i, j;
652     uint16_t                 hash;
653     nxt_napi                 napi(env);
654     napi_value               this_arg, headers, keys, name, value, array_val;
655     napi_value               array_entry;
656     napi_valuetype           val_type;
657     nxt_unit_field_t         *f;
658     nxt_unit_request_info_t  *req;
659     napi_value               argv[4];
660 
661     argc = 4;
662 
663     try {
664         this_arg = napi.get_cb_info(info, argc, argv);
665         if (argc != 4) {
666             napi.throw_error("Wrong args count. Expected: "
667                              "statusCode, headers, headers count, "
668                              "headers length");
669             return nullptr;
670         }
671 
672         req = napi.get_request_info(this_arg);
673         status_code = napi.get_value_uint32(argv[0]);
674         keys_count = napi.get_value_uint32(argv[2]);
675         header_len = napi.get_value_uint32(argv[3]);
676 
677         headers = argv[1];
678 
679         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
680         if (ret != NXT_UNIT_OK) {
681             napi.throw_error("Failed to create response");
682             return nullptr;
683         }
684 
685         /*
686          * Each name and value are 0-terminated by libunit.
687          * Need to add extra 2 bytes for each header.
688          */
689         header_len += keys_count * 2;
690 
691         keys = napi.get_property_names(headers);
692         keys_len = napi.get_array_length(keys);
693 
694         ptr = req->response_buf->free;
695 
696         for (i = 0; i < keys_len; i++) {
697             name = napi.get_element(keys, i);
698 
699             array_entry = napi.get_property(headers, name);
700 
701             name = napi.get_element(array_entry, 0);
702             value = napi.get_element(array_entry, 1);
703 
704             name_len = napi.get_value_string_latin1(name, ptr, header_len);
705             name_ptr = ptr;
706 
707             ptr += name_len + 1;
708             header_len -= name_len + 1;
709 
710             hash = nxt_unit_field_hash(name_ptr, name_len);
711 
712             is_array = napi.is_array(value);
713 
714             if (is_array) {
715                 array_len = napi.get_array_length(value);
716 
717                 for (j = 0; j < array_len; j++) {
718                     array_val = napi.get_element(value, j);
719 
720                     val_type = napi.type_of(array_val);
721 
722                     if (val_type != napi_string) {
723                         array_val = napi.coerce_to_string(array_val);
724                     }
725 
726                     value_len = napi.get_value_string_latin1(array_val, ptr,
727                                                              header_len);
728 
729                     f = req->response->fields + req->response->fields_count;
730                     f->skip = 0;
731 
732                     nxt_unit_sptr_set(&f->name, name_ptr);
733 
734                     f->name_length = name_len;
735                     f->hash = hash;
736 
737                     nxt_unit_sptr_set(&f->value, ptr);
738                     f->value_length = (uint32_t) value_len;
739 
740                     ptr += value_len + 1;
741                     header_len -= value_len + 1;
742 
743                     req->response->fields_count++;
744                 }
745 
746             } else {
747                 val_type = napi.type_of(value);
748 
749                 if (val_type != napi_string) {
750                     value = napi.coerce_to_string(value);
751                 }
752 
753                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
754 
755                 f = req->response->fields + req->response->fields_count;
756                 f->skip = 0;
757 
758                 nxt_unit_sptr_set(&f->name, name_ptr);
759 
760                 f->name_length = name_len;
761                 f->hash = hash;
762 
763                 nxt_unit_sptr_set(&f->value, ptr);
764                 f->value_length = (uint32_t) value_len;
765 
766                 ptr += value_len + 1;
767                 header_len -= value_len + 1;
768 
769                 req->response->fields_count++;
770             }
771         }
772 
773     } catch (exception &e) {
774         napi.throw_error(e);
775         return nullptr;
776     }
777 
778     req->response_buf->free = ptr;
779 
780     ret = nxt_unit_response_send(req);
781     if (ret != NXT_UNIT_OK) {
782         napi.throw_error("Failed to send response");
783         return nullptr;
784     }
785 
786     return this_arg;
787 }
788 
789 
790 napi_value
791 Unit::response_write(napi_env env, napi_callback_info info)
792 {
793     int                      ret;
794     void                     *ptr;
795     size_t                   argc, have_buf_len;
796     ssize_t                  res_len;
797     uint32_t                 buf_start, buf_len;
798     nxt_napi                 napi(env);
799     napi_value               this_arg;
800     nxt_unit_buf_t           *buf;
801     napi_valuetype           buf_type;
802     nxt_unit_request_info_t  *req;
803     napi_value               argv[3];
804 
805     argc = 3;
806 
807     try {
808         this_arg = napi.get_cb_info(info, argc, argv);
809         if (argc != 3) {
810             throw exception("Wrong args count. Expected: "
811                             "chunk, start, length");
812         }
813 
814         req = napi.get_request_info(this_arg);
815         buf_type = napi.type_of(argv[0]);
816         buf_start = napi.get_value_uint32(argv[1]);
817         buf_len = napi.get_value_uint32(argv[2]) + 1;
818 
819         if (buf_type == napi_string) {
820             /* TODO: will work only for utf8 content-type */
821 
822             if (req->response_buf != NULL
823                 && req->response_buf->end >= req->response_buf->free + buf_len)
824             {
825                 buf = req->response_buf;
826 
827             } else {
828                 buf = nxt_unit_response_buf_alloc(req, buf_len);
829                 if (buf == NULL) {
830                     throw exception("Failed to allocate response buffer");
831                 }
832             }
833 
834             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
835                                                       buf_len);
836 
837             buf->free += have_buf_len;
838 
839             ret = nxt_unit_buf_send(buf);
840             if (ret == NXT_UNIT_OK) {
841                 res_len = have_buf_len;
842             }
843 
844         } else {
845             ptr = napi.get_buffer_info(argv[0], have_buf_len);
846 
847             if (buf_start > 0) {
848                 ptr = ((uint8_t *) ptr) + buf_start;
849                 have_buf_len -= buf_start;
850             }
851 
852             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
853 
854             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
855         }
856 
857         if (ret != NXT_UNIT_OK) {
858             throw exception("Failed to send body buf");
859         }
860     } catch (exception &e) {
861         napi.throw_error(e);
862         return nullptr;
863     }
864 
865     return napi.create((int64_t) res_len);
866 }
867 
868 
869 napi_value
870 Unit::response_end(napi_env env, napi_callback_info info)
871 {
872     nxt_napi                 napi(env);
873     napi_value               this_arg;
874     req_data_t               *req_data;
875     nxt_unit_request_info_t  *req;
876 
877     try {
878         this_arg = napi.get_cb_info(info);
879 
880         req = napi.get_request_info(this_arg);
881 
882         req_data = (req_data_t *) req->data;
883 
884         napi.remove_wrap(req_data->sock_ref);
885         napi.remove_wrap(req_data->resp_ref);
886         napi.remove_wrap(req_data->conn_ref);
887 
888     } catch (exception &e) {
889         napi.throw_error(e);
890         return nullptr;
891     }
892 
893     nxt_unit_request_done(req, NXT_UNIT_OK);
894 
895     return this_arg;
896 }
897 
898 
899 napi_value
900 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
901 {
902     int                      ret, iovec_len;
903     bool                     fin;
904     size_t                   buf_len;
905     uint32_t                 opcode, sc;
906     nxt_napi                 napi(env);
907     napi_value               this_arg, frame, payload;
908     nxt_unit_request_info_t  *req;
909     char                     status_code[2];
910     struct iovec             iov[2];
911 
912     iovec_len = 0;
913 
914     try {
915         this_arg = napi.get_cb_info(info, frame);
916 
917         req = napi.get_request_info(this_arg);
918 
919         opcode = napi.get_value_uint32(napi.get_named_property(frame,
920                                                                "opcode"));
921         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
922             sc = napi.get_value_uint32(napi.get_named_property(frame,
923                                                                "closeStatus"));
924             status_code[0] = (sc >> 8) & 0xFF;
925             status_code[1] = sc & 0xFF;
926 
927             iov[iovec_len].iov_base = status_code;
928             iov[iovec_len].iov_len = 2;
929             iovec_len++;
930         }
931 
932         try {
933             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
934 
935         } catch (exception &e) {
936             fin = true;
937         }
938 
939         payload = napi.get_named_property(frame, "binaryPayload");
940 
941         if (napi.is_buffer(payload)) {
942             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
943 
944         } else {
945             buf_len = 0;
946         }
947 
948     } catch (exception &e) {
949         napi.throw_error(e);
950         return nullptr;
951     }
952 
953     if (buf_len > 0) {
954         iov[iovec_len].iov_len = buf_len;
955         iovec_len++;
956     }
957 
958     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
959     if (ret != NXT_UNIT_OK) {
960         goto failed;
961     }
962 
963     return this_arg;
964 
965 failed:
966 
967     napi.throw_error("Failed to send frame");
968 
969     return nullptr;
970 }
971 
972 
973 napi_value
974 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
975 {
976     nxt_napi                 napi(env);
977     napi_value               this_arg, sock;
978     req_data_t               *req_data;
979     nxt_unit_request_info_t  *req;
980 
981     try {
982         this_arg = napi.get_cb_info(info, sock);
983 
984         req = napi.get_request_info(sock);
985 
986         req_data = (req_data_t *) req->data;
987         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
988 
989     } catch (exception &e) {
990         napi.throw_error(e);
991         return nullptr;
992     }
993 
994     return this_arg;
995 }
996 
997 
998 void
999 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1000 {
1001     nxt_unit_request_info_t  *req;
1002 
1003     req = (nxt_unit_request_info_t *) nativeObject;
1004 
1005     nxt_unit_warn(NULL, "conn_destroy: %p", req);
1006 }
1007 
1008 
1009 void
1010 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1011 {
1012     nxt_unit_request_info_t  *req;
1013 
1014     req = (nxt_unit_request_info_t *) nativeObject;
1015 
1016     nxt_unit_warn(NULL, "sock_destroy: %p", req);
1017 }
1018 
1019 
1020 void
1021 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1022 {
1023     nxt_unit_request_info_t  *req;
1024 
1025     req = (nxt_unit_request_info_t *) nativeObject;
1026 
1027     nxt_unit_warn(NULL, "resp_destroy: %p", req);
1028 }
1029