xref: /unit/src/nxt_unit.h (revision 743:e0f0cd7d244a)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_UNIT_H_INCLUDED_
7 #define _NXT_UNIT_H_INCLUDED_
8 
9 
10 #include <inttypes.h>
11 #include <sys/types.h>
12 #include <string.h>
13 
14 #include "nxt_unit_typedefs.h"
15 
16 enum {
17     NXT_UNIT_OK          = 0,
18     NXT_UNIT_ERROR       = 1,
19 };
20 
21 enum {
22     NXT_UNIT_LOG_ALERT   = 0,
23     NXT_UNIT_LOG_ERR     = 1,
24     NXT_UNIT_LOG_WARN    = 2,
25     NXT_UNIT_LOG_NOTICE  = 3,
26     NXT_UNIT_LOG_INFO    = 4,
27     NXT_UNIT_LOG_DEBUG   = 5,
28 };
29 
30 #define NXT_UNIT_INIT_ENV  "NXT_UNIT_INIT"
31 
32 /*
33  * Mostly opaque structure with library state.
34  *
35  * Only user defined 'data' pointer exposed here.  The rest is unit
36  * implementation specific and hidden.
37  */
38 struct nxt_unit_s {
39     void                  *data;  /* User defined data. */
40 };
41 
42 /*
43  * Thread context.
44  *
45  * First (main) context is provided 'for free'.  To receive and process
46  * requests in other thread, one need to allocate context and use it
47  * further in this thread.
48  */
49 struct nxt_unit_ctx_s {
50     void                  *data;  /* User context-specific data. */
51     nxt_unit_t            *unit;
52 };
53 
54 /*
55  * Unit port identification structure.
56  *
57  * Each port can be uniquely identified by listen process id (pid) and port id.
58  * This identification is required to refer the port from different process.
59  */
60 struct nxt_unit_port_id_s {
61     pid_t                 pid;
62     uint32_t              hash;
63     uint16_t              id;
64 };
65 
66 /*
67  * unit provides port storage which is able to store and find the following
68  * data structures.
69  */
70 struct nxt_unit_port_s {
71     nxt_unit_port_id_t    id;
72 
73     int                   in_fd;
74     int                   out_fd;
75 
76     void                  *data;
77 };
78 
79 
80 struct nxt_unit_buf_s {
81     char                  *start;
82     char                  *free;
83     char                  *end;
84 };
85 
86 
87 struct nxt_unit_request_info_s {
88     nxt_unit_t            *unit;
89     nxt_unit_ctx_t        *ctx;
90 
91     nxt_unit_port_id_t    request_port;
92     nxt_unit_port_id_t    response_port;
93 
94     nxt_unit_request_t    *request;
95     nxt_unit_buf_t        *request_buf;
96 
97     nxt_unit_response_t   *response;
98     nxt_unit_buf_t        *response_buf;
99     uint32_t              response_max_fields;
100 
101     nxt_unit_buf_t        *content_buf;
102     uint64_t              content_length;
103 
104     void                  *data;
105 };
106 
107 /*
108  * Set of application-specific callbacks. Application may leave all optional
109  * callbacks as NULL.
110  */
111 struct nxt_unit_callbacks_s {
112     /*
113      * Process request data. Unlike all other callback, this callback
114      * need to be defined by application.
115      */
116     void     (*request_handler)(nxt_unit_request_info_t *req);
117 
118     /* Add new Unit port to communicate with process pid. Optional. */
119     int      (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
120 
121     /* Remove previously added port. Optional. */
122     void     (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
123 
124     /* Remove all data associated with process pid including ports. Optional. */
125     void     (*remove_pid)(nxt_unit_ctx_t *, pid_t pid);
126 
127     /* Gracefully quit the application. Optional. */
128     void     (*quit)(nxt_unit_ctx_t *);
129 
130     /* Send data and control to process pid using port id. Optional. */
131     ssize_t  (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
132                  const void *buf, size_t buf_size,
133                  const void *oob, size_t oob_size);
134 
135     /* Receive data on port id. Optional. */
136     ssize_t  (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
137                  void *buf, size_t buf_size, void *oob, size_t oob_size);
138 
139 };
140 
141 
142 struct nxt_unit_init_s {
143     void                  *data;     /* Opaque pointer to user-defined data. */
144     void                  *ctx_data; /* Opaque pointer to user-defined data. */
145     int                   max_pending_requests;
146 
147     uint32_t              request_data_size;
148 
149     nxt_unit_callbacks_t  callbacks;
150 
151     nxt_unit_port_t       ready_port;
152     uint32_t              ready_stream;
153     nxt_unit_port_t       read_port;
154     int                   log_fd;
155 };
156 
157 
158 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info,
159     void *dst, size_t size);
160 
161 
162 struct nxt_unit_read_info_s {
163     nxt_unit_read_func_t  read;
164     int                   eof;
165     uint32_t              buf_size;
166     void                  *data;
167 };
168 
169 
170 /*
171  * Initialize Unit application library with necessary callbacks and
172  * ready/reply port parameters, send 'READY' response to master.
173  */
174 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
175 
176 /*
177  * Process received message, invoke configured callbacks.
178  *
179  * If application implements it's own event loop, each datagram received
180  * from port socket should be initially processed by unit.  This function
181  * may invoke other application-defined callback for message processing.
182  */
183 int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
184     void *buf, size_t buf_size, void *oob, size_t oob_size);
185 
186 /*
187  * Main function useful in case when application does not have it's own
188  * event loop. nxt_unit_run() starts infinite message wait and process loop.
189  *
190  *  for (;;) {
191  *      app_lib->port_recv(...);
192  *      nxt_unit_process_msg(...);
193  *  }
194  *
195  * The normally function returns when QUIT message received from Unit.
196  */
197 int nxt_unit_run(nxt_unit_ctx_t *);
198 
199 /* Destroy application library object. */
200 void nxt_unit_done(nxt_unit_ctx_t *);
201 
202 /*
203  * Allocate and initialize new execution context with new listen port to
204  * process requests in other thread.
205  */
206 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
207 
208 /* Free unused context.  It is not required to free main context. */
209 void nxt_unit_ctx_free(nxt_unit_ctx_t *);
210 
211 /* Initialize port_id, calculate hash. */
212 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
213 
214 /*
215  * Create extra incoming port, perform all required actions to propogate
216  * the port to Unit server.  Fills structure referenced by port_id with
217  * current pid and new port id.
218  */
219 int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst,
220     nxt_unit_port_id_t *port_id);
221 
222 /* Default 'add_port' implementation. */
223 int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
224 
225 /* Find previously added port. */
226 nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *,
227     nxt_unit_port_id_t *port_id);
228 
229 /* Find, fill output 'port' and remove port from storage.  */
230 void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
231     nxt_unit_port_t *port);
232 
233 /* Default 'remove_port' implementation. */
234 void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
235 
236 /* Default 'remove_pid' implementation. */
237 void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid);
238 
239 /* Default 'quit' implementation. */
240 void nxt_unit_quit(nxt_unit_ctx_t *);
241 
242 /* Default 'port_send' implementation. */
243 ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd,
244     const void *buf, size_t buf_size,
245     const void *oob, size_t oob_size);
246 
247 /* Default 'port_recv' implementation. */
248 ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
249     void *oob, size_t oob_size);
250 
251 /* Calculates hash for given field name. */
252 uint16_t nxt_unit_field_hash(const char* name, size_t name_length);
253 
254 /* Split host for server name and port. */
255 void nxt_unit_split_host(char *host_start, uint32_t host_length,
256     char **name, uint32_t *name_length, char **port, uint32_t *port_length);
257 
258 /* Group duplicate fields for easy enumeration. */
259 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req);
260 
261 /*
262  * Allocate response structure capable to store limited numer of fields.
263  * The structure may be accessed directly via req->response pointer or
264  * filled step-by-step using functions add_field and add_content.
265  */
266 int nxt_unit_response_init(nxt_unit_request_info_t *req,
267     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size);
268 
269 int nxt_unit_response_realloc(nxt_unit_request_info_t *req,
270     uint32_t max_fields_count, uint32_t max_fields_size);
271 
272 int nxt_unit_response_is_init(nxt_unit_request_info_t *req);
273 
274 int nxt_unit_response_add_field(nxt_unit_request_info_t *req,
275     const char* name, uint8_t name_length,
276     const char* value, uint32_t value_length);
277 
278 int nxt_unit_response_add_content(nxt_unit_request_info_t *req,
279     const void* src, uint32_t size);
280 
281 /*
282  * Send prepared response to Unit server.  Response structure destroyed during
283  * this call.
284  */
285 int nxt_unit_response_send(nxt_unit_request_info_t *req);
286 
287 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req);
288 
289 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req,
290     uint32_t size);
291 
292 int nxt_unit_buf_send(nxt_unit_buf_t *buf);
293 
294 void nxt_unit_buf_free(nxt_unit_buf_t *buf);
295 
296 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf);
297 
298 uint32_t nxt_unit_buf_max(void);
299 
300 uint32_t nxt_unit_buf_min(void);
301 
302 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
303     size_t size);
304 
305 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
306     nxt_unit_read_info_t *read_info);
307 
308 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
309     size_t size);
310 
311 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);
312 
313 
314 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...);
315 
316 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level,
317     const char* fmt, ...);
318 
319 #if (NXT_DEBUG)
320 
321 #define nxt_unit_debug(ctx, fmt, ARGS...) \
322     nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
323 
324 #define nxt_unit_req_debug(req, fmt, ARGS...) \
325     nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
326 
327 #else
328 
329 #define nxt_unit_debug(ctx, fmt, ARGS...)
330 
331 #define nxt_unit_req_debug(req, fmt, ARGS...)
332 
333 #endif
334 
335 
336 #define nxt_unit_warn(ctx, fmt, ARGS...) \
337     nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
338 
339 #define nxt_unit_req_warn(req, fmt, ARGS...) \
340     nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
341 
342 #define nxt_unit_error(ctx, fmt, ARGS...) \
343     nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
344 
345 #define nxt_unit_req_error(req, fmt, ARGS...) \
346     nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
347 
348 #define nxt_unit_alert(ctx, fmt, ARGS...) \
349     nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
350 
351 #define nxt_unit_req_alert(req, fmt, ARGS...) \
352     nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
353 
354 
355 #endif /* _NXT_UNIT_H_INCLUDED_ */
356