unit.cpp (1038:77fb332f214a) unit.cpp (1132:9ac5b5f33ed9)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include "unit.h"
7
8#include <unistd.h>
9#include <fcntl.h>
10
11#include <uv.h>
12
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include "unit.h"
7
8#include <unistd.h>
9#include <fcntl.h>
10
11#include <uv.h>
12
13#include <nxt_unit_websocket.h>
13
14
15
14napi_ref Unit::constructor_;
15
16
17struct nxt_nodejs_ctx_t {
18 nxt_unit_port_id_t port_id;
19 uv_poll_t poll;
20};
21
22
16napi_ref Unit::constructor_;
17
18
19struct nxt_nodejs_ctx_t {
20 nxt_unit_port_id_t port_id;
21 uv_poll_t poll;
22};
23
24
25struct req_data_t {
26 napi_ref sock_ref;
27 napi_ref resp_ref;
28 napi_ref conn_ref;
29};
30
31
23Unit::Unit(napi_env env, napi_value jsthis):
24 nxt_napi(env),
25 wrapper_(wrap(jsthis, this, destroy)),
26 unit_ctx_(nullptr)
27{
32Unit::Unit(napi_env env, napi_value jsthis):
33 nxt_napi(env),
34 wrapper_(wrap(jsthis, this, destroy)),
35 unit_ctx_(nullptr)
36{
37 nxt_unit_debug(NULL, "Unit::Unit()");
28}
29
30
31Unit::~Unit()
32{
33 delete_reference(wrapper_);
38}
39
40
41Unit::~Unit()
42{
43 delete_reference(wrapper_);
44
45 nxt_unit_debug(NULL, "Unit::~Unit()");
34}
35
36
37napi_value
38Unit::init(napi_env env, napi_value exports)
39{
40 nxt_napi napi(env);
46}
47
48
49napi_value
50Unit::init(napi_env env, napi_value exports)
51{
52 nxt_napi napi(env);
41 napi_value cons;
53 napi_value ctor;
42
54
43 napi_property_descriptor properties[] = {
55 napi_property_descriptor unit_props[] = {
44 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
45 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
56 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
57 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
46 { "_read", 0, _read, 0, 0, 0, napi_default, 0 }
47 };
48
49 try {
58 };
59
60 try {
50 cons = napi.define_class("Unit", create, 3, properties);
51 constructor_ = napi.create_reference(cons);
61 ctor = napi.define_class("Unit", create, 2, unit_props);
62 constructor_ = napi.create_reference(ctor);
52
63
53 napi.set_named_property(exports, "Unit", cons);
54 napi.set_named_property(exports, "unit_response_headers",
64 napi.set_named_property(exports, "Unit", ctor);
65 napi.set_named_property(exports, "response_send_headers",
55 response_send_headers);
66 response_send_headers);
56 napi.set_named_property(exports, "unit_response_write", response_write);
57 napi.set_named_property(exports, "unit_response_end", response_end);
67 napi.set_named_property(exports, "response_write", response_write);
68 napi.set_named_property(exports, "response_end", response_end);
69 napi.set_named_property(exports, "websocket_send_frame",
70 websocket_send_frame);
71 napi.set_named_property(exports, "websocket_set_sock",
72 websocket_set_sock);
58
59 } catch (exception &e) {
60 napi.throw_error(e);
61 return nullptr;
62 }
63
64 return exports;
65}

--- 7 unchanged lines hidden (view full) ---

73 delete obj;
74}
75
76
77napi_value
78Unit::create(napi_env env, napi_callback_info info)
79{
80 nxt_napi napi(env);
73
74 } catch (exception &e) {
75 napi.throw_error(e);
76 return nullptr;
77 }
78
79 return exports;
80}

--- 7 unchanged lines hidden (view full) ---

88 delete obj;
89}
90
91
92napi_value
93Unit::create(napi_env env, napi_callback_info info)
94{
95 nxt_napi napi(env);
81 napi_value target, cons, instance, jsthis;
96 napi_value target, ctor, instance, jsthis;
82
83 try {
84 target = napi.get_new_target(info);
85
86 if (target != nullptr) {
87 /* Invoked as constructor: `new Unit(...)`. */
88 jsthis = napi.get_cb_info(info);
89
90 new Unit(env, jsthis);
91 napi.create_reference(jsthis);
92
93 return jsthis;
94 }
95
96 /* Invoked as plain function `Unit(...)`, turn into construct call. */
97
98 try {
99 target = napi.get_new_target(info);
100
101 if (target != nullptr) {
102 /* Invoked as constructor: `new Unit(...)`. */
103 jsthis = napi.get_cb_info(info);
104
105 new Unit(env, jsthis);
106 napi.create_reference(jsthis);
107
108 return jsthis;
109 }
110
111 /* Invoked as plain function `Unit(...)`, turn into construct call. */
97 cons = napi.get_reference_value(constructor_);
98 instance = napi.new_instance(cons);
112 ctor = napi.get_reference_value(constructor_);
113 instance = napi.new_instance(ctor);
99 napi.create_reference(instance);
100
101 } catch (exception &e) {
102 napi.throw_error(e);
103 return nullptr;
104 }
105
106 return instance;

--- 18 unchanged lines hidden (view full) ---

125 } catch (exception &e) {
126 napi.throw_error(e);
127 return nullptr;
128 }
129
130 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
131
132 unit_init.data = obj;
114 napi.create_reference(instance);
115
116 } catch (exception &e) {
117 napi.throw_error(e);
118 return nullptr;
119 }
120
121 return instance;

--- 18 unchanged lines hidden (view full) ---

140 } catch (exception &e) {
141 napi.throw_error(e);
142 return nullptr;
143 }
144
145 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
146
147 unit_init.data = obj;
133 unit_init.callbacks.request_handler = request_handler;
134 unit_init.callbacks.add_port = add_port;
135 unit_init.callbacks.remove_port = remove_port;
136 unit_init.callbacks.quit = quit;
148 unit_init.callbacks.request_handler = request_handler_cb;
149 unit_init.callbacks.websocket_handler = websocket_handler_cb;
150 unit_init.callbacks.close_handler = close_handler_cb;
151 unit_init.callbacks.add_port = add_port;
152 unit_init.callbacks.remove_port = remove_port;
153 unit_init.callbacks.quit = quit_cb;
137
154
155 unit_init.request_data_size = sizeof(req_data_t);
156
138 obj->unit_ctx_ = nxt_unit_init(&unit_init);
139 if (obj->unit_ctx_ == NULL) {
140 goto failed;
141 }
142
143 return nullptr;
144
145failed:

--- 6 unchanged lines hidden (view full) ---

152
153napi_value
154Unit::listen(napi_env env, napi_callback_info info)
155{
156 return nullptr;
157}
158
159
157 obj->unit_ctx_ = nxt_unit_init(&unit_init);
158 if (obj->unit_ctx_ == NULL) {
159 goto failed;
160 }
161
162 return nullptr;
163
164failed:

--- 6 unchanged lines hidden (view full) ---

171
172napi_value
173Unit::listen(napi_env env, napi_callback_info info)
174{
175 return nullptr;
176}
177
178
160napi_value
161Unit::_read(napi_env env, napi_callback_info info)
179void
180Unit::request_handler_cb(nxt_unit_request_info_t *req)
162{
181{
163 void *data;
164 size_t argc;
165 nxt_napi napi(env);
166 napi_value buffer, argv;
167 nxt_unit_request_info_t *req;
182 Unit *obj;
168
183
169 argc = 1;
184 obj = reinterpret_cast<Unit *>(req->unit->data);
170
185
186 obj->request_handler(req);
187}
188
189
190void
191Unit::request_handler(nxt_unit_request_info_t *req)
192{
193 napi_value socket, request, response, server_obj, emit_request;
194
195 memset(req->data, 0, sizeof(req_data_t));
196
171 try {
197 try {
172 napi.get_cb_info(info, argc, &argv);
198 nxt_handle_scope scope(env());
173
199
174 req = napi.get_request_info(argv);
175 buffer = napi.create_buffer((size_t) req->content_length, &data);
200 server_obj = get_server_object();
176
201
202 socket = create_socket(server_obj, req);
203 request = create_request(server_obj, socket);
204 response = create_response(server_obj, request, req);
205
206 create_headers(req, request);
207
208 emit_request = get_named_property(server_obj, "emit_request");
209
210 nxt_async_context async_context(env(), "request_handler");
211 nxt_callback_scope async_scope(async_context);
212
213 make_callback(async_context, server_obj, emit_request, request,
214 response);
215
177 } catch (exception &e) {
216 } catch (exception &e) {
178 napi.throw_error(e);
179 return nullptr;
217 nxt_unit_req_warn(req, "request_handler: %s", e.str);
180 }
218 }
219}
181
220
182 nxt_unit_request_read(req, data, req->content_length);
183
221
184 return buffer;
222void
223Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
224{
225 Unit *obj;
226
227 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
228
229 obj->websocket_handler(ws);
185}
186
187
188void
230}
231
232
233void
189Unit::request_handler(nxt_unit_request_info_t *req)
234Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
190{
235{
191 Unit *obj;
192 napi_value socket, request, response, server_obj;
193 napi_value emit_events;
194 napi_value events_args[3];
236 napi_value frame, server_obj, process_frame, conn;
237 req_data_t *req_data;
195
238
196 obj = reinterpret_cast<Unit *>(req->unit->data);
239 req_data = (req_data_t *) ws->req->data;
197
198 try {
240
241 try {
199 nxt_handle_scope scope(obj->env());
242 nxt_handle_scope scope(env());
200
243
201 server_obj = obj->get_server_object();
244 server_obj = get_server_object();
202
245
203 socket = obj->create_socket(server_obj, req);
204 request = obj->create_request(server_obj, socket);
205 response = obj->create_response(server_obj, socket, request, req);
246 frame = create_websocket_frame(server_obj, ws);
206
247
207 obj->create_headers(req, request);
248 conn = get_reference_value(req_data->conn_ref);
208
249
209 emit_events = obj->get_named_property(server_obj, "emit_events");
250 process_frame = get_named_property(conn, "processFrame");
210
251
211 events_args[0] = server_obj;
212 events_args[1] = request;
213 events_args[2] = response;
214
215 nxt_async_context async_context(obj->env(), "unit_request_handler");
252 nxt_async_context async_context(env(), "websocket_handler");
216 nxt_callback_scope async_scope(async_context);
217
253 nxt_callback_scope async_scope(async_context);
254
218 obj->make_callback(async_context, server_obj, emit_events,
219 3, events_args);
255 make_callback(async_context, conn, process_frame, frame);
220
221 } catch (exception &e) {
256
257 } catch (exception &e) {
222 obj->throw_error(e);
258 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
223 }
259 }
260
261 nxt_unit_websocket_done(ws);
224}
225
226
227void
262}
263
264
265void
266Unit::close_handler_cb(nxt_unit_request_info_t *req)
267{
268 Unit *obj;
269
270 obj = reinterpret_cast<Unit *>(req->unit->data);
271
272 obj->close_handler(req);
273}
274
275
276void
277Unit::close_handler(nxt_unit_request_info_t *req)
278{
279 napi_value conn_handle_close, conn;
280 req_data_t *req_data;
281
282 req_data = (req_data_t *) req->data;
283
284 try {
285 nxt_handle_scope scope(env());
286
287 conn = get_reference_value(req_data->conn_ref);
288
289 conn_handle_close = get_named_property(conn, "handleSocketClose");
290
291 nxt_async_context async_context(env(), "close_handler");
292 nxt_callback_scope async_scope(async_context);
293
294 make_callback(async_context, conn, conn_handle_close,
295 nxt_napi::create(0));
296
297 remove_wrap(req_data->sock_ref);
298 remove_wrap(req_data->resp_ref);
299 remove_wrap(req_data->conn_ref);
300
301 } catch (exception &e) {
302 nxt_unit_req_warn(req, "close_handler: %s", e.str);
303
304 return;
305 }
306
307 nxt_unit_request_done(req, NXT_UNIT_OK);
308}
309
310
311static void
228nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
229{
230 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
231}
232
233
234int
235Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
236{
237 int err;
238 Unit *obj;
239 uv_loop_t *loop;
240 napi_status status;
241 nxt_nodejs_ctx_t *node_ctx;
242
243 if (port->in_fd != -1) {
244 obj = reinterpret_cast<Unit *>(ctx->unit->data);
245
246 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
312nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
313{
314 nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
315}
316
317
318int
319Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
320{
321 int err;
322 Unit *obj;
323 uv_loop_t *loop;
324 napi_status status;
325 nxt_nodejs_ctx_t *node_ctx;
326
327 if (port->in_fd != -1) {
328 obj = reinterpret_cast<Unit *>(ctx->unit->data);
329
330 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
247 obj->throw_error("Failed to upgrade read"
248 " file descriptor to O_NONBLOCK");
331 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
332 port->in_fd, strerror(errno), errno);
249 return -1;
250 }
251
252 status = napi_get_uv_event_loop(obj->env(), &loop);
253 if (status != napi_ok) {
333 return -1;
334 }
335
336 status = napi_get_uv_event_loop(obj->env(), &loop);
337 if (status != napi_ok) {
254 obj->throw_error("Failed to get uv.loop");
338 nxt_unit_warn(ctx, "Failed to get uv.loop");
255 return NXT_UNIT_ERROR;
256 }
257
258 node_ctx = new nxt_nodejs_ctx_t;
259
260 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
261 if (err < 0) {
339 return NXT_UNIT_ERROR;
340 }
341
342 node_ctx = new nxt_nodejs_ctx_t;
343
344 err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
345 if (err < 0) {
262 obj->throw_error("Failed to init uv.poll");
346 nxt_unit_warn(ctx, "Failed to init uv.poll");
263 return NXT_UNIT_ERROR;
264 }
265
266 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
267 if (err < 0) {
347 return NXT_UNIT_ERROR;
348 }
349
350 err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
351 if (err < 0) {
268 obj->throw_error("Failed to start uv.poll");
352 nxt_unit_warn(ctx, "Failed to start uv.poll");
269 return NXT_UNIT_ERROR;
270 }
271
272 ctx->data = node_ctx;
273
274 node_ctx->port_id = port->id;
275 node_ctx->poll.data = ctx;
276 }

--- 26 unchanged lines hidden (view full) ---

303 }
304 }
305
306 nxt_unit_remove_port(ctx, port_id);
307}
308
309
310void
353 return NXT_UNIT_ERROR;
354 }
355
356 ctx->data = node_ctx;
357
358 node_ctx->port_id = port->id;
359 node_ctx->poll.data = ctx;
360 }

--- 26 unchanged lines hidden (view full) ---

387 }
388 }
389
390 nxt_unit_remove_port(ctx, port_id);
391}
392
393
394void
311Unit::quit(nxt_unit_ctx_t *ctx)
395Unit::quit_cb(nxt_unit_ctx_t *ctx)
312{
396{
313 Unit *obj;
314 napi_value server_obj, emit_close;
397 Unit *obj;
315
316 obj = reinterpret_cast<Unit *>(ctx->unit->data);
317
398
399 obj = reinterpret_cast<Unit *>(ctx->unit->data);
400
401 obj->quit(ctx);
402}
403
404
405void
406Unit::quit(nxt_unit_ctx_t *ctx)
407{
408 napi_value server_obj, emit_close;
409
318 try {
410 try {
319 nxt_handle_scope scope(obj->env());
411 nxt_handle_scope scope(env());
320
412
321 server_obj = obj->get_server_object();
413 server_obj = get_server_object();
322
414
323 emit_close = obj->get_named_property(server_obj, "emit_close");
415 emit_close = get_named_property(server_obj, "emit_close");
324
416
325 nxt_async_context async_context(obj->env(), "unit_quit");
417 nxt_async_context async_context(env(), "unit_quit");
326 nxt_callback_scope async_scope(async_context);
327
418 nxt_callback_scope async_scope(async_context);
419
328 obj->make_callback(async_context, server_obj, emit_close, 0, NULL);
420 make_callback(async_context, server_obj, emit_close);
329
330 } catch (exception &e) {
421
422 } catch (exception &e) {
331 obj->throw_error(e);
423 nxt_unit_debug(ctx, "quit: %s", e.str);
332 }
333
334 nxt_unit_done(ctx);
335}
336
337
338napi_value
339Unit::get_server_object()

--- 4 unchanged lines hidden (view full) ---

344
345 return get_named_property(unit_obj, "server");
346}
347
348
349void
350Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
351{
424 }
425
426 nxt_unit_done(ctx);
427}
428
429
430napi_value
431Unit::get_server_object()

--- 4 unchanged lines hidden (view full) ---

436
437 return get_named_property(unit_obj, "server");
438}
439
440
441void
442Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
443{
444 void *data;
352 uint32_t i;
445 uint32_t i;
353 napi_value headers, raw_headers;
446 napi_value headers, raw_headers, buffer;
354 napi_status status;
355 nxt_unit_request_t *r;
356
357 r = req->request;
358
359 headers = create_object();
360
361 status = napi_create_array_with_length(env(), r->fields_count * 2,

--- 6 unchanged lines hidden (view full) ---

368 append_header(r->fields + i, headers, raw_headers, i);
369 }
370
371 set_named_property(request, "headers", headers);
372 set_named_property(request, "rawHeaders", raw_headers);
373 set_named_property(request, "httpVersion", r->version, r->version_length);
374 set_named_property(request, "method", r->method, r->method_length);
375 set_named_property(request, "url", r->target, r->target_length);
447 napi_status status;
448 nxt_unit_request_t *r;
449
450 r = req->request;
451
452 headers = create_object();
453
454 status = napi_create_array_with_length(env(), r->fields_count * 2,

--- 6 unchanged lines hidden (view full) ---

461 append_header(r->fields + i, headers, raw_headers, i);
462 }
463
464 set_named_property(request, "headers", headers);
465 set_named_property(request, "rawHeaders", raw_headers);
466 set_named_property(request, "httpVersion", r->version, r->version_length);
467 set_named_property(request, "method", r->method, r->method_length);
468 set_named_property(request, "url", r->target, r->target_length);
469
470 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
471
472 buffer = create_buffer((size_t) req->content_length, &data);
473 nxt_unit_request_read(req, data, req->content_length);
474
475 set_named_property(request, "_data", buffer);
376}
377
378
379inline char
380lowcase(char c)
381{
382 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
383}

--- 21 unchanged lines hidden (view full) ---

405 set_element(raw_headers, idx * 2 + 1, vstr);
406}
407
408
409napi_value
410Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
411{
412 napi_value constructor, res;
476}
477
478
479inline char
480lowcase(char c)
481{
482 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
483}

--- 21 unchanged lines hidden (view full) ---

505 set_element(raw_headers, idx * 2 + 1, vstr);
506}
507
508
509napi_value
510Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
511{
512 napi_value constructor, res;
513 req_data_t *req_data;
413 nxt_unit_request_t *r;
414
415 r = req->request;
416
514 nxt_unit_request_t *r;
515
516 r = req->request;
517
417 constructor = get_named_property(server_obj, "socket");
518 constructor = get_named_property(server_obj, "Socket");
418
419 res = new_instance(constructor);
420
519
520 res = new_instance(constructor);
521
421 set_named_property(res, "req_pointer", (intptr_t) req);
522 req_data = (req_data_t *) req->data;
523 req_data->sock_ref = wrap(res, req, sock_destroy);
524
422 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
423 set_named_property(res, "localAddress", r->local, r->local_length);
424
425 return res;
426}
427
428
429napi_value
430Unit::create_request(napi_value server_obj, napi_value socket)
431{
525 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
526 set_named_property(res, "localAddress", r->local, r->local_length);
527
528 return res;
529}
530
531
532napi_value
533Unit::create_request(napi_value server_obj, napi_value socket)
534{
432 napi_value constructor, return_val;
535 napi_value constructor;
433
536
434 constructor = get_named_property(server_obj, "request");
537 constructor = get_named_property(server_obj, "ServerRequest");
435
538
436 return_val = new_instance(constructor, server_obj);
539 return new_instance(constructor, server_obj, socket);
540}
437
541
438 set_named_property(return_val, "socket", socket);
439 set_named_property(return_val, "connection", socket);
440
542
441 return return_val;
543napi_value
544Unit::create_response(napi_value server_obj, napi_value request,
545 nxt_unit_request_info_t *req)
546{
547 napi_value constructor, res;
548 req_data_t *req_data;
549
550 constructor = get_named_property(server_obj, "ServerResponse");
551
552 res = new_instance(constructor, request);
553
554 req_data = (req_data_t *) req->data;
555 req_data->resp_ref = wrap(res, req, resp_destroy);
556
557 return res;
442}
443
444
445napi_value
558}
559
560
561napi_value
446Unit::create_response(napi_value server_obj, napi_value socket,
447 napi_value request, nxt_unit_request_info_t *req)
562Unit::create_websocket_frame(napi_value server_obj,
563 nxt_unit_websocket_frame_t *ws)
448{
564{
449 napi_value constructor, return_val;
565 void *data;
566 napi_value constructor, res, buffer;
567 uint8_t sc[2];
450
568
451 constructor = get_named_property(server_obj, "response");
569 constructor = get_named_property(server_obj, "WebSocketFrame");
452
570
453 return_val = new_instance(constructor, request);
571 res = new_instance(constructor);
454
572
455 set_named_property(return_val, "socket", socket);
456 set_named_property(return_val, "connection", socket);
457 set_named_property(return_val, "_req_point", (intptr_t) req);
573 set_named_property(res, "fin", (bool) ws->header->fin);
574 set_named_property(res, "opcode", ws->header->opcode);
575 set_named_property(res, "length", (int64_t) ws->payload_len);
458
576
459 return return_val;
577 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
578 if (ws->payload_len >= 2) {
579 nxt_unit_websocket_read(ws, sc, 2);
580
581 set_named_property(res, "closeStatus",
582 (((uint16_t) sc[0]) << 8) | sc[1]);
583
584 } else {
585 set_named_property(res, "closeStatus", -1);
586 }
587 }
588
589 buffer = create_buffer((size_t) ws->content_length, &data);
590 nxt_unit_websocket_read(ws, data, ws->content_length);
591
592 set_named_property(res, "binaryPayload", buffer);
593
594 return res;
460}
461
462
463napi_value
464Unit::response_send_headers(napi_env env, napi_callback_info info)
465{
466 int ret;
467 char *ptr, *name_ptr;
468 bool is_array;
469 size_t argc, name_len, value_len;
470 uint32_t status_code, header_len, keys_len, array_len;
471 uint32_t keys_count, i, j;
472 uint16_t hash;
473 nxt_napi napi(env);
474 napi_value this_arg, headers, keys, name, value, array_val;
595}
596
597
598napi_value
599Unit::response_send_headers(napi_env env, napi_callback_info info)
600{
601 int ret;
602 char *ptr, *name_ptr;
603 bool is_array;
604 size_t argc, name_len, value_len;
605 uint32_t status_code, header_len, keys_len, array_len;
606 uint32_t keys_count, i, j;
607 uint16_t hash;
608 nxt_napi napi(env);
609 napi_value this_arg, headers, keys, name, value, array_val;
475 napi_value req_num, array_entry;
610 napi_value array_entry;
476 napi_valuetype val_type;
477 nxt_unit_field_t *f;
478 nxt_unit_request_info_t *req;
611 napi_valuetype val_type;
612 nxt_unit_field_t *f;
613 nxt_unit_request_info_t *req;
479 napi_value argv[5];
614 napi_value argv[4];
480
615
481 argc = 5;
616 argc = 4;
482
483 try {
484 this_arg = napi.get_cb_info(info, argc, argv);
617
618 try {
619 this_arg = napi.get_cb_info(info, argc, argv);
485 if (argc != 5) {
620 if (argc != 4) {
486 napi.throw_error("Wrong args count. Expected: "
487 "statusCode, headers, headers count, "
488 "headers length");
489 return nullptr;
490 }
491
621 napi.throw_error("Wrong args count. Expected: "
622 "statusCode, headers, headers count, "
623 "headers length");
624 return nullptr;
625 }
626
492 req_num = napi.get_named_property(argv[0], "_req_point");
627 req = napi.get_request_info(this_arg);
628 status_code = napi.get_value_uint32(argv[0]);
629 keys_count = napi.get_value_uint32(argv[2]);
630 header_len = napi.get_value_uint32(argv[3]);
493
631
494 req = napi.get_request_info(req_num);
495
496 status_code = napi.get_value_uint32(argv[1]);
497 keys_count = napi.get_value_uint32(argv[3]);
498 header_len = napi.get_value_uint32(argv[4]);
499
500 /* Need to reserve extra byte for C-string 0-termination. */
501 header_len++;
502
632 /* Need to reserve extra byte for C-string 0-termination. */
633 header_len++;
634
503 headers = argv[2];
635 headers = argv[1];
504
505 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
506 if (ret != NXT_UNIT_OK) {
507 napi.throw_error("Failed to create response");
508 return nullptr;
509 }
510
511 keys = napi.get_property_names(headers);

--- 94 unchanged lines hidden (view full) ---

606 return this_arg;
607}
608
609
610napi_value
611Unit::response_write(napi_env env, napi_callback_info info)
612{
613 int ret;
636
637 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
638 if (ret != NXT_UNIT_OK) {
639 napi.throw_error("Failed to create response");
640 return nullptr;
641 }
642
643 keys = napi.get_property_names(headers);

--- 94 unchanged lines hidden (view full) ---

738 return this_arg;
739}
740
741
742napi_value
743Unit::response_write(napi_env env, napi_callback_info info)
744{
745 int ret;
614 char *ptr;
746 void *ptr;
615 size_t argc, have_buf_len;
616 uint32_t buf_len;
617 nxt_napi napi(env);
747 size_t argc, have_buf_len;
748 uint32_t buf_len;
749 nxt_napi napi(env);
618 napi_value this_arg, req_num;
619 napi_status status;
750 napi_value this_arg;
620 nxt_unit_buf_t *buf;
621 napi_valuetype buf_type;
622 nxt_unit_request_info_t *req;
751 nxt_unit_buf_t *buf;
752 napi_valuetype buf_type;
753 nxt_unit_request_info_t *req;
623 napi_value argv[3];
754 napi_value argv[2];
624
755
625 argc = 3;
756 argc = 2;
626
627 try {
628 this_arg = napi.get_cb_info(info, argc, argv);
757
758 try {
759 this_arg = napi.get_cb_info(info, argc, argv);
629 if (argc != 3) {
760 if (argc != 2) {
630 throw exception("Wrong args count. Expected: "
631 "chunk, chunk length");
632 }
633
761 throw exception("Wrong args count. Expected: "
762 "chunk, chunk length");
763 }
764
634 req_num = napi.get_named_property(argv[0], "_req_point");
635 req = napi.get_request_info(req_num);
765 req = napi.get_request_info(this_arg);
766 buf_type = napi.type_of(argv[0]);
767 buf_len = napi.get_value_uint32(argv[1]) + 1;
636
768
637 buf_len = napi.get_value_uint32(argv[2]);
769 buf = nxt_unit_response_buf_alloc(req, buf_len);
770 if (buf == NULL) {
771 throw exception("Failed to allocate response buffer");
772 }
638
773
639 buf_type = napi.type_of(argv[1]);
774 if (buf_type == napi_string) {
775 /* TODO: will work only for utf8 content-type */
640
776
777 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
778 buf_len);
779
780 } else {
781 ptr = napi.get_buffer_info(argv[0], have_buf_len);
782
783 memcpy(buf->free, ptr, have_buf_len);
784 }
785
786 buf->free += have_buf_len;
787
788 ret = nxt_unit_buf_send(buf);
789 if (ret != NXT_UNIT_OK) {
790 throw exception("Failed to send body buf");
791 }
641 } catch (exception &e) {
642 napi.throw_error(e);
643 return nullptr;
644 }
645
792 } catch (exception &e) {
793 napi.throw_error(e);
794 return nullptr;
795 }
796
646 buf_len++;
797 return this_arg;
798}
647
799
648 buf = nxt_unit_response_buf_alloc(req, buf_len);
649 if (buf == NULL) {
650 goto failed;
800
801napi_value
802Unit::response_end(napi_env env, napi_callback_info info)
803{
804 nxt_napi napi(env);
805 napi_value this_arg;
806 req_data_t *req_data;
807 nxt_unit_request_info_t *req;
808
809 try {
810 this_arg = napi.get_cb_info(info);
811
812 req = napi.get_request_info(this_arg);
813
814 req_data = (req_data_t *) req->data;
815
816 napi.remove_wrap(req_data->sock_ref);
817 napi.remove_wrap(req_data->resp_ref);
818 napi.remove_wrap(req_data->conn_ref);
819
820 } catch (exception &e) {
821 napi.throw_error(e);
822 return nullptr;
651 }
652
823 }
824
653 if (buf_type == napi_string) {
654 /* TODO: will work only for utf8 content-type */
825 nxt_unit_request_done(req, NXT_UNIT_OK);
655
826
656 status = napi_get_value_string_utf8(env, argv[1], buf->free,
657 buf_len, &have_buf_len);
827 return this_arg;
828}
658
829
659 } else {
660 status = napi_get_buffer_info(env, argv[1], (void **) &ptr,
661 &have_buf_len);
662
830
663 memcpy(buf->free, ptr, have_buf_len);
831napi_value
832Unit::websocket_send_frame(napi_env env, napi_callback_info info)
833{
834 int ret, iovec_len;
835 bool fin;
836 size_t buf_len;
837 uint32_t opcode, sc;
838 nxt_napi napi(env);
839 napi_value this_arg, frame, payload;
840 nxt_unit_request_info_t *req;
841 char status_code[2];
842 struct iovec iov[2];
843
844 iovec_len = 0;
845
846 try {
847 this_arg = napi.get_cb_info(info, frame);
848
849 req = napi.get_request_info(this_arg);
850
851 opcode = napi.get_value_uint32(napi.get_named_property(frame,
852 "opcode"));
853 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
854 sc = napi.get_value_uint32(napi.get_named_property(frame,
855 "closeStatus"));
856 status_code[0] = (sc >> 8) & 0xFF;
857 status_code[1] = sc & 0xFF;
858
859 iov[iovec_len].iov_base = status_code;
860 iov[iovec_len].iov_len = 2;
861 iovec_len++;
862 }
863
864 try {
865 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
866
867 } catch (exception &e) {
868 fin = true;
869 }
870
871 payload = napi.get_named_property(frame, "binaryPayload");
872
873 if (napi.is_buffer(payload)) {
874 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
875
876 } else {
877 buf_len = 0;
878 }
879
880 } catch (exception &e) {
881 napi.throw_error(e);
882 return nullptr;
664 }
665
883 }
884
666 if (status != napi_ok) {
667 goto failed;
885 if (buf_len > 0) {
886 iov[iovec_len].iov_len = buf_len;
887 iovec_len++;
668 }
669
888 }
889
670 buf->free += have_buf_len;
671
672 ret = nxt_unit_buf_send(buf);
890 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
673 if (ret != NXT_UNIT_OK) {
674 goto failed;
675 }
676
677 return this_arg;
678
679failed:
680
891 if (ret != NXT_UNIT_OK) {
892 goto failed;
893 }
894
895 return this_arg;
896
897failed:
898
681 napi.throw_error("Failed to write body");
899 napi.throw_error("Failed to send frame");
682
683 return nullptr;
684}
685
686
687napi_value
900
901 return nullptr;
902}
903
904
905napi_value
688Unit::response_end(napi_env env, napi_callback_info info)
906Unit::websocket_set_sock(napi_env env, napi_callback_info info)
689{
907{
690 size_t argc;
691 nxt_napi napi(env);
908 nxt_napi napi(env);
692 napi_value resp, this_arg, req_num;
909 napi_value this_arg, sock;
910 req_data_t *req_data;
693 nxt_unit_request_info_t *req;
694
911 nxt_unit_request_info_t *req;
912
695 argc = 1;
696
697 try {
913 try {
698 this_arg = napi.get_cb_info(info, argc, &resp);
914 this_arg = napi.get_cb_info(info, sock);
699
915
700 req_num = napi.get_named_property(resp, "_req_point");
701 req = napi.get_request_info(req_num);
916 req = napi.get_request_info(sock);
702
917
918 req_data = (req_data_t *) req->data;
919 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
920
703 } catch (exception &e) {
704 napi.throw_error(e);
705 return nullptr;
706 }
707
921 } catch (exception &e) {
922 napi.throw_error(e);
923 return nullptr;
924 }
925
708 nxt_unit_request_done(req, NXT_UNIT_OK);
709
710 return this_arg;
711}
926 return this_arg;
927}
928
929
930void
931Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
932{
933 nxt_unit_request_info_t *req;
934
935 req = (nxt_unit_request_info_t *) nativeObject;
936
937 nxt_unit_warn(NULL, "conn_destroy: %p", req);
938}
939
940
941void
942Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
943{
944 nxt_unit_request_info_t *req;
945
946 req = (nxt_unit_request_info_t *) nativeObject;
947
948 nxt_unit_warn(NULL, "sock_destroy: %p", req);
949}
950
951
952void
953Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
954{
955 nxt_unit_request_info_t *req;
956
957 req = (nxt_unit_request_info_t *) nativeObject;
958
959 nxt_unit_warn(NULL, "resp_destroy: %p", req);
960}