xref: /unit/src/nxt_port.h (revision 389:3f222d4a7df8)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9 
10 
/*
 * Table of port message handlers: one nxt_port_handler_t slot per message
 * kind.  Member order is significant, because nxt_port_handler_idx()
 * converts a member's offset into the value of the corresponding
 * _NXT_PORT_MSG_* enumerator; adding, removing or reordering members
 * therefore changes the message type numbering.
 */
11 struct nxt_port_handlers_s {
12     /* RPC responses. */
13     nxt_port_handler_t  rpc_ready;
14     nxt_port_handler_t  rpc_error;
15 
16     /* Main process RPC requests. */
17     nxt_port_handler_t  start_worker;
18     nxt_port_handler_t  socket;
19     nxt_port_handler_t  modules;
20     nxt_port_handler_t  conf_store;
21 
22     /* File descriptor exchange. */
23     nxt_port_handler_t  change_file;
24     nxt_port_handler_t  new_port;
25     nxt_port_handler_t  mmap;
26 
27     /* New process ready. */
28     nxt_port_handler_t  process_ready;
29 
30     /* Process exit/crash notification. */
31     nxt_port_handler_t  remove_pid;
32 
33     /* Stop process command. */
34     nxt_port_handler_t  quit;
35 
36     /* Various data. */
37     nxt_port_handler_t  data;
38 };
39 
40 
/*
 * Zero-based slot index of handler "name" inside nxt_port_handlers_t:
 * the member's byte offset divided by the size of one handler slot.
 * The result is only meaningful while every member of the structure is
 * a nxt_port_handler_t.  "name" must be a member name, not an expression.
 */
41 #define nxt_port_handler_idx(name)                                            \
42     ( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) )
43 
44 
/*
 * Port message type codes.
 *
 * NXT_PORT_MSG_MASK covers the low byte, which is where the handler slot
 * index is placed; the bits above it are flags OR-ed into the type.
 * The _NXT_PORT_MSG_* enumerators are the bare slot indexes derived from
 * nxt_port_handlers_t, and the NXT_PORT_MSG_* enumerators that follow
 * NXT_PORT_MSG_MAX are those indexes combined with the flags used for
 * each message kind.
 */
45 typedef enum {
    /*
     * Flag bits, all outside NXT_PORT_MSG_MASK.
     * NOTE(review): judging by the names only, LAST marks the final part
     * of a message, CLOSE_FD asks for the passed descriptor to be closed
     * and SYNC requests synchronous delivery -- confirm against the port
     * read/write code.
     */
46     NXT_PORT_MSG_LAST           = 0x100,
47     NXT_PORT_MSG_CLOSE_FD       = 0x200,
48     NXT_PORT_MSG_SYNC           = 0x400,
49 
    /* Mask selecting the handler slot index from a message type. */
50     NXT_PORT_MSG_MASK           = 0xFF,
51 
    /* Bare handler slot indexes, in nxt_port_handlers_t member order. */
52     _NXT_PORT_MSG_RPC_READY     = nxt_port_handler_idx(rpc_ready),
53     _NXT_PORT_MSG_RPC_ERROR     = nxt_port_handler_idx(rpc_error),
54 
55     _NXT_PORT_MSG_START_WORKER  = nxt_port_handler_idx(start_worker),
56     _NXT_PORT_MSG_SOCKET        = nxt_port_handler_idx(socket),
57     _NXT_PORT_MSG_MODULES       = nxt_port_handler_idx(modules),
58     _NXT_PORT_MSG_CONF_STORE    = nxt_port_handler_idx(conf_store),
59 
60     _NXT_PORT_MSG_CHANGE_FILE   = nxt_port_handler_idx(change_file),
61     _NXT_PORT_MSG_NEW_PORT      = nxt_port_handler_idx(new_port),
62     _NXT_PORT_MSG_MMAP          = nxt_port_handler_idx(mmap),
63 
64     _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
65     _NXT_PORT_MSG_REMOVE_PID    = nxt_port_handler_idx(remove_pid),
66     _NXT_PORT_MSG_QUIT          = nxt_port_handler_idx(quit),
67 
68     _NXT_PORT_MSG_DATA          = nxt_port_handler_idx(data),
69 
    /* Number of handler slots in nxt_port_handlers_t. */
70     NXT_PORT_MSG_MAX            = sizeof(nxt_port_handlers_t) /
71                                       sizeof(nxt_port_handler_t),
72 
    /*
     * Message types with their flags applied.  Only RPC_READY and DATA
     * have a variant without NXT_PORT_MSG_LAST; every other type always
     * carries it.
     */
73     NXT_PORT_MSG_RPC_READY      = _NXT_PORT_MSG_RPC_READY,
74     NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
75     NXT_PORT_MSG_RPC_ERROR      = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
76 
77     NXT_PORT_MSG_START_WORKER   = _NXT_PORT_MSG_START_WORKER |
78                                   NXT_PORT_MSG_LAST,
79     NXT_PORT_MSG_SOCKET         = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST,
80     NXT_PORT_MSG_MODULES        = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST,
81     NXT_PORT_MSG_CONF_STORE     = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST,
82 
83     NXT_PORT_MSG_CHANGE_FILE    = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
84     NXT_PORT_MSG_NEW_PORT       = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
    /* MMAP is the only type that also sets CLOSE_FD and SYNC. */
85     NXT_PORT_MSG_MMAP           = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
86                                   NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
87 
88     NXT_PORT_MSG_PROCESS_READY  = _NXT_PORT_MSG_PROCESS_READY |
89                                   NXT_PORT_MSG_LAST,
90     NXT_PORT_MSG_QUIT           = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
91     NXT_PORT_MSG_REMOVE_PID     = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
92 
93     NXT_PORT_MSG_DATA           = _NXT_PORT_MSG_DATA,
94     NXT_PORT_MSG_DATA_LAST      = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
95 } nxt_port_msg_type_t;
96 
97 
98 /* Port message header; it is passed as the first iov chunk. */
99 typedef struct {
100     uint32_t             stream;
101     nxt_pid_t            pid;
102     nxt_port_id_t        reply_port;
103 
    /*
     * NOTE(review): presumably the handler slot index, i.e. the message
     * type with NXT_PORT_MSG_MASK applied, with the LAST flag carried
     * separately in "last" -- confirm against the send/receive code.
     */
104     uint8_t              type;
105     uint8_t              last;      /* 1 bit */
106 
107     /* Message data is sent using mmap; next chunk is a nxt_port_mmap_msg_t. */
108     uint8_t              mmap;      /* 1 bit */
109 
    /*
     * NOTE(review): the meaning of nf/mf is not visible in this header;
     * possibly fragmentation flags (cf. nxt_port_t.frags) -- confirm.
     */
110     uint8_t              nf;
111     uint8_t              mf;
112 } nxt_port_msg_t;
113 
114 
/*
 * Outgoing message record.  Per the comment on nxt_port_t.messages, that
 * queue holds entries of this type; "link" is the queue linkage.
 */
115 typedef struct {
116     nxt_queue_link_t    link;
117     nxt_buf_t           *buf;
    /* NOTE(review): presumably accounted against nxt_port_t.max_share. */
118     size_t              share;
119     nxt_fd_t            fd;
    /* NOTE(review): presumably set from NXT_PORT_MSG_CLOSE_FD -- confirm. */
120     nxt_bool_t          close_fd;
    /* Header that accompanies the data (see nxt_port_msg_t). */
121     nxt_port_msg_t      port_msg;
122 
123     nxt_work_t          work;
124 } nxt_port_send_msg_t;
125 
126 
/*
 * Received message description.  The handler prototypes at the end of this
 * header take a nxt_port_recv_msg_t pointer, which is presumably the
 * typedef of this structure (the typedef itself is not visible here).
 */
127 struct nxt_port_recv_msg_s {
128     nxt_fd_t            fd;
129     nxt_buf_t           *buf;
130     nxt_port_t          *port;
131     nxt_port_msg_t      port_msg;
132     size_t              size;
    /*
     * Per-message-kind payload.
     * NOTE(review): which member is valid presumably depends on the
     * message type (new_port / remove_pid / other) -- confirm in handlers.
     */
133     union {
134         nxt_port_t      *new_port;
135         nxt_pid_t       removed_pid;
136         void            *data;
137     } u;
138 };
139 
/* Forward declaration; this header only uses nxt_app_t through a pointer. */
140 typedef struct nxt_app_s  nxt_app_t;
141 
/*
 * Port object: one communication endpoint with its socket event, queue of
 * outgoing messages, size limits, handler pointers, RPC registries and a
 * use counter.
 */
142 struct nxt_port_s {
143     nxt_fd_event_t      socket;
144 
145     nxt_queue_link_t    link;       /* for nxt_process_t.ports */
146     nxt_process_t       *process;
147 
148     nxt_queue_link_t    app_link;   /* for nxt_app_t.ports */
149     nxt_app_t           *app;
150 
151     nxt_queue_t         messages;   /* of nxt_port_send_msg_t */
    /* NOTE(review): presumably guards "messages" on the write path. */
152     nxt_thread_mutex_t  write_mutex;
153 
154     /* Maximum size of message part. */
155     uint32_t            max_size;
156     /* Maximum interleave of message parts. */
157     uint32_t            max_share;
158 
159     uint32_t            app_requests;
160     uint32_t            app_responses;
161 
162     nxt_port_handler_t  handler;
    /*
     * NOTE(review): presumably the handler table supplied through
     * nxt_port_enable(), indexed by message type -- confirm.
     */
163     nxt_port_handler_t  *data;
164 
165     nxt_mp_t            *mem_pool;
166     nxt_event_engine_t  *engine;
167 
168     nxt_buf_t           *free_bufs;
    /* NOTE(review): presumably the two ends of a socket pair -- confirm. */
169     nxt_socket_t        pair[2];
170 
171     nxt_port_id_t       id;
172     nxt_pid_t           pid;
173 
174     nxt_lvlhsh_t        rpc_streams; /* stream to nxt_port_rpc_reg_t */
175     nxt_lvlhsh_t        rpc_peers;   /* peer to queue of nxt_port_rpc_reg_t */
176 
177     nxt_lvlhsh_t        frags;
178 
    /* NOTE(review): presumably the counter adjusted by nxt_port_use(). */
179     nxt_atomic_t        use_count;
180 
181     nxt_process_type_t  type;
182 
183     struct iovec        *iov;
184     void                *mmsg_buf;
185 };
186 
187 
/*
 * Description of a port: id, owning pid, size limits and process type.
 * NOTE(review): judging by the name, this is the payload of a new_port
 * message (cf. nxt_port_send_port()) -- confirm.
 * max_size and max_share are size_t here, while struct nxt_port_s stores
 * the same-named members as uint32_t.
 */
188 typedef struct {
189     nxt_port_id_t       id;
190     nxt_pid_t           pid;
191     size_t              max_size;
192     size_t              max_share;
    /* Process type stored in an 8-bit bit-field. */
193     nxt_process_type_t  type:8;
194 } nxt_port_msg_new_port_t;
195 
196 
197 /*
198  * The size of nxt_port_data_t (the size of its largest member) is used as
199  * an allocation size, which enables effective reuse of the memory pool cache.
200  */
201 typedef union {
202     nxt_buf_t                buf;
203     nxt_port_msg_new_port_t  new_port;
204 } nxt_port_data_t;
205 
206 
/*
 * Callback type accepted by nxt_port_post(); it receives the task, the
 * port and an opaque data pointer.
 */
207 typedef void (*nxt_port_post_handler_t)(nxt_task_t *task, nxt_port_t *port,
208     void *data);
209 
/*
 * NOTE(review): judging by the name and signature, creates a port object
 * for the given id, pid and process type; allocation and failure
 * behaviour are defined in the .c file, not visible here.
 */
210 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
211     nxt_process_type_t type);
212 
/*
 * Port id generator interface.
 * NOTE(review): presumably returns successive ids and resets the
 * underlying counter, respectively -- confirm in the implementation.
 */
213 nxt_port_id_t nxt_port_get_next_id(void);
214 void nxt_port_reset_next_id(void);
215 
/*
 * Port socket management and message writing (declarations only; the
 * definitions are not part of this header).
 * NOTE(review): the descriptions implied by the names -- init/destroy/close
 * of the port and enabling or closing its read and write sides -- should
 * be confirmed against the implementation.
 */
216 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
217     size_t max_size);
218 void nxt_port_destroy(nxt_port_t *port);
219 void nxt_port_close(nxt_task_t *task, nxt_port_t *port);
220 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
221 void nxt_port_write_close(nxt_port_t *port);
222 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
223 void nxt_port_read_close(nxt_port_t *port);
/*
 * Takes a message type, a file descriptor, a stream id, a reply port id
 * and a buffer.
 * NOTE(review): "type" is presumably a nxt_port_msg_type_t value including
 * flag bits -- confirm.
 */
224 nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
225     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
226     nxt_buf_t *b);
227 
/*
 * Higher-level port operations (declarations only).
 * NOTE(review): by name, nxt_port_enable() attaches a handler table to a
 * port, nxt_port_send_port() announces new_port over port, and
 * nxt_port_change_log_file() distributes a log file descriptor for the
 * given slot -- confirm in the implementation.
 */
228 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
229     nxt_port_handlers_t *handlers);
230 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
231     nxt_port_t *new_port, uint32_t stream);
232 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
233     nxt_uint_t slot, nxt_fd_t fd);
234 
/*
 * Message handlers; each takes the task and the received message.
 * NOTE(review): presumably these share the nxt_port_handler_t signature
 * and are meant to fill nxt_port_handlers_t slots -- the typedef is not
 * visible in this header, confirm there.
 */
235 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
236 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
237 void nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
238 void nxt_port_change_log_file_handler(nxt_task_t *task,
239     nxt_port_recv_msg_t *msg);
240 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
241 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
242 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
243 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
244 
/*
 * nxt_port_post() takes a nxt_port_post_handler_t callback and an opaque
 * data pointer to be passed to it.
 * NOTE(review): presumably the callback is run in the context of the
 * port's event engine, and nxt_port_use() adjusts nxt_port_t.use_count
 * by "i" -- confirm in the implementation.
 */
245 nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
246     nxt_port_post_handler_t handler, void *data);
247 void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
248 
249 #endif /* _NXT_PORT_H_INCLUDED_ */
250