1
2/*
3 * Copyright (C) NGINX, Inc.
4 */
5
6#include "unit.h"
7
8#include <unistd.h>
9#include <fcntl.h>
10
11#include <uv.h>
12
13#include <nxt_unit_websocket.h>
14
15
16static void delete_port_data(uv_handle_t* handle);
17
16napi_ref Unit::constructor_;
17
18
19struct port_data_t {
22 nxt_unit_ctx_t *ctx;
23 nxt_unit_port_t *port;
24 uv_poll_t poll;
20 port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p);
21
22 void process_port_msg();
23 void stop();
24
25 template<typename T>
26 static port_data_t *get(T *handle);
27
28 static void read_callback(uv_poll_t *handle, int status, int events);
29 static void timer_callback(uv_timer_t *handle);
30 static void delete_data(uv_handle_t* handle);
31
32 nxt_unit_ctx_t *ctx;
33 nxt_unit_port_t *port;
34 uv_poll_t poll;
35 uv_timer_t timer;
36 int ref_count;
37 bool scheduled;
38 bool stopped;
39};
40
41
42struct req_data_t {
43 napi_ref sock_ref;
44 napi_ref req_ref;
45 napi_ref resp_ref;
46 napi_ref conn_ref;
47};
48
49
50port_data_t::port_data_t(nxt_unit_ctx_t *c, nxt_unit_port_t *p) :
51 ctx(c), port(p), ref_count(0), scheduled(false), stopped(false)
52{
53 timer.type = UV_UNKNOWN_HANDLE;
54}
55
56
57void
58port_data_t::process_port_msg()
59{
60 int rc, err;
61
62 rc = nxt_unit_process_port_msg(ctx, port);
63
64 if (rc != NXT_UNIT_OK) {
65 return;
66 }
67
68 if (timer.type == UV_UNKNOWN_HANDLE) {
69 err = uv_timer_init(poll.loop, &timer);
70 if (err < 0) {
71 nxt_unit_warn(ctx, "Failed to init uv.poll");
72 return;
73 }
74
75 ref_count++;
76 timer.data = this;
77 }
78
79 if (!scheduled && !stopped) {
80 uv_timer_start(&timer, timer_callback, 0, 0);
81
82 scheduled = true;
83 }
84}
85
86
87void
88port_data_t::stop()
89{
90 stopped = true;
91
92 uv_poll_stop(&poll);
93
94 uv_close((uv_handle_t *) &poll, delete_data);
95
96 if (timer.type == UV_UNKNOWN_HANDLE) {
97 return;
98 }
99
100 uv_timer_stop(&timer);
101
102 uv_close((uv_handle_t *) &timer, delete_data);
103}
104
105
106template<typename T>
107port_data_t *
108port_data_t::get(T *handle)
109{
110 return (port_data_t *) handle->data;
111}
112
113
114void
115port_data_t::read_callback(uv_poll_t *handle, int status, int events)
116{
117 get(handle)->process_port_msg();
118}
119
120
121void
122port_data_t::timer_callback(uv_timer_t *handle)
123{
124 port_data_t *data;
125
126 data = get(handle);
127
128 data->scheduled = false;
129 if (data->stopped) {
130 return;
131 }
132
133 data->process_port_msg();
134}
135
136
137void
138port_data_t::delete_data(uv_handle_t* handle)
139{
140 port_data_t *data;
141
142 data = get(handle);
143
144 if (--data->ref_count <= 0) {
145 delete data;
146 }
147}
148
149
150Unit::Unit(napi_env env, napi_value jsthis):
151 nxt_napi(env),
152 wrapper_(wrap(jsthis, this, destroy)),
153 unit_ctx_(nullptr)
154{
155 nxt_unit_debug(NULL, "Unit::Unit()");
156}
157

--- 304 unchanged lines hidden (view full) ---

462 make_callback(async_context, server_obj, emit_drain);
463
464 } catch (exception &e) {
465 nxt_unit_warn(ctx, "shm_ack_handler: %s", e.str);
466 }
467}
468
469
356static void
357nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
358{
359 port_data_t *data;
360
361 data = (port_data_t *) handle->data;
362
363 nxt_unit_process_port_msg(data->ctx, data->port);
364}
365
366
470int
471Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
472{
370 int err;
371 Unit *obj;
372 uv_loop_t *loop;
373 port_data_t *data;
374 napi_status status;
473 int err;
474 Unit *obj;
475 uv_loop_t *loop;
476 port_data_t *data;
477 napi_status status;
478
479 if (port->in_fd != -1) {
377 obj = reinterpret_cast<Unit *>(ctx->unit->data);
378
480 if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
481 nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
482 port->in_fd, strerror(errno), errno);
483 return -1;
484 }
485
486 obj = reinterpret_cast<Unit *>(ctx->unit->data);
487
488 status = napi_get_uv_event_loop(obj->env(), &loop);
489 if (status != napi_ok) {
490 nxt_unit_warn(ctx, "Failed to get uv.loop");
491 return NXT_UNIT_ERROR;
492 }
493
391 data = new port_data_t;
494 data = new port_data_t(ctx, port);
495
496 err = uv_poll_init(loop, &data->poll, port->in_fd);
497 if (err < 0) {
498 nxt_unit_warn(ctx, "Failed to init uv.poll");
499 delete data;
500 return NXT_UNIT_ERROR;
501 }
502
399 err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
503 err = uv_poll_start(&data->poll, UV_READABLE,
504 port_data_t::read_callback);
505 if (err < 0) {
506 nxt_unit_warn(ctx, "Failed to start uv.poll");
507 delete data;
508 return NXT_UNIT_ERROR;
509 }
510
511 port->data = data;
512
407 data->ctx = ctx;
408 data->port = port;
513 data->ref_count++;
514 data->poll.data = data;
515 }
516
517 return NXT_UNIT_OK;
518}
519
520
521void
522Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
523{
524 port_data_t *data;
525
526 if (port->data != NULL) {
527 data = (port_data_t *) port->data;
528
424 if (data->port == port) {
425 uv_poll_stop(&data->poll);
426
427 uv_close((uv_handle_t *) &data->poll, delete_port_data);
428 }
529 data->stop();
530 }
531}
532
533
433static void
434delete_port_data(uv_handle_t* handle)
435{
436 port_data_t *data;
437
438 data = (port_data_t *) handle->data;
439
440 delete data;
441}
442
443
534void
535Unit::quit_cb(nxt_unit_ctx_t *ctx)
536{
537 Unit *obj;
538
539 obj = reinterpret_cast<Unit *>(ctx->unit->data);
540
541 obj->quit(ctx);

--- 620 unchanged lines hidden ---