xref: /unit/src/nxt_unit.h (revision 1546:06017e6e3a5f)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_UNIT_H_INCLUDED_
7 #define _NXT_UNIT_H_INCLUDED_
8 
9 
10 #include <inttypes.h>
11 #include <sys/types.h>
12 #include <sys/uio.h>
13 #include <string.h>
14 
15 #include "nxt_version.h"
16 #include "nxt_unit_typedefs.h"
17 
18 
19 enum {
20     NXT_UNIT_OK          = 0,
21     NXT_UNIT_ERROR       = 1,
22     NXT_UNIT_AGAIN       = 2,
23     NXT_UNIT_CANCELLED   = 3,
24 };
25 
26 enum {
27     NXT_UNIT_LOG_ALERT   = 0,
28     NXT_UNIT_LOG_ERR     = 1,
29     NXT_UNIT_LOG_WARN    = 2,
30     NXT_UNIT_LOG_NOTICE  = 3,
31     NXT_UNIT_LOG_INFO    = 4,
32     NXT_UNIT_LOG_DEBUG   = 5,
33 };
34 
35 #define NXT_UNIT_INIT_ENV  "NXT_UNIT_INIT"
36 
37 /*
38  * Mostly opaque structure with library state.
39  *
40  * Only user defined 'data' pointer exposed here.  The rest is unit
41  * implementation specific and hidden.
42  */
43 struct nxt_unit_s {
44     void                  *data;  /* User defined data. */
45 };
46 
47 /*
48  * Thread context.
49  *
50  * First (main) context is provided 'for free'.  To receive and process
51  * requests in other thread, one need to allocate context and use it
52  * further in this thread.
53  */
54 struct nxt_unit_ctx_s {
55     void                  *data;  /* User context-specific data. */
56     nxt_unit_t            *unit;
57 };
58 
59 /*
60  * Unit port identification structure.
61  *
62  * Each port can be uniquely identified by listen process id (pid) and port id.
63  * This identification is required to refer the port from different process.
64  */
65 struct nxt_unit_port_id_s {
66     pid_t                 pid;
67     uint32_t              hash;
68     uint16_t              id;
69 };
70 
71 /*
72  * unit provides port storage which is able to store and find the following
73  * data structures.
74  */
75 struct nxt_unit_port_s {
76     nxt_unit_port_id_t    id;
77 
78     int                   in_fd;
79     int                   out_fd;
80 
81     void                  *data;
82 };
83 
84 
85 struct nxt_unit_buf_s {
86     char                  *start;
87     char                  *free;
88     char                  *end;
89 };
90 
91 
92 struct nxt_unit_request_info_s {
93     nxt_unit_t            *unit;
94     nxt_unit_ctx_t        *ctx;
95 
96     nxt_unit_port_t       *response_port;
97 
98     nxt_unit_request_t    *request;
99     nxt_unit_buf_t        *request_buf;
100 
101     nxt_unit_response_t   *response;
102     nxt_unit_buf_t        *response_buf;
103     uint32_t              response_max_fields;
104 
105     nxt_unit_buf_t        *content_buf;
106     uint64_t              content_length;
107     int                   content_fd;
108 
109     void                  *data;
110 };
111 
112 
113 /*
114  * Set of application-specific callbacks. Application may leave all optional
115  * callbacks as NULL.
116  */
117 struct nxt_unit_callbacks_s {
118     /*
119      * Process request. Unlike all other callback, this callback
120      * need to be defined by application.
121      */
122     void     (*request_handler)(nxt_unit_request_info_t *req);
123 
124     /* Process websocket frame. */
125     void     (*websocket_handler)(nxt_unit_websocket_frame_t *ws);
126 
127     /* Connection closed. */
128     void     (*close_handler)(nxt_unit_request_info_t *req);
129 
130     /* Add new Unit port to communicate with process pid. Optional. */
131     int      (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
132 
133     /* Remove previously added port. Optional. */
134     void     (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port);
135 
136     /* Remove all data associated with process pid including ports. Optional. */
137     void     (*remove_pid)(nxt_unit_t *, pid_t pid);
138 
139     /* Gracefully quit the application. Optional. */
140     void     (*quit)(nxt_unit_ctx_t *);
141 
142     /* Shared memory release acknowledgement. Optional. */
143     void     (*shm_ack_handler)(nxt_unit_ctx_t *);
144 
145     /* Send data and control to process pid using port id. Optional. */
146     ssize_t  (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
147                  const void *buf, size_t buf_size,
148                  const void *oob, size_t oob_size);
149 
150     /* Receive data on port id. Optional. */
151     ssize_t  (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
152                  void *buf, size_t buf_size, void *oob, size_t oob_size);
153 };
154 
155 
156 struct nxt_unit_init_s {
157     void                  *data;     /* Opaque pointer to user-defined data. */
158     void                  *ctx_data; /* Opaque pointer to user-defined data. */
159     int                   max_pending_requests;
160 
161     uint32_t              request_data_size;
162     uint32_t              shm_limit;
163 
164     nxt_unit_callbacks_t  callbacks;
165 
166     nxt_unit_port_t       ready_port;
167     uint32_t              ready_stream;
168     nxt_unit_port_t       router_port;
169     nxt_unit_port_t       read_port;
170     int                   log_fd;
171 };
172 
173 
174 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info,
175     void *dst, size_t size);
176 
177 
178 struct nxt_unit_read_info_s {
179     nxt_unit_read_func_t  read;
180     int                   eof;
181     uint32_t              buf_size;
182     void                  *data;
183 };
184 
185 
186 /*
187  * Initialize Unit application library with necessary callbacks and
188  * ready/reply port parameters, send 'READY' response to master.
189  */
190 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
191 
192 /*
193  * Main function useful in case when application does not have it's own
194  * event loop. nxt_unit_run() starts infinite message wait and process loop.
195  *
196  *  for (;;) {
197  *      app_lib->port_recv(...);
198  *      nxt_unit_process_msg(...);
199  *  }
200  *
201  * The normally function returns when QUIT message received from Unit.
202  */
203 int nxt_unit_run(nxt_unit_ctx_t *);
204 
205 int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
206 
207 /* Destroy application library object. */
208 void nxt_unit_done(nxt_unit_ctx_t *);
209 
210 /*
211  * Allocate and initialize new execution context with new listen port to
212  * process requests in other thread.
213  */
214 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
215 
216 /* Initialize port_id, calculate hash. */
217 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
218 
219 /* Calculates hash for given field name. */
220 uint16_t nxt_unit_field_hash(const char* name, size_t name_length);
221 
222 /* Split host for server name and port. */
223 void nxt_unit_split_host(char *host_start, uint32_t host_length,
224     char **name, uint32_t *name_length, char **port, uint32_t *port_length);
225 
226 /* Group duplicate fields for easy enumeration. */
227 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req);
228 
229 /*
230  * Allocate response structure capable to store limited numer of fields.
231  * The structure may be accessed directly via req->response pointer or
232  * filled step-by-step using functions add_field and add_content.
233  */
234 int nxt_unit_response_init(nxt_unit_request_info_t *req,
235     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size);
236 
237 int nxt_unit_response_realloc(nxt_unit_request_info_t *req,
238     uint32_t max_fields_count, uint32_t max_fields_size);
239 
240 int nxt_unit_response_is_init(nxt_unit_request_info_t *req);
241 
242 int nxt_unit_response_add_field(nxt_unit_request_info_t *req,
243     const char* name, uint8_t name_length,
244     const char* value, uint32_t value_length);
245 
246 int nxt_unit_response_add_content(nxt_unit_request_info_t *req,
247     const void* src, uint32_t size);
248 
249 /*
250  * Send prepared response to Unit server.  Response structure destroyed during
251  * this call.
252  */
253 int nxt_unit_response_send(nxt_unit_request_info_t *req);
254 
255 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req);
256 
257 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req,
258     uint32_t size);
259 
260 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req);
261 
262 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req);
263 
264 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req);
265 
266 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data);
267 
268 int nxt_unit_buf_send(nxt_unit_buf_t *buf);
269 
270 void nxt_unit_buf_free(nxt_unit_buf_t *buf);
271 
272 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf);
273 
274 uint32_t nxt_unit_buf_max(void);
275 
276 uint32_t nxt_unit_buf_min(void);
277 
278 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
279     size_t size);
280 
281 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req,
282     const void *start, size_t size, size_t min_size);
283 
284 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
285     nxt_unit_read_info_t *read_info);
286 
287 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
288     size_t size);
289 
290 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req,
291     size_t max_size);
292 
293 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);
294 
295 
296 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
297     uint8_t last, const void *start, size_t size);
298 
299 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
300     uint8_t last, const struct iovec *iov, int iovcnt);
301 
302 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
303     size_t size);
304 
305 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws);
306 
307 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws);
308 
309 
310 #if defined __has_attribute
311 
312 #if __has_attribute(format)
313 
314 #define NXT_ATTR_FORMAT  __attribute__((format(printf, 3, 4)))
315 
316 #endif
317 
318 #endif
319 
320 
321 #if !defined(NXT_ATTR_FORMAT)
322 
323 #define NXT_ATTR_FORMAT
324 
325 #endif
326 
327 
328 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...)
329     NXT_ATTR_FORMAT;
330 
331 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level,
332     const char* fmt, ...) NXT_ATTR_FORMAT;
333 
334 #if (NXT_DEBUG)
335 
336 #define nxt_unit_debug(ctx, fmt, ARGS...) \
337     nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
338 
339 #define nxt_unit_req_debug(req, fmt, ARGS...) \
340     nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
341 
342 #else
343 
344 #define nxt_unit_debug(ctx, fmt, ARGS...)
345 
346 #define nxt_unit_req_debug(req, fmt, ARGS...)
347 
348 #endif
349 
350 
351 #define nxt_unit_warn(ctx, fmt, ARGS...) \
352     nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
353 
354 #define nxt_unit_req_warn(req, fmt, ARGS...) \
355     nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
356 
357 #define nxt_unit_error(ctx, fmt, ARGS...) \
358     nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
359 
360 #define nxt_unit_req_error(req, fmt, ARGS...) \
361     nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
362 
363 #define nxt_unit_alert(ctx, fmt, ARGS...) \
364     nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
365 
366 #define nxt_unit_req_alert(req, fmt, ARGS...) \
367     nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
368 
369 
370 #endif /* _NXT_UNIT_H_INCLUDED_ */
371