xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1729:5be509fda29e)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
/*
 * Close callback handed to uv_close() in Unit::remove_port(); the definition
 * follows remove_port() further down in this file.
 */
static void delete_port_data(uv_handle_t* handle);

/* Reference to the JS "Unit" constructor; assigned in Unit::init(). */
napi_ref Unit::constructor_;
19 
20 
/*
 * Per-port state allocated in Unit::add_port() and stored in port->data.
 * The embedded poll handle's data field points back at this structure,
 * which is how the libuv callbacks in this file recover it.
 */
struct port_data_t {
    nxt_unit_ctx_t      *ctx;   /* context that was passed to add_port() */
    nxt_unit_port_t     *port;  /* port whose in_fd is being polled */
    uv_poll_t           poll;   /* poll handle watching port->in_fd */
};
26 
27 
/*
 * Per-request storage kept in req->data; its size is handed to libunit as
 * request_data_size in create_server().  request_handler() zero-fills it
 * with memset(), so it has to remain a plain struct.
 */
struct req_data_t {
    napi_ref  sock_ref;   /* set by create_socket() */
    napi_ref  resp_ref;   /* set by create_response() */
    napi_ref  conn_ref;   /* set by websocket_set_sock() */
};
33 
34 
/*
 * Ties the native object to its JS counterpart: wrap() is called with jsthis,
 * this object and destroy() as the callback, and the reference it returns is
 * kept in wrapper_.  unit_ctx_ stays null until create_server() assigns it.
 */
Unit::Unit(napi_env env, napi_value jsthis):
    nxt_napi(env),
    wrapper_(wrap(jsthis, this, destroy)),
    unit_ctx_(nullptr)
{
    nxt_unit_debug(NULL, "Unit::Unit()");
}
42 
43 
/* Drops the reference to the JS wrapper object obtained in the constructor. */
Unit::~Unit()
{
    delete_reference(wrapper_);

    nxt_unit_debug(NULL, "Unit::~Unit()");
}
50 
51 
52 napi_value
53 Unit::init(napi_env env, napi_value exports)
54 {
55     nxt_napi    napi(env);
56     napi_value  ctor;
57 
58     napi_property_descriptor  unit_props[] = {
59         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
60         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
61     };
62 
63     try {
64         ctor = napi.define_class("Unit", create, 2, unit_props);
65         constructor_ = napi.create_reference(ctor);
66 
67         napi.set_named_property(exports, "Unit", ctor);
68         napi.set_named_property(exports, "response_send_headers",
69                                 response_send_headers);
70         napi.set_named_property(exports, "response_write", response_write);
71         napi.set_named_property(exports, "response_end", response_end);
72         napi.set_named_property(exports, "websocket_send_frame",
73                                 websocket_send_frame);
74         napi.set_named_property(exports, "websocket_set_sock",
75                                 websocket_set_sock);
76         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
77         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
78 
79     } catch (exception &e) {
80         napi.throw_error(e);
81         return nullptr;
82     }
83 
84     return exports;
85 }
86 
87 
88 void
89 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
90 {
91     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
92 
93     delete obj;
94 }
95 
96 
97 napi_value
98 Unit::create(napi_env env, napi_callback_info info)
99 {
100     nxt_napi    napi(env);
101     napi_value  target, ctor, instance, jsthis;
102 
103     try {
104         target = napi.get_new_target(info);
105 
106         if (target != nullptr) {
107             /* Invoked as constructor: `new Unit(...)`. */
108             jsthis = napi.get_cb_info(info);
109 
110             new Unit(env, jsthis);
111             napi.create_reference(jsthis);
112 
113             return jsthis;
114         }
115 
116         /* Invoked as plain function `Unit(...)`, turn into construct call. */
117         ctor = napi.get_reference_value(constructor_);
118         instance = napi.new_instance(ctor);
119         napi.create_reference(instance);
120 
121     } catch (exception &e) {
122         napi.throw_error(e);
123         return nullptr;
124     }
125 
126     return instance;
127 }
128 
129 
130 napi_value
131 Unit::create_server(napi_env env, napi_callback_info info)
132 {
133     Unit             *obj;
134     size_t           argc;
135     nxt_napi         napi(env);
136     napi_value       jsthis, argv;
137     nxt_unit_init_t  unit_init;
138 
139     argc = 1;
140 
141     try {
142         jsthis = napi.get_cb_info(info, argc, &argv);
143         obj = (Unit *) napi.unwrap(jsthis);
144 
145     } catch (exception &e) {
146         napi.throw_error(e);
147         return nullptr;
148     }
149 
150     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
151 
152     unit_init.data = obj;
153     unit_init.callbacks.request_handler   = request_handler_cb;
154     unit_init.callbacks.websocket_handler = websocket_handler_cb;
155     unit_init.callbacks.close_handler     = close_handler_cb;
156     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
157     unit_init.callbacks.add_port          = add_port;
158     unit_init.callbacks.remove_port       = remove_port;
159     unit_init.callbacks.quit              = quit_cb;
160 
161     unit_init.request_data_size = sizeof(req_data_t);
162 
163     obj->unit_ctx_ = nxt_unit_init(&unit_init);
164     if (obj->unit_ctx_ == NULL) {
165         goto failed;
166     }
167 
168     return nullptr;
169 
170 failed:
171 
172     napi_throw_error(env, NULL, "Failed to create Unit object");
173 
174     return nullptr;
175 }
176 
177 
/*
 * JS method "listen" (registered in init()).  Currently a stub: it does
 * nothing and returns nullptr.
 */
napi_value
Unit::listen(napi_env env, napi_callback_info info)
{
    return nullptr;
}
183 
184 
185 void
186 Unit::request_handler_cb(nxt_unit_request_info_t *req)
187 {
188     Unit  *obj;
189 
190     obj = reinterpret_cast<Unit *>(req->unit->data);
191 
192     obj->request_handler(req);
193 }
194 
195 
196 void
197 Unit::request_handler(nxt_unit_request_info_t *req)
198 {
199     napi_value  socket, request, response, server_obj, emit_request;
200 
201     memset(req->data, 0, sizeof(req_data_t));
202 
203     try {
204         nxt_handle_scope  scope(env());
205 
206         server_obj = get_server_object();
207 
208         socket = create_socket(server_obj, req);
209         request = create_request(server_obj, socket);
210         response = create_response(server_obj, request, req);
211 
212         create_headers(req, request);
213 
214         emit_request = get_named_property(server_obj, "emit_request");
215 
216         nxt_async_context   async_context(env(), "request_handler");
217         nxt_callback_scope  async_scope(async_context);
218 
219         make_callback(async_context, server_obj, emit_request, request,
220                       response);
221 
222     } catch (exception &e) {
223         nxt_unit_req_warn(req, "request_handler: %s", e.str);
224     }
225 }
226 
227 
228 void
229 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
230 {
231     Unit  *obj;
232 
233     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
234 
235     obj->websocket_handler(ws);
236 }
237 
238 
239 void
240 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
241 {
242     napi_value  frame, server_obj, process_frame, conn;
243     req_data_t  *req_data;
244 
245     req_data = (req_data_t *) ws->req->data;
246 
247     try {
248         nxt_handle_scope  scope(env());
249 
250         server_obj = get_server_object();
251 
252         frame = create_websocket_frame(server_obj, ws);
253 
254         conn = get_reference_value(req_data->conn_ref);
255 
256         process_frame = get_named_property(conn, "processFrame");
257 
258         nxt_async_context   async_context(env(), "websocket_handler");
259         nxt_callback_scope  async_scope(async_context);
260 
261         make_callback(async_context, conn, process_frame, frame);
262 
263     } catch (exception &e) {
264         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
265     }
266 
267     nxt_unit_websocket_done(ws);
268 }
269 
270 
271 void
272 Unit::close_handler_cb(nxt_unit_request_info_t *req)
273 {
274     Unit  *obj;
275 
276     obj = reinterpret_cast<Unit *>(req->unit->data);
277 
278     obj->close_handler(req);
279 }
280 
281 
282 void
283 Unit::close_handler(nxt_unit_request_info_t *req)
284 {
285     napi_value  conn_handle_close, conn;
286     req_data_t  *req_data;
287 
288     req_data = (req_data_t *) req->data;
289 
290     try {
291         nxt_handle_scope  scope(env());
292 
293         conn = get_reference_value(req_data->conn_ref);
294 
295         conn_handle_close = get_named_property(conn, "handleSocketClose");
296 
297         nxt_async_context   async_context(env(), "close_handler");
298         nxt_callback_scope  async_scope(async_context);
299 
300         make_callback(async_context, conn, conn_handle_close,
301                       nxt_napi::create(0));
302 
303         remove_wrap(req_data->sock_ref);
304         remove_wrap(req_data->resp_ref);
305         remove_wrap(req_data->conn_ref);
306 
307     } catch (exception &e) {
308         nxt_unit_req_warn(req, "close_handler: %s", e.str);
309 
310         nxt_unit_request_done(req, NXT_UNIT_ERROR);
311 
312         return;
313     }
314 
315     nxt_unit_request_done(req, NXT_UNIT_OK);
316 }
317 
318 
319 void
320 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
321 {
322     Unit  *obj;
323 
324     obj = reinterpret_cast<Unit *>(ctx->unit->data);
325 
326     obj->shm_ack_handler(ctx);
327 }
328 
329 
330 void
331 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
332 {
333     napi_value  server_obj, emit_drain;
334 
335     try {
336         nxt_handle_scope  scope(env());
337 
338         server_obj = get_server_object();
339 
340         emit_drain = get_named_property(server_obj, "emit_drain");
341 
342         nxt_async_context   async_context(env(), "shm_ack_handler");
343         nxt_callback_scope  async_scope(async_context);
344 
345         make_callback(async_context, server_obj, emit_drain);
346 
347     } catch (exception &e) {
348         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
349     }
350 }
351 
352 
353 static void
354 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
355 {
356     port_data_t  *data;
357 
358     data = (port_data_t *) handle->data;
359 
360     nxt_unit_process_port_msg(data->ctx, data->port);
361 }
362 
363 
364 int
365 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
366 {
367     int               err;
368     Unit              *obj;
369     uv_loop_t         *loop;
370     port_data_t       *data;
371     napi_status       status;
372 
373     if (port->in_fd != -1) {
374         obj = reinterpret_cast<Unit *>(ctx->unit->data);
375 
376         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
377             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
378                           port->in_fd, strerror(errno), errno);
379             return -1;
380         }
381 
382         status = napi_get_uv_event_loop(obj->env(), &loop);
383         if (status != napi_ok) {
384             nxt_unit_warn(ctx, "Failed to get uv.loop");
385             return NXT_UNIT_ERROR;
386         }
387 
388         data = new port_data_t;
389 
390         err = uv_poll_init(loop, &data->poll, port->in_fd);
391         if (err < 0) {
392             nxt_unit_warn(ctx, "Failed to init uv.poll");
393             return NXT_UNIT_ERROR;
394         }
395 
396         err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
397         if (err < 0) {
398             nxt_unit_warn(ctx, "Failed to start uv.poll");
399             return NXT_UNIT_ERROR;
400         }
401 
402         port->data = data;
403 
404         data->ctx = ctx;
405         data->port = port;
406         data->poll.data = data;
407     }
408 
409     return NXT_UNIT_OK;
410 }
411 
412 
413 void
414 Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
415 {
416     port_data_t  *data;
417 
418     if (port->data != NULL) {
419         data = (port_data_t *) port->data;
420 
421         if (data->port == port) {
422             uv_poll_stop(&data->poll);
423 
424             uv_close((uv_handle_t *) &data->poll, delete_port_data);
425         }
426     }
427 }
428 
429 
430 static void
431 delete_port_data(uv_handle_t* handle)
432 {
433     port_data_t  *data;
434 
435     data = (port_data_t *) handle->data;
436 
437     delete data;
438 }
439 
440 
441 void
442 Unit::quit_cb(nxt_unit_ctx_t *ctx)
443 {
444     Unit  *obj;
445 
446     obj = reinterpret_cast<Unit *>(ctx->unit->data);
447 
448     obj->quit(ctx);
449 }
450 
451 
452 void
453 Unit::quit(nxt_unit_ctx_t *ctx)
454 {
455     napi_value  server_obj, emit_close;
456 
457     try {
458         nxt_handle_scope  scope(env());
459 
460         server_obj = get_server_object();
461 
462         emit_close = get_named_property(server_obj, "emit_close");
463 
464         nxt_async_context   async_context(env(), "unit_quit");
465         nxt_callback_scope  async_scope(async_context);
466 
467         make_callback(async_context, server_obj, emit_close);
468 
469     } catch (exception &e) {
470         nxt_unit_debug(ctx, "quit: %s", e.str);
471     }
472 
473     nxt_unit_done(ctx);
474 }
475 
476 
477 napi_value
478 Unit::get_server_object()
479 {
480     napi_value  unit_obj;
481 
482     unit_obj = get_reference_value(wrapper_);
483 
484     return get_named_property(unit_obj, "server");
485 }
486 
487 
488 void
489 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
490 {
491     void                *data;
492     uint32_t            i;
493     napi_value          headers, raw_headers, buffer;
494     napi_status         status;
495     nxt_unit_request_t  *r;
496 
497     r = req->request;
498 
499     headers = create_object();
500 
501     status = napi_create_array_with_length(env(), r->fields_count * 2,
502                                            &raw_headers);
503     if (status != napi_ok) {
504         throw exception("Failed to create array");
505     }
506 
507     for (i = 0; i < r->fields_count; i++) {
508         append_header(r->fields + i, headers, raw_headers, i);
509     }
510 
511     set_named_property(request, "headers", headers);
512     set_named_property(request, "rawHeaders", raw_headers);
513     set_named_property(request, "httpVersion", r->version, r->version_length);
514     set_named_property(request, "method", r->method, r->method_length);
515     set_named_property(request, "url", r->target, r->target_length);
516 
517     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
518 
519     buffer = create_buffer((size_t) req->content_length, &data);
520     nxt_unit_request_read(req, data, req->content_length);
521 
522     set_named_property(request, "_data", buffer);
523 }
524 
525 
/*
 * Maps 'A'..'Z' to 'a'..'z' by setting bit 0x20; every other character is
 * returned unchanged.
 */
inline char
lowcase(char c)
{
    if (c < 'A' || c > 'Z') {
        return c;
    }

    return c | 0x20;
}
531 
532 
533 inline void
534 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
535     napi_value raw_headers, uint32_t idx)
536 {
537     char        *name;
538     uint8_t     i;
539     napi_value  str, vstr;
540 
541     name = (char *) nxt_unit_sptr_get(&f->name);
542 
543     str = create_string_latin1(name, f->name_length);
544 
545     for (i = 0; i < f->name_length; i++) {
546         name[i] = lowcase(name[i]);
547     }
548 
549     vstr = set_named_property(headers, name, f->value, f->value_length);
550 
551     set_element(raw_headers, idx * 2, str);
552     set_element(raw_headers, idx * 2 + 1, vstr);
553 }
554 
555 
556 napi_value
557 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
558 {
559     napi_value          constructor, res;
560     req_data_t          *req_data;
561     nxt_unit_request_t  *r;
562 
563     r = req->request;
564 
565     constructor = get_named_property(server_obj, "Socket");
566 
567     res = new_instance(constructor);
568 
569     req_data = (req_data_t *) req->data;
570     req_data->sock_ref = wrap(res, req, sock_destroy);
571 
572     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
573     set_named_property(res, "localAddress", r->local, r->local_length);
574 
575     return res;
576 }
577 
578 
579 napi_value
580 Unit::create_request(napi_value server_obj, napi_value socket)
581 {
582     napi_value  constructor;
583 
584     constructor = get_named_property(server_obj, "ServerRequest");
585 
586     return new_instance(constructor, server_obj, socket);
587 }
588 
589 
590 napi_value
591 Unit::create_response(napi_value server_obj, napi_value request,
592     nxt_unit_request_info_t *req)
593 {
594     napi_value  constructor, res;
595     req_data_t  *req_data;
596 
597     constructor = get_named_property(server_obj, "ServerResponse");
598 
599     res = new_instance(constructor, request);
600 
601     req_data = (req_data_t *) req->data;
602     req_data->resp_ref = wrap(res, req, resp_destroy);
603 
604     return res;
605 }
606 
607 
608 napi_value
609 Unit::create_websocket_frame(napi_value server_obj,
610                              nxt_unit_websocket_frame_t *ws)
611 {
612     void        *data;
613     napi_value  constructor, res, buffer;
614     uint8_t     sc[2];
615 
616     constructor = get_named_property(server_obj, "WebSocketFrame");
617 
618     res = new_instance(constructor);
619 
620     set_named_property(res, "fin", (bool) ws->header->fin);
621     set_named_property(res, "opcode", ws->header->opcode);
622     set_named_property(res, "length", (int64_t) ws->payload_len);
623 
624     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
625         if (ws->payload_len >= 2) {
626             nxt_unit_websocket_read(ws, sc, 2);
627 
628             set_named_property(res, "closeStatus",
629                                (((uint16_t) sc[0]) << 8) | sc[1]);
630 
631         } else {
632             set_named_property(res, "closeStatus", -1);
633         }
634     }
635 
636     buffer = create_buffer((size_t) ws->content_length, &data);
637     nxt_unit_websocket_read(ws, data, ws->content_length);
638 
639     set_named_property(res, "binaryPayload", buffer);
640 
641     return res;
642 }
643 
644 
645 napi_value
646 Unit::response_send_headers(napi_env env, napi_callback_info info)
647 {
648     int                      ret;
649     char                     *ptr, *name_ptr;
650     bool                     is_array;
651     size_t                   argc, name_len, value_len;
652     uint32_t                 status_code, header_len, keys_len, array_len;
653     uint32_t                 keys_count, i, j;
654     uint16_t                 hash;
655     nxt_napi                 napi(env);
656     napi_value               this_arg, headers, keys, name, value, array_val;
657     napi_value               array_entry;
658     napi_valuetype           val_type;
659     nxt_unit_field_t         *f;
660     nxt_unit_request_info_t  *req;
661     napi_value               argv[4];
662 
663     argc = 4;
664 
665     try {
666         this_arg = napi.get_cb_info(info, argc, argv);
667         if (argc != 4) {
668             napi.throw_error("Wrong args count. Expected: "
669                              "statusCode, headers, headers count, "
670                              "headers length");
671             return nullptr;
672         }
673 
674         req = napi.get_request_info(this_arg);
675         status_code = napi.get_value_uint32(argv[0]);
676         keys_count = napi.get_value_uint32(argv[2]);
677         header_len = napi.get_value_uint32(argv[3]);
678 
679         headers = argv[1];
680 
681         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
682         if (ret != NXT_UNIT_OK) {
683             napi.throw_error("Failed to create response");
684             return nullptr;
685         }
686 
687         /*
688          * Each name and value are 0-terminated by libunit.
689          * Need to add extra 2 bytes for each header.
690          */
691         header_len += keys_count * 2;
692 
693         keys = napi.get_property_names(headers);
694         keys_len = napi.get_array_length(keys);
695 
696         ptr = req->response_buf->free;
697 
698         for (i = 0; i < keys_len; i++) {
699             name = napi.get_element(keys, i);
700 
701             array_entry = napi.get_property(headers, name);
702 
703             name = napi.get_element(array_entry, 0);
704             value = napi.get_element(array_entry, 1);
705 
706             name_len = napi.get_value_string_latin1(name, ptr, header_len);
707             name_ptr = ptr;
708 
709             ptr += name_len + 1;
710             header_len -= name_len + 1;
711 
712             hash = nxt_unit_field_hash(name_ptr, name_len);
713 
714             is_array = napi.is_array(value);
715 
716             if (is_array) {
717                 array_len = napi.get_array_length(value);
718 
719                 for (j = 0; j < array_len; j++) {
720                     array_val = napi.get_element(value, j);
721 
722                     val_type = napi.type_of(array_val);
723 
724                     if (val_type != napi_string) {
725                         array_val = napi.coerce_to_string(array_val);
726                     }
727 
728                     value_len = napi.get_value_string_latin1(array_val, ptr,
729                                                              header_len);
730 
731                     f = req->response->fields + req->response->fields_count;
732                     f->skip = 0;
733 
734                     nxt_unit_sptr_set(&f->name, name_ptr);
735 
736                     f->name_length = name_len;
737                     f->hash = hash;
738 
739                     nxt_unit_sptr_set(&f->value, ptr);
740                     f->value_length = (uint32_t) value_len;
741 
742                     ptr += value_len + 1;
743                     header_len -= value_len + 1;
744 
745                     req->response->fields_count++;
746                 }
747 
748             } else {
749                 val_type = napi.type_of(value);
750 
751                 if (val_type != napi_string) {
752                     value = napi.coerce_to_string(value);
753                 }
754 
755                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
756 
757                 f = req->response->fields + req->response->fields_count;
758                 f->skip = 0;
759 
760                 nxt_unit_sptr_set(&f->name, name_ptr);
761 
762                 f->name_length = name_len;
763                 f->hash = hash;
764 
765                 nxt_unit_sptr_set(&f->value, ptr);
766                 f->value_length = (uint32_t) value_len;
767 
768                 ptr += value_len + 1;
769                 header_len -= value_len + 1;
770 
771                 req->response->fields_count++;
772             }
773         }
774 
775     } catch (exception &e) {
776         napi.throw_error(e);
777         return nullptr;
778     }
779 
780     req->response_buf->free = ptr;
781 
782     ret = nxt_unit_response_send(req);
783     if (ret != NXT_UNIT_OK) {
784         napi.throw_error("Failed to send response");
785         return nullptr;
786     }
787 
788     return this_arg;
789 }
790 
791 
792 napi_value
793 Unit::response_write(napi_env env, napi_callback_info info)
794 {
795     int                      ret;
796     void                     *ptr;
797     size_t                   argc, have_buf_len;
798     ssize_t                  res_len;
799     uint32_t                 buf_start, buf_len;
800     nxt_napi                 napi(env);
801     napi_value               this_arg;
802     nxt_unit_buf_t           *buf;
803     napi_valuetype           buf_type;
804     nxt_unit_request_info_t  *req;
805     napi_value               argv[3];
806 
807     argc = 3;
808 
809     try {
810         this_arg = napi.get_cb_info(info, argc, argv);
811         if (argc != 3) {
812             throw exception("Wrong args count. Expected: "
813                             "chunk, start, length");
814         }
815 
816         req = napi.get_request_info(this_arg);
817         buf_type = napi.type_of(argv[0]);
818         buf_start = napi.get_value_uint32(argv[1]);
819         buf_len = napi.get_value_uint32(argv[2]) + 1;
820 
821         if (buf_type == napi_string) {
822             /* TODO: will work only for utf8 content-type */
823 
824             if (req->response_buf != NULL
825                 && req->response_buf->end >= req->response_buf->free + buf_len)
826             {
827                 buf = req->response_buf;
828 
829             } else {
830                 buf = nxt_unit_response_buf_alloc(req, buf_len);
831                 if (buf == NULL) {
832                     throw exception("Failed to allocate response buffer");
833                 }
834             }
835 
836             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
837                                                       buf_len);
838 
839             buf->free += have_buf_len;
840 
841             ret = nxt_unit_buf_send(buf);
842             if (ret == NXT_UNIT_OK) {
843                 res_len = have_buf_len;
844             }
845 
846         } else {
847             ptr = napi.get_buffer_info(argv[0], have_buf_len);
848 
849             if (buf_start > 0) {
850                 ptr = ((uint8_t *) ptr) + buf_start;
851                 have_buf_len -= buf_start;
852             }
853 
854             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
855 
856             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
857         }
858 
859         if (ret != NXT_UNIT_OK) {
860             throw exception("Failed to send body buf");
861         }
862     } catch (exception &e) {
863         napi.throw_error(e);
864         return nullptr;
865     }
866 
867     return napi.create((int64_t) res_len);
868 }
869 
870 
871 napi_value
872 Unit::response_end(napi_env env, napi_callback_info info)
873 {
874     nxt_napi                 napi(env);
875     napi_value               this_arg;
876     req_data_t               *req_data;
877     nxt_unit_request_info_t  *req;
878 
879     try {
880         this_arg = napi.get_cb_info(info);
881 
882         req = napi.get_request_info(this_arg);
883 
884         req_data = (req_data_t *) req->data;
885 
886         napi.remove_wrap(req_data->sock_ref);
887         napi.remove_wrap(req_data->resp_ref);
888         napi.remove_wrap(req_data->conn_ref);
889 
890     } catch (exception &e) {
891         napi.throw_error(e);
892         return nullptr;
893     }
894 
895     nxt_unit_request_done(req, NXT_UNIT_OK);
896 
897     return this_arg;
898 }
899 
900 
901 napi_value
902 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
903 {
904     int                      ret, iovec_len;
905     bool                     fin;
906     size_t                   buf_len;
907     uint32_t                 opcode, sc;
908     nxt_napi                 napi(env);
909     napi_value               this_arg, frame, payload;
910     nxt_unit_request_info_t  *req;
911     char                     status_code[2];
912     struct iovec             iov[2];
913 
914     iovec_len = 0;
915 
916     try {
917         this_arg = napi.get_cb_info(info, frame);
918 
919         req = napi.get_request_info(this_arg);
920 
921         opcode = napi.get_value_uint32(napi.get_named_property(frame,
922                                                                "opcode"));
923         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
924             sc = napi.get_value_uint32(napi.get_named_property(frame,
925                                                                "closeStatus"));
926             status_code[0] = (sc >> 8) & 0xFF;
927             status_code[1] = sc & 0xFF;
928 
929             iov[iovec_len].iov_base = status_code;
930             iov[iovec_len].iov_len = 2;
931             iovec_len++;
932         }
933 
934         try {
935             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
936 
937         } catch (exception &e) {
938             fin = true;
939         }
940 
941         payload = napi.get_named_property(frame, "binaryPayload");
942 
943         if (napi.is_buffer(payload)) {
944             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
945 
946         } else {
947             buf_len = 0;
948         }
949 
950     } catch (exception &e) {
951         napi.throw_error(e);
952         return nullptr;
953     }
954 
955     if (buf_len > 0) {
956         iov[iovec_len].iov_len = buf_len;
957         iovec_len++;
958     }
959 
960     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
961     if (ret != NXT_UNIT_OK) {
962         goto failed;
963     }
964 
965     return this_arg;
966 
967 failed:
968 
969     napi.throw_error("Failed to send frame");
970 
971     return nullptr;
972 }
973 
974 
975 napi_value
976 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
977 {
978     nxt_napi                 napi(env);
979     napi_value               this_arg, sock;
980     req_data_t               *req_data;
981     nxt_unit_request_info_t  *req;
982 
983     try {
984         this_arg = napi.get_cb_info(info, sock);
985 
986         req = napi.get_request_info(sock);
987 
988         req_data = (req_data_t *) req->data;
989         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
990 
991     } catch (exception &e) {
992         napi.throw_error(e);
993         return nullptr;
994     }
995 
996     return this_arg;
997 }
998 
999 
/*
 * Callback passed to wrap() for connection objects in websocket_set_sock().
 * r is the wrapped request pointer; the callback only emits a debug message
 * and releases nothing.
 */
void
Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug((nxt_unit_request_info_t *) r, "conn_destroy: %p", r);
}
1005 
1006 
/*
 * Callback passed to wrap() for socket objects in create_socket().  r is the
 * wrapped request pointer; the callback only emits a debug message and
 * releases nothing.
 */
void
Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug((nxt_unit_request_info_t *) r, "sock_destroy: %p", r);
}
1012 
1013 
/*
 * Callback passed to wrap() for response objects in create_response().  r is
 * the wrapped request pointer; the callback only emits a debug message and
 * releases nothing.
 */
void
Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
{
    nxt_unit_req_debug((nxt_unit_request_info_t *) r, "resp_destroy: %p", r);
}
1019