1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #ifndef _NXT_UNIT_H_INCLUDED_ 7 #define _NXT_UNIT_H_INCLUDED_ 8 9 10 #include <inttypes.h> 11 #include <sys/types.h> 12 #include <sys/uio.h> 13 #include <string.h> 14 15 #include "nxt_version.h" 16 #include "nxt_unit_typedefs.h" 17 18 19 enum { 20 NXT_UNIT_OK = 0, 21 NXT_UNIT_ERROR = 1, 22 }; 23 24 enum { 25 NXT_UNIT_LOG_ALERT = 0, 26 NXT_UNIT_LOG_ERR = 1, 27 NXT_UNIT_LOG_WARN = 2, 28 NXT_UNIT_LOG_NOTICE = 3, 29 NXT_UNIT_LOG_INFO = 4, 30 NXT_UNIT_LOG_DEBUG = 5, 31 }; 32 33 #define NXT_UNIT_INIT_ENV "NXT_UNIT_INIT" 34 35 /* 36 * Mostly opaque structure with library state. 37 * 38 * Only user defined 'data' pointer exposed here. The rest is unit 39 * implementation specific and hidden. 40 */ 41 struct nxt_unit_s { 42 void *data; /* User defined data. */ 43 }; 44 45 /* 46 * Thread context. 47 * 48 * First (main) context is provided 'for free'. To receive and process 49 * requests in other thread, one need to allocate context and use it 50 * further in this thread. 51 */ 52 struct nxt_unit_ctx_s { 53 void *data; /* User context-specific data. */ 54 nxt_unit_t *unit; 55 }; 56 57 /* 58 * Unit port identification structure. 59 * 60 * Each port can be uniquely identified by listen process id (pid) and port id. 61 * This identification is required to refer the port from different process. 62 */ 63 struct nxt_unit_port_id_s { 64 pid_t pid; 65 uint32_t hash; 66 uint16_t id; 67 }; 68 69 /* 70 * unit provides port storage which is able to store and find the following 71 * data structures. 72 */ 73 struct nxt_unit_port_s { 74 nxt_unit_port_id_t id; 75 76 int in_fd; 77 int out_fd; 78 79 void *data; 80 }; 81 82 83 struct nxt_unit_buf_s { 84 char *start; 85 char *free; 86 char *end; 87 }; 88 89 90 struct nxt_unit_request_info_s { 91 nxt_unit_t *unit; 92 nxt_unit_ctx_t *ctx; 93 94 nxt_unit_port_id_t request_port; 95 nxt_unit_port_id_t response_port; 96 97 nxt_unit_request_t *request; 98 nxt_unit_buf_t *request_buf; 99 100 nxt_unit_response_t *response; 101 nxt_unit_buf_t *response_buf; 102 uint32_t response_max_fields; 103 104 nxt_unit_buf_t *content_buf; 105 uint64_t content_length; 106 int content_fd; 107 108 void *data; 109 }; 110 111 112 /* 113 * Set of application-specific callbacks. Application may leave all optional 114 * callbacks as NULL. 115 */ 116 struct nxt_unit_callbacks_s { 117 /* 118 * Process request. Unlike all other callback, this callback 119 * need to be defined by application. 120 */ 121 void (*request_handler)(nxt_unit_request_info_t *req); 122 123 /* Process websocket frame. */ 124 void (*websocket_handler)(nxt_unit_websocket_frame_t *ws); 125 126 /* Connection closed. */ 127 void (*close_handler)(nxt_unit_request_info_t *req); 128 129 /* Add new Unit port to communicate with process pid. Optional. */ 130 int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); 131 132 /* Remove previously added port. Optional. */ 133 void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); 134 135 /* Remove all data associated with process pid including ports. Optional. */ 136 void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid); 137 138 /* Gracefully quit the application. Optional. */ 139 void (*quit)(nxt_unit_ctx_t *); 140 141 /* Shared memory release acknowledgement. */ 142 void (*shm_ack_handler)(nxt_unit_ctx_t *); 143 144 /* Send data and control to process pid using port id. Optional. */ 145 ssize_t (*port_send)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, 146 const void *buf, size_t buf_size, 147 const void *oob, size_t oob_size); 148 149 /* Receive data on port id. Optional. */ 150 ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, 151 void *buf, size_t buf_size, void *oob, size_t oob_size); 152 153 }; 154 155 156 struct nxt_unit_init_s { 157 void *data; /* Opaque pointer to user-defined data. */ 158 void *ctx_data; /* Opaque pointer to user-defined data. */ 159 int max_pending_requests; 160 161 uint32_t request_data_size; 162 uint32_t shm_limit; 163 164 nxt_unit_callbacks_t callbacks; 165 166 nxt_unit_port_t ready_port; 167 uint32_t ready_stream; 168 nxt_unit_port_t read_port; 169 int log_fd; 170 }; 171 172 173 typedef ssize_t (*nxt_unit_read_func_t)(nxt_unit_read_info_t *read_info, 174 void *dst, size_t size); 175 176 177 struct nxt_unit_read_info_s { 178 nxt_unit_read_func_t read; 179 int eof; 180 uint32_t buf_size; 181 void *data; 182 }; 183 184 185 /* 186 * Initialize Unit application library with necessary callbacks and 187 * ready/reply port parameters, send 'READY' response to master. 188 */ 189 nxt_unit_ctx_t *nxt_unit_init(nxt_unit_init_t *); 190 191 /* 192 * Process received message, invoke configured callbacks. 193 * 194 * If application implements it's own event loop, each datagram received 195 * from port socket should be initially processed by unit. This function 196 * may invoke other application-defined callback for message processing. 197 */ 198 int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, 199 void *buf, size_t buf_size, void *oob, size_t oob_size); 200 201 /* 202 * Main function useful in case when application does not have it's own 203 * event loop. nxt_unit_run() starts infinite message wait and process loop. 204 * 205 * for (;;) { 206 * app_lib->port_recv(...); 207 * nxt_unit_process_msg(...); 208 * } 209 * 210 * The normally function returns when QUIT message received from Unit. 211 */ 212 int nxt_unit_run(nxt_unit_ctx_t *); 213 214 int nxt_unit_run_once(nxt_unit_ctx_t *ctx); 215 216 /* Destroy application library object. */ 217 void nxt_unit_done(nxt_unit_ctx_t *); 218 219 /* 220 * Allocate and initialize new execution context with new listen port to 221 * process requests in other thread. 222 */ 223 nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); 224 225 /* Free unused context. It is not required to free main context. */ 226 void nxt_unit_ctx_free(nxt_unit_ctx_t *); 227 228 /* Initialize port_id, calculate hash. */ 229 void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); 230 231 /* 232 * Create extra incoming port, perform all required actions to propogate 233 * the port to Unit server. Fills structure referenced by port_id with 234 * current pid and new port id. 235 */ 236 int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst, 237 nxt_unit_port_id_t *port_id); 238 239 /* Default 'add_port' implementation. */ 240 int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); 241 242 /* Find previously added port. */ 243 nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *, 244 nxt_unit_port_id_t *port_id); 245 246 /* Find, fill output 'port' and remove port from storage. */ 247 void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, 248 nxt_unit_port_t *port); 249 250 /* Default 'remove_port' implementation. */ 251 void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); 252 253 /* Default 'remove_pid' implementation. */ 254 void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid); 255 256 /* Default 'quit' implementation. */ 257 void nxt_unit_quit(nxt_unit_ctx_t *); 258 259 /* Default 'port_send' implementation. */ 260 ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd, 261 const void *buf, size_t buf_size, 262 const void *oob, size_t oob_size); 263 264 /* Default 'port_recv' implementation. */ 265 ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, 266 void *oob, size_t oob_size); 267 268 /* Calculates hash for given field name. */ 269 uint16_t nxt_unit_field_hash(const char* name, size_t name_length); 270 271 /* Split host for server name and port. */ 272 void nxt_unit_split_host(char *host_start, uint32_t host_length, 273 char **name, uint32_t *name_length, char **port, uint32_t *port_length); 274 275 /* Group duplicate fields for easy enumeration. */ 276 void nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req); 277 278 /* 279 * Allocate response structure capable to store limited numer of fields. 280 * The structure may be accessed directly via req->response pointer or 281 * filled step-by-step using functions add_field and add_content. 282 */ 283 int nxt_unit_response_init(nxt_unit_request_info_t *req, 284 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size); 285 286 int nxt_unit_response_realloc(nxt_unit_request_info_t *req, 287 uint32_t max_fields_count, uint32_t max_fields_size); 288 289 int nxt_unit_response_is_init(nxt_unit_request_info_t *req); 290 291 int nxt_unit_response_add_field(nxt_unit_request_info_t *req, 292 const char* name, uint8_t name_length, 293 const char* value, uint32_t value_length); 294 295 int nxt_unit_response_add_content(nxt_unit_request_info_t *req, 296 const void* src, uint32_t size); 297 298 /* 299 * Send prepared response to Unit server. Response structure destroyed during 300 * this call. 301 */ 302 int nxt_unit_response_send(nxt_unit_request_info_t *req); 303 304 int nxt_unit_response_is_sent(nxt_unit_request_info_t *req); 305 306 nxt_unit_buf_t *nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, 307 uint32_t size); 308 309 int nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req); 310 311 int nxt_unit_response_upgrade(nxt_unit_request_info_t *req); 312 313 int nxt_unit_response_is_websocket(nxt_unit_request_info_t *req); 314 315 nxt_unit_request_info_t *nxt_unit_get_request_info_from_data(void *data); 316 317 int nxt_unit_buf_send(nxt_unit_buf_t *buf); 318 319 void nxt_unit_buf_free(nxt_unit_buf_t *buf); 320 321 nxt_unit_buf_t *nxt_unit_buf_next(nxt_unit_buf_t *buf); 322 323 uint32_t nxt_unit_buf_max(void); 324 325 uint32_t nxt_unit_buf_min(void); 326 327 int nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 328 size_t size); 329 330 ssize_t nxt_unit_response_write_nb(nxt_unit_request_info_t *req, 331 const void *start, size_t size, size_t min_size); 332 333 int nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 334 nxt_unit_read_info_t *read_info); 335 336 ssize_t nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, 337 size_t size); 338 339 ssize_t nxt_unit_request_readline_size(nxt_unit_request_info_t *req, 340 size_t max_size); 341 342 void nxt_unit_request_done(nxt_unit_request_info_t *req, int rc); 343 344 345 int nxt_unit_websocket_send(nxt_unit_request_info_t *req, uint8_t opcode, 346 uint8_t last, const void *start, size_t size); 347 348 int nxt_unit_websocket_sendv(nxt_unit_request_info_t *req, uint8_t opcode, 349 uint8_t last, const struct iovec *iov, int iovcnt); 350 351 ssize_t nxt_unit_websocket_read(nxt_unit_websocket_frame_t *ws, void *dst, 352 size_t size); 353 354 int nxt_unit_websocket_retain(nxt_unit_websocket_frame_t *ws); 355 356 void nxt_unit_websocket_done(nxt_unit_websocket_frame_t *ws); 357 358 359 #if defined __has_attribute 360 361 #if __has_attribute(format) 362 363 #define NXT_ATTR_FORMAT __attribute__((format(printf, 3, 4))) 364 365 #endif 366 367 #endif 368 369 370 #if !defined(NXT_ATTR_FORMAT) 371 372 #define NXT_ATTR_FORMAT 373 374 #endif 375 376 377 void nxt_unit_log(nxt_unit_ctx_t *ctx, int level, const char* fmt, ...) 378 NXT_ATTR_FORMAT; 379 380 void nxt_unit_req_log(nxt_unit_request_info_t *req, int level, 381 const char* fmt, ...) NXT_ATTR_FORMAT; 382 383 #if (NXT_DEBUG) 384 385 #define nxt_unit_debug(ctx, fmt, ARGS...) \ 386 nxt_unit_log(ctx, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS) 387 388 #define nxt_unit_req_debug(req, fmt, ARGS...) \ 389 nxt_unit_req_log(req, NXT_UNIT_LOG_DEBUG, fmt, ##ARGS) 390 391 #else 392 393 #define nxt_unit_debug(ctx, fmt, ARGS...) 394 395 #define nxt_unit_req_debug(req, fmt, ARGS...) 396 397 #endif 398 399 400 #define nxt_unit_warn(ctx, fmt, ARGS...) \ 401 nxt_unit_log(ctx, NXT_UNIT_LOG_WARN, fmt, ##ARGS) 402 403 #define nxt_unit_req_warn(req, fmt, ARGS...) \ 404 nxt_unit_req_log(req, NXT_UNIT_LOG_WARN, fmt, ##ARGS) 405 406 #define nxt_unit_error(ctx, fmt, ARGS...) \ 407 nxt_unit_log(ctx, NXT_UNIT_LOG_ERR, fmt, ##ARGS) 408 409 #define nxt_unit_req_error(req, fmt, ARGS...) \ 410 nxt_unit_req_log(req, NXT_UNIT_LOG_ERR, fmt, ##ARGS) 411 412 #define nxt_unit_alert(ctx, fmt, ARGS...) \ 413 nxt_unit_log(ctx, NXT_UNIT_LOG_ALERT, fmt, ##ARGS) 414 415 #define nxt_unit_req_alert(req, fmt, ARGS...) \ 416 nxt_unit_req_log(req, NXT_UNIT_LOG_ALERT, fmt, ##ARGS) 417 418 419 #endif /* _NXT_UNIT_H_INCLUDED_ */ 420