xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1536:8d1511734750)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
/*
 * Forward declaration: used as the uv_close() callback in
 * Unit::remove_port(), defined further down in this file.
 */
static void delete_port_data(uv_handle_t* handle);

/*
 * Reference to the JS "Unit" class constructor.  Assigned in Unit::init()
 * and read in Unit::create() when Unit is invoked without `new`.
 */
napi_ref Unit::constructor_;
19 
20 
/*
 * libuv polling state allocated in Unit::add_port() and stored in the
 * libunit context's data pointer.  Unit::remove_port() closes the poll
 * handle and delete_port_data() frees the structure.
 */
struct nxt_nodejs_ctx_t {
    nxt_unit_port_id_t  port_id;  /* id of the port whose in_fd is polled */
    uv_poll_t           poll;     /* poll handle watching that descriptor */
};
25 
26 
/*
 * Per-request storage reachable through nxt_unit_request_info_t.data; its
 * size is handed to libunit as request_data_size in create_server() and
 * the memory is zeroed at the start of Unit::request_handler().
 */
struct req_data_t {
    napi_ref  sock_ref;  /* result of wrap() in create_socket() */
    napi_ref  resp_ref;  /* result of wrap() in create_response() */
    napi_ref  conn_ref;  /* result of wrap() in websocket_set_sock() */
};
32 
33 
34 Unit::Unit(napi_env env, napi_value jsthis):
35     nxt_napi(env),
36     wrapper_(wrap(jsthis, this, destroy)),
37     unit_ctx_(nullptr)
38 {
39     nxt_unit_debug(NULL, "Unit::Unit()");
40 }
41 
42 
43 Unit::~Unit()
44 {
45     delete_reference(wrapper_);
46 
47     nxt_unit_debug(NULL, "Unit::~Unit()");
48 }
49 
50 
51 napi_value
52 Unit::init(napi_env env, napi_value exports)
53 {
54     nxt_napi    napi(env);
55     napi_value  ctor;
56 
57     napi_property_descriptor  unit_props[] = {
58         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
59         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
60     };
61 
62     try {
63         ctor = napi.define_class("Unit", create, 2, unit_props);
64         constructor_ = napi.create_reference(ctor);
65 
66         napi.set_named_property(exports, "Unit", ctor);
67         napi.set_named_property(exports, "response_send_headers",
68                                 response_send_headers);
69         napi.set_named_property(exports, "response_write", response_write);
70         napi.set_named_property(exports, "response_end", response_end);
71         napi.set_named_property(exports, "websocket_send_frame",
72                                 websocket_send_frame);
73         napi.set_named_property(exports, "websocket_set_sock",
74                                 websocket_set_sock);
75         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
76         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
77 
78     } catch (exception &e) {
79         napi.throw_error(e);
80         return nullptr;
81     }
82 
83     return exports;
84 }
85 
86 
87 void
88 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
89 {
90     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
91 
92     delete obj;
93 }
94 
95 
96 napi_value
97 Unit::create(napi_env env, napi_callback_info info)
98 {
99     nxt_napi    napi(env);
100     napi_value  target, ctor, instance, jsthis;
101 
102     try {
103         target = napi.get_new_target(info);
104 
105         if (target != nullptr) {
106             /* Invoked as constructor: `new Unit(...)`. */
107             jsthis = napi.get_cb_info(info);
108 
109             new Unit(env, jsthis);
110             napi.create_reference(jsthis);
111 
112             return jsthis;
113         }
114 
115         /* Invoked as plain function `Unit(...)`, turn into construct call. */
116         ctor = napi.get_reference_value(constructor_);
117         instance = napi.new_instance(ctor);
118         napi.create_reference(instance);
119 
120     } catch (exception &e) {
121         napi.throw_error(e);
122         return nullptr;
123     }
124 
125     return instance;
126 }
127 
128 
129 napi_value
130 Unit::create_server(napi_env env, napi_callback_info info)
131 {
132     Unit             *obj;
133     size_t           argc;
134     nxt_napi         napi(env);
135     napi_value       jsthis, argv;
136     nxt_unit_init_t  unit_init;
137 
138     argc = 1;
139 
140     try {
141         jsthis = napi.get_cb_info(info, argc, &argv);
142         obj = (Unit *) napi.unwrap(jsthis);
143 
144     } catch (exception &e) {
145         napi.throw_error(e);
146         return nullptr;
147     }
148 
149     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
150 
151     unit_init.data = obj;
152     unit_init.callbacks.request_handler   = request_handler_cb;
153     unit_init.callbacks.websocket_handler = websocket_handler_cb;
154     unit_init.callbacks.close_handler     = close_handler_cb;
155     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
156     unit_init.callbacks.add_port          = add_port;
157     unit_init.callbacks.remove_port       = remove_port;
158     unit_init.callbacks.quit              = quit_cb;
159 
160     unit_init.request_data_size = sizeof(req_data_t);
161 
162     obj->unit_ctx_ = nxt_unit_init(&unit_init);
163     if (obj->unit_ctx_ == NULL) {
164         goto failed;
165     }
166 
167     return nullptr;
168 
169 failed:
170 
171     napi_throw_error(env, NULL, "Failed to create Unit object");
172 
173     return nullptr;
174 }
175 
176 
177 napi_value
178 Unit::listen(napi_env env, napi_callback_info info)
179 {
180     return nullptr;
181 }
182 
183 
184 void
185 Unit::request_handler_cb(nxt_unit_request_info_t *req)
186 {
187     Unit  *obj;
188 
189     obj = reinterpret_cast<Unit *>(req->unit->data);
190 
191     obj->request_handler(req);
192 }
193 
194 
195 void
196 Unit::request_handler(nxt_unit_request_info_t *req)
197 {
198     napi_value  socket, request, response, server_obj, emit_request;
199 
200     memset(req->data, 0, sizeof(req_data_t));
201 
202     try {
203         nxt_handle_scope  scope(env());
204 
205         server_obj = get_server_object();
206 
207         socket = create_socket(server_obj, req);
208         request = create_request(server_obj, socket);
209         response = create_response(server_obj, request, req);
210 
211         create_headers(req, request);
212 
213         emit_request = get_named_property(server_obj, "emit_request");
214 
215         nxt_async_context   async_context(env(), "request_handler");
216         nxt_callback_scope  async_scope(async_context);
217 
218         make_callback(async_context, server_obj, emit_request, request,
219                       response);
220 
221     } catch (exception &e) {
222         nxt_unit_req_warn(req, "request_handler: %s", e.str);
223     }
224 }
225 
226 
227 void
228 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
229 {
230     Unit  *obj;
231 
232     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
233 
234     obj->websocket_handler(ws);
235 }
236 
237 
238 void
239 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
240 {
241     napi_value  frame, server_obj, process_frame, conn;
242     req_data_t  *req_data;
243 
244     req_data = (req_data_t *) ws->req->data;
245 
246     try {
247         nxt_handle_scope  scope(env());
248 
249         server_obj = get_server_object();
250 
251         frame = create_websocket_frame(server_obj, ws);
252 
253         conn = get_reference_value(req_data->conn_ref);
254 
255         process_frame = get_named_property(conn, "processFrame");
256 
257         nxt_async_context   async_context(env(), "websocket_handler");
258         nxt_callback_scope  async_scope(async_context);
259 
260         make_callback(async_context, conn, process_frame, frame);
261 
262     } catch (exception &e) {
263         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
264     }
265 
266     nxt_unit_websocket_done(ws);
267 }
268 
269 
270 void
271 Unit::close_handler_cb(nxt_unit_request_info_t *req)
272 {
273     Unit  *obj;
274 
275     obj = reinterpret_cast<Unit *>(req->unit->data);
276 
277     obj->close_handler(req);
278 }
279 
280 
281 void
282 Unit::close_handler(nxt_unit_request_info_t *req)
283 {
284     napi_value  conn_handle_close, conn;
285     req_data_t  *req_data;
286 
287     req_data = (req_data_t *) req->data;
288 
289     try {
290         nxt_handle_scope  scope(env());
291 
292         conn = get_reference_value(req_data->conn_ref);
293 
294         conn_handle_close = get_named_property(conn, "handleSocketClose");
295 
296         nxt_async_context   async_context(env(), "close_handler");
297         nxt_callback_scope  async_scope(async_context);
298 
299         make_callback(async_context, conn, conn_handle_close,
300                       nxt_napi::create(0));
301 
302         remove_wrap(req_data->sock_ref);
303         remove_wrap(req_data->resp_ref);
304         remove_wrap(req_data->conn_ref);
305 
306     } catch (exception &e) {
307         nxt_unit_req_warn(req, "close_handler: %s", e.str);
308 
309         return;
310     }
311 
312     nxt_unit_request_done(req, NXT_UNIT_OK);
313 }
314 
315 
316 void
317 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
318 {
319     Unit  *obj;
320 
321     obj = reinterpret_cast<Unit *>(ctx->unit->data);
322 
323     obj->shm_ack_handler(ctx);
324 }
325 
326 
327 void
328 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
329 {
330     napi_value  server_obj, emit_drain;
331 
332     try {
333         nxt_handle_scope  scope(env());
334 
335         server_obj = get_server_object();
336 
337         emit_drain = get_named_property(server_obj, "emit_drain");
338 
339         nxt_async_context   async_context(env(), "shm_ack_handler");
340         nxt_callback_scope  async_scope(async_context);
341 
342         make_callback(async_context, server_obj, emit_drain);
343 
344     } catch (exception &e) {
345         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
346     }
347 }
348 
349 
350 static void
351 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
352 {
353     nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
354 }
355 
356 
357 int
358 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
359 {
360     int               err;
361     Unit              *obj;
362     uv_loop_t         *loop;
363     napi_status       status;
364     nxt_nodejs_ctx_t  *node_ctx;
365 
366     if (port->in_fd != -1) {
367         obj = reinterpret_cast<Unit *>(ctx->unit->data);
368 
369         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
370             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
371                           port->in_fd, strerror(errno), errno);
372             return -1;
373         }
374 
375         status = napi_get_uv_event_loop(obj->env(), &loop);
376         if (status != napi_ok) {
377             nxt_unit_warn(ctx, "Failed to get uv.loop");
378             return NXT_UNIT_ERROR;
379         }
380 
381         node_ctx = new nxt_nodejs_ctx_t;
382 
383         err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
384         if (err < 0) {
385             nxt_unit_warn(ctx, "Failed to init uv.poll");
386             return NXT_UNIT_ERROR;
387         }
388 
389         err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
390         if (err < 0) {
391             nxt_unit_warn(ctx, "Failed to start uv.poll");
392             return NXT_UNIT_ERROR;
393         }
394 
395         ctx->data = node_ctx;
396 
397         node_ctx->port_id = port->id;
398         node_ctx->poll.data = ctx;
399     }
400 
401     return nxt_unit_add_port(ctx, port);
402 }
403 
404 
405 inline bool
406 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
407 {
408     return p1.pid == p2.pid && p1.id == p2.id;
409 }
410 
411 
412 void
413 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
414 {
415     nxt_nodejs_ctx_t  *node_ctx;
416 
417     if (ctx->data != NULL) {
418         node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
419 
420         if (node_ctx->port_id == *port_id) {
421             uv_poll_stop(&node_ctx->poll);
422 
423             node_ctx->poll.data = node_ctx;
424             uv_close((uv_handle_t *) &node_ctx->poll, delete_port_data);
425 
426             ctx->data = NULL;
427         }
428     }
429 
430     nxt_unit_remove_port(ctx, port_id);
431 }
432 
433 
434 static void
435 delete_port_data(uv_handle_t* handle)
436 {
437     nxt_nodejs_ctx_t  *node_ctx;
438 
439     node_ctx = (nxt_nodejs_ctx_t *) handle->data;
440 
441     delete node_ctx;
442 }
443 
444 
445 void
446 Unit::quit_cb(nxt_unit_ctx_t *ctx)
447 {
448     Unit  *obj;
449 
450     obj = reinterpret_cast<Unit *>(ctx->unit->data);
451 
452     obj->quit(ctx);
453 }
454 
455 
456 void
457 Unit::quit(nxt_unit_ctx_t *ctx)
458 {
459     napi_value  server_obj, emit_close;
460 
461     try {
462         nxt_handle_scope  scope(env());
463 
464         server_obj = get_server_object();
465 
466         emit_close = get_named_property(server_obj, "emit_close");
467 
468         nxt_async_context   async_context(env(), "unit_quit");
469         nxt_callback_scope  async_scope(async_context);
470 
471         make_callback(async_context, server_obj, emit_close);
472 
473     } catch (exception &e) {
474         nxt_unit_debug(ctx, "quit: %s", e.str);
475     }
476 
477     nxt_unit_done(ctx);
478 }
479 
480 
481 napi_value
482 Unit::get_server_object()
483 {
484     napi_value  unit_obj;
485 
486     unit_obj = get_reference_value(wrapper_);
487 
488     return get_named_property(unit_obj, "server");
489 }
490 
491 
492 void
493 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
494 {
495     void                *data;
496     uint32_t            i;
497     napi_value          headers, raw_headers, buffer;
498     napi_status         status;
499     nxt_unit_request_t  *r;
500 
501     r = req->request;
502 
503     headers = create_object();
504 
505     status = napi_create_array_with_length(env(), r->fields_count * 2,
506                                            &raw_headers);
507     if (status != napi_ok) {
508         throw exception("Failed to create array");
509     }
510 
511     for (i = 0; i < r->fields_count; i++) {
512         append_header(r->fields + i, headers, raw_headers, i);
513     }
514 
515     set_named_property(request, "headers", headers);
516     set_named_property(request, "rawHeaders", raw_headers);
517     set_named_property(request, "httpVersion", r->version, r->version_length);
518     set_named_property(request, "method", r->method, r->method_length);
519     set_named_property(request, "url", r->target, r->target_length);
520 
521     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
522 
523     buffer = create_buffer((size_t) req->content_length, &data);
524     nxt_unit_request_read(req, data, req->content_length);
525 
526     set_named_property(request, "_data", buffer);
527 }
528 
529 
/* Maps ASCII 'A'..'Z' to 'a'..'z'; any other character is returned as is. */
inline char
lowcase(char c)
{
    if (c < 'A' || c > 'Z') {
        return c;
    }

    return c | 0x20;
}
535 
536 
537 inline void
538 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
539     napi_value raw_headers, uint32_t idx)
540 {
541     char        *name;
542     uint8_t     i;
543     napi_value  str, vstr;
544 
545     name = (char *) nxt_unit_sptr_get(&f->name);
546 
547     str = create_string_latin1(name, f->name_length);
548 
549     for (i = 0; i < f->name_length; i++) {
550         name[i] = lowcase(name[i]);
551     }
552 
553     vstr = set_named_property(headers, name, f->value, f->value_length);
554 
555     set_element(raw_headers, idx * 2, str);
556     set_element(raw_headers, idx * 2 + 1, vstr);
557 }
558 
559 
560 napi_value
561 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
562 {
563     napi_value          constructor, res;
564     req_data_t          *req_data;
565     nxt_unit_request_t  *r;
566 
567     r = req->request;
568 
569     constructor = get_named_property(server_obj, "Socket");
570 
571     res = new_instance(constructor);
572 
573     req_data = (req_data_t *) req->data;
574     req_data->sock_ref = wrap(res, req, sock_destroy);
575 
576     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
577     set_named_property(res, "localAddress", r->local, r->local_length);
578 
579     return res;
580 }
581 
582 
583 napi_value
584 Unit::create_request(napi_value server_obj, napi_value socket)
585 {
586     napi_value  constructor;
587 
588     constructor = get_named_property(server_obj, "ServerRequest");
589 
590     return new_instance(constructor, server_obj, socket);
591 }
592 
593 
594 napi_value
595 Unit::create_response(napi_value server_obj, napi_value request,
596     nxt_unit_request_info_t *req)
597 {
598     napi_value  constructor, res;
599     req_data_t  *req_data;
600 
601     constructor = get_named_property(server_obj, "ServerResponse");
602 
603     res = new_instance(constructor, request);
604 
605     req_data = (req_data_t *) req->data;
606     req_data->resp_ref = wrap(res, req, resp_destroy);
607 
608     return res;
609 }
610 
611 
612 napi_value
613 Unit::create_websocket_frame(napi_value server_obj,
614                              nxt_unit_websocket_frame_t *ws)
615 {
616     void        *data;
617     napi_value  constructor, res, buffer;
618     uint8_t     sc[2];
619 
620     constructor = get_named_property(server_obj, "WebSocketFrame");
621 
622     res = new_instance(constructor);
623 
624     set_named_property(res, "fin", (bool) ws->header->fin);
625     set_named_property(res, "opcode", ws->header->opcode);
626     set_named_property(res, "length", (int64_t) ws->payload_len);
627 
628     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
629         if (ws->payload_len >= 2) {
630             nxt_unit_websocket_read(ws, sc, 2);
631 
632             set_named_property(res, "closeStatus",
633                                (((uint16_t) sc[0]) << 8) | sc[1]);
634 
635         } else {
636             set_named_property(res, "closeStatus", -1);
637         }
638     }
639 
640     buffer = create_buffer((size_t) ws->content_length, &data);
641     nxt_unit_websocket_read(ws, data, ws->content_length);
642 
643     set_named_property(res, "binaryPayload", buffer);
644 
645     return res;
646 }
647 
648 
649 napi_value
650 Unit::response_send_headers(napi_env env, napi_callback_info info)
651 {
652     int                      ret;
653     char                     *ptr, *name_ptr;
654     bool                     is_array;
655     size_t                   argc, name_len, value_len;
656     uint32_t                 status_code, header_len, keys_len, array_len;
657     uint32_t                 keys_count, i, j;
658     uint16_t                 hash;
659     nxt_napi                 napi(env);
660     napi_value               this_arg, headers, keys, name, value, array_val;
661     napi_value               array_entry;
662     napi_valuetype           val_type;
663     nxt_unit_field_t         *f;
664     nxt_unit_request_info_t  *req;
665     napi_value               argv[4];
666 
667     argc = 4;
668 
669     try {
670         this_arg = napi.get_cb_info(info, argc, argv);
671         if (argc != 4) {
672             napi.throw_error("Wrong args count. Expected: "
673                              "statusCode, headers, headers count, "
674                              "headers length");
675             return nullptr;
676         }
677 
678         req = napi.get_request_info(this_arg);
679         status_code = napi.get_value_uint32(argv[0]);
680         keys_count = napi.get_value_uint32(argv[2]);
681         header_len = napi.get_value_uint32(argv[3]);
682 
683         headers = argv[1];
684 
685         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
686         if (ret != NXT_UNIT_OK) {
687             napi.throw_error("Failed to create response");
688             return nullptr;
689         }
690 
691         /*
692          * Each name and value are 0-terminated by libunit.
693          * Need to add extra 2 bytes for each header.
694          */
695         header_len += keys_count * 2;
696 
697         keys = napi.get_property_names(headers);
698         keys_len = napi.get_array_length(keys);
699 
700         ptr = req->response_buf->free;
701 
702         for (i = 0; i < keys_len; i++) {
703             name = napi.get_element(keys, i);
704 
705             array_entry = napi.get_property(headers, name);
706 
707             name = napi.get_element(array_entry, 0);
708             value = napi.get_element(array_entry, 1);
709 
710             name_len = napi.get_value_string_latin1(name, ptr, header_len);
711             name_ptr = ptr;
712 
713             ptr += name_len + 1;
714             header_len -= name_len + 1;
715 
716             hash = nxt_unit_field_hash(name_ptr, name_len);
717 
718             is_array = napi.is_array(value);
719 
720             if (is_array) {
721                 array_len = napi.get_array_length(value);
722 
723                 for (j = 0; j < array_len; j++) {
724                     array_val = napi.get_element(value, j);
725 
726                     val_type = napi.type_of(array_val);
727 
728                     if (val_type != napi_string) {
729                         array_val = napi.coerce_to_string(array_val);
730                     }
731 
732                     value_len = napi.get_value_string_latin1(array_val, ptr,
733                                                              header_len);
734 
735                     f = req->response->fields + req->response->fields_count;
736                     f->skip = 0;
737 
738                     nxt_unit_sptr_set(&f->name, name_ptr);
739 
740                     f->name_length = name_len;
741                     f->hash = hash;
742 
743                     nxt_unit_sptr_set(&f->value, ptr);
744                     f->value_length = (uint32_t) value_len;
745 
746                     ptr += value_len + 1;
747                     header_len -= value_len + 1;
748 
749                     req->response->fields_count++;
750                 }
751 
752             } else {
753                 val_type = napi.type_of(value);
754 
755                 if (val_type != napi_string) {
756                     value = napi.coerce_to_string(value);
757                 }
758 
759                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
760 
761                 f = req->response->fields + req->response->fields_count;
762                 f->skip = 0;
763 
764                 nxt_unit_sptr_set(&f->name, name_ptr);
765 
766                 f->name_length = name_len;
767                 f->hash = hash;
768 
769                 nxt_unit_sptr_set(&f->value, ptr);
770                 f->value_length = (uint32_t) value_len;
771 
772                 ptr += value_len + 1;
773                 header_len -= value_len + 1;
774 
775                 req->response->fields_count++;
776             }
777         }
778 
779     } catch (exception &e) {
780         napi.throw_error(e);
781         return nullptr;
782     }
783 
784     req->response_buf->free = ptr;
785 
786     ret = nxt_unit_response_send(req);
787     if (ret != NXT_UNIT_OK) {
788         napi.throw_error("Failed to send response");
789         return nullptr;
790     }
791 
792     return this_arg;
793 }
794 
795 
796 napi_value
797 Unit::response_write(napi_env env, napi_callback_info info)
798 {
799     int                      ret;
800     void                     *ptr;
801     size_t                   argc, have_buf_len;
802     ssize_t                  res_len;
803     uint32_t                 buf_start, buf_len;
804     nxt_napi                 napi(env);
805     napi_value               this_arg;
806     nxt_unit_buf_t           *buf;
807     napi_valuetype           buf_type;
808     nxt_unit_request_info_t  *req;
809     napi_value               argv[3];
810 
811     argc = 3;
812 
813     try {
814         this_arg = napi.get_cb_info(info, argc, argv);
815         if (argc != 3) {
816             throw exception("Wrong args count. Expected: "
817                             "chunk, start, length");
818         }
819 
820         req = napi.get_request_info(this_arg);
821         buf_type = napi.type_of(argv[0]);
822         buf_start = napi.get_value_uint32(argv[1]);
823         buf_len = napi.get_value_uint32(argv[2]) + 1;
824 
825         if (buf_type == napi_string) {
826             /* TODO: will work only for utf8 content-type */
827 
828             if (req->response_buf != NULL
829                 && req->response_buf->end >= req->response_buf->free + buf_len)
830             {
831                 buf = req->response_buf;
832 
833             } else {
834                 buf = nxt_unit_response_buf_alloc(req, buf_len);
835                 if (buf == NULL) {
836                     throw exception("Failed to allocate response buffer");
837                 }
838             }
839 
840             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
841                                                       buf_len);
842 
843             buf->free += have_buf_len;
844 
845             ret = nxt_unit_buf_send(buf);
846             if (ret == NXT_UNIT_OK) {
847                 res_len = have_buf_len;
848             }
849 
850         } else {
851             ptr = napi.get_buffer_info(argv[0], have_buf_len);
852 
853             if (buf_start > 0) {
854                 ptr = ((uint8_t *) ptr) + buf_start;
855                 have_buf_len -= buf_start;
856             }
857 
858             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
859 
860             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
861         }
862 
863         if (ret != NXT_UNIT_OK) {
864             throw exception("Failed to send body buf");
865         }
866     } catch (exception &e) {
867         napi.throw_error(e);
868         return nullptr;
869     }
870 
871     return napi.create((int64_t) res_len);
872 }
873 
874 
875 napi_value
876 Unit::response_end(napi_env env, napi_callback_info info)
877 {
878     nxt_napi                 napi(env);
879     napi_value               this_arg;
880     req_data_t               *req_data;
881     nxt_unit_request_info_t  *req;
882 
883     try {
884         this_arg = napi.get_cb_info(info);
885 
886         req = napi.get_request_info(this_arg);
887 
888         req_data = (req_data_t *) req->data;
889 
890         napi.remove_wrap(req_data->sock_ref);
891         napi.remove_wrap(req_data->resp_ref);
892         napi.remove_wrap(req_data->conn_ref);
893 
894     } catch (exception &e) {
895         napi.throw_error(e);
896         return nullptr;
897     }
898 
899     nxt_unit_request_done(req, NXT_UNIT_OK);
900 
901     return this_arg;
902 }
903 
904 
905 napi_value
906 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
907 {
908     int                      ret, iovec_len;
909     bool                     fin;
910     size_t                   buf_len;
911     uint32_t                 opcode, sc;
912     nxt_napi                 napi(env);
913     napi_value               this_arg, frame, payload;
914     nxt_unit_request_info_t  *req;
915     char                     status_code[2];
916     struct iovec             iov[2];
917 
918     iovec_len = 0;
919 
920     try {
921         this_arg = napi.get_cb_info(info, frame);
922 
923         req = napi.get_request_info(this_arg);
924 
925         opcode = napi.get_value_uint32(napi.get_named_property(frame,
926                                                                "opcode"));
927         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
928             sc = napi.get_value_uint32(napi.get_named_property(frame,
929                                                                "closeStatus"));
930             status_code[0] = (sc >> 8) & 0xFF;
931             status_code[1] = sc & 0xFF;
932 
933             iov[iovec_len].iov_base = status_code;
934             iov[iovec_len].iov_len = 2;
935             iovec_len++;
936         }
937 
938         try {
939             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
940 
941         } catch (exception &e) {
942             fin = true;
943         }
944 
945         payload = napi.get_named_property(frame, "binaryPayload");
946 
947         if (napi.is_buffer(payload)) {
948             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
949 
950         } else {
951             buf_len = 0;
952         }
953 
954     } catch (exception &e) {
955         napi.throw_error(e);
956         return nullptr;
957     }
958 
959     if (buf_len > 0) {
960         iov[iovec_len].iov_len = buf_len;
961         iovec_len++;
962     }
963 
964     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
965     if (ret != NXT_UNIT_OK) {
966         goto failed;
967     }
968 
969     return this_arg;
970 
971 failed:
972 
973     napi.throw_error("Failed to send frame");
974 
975     return nullptr;
976 }
977 
978 
979 napi_value
980 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
981 {
982     nxt_napi                 napi(env);
983     napi_value               this_arg, sock;
984     req_data_t               *req_data;
985     nxt_unit_request_info_t  *req;
986 
987     try {
988         this_arg = napi.get_cb_info(info, sock);
989 
990         req = napi.get_request_info(sock);
991 
992         req_data = (req_data_t *) req->data;
993         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
994 
995     } catch (exception &e) {
996         napi.throw_error(e);
997         return nullptr;
998     }
999 
1000     return this_arg;
1001 }
1002 
1003 
1004 void
1005 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1006 {
1007     nxt_unit_request_info_t  *req;
1008 
1009     req = (nxt_unit_request_info_t *) nativeObject;
1010 
1011     nxt_unit_warn(NULL, "conn_destroy: %p", req);
1012 }
1013 
1014 
1015 void
1016 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1017 {
1018     nxt_unit_request_info_t  *req;
1019 
1020     req = (nxt_unit_request_info_t *) nativeObject;
1021 
1022     nxt_unit_warn(NULL, "sock_destroy: %p", req);
1023 }
1024 
1025 
1026 void
1027 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1028 {
1029     nxt_unit_request_info_t  *req;
1030 
1031     req = (nxt_unit_request_info_t *) nativeObject;
1032 
1033     nxt_unit_warn(NULL, "resp_destroy: %p", req);
1034 }
1035