xref: /unit/src/nxt_unit.h (revision 2596:7ea1758fb8d4)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_UNIT_H_INCLUDED_
7 #define _NXT_UNIT_H_INCLUDED_
8 
9 
10 #include <inttypes.h>
11 #include <sys/types.h>
12 #include <sys/uio.h>
13 #include <string.h>
14 
15 #include "nxt_auto_config.h"
16 #include "nxt_version.h"
17 #include "nxt_unit_typedefs.h"
18 
19 
/*
 * Status codes.
 *
 * NOTE(review): presumably these are the values returned by the
 * int-returning nxt_unit_*() functions and accepted as 'rc' by
 * nxt_unit_request_done(); confirm against the implementation.
 */
20 enum {
21     NXT_UNIT_OK          = 0,
22     NXT_UNIT_ERROR       = 1,
23     NXT_UNIT_AGAIN       = 2,
24     NXT_UNIT_CANCELLED   = 3,
25 };
26 
/*
 * Log levels, passed as the 'level' argument of nxt_unit_log() and
 * nxt_unit_req_log() (the nxt_unit_warn()/nxt_unit_error()/... macros in
 * this header do exactly that).  Going by the names, lower values denote
 * more severe messages.
 */
27 enum {
28     NXT_UNIT_LOG_ALERT   = 0,
29     NXT_UNIT_LOG_ERR     = 1,
30     NXT_UNIT_LOG_WARN    = 2,
31     NXT_UNIT_LOG_NOTICE  = 3,
32     NXT_UNIT_LOG_INFO    = 4,
33     NXT_UNIT_LOG_DEBUG   = 5,
34 };
35 
/*
 * Environment variable name.
 * NOTE(review): presumably the variable through which initialization
 * parameters reach the application process; confirm against the users of
 * this macro.
 */
36 #define NXT_UNIT_INIT_ENV  "NXT_UNIT_INIT"
37 
/*
 * Largest value representable in the uint16_t 'id' field of
 * nxt_unit_port_id_t.
 * NOTE(review): presumably reserved to identify the shared port; confirm.
 */
38 #define NXT_UNIT_SHARED_PORT_ID  ((uint16_t) 0xFFFFu)
39 
40 /*
41  * Mostly opaque structure with library state.
42  *
43  * Only the user-defined 'data' pointer is exposed here.  The rest is Unit
44  * implementation specific and hidden from the application.
45  */
46 struct nxt_unit_s {
47     void                  *data;  /* User-defined data. */
48 };
49 
50 /*
51  * Thread context.
52  *
53  * The first (main) context is provided 'for free'.  To receive and process
54  * requests in other threads, one needs to allocate a new context (see
55  * nxt_unit_ctx_alloc()) and use it further in that thread.
56  */
57 struct nxt_unit_ctx_s {
58     void                  *data;  /* User context-specific data. */
59     nxt_unit_t            *unit;  /* Library state object (nxt_unit_t). */
60 };
61 
62 /*
63  * Unit port identification structure.
64  *
65  * Each port is uniquely identified by the listening process id (pid) and the
66  * port id.  This identification is required to refer to the port from a
67  * different process.  Use nxt_unit_port_id_init() to fill the structure; it
 * also calculates 'hash'.
 */
68 struct nxt_unit_port_id_s {
69     pid_t                 pid;   /* Listening process id. */
70     uint32_t              hash;  /* Set by nxt_unit_port_id_init(). */
71     uint16_t              id;    /* Port id. */
72 };
73 
74 /*
75  * Unit provides port storage which is able to store and find the following
76  * data structures.
77  */
78 struct nxt_unit_port_s {
79     nxt_unit_port_id_t    id;      /* Port identification (pid, id, hash). */
80 
    /*
     * File descriptors of the port.
     * NOTE(review): presumably 'in_fd' is used for receiving and 'out_fd'
     * for sending; confirm against the implementation.
     */
81     int                   in_fd;
82     int                   out_fd;
83 
84     void                  *data;   /* NOTE(review): presumably user data. */
85 };
86 
87 
/*
 * Memory buffer described by three pointers.
 *
 * NOTE(review): presumably 'start' and 'end' delimit the buffer memory and
 * 'free' marks the current position within it (the first unused byte when
 * writing, the first unread byte when reading); confirm against the
 * implementation before relying on this.
 */
88 struct nxt_unit_buf_s {
89     char                  *start;
90     char                  *free;
91     char                  *end;
92 };
93 
94 
/*
 * Per-request state.  A pointer to this structure is what the
 * request_handler, data_handler and close_handler callbacks receive, and
 * what the nxt_unit_response_*() and nxt_unit_request_*() functions take as
 * 'req'.
 */
95 struct nxt_unit_request_info_s {
96     nxt_unit_t            *unit;
97     nxt_unit_ctx_t        *ctx;
98 
99     nxt_unit_port_t       *response_port;
100 
101     nxt_unit_request_t    *request;
102     nxt_unit_buf_t        *request_buf;
103 
    /* Response structure; may be accessed directly (see nxt_unit_response_init()). */
104     nxt_unit_response_t   *response;
105     nxt_unit_buf_t        *response_buf;
106     uint32_t              response_max_fields;
107 
    /* NOTE(review): presumably describe the request body; confirm. */
108     nxt_unit_buf_t        *content_buf;
109     uint64_t              content_length;
110     int                   content_fd;
111 
112     void                  *data;
113 };
114 
115 
116 /*
117  * Set of application-specific callbacks.  The application may leave all
118  * optional callbacks as NULL.
119  */
120 struct nxt_unit_callbacks_s {
121     /*
122      * Process request.  Unlike all other callbacks, this callback is required
123      * and needs to be defined by the application.
124      */
125     void     (*request_handler)(nxt_unit_request_info_t *req);
126 
    /*
     * NOTE(review): undocumented in the original; presumably invoked when
     * additional request data becomes available for 'req'.  Confirm.
     */
127     void     (*data_handler)(nxt_unit_request_info_t *req);
128 
129     /* Process websocket frame. */
130     void     (*websocket_handler)(nxt_unit_websocket_frame_t *ws);
131 
132     /* Connection closed. */
133     void     (*close_handler)(nxt_unit_request_info_t *req);
134 
135     /* Add new Unit port to communicate with process pid. Optional. */
136     int      (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
137 
138     /* Remove previously added port. Optional. */
139     void     (*remove_port)(nxt_unit_t *, nxt_unit_ctx_t *,
140                             nxt_unit_port_t *port);
141 
142     /* Remove all data associated with process pid including ports. Optional. */
143     void     (*remove_pid)(nxt_unit_t *, pid_t pid);
144 
145     /* Gracefully quit the application. Optional. */
146     void     (*quit)(nxt_unit_ctx_t *);
147 
148     /* Shared memory release acknowledgement. Optional. */
149     void     (*shm_ack_handler)(nxt_unit_ctx_t *);
150 
151     /* Send data and control to process pid using port id. Optional. */
152     ssize_t  (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
153                  const void *buf, size_t buf_size,
154                  const void *oob, size_t oob_size);
155 
156     /*
     * Receive data on port id. Optional.  'oob_size' is passed by pointer,
     * unlike in port_send.
     */
157     ssize_t  (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port,
158                  void *buf, size_t buf_size, void *oob, size_t *oob_size);
159 
    /*
     * NOTE(review): undocumented in the original; presumably called once the
     * context is ready to process requests, returning a status code.  Confirm.
     */
160     int      (*ready_handler)(nxt_unit_ctx_t *);
161 };
162 
163 
/*
 * Initialization parameters; a pointer to this structure is the only
 * argument of nxt_unit_init().
 */
164 struct nxt_unit_init_s {
    /*
     * Opaque pointers to user-defined data.
     * NOTE(review): presumably 'data' ends up in nxt_unit_t.data and
     * 'ctx_data' in the main context's nxt_unit_ctx_t.data; confirm.
     */
165     void                  *data;
166     void                  *ctx_data;
167     int                   max_pending_requests;
168 
169     uint32_t              request_data_size;
170     uint32_t              shm_limit;
171     uint32_t              request_limit;
172 
173     nxt_unit_callbacks_t  callbacks;  /* Application callbacks. */
174 
    /* Port parameters and descriptors used to set up the library. */
175     nxt_unit_port_t       ready_port;
176     uint32_t              ready_stream;
177     nxt_unit_port_t       router_port;
178     nxt_unit_port_t       read_port;
179     int                   shared_port_fd;
180     int                   shared_queue_fd;
181     int                   log_fd;
182 };
183 
184 
/*
 * Type of the reader callback stored in nxt_unit_read_info_t.read.  It
 * receives the read_info it belongs to, a destination buffer 'dst' and the
 * buffer 'size', and returns an ssize_t.
 * NOTE(review): presumably the return value is the number of bytes stored
 * in 'dst', or a negative value on error; confirm.
 */
185 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info,
186     void *dst, size_t size);
187 
188 
/*
 * Description of a caller-supplied data source; taken by
 * nxt_unit_response_write_cb().
 */
189 struct nxt_unit_read_info_s {
190     nxt_unit_read_func_t  read;      /* Reader callback. */
191     int                   eof;       /* NOTE(review): presumably non-zero at end of data. */
192     uint32_t              buf_size;
193     void                  *data;     /* NOTE(review): presumably user data for 'read'. */
194 };
195 
196 
197 /*
198  * Initialize Unit application library with necessary callbacks and
199  * ready/reply port parameters, send 'READY' response to main.
 *
 * Returns a context pointer; NOTE(review): presumably the main context, or
 * NULL on failure -- confirm.
200  */
201 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
202 
203 /*
204  * Main function, useful in case the application does not have its own event
205  * loop. nxt_unit_run() starts an infinite message wait and process loop.
206  *
207  *  for (;;) {
208  *      app_lib->port_recv(...);
209  *      nxt_unit_process_msg(...);
210  *  }
211  *
212  * The function returns normally when a QUIT message is received from Unit.
213  */
214 int nxt_unit_run(nxt_unit_ctx_t *);
215 
/* NOTE(review): undocumented variants of the run loop; see the implementation. */
216 int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx);
217 
218 int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
219 
/* NOTE(review): presumably returns NULL when no request is queued; confirm. */
220 nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx);
221 
222 /*
223  * Receive and process one message, and invoke configured callbacks.
224  *
225  * If the application implements its own event loop, each datagram received
226  * from the port socket should be initially processed by unit.  This function
227  * may invoke other application-defined callback for message processing.
228  */
229 int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
230 
231 int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
232 
233 /* Destroy application library object. */
234 void nxt_unit_done(nxt_unit_ctx_t *);
235 
236 /*
237  * Allocate and initialize a new execution context with a new listen port to
238  * process requests in another thread.
 *
 * NOTE(review): the second argument is presumably the user data stored in
 * the new context's 'data' field; confirm.
239  */
240 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
241 
242 /* Initialize port_id from pid and id, and calculate its hash. */
243 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
244 
245 /* Calculate the 16-bit hash of a field name given as pointer and length. */
246 uint16_t nxt_unit_field_hash(const char* name, size_t name_length);
247 
248 /*
 * Split host into server name and port.  The results are returned through
 * the name/name_length and port/port_length output parameters.
 */
249 void nxt_unit_split_host(char *host_start, uint32_t host_length,
250     char **name, uint32_t *name_length, char **port, uint32_t *port_length);
251 
252 /* Group duplicate fields for easy enumeration. */
253 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req);
254 
255 /*
256  * Allocate response structure capable of storing a limited number of fields.
257  * The structure may be accessed directly via req->response pointer or
258  * filled step-by-step using functions add_field and add_content.
259  */
260 int nxt_unit_response_init(nxt_unit_request_info_t *req,
261     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size);
262 
/* NOTE(review): presumably resizes the response for new field limits; confirm. */
263 int nxt_unit_response_realloc(nxt_unit_request_info_t *req,
264     uint32_t max_fields_count, uint32_t max_fields_size);
265 
266 int nxt_unit_response_is_init(nxt_unit_request_info_t *req);
267 
/*
 * Add a header field to the response.  Note that name_length is a uint8_t
 * while value_length is a uint32_t.
 */
268 int nxt_unit_response_add_field(nxt_unit_request_info_t *req,
269     const char* name, uint8_t name_length,
270     const char* value, uint32_t value_length);
271 
272 int nxt_unit_response_add_content(nxt_unit_request_info_t *req,
273     const void* src, uint32_t size);
274 
275 /*
276  * Send the prepared response to the Unit server.  The Response structure is
277  * destroyed during this call.
278  */
279 int nxt_unit_response_send(nxt_unit_request_info_t *req);
280 
281 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req);
282 
283 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req,
284     uint32_t size);
285 
286 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req);
287 
288 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req);
289 
290 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req);
291 
292 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data);
293 
/* Buffer helpers operating on nxt_unit_buf_t. */
294 int nxt_unit_buf_send(nxt_unit_buf_t *buf);
295 
296 void nxt_unit_buf_free(nxt_unit_buf_t *buf);
297 
298 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf);
299 
300 uint32_t nxt_unit_buf_max(void);
301 
302 uint32_t nxt_unit_buf_min(void);
303 
304 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
305     size_t size);
306 
/*
 * NOTE(review): judging by the "_nb" suffix and the ssize_t result, this is
 * presumably a non-blocking write that may accept fewer than 'size' bytes;
 * confirm the meaning of 'min_size' against the implementation.
 */
307 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req,
308     const void *start, size_t size, size_t min_size);
309 
/* Write response data obtained through the read_info->read callback. */
310 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
311     nxt_unit_read_info_t *read_info);
312 
313 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
314     size_t size);
315 
316 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req,
317     size_t max_size);
318 
/* NOTE(review): 'rc' is presumably one of the NXT_UNIT_* status codes; confirm. */
319 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);
320 
321 
/*
 * Websocket frame sending.  The two variants differ only in how the payload
 * is supplied: a single contiguous region (start, size) or an iovec array
 * (iov, iovcnt).
 * NOTE(review): 'last' is presumably the final-fragment flag of the frame;
 * confirm.
 */
322 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
323     uint8_t last, const void *start, size_t size);
324 
325 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
326     uint8_t last, const struct iovec *iov, int iovcnt);
327 
/* Frame-side helpers; 'ws' is the frame passed to websocket_handler. */
328 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
329     size_t size);
330 
331 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws);
332 
333 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws);
334 
335 
/*
 * Context-aware allocation pair.
 * NOTE(review): presumably memory obtained from nxt_unit_malloc() must be
 * released with nxt_unit_free(); confirm against the implementation.
 */
336 void *nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size);
337 
338 void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p);
339 
/*
 * NXT_ATTR_FORMAT enables printf-style format checking where the compiler
 * supports the 'format' attribute, and expands to nothing otherwise.
 *
 * The arguments (printf, 3, 4) say that the format string is the 3rd
 * parameter and the variadic arguments start at the 4th, which matches both
 * nxt_unit_log() and nxt_unit_req_log().
 *
 * The __has_attribute(format) test is deliberately nested inside the
 * 'defined __has_attribute' test: a preprocessor without __has_attribute
 * cannot parse the call syntax, so the two must not be merged into a single
 * #if expression.
 */
340 #if defined __has_attribute
341 
342 #if __has_attribute(format)
343 
344 #define NXT_ATTR_FORMAT  __attribute__((format(printf, 3, 4)))
345 
346 #endif
347 
348 #endif
349 
350 
/* Fallback: no format checking available. */
351 #if !defined(NXT_ATTR_FORMAT)
352 
353 #define NXT_ATTR_FORMAT
354 
355 #endif
356 
357 
/*
 * printf-style logging.  'level' is one of the NXT_UNIT_LOG_* values; 'fmt'
 * and the following arguments are checked by the compiler when
 * NXT_ATTR_FORMAT is non-empty.  nxt_unit_log() takes a context, while
 * nxt_unit_req_log() takes a request.
 */
358 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...)
359     NXT_ATTR_FORMAT;
360 
361 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level,
362     const char* fmt, ...) NXT_ATTR_FORMAT;
363 
/*
 * Debug logging helpers.
 *
 * When NXT_DEBUG is non-zero they forward to nxt_unit_log() and
 * nxt_unit_req_log() with the NXT_UNIT_LOG_DEBUG level; otherwise they expand
 * to nothing, so their arguments are not evaluated.
 *
 * The format string is taken as the first variadic argument.  This lets the
 * macros be written with ISO C99 __VA_ARGS__ instead of the GNU-only named
 * variadic parameter ("ARGS...") and ", ##ARGS" comma pasting.  Invocation
 * syntax is unchanged: nxt_unit_debug(ctx, fmt, ...).
 */
#if (NXT_DEBUG)

#define nxt_unit_debug(ctx, ...) \
    nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, __VA_ARGS__)

#define nxt_unit_req_debug(req, ...) \
    nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, __VA_ARGS__)

#else

#define nxt_unit_debug(ctx, ...)

#define nxt_unit_req_debug(req, ...)

#endif
379 
380 
/*
 * Convenience wrappers around nxt_unit_log() (context based) and
 * nxt_unit_req_log() (request based) that supply the log level.
 *
 * The format string is taken as the first variadic argument.  This lets the
 * macros be written with ISO C99 __VA_ARGS__ instead of the GNU-only named
 * variadic parameter ("ARGS...") and ", ##ARGS" comma pasting.  Invocation
 * syntax is unchanged: nxt_unit_warn(ctx, fmt, ...).
 */
#define nxt_unit_warn(ctx, ...) \
    nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, __VA_ARGS__)

#define nxt_unit_req_warn(req, ...) \
    nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, __VA_ARGS__)

#define nxt_unit_error(ctx, ...) \
    nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, __VA_ARGS__)

#define nxt_unit_req_error(req, ...) \
    nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, __VA_ARGS__)

#define nxt_unit_alert(ctx, ...) \
    nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, __VA_ARGS__)

#define nxt_unit_req_alert(req, ...) \
    nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, __VA_ARGS__)
398 
399 
400 #endif /* _NXT_UNIT_H_INCLUDED_ */
401