1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #ifndef _NXT_UNIT_H_INCLUDED_ 7 #define _NXT_UNIT_H_INCLUDED_ 8 9 10 #include <inttypes.h> 11 #include <sys/types.h> 12 #include <sys/uio.h> 13 #include <string.h> 14 15 #include "nxt_auto_config.h" 16 #include "nxt_version.h" 17 #include "nxt_unit_typedefs.h" 18 19 20 enum { 21 NXT_UNIT_OK = 0, 22 NXT_UNIT_ERROR = 1, 23 NXT_UNIT_AGAIN = 2, 24 NXT_UNIT_CANCELLED = 3, 25 }; 26 27 enum { 28 NXT_UNIT_LOG_ALERT = 0, 29 NXT_UNIT_LOG_ERR = 1, 30 NXT_UNIT_LOG_WARN = 2, 31 NXT_UNIT_LOG_NOTICE = 3, 32 NXT_UNIT_LOG_INFO = 4, 33 NXT_UNIT_LOG_DEBUG = 5, 34 }; 35 36 #define NXT_UNIT_INIT_ENV "NXT_UNIT_INIT" 37 38 #define NXT_UNIT_SHARED_PORT_ID ((uint16_t) 0xFFFFu) 39 40 /* 41 * Mostly opaque structure with library state. 42 * 43 * Only the user defined 'data' pointer is exposed here. The rest is unit 44 * implementation specific and hidden. 45 */ 46 struct nxt_unit_s { 47 void *data; /* User defined data. */ 48 }; 49 50 /* 51 * Thread context. 52 * 53 * First (main) context is provided 'for free'. To receive and process 54 * requests in other threads, one needs to allocate a new context and use it 55 * further in that thread. 56 */ 57 struct nxt_unit_ctx_s { 58 void *data; /* User context-specific data. */ 59 nxt_unit_t *unit; 60 }; 61 62 /* 63 * Unit port identification structure. 64 * 65 * Each port can be uniquely identified by listen process id (pid) and port id. 66 * This identification is required to refer the port from different process. 67 */ 68 struct nxt_unit_port_id_s { 69 pid_t pid; 70 uint32_t hash; 71 uint16_t id; 72 }; 73 74 /* 75 * Unit provides port storage which is able to store and find the following 76 * data structures. 77 */ 78 struct nxt_unit_port_s { 79 nxt_unit_port_id_t id; 80 81 int in_fd; 82 int out_fd; 83 84 void *data; 85 }; 86 87 88 struct nxt_unit_buf_s { 89 char *start; 90 char *free; 91 char *end; 92 }; 93 94 95 struct nxt_unit_request_info_s { 96 nxt_unit_t *unit; 97 nxt_unit_ctx_t *ctx; 98 99 nxt_unit_port_t *response_port; 100 101 nxt_unit_request_t *request; 102 nxt_unit_buf_t *request_buf; 103 104 nxt_unit_response_t *response; 105 nxt_unit_buf_t *response_buf; 106 uint32_t response_max_fields; 107 108 nxt_unit_buf_t *content_buf; 109 uint64_t content_length; 110 int content_fd; 111 112 void *data; 113 }; 114 115 116 /* 117 * Set of application-specific callbacks. The application may leave all 118 * optional callbacks as NULL. 119 */ 120 struct nxt_unit_callbacks_s { 121 /* 122 * Process request. Unlike all other callbacks, this callback is required 123 * and needs to be defined by the application. 124 */ 125 void (*request_handler)(nxt_unit_request_info_t *req); 126 127 void (*data_handler)(nxt_unit_request_info_t *req); 128 129 /* Process websocket frame. */ 130 void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); 131 132 /* Connection closed. */ 133 void (*close_handler)(nxt_unit_request_info_t *req); 134 135 /* Add new Unit port to communicate with process pid. Optional. */ 136 int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); 137 138 /* Remove previously added port. Optional. */ 139 void (*remove_port)(nxt_unit_t *, nxt_unit_ctx_t *, 140 nxt_unit_port_t *port); 141 142 /* Remove all data associated with process pid including ports. Optional. */ 143 void (*remove_pid)(nxt_unit_t *, pid_t pid); 144 145 /* Gracefully quit the application. Optional. */ 146 void (*quit)(nxt_unit_ctx_t *); 147 148 /* Shared memory release acknowledgement. Optional. */ 149 void (*shm_ack_handler)(nxt_unit_ctx_t *); 150 151 /* Send data and control to process pid using port id. Optional. */ 152 ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port, 153 const void *buf, size_t buf_size, 154 const void *oob, size_t oob_size); 155 156 /* Receive data on port id. Optional. */ 157 ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port, 158 void *buf, size_t buf_size, void *oob, size_t *oob_size); 159 160 int (*ready_handler)(nxt_unit_ctx_t *); 161 }; 162 163 164 struct nxt_unit_init_s { 165 void *data; /* Opaque pointer to user-defined data. */ 166 void *ctx_data; /* Opaque pointer to user-defined data. */ 167 int max_pending_requests; 168 169 uint32_t request_data_size; 170 uint32_t shm_limit; 171 uint32_t request_limit; 172 173 nxt_unit_callbacks_t callbacks; 174 175 nxt_unit_port_t ready_port; 176 uint32_t ready_stream; 177 nxt_unit_port_t router_port; 178 nxt_unit_port_t read_port; 179 int shared_port_fd; 180 int shared_queue_fd; 181 int log_fd; 182 }; 183 184 185 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info, 186 void *dst, size_t size); 187 188 189 struct nxt_unit_read_info_s { 190 nxt_unit_read_func_t read; 191 int eof; 192 uint32_t buf_size; 193 void *data; 194 }; 195 196 197 /* 198 * Initialize Unit application library with necessary callbacks and 199 * ready/reply port parameters, send 'READY' response to main. 200 */ 201 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); 202 203 /* 204 * Main function, useful in case the application does not have its own event 205 * loop. nxt_unit_run() starts an infinite message wait and process loop. 206 * 207 * for (;;) { 208 * app_lib->port_recv(...); 209 * nxt_unit_process_msg(...); 210 * } 211 * 212 * The function returns normally when a QUIT message is received from Unit. 213 */ 214 int nxt_unit_run(nxt_unit_ctx_t *); 215 216 int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx); 217 218 int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); 219 220 nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx); 221 222 /* 223 * Receive and process one message, and invoke configured callbacks. 224 * 225 * If the application implements its own event loop, each datagram received 226 * from the port socket should be initially processed by unit. This function 227 * may invoke other application-defined callback for message processing. 228 */ 229 int nxt_unit_run_once(nxt_unit_ctx_t *ctx); 230 231 int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 232 233 /* Destroy application library object. */ 234 void nxt_unit_done(nxt_unit_ctx_t *); 235 236 /* 237 * Allocate and initialize a new execution context with a new listen port to 238 * process requests in another thread. 239 */ 240 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); 241 242 /* Initialize port_id, calculate hash. */ 243 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); 244 245 /* Calculates hash for given field name. */ 246 uint16_t nxt_unit_field_hash(const char* name, size_t name_length); 247 248 /* Split host for server name and port. */ 249 void nxt_unit_split_host(char *host_start, uint32_t host_length, 250 char **name, uint32_t *name_length, char **port, uint32_t *port_length); 251 252 /* Group duplicate fields for easy enumeration. */ 253 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req); 254 255 /* 256 * Allocate response structure capable of storing a limited number of fields. 257 * The structure may be accessed directly via req->response pointer or 258 * filled step-by-step using functions add_field and add_content. 259 */ 260 int nxt_unit_response_init(nxt_unit_request_info_t *req, 261 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size); 262 263 int nxt_unit_response_realloc(nxt_unit_request_info_t *req, 264 uint32_t max_fields_count, uint32_t max_fields_size); 265 266 int nxt_unit_response_is_init(nxt_unit_request_info_t *req); 267 268 int nxt_unit_response_add_field(nxt_unit_request_info_t *req, 269 const char* name, uint8_t name_length, 270 const char* value, uint32_t value_length); 271 272 int nxt_unit_response_add_content(nxt_unit_request_info_t *req, 273 const void* src, uint32_t size); 274 275 /* 276 * Send the prepared response to the Unit server. The Response structure is 277 * destroyed during this call. 278 */ 279 int nxt_unit_response_send(nxt_unit_request_info_t *req); 280 281 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req); 282 283 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, 284 uint32_t size); 285 286 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req); 287 288 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req); 289 290 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req); 291 292 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data); 293 294 int nxt_unit_buf_send(nxt_unit_buf_t *buf); 295 296 void nxt_unit_buf_free(nxt_unit_buf_t *buf); 297 298 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf); 299 300 uint32_t nxt_unit_buf_max(void); 301 302 uint32_t nxt_unit_buf_min(void); 303 304 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 305 size_t size); 306 307 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req, 308 const void *start, size_t size, size_t min_size); 309 310 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 311 nxt_unit_read_info_t *read_info); 312 313 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, 314 size_t size); 315 316 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req, 317 size_t max_size); 318 319 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc); 320 321 322 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 323 uint8_t last, const void *start, size_t size); 324 325 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 326 uint8_t last, const struct iovec *iov, int iovcnt); 327 328 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 329 size_t size); 330 331 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws); 332 333 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws); 334 335 336 void *nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size); 337 338 void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p); 339 340 #if defined __has_attribute 341 342 #if __has_attribute(format) 343 344 #define NXT_ATTR_FORMAT __attribute__((format(printf, 3, 4))) 345 346 #endif 347 348 #endif 349 350 351 #if !defined(NXT_ATTR_FORMAT) 352 353 #define NXT_ATTR_FORMAT 354 355 #endif 356 357 358 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...) 359 NXT_ATTR_FORMAT; 360 361 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level, 362 const char* fmt, ...) NXT_ATTR_FORMAT; 363 364 #if (NXT_DEBUG) 365 366 #define nxt_unit_debug(ctx, fmt, ARGS...) \ 367 nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS) 368 369 #define nxt_unit_req_debug(req, fmt, ARGS...) \ 370 nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS) 371 372 #else 373 374 #define nxt_unit_debug(ctx, fmt, ARGS...) 375 376 #define nxt_unit_req_debug(req, fmt, ARGS...) 377 378 #endif 379 380 381 #define nxt_unit_warn(ctx, fmt, ARGS...) \ 382 nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS) 383 384 #define nxt_unit_req_warn(req, fmt, ARGS...) \ 385 nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS) 386 387 #define nxt_unit_error(ctx, fmt, ARGS...) \ 388 nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS) 389 390 #define nxt_unit_req_error(req, fmt, ARGS...) \ 391 nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS) 392 393 #define nxt_unit_alert(ctx, fmt, ARGS...) \ 394 nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS) 395 396 #define nxt_unit_req_alert(req, fmt, ARGS...) \ 397 nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS) 398 399 400 #endif /* _NXT_UNIT_H_INCLUDED_ */ 401