1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #ifndef _NXT_UNIT_H_INCLUDED_ 7 #define _NXT_UNIT_H_INCLUDED_ 8 9 10 #include <inttypes.h> 11 #include <sys/types.h> 12 #include <sys/uio.h> 13 #include <string.h> 14 15 #include "nxt_version.h" 16 #include "nxt_unit_typedefs.h" 17 18 19 enum { 20 NXT_UNIT_OK = 0, 21 NXT_UNIT_ERROR = 1, 22 NXT_UNIT_AGAIN = 2, 23 NXT_UNIT_CANCELLED = 3, 24 }; 25 26 enum { 27 NXT_UNIT_LOG_ALERT = 0, 28 NXT_UNIT_LOG_ERR = 1, 29 NXT_UNIT_LOG_WARN = 2, 30 NXT_UNIT_LOG_NOTICE = 3, 31 NXT_UNIT_LOG_INFO = 4, 32 NXT_UNIT_LOG_DEBUG = 5, 33 }; 34 35 #define NXT_UNIT_INIT_ENV "NXT_UNIT_INIT" 36 37 #define NXT_UNIT_SHARED_PORT_ID ((uint16_t) 0xFFFFu) 38 39 /* 40 * Mostly opaque structure with library state. 41 * 42 * Only user defined 'data' pointer exposed here. The rest is unit 43 * implementation specific and hidden. 44 */ 45 struct nxt_unit_s { 46 void *data; /* User defined data. */ 47 }; 48 49 /* 50 * Thread context. 51 * 52 * First (main) context is provided 'for free'. To receive and process 53 * requests in other thread, one need to allocate context and use it 54 * further in this thread. 55 */ 56 struct nxt_unit_ctx_s { 57 void *data; /* User context-specific data. */ 58 nxt_unit_t *unit; 59 }; 60 61 /* 62 * Unit port identification structure. 63 * 64 * Each port can be uniquely identified by listen process id (pid) and port id. 65 * This identification is required to refer the port from different process. 66 */ 67 struct nxt_unit_port_id_s { 68 pid_t pid; 69 uint32_t hash; 70 uint16_t id; 71 }; 72 73 /* 74 * unit provides port storage which is able to store and find the following 75 * data structures. 76 */ 77 struct nxt_unit_port_s { 78 nxt_unit_port_id_t id; 79 80 int in_fd; 81 int out_fd; 82 83 void *data; 84 }; 85 86 87 struct nxt_unit_buf_s { 88 char *start; 89 char *free; 90 char *end; 91 }; 92 93 94 struct nxt_unit_request_info_s { 95 nxt_unit_t *unit; 96 nxt_unit_ctx_t *ctx; 97 98 nxt_unit_port_t *response_port; 99 100 nxt_unit_request_t *request; 101 nxt_unit_buf_t *request_buf; 102 103 nxt_unit_response_t *response; 104 nxt_unit_buf_t *response_buf; 105 uint32_t response_max_fields; 106 107 nxt_unit_buf_t *content_buf; 108 uint64_t content_length; 109 int content_fd; 110 111 void *data; 112 }; 113 114 115 /* 116 * Set of application-specific callbacks. Application may leave all optional 117 * callbacks as NULL. 118 */ 119 struct nxt_unit_callbacks_s { 120 /* 121 * Process request. Unlike all other callback, this callback 122 * need to be defined by application. 123 */ 124 void (*request_handler)(nxt_unit_request_info_t *req); 125 126 void (*data_handler)(nxt_unit_request_info_t *req); 127 128 /* Process websocket frame. */ 129 void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); 130 131 /* Connection closed. */ 132 void (*close_handler)(nxt_unit_request_info_t *req); 133 134 /* Add new Unit port to communicate with process pid. Optional. */ 135 int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); 136 137 /* Remove previously added port. Optional. */ 138 void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port); 139 140 /* Remove all data associated with process pid including ports. Optional. */ 141 void (*remove_pid)(nxt_unit_t *, pid_t pid); 142 143 /* Gracefully quit the application. Optional. */ 144 void (*quit)(nxt_unit_ctx_t *); 145 146 /* Shared memory release acknowledgement. Optional. */ 147 void (*shm_ack_handler)(nxt_unit_ctx_t *); 148 149 /* Send data and control to process pid using port id. Optional. */ 150 ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_t *port, 151 const void *buf, size_t buf_size, 152 const void *oob, size_t oob_size); 153 154 /* Receive data on port id. Optional. */ 155 ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_t *port, 156 void *buf, size_t buf_size, void *oob, size_t oob_size); 157 158 int (*ready_handler)(nxt_unit_ctx_t *); 159 }; 160 161 162 struct nxt_unit_init_s { 163 void *data; /* Opaque pointer to user-defined data. */ 164 void *ctx_data; /* Opaque pointer to user-defined data. */ 165 int max_pending_requests; 166 167 uint32_t request_data_size; 168 uint32_t shm_limit; 169 170 nxt_unit_callbacks_t callbacks; 171 172 nxt_unit_port_t ready_port; 173 uint32_t ready_stream; 174 nxt_unit_port_t router_port; 175 nxt_unit_port_t read_port; 176 int log_fd; 177 }; 178 179 180 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info, 181 void *dst, size_t size); 182 183 184 struct nxt_unit_read_info_s { 185 nxt_unit_read_func_t read; 186 int eof; 187 uint32_t buf_size; 188 void *data; 189 }; 190 191 192 /* 193 * Initialize Unit application library with necessary callbacks and 194 * ready/reply port parameters, send 'READY' response to main. 195 */ 196 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); 197 198 /* 199 * Main function useful in case when application does not have it's own 200 * event loop. nxt_unit_run() starts infinite message wait and process loop. 201 * 202 * for (;;) { 203 * app_lib->port_recv(...); 204 * nxt_unit_process_msg(...); 205 * } 206 * 207 * The normally function returns when QUIT message received from Unit. 208 */ 209 int nxt_unit_run(nxt_unit_ctx_t *); 210 211 int nxt_unit_run_ctx(nxt_unit_ctx_t *ctx); 212 213 int nxt_unit_run_shared(nxt_unit_ctx_t *ctx); 214 215 int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx); 216 217 /* 218 * Receive and process one message, invoke configured callbacks. 219 * 220 * If application implements it's own event loop, each datagram received 221 * from port socket should be initially processed by unit. This function 222 * may invoke other application-defined callback for message processing. 223 */ 224 int nxt_unit_run_once(nxt_unit_ctx_t *ctx); 225 226 int nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 227 228 /* Destroy application library object. */ 229 void nxt_unit_done(nxt_unit_ctx_t *); 230 231 /* 232 * Allocate and initialize new execution context with new listen port to 233 * process requests in other thread. 234 */ 235 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); 236 237 /* Initialize port_id, calculate hash. */ 238 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); 239 240 /* Calculates hash for given field name. */ 241 uint16_t nxt_unit_field_hash(const char* name, size_t name_length); 242 243 /* Split host for server name and port. */ 244 void nxt_unit_split_host(char *host_start, uint32_t host_length, 245 char **name, uint32_t *name_length, char **port, uint32_t *port_length); 246 247 /* Group duplicate fields for easy enumeration. */ 248 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req); 249 250 /* 251 * Allocate response structure capable to store limited numer of fields. 252 * The structure may be accessed directly via req->response pointer or 253 * filled step-by-step using functions add_field and add_content. 254 */ 255 int nxt_unit_response_init(nxt_unit_request_info_t *req, 256 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size); 257 258 int nxt_unit_response_realloc(nxt_unit_request_info_t *req, 259 uint32_t max_fields_count, uint32_t max_fields_size); 260 261 int nxt_unit_response_is_init(nxt_unit_request_info_t *req); 262 263 int nxt_unit_response_add_field(nxt_unit_request_info_t *req, 264 const char* name, uint8_t name_length, 265 const char* value, uint32_t value_length); 266 267 int nxt_unit_response_add_content(nxt_unit_request_info_t *req, 268 const void* src, uint32_t size); 269 270 /* 271 * Send prepared response to Unit server. Response structure destroyed during 272 * this call. 273 */ 274 int nxt_unit_response_send(nxt_unit_request_info_t *req); 275 276 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req); 277 278 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, 279 uint32_t size); 280 281 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req); 282 283 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req); 284 285 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req); 286 287 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data); 288 289 int nxt_unit_buf_send(nxt_unit_buf_t *buf); 290 291 void nxt_unit_buf_free(nxt_unit_buf_t *buf); 292 293 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf); 294 295 uint32_t nxt_unit_buf_max(void); 296 297 uint32_t nxt_unit_buf_min(void); 298 299 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 300 size_t size); 301 302 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req, 303 const void *start, size_t size, size_t min_size); 304 305 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 306 nxt_unit_read_info_t *read_info); 307 308 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, 309 size_t size); 310 311 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req, 312 size_t max_size); 313 314 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc); 315 316 317 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 318 uint8_t last, const void *start, size_t size); 319 320 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 321 uint8_t last, const struct iovec *iov, int iovcnt); 322 323 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 324 size_t size); 325 326 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws); 327 328 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws); 329 330 331 void *nxt_unit_malloc(nxt_unit_ctx_t *ctx, size_t size); 332 333 void nxt_unit_free(nxt_unit_ctx_t *ctx, void *p); 334 335 #if defined __has_attribute 336 337 #if __has_attribute(format) 338 339 #define NXT_ATTR_FORMAT __attribute__((format(printf, 3, 4))) 340 341 #endif 342 343 #endif 344 345 346 #if !defined(NXT_ATTR_FORMAT) 347 348 #define NXT_ATTR_FORMAT 349 350 #endif 351 352 353 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...) 354 NXT_ATTR_FORMAT; 355 356 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level, 357 const char* fmt, ...) NXT_ATTR_FORMAT; 358 359 #if (NXT_DEBUG) 360 361 #define nxt_unit_debug(ctx, fmt, ARGS...) \ 362 nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS) 363 364 #define nxt_unit_req_debug(req, fmt, ARGS...) \ 365 nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS) 366 367 #else 368 369 #define nxt_unit_debug(ctx, fmt, ARGS...) 370 371 #define nxt_unit_req_debug(req, fmt, ARGS...) 372 373 #endif 374 375 376 #define nxt_unit_warn(ctx, fmt, ARGS...) \ 377 nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS) 378 379 #define nxt_unit_req_warn(req, fmt, ARGS...) \ 380 nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS) 381 382 #define nxt_unit_error(ctx, fmt, ARGS...) \ 383 nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS) 384 385 #define nxt_unit_req_error(req, fmt, ARGS...) \ 386 nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS) 387 388 #define nxt_unit_alert(ctx, fmt, ARGS...) \ 389 nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS) 390 391 #define nxt_unit_req_alert(req, fmt, ARGS...) \ 392 nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS) 393 394 395 #endif /* _NXT_UNIT_H_INCLUDED_ */ 396