xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1543:42f27153db91)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 static void delete_port_data(uv_handle_t* handle);
17 
18 napi_ref Unit::constructor_;
19 
20 
21 struct port_data_t {
22     nxt_unit_ctx_t      *ctx;
23     nxt_unit_port_id_t  port_id;
24     uv_poll_t           poll;
25 };
26 
27 
28 struct req_data_t {
29     napi_ref  sock_ref;
30     napi_ref  resp_ref;
31     napi_ref  conn_ref;
32 };
33 
34 
35 Unit::Unit(napi_env env, napi_value jsthis):
36     nxt_napi(env),
37     wrapper_(wrap(jsthis, this, destroy)),
38     unit_ctx_(nullptr)
39 {
40     nxt_unit_debug(NULL, "Unit::Unit()");
41 }
42 
43 
44 Unit::~Unit()
45 {
46     delete_reference(wrapper_);
47 
48     nxt_unit_debug(NULL, "Unit::~Unit()");
49 }
50 
51 
52 napi_value
53 Unit::init(napi_env env, napi_value exports)
54 {
55     nxt_napi    napi(env);
56     napi_value  ctor;
57 
58     napi_property_descriptor  unit_props[] = {
59         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
60         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
61     };
62 
63     try {
64         ctor = napi.define_class("Unit", create, 2, unit_props);
65         constructor_ = napi.create_reference(ctor);
66 
67         napi.set_named_property(exports, "Unit", ctor);
68         napi.set_named_property(exports, "response_send_headers",
69                                 response_send_headers);
70         napi.set_named_property(exports, "response_write", response_write);
71         napi.set_named_property(exports, "response_end", response_end);
72         napi.set_named_property(exports, "websocket_send_frame",
73                                 websocket_send_frame);
74         napi.set_named_property(exports, "websocket_set_sock",
75                                 websocket_set_sock);
76         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
77         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
78 
79     } catch (exception &e) {
80         napi.throw_error(e);
81         return nullptr;
82     }
83 
84     return exports;
85 }
86 
87 
88 void
89 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
90 {
91     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
92 
93     delete obj;
94 }
95 
96 
97 napi_value
98 Unit::create(napi_env env, napi_callback_info info)
99 {
100     nxt_napi    napi(env);
101     napi_value  target, ctor, instance, jsthis;
102 
103     try {
104         target = napi.get_new_target(info);
105 
106         if (target != nullptr) {
107             /* Invoked as constructor: `new Unit(...)`. */
108             jsthis = napi.get_cb_info(info);
109 
110             new Unit(env, jsthis);
111             napi.create_reference(jsthis);
112 
113             return jsthis;
114         }
115 
116         /* Invoked as plain function `Unit(...)`, turn into construct call. */
117         ctor = napi.get_reference_value(constructor_);
118         instance = napi.new_instance(ctor);
119         napi.create_reference(instance);
120 
121     } catch (exception &e) {
122         napi.throw_error(e);
123         return nullptr;
124     }
125 
126     return instance;
127 }
128 
129 
130 napi_value
131 Unit::create_server(napi_env env, napi_callback_info info)
132 {
133     Unit             *obj;
134     size_t           argc;
135     nxt_napi         napi(env);
136     napi_value       jsthis, argv;
137     nxt_unit_init_t  unit_init;
138 
139     argc = 1;
140 
141     try {
142         jsthis = napi.get_cb_info(info, argc, &argv);
143         obj = (Unit *) napi.unwrap(jsthis);
144 
145     } catch (exception &e) {
146         napi.throw_error(e);
147         return nullptr;
148     }
149 
150     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
151 
152     unit_init.data = obj;
153     unit_init.callbacks.request_handler   = request_handler_cb;
154     unit_init.callbacks.websocket_handler = websocket_handler_cb;
155     unit_init.callbacks.close_handler     = close_handler_cb;
156     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
157     unit_init.callbacks.add_port          = add_port;
158     unit_init.callbacks.remove_port       = remove_port;
159     unit_init.callbacks.quit              = quit_cb;
160 
161     unit_init.request_data_size = sizeof(req_data_t);
162 
163     obj->unit_ctx_ = nxt_unit_init(&unit_init);
164     if (obj->unit_ctx_ == NULL) {
165         goto failed;
166     }
167 
168     return nullptr;
169 
170 failed:
171 
172     napi_throw_error(env, NULL, "Failed to create Unit object");
173 
174     return nullptr;
175 }
176 
177 
178 napi_value
179 Unit::listen(napi_env env, napi_callback_info info)
180 {
181     return nullptr;
182 }
183 
184 
185 void
186 Unit::request_handler_cb(nxt_unit_request_info_t *req)
187 {
188     Unit  *obj;
189 
190     obj = reinterpret_cast<Unit *>(req->unit->data);
191 
192     obj->request_handler(req);
193 }
194 
195 
196 void
197 Unit::request_handler(nxt_unit_request_info_t *req)
198 {
199     napi_value  socket, request, response, server_obj, emit_request;
200 
201     memset(req->data, 0, sizeof(req_data_t));
202 
203     try {
204         nxt_handle_scope  scope(env());
205 
206         server_obj = get_server_object();
207 
208         socket = create_socket(server_obj, req);
209         request = create_request(server_obj, socket);
210         response = create_response(server_obj, request, req);
211 
212         create_headers(req, request);
213 
214         emit_request = get_named_property(server_obj, "emit_request");
215 
216         nxt_async_context   async_context(env(), "request_handler");
217         nxt_callback_scope  async_scope(async_context);
218 
219         make_callback(async_context, server_obj, emit_request, request,
220                       response);
221 
222     } catch (exception &e) {
223         nxt_unit_req_warn(req, "request_handler: %s", e.str);
224     }
225 }
226 
227 
228 void
229 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
230 {
231     Unit  *obj;
232 
233     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
234 
235     obj->websocket_handler(ws);
236 }
237 
238 
239 void
240 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
241 {
242     napi_value  frame, server_obj, process_frame, conn;
243     req_data_t  *req_data;
244 
245     req_data = (req_data_t *) ws->req->data;
246 
247     try {
248         nxt_handle_scope  scope(env());
249 
250         server_obj = get_server_object();
251 
252         frame = create_websocket_frame(server_obj, ws);
253 
254         conn = get_reference_value(req_data->conn_ref);
255 
256         process_frame = get_named_property(conn, "processFrame");
257 
258         nxt_async_context   async_context(env(), "websocket_handler");
259         nxt_callback_scope  async_scope(async_context);
260 
261         make_callback(async_context, conn, process_frame, frame);
262 
263     } catch (exception &e) {
264         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
265     }
266 
267     nxt_unit_websocket_done(ws);
268 }
269 
270 
271 void
272 Unit::close_handler_cb(nxt_unit_request_info_t *req)
273 {
274     Unit  *obj;
275 
276     obj = reinterpret_cast<Unit *>(req->unit->data);
277 
278     obj->close_handler(req);
279 }
280 
281 
282 void
283 Unit::close_handler(nxt_unit_request_info_t *req)
284 {
285     napi_value  conn_handle_close, conn;
286     req_data_t  *req_data;
287 
288     req_data = (req_data_t *) req->data;
289 
290     try {
291         nxt_handle_scope  scope(env());
292 
293         conn = get_reference_value(req_data->conn_ref);
294 
295         conn_handle_close = get_named_property(conn, "handleSocketClose");
296 
297         nxt_async_context   async_context(env(), "close_handler");
298         nxt_callback_scope  async_scope(async_context);
299 
300         make_callback(async_context, conn, conn_handle_close,
301                       nxt_napi::create(0));
302 
303         remove_wrap(req_data->sock_ref);
304         remove_wrap(req_data->resp_ref);
305         remove_wrap(req_data->conn_ref);
306 
307     } catch (exception &e) {
308         nxt_unit_req_warn(req, "close_handler: %s", e.str);
309 
310         return;
311     }
312 
313     nxt_unit_request_done(req, NXT_UNIT_OK);
314 }
315 
316 
317 void
318 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
319 {
320     Unit  *obj;
321 
322     obj = reinterpret_cast<Unit *>(ctx->unit->data);
323 
324     obj->shm_ack_handler(ctx);
325 }
326 
327 
328 void
329 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
330 {
331     napi_value  server_obj, emit_drain;
332 
333     try {
334         nxt_handle_scope  scope(env());
335 
336         server_obj = get_server_object();
337 
338         emit_drain = get_named_property(server_obj, "emit_drain");
339 
340         nxt_async_context   async_context(env(), "shm_ack_handler");
341         nxt_callback_scope  async_scope(async_context);
342 
343         make_callback(async_context, server_obj, emit_drain);
344 
345     } catch (exception &e) {
346         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
347     }
348 }
349 
350 
351 static void
352 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
353 {
354     nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
355 }
356 
357 
358 int
359 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
360 {
361     int               err;
362     Unit              *obj;
363     uv_loop_t         *loop;
364     port_data_t       *data;
365     napi_status       status;
366 
367     if (port->in_fd != -1) {
368         obj = reinterpret_cast<Unit *>(ctx->unit->data);
369 
370         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
371             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
372                           port->in_fd, strerror(errno), errno);
373             return -1;
374         }
375 
376         status = napi_get_uv_event_loop(obj->env(), &loop);
377         if (status != napi_ok) {
378             nxt_unit_warn(ctx, "Failed to get uv.loop");
379             return NXT_UNIT_ERROR;
380         }
381 
382         data = new port_data_t;
383 
384         err = uv_poll_init(loop, &data->poll, port->in_fd);
385         if (err < 0) {
386             nxt_unit_warn(ctx, "Failed to init uv.poll");
387             return NXT_UNIT_ERROR;
388         }
389 
390         err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
391         if (err < 0) {
392             nxt_unit_warn(ctx, "Failed to start uv.poll");
393             return NXT_UNIT_ERROR;
394         }
395 
396         port->data = data;
397 
398         data->ctx = ctx;
399         data->port_id = port->id;
400         data->poll.data = ctx;
401     }
402 
403     return NXT_UNIT_OK;
404 }
405 
406 
407 inline bool
408 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
409 {
410     return p1.pid == p2.pid && p1.id == p2.id;
411 }
412 
413 
414 void
415 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
416 {
417     port_data_t  *data;
418 
419     if (port->data != NULL) {
420         data = (port_data_t *) port->data;
421 
422         if (data->port_id == port->id) {
423             uv_poll_stop(&data->poll);
424 
425             data->poll.data = data;
426             uv_close((uv_handle_t *) &data->poll, delete_port_data);
427         }
428     }
429 }
430 
431 
432 static void
433 delete_port_data(uv_handle_t* handle)
434 {
435     port_data_t  *data;
436 
437     data = (port_data_t *) handle->data;
438 
439     delete data;
440 }
441 
442 
443 void
444 Unit::quit_cb(nxt_unit_ctx_t *ctx)
445 {
446     Unit  *obj;
447 
448     obj = reinterpret_cast<Unit *>(ctx->unit->data);
449 
450     obj->quit(ctx);
451 }
452 
453 
454 void
455 Unit::quit(nxt_unit_ctx_t *ctx)
456 {
457     napi_value  server_obj, emit_close;
458 
459     try {
460         nxt_handle_scope  scope(env());
461 
462         server_obj = get_server_object();
463 
464         emit_close = get_named_property(server_obj, "emit_close");
465 
466         nxt_async_context   async_context(env(), "unit_quit");
467         nxt_callback_scope  async_scope(async_context);
468 
469         make_callback(async_context, server_obj, emit_close);
470 
471     } catch (exception &e) {
472         nxt_unit_debug(ctx, "quit: %s", e.str);
473     }
474 
475     nxt_unit_done(ctx);
476 }
477 
478 
479 napi_value
480 Unit::get_server_object()
481 {
482     napi_value  unit_obj;
483 
484     unit_obj = get_reference_value(wrapper_);
485 
486     return get_named_property(unit_obj, "server");
487 }
488 
489 
490 void
491 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
492 {
493     void                *data;
494     uint32_t            i;
495     napi_value          headers, raw_headers, buffer;
496     napi_status         status;
497     nxt_unit_request_t  *r;
498 
499     r = req->request;
500 
501     headers = create_object();
502 
503     status = napi_create_array_with_length(env(), r->fields_count * 2,
504                                            &raw_headers);
505     if (status != napi_ok) {
506         throw exception("Failed to create array");
507     }
508 
509     for (i = 0; i < r->fields_count; i++) {
510         append_header(r->fields + i, headers, raw_headers, i);
511     }
512 
513     set_named_property(request, "headers", headers);
514     set_named_property(request, "rawHeaders", raw_headers);
515     set_named_property(request, "httpVersion", r->version, r->version_length);
516     set_named_property(request, "method", r->method, r->method_length);
517     set_named_property(request, "url", r->target, r->target_length);
518 
519     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
520 
521     buffer = create_buffer((size_t) req->content_length, &data);
522     nxt_unit_request_read(req, data, req->content_length);
523 
524     set_named_property(request, "_data", buffer);
525 }
526 
527 
528 inline char
529 lowcase(char c)
530 {
531     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
532 }
533 
534 
535 inline void
536 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
537     napi_value raw_headers, uint32_t idx)
538 {
539     char        *name;
540     uint8_t     i;
541     napi_value  str, vstr;
542 
543     name = (char *) nxt_unit_sptr_get(&f->name);
544 
545     str = create_string_latin1(name, f->name_length);
546 
547     for (i = 0; i < f->name_length; i++) {
548         name[i] = lowcase(name[i]);
549     }
550 
551     vstr = set_named_property(headers, name, f->value, f->value_length);
552 
553     set_element(raw_headers, idx * 2, str);
554     set_element(raw_headers, idx * 2 + 1, vstr);
555 }
556 
557 
558 napi_value
559 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
560 {
561     napi_value          constructor, res;
562     req_data_t          *req_data;
563     nxt_unit_request_t  *r;
564 
565     r = req->request;
566 
567     constructor = get_named_property(server_obj, "Socket");
568 
569     res = new_instance(constructor);
570 
571     req_data = (req_data_t *) req->data;
572     req_data->sock_ref = wrap(res, req, sock_destroy);
573 
574     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
575     set_named_property(res, "localAddress", r->local, r->local_length);
576 
577     return res;
578 }
579 
580 
581 napi_value
582 Unit::create_request(napi_value server_obj, napi_value socket)
583 {
584     napi_value  constructor;
585 
586     constructor = get_named_property(server_obj, "ServerRequest");
587 
588     return new_instance(constructor, server_obj, socket);
589 }
590 
591 
592 napi_value
593 Unit::create_response(napi_value server_obj, napi_value request,
594     nxt_unit_request_info_t *req)
595 {
596     napi_value  constructor, res;
597     req_data_t  *req_data;
598 
599     constructor = get_named_property(server_obj, "ServerResponse");
600 
601     res = new_instance(constructor, request);
602 
603     req_data = (req_data_t *) req->data;
604     req_data->resp_ref = wrap(res, req, resp_destroy);
605 
606     return res;
607 }
608 
609 
610 napi_value
611 Unit::create_websocket_frame(napi_value server_obj,
612                              nxt_unit_websocket_frame_t *ws)
613 {
614     void        *data;
615     napi_value  constructor, res, buffer;
616     uint8_t     sc[2];
617 
618     constructor = get_named_property(server_obj, "WebSocketFrame");
619 
620     res = new_instance(constructor);
621 
622     set_named_property(res, "fin", (bool) ws->header->fin);
623     set_named_property(res, "opcode", ws->header->opcode);
624     set_named_property(res, "length", (int64_t) ws->payload_len);
625 
626     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
627         if (ws->payload_len >= 2) {
628             nxt_unit_websocket_read(ws, sc, 2);
629 
630             set_named_property(res, "closeStatus",
631                                (((uint16_t) sc[0]) << 8) | sc[1]);
632 
633         } else {
634             set_named_property(res, "closeStatus", -1);
635         }
636     }
637 
638     buffer = create_buffer((size_t) ws->content_length, &data);
639     nxt_unit_websocket_read(ws, data, ws->content_length);
640 
641     set_named_property(res, "binaryPayload", buffer);
642 
643     return res;
644 }
645 
646 
647 napi_value
648 Unit::response_send_headers(napi_env env, napi_callback_info info)
649 {
650     int                      ret;
651     char                     *ptr, *name_ptr;
652     bool                     is_array;
653     size_t                   argc, name_len, value_len;
654     uint32_t                 status_code, header_len, keys_len, array_len;
655     uint32_t                 keys_count, i, j;
656     uint16_t                 hash;
657     nxt_napi                 napi(env);
658     napi_value               this_arg, headers, keys, name, value, array_val;
659     napi_value               array_entry;
660     napi_valuetype           val_type;
661     nxt_unit_field_t         *f;
662     nxt_unit_request_info_t  *req;
663     napi_value               argv[4];
664 
665     argc = 4;
666 
667     try {
668         this_arg = napi.get_cb_info(info, argc, argv);
669         if (argc != 4) {
670             napi.throw_error("Wrong args count. Expected: "
671                              "statusCode, headers, headers count, "
672                              "headers length");
673             return nullptr;
674         }
675 
676         req = napi.get_request_info(this_arg);
677         status_code = napi.get_value_uint32(argv[0]);
678         keys_count = napi.get_value_uint32(argv[2]);
679         header_len = napi.get_value_uint32(argv[3]);
680 
681         headers = argv[1];
682 
683         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
684         if (ret != NXT_UNIT_OK) {
685             napi.throw_error("Failed to create response");
686             return nullptr;
687         }
688 
689         /*
690          * Each name and value are 0-terminated by libunit.
691          * Need to add extra 2 bytes for each header.
692          */
693         header_len += keys_count * 2;
694 
695         keys = napi.get_property_names(headers);
696         keys_len = napi.get_array_length(keys);
697 
698         ptr = req->response_buf->free;
699 
700         for (i = 0; i < keys_len; i++) {
701             name = napi.get_element(keys, i);
702 
703             array_entry = napi.get_property(headers, name);
704 
705             name = napi.get_element(array_entry, 0);
706             value = napi.get_element(array_entry, 1);
707 
708             name_len = napi.get_value_string_latin1(name, ptr, header_len);
709             name_ptr = ptr;
710 
711             ptr += name_len + 1;
712             header_len -= name_len + 1;
713 
714             hash = nxt_unit_field_hash(name_ptr, name_len);
715 
716             is_array = napi.is_array(value);
717 
718             if (is_array) {
719                 array_len = napi.get_array_length(value);
720 
721                 for (j = 0; j < array_len; j++) {
722                     array_val = napi.get_element(value, j);
723 
724                     val_type = napi.type_of(array_val);
725 
726                     if (val_type != napi_string) {
727                         array_val = napi.coerce_to_string(array_val);
728                     }
729 
730                     value_len = napi.get_value_string_latin1(array_val, ptr,
731                                                              header_len);
732 
733                     f = req->response->fields + req->response->fields_count;
734                     f->skip = 0;
735 
736                     nxt_unit_sptr_set(&f->name, name_ptr);
737 
738                     f->name_length = name_len;
739                     f->hash = hash;
740 
741                     nxt_unit_sptr_set(&f->value, ptr);
742                     f->value_length = (uint32_t) value_len;
743 
744                     ptr += value_len + 1;
745                     header_len -= value_len + 1;
746 
747                     req->response->fields_count++;
748                 }
749 
750             } else {
751                 val_type = napi.type_of(value);
752 
753                 if (val_type != napi_string) {
754                     value = napi.coerce_to_string(value);
755                 }
756 
757                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
758 
759                 f = req->response->fields + req->response->fields_count;
760                 f->skip = 0;
761 
762                 nxt_unit_sptr_set(&f->name, name_ptr);
763 
764                 f->name_length = name_len;
765                 f->hash = hash;
766 
767                 nxt_unit_sptr_set(&f->value, ptr);
768                 f->value_length = (uint32_t) value_len;
769 
770                 ptr += value_len + 1;
771                 header_len -= value_len + 1;
772 
773                 req->response->fields_count++;
774             }
775         }
776 
777     } catch (exception &e) {
778         napi.throw_error(e);
779         return nullptr;
780     }
781 
782     req->response_buf->free = ptr;
783 
784     ret = nxt_unit_response_send(req);
785     if (ret != NXT_UNIT_OK) {
786         napi.throw_error("Failed to send response");
787         return nullptr;
788     }
789 
790     return this_arg;
791 }
792 
793 
794 napi_value
795 Unit::response_write(napi_env env, napi_callback_info info)
796 {
797     int                      ret;
798     void                     *ptr;
799     size_t                   argc, have_buf_len;
800     ssize_t                  res_len;
801     uint32_t                 buf_start, buf_len;
802     nxt_napi                 napi(env);
803     napi_value               this_arg;
804     nxt_unit_buf_t           *buf;
805     napi_valuetype           buf_type;
806     nxt_unit_request_info_t  *req;
807     napi_value               argv[3];
808 
809     argc = 3;
810 
811     try {
812         this_arg = napi.get_cb_info(info, argc, argv);
813         if (argc != 3) {
814             throw exception("Wrong args count. Expected: "
815                             "chunk, start, length");
816         }
817 
818         req = napi.get_request_info(this_arg);
819         buf_type = napi.type_of(argv[0]);
820         buf_start = napi.get_value_uint32(argv[1]);
821         buf_len = napi.get_value_uint32(argv[2]) + 1;
822 
823         if (buf_type == napi_string) {
824             /* TODO: will work only for utf8 content-type */
825 
826             if (req->response_buf != NULL
827                 && req->response_buf->end >= req->response_buf->free + buf_len)
828             {
829                 buf = req->response_buf;
830 
831             } else {
832                 buf = nxt_unit_response_buf_alloc(req, buf_len);
833                 if (buf == NULL) {
834                     throw exception("Failed to allocate response buffer");
835                 }
836             }
837 
838             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
839                                                       buf_len);
840 
841             buf->free += have_buf_len;
842 
843             ret = nxt_unit_buf_send(buf);
844             if (ret == NXT_UNIT_OK) {
845                 res_len = have_buf_len;
846             }
847 
848         } else {
849             ptr = napi.get_buffer_info(argv[0], have_buf_len);
850 
851             if (buf_start > 0) {
852                 ptr = ((uint8_t *) ptr) + buf_start;
853                 have_buf_len -= buf_start;
854             }
855 
856             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
857 
858             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
859         }
860 
861         if (ret != NXT_UNIT_OK) {
862             throw exception("Failed to send body buf");
863         }
864     } catch (exception &e) {
865         napi.throw_error(e);
866         return nullptr;
867     }
868 
869     return napi.create((int64_t) res_len);
870 }
871 
872 
873 napi_value
874 Unit::response_end(napi_env env, napi_callback_info info)
875 {
876     nxt_napi                 napi(env);
877     napi_value               this_arg;
878     req_data_t               *req_data;
879     nxt_unit_request_info_t  *req;
880 
881     try {
882         this_arg = napi.get_cb_info(info);
883 
884         req = napi.get_request_info(this_arg);
885 
886         req_data = (req_data_t *) req->data;
887 
888         napi.remove_wrap(req_data->sock_ref);
889         napi.remove_wrap(req_data->resp_ref);
890         napi.remove_wrap(req_data->conn_ref);
891 
892     } catch (exception &e) {
893         napi.throw_error(e);
894         return nullptr;
895     }
896 
897     nxt_unit_request_done(req, NXT_UNIT_OK);
898 
899     return this_arg;
900 }
901 
902 
903 napi_value
904 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
905 {
906     int                      ret, iovec_len;
907     bool                     fin;
908     size_t                   buf_len;
909     uint32_t                 opcode, sc;
910     nxt_napi                 napi(env);
911     napi_value               this_arg, frame, payload;
912     nxt_unit_request_info_t  *req;
913     char                     status_code[2];
914     struct iovec             iov[2];
915 
916     iovec_len = 0;
917 
918     try {
919         this_arg = napi.get_cb_info(info, frame);
920 
921         req = napi.get_request_info(this_arg);
922 
923         opcode = napi.get_value_uint32(napi.get_named_property(frame,
924                                                                "opcode"));
925         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
926             sc = napi.get_value_uint32(napi.get_named_property(frame,
927                                                                "closeStatus"));
928             status_code[0] = (sc >> 8) & 0xFF;
929             status_code[1] = sc & 0xFF;
930 
931             iov[iovec_len].iov_base = status_code;
932             iov[iovec_len].iov_len = 2;
933             iovec_len++;
934         }
935 
936         try {
937             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
938 
939         } catch (exception &e) {
940             fin = true;
941         }
942 
943         payload = napi.get_named_property(frame, "binaryPayload");
944 
945         if (napi.is_buffer(payload)) {
946             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
947 
948         } else {
949             buf_len = 0;
950         }
951 
952     } catch (exception &e) {
953         napi.throw_error(e);
954         return nullptr;
955     }
956 
957     if (buf_len > 0) {
958         iov[iovec_len].iov_len = buf_len;
959         iovec_len++;
960     }
961 
962     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
963     if (ret != NXT_UNIT_OK) {
964         goto failed;
965     }
966 
967     return this_arg;
968 
969 failed:
970 
971     napi.throw_error("Failed to send frame");
972 
973     return nullptr;
974 }
975 
976 
977 napi_value
978 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
979 {
980     nxt_napi                 napi(env);
981     napi_value               this_arg, sock;
982     req_data_t               *req_data;
983     nxt_unit_request_info_t  *req;
984 
985     try {
986         this_arg = napi.get_cb_info(info, sock);
987 
988         req = napi.get_request_info(sock);
989 
990         req_data = (req_data_t *) req->data;
991         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
992 
993     } catch (exception &e) {
994         napi.throw_error(e);
995         return nullptr;
996     }
997 
998     return this_arg;
999 }
1000 
1001 
1002 void
1003 Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1004 {
1005     nxt_unit_request_info_t  *req;
1006 
1007     req = (nxt_unit_request_info_t *) nativeObject;
1008 
1009     nxt_unit_warn(NULL, "conn_destroy: %p", req);
1010 }
1011 
1012 
1013 void
1014 Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1015 {
1016     nxt_unit_request_info_t  *req;
1017 
1018     req = (nxt_unit_request_info_t *) nativeObject;
1019 
1020     nxt_unit_warn(NULL, "sock_destroy: %p", req);
1021 }
1022 
1023 
1024 void
1025 Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
1026 {
1027     nxt_unit_request_info_t  *req;
1028 
1029     req = (nxt_unit_request_info_t *) nativeObject;
1030 
1031     nxt_unit_warn(NULL, "resp_destroy: %p", req);
1032 }
1033