xref: /unit/src/nxt_port.h (revision 216:07257705cd64)
1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #ifndef _NXT_PORT_H_INCLUDED_
8 #define _NXT_PORT_H_INCLUDED_
9 
10 
11 typedef enum {
12     NXT_PORT_MSG_LAST           = 0x100,
13     NXT_PORT_MSG_CLOSE_FD       = 0x200,
14     NXT_PORT_MSG_SYNC           = 0x400,
15 
16     NXT_PORT_MSG_MASK           = 0xFF,
17 
18     _NXT_PORT_MSG_QUIT          = 0,
19     _NXT_PORT_MSG_NEW_PORT,
20     _NXT_PORT_MSG_CHANGE_FILE,
21     _NXT_PORT_MSG_MMAP,
22     _NXT_PORT_MSG_DATA,
23     _NXT_PORT_MSG_REMOVE_PID,
24     _NXT_PORT_MSG_READY,
25     _NXT_PORT_MSG_START_WORKER,
26     _NXT_PORT_MSG_SOCKET,
27     _NXT_PORT_MSG_MODULES,
28     _NXT_PORT_MSG_RPC_READY,
29     _NXT_PORT_MSG_RPC_ERROR,
30 
31     NXT_PORT_MSG_MAX,
32 
33     NXT_PORT_MSG_QUIT           = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST,
34     NXT_PORT_MSG_NEW_PORT       = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
35     NXT_PORT_MSG_CHANGE_FILE    = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
36     NXT_PORT_MSG_MMAP           = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST |
37                                   NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
38     NXT_PORT_MSG_DATA           = _NXT_PORT_MSG_DATA,
39     NXT_PORT_MSG_DATA_LAST      = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST,
40     NXT_PORT_MSG_REMOVE_PID     = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST,
41     NXT_PORT_MSG_READY          = _NXT_PORT_MSG_READY | NXT_PORT_MSG_LAST,
42     NXT_PORT_MSG_START_WORKER   = _NXT_PORT_MSG_START_WORKER |
43                                   NXT_PORT_MSG_LAST,
44     NXT_PORT_MSG_SOCKET         = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST,
45     NXT_PORT_MSG_MODULES        = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST,
46     NXT_PORT_MSG_RPC_READY      = _NXT_PORT_MSG_RPC_READY,
47     NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST,
48     NXT_PORT_MSG_RPC_ERROR      = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST,
49 } nxt_port_msg_type_t;
50 
51 
52 /* Passed as a first iov chunk. */
53 typedef struct {
54     uint32_t             stream;
55     nxt_pid_t            pid;
56     nxt_port_id_t        reply_port;
57 
58     uint8_t              type;
59     uint8_t              last;      /* 1 bit */
60 
61     /* Message data send using mmap, next chunk is a nxt_port_mmap_msg_t. */
62     uint8_t              mmap;      /* 1 bit */
63 } nxt_port_msg_t;
64 
65 
66 typedef struct {
67     nxt_queue_link_t    link;
68     nxt_buf_t           *buf;
69     size_t              share;
70     nxt_fd_t            fd;
71     nxt_bool_t          close_fd;
72     nxt_bool_t          opened;
73     nxt_port_msg_t      port_msg;
74 
75     nxt_work_t          work;
76     nxt_event_engine_t  *engine;
77     nxt_mp_t            *mem_pool;
78 } nxt_port_send_msg_t;
79 
80 
81 struct nxt_port_recv_msg_s {
82     nxt_fd_t            fd;
83     nxt_buf_t           *buf;
84     nxt_port_t          *port;
85     nxt_port_msg_t      port_msg;
86     size_t              size;
87     nxt_port_t          *new_port;
88 };
89 
90 typedef struct nxt_app_s  nxt_app_t;
91 
92 struct nxt_port_s {
93     nxt_fd_event_t      socket;
94 
95     nxt_queue_link_t    link;       /* for nxt_process_t.ports */
96     nxt_process_t       *process;
97 
98     nxt_queue_link_t    app_link;   /* for nxt_app_t.ports */
99     nxt_app_t           *app;
100 
101     nxt_queue_t         messages;   /* of nxt_port_send_msg_t */
102 
103     /* Maximum size of message part. */
104     uint32_t            max_size;
105     /* Maximum interleave of message parts. */
106     uint32_t            max_share;
107     uint32_t            app_req_id;
108 
109     nxt_port_handler_t  handler;
110     nxt_port_handler_t  *data;
111 
112     nxt_mp_t            *mem_pool;
113     nxt_event_engine_t  *engine;
114 
115     nxt_buf_t           *free_bufs;
116     nxt_socket_t        pair[2];
117 
118     nxt_port_id_t       id;
119     nxt_pid_t           pid;
120 
121     nxt_lvlhsh_t        rpc_streams; /* stream to nxt_port_rpc_reg_t */
122     nxt_lvlhsh_t        rpc_peers;   /* peer to queue of nxt_port_rpc_reg_t */
123     uint32_t            next_stream;
124 
125     nxt_process_type_t  type;
126     nxt_work_t          work;
127 
128     struct iovec        *iov;
129     void                *mmsg_buf;
130 };
131 
132 
133 typedef struct {
134     nxt_port_id_t       id;
135     nxt_pid_t           pid;
136     size_t              max_size;
137     size_t              max_share;
138     nxt_process_type_t  type:8;
139 } nxt_port_msg_new_port_t;
140 
141 
142 /*
143  * nxt_port_data_t size is allocation size
144  * which enables effective reuse of memory pool cache.
145  */
146 typedef union {
147     nxt_buf_t                buf;
148     nxt_port_msg_new_port_t  new_port;
149 } nxt_port_data_t;
150 
151 
152 nxt_port_t *nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
153     nxt_process_type_t type);
154 nxt_bool_t nxt_port_release(nxt_port_t *port);
155 
156 nxt_port_id_t nxt_port_get_next_id(void);
157 void nxt_port_reset_next_id(void);
158 
159 nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
160     size_t max_size);
161 void nxt_port_destroy(nxt_port_t *port);
162 void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
163 void nxt_port_write_close(nxt_port_t *port);
164 void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
165 void nxt_port_read_close(nxt_port_t *port);
166 nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
167     nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
168     nxt_buf_t *b);
169 
170 void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
171     nxt_port_handler_t *handlers);
172 void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
173     nxt_port_t *port, uint32_t stream);
174 nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
175     nxt_port_t *new_port, uint32_t stream);
176 void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
177     nxt_uint_t slot, nxt_fd_t fd);
178 
179 void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
180 void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
181 void nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
182 void nxt_port_change_log_file_handler(nxt_task_t *task,
183     nxt_port_recv_msg_t *msg);
184 void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
185 void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
186 void nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
187 void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
188 
189 
190 #endif /* _NXT_PORT_H_INCLUDED_ */
191