1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "nxt_main.h" 7 #include "nxt_port_memory_int.h" 8 #include "nxt_socket_msg.h" 9 #include "nxt_port_queue.h" 10 #include "nxt_app_queue.h" 11 12 #include "nxt_unit.h" 13 #include "nxt_unit_request.h" 14 #include "nxt_unit_response.h" 15 #include "nxt_unit_websocket.h" 16 17 #include "nxt_websocket.h" 18 19 #if (NXT_HAVE_MEMFD_CREATE) 20 #include <linux/memfd.h> 21 #endif 22 23 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 24 #define NXT_UNIT_LOCAL_BUF_SIZE \ 25 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 26 27 enum { 28 NXT_QUIT_NORMAL = 0, 29 NXT_QUIT_GRACEFUL = 1, 30 }; 31 32 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 33 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 34 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 35 typedef struct nxt_unit_process_s nxt_unit_process_t; 36 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 37 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 38 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 39 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 40 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 41 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 42 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 43 44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 46 nxt_unit_ctx_impl_t *ctx_impl, void *data); 47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 52 nxt_unit_mmap_buf_t *mmap_buf); 53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 54 nxt_unit_mmap_buf_t *mmap_buf); 55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t 
*mmap_buf); 56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 58 int *shared_port_fd, int *shared_queue_fd, 59 int *log_fd, uint32_t *stream, uint32_t *shm_limit, 60 uint32_t *request_limit); 61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 62 int queue_fd); 63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 64 nxt_unit_request_info_t **preq); 65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 66 nxt_unit_recv_msg_t *recv_msg); 67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 69 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); 70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 71 nxt_unit_recv_msg_t *recv_msg); 72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 73 nxt_unit_port_id_t *port_id); 74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 76 nxt_unit_recv_msg_t *recv_msg); 77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 79 nxt_unit_ctx_t *ctx); 80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 83 nxt_unit_ctx_t *ctx); 84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 86 nxt_unit_websocket_frame_impl_t *ws); 87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 90 nxt_unit_mmap_buf_t *mmap_buf, int last); 
91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 95 nxt_unit_ctx_impl_t *ctx_impl); 96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 97 nxt_unit_read_buf_t *rbuf); 98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 99 nxt_unit_request_info_t *req, size_t size); 100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 101 size_t size); 102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 103 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 108 nxt_unit_port_t *port, int n); 109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 111 int fd); 112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 113 nxt_unit_port_t *port, uint32_t size, 114 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 116 117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 118 nxt_unit_ctx_impl_t *ctx_impl); 119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 124 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 125 
nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 127 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); 129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 130 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 132 133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 135 pid_t pid, int remove); 136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx); 140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 147 nxt_unit_port_t *port); 148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 150 151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 152 nxt_unit_port_t *port, int queue_fd); 153 154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 157 nxt_unit_port_t *port, void *queue); 158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, 159 nxt_queue_t 
*awaiting_req); 160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, 161 nxt_unit_port_id_t *port_id); 162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 163 nxt_unit_port_id_t *port_id); 164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 166 nxt_unit_process_t *process); 167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param); 168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 170 nxt_unit_port_t *port, const void *buf, size_t buf_size, 171 const nxt_send_oob_t *oob); 172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 173 const void *buf, size_t buf_size, const nxt_send_oob_t *oob); 174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 175 nxt_unit_read_buf_t *rbuf); 176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, 177 nxt_unit_read_buf_t *src); 178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 179 nxt_unit_read_buf_t *rbuf); 180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 181 nxt_unit_read_buf_t *rbuf); 182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, 183 nxt_unit_read_buf_t *rbuf); 184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 185 nxt_unit_read_buf_t *rbuf); 186 nxt_inline int nxt_unit_close(int fd); 187 static int nxt_unit_fd_blocking(int fd); 188 189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 190 nxt_unit_port_t *port); 191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 192 nxt_unit_port_id_t *port_id, int remove); 193 194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 195 nxt_unit_request_info_t *req); 196 static nxt_unit_request_info_t *nxt_unit_request_hash_find( 197 nxt_unit_ctx_t *ctx, uint32_t 
stream, int remove); 198 199 static char * nxt_unit_snprint_prefix(char *p, char *end, pid_t pid, int level); 200 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); 201 static void nxt_unit_lvlhsh_free(void *data, void *p); 202 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length); 203 204 205 struct nxt_unit_mmap_buf_s { 206 nxt_unit_buf_t buf; 207 208 nxt_unit_mmap_buf_t *next; 209 nxt_unit_mmap_buf_t **prev; 210 211 nxt_port_mmap_header_t *hdr; 212 nxt_unit_request_info_t *req; 213 nxt_unit_ctx_impl_t *ctx_impl; 214 char *free_ptr; 215 char *plain_ptr; 216 }; 217 218 219 struct nxt_unit_recv_msg_s { 220 uint32_t stream; 221 nxt_pid_t pid; 222 nxt_port_id_t reply_port; 223 224 uint8_t last; /* 1 bit */ 225 uint8_t mmap; /* 1 bit */ 226 227 void *start; 228 uint32_t size; 229 230 int fd[2]; 231 232 nxt_unit_mmap_buf_t *incoming_buf; 233 }; 234 235 236 typedef enum { 237 NXT_UNIT_RS_START = 0, 238 NXT_UNIT_RS_RESPONSE_INIT, 239 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 240 NXT_UNIT_RS_RESPONSE_SENT, 241 NXT_UNIT_RS_RELEASED, 242 } nxt_unit_req_state_t; 243 244 245 struct nxt_unit_request_info_impl_s { 246 nxt_unit_request_info_t req; 247 248 uint32_t stream; 249 250 nxt_unit_mmap_buf_t *outgoing_buf; 251 nxt_unit_mmap_buf_t *incoming_buf; 252 253 nxt_unit_req_state_t state; 254 uint8_t websocket; 255 uint8_t in_hash; 256 257 /* for nxt_unit_ctx_impl_t.free_req or active_req */ 258 nxt_queue_link_t link; 259 /* for nxt_unit_port_impl_t.awaiting_req */ 260 nxt_queue_link_t port_wait_link; 261 262 char extra_data[]; 263 }; 264 265 266 struct nxt_unit_websocket_frame_impl_s { 267 nxt_unit_websocket_frame_t ws; 268 269 nxt_unit_mmap_buf_t *buf; 270 271 nxt_queue_link_t link; 272 273 nxt_unit_ctx_impl_t *ctx_impl; 274 }; 275 276 277 struct nxt_unit_read_buf_s { 278 nxt_queue_link_t link; 279 nxt_unit_ctx_impl_t *ctx_impl; 280 ssize_t size; 281 nxt_recv_oob_t oob; 282 char buf[16384]; 283 }; 284 285 286 struct nxt_unit_ctx_impl_s { 287 
nxt_unit_ctx_t ctx; 288 289 nxt_atomic_t use_count; 290 nxt_atomic_t wait_items; 291 292 pthread_mutex_t mutex; 293 294 nxt_unit_port_t *read_port; 295 296 nxt_queue_link_t link; 297 298 nxt_unit_mmap_buf_t *free_buf; 299 300 /* of nxt_unit_request_info_impl_t */ 301 nxt_queue_t free_req; 302 303 /* of nxt_unit_websocket_frame_impl_t */ 304 nxt_queue_t free_ws; 305 306 /* of nxt_unit_request_info_impl_t */ 307 nxt_queue_t active_req; 308 309 /* of nxt_unit_request_info_impl_t */ 310 nxt_lvlhsh_t requests; 311 312 /* of nxt_unit_request_info_impl_t */ 313 nxt_queue_t ready_req; 314 315 /* of nxt_unit_read_buf_t */ 316 nxt_queue_t pending_rbuf; 317 318 /* of nxt_unit_read_buf_t */ 319 nxt_queue_t free_rbuf; 320 321 uint8_t online; /* 1 bit */ 322 uint8_t ready; /* 1 bit */ 323 uint8_t quit_param; 324 325 nxt_unit_mmap_buf_t ctx_buf[2]; 326 nxt_unit_read_buf_t ctx_read_buf; 327 328 nxt_unit_request_info_impl_t req; 329 }; 330 331 332 struct nxt_unit_mmap_s { 333 nxt_port_mmap_header_t *hdr; 334 pthread_t src_thread; 335 336 /* of nxt_unit_read_buf_t */ 337 nxt_queue_t awaiting_rbuf; 338 }; 339 340 341 struct nxt_unit_mmaps_s { 342 pthread_mutex_t mutex; 343 uint32_t size; 344 uint32_t cap; 345 nxt_atomic_t allocated_chunks; 346 nxt_unit_mmap_t *elts; 347 }; 348 349 350 struct nxt_unit_impl_s { 351 nxt_unit_t unit; 352 nxt_unit_callbacks_t callbacks; 353 354 nxt_atomic_t use_count; 355 nxt_atomic_t request_count; 356 357 uint32_t request_data_size; 358 uint32_t shm_mmap_limit; 359 uint32_t request_limit; 360 361 pthread_mutex_t mutex; 362 363 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 364 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 365 366 nxt_unit_port_t *router_port; 367 nxt_unit_port_t *shared_port; 368 369 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 370 371 nxt_unit_mmaps_t incoming; 372 nxt_unit_mmaps_t outgoing; 373 374 pid_t pid; 375 int log_fd; 376 377 nxt_unit_ctx_impl_t main_ctx; 378 }; 379 380 381 struct nxt_unit_port_impl_s { 382 
nxt_unit_port_t port; 383 384 nxt_atomic_t use_count; 385 386 /* for nxt_unit_process_t.ports */ 387 nxt_queue_link_t link; 388 nxt_unit_process_t *process; 389 390 /* of nxt_unit_request_info_impl_t */ 391 nxt_queue_t awaiting_req; 392 393 int ready; 394 395 void *queue; 396 397 int from_socket; 398 nxt_unit_read_buf_t *socket_rbuf; 399 }; 400 401 402 struct nxt_unit_process_s { 403 pid_t pid; 404 405 nxt_queue_t ports; /* of nxt_unit_port_impl_t */ 406 407 nxt_unit_impl_t *lib; 408 409 nxt_atomic_t use_count; 410 411 uint32_t next_port_id; 412 }; 413 414 415 /* Explicitly using 32 bit types to avoid possible alignment. */ 416 typedef struct { 417 int32_t pid; 418 uint32_t id; 419 } nxt_unit_port_hash_id_t; 420 421 422 static pid_t nxt_unit_pid; 423 424 425 nxt_unit_ctx_t * 426 nxt_unit_init(nxt_unit_init_t *init) 427 { 428 int rc, queue_fd, shared_queue_fd; 429 void *mem; 430 uint32_t ready_stream, shm_limit, request_limit; 431 nxt_unit_ctx_t *ctx; 432 nxt_unit_impl_t *lib; 433 nxt_unit_port_t ready_port, router_port, read_port, shared_port; 434 435 nxt_unit_pid = getpid(); 436 437 lib = nxt_unit_create(init); 438 if (nxt_slow_path(lib == NULL)) { 439 return NULL; 440 } 441 442 queue_fd = -1; 443 mem = MAP_FAILED; 444 shared_port.out_fd = -1; 445 shared_port.data = NULL; 446 447 if (init->ready_port.id.pid != 0 448 && init->ready_stream != 0 449 && init->read_port.id.pid != 0) 450 { 451 ready_port = init->ready_port; 452 ready_stream = init->ready_stream; 453 router_port = init->router_port; 454 read_port = init->read_port; 455 lib->log_fd = init->log_fd; 456 457 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 458 ready_port.id.id); 459 nxt_unit_port_id_init(&router_port.id, router_port.id.pid, 460 router_port.id.id); 461 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 462 read_port.id.id); 463 464 shared_port.in_fd = init->shared_port_fd; 465 shared_queue_fd = init->shared_queue_fd; 466 467 } else { 468 rc = nxt_unit_read_env(&ready_port, 
&router_port, &read_port, 469 &shared_port.in_fd, &shared_queue_fd, 470 &lib->log_fd, &ready_stream, &shm_limit, 471 &request_limit); 472 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 473 goto fail; 474 } 475 476 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) 477 / PORT_MMAP_DATA_SIZE; 478 lib->request_limit = request_limit; 479 } 480 481 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { 482 lib->shm_mmap_limit = 1; 483 } 484 485 lib->pid = read_port.id.pid; 486 nxt_unit_pid = lib->pid; 487 488 ctx = &lib->main_ctx.ctx; 489 490 rc = nxt_unit_fd_blocking(router_port.out_fd); 491 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 492 goto fail; 493 } 494 495 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); 496 if (nxt_slow_path(lib->router_port == NULL)) { 497 nxt_unit_alert(NULL, "failed to add router_port"); 498 499 goto fail; 500 } 501 502 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); 503 if (nxt_slow_path(queue_fd == -1)) { 504 goto fail; 505 } 506 507 mem = mmap(NULL, sizeof(nxt_port_queue_t), 508 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); 509 if (nxt_slow_path(mem == MAP_FAILED)) { 510 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, 511 strerror(errno), errno); 512 513 goto fail; 514 } 515 516 nxt_port_queue_init(mem); 517 518 rc = nxt_unit_fd_blocking(read_port.in_fd); 519 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 520 goto fail; 521 } 522 523 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); 524 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { 525 nxt_unit_alert(NULL, "failed to add read_port"); 526 527 goto fail; 528 } 529 530 rc = nxt_unit_fd_blocking(ready_port.out_fd); 531 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 532 goto fail; 533 } 534 535 nxt_unit_port_id_init(&shared_port.id, read_port.id.pid, 536 NXT_UNIT_SHARED_PORT_ID); 537 538 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, 539 MAP_SHARED, shared_queue_fd, 0); 540 if (nxt_slow_path(mem == MAP_FAILED)) { 541 
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd, 542 strerror(errno), errno); 543 544 goto fail; 545 } 546 547 nxt_unit_close(shared_queue_fd); 548 549 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); 550 if (nxt_slow_path(lib->shared_port == NULL)) { 551 nxt_unit_alert(NULL, "failed to add shared_port"); 552 553 goto fail; 554 } 555 556 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); 557 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 558 nxt_unit_alert(NULL, "failed to send READY message"); 559 560 goto fail; 561 } 562 563 nxt_unit_close(ready_port.out_fd); 564 nxt_unit_close(queue_fd); 565 566 return ctx; 567 568 fail: 569 570 if (mem != MAP_FAILED) { 571 munmap(mem, sizeof(nxt_port_queue_t)); 572 } 573 574 if (queue_fd != -1) { 575 nxt_unit_close(queue_fd); 576 } 577 578 nxt_unit_ctx_release(&lib->main_ctx.ctx); 579 580 return NULL; 581 } 582 583 584 static nxt_unit_impl_t * 585 nxt_unit_create(nxt_unit_init_t *init) 586 { 587 int rc; 588 nxt_unit_impl_t *lib; 589 nxt_unit_callbacks_t *cb; 590 591 lib = nxt_unit_malloc(NULL, 592 sizeof(nxt_unit_impl_t) + init->request_data_size); 593 if (nxt_slow_path(lib == NULL)) { 594 nxt_unit_alert(NULL, "failed to allocate unit struct"); 595 596 return NULL; 597 } 598 599 rc = pthread_mutex_init(&lib->mutex, NULL); 600 if (nxt_slow_path(rc != 0)) { 601 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 602 603 goto fail; 604 } 605 606 lib->unit.data = init->data; 607 lib->callbacks = init->callbacks; 608 609 lib->request_data_size = init->request_data_size; 610 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) 611 / PORT_MMAP_DATA_SIZE; 612 lib->request_limit = init->request_limit; 613 614 lib->processes.slot = NULL; 615 lib->ports.slot = NULL; 616 617 lib->log_fd = STDERR_FILENO; 618 619 nxt_queue_init(&lib->contexts); 620 621 lib->use_count = 0; 622 lib->request_count = 0; 623 lib->router_port = NULL; 624 lib->shared_port = NULL; 625 626 rc = 
nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 627 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 628 pthread_mutex_destroy(&lib->mutex); 629 goto fail; 630 } 631 632 cb = &lib->callbacks; 633 634 if (cb->request_handler == NULL) { 635 nxt_unit_alert(NULL, "request_handler is NULL"); 636 637 pthread_mutex_destroy(&lib->mutex); 638 goto fail; 639 } 640 641 nxt_unit_mmaps_init(&lib->incoming); 642 nxt_unit_mmaps_init(&lib->outgoing); 643 644 return lib; 645 646 fail: 647 648 nxt_unit_free(NULL, lib); 649 650 return NULL; 651 } 652 653 654 static int 655 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 656 void *data) 657 { 658 int rc; 659 660 ctx_impl->ctx.data = data; 661 ctx_impl->ctx.unit = &lib->unit; 662 663 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 664 if (nxt_slow_path(rc != 0)) { 665 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 666 667 return NXT_UNIT_ERROR; 668 } 669 670 nxt_unit_lib_use(lib); 671 672 pthread_mutex_lock(&lib->mutex); 673 674 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 675 676 pthread_mutex_unlock(&lib->mutex); 677 678 ctx_impl->use_count = 1; 679 ctx_impl->wait_items = 0; 680 ctx_impl->online = 1; 681 ctx_impl->ready = 0; 682 ctx_impl->quit_param = NXT_QUIT_GRACEFUL; 683 684 nxt_queue_init(&ctx_impl->free_req); 685 nxt_queue_init(&ctx_impl->free_ws); 686 nxt_queue_init(&ctx_impl->active_req); 687 nxt_queue_init(&ctx_impl->ready_req); 688 nxt_queue_init(&ctx_impl->pending_rbuf); 689 nxt_queue_init(&ctx_impl->free_rbuf); 690 691 ctx_impl->free_buf = NULL; 692 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 693 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 694 695 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 696 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); 697 698 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; 699 700 ctx_impl->req.req.ctx = &ctx_impl->ctx; 701 ctx_impl->req.req.unit = 
&lib->unit; 702 703 ctx_impl->read_port = NULL; 704 ctx_impl->requests.slot = 0; 705 706 return NXT_UNIT_OK; 707 } 708 709 710 nxt_inline void 711 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) 712 { 713 nxt_unit_ctx_impl_t *ctx_impl; 714 715 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 716 717 nxt_atomic_fetch_add(&ctx_impl->use_count, 1); 718 } 719 720 721 nxt_inline void 722 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) 723 { 724 long c; 725 nxt_unit_ctx_impl_t *ctx_impl; 726 727 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 728 729 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); 730 731 if (c == 1) { 732 nxt_unit_ctx_free(ctx_impl); 733 } 734 } 735 736 737 nxt_inline void 738 nxt_unit_lib_use(nxt_unit_impl_t *lib) 739 { 740 nxt_atomic_fetch_add(&lib->use_count, 1); 741 } 742 743 744 nxt_inline void 745 nxt_unit_lib_release(nxt_unit_impl_t *lib) 746 { 747 long c; 748 nxt_unit_process_t *process; 749 750 c = nxt_atomic_fetch_add(&lib->use_count, -1); 751 752 if (c == 1) { 753 for ( ;; ) { 754 pthread_mutex_lock(&lib->mutex); 755 756 process = nxt_unit_process_pop_first(lib); 757 if (process == NULL) { 758 pthread_mutex_unlock(&lib->mutex); 759 760 break; 761 } 762 763 nxt_unit_remove_process(lib, process); 764 } 765 766 pthread_mutex_destroy(&lib->mutex); 767 768 if (nxt_fast_path(lib->router_port != NULL)) { 769 nxt_unit_port_release(lib->router_port); 770 } 771 772 if (nxt_fast_path(lib->shared_port != NULL)) { 773 nxt_unit_port_release(lib->shared_port); 774 } 775 776 nxt_unit_mmaps_destroy(&lib->incoming); 777 nxt_unit_mmaps_destroy(&lib->outgoing); 778 779 nxt_unit_free(NULL, lib); 780 } 781 } 782 783 784 nxt_inline void 785 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 786 nxt_unit_mmap_buf_t *mmap_buf) 787 { 788 mmap_buf->next = *head; 789 790 if (mmap_buf->next != NULL) { 791 mmap_buf->next->prev = &mmap_buf->next; 792 } 793 794 *head = mmap_buf; 795 mmap_buf->prev = head; 796 } 797 798 799 nxt_inline void 800 
nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 801 nxt_unit_mmap_buf_t *mmap_buf) 802 { 803 while (*prev != NULL) { 804 prev = &(*prev)->next; 805 } 806 807 nxt_unit_mmap_buf_insert(prev, mmap_buf); 808 } 809 810 811 nxt_inline void 812 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) 813 { 814 nxt_unit_mmap_buf_t **prev; 815 816 prev = mmap_buf->prev; 817 818 if (mmap_buf->next != NULL) { 819 mmap_buf->next->prev = prev; 820 } 821 822 if (prev != NULL) { 823 *prev = mmap_buf->next; 824 } 825 } 826 827 828 static int 829 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, 830 nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd, 831 int *log_fd, uint32_t *stream, 832 uint32_t *shm_limit, uint32_t *request_limit) 833 { 834 int rc; 835 int ready_fd, router_fd, read_in_fd, read_out_fd; 836 char *unit_init, *version_end, *vars; 837 size_t version_length; 838 int64_t ready_pid, router_pid, read_pid; 839 uint32_t ready_stream, router_id, ready_id, read_id; 840 841 unit_init = getenv(NXT_UNIT_INIT_ENV); 842 if (nxt_slow_path(unit_init == NULL)) { 843 nxt_unit_alert(NULL, "%s is not in the current environment", 844 NXT_UNIT_INIT_ENV); 845 846 return NXT_UNIT_ERROR; 847 } 848 849 version_end = strchr(unit_init, ';'); 850 if (nxt_slow_path(version_end == NULL)) { 851 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"", 852 NXT_UNIT_INIT_ENV, unit_init); 853 854 return NXT_UNIT_ERROR; 855 } 856 857 version_length = version_end - unit_init; 858 859 rc = version_length != nxt_length(NXT_VERSION) 860 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION)); 861 862 if (nxt_slow_path(rc != 0)) { 863 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version " 864 "%.*s, while the app was compiled with libunit %s", 865 (int) version_length, unit_init, NXT_VERSION); 866 867 return NXT_UNIT_ERROR; 868 } 869 870 vars = version_end + 1; 871 872 rc = sscanf(vars, 873 "%"PRIu32";" 874 
"%"PRId64",%"PRIu32",%d;" 875 "%"PRId64",%"PRIu32",%d;" 876 "%"PRId64",%"PRIu32",%d,%d;" 877 "%d,%d;" 878 "%d,%"PRIu32",%"PRIu32, 879 &ready_stream, 880 &ready_pid, &ready_id, &ready_fd, 881 &router_pid, &router_id, &router_fd, 882 &read_pid, &read_id, &read_in_fd, &read_out_fd, 883 shared_port_fd, shared_queue_fd, 884 log_fd, shm_limit, request_limit); 885 886 if (nxt_slow_path(rc == EOF)) { 887 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", 888 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV); 889 890 return NXT_UNIT_ERROR; 891 } 892 893 if (nxt_slow_path(rc != 16)) { 894 nxt_unit_alert(NULL, "invalid number of variables in %s env: " 895 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars); 896 897 return NXT_UNIT_ERROR; 898 } 899 900 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); 901 902 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 903 904 ready_port->in_fd = -1; 905 ready_port->out_fd = ready_fd; 906 ready_port->data = NULL; 907 908 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); 909 910 router_port->in_fd = -1; 911 router_port->out_fd = router_fd; 912 router_port->data = NULL; 913 914 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 915 916 read_port->in_fd = read_in_fd; 917 read_port->out_fd = read_out_fd; 918 read_port->data = NULL; 919 920 *stream = ready_stream; 921 922 return NXT_UNIT_OK; 923 } 924 925 926 static int 927 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) 928 { 929 ssize_t res; 930 nxt_send_oob_t oob; 931 nxt_port_msg_t msg; 932 nxt_unit_impl_t *lib; 933 int fds[2] = {queue_fd, -1}; 934 935 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 936 937 msg.stream = stream; 938 msg.pid = lib->pid; 939 msg.reply_port = 0; 940 msg.type = _NXT_PORT_MSG_PROCESS_READY; 941 msg.last = 1; 942 msg.mmap = 0; 943 msg.nf = 0; 944 msg.mf = 0; 945 msg.tracking = 0; 946 947 nxt_socket_msg_oob_init(&oob, fds); 948 949 res = 
nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob); 950 if (res != sizeof(msg)) { 951 return NXT_UNIT_ERROR; 952 } 953 954 return NXT_UNIT_OK; 955 } 956 957 958 static int 959 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 960 nxt_unit_request_info_t **preq) 961 { 962 int rc; 963 pid_t pid; 964 uint8_t quit_param; 965 nxt_port_msg_t *port_msg; 966 nxt_unit_impl_t *lib; 967 nxt_unit_recv_msg_t recv_msg; 968 969 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 970 971 recv_msg.incoming_buf = NULL; 972 recv_msg.fd[0] = -1; 973 recv_msg.fd[1] = -1; 974 975 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd); 976 if (nxt_slow_path(rc != NXT_OK)) { 977 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg"); 978 rc = NXT_UNIT_ERROR; 979 goto done; 980 } 981 982 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 983 if (nxt_slow_path(rbuf->size == 0)) { 984 nxt_unit_debug(ctx, "read port closed"); 985 986 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); 987 rc = NXT_UNIT_OK; 988 goto done; 989 } 990 991 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); 992 993 rc = NXT_UNIT_ERROR; 994 goto done; 995 } 996 997 port_msg = (nxt_port_msg_t *) rbuf->buf; 998 999 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", 1000 port_msg->stream, (int) port_msg->type, 1001 recv_msg.fd[0], recv_msg.fd[1]); 1002 1003 recv_msg.stream = port_msg->stream; 1004 recv_msg.pid = port_msg->pid; 1005 recv_msg.reply_port = port_msg->reply_port; 1006 recv_msg.last = port_msg->last; 1007 recv_msg.mmap = port_msg->mmap; 1008 1009 recv_msg.start = port_msg + 1; 1010 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); 1011 1012 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 1013 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)", 1014 port_msg->stream, (int) port_msg->type); 1015 rc = NXT_UNIT_ERROR; 1016 goto done; 1017 } 1018 1019 /* Fragmentation is unsupported. 
*/ 1020 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 1021 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)", 1022 port_msg->stream, (int) port_msg->type); 1023 rc = NXT_UNIT_ERROR; 1024 goto done; 1025 } 1026 1027 if (port_msg->mmap) { 1028 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); 1029 1030 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1031 if (rc == NXT_UNIT_AGAIN) { 1032 recv_msg.fd[0] = -1; 1033 recv_msg.fd[1] = -1; 1034 } 1035 1036 goto done; 1037 } 1038 } 1039 1040 switch (port_msg->type) { 1041 1042 case _NXT_PORT_MSG_RPC_READY: 1043 rc = NXT_UNIT_OK; 1044 break; 1045 1046 case _NXT_PORT_MSG_QUIT: 1047 if (recv_msg.size == sizeof(quit_param)) { 1048 memcpy(&quit_param, recv_msg.start, sizeof(quit_param)); 1049 1050 } else { 1051 quit_param = NXT_QUIT_NORMAL; 1052 } 1053 1054 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream, 1055 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : "")); 1056 1057 nxt_unit_quit(ctx, quit_param); 1058 1059 rc = NXT_UNIT_OK; 1060 break; 1061 1062 case _NXT_PORT_MSG_NEW_PORT: 1063 rc = nxt_unit_process_new_port(ctx, &recv_msg); 1064 break; 1065 1066 case _NXT_PORT_MSG_PORT_ACK: 1067 rc = nxt_unit_ctx_ready(ctx); 1068 break; 1069 1070 case _NXT_PORT_MSG_CHANGE_FILE: 1071 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 1072 port_msg->stream, recv_msg.fd[0]); 1073 1074 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { 1075 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", 1076 port_msg->stream, recv_msg.fd[0], lib->log_fd, 1077 strerror(errno), errno); 1078 1079 rc = NXT_UNIT_ERROR; 1080 goto done; 1081 } 1082 1083 rc = NXT_UNIT_OK; 1084 break; 1085 1086 case _NXT_PORT_MSG_MMAP: 1087 if (nxt_slow_path(recv_msg.fd[0] < 0)) { 1088 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 1089 port_msg->stream, recv_msg.fd[0]); 1090 1091 rc = NXT_UNIT_ERROR; 1092 goto done; 1093 } 1094 1095 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); 1096 break; 1097 1098 case 
_NXT_PORT_MSG_REQ_HEADERS:
        rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq);
        break;

    case _NXT_PORT_MSG_REQ_BODY:
        rc = nxt_unit_process_req_body(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_WEBSOCKET:
        rc = nxt_unit_process_websocket(ctx, &recv_msg);
        break;

    case _NXT_PORT_MSG_REMOVE_PID:
        if (nxt_slow_path(recv_msg.size != sizeof(pid))) {
            nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size "
                           "(%d != %d)", port_msg->stream, (int) recv_msg.size,
                           (int) sizeof(pid));

            rc = NXT_UNIT_ERROR;
            goto done;
        }

        memcpy(&pid, recv_msg.start, sizeof(pid));

        nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d",
                       port_msg->stream, (int) pid);

        nxt_unit_remove_pid(lib, pid);

        rc = NXT_UNIT_OK;
        break;

    case _NXT_PORT_MSG_SHM_ACK:
        rc = nxt_unit_process_shm_ack(ctx);
        break;

    default:
        nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
                       port_msg->stream, (int) port_msg->type);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

done:

    if (recv_msg.fd[0] != -1) {
        nxt_unit_close(recv_msg.fd[0]);
    }

    if (recv_msg.fd[1] != -1) {
        nxt_unit_close(recv_msg.fd[1]);
    }

    while (recv_msg.incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(recv_msg.incoming_buf);
    }

    if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) {
#if (NXT_DEBUG)
        memset(rbuf->buf, 0xAC, rbuf->size);
#endif
        nxt_unit_read_buf_release(ctx, rbuf);
    }

    return rc;
}


/*
 * Handles a "new port" message.
 *
 * The payload must be exactly one nxt_port_msg_new_port_t and fd[0] must be
 * a valid descriptor.  fd[0] is switched with nxt_unit_fd_blocking() and
 * becomes the out_fd of the new port; fd[1] is mapped (shared, read/write,
 * sizeof(nxt_port_queue_t) bytes) and the mapping is handed to
 * nxt_unit_add_port() together with the port description.
 *
 * Returns NXT_UNIT_OK on success and NXT_UNIT_ERROR on any failure.
 */
static int
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    void                     *mem;
    nxt_unit_port_t          new_port, *port;
    nxt_port_msg_new_port_t  *new_port_msg;

    if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": new_port: "
                      "invalid message size (%d)",
                      recv_msg->stream, (int) recv_msg->size);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->fd[0] < 0)) {
        nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
                       recv_msg->stream, recv_msg->fd[0]);

        return NXT_UNIT_ERROR;
    }

    new_port_msg = recv_msg->start;

    nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
                   recv_msg->stream, (int) new_port_msg->pid,
                   (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);

    if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
        return NXT_UNIT_ERROR;
    }

    nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);

    new_port.in_fd = -1;
    new_port.out_fd = recv_msg->fd[0];

    /* fd[1] is not validated here; a bad descriptor makes mmap() fail. */
    mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
               MAP_SHARED, recv_msg->fd[1], 0);

    if (nxt_slow_path(mem == MAP_FAILED)) {
        nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
                       strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    new_port.data = NULL;

    /*
     * fd[0] is now referenced by new_port.out_fd; reset it in recv_msg so
     * that the message cleanup code, which closes every fd != -1, leaves
     * it open.
     */
    recv_msg->fd[0] = -1;

    port = nxt_unit_add_port(ctx, &new_port, mem);
    if (nxt_slow_path(port == NULL)) {
        return NXT_UNIT_ERROR;
    }

    /* Drop the reference returned by nxt_unit_add_port(). */
    nxt_unit_port_release(port);

    return NXT_UNIT_OK;
}


/*
 * Marks the context as ready; does nothing if it is already marked.
 *
 * Main context: the application's ready_handler(), if set, is called and
 * its result is returned.
 *
 * Any other context: if the main context is not ready, the ready flag is
 * cleared again and nxt_unit_quit() is called with the main context's
 * quit_param; otherwise the add_port callback, if set, is called with the
 * library's shared port.
 *
 * Returns NXT_UNIT_OK in every case except when ready_handler() returns
 * something else.
 */
static int
nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t      *lib;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    if (nxt_slow_path(ctx_impl->ready)) {
        return NXT_UNIT_OK;
    }

    ctx_impl->ready = 1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    /* Call ready_handler() only for main context. */
    if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
        return lib->callbacks.ready_handler(ctx);
    }

    if (&lib->main_ctx != ctx_impl) {
        /* Check if the main context is already stopped or quit. */
        if (nxt_slow_path(!lib->main_ctx.ready)) {
            ctx_impl->ready = 0;

            nxt_unit_quit(ctx, lib->main_ctx.quit_param);

            return NXT_UNIT_OK;
        }

        if (lib->callbacks.add_port != NULL) {
            lib->callbacks.add_port(ctx, lib->shared_port);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handles a request headers message.
 *
 * The message data must be in shared memory and at least
 * sizeof(nxt_unit_request_t) bytes long.  A request info object is taken
 * from the context and initialized from the message; the incoming buffer
 * list and fd[0] are moved from recv_msg to the request.
 *
 * Once the response port is usable, an acknowledgement is sent and the
 * request is either passed to the application's request_handler()
 * (preq == NULL) or stored in *preq.
 *
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.
 */
static int
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg,
    nxt_unit_request_info_t **preq)
{
    int                           res;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_id_t            port_id;
    nxt_unit_request_t            *r;
    nxt_unit_mmap_buf_t           *b;
    nxt_unit_request_info_t       *req;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(recv_msg->mmap == 0)) {
        nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) {
        nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least "
                      "%d expected", recv_msg->stream, (int) recv_msg->size,
                      (int) sizeof(nxt_unit_request_t));

        return NXT_UNIT_ERROR;
    }

    req_impl = nxt_unit_request_info_get(ctx);
    if (nxt_slow_path(req_impl == NULL)) {
        nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed",
                      recv_msg->stream);

        return NXT_UNIT_ERROR;
    }

    req = &req_impl->req;

    req->request = recv_msg->start;

    b = recv_msg->incoming_buf;

    req->request_buf = &b->buf;
    req->response = NULL;
    req->response_buf = NULL;

    r = req->request;

    req->content_length = r->content_length;

    /* Body bytes that arrived with the headers start at preread_content. */
    req->content_buf = req->request_buf;
    req->content_buf->free =
nxt_unit_sptr_get(&r->preread_content);

    req_impl->stream = recv_msg->stream;

    req_impl->outgoing_buf = NULL;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
    }

    /* "Move" incoming buffer list to req_impl. */
    req_impl->incoming_buf = recv_msg->incoming_buf;
    req_impl->incoming_buf->prev = &req_impl->incoming_buf;
    recv_msg->incoming_buf = NULL;

    /* The request takes over fd[0]; clear it in the message. */
    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    req->response_max_fields = 0;
    req_impl->state = NXT_UNIT_RS_START;
    req_impl->websocket = 0;
    req_impl->in_hash = 0;

    nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream,
                   (int) r->method_length,
                   (char *) nxt_unit_sptr_get(&r->method),
                   (int) r->target_length,
                   (char *) nxt_unit_sptr_get(&r->target),
                   (int) r->content_length);

    nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);

    /*
     * NXT_UNIT_AGAIN means the response port is not ready yet and the
     * request was queued on it; in that case nothing more is done here
     * and NXT_UNIT_OK is returned below.
     */
    res = nxt_unit_request_check_response_port(req, &port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    if (nxt_fast_path(res == NXT_UNIT_OK)) {
        res = nxt_unit_send_req_headers_ack(req);
        if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);

            return NXT_UNIT_ERROR;
        }

        lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

        /* More body is expected than is available in the first buffer. */
        if (req->content_length
            > (uint64_t) (req->content_buf->end - req->content_buf->free))
        {
            res = nxt_unit_request_hash_add(ctx, req);
            if (nxt_slow_path(res != NXT_UNIT_OK)) {
                nxt_unit_req_warn(req, "failed to add request to hash");

                nxt_unit_request_done(req, NXT_UNIT_ERROR);

                return NXT_UNIT_ERROR;
            }

            /*
             * If application have separate data handler, we may start
             * request processing and process data when it is arrived.
             */
            if (lib->callbacks.data_handler == NULL) {
                return NXT_UNIT_OK;
            }
        }

        if (preq == NULL) {
            lib->callbacks.request_handler(req);

        } else {
            *preq = req;
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handles a request body message for a request previously stored in the
 * request hash.  Unknown streams are silently ignored.
 *
 * The incoming buffers are appended to the request's content buffer chain
 * and fd[0] becomes the request's content_fd.  If the application has a
 * data_handler() it is called; otherwise request_handler() is called once
 * a content fd is present or the accumulated size equals content_length.
 *
 * Always returns NXT_UNIT_OK.
 */
static int
nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    uint64_t                 l;
    nxt_unit_impl_t          *lib;
    nxt_unit_mmap_buf_t      *b;
    nxt_unit_request_info_t  *req;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (req == NULL) {
        return NXT_UNIT_OK;
    }

    /* l accumulates the bytes available in the content buffer and message. */
    l = req->content_buf->end - req->content_buf->free;

    for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
        b->req = req;
        l += b->buf.end - b->buf.free;
    }

    if (recv_msg->incoming_buf != NULL) {
        b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);

        /* Find the last buffer of the existing chain. */
        while (b->next != NULL) {
            b = b->next;
        }

        /* "Move" incoming buffer list to req_impl. */
        b->next = recv_msg->incoming_buf;
        b->next->prev = &b->next;

        recv_msg->incoming_buf = NULL;
    }

    req->content_fd = recv_msg->fd[0];
    recv_msg->fd[0] = -1;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    if (lib->callbacks.data_handler != NULL) {
        lib->callbacks.data_handler(req);

        return NXT_UNIT_OK;
    }

    if (req->content_fd != -1 || l == req->content_length) {
        lib->callbacks.request_handler(req);
    }

    return NXT_UNIT_OK;
}


/*
 * Resolves the port the response for req must be sent to.
 *
 * Returns:
 *   NXT_UNIT_OK     the port is known and ready; req->response_port is set.
 *   NXT_UNIT_AGAIN  the port is not ready yet; the request was queued on
 *                   the port's awaiting_req list and the context's
 *                   wait_items counter was incremented.
 *   NXT_UNIT_ERROR  allocation, hash insertion, process lookup or the port
 *                   request failed.
 *
 * If the port is not in the library's port hash, a placeholder port (not
 * ready, no descriptors) is created, linked to its process and requested
 * with nxt_unit_get_port().  lib->mutex protects the port hash and the
 * awaiting_req queue and is released on every return path.
 */
static int
nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
    nxt_unit_port_id_t *port_id)
{
    int                           res;
    nxt_unit_ctx_t                *ctx;
    nxt_unit_impl_t               *lib;
    nxt_unit_port_t               *port;
    nxt_unit_process_t            *process;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_port_impl_t          *port_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx = req->ctx;
    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&lib->mutex);

    port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
    /* port_impl is only dereferenced below when port != NULL. */
    port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);

    if (nxt_fast_path(port != NULL)) {
        req->response_port = port;

        if (nxt_fast_path(port_impl->ready)) {
            pthread_mutex_unlock(&lib->mutex);

            nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
                           (int) port->id.pid, (int) port->id.id);

            return NXT_UNIT_OK;
        }

        nxt_unit_debug(ctx, "check_response_port: "
                       "port{%d,%d} already requested",
                       (int) port->id.pid, (int) port->id.id);

        req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

        nxt_queue_insert_tail(&port_impl->awaiting_req,
                              &req_impl->port_wait_link);

        pthread_mutex_unlock(&lib->mutex);

        nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

        return NXT_UNIT_AGAIN;
    }

    port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t));
    if (nxt_slow_path(port_impl == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
                       (int) sizeof(nxt_unit_port_impl_t));

        pthread_mutex_unlock(&lib->mutex);

        return NXT_UNIT_ERROR;
    }

    port = &port_impl->port;

    port->id = *port_id;
    port->in_fd = -1;
    port->out_fd = -1;
    port->data = NULL;

    res = nxt_unit_port_hash_add(&lib->ports, port);
    if (nxt_slow_path(res != NXT_UNIT_OK)) {
        nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
                       port->id.pid, port->id.id);

        pthread_mutex_unlock(&lib->mutex);

        /*
         * NOTE(review): frees through the embedded port pointer; this
         * assumes 'port' is the first member of nxt_unit_port_impl_t.
         */
        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    process = nxt_unit_process_find(lib, port_id->pid, 0);
    if (nxt_slow_path(process == NULL)) {
        nxt_unit_alert(ctx, "check_response_port: process %d not found",
                       port->id.pid);

        /* Presumably the last argument (1) removes the entry just added. */
        nxt_unit_port_hash_find(&lib->ports, port_id, 1);

        pthread_mutex_unlock(&lib->mutex);

        nxt_unit_free(ctx, port);

        return NXT_UNIT_ERROR;
    }

    nxt_queue_insert_tail(&process->ports, &port_impl->link);

    port_impl->process = process;
    port_impl->queue = NULL;
    port_impl->from_socket = 0;
    port_impl->socket_rbuf = NULL;

    nxt_queue_init(&port_impl->awaiting_req);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);

    /*
     * NOTE(review): the two references are presumably the port hash and
     * req->response_port - confirm against nxt_unit_port_release() users.
     */
    port_impl->use_count = 2;
    port_impl->ready = 0;

    req->response_port = port;

    pthread_mutex_unlock(&lib->mutex);

    res = nxt_unit_get_port(ctx, port_id);
    if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
        return NXT_UNIT_ERROR;
    }

    nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);

    return NXT_UNIT_AGAIN;
}


/*
 * Sends a _NXT_PORT_MSG_REQ_HEADERS_ACK message for the request's stream
 * to req->response_port.  The message carries the library pid and the id
 * of the context's read port as the reply port.
 *
 * Returns NXT_UNIT_OK if the whole message was sent, NXT_UNIT_ERROR
 * otherwise.
 */
static int
nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
{
    ssize_t                       res;
    nxt_port_msg_t                msg;
    nxt_unit_impl_t               *lib;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit);
    ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    memset(&msg, 0, sizeof(nxt_port_msg_t));

    msg.stream = req_impl->stream;
    msg.pid = lib->pid;
    msg.reply_port = ctx_impl->read_port->id.id;
    msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;

    res = nxt_unit_port_send(req->ctx, req->response_port,
                             &msg, sizeof(msg), NULL);
    if (nxt_slow_path(res != sizeof(msg))) {
        return NXT_UNIT_ERROR;
    }

    return NXT_UNIT_OK;
}


/*
 * Handles a websocket message for a request found in the request hash.
 * Unknown streams are silently ignored.
 *
 * If the application has a websocket_handler() and the message is at least
 * 2 bytes long, a frame object is set up over the message data (either the
 * shared memory buffers, which are moved to the frame, or a fresh buffer
 * descriptor pointing at the plain message data) and passed to the handler.
 *
 * For the last message of the stream the close_handler() is called if set,
 * otherwise the request is finished with NXT_UNIT_ERROR.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if a frame or buffer descriptor
 * cannot be allocated.
 */
static int
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
{
    size_t                           hsize;
    nxt_unit_impl_t                  *lib;
    nxt_unit_mmap_buf_t              *b;
    nxt_unit_callbacks_t             *cb;
    nxt_unit_request_info_t          *req;
    nxt_unit_request_info_impl_t     *req_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last);
    if (nxt_slow_path(req == NULL)) {
        return NXT_UNIT_OK;
    }

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->websocket_handler && recv_msg->size >= 2) {
        ws_impl = nxt_unit_websocket_frame_get(ctx);
        if (nxt_slow_path(ws_impl == NULL)) {
            nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed",
                          req_impl->stream);

            return NXT_UNIT_ERROR;
        }

        ws_impl->ws.req = req;

        ws_impl->buf = NULL;

        if (recv_msg->mmap) {
            for (b = recv_msg->incoming_buf; b != NULL; b = b->next) {
                b->req = req;
            }

            /* "Move" incoming buffer list to ws_impl. */
            ws_impl->buf = recv_msg->incoming_buf;
            ws_impl->buf->prev = &ws_impl->buf;
            recv_msg->incoming_buf = NULL;

            b = ws_impl->buf;

        } else {
            b = nxt_unit_mmap_buf_get(ctx);
            if (nxt_slow_path(b == NULL)) {
                nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf",
                               req_impl->stream);

                nxt_unit_websocket_frame_release(&ws_impl->ws);

                return NXT_UNIT_ERROR;
            }

            /* The descriptor points at the message data; nothing is copied. */
            b->req = req;
            b->buf.start = recv_msg->start;
            b->buf.free = b->buf.start;
            b->buf.end = b->buf.start + recv_msg->size;

            nxt_unit_mmap_buf_insert(&ws_impl->buf, b);
        }

        ws_impl->ws.header = (void *) b->buf.start;
        ws_impl->ws.payload_len = nxt_websocket_frame_payload_len(
            ws_impl->ws.header);

        hsize = nxt_websocket_frame_header_size(ws_impl->ws.header);

        /* When present, the mask is the last 4 bytes of the header. */
        if (ws_impl->ws.header->mask) {
            ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4;

        } else {
            ws_impl->ws.mask = NULL;
        }

        /*
         * Skip the frame header so that the content buffer starts at the
         * payload.
         * NOTE(review): hsize is not checked against recv_msg->size here.
         */
        b->buf.free += hsize;

        ws_impl->ws.content_buf = &b->buf;
        ws_impl->ws.content_length = ws_impl->ws.payload_len;

        nxt_unit_req_debug(req, "websocket_handler: opcode=%d, "
                           "payload_len=%"PRIu64,
                           ws_impl->ws.header->opcode,
                           ws_impl->ws.payload_len);

        cb->websocket_handler(&ws_impl->ws);
    }

    if (recv_msg->last) {
        if (cb->close_handler) {
            nxt_unit_req_debug(req, "close_handler");

            cb->close_handler(req);

        } else {
            nxt_unit_request_done(req, NXT_UNIT_ERROR);
        }
    }

    return NXT_UNIT_OK;
}


/*
 * Handles a shared memory ACK message by calling the application's
 * shm_ack_handler(), if one is set.  Always returns NXT_UNIT_OK.
 */
static int
nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t       *lib;
    nxt_unit_callbacks_t  *cb;

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
    cb = &lib->callbacks;

    if (cb->shm_ack_handler != NULL)
{
        cb->shm_ack_handler(ctx);
    }

    return NXT_UNIT_OK;
}


/*
 * Returns a request info object linked into the context's active_req list,
 * or NULL if memory allocation fails.
 *
 * An object from the context's free_req list is reused when available;
 * otherwise a new one is allocated with lib->request_data_size extra bytes
 * for application data.  req.data points at that extra area, or is NULL
 * when request_data_size is 0.
 */
static nxt_unit_request_info_impl_t *
nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_impl_t               *lib;
    nxt_queue_link_t              *lnk;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_req)) {
        /* The mutex is not held across the allocation. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t)
                                        + lib->request_data_size);
        if (nxt_slow_path(req_impl == NULL)) {
            return NULL;
        }

        req_impl->req.unit = ctx->unit;
        req_impl->req.ctx = ctx;

        pthread_mutex_lock(&ctx_impl->mutex);

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_req);
        nxt_queue_remove(lnk);

        req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link);
    }

    nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL;

    return req_impl;
}


/*
 * Releases everything the request holds and moves the request info object
 * to the context's free_req list for reuse.
 *
 * The request is looked up in the request hash with the last argument set
 * to 1 if it was stored there (presumably removing it), all outgoing and
 * incoming buffers are freed, the content fd is closed and the response
 * port reference is dropped.  If nxt_unit_chk_ready() then reports the
 * context as not ready, a graceful quit is initiated.
 */
static void
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
{
    nxt_unit_ctx_t                *ctx;
    nxt_unit_ctx_impl_t           *ctx_impl;
    nxt_unit_request_info_impl_t  *req_impl;

    ctx = req->ctx;
    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    req->response = NULL;
    req->response_buf = NULL;

    if (req_impl->in_hash) {
        nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
    }

    /* The loops rely on nxt_unit_mmap_buf_free() unlinking the list head. */
    while (req_impl->outgoing_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
    }

    while (req_impl->incoming_buf != NULL) {
        nxt_unit_mmap_buf_free(req_impl->incoming_buf);
    }

    if (req->content_fd != -1) {
        nxt_unit_close(req->content_fd);

        req->content_fd = -1;
    }

    if (req->response_port != NULL) {
        nxt_unit_port_release(req->response_port);

        req->response_port = NULL;
    }

    req_impl->state = NXT_UNIT_RS_RELEASED;

    pthread_mutex_lock(&ctx_impl->mutex);

    /* Move the object from its current list to the free list. */
    nxt_queue_remove(&req_impl->link);

    nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);

    pthread_mutex_unlock(&ctx_impl->mutex);

    if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
        nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
    }
}


/*
 * Unlinks the request info object from the list it is on and frees its
 * memory, unless it is the object embedded in the context (ctx_impl->req).
 * The context mutex is not taken here.
 */
static void
nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl)
{
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx);

    nxt_queue_remove(&req_impl->link);

    if (req_impl != &ctx_impl->req) {
        nxt_unit_free(&ctx_impl->ctx, req_impl);
    }
}


/*
 * Returns a websocket frame object bound to the given context, reusing one
 * from the context's free_ws list when available and allocating a new one
 * otherwise.  Returns NULL if the allocation fails.
 */
static nxt_unit_websocket_frame_impl_t *
nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx)
{
    nxt_queue_link_t                 *lnk;
nxt_unit_ctx_impl_t              *ctx_impl;
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (nxt_queue_is_empty(&ctx_impl->free_ws)) {
        /* The mutex is not held across the allocation. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t));
        if (nxt_slow_path(ws_impl == NULL)) {
            return NULL;
        }

    } else {
        lnk = nxt_queue_first(&ctx_impl->free_ws);
        nxt_queue_remove(lnk);

        pthread_mutex_unlock(&ctx_impl->mutex);

        ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link);
    }

    ws_impl->ctx_impl = ctx_impl;

    return ws_impl;
}


/*
 * Frees all buffers attached to the frame, detaches it from its request
 * and puts the frame object on the free_ws list of the context it was
 * obtained from.
 */
static void
nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws)
{
    nxt_unit_websocket_frame_impl_t  *ws_impl;

    ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws);

    /* The loop relies on nxt_unit_mmap_buf_free() unlinking the list head. */
    while (ws_impl->buf != NULL) {
        nxt_unit_mmap_buf_free(ws_impl->buf);
    }

    ws->req = NULL;

    pthread_mutex_lock(&ws_impl->ctx_impl->mutex);

    nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link);

    pthread_mutex_unlock(&ws_impl->ctx_impl->mutex);
}


/*
 * Unlinks the frame object from the list it is on and frees its memory.
 * The context mutex is not taken here.
 */
static void
nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx,
    nxt_unit_websocket_frame_impl_t *ws_impl)
{
    nxt_queue_remove(&ws_impl->link);

    nxt_unit_free(ctx, ws_impl);
}


/*
 * Computes the 16-bit hash of a header field name.
 *
 * Every character is passed through nxt_lowcase() before being mixed in
 * (hash = hash * 17 + c), so the result does not depend on letter case.
 * The upper 16 bits are folded into the lower ones and the value is
 * truncated to uint16_t on return.
 */
uint16_t
nxt_unit_field_hash(const char *name, size_t name_length)
{
    u_char      ch;
    uint32_t    hash;
    const char  *p, *end;

    hash = 159406; /* Magic value copied from nxt_http_parse.c */
    end = name + name_length;

    for (p = name; p < end; p++) {
        ch = *p;
        hash = (hash << 4) + hash + nxt_lowcase(ch);
    }

    hash = (hash >> 16) ^ hash;

    return hash;
}


/*
 * Reorders the request's header fields in place so that fields with the
 * same name (compared case-insensitively) become adjacent, and records the
 * indexes of the Content-Length, Content-Type and Cookie fields in the
 * request structure.
 */
void
nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req)
{
    char                *name;
    uint32_t            i, j;
    nxt_unit_field_t    *fields, f;
    nxt_unit_request_t  *r;

    static nxt_str_t  content_length = nxt_string("content-length");
    static nxt_str_t  content_type = nxt_string("content-type");
    static nxt_str_t  cookie = nxt_string("cookie");

    nxt_unit_req_debug(req, "group_dup_fields");

    r = req->request;
    fields = r->fields;

    for (i = 0; i < r->fields_count; i++) {
        name = nxt_unit_sptr_get(&fields[i].name);

        /* The hash is only a hint; the name itself is compared as well. */
        switch (fields[i].hash) {
        case NXT_UNIT_HASH_CONTENT_LENGTH:
            if (fields[i].name_length == content_length.length
                && nxt_unit_memcasecmp(name, content_length.start,
                                       content_length.length) == 0)
            {
                r->content_length_field = i;
            }

            break;

        case NXT_UNIT_HASH_CONTENT_TYPE:
            if (fields[i].name_length == content_type.length
                && nxt_unit_memcasecmp(name, content_type.start,
                                       content_type.length) == 0)
            {
                r->content_type_field = i;
            }

            break;

        case NXT_UNIT_HASH_COOKIE:
            if (fields[i].name_length == cookie.length
                && nxt_unit_memcasecmp(name, cookie.start,
                                       cookie.length) == 0)
            {
                r->cookie_field = i;
            }

            break;
        }

        for (j = i + 1; j < r->fields_count; j++) {
            if (fields[i].hash != fields[j].hash
                || fields[i].name_length != fields[j].name_length
                || nxt_unit_memcasecmp(name,
                                       nxt_unit_sptr_get(&fields[j].name),
                                       fields[j].name_length) != 0)
            {
                continue;
            }

            /*
             * Field j duplicates field i: move it to slot i + 1 and shift
             * the fields in between one slot up.  The name and value
             * offsets are adjusted by the distance each field structure
             * moves so that they keep addressing the same bytes.
             */
            f = fields[j];
            f.value.offset += (j - (i + 1)) * sizeof(f);

            while (j > i + 1) {
                fields[j] = fields[j - 1];
                fields[j].name.offset -= sizeof(f);
                fields[j].value.offset -= sizeof(f);
                j--;
            }

            fields[j] = f;

            /* Assign the same name pointer for further grouping simplicity. */
            nxt_unit_sptr_set(&fields[j].name, name);

            i++;
        }
    }
}


/*
 * Prepares the response buffer of the request for a response with the
 * given status, room for max_fields_count header fields and
 * max_fields_size bytes of field names and values.
 *
 * An already allocated response buffer is reused if it is large enough,
 * otherwise it is freed and a new one is allocated.  Calling this again
 * before the response is sent restarts the response from scratch.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the response has already been
 * sent, the requested size does not fit in 32 bits, or no buffer can be
 * allocated.
 */
int
nxt_unit_response_init(nxt_unit_request_info_t *req,
    uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size)
{
    uint32_t                      buf_size;
    uint64_t                      size;
    nxt_unit_buf_t                *buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "init: response already sent");

        return NXT_UNIT_ERROR;
    }

    nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status,
                       (int) max_fields_count, (int) max_fields_size);

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_debug(req, "duplicate response init");
    }

    /*
     * Each field name and value 0-terminated by libunit,
     * this is the reason of '+ 2' below.
     *
     * The size is computed in 64 bits: storing an oversized result
     * directly in the 32-bit buf_size would silently truncate it and let
     * a too small buffer pass the checks below.
     */
    size = sizeof(nxt_unit_response_t)
           + (uint64_t) max_fields_count * (sizeof(nxt_unit_field_t) + 2)
           + max_fields_size;

    if (nxt_slow_path(size > UINT32_MAX)) {
        nxt_unit_req_warn(req, "init: requested response size is too big");

        return NXT_UNIT_ERROR;
    }

    buf_size = (uint32_t) size;

    if (nxt_slow_path(req->response_buf != NULL)) {
        buf = req->response_buf;

        if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) {
            goto init_response;
        }

        nxt_unit_buf_free(buf);

        req->response_buf = NULL;
        req->response = NULL;
        req->response_max_fields = 0;

        req_impl->state = NXT_UNIT_RS_START;
    }

    buf = nxt_unit_response_buf_alloc(req, buf_size);
    if (nxt_slow_path(buf == NULL)) {
        return NXT_UNIT_ERROR;
    }

init_response:

    memset(buf->start, 0, sizeof(nxt_unit_response_t));

    req->response_buf = buf;

    req->response = (nxt_unit_response_t *) buf->start;
    req->response->status = status;

    /* Field names and values are stored right after the fields array. */
    buf->free = buf->start + sizeof(nxt_unit_response_t)
                + max_fields_count * sizeof(nxt_unit_field_t);

req->response_max_fields = max_fields_count; 2077 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 2078 2079 return NXT_UNIT_OK; 2080 } 2081 2082 2083 int 2084 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 2085 uint32_t max_fields_count, uint32_t max_fields_size) 2086 { 2087 char *p; 2088 uint32_t i, buf_size; 2089 nxt_unit_buf_t *buf; 2090 nxt_unit_field_t *f, *src; 2091 nxt_unit_response_t *resp; 2092 nxt_unit_request_info_impl_t *req_impl; 2093 2094 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2095 2096 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2097 nxt_unit_req_warn(req, "realloc: response not init"); 2098 2099 return NXT_UNIT_ERROR; 2100 } 2101 2102 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2103 nxt_unit_req_warn(req, "realloc: response already sent"); 2104 2105 return NXT_UNIT_ERROR; 2106 } 2107 2108 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 2109 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 2110 2111 return NXT_UNIT_ERROR; 2112 } 2113 2114 /* 2115 * Each field name and value 0-terminated by libunit, 2116 * this is the reason of '+ 2' below. 
2117 */ 2118 buf_size = sizeof(nxt_unit_response_t) 2119 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 2120 + max_fields_size; 2121 2122 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 2123 2124 buf = nxt_unit_response_buf_alloc(req, buf_size); 2125 if (nxt_slow_path(buf == NULL)) { 2126 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 2127 return NXT_UNIT_ERROR; 2128 } 2129 2130 resp = (nxt_unit_response_t *) buf->start; 2131 2132 memset(resp, 0, sizeof(nxt_unit_response_t)); 2133 2134 resp->status = req->response->status; 2135 resp->content_length = req->response->content_length; 2136 2137 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 2138 f = resp->fields; 2139 2140 for (i = 0; i < req->response->fields_count; i++) { 2141 src = req->response->fields + i; 2142 2143 if (nxt_slow_path(src->skip != 0)) { 2144 continue; 2145 } 2146 2147 if (nxt_slow_path(src->name_length + src->value_length + 2 2148 > (uint32_t) (buf->end - p))) 2149 { 2150 nxt_unit_req_warn(req, "realloc: not enough space for field" 2151 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 2152 i, src, src->name_length, src->value_length); 2153 2154 goto fail; 2155 } 2156 2157 nxt_unit_sptr_set(&f->name, p); 2158 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 2159 *p++ = '\0'; 2160 2161 nxt_unit_sptr_set(&f->value, p); 2162 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 2163 *p++ = '\0'; 2164 2165 f->hash = src->hash; 2166 f->skip = 0; 2167 f->name_length = src->name_length; 2168 f->value_length = src->value_length; 2169 2170 resp->fields_count++; 2171 f++; 2172 } 2173 2174 if (req->response->piggyback_content_length > 0) { 2175 if (nxt_slow_path(req->response->piggyback_content_length 2176 > (uint32_t) (buf->end - p))) 2177 { 2178 nxt_unit_req_warn(req, "realloc: not enought space for content" 2179 " #%"PRIu32", %"PRIu32" required", 2180 i, req->response->piggyback_content_length); 2181 2182 goto fail; 2183 } 
2184 2185 resp->piggyback_content_length = 2186 req->response->piggyback_content_length; 2187 2188 nxt_unit_sptr_set(&resp->piggyback_content, p); 2189 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 2190 req->response->piggyback_content_length); 2191 } 2192 2193 buf->free = p; 2194 2195 nxt_unit_buf_free(req->response_buf); 2196 2197 req->response = resp; 2198 req->response_buf = buf; 2199 req->response_max_fields = max_fields_count; 2200 2201 return NXT_UNIT_OK; 2202 2203 fail: 2204 2205 nxt_unit_buf_free(buf); 2206 2207 return NXT_UNIT_ERROR; 2208 } 2209 2210 2211 int 2212 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 2213 { 2214 nxt_unit_request_info_impl_t *req_impl; 2215 2216 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2217 2218 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 2219 } 2220 2221 2222 int 2223 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 2224 const char *name, uint8_t name_length, 2225 const char *value, uint32_t value_length) 2226 { 2227 nxt_unit_buf_t *buf; 2228 nxt_unit_field_t *f; 2229 nxt_unit_response_t *resp; 2230 nxt_unit_request_info_impl_t *req_impl; 2231 2232 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2233 2234 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 2235 nxt_unit_req_warn(req, "add_field: response not initialized or " 2236 "already sent"); 2237 2238 return NXT_UNIT_ERROR; 2239 } 2240 2241 resp = req->response; 2242 2243 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 2244 nxt_unit_req_warn(req, "add_field: too many response fields (%d)", 2245 (int) resp->fields_count); 2246 2247 return NXT_UNIT_ERROR; 2248 } 2249 2250 buf = req->response_buf; 2251 2252 if (nxt_slow_path(name_length + value_length + 2 2253 > (uint32_t) (buf->end - buf->free))) 2254 { 2255 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 2256 2257 return NXT_UNIT_ERROR; 2258 } 2259 2260 
nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s",
                       resp->fields_count,
                       (int) name_length, name,
                       (int) value_length, value);

    f = resp->fields + resp->fields_count;

    /* Copy the name and the value, each followed by a terminating zero. */
    nxt_unit_sptr_set(&f->name, buf->free);
    buf->free = nxt_cpymem(buf->free, name, name_length);
    *buf->free++ = '\0';

    nxt_unit_sptr_set(&f->value, buf->free);
    buf->free = nxt_cpymem(buf->free, value, value_length);
    *buf->free++ = '\0';

    f->hash = nxt_unit_field_hash(name, name_length);
    f->skip = 0;
    f->name_length = name_length;
    f->value_length = value_length;

    resp->fields_count++;

    return NXT_UNIT_OK;
}


/*
 * Appends size bytes from src to the response buffer as piggyback content,
 * i.e. body data carried in the same buffer as the response headers.
 *
 * The first call records where the content starts and moves the request to
 * the NXT_UNIT_RS_RESPONSE_HAS_CONTENT state.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if the response is not
 * initialized, is already sent, or the buffer has less than size bytes
 * left.
 */
int
nxt_unit_response_add_content(nxt_unit_request_info_t *req,
    const void* src, uint32_t size)
{
    nxt_unit_buf_t                *buf;
    nxt_unit_response_t           *resp;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "add_content: response not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "add_content: response already sent");

        return NXT_UNIT_ERROR;
    }

    buf = req->response_buf;

    if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) {
        nxt_unit_req_warn(req, "add_content: buffer overflow");

        return NXT_UNIT_ERROR;
    }

    resp = req->response;

    if (resp->piggyback_content_length == 0) {
        nxt_unit_sptr_set(&resp->piggyback_content, buf->free);
        req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT;
    }

    resp->piggyback_content_length += size;

    buf->free = nxt_cpymem(buf->free, src, size);

    return NXT_UNIT_OK;
}


/*
 * Sends the initialized response (headers and any piggyback content).
 *
 * If the request is a websocket handshake and the response status is 101,
 * the request is upgraded first.  On success the response buffer is freed
 * and the request moves to the NXT_UNIT_RS_RESPONSE_SENT state.
 *
 * Returns the result of sending the buffer, or NXT_UNIT_ERROR if the
 * response is not initialized or has already been sent.
 */
int
nxt_unit_response_send(nxt_unit_request_info_t *req)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) {
        nxt_unit_req_warn(req, "send: response is not initialized yet");

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) {
        nxt_unit_req_warn(req, "send: response already sent");

        return NXT_UNIT_ERROR;
    }

    /* NOTE(review): the result of the upgrade is not checked here. */
    if (req->request->websocket_handshake && req->response->status == 101) {
        nxt_unit_response_upgrade(req);
    }

    nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes",
                       req->response->fields_count,
                       (int) (req->response_buf->free
                              - req->response_buf->start));

    mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf);

    rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0);
    if (nxt_fast_path(rc == NXT_UNIT_OK)) {
        req->response = NULL;
        req->response_buf = NULL;
        req_impl->state = NXT_UNIT_RS_RESPONSE_SENT;

        nxt_unit_mmap_buf_free(mmap_buf);
    }

    return rc;
}


/*
 * Returns non-zero if the response of the request has been sent
 * (state is NXT_UNIT_RS_RESPONSE_SENT or any later state).
 */
int
nxt_unit_response_is_sent(nxt_unit_request_info_t *req)
{
    nxt_unit_request_info_impl_t  *req_impl;

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT;
}


/*
 * Allocates an outgoing buffer of the given size for the request and links
 * it to the request's outgoing buffer list.
 *
 * Returns the buffer, or NULL if size exceeds PORT_MMAP_DATA_SIZE, a
 * buffer descriptor cannot be allocated, or no outgoing buffer can be
 * obtained for the response port.
 */
nxt_unit_buf_t *
nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size)
{
    int                           rc;
    nxt_unit_mmap_buf_t           *mmap_buf;
    nxt_unit_request_info_impl_t  *req_impl;

    if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
        nxt_unit_req_warn(req, "response_buf_alloc: "
                          "requested buffer (%"PRIu32") too big", size);

        return NULL;
    }

    nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size);

    req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);

    mmap_buf = nxt_unit_mmap_buf_get(req->ctx);
    if (nxt_slow_path(mmap_buf == NULL)) {
        nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf");

        return NULL;
    }

    mmap_buf->req = req;

    nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf);

    rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port,
                                   size, size, mmap_buf,
                                   NULL);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        /* Unlink the descriptor and return it to the context's free list. */
        nxt_unit_mmap_buf_release(mmap_buf);

        nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf");

        return NULL;
    }

    return &mmap_buf->buf;
}


/*
 * Returns a buffer descriptor bound to the given context with hdr and
 * free_ptr cleared.  A descriptor from the context's free_buf list is
 * reused when available, otherwise a new one is allocated.  Returns NULL
 * if the allocation fails.
 */
static nxt_unit_mmap_buf_t *
nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx)
{
    nxt_unit_mmap_buf_t  *mmap_buf;
    nxt_unit_ctx_impl_t  *ctx_impl;

    ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);

    pthread_mutex_lock(&ctx_impl->mutex);

    if (ctx_impl->free_buf == NULL) {
        /* The mutex is not held across the allocation. */
        pthread_mutex_unlock(&ctx_impl->mutex);

        mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t));
        if (nxt_slow_path(mmap_buf == NULL)) {
            return NULL;
        }

    } else {
        mmap_buf = ctx_impl->free_buf;

        nxt_unit_mmap_buf_unlink(mmap_buf);

        pthread_mutex_unlock(&ctx_impl->mutex);
    }

    mmap_buf->ctx_impl = ctx_impl;

    mmap_buf->hdr = NULL;
    mmap_buf->free_ptr = NULL;

    return mmap_buf;
}


/*
 * Unlinks the buffer descriptor from the list it is on and puts it on the
 * free_buf list of the context it belongs to.  The unlink is done before
 * the context mutex is taken.
 */
static void
nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf)
{
    nxt_unit_mmap_buf_unlink(mmap_buf);

    pthread_mutex_lock(&mmap_buf->ctx_impl->mutex);

    nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf);

    pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex);
}


/* Returns the websocket_handshake flag of the request. */
int
nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 2481 { 2482 return req->request->websocket_handshake; 2483 } 2484 2485 2486 int 2487 nxt_unit_response_upgrade(nxt_unit_request_info_t *req) 2488 { 2489 int rc; 2490 nxt_unit_request_info_impl_t *req_impl; 2491 2492 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2493 2494 if (nxt_slow_path(req_impl->websocket != 0)) { 2495 nxt_unit_req_debug(req, "upgrade: already upgraded"); 2496 2497 return NXT_UNIT_OK; 2498 } 2499 2500 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2501 nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); 2502 2503 return NXT_UNIT_ERROR; 2504 } 2505 2506 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2507 nxt_unit_req_warn(req, "upgrade: response already sent"); 2508 2509 return NXT_UNIT_ERROR; 2510 } 2511 2512 rc = nxt_unit_request_hash_add(req->ctx, req); 2513 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2514 nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); 2515 2516 return NXT_UNIT_ERROR; 2517 } 2518 2519 req_impl->websocket = 1; 2520 2521 req->response->status = 101; 2522 2523 return NXT_UNIT_OK; 2524 } 2525 2526 2527 int 2528 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) 2529 { 2530 nxt_unit_request_info_impl_t *req_impl; 2531 2532 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2533 2534 return req_impl->websocket; 2535 } 2536 2537 2538 nxt_unit_request_info_t * 2539 nxt_unit_get_request_info_from_data(void *data) 2540 { 2541 nxt_unit_request_info_impl_t *req_impl; 2542 2543 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); 2544 2545 return &req_impl->req; 2546 } 2547 2548 2549 int 2550 nxt_unit_buf_send(nxt_unit_buf_t *buf) 2551 { 2552 int rc; 2553 nxt_unit_mmap_buf_t *mmap_buf; 2554 nxt_unit_request_info_t *req; 2555 nxt_unit_request_info_impl_t *req_impl; 2556 2557 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, 
buf); 2558 2559 req = mmap_buf->req; 2560 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2561 2562 nxt_unit_req_debug(req, "buf_send: %d bytes", 2563 (int) (buf->free - buf->start)); 2564 2565 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2566 nxt_unit_req_warn(req, "buf_send: response not initialized yet"); 2567 2568 return NXT_UNIT_ERROR; 2569 } 2570 2571 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2572 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 2573 2574 return NXT_UNIT_ERROR; 2575 } 2576 2577 if (nxt_fast_path(buf->free > buf->start)) { 2578 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2579 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2580 return rc; 2581 } 2582 } 2583 2584 nxt_unit_mmap_buf_free(mmap_buf); 2585 2586 return NXT_UNIT_OK; 2587 } 2588 2589 2590 static void 2591 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 2592 { 2593 int rc; 2594 nxt_unit_mmap_buf_t *mmap_buf; 2595 nxt_unit_request_info_t *req; 2596 2597 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2598 2599 req = mmap_buf->req; 2600 2601 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); 2602 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 2603 nxt_unit_mmap_buf_free(mmap_buf); 2604 2605 nxt_unit_request_info_release(req); 2606 2607 } else { 2608 nxt_unit_request_done(req, rc); 2609 } 2610 } 2611 2612 2613 static int 2614 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 2615 nxt_unit_mmap_buf_t *mmap_buf, int last) 2616 { 2617 struct { 2618 nxt_port_msg_t msg; 2619 nxt_port_mmap_msg_t mmap_msg; 2620 } m; 2621 2622 int rc; 2623 u_char *last_used, *first_free; 2624 ssize_t res; 2625 nxt_chunk_id_t first_free_chunk; 2626 nxt_unit_buf_t *buf; 2627 nxt_unit_impl_t *lib; 2628 nxt_port_mmap_header_t *hdr; 2629 nxt_unit_request_info_impl_t *req_impl; 2630 2631 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 2632 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2633 2634 buf = 
&mmap_buf->buf; 2635 hdr = mmap_buf->hdr; 2636 2637 m.mmap_msg.size = buf->free - buf->start; 2638 2639 m.msg.stream = req_impl->stream; 2640 m.msg.pid = lib->pid; 2641 m.msg.reply_port = 0; 2642 m.msg.type = _NXT_PORT_MSG_DATA; 2643 m.msg.last = last != 0; 2644 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; 2645 m.msg.nf = 0; 2646 m.msg.mf = 0; 2647 m.msg.tracking = 0; 2648 2649 rc = NXT_UNIT_ERROR; 2650 2651 if (m.msg.mmap) { 2652 m.mmap_msg.mmap_id = hdr->id; 2653 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 2654 (u_char *) buf->start); 2655 2656 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 2657 req_impl->stream, 2658 (int) m.mmap_msg.mmap_id, 2659 (int) m.mmap_msg.chunk_id, 2660 (int) m.mmap_msg.size); 2661 2662 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), 2663 NULL); 2664 if (nxt_slow_path(res != sizeof(m))) { 2665 goto free_buf; 2666 } 2667 2668 last_used = (u_char *) buf->free - 1; 2669 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 2670 2671 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 2672 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 2673 2674 buf->start = (char *) first_free; 2675 buf->free = buf->start; 2676 2677 if (buf->end < buf->start) { 2678 buf->end = buf->start; 2679 } 2680 2681 } else { 2682 buf->start = NULL; 2683 buf->free = NULL; 2684 buf->end = NULL; 2685 2686 mmap_buf->hdr = NULL; 2687 } 2688 2689 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, 2690 (int) m.mmap_msg.chunk_id - (int) first_free_chunk); 2691 2692 nxt_unit_debug(req->ctx, "allocated_chunks %d", 2693 (int) lib->outgoing.allocated_chunks); 2694 2695 } else { 2696 if (nxt_slow_path(mmap_buf->plain_ptr == NULL 2697 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) 2698 { 2699 nxt_unit_alert(req->ctx, 2700 "#%"PRIu32": failed to send plain memory buffer" 2701 ": no space reserved for message header", 2702 req_impl->stream); 2703 2704 goto free_buf; 2705 } 2706 2707 memcpy(buf->start - 
sizeof(m.msg), &m.msg, sizeof(m.msg)); 2708 2709 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", 2710 req_impl->stream, 2711 (int) (sizeof(m.msg) + m.mmap_msg.size)); 2712 2713 res = nxt_unit_port_send(req->ctx, req->response_port, 2714 buf->start - sizeof(m.msg), 2715 m.mmap_msg.size + sizeof(m.msg), NULL); 2716 2717 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2718 goto free_buf; 2719 } 2720 } 2721 2722 rc = NXT_UNIT_OK; 2723 2724 free_buf: 2725 2726 nxt_unit_free_outgoing_buf(mmap_buf); 2727 2728 return rc; 2729 } 2730 2731 2732 void 2733 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2734 { 2735 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2736 } 2737 2738 2739 static void 2740 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2741 { 2742 nxt_unit_free_outgoing_buf(mmap_buf); 2743 2744 nxt_unit_mmap_buf_release(mmap_buf); 2745 } 2746 2747 2748 static void 2749 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2750 { 2751 if (mmap_buf->hdr != NULL) { 2752 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2753 mmap_buf->hdr, mmap_buf->buf.start, 2754 mmap_buf->buf.end - mmap_buf->buf.start); 2755 2756 mmap_buf->hdr = NULL; 2757 2758 return; 2759 } 2760 2761 if (mmap_buf->free_ptr != NULL) { 2762 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); 2763 2764 mmap_buf->free_ptr = NULL; 2765 } 2766 } 2767 2768 2769 static nxt_unit_read_buf_t * 2770 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2771 { 2772 nxt_unit_ctx_impl_t *ctx_impl; 2773 nxt_unit_read_buf_t *rbuf; 2774 2775 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2776 2777 pthread_mutex_lock(&ctx_impl->mutex); 2778 2779 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 2780 2781 pthread_mutex_unlock(&ctx_impl->mutex); 2782 2783 rbuf->oob.size = 0; 2784 2785 return rbuf; 2786 } 2787 2788 2789 static nxt_unit_read_buf_t * 2790 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2791 { 2792 nxt_queue_link_t *link; 2793 
nxt_unit_read_buf_t *rbuf; 2794 2795 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { 2796 link = nxt_queue_first(&ctx_impl->free_rbuf); 2797 nxt_queue_remove(link); 2798 2799 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); 2800 2801 return rbuf; 2802 } 2803 2804 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t)); 2805 2806 if (nxt_fast_path(rbuf != NULL)) { 2807 rbuf->ctx_impl = ctx_impl; 2808 } 2809 2810 return rbuf; 2811 } 2812 2813 2814 static void 2815 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2816 nxt_unit_read_buf_t *rbuf) 2817 { 2818 nxt_unit_ctx_impl_t *ctx_impl; 2819 2820 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2821 2822 pthread_mutex_lock(&ctx_impl->mutex); 2823 2824 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); 2825 2826 pthread_mutex_unlock(&ctx_impl->mutex); 2827 } 2828 2829 2830 nxt_unit_buf_t * 2831 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2832 { 2833 nxt_unit_mmap_buf_t *mmap_buf; 2834 2835 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2836 2837 if (mmap_buf->next == NULL) { 2838 return NULL; 2839 } 2840 2841 return &mmap_buf->next->buf; 2842 } 2843 2844 2845 uint32_t 2846 nxt_unit_buf_max(void) 2847 { 2848 return PORT_MMAP_DATA_SIZE; 2849 } 2850 2851 2852 uint32_t 2853 nxt_unit_buf_min(void) 2854 { 2855 return PORT_MMAP_CHUNK_SIZE; 2856 } 2857 2858 2859 int 2860 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 2861 size_t size) 2862 { 2863 ssize_t res; 2864 2865 res = nxt_unit_response_write_nb(req, start, size, size); 2866 2867 return res < 0 ? 
-res : NXT_UNIT_OK; 2868 } 2869 2870 2871 ssize_t 2872 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, 2873 size_t size, size_t min_size) 2874 { 2875 int rc; 2876 ssize_t sent; 2877 uint32_t part_size, min_part_size, buf_size; 2878 const char *part_start; 2879 nxt_unit_mmap_buf_t mmap_buf; 2880 nxt_unit_request_info_impl_t *req_impl; 2881 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2882 2883 nxt_unit_req_debug(req, "write: %d", (int) size); 2884 2885 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2886 2887 part_start = start; 2888 sent = 0; 2889 2890 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2891 nxt_unit_req_alert(req, "write: response not initialized yet"); 2892 2893 return -NXT_UNIT_ERROR; 2894 } 2895 2896 /* Check if response is not send yet. */ 2897 if (nxt_slow_path(req->response_buf != NULL)) { 2898 part_size = req->response_buf->end - req->response_buf->free; 2899 part_size = nxt_min(size, part_size); 2900 2901 rc = nxt_unit_response_add_content(req, part_start, part_size); 2902 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2903 return -rc; 2904 } 2905 2906 rc = nxt_unit_response_send(req); 2907 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2908 return -rc; 2909 } 2910 2911 size -= part_size; 2912 part_start += part_size; 2913 sent += part_size; 2914 2915 min_size -= nxt_min(min_size, part_size); 2916 } 2917 2918 while (size > 0) { 2919 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 2920 min_part_size = nxt_min(min_size, part_size); 2921 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); 2922 2923 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, 2924 min_part_size, &mmap_buf, local_buf); 2925 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2926 return -rc; 2927 } 2928 2929 buf_size = mmap_buf.buf.end - mmap_buf.buf.free; 2930 if (nxt_slow_path(buf_size == 0)) { 2931 return sent; 2932 } 2933 part_size = nxt_min(buf_size, part_size); 2934 2935 mmap_buf.buf.free = 
nxt_cpymem(mmap_buf.buf.free, 2936 part_start, part_size); 2937 2938 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2939 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2940 return -rc; 2941 } 2942 2943 size -= part_size; 2944 part_start += part_size; 2945 sent += part_size; 2946 2947 min_size -= nxt_min(min_size, part_size); 2948 } 2949 2950 return sent; 2951 } 2952 2953 2954 int 2955 nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 2956 nxt_unit_read_info_t *read_info) 2957 { 2958 int rc; 2959 ssize_t n; 2960 uint32_t buf_size; 2961 nxt_unit_buf_t *buf; 2962 nxt_unit_mmap_buf_t mmap_buf; 2963 nxt_unit_request_info_impl_t *req_impl; 2964 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2965 2966 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2967 2968 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2969 nxt_unit_req_alert(req, "write: response not initialized yet"); 2970 2971 return NXT_UNIT_ERROR; 2972 } 2973 2974 /* Check if response is not send yet. */ 2975 if (nxt_slow_path(req->response_buf != NULL)) { 2976 2977 /* Enable content in headers buf. */ 2978 rc = nxt_unit_response_add_content(req, "", 0); 2979 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2980 nxt_unit_req_error(req, "Failed to add piggyback content"); 2981 2982 return rc; 2983 } 2984 2985 buf = req->response_buf; 2986 2987 while (buf->end - buf->free > 0) { 2988 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2989 if (nxt_slow_path(n < 0)) { 2990 nxt_unit_req_error(req, "Read error"); 2991 2992 return NXT_UNIT_ERROR; 2993 } 2994 2995 /* Manually increase sizes. 
*/ 2996 buf->free += n; 2997 req->response->piggyback_content_length += n; 2998 2999 if (read_info->eof) { 3000 break; 3001 } 3002 } 3003 3004 rc = nxt_unit_response_send(req); 3005 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3006 nxt_unit_req_error(req, "Failed to send headers with content"); 3007 3008 return rc; 3009 } 3010 3011 if (read_info->eof) { 3012 return NXT_UNIT_OK; 3013 } 3014 } 3015 3016 while (!read_info->eof) { 3017 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 3018 read_info->buf_size); 3019 3020 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 3021 3022 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 3023 buf_size, buf_size, 3024 &mmap_buf, local_buf); 3025 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3026 return rc; 3027 } 3028 3029 buf = &mmap_buf.buf; 3030 3031 while (!read_info->eof && buf->end > buf->free) { 3032 n = read_info->read(read_info, buf->free, buf->end - buf->free); 3033 if (nxt_slow_path(n < 0)) { 3034 nxt_unit_req_error(req, "Read error"); 3035 3036 nxt_unit_free_outgoing_buf(&mmap_buf); 3037 3038 return NXT_UNIT_ERROR; 3039 } 3040 3041 buf->free += n; 3042 } 3043 3044 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 3045 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3046 nxt_unit_req_error(req, "Failed to send content"); 3047 3048 return rc; 3049 } 3050 } 3051 3052 return NXT_UNIT_OK; 3053 } 3054 3055 3056 ssize_t 3057 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 3058 { 3059 ssize_t buf_res, res; 3060 3061 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 3062 dst, size); 3063 3064 if (buf_res < (ssize_t) size && req->content_fd != -1) { 3065 res = read(req->content_fd, dst, size); 3066 if (nxt_slow_path(res < 0)) { 3067 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 3068 strerror(errno), errno); 3069 3070 return res; 3071 } 3072 3073 if (res < (ssize_t) size) { 3074 nxt_unit_close(req->content_fd); 3075 3076 req->content_fd = -1; 3077 } 3078 3079 
req->content_length -= res; 3080 size -= res; 3081 3082 dst = nxt_pointer_to(dst, res); 3083 3084 } else { 3085 res = 0; 3086 } 3087 3088 return buf_res + res; 3089 } 3090 3091 3092 ssize_t 3093 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) 3094 { 3095 char *p; 3096 size_t l_size, b_size; 3097 nxt_unit_buf_t *b; 3098 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; 3099 3100 if (req->content_length == 0) { 3101 return 0; 3102 } 3103 3104 l_size = 0; 3105 3106 b = req->content_buf; 3107 3108 while (b != NULL) { 3109 b_size = b->end - b->free; 3110 p = memchr(b->free, '\n', b_size); 3111 3112 if (p != NULL) { 3113 p++; 3114 l_size += p - b->free; 3115 break; 3116 } 3117 3118 l_size += b_size; 3119 3120 if (max_size <= l_size) { 3121 break; 3122 } 3123 3124 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); 3125 if (mmap_buf->next == NULL 3126 && req->content_fd != -1 3127 && l_size < req->content_length) 3128 { 3129 preread_buf = nxt_unit_request_preread(req, 16384); 3130 if (nxt_slow_path(preread_buf == NULL)) { 3131 return -1; 3132 } 3133 3134 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); 3135 } 3136 3137 b =