xref: /unit/src/nodejs/unit-http/unit.cpp (revision 2623:15a457cb829e)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 #include <nxt_unit_websocket.h>
14 
15 
16 napi_ref Unit::constructor_;
17 
18 
19 struct port_data_t {
20     port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
21 
22     void process_port_msg();
23     void stop();
24 
25     template<typename T>
26     static port_data_t *get(T *handle);
27 
28     static void read_callback(uv_poll_t *handle, int status, int events);
29     static void timer_callback(uv_timer_t *handle);
30     static void delete_data(uv_handle_t* handle);
31 
32     nxt_unit_ctx_t   *ctx;
33     nxt_unit_port_t  *port;
34     uv_poll_t        poll;
35     uv_timer_t       timer;
36     int              ref_count;
37     bool             scheduled;
38     bool             stopped;
39 };
40 
41 
42 struct req_data_t {
43     napi_ref  sock_ref;
44     napi_ref  req_ref;
45     napi_ref  resp_ref;
46     napi_ref  conn_ref;
47 };
48 
49 
port_data_t(nxt_unit_ctx_t * c,nxt_unit_port_t * p)50 port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
51     ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
52 {
53     timer.type = UV_UNKNOWN_HANDLE;
54 }
55 
56 
57 void
process_port_msg()58 port_data_t::process_port_msg()
59 {
60     int  rc, err;
61 
62     rc = nxt_unit_process_port_msg(ctx, port);
63 
64     if (rc != NXT_UNIT_OK) {
65         return;
66     }
67 
68     if (timer.type == UV_UNKNOWN_HANDLE) {
69         err = uv_timer_init(poll.loop, &timer);
70         if (err < 0) {
71             nxt_unit_warn(ctx, "Failed to init uv.poll");
72             return;
73         }
74 
75         ref_count++;
76         timer.data = this;
77     }
78 
79     if (!scheduled && !stopped) {
80         uv_timer_start(&timer, timer_callback, 0, 0);
81 
82         scheduled = true;
83     }
84 }
85 
86 
87 void
stop()88 port_data_t::stop()
89 {
90     stopped = true;
91 
92     uv_poll_stop(&poll);
93 
94     uv_close((uv_handle_t *) &poll, delete_data);
95 
96     if (timer.type == UV_UNKNOWN_HANDLE) {
97         return;
98     }
99 
100     uv_timer_stop(&timer);
101 
102     uv_close((uv_handle_t *) &timer, delete_data);
103 }
104 
105 
106 template<typename T>
107 port_data_t *
get(T * handle)108 port_data_t::get(T *handle)
109 {
110     return (port_data_t *) handle->data;
111 }
112 
113 
114 void
read_callback(uv_poll_t * handle,int status,int events)115 port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116 {
117     get(handle)->process_port_msg();
118 }
119 
120 
121 void
timer_callback(uv_timer_t * handle)122 port_data_t::timer_callback(uv_timer_t *handle)
123 {
124     port_data_t  *data;
125 
126     data = get(handle);
127 
128     data->scheduled = false;
129     if (data->stopped) {
130         return;
131     }
132 
133     data->process_port_msg();
134 }
135 
136 
137 void
delete_data(uv_handle_t * handle)138 port_data_t::delete_data(uv_handle_t* handle)
139 {
140     port_data_t  *data;
141 
142     data = get(handle);
143 
144     if (--data->ref_count <= 0) {
145         delete data;
146     }
147 }
148 
149 
Unit(napi_env env,napi_value jsthis)150 Unit::Unit(napi_env env, napi_value jsthis):
151     nxt_napi(env),
152     wrapper_(wrap(jsthis, this, destroy)),
153     unit_ctx_(nullptr)
154 {
155     nxt_unit_debug(NULL, "Unit::Unit()");
156 }
157 
158 
~Unit()159 Unit::~Unit()
160 {
161     delete_reference(wrapper_);
162 
163     nxt_unit_debug(NULL, "Unit::~Unit()");
164 }
165 
166 
167 napi_value
init(napi_env env,napi_value exports)168 Unit::init(napi_env env, napi_value exports)
169 {
170     nxt_napi    napi(env);
171     napi_value  ctor;
172 
173     napi_property_descriptor  unit_props[] = {
174         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
175         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
176     };
177 
178     try {
179         ctor = napi.define_class("Unit", create, 2, unit_props);
180         constructor_ = napi.create_reference(ctor);
181 
182         napi.set_named_property(exports, "Unit", ctor);
183         napi.set_named_property(exports, "request_read", request_read);
184         napi.set_named_property(exports, "response_send_headers",
185                                 response_send_headers);
186         napi.set_named_property(exports, "response_write", response_write);
187         napi.set_named_property(exports, "response_end", response_end);
188         napi.set_named_property(exports, "websocket_send_frame",
189                                 websocket_send_frame);
190         napi.set_named_property(exports, "websocket_set_sock",
191                                 websocket_set_sock);
192         napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
193         napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
194 
195     } catch (exception &e) {
196         napi.throw_error(e);
197         return nullptr;
198     }
199 
200     return exports;
201 }
202 
203 
204 void
destroy(napi_env env,void * nativeObject,void * finalize_hint)205 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
206 {
207     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
208 
209     delete obj;
210 }
211 
212 
213 napi_value
create(napi_env env,napi_callback_info info)214 Unit::create(napi_env env, napi_callback_info info)
215 {
216     nxt_napi    napi(env);
217     napi_value  target, ctor, instance, jsthis;
218 
219     try {
220         target = napi.get_new_target(info);
221 
222         if (target != nullptr) {
223             /* Invoked as constructor: `new Unit(...)`. */
224             jsthis = napi.get_cb_info(info);
225 
226             new Unit(env, jsthis);
227             napi.create_reference(jsthis);
228 
229             return jsthis;
230         }
231 
232         /* Invoked as plain function `Unit(...)`, turn into construct call. */
233         ctor = napi.get_reference_value(constructor_);
234         instance = napi.new_instance(ctor);
235         napi.create_reference(instance);
236 
237     } catch (exception &e) {
238         napi.throw_error(e);
239         return nullptr;
240     }
241 
242     return instance;
243 }
244 
245 
246 napi_value
create_server(napi_env env,napi_callback_info info)247 Unit::create_server(napi_env env, napi_callback_info info)
248 {
249     Unit             *obj;
250     size_t           argc;
251     nxt_napi         napi(env);
252     napi_value       jsthis, argv;
253     nxt_unit_init_t  unit_init;
254 
255     argc = 1;
256 
257     try {
258         jsthis = napi.get_cb_info(info, argc, &argv);
259         obj = (Unit *) napi.unwrap(jsthis);
260 
261     } catch (exception &e) {
262         napi.throw_error(e);
263         return nullptr;
264     }
265 
266     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
267 
268     unit_init.data = obj;
269     unit_init.callbacks.request_handler   = request_handler_cb;
270     unit_init.callbacks.websocket_handler = websocket_handler_cb;
271     unit_init.callbacks.close_handler     = close_handler_cb;
272     unit_init.callbacks.shm_ack_handler   = shm_ack_handler_cb;
273     unit_init.callbacks.add_port          = add_port;
274     unit_init.callbacks.remove_port       = remove_port;
275     unit_init.callbacks.quit              = quit_cb;
276 
277     unit_init.request_data_size = sizeof(req_data_t);
278 
279     obj->unit_ctx_ = nxt_unit_init(&unit_init);
280     if (obj->unit_ctx_ == NULL) {
281         goto failed;
282     }
283 
284     return nullptr;
285 
286 failed:
287 
288     napi_throw_error(env, NULL, "Failed to create Unit object");
289 
290     return nullptr;
291 }
292 
293 
294 napi_value
listen(napi_env env,napi_callback_info info)295 Unit::listen(napi_env env, napi_callback_info info)
296 {
297     return nullptr;
298 }
299 
300 
301 void
request_handler_cb(nxt_unit_request_info_t * req)302 Unit::request_handler_cb(nxt_unit_request_info_t *req)
303 {
304     Unit  *obj;
305 
306     obj = reinterpret_cast<Unit *>(req->unit->data);
307 
308     obj->request_handler(req);
309 }
310 
311 
312 void
request_handler(nxt_unit_request_info_t * req)313 Unit::request_handler(nxt_unit_request_info_t *req)
314 {
315     napi_value  socket, request, response, server_obj, emit_request;
316 
317     memset(req->data, 0, sizeof(req_data_t));
318 
319     try {
320         nxt_handle_scope  scope(env());
321 
322         server_obj = get_server_object();
323 
324         socket = create_socket(server_obj, req);
325         request = create_request(server_obj, socket, req);
326         response = create_response(server_obj, request, req);
327 
328         create_headers(req, request);
329 
330         emit_request = get_named_property(server_obj, "emit_request");
331 
332         nxt_async_context   async_context(env(), "request_handler");
333         nxt_callback_scope  async_scope(async_context);
334 
335         make_callback(async_context, server_obj, emit_request, request,
336                       response);
337 
338     } catch (exception &e) {
339         nxt_unit_req_warn(req, "request_handler: %s", e.str);
340     }
341 }
342 
343 
344 void
websocket_handler_cb(nxt_unit_websocket_frame_t * ws)345 Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
346 {
347     Unit  *obj;
348 
349     obj = reinterpret_cast<Unit *>(ws->req->unit->data);
350 
351     obj->websocket_handler(ws);
352 }
353 
354 
355 void
websocket_handler(nxt_unit_websocket_frame_t * ws)356 Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
357 {
358     napi_value  frame, server_obj, process_frame, conn;
359     req_data_t  *req_data;
360 
361     req_data = (req_data_t *) ws->req->data;
362 
363     try {
364         nxt_handle_scope  scope(env());
365 
366         server_obj = get_server_object();
367 
368         frame = create_websocket_frame(server_obj, ws);
369 
370         conn = get_reference_value(req_data->conn_ref);
371 
372         process_frame = get_named_property(conn, "processFrame");
373 
374         nxt_async_context   async_context(env(), "websocket_handler");
375         nxt_callback_scope  async_scope(async_context);
376 
377         make_callback(async_context, conn, process_frame, frame);
378 
379     } catch (exception &e) {
380         nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
381     }
382 
383     nxt_unit_websocket_done(ws);
384 }
385 
386 
387 void
close_handler_cb(nxt_unit_request_info_t * req)388 Unit::close_handler_cb(nxt_unit_request_info_t *req)
389 {
390     Unit  *obj;
391 
392     obj = reinterpret_cast<Unit *>(req->unit->data);
393 
394     obj->close_handler(req);
395 }
396 
397 
398 void
close_handler(nxt_unit_request_info_t * req)399 Unit::close_handler(nxt_unit_request_info_t *req)
400 {
401     napi_value  conn_handle_close, conn;
402     req_data_t  *req_data;
403 
404     req_data = (req_data_t *) req->data;
405 
406     try {
407         nxt_handle_scope  scope(env());
408 
409         conn = get_reference_value(req_data->conn_ref);
410 
411         conn_handle_close = get_named_property(conn, "handleSocketClose");
412 
413         nxt_async_context   async_context(env(), "close_handler");
414         nxt_callback_scope  async_scope(async_context);
415 
416         make_callback(async_context, conn, conn_handle_close,
417                       nxt_napi::create(0));
418 
419         remove_wrap(req_data->sock_ref);
420         remove_wrap(req_data->req_ref);
421         remove_wrap(req_data->resp_ref);
422         remove_wrap(req_data->conn_ref);
423 
424     } catch (exception &e) {
425         nxt_unit_req_warn(req, "close_handler: %s", e.str);
426 
427         nxt_unit_request_done(req, NXT_UNIT_ERROR);
428 
429         return;
430     }
431 
432     nxt_unit_request_done(req, NXT_UNIT_OK);
433 }
434 
435 
436 void
shm_ack_handler_cb(nxt_unit_ctx_t * ctx)437 Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
438 {
439     Unit  *obj;
440 
441     obj = reinterpret_cast<Unit *>(ctx->unit->data);
442 
443     obj->shm_ack_handler(ctx);
444 }
445 
446 
447 void
shm_ack_handler(nxt_unit_ctx_t * ctx)448 Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
449 {
450     napi_value  server_obj, emit_drain;
451 
452     try {
453         nxt_handle_scope  scope(env());
454 
455         server_obj = get_server_object();
456 
457         emit_drain = get_named_property(server_obj, "emit_drain");
458 
459         nxt_async_context   async_context(env(), "shm_ack_handler");
460         nxt_callback_scope  async_scope(async_context);
461 
462         make_callback(async_context, server_obj, emit_drain);
463 
464     } catch (exception &e) {
465         nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466     }
467 }
468 
469 
470 int
add_port(nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)471 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472 {
473     int          err;
474     Unit         *obj;
475     uv_loop_t    *loop;
476     port_data_t  *data;
477     napi_status  status;
478 
479     if (port->in_fd != -1) {
480         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481             nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482                           port->in_fd, strerror(errno), errno);
483             return -1;
484         }
485 
486         obj = reinterpret_cast<Unit *>(ctx->unit->data);
487 
488         status = napi_get_uv_event_loop(obj->env(), &loop);
489         if (status != napi_ok) {
490             nxt_unit_warn(ctx, "Failed to get uv.loop");
491             return NXT_UNIT_ERROR;
492         }
493 
494         data = new port_data_t(ctx, port);
495 
496         err = uv_poll_init(loop, &data->poll, port->in_fd);
497         if (err < 0) {
498             nxt_unit_warn(ctx, "Failed to init uv.poll");
499             delete data;
500             return NXT_UNIT_ERROR;
501         }
502 
503         err = uv_poll_start(&data->poll, UV_READABLE,
504                             port_data_t::read_callback);
505         if (err < 0) {
506             nxt_unit_warn(ctx, "Failed to start uv.poll");
507             delete data;
508             return NXT_UNIT_ERROR;
509         }
510 
511         port->data = data;
512 
513         data->ref_count++;
514         data->poll.data = data;
515     }
516 
517     return NXT_UNIT_OK;
518 }
519 
520 
521 void
remove_port(nxt_unit_t * unit,nxt_unit_ctx_t * ctx,nxt_unit_port_t * port)522 Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
523 {
524     port_data_t  *data;
525 
526     if (port->data != NULL && ctx != NULL) {
527         data = (port_data_t *) port->data;
528 
529         data->stop();
530     }
531 }
532 
533 
534 void
quit_cb(nxt_unit_ctx_t * ctx)535 Unit::quit_cb(nxt_unit_ctx_t *ctx)
536 {
537     Unit  *obj;
538 
539     obj = reinterpret_cast<Unit *>(ctx->unit->data);
540 
541     obj->quit(ctx);
542 }
543 
544 
545 void
quit(nxt_unit_ctx_t * ctx)546 Unit::quit(nxt_unit_ctx_t *ctx)
547 {
548     napi_value  server_obj, emit_close;
549 
550     try {
551         nxt_handle_scope  scope(env());
552 
553         server_obj = get_server_object();
554 
555         emit_close = get_named_property(server_obj, "emit_close");
556 
557         nxt_async_context   async_context(env(), "unit_quit");
558         nxt_callback_scope  async_scope(async_context);
559 
560         make_callback(async_context, server_obj, emit_close);
561 
562     } catch (exception &e) {
563         nxt_unit_debug(ctx, "quit: %s", e.str);
564     }
565 
566     nxt_unit_done(ctx);
567 }
568 
569 
570 napi_value
get_server_object()571 Unit::get_server_object()
572 {
573     napi_value  unit_obj;
574 
575     unit_obj = get_reference_value(wrapper_);
576 
577     return get_named_property(unit_obj, "server");
578 }
579 
580 
581 void
create_headers(nxt_unit_request_info_t * req,napi_value request)582 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
583 {
584     char                *p;
585     uint32_t            i;
586     napi_value          headers, raw_headers;
587     napi_status         status;
588     nxt_unit_request_t  *r;
589 
590     r = req->request;
591 
592     headers = create_object();
593 
594     status = napi_create_array_with_length(env(), r->fields_count * 2,
595                                            &raw_headers);
596     if (status != napi_ok) {
597         throw exception("Failed to create array");
598     }
599 
600     for (i = 0; i < r->fields_count; i++) {
601         append_header(r->fields + i, headers, raw_headers, i);
602     }
603 
604     set_named_property(request, "headers", headers);
605     set_named_property(request, "rawHeaders", raw_headers);
606 
607     // trim the "HTTP/" protocol prefix
608     p = (char *) nxt_unit_sptr_get(&r->version);
609     p += 5;
610 
611     set_named_property(request, "httpVersion", create_string_latin1(p, 3));
612     set_named_property(request, "method", r->method, r->method_length);
613     set_named_property(request, "url", r->target, r->target_length);
614 
615     set_named_property(request, "_websocket_handshake", r->websocket_handshake);
616 }
617 
618 
619 inline char
lowcase(char c)620 lowcase(char c)
621 {
622     return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
623 }
624 
625 
626 inline void
append_header(nxt_unit_field_t * f,napi_value headers,napi_value raw_headers,uint32_t idx)627 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
628     napi_value raw_headers, uint32_t idx)
629 {
630     char        *name;
631     uint8_t     i;
632     napi_value  str, vstr;
633 
634     name = (char *) nxt_unit_sptr_get(&f->name);
635 
636     str = create_string_latin1(name, f->name_length);
637 
638     for (i = 0; i < f->name_length; i++) {
639         name[i] = lowcase(name[i]);
640     }
641 
642     vstr = set_named_property(headers, name, f->value, f->value_length);
643 
644     set_element(raw_headers, idx * 2, str);
645     set_element(raw_headers, idx * 2 + 1, vstr);
646 }
647 
648 
649 napi_value
create_socket(napi_value server_obj,nxt_unit_request_info_t * req)650 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
651 {
652     napi_value          constructor, res;
653     req_data_t          *req_data;
654     nxt_unit_request_t  *r;
655 
656     r = req->request;
657 
658     constructor = get_named_property(server_obj, "Socket");
659 
660     res = new_instance(constructor);
661 
662     req_data = (req_data_t *) req->data;
663     req_data->sock_ref = wrap(res, req, sock_destroy);
664 
665     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
666     set_named_property(res, "localAddress", r->local_addr,
667                        r->local_addr_length);
668 
669     return res;
670 }
671 
672 
673 napi_value
create_request(napi_value server_obj,napi_value socket,nxt_unit_request_info_t * req)674 Unit::create_request(napi_value server_obj, napi_value socket,
675     nxt_unit_request_info_t *req)
676 {
677     napi_value  constructor, res;
678     req_data_t  *req_data;
679 
680     constructor = get_named_property(server_obj, "ServerRequest");
681 
682     res = new_instance(constructor, server_obj, socket);
683 
684     req_data = (req_data_t *) req->data;
685     req_data->req_ref = wrap(res, req, req_destroy);
686 
687     return res;
688 }
689 
690 
691 napi_value
create_response(napi_value server_obj,napi_value request,nxt_unit_request_info_t * req)692 Unit::create_response(napi_value server_obj, napi_value request,
693     nxt_unit_request_info_t *req)
694 {
695     napi_value  constructor, res;
696     req_data_t  *req_data;
697 
698     constructor = get_named_property(server_obj, "ServerResponse");
699 
700     res = new_instance(constructor, request);
701 
702     req_data = (req_data_t *) req->data;
703     req_data->resp_ref = wrap(res, req, resp_destroy);
704 
705     return res;
706 }
707 
708 
709 napi_value
create_websocket_frame(napi_value server_obj,nxt_unit_websocket_frame_t * ws)710 Unit::create_websocket_frame(napi_value server_obj,
711                              nxt_unit_websocket_frame_t *ws)
712 {
713     void        *data;
714     napi_value  constructor, res, buffer;
715     uint8_t     sc[2];
716 
717     constructor = get_named_property(server_obj, "WebSocketFrame");
718 
719     res = new_instance(constructor);
720 
721     set_named_property(res, "fin", (bool) ws->header->fin);
722     set_named_property(res, "opcode", ws->header->opcode);
723     set_named_property(res, "length", (int64_t) ws->payload_len);
724 
725     if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
726         if (ws->payload_len >= 2) {
727             nxt_unit_websocket_read(ws, sc, 2);
728 
729             set_named_property(res, "closeStatus",
730                                (((uint16_t) sc[0]) << 8) | sc[1]);
731 
732         } else {
733             set_named_property(res, "closeStatus", -1);
734         }
735     }
736 
737     buffer = create_buffer((size_t) ws->content_length, &data);
738     nxt_unit_websocket_read(ws, data, ws->content_length);
739 
740     set_named_property(res, "binaryPayload", buffer);
741 
742     return res;
743 }
744 
745 
746 napi_value
request_read(napi_env env,napi_callback_info info)747 Unit::request_read(napi_env env, napi_callback_info info)
748 {
749     void                     *data;
750     uint32_t                 wm;
751     nxt_napi                 napi(env);
752     napi_value               this_arg, argv, buffer;
753     nxt_unit_request_info_t  *req;
754 
755     try {
756         this_arg = napi.get_cb_info(info, argv);
757 
758         try {
759             req = napi.get_request_info(this_arg);
760 
761         } catch (exception &e) {
762             return nullptr;
763         }
764 
765         if (req->content_length == 0) {
766             return nullptr;
767         }
768 
769         wm = napi.get_value_uint32(argv);
770 
771         if (wm > req->content_length) {
772             wm = req->content_length;
773         }
774 
775         buffer = napi.create_buffer((size_t) wm, &data);
776         nxt_unit_request_read(req, data, wm);
777 
778     } catch (exception &e) {
779         napi.throw_error(e);
780         return nullptr;
781     }
782 
783     return buffer;
784 }
785 
786 
787 napi_value
response_send_headers(napi_env env,napi_callback_info info)788 Unit::response_send_headers(napi_env env, napi_callback_info info)
789 {
790     int                      ret;
791     char                     *ptr, *name_ptr;
792     bool                     is_array;
793     size_t                   argc, name_len, value_len;
794     uint32_t                 status_code, header_len, keys_len, array_len;
795     uint32_t                 keys_count, i, j;
796     uint16_t                 hash;
797     nxt_napi                 napi(env);
798     napi_value               this_arg, headers, keys, name, value, array_val;
799     napi_value               array_entry;
800     napi_valuetype           val_type;
801     nxt_unit_field_t         *f;
802     nxt_unit_request_info_t  *req;
803     napi_value               argv[4];
804 
805     argc = 4;
806 
807     try {
808         this_arg = napi.get_cb_info(info, argc, argv);
809         if (argc != 4) {
810             napi.throw_error("Wrong args count. Expected: "
811                              "statusCode, headers, headers count, "
812                              "headers length");
813             return nullptr;
814         }
815 
816         req = napi.get_request_info(this_arg);
817         status_code = napi.get_value_uint32(argv[0]);
818         keys_count = napi.get_value_uint32(argv[2]);
819         header_len = napi.get_value_uint32(argv[3]);
820 
821         headers = argv[1];
822 
823         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
824         if (ret != NXT_UNIT_OK) {
825             napi.throw_error("Failed to create response");
826             return nullptr;
827         }
828 
829         /*
830          * Each name and value are 0-terminated by libunit.
831          * Need to add extra 2 bytes for each header.
832          */
833         header_len += keys_count * 2;
834 
835         keys = napi.get_property_names(headers);
836         keys_len = napi.get_array_length(keys);
837 
838         ptr = req->response_buf->free;
839 
840         for (i = 0; i < keys_len; i++) {
841             name = napi.get_element(keys, i);
842 
843             array_entry = napi.get_property(headers, name);
844 
845             name = napi.get_element(array_entry, 0);
846             value = napi.get_element(array_entry, 1);
847 
848             name_len = napi.get_value_string_latin1(name, ptr, header_len);
849             name_ptr = ptr;
850 
851             ptr += name_len + 1;
852             header_len -= name_len + 1;
853 
854             hash = nxt_unit_field_hash(name_ptr, name_len);
855 
856             is_array = napi.is_array(value);
857 
858             if (is_array) {
859                 array_len = napi.get_array_length(value);
860 
861                 for (j = 0; j < array_len; j++) {
862                     array_val = napi.get_element(value, j);
863 
864                     val_type = napi.type_of(array_val);
865 
866                     if (val_type != napi_string) {
867                         array_val = napi.coerce_to_string(array_val);
868                     }
869 
870                     value_len = napi.get_value_string_latin1(array_val, ptr,
871                                                              header_len);
872 
873                     f = req->response->fields + req->response->fields_count;
874                     f->skip = 0;
875 
876                     nxt_unit_sptr_set(&f->name, name_ptr);
877 
878                     f->name_length = name_len;
879                     f->hash = hash;
880 
881                     nxt_unit_sptr_set(&f->value, ptr);
882                     f->value_length = (uint32_t) value_len;
883 
884                     ptr += value_len + 1;
885                     header_len -= value_len + 1;
886 
887                     req->response->fields_count++;
888                 }
889 
890             } else {
891                 val_type = napi.type_of(value);
892 
893                 if (val_type != napi_string) {
894                     value = napi.coerce_to_string(value);
895                 }
896 
897                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
898 
899                 f = req->response->fields + req->response->fields_count;
900                 f->skip = 0;
901 
902                 nxt_unit_sptr_set(&f->name, name_ptr);
903 
904                 f->name_length = name_len;
905                 f->hash = hash;
906 
907                 nxt_unit_sptr_set(&f->value, ptr);
908                 f->value_length = (uint32_t) value_len;
909 
910                 ptr += value_len + 1;
911                 header_len -= value_len + 1;
912 
913                 req->response->fields_count++;
914             }
915         }
916 
917     } catch (exception &e) {
918         napi.throw_error(e);
919         return nullptr;
920     }
921 
922     req->response_buf->free = ptr;
923 
924     ret = nxt_unit_response_send(req);
925     if (ret != NXT_UNIT_OK) {
926         napi.throw_error("Failed to send response");
927         return nullptr;
928     }
929 
930     return this_arg;
931 }
932 
933 
934 napi_value
response_write(napi_env env,napi_callback_info info)935 Unit::response_write(napi_env env, napi_callback_info info)
936 {
937     int                      ret;
938     void                     *ptr;
939     size_t                   argc, have_buf_len;
940     ssize_t                  res_len;
941     uint32_t                 buf_start, buf_len;
942     nxt_napi                 napi(env);
943     napi_value               this_arg;
944     nxt_unit_buf_t           *buf;
945     napi_valuetype           buf_type;
946     nxt_unit_request_info_t  *req;
947     napi_value               argv[3];
948 
949     argc = 3;
950 
951     try {
952         this_arg = napi.get_cb_info(info, argc, argv);
953         if (argc != 3) {
954             throw exception("Wrong args count. Expected: "
955                             "chunk, start, length");
956         }
957 
958         req = napi.get_request_info(this_arg);
959         buf_type = napi.type_of(argv[0]);
960         buf_start = napi.get_value_uint32(argv[1]);
961         buf_len = napi.get_value_uint32(argv[2]) + 1;
962 
963         if (buf_type == napi_string) {
964             /* TODO: will work only for utf8 content-type */
965 
966             if (req->response_buf != NULL
967                 && req->response_buf->end >= req->response_buf->free + buf_len)
968             {
969                 buf = req->response_buf;
970 
971             } else {
972                 buf = nxt_unit_response_buf_alloc(req, buf_len);
973                 if (buf == NULL) {
974                     throw exception("Failed to allocate response buffer");
975                 }
976             }
977 
978             have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
979                                                       buf_len);
980 
981             buf->free += have_buf_len;
982 
983             ret = nxt_unit_buf_send(buf);
984             if (ret == NXT_UNIT_OK) {
985                 res_len = have_buf_len;
986             }
987 
988         } else {
989             ptr = napi.get_buffer_info(argv[0], have_buf_len);
990 
991             if (buf_start > 0) {
992                 ptr = ((uint8_t *) ptr) + buf_start;
993                 have_buf_len -= buf_start;
994             }
995 
996             res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
997 
998             ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
999         }
1000 
1001         if (ret != NXT_UNIT_OK) {
1002             throw exception("Failed to send body buf");
1003         }
1004     } catch (exception &e) {
1005         napi.throw_error(e);
1006         return nullptr;
1007     }
1008 
1009     return napi.create((int64_t) res_len);
1010 }
1011 
1012 
1013 napi_value
response_end(napi_env env,napi_callback_info info)1014 Unit::response_end(napi_env env, napi_callback_info info)
1015 {
1016     nxt_napi                 napi(env);
1017     napi_value               this_arg;
1018     req_data_t               *req_data;
1019     nxt_unit_request_info_t  *req;
1020 
1021     try {
1022         this_arg = napi.get_cb_info(info);
1023 
1024         req = napi.get_request_info(this_arg);
1025 
1026         req_data = (req_data_t *) req->data;
1027 
1028         napi.remove_wrap(req_data->sock_ref);
1029         napi.remove_wrap(req_data->req_ref);
1030         napi.remove_wrap(req_data->resp_ref);
1031         napi.remove_wrap(req_data->conn_ref);
1032 
1033     } catch (exception &e) {
1034         napi.throw_error(e);
1035         return nullptr;
1036     }
1037 
1038     nxt_unit_request_done(req, NXT_UNIT_OK);
1039 
1040     return this_arg;
1041 }
1042 
1043 
1044 napi_value
websocket_send_frame(napi_env env,napi_callback_info info)1045 Unit::websocket_send_frame(napi_env env, napi_callback_info info)
1046 {
1047     int                      ret, iovec_len;
1048     bool                     fin;
1049     size_t                   buf_len;
1050     uint32_t                 opcode, sc;
1051     nxt_napi                 napi(env);
1052     napi_value               this_arg, frame, payload;
1053     nxt_unit_request_info_t  *req;
1054     char                     status_code[2];
1055     struct iovec             iov[2];
1056 
1057     iovec_len = 0;
1058 
1059     try {
1060         this_arg = napi.get_cb_info(info, frame);
1061 
1062         req = napi.get_request_info(this_arg);
1063 
1064         opcode = napi.get_value_uint32(napi.get_named_property(frame,
1065                                                                "opcode"));
1066         if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
1067             sc = napi.get_value_uint32(napi.get_named_property(frame,
1068                                                                "closeStatus"));
1069             status_code[0] = (sc >> 8) & 0xFF;
1070             status_code[1] = sc & 0xFF;
1071 
1072             iov[iovec_len].iov_base = status_code;
1073             iov[iovec_len].iov_len = 2;
1074             iovec_len++;
1075         }
1076 
1077         try {
1078             fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
1079 
1080         } catch (exception &e) {
1081             fin = true;
1082         }
1083 
1084         payload = napi.get_named_property(frame, "binaryPayload");
1085 
1086         if (napi.is_buffer(payload)) {
1087             iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
1088 
1089         } else {
1090             buf_len = 0;
1091         }
1092 
1093     } catch (exception &e) {
1094         napi.throw_error(e);
1095         return nullptr;
1096     }
1097 
1098     if (buf_len > 0) {
1099         iov[iovec_len].iov_len = buf_len;
1100         iovec_len++;
1101     }
1102 
1103     ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1104     if (ret != NXT_UNIT_OK) {
1105         goto failed;
1106     }
1107 
1108     return this_arg;
1109 
1110 failed:
1111 
1112     napi.throw_error("Failed to send frame");
1113 
1114     return nullptr;
1115 }
1116 
1117 
1118 napi_value
websocket_set_sock(napi_env env,napi_callback_info info)1119 Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1120 {
1121     nxt_napi                 napi(env);
1122     napi_value               this_arg, sock;
1123     req_data_t               *req_data;
1124     nxt_unit_request_info_t  *req;
1125 
1126     try {
1127         this_arg = napi.get_cb_info(info, sock);
1128 
1129         req = napi.get_request_info(sock);
1130 
1131         req_data = (req_data_t *) req->data;
1132         req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1133 
1134     } catch (exception &e) {
1135         napi.throw_error(e);
1136         return nullptr;
1137     }
1138 
1139     return this_arg;
1140 }
1141 
1142 
1143 void
conn_destroy(napi_env env,void * r,void * finalize_hint)1144 Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1145 {
1146     nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1147 }
1148 
1149 
1150 void
sock_destroy(napi_env env,void * r,void * finalize_hint)1151 Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1152 {
1153     nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1154 }
1155 
1156 
1157 void
req_destroy(napi_env env,void * r,void * finalize_hint)1158 Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1159 {
1160     nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1161 }
1162 
1163 
1164 void
resp_destroy(napi_env env,void * r,void * finalize_hint)1165 Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1166 {
1167     nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1168 }
1169