xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1980:43553aa72111)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 napi_ref Unit::constructor_;
17 
18 
19 struct port_data_t {
20     port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
21 
22     void process_port_msg();
23     void stop();
24 
25     template<typename T>
26     static port_data_t *get(T *handle);
27 
28     static void read_callback(uv_poll_t *handle, int status, int events);
29     static void timer_callback(uv_timer_t *handle);
30     static void delete_data(uv_handle_t* handle);
31 
32     nxt_unit_ctx_t   *ctx;
33     nxt_unit_port_t  *port;
34     uv_poll_t        poll;
35     uv_timer_t       timer;
36     int              ref_count;
37     bool             scheduled;
38     bool             stopped;
39 };
40 
41 
42 struct req_data_t {
43     napi_ref  sock_ref;
44     napi_ref  req_ref;
45     napi_ref  resp_ref;
46     napi_ref  conn_ref;
47 };
48 
49 
50 port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
51     ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
52 {
53     timer.type = UV_UNKNOWN_HANDLE;
54 }
55 
56 
57 void
58 port_data_t::process_port_msg()
59 {
60     int  rc, err;
61 
62     rc = nxt_unit_process_port_msg(ctx, port);
63 
64     if (rc != NXT_UNIT_OK) {
65         return;
66     }
67 
68     if (timer.type == UV_UNKNOWN_HANDLE) {
69         err = uv_timer_init(poll.loop, &timer);
70         if (err < 0) {
71             nxt_unit_warn(ctx, "Failed to init uv.poll");
72             return;
73         }
74 
75         ref_count++;
76         timer.data = this;
77     }
78 
79     if (!scheduled && !stopped) {
80         uv_timer_start(&timer, timer_callback, 0, 0);
81 
82         scheduled = true;
83     }
84 }
85 
86 
87 void
88 port_data_t::stop()
89 {
90     stopped = true;
91 
92     uv_poll_stop(&poll);
93 
94     uv_close((uv_handle_t *) &poll, delete_data);
95 
96     if (timer.type == UV_UNKNOWN_HANDLE) {
97         return;
98     }
99 
100     uv_timer_stop(&timer);
101 
102     uv_close((uv_handle_t *) &timer, delete_data);
103 }
104 
105 
106 template<typename T>
107 port_data_t *
108 port_data_t::get(T *handle)
109 {
110     return (port_data_t *) handle->data;
111 }
112 
113 
114 void
115 port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116 {
117     get(handle)->process_port_msg();
118 }
119 
120 
121 void
122 port_data_t::timer_callback(uv_timer_t *handle)
123 {
124     port_data_t  *data;
125 
126     data = get(handle);
127 
128     data->scheduled = false;
129     if (data->stopped) {
130         return;
131     }
132 
133     data->process_port_msg();
134 }
135 
136 
137 void
138 port_data_t::delete_data(uv_handle_t* handle)
139 {
140     port_data_t  *data;
141 
142     data = get(handle);
143 
144     if (--data->ref_count <= 0) {
145         delete data;
146     }
147 }
148 
149 
150 Unit::Unit(napi_env env, napi_value jsthis):
151     nxt_napi(env),
152     wrapper_(wrap(jsthis, this, destroy)),
153     unit_ctx_(nullptr)
154 {
155     nxt_unit_debug(NULL, "Unit::Unit()");
156 }
157 
158 
159 Unit::~Unit()
160 {
161     delete_reference(wrapper_);
162 
163     nxt_unit_debug(NULL, "Unit::~Unit()");
164 }
165 
166 
167 napi_value
168 Unit::init(napi_env env, napi_value exports)
169 {
170     nxt_napi    napi(env);
171     napi_value  ctor;
172 
173     napi_property_descriptor  unit_props[] = {
174         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
175         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
176     };
177 
178     try {
179         ctor = napi.define_class("Unit", create, 2, unit_props);
180         constructor_ = napi.create_reference(ctor);
181 
182         napi.set_named_property(exports, "Unit", ctor);
183         napi.set_named_property(exports, "request_read", request_read);
184         napi.set_named_property(exports, "response_send_headers",
185                                 response_send_headers);
186         napi.set_named_property(exports, "response_write", response_write);
187         napi.set_named_property(exports, "response_end", response_end);
188         napi.set_named_property(exports, "websocket_send_frame",
189                                 websocket_send_frame);
190         napi.set_named_property(exports, "websocket_set_sock",
191                                 websocket_set_sock);
192         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
193         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
194 
195     } catch (exception &e) {
196         napi.throw_error(e);
197         return nullptr;
198     }
199 
200     return exports;
201 }
202 
203 
204 void
205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
206 {
207     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
208 
209     delete obj;
210 }
211 
212 
213 napi_value
214 Unit::create(napi_env env, napi_callback_info info)
215 {
216     nxt_napi    napi(env);
217     napi_value  target, ctor, instance, jsthis;
218 
219     try {
220         target = napi.get_new_target(info);
221 
222         if (target != nullptr) {
223             /* Invoked as constructor: `new Unit(...)`. */
224             jsthis = napi.get_cb_info(info);
225 
226             new Unit(env, jsthis);
227             napi.create_reference(jsthis);
228 
229             return jsthis;
230         }
231 
232         /* Invoked as plain function `Unit(...)`, turn into construct call. */
233         ctor = napi.get_reference_value(constructor_);
234         instance = napi.new_instance(ctor);
235         napi.create_reference(instance);
236 
237     } catch (exception &e) {
238         napi.throw_error(e);
239         return nullptr;
240     }
241 
242     return instance;
243 }
244 
245 
246 napi_value
247 Unit::create_server(napi_env env, napi_callback_info info)
248 {
249     Unit             *obj;
250     size_t           argc;
251     nxt_napi         napi(env);
252     napi_value       jsthis, argv;
253     nxt_unit_init_t  unit_init;
254 
255     argc = 1;
256 
257     try {
258         jsthis = napi.get_cb_info(info, argc, &argv);
259         obj = (Unit *) napi.unwrap(jsthis);
260 
261     } catch (exception &e) {
262         napi.throw_error(e);
263         return nullptr;
264     }
265 
266     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
267 
268     unit_init.data = obj;
269     unit_init.callbacks.request_handler   = request_handler_cb;
270     unit_init.callbacks.websocket_handler = websocket_handler_cb;
271     unit_init.callbacks.close_handler     = close_handler_cb;
272     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
273     unit_init.callbacks.add_port          = add_port;
274     unit_init.callbacks.remove_port       = remove_port;
275     unit_init.callbacks.quit              = quit_cb;
276 
277     unit_init.request_data_size = sizeof(req_data_t);
278 
279     obj->unit_ctx_ = nxt_unit_init(&unit_init);
280     if (obj->unit_ctx_ == NULL) {
281         goto failed;
282     }
283 
284     return nullptr;
285 
286 failed:
287 
288     napi_throw_error(env, NULL, "Failed to create Unit object");
289 
290     return nullptr;
291 }
292 
293 
294 napi_value
295 Unit::listen(napi_env env, napi_callback_info info)
296 {
297     return nullptr;
298 }
299 
300 
301 void
302 Unit::request_handler_cb(nxt_unit_request_info_t *req)
303 {
304     Unit  *obj;
305 
306     obj = reinterpret_cast<Unit *>(req->unit->data);
307 
308     obj->request_handler(req);
309 }
310 
311 
312 void
313 Unit::request_handler(nxt_unit_request_info_t *req)
314 {
315     napi_value  socket, request, response, server_obj, emit_request;
316 
317     memset(req->data, 0, sizeof(req_data_t));
318 
319     try {
320         nxt_handle_scope  scope(env());
321 
322         server_obj = get_server_object();
323 
324         socket = create_socket(server_obj, req);
325         request = create_request(server_obj, socket, req);
326         response = create_response(server_obj, request, req);
327 
328         create_headers(req, request);
329 
330         emit_request = get_named_property(server_obj, "emit_request");
331 
332         nxt_async_context   async_context(env(), "request_handler");
333         nxt_callback_scope  async_scope(async_context);
334 
335         make_callback(async_context, server_obj, emit_request, request,
336                       response);
337 
338     } catch (exception &e) {
339         nxt_unit_req_warn(req, "request_handler: %s", e.str);
340     }
341 }
342 
343 
344 void
345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
346 {
347     Unit  *obj;
348 
349     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
350 
351     obj->websocket_handler(ws);
352 }
353 
354 
355 void
356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
357 {
358     napi_value  frame, server_obj, process_frame, conn;
359     req_data_t  *req_data;
360 
361     req_data = (req_data_t *) ws->req->data;
362 
363     try {
364         nxt_handle_scope  scope(env());
365 
366         server_obj = get_server_object();
367 
368         frame = create_websocket_frame(server_obj, ws);
369 
370         conn = get_reference_value(req_data->conn_ref);
371 
372         process_frame = get_named_property(conn, "processFrame");
373 
374         nxt_async_context   async_context(env(), "websocket_handler");
375         nxt_callback_scope  async_scope(async_context);
376 
377         make_callback(async_context, conn, process_frame, frame);
378 
379     } catch (exception &e) {
380         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
381     }
382 
383     nxt_unit_websocket_done(ws);
384 }
385 
386 
387 void
388 Unit::close_handler_cb(nxt_unit_request_info_t *req)
389 {
390     Unit  *obj;
391 
392     obj = reinterpret_cast<Unit *>(req->unit->data);
393 
394     obj->close_handler(req);
395 }
396 
397 
398 void
399 Unit::close_handler(nxt_unit_request_info_t *req)
400 {
401     napi_value  conn_handle_close, conn;
402     req_data_t  *req_data;
403 
404     req_data = (req_data_t *) req->data;
405 
406     try {
407         nxt_handle_scope  scope(env());
408 
409         conn = get_reference_value(req_data->conn_ref);
410 
411         conn_handle_close = get_named_property(conn, "handleSocketClose");
412 
413         nxt_async_context   async_context(env(), "close_handler");
414         nxt_callback_scope  async_scope(async_context);
415 
416         make_callback(async_context, conn, conn_handle_close,
417                       nxt_napi::create(0));
418 
419         remove_wrap(req_data->sock_ref);
420         remove_wrap(req_data->req_ref);
421         remove_wrap(req_data->resp_ref);
422         remove_wrap(req_data->conn_ref);
423 
424     } catch (exception &e) {
425         nxt_unit_req_warn(req, "close_handler: %s", e.str);
426 
427         nxt_unit_request_done(req, NXT_UNIT_ERROR);
428 
429         return;
430     }
431 
432     nxt_unit_request_done(req, NXT_UNIT_OK);
433 }
434 
435 
436 void
437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
438 {
439     Unit  *obj;
440 
441     obj = reinterpret_cast<Unit *>(ctx->unit->data);
442 
443     obj->shm_ack_handler(ctx);
444 }
445 
446 
447 void
448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
449 {
450     napi_value  server_obj, emit_drain;
451 
452     try {
453         nxt_handle_scope  scope(env());
454 
455         server_obj = get_server_object();
456 
457         emit_drain = get_named_property(server_obj, "emit_drain");
458 
459         nxt_async_context   async_context(env(), "shm_ack_handler");
460         nxt_callback_scope  async_scope(async_context);
461 
462         make_callback(async_context, server_obj, emit_drain);
463 
464     } catch (exception &e) {
465         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466     }
467 }
468 
469 
470 int
471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472 {
473     int          err;
474     Unit         *obj;
475     uv_loop_t    *loop;
476     port_data_t  *data;
477     napi_status  status;
478 
479     if (port->in_fd != -1) {
480         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482                           port->in_fd, strerror(errno), errno);
483             return -1;
484         }
485 
486         obj = reinterpret_cast<Unit *>(ctx->unit->data);
487 
488         status = napi_get_uv_event_loop(obj->env(), &loop);
489         if (status != napi_ok) {
490             nxt_unit_warn(ctx, "Failed to get uv.loop");
491             return NXT_UNIT_ERROR;
492         }
493 
494         data = new port_data_t(ctx, port);
495 
496         err = uv_poll_init(loop, &data->poll, port->in_fd);
497         if (err < 0) {
498             nxt_unit_warn(ctx, "Failed to init uv.poll");
499             delete data;
500             return NXT_UNIT_ERROR;
501         }
502 
503         err = uv_poll_start(&data->poll, UV_READABLE,
504                             port_data_t::read_callback);
505         if (err < 0) {
506             nxt_unit_warn(ctx, "Failed to start uv.poll");
507             delete data;
508             return NXT_UNIT_ERROR;
509         }
510 
511         port->data = data;
512 
513         data->ref_count++;
514         data->poll.data = data;
515     }
516 
517     return NXT_UNIT_OK;
518 }
519 
520 
521 void
522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
523 {
524     port_data_t  *data;
525 
526     if (port->data != NULL && ctx != NULL) {
527         data = (port_data_t *) port->data;
528 
529         data->stop();
530     }
531 }
532 
533 
534 void
535 Unit::quit_cb(nxt_unit_ctx_t *ctx)
536 {
537     Unit  *obj;
538 
539     obj = reinterpret_cast<Unit *>(ctx->unit->data);
540 
541     obj->quit(ctx);
542 }
543 
544 
545 void
546 Unit::quit(nxt_unit_ctx_t *ctx)
547 {
548     napi_value  server_obj, emit_close;
549 
550     try {
551         nxt_handle_scope  scope(env());
552 
553         server_obj = get_server_object();
554 
555         emit_close = get_named_property(server_obj, "emit_close");
556 
557         nxt_async_context   async_context(env(), "unit_quit");
558         nxt_callback_scope  async_scope(async_context);
559 
560         make_callback(async_context, server_obj, emit_close);
561 
562     } catch (exception &e) {
563         nxt_unit_debug(ctx, "quit: %s", e.str);
564     }
565 
566     nxt_unit_done(ctx);
567 }
568 
569 
570 napi_value
571 Unit::get_server_object()
572 {
573     napi_value  unit_obj;
574 
575     unit_obj = get_reference_value(wrapper_);
576 
577     return get_named_property(unit_obj, "server");
578 }
579 
580 
581 void
582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
583 {
584     uint32_t            i;
585     napi_value          headers, raw_headers;
586     napi_status         status;
587     nxt_unit_request_t  *r;
588 
589     r = req->request;
590 
591     headers = create_object();
592 
593     status = napi_create_array_with_length(env(), r->fields_count * 2,
594                                            &raw_headers);
595     if (status != napi_ok) {
596         throw exception("Failed to create array");
597     }
598 
599     for (i = 0; i < r->fields_count; i++) {
600         append_header(r->fields + i, headers, raw_headers, i);
601     }
602 
603     set_named_property(request, "headers", headers);
604     set_named_property(request, "rawHeaders", raw_headers);
605     set_named_property(request, "httpVersion", r->version, r->version_length);
606     set_named_property(request, "method", r->method, r->method_length);
607     set_named_property(request, "url", r->target, r->target_length);
608 
609     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
610 }
611 
612 
613 inline char
614 lowcase(char c)
615 {
616     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
617 }
618 
619 
620 inline void
621 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
622     napi_value raw_headers, uint32_t idx)
623 {
624     char        *name;
625     uint8_t     i;
626     napi_value  str, vstr;
627 
628     name = (char *) nxt_unit_sptr_get(&f->name);
629 
630     str = create_string_latin1(name, f->name_length);
631 
632     for (i = 0; i < f->name_length; i++) {
633         name[i] = lowcase(name[i]);
634     }
635 
636     vstr = set_named_property(headers, name, f->value, f->value_length);
637 
638     set_element(raw_headers, idx * 2, str);
639     set_element(raw_headers, idx * 2 + 1, vstr);
640 }
641 
642 
643 napi_value
644 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
645 {
646     napi_value          constructor, res;
647     req_data_t          *req_data;
648     nxt_unit_request_t  *r;
649 
650     r = req->request;
651 
652     constructor = get_named_property(server_obj, "Socket");
653 
654     res = new_instance(constructor);
655 
656     req_data = (req_data_t *) req->data;
657     req_data->sock_ref = wrap(res, req, sock_destroy);
658 
659     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
660     set_named_property(res, "localAddress", r->local, r->local_length);
661 
662     return res;
663 }
664 
665 
666 napi_value
667 Unit::create_request(napi_value server_obj, napi_value socket,
668     nxt_unit_request_info_t *req)
669 {
670     napi_value  constructor, res;
671     req_data_t  *req_data;
672 
673     constructor = get_named_property(server_obj, "ServerRequest");
674 
675     res = new_instance(constructor, server_obj, socket);
676 
677     req_data = (req_data_t *) req->data;
678     req_data->req_ref = wrap(res, req, req_destroy);
679 
680     return res;
681 }
682 
683 
684 napi_value
685 Unit::create_response(napi_value server_obj, napi_value request,
686     nxt_unit_request_info_t *req)
687 {
688     napi_value  constructor, res;
689     req_data_t  *req_data;
690 
691     constructor = get_named_property(server_obj, "ServerResponse");
692 
693     res = new_instance(constructor, request);
694 
695     req_data = (req_data_t *) req->data;
696     req_data->resp_ref = wrap(res, req, resp_destroy);
697 
698     return res;
699 }
700 
701 
702 napi_value
703 Unit::create_websocket_frame(napi_value server_obj,
704                              nxt_unit_websocket_frame_t *ws)
705 {
706     void        *data;
707     napi_value  constructor, res, buffer;
708     uint8_t     sc[2];
709 
710     constructor = get_named_property(server_obj, "WebSocketFrame");
711 
712     res = new_instance(constructor);
713 
714     set_named_property(res, "fin", (bool) ws->header->fin);
715     set_named_property(res, "opcode", ws->header->opcode);
716     set_named_property(res, "length", (int64_t) ws->payload_len);
717 
718     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
719         if (ws->payload_len >= 2) {
720             nxt_unit_websocket_read(ws, sc, 2);
721 
722             set_named_property(res, "closeStatus",
723                                (((uint16_t) sc[0]) << 8) | sc[1]);
724 
725         } else {
726             set_named_property(res, "closeStatus", -1);
727         }
728     }
729 
730     buffer = create_buffer((size_t) ws->content_length, &data);
731     nxt_unit_websocket_read(ws, data, ws->content_length);
732 
733     set_named_property(res, "binaryPayload", buffer);
734 
735     return res;
736 }
737 
738 
739 napi_value
740 Unit::request_read(napi_env env, napi_callback_info info)
741 {
742     void                     *data;
743     uint32_t                 wm;
744     nxt_napi                 napi(env);
745     napi_value               this_arg, argv, buffer;
746     nxt_unit_request_info_t  *req;
747 
748     try {
749         this_arg = napi.get_cb_info(info, argv);
750 
751         try {
752             req = napi.get_request_info(this_arg);
753 
754         } catch (exception &e) {
755             return nullptr;
756         }
757 
758         if (req->content_length == 0) {
759             return nullptr;
760         }
761 
762         wm = napi.get_value_uint32(argv);
763 
764         if (wm > req->content_length) {
765             wm = req->content_length;
766         }
767 
768         buffer = napi.create_buffer((size_t) wm, &data);
769         nxt_unit_request_read(req, data, wm);
770 
771     } catch (exception &e) {
772         napi.throw_error(e);
773         return nullptr;
774     }
775 
776     return buffer;
777 }
778 
779 
780 napi_value
781 Unit::response_send_headers(napi_env env, napi_callback_info info)
782 {
783     int                      ret;
784     char                     *ptr, *name_ptr;
785     bool                     is_array;
786     size_t                   argc, name_len, value_len;
787     uint32_t                 status_code, header_len, keys_len, array_len;
788     uint32_t                 keys_count, i, j;
789     uint16_t                 hash;
790     nxt_napi                 napi(env);
791     napi_value               this_arg, headers, keys, name, value, array_val;
792     napi_value               array_entry;
793     napi_valuetype           val_type;
794     nxt_unit_field_t         *f;
795     nxt_unit_request_info_t  *req;
796     napi_value               argv[4];
797 
798     argc = 4;
799 
800     try {
801         this_arg = napi.get_cb_info(info, argc, argv);
802         if (argc != 4) {
803             napi.throw_error("Wrong args count. Expected: "
804                              "statusCode, headers, headers count, "
805                              "headers length");
806             return nullptr;
807         }
808 
809         req = napi.get_request_info(this_arg);
810         status_code = napi.get_value_uint32(argv[0]);
811         keys_count = napi.get_value_uint32(argv[2]);
812         header_len = napi.get_value_uint32(argv[3]);
813 
814         headers = argv[1];
815 
816         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
817         if (ret != NXT_UNIT_OK) {
818             napi.throw_error("Failed to create response");
819             return nullptr;
820         }
821 
822         /*
823          * Each name and value are 0-terminated by libunit.
824          * Need to add extra 2 bytes for each header.
825          */
826         header_len += keys_count * 2;
827 
828         keys = napi.get_property_names(headers);
829         keys_len = napi.get_array_length(keys);
830 
831         ptr = req->response_buf->free;
832 
833         for (i = 0; i < keys_len; i++) {
834             name = napi.get_element(keys, i);
835 
836             array_entry = napi.get_property(headers, name);
837 
838             name = napi.get_element(array_entry, 0);
839             value = napi.get_element(array_entry, 1);
840 
841             name_len = napi.get_value_string_latin1(name, ptr, header_len);
842             name_ptr = ptr;
843 
844             ptr += name_len + 1;
845             header_len -= name_len + 1;
846 
847             hash = nxt_unit_field_hash(name_ptr, name_len);
848 
849             is_array = napi.is_array(value);
850 
851             if (is_array) {
852                 array_len = napi.get_array_length(value);
853 
854                 for (j = 0; j < array_len; j++) {
855                     array_val = napi.get_element(value, j);
856 
857                     val_type = napi.type_of(array_val);
858 
859                     if (val_type != napi_string) {
860                         array_val = napi.coerce_to_string(array_val);
861                     }
862 
863                     value_len = napi.get_value_string_latin1(array_val, ptr,
864                                                              header_len);
865 
866                     f = req->response->fields + req->response->fields_count;
867                     f->skip = 0;
868 
869                     nxt_unit_sptr_set(&f->name, name_ptr);
870 
871                     f->name_length = name_len;
872                     f->hash = hash;
873 
874                     nxt_unit_sptr_set(&f->value, ptr);
875                     f->value_length = (uint32_t) value_len;
876 
877                     ptr += value_len + 1;
878                     header_len -= value_len + 1;
879 
880                     req->response->fields_count++;
881                 }
882 
883             } else {
884                 val_type = napi.type_of(value);
885 
886                 if (val_type != napi_string) {
887                     value = napi.coerce_to_string(value);
888                 }
889 
890                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
891 
892                 f = req->response->fields + req->response->fields_count;
893                 f->skip = 0;
894 
895                 nxt_unit_sptr_set(&f->name, name_ptr);
896 
897                 f->name_length = name_len;
898                 f->hash = hash;
899 
900                 nxt_unit_sptr_set(&f->value, ptr);
901                 f->value_length = (uint32_t) value_len;
902 
903                 ptr += value_len + 1;
904                 header_len -= value_len + 1;
905 
906                 req->response->fields_count++;
907             }
908         }
909 
910     } catch (exception &e) {
911         napi.throw_error(e);
912         return nullptr;
913     }
914 
915     req->response_buf->free = ptr;
916 
917     ret = nxt_unit_response_send(req);
918     if (ret != NXT_UNIT_OK) {
919         napi.throw_error("Failed to send response");
920         return nullptr;
921     }
922 
923     return this_arg;
924 }
925 
926 
927 napi_value
928 Unit::response_write(napi_env env, napi_callback_info info)
929 {
930     int                      ret;
931     void                     *ptr;
932     size_t                   argc, have_buf_len;
933     ssize_t                  res_len;
934     uint32_t                 buf_start, buf_len;
935     nxt_napi                 napi(env);
936     napi_value               this_arg;
937     nxt_unit_buf_t           *buf;
938     napi_valuetype           buf_type;
939     nxt_unit_request_info_t  *req;
940     napi_value               argv[3];
941 
942     argc = 3;
943 
944     try {
945         this_arg = napi.get_cb_info(info, argc, argv);
946         if (argc != 3) {
947             throw exception("Wrong args count. Expected: "
948                             "chunk, start, length");
949         }
950 
951         req = napi.get_request_info(this_arg);
952         buf_type = napi.type_of(argv[0]);
953         buf_start = napi.get_value_uint32(argv[1]);
954         buf_len = napi.get_value_uint32(argv[2]) + 1;
955 
956         if (buf_type == napi_string) {
957             /* TODO: will work only for utf8 content-type */
958 
959             if (req->response_buf != NULL
960                 && req->response_buf->end >= req->response_buf->free + buf_len)
961             {
962                 buf = req->response_buf;
963 
964             } else {
965                 buf = nxt_unit_response_buf_alloc(req, buf_len);
966                 if (buf == NULL) {
967                     throw exception("Failed to allocate response buffer");
968                 }
969             }
970 
971             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
972                                                       buf_len);
973 
974             buf->free += have_buf_len;
975 
976             ret = nxt_unit_buf_send(buf);
977             if (ret == NXT_UNIT_OK) {
978                 res_len = have_buf_len;
979             }
980 
981         } else {
982             ptr = napi.get_buffer_info(argv[0], have_buf_len);
983 
984             if (buf_start > 0) {
985                 ptr = ((uint8_t *) ptr) + buf_start;
986                 have_buf_len -= buf_start;
987             }
988 
989             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
990 
991             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
992         }
993 
994         if (ret != NXT_UNIT_OK) {
995             throw exception("Failed to send body buf");
996         }
997     } catch (exception &e) {
998         napi.throw_error(e);
999         return nullptr;
1000     }
1001 
1002     return napi.create((int64_t) res_len);
1003 }
1004 
1005 
1006 napi_value
1007 Unit::response_end(napi_env env, napi_callback_info info)
1008 {
1009     nxt_napi                 napi(env);
1010     napi_value               this_arg;
1011     req_data_t               *req_data;
1012     nxt_unit_request_info_t  *req;
1013 
1014     try {
1015         this_arg = napi.get_cb_info(info);
1016 
1017         req = napi.get_request_info(this_arg);
1018 
1019         req_data = (req_data_t *) req->data;
1020 
1021         napi.remove_wrap(req_data->sock_ref);
1022         napi.remove_wrap(req_data->req_ref);
1023         napi.remove_wrap(req_data->resp_ref);
1024         napi.remove_wrap(req_data->conn_ref);
1025 
1026     } catch (exception &e) {
1027         napi.throw_error(e);
1028         return nullptr;
1029     }
1030 
1031     nxt_unit_request_done(req, NXT_UNIT_OK);
1032 
1033     return this_arg;
1034 }
1035 
1036 
1037 napi_value
1038 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
1039 {
1040     int                      ret, iovec_len;
1041     bool                     fin;
1042     size_t                   buf_len;
1043     uint32_t                 opcode, sc;
1044     nxt_napi                 napi(env);
1045     napi_value               this_arg, frame, payload;
1046     nxt_unit_request_info_t  *req;
1047     char                     status_code[2];
1048     struct iovec             iov[2];
1049 
1050     iovec_len = 0;
1051 
1052     try {
1053         this_arg = napi.get_cb_info(info, frame);
1054 
1055         req = napi.get_request_info(this_arg);
1056 
1057         opcode = napi.get_value_uint32(napi.get_named_property(frame,
1058                                                                "opcode"));
1059         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
1060             sc = napi.get_value_uint32(napi.get_named_property(frame,
1061                                                                "closeStatus"));
1062             status_code[0] = (sc >> 8) & 0xFF;
1063             status_code[1] = sc & 0xFF;
1064 
1065             iov[iovec_len].iov_base = status_code;
1066             iov[iovec_len].iov_len = 2;
1067             iovec_len++;
1068         }
1069 
1070         try {
1071             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
1072 
1073         } catch (exception &e) {
1074             fin = true;
1075         }
1076 
1077         payload = napi.get_named_property(frame, "binaryPayload");
1078 
1079         if (napi.is_buffer(payload)) {
1080             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
1081 
1082         } else {
1083             buf_len = 0;
1084         }
1085 
1086     } catch (exception &e) {
1087         napi.throw_error(e);
1088         return nullptr;
1089     }
1090 
1091     if (buf_len > 0) {
1092         iov[iovec_len].iov_len = buf_len;
1093         iovec_len++;
1094     }
1095 
1096     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1097     if (ret != NXT_UNIT_OK) {
1098         goto failed;
1099     }
1100 
1101     return this_arg;
1102 
1103 failed:
1104 
1105     napi.throw_error("Failed to send frame");
1106 
1107     return nullptr;
1108 }
1109 
1110 
1111 napi_value
1112 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1113 {
1114     nxt_napi                 napi(env);
1115     napi_value               this_arg, sock;
1116     req_data_t               *req_data;
1117     nxt_unit_request_info_t  *req;
1118 
1119     try {
1120         this_arg = napi.get_cb_info(info, sock);
1121 
1122         req = napi.get_request_info(sock);
1123 
1124         req_data = (req_data_t *) req->data;
1125         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1126 
1127     } catch (exception &e) {
1128         napi.throw_error(e);
1129         return nullptr;
1130     }
1131 
1132     return this_arg;
1133 }
1134 
1135 
1136 void
1137 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1138 {
1139     nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1140 }
1141 
1142 
1143 void
1144 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1145 {
1146     nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1147 }
1148 
1149 
1150 void
1151 Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1152 {
1153     nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1154 }
1155 
1156 
1157 void
1158 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1159 {
1160     nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1161 }
1162