xref: /unit/src/nxt_port.h (revision 1131:ec7d924d8dfb)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9 
10 
/*
 * Table of port message handlers.
 *
 * nxt_port_handler_idx() converts a member's offset into an index, and
 * nxt_port_msg_type_t uses those indexes as its _NXT_PORT_MSG_* values, so
 * the member order here fixes the numeric value of every message type.
 * The index arithmetic divides by sizeof(nxt_port_handler_t), so it relies
 * on every member being a nxt_port_handler_t.
 */
11 struct nxt_port_handlers_s {
12     /* RPC responses. */
13     nxt_port_handler_t  rpc_ready;
14     nxt_port_handler_t  rpc_error;
15 
16     /* Main process RPC requests. */
17     nxt_port_handler_t  start_worker;
18     nxt_port_handler_t  socket;
19     nxt_port_handler_t  modules;
20     nxt_port_handler_t  conf_store;
21     nxt_port_handler_t  cert_get;
22     nxt_port_handler_t  cert_delete;
23     nxt_port_handler_t  access_log;
24 
25     /* File descriptor exchange. */
26     nxt_port_handler_t  change_file;
27     nxt_port_handler_t  new_port;
28     nxt_port_handler_t  mmap;
29 
30     /* New process ready. */
31     nxt_port_handler_t  process_ready;
32 
33     /* Process exit/crash notification. */
34     nxt_port_handler_t  remove_pid;
35 
36     /* Stop process command. */
37     nxt_port_handler_t  quit;
38 
39     /* Request headers. */
40     nxt_port_handler_t  req_headers;
41 
42     /* Websocket frame. */
43     nxt_port_handler_t  websocket_frame;
44 
45     /* Various data. */
46     nxt_port_handler_t  data;
47 };
48 
49 
/*
 * Yields the zero-based position of member "name" within
 * nxt_port_handlers_t: the member's byte offset divided by the size of one
 * nxt_port_handler_t.  Built only from offsetof and sizeof, so it can be
 * used as an enumerator value (see nxt_port_msg_type_t).
 */
50 #define nxt_port_handler_idx(name)                                            \
51     ( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) )
52 
53 
/*
 * Port message type values.
 *
 * A message type is a handler index (one of the _NXT_PORT_MSG_* values,
 * all of which fit under NXT_PORT_MSG_MASK) optionally ORed with the flag
 * bits NXT_PORT_MSG_LAST, NXT_PORT_MSG_CLOSE_FD and NXT_PORT_MSG_SYNC,
 * which lie above the mask.
 */
54 typedef enum {
       /*
        * Flag bits.  NOTE(review): NXT_PORT_MSG_LAST presumably corresponds
        * to nxt_port_msg_t.last; the exact meaning of the flags is not
        * visible in this header -- confirm against the users.
        */
55     NXT_PORT_MSG_LAST           = 0x100,
56     NXT_PORT_MSG_CLOSE_FD       = 0x200,
57     NXT_PORT_MSG_SYNC           = 0x400,
58 
       /* Low 8 bits: the range in which the handler indexes live. */
59     NXT_PORT_MSG_MASK           = 0xFF,
60 
       /* Handler indexes, following nxt_port_handlers_t member order. */
61     _NXT_PORT_MSG_RPC_READY     = nxt_port_handler_idx(rpc_ready),
62     _NXT_PORT_MSG_RPC_ERROR     = nxt_port_handler_idx(rpc_error),
63 
64     _NXT_PORT_MSG_START_WORKER  = nxt_port_handler_idx(start_worker),
65     _NXT_PORT_MSG_SOCKET        = nxt_port_handler_idx(socket),
66     _NXT_PORT_MSG_MODULES       = nxt_port_handler_idx(modules),
67     _NXT_PORT_MSG_CONF_STORE    = nxt_port_handler_idx(conf_store),
68     _NXT_PORT_MSG_CERT_GET      = nxt_port_handler_idx(cert_get),
69     _NXT_PORT_MSG_CERT_DELETE   = nxt_port_handler_idx(cert_delete),
70     _NXT_PORT_MSG_ACCESS_LOG    = nxt_port_handler_idx(access_log),
71 
72     _NXT_PORT_MSG_CHANGE_FILE   = nxt_port_handler_idx(change_file),
73     _NXT_PORT_MSG_NEW_PORT      = nxt_port_handler_idx(new_port),
74     _NXT_PORT_MSG_MMAP          = nxt_port_handler_idx(mmap),
75 
76     _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
77     _NXT_PORT_MSG_REMOVE_PID    = nxt_port_handler_idx(remove_pid),
78     _NXT_PORT_MSG_QUIT          = nxt_port_handler_idx(quit),
79 
80     _NXT_PORT_MSG_REQ_HEADERS   = nxt_port_handler_idx(req_headers),
81     _NXT_PORT_MSG_WEBSOCKET     = nxt_port_handler_idx(websocket_frame),
82 
83     _NXT_PORT_MSG_DATA          = nxt_port_handler_idx(data),
84 
       /* Number of handler slots in nxt_port_handlers_t. */
85     NXT_PORT_MSG_MAX            = sizeof(nxt_port_handlers_t)
86                                   / sizeof(nxt_port_handler_t),
87 
       /*
        * Composite values: handler index plus flags.  RPC_READY, REQ_HEADERS,
        * WEBSOCKET and DATA are defined without NXT_PORT_MSG_LAST (RPC_READY,
        * WEBSOCKET and DATA also have a separate *_LAST variant); every other
        * type always carries NXT_PORT_MSG_LAST.
        */
88     NXT_PORT_MSG_RPC_READY      = _NXT_PORT_MSG_RPC_READY,
89     NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
90     NXT_PORT_MSG_RPC_ERROR      = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
91 
92     NXT_PORT_MSG_START_WORKER   = _NXT_PORT_MSG_START_WORKER
93                                   | NXT_PORT_MSG_LAST,
94     NXT_PORT_MSG_SOCKET         = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST,
95     NXT_PORT_MSG_MODULES        = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST,
96     NXT_PORT_MSG_CONF_STORE     = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST,
97     NXT_PORT_MSG_CERT_GET       = _NXT_PORT_MSG_CERT_GET | NXT_PORT_MSG_LAST,
98     NXT_PORT_MSG_CERT_DELETE    = _NXT_PORT_MSG_CERT_DELETE | NXT_PORT_MSG_LAST,
99     NXT_PORT_MSG_ACCESS_LOG     = _NXT_PORT_MSG_ACCESS_LOG | NXT_PORT_MSG_LAST,
100 
101     NXT_PORT_MSG_CHANGE_FILE    = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
102     NXT_PORT_MSG_NEW_PORT       = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
        /* The only type that also sets CLOSE_FD and SYNC. */
103     NXT_PORT_MSG_MMAP           = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST
104                                   | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
105 
106     NXT_PORT_MSG_PROCESS_READY  = _NXT_PORT_MSG_PROCESS_READY
107                                   | NXT_PORT_MSG_LAST,
108     NXT_PORT_MSG_QUIT           = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
109     NXT_PORT_MSG_REMOVE_PID     = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
110 
111     NXT_PORT_MSG_REQ_HEADERS    = _NXT_PORT_MSG_REQ_HEADERS,
112     NXT_PORT_MSG_WEBSOCKET      = _NXT_PORT_MSG_WEBSOCKET,
113     NXT_PORT_MSG_WEBSOCKET_LAST = _NXT_PORT_MSG_WEBSOCKET | NXT_PORT_MSG_LAST,
114 
115     NXT_PORT_MSG_DATA           = _NXT_PORT_MSG_DATA,
116     NXT_PORT_MSG_DATA_LAST      = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
117 } nxt_port_msg_type_t;
118 
119 
120 /* Port message header: passed as the first iov chunk. */
121 typedef struct {
122     uint32_t             stream;
123     nxt_pid_t            pid;
124     nxt_port_id_t        reply_port;
125 
126     uint8_t              type;
127 
        /*
         * NOTE(review): the members below are annotated "1 bit" but are
         * declared as whole uint8_t members, not bit-fields.
         */

128     /* Last message for this stream. */
129     uint8_t              last;      /* 1 bit */
130 
131     /* Message data sent using mmap, next chunk is a nxt_port_mmap_msg_t. */
132     uint8_t              mmap;      /* 1 bit */
133 
134     /* Non-First fragment in fragmented message sequence. */
135     uint8_t              nf;        /* 1 bit */
136 
137     /* More Fragments followed. */
138     uint8_t              mf;        /* 1 bit */
139 
140     /* Message delivery tracking enabled, next chunk is tracking msg. */
141     uint8_t              tracking;  /* 1 bit */
142 } nxt_port_msg_t;
143 
144 
/*
 * An outgoing port message.  nxt_port_t.messages is documented as a queue
 * of these; "link" is presumably the link used for that queue -- confirm.
 */
145 typedef struct {
146     nxt_queue_link_t    link;
147     nxt_buf_t           *buf;
148     size_t              share;
149     nxt_fd_t            fd;
        /* Header stored by value together with the rest of the message. */
150     nxt_port_msg_t      port_msg;
151     uint32_t            tracking_msg[2];
        /* Annotated "1 bit" but declared as whole uint8_t members. */
152     uint8_t             close_fd;   /* 1 bit */
153     uint8_t             allocated;  /* 1 bit */
154 } nxt_port_send_msg_t;
155 
156 
/*
 * A received port message.  The nxt_port_*_handler() functions declared
 * later in this header take a nxt_port_recv_msg_t pointer, presumably a
 * typedef of this struct made elsewhere -- confirm.
 */
157 struct nxt_port_recv_msg_s {
158     nxt_fd_t            fd;
159     nxt_buf_t           *buf;
160     nxt_port_t          *port;
161     nxt_port_msg_t      port_msg;
162     size_t              size;
163     nxt_bool_t          cancelled;
        /*
         * Overlapping alternatives.  NOTE(review): which member is valid
         * presumably depends on the message type -- confirm.
         */
164     union {
165         nxt_port_t      *new_port;
166         nxt_pid_t       removed_pid;
167         void            *data;
168     } u;
169 };
170 
/*
 * Forward declaration: struct nxt_app_s is not defined in this header,
 * which only stores a pointer to it (nxt_port_t.app).
 */
171 typedef struct nxt_app_s  nxt_app_t;
172 
/*
 * State of one port.
 *
 * The struct carries three queue links (process ports, application ports,
 * application idle ports), a queue of pending nxt_port_send_msg_t entries,
 * RPC and fragment hashes, and a use counter that nxt_port_inc_use()
 * increments.
 */
173 struct nxt_port_s {
174     nxt_fd_event_t      socket;
175 
176     nxt_queue_link_t    link;       /* for nxt_process_t.ports */
177     nxt_process_t       *process;
178 
179     nxt_queue_link_t    app_link;   /* for nxt_app_t.ports */
180     nxt_app_t           *app;
181 
182     nxt_queue_link_t    idle_link;  /* for nxt_app_t.idle_ports */
183     nxt_msec_t          idle_start;
184 
185     nxt_queue_t         messages;   /* of nxt_port_send_msg_t */
        /* NOTE(review): presumably guards "messages" -- confirm. */
186     nxt_thread_mutex_t  write_mutex;
187 
188     /* Maximum size of message part. */
189     uint32_t            max_size;
190     /* Maximum interleave of message parts. */
191     uint32_t            max_share;
192 
193     uint32_t            app_pending_responses;
194     uint32_t            app_responses;
195     nxt_queue_t         pending_requests;
196 
197     nxt_queue_t         active_websockets;
198 
199     nxt_port_handler_t  handler;
        /*
         * NOTE(review): pointer to nxt_port_handler_t; presumably the handler
         * table given to nxt_port_enable() -- confirm.
         */
200     nxt_port_handler_t  *data;
201 
202     nxt_mp_t            *mem_pool;
203     nxt_event_engine_t  *engine;
204 
205     nxt_buf_t           *free_bufs;
206     nxt_socket_t        pair[2];
207 
208     nxt_port_id_t       id;
209     nxt_pid_t           pid;
210 
211     nxt_lvlhsh_t        rpc_streams; /* stream to nxt_port_rpc_reg_t */
212     nxt_lvlhsh_t        rpc_peers;   /* peer to queue of nxt_port_rpc_reg_t */
213 
214     nxt_lvlhsh_t        frags;
215 
        /* Incremented by nxt_port_inc_use(). */
216     nxt_atomic_t        use_count;
217 
218     nxt_process_type_t  type;
219 };
220 
221 
/*
 * Description of a port: id, pid, size limits and process type, i.e. the
 * same-named members of nxt_port_t.  One of the two alternatives of
 * nxt_port_data_t.  Presumably the payload of a "new port" message --
 * confirm.
 *
 * NOTE(review): max_size and max_share are size_t here but uint32_t in
 * nxt_port_t; "type" is an 8-bit wide bit-field of nxt_process_type_t.
 */
222 typedef struct {
223     nxt_port_id_t       id;
224     nxt_pid_t           pid;
225     size_t              max_size;
226     size_t              max_share;
227     nxt_process_type_t  type:8;
228 } nxt_port_msg_new_port_t;
229 
230 
231 /*
232  * sizeof(nxt_port_data_t) is used as the allocation size; being a union,
233  * it fits either member, which enables effective reuse of memory pool cache.
234  */
235 typedef union {
236     nxt_buf_t                buf;
237     nxt_port_msg_new_port_t  new_port;
238 } nxt_port_data_t;
239 
240 
/*
 * Callback type accepted by nxt_port_post(): receives a task, a port and
 * an opaque data pointer, and returns nothing.
 */
241 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
242     void *data);
243 
/*
 * Port creation, teardown and socket I/O entry points.  Only declarations
 * are visible in this header; the notes below are limited to what the
 * signatures and the inline wrapper show.
 */
244 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
245     nxt_process_type_t type);
246 
247 nxt_port_id_t nxt_port_get_next_id(void);
248 void nxt_port_reset_next_id(void);
249 
250 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
251     size_t max_size);
252 void nxt_port_destroy(nxt_port_t *port);
253 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
254 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
255 void nxt_port_write_close(nxt_port_t *port);
256 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
257 void nxt_port_read_close(nxt_port_t *port);
/*
 * Takes the same arguments as nxt_port_socket_write() plus "tracking";
 * nxt_port_socket_write() calls this function with tracking set to NULL.
 */
258 nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port,
259     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
260     nxt_buf_t *b, void *tracking);
261 
/*
 * Convenience wrapper around nxt_port_socket_twrite(): forwards every
 * argument unchanged, passes NULL as the tracking argument, and returns
 * the callee's result as is.
 */
262 nxt_inline nxt_int_t
263 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
264     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
265     nxt_buf_t *b)
266 {
267     return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b,
268                                   NULL);
269 }
270 
/*
 * Declarations only; definitions are outside this header.
 * nxt_port_enable() takes the handler table (nxt_port_handlers_t) to use
 * with the port.
 */
271 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
272     nxt_port_handlers_t *handlers);
273 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
274     nxt_port_t *new_port, uint32_t stream);
275 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
276     nxt_uint_t slot, nxt_fd_t fd);
277 
/*
 * Message handlers.  All share the (task, received message) signature;
 * presumably they are meant to fill nxt_port_handlers_t slots -- confirm.
 */
278 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
279 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
280 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
281 void nxt_port_change_log_file_handler(nxt_task_t *task,
282     nxt_port_recv_msg_t *msg);
283 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
284 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
285 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
286 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
287 
/*
 * nxt_port_post() takes a nxt_port_post_handler_t callback and a data
 * pointer.  NOTE(review): the "i" argument of nxt_port_use() is presumably
 * a signed adjustment of port->use_count -- confirm.
 */
288 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
289     nxt_port_post_handler_t handler, void *data);
290 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
291 
/*
 * Adds 1 to port->use_count through nxt_atomic_fetch_add(); the value
 * returned by that call is discarded.  Unlike nxt_port_use(), no task
 * argument is needed.
 */
292 nxt_inline void nxt_port_inc_use(nxt_port_t *port)
293 {
294     nxt_atomic_fetch_add(&port->use_count, 1);
295 }
296 
297 #endif /* _NXT_PORT_H_INCLUDED_ */
298