xref: /unit/src/nxt_port.h (revision 1546:06017e6e3a5f)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9 
10 
11 struct nxt_port_handlers_s {
12     /* RPC responses. */
13     nxt_port_handler_t  rpc_ready;
14     nxt_port_handler_t  rpc_error;
15 
16     /* Main process RPC requests. */
17     nxt_port_handler_t  start_process;
18     nxt_port_handler_t  socket;
19     nxt_port_handler_t  modules;
20     nxt_port_handler_t  conf_store;
21     nxt_port_handler_t  cert_get;
22     nxt_port_handler_t  cert_delete;
23     nxt_port_handler_t  access_log;
24 
25     /* File descriptor exchange. */
26     nxt_port_handler_t  change_file;
27     nxt_port_handler_t  new_port;
28     nxt_port_handler_t  get_port;
29     nxt_port_handler_t  mmap;
30     nxt_port_handler_t  get_mmap;
31 
32     /* New process */
33     nxt_port_handler_t  process_created;
34     nxt_port_handler_t  process_ready;
35 
36     /* Process exit/crash notification. */
37     nxt_port_handler_t  remove_pid;
38 
39     /* Stop process command. */
40     nxt_port_handler_t  quit;
41 
42     /* Request headers. */
43     nxt_port_handler_t  req_headers;
44 
45     /* Websocket frame. */
46     nxt_port_handler_t  websocket_frame;
47 
48     /* Various data. */
49     nxt_port_handler_t  data;
50 
51     nxt_port_handler_t  oosm;
52     nxt_port_handler_t  shm_ack;
53 };
54 
55 
/*
 * Zero-based position of the handler slot "name" inside nxt_port_handlers_t:
 * the member's byte offset divided by the size of one handler slot.
 * "name" must be a member of nxt_port_handlers_t.
 */
#define nxt_port_handler_idx(name)                                            \
    (offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t))
58 
/*
 * Combine a handler index with the NXT_PORT_MSG_LAST flag.
 *
 * The argument is parenthesized so that an expression containing an operator
 * of lower precedence than "|" (for example "cond ? a : b") is evaluated as
 * a whole before the flag is OR-ed in.
 */
#define nxt_msg_last(handler)                                                 \
    ((handler) | NXT_PORT_MSG_LAST)
61 
62 typedef enum {
63     NXT_PORT_MSG_LAST             = 0x100,
64     NXT_PORT_MSG_CLOSE_FD         = 0x200,
65     NXT_PORT_MSG_SYNC             = 0x400,
66 
67     NXT_PORT_MSG_MASK             = 0xFF,
68 
69     _NXT_PORT_MSG_RPC_READY       = nxt_port_handler_idx(rpc_ready),
70     _NXT_PORT_MSG_RPC_ERROR       = nxt_port_handler_idx(rpc_error),
71 
72     _NXT_PORT_MSG_START_PROCESS   = nxt_port_handler_idx(start_process),
73     _NXT_PORT_MSG_SOCKET          = nxt_port_handler_idx(socket),
74     _NXT_PORT_MSG_MODULES         = nxt_port_handler_idx(modules),
75     _NXT_PORT_MSG_CONF_STORE      = nxt_port_handler_idx(conf_store),
76     _NXT_PORT_MSG_CERT_GET        = nxt_port_handler_idx(cert_get),
77     _NXT_PORT_MSG_CERT_DELETE     = nxt_port_handler_idx(cert_delete),
78     _NXT_PORT_MSG_ACCESS_LOG      = nxt_port_handler_idx(access_log),
79 
80     _NXT_PORT_MSG_CHANGE_FILE     = nxt_port_handler_idx(change_file),
81     _NXT_PORT_MSG_NEW_PORT        = nxt_port_handler_idx(new_port),
82     _NXT_PORT_MSG_GET_PORT        = nxt_port_handler_idx(get_port),
83     _NXT_PORT_MSG_MMAP            = nxt_port_handler_idx(mmap),
84     _NXT_PORT_MSG_GET_MMAP        = nxt_port_handler_idx(get_mmap),
85 
86     _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
87     _NXT_PORT_MSG_PROCESS_READY   = nxt_port_handler_idx(process_ready),
88     _NXT_PORT_MSG_REMOVE_PID      = nxt_port_handler_idx(remove_pid),
89     _NXT_PORT_MSG_QUIT            = nxt_port_handler_idx(quit),
90 
91     _NXT_PORT_MSG_REQ_HEADERS     = nxt_port_handler_idx(req_headers),
92     _NXT_PORT_MSG_WEBSOCKET       = nxt_port_handler_idx(websocket_frame),
93 
94     _NXT_PORT_MSG_DATA            = nxt_port_handler_idx(data),
95 
96     _NXT_PORT_MSG_OOSM            = nxt_port_handler_idx(oosm),
97     _NXT_PORT_MSG_SHM_ACK         = nxt_port_handler_idx(shm_ack),
98 
99     NXT_PORT_MSG_MAX              = sizeof(nxt_port_handlers_t)
100                                     / sizeof(nxt_port_handler_t),
101 
102     NXT_PORT_MSG_RPC_READY        = _NXT_PORT_MSG_RPC_READY,
103     NXT_PORT_MSG_RPC_READY_LAST   = nxt_msg_last(_NXT_PORT_MSG_RPC_READY),
104     NXT_PORT_MSG_RPC_ERROR        = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR),
105     NXT_PORT_MSG_START_PROCESS    = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS),
106     NXT_PORT_MSG_SOCKET           = nxt_msg_last(_NXT_PORT_MSG_SOCKET),
107     NXT_PORT_MSG_MODULES          = nxt_msg_last(_NXT_PORT_MSG_MODULES),
108     NXT_PORT_MSG_CONF_STORE       = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE),
109     NXT_PORT_MSG_CERT_GET         = nxt_msg_last(_NXT_PORT_MSG_CERT_GET),
110     NXT_PORT_MSG_CERT_DELETE      = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE),
111     NXT_PORT_MSG_ACCESS_LOG       = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
112     NXT_PORT_MSG_CHANGE_FILE      = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
113     NXT_PORT_MSG_NEW_PORT         = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
114     NXT_PORT_MSG_GET_PORT         = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
115     NXT_PORT_MSG_MMAP             = nxt_msg_last(_NXT_PORT_MSG_MMAP)
116                                     | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
117 
118     NXT_PORT_MSG_PROCESS_CREATED  = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
119     NXT_PORT_MSG_PROCESS_READY    = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
120     NXT_PORT_MSG_QUIT             = nxt_msg_last(_NXT_PORT_MSG_QUIT),
121     NXT_PORT_MSG_REMOVE_PID       = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
122 
123     NXT_PORT_MSG_REQ_HEADERS      = _NXT_PORT_MSG_REQ_HEADERS,
124     NXT_PORT_MSG_WEBSOCKET        = _NXT_PORT_MSG_WEBSOCKET,
125     NXT_PORT_MSG_WEBSOCKET_LAST   = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
126 
127     NXT_PORT_MSG_DATA             = _NXT_PORT_MSG_DATA,
128     NXT_PORT_MSG_DATA_LAST        = nxt_msg_last(_NXT_PORT_MSG_DATA),
129 
130     NXT_PORT_MSG_OOSM             = nxt_msg_last(_NXT_PORT_MSG_OOSM),
131     NXT_PORT_MSG_SHM_ACK          = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
132 } nxt_port_msg_type_t;
133 
134 
135 /* Passed as a first iov chunk. */
136 typedef struct {
137     uint32_t             stream;
138     nxt_pid_t            pid;
139     nxt_port_id_t        reply_port;
140 
141     uint8_t              type;
142 
143     /* Last message for this stream. */
144     uint8_t              last;      /* 1 bit */
145 
146     /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
147     uint8_t              mmap;      /* 1 bit */
148 
149     /* Non-First fragment in fragmented message sequence. */
150     uint8_t              nf;        /* 1 bit */
151 
152     /* More Fragments followed. */
153     uint8_t              mf;        /* 1 bit */
154 
155     /* Message delivery tracking enabled, next chunk is tracking msg. */
156     uint8_t              tracking;  /* 1 bit */
157 } nxt_port_msg_t;
158 
159 
160 typedef struct {
161     nxt_queue_link_t    link;
162     nxt_buf_t           *buf;
163     size_t              share;
164     nxt_fd_t            fd;
165     nxt_port_msg_t      port_msg;
166     uint32_t            tracking_msg[2];
167     uint8_t             close_fd;   /* 1 bit */
168     uint8_t             allocated;  /* 1 bit */
169 } nxt_port_send_msg_t;
170 
171 
172 struct nxt_port_recv_msg_s {
173     nxt_fd_t            fd;
174     nxt_buf_t           *buf;
175     nxt_port_t          *port;
176     nxt_port_msg_t      port_msg;
177     size_t              size;
178     nxt_bool_t          cancelled;
179     union {
180         nxt_port_t      *new_port;
181         nxt_pid_t       removed_pid;
182         void            *data;
183     } u;
184 };
185 
186 typedef struct nxt_app_s  nxt_app_t;
187 
188 struct nxt_port_s {
189     nxt_fd_event_t      socket;
190 
191     nxt_queue_link_t    link;       /* for nxt_process_t.ports */
192     nxt_process_t       *process;
193 
194     nxt_queue_link_t    app_link;   /* for nxt_app_t.ports */
195     nxt_app_t           *app;
196 
197     nxt_queue_link_t    idle_link;  /* for nxt_app_t.idle_ports */
198     nxt_msec_t          idle_start;
199 
200     nxt_queue_t         messages;   /* of nxt_port_send_msg_t */
201     nxt_thread_mutex_t  write_mutex;
202 
203     /* Maximum size of message part. */
204     uint32_t            max_size;
205     /* Maximum interleave of message parts. */
206     uint32_t            max_share;
207 
208     uint32_t            app_pending_responses;
209     uint32_t            app_responses;
210     nxt_queue_t         pending_requests;
211 
212     nxt_queue_t         active_websockets;
213 
214     nxt_port_handler_t  handler;
215     nxt_port_handler_t  *data;
216 
217     nxt_mp_t            *mem_pool;
218     nxt_event_engine_t  *engine;
219 
220     nxt_buf_t           *free_bufs;
221     nxt_socket_t        pair[2];
222 
223     nxt_port_id_t       id;
224     nxt_pid_t           pid;
225 
226     nxt_lvlhsh_t        rpc_streams; /* stream to nxt_port_rpc_reg_t */
227     nxt_lvlhsh_t        rpc_peers;   /* peer to queue of nxt_port_rpc_reg_t */
228 
229     nxt_lvlhsh_t        frags;
230 
231     nxt_atomic_t        use_count;
232 
233     nxt_process_type_t  type;
234 };
235 
236 
237 typedef struct {
238     nxt_port_id_t       id;
239     nxt_pid_t           pid;
240     size_t              max_size;
241     size_t              max_share;
242     nxt_process_type_t  type:8;
243 } nxt_port_msg_new_port_t;
244 
245 
246 typedef struct {
247     nxt_port_id_t       id;
248     nxt_pid_t           pid;
249 } nxt_port_msg_get_port_t;
250 
251 
/*
 * Carries a single 32-bit id.
 * NOTE(review): presumably the payload of the "get_mmap" message -- confirm.
 */
typedef struct {
    uint32_t            id;
} nxt_port_msg_get_mmap_t;
255 
256 
257 /*
258  * nxt_port_data_t size is allocation size
259  * which enables effective reuse of memory pool cache.
260  */
261 typedef union {
262     nxt_buf_t                buf;
263     nxt_port_msg_new_port_t  new_port;
264 } nxt_port_data_t;
265 
266 
267 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
268     void *data);
269 
270 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
271     nxt_process_type_t type);
272 
273 nxt_port_id_t nxt_port_get_next_id(void);
274 void nxt_port_reset_next_id(void);
275 
276 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
277     size_t max_size);
278 void nxt_port_destroy(nxt_port_t *port);
279 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
280 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
281 void nxt_port_write_close(nxt_port_t *port);
282 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
283 void nxt_port_read_close(nxt_port_t *port);
284 nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port,
285     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
286     nxt_buf_t *b, void *tracking);
287 
288 nxt_inline nxt_int_t
289 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
290     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
291     nxt_buf_t *b)
292 {
293     return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b,
294                                   NULL);
295 }
296 
297 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
298     const nxt_port_handlers_t *handlers);
299 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
300     nxt_port_t *new_port, uint32_t stream);
301 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
302     nxt_uint_t slot, nxt_fd_t fd);
303 
304 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
305 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
306 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
307 void nxt_port_change_log_file_handler(nxt_task_t *task,
308     nxt_port_recv_msg_t *msg);
309 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
310 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
311 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
312 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
313 
314 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
315     nxt_port_post_handler_t handler, void *data);
316 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
317 
318 nxt_inline void nxt_port_inc_use(nxt_port_t *port)
319 {
320     nxt_atomic_fetch_add(&port->use_count, 1);
321 }
322 
323 #endif /* _NXT_PORT_H_INCLUDED_ */
324