xref: /unit/src/nodejs/unit-http/unit.cpp (revision 1022:15b98689a353)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #include "unit.h"
7 
8 #include <unistd.h>
9 #include <fcntl.h>
10 
11 #include <uv.h>
12 
13 
14 napi_ref Unit::constructor_;
15 
16 
/*
 * Per-context record allocated by Unit::add_port() and stored in
 * nxt_unit_ctx_t::data: the id of a port together with the libuv poll
 * handle that watches the port's input descriptor.
 */
struct nxt_nodejs_ctx_t {
    /* Id of the port being polled; compared in Unit::remove_port(). */
    nxt_unit_port_id_t  port_id;
    /* Poll handle; add_port() stores the nxt_unit_ctx_t in poll.data. */
    uv_poll_t           poll;
};
21 
22 
23 Unit::Unit(napi_env env, napi_value jsthis):
24     nxt_napi(env),
25     wrapper_(wrap(jsthis, this, destroy)),
26     unit_ctx_(nullptr)
27 {
28 }
29 
30 
31 Unit::~Unit()
32 {
33     delete_reference(wrapper_);
34 }
35 
36 
37 napi_value
38 Unit::init(napi_env env, napi_value exports)
39 {
40     nxt_napi    napi(env);
41     napi_value  cons;
42 
43     napi_property_descriptor  properties[] = {
44         { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
45         { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
46         { "_read", 0, _read, 0, 0, 0, napi_default, 0 }
47     };
48 
49     try {
50         cons = napi.define_class("Unit", create, 3, properties);
51         constructor_ = napi.create_reference(cons);
52 
53         napi.set_named_property(exports, "Unit", cons);
54         napi.set_named_property(exports, "unit_response_headers",
55                                 response_send_headers);
56         napi.set_named_property(exports, "unit_response_write", response_write);
57         napi.set_named_property(exports, "unit_response_end", response_end);
58 
59     } catch (exception &e) {
60         napi.throw_error(e);
61         return nullptr;
62     }
63 
64     return exports;
65 }
66 
67 
68 void
69 Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
70 {
71     Unit  *obj = reinterpret_cast<Unit *>(nativeObject);
72 
73     delete obj;
74 }
75 
76 
77 napi_value
78 Unit::create(napi_env env, napi_callback_info info)
79 {
80     Unit        *obj;
81     nxt_napi    napi(env);
82     napi_ref    ref;
83     napi_value  target, cons, instance, jsthis;
84 
85     try {
86         target = napi.get_new_target(info);
87 
88         if (target != nullptr) {
89             /* Invoked as constructor: `new Unit(...)`. */
90             jsthis = napi.get_cb_info(info);
91 
92             obj = new Unit(env, jsthis);
93 
94             ref = napi.create_reference(jsthis);
95 
96             return jsthis;
97         }
98 
99         /* Invoked as plain function `Unit(...)`, turn into construct call. */
100         cons = napi.get_reference_value(constructor_);
101         instance = napi.new_instance(cons);
102         ref = napi.create_reference(instance);
103 
104     } catch (exception &e) {
105         napi.throw_error(e);
106         return nullptr;
107     }
108 
109     return instance;
110 }
111 
112 
113 napi_value
114 Unit::create_server(napi_env env, napi_callback_info info)
115 {
116     Unit             *obj;
117     size_t           argc;
118     nxt_napi         napi(env);
119     napi_value       jsthis, argv;
120     nxt_unit_init_t  unit_init;
121 
122     argc = 1;
123 
124     try {
125         jsthis = napi.get_cb_info(info, argc, &argv);
126         obj = (Unit *) napi.unwrap(jsthis);
127 
128     } catch (exception &e) {
129         napi.throw_error(e);
130         return nullptr;
131     }
132 
133     memset(&unit_init, 0, sizeof(nxt_unit_init_t));
134 
135     unit_init.data = obj;
136     unit_init.callbacks.request_handler = request_handler;
137     unit_init.callbacks.add_port        = add_port;
138     unit_init.callbacks.remove_port     = remove_port;
139     unit_init.callbacks.quit            = quit;
140 
141     obj->unit_ctx_ = nxt_unit_init(&unit_init);
142     if (obj->unit_ctx_ == NULL) {
143         goto failed;
144     }
145 
146     return nullptr;
147 
148 failed:
149 
150     napi_throw_error(env, NULL, "Failed to create Unit object");
151 
152     return nullptr;
153 }
154 
155 
156 napi_value
157 Unit::listen(napi_env env, napi_callback_info info)
158 {
159     return nullptr;
160 }
161 
162 
163 napi_value
164 Unit::_read(napi_env env, napi_callback_info info)
165 {
166     void                     *data;
167     size_t                   argc;
168     nxt_napi                 napi(env);
169     napi_value               jsthis, buffer, argv;
170     nxt_unit_request_info_t  *req;
171 
172     argc = 1;
173 
174     try {
175         jsthis = napi.get_cb_info(info, argc, &argv);
176 
177         req = napi.get_request_info(argv);
178         buffer = napi.create_buffer((size_t) req->content_length, &data);
179 
180     } catch (exception &e) {
181         napi.throw_error(e);
182         return nullptr;
183     }
184 
185     nxt_unit_request_read(req, data, req->content_length);
186 
187     return buffer;
188 }
189 
190 
191 void
192 Unit::request_handler(nxt_unit_request_info_t *req)
193 {
194     Unit         *obj;
195     napi_value   socket, request, response, server_obj;
196     napi_value   emit_events;
197     napi_value   events_args[3];
198 
199     obj = reinterpret_cast<Unit *>(req->unit->data);
200 
201     try {
202         nxt_handle_scope  scope(obj->env());
203 
204         server_obj = obj->get_server_object();
205 
206         socket = obj->create_socket(server_obj, req);
207         request = obj->create_request(server_obj, socket);
208         response = obj->create_response(server_obj, socket, request, req);
209 
210         obj->create_headers(req, request);
211 
212         emit_events = obj->get_named_property(server_obj, "emit_events");
213 
214         events_args[0] = server_obj;
215         events_args[1] = request;
216         events_args[2] = response;
217 
218         nxt_async_context   async_context(obj->env(), "unit_request_handler");
219         nxt_callback_scope  async_scope(async_context);
220 
221         obj->make_callback(async_context, server_obj, emit_events,
222                            3, events_args);
223 
224     } catch (exception &e) {
225         obj->throw_error(e);
226     }
227 }
228 
229 
230 void
231 nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
232 {
233     nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
234 }
235 
236 
237 int
238 Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
239 {
240     int               err;
241     Unit              *obj;
242     uv_loop_t         *loop;
243     napi_status       status;
244     nxt_nodejs_ctx_t  *node_ctx;
245 
246     if (port->in_fd != -1) {
247         obj = reinterpret_cast<Unit *>(ctx->unit->data);
248 
249         if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
250             obj->throw_error("Failed to upgrade read"
251                              " file descriptor to O_NONBLOCK");
252             return -1;
253         }
254 
255         status = napi_get_uv_event_loop(obj->env(), &loop);
256         if (status != napi_ok) {
257             obj->throw_error("Failed to get uv.loop");
258             return NXT_UNIT_ERROR;
259         }
260 
261         node_ctx = new nxt_nodejs_ctx_t;
262 
263         err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
264         if (err < 0) {
265             obj->throw_error("Failed to init uv.poll");
266             return NXT_UNIT_ERROR;
267         }
268 
269         err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
270         if (err < 0) {
271             obj->throw_error("Failed to start uv.poll");
272             return NXT_UNIT_ERROR;
273         }
274 
275         ctx->data = node_ctx;
276 
277         node_ctx->port_id = port->id;
278         node_ctx->poll.data = ctx;
279     }
280 
281     return nxt_unit_add_port(ctx, port);
282 }
283 
284 
285 inline bool
286 operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
287 {
288     return p1.pid == p2.pid && p1.id == p2.id;
289 }
290 
291 
292 void
293 Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
294 {
295     nxt_nodejs_ctx_t  *node_ctx;
296 
297     if (ctx->data != NULL) {
298         node_ctx = (nxt_nodejs_ctx_t *) ctx->data;
299 
300         if (node_ctx->port_id == *port_id) {
301             uv_poll_stop(&node_ctx->poll);
302 
303             delete node_ctx;
304 
305             ctx->data = NULL;
306         }
307     }
308 
309     nxt_unit_remove_port(ctx, port_id);
310 }
311 
312 
313 void
314 Unit::quit(nxt_unit_ctx_t *ctx)
315 {
316     Unit        *obj;
317     napi_value  server_obj, emit_close;
318 
319     obj = reinterpret_cast<Unit *>(ctx->unit->data);
320 
321     try {
322         nxt_handle_scope  scope(obj->env());
323 
324         server_obj = obj->get_server_object();
325 
326         emit_close = obj->get_named_property(server_obj, "emit_close");
327 
328         nxt_async_context   async_context(obj->env(), "unit_quit");
329         nxt_callback_scope  async_scope(async_context);
330 
331         obj->make_callback(async_context, server_obj, emit_close, 0, NULL);
332 
333     } catch (exception &e) {
334         obj->throw_error(e);
335     }
336 
337     nxt_unit_done(ctx);
338 }
339 
340 
341 napi_value
342 Unit::get_server_object()
343 {
344     napi_value  unit_obj;
345 
346     unit_obj = get_reference_value(wrapper_);
347 
348     return get_named_property(unit_obj, "server");
349 }
350 
351 
352 void
353 Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
354 {
355     uint32_t            i;
356     napi_value          headers, raw_headers;
357     napi_status         status;
358     nxt_unit_request_t  *r;
359 
360     r = req->request;
361 
362     headers = create_object();
363 
364     status = napi_create_array_with_length(env(), r->fields_count * 2,
365                                            &raw_headers);
366     if (status != napi_ok) {
367         throw exception("Failed to create array");
368     }
369 
370     for (i = 0; i < r->fields_count; i++) {
371         append_header(r->fields + i, headers, raw_headers, i);
372     }
373 
374     set_named_property(request, "headers", headers);
375     set_named_property(request, "rawHeaders", raw_headers);
376     set_named_property(request, "httpVersion", r->version, r->version_length);
377     set_named_property(request, "method", r->method, r->method_length);
378     set_named_property(request, "url", r->target, r->target_length);
379 }
380 
381 
382 inline void
383 Unit::append_header(nxt_unit_field_t *f, napi_value headers,
384     napi_value raw_headers, uint32_t idx)
385 {
386     const char   *name;
387     napi_value   str, vstr;
388 
389     name = (const char *) nxt_unit_sptr_get(&f->name);
390 
391     vstr = set_named_property(headers, name, f->value, f->value_length);
392     str = create_string_latin1(name, f->name_length);
393 
394     set_element(raw_headers, idx * 2, str);
395     set_element(raw_headers, idx * 2 + 1, vstr);
396 }
397 
398 
399 napi_value
400 Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
401 {
402     napi_value          constructor, res;
403     nxt_unit_request_t  *r;
404 
405     r = req->request;
406 
407     constructor = get_named_property(server_obj, "socket");
408 
409     res = new_instance(constructor);
410 
411     set_named_property(res, "req_pointer", (intptr_t) req);
412     set_named_property(res, "remoteAddress", r->remote, r->remote_length);
413     set_named_property(res, "localAddress", r->local, r->local_length);
414 
415     return res;
416 }
417 
418 
419 napi_value
420 Unit::create_request(napi_value server_obj, napi_value socket)
421 {
422     napi_value  constructor, return_val;
423 
424     constructor = get_named_property(server_obj, "request");
425 
426     return_val = new_instance(constructor, server_obj);
427 
428     set_named_property(return_val, "socket", socket);
429     set_named_property(return_val, "connection", socket);
430 
431     return return_val;
432 }
433 
434 
435 napi_value
436 Unit::create_response(napi_value server_obj, napi_value socket,
437     napi_value request, nxt_unit_request_info_t *req)
438 {
439     napi_value  constructor, return_val;
440 
441     constructor = get_named_property(server_obj, "response");
442 
443     return_val = new_instance(constructor, request);
444 
445     set_named_property(return_val, "socket", socket);
446     set_named_property(return_val, "connection", socket);
447     set_named_property(return_val, "_req_point", (intptr_t) req);
448 
449     return return_val;
450 }
451 
452 
453 napi_value
454 Unit::response_send_headers(napi_env env, napi_callback_info info)
455 {
456     int                      ret;
457     char                     *ptr, *name_ptr;
458     bool                     is_array;
459     size_t                   argc, name_len, value_len;
460     uint32_t                 status_code, header_len, keys_len, array_len;
461     uint32_t                 keys_count, i, j;
462     uint16_t                 hash;
463     nxt_napi                 napi(env);
464     napi_value               this_arg, headers, keys, name, value, array_val;
465     napi_value               req_num, array_entry;
466     napi_valuetype           val_type;
467     nxt_unit_field_t         *f;
468     nxt_unit_request_info_t  *req;
469     napi_value               argv[5];
470 
471     argc = 5;
472 
473     try {
474         this_arg = napi.get_cb_info(info, argc, argv);
475         if (argc != 5) {
476             napi.throw_error("Wrong args count. Expected: "
477                              "statusCode, headers, headers count, "
478                              "headers length");
479             return nullptr;
480         }
481 
482         req_num = napi.get_named_property(argv[0], "_req_point");
483 
484         req = napi.get_request_info(req_num);
485 
486         status_code = napi.get_value_uint32(argv[1]);
487         keys_count = napi.get_value_uint32(argv[3]);
488         header_len = napi.get_value_uint32(argv[4]);
489 
490         /* Need to reserve extra byte for C-string 0-termination. */
491         header_len++;
492 
493         headers = argv[2];
494 
495         ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
496         if (ret != NXT_UNIT_OK) {
497             napi.throw_error("Failed to create response");
498             return nullptr;
499         }
500 
501         keys = napi.get_property_names(headers);
502         keys_len = napi.get_array_length(keys);
503 
504         ptr = req->response_buf->free;
505 
506         for (i = 0; i < keys_len; i++) {
507             name = napi.get_element(keys, i);
508 
509             array_entry = napi.get_property(headers, name);
510 
511             name = napi.get_element(array_entry, 0);
512             value = napi.get_element(array_entry, 1);
513 
514             name_len = napi.get_value_string_latin1(name, ptr, header_len);
515             name_ptr = ptr;
516 
517             ptr += name_len;
518             header_len -= name_len;
519 
520             hash = nxt_unit_field_hash(name_ptr, name_len);
521 
522             is_array = napi.is_array(value);
523 
524             if (is_array) {
525                 array_len = napi.get_array_length(value);
526 
527                 for (j = 0; j < array_len; j++) {
528                     array_val = napi.get_element(value, j);
529 
530                     val_type = napi.type_of(array_val);
531 
532                     if (val_type != napi_string) {
533                         array_val = napi.coerce_to_string(array_val);
534                     }
535 
536                     value_len = napi.get_value_string_latin1(array_val, ptr,
537                                                              header_len);
538 
539                     f = req->response->fields + req->response->fields_count;
540                     f->skip = 0;
541 
542                     nxt_unit_sptr_set(&f->name, name_ptr);
543 
544                     f->name_length = name_len;
545                     f->hash = hash;
546 
547                     nxt_unit_sptr_set(&f->value, ptr);
548                     f->value_length = (uint32_t) value_len;
549 
550                     ptr += value_len;
551                     header_len -= value_len;
552 
553                     req->response->fields_count++;
554                 }
555 
556             } else {
557                 val_type = napi.type_of(value);
558 
559                 if (val_type != napi_string) {
560                     value = napi.coerce_to_string(value);
561                 }
562 
563                 value_len = napi.get_value_string_latin1(value, ptr, header_len);
564 
565                 f = req->response->fields + req->response->fields_count;
566                 f->skip = 0;
567 
568                 nxt_unit_sptr_set(&f->name, name_ptr);
569 
570                 f->name_length = name_len;
571                 f->hash = hash;
572 
573                 nxt_unit_sptr_set(&f->value, ptr);
574                 f->value_length = (uint32_t) value_len;
575 
576                 ptr += value_len;
577                 header_len -= value_len;
578 
579                 req->response->fields_count++;
580             }
581         }
582 
583     } catch (exception &e) {
584         napi.throw_error(e);
585         return nullptr;
586     }
587 
588     req->response_buf->free = ptr;
589 
590     ret = nxt_unit_response_send(req);
591     if (ret != NXT_UNIT_OK) {
592         napi.throw_error("Failed to send response");
593         return nullptr;
594     }
595 
596     return this_arg;
597 }
598 
599 
600 napi_value
601 Unit::response_write(napi_env env, napi_callback_info info)
602 {
603     int                      ret;
604     char                     *ptr;
605     size_t                   argc, have_buf_len;
606     uint32_t                 buf_len;
607     nxt_napi                 napi(env);
608     napi_value               this_arg, req_num;
609     napi_status              status;
610     nxt_unit_buf_t           *buf;
611     napi_valuetype           buf_type;
612     nxt_unit_request_info_t  *req;
613     napi_value               argv[3];
614 
615     argc = 3;
616 
617     try {
618         this_arg = napi.get_cb_info(info, argc, argv);
619         if (argc != 3) {
620             throw exception("Wrong args count. Expected: "
621                             "chunk, chunk length");
622         }
623 
624         req_num = napi.get_named_property(argv[0], "_req_point");
625         req = napi.get_request_info(req_num);
626 
627         buf_len = napi.get_value_uint32(argv[2]);
628 
629         buf_type = napi.type_of(argv[1]);
630 
631     } catch (exception &e) {
632         napi.throw_error(e);
633         return nullptr;
634     }
635 
636     buf_len++;
637 
638     buf = nxt_unit_response_buf_alloc(req, buf_len);
639     if (buf == NULL) {
640         goto failed;
641     }
642 
643     if (buf_type == napi_string) {
644         /* TODO: will work only for utf8 content-type */
645 
646         status = napi_get_value_string_utf8(env, argv[1], buf->free,
647                                             buf_len, &have_buf_len);
648 
649     } else {
650         status = napi_get_buffer_info(env, argv[1], (void **) &ptr,
651                                       &have_buf_len);
652 
653         memcpy(buf->free, ptr, have_buf_len);
654     }
655 
656     if (status != napi_ok) {
657         goto failed;
658     }
659 
660     buf->free += have_buf_len;
661 
662     ret = nxt_unit_buf_send(buf);
663     if (ret != NXT_UNIT_OK) {
664         goto failed;
665     }
666 
667     return this_arg;
668 
669 failed:
670 
671     napi.throw_error("Failed to write body");
672 
673     return nullptr;
674 }
675 
676 
677 napi_value
678 Unit::response_end(napi_env env, napi_callback_info info)
679 {
680     size_t                   argc;
681     nxt_napi                 napi(env);
682     napi_value               resp, this_arg, req_num;
683     nxt_unit_request_info_t  *req;
684 
685     argc = 1;
686 
687     try {
688         this_arg = napi.get_cb_info(info, argc, &resp);
689 
690         req_num = napi.get_named_property(resp, "_req_point");
691         req = napi.get_request_info(req_num);
692 
693     } catch (exception &e) {
694         napi.throw_error(e);
695         return nullptr;
696     }
697 
698     nxt_unit_request_done(req, NXT_UNIT_OK);
699 
700     return this_arg;
701 }
702