1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9
10
11 struct nxt_port_handlers_s {
12 /* RPC responses. */
13 nxt_port_handler_t rpc_ready;
14 nxt_port_handler_t rpc_error;
15
16 /* Main process RPC requests. */
17 nxt_port_handler_t start_process;
18 nxt_port_handler_t socket;
19 nxt_port_handler_t socket_unlink;
20 nxt_port_handler_t modules;
21 nxt_port_handler_t conf_store;
22 nxt_port_handler_t cert_get;
23 nxt_port_handler_t cert_delete;
24 nxt_port_handler_t script_get;
25 nxt_port_handler_t script_delete;
26 nxt_port_handler_t access_log;
27
28 /* File descriptor exchange. */
29 nxt_port_handler_t change_file;
30 nxt_port_handler_t new_port;
31 nxt_port_handler_t get_port;
32 nxt_port_handler_t port_ack;
33 nxt_port_handler_t mmap;
34 nxt_port_handler_t get_mmap;
35
36 /* New process */
37 nxt_port_handler_t process_created;
38 nxt_port_handler_t process_ready;
39 nxt_port_handler_t whoami;
40
41 /* Process exit/crash notification. */
42 nxt_port_handler_t remove_pid;
43
44 /* Stop process command. */
45 nxt_port_handler_t quit;
46
47 /* Request headers. */
48 nxt_port_handler_t req_headers;
49 nxt_port_handler_t req_headers_ack;
50 nxt_port_handler_t req_body;
51
52 /* Websocket frame. */
53 nxt_port_handler_t websocket_frame;
54
55 /* Various data. */
56 nxt_port_handler_t data;
57 nxt_port_handler_t app_restart;
58
59 /* Status report. */
60 nxt_port_handler_t status;
61
62 nxt_port_handler_t oosm;
63 nxt_port_handler_t shm_ack;
64 nxt_port_handler_t read_queue;
65 nxt_port_handler_t read_socket;
66 };
67
68
/*
 * Index of the handler member "name" within nxt_port_handlers_t, counted
 * in units of one nxt_port_handler_t.  The result is an integer constant
 * expression, so it can initialize enumerators.
 */
#define nxt_port_handler_idx(name)                                            \
    ( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) )

/*
 * Message type "handler" with the NXT_PORT_MSG_LAST flag set.  The argument
 * is parenthesized so that an expression built with operators of lower
 * precedence than "|" (for example "?:") is evaluated as a whole before
 * the flag is applied.
 */
#define nxt_msg_last(handler)                                                 \
    ((handler) | NXT_PORT_MSG_LAST)
74
75 typedef enum {
76 NXT_PORT_MSG_LAST = 0x100,
77 NXT_PORT_MSG_CLOSE_FD = 0x200,
78 NXT_PORT_MSG_SYNC = 0x400,
79
80 NXT_PORT_MSG_MASK = 0xFF,
81
82 _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready),
83 _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error),
84
85 _NXT_PORT_MSG_START_PROCESS = nxt_port_handler_idx(start_process),
86 _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket),
87 _NXT_PORT_MSG_SOCKET_UNLINK = nxt_port_handler_idx(socket_unlink),
88 _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules),
89 _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store),
90 _NXT_PORT_MSG_CERT_GET = nxt_port_handler_idx(cert_get),
91 _NXT_PORT_MSG_CERT_DELETE = nxt_port_handler_idx(cert_delete),
92 _NXT_PORT_MSG_SCRIPT_GET = nxt_port_handler_idx(script_get),
93 _NXT_PORT_MSG_SCRIPT_DELETE = nxt_port_handler_idx(script_delete),
94 _NXT_PORT_MSG_ACCESS_LOG = nxt_port_handler_idx(access_log),
95
96 _NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
97 _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
98 _NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
99 _NXT_PORT_MSG_PORT_ACK = nxt_port_handler_idx(port_ack),
100 _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
101 _NXT_PORT_MSG_GET_MMAP = nxt_port_handler_idx(get_mmap),
102
103 _NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
104 _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
105 _NXT_PORT_MSG_WHOAMI = nxt_port_handler_idx(whoami),
106 _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
107 _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
108
109 _NXT_PORT_MSG_REQ_HEADERS = nxt_port_handler_idx(req_headers),
110 _NXT_PORT_MSG_REQ_HEADERS_ACK = nxt_port_handler_idx(req_headers_ack),
111 _NXT_PORT_MSG_REQ_BODY = nxt_port_handler_idx(req_body),
112 _NXT_PORT_MSG_WEBSOCKET = nxt_port_handler_idx(websocket_frame),
113
114 _NXT_PORT_MSG_DATA = nxt_port_handler_idx(data),
115 _NXT_PORT_MSG_APP_RESTART = nxt_port_handler_idx(app_restart),
116 _NXT_PORT_MSG_STATUS = nxt_port_handler_idx(status),
117
118 _NXT_PORT_MSG_OOSM = nxt_port_handler_idx(oosm),
119 _NXT_PORT_MSG_SHM_ACK = nxt_port_handler_idx(shm_ack),
120 _NXT_PORT_MSG_READ_QUEUE = nxt_port_handler_idx(read_queue),
121 _NXT_PORT_MSG_READ_SOCKET = nxt_port_handler_idx(read_socket),
122
123 NXT_PORT_MSG_MAX = sizeof(nxt_port_handlers_t)
124 / sizeof(nxt_port_handler_t),
125
126 NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
127 NXT_PORT_MSG_RPC_READY_LAST = nxt_msg_last(_NXT_PORT_MSG_RPC_READY),
128 NXT_PORT_MSG_RPC_ERROR = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR),
129 NXT_PORT_MSG_START_PROCESS = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS),
130 NXT_PORT_MSG_SOCKET = nxt_msg_last(_NXT_PORT_MSG_SOCKET),
131 NXT_PORT_MSG_SOCKET_UNLINK = nxt_msg_last(_NXT_PORT_MSG_SOCKET_UNLINK),
132 NXT_PORT_MSG_MODULES = nxt_msg_last(_NXT_PORT_MSG_MODULES),
133 NXT_PORT_MSG_CONF_STORE = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE),
134 NXT_PORT_MSG_CERT_GET = nxt_msg_last(_NXT_PORT_MSG_CERT_GET),
135 NXT_PORT_MSG_CERT_DELETE = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE),
136 NXT_PORT_MSG_SCRIPT_GET = nxt_msg_last(_NXT_PORT_MSG_SCRIPT_GET),
137 NXT_PORT_MSG_SCRIPT_DELETE = nxt_msg_last(_NXT_PORT_MSG_SCRIPT_DELETE),
138 NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
139 NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
140 NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
141 NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
142 NXT_PORT_MSG_PORT_ACK = nxt_msg_last(_NXT_PORT_MSG_PORT_ACK),
143 NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
144 | NXT_PORT_MSG_SYNC,
145 NXT_PORT_MSG_GET_MMAP = nxt_msg_last(_NXT_PORT_MSG_GET_MMAP),
146
147 NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
148 NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
149 NXT_PORT_MSG_WHOAMI = nxt_msg_last(_NXT_PORT_MSG_WHOAMI),
150 NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT),
151 NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
152
153 NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
154 NXT_PORT_MSG_REQ_BODY = _NXT_PORT_MSG_REQ_BODY,
155 NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
156 NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
157
158 NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
159 NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
160 NXT_PORT_MSG_APP_RESTART = nxt_msg_last(_NXT_PORT_MSG_APP_RESTART),
161 NXT_PORT_MSG_STATUS = nxt_msg_last(_NXT_PORT_MSG_STATUS),
162
163 NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
164 NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
165 NXT_PORT_MSG_READ_QUEUE = _NXT_PORT_MSG_READ_QUEUE,
166 NXT_PORT_MSG_READ_SOCKET = _NXT_PORT_MSG_READ_SOCKET,
167 } nxt_port_msg_type_t;
168
169
170 /* Passed as a first iov chunk. */
171 typedef struct {
172 uint32_t stream;
173
174 nxt_pid_t pid; /* not used on Linux and FreeBSD */
175
176 nxt_port_id_t reply_port;
177
178 uint8_t type;
179
180 /* Last message for this stream. */
181 uint8_t last; /* 1 bit */
182
183 /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
184 uint8_t mmap; /* 1 bit */
185
186 /* Non-First fragment in fragmented message sequence. */
187 uint8_t nf; /* 1 bit */
188
189 /* More Fragments followed. */
190 uint8_t mf; /* 1 bit */
191 } nxt_port_msg_t;
192
193
194 typedef struct {
195 nxt_queue_link_t link;
196 nxt_buf_t *buf;
197 size_t share;
198 nxt_fd_t fd[2];
199 nxt_port_msg_t port_msg;
200 uint8_t close_fd; /* 1 bit */
201 uint8_t allocated; /* 1 bit */
202 } nxt_port_send_msg_t;
203
/*
 * NXT_USE_CMSG_PID is defined when either NXT_HAVE_UCRED or
 * NXT_HAVE_MSGHDR_CMSGCRED is set.  It adds the cmsg_pid member to
 * struct nxt_port_recv_msg_s.
 */
#if (NXT_HAVE_UCRED) || (NXT_HAVE_MSGHDR_CMSGCRED)
#define NXT_USE_CMSG_PID    1
#endif
207
208 struct nxt_port_recv_msg_s {
209 nxt_fd_t fd[2];
210 nxt_buf_t *buf;
211 nxt_port_t *port;
212 nxt_port_msg_t port_msg;
213 size_t size;
214 #if (NXT_USE_CMSG_PID)
215 nxt_pid_t cmsg_pid;
216 #endif
217 nxt_bool_t cancelled;
218 union {
219 nxt_port_t *new_port;
220 nxt_pid_t removed_pid;
221 void *data;
222 } u;
223 };
224
225
/*
 * Accessors for the sender pid of a received message.
 *
 * With NXT_USE_CMSG_PID the pid is read from the cmsg_pid member and
 * the _ref() form yields its address.  Otherwise the pid comes from
 * the message header (port_msg.pid) and the _ref() form is NULL.
 */
#if (NXT_USE_CMSG_PID)
#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->cmsg_pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (&(msg)->cmsg_pid)
#else
#define nxt_recv_msg_cmsg_pid(msg)      ((msg)->port_msg.pid)
#define nxt_recv_msg_cmsg_pid_ref(msg)  (NULL)
#endif
233
234 typedef struct nxt_app_s nxt_app_t;
235
236 struct nxt_port_s {
237 nxt_fd_event_t socket;
238
239 nxt_queue_link_t link; /* for nxt_process_t.ports */
240 nxt_process_t *process;
241
242 nxt_queue_link_t app_link; /* for nxt_app_t.ports */
243 nxt_app_t *app;
244 nxt_port_t *main_app_port;
245
246 nxt_queue_link_t idle_link; /* for nxt_app_t.idle_ports */
247 nxt_msec_t idle_start;
248
249 nxt_queue_t messages; /* of nxt_port_send_msg_t */
250 nxt_thread_mutex_t write_mutex;
251
252 /* Maximum size of message part. */
253 uint32_t max_size;
254 /* Maximum interleave of message parts. */
255 uint32_t max_share;
256
257 uint32_t active_websockets;
258 uint32_t active_requests;
259
260 nxt_port_handler_t handler;
261 nxt_port_handler_t *data;
262
263 nxt_mp_t *mem_pool;
264 nxt_event_engine_t *engine;
265
266 nxt_buf_t *free_bufs;
267 nxt_socket_t pair[2];
268
269 nxt_port_id_t id;
270 nxt_pid_t pid;
271
272 nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
273 nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
274
275 nxt_lvlhsh_t frags;
276
277 nxt_atomic_t use_count;
278
279 nxt_process_type_t type;
280
281 nxt_fd_t queue_fd;
282 void *queue;
283
284 void *socket_msg;
285 int from_socket;
286 };
287
288
289 typedef struct {
290 nxt_port_id_t id;
291 nxt_pid_t pid;
292 size_t max_size;
293 size_t max_share;
294 nxt_process_type_t type:8;
295 } nxt_port_msg_new_port_t;
296
297
298 typedef struct {
299 nxt_port_id_t id;
300 nxt_pid_t pid;
301 } nxt_port_msg_get_port_t;
302
303
/*
 * Carries a single 32-bit id.
 * NOTE(review): presumably the payload of a GET_MMAP message; confirm
 * against the sender.
 */
typedef struct {
    uint32_t  id;
} nxt_port_msg_get_mmap_t;
307
308
309 /*
310 * nxt_port_data_t size is allocation size
311 * which enables effective reuse of memory pool cache.
312 */
313 typedef union {
314 nxt_buf_t buf;
315 nxt_port_msg_new_port_t new_port;
316 } nxt_port_data_t;
317
318
319 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
320 void *data);
321
322 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
323 nxt_process_type_t type);
324
325 nxt_port_id_t nxt_port_get_next_id(void);
326 void nxt_port_reset_next_id(void);
327
328 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
329 size_t max_size);
330 void nxt_port_destroy(nxt_port_t *port);
331 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
332 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
333 void nxt_port_write_close(nxt_port_t *port);
334 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
335 void nxt_port_read_close(nxt_port_t *port);
336 nxt_int_t nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port,
337 nxt_uint_t type, nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream,
338 nxt_port_id_t reply_port, nxt_buf_t *b);
339
340 nxt_inline nxt_int_t
nxt_port_socket_write(nxt_task_t * task,nxt_port_t * port,nxt_uint_t type,nxt_fd_t fd,uint32_t stream,nxt_port_id_t reply_port,nxt_buf_t * b)341 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
342 nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
343 nxt_buf_t *b)
344 {
345 return nxt_port_socket_write2(task, port, type, fd, -1, stream, reply_port,
346 b);
347 }
348
349 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
350 const nxt_port_handlers_t *handlers);
351 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
352 nxt_port_t *new_port, uint32_t stream);
353 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
354 nxt_uint_t slot, nxt_fd_t fd);
355 void nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process);
356
357 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
358 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
359 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
360 void nxt_port_change_log_file_handler(nxt_task_t *task,
361 nxt_port_recv_msg_t *msg);
362 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
363 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
364 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
365 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
366
367 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
368 nxt_port_post_handler_t handler, void *data);
369 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
370
nxt_port_inc_use(nxt_port_t * port)371 nxt_inline void nxt_port_inc_use(nxt_port_t *port)
372 {
373 nxt_atomic_fetch_add(&port->use_count, 1);
374 }
375
376 #endif /* _NXT_PORT_H_INCLUDED_ */
377