unit.cpp (1731:43759e8fd1f6) unit.cpp (1766:9ec17030b67e)
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include "unit.h"
7
8#include <unistd.h>
9#include <fcntl.h>
10
11#include <uv.h>
12
13#include <nxt_unit_websocket.h>
14
15
16static void delete_port_data(uv_handle_t* handle);
17
18napi_ref Unit::constructor_;
19
20
21struct port_data_t {
22 nxt_unit_ctx_t *ctx;
23 nxt_unit_port_t *port;
24 uv_poll_t poll;
25};
26
27
28struct req_data_t {
29 napi_ref sock_ref;
1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include "unit.h"
7
8#include <unistd.h>
9#include <fcntl.h>
10
11#include <uv.h>
12
13#include <nxt_unit_websocket.h>
14
15
16static void delete_port_data(uv_handle_t* handle);
17
18napi_ref Unit::constructor_;
19
20
21struct port_data_t {
22 nxt_unit_ctx_t *ctx;
23 nxt_unit_port_t *port;
24 uv_poll_t poll;
25};
26
27
28struct req_data_t {
29 napi_ref sock_ref;
30 napi_ref req_ref;
30 napi_ref resp_ref;
31 napi_ref conn_ref;
32};
33
34
35Unit::Unit(napi_env env, napi_value jsthis):
36 nxt_napi(env),
37 wrapper_(wrap(jsthis, this, destroy)),
38 unit_ctx_(nullptr)
39{
40 nxt_unit_debug(NULL, "Unit::Unit()");
41}
42
43
44Unit::~Unit()
45{
46 delete_reference(wrapper_);
47
48 nxt_unit_debug(NULL, "Unit::~Unit()");
49}
50
51
52napi_value
53Unit::init(napi_env env, napi_value exports)
54{
55 nxt_napi napi(env);
56 napi_value ctor;
57
58 napi_property_descriptor unit_props[] = {
59 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
60 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
61 };
62
63 try {
64 ctor = napi.define_class("Unit", create, 2, unit_props);
65 constructor_ = napi.create_reference(ctor);
66
67 napi.set_named_property(exports, "Unit", ctor);
31 napi_ref resp_ref;
32 napi_ref conn_ref;
33};
34
35
36Unit::Unit(napi_env env, napi_value jsthis):
37 nxt_napi(env),
38 wrapper_(wrap(jsthis, this, destroy)),
39 unit_ctx_(nullptr)
40{
41 nxt_unit_debug(NULL, "Unit::Unit()");
42}
43
44
45Unit::~Unit()
46{
47 delete_reference(wrapper_);
48
49 nxt_unit_debug(NULL, "Unit::~Unit()");
50}
51
52
53napi_value
54Unit::init(napi_env env, napi_value exports)
55{
56 nxt_napi napi(env);
57 napi_value ctor;
58
59 napi_property_descriptor unit_props[] = {
60 { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
61 { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
62 };
63
64 try {
65 ctor = napi.define_class("Unit", create, 2, unit_props);
66 constructor_ = napi.create_reference(ctor);
67
68 napi.set_named_property(exports, "Unit", ctor);
69 napi.set_named_property(exports, "request_read", request_read);
68 napi.set_named_property(exports, "response_send_headers",
69 response_send_headers);
70 napi.set_named_property(exports, "response_write", response_write);
71 napi.set_named_property(exports, "response_end", response_end);
72 napi.set_named_property(exports, "websocket_send_frame",
73 websocket_send_frame);
74 napi.set_named_property(exports, "websocket_set_sock",
75 websocket_set_sock);
76 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
77 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
78
79 } catch (exception &e) {
80 napi.throw_error(e);
81 return nullptr;
82 }
83
84 return exports;
85}
86
87
88void
89Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
90{
91 Unit *obj = reinterpret_cast<Unit *>(nativeObject);
92
93 delete obj;
94}
95
96
97napi_value
98Unit::create(napi_env env, napi_callback_info info)
99{
100 nxt_napi napi(env);
101 napi_value target, ctor, instance, jsthis;
102
103 try {
104 target = napi.get_new_target(info);
105
106 if (target != nullptr) {
107 /* Invoked as constructor: `new Unit(...)`. */
108 jsthis = napi.get_cb_info(info);
109
110 new Unit(env, jsthis);
111 napi.create_reference(jsthis);
112
113 return jsthis;
114 }
115
116 /* Invoked as plain function `Unit(...)`, turn into construct call. */
117 ctor = napi.get_reference_value(constructor_);
118 instance = napi.new_instance(ctor);
119 napi.create_reference(instance);
120
121 } catch (exception &e) {
122 napi.throw_error(e);
123 return nullptr;
124 }
125
126 return instance;
127}
128
129
130napi_value
131Unit::create_server(napi_env env, napi_callback_info info)
132{
133 Unit *obj;
134 size_t argc;
135 nxt_napi napi(env);
136 napi_value jsthis, argv;
137 nxt_unit_init_t unit_init;
138
139 argc = 1;
140
141 try {
142 jsthis = napi.get_cb_info(info, argc, &argv);
143 obj = (Unit *) napi.unwrap(jsthis);
144
145 } catch (exception &e) {
146 napi.throw_error(e);
147 return nullptr;
148 }
149
150 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
151
152 unit_init.data = obj;
153 unit_init.callbacks.request_handler = request_handler_cb;
154 unit_init.callbacks.websocket_handler = websocket_handler_cb;
155 unit_init.callbacks.close_handler = close_handler_cb;
156 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
157 unit_init.callbacks.add_port = add_port;
158 unit_init.callbacks.remove_port = remove_port;
159 unit_init.callbacks.quit = quit_cb;
160
161 unit_init.request_data_size = sizeof(req_data_t);
162
163 obj->unit_ctx_ = nxt_unit_init(&unit_init);
164 if (obj->unit_ctx_ == NULL) {
165 goto failed;
166 }
167
168 return nullptr;
169
170failed:
171
172 napi_throw_error(env, NULL, "Failed to create Unit object");
173
174 return nullptr;
175}
176
177
178napi_value
179Unit::listen(napi_env env, napi_callback_info info)
180{
181 return nullptr;
182}
183
184
185void
186Unit::request_handler_cb(nxt_unit_request_info_t *req)
187{
188 Unit *obj;
189
190 obj = reinterpret_cast<Unit *>(req->unit->data);
191
192 obj->request_handler(req);
193}
194
195
196void
197Unit::request_handler(nxt_unit_request_info_t *req)
198{
199 napi_value socket, request, response, server_obj, emit_request;
200
201 memset(req->data, 0, sizeof(req_data_t));
202
203 try {
204 nxt_handle_scope scope(env());
205
206 server_obj = get_server_object();
207
208 socket = create_socket(server_obj, req);
70 napi.set_named_property(exports, "response_send_headers",
71 response_send_headers);
72 napi.set_named_property(exports, "response_write", response_write);
73 napi.set_named_property(exports, "response_end", response_end);
74 napi.set_named_property(exports, "websocket_send_frame",
75 websocket_send_frame);
76 napi.set_named_property(exports, "websocket_set_sock",
77 websocket_set_sock);
78 napi.set_named_property(exports, "buf_min", nxt_unit_buf_min());
79 napi.set_named_property(exports, "buf_max", nxt_unit_buf_max());
80
81 } catch (exception &e) {
82 napi.throw_error(e);
83 return nullptr;
84 }
85
86 return exports;
87}
88
89
90void
91Unit::destroy(napi_env env, void *nativeObject, void *finalize_hint)
92{
93 Unit *obj = reinterpret_cast<Unit *>(nativeObject);
94
95 delete obj;
96}
97
98
99napi_value
100Unit::create(napi_env env, napi_callback_info info)
101{
102 nxt_napi napi(env);
103 napi_value target, ctor, instance, jsthis;
104
105 try {
106 target = napi.get_new_target(info);
107
108 if (target != nullptr) {
109 /* Invoked as constructor: `new Unit(...)`. */
110 jsthis = napi.get_cb_info(info);
111
112 new Unit(env, jsthis);
113 napi.create_reference(jsthis);
114
115 return jsthis;
116 }
117
118 /* Invoked as plain function `Unit(...)`, turn into construct call. */
119 ctor = napi.get_reference_value(constructor_);
120 instance = napi.new_instance(ctor);
121 napi.create_reference(instance);
122
123 } catch (exception &e) {
124 napi.throw_error(e);
125 return nullptr;
126 }
127
128 return instance;
129}
130
131
132napi_value
133Unit::create_server(napi_env env, napi_callback_info info)
134{
135 Unit *obj;
136 size_t argc;
137 nxt_napi napi(env);
138 napi_value jsthis, argv;
139 nxt_unit_init_t unit_init;
140
141 argc = 1;
142
143 try {
144 jsthis = napi.get_cb_info(info, argc, &argv);
145 obj = (Unit *) napi.unwrap(jsthis);
146
147 } catch (exception &e) {
148 napi.throw_error(e);
149 return nullptr;
150 }
151
152 memset(&unit_init, 0, sizeof(nxt_unit_init_t));
153
154 unit_init.data = obj;
155 unit_init.callbacks.request_handler = request_handler_cb;
156 unit_init.callbacks.websocket_handler = websocket_handler_cb;
157 unit_init.callbacks.close_handler = close_handler_cb;
158 unit_init.callbacks.shm_ack_handler = shm_ack_handler_cb;
159 unit_init.callbacks.add_port = add_port;
160 unit_init.callbacks.remove_port = remove_port;
161 unit_init.callbacks.quit = quit_cb;
162
163 unit_init.request_data_size = sizeof(req_data_t);
164
165 obj->unit_ctx_ = nxt_unit_init(&unit_init);
166 if (obj->unit_ctx_ == NULL) {
167 goto failed;
168 }
169
170 return nullptr;
171
172failed:
173
174 napi_throw_error(env, NULL, "Failed to create Unit object");
175
176 return nullptr;
177}
178
179
180napi_value
181Unit::listen(napi_env env, napi_callback_info info)
182{
183 return nullptr;
184}
185
186
187void
188Unit::request_handler_cb(nxt_unit_request_info_t *req)
189{
190 Unit *obj;
191
192 obj = reinterpret_cast<Unit *>(req->unit->data);
193
194 obj->request_handler(req);
195}
196
197
198void
199Unit::request_handler(nxt_unit_request_info_t *req)
200{
201 napi_value socket, request, response, server_obj, emit_request;
202
203 memset(req->data, 0, sizeof(req_data_t));
204
205 try {
206 nxt_handle_scope scope(env());
207
208 server_obj = get_server_object();
209
210 socket = create_socket(server_obj, req);
209 request = create_request(server_obj, socket);
211 request = create_request(server_obj, socket, req);
210 response = create_response(server_obj, request, req);
211
212 create_headers(req, request);
213
214 emit_request = get_named_property(server_obj, "emit_request");
215
216 nxt_async_context async_context(env(), "request_handler");
217 nxt_callback_scope async_scope(async_context);
218
219 make_callback(async_context, server_obj, emit_request, request,
220 response);
221
222 } catch (exception &e) {
223 nxt_unit_req_warn(req, "request_handler: %s", e.str);
224 }
225}
226
227
228void
229Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
230{
231 Unit *obj;
232
233 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
234
235 obj->websocket_handler(ws);
236}
237
238
239void
240Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
241{
242 napi_value frame, server_obj, process_frame, conn;
243 req_data_t *req_data;
244
245 req_data = (req_data_t *) ws->req->data;
246
247 try {
248 nxt_handle_scope scope(env());
249
250 server_obj = get_server_object();
251
252 frame = create_websocket_frame(server_obj, ws);
253
254 conn = get_reference_value(req_data->conn_ref);
255
256 process_frame = get_named_property(conn, "processFrame");
257
258 nxt_async_context async_context(env(), "websocket_handler");
259 nxt_callback_scope async_scope(async_context);
260
261 make_callback(async_context, conn, process_frame, frame);
262
263 } catch (exception &e) {
264 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
265 }
266
267 nxt_unit_websocket_done(ws);
268}
269
270
271void
272Unit::close_handler_cb(nxt_unit_request_info_t *req)
273{
274 Unit *obj;
275
276 obj = reinterpret_cast<Unit *>(req->unit->data);
277
278 obj->close_handler(req);
279}
280
281
282void
283Unit::close_handler(nxt_unit_request_info_t *req)
284{
285 napi_value conn_handle_close, conn;
286 req_data_t *req_data;
287
288 req_data = (req_data_t *) req->data;
289
290 try {
291 nxt_handle_scope scope(env());
292
293 conn = get_reference_value(req_data->conn_ref);
294
295 conn_handle_close = get_named_property(conn, "handleSocketClose");
296
297 nxt_async_context async_context(env(), "close_handler");
298 nxt_callback_scope async_scope(async_context);
299
300 make_callback(async_context, conn, conn_handle_close,
301 nxt_napi::create(0));
302
303 remove_wrap(req_data->sock_ref);
212 response = create_response(server_obj, request, req);
213
214 create_headers(req, request);
215
216 emit_request = get_named_property(server_obj, "emit_request");
217
218 nxt_async_context async_context(env(), "request_handler");
219 nxt_callback_scope async_scope(async_context);
220
221 make_callback(async_context, server_obj, emit_request, request,
222 response);
223
224 } catch (exception &e) {
225 nxt_unit_req_warn(req, "request_handler: %s", e.str);
226 }
227}
228
229
230void
231Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
232{
233 Unit *obj;
234
235 obj = reinterpret_cast<Unit *>(ws->req->unit->data);
236
237 obj->websocket_handler(ws);
238}
239
240
241void
242Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
243{
244 napi_value frame, server_obj, process_frame, conn;
245 req_data_t *req_data;
246
247 req_data = (req_data_t *) ws->req->data;
248
249 try {
250 nxt_handle_scope scope(env());
251
252 server_obj = get_server_object();
253
254 frame = create_websocket_frame(server_obj, ws);
255
256 conn = get_reference_value(req_data->conn_ref);
257
258 process_frame = get_named_property(conn, "processFrame");
259
260 nxt_async_context async_context(env(), "websocket_handler");
261 nxt_callback_scope async_scope(async_context);
262
263 make_callback(async_context, conn, process_frame, frame);
264
265 } catch (exception &e) {
266 nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
267 }
268
269 nxt_unit_websocket_done(ws);
270}
271
272
273void
274Unit::close_handler_cb(nxt_unit_request_info_t *req)
275{
276 Unit *obj;
277
278 obj = reinterpret_cast<Unit *>(req->unit->data);
279
280 obj->close_handler(req);
281}
282
283
284void
285Unit::close_handler(nxt_unit_request_info_t *req)
286{
287 napi_value conn_handle_close, conn;
288 req_data_t *req_data;
289
290 req_data = (req_data_t *) req->data;
291
292 try {
293 nxt_handle_scope scope(env());
294
295 conn = get_reference_value(req_data->conn_ref);
296
297 conn_handle_close = get_named_property(conn, "handleSocketClose");
298
299 nxt_async_context async_context(env(), "close_handler");
300 nxt_callback_scope async_scope(async_context);
301
302 make_callback(async_context, conn, conn_handle_close,
303 nxt_napi::create(0));
304
305 remove_wrap(req_data->sock_ref);
306 remove_wrap(req_data->req_ref);
304 remove_wrap(req_data->resp_ref);
305 remove_wrap(req_data->conn_ref);
306
307 } catch (exception &e) {
308 nxt_unit_req_warn(req, "close_handler: %s", e.str);
309
310 nxt_unit_request_done(req, NXT_UNIT_ERROR);
311
312 return;
313 }
314
315 nxt_unit_request_done(req, NXT_UNIT_OK);
316}
317
318
319void
320Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
321{
322 Unit *obj;
323
324 obj = reinterpret_cast<Unit *>(ctx->unit->data);
325
326 obj->shm_ack_handler(ctx);
327}
328
329
330void
331Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
332{
333 napi_value server_obj, emit_drain;
334
335 try {
336 nxt_handle_scope scope(env());
337
338 server_obj = get_server_object();
339
340 emit_drain = get_named_property(server_obj, "emit_drain");
341
342 nxt_async_context async_context(env(), "shm_ack_handler");
343 nxt_callback_scope async_scope(async_context);
344
345 make_callback(async_context, server_obj, emit_drain);
346
347 } catch (exception &e) {
348 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
349 }
350}
351
352
353static void
354nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
355{
356 port_data_t *data;
357
358 data = (port_data_t *) handle->data;
359
360 nxt_unit_process_port_msg(data->ctx, data->port);
361}
362
363
364int
365Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
366{
367 int err;
368 Unit *obj;
369 uv_loop_t *loop;
370 port_data_t *data;
371 napi_status status;
372
373 if (port->in_fd != -1) {
374 obj = reinterpret_cast<Unit *>(ctx->unit->data);
375
376 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
377 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
378 port->in_fd, strerror(errno), errno);
379 return -1;
380 }
381
382 status = napi_get_uv_event_loop(obj->env(), &loop);
383 if (status != napi_ok) {
384 nxt_unit_warn(ctx, "Failed to get uv.loop");
385 return NXT_UNIT_ERROR;
386 }
387
388 data = new port_data_t;
389
390 err = uv_poll_init(loop, &data->poll, port->in_fd);
391 if (err < 0) {
392 nxt_unit_warn(ctx, "Failed to init uv.poll");
393 return NXT_UNIT_ERROR;
394 }
395
396 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
397 if (err < 0) {
398 nxt_unit_warn(ctx, "Failed to start uv.poll");
399 return NXT_UNIT_ERROR;
400 }
401
402 port->data = data;
403
404 data->ctx = ctx;
405 data->port = port;
406 data->poll.data = data;
407 }
408
409 return NXT_UNIT_OK;
410}
411
412
413void
414Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
415{
416 port_data_t *data;
417
418 if (port->data != NULL) {
419 data = (port_data_t *) port->data;
420
421 if (data->port == port) {
422 uv_poll_stop(&data->poll);
423
424 uv_close((uv_handle_t *) &data->poll, delete_port_data);
425 }
426 }
427}
428
429
430static void
431delete_port_data(uv_handle_t* handle)
432{
433 port_data_t *data;
434
435 data = (port_data_t *) handle->data;
436
437 delete data;
438}
439
440
441void
442Unit::quit_cb(nxt_unit_ctx_t *ctx)
443{
444 Unit *obj;
445
446 obj = reinterpret_cast<Unit *>(ctx->unit->data);
447
448 obj->quit(ctx);
449}
450
451
452void
453Unit::quit(nxt_unit_ctx_t *ctx)
454{
455 napi_value server_obj, emit_close;
456
457 try {
458 nxt_handle_scope scope(env());
459
460 server_obj = get_server_object();
461
462 emit_close = get_named_property(server_obj, "emit_close");
463
464 nxt_async_context async_context(env(), "unit_quit");
465 nxt_callback_scope async_scope(async_context);
466
467 make_callback(async_context, server_obj, emit_close);
468
469 } catch (exception &e) {
470 nxt_unit_debug(ctx, "quit: %s", e.str);
471 }
472
473 nxt_unit_done(ctx);
474}
475
476
477napi_value
478Unit::get_server_object()
479{
480 napi_value unit_obj;
481
482 unit_obj = get_reference_value(wrapper_);
483
484 return get_named_property(unit_obj, "server");
485}
486
487
488void
489Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
490{
307 remove_wrap(req_data->resp_ref);
308 remove_wrap(req_data->conn_ref);
309
310 } catch (exception &e) {
311 nxt_unit_req_warn(req, "close_handler: %s", e.str);
312
313 nxt_unit_request_done(req, NXT_UNIT_ERROR);
314
315 return;
316 }
317
318 nxt_unit_request_done(req, NXT_UNIT_OK);
319}
320
321
322void
323Unit::shm_ack_handler_cb(nxt_unit_ctx_t *ctx)
324{
325 Unit *obj;
326
327 obj = reinterpret_cast<Unit *>(ctx->unit->data);
328
329 obj->shm_ack_handler(ctx);
330}
331
332
333void
334Unit::shm_ack_handler(nxt_unit_ctx_t *ctx)
335{
336 napi_value server_obj, emit_drain;
337
338 try {
339 nxt_handle_scope scope(env());
340
341 server_obj = get_server_object();
342
343 emit_drain = get_named_property(server_obj, "emit_drain");
344
345 nxt_async_context async_context(env(), "shm_ack_handler");
346 nxt_callback_scope async_scope(async_context);
347
348 make_callback(async_context, server_obj, emit_drain);
349
350 } catch (exception &e) {
351 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
352 }
353}
354
355
356static void
357nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
358{
359 port_data_t *data;
360
361 data = (port_data_t *) handle->data;
362
363 nxt_unit_process_port_msg(data->ctx, data->port);
364}
365
366
367int
368Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
369{
370 int err;
371 Unit *obj;
372 uv_loop_t *loop;
373 port_data_t *data;
374 napi_status status;
375
376 if (port->in_fd != -1) {
377 obj = reinterpret_cast<Unit *>(ctx->unit->data);
378
379 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
380 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
381 port->in_fd, strerror(errno), errno);
382 return -1;
383 }
384
385 status = napi_get_uv_event_loop(obj->env(), &loop);
386 if (status != napi_ok) {
387 nxt_unit_warn(ctx, "Failed to get uv.loop");
388 return NXT_UNIT_ERROR;
389 }
390
391 data = new port_data_t;
392
393 err = uv_poll_init(loop, &data->poll, port->in_fd);
394 if (err < 0) {
395 nxt_unit_warn(ctx, "Failed to init uv.poll");
396 return NXT_UNIT_ERROR;
397 }
398
399 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
400 if (err < 0) {
401 nxt_unit_warn(ctx, "Failed to start uv.poll");
402 return NXT_UNIT_ERROR;
403 }
404
405 port->data = data;
406
407 data->ctx = ctx;
408 data->port = port;
409 data->poll.data = data;
410 }
411
412 return NXT_UNIT_OK;
413}
414
415
416void
417Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
418{
419 port_data_t *data;
420
421 if (port->data != NULL) {
422 data = (port_data_t *) port->data;
423
424 if (data->port == port) {
425 uv_poll_stop(&data->poll);
426
427 uv_close((uv_handle_t *) &data->poll, delete_port_data);
428 }
429 }
430}
431
432
433static void
434delete_port_data(uv_handle_t* handle)
435{
436 port_data_t *data;
437
438 data = (port_data_t *) handle->data;
439
440 delete data;
441}
442
443
444void
445Unit::quit_cb(nxt_unit_ctx_t *ctx)
446{
447 Unit *obj;
448
449 obj = reinterpret_cast<Unit *>(ctx->unit->data);
450
451 obj->quit(ctx);
452}
453
454
455void
456Unit::quit(nxt_unit_ctx_t *ctx)
457{
458 napi_value server_obj, emit_close;
459
460 try {
461 nxt_handle_scope scope(env());
462
463 server_obj = get_server_object();
464
465 emit_close = get_named_property(server_obj, "emit_close");
466
467 nxt_async_context async_context(env(), "unit_quit");
468 nxt_callback_scope async_scope(async_context);
469
470 make_callback(async_context, server_obj, emit_close);
471
472 } catch (exception &e) {
473 nxt_unit_debug(ctx, "quit: %s", e.str);
474 }
475
476 nxt_unit_done(ctx);
477}
478
479
480napi_value
481Unit::get_server_object()
482{
483 napi_value unit_obj;
484
485 unit_obj = get_reference_value(wrapper_);
486
487 return get_named_property(unit_obj, "server");
488}
489
490
491void
492Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
493{
491 void *data;
492 uint32_t i;
494 uint32_t i;
493 napi_value headers, raw_headers, buffer;
495 napi_value headers, raw_headers;
494 napi_status status;
495 nxt_unit_request_t *r;
496
497 r = req->request;
498
499 headers = create_object();
500
501 status = napi_create_array_with_length(env(), r->fields_count * 2,
502 &raw_headers);
503 if (status != napi_ok) {
504 throw exception("Failed to create array");
505 }
506
507 for (i = 0; i < r->fields_count; i++) {
508 append_header(r->fields + i, headers, raw_headers, i);
509 }
510
511 set_named_property(request, "headers", headers);
512 set_named_property(request, "rawHeaders", raw_headers);
513 set_named_property(request, "httpVersion", r->version, r->version_length);
514 set_named_property(request, "method", r->method, r->method_length);
515 set_named_property(request, "url", r->target, r->target_length);
516
517 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
496 napi_status status;
497 nxt_unit_request_t *r;
498
499 r = req->request;
500
501 headers = create_object();
502
503 status = napi_create_array_with_length(env(), r->fields_count * 2,
504 &raw_headers);
505 if (status != napi_ok) {
506 throw exception("Failed to create array");
507 }
508
509 for (i = 0; i < r->fields_count; i++) {
510 append_header(r->fields + i, headers, raw_headers, i);
511 }
512
513 set_named_property(request, "headers", headers);
514 set_named_property(request, "rawHeaders", raw_headers);
515 set_named_property(request, "httpVersion", r->version, r->version_length);
516 set_named_property(request, "method", r->method, r->method_length);
517 set_named_property(request, "url", r->target, r->target_length);
518
519 set_named_property(request, "_websocket_handshake", r->websocket_handshake);
518
519 buffer = create_buffer((size_t) req->content_length, &data);
520 nxt_unit_request_read(req, data, req->content_length);
521
522 set_named_property(request, "_data", buffer);
523}
524
525
526inline char
527lowcase(char c)
528{
529 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
530}
531
532
533inline void
534Unit::append_header(nxt_unit_field_t *f, napi_value headers,
535 napi_value raw_headers, uint32_t idx)
536{
537 char *name;
538 uint8_t i;
539 napi_value str, vstr;
540
541 name = (char *) nxt_unit_sptr_get(&f->name);
542
543 str = create_string_latin1(name, f->name_length);
544
545 for (i = 0; i < f->name_length; i++) {
546 name[i] = lowcase(name[i]);
547 }
548
549 vstr = set_named_property(headers, name, f->value, f->value_length);
550
551 set_element(raw_headers, idx * 2, str);
552 set_element(raw_headers, idx * 2 + 1, vstr);
553}
554
555
556napi_value
557Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
558{
559 napi_value constructor, res;
560 req_data_t *req_data;
561 nxt_unit_request_t *r;
562
563 r = req->request;
564
565 constructor = get_named_property(server_obj, "Socket");
566
567 res = new_instance(constructor);
568
569 req_data = (req_data_t *) req->data;
570 req_data->sock_ref = wrap(res, req, sock_destroy);
571
572 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
573 set_named_property(res, "localAddress", r->local, r->local_length);
574
575 return res;
576}
577
578
579napi_value
520}
521
522
523inline char
524lowcase(char c)
525{
526 return (c >= 'A' && c <= 'Z') ? (c | 0x20) : c;
527}
528
529
530inline void
531Unit::append_header(nxt_unit_field_t *f, napi_value headers,
532 napi_value raw_headers, uint32_t idx)
533{
534 char *name;
535 uint8_t i;
536 napi_value str, vstr;
537
538 name = (char *) nxt_unit_sptr_get(&f->name);
539
540 str = create_string_latin1(name, f->name_length);
541
542 for (i = 0; i < f->name_length; i++) {
543 name[i] = lowcase(name[i]);
544 }
545
546 vstr = set_named_property(headers, name, f->value, f->value_length);
547
548 set_element(raw_headers, idx * 2, str);
549 set_element(raw_headers, idx * 2 + 1, vstr);
550}
551
552
553napi_value
554Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
555{
556 napi_value constructor, res;
557 req_data_t *req_data;
558 nxt_unit_request_t *r;
559
560 r = req->request;
561
562 constructor = get_named_property(server_obj, "Socket");
563
564 res = new_instance(constructor);
565
566 req_data = (req_data_t *) req->data;
567 req_data->sock_ref = wrap(res, req, sock_destroy);
568
569 set_named_property(res, "remoteAddress", r->remote, r->remote_length);
570 set_named_property(res, "localAddress", r->local, r->local_length);
571
572 return res;
573}
574
575
576napi_value
580Unit::create_request(napi_value server_obj, napi_value socket)
577Unit::create_request(napi_value server_obj, napi_value socket,
578 nxt_unit_request_info_t *req)
581{
579{
582 napi_value constructor;
580 napi_value constructor, res;
581 req_data_t *req_data;
583
584 constructor = get_named_property(server_obj, "ServerRequest");
585
582
583 constructor = get_named_property(server_obj, "ServerRequest");
584
586 return new_instance(constructor, server_obj, socket);
585 res = new_instance(constructor, server_obj, socket);
586
587 req_data = (req_data_t *) req->data;
588 req_data->req_ref = wrap(res, req, req_destroy);
589
590 return res;
587}
588
589
590napi_value
591Unit::create_response(napi_value server_obj, napi_value request,
592 nxt_unit_request_info_t *req)
593{
594 napi_value constructor, res;
595 req_data_t *req_data;
596
597 constructor = get_named_property(server_obj, "ServerResponse");
598
599 res = new_instance(constructor, request);
600
601 req_data = (req_data_t *) req->data;
602 req_data->resp_ref = wrap(res, req, resp_destroy);
603
604 return res;
605}
606
607
608napi_value
609Unit::create_websocket_frame(napi_value server_obj,
610 nxt_unit_websocket_frame_t *ws)
611{
612 void *data;
613 napi_value constructor, res, buffer;
614 uint8_t sc[2];
615
616 constructor = get_named_property(server_obj, "WebSocketFrame");
617
618 res = new_instance(constructor);
619
620 set_named_property(res, "fin", (bool) ws->header->fin);
621 set_named_property(res, "opcode", ws->header->opcode);
622 set_named_property(res, "length", (int64_t) ws->payload_len);
623
624 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
625 if (ws->payload_len >= 2) {
626 nxt_unit_websocket_read(ws, sc, 2);
627
628 set_named_property(res, "closeStatus",
629 (((uint16_t) sc[0]) << 8) | sc[1]);
630
631 } else {
632 set_named_property(res, "closeStatus", -1);
633 }
634 }
635
636 buffer = create_buffer((size_t) ws->content_length, &data);
637 nxt_unit_websocket_read(ws, data, ws->content_length);
638
639 set_named_property(res, "binaryPayload", buffer);
640
641 return res;
642}
643
644
645napi_value
591}
592
593
594napi_value
595Unit::create_response(napi_value server_obj, napi_value request,
596 nxt_unit_request_info_t *req)
597{
598 napi_value constructor, res;
599 req_data_t *req_data;
600
601 constructor = get_named_property(server_obj, "ServerResponse");
602
603 res = new_instance(constructor, request);
604
605 req_data = (req_data_t *) req->data;
606 req_data->resp_ref = wrap(res, req, resp_destroy);
607
608 return res;
609}
610
611
612napi_value
613Unit::create_websocket_frame(napi_value server_obj,
614 nxt_unit_websocket_frame_t *ws)
615{
616 void *data;
617 napi_value constructor, res, buffer;
618 uint8_t sc[2];
619
620 constructor = get_named_property(server_obj, "WebSocketFrame");
621
622 res = new_instance(constructor);
623
624 set_named_property(res, "fin", (bool) ws->header->fin);
625 set_named_property(res, "opcode", ws->header->opcode);
626 set_named_property(res, "length", (int64_t) ws->payload_len);
627
628 if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
629 if (ws->payload_len >= 2) {
630 nxt_unit_websocket_read(ws, sc, 2);
631
632 set_named_property(res, "closeStatus",
633 (((uint16_t) sc[0]) << 8) | sc[1]);
634
635 } else {
636 set_named_property(res, "closeStatus", -1);
637 }
638 }
639
640 buffer = create_buffer((size_t) ws->content_length, &data);
641 nxt_unit_websocket_read(ws, data, ws->content_length);
642
643 set_named_property(res, "binaryPayload", buffer);
644
645 return res;
646}
647
648
649napi_value
650Unit::request_read(napi_env env, napi_callback_info info)
651{
652 void *data;
653 uint32_t wm;
654 nxt_napi napi(env);
655 napi_value this_arg, argv, buffer;
656 nxt_unit_request_info_t *req;
657
658 try {
659 this_arg = napi.get_cb_info(info, argv);
660
661 try {
662 req = napi.get_request_info(this_arg);
663
664 } catch (exception &e) {
665 return nullptr;
666 }
667
668 if (req->content_length == 0) {
669 return nullptr;
670 }
671
672 wm = napi.get_value_uint32(argv);
673
674 if (wm > req->content_length) {
675 wm = req->content_length;
676 }
677
678 buffer = napi.create_buffer((size_t) wm, &data);
679 nxt_unit_request_read(req, data, wm);
680
681 } catch (exception &e) {
682 napi.throw_error(e);
683 return nullptr;
684 }
685
686 return buffer;
687}
688
689
690napi_value
646Unit::response_send_headers(napi_env env, napi_callback_info info)
647{
648 int ret;
649 char *ptr, *name_ptr;
650 bool is_array;
651 size_t argc, name_len, value_len;
652 uint32_t status_code, header_len, keys_len, array_len;
653 uint32_t keys_count, i, j;
654 uint16_t hash;
655 nxt_napi napi(env);
656 napi_value this_arg, headers, keys, name, value, array_val;
657 napi_value array_entry;
658 napi_valuetype val_type;
659 nxt_unit_field_t *f;
660 nxt_unit_request_info_t *req;
661 napi_value argv[4];
662
663 argc = 4;
664
665 try {
666 this_arg = napi.get_cb_info(info, argc, argv);
667 if (argc != 4) {
668 napi.throw_error("Wrong args count. Expected: "
669 "statusCode, headers, headers count, "
670 "headers length");
671 return nullptr;
672 }
673
674 req = napi.get_request_info(this_arg);
675 status_code = napi.get_value_uint32(argv[0]);
676 keys_count = napi.get_value_uint32(argv[2]);
677 header_len = napi.get_value_uint32(argv[3]);
678
679 headers = argv[1];
680
681 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
682 if (ret != NXT_UNIT_OK) {
683 napi.throw_error("Failed to create response");
684 return nullptr;
685 }
686
687 /*
688 * Each name and value are 0-terminated by libunit.
689 * Need to add extra 2 bytes for each header.
690 */
691 header_len += keys_count * 2;
692
693 keys = napi.get_property_names(headers);
694 keys_len = napi.get_array_length(keys);
695
696 ptr = req->response_buf->free;
697
698 for (i = 0; i < keys_len; i++) {
699 name = napi.get_element(keys, i);
700
701 array_entry = napi.get_property(headers, name);
702
703 name = napi.get_element(array_entry, 0);
704 value = napi.get_element(array_entry, 1);
705
706 name_len = napi.get_value_string_latin1(name, ptr, header_len);
707 name_ptr = ptr;
708
709 ptr += name_len + 1;
710 header_len -= name_len + 1;
711
712 hash = nxt_unit_field_hash(name_ptr, name_len);
713
714 is_array = napi.is_array(value);
715
716 if (is_array) {
717 array_len = napi.get_array_length(value);
718
719 for (j = 0; j < array_len; j++) {
720 array_val = napi.get_element(value, j);
721
722 val_type = napi.type_of(array_val);
723
724 if (val_type != napi_string) {
725 array_val = napi.coerce_to_string(array_val);
726 }
727
728 value_len = napi.get_value_string_latin1(array_val, ptr,
729 header_len);
730
731 f = req->response->fields + req->response->fields_count;
732 f->skip = 0;
733
734 nxt_unit_sptr_set(&f->name, name_ptr);
735
736 f->name_length = name_len;
737 f->hash = hash;
738
739 nxt_unit_sptr_set(&f->value, ptr);
740 f->value_length = (uint32_t) value_len;
741
742 ptr += value_len + 1;
743 header_len -= value_len + 1;
744
745 req->response->fields_count++;
746 }
747
748 } else {
749 val_type = napi.type_of(value);
750
751 if (val_type != napi_string) {
752 value = napi.coerce_to_string(value);
753 }
754
755 value_len = napi.get_value_string_latin1(value, ptr, header_len);
756
757 f = req->response->fields + req->response->fields_count;
758 f->skip = 0;
759
760 nxt_unit_sptr_set(&f->name, name_ptr);
761
762 f->name_length = name_len;
763 f->hash = hash;
764
765 nxt_unit_sptr_set(&f->value, ptr);
766 f->value_length = (uint32_t) value_len;
767
768 ptr += value_len + 1;
769 header_len -= value_len + 1;
770
771 req->response->fields_count++;
772 }
773 }
774
775 } catch (exception &e) {
776 napi.throw_error(e);
777 return nullptr;
778 }
779
780 req->response_buf->free = ptr;
781
782 ret = nxt_unit_response_send(req);
783 if (ret != NXT_UNIT_OK) {
784 napi.throw_error("Failed to send response");
785 return nullptr;
786 }
787
788 return this_arg;
789}
790
791
792napi_value
793Unit::response_write(napi_env env, napi_callback_info info)
794{
795 int ret;
796 void *ptr;
797 size_t argc, have_buf_len;
798 ssize_t res_len;
799 uint32_t buf_start, buf_len;
800 nxt_napi napi(env);
801 napi_value this_arg;
802 nxt_unit_buf_t *buf;
803 napi_valuetype buf_type;
804 nxt_unit_request_info_t *req;
805 napi_value argv[3];
806
807 argc = 3;
808
809 try {
810 this_arg = napi.get_cb_info(info, argc, argv);
811 if (argc != 3) {
812 throw exception("Wrong args count. Expected: "
813 "chunk, start, length");
814 }
815
816 req = napi.get_request_info(this_arg);
817 buf_type = napi.type_of(argv[0]);
818 buf_start = napi.get_value_uint32(argv[1]);
819 buf_len = napi.get_value_uint32(argv[2]) + 1;
820
821 if (buf_type == napi_string) {
822 /* TODO: will work only for utf8 content-type */
823
824 if (req->response_buf != NULL
825 && req->response_buf->end >= req->response_buf->free + buf_len)
826 {
827 buf = req->response_buf;
828
829 } else {
830 buf = nxt_unit_response_buf_alloc(req, buf_len);
831 if (buf == NULL) {
832 throw exception("Failed to allocate response buffer");
833 }
834 }
835
836 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
837 buf_len);
838
839 buf->free += have_buf_len;
840
841 ret = nxt_unit_buf_send(buf);
842 if (ret == NXT_UNIT_OK) {
843 res_len = have_buf_len;
844 }
845
846 } else {
847 ptr = napi.get_buffer_info(argv[0], have_buf_len);
848
849 if (buf_start > 0) {
850 ptr = ((uint8_t *) ptr) + buf_start;
851 have_buf_len -= buf_start;
852 }
853
854 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
855
856 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
857 }
858
859 if (ret != NXT_UNIT_OK) {
860 throw exception("Failed to send body buf");
861 }
862 } catch (exception &e) {
863 napi.throw_error(e);
864 return nullptr;
865 }
866
867 return napi.create((int64_t) res_len);
868}
869
870
871napi_value
872Unit::response_end(napi_env env, napi_callback_info info)
873{
874 nxt_napi napi(env);
875 napi_value this_arg;
876 req_data_t *req_data;
877 nxt_unit_request_info_t *req;
878
879 try {
880 this_arg = napi.get_cb_info(info);
881
882 req = napi.get_request_info(this_arg);
883
884 req_data = (req_data_t *) req->data;
885
886 napi.remove_wrap(req_data->sock_ref);
691Unit::response_send_headers(napi_env env, napi_callback_info info)
692{
693 int ret;
694 char *ptr, *name_ptr;
695 bool is_array;
696 size_t argc, name_len, value_len;
697 uint32_t status_code, header_len, keys_len, array_len;
698 uint32_t keys_count, i, j;
699 uint16_t hash;
700 nxt_napi napi(env);
701 napi_value this_arg, headers, keys, name, value, array_val;
702 napi_value array_entry;
703 napi_valuetype val_type;
704 nxt_unit_field_t *f;
705 nxt_unit_request_info_t *req;
706 napi_value argv[4];
707
708 argc = 4;
709
710 try {
711 this_arg = napi.get_cb_info(info, argc, argv);
712 if (argc != 4) {
713 napi.throw_error("Wrong args count. Expected: "
714 "statusCode, headers, headers count, "
715 "headers length");
716 return nullptr;
717 }
718
719 req = napi.get_request_info(this_arg);
720 status_code = napi.get_value_uint32(argv[0]);
721 keys_count = napi.get_value_uint32(argv[2]);
722 header_len = napi.get_value_uint32(argv[3]);
723
724 headers = argv[1];
725
726 ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
727 if (ret != NXT_UNIT_OK) {
728 napi.throw_error("Failed to create response");
729 return nullptr;
730 }
731
732 /*
733 * Each name and value are 0-terminated by libunit.
734 * Need to add extra 2 bytes for each header.
735 */
736 header_len += keys_count * 2;
737
738 keys = napi.get_property_names(headers);
739 keys_len = napi.get_array_length(keys);
740
741 ptr = req->response_buf->free;
742
743 for (i = 0; i < keys_len; i++) {
744 name = napi.get_element(keys, i);
745
746 array_entry = napi.get_property(headers, name);
747
748 name = napi.get_element(array_entry, 0);
749 value = napi.get_element(array_entry, 1);
750
751 name_len = napi.get_value_string_latin1(name, ptr, header_len);
752 name_ptr = ptr;
753
754 ptr += name_len + 1;
755 header_len -= name_len + 1;
756
757 hash = nxt_unit_field_hash(name_ptr, name_len);
758
759 is_array = napi.is_array(value);
760
761 if (is_array) {
762 array_len = napi.get_array_length(value);
763
764 for (j = 0; j < array_len; j++) {
765 array_val = napi.get_element(value, j);
766
767 val_type = napi.type_of(array_val);
768
769 if (val_type != napi_string) {
770 array_val = napi.coerce_to_string(array_val);
771 }
772
773 value_len = napi.get_value_string_latin1(array_val, ptr,
774 header_len);
775
776 f = req->response->fields + req->response->fields_count;
777 f->skip = 0;
778
779 nxt_unit_sptr_set(&f->name, name_ptr);
780
781 f->name_length = name_len;
782 f->hash = hash;
783
784 nxt_unit_sptr_set(&f->value, ptr);
785 f->value_length = (uint32_t) value_len;
786
787 ptr += value_len + 1;
788 header_len -= value_len + 1;
789
790 req->response->fields_count++;
791 }
792
793 } else {
794 val_type = napi.type_of(value);
795
796 if (val_type != napi_string) {
797 value = napi.coerce_to_string(value);
798 }
799
800 value_len = napi.get_value_string_latin1(value, ptr, header_len);
801
802 f = req->response->fields + req->response->fields_count;
803 f->skip = 0;
804
805 nxt_unit_sptr_set(&f->name, name_ptr);
806
807 f->name_length = name_len;
808 f->hash = hash;
809
810 nxt_unit_sptr_set(&f->value, ptr);
811 f->value_length = (uint32_t) value_len;
812
813 ptr += value_len + 1;
814 header_len -= value_len + 1;
815
816 req->response->fields_count++;
817 }
818 }
819
820 } catch (exception &e) {
821 napi.throw_error(e);
822 return nullptr;
823 }
824
825 req->response_buf->free = ptr;
826
827 ret = nxt_unit_response_send(req);
828 if (ret != NXT_UNIT_OK) {
829 napi.throw_error("Failed to send response");
830 return nullptr;
831 }
832
833 return this_arg;
834}
835
836
837napi_value
838Unit::response_write(napi_env env, napi_callback_info info)
839{
840 int ret;
841 void *ptr;
842 size_t argc, have_buf_len;
843 ssize_t res_len;
844 uint32_t buf_start, buf_len;
845 nxt_napi napi(env);
846 napi_value this_arg;
847 nxt_unit_buf_t *buf;
848 napi_valuetype buf_type;
849 nxt_unit_request_info_t *req;
850 napi_value argv[3];
851
852 argc = 3;
853
854 try {
855 this_arg = napi.get_cb_info(info, argc, argv);
856 if (argc != 3) {
857 throw exception("Wrong args count. Expected: "
858 "chunk, start, length");
859 }
860
861 req = napi.get_request_info(this_arg);
862 buf_type = napi.type_of(argv[0]);
863 buf_start = napi.get_value_uint32(argv[1]);
864 buf_len = napi.get_value_uint32(argv[2]) + 1;
865
866 if (buf_type == napi_string) {
867 /* TODO: will work only for utf8 content-type */
868
869 if (req->response_buf != NULL
870 && req->response_buf->end >= req->response_buf->free + buf_len)
871 {
872 buf = req->response_buf;
873
874 } else {
875 buf = nxt_unit_response_buf_alloc(req, buf_len);
876 if (buf == NULL) {
877 throw exception("Failed to allocate response buffer");
878 }
879 }
880
881 have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
882 buf_len);
883
884 buf->free += have_buf_len;
885
886 ret = nxt_unit_buf_send(buf);
887 if (ret == NXT_UNIT_OK) {
888 res_len = have_buf_len;
889 }
890
891 } else {
892 ptr = napi.get_buffer_info(argv[0], have_buf_len);
893
894 if (buf_start > 0) {
895 ptr = ((uint8_t *) ptr) + buf_start;
896 have_buf_len -= buf_start;
897 }
898
899 res_len = nxt_unit_response_write_nb(req, ptr, have_buf_len, 0);
900
901 ret = res_len < 0 ? -res_len : (int) NXT_UNIT_OK;
902 }
903
904 if (ret != NXT_UNIT_OK) {
905 throw exception("Failed to send body buf");
906 }
907 } catch (exception &e) {
908 napi.throw_error(e);
909 return nullptr;
910 }
911
912 return napi.create((int64_t) res_len);
913}
914
915
916napi_value
917Unit::response_end(napi_env env, napi_callback_info info)
918{
919 nxt_napi napi(env);
920 napi_value this_arg;
921 req_data_t *req_data;
922 nxt_unit_request_info_t *req;
923
924 try {
925 this_arg = napi.get_cb_info(info);
926
927 req = napi.get_request_info(this_arg);
928
929 req_data = (req_data_t *) req->data;
930
931 napi.remove_wrap(req_data->sock_ref);
932 napi.remove_wrap(req_data->req_ref);
887 napi.remove_wrap(req_data->resp_ref);
888 napi.remove_wrap(req_data->conn_ref);
889
890 } catch (exception &e) {
891 napi.throw_error(e);
892 return nullptr;
893 }
894
895 nxt_unit_request_done(req, NXT_UNIT_OK);
896
897 return this_arg;
898}
899
900
901napi_value
902Unit::websocket_send_frame(napi_env env, napi_callback_info info)
903{
904 int ret, iovec_len;
905 bool fin;
906 size_t buf_len;
907 uint32_t opcode, sc;
908 nxt_napi napi(env);
909 napi_value this_arg, frame, payload;
910 nxt_unit_request_info_t *req;
911 char status_code[2];
912 struct iovec iov[2];
913
914 iovec_len = 0;
915
916 try {
917 this_arg = napi.get_cb_info(info, frame);
918
919 req = napi.get_request_info(this_arg);
920
921 opcode = napi.get_value_uint32(napi.get_named_property(frame,
922 "opcode"));
923 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
924 sc = napi.get_value_uint32(napi.get_named_property(frame,
925 "closeStatus"));
926 status_code[0] = (sc >> 8) & 0xFF;
927 status_code[1] = sc & 0xFF;
928
929 iov[iovec_len].iov_base = status_code;
930 iov[iovec_len].iov_len = 2;
931 iovec_len++;
932 }
933
934 try {
935 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
936
937 } catch (exception &e) {
938 fin = true;
939 }
940
941 payload = napi.get_named_property(frame, "binaryPayload");
942
943 if (napi.is_buffer(payload)) {
944 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
945
946 } else {
947 buf_len = 0;
948 }
949
950 } catch (exception &e) {
951 napi.throw_error(e);
952 return nullptr;
953 }
954
955 if (buf_len > 0) {
956 iov[iovec_len].iov_len = buf_len;
957 iovec_len++;
958 }
959
960 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
961 if (ret != NXT_UNIT_OK) {
962 goto failed;
963 }
964
965 return this_arg;
966
967failed:
968
969 napi.throw_error("Failed to send frame");
970
971 return nullptr;
972}
973
974
975napi_value
976Unit::websocket_set_sock(napi_env env, napi_callback_info info)
977{
978 nxt_napi napi(env);
979 napi_value this_arg, sock;
980 req_data_t *req_data;
981 nxt_unit_request_info_t *req;
982
983 try {
984 this_arg = napi.get_cb_info(info, sock);
985
986 req = napi.get_request_info(sock);
987
988 req_data = (req_data_t *) req->data;
989 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
990
991 } catch (exception &e) {
992 napi.throw_error(e);
993 return nullptr;
994 }
995
996 return this_arg;
997}
998
999
1000void
1001Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1002{
1003 nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1004}
1005
1006
1007void
1008Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1009{
1010 nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1011}
1012
1013
1014void
933 napi.remove_wrap(req_data->resp_ref);
934 napi.remove_wrap(req_data->conn_ref);
935
936 } catch (exception &e) {
937 napi.throw_error(e);
938 return nullptr;
939 }
940
941 nxt_unit_request_done(req, NXT_UNIT_OK);
942
943 return this_arg;
944}
945
946
947napi_value
948Unit::websocket_send_frame(napi_env env, napi_callback_info info)
949{
950 int ret, iovec_len;
951 bool fin;
952 size_t buf_len;
953 uint32_t opcode, sc;
954 nxt_napi napi(env);
955 napi_value this_arg, frame, payload;
956 nxt_unit_request_info_t *req;
957 char status_code[2];
958 struct iovec iov[2];
959
960 iovec_len = 0;
961
962 try {
963 this_arg = napi.get_cb_info(info, frame);
964
965 req = napi.get_request_info(this_arg);
966
967 opcode = napi.get_value_uint32(napi.get_named_property(frame,
968 "opcode"));
969 if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
970 sc = napi.get_value_uint32(napi.get_named_property(frame,
971 "closeStatus"));
972 status_code[0] = (sc >> 8) & 0xFF;
973 status_code[1] = sc & 0xFF;
974
975 iov[iovec_len].iov_base = status_code;
976 iov[iovec_len].iov_len = 2;
977 iovec_len++;
978 }
979
980 try {
981 fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
982
983 } catch (exception &e) {
984 fin = true;
985 }
986
987 payload = napi.get_named_property(frame, "binaryPayload");
988
989 if (napi.is_buffer(payload)) {
990 iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
991
992 } else {
993 buf_len = 0;
994 }
995
996 } catch (exception &e) {
997 napi.throw_error(e);
998 return nullptr;
999 }
1000
1001 if (buf_len > 0) {
1002 iov[iovec_len].iov_len = buf_len;
1003 iovec_len++;
1004 }
1005
1006 ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
1007 if (ret != NXT_UNIT_OK) {
1008 goto failed;
1009 }
1010
1011 return this_arg;
1012
1013failed:
1014
1015 napi.throw_error("Failed to send frame");
1016
1017 return nullptr;
1018}
1019
1020
1021napi_value
1022Unit::websocket_set_sock(napi_env env, napi_callback_info info)
1023{
1024 nxt_napi napi(env);
1025 napi_value this_arg, sock;
1026 req_data_t *req_data;
1027 nxt_unit_request_info_t *req;
1028
1029 try {
1030 this_arg = napi.get_cb_info(info, sock);
1031
1032 req = napi.get_request_info(sock);
1033
1034 req_data = (req_data_t *) req->data;
1035 req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
1036
1037 } catch (exception &e) {
1038 napi.throw_error(e);
1039 return nullptr;
1040 }
1041
1042 return this_arg;
1043}
1044
1045
1046void
1047Unit::conn_destroy(napi_env env, void *r, void *finalize_hint)
1048{
1049 nxt_unit_req_debug(NULL, "conn_destroy: %p", r);
1050}
1051
1052
1053void
1054Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
1055{
1056 nxt_unit_req_debug(NULL, "sock_destroy: %p", r);
1057}
1058
1059
1060void
1061Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
1062{
1063 nxt_unit_req_debug(NULL, "req_destroy: %p", r);
1064}
1065
1066
1067void
1015Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1016{
1017 nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1018}
1068Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
1069{
1070 nxt_unit_req_debug(NULL, "resp_destroy: %p", r);
1071}