xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1342:f32913c67752)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
/*
 * Reference to the JavaScript "Unit" constructor.  Assigned in Unit::init()
 * and read back in Unit::create() when Unit is invoked as a plain function.
 */
napi_ref Unit::constructor_;
17 
18 
/*
 * State stored in nxt_unit_ctx_t.data by Unit::add_port(): it pairs the id
 * of a port with the libuv poll handle watching that port's in_fd.
 */
struct nxt_nodejs_ctx_t {
    /* Id of the polled port; compared against in Unit::remove_port(). */
    nxt_unit_port_id_t  port_id;
    /* Poll handle; poll.data carries the owning nxt_unit_ctx_t pointer. */
    uv_poll_t           poll;
};
23 
24 
/*
 * Per-request data living in req->data (its size is passed to libunit as
 * request_data_size).  Each member is the reference returned by wrap() for
 * one JavaScript object associated with the request.
 */
struct req_data_t {
    napi_ref  sock_ref;  /* Socket object, set in Unit::create_socket(). */
    napi_ref  resp_ref;  /* Response object, set in Unit::create_response(). */
    napi_ref  conn_ref;  /* Connection object, set in websocket_set_sock(). */
};
30 
31 
32 Unit::Unit(napi_env env, napi_value jsthis):
33     nxt_napi(env),
34     wrapper_(wrap(jsthis, this, destroy)),
35     unit_ctx_(nullptr)
36 {
37     nxt_unit_debug(NULL, "Unit::Unit()");
38 }
39 
40 
41 Unit::~Unit()
42 {
43     delete_reference(wrapper_);
44 
45     nxt_unit_debug(NULL, "Unit::~Unit()");
46 }
47 
48 
49 napi_value
50 Unit::init(napi_env env, napi_value exports)
51 {
52     nxt_napi    napi(env);
53     napi_value  ctor;
54 
55     napi_property_descriptor  unit_props[] = {
56         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
57         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
58     };
59 
60     try {
61         ctor = napi.define_class("Unit", create, 2, unit_props);
62         constructor_ = napi.create_reference(ctor);
63 
64         napi.set_named_property(exports, "Unit", ctor);
65         napi.set_named_property(exports, "response_send_headers",
66                                 response_send_headers);
67         napi.set_named_property(exports, "response_write", response_write);
68         napi.set_named_property(exports, "response_end", response_end);
69         napi.set_named_property(exports, "websocket_send_frame",
70                                 websocket_send_frame);
71         napi.set_named_property(exports, "websocket_set_sock",
72                                 websocket_set_sock);
73         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
74         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
75 
76     } catch (exception &e) {
77         napi.throw_error(e);
78         return nullptr;
79     }
80 
81     return exports;
82 }
83 
84 
85 void
86 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
87 {
88     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
89 
90     delete obj;
91 }
92 
93 
94 napi_value
95 Unit::create(napi_env env, napi_callback_info info)
96 {
97     nxt_napi    napi(env);
98     napi_value  target, ctor, instance, jsthis;
99 
100     try {
101         target = napi.get_new_target(info);
102 
103         if (target != nullptr) {
104             /* Invoked as constructor: `new Unit(...)`. */
105             jsthis = napi.get_cb_info(info);
106 
107             new Unit(env, jsthis);
108             napi.create_reference(jsthis);
109 
110             return jsthis;
111         }
112 
113         /* Invoked as plain function `Unit(...)`, turn into construct call. */
114         ctor = napi.get_reference_value(constructor_);
115         instance = napi.new_instance(ctor);
116         napi.create_reference(instance);
117 
118     } catch (exception &e) {
119         napi.throw_error(e);
120         return nullptr;
121     }
122 
123     return instance;
124 }
125 
126 
127 napi_value
128 Unit::create_server(napi_env env, napi_callback_info info)
129 {
130     Unit             *obj;
131     size_t           argc;
132     nxt_napi         napi(env);
133     napi_value       jsthis, argv;
134     nxt_unit_init_t  unit_init;
135 
136     argc = 1;
137 
138     try {
139         jsthis = napi.get_cb_info(info, argc, &argv);
140         obj = (Unit *) napi.unwrap(jsthis);
141 
142     } catch (exception &e) {
143         napi.throw_error(e);
144         return nullptr;
145     }
146 
147     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
148 
149     unit_init.data = obj;
150     unit_init.callbacks.request_handler   = request_handler_cb;
151     unit_init.callbacks.websocket_handler = websocket_handler_cb;
152     unit_init.callbacks.close_handler     = close_handler_cb;
153     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
154     unit_init.callbacks.add_port          = add_port;
155     unit_init.callbacks.remove_port       = remove_port;
156     unit_init.callbacks.quit              = quit_cb;
157 
158     unit_init.request_data_size = sizeof(req_data_t);
159 
160     obj->unit_ctx_ = nxt_unit_init(&unit_init);
161     if (obj->unit_ctx_ == NULL) {
162         goto failed;
163     }
164 
165     return nullptr;
166 
167 failed:
168 
169     napi_throw_error(env, NULL, "Failed to create Unit object");
170 
171     return nullptr;
172 }
173 
174 
175 napi_value
176 Unit::listen(napi_env env, napi_callback_info info)
177 {
178     return nullptr;
179 }
180 
181 
182 void
183 Unit::request_handler_cb(nxt_unit_request_info_t *req)
184 {
185     Unit  *obj;
186 
187     obj = reinterpret_cast<Unit *>(req->unit->data);
188 
189     obj->request_handler(req);
190 }
191 
192 
193 void
194 Unit::request_handler(nxt_unit_request_info_t *req)
195 {
196     napi_value  socket, request, response, server_obj, emit_request;
197 
198     memset(req->data, 0, sizeof(req_data_t));
199 
200     try {
201         nxt_handle_scope  scope(env());
202 
203         server_obj = get_server_object();
204 
205         socket = create_socket(server_obj, req);
206         request = create_request(server_obj, socket);
207         response = create_response(server_obj, request, req);
208 
209         create_headers(req, request);
210 
211         emit_request = get_named_property(server_obj, "emit_request");
212 
213         nxt_async_context   async_context(env(), "request_handler");
214         nxt_callback_scope  async_scope(async_context);
215 
216         make_callback(async_context, server_obj, emit_request, request,
217                       response);
218 
219     } catch (exception &e) {
220         nxt_unit_req_warn(req, "request_handler: %s", e.str);
221     }
222 }
223 
224 
225 void
226 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
227 {
228     Unit  *obj;
229 
230     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
231 
232     obj->websocket_handler(ws);
233 }
234 
235 
236 void
237 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
238 {
239     napi_value  frame, server_obj, process_frame, conn;
240     req_data_t  *req_data;
241 
242     req_data = (req_data_t *) ws->req->data;
243 
244     try {
245         nxt_handle_scope  scope(env());
246 
247         server_obj = get_server_object();
248 
249         frame = create_websocket_frame(server_obj, ws);
250 
251         conn = get_reference_value(req_data->conn_ref);
252 
253         process_frame = get_named_property(conn, "processFrame");
254 
255         nxt_async_context   async_context(env(), "websocket_handler");
256         nxt_callback_scope  async_scope(async_context);
257 
258         make_callback(async_context, conn, process_frame, frame);
259 
260     } catch (exception &e) {
261         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
262     }
263 
264     nxt_unit_websocket_done(ws);
265 }
266 
267 
268 void
269 Unit::close_handler_cb(nxt_unit_request_info_t *req)
270 {
271     Unit  *obj;
272 
273     obj = reinterpret_cast<Unit *>(req->unit->data);
274 
275     obj->close_handler(req);
276 }
277 
278 
279 void
280 Unit::close_handler(nxt_unit_request_info_t *req)
281 {
282     napi_value  conn_handle_close, conn;
283     req_data_t  *req_data;
284 
285     req_data = (req_data_t *) req->data;
286 
287     try {
288         nxt_handle_scope  scope(env());
289 
290         conn = get_reference_value(req_data->conn_ref);
291 
292         conn_handle_close = get_named_property(conn, "handleSocketClose");
293 
294         nxt_async_context   async_context(env(), "close_handler");
295         nxt_callback_scope  async_scope(async_context);
296 
297         make_callback(async_context, conn, conn_handle_close,
298                       nxt_napi::create(0));
299 
300         remove_wrap(req_data->sock_ref);
301         remove_wrap(req_data->resp_ref);
302         remove_wrap(req_data->conn_ref);
303 
304     } catch (exception &e) {
305         nxt_unit_req_warn(req, "close_handler: %s", e.str);
306 
307         return;
308     }
309 
310     nxt_unit_request_done(req, NXT_UNIT_OK);
311 }
312 
313 
314 void
315 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
316 {
317     Unit  *obj;
318 
319     obj = reinterpret_cast<Unit *>(ctx->unit->data);
320 
321     obj->shm_ack_handler(ctx);
322 }
323 
324 
325 void
326 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
327 {
328     napi_value  server_obj, emit_drain;
329 
330     try {
331         nxt_handle_scope  scope(env());
332 
333         server_obj = get_server_object();
334 
335         emit_drain = get_named_property(server_obj, "emit_drain");
336 
337         nxt_async_context   async_context(env(), "shm_ack_handler");
338         nxt_callback_scope  async_scope(async_context);
339 
340         make_callback(async_context, server_obj, emit_drain);
341 
342     } catch (exception &e) {
343         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
344     }
345 }
346 
347 
348 static void
349 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
350 {
351     nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
352 }
353 
354 
355 int
356 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
357 {
358     int               err;
359     Unit              *obj;
360     uv_loop_t         *loop;
361     napi_status       status;
362     nxt_nodejs_ctx_t  *node_ctx;
363 
364     if (port->in_fd != -1) {
365         obj = reinterpret_cast<Unit *>(ctx->unit->data);
366 
367         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
368             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
369                           port->in_fd, strerror(errno), errno);
370             return -1;
371         }
372 
373         status = napi_get_uv_event_loop(obj->env(), &loop);
374         if (status != napi_ok) {
375             nxt_unit_warn(ctx, "Failed to get uv.loop");
376             return NXT_UNIT_ERROR;
377         }
378 
379         node_ctx = new nxt_nodejs_ctx_t;
380 
381         err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
382         if (err < 0) {
383             nxt_unit_warn(ctx, "Failed to init uv.poll");
384             return NXT_UNIT_ERROR;
385         }
386 
387         err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
388         if (err < 0) {
389             nxt_unit_warn(ctx, "Failed to start uv.poll");
390             return NXT_UNIT_ERROR;
391         }
392 
393         ctx->data = node_ctx;
394 
395         node_ctx->port_id = port->id;
396         node_ctx->poll.data = ctx;
397     }
398 
399     return nxt_unit_add_port(ctx, port);
400 }
401 
402 
403 inline bool
404 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
405 {
406     return p1.pid == p2.pid && p1.id == p2.id;
407 }
408 
409 
410 void
411 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
412 {
413     nxt_nodejs_ctx_t  *node_ctx;
414 
415     if (ctx->data != NULL) {
416         node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
417 
418         if (node_ctx->port_id == *port_id) {
419             uv_poll_stop(&node_ctx->poll);
420 
421             delete node_ctx;
422 
423             ctx->data = NULL;
424         }
425     }
426 
427     nxt_unit_remove_port(ctx, port_id);
428 }
429 
430 
431 void
432 Unit::quit_cb(nxt_unit_ctx_t *ctx)
433 {
434     Unit  *obj;
435 
436     obj = reinterpret_cast<Unit *>(ctx->unit->data);
437 
438     obj->quit(ctx);
439 }
440 
441 
442 void
443 Unit::quit(nxt_unit_ctx_t *ctx)
444 {
445     napi_value  server_obj, emit_close;
446 
447     try {
448         nxt_handle_scope  scope(env());
449 
450         server_obj = get_server_object();
451 
452         emit_close = get_named_property(server_obj, "emit_close");
453 
454         nxt_async_context   async_context(env(), "unit_quit");
455         nxt_callback_scope  async_scope(async_context);
456 
457         make_callback(async_context, server_obj, emit_close);
458 
459     } catch (exception &e) {
460         nxt_unit_debug(ctx, "quit: %s", e.str);
461     }
462 
463     nxt_unit_done(ctx);
464 }
465 
466 
467 napi_value
468 Unit::get_server_object()
469 {
470     napi_value  unit_obj;
471 
472     unit_obj = get_reference_value(wrapper_);
473 
474     return get_named_property(unit_obj, "server");
475 }
476 
477 
478 void
479 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
480 {
481     void                *data;
482     uint32_t            i;
483     napi_value          headers, raw_headers, buffer;
484     napi_status         status;
485     nxt_unit_request_t  *r;
486 
487     r = req->request;
488 
489     headers = create_object();
490 
491     status = napi_create_array_with_length(env(), r->fields_count * 2,
492                                            &raw_headers);
493     if (status != napi_ok) {
494         throw exception("Failed to create array");
495     }
496 
497     for (i = 0; i < r->fields_count; i++) {
498         append_header(r->fields + i, headers, raw_headers, i);
499     }
500 
501     set_named_property(request, "headers", headers);
502     set_named_property(request, "rawHeaders", raw_headers);
503     set_named_property(request, "httpVersion", r->version, r->version_length);
504     set_named_property(request, "method", r->method, r->method_length);
505     set_named_property(request, "url", r->target, r->target_length);
506 
507     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
508 
509     buffer = create_buffer((size_t) req->content_length, &data);
510     nxt_unit_request_read(req, data, req->content_length);
511 
512     set_named_property(request, "_data", buffer);
513 }
514 
515 
/*
 * Maps 'A'..'Z' to 'a'..'z' by setting bit 0x20; every other character is
 * returned unchanged.
 */
inline char
lowcase(char c)
{
    if (c < 'A' || c > 'Z') {
        return c;
    }

    return c | 0x20;
}
521 
522 
523 inline void
524 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
525     napi_value raw_headers, uint32_t idx)
526 {
527     char        *name;
528     uint8_t     i;
529     napi_value  str, vstr;
530 
531     name = (char *) nxt_unit_sptr_get(&f->name);
532 
533     str = create_string_latin1(name, f->name_length);
534 
535     for (i = 0; i < f->name_length; i++) {
536         name[i] = lowcase(name[i]);
537     }
538 
539     vstr = set_named_property(headers, name, f->value, f->value_length);
540 
541     set_element(raw_headers, idx * 2, str);
542     set_element(raw_headers, idx * 2 + 1, vstr);
543 }
544 
545 
546 napi_value
547 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
548 {
549     napi_value          constructor, res;
550     req_data_t          *req_data;
551     nxt_unit_request_t  *r;
552 
553     r = req->request;
554 
555     constructor = get_named_property(server_obj, "Socket");
556 
557     res = new_instance(constructor);
558 
559     req_data = (req_data_t *) req->data;
560     req_data->sock_ref = wrap(res, req, sock_destroy);
561 
562     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
563     set_named_property(res, "localAddress", r->local, r->local_length);
564 
565     return res;
566 }
567 
568 
569 napi_value
570 Unit::create_request(napi_value server_obj, napi_value socket)
571 {
572     napi_value  constructor;
573 
574     constructor = get_named_property(server_obj, "ServerRequest");
575 
576     return new_instance(constructor, server_obj, socket);
577 }
578 
579 
580 napi_value
581 Unit::create_response(napi_value server_obj, napi_value request,
582     nxt_unit_request_info_t *req)
583 {
584     napi_value  constructor, res;
585     req_data_t  *req_data;
586 
587     constructor = get_named_property(server_obj, "ServerResponse");
588 
589     res = new_instance(constructor, request);
590 
591     req_data = (req_data_t *) req->data;
592     req_data->resp_ref = wrap(res, req, resp_destroy);
593 
594     return res;
595 }
596 
597 
598 napi_value
599 Unit::create_websocket_frame(napi_value server_obj,
600                              nxt_unit_websocket_frame_t *ws)
601 {
602     void        *data;
603     napi_value  constructor, res, buffer;
604     uint8_t     sc[2];
605 
606     constructor = get_named_property(server_obj, "WebSocketFrame");
607 
608     res = new_instance(constructor);
609 
610     set_named_property(res, "fin", (bool) ws->header->fin);
611     set_named_property(res, "opcode", ws->header->opcode);
612     set_named_property(res, "length", (int64_t) ws->payload_len);
613 
614     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
615         if (ws->payload_len >= 2) {
616             nxt_unit_websocket_read(ws, sc, 2);
617 
618             set_named_property(res, "closeStatus",
619                                (((uint16_t) sc[0]) << 8) | sc[1]);
620 
621         } else {
622             set_named_property(res, "closeStatus", -1);
623         }
624     }
625 
626     buffer = create_buffer((size_t) ws->content_length, &data);
627     nxt_unit_websocket_read(ws, data, ws->content_length);
628 
629     set_named_property(res, "binaryPayload", buffer);
630 
631     return res;
632 }
633 
634 
635 napi_value
636 Unit::response_send_headers(napi_env env, napi_callback_info info)
637 {
638     int                      ret;
639     char                     *ptr, *name_ptr;
640     bool                     is_array;
641     size_t                   argc, name_len, value_len;
642     uint32_t                 status_code, header_len, keys_len, array_len;
643     uint32_t                 keys_count, i, j;
644     uint16_t                 hash;
645     nxt_napi                 napi(env);
646     napi_value               this_arg, headers, keys, name, value, array_val;
647     napi_value               array_entry;
648     napi_valuetype           val_type;
649     nxt_unit_field_t         *f;
650     nxt_unit_request_info_t  *req;
651     napi_value               argv[4];
652 
653     argc = 4;
654 
655     try {
656         this_arg = napi.get_cb_info(info, argc, argv);
657         if (argc != 4) {
658             napi.throw_error("Wrong args count. Expected: "
659                              "statusCode, headers, headers count, "
660                              "headers length");
661             return nullptr;
662         }
663 
664         req = napi.get_request_info(this_arg);
665         status_code = napi.get_value_uint32(argv[0]);
666         keys_count = napi.get_value_uint32(argv[2]);
667         header_len = napi.get_value_uint32(argv[3]);
668 
669         headers = argv[1];
670 
671         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
672         if (ret != NXT_UNIT_OK) {
673             napi.throw_error("Failed to create response");
674             return nullptr;
675         }
676 
677         /*
678          * Each name and value are 0-terminated by libunit.
679          * Need to add extra 2 bytes for each header.
680          */
681         header_len += keys_count * 2;
682 
683         keys = napi.get_property_names(headers);
684         keys_len = napi.get_array_length(keys);
685 
686         ptr = req->response_buf->free;
687 
688         for (i = 0; i < keys_len; i++) {
689             name = napi.get_element(keys, i);
690 
691             array_entry = napi.get_property(headers, name);
692 
693             name = napi.get_element(array_entry, 0);
694             value = napi.get_element(array_entry, 1);
695 
696             name_len = napi.get_value_string_latin1(name, ptr, header_len);
697             name_ptr = ptr;
698 
699             ptr += name_len + 1;
700             header_len -= name_len + 1;
701 
702             hash = nxt_unit_field_hash(name_ptr, name_len);
703 
704             is_array = napi.is_array(value);
705 
706             if (is_array) {
707                 array_len = napi.get_array_length(value);
708 
709                 for (j = 0; j < array_len; j++) {
710                     array_val = napi.get_element(value, j);
711 
712                     val_type = napi.type_of(array_val);
713 
714                     if (val_type != napi_string) {
715                         array_val = napi.coerce_to_string(array_val);
716                     }
717 
718                     value_len = napi.get_value_string_latin1(array_val, ptr,
719                                                              header_len);
720 
721                     f = req->response->fields + req->response->fields_count;
722                     f->skip = 0;
723 
724                     nxt_unit_sptr_set(&f->name, name_ptr);
725 
726                     f->name_length = name_len;
727                     f->hash = hash;
728 
729                     nxt_unit_sptr_set(&f->value, ptr);
730                     f->value_length = (uint32_t) value_len;
731 
732                     ptr += value_len + 1;
733                     header_len -= value_len + 1;
734 
735                     req->response->fields_count++;
736                 }
737 
738             } else {
739                 val_type = napi.type_of(value);
740 
741                 if (val_type != napi_string) {
742                     value = napi.coerce_to_string(value);
743                 }
744 
745                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
746 
747                 f = req->response->fields + req->response->fields_count;
748                 f->skip = 0;
749 
750                 nxt_unit_sptr_set(&f->name, name_ptr);
751 
752                 f->name_length = name_len;
753                 f->hash = hash;
754 
755                 nxt_unit_sptr_set(&f->value, ptr);
756                 f->value_length = (uint32_t) value_len;
757 
758                 ptr += value_len + 1;
759                 header_len -= value_len + 1;
760 
761                 req->response->fields_count++;
762             }
763         }
764 
765     } catch (exception &e) {
766         napi.throw_error(e);
767         return nullptr;
768     }
769 
770     req->response_buf->free = ptr;
771 
772     ret = nxt_unit_response_send(req);
773     if (ret != NXT_UNIT_OK) {
774         napi.throw_error("Failed to send response");
775         return nullptr;
776     }
777 
778     return this_arg;
779 }
780 
781 
782 napi_value
783 Unit::response_write(napi_env env, napi_callback_info info)
784 {
785     int                      ret;
786     void                     *ptr;
787     size_t                   argc, have_buf_len;
788     ssize_t                  res_len;
789     uint32_t                 buf_start, buf_len;
790     nxt_napi                 napi(env);
791     napi_value               this_arg;
792     nxt_unit_buf_t           *buf;
793     napi_valuetype           buf_type;
794     nxt_unit_request_info_t  *req;
795     napi_value               argv[3];
796 
797     argc = 3;
798 
799     try {
800         this_arg = napi.get_cb_info(info, argc, argv);
801         if (argc != 3) {
802             throw exception("Wrong args count. Expected: "
803                             "chunk, start, length");
804         }
805 
806         req = napi.get_request_info(this_arg);
807         buf_type = napi.type_of(argv[0]);
808         buf_start = napi.get_value_uint32(argv[1]);
809         buf_len = napi.get_value_uint32(argv[2]) + 1;
810 
811         if (buf_type == napi_string) {
812             /* TODO: will work only for utf8 content-type */
813 
814             if (req->response_buf != NULL
815                 && (req->response_buf->end - req->response_buf->free)
816                     >= buf_len)
817             {
818                 buf = req->response_buf;
819 
820             } else {
821                 buf = nxt_unit_response_buf_alloc(req, buf_len);
822                 if (buf == NULL) {
823                     throw exception("Failed to allocate response buffer");
824                 }
825             }
826 
827             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
828                                                       buf_len);
829 
830             buf->free += have_buf_len;
831 
832             ret = nxt_unit_buf_send(buf);
833             if (ret == NXT_UNIT_OK) {
834                 res_len = have_buf_len;
835             }
836 
837         } else {
838             ptr = napi.get_buffer_info(argv[0], have_buf_len);
839 
840             if (buf_start > 0) {
841                 ptr = ((uint8_t *) ptr) + buf_start;
842                 have_buf_len -= buf_start;
843             }
844 
845             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
846 
847             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
848         }
849 
850         if (ret != NXT_UNIT_OK) {
851             throw exception("Failed to send body buf");
852         }
853     } catch (exception &e) {
854         napi.throw_error(e);
855         return nullptr;
856     }
857 
858     return napi.create((int64_t) res_len);
859 }
860 
861 
862 napi_value
863 Unit::response_end(napi_env env, napi_callback_info info)
864 {
865     nxt_napi                 napi(env);
866     napi_value               this_arg;
867     req_data_t               *req_data;
868     nxt_unit_request_info_t  *req;
869 
870     try {
871         this_arg = napi.get_cb_info(info);
872 
873         req = napi.get_request_info(this_arg);
874 
875         req_data = (req_data_t *) req->data;
876 
877         napi.remove_wrap(req_data->sock_ref);
878         napi.remove_wrap(req_data->resp_ref);
879         napi.remove_wrap(req_data->conn_ref);
880 
881     } catch (exception &e) {
882         napi.throw_error(e);
883         return nullptr;
884     }
885 
886     nxt_unit_request_done(req, NXT_UNIT_OK);
887 
888     return this_arg;
889 }
890 
891 
892 napi_value
893 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
894 {
895     int                      ret, iovec_len;
896     bool                     fin;
897     size_t                   buf_len;
898     uint32_t                 opcode, sc;
899     nxt_napi                 napi(env);
900     napi_value               this_arg, frame, payload;
901     nxt_unit_request_info_t  *req;
902     char                     status_code[2];
903     struct iovec             iov[2];
904 
905     iovec_len = 0;
906 
907     try {
908         this_arg = napi.get_cb_info(info, frame);
909 
910         req = napi.get_request_info(this_arg);
911 
912         opcode = napi.get_value_uint32(napi.get_named_property(frame,
913                                                                "opcode"));
914         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
915             sc = napi.get_value_uint32(napi.get_named_property(frame,
916                                                                "closeStatus"));
917             status_code[0] = (sc >> 8) & 0xFF;
918             status_code[1] = sc & 0xFF;
919 
920             iov[iovec_len].iov_base = status_code;
921             iov[iovec_len].iov_len = 2;
922             iovec_len++;
923         }
924 
925         try {
926             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
927 
928         } catch (exception &e) {
929             fin = true;
930         }
931 
932         payload = napi.get_named_property(frame, "binaryPayload");
933 
934         if (napi.is_buffer(payload)) {
935             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
936 
937         } else {
938             buf_len = 0;
939         }
940 
941     } catch (exception &e) {
942         napi.throw_error(e);
943         return nullptr;
944     }
945 
946     if (buf_len > 0) {
947         iov[iovec_len].iov_len = buf_len;
948         iovec_len++;
949     }
950 
951     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
952     if (ret != NXT_UNIT_OK) {
953         goto failed;
954     }
955 
956     return this_arg;
957 
958 failed:
959 
960     napi.throw_error("Failed to send frame");
961 
962     return nullptr;
963 }
964 
965 
966 napi_value
967 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
968 {
969     nxt_napi                 napi(env);
970     napi_value               this_arg, sock;
971     req_data_t               *req_data;
972     nxt_unit_request_info_t  *req;
973 
974     try {
975         this_arg = napi.get_cb_info(info, sock);
976 
977         req = napi.get_request_info(sock);
978 
979         req_data = (req_data_t *) req->data;
980         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
981 
982     } catch (exception &e) {
983         napi.throw_error(e);
984         return nullptr;
985     }
986 
987     return this_arg;
988 }
989 
990 
991 void
992 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
993 {
994     nxt_unit_request_info_t  *req;
995 
996     req = (nxt_unit_request_info_t *) nativeObject;
997 
998     nxt_unit_warn(NULL, "conn_destroy: %p", req);
999 }
1000 
1001 
1002 void
1003 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1004 {
1005     nxt_unit_request_info_t  *req;
1006 
1007     req = (nxt_unit_request_info_t *) nativeObject;
1008 
1009     nxt_unit_warn(NULL, "sock_destroy: %p", req);
1010 }
1011 
1012 
1013 void
1014 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1015 {
1016     nxt_unit_request_info_t  *req;
1017 
1018     req = (nxt_unit_request_info_t *) nativeObject;
1019 
1020     nxt_unit_warn(NULL, "resp_destroy: %p", req);
1021 }
1022