xref: /unit/src/nxt_unit.h (revision 1663:08a83734a986)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_UNIT_H_INCLUDED_
7 #define _NXT_UNIT_H_INCLUDED_
8 
9 
10 #include <inttypes.h>
11 #include <sys/types.h>
12 #include <sys/uio.h>
13 #include <string.h>
14 
15 #include "nxt_version.h"
16 #include "nxt_unit_typedefs.h"
17 
18 /* Status codes.  NOTE(review): presumably nxt_unit_*() results; confirm. */
19 enum {
20     NXT_UNIT_OK          = 0,
21     NXT_UNIT_ERROR       = 1,
22     NXT_UNIT_AGAIN       = 2,
23     NXT_UNIT_CANCELLED   = 3,
24 };
25 /* Values for the 'level' argument of nxt_unit_log()/nxt_unit_req_log(). */
26 enum {
27     NXT_UNIT_LOG_ALERT   = 0,
28     NXT_UNIT_LOG_ERR     = 1,
29     NXT_UNIT_LOG_WARN    = 2,
30     NXT_UNIT_LOG_NOTICE  = 3,
31     NXT_UNIT_LOG_INFO    = 4,
32     NXT_UNIT_LOG_DEBUG   = 5,
33 };
34 /* NOTE(review): presumably the name of an environment variable; confirm. */
35 #define NXT_UNIT_INIT_ENV  "NXT_UNIT_INIT"
36 /* Port id constant equal to the largest uint16_t value. */
37 #define NXT_UNIT_SHARED_PORT_ID  ((uint16_t) 0xFFFFu)
38 
39 /*
40  * Mostly opaque structure with library state.
41  *
42  * Only the user-defined 'data' pointer is exposed here.  The rest is Unit
43  * implementation-specific and hidden.
44  */
45 struct nxt_unit_s {
46     void                  *data;  /* User-defined data. */
47 };
48 
49 /*
50  * Thread context.
51  *
52  * The first (main) context is provided 'for free'.  To receive and process
53  * requests in another thread, one needs to allocate a context (see
54  * nxt_unit_ctx_alloc()) and use it further in that thread.
55  */
56 struct nxt_unit_ctx_s {
57     void                  *data;  /* User context-specific data. */
58     nxt_unit_t            *unit;
59 };
60 
61 /*
62  * Unit port identification structure.
63  *
64  * Each port can be uniquely identified by listen process id (pid) and port id.
65  * This identification is required to refer to the port from another process.
66  */
67 struct nxt_unit_port_id_s {
68     pid_t                 pid;
69     uint32_t              hash;  /* Calculated by nxt_unit_port_id_init(). */
70     uint16_t              id;
71 };
72 
73 /*
74  * Unit provides port storage which is able to store and find the following
75  * data structure.
76  */
77 struct nxt_unit_port_s {
78     nxt_unit_port_id_t    id;
79 
80     int                   in_fd;
81     int                   out_fd;
82 
83     void                  *data;
84 };
85 
86 /* Buffer described by three char pointers: start, free and end. */
87 struct nxt_unit_buf_s {
88     char                  *start;
89     char                  *free;
90     char                  *end;
91 };
92 
93 /* Request state passed to request_handler, data_handler and close_handler. */
94 struct nxt_unit_request_info_s {
95     nxt_unit_t            *unit;
96     nxt_unit_ctx_t        *ctx;
97 
98     nxt_unit_port_t       *response_port;
99 
100     nxt_unit_request_t    *request;
101     nxt_unit_buf_t        *request_buf;
102 
103     nxt_unit_response_t   *response;  /* See nxt_unit_response_init(). */
104     nxt_unit_buf_t        *response_buf;
105     uint32_t              response_max_fields;
106 
107     nxt_unit_buf_t        *content_buf;
108     uint64_t              content_length;
109     int                   content_fd;
110 
111     void                  *data;
112 };
113 
114 
115 /*
116  * Set of application-specific callbacks. The application may leave all
117  * optional callbacks as NULL.
118  */
119 struct nxt_unit_callbacks_s {
120     /*
121      * Process request. Unlike all other callbacks, this callback
122      * needs to be defined by the application.
123      */
124     void     (*request_handler)(nxt_unit_request_info_t *req);
125 
126     void     (*data_handler)(nxt_unit_request_info_t *req);
127 
128     /* Process websocket frame. */
129     void     (*websocket_handler)(nxt_unit_websocket_frame_t *ws);
130 
131     /* Connection closed. */
132     void     (*close_handler)(nxt_unit_request_info_t *req);
133 
134     /* Add new Unit port to communicate with process pid. Optional. */
135     int      (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
136 
137     /* Remove previously added port. Optional. */
138     void     (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port);
139 
140     /* Remove all data associated with process pid including ports. Optional. */
141     void     (*remove_pid)(nxt_unit_t *, pid_t pid);
142 
143     /* Gracefully quit the application. Optional. */
144     void     (*quit)(nxt_unit_ctx_t *);
145 
146     /* Shared memory release acknowledgement. Optional. */
147     void     (*shm_ack_handler)(nxt_unit_ctx_t *);
148 
149     /* Send data and control to process pid using port id. Optional. */
150     ssize_t  (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
151                  const void *buf, size_t buf_size,
152                  const void *oob, size_t oob_size);
153 
154     /* Receive data on port id. Optional. */
155     ssize_t  (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
156                  void *buf, size_t buf_size, void *oob, size_t oob_size);
157 };
158 
159 /* Initialization parameters passed to nxt_unit_init(). */
160 struct nxt_unit_init_s {
161     void                  *data;     /* Opaque pointer to user-defined data. */
162     void                  *ctx_data; /* Opaque pointer to user-defined data. */
163     int                   max_pending_requests;
164 
165     uint32_t              request_data_size;
166     uint32_t              shm_limit;
167 
168     nxt_unit_callbacks_t  callbacks;
169 
170     nxt_unit_port_t       ready_port;
171     uint32_t              ready_stream;
172     nxt_unit_port_t       router_port;
173     nxt_unit_port_t       read_port;
174     int                   log_fd;
175 };
176 
177 /* Reader callback type stored in the 'read' member below. */
178 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info,
179     void *dst, size_t size);
180 
181 /* Descriptor taken by nxt_unit_response_write_cb(). */
182 struct nxt_unit_read_info_s {
183     nxt_unit_read_func_t  read;
184     int                   eof;
185     uint32_t              buf_size;
186     void                  *data;
187 };
188 
189 
190 /*
191  * Initialize Unit application library with necessary callbacks and
192  * ready/reply port parameters, send 'READY' response to main.
193  */
194 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
195 
196 /*
197  * Main function useful in case when the application does not have its own
198  * event loop. nxt_unit_run() starts infinite message wait and process loop.
199  *
200  *  for (;;) {
201  *      app_lib->port_recv(...);
202  *      nxt_unit_process_msg(...);
203  *  }
204  *
205  * Normally the function returns when a QUIT message is received from Unit.
206  */
207 int nxt_unit_run(nxt_unit_ctx_t *);
208 
209 int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
210 
211 int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
212 
213 /*
214  * Receive and process one message, invoke configured callbacks.
215  *
216  * If the application implements its own event loop, each datagram received
217  * from port socket should be initially processed by unit.  This function
218  * may invoke other application-defined callbacks for message processing.
219  */
220 int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
221 
222 int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
223 
224 /* Destroy application library object. */
225 void nxt_unit_done(nxt_unit_ctx_t *);
226 
227 /*
228  * Allocate and initialize a new execution context with a new listen port to
229  * process requests in another thread.
230  */
231 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
232 
233 /* Initialize port_id, calculate hash. */
234 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
235 
236 /* Calculate hash for the given field name. */
237 uint16_t nxt_unit_field_hash(const char* name, size_t name_length);
238 
239 /* Split host into server name and port. */
240 void nxt_unit_split_host(char *host_start, uint32_t host_length,
241     char **name, uint32_t *name_length, char **port, uint32_t *port_length);
242 
243 /* Group duplicate fields for easy enumeration. */
244 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req);
245 
246 /*
247  * Allocate response structure capable of storing a limited number of fields.
248  * The structure may be accessed directly via req->response pointer or filled
249  * step-by-step using nxt_unit_response_add_field() and _add_content().
250  */
251 int nxt_unit_response_init(nxt_unit_request_info_t *req,
252     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size);
253 
254 int nxt_unit_response_realloc(nxt_unit_request_info_t *req,
255     uint32_t max_fields_count, uint32_t max_fields_size);
256 
257 int nxt_unit_response_is_init(nxt_unit_request_info_t *req);
258 
259 int nxt_unit_response_add_field(nxt_unit_request_info_t *req,
260     const char* name, uint8_t name_length,
261     const char* value, uint32_t value_length);
262 
263 int nxt_unit_response_add_content(nxt_unit_request_info_t *req,
264     const void* src, uint32_t size);
265 
266 /*
267  * Send prepared response to Unit server.  The response structure is
268  * destroyed during this call.
269  */
270 int nxt_unit_response_send(nxt_unit_request_info_t *req);
271 
272 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req);
273 
274 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req,
275     uint32_t size);
276 
277 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req);
278 
279 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req);
280 
281 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req);
282 
283 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data);
284 
285 int nxt_unit_buf_send(nxt_unit_buf_t *buf);
286 
287 void nxt_unit_buf_free(nxt_unit_buf_t *buf);
288 
289 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf);
290 
291 uint32_t nxt_unit_buf_max(void);
292 
293 uint32_t nxt_unit_buf_min(void);
294 
295 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
296     size_t size);
297 
298 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req,
299     const void *start, size_t size, size_t min_size);
300 
301 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
302     nxt_unit_read_info_t *read_info);
303 
304 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
305     size_t size);
306 
307 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req,
308     size_t max_size);
309 
310 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);
311 
312 /* WebSocket frame send/read functions. */
313 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
314     uint8_t last, const void *start, size_t size);
315 
316 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
317     uint8_t last, const struct iovec *iov, int iovcnt);
318 
319 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
320     size_t size);
321 
322 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws);
323 
324 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws);
325 
326 /* Allocation helpers; both take the context as the first argument. */
327 void *nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size);
328 
329 void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p);
330 
331 #if defined __has_attribute
332 
333 #if __has_attribute(format)
334 /* printf-style checking: fmt is argument 3, variadic arguments start at 4. */
335 #define NXT_ATTR_FORMAT  __attribute__((format(printf, 3, 4)))
336 
337 #endif
338 
339 #endif
340 
341 
342 #if !defined(NXT_ATTR_FORMAT)
343 /* Format attribute not available: the annotation expands to nothing. */
344 #define NXT_ATTR_FORMAT
345 
346 #endif
347 
348 /* printf-style logging functions taking one of the NXT_UNIT_LOG_* levels. */
349 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...)
350     NXT_ATTR_FORMAT;
351 
352 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level,
353     const char* fmt, ...) NXT_ATTR_FORMAT;
354 
355 #if (NXT_DEBUG)
356 /* Debug logging is compiled in only when NXT_DEBUG is non-zero. */
357 #define nxt_unit_debug(ctx, fmt, ARGS...) \
358     nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
359 
360 #define nxt_unit_req_debug(req, fmt, ARGS...) \
361     nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
362 
363 #else
364 /* Otherwise they expand to nothing; arguments are not evaluated. */
365 #define nxt_unit_debug(ctx, fmt, ARGS...)
366 
367 #define nxt_unit_req_debug(req, fmt, ARGS...)
368 
369 #endif
370 
371 /* Convenience wrappers that fix the log level argument. */
372 #define nxt_unit_warn(ctx, fmt, ARGS...) \
373     nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
374 
375 #define nxt_unit_req_warn(req, fmt, ARGS...) \
376     nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
377 
378 #define nxt_unit_error(ctx, fmt, ARGS...) \
379     nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
380 
381 #define nxt_unit_req_error(req, fmt, ARGS...) \
382     nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
383 
384 #define nxt_unit_alert(ctx, fmt, ARGS...) \
385     nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
386 
387 #define nxt_unit_req_alert(req, fmt, ARGS...) \
388     nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
389 
390 
391 #endif /* _NXT_UNIT_H_INCLUDED_ */
392