1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9
10
11 struct nxt_port_handlers_s {
12 /* RPC responses. */
13 nxt_port_handler_t rpc_ready;
14 nxt_port_handler_t rpc_error;
15
16 /* Main process RPC requests. */
17 nxt_port_handler_t start_process;
18 nxt_port_handler_t socket;
19 nxt_port_handler_t socket_unlink;
20 nxt_port_handler_t modules;
21 nxt_port_handler_t conf_store;
22 nxt_port_handler_t cert_get;
23 nxt_port_handler_t cert_delete;
24 nxt_port_handler_t access_log;
25
26 /* File descriptor exchange. */
27 nxt_port_handler_t change_file;
28 nxt_port_handler_t new_port;
29 nxt_port_handler_t get_port;
30 nxt_port_handler_t port_ack;
31 nxt_port_handler_t mmap;
32 nxt_port_handler_t get_mmap;
33
34 /* New process */
35 nxt_port_handler_t process_created;
36 nxt_port_handler_t process_ready;
37 nxt_port_handler_t whoami;
38
39 /* Process exit/crash notification. */
40 nxt_port_handler_t remove_pid;
41
42 /* Stop process command. */
43 nxt_port_handler_t quit;
44
45 /* Request headers. */
46 nxt_port_handler_t req_headers;
47 nxt_port_handler_t req_headers_ack;
48 nxt_port_handler_t req_body;
49
50 /* Websocket frame. */
51 nxt_port_handler_t websocket_frame;
52
53 /* Various data. */
54 nxt_port_handler_t data;
55 nxt_port_handler_t app_restart;
56
57 /* Status report. */
58 nxt_port_handler_t status;
59
60 nxt_port_handler_t oosm;
61 nxt_port_handler_t shm_ack;
62 nxt_port_handler_t read_queue;
63 nxt_port_handler_t read_socket;
64 };
65
66
/*
 * Index of the handler slot "name" within nxt_port_handlers_t: the byte
 * offset of the member divided by the size of a single handler slot.
 */
#define nxt_port_handler_idx(name)                                            \
    (offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t))
69
/*
 * Sets the NXT_PORT_MSG_LAST flag on a message type value.
 *
 * The argument is parenthesized so that an expression binding weaker
 * than "|" (a conditional, "&&", "||") is evaluated as a whole before
 * the flag is OR'ed in.
 */
#define nxt_msg_last(handler)                                                 \
    ((handler) | NXT_PORT_MSG_LAST)
72
73 typedef enum {
74 NXT_PORT_MSG_LAST = 0x100,
75 NXT_PORT_MSG_CLOSE_FD = 0x200,
76 NXT_PORT_MSG_SYNC = 0x400,
77
78 NXT_PORT_MSG_MASK = 0xFF,
79
80 _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready),
81 _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error),
82
83 _NXT_PORT_MSG_START_PROCESS = nxt_port_handler_idx(start_process),
84 _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket),
85 _NXT_PORT_MSG_SOCKET_UNLINK = nxt_port_handler_idx(socket_unlink),
86 _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules),
87 _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store),
88 _NXT_PORT_MSG_CERT_GET = nxt_port_handler_idx(cert_get),
89 _NXT_PORT_MSG_CERT_DELETE = nxt_port_handler_idx(cert_delete),
90 _NXT_PORT_MSG_ACCESS_LOG = nxt_port_handler_idx(access_log),
91
92 _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
93 _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
94 _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
95 _NXT_PORT_MSG_PORT_ACK = nxt_port_handler_idx(port_ack),
96 _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
97 _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
98
99 _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
100 _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
101 _NXT_PORT_MSG_WHOAMI = nxt_port_handler_idx(whoami),
102 _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
103 _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
104
105 _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
106 _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
107 _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body),
108 _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
109
110 _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
111 _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
112 _NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),
113
114 _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
115 _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
116 _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
117 _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket),
118
119 NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
120 / sizeof(nxt_port_handler_t),
121
122 NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
123 NXT_PORT_MSG_RPC_READY_LAST = nxt_msg_last(_NXT_PORT_MSG_RPC_READY),
124 NXT_PORT_MSG_RPC_ERROR = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR),
125 NXT_PORT_MSG_START_PROCESS = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS),
126 NXT_PORT_MSG_SOCKET = nxt_msg_last(_NXT_PORT_MSG_SOCKET),
127 NXT_PORT_MSG_SOCKET_UNLINK = nxt_msg_last(_NXT_PORT_MSG_SOCKET_UNLINK),
128 NXT_PORT_MSG_MODULES = nxt_msg_last(_NXT_PORT_MSG_MODULES),
129 NXT_PORT_MSG_CONF_STORE = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE),
130 NXT_PORT_MSG_CERT_GET = nxt_msg_last(_NXT_PORT_MSG_CERT_GET),
131 NXT_PORT_MSG_CERT_DELETE = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE),
132 NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
133 NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
134 NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
135 NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
136 NXT_PORT_MSG_PORT_ACK = nxt_msg_last(_NXT_PORT_MSG_PORT_ACK),
137 NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
138 | NXT_PORT_MSG_SYNC,
139 NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
140
141 NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
142 NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
143 NXT_PORT_MSG_WHOAMI = nxt_msg_last(_NXT_PORT_MSG_WHOAMI),
144 NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT),
145 NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
146
147 NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
148 NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY,
149 NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
150 NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
151
152 NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
153 NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
154 NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
155 NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),
156
157 NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
158 NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
159 NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
160 NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET,
161 } nxt_port_msg_type_t;
162
163
164 /* Passed as a first iov chunk. */
165 typedef struct {
166 uint32_t stream;
167
168 nxt_pid_t pid; /* not used on Linux and FreeBSD */
169
170 nxt_port_id_t reply_port;
171
172 uint8_t type;
173
174 /* Last message for this stream. */
175 uint8_t last; /* 1 bit */
176
177 /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
178 uint8_t mmap; /* 1 bit */
179
180 /* Non-First fragment in fragmented message sequence. */
181 uint8_t nf; /* 1 bit */
182
183 /* More Fragments followed. */
184 uint8_t mf; /* 1 bit */
185 } nxt_port_msg_t;
186
187
188 typedef struct {
189 nxt_queue_link_t link;
190 nxt_buf_t *buf;
191 size_t share;
192 nxt_fd_t fd[2];
193 nxt_port_msg_t port_msg;
194 uint8_t close_fd; /* 1 bit */
195 uint8_t allocated; /* 1 bit */
196 } nxt_port_send_msg_t;
197
/*
 * NXT_USE_CMSG_PID is defined when either NXT_HAVE_UCRED or
 * NXT_HAVE_MSGHDR_CMSGCRED is enabled.
 */
#if (NXT_HAVE_UCRED || NXT_HAVE_MSGHDR_CMSGCRED)
#define NXT_USE_CMSG_PID  1
#endif
201
202 struct nxt_port_recv_msg_s {
203 nxt_fd_t fd[2];
204 nxt_buf_t *buf;
205 nxt_port_t *port;
206 nxt_port_msg_t port_msg;
207 size_t size;
208 #if (NXT_USE_CMSG_PID)
209 nxt_pid_t cmsg_pid;
210 #endif
211 nxt_bool_t cancelled;
212 union {
213 nxt_port_t *new_port;
214 nxt_pid_t removed_pid;
215 void *data;
216 } u;
217 };
218
219
/*
 * Accessors for the pid of a received message.
 *
 * With NXT_USE_CMSG_PID the dedicated cmsg_pid member is used and can be
 * referenced; otherwise the pid comes from the message header and the
 * reference is NULL.
 */
#if (NXT_USE_CMSG_PID)

#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->cmsg_pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (&(msg)->cmsg_pid)

#else

#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->port_msg.pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (NULL)

#endif
227
228 typedef struct nxt_app_s nxt_app_t;
229
230 struct nxt_port_s {
231 nxt_fd_event_t socket;
232
233 nxt_queue_link_t link; /* for nxt_process_t.ports */
234 nxt_process_t *process;
235
236 nxt_queue_link_t app_link; /* for nxt_app_t.ports */
237 nxt_app_t *app;
238 nxt_port_t *main_app_port;
239
240 nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
241 nxt_msec_t idle_start;
242
243 nxt_queue_t messages; /* of nxt_port_send_msg_t */
244 nxt_thread_mutex_t write_mutex;
245
246 /* Maximum size of message part. */
247 uint32_t max_size;
248 /* Maximum interleave of message parts. */
249 uint32_t max_share;
250
251 uint32_t active_websockets;
252 uint32_t active_requests;
253
254 nxt_port_handler_t handler;
255 nxt_port_handler_t *data;
256
257 nxt_mp_t *mem_pool;
258 nxt_event_engine_t *engine;
259
260 nxt_buf_t *free_bufs;
261 nxt_socket_t pair[2];
262
263 nxt_port_id_t id;
264 nxt_pid_t pid;
265
266 nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
267 nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
268
269 nxt_lvlhsh_t frags;
270
271 nxt_atomic_t use_count;
272
273 nxt_process_type_t type;
274
275 nxt_fd_t queue_fd;
276 void *queue;
277
278 void *socket_msg;
279 int from_socket;
280 };
281
282
283 typedef struct {
284 nxt_port_id_t id;
285 nxt_pid_t pid;
286 size_t max_size;
287 size_t max_share;
288 nxt_process_type_t type:8;
289 } nxt_port_msg_new_port_t;
290
291
292 typedef struct {
293 nxt_port_id_t id;
294 nxt_pid_t pid;
295 } nxt_port_msg_get_port_t;
296
297
/* Carries a single 32-bit id. */
typedef struct {
    uint32_t  id;
} nxt_port_msg_get_mmap_t;
301
302
303 /*
304 * nxt_port_data_t size is allocation size
305 * which enables effective reuse of memory pool cache.
306 */
307 typedef union {
308 nxt_buf_t buf;
309 nxt_port_msg_new_port_t new_port;
310 } nxt_port_data_t;
311
312
313 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
314 void *data);
315
316 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
317 nxt_process_type_t type);
318
319 nxt_port_id_t nxt_port_get_next_id(void);
320 void nxt_port_reset_next_id(void);
321
322 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
323 size_t max_size);
324 void nxt_port_destroy(nxt_port_t *port);
325 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
326 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
327 void nxt_port_write_close(nxt_port_t *port);
328 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
329 void nxt_port_read_close(nxt_port_t *port);
330 nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port,
331 nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream,
332 nxt_port_id_t reply_port, nxt_buf_t *b);
333
334 nxt_inline nxt_int_t
nxt_port_socket_write(nxt_task_t * task,nxt_port_t * port,nxt_uint_t type,nxt_fd_t fd,uint32_t stream,nxt_port_id_t reply_port,nxt_buf_t * b)335 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
336 nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
337 nxt_buf_t *b)
338 {
339 return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port,
340 b);
341 }
342
343 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
344 const nxt_port_handlers_t *handlers);
345 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
346 nxt_port_t *new_port, uint32_t stream);
347 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
348 nxt_uint_t slot, nxt_fd_t fd);
349 void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process);
350
351 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
352 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
353 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
354 void nxt_port_change_log_file_handler(nxt_task_t *task,
355 nxt_port_recv_msg_t *msg);
356 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
357 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
358 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
359 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
360
361 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
362 nxt_port_post_handler_t handler, void *data);
363 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
364
nxt_port_inc_use(nxt_port_t * port)365 nxt_inline void nxt_port_inc_use(nxt_port_t *port)
366 {
367 nxt_atomic_fetch_add(&port->use_count, 1);
368 }
369
370 #endif /* _NXT_PORT_H_INCLUDED_ */
371