xref: /unit/src/nxt_port.h (revision 424:38b478d79178)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9 
10 
11 struct nxt_port_handlers_s {
12     /* RPC responses. */
13     nxt_port_handler_t  rpc_ready;
14     nxt_port_handler_t  rpc_error;
15 
16     /* Main process RPC requests. */
17     nxt_port_handler_t  start_worker;
18     nxt_port_handler_t  socket;
19     nxt_port_handler_t  modules;
20     nxt_port_handler_t  conf_store;
21 
22     /* File descriptor exchange. */
23     nxt_port_handler_t  change_file;
24     nxt_port_handler_t  new_port;
25     nxt_port_handler_t  mmap;
26 
27     /* New process ready. */
28     nxt_port_handler_t  process_ready;
29 
30     /* Process exit/crash notification. */
31     nxt_port_handler_t  remove_pid;
32 
33     /* Stop process command. */
34     nxt_port_handler_t  quit;
35 
36     /* Various data. */
37     nxt_port_handler_t  data;
38 };
39 
40 
41 #define nxt_port_handler_idx(name)                                            \
42     ( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) )
43 
44 
45 typedef enum {
46     NXT_PORT_MSG_LAST           = 0x100,
47     NXT_PORT_MSG_CLOSE_FD       = 0x200,
48     NXT_PORT_MSG_SYNC           = 0x400,
49 
50     NXT_PORT_MSG_MASK           = 0xFF,
51 
52     _NXT_PORT_MSG_RPC_READY     = nxt_port_handler_idx(rpc_ready),
53     _NXT_PORT_MSG_RPC_ERROR     = nxt_port_handler_idx(rpc_error),
54 
55     _NXT_PORT_MSG_START_WORKER  = nxt_port_handler_idx(start_worker),
56     _NXT_PORT_MSG_SOCKET        = nxt_port_handler_idx(socket),
57     _NXT_PORT_MSG_MODULES       = nxt_port_handler_idx(modules),
58     _NXT_PORT_MSG_CONF_STORE    = nxt_port_handler_idx(conf_store),
59 
60     _NXT_PORT_MSG_CHANGE_FILE   = nxt_port_handler_idx(change_file),
61     _NXT_PORT_MSG_NEW_PORT      = nxt_port_handler_idx(new_port),
62     _NXT_PORT_MSG_MMAP          = nxt_port_handler_idx(mmap),
63 
64     _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
65     _NXT_PORT_MSG_REMOVE_PID    = nxt_port_handler_idx(remove_pid),
66     _NXT_PORT_MSG_QUIT          = nxt_port_handler_idx(quit),
67 
68     _NXT_PORT_MSG_DATA          = nxt_port_handler_idx(data),
69 
70     NXT_PORT_MSG_MAX            = sizeof(nxt_port_handlers_t) /
71                                       sizeof(nxt_port_handler_t),
72 
73     NXT_PORT_MSG_RPC_READY      = _NXT_PORT_MSG_RPC_READY,
74     NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
75     NXT_PORT_MSG_RPC_ERROR      = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
76 
77     NXT_PORT_MSG_START_WORKER   = _NXT_PORT_MSG_START_WORKER |
78                                   NXT_PORT_MSG_LAST,
79     NXT_PORT_MSG_SOCKET         = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST,
80     NXT_PORT_MSG_MODULES        = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST,
81     NXT_PORT_MSG_CONF_STORE     = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST,
82 
83     NXT_PORT_MSG_CHANGE_FILE    = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
84     NXT_PORT_MSG_NEW_PORT       = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
85     NXT_PORT_MSG_MMAP           = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
86                                   NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
87 
88     NXT_PORT_MSG_PROCESS_READY  = _NXT_PORT_MSG_PROCESS_READY |
89                                   NXT_PORT_MSG_LAST,
90     NXT_PORT_MSG_QUIT           = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
91     NXT_PORT_MSG_REMOVE_PID     = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
92 
93     NXT_PORT_MSG_DATA           = _NXT_PORT_MSG_DATA,
94     NXT_PORT_MSG_DATA_LAST      = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
95 } nxt_port_msg_type_t;
96 
97 
98 /* Passed as a first iov chunk. */
99 typedef struct {
100     uint32_t             stream;
101     nxt_pid_t            pid;
102     nxt_port_id_t        reply_port;
103 
104     uint8_t              type;
105 
106     /* Last message for this stream. */
107     uint8_t              last;      /* 1 bit */
108 
109     /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
110     uint8_t              mmap;      /* 1 bit */
111 
112     /* Non-First fragment in fragmented message sequence. */
113     uint8_t              nf;        /* 1 bit */
114 
115     /* More Fragments followed. */
116     uint8_t              mf;        /* 1 bit */
117 
118     /* Message delivery tracking enabled, next chunk is tracking msg. */
119     uint8_t              tracking;  /* 1 bit */
120 } nxt_port_msg_t;
121 
122 
123 typedef struct {
124     nxt_queue_link_t    link;
125     nxt_buf_t           *buf;
126     size_t              share;
127     nxt_fd_t            fd;
128     nxt_bool_t          close_fd;
129     nxt_port_msg_t      port_msg;
130     uint32_t            tracking_msg[2];
131 
132     nxt_work_t          work;
133 } nxt_port_send_msg_t;
134 
135 
136 struct nxt_port_recv_msg_s {
137     nxt_fd_t            fd;
138     nxt_buf_t           *buf;
139     nxt_port_t          *port;
140     nxt_port_msg_t      port_msg;
141     size_t              size;
142     nxt_bool_t          cancelled;
143     union {
144         nxt_port_t      *new_port;
145         nxt_pid_t       removed_pid;
146         void            *data;
147     } u;
148 };
149 
150 typedef struct nxt_app_s  nxt_app_t;
151 
152 struct nxt_port_s {
153     nxt_fd_event_t      socket;
154 
155     nxt_queue_link_t    link;       /* for nxt_process_t.ports */
156     nxt_process_t       *process;
157 
158     nxt_queue_link_t    app_link;   /* for nxt_app_t.ports */
159     nxt_app_t           *app;
160 
161     nxt_queue_t         messages;   /* of nxt_port_send_msg_t */
162     nxt_thread_mutex_t  write_mutex;
163 
164     /* Maximum size of message part. */
165     uint32_t            max_size;
166     /* Maximum interleave of message parts. */
167     uint32_t            max_share;
168 
169     uint32_t            app_pending_responses;
170     uint32_t            app_responses;
171 
172     nxt_port_handler_t  handler;
173     nxt_port_handler_t  *data;
174 
175     nxt_mp_t            *mem_pool;
176     nxt_event_engine_t  *engine;
177 
178     nxt_buf_t           *free_bufs;
179     nxt_socket_t        pair[2];
180 
181     nxt_port_id_t       id;
182     nxt_pid_t           pid;
183 
184     nxt_lvlhsh_t        rpc_streams; /* stream to nxt_port_rpc_reg_t */
185     nxt_lvlhsh_t        rpc_peers;   /* peer to queue of nxt_port_rpc_reg_t */
186 
187     nxt_lvlhsh_t        frags;
188 
189     nxt_atomic_t        use_count;
190 
191     nxt_process_type_t  type;
192 
193     struct iovec        *iov;
194     void                *mmsg_buf;
195 };
196 
197 
198 typedef struct {
199     nxt_port_id_t       id;
200     nxt_pid_t           pid;
201     size_t              max_size;
202     size_t              max_share;
203     nxt_process_type_t  type:8;
204 } nxt_port_msg_new_port_t;
205 
206 
207 /*
208  * nxt_port_data_t size is allocation size
209  * which enables effective reuse of memory pool cache.
210  */
211 typedef union {
212     nxt_buf_t                buf;
213     nxt_port_msg_new_port_t  new_port;
214 } nxt_port_data_t;
215 
216 
217 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
218     void *data);
219 
220 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
221     nxt_process_type_t type);
222 
223 nxt_port_id_t nxt_port_get_next_id(void);
224 void nxt_port_reset_next_id(void);
225 
226 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
227     size_t max_size);
228 void nxt_port_destroy(nxt_port_t *port);
229 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
230 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
231 void nxt_port_write_close(nxt_port_t *port);
232 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
233 void nxt_port_read_close(nxt_port_t *port);
234 nxt_int_t nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port,
235     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
236     nxt_buf_t *b, void *tracking);
237 
238 nxt_inline nxt_int_t
239 nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
240     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
241     nxt_buf_t *b)
242 {
243     return nxt_port_socket_twrite(task, port, type, fd, stream, reply_port, b,
244                                   NULL);
245 }
246 
247 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
248     nxt_port_handlers_t *handlers);
249 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
250     nxt_port_t *new_port, uint32_t stream);
251 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
252     nxt_uint_t slot, nxt_fd_t fd);
253 
254 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
255 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
256 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
257 void nxt_port_change_log_file_handler(nxt_task_t *task,
258     nxt_port_recv_msg_t *msg);
259 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
260 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
261 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
262 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
263 
264 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
265     nxt_port_post_handler_t handler, void *data);
266 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
267 
268 #endif /* _NXT_PORT_H_INCLUDED_ */
269