1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9
10
11 struct nxt_port_handlers_s {
12 /* RPC responses. */
13 nxt_port_handler_t rpc_ready;
14 nxt_port_handler_t rpc_error;
15
16 /* Main process RPC requests. */
17 nxt_port_handler_t start_process;
18 nxt_port_handler_t socket;
19 nxt_port_handler_t modules;
20 nxt_port_handler_t conf_store;
21 nxt_port_handler_t cert_get;
22 nxt_port_handler_t cert_delete;
23 nxt_port_handler_t access_log;
24
25 /* File descriptor exchange. */
26 nxt_port_handler_t change_file;
27 nxt_port_handler_t new_port;
28 nxt_port_handler_t get_port;
29 nxt_port_handler_t port_ack;
30 nxt_port_handler_t mmap;
31 nxt_port_handler_t get_mmap;
32
33 /* New process */
34 nxt_port_handler_t process_created;
35 nxt_port_handler_t process_ready;
36 nxt_port_handler_t whoami;
37
38 /* Process exit/crash notification. */
39 nxt_port_handler_t remove_pid;
40
41 /* Stop process command. */
42 nxt_port_handler_t quit;
43
44 /* Request headers. */
45 nxt_port_handler_t req_headers;
46 nxt_port_handler_t req_headers_ack;
47 nxt_port_handler_t req_body;
48
49 /* Websocket frame. */
50 nxt_port_handler_t websocket_frame;
51
52 /* Various data. */
53 nxt_port_handler_t data;
54 nxt_port_handler_t app_restart;
55
56 nxt_port_handler_t oosm;
57 nxt_port_handler_t shm_ack;
58 nxt_port_handler_t read_queue;
59 nxt_port_handler_t read_socket;
60 };
61
62
/*
 * Index of the handler member "name" inside nxt_port_handlers_t:
 * the member's byte offset divided by the size of one handler slot.
 */
#define nxt_port_handler_idx(name)                                            \
    (offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t))
65
/*
 * Yields "handler" with the NXT_PORT_MSG_LAST flag bit set.
 *
 * The argument is parenthesized so that an argument expression whose
 * operator binds weaker than '|' (a conditional or logical expression,
 * for instance) is OR-ed with the flag as a whole.
 */
#define nxt_msg_last(handler)                                                 \
    ((handler) | NXT_PORT_MSG_LAST)
68
69 typedef enum {
70 NXT_PORT_MSG_LAST = 0x100,
71 NXT_PORT_MSG_CLOSE_FD = 0x200,
72 NXT_PORT_MSG_SYNC = 0x400,
73
74 NXT_PORT_MSG_MASK = 0xFF,
75
76 _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready),
77 _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error),
78
79 _NXT_PORT_MSG_START_PROCESS = nxt_port_handler_idx(start_process),
80 _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket),
81 _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules),
82 _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store),
83 _NXT_PORT_MSG_CERT_GET = nxt_port_handler_idx(cert_get),
84 _NXT_PORT_MSG_CERT_DELETE = nxt_port_handler_idx(cert_delete),
85 _NXT_PORT_MSG_ACCESS_LOG = nxt_port_handler_idx(access_log),
86
87 _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
88 _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
89 _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
90 _NXT_PORT_MSG_PORT_ACK = nxt_port_handler_idx(port_ack),
91 _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
92 _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
93
94 _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
95 _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
96 _NXT_PORT_MSG_WHOAMI = nxt_port_handler_idx(whoami),
97 _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
98 _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
99
100 _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
101 _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
102 _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body),
103 _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
104
105 _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
106 _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
107
108 _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
109 _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
110 _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
111 _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket),
112
113 NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
114 / sizeof(nxt_port_handler_t),
115
116 NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
117 NXT_PORT_MSG_RPC_READY_LAST = nxt_msg_last(_NXT_PORT_MSG_RPC_READY),
118 NXT_PORT_MSG_RPC_ERROR = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR),
119 NXT_PORT_MSG_START_PROCESS = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS),
120 NXT_PORT_MSG_SOCKET = nxt_msg_last(_NXT_PORT_MSG_SOCKET),
121 NXT_PORT_MSG_MODULES = nxt_msg_last(_NXT_PORT_MSG_MODULES),
122 NXT_PORT_MSG_CONF_STORE = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE),
123 NXT_PORT_MSG_CERT_GET = nxt_msg_last(_NXT_PORT_MSG_CERT_GET),
124 NXT_PORT_MSG_CERT_DELETE = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE),
125 NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
126 NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
127 NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
128 NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
129 NXT_PORT_MSG_PORT_ACK = nxt_msg_last(_NXT_PORT_MSG_PORT_ACK),
130 NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
131 | NXT_PORT_MSG_SYNC,
132 NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
133
134 NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
135 NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
136 NXT_PORT_MSG_WHOAMI = nxt_msg_last(_NXT_PORT_MSG_WHOAMI),
137 NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT),
138 NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
139
140 NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
141 NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY,
142 NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
143 NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
144
145 NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
146 NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
147 NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
148
149 NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
150 NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
151 NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
152 NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET,
153 } nxt_port_msg_type_t;
154
155
156 /* Passed as a first iov chunk. */
157 typedef struct {
158 uint32_t stream;
159
160 nxt_pid_t pid; /* not used on Linux and FreeBSD */
161
162 nxt_port_id_t reply_port;
163
164 uint8_t type;
165
166 /* Last message for this stream. */
167 uint8_t last; /* 1 bit */
168
169 /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
170 uint8_t mmap; /* 1 bit */
171
172 /* Non-First fragment in fragmented message sequence. */
173 uint8_t nf; /* 1 bit */
174
175 /* More Fragments followed. */
176 uint8_t mf; /* 1 bit */
177 } nxt_port_msg_t;
178
179
180 typedef struct {
181 nxt_queue_link_t link;
182 nxt_buf_t *buf;
183 size_t share;
184 nxt_fd_t fd[2];
185 nxt_port_msg_t port_msg;
186 uint8_t close_fd; /* 1 bit */
187 uint8_t allocated; /* 1 bit */
188 } nxt_port_send_msg_t;
189
/*
 * NXT_USE_CMSG_PID is defined when at least one of NXT_HAVE_UCRED and
 * NXT_HAVE_MSGHDR_CMSGCRED is set.
 */
#if (NXT_HAVE_UCRED || NXT_HAVE_MSGHDR_CMSGCRED)
#define NXT_USE_CMSG_PID  1
#endif
193
194 struct nxt_port_recv_msg_s {
195 nxt_fd_t fd[2];
196 nxt_buf_t *buf;
197 nxt_port_t *port;
198 nxt_port_msg_t port_msg;
199 size_t size;
200 #if (NXT_USE_CMSG_PID)
201 nxt_pid_t cmsg_pid;
202 #endif
203 nxt_bool_t cancelled;
204 union {
205 nxt_port_t *new_port;
206 nxt_pid_t removed_pid;
207 void *data;
208 } u;
209 };
210
211
/*
 * Accessors for the pid of a received message.
 *
 * With NXT_USE_CMSG_PID the dedicated cmsg_pid member is used and can be
 * referenced; without it the value is read from the message header
 * (port_msg.pid) and the reference form yields NULL.
 */
#if (NXT_USE_CMSG_PID)

#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->cmsg_pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (&(msg)->cmsg_pid)

#else

#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->port_msg.pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (NULL)

#endif
219
220 typedef struct nxt_app_s nxt_app_t;
221
222 struct nxt_port_s {
223 nxt_fd_event_t socket;
224
225 nxt_queue_link_t link; /* for nxt_process_t.ports */
226 nxt_process_t *process;
227
228 nxt_queue_link_t app_link; /* for nxt_app_t.ports */
229 nxt_app_t *app;
230 nxt_port_t *main_app_port;
231
232 nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
233 nxt_msec_t idle_start;
234
235 nxt_queue_t messages; /* of nxt_port_send_msg_t */
236 nxt_thread_mutex_t write_mutex;
237
238 /* Maximum size of message part. */
239 uint32_t max_size;
240 /* Maximum interleave of message parts. */
241 uint32_t max_share;
242
243 uint32_t active_websockets;
244 uint32_t active_requests;
245
246 nxt_port_handler_t handler;
247 nxt_port_handler_t *data;
248
249 nxt_mp_t *mem_pool;
250 nxt_event_engine_t *engine;
251
252 nxt_buf_t *free_bufs;
253 nxt_socket_t pair[2];
254
255 nxt_port_id_t id;
256 nxt_pid_t pid;
257
258 nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
259 nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
260
261 nxt_lvlhsh_t frags;
262
263 nxt_atomic_t use_count;
264
265 nxt_process_type_t type;
266
267 nxt_fd_t queue_fd;
268 void *queue;
269
270 void *socket_msg;
271 int from_socket;
272 };
273
274
275 typedef struct {
276 nxt_port_id_t id;
277 nxt_pid_t pid;
278 size_t max_size;
279 size_t max_share;
280 nxt_process_type_t type:8;
281 } nxt_port_msg_new_port_t;
282
283
284 typedef struct {
285 nxt_port_id_t id;
286 nxt_pid_t pid;
287 } nxt_port_msg_get_port_t;
288
289
/* Carries a single 32-bit id. */
typedef struct {
    uint32_t  id;
} nxt_port_msg_get_mmap_t;
293
294
295 /*
296 * nxt_port_data_t size is allocation size
297 * which enables effective reuse of memory pool cache.
298 */
299 typedef union {
300 nxt_buf_t buf;
301 nxt_port_msg_new_port_t new_port;
302 } nxt_port_data_t;
303
304
305 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
306 void *data);
307
308 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
309 nxt_process_type_t type);
310
311 nxt_port_id_t nxt_port_get_next_id(void);
312 void nxt_port_reset_next_id(void);
313
314 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
315 size_t max_size);
316 void nxt_port_destroy(nxt_port_t *port);
317 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
318 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
319 void nxt_port_write_close(nxt_port_t *port);
320 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
321 void nxt_port_read_close(nxt_port_t *port);
322 nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port,
323 nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream,
324 nxt_port_id_t reply_port, nxt_buf_t *b);
325
326 nxt_inline nxt_int_t
nxt_port_socket_write(nxt_task_t * task,nxt_port_t * port,nxt_uint_t type,nxt_fd_t fd,uint32_t stream,nxt_port_id_t reply_port,nxt_buf_t * b)327 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
328 nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
329 nxt_buf_t *b)
330 {
331 return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port,
332 b);
333 }
334
335 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
336 const nxt_port_handlers_t *handlers);
337 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
338 nxt_port_t *new_port, uint32_t stream);
339 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
340 nxt_uint_t slot, nxt_fd_t fd);
341 void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process);
342
343 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
344 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
345 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
346 void nxt_port_change_log_file_handler(nxt_task_t *task,
347 nxt_port_recv_msg_t *msg);
348 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
349 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
350 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
351 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
352
353 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
354 nxt_port_post_handler_t handler, void *data);
355 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
356
nxt_port_inc_use(nxt_port_t * port)357 nxt_inline void nxt_port_inc_use(nxt_port_t *port)
358 {
359 nxt_atomic_fetch_add(&port->use_count, 1);
360 }
361
362 #endif /* _NXT_PORT_H_INCLUDED_ */
363