xref: /unit/src/nxt_unit.h (revision 1436:44ccce64ddf9)
1 
2 /*
3  * Copyright (C) NGINX, Inc.
4  */
5 
6 #ifndef _NXT_UNIT_H_INCLUDED_
7 #define _NXT_UNIT_H_INCLUDED_
8 
9 
10 #include <inttypes.h>
11 #include <sys/types.h>
12 #include <sys/uio.h>
13 #include <string.h>
14 
15 #include "nxt_version.h"
16 #include "nxt_unit_typedefs.h"
17 
18 /* Generic status codes.  NOTE(review): presumably the int results of the API calls below -- confirm. */
19 enum {
20     NXT_UNIT_OK          = 0,  /* Success. */
21     NXT_UNIT_ERROR       = 1,  /* Failure. */
22 };
23 
24 enum {  /* Values for the 'level' argument of nxt_unit_log() and nxt_unit_req_log(). */
25     NXT_UNIT_LOG_ALERT   = 0,  /* Used by nxt_unit_alert() and nxt_unit_req_alert(). */
26     NXT_UNIT_LOG_ERR     = 1,  /* Used by nxt_unit_error() and nxt_unit_req_error(). */
27     NXT_UNIT_LOG_WARN    = 2,  /* Used by nxt_unit_warn() and nxt_unit_req_warn(). */
28     NXT_UNIT_LOG_NOTICE  = 3,
29     NXT_UNIT_LOG_INFO    = 4,
30     NXT_UNIT_LOG_DEBUG   = 5,  /* Used by nxt_unit_debug() and nxt_unit_req_debug(). */
31 };
32 
33 #define NXT_UNIT_INIT_ENV  "NXT_UNIT_INIT"  /* NOTE(review): presumably an environment variable name -- confirm. */
34 
35 /*
36  * Mostly opaque structure with library state.
37  *
38  * Only the user-defined 'data' pointer is exposed here.  The rest is unit
39  * implementation specific and hidden.
40  */
41 struct nxt_unit_s {
42     void                  *data;  /* User defined data. */
43 };
44 
45 /*
46  * Thread context.
47  *
48  * The first (main) context is provided 'for free'.  To receive and process
49  * requests in another thread, one needs to allocate a context and use it
50  * further in that thread.
51  */
52 struct nxt_unit_ctx_s {
53     void                  *data;  /* User context-specific data. */
54     nxt_unit_t            *unit;  /* Library state object (nxt_unit_t). */
55 };
56 
57 /*
58  * Unit port identification structure.
59  *
60  * Each port can be uniquely identified by listen process id (pid) and port id.
61  * This identification is required to refer to the port from a different process.
62  */
63 struct nxt_unit_port_id_s {
64     pid_t                 pid;   /* Listen process id. */
65     uint32_t              hash;  /* Calculated by nxt_unit_port_id_init(). */
66     uint16_t              id;    /* Port id. */
67 };
68 
69 /*
70  * Unit provides port storage which is able to store and find the following
71  * data structure (see nxt_unit_add_port() and nxt_unit_find_port()).
72  */
73 struct nxt_unit_port_s {
74     nxt_unit_port_id_t    id;      /* Port identification (pid, hash, port id). */
75 
76     int                   in_fd;   /* NOTE(review): presumably the receiving descriptor -- confirm. */
77     int                   out_fd;  /* NOTE(review): presumably the sending descriptor -- confirm. */
78 
79     void                  *data;   /* NOTE(review): undocumented; looks like an opaque user pointer -- confirm. */
80 };
81 
82 /* Buffer descriptor.  NOTE(review): field meanings below are inferred from the names -- confirm. */
83 struct nxt_unit_buf_s {
84     char                  *start;  /* Presumably the beginning of the buffer. */
85     char                  *free;   /* Presumably the first unused byte. */
86     char                  *end;    /* Presumably the end of the buffer. */
87 };
88 
89 /* Per-request state passed to the request_handler and close_handler callbacks and to the request/response functions below. */
90 struct nxt_unit_request_info_s {
91     nxt_unit_t            *unit;  /* Library state object. */
92     nxt_unit_ctx_t        *ctx;   /* NOTE(review): presumably the context processing this request -- confirm. */
93 
94     nxt_unit_port_id_t    request_port;
95     nxt_unit_port_id_t    response_port;
96 
97     nxt_unit_request_t    *request;
98     nxt_unit_buf_t        *request_buf;
99 
100     nxt_unit_response_t   *response;  /* Response structure; see nxt_unit_response_init(). */
101     nxt_unit_buf_t        *response_buf;
102     uint32_t              response_max_fields;  /* NOTE(review): presumably the max_fields_count given to nxt_unit_response_init() -- confirm. */
103 
104     nxt_unit_buf_t        *content_buf;
105     uint64_t              content_length;
106     int                   content_fd;
107 
108     void                  *data;  /* NOTE(review): presumably the per-request user area sized by request_data_size in nxt_unit_init_s -- confirm. */
109 };
110 
111 
112 /*
113  * Set of application-specific callbacks.  The application may leave all
114  * optional callbacks as NULL.
115  */
116 struct nxt_unit_callbacks_s {
117     /*
118      * Process request.  Unlike all other callbacks, this callback
119      * needs to be defined by the application.
120      */
121     void     (*request_handler)(nxt_unit_request_info_t *req);
122 
123     /* Process websocket frame. Optional. */
124     void     (*websocket_handler)(nxt_unit_websocket_frame_t *ws);
125 
126     /* Connection closed. Optional. */
127     void     (*close_handler)(nxt_unit_request_info_t *req);
128 
129     /* Add new Unit port to communicate with process pid. Optional. */
130     int      (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
131 
132     /* Remove previously added port. Optional. */
133     void     (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
134 
135     /* Remove all data associated with process pid including ports. Optional. */
136     void     (*remove_pid)(nxt_unit_ctx_t *, pid_t pid);
137 
138     /* Gracefully quit the application. Optional. */
139     void     (*quit)(nxt_unit_ctx_t *);
140 
141     /* Shared memory release acknowledgement. Optional. */
142     void     (*shm_ack_handler)(nxt_unit_ctx_t *);
143 
144     /* Send data and control to process pid using port id. Optional. */
145     ssize_t  (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
146                  const void *buf, size_t buf_size,
147                  const void *oob, size_t oob_size);
148 
149     /* Receive data on port id. Optional. */
150     ssize_t  (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
151                  void *buf, size_t buf_size, void *oob, size_t oob_size);
152 
153 };
154 
155 /* Initialization parameters; see nxt_unit_init(). */
156 struct nxt_unit_init_s {
157     void                  *data;     /* Opaque pointer to user-defined data. */
158     void                  *ctx_data; /* Opaque pointer to user-defined data (NOTE(review): presumably for the main context -- confirm). */
159     int                   max_pending_requests;
160 
161     uint32_t              request_data_size;
162     uint32_t              shm_limit;
163 
164     nxt_unit_callbacks_t  callbacks;  /* Application callbacks; request_handler is the one that must be set. */
165 
166     nxt_unit_port_t       ready_port;
167     uint32_t              ready_stream;
168     nxt_unit_port_t       read_port;
169     int                   log_fd;
170 };
171 
172 /* Reader callback type; stored in the 'read' member of nxt_unit_read_info_s. */
173 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info,
174     void *dst, size_t size);
175 
176 /* Data source description taken by nxt_unit_response_write_cb(). */
177 struct nxt_unit_read_info_s {
178     nxt_unit_read_func_t  read;      /* Reader callback. */
179     int                   eof;       /* NOTE(review): presumably nonzero once the source is exhausted -- confirm. */
180     uint32_t              buf_size;
181     void                  *data;     /* NOTE(review): looks like an opaque pointer for the reader -- confirm. */
182 };
183 
184 
185 /*
186  * Initialize the Unit application library with the necessary callbacks and
187  * ready/reply port parameters, and send the 'READY' response to master.
188  */
189 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *);
190 
191 /*
192  * Process received message, invoke configured callbacks.
193  *
194  * If the application implements its own event loop, each datagram received
195  * from a port socket should be initially processed by unit.  This function
196  * may invoke other application-defined callbacks for message processing.
197  */
198 int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
199     void *buf, size_t buf_size, void *oob, size_t oob_size);
200 
201 /*
202  * Main function useful in case the application does not have its own
203  * event loop. nxt_unit_run() starts infinite message wait and process loop.
204  *
205  *  for (;;) {
206  *      app_lib->port_recv(...);
207  *      nxt_unit_process_msg(...);
208  *  }
209  *
210  * The function normally returns when a QUIT message is received from Unit.
211  */
212 int nxt_unit_run(nxt_unit_ctx_t *);
213 /* NOTE(review): undocumented; presumably one pass of the loop above -- confirm. */
214 int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
215 
216 /* Destroy the application library object. */
217 void nxt_unit_done(nxt_unit_ctx_t *);
218 
219 /*
220  * Allocate and initialize a new execution context with a new listen port to
221  * process requests in another thread.
222  */
223 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);  /* void *: presumably the new context's 'data' -- confirm. */
224 
225 /* Free an unused context.  It is not required to free the main context. */
226 void nxt_unit_ctx_free(nxt_unit_ctx_t *);
227 
228 /* Initialize port_id, calculate hash. */
229 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
230 
231 /*
232  * Create an extra incoming port, perform all required actions to propagate
233  * the port to the Unit server.  Fills the structure referenced by port_id
234  * with the current pid and the new port id.
235  */
236 int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst,
237     nxt_unit_port_id_t *port_id);
238 
239 /* Default 'add_port' callback implementation. */
240 int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
241 
242 /* Find a previously added port. */
243 nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *,
244     nxt_unit_port_id_t *port_id);
245 
246 /* Find the port, fill the output 'port' and remove the port from storage. */
247 void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
248     nxt_unit_port_t *port);
249 
250 /* Default 'remove_port' callback implementation. */
251 void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
252 
253 /* Default 'remove_pid' callback implementation. */
254 void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid);
255 
256 /* Default 'quit' callback implementation. */
257 void nxt_unit_quit(nxt_unit_ctx_t *);
258 
259 /* Default 'port_send' implementation.  Takes a file descriptor where the callback takes a port id. */
260 ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd,
261     const void *buf, size_t buf_size,
262     const void *oob, size_t oob_size);
263 
264 /* Default 'port_recv' implementation.  Takes a file descriptor where the callback takes a port id. */
265 ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
266     void *oob, size_t oob_size);
267 
268 /* Calculate the hash for the given field name. */
269 uint16_t nxt_unit_field_hash(const char* name, size_t name_length);
270 
271 /* Split host into server name and port parts. */
272 void nxt_unit_split_host(char *host_start, uint32_t host_length,
273     char **name, uint32_t *name_length, char **port, uint32_t *port_length);
274 
275 /* Group duplicate request fields for easy enumeration. */
276 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req);
277 
278 /*
279  * Allocate a response structure capable of storing a limited number of fields.
280  * The structure may be accessed directly via the req->response pointer or
281  * filled step-by-step using the add_field and add_content functions.
282  */
283 int nxt_unit_response_init(nxt_unit_request_info_t *req,
284     uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size);
285 
286 int nxt_unit_response_realloc(nxt_unit_request_info_t *req,
287     uint32_t max_fields_count, uint32_t max_fields_size);  /* NOTE(review): undocumented; presumably re-sizes the response for new limits -- confirm. */
288 
289 int nxt_unit_response_is_init(nxt_unit_request_info_t *req);
290 
291 int nxt_unit_response_add_field(nxt_unit_request_info_t *req,
292     const char* name, uint8_t name_length,  /* uint8_t: a name length cannot exceed 255. */
293     const char* value, uint32_t value_length);
294 
295 int nxt_unit_response_add_content(nxt_unit_request_info_t *req,
296     const void* src, uint32_t size);
297 
298 /*
299  * Send the prepared response to the Unit server.  The response structure is
300  * destroyed during this call.
301  */
302 int nxt_unit_response_send(nxt_unit_request_info_t *req);
303 
304 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req);
305 
306 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req,
307     uint32_t size);
308 
309 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req);
310 
311 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req);
312 
313 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req);
314 
315 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data);  /* NOTE(review): 'data' is presumably a req->data pointer -- confirm. */
316 
317 int nxt_unit_buf_send(nxt_unit_buf_t *buf);
318 
319 void nxt_unit_buf_free(nxt_unit_buf_t *buf);
320 
321 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf);  /* NOTE(review): presumably the buffer following 'buf' in a chain -- confirm. */
322 
323 uint32_t nxt_unit_buf_max(void);  /* NOTE(review): presumably the largest buffer size -- confirm. */
324 
325 uint32_t nxt_unit_buf_min(void);  /* NOTE(review): presumably the smallest buffer size -- confirm. */
326 
327 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start,
328     size_t size);
329 
330 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req,
331     const void *start, size_t size, size_t min_size);  /* NOTE(review): '_nb' presumably means non-blocking -- confirm. */
332 
333 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req,
334     nxt_unit_read_info_t *read_info);  /* NOTE(review): presumably pulls the data through read_info->read -- confirm. */
335 
336 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst,
337     size_t size);
338 
339 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req,
340     size_t max_size);
341 
342 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc);  /* rc: presumably NXT_UNIT_OK or NXT_UNIT_ERROR -- confirm. */
343 
344 /* WebSocket frame functions.  Frames are delivered to the websocket_handler callback. */
345 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode,
346     uint8_t last, const void *start, size_t size);
347 
348 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode,
349     uint8_t last, const struct iovec *iov, int iovcnt);  /* Takes an iovec array instead of a single start/size pair. */
350 
351 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst,
352     size_t size);
353 
354 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws);
355 
356 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws);
357 
358 /* NXT_ATTR_FORMAT: printf-style format checking (format string is argument 3, values start at 4). */
359 #if defined __has_attribute
360 /* Nested rather than joined with &&: __has_attribute(format) must not be parsed when the macro is absent. */
361 #if __has_attribute(format)
362 
363 #define NXT_ATTR_FORMAT  __attribute__((format(printf, 3, 4)))
364 
365 #endif
366 
367 #endif
368 
369 /* Fallback: expand to nothing when the attribute is unavailable. */
370 #if !defined(NXT_ATTR_FORMAT)
371 
372 #define NXT_ATTR_FORMAT
373 
374 #endif
375 
376 /* Logging.  'level' is one of NXT_UNIT_LOG_*; 'fmt' and the arguments after it are printf-style. */
377 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...)
378     NXT_ATTR_FORMAT;
379 
380 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level,
381     const char* fmt, ...) NXT_ATTR_FORMAT;
382 
383 #if (NXT_DEBUG)
384 /* Debug logging at NXT_UNIT_LOG_DEBUG.  'ARGS...' with '##ARGS' is the GNU named-variadic-macro extension. */
385 #define nxt_unit_debug(ctx, fmt, ARGS...) \
386     nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
387 
388 #define nxt_unit_req_debug(req, fmt, ARGS...) \
389     nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS)
390 
391 #else
392 /* Without NXT_DEBUG the macros expand to nothing, so their arguments are not evaluated. */
393 #define nxt_unit_debug(ctx, fmt, ARGS...)
394 
395 #define nxt_unit_req_debug(req, fmt, ARGS...)
396 
397 #endif
398 
399 /* Convenience wrappers that fix the log level; see nxt_unit_log() and nxt_unit_req_log(). */
400 #define nxt_unit_warn(ctx, fmt, ARGS...) \
401     nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
402 
403 #define nxt_unit_req_warn(req, fmt, ARGS...) \
404     nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS)
405 
406 #define nxt_unit_error(ctx, fmt, ARGS...) \
407     nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
408 
409 #define nxt_unit_req_error(req, fmt, ARGS...) \
410     nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS)
411 
412 #define nxt_unit_alert(ctx, fmt, ARGS...) \
413     nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
414 
415 #define nxt_unit_req_alert(req, fmt, ARGS...) \
416     nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS)
417 
418 
419 #endif /* _NXT_UNIT_H_INCLUDED_ */
420