1 2 /* 3 * Copyright (C) NGINX, Inc. 4 */ 5 6 #include "nxt_main.h" 7 #include "nxt_port_memory_int.h" 8 #include "nxt_socket_msg.h" 9 #include "nxt_port_queue.h" 10 #include "nxt_app_queue.h" 11 12 #include "nxt_unit.h" 13 #include "nxt_unit_request.h" 14 #include "nxt_unit_response.h" 15 #include "nxt_unit_websocket.h" 16 17 #include "nxt_websocket.h" 18 19 #if (NXT_HAVE_MEMFD_CREATE) 20 #include <linux/memfd.h> 21 #endif 22 23 #define NXT_UNIT_MAX_PLAIN_SIZE 1024 24 #define NXT_UNIT_LOCAL_BUF_SIZE \ 25 (NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t)) 26 27 enum { 28 NXT_QUIT_NORMAL = 0, 29 NXT_QUIT_GRACEFUL = 1, 30 }; 31 32 typedef struct nxt_unit_impl_s nxt_unit_impl_t; 33 typedef struct nxt_unit_mmap_s nxt_unit_mmap_t; 34 typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t; 35 typedef struct nxt_unit_process_s nxt_unit_process_t; 36 typedef struct nxt_unit_mmap_buf_s nxt_unit_mmap_buf_t; 37 typedef struct nxt_unit_recv_msg_s nxt_unit_recv_msg_t; 38 typedef struct nxt_unit_read_buf_s nxt_unit_read_buf_t; 39 typedef struct nxt_unit_ctx_impl_s nxt_unit_ctx_impl_t; 40 typedef struct nxt_unit_port_impl_s nxt_unit_port_impl_t; 41 typedef struct nxt_unit_request_info_impl_s nxt_unit_request_info_impl_t; 42 typedef struct nxt_unit_websocket_frame_impl_s nxt_unit_websocket_frame_impl_t; 43 44 static nxt_unit_impl_t *nxt_unit_create(nxt_unit_init_t *init); 45 static int nxt_unit_ctx_init(nxt_unit_impl_t *lib, 46 nxt_unit_ctx_impl_t *ctx_impl, void *data); 47 nxt_inline void nxt_unit_ctx_use(nxt_unit_ctx_t *ctx); 48 nxt_inline void nxt_unit_ctx_release(nxt_unit_ctx_t *ctx); 49 nxt_inline void nxt_unit_lib_use(nxt_unit_impl_t *lib); 50 nxt_inline void nxt_unit_lib_release(nxt_unit_impl_t *lib); 51 nxt_inline void nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 52 nxt_unit_mmap_buf_t *mmap_buf); 53 nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 54 nxt_unit_mmap_buf_t *mmap_buf); 55 nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf); 56 static int nxt_unit_read_env(nxt_unit_port_t *ready_port, 57 nxt_unit_port_t *router_port, nxt_unit_port_t *read_port, 58 int *shared_port_fd, int *shared_queue_fd, 59 int *log_fd, uint32_t *stream, uint32_t *shm_limit, 60 uint32_t *request_limit); 61 static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, 62 int queue_fd); 63 static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 64 nxt_unit_request_info_t **preq); 65 static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, 66 nxt_unit_recv_msg_t *recv_msg); 67 static int nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx); 68 static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, 69 nxt_unit_recv_msg_t *recv_msg, nxt_unit_request_info_t **preq); 70 static int nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, 71 nxt_unit_recv_msg_t *recv_msg); 72 static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 73 nxt_unit_port_id_t *port_id); 74 static int nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req); 75 static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, 76 nxt_unit_recv_msg_t *recv_msg); 77 static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx); 78 static nxt_unit_request_info_impl_t *nxt_unit_request_info_get( 79 nxt_unit_ctx_t *ctx); 80 static void nxt_unit_request_info_release(nxt_unit_request_info_t *req); 81 static void nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req); 82 static nxt_unit_websocket_frame_impl_t *nxt_unit_websocket_frame_get( 83 nxt_unit_ctx_t *ctx); 84 static void nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws); 85 static void nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 86 nxt_unit_websocket_frame_impl_t *ws); 87 static nxt_unit_mmap_buf_t *nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx); 88 static void nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf); 89 static int nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 90 nxt_unit_mmap_buf_t *mmap_buf, int last); 91 static void nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf); 92 static void nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf); 93 static nxt_unit_read_buf_t *nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx); 94 static nxt_unit_read_buf_t *nxt_unit_read_buf_get_impl( 95 nxt_unit_ctx_impl_t *ctx_impl); 96 static void nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 97 nxt_unit_read_buf_t *rbuf); 98 static nxt_unit_mmap_buf_t *nxt_unit_request_preread( 99 nxt_unit_request_info_t *req, size_t size); 100 static ssize_t nxt_unit_buf_read(nxt_unit_buf_t **b, uint64_t *len, void *dst, 101 size_t size); 102 static nxt_port_mmap_header_t *nxt_unit_mmap_get(nxt_unit_ctx_t *ctx, 103 nxt_unit_port_t *port, nxt_chunk_id_t *c, int *n, int min_n); 104 static int nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); 105 static int nxt_unit_wait_shm_ack(nxt_unit_ctx_t *ctx); 106 static nxt_unit_mmap_t *nxt_unit_mmap_at(nxt_unit_mmaps_t *mmaps, uint32_t i); 107 static nxt_port_mmap_header_t *nxt_unit_new_mmap(nxt_unit_ctx_t *ctx, 108 nxt_unit_port_t *port, int n); 109 static int nxt_unit_shm_open(nxt_unit_ctx_t *ctx, size_t size); 110 static int nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 111 int fd); 112 static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx, 113 nxt_unit_port_t *port, uint32_t size, 114 uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf); 115 static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd); 116 117 static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, 118 nxt_unit_ctx_impl_t *ctx_impl); 119 static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps); 120 nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process); 121 nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process); 122 static void nxt_unit_mmaps_destroy(nxt_unit_mmaps_t *mmaps); 123 static int nxt_unit_check_rbuf_mmap(nxt_unit_ctx_t *ctx, 124 nxt_unit_mmaps_t *mmaps, pid_t pid, uint32_t id, 125 nxt_port_mmap_header_t **hdr, nxt_unit_read_buf_t *rbuf); 126 static int nxt_unit_mmap_read(nxt_unit_ctx_t *ctx, 127 nxt_unit_recv_msg_t *recv_msg, nxt_unit_read_buf_t *rbuf); 128 static int nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id); 129 static void nxt_unit_mmap_release(nxt_unit_ctx_t *ctx, 130 nxt_port_mmap_header_t *hdr, void *start, uint32_t size); 131 static int nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid); 132 133 static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx, pid_t pid); 134 static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib, 135 pid_t pid, int remove); 136 static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); 137 static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx); 138 static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf); 139 static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx); 140 static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx); 141 static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx); 142 nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf); 143 nxt_inline int nxt_unit_is_read_socket(nxt_unit_read_buf_t *rbuf); 144 nxt_inline int nxt_unit_is_shm_ack(nxt_unit_read_buf_t *rbuf); 145 nxt_inline int nxt_unit_is_quit(nxt_unit_read_buf_t *rbuf); 146 static int nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, 147 nxt_unit_port_t *port); 148 static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl); 149 static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx); 150 151 static int nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst, 152 nxt_unit_port_t *port, int queue_fd); 153 154 nxt_inline void nxt_unit_port_use(nxt_unit_port_t *port); 155 nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port); 156 static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx, 157 nxt_unit_port_t *port, void *queue); 158 static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, 159 nxt_queue_t *awaiting_req); 160 static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx, 161 nxt_unit_port_id_t *port_id); 162 static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib, 163 nxt_unit_port_id_t *port_id); 164 static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid); 165 static void nxt_unit_remove_process(nxt_unit_impl_t *lib, 166 nxt_unit_process_t *process); 167 static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param); 168 static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); 169 static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx, 170 nxt_unit_port_t *port, const void *buf, size_t buf_size, 171 const nxt_send_oob_t *oob); 172 static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd, 173 const void *buf, size_t buf_size, const nxt_send_oob_t *oob); 174 static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 175 nxt_unit_read_buf_t *rbuf); 176 nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, 177 nxt_unit_read_buf_t *src); 178 static int nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 179 nxt_unit_read_buf_t *rbuf); 180 static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 181 nxt_unit_read_buf_t *rbuf); 182 static int nxt_unit_port_queue_recv(nxt_unit_port_t *port, 183 nxt_unit_read_buf_t *rbuf); 184 static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, 185 nxt_unit_read_buf_t *rbuf); 186 nxt_inline int nxt_unit_close(int fd); 187 static int nxt_unit_fd_blocking(int fd); 188 189 static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash, 190 nxt_unit_port_t *port); 191 static nxt_unit_port_t *nxt_unit_port_hash_find(nxt_lvlhsh_t *port_hash, 192 nxt_unit_port_id_t *port_id, int remove); 193 194 static int nxt_unit_request_hash_add(nxt_unit_ctx_t *ctx, 195 nxt_unit_request_info_t *req); 196 static nxt_unit_request_info_t *nxt_unit_request_hash_find( 197 nxt_unit_ctx_t *ctx, uint32_t stream, int remove); 198 199 static char * nxt_unit_snprint_prefix(char *p, const char *end, pid_t pid, 200 int level); 201 static void *nxt_unit_lvlhsh_alloc(void *data, size_t size); 202 static void nxt_unit_lvlhsh_free(void *data, void *p); 203 static int nxt_unit_memcasecmp(const void *p1, const void *p2, size_t length); 204 205 206 struct nxt_unit_mmap_buf_s { 207 nxt_unit_buf_t buf; 208 209 nxt_unit_mmap_buf_t *next; 210 nxt_unit_mmap_buf_t **prev; 211 212 nxt_port_mmap_header_t *hdr; 213 nxt_unit_request_info_t *req; 214 nxt_unit_ctx_impl_t *ctx_impl; 215 char *free_ptr; 216 char *plain_ptr; 217 }; 218 219 220 struct nxt_unit_recv_msg_s { 221 uint32_t stream; 222 nxt_pid_t pid; 223 nxt_port_id_t reply_port; 224 225 uint8_t last; /* 1 bit */ 226 uint8_t mmap; /* 1 bit */ 227 228 void *start; 229 uint32_t size; 230 231 int fd[2]; 232 233 nxt_unit_mmap_buf_t *incoming_buf; 234 }; 235 236 237 typedef enum { 238 NXT_UNIT_RS_START = 0, 239 NXT_UNIT_RS_RESPONSE_INIT, 240 NXT_UNIT_RS_RESPONSE_HAS_CONTENT, 241 NXT_UNIT_RS_RESPONSE_SENT, 242 NXT_UNIT_RS_RELEASED, 243 } nxt_unit_req_state_t; 244 245 246 struct nxt_unit_request_info_impl_s { 247 nxt_unit_request_info_t req; 248 249 uint32_t stream; 250 251 nxt_unit_mmap_buf_t *outgoing_buf; 252 nxt_unit_mmap_buf_t *incoming_buf; 253 254 nxt_unit_req_state_t state; 255 uint8_t websocket; 256 uint8_t in_hash; 257 258 /* for nxt_unit_ctx_impl_t.free_req or active_req */ 259 nxt_queue_link_t link; 260 /* for nxt_unit_port_impl_t.awaiting_req */ 261 nxt_queue_link_t port_wait_link; 262 263 char extra_data[]; 264 }; 265 266 267 struct nxt_unit_websocket_frame_impl_s { 268 nxt_unit_websocket_frame_t ws; 269 270 nxt_unit_mmap_buf_t *buf; 271 272 nxt_queue_link_t link; 273 274 nxt_unit_ctx_impl_t *ctx_impl; 275 }; 276 277 278 struct nxt_unit_read_buf_s { 279 nxt_queue_link_t link; 280 nxt_unit_ctx_impl_t *ctx_impl; 281 ssize_t size; 282 nxt_recv_oob_t oob; 283 char buf[16384]; 284 }; 285 286 287 struct nxt_unit_ctx_impl_s { 288 nxt_unit_ctx_t ctx; 289 290 nxt_atomic_t use_count; 291 nxt_atomic_t wait_items; 292 293 pthread_mutex_t mutex; 294 295 nxt_unit_port_t *read_port; 296 297 nxt_queue_link_t link; 298 299 nxt_unit_mmap_buf_t *free_buf; 300 301 /* of nxt_unit_request_info_impl_t */ 302 nxt_queue_t free_req; 303 304 /* of nxt_unit_websocket_frame_impl_t */ 305 nxt_queue_t free_ws; 306 307 /* of nxt_unit_request_info_impl_t */ 308 nxt_queue_t active_req; 309 310 /* of nxt_unit_request_info_impl_t */ 311 nxt_lvlhsh_t requests; 312 313 /* of nxt_unit_request_info_impl_t */ 314 nxt_queue_t ready_req; 315 316 /* of nxt_unit_read_buf_t */ 317 nxt_queue_t pending_rbuf; 318 319 /* of nxt_unit_read_buf_t */ 320 nxt_queue_t free_rbuf; 321 322 uint8_t online; /* 1 bit */ 323 uint8_t ready; /* 1 bit */ 324 uint8_t quit_param; 325 326 nxt_unit_mmap_buf_t ctx_buf[2]; 327 nxt_unit_read_buf_t ctx_read_buf; 328 329 nxt_unit_request_info_impl_t req; 330 }; 331 332 333 struct nxt_unit_mmap_s { 334 nxt_port_mmap_header_t *hdr; 335 pthread_t src_thread; 336 337 /* of nxt_unit_read_buf_t */ 338 nxt_queue_t awaiting_rbuf; 339 }; 340 341 342 struct nxt_unit_mmaps_s { 343 pthread_mutex_t mutex; 344 uint32_t size; 345 uint32_t cap; 346 nxt_atomic_t allocated_chunks; 347 nxt_unit_mmap_t *elts; 348 }; 349 350 351 struct nxt_unit_impl_s { 352 nxt_unit_t unit; 353 nxt_unit_callbacks_t callbacks; 354 355 nxt_atomic_t use_count; 356 nxt_atomic_t request_count; 357 358 uint32_t request_data_size; 359 uint32_t shm_mmap_limit; 360 uint32_t request_limit; 361 362 pthread_mutex_t mutex; 363 364 nxt_lvlhsh_t processes; /* of nxt_unit_process_t */ 365 nxt_lvlhsh_t ports; /* of nxt_unit_port_impl_t */ 366 367 nxt_unit_port_t *router_port; 368 nxt_unit_port_t *shared_port; 369 370 nxt_queue_t contexts; /* of nxt_unit_ctx_impl_t */ 371 372 nxt_unit_mmaps_t incoming; 373 nxt_unit_mmaps_t outgoing; 374 375 pid_t pid; 376 int log_fd; 377 378 nxt_unit_ctx_impl_t main_ctx; 379 }; 380 381 382 struct nxt_unit_port_impl_s { 383 nxt_unit_port_t port; 384 385 nxt_atomic_t use_count; 386 387 /* for nxt_unit_process_t.ports */ 388 nxt_queue_link_t link; 389 nxt_unit_process_t *process; 390 391 /* of nxt_unit_request_info_impl_t */ 392 nxt_queue_t awaiting_req; 393 394 int ready; 395 396 void *queue; 397 398 int from_socket; 399 nxt_unit_read_buf_t *socket_rbuf; 400 }; 401 402 403 struct nxt_unit_process_s { 404 pid_t pid; 405 406 nxt_queue_t ports; /* of nxt_unit_port_impl_t */ 407 408 nxt_unit_impl_t *lib; 409 410 nxt_atomic_t use_count; 411 412 uint32_t next_port_id; 413 }; 414 415 416 /* Explicitly using 32 bit types to avoid possible alignment. */ 417 typedef struct { 418 int32_t pid; 419 uint32_t id; 420 } nxt_unit_port_hash_id_t; 421 422 423 static pid_t nxt_unit_pid; 424 425 426 nxt_unit_ctx_t * 427 nxt_unit_init(nxt_unit_init_t *init) 428 { 429 int rc, queue_fd, shared_queue_fd; 430 void *mem; 431 uint32_t ready_stream, shm_limit, request_limit; 432 nxt_unit_ctx_t *ctx; 433 nxt_unit_impl_t *lib; 434 nxt_unit_port_t ready_port, router_port, read_port, shared_port; 435 436 nxt_unit_pid = getpid(); 437 438 lib = nxt_unit_create(init); 439 if (nxt_slow_path(lib == NULL)) { 440 return NULL; 441 } 442 443 queue_fd = -1; 444 mem = MAP_FAILED; 445 shared_port.out_fd = -1; 446 shared_port.data = NULL; 447 448 if (init->ready_port.id.pid != 0 449 && init->ready_stream != 0 450 && init->read_port.id.pid != 0) 451 { 452 ready_port = init->ready_port; 453 ready_stream = init->ready_stream; 454 router_port = init->router_port; 455 read_port = init->read_port; 456 lib->log_fd = init->log_fd; 457 458 nxt_unit_port_id_init(&ready_port.id, ready_port.id.pid, 459 ready_port.id.id); 460 nxt_unit_port_id_init(&router_port.id, router_port.id.pid, 461 router_port.id.id); 462 nxt_unit_port_id_init(&read_port.id, read_port.id.pid, 463 read_port.id.id); 464 465 shared_port.in_fd = init->shared_port_fd; 466 shared_queue_fd = init->shared_queue_fd; 467 468 } else { 469 rc = nxt_unit_read_env(&ready_port, &router_port, &read_port, 470 &shared_port.in_fd, &shared_queue_fd, 471 &lib->log_fd, &ready_stream, &shm_limit, 472 &request_limit); 473 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 474 goto fail; 475 } 476 477 lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1) 478 / PORT_MMAP_DATA_SIZE; 479 lib->request_limit = request_limit; 480 } 481 482 if (nxt_slow_path(lib->shm_mmap_limit < 1)) { 483 lib->shm_mmap_limit = 1; 484 } 485 486 lib->pid = read_port.id.pid; 487 nxt_unit_pid = lib->pid; 488 489 ctx = &lib->main_ctx.ctx; 490 491 rc = nxt_unit_fd_blocking(router_port.out_fd); 492 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 493 goto fail; 494 } 495 496 lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL); 497 if (nxt_slow_path(lib->router_port == NULL)) { 498 nxt_unit_alert(NULL, "failed to add router_port"); 499 500 goto fail; 501 } 502 503 queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t)); 504 if (nxt_slow_path(queue_fd == -1)) { 505 goto fail; 506 } 507 508 mem = mmap(NULL, sizeof(nxt_port_queue_t), 509 PROT_READ | PROT_WRITE, MAP_SHARED, queue_fd, 0); 510 if (nxt_slow_path(mem == MAP_FAILED)) { 511 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", queue_fd, 512 strerror(errno), errno); 513 514 goto fail; 515 } 516 517 nxt_port_queue_init(mem); 518 519 rc = nxt_unit_fd_blocking(read_port.in_fd); 520 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 521 goto fail; 522 } 523 524 lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem); 525 if (nxt_slow_path(lib->main_ctx.read_port == NULL)) { 526 nxt_unit_alert(NULL, "failed to add read_port"); 527 528 goto fail; 529 } 530 531 rc = nxt_unit_fd_blocking(ready_port.out_fd); 532 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 533 goto fail; 534 } 535 536 nxt_unit_port_id_init(&shared_port.id, read_port.id.pid, 537 NXT_UNIT_SHARED_PORT_ID); 538 539 mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, 540 MAP_SHARED, shared_queue_fd, 0); 541 if (nxt_slow_path(mem == MAP_FAILED)) { 542 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd, 543 strerror(errno), errno); 544 545 goto fail; 546 } 547 548 nxt_unit_close(shared_queue_fd); 549 550 lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem); 551 if (nxt_slow_path(lib->shared_port == NULL)) { 552 nxt_unit_alert(NULL, "failed to add shared_port"); 553 554 goto fail; 555 } 556 557 rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd); 558 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 559 nxt_unit_alert(NULL, "failed to send READY message"); 560 561 goto fail; 562 } 563 564 nxt_unit_close(ready_port.out_fd); 565 nxt_unit_close(queue_fd); 566 567 return ctx; 568 569 fail: 570 571 if (mem != MAP_FAILED) { 572 munmap(mem, sizeof(nxt_port_queue_t)); 573 } 574 575 if (queue_fd != -1) { 576 nxt_unit_close(queue_fd); 577 } 578 579 nxt_unit_ctx_release(&lib->main_ctx.ctx); 580 581 return NULL; 582 } 583 584 585 static nxt_unit_impl_t * 586 nxt_unit_create(nxt_unit_init_t *init) 587 { 588 int rc; 589 nxt_unit_impl_t *lib; 590 nxt_unit_callbacks_t *cb; 591 592 lib = nxt_unit_malloc(NULL, 593 sizeof(nxt_unit_impl_t) + init->request_data_size); 594 if (nxt_slow_path(lib == NULL)) { 595 nxt_unit_alert(NULL, "failed to allocate unit struct"); 596 597 return NULL; 598 } 599 600 rc = pthread_mutex_init(&lib->mutex, NULL); 601 if (nxt_slow_path(rc != 0)) { 602 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 603 604 goto fail; 605 } 606 607 lib->unit.data = init->data; 608 lib->callbacks = init->callbacks; 609 610 lib->request_data_size = init->request_data_size; 611 lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1) 612 / PORT_MMAP_DATA_SIZE; 613 lib->request_limit = init->request_limit; 614 615 lib->processes.slot = NULL; 616 lib->ports.slot = NULL; 617 618 lib->log_fd = STDERR_FILENO; 619 620 nxt_queue_init(&lib->contexts); 621 622 lib->use_count = 0; 623 lib->request_count = 0; 624 lib->router_port = NULL; 625 lib->shared_port = NULL; 626 627 rc = nxt_unit_ctx_init(lib, &lib->main_ctx, init->ctx_data); 628 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 629 pthread_mutex_destroy(&lib->mutex); 630 goto fail; 631 } 632 633 cb = &lib->callbacks; 634 635 if (cb->request_handler == NULL) { 636 nxt_unit_alert(NULL, "request_handler is NULL"); 637 638 pthread_mutex_destroy(&lib->mutex); 639 goto fail; 640 } 641 642 nxt_unit_mmaps_init(&lib->incoming); 643 nxt_unit_mmaps_init(&lib->outgoing); 644 645 return lib; 646 647 fail: 648 649 nxt_unit_free(NULL, lib); 650 651 return NULL; 652 } 653 654 655 static int 656 nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl, 657 void *data) 658 { 659 int rc; 660 661 ctx_impl->ctx.data = data; 662 ctx_impl->ctx.unit = &lib->unit; 663 664 rc = pthread_mutex_init(&ctx_impl->mutex, NULL); 665 if (nxt_slow_path(rc != 0)) { 666 nxt_unit_alert(NULL, "failed to initialize mutex (%d)", rc); 667 668 return NXT_UNIT_ERROR; 669 } 670 671 nxt_unit_lib_use(lib); 672 673 pthread_mutex_lock(&lib->mutex); 674 675 nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link); 676 677 pthread_mutex_unlock(&lib->mutex); 678 679 ctx_impl->use_count = 1; 680 ctx_impl->wait_items = 0; 681 ctx_impl->online = 1; 682 ctx_impl->ready = 0; 683 ctx_impl->quit_param = NXT_QUIT_GRACEFUL; 684 685 nxt_queue_init(&ctx_impl->free_req); 686 nxt_queue_init(&ctx_impl->free_ws); 687 nxt_queue_init(&ctx_impl->active_req); 688 nxt_queue_init(&ctx_impl->ready_req); 689 nxt_queue_init(&ctx_impl->pending_rbuf); 690 nxt_queue_init(&ctx_impl->free_rbuf); 691 692 ctx_impl->free_buf = NULL; 693 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]); 694 nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[0]); 695 696 nxt_queue_insert_tail(&ctx_impl->free_req, &ctx_impl->req.link); 697 nxt_queue_insert_tail(&ctx_impl->free_rbuf, &ctx_impl->ctx_read_buf.link); 698 699 ctx_impl->ctx_read_buf.ctx_impl = ctx_impl; 700 701 ctx_impl->req.req.ctx = &ctx_impl->ctx; 702 ctx_impl->req.req.unit = &lib->unit; 703 704 ctx_impl->read_port = NULL; 705 ctx_impl->requests.slot = 0; 706 707 return NXT_UNIT_OK; 708 } 709 710 711 nxt_inline void 712 nxt_unit_ctx_use(nxt_unit_ctx_t *ctx) 713 { 714 nxt_unit_ctx_impl_t *ctx_impl; 715 716 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 717 718 nxt_atomic_fetch_add(&ctx_impl->use_count, 1); 719 } 720 721 722 nxt_inline void 723 nxt_unit_ctx_release(nxt_unit_ctx_t *ctx) 724 { 725 long c; 726 nxt_unit_ctx_impl_t *ctx_impl; 727 728 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 729 730 c = nxt_atomic_fetch_add(&ctx_impl->use_count, -1); 731 732 if (c == 1) { 733 nxt_unit_ctx_free(ctx_impl); 734 } 735 } 736 737 738 nxt_inline void 739 nxt_unit_lib_use(nxt_unit_impl_t *lib) 740 { 741 nxt_atomic_fetch_add(&lib->use_count, 1); 742 } 743 744 745 nxt_inline void 746 nxt_unit_lib_release(nxt_unit_impl_t *lib) 747 { 748 long c; 749 nxt_unit_process_t *process; 750 751 c = nxt_atomic_fetch_add(&lib->use_count, -1); 752 753 if (c == 1) { 754 for ( ;; ) { 755 pthread_mutex_lock(&lib->mutex); 756 757 process = nxt_unit_process_pop_first(lib); 758 if (process == NULL) { 759 pthread_mutex_unlock(&lib->mutex); 760 761 break; 762 } 763 764 nxt_unit_remove_process(lib, process); 765 } 766 767 pthread_mutex_destroy(&lib->mutex); 768 769 if (nxt_fast_path(lib->router_port != NULL)) { 770 nxt_unit_port_release(lib->router_port); 771 } 772 773 if (nxt_fast_path(lib->shared_port != NULL)) { 774 nxt_unit_port_release(lib->shared_port); 775 } 776 777 nxt_unit_mmaps_destroy(&lib->incoming); 778 nxt_unit_mmaps_destroy(&lib->outgoing); 779 780 nxt_unit_free(NULL, lib); 781 } 782 } 783 784 785 nxt_inline void 786 nxt_unit_mmap_buf_insert(nxt_unit_mmap_buf_t **head, 787 nxt_unit_mmap_buf_t *mmap_buf) 788 { 789 mmap_buf->next = *head; 790 791 if (mmap_buf->next != NULL) { 792 mmap_buf->next->prev = &mmap_buf->next; 793 } 794 795 *head = mmap_buf; 796 mmap_buf->prev = head; 797 } 798 799 800 nxt_inline void 801 nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev, 802 nxt_unit_mmap_buf_t *mmap_buf) 803 { 804 while (*prev != NULL) { 805 prev = &(*prev)->next; 806 } 807 808 nxt_unit_mmap_buf_insert(prev, mmap_buf); 809 } 810 811 812 nxt_inline void 813 nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf) 814 { 815 nxt_unit_mmap_buf_t **prev; 816 817 prev = mmap_buf->prev; 818 819 if (mmap_buf->next != NULL) { 820 mmap_buf->next->prev = prev; 821 } 822 823 if (prev != NULL) { 824 *prev = mmap_buf->next; 825 } 826 } 827 828 829 static int 830 nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port, 831 nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd, 832 int *log_fd, uint32_t *stream, 833 uint32_t *shm_limit, uint32_t *request_limit) 834 { 835 int rc; 836 int ready_fd, router_fd, read_in_fd, read_out_fd; 837 char *unit_init, *version_end, *vars; 838 size_t version_length; 839 int64_t ready_pid, router_pid, read_pid; 840 uint32_t ready_stream, router_id, ready_id, read_id; 841 842 unit_init = getenv(NXT_UNIT_INIT_ENV); 843 if (nxt_slow_path(unit_init == NULL)) { 844 nxt_unit_alert(NULL, "%s is not in the current environment", 845 NXT_UNIT_INIT_ENV); 846 847 return NXT_UNIT_ERROR; 848 } 849 850 version_end = strchr(unit_init, ';'); 851 if (nxt_slow_path(version_end == NULL)) { 852 nxt_unit_alert(NULL, "Unit version not found in %s=\"%s\"", 853 NXT_UNIT_INIT_ENV, unit_init); 854 855 return NXT_UNIT_ERROR; 856 } 857 858 version_length = version_end - unit_init; 859 860 rc = version_length != nxt_length(NXT_VERSION) 861 || memcmp(unit_init, NXT_VERSION, nxt_length(NXT_VERSION)); 862 863 if (nxt_slow_path(rc != 0)) { 864 nxt_unit_alert(NULL, "versions mismatch: the Unit daemon has version " 865 "%.*s, while the app was compiled with libunit %s", 866 (int) version_length, unit_init, NXT_VERSION); 867 868 return NXT_UNIT_ERROR; 869 } 870 871 vars = version_end + 1; 872 873 rc = sscanf(vars, 874 "%"PRIu32";" 875 "%"PRId64",%"PRIu32",%d;" 876 "%"PRId64",%"PRIu32",%d;" 877 "%"PRId64",%"PRIu32",%d,%d;" 878 "%d,%d;" 879 "%d,%"PRIu32",%"PRIu32, 880 &ready_stream, 881 &ready_pid, &ready_id, &ready_fd, 882 &router_pid, &router_id, &router_fd, 883 &read_pid, &read_id, &read_in_fd, &read_out_fd, 884 shared_port_fd, shared_queue_fd, 885 log_fd, shm_limit, request_limit); 886 887 if (nxt_slow_path(rc == EOF)) { 888 nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env", 889 vars, strerror(errno), errno, NXT_UNIT_INIT_ENV); 890 891 return NXT_UNIT_ERROR; 892 } 893 894 if (nxt_slow_path(rc != 16)) { 895 nxt_unit_alert(NULL, "invalid number of variables in %s env: " 896 "found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars); 897 898 return NXT_UNIT_ERROR; 899 } 900 901 nxt_unit_debug(NULL, "%s='%s'", NXT_UNIT_INIT_ENV, unit_init); 902 903 nxt_unit_port_id_init(&ready_port->id, (pid_t) ready_pid, ready_id); 904 905 ready_port->in_fd = -1; 906 ready_port->out_fd = ready_fd; 907 ready_port->data = NULL; 908 909 nxt_unit_port_id_init(&router_port->id, (pid_t) router_pid, router_id); 910 911 router_port->in_fd = -1; 912 router_port->out_fd = router_fd; 913 router_port->data = NULL; 914 915 nxt_unit_port_id_init(&read_port->id, (pid_t) read_pid, read_id); 916 917 read_port->in_fd = read_in_fd; 918 read_port->out_fd = read_out_fd; 919 read_port->data = NULL; 920 921 *stream = ready_stream; 922 923 return NXT_UNIT_OK; 924 } 925 926 927 static int 928 nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd) 929 { 930 ssize_t res; 931 nxt_send_oob_t oob; 932 nxt_port_msg_t msg; 933 nxt_unit_impl_t *lib; 934 int fds[2] = {queue_fd, -1}; 935 936 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 937 938 msg.stream = stream; 939 msg.pid = lib->pid; 940 msg.reply_port = 0; 941 msg.type = _NXT_PORT_MSG_PROCESS_READY; 942 msg.last = 1; 943 msg.mmap = 0; 944 msg.nf = 0; 945 msg.mf = 0; 946 947 nxt_socket_msg_oob_init(&oob, fds); 948 949 res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob); 950 if (res != sizeof(msg)) { 951 return NXT_UNIT_ERROR; 952 } 953 954 return NXT_UNIT_OK; 955 } 956 957 958 static int 959 nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf, 960 nxt_unit_request_info_t **preq) 961 { 962 int rc; 963 pid_t pid; 964 uint8_t quit_param; 965 nxt_port_msg_t *port_msg; 966 nxt_unit_impl_t *lib; 967 nxt_unit_recv_msg_t recv_msg; 968 969 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 970 971 recv_msg.incoming_buf = NULL; 972 recv_msg.fd[0] = -1; 973 recv_msg.fd[1] = -1; 974 975 rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd); 976 if (nxt_slow_path(rc != NXT_OK)) { 977 nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg"); 978 rc = NXT_UNIT_ERROR; 979 goto done; 980 } 981 982 if (nxt_slow_path(rbuf->size < (ssize_t) sizeof(nxt_port_msg_t))) { 983 if (nxt_slow_path(rbuf->size == 0)) { 984 nxt_unit_debug(ctx, "read port closed"); 985 986 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); 987 rc = NXT_UNIT_OK; 988 goto done; 989 } 990 991 nxt_unit_alert(ctx, "message too small (%d bytes)", (int) rbuf->size); 992 993 rc = NXT_UNIT_ERROR; 994 goto done; 995 } 996 997 port_msg = (nxt_port_msg_t *) rbuf->buf; 998 999 nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d", 1000 port_msg->stream, (int) port_msg->type, 1001 recv_msg.fd[0], recv_msg.fd[1]); 1002 1003 recv_msg.stream = port_msg->stream; 1004 recv_msg.pid = port_msg->pid; 1005 recv_msg.reply_port = port_msg->reply_port; 1006 recv_msg.last = port_msg->last; 1007 recv_msg.mmap = port_msg->mmap; 1008 1009 recv_msg.start = port_msg + 1; 1010 recv_msg.size = rbuf->size - sizeof(nxt_port_msg_t); 1011 1012 if (nxt_slow_path(port_msg->type >= NXT_PORT_MSG_MAX)) { 1013 nxt_unit_alert(ctx, "#%"PRIu32": unknown message type (%d)", 1014 port_msg->stream, (int) port_msg->type); 1015 rc = NXT_UNIT_ERROR; 1016 goto done; 1017 } 1018 1019 /* Fragmentation is unsupported. */ 1020 if (nxt_slow_path(port_msg->nf != 0 || port_msg->mf != 0)) { 1021 nxt_unit_alert(ctx, "#%"PRIu32": fragmented message type (%d)", 1022 port_msg->stream, (int) port_msg->type); 1023 rc = NXT_UNIT_ERROR; 1024 goto done; 1025 } 1026 1027 if (port_msg->mmap) { 1028 rc = nxt_unit_mmap_read(ctx, &recv_msg, rbuf); 1029 1030 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 1031 if (rc == NXT_UNIT_AGAIN) { 1032 recv_msg.fd[0] = -1; 1033 recv_msg.fd[1] = -1; 1034 } 1035 1036 goto done; 1037 } 1038 } 1039 1040 switch (port_msg->type) { 1041 1042 case _NXT_PORT_MSG_RPC_READY: 1043 rc = NXT_UNIT_OK; 1044 break; 1045 1046 case _NXT_PORT_MSG_QUIT: 1047 if (recv_msg.size == sizeof(quit_param)) { 1048 memcpy(&quit_param, recv_msg.start, sizeof(quit_param)); 1049 1050 } else { 1051 quit_param = NXT_QUIT_NORMAL; 1052 } 1053 1054 nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream, 1055 (quit_param == NXT_QUIT_GRACEFUL ? "graceful " : "")); 1056 1057 nxt_unit_quit(ctx, quit_param); 1058 1059 rc = NXT_UNIT_OK; 1060 break; 1061 1062 case _NXT_PORT_MSG_NEW_PORT: 1063 rc = nxt_unit_process_new_port(ctx, &recv_msg); 1064 break; 1065 1066 case _NXT_PORT_MSG_PORT_ACK: 1067 rc = nxt_unit_ctx_ready(ctx); 1068 break; 1069 1070 case _NXT_PORT_MSG_CHANGE_FILE: 1071 nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", 1072 port_msg->stream, recv_msg.fd[0]); 1073 1074 if (dup2(recv_msg.fd[0], lib->log_fd) == -1) { 1075 nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", 1076 port_msg->stream, recv_msg.fd[0], lib->log_fd, 1077 strerror(errno), errno); 1078 1079 rc = NXT_UNIT_ERROR; 1080 goto done; 1081 } 1082 1083 rc = NXT_UNIT_OK; 1084 break; 1085 1086 case _NXT_PORT_MSG_MMAP: 1087 if (nxt_slow_path(recv_msg.fd[0] < 0)) { 1088 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", 1089 port_msg->stream, recv_msg.fd[0]); 1090 1091 rc = NXT_UNIT_ERROR; 1092 goto done; 1093 } 1094 1095 rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]); 1096 break; 1097 1098 case _NXT_PORT_MSG_REQ_HEADERS: 1099 rc = nxt_unit_process_req_headers(ctx, &recv_msg, preq); 1100 break; 1101 1102 case _NXT_PORT_MSG_REQ_BODY: 1103 rc = nxt_unit_process_req_body(ctx, &recv_msg); 1104 break; 1105 1106 case _NXT_PORT_MSG_WEBSOCKET: 1107 rc = nxt_unit_process_websocket(ctx, &recv_msg); 1108 break; 1109 1110 case _NXT_PORT_MSG_REMOVE_PID: 1111 if (nxt_slow_path(recv_msg.size != sizeof(pid))) { 1112 nxt_unit_alert(ctx, "#%"PRIu32": remove_pid: invalid message size " 1113 "(%d != %d)", port_msg->stream, (int) recv_msg.size, 1114 (int) sizeof(pid)); 1115 1116 rc = NXT_UNIT_ERROR; 1117 goto done; 1118 } 1119 1120 memcpy(&pid, recv_msg.start, sizeof(pid)); 1121 1122 nxt_unit_debug(ctx, "#%"PRIu32": remove_pid: %d", 1123 port_msg->stream, (int) pid); 1124 1125 nxt_unit_remove_pid(lib, pid); 1126 1127 rc = NXT_UNIT_OK; 1128 break; 1129 1130 case _NXT_PORT_MSG_SHM_ACK: 1131 rc = nxt_unit_process_shm_ack(ctx); 1132 break; 1133 1134 default: 1135 nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d", 1136 port_msg->stream, (int) port_msg->type); 1137 1138 rc = NXT_UNIT_ERROR; 1139 goto done; 1140 } 1141 1142 done: 1143 1144 if (recv_msg.fd[0] != -1) { 1145 nxt_unit_close(recv_msg.fd[0]); 1146 } 1147 1148 if (recv_msg.fd[1] != -1) { 1149 nxt_unit_close(recv_msg.fd[1]); 1150 } 1151 1152 while (recv_msg.incoming_buf != NULL) { 1153 nxt_unit_mmap_buf_free(recv_msg.incoming_buf); 1154 } 1155 1156 if (nxt_fast_path(rc != NXT_UNIT_AGAIN)) { 1157 #if (NXT_DEBUG) 1158 memset(rbuf->buf, 0xAC, rbuf->size); 1159 #endif 1160 nxt_unit_read_buf_release(ctx, rbuf); 1161 } 1162 1163 return rc; 1164 } 1165 1166 1167 static int 1168 nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1169 { 1170 void *mem; 1171 nxt_unit_port_t new_port, *port; 1172 nxt_port_msg_new_port_t *new_port_msg; 1173 1174 if (nxt_slow_path(recv_msg->size != sizeof(nxt_port_msg_new_port_t))) { 1175 nxt_unit_warn(ctx, "#%"PRIu32": new_port: " 1176 "invalid message size (%d)", 1177 recv_msg->stream, (int) recv_msg->size); 1178 1179 return NXT_UNIT_ERROR; 1180 } 1181 1182 if (nxt_slow_path(recv_msg->fd[0] < 0)) { 1183 nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", 1184 recv_msg->stream, recv_msg->fd[0]); 1185 1186 return NXT_UNIT_ERROR; 1187 } 1188 1189 new_port_msg = recv_msg->start; 1190 1191 nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d", 1192 recv_msg->stream, (int) new_port_msg->pid, 1193 (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]); 1194 1195 if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) { 1196 return NXT_UNIT_ERROR; 1197 } 1198 1199 nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id); 1200 1201 new_port.in_fd = -1; 1202 new_port.out_fd = recv_msg->fd[0]; 1203 1204 mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, 1205 MAP_SHARED, recv_msg->fd[1], 0); 1206 1207 if (nxt_slow_path(mem == MAP_FAILED)) { 1208 nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1], 1209 strerror(errno), errno); 1210 1211 return NXT_UNIT_ERROR; 1212 } 1213 1214 new_port.data = NULL; 1215 1216 recv_msg->fd[0] = -1; 1217 1218 port = nxt_unit_add_port(ctx, &new_port, mem); 1219 if (nxt_slow_path(port == NULL)) { 1220 return NXT_UNIT_ERROR; 1221 } 1222 1223 nxt_unit_port_release(port); 1224 1225 return NXT_UNIT_OK; 1226 } 1227 1228 1229 static int 1230 nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx) 1231 { 1232 nxt_unit_impl_t *lib; 1233 nxt_unit_ctx_impl_t *ctx_impl; 1234 1235 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1236 1237 if (nxt_slow_path(ctx_impl->ready)) { 1238 return NXT_UNIT_OK; 1239 } 1240 1241 ctx_impl->ready = 1; 1242 1243 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1244 1245 /* Call ready_handler() only for main context. */ 1246 if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) { 1247 return lib->callbacks.ready_handler(ctx); 1248 } 1249 1250 if (&lib->main_ctx != ctx_impl) { 1251 /* Check if the main context is already stopped or quit. */ 1252 if (nxt_slow_path(!lib->main_ctx.ready)) { 1253 ctx_impl->ready = 0; 1254 1255 nxt_unit_quit(ctx, lib->main_ctx.quit_param); 1256 1257 return NXT_UNIT_OK; 1258 } 1259 1260 if (lib->callbacks.add_port != NULL) { 1261 lib->callbacks.add_port(ctx, lib->shared_port); 1262 } 1263 } 1264 1265 return NXT_UNIT_OK; 1266 } 1267 1268 1269 static int 1270 nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg, 1271 nxt_unit_request_info_t **preq) 1272 { 1273 int res; 1274 nxt_unit_impl_t *lib; 1275 nxt_unit_port_id_t port_id; 1276 nxt_unit_request_t *r; 1277 nxt_unit_mmap_buf_t *b; 1278 nxt_unit_request_info_t *req; 1279 nxt_unit_request_info_impl_t *req_impl; 1280 1281 if (nxt_slow_path(recv_msg->mmap == 0)) { 1282 nxt_unit_warn(ctx, "#%"PRIu32": data is not in shared memory", 1283 recv_msg->stream); 1284 1285 return NXT_UNIT_ERROR; 1286 } 1287 1288 if (nxt_slow_path(recv_msg->size < sizeof(nxt_unit_request_t))) { 1289 nxt_unit_warn(ctx, "#%"PRIu32": data too short: %d while at least " 1290 "%d expected", recv_msg->stream, (int) recv_msg->size, 1291 (int) sizeof(nxt_unit_request_t)); 1292 1293 return NXT_UNIT_ERROR; 1294 } 1295 1296 req_impl = nxt_unit_request_info_get(ctx); 1297 if (nxt_slow_path(req_impl == NULL)) { 1298 nxt_unit_warn(ctx, "#%"PRIu32": request info allocation failed", 1299 recv_msg->stream); 1300 1301 return NXT_UNIT_ERROR; 1302 } 1303 1304 req = &req_impl->req; 1305 1306 req->request = recv_msg->start; 1307 1308 b = recv_msg->incoming_buf; 1309 1310 req->request_buf = &b->buf; 1311 req->response = NULL; 1312 req->response_buf = NULL; 1313 1314 r = req->request; 1315 1316 req->content_length = r->content_length; 1317 1318 req->content_buf = req->request_buf; 1319 req->content_buf->free = nxt_unit_sptr_get(&r->preread_content); 1320 1321 req_impl->stream = recv_msg->stream; 1322 1323 req_impl->outgoing_buf = NULL; 1324 1325 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1326 b->req = req; 1327 } 1328 1329 /* "Move" incoming buffer list to req_impl. */ 1330 req_impl->incoming_buf = recv_msg->incoming_buf; 1331 req_impl->incoming_buf->prev = &req_impl->incoming_buf; 1332 recv_msg->incoming_buf = NULL; 1333 1334 req->content_fd = recv_msg->fd[0]; 1335 recv_msg->fd[0] = -1; 1336 1337 req->response_max_fields = 0; 1338 req_impl->state = NXT_UNIT_RS_START; 1339 req_impl->websocket = 0; 1340 req_impl->in_hash = 0; 1341 1342 nxt_unit_debug(ctx, "#%"PRIu32": %.*s %.*s (%d)", recv_msg->stream, 1343 (int) r->method_length, 1344 (char *) nxt_unit_sptr_get(&r->method), 1345 (int) r->target_length, 1346 (char *) nxt_unit_sptr_get(&r->target), 1347 (int) r->content_length); 1348 1349 nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port); 1350 1351 res = nxt_unit_request_check_response_port(req, &port_id); 1352 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1353 return NXT_UNIT_ERROR; 1354 } 1355 1356 if (nxt_fast_path(res == NXT_UNIT_OK)) { 1357 res = nxt_unit_send_req_headers_ack(req); 1358 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1359 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1360 1361 return NXT_UNIT_ERROR; 1362 } 1363 1364 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1365 1366 if (req->content_length 1367 > (uint64_t) (req->content_buf->end - req->content_buf->free)) 1368 { 1369 res = nxt_unit_request_hash_add(ctx, req); 1370 if (nxt_slow_path(res != NXT_UNIT_OK)) { 1371 nxt_unit_req_warn(req, "failed to add request to hash"); 1372 1373 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1374 1375 return NXT_UNIT_ERROR; 1376 } 1377 1378 /* 1379 * If application have separate data handler, we may start 1380 * request processing and process data when it is arrived. 1381 */ 1382 if (lib->callbacks.data_handler == NULL) { 1383 return NXT_UNIT_OK; 1384 } 1385 } 1386 1387 if (preq == NULL) { 1388 lib->callbacks.request_handler(req); 1389 1390 } else { 1391 *preq = req; 1392 } 1393 } 1394 1395 return NXT_UNIT_OK; 1396 } 1397 1398 1399 static int 1400 nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1401 { 1402 uint64_t l; 1403 nxt_unit_impl_t *lib; 1404 nxt_unit_mmap_buf_t *b; 1405 nxt_unit_request_info_t *req; 1406 1407 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); 1408 if (req == NULL) { 1409 return NXT_UNIT_OK; 1410 } 1411 1412 l = req->content_buf->end - req->content_buf->free; 1413 1414 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1415 b->req = req; 1416 l += b->buf.end - b->buf.free; 1417 } 1418 1419 if (recv_msg->incoming_buf != NULL) { 1420 b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf); 1421 1422 while (b->next != NULL) { 1423 b = b->next; 1424 } 1425 1426 /* "Move" incoming buffer list to req_impl. */ 1427 b->next = recv_msg->incoming_buf; 1428 b->next->prev = &b->next; 1429 1430 recv_msg->incoming_buf = NULL; 1431 } 1432 1433 req->content_fd = recv_msg->fd[0]; 1434 recv_msg->fd[0] = -1; 1435 1436 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1437 1438 if (lib->callbacks.data_handler != NULL) { 1439 lib->callbacks.data_handler(req); 1440 1441 return NXT_UNIT_OK; 1442 } 1443 1444 if (req->content_fd != -1 || l == req->content_length) { 1445 lib->callbacks.request_handler(req); 1446 } 1447 1448 return NXT_UNIT_OK; 1449 } 1450 1451 1452 static int 1453 nxt_unit_request_check_response_port(nxt_unit_request_info_t *req, 1454 nxt_unit_port_id_t *port_id) 1455 { 1456 int res; 1457 nxt_unit_ctx_t *ctx; 1458 nxt_unit_impl_t *lib; 1459 nxt_unit_port_t *port; 1460 nxt_unit_process_t *process; 1461 nxt_unit_ctx_impl_t *ctx_impl; 1462 nxt_unit_port_impl_t *port_impl; 1463 nxt_unit_request_info_impl_t *req_impl; 1464 1465 ctx = req->ctx; 1466 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1467 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1468 1469 pthread_mutex_lock(&lib->mutex); 1470 1471 port = nxt_unit_port_hash_find(&lib->ports, port_id, 0); 1472 port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port); 1473 1474 if (nxt_fast_path(port != NULL)) { 1475 req->response_port = port; 1476 1477 if (nxt_fast_path(port_impl->ready)) { 1478 pthread_mutex_unlock(&lib->mutex); 1479 1480 nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}", 1481 (int) port->id.pid, (int) port->id.id); 1482 1483 return NXT_UNIT_OK; 1484 } 1485 1486 nxt_unit_debug(ctx, "check_response_port: " 1487 "port{%d,%d} already requested", 1488 (int) port->id.pid, (int) port->id.id); 1489 1490 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1491 1492 nxt_queue_insert_tail(&port_impl->awaiting_req, 1493 &req_impl->port_wait_link); 1494 1495 pthread_mutex_unlock(&lib->mutex); 1496 1497 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 1498 1499 return NXT_UNIT_AGAIN; 1500 } 1501 1502 port_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_port_impl_t)); 1503 if (nxt_slow_path(port_impl == NULL)) { 1504 nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed", 1505 (int) sizeof(nxt_unit_port_impl_t)); 1506 1507 pthread_mutex_unlock(&lib->mutex); 1508 1509 return NXT_UNIT_ERROR; 1510 } 1511 1512 port = &port_impl->port; 1513 1514 port->id = *port_id; 1515 port->in_fd = -1; 1516 port->out_fd = -1; 1517 port->data = NULL; 1518 1519 res = nxt_unit_port_hash_add(&lib->ports, port); 1520 if (nxt_slow_path(res != NXT_UNIT_OK)) { 1521 nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed", 1522 port->id.pid, port->id.id); 1523 1524 pthread_mutex_unlock(&lib->mutex); 1525 1526 nxt_unit_free(ctx, port); 1527 1528 return NXT_UNIT_ERROR; 1529 } 1530 1531 process = nxt_unit_process_find(lib, port_id->pid, 0); 1532 if (nxt_slow_path(process == NULL)) { 1533 nxt_unit_alert(ctx, "check_response_port: process %d not found", 1534 port->id.pid); 1535 1536 nxt_unit_port_hash_find(&lib->ports, port_id, 1); 1537 1538 pthread_mutex_unlock(&lib->mutex); 1539 1540 nxt_unit_free(ctx, port); 1541 1542 return NXT_UNIT_ERROR; 1543 } 1544 1545 nxt_queue_insert_tail(&process->ports, &port_impl->link); 1546 1547 port_impl->process = process; 1548 port_impl->queue = NULL; 1549 port_impl->from_socket = 0; 1550 port_impl->socket_rbuf = NULL; 1551 1552 nxt_queue_init(&port_impl->awaiting_req); 1553 1554 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1555 1556 nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link); 1557 1558 port_impl->use_count = 2; 1559 port_impl->ready = 0; 1560 1561 req->response_port = port; 1562 1563 pthread_mutex_unlock(&lib->mutex); 1564 1565 res = nxt_unit_get_port(ctx, port_id); 1566 if (nxt_slow_path(res == NXT_UNIT_ERROR)) { 1567 return NXT_UNIT_ERROR; 1568 } 1569 1570 nxt_atomic_fetch_add(&ctx_impl->wait_items, 1); 1571 1572 return NXT_UNIT_AGAIN; 1573 } 1574 1575 1576 static int 1577 nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req) 1578 { 1579 ssize_t res; 1580 nxt_port_msg_t msg; 1581 nxt_unit_impl_t *lib; 1582 nxt_unit_ctx_impl_t *ctx_impl; 1583 nxt_unit_request_info_impl_t *req_impl; 1584 1585 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 1586 ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx); 1587 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1588 1589 memset(&msg, 0, sizeof(nxt_port_msg_t)); 1590 1591 msg.stream = req_impl->stream; 1592 msg.pid = lib->pid; 1593 msg.reply_port = ctx_impl->read_port->id.id; 1594 msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK; 1595 1596 res = nxt_unit_port_send(req->ctx, req->response_port, 1597 &msg, sizeof(msg), NULL); 1598 if (nxt_slow_path(res != sizeof(msg))) { 1599 return NXT_UNIT_ERROR; 1600 } 1601 1602 return NXT_UNIT_OK; 1603 } 1604 1605 1606 static int 1607 nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg) 1608 { 1609 size_t hsize; 1610 nxt_unit_impl_t *lib; 1611 nxt_unit_mmap_buf_t *b; 1612 nxt_unit_callbacks_t *cb; 1613 nxt_unit_request_info_t *req; 1614 nxt_unit_request_info_impl_t *req_impl; 1615 nxt_unit_websocket_frame_impl_t *ws_impl; 1616 1617 req = nxt_unit_request_hash_find(ctx, recv_msg->stream, recv_msg->last); 1618 if (nxt_slow_path(req == NULL)) { 1619 return NXT_UNIT_OK; 1620 } 1621 1622 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1623 1624 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1625 cb = &lib->callbacks; 1626 1627 if (cb->websocket_handler && recv_msg->size >= 2) { 1628 ws_impl = nxt_unit_websocket_frame_get(ctx); 1629 if (nxt_slow_path(ws_impl == NULL)) { 1630 nxt_unit_warn(ctx, "#%"PRIu32": websocket frame allocation failed", 1631 req_impl->stream); 1632 1633 return NXT_UNIT_ERROR; 1634 } 1635 1636 ws_impl->ws.req = req; 1637 1638 ws_impl->buf = NULL; 1639 1640 if (recv_msg->mmap) { 1641 for (b = recv_msg->incoming_buf; b != NULL; b = b->next) { 1642 b->req = req; 1643 } 1644 1645 /* "Move" incoming buffer list to ws_impl. */ 1646 ws_impl->buf = recv_msg->incoming_buf; 1647 ws_impl->buf->prev = &ws_impl->buf; 1648 recv_msg->incoming_buf = NULL; 1649 1650 b = ws_impl->buf; 1651 1652 } else { 1653 b = nxt_unit_mmap_buf_get(ctx); 1654 if (nxt_slow_path(b == NULL)) { 1655 nxt_unit_alert(ctx, "#%"PRIu32": failed to allocate buf", 1656 req_impl->stream); 1657 1658 nxt_unit_websocket_frame_release(&ws_impl->ws); 1659 1660 return NXT_UNIT_ERROR; 1661 } 1662 1663 b->req = req; 1664 b->buf.start = recv_msg->start; 1665 b->buf.free = b->buf.start; 1666 b->buf.end = b->buf.start + recv_msg->size; 1667 1668 nxt_unit_mmap_buf_insert(&ws_impl->buf, b); 1669 } 1670 1671 ws_impl->ws.header = (void *) b->buf.start; 1672 ws_impl->ws.payload_len = nxt_websocket_frame_payload_len( 1673 ws_impl->ws.header); 1674 1675 hsize = nxt_websocket_frame_header_size(ws_impl->ws.header); 1676 1677 if (ws_impl->ws.header->mask) { 1678 ws_impl->ws.mask = (uint8_t *) b->buf.start + hsize - 4; 1679 1680 } else { 1681 ws_impl->ws.mask = NULL; 1682 } 1683 1684 b->buf.free += hsize; 1685 1686 ws_impl->ws.content_buf = &b->buf; 1687 ws_impl->ws.content_length = ws_impl->ws.payload_len; 1688 1689 nxt_unit_req_debug(req, "websocket_handler: opcode=%d, " 1690 "payload_len=%"PRIu64, 1691 ws_impl->ws.header->opcode, 1692 ws_impl->ws.payload_len); 1693 1694 cb->websocket_handler(&ws_impl->ws); 1695 } 1696 1697 if (recv_msg->last) { 1698 if (cb->close_handler) { 1699 nxt_unit_req_debug(req, "close_handler"); 1700 1701 cb->close_handler(req); 1702 1703 } else { 1704 nxt_unit_request_done(req, NXT_UNIT_ERROR); 1705 } 1706 } 1707 1708 return NXT_UNIT_OK; 1709 } 1710 1711 1712 static int 1713 nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx) 1714 { 1715 nxt_unit_impl_t *lib; 1716 nxt_unit_callbacks_t *cb; 1717 1718 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1719 cb = &lib->callbacks; 1720 1721 if (cb->shm_ack_handler != NULL) { 1722 cb->shm_ack_handler(ctx); 1723 } 1724 1725 return NXT_UNIT_OK; 1726 } 1727 1728 1729 static nxt_unit_request_info_impl_t * 1730 nxt_unit_request_info_get(nxt_unit_ctx_t *ctx) 1731 { 1732 nxt_unit_impl_t *lib; 1733 nxt_queue_link_t *lnk; 1734 nxt_unit_ctx_impl_t *ctx_impl; 1735 nxt_unit_request_info_impl_t *req_impl; 1736 1737 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1738 1739 lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); 1740 1741 pthread_mutex_lock(&ctx_impl->mutex); 1742 1743 if (nxt_queue_is_empty(&ctx_impl->free_req)) { 1744 pthread_mutex_unlock(&ctx_impl->mutex); 1745 1746 req_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_request_info_impl_t) 1747 + lib->request_data_size); 1748 if (nxt_slow_path(req_impl == NULL)) { 1749 return NULL; 1750 } 1751 1752 req_impl->req.unit = ctx->unit; 1753 req_impl->req.ctx = ctx; 1754 1755 pthread_mutex_lock(&ctx_impl->mutex); 1756 1757 } else { 1758 lnk = nxt_queue_first(&ctx_impl->free_req); 1759 nxt_queue_remove(lnk); 1760 1761 req_impl = nxt_container_of(lnk, nxt_unit_request_info_impl_t, link); 1762 } 1763 1764 nxt_queue_insert_tail(&ctx_impl->active_req, &req_impl->link); 1765 1766 pthread_mutex_unlock(&ctx_impl->mutex); 1767 1768 req_impl->req.data = lib->request_data_size ? req_impl->extra_data : NULL; 1769 1770 return req_impl; 1771 } 1772 1773 1774 static void 1775 nxt_unit_request_info_release(nxt_unit_request_info_t *req) 1776 { 1777 nxt_unit_ctx_t *ctx; 1778 nxt_unit_ctx_impl_t *ctx_impl; 1779 nxt_unit_request_info_impl_t *req_impl; 1780 1781 ctx = req->ctx; 1782 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1783 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 1784 1785 req->response = NULL; 1786 req->response_buf = NULL; 1787 1788 if (req_impl->in_hash) { 1789 nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1); 1790 } 1791 1792 while (req_impl->outgoing_buf != NULL) { 1793 nxt_unit_mmap_buf_free(req_impl->outgoing_buf); 1794 } 1795 1796 while (req_impl->incoming_buf != NULL) { 1797 nxt_unit_mmap_buf_free(req_impl->incoming_buf); 1798 } 1799 1800 if (req->content_fd != -1) { 1801 nxt_unit_close(req->content_fd); 1802 1803 req->content_fd = -1; 1804 } 1805 1806 if (req->response_port != NULL) { 1807 nxt_unit_port_release(req->response_port); 1808 1809 req->response_port = NULL; 1810 } 1811 1812 req_impl->state = NXT_UNIT_RS_RELEASED; 1813 1814 pthread_mutex_lock(&ctx_impl->mutex); 1815 1816 nxt_queue_remove(&req_impl->link); 1817 1818 nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link); 1819 1820 pthread_mutex_unlock(&ctx_impl->mutex); 1821 1822 if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) { 1823 nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL); 1824 } 1825 } 1826 1827 1828 static void 1829 nxt_unit_request_info_free(nxt_unit_request_info_impl_t *req_impl) 1830 { 1831 nxt_unit_ctx_impl_t *ctx_impl; 1832 1833 ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t, ctx); 1834 1835 nxt_queue_remove(&req_impl->link); 1836 1837 if (req_impl != &ctx_impl->req) { 1838 nxt_unit_free(&ctx_impl->ctx, req_impl); 1839 } 1840 } 1841 1842 1843 static nxt_unit_websocket_frame_impl_t * 1844 nxt_unit_websocket_frame_get(nxt_unit_ctx_t *ctx) 1845 { 1846 nxt_queue_link_t *lnk; 1847 nxt_unit_ctx_impl_t *ctx_impl; 1848 nxt_unit_websocket_frame_impl_t *ws_impl; 1849 1850 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 1851 1852 pthread_mutex_lock(&ctx_impl->mutex); 1853 1854 if (nxt_queue_is_empty(&ctx_impl->free_ws)) { 1855 pthread_mutex_unlock(&ctx_impl->mutex); 1856 1857 ws_impl = nxt_unit_malloc(ctx, sizeof(nxt_unit_websocket_frame_impl_t)); 1858 if (nxt_slow_path(ws_impl == NULL)) { 1859 return NULL; 1860 } 1861 1862 } else { 1863 lnk = nxt_queue_first(&ctx_impl->free_ws); 1864 nxt_queue_remove(lnk); 1865 1866 pthread_mutex_unlock(&ctx_impl->mutex); 1867 1868 ws_impl = nxt_container_of(lnk, nxt_unit_websocket_frame_impl_t, link); 1869 } 1870 1871 ws_impl->ctx_impl = ctx_impl; 1872 1873 return ws_impl; 1874 } 1875 1876 1877 static void 1878 nxt_unit_websocket_frame_release(nxt_unit_websocket_frame_t *ws) 1879 { 1880 nxt_unit_websocket_frame_impl_t *ws_impl; 1881 1882 ws_impl = nxt_container_of(ws, nxt_unit_websocket_frame_impl_t, ws); 1883 1884 while (ws_impl->buf != NULL) { 1885 nxt_unit_mmap_buf_free(ws_impl->buf); 1886 } 1887 1888 ws->req = NULL; 1889 1890 pthread_mutex_lock(&ws_impl->ctx_impl->mutex); 1891 1892 nxt_queue_insert_tail(&ws_impl->ctx_impl->free_ws, &ws_impl->link); 1893 1894 pthread_mutex_unlock(&ws_impl->ctx_impl->mutex); 1895 } 1896 1897 1898 static void 1899 nxt_unit_websocket_frame_free(nxt_unit_ctx_t *ctx, 1900 nxt_unit_websocket_frame_impl_t *ws_impl) 1901 { 1902 nxt_queue_remove(&ws_impl->link); 1903 1904 nxt_unit_free(ctx, ws_impl); 1905 } 1906 1907 1908 uint16_t 1909 nxt_unit_field_hash(const char *name, size_t name_length) 1910 { 1911 u_char ch; 1912 uint32_t hash; 1913 const char *p, *end; 1914 1915 hash = 159406; /* Magic value copied from nxt_http_parse.c */ 1916 end = name + name_length; 1917 1918 for (p = name; p < end; p++) { 1919 ch = *p; 1920 hash = (hash << 4) + hash + nxt_lowcase(ch); 1921 } 1922 1923 hash = (hash >> 16) ^ hash; 1924 1925 return hash; 1926 } 1927 1928 1929 void 1930 nxt_unit_request_group_dup_fields(nxt_unit_request_info_t *req) 1931 { 1932 char *name; 1933 uint32_t i, j; 1934 nxt_unit_field_t *fields, f; 1935 nxt_unit_request_t *r; 1936 1937 static nxt_str_t content_length = nxt_string("content-length"); 1938 static nxt_str_t content_type = nxt_string("content-type"); 1939 static nxt_str_t cookie = nxt_string("cookie"); 1940 1941 nxt_unit_req_debug(req, "group_dup_fields"); 1942 1943 r = req->request; 1944 fields = r->fields; 1945 1946 for (i = 0; i < r->fields_count; i++) { 1947 name = nxt_unit_sptr_get(&fields[i].name); 1948 1949 switch (fields[i].hash) { 1950 case NXT_UNIT_HASH_CONTENT_LENGTH: 1951 if (fields[i].name_length == content_length.length 1952 && nxt_unit_memcasecmp(name, content_length.start, 1953 content_length.length) == 0) 1954 { 1955 r->content_length_field = i; 1956 } 1957 1958 break; 1959 1960 case NXT_UNIT_HASH_CONTENT_TYPE: 1961 if (fields[i].name_length == content_type.length 1962 && nxt_unit_memcasecmp(name, content_type.start, 1963 content_type.length) == 0) 1964 { 1965 r->content_type_field = i; 1966 } 1967 1968 break; 1969 1970 case NXT_UNIT_HASH_COOKIE: 1971 if (fields[i].name_length == cookie.length 1972 && nxt_unit_memcasecmp(name, cookie.start, 1973 cookie.length) == 0) 1974 { 1975 r->cookie_field = i; 1976 } 1977 1978 break; 1979 } 1980 1981 for (j = i + 1; j < r->fields_count; j++) { 1982 if (fields[i].hash != fields[j].hash 1983 || fields[i].name_length != fields[j].name_length 1984 || nxt_unit_memcasecmp(name, 1985 nxt_unit_sptr_get(&fields[j].name), 1986 fields[j].name_length) != 0) 1987 { 1988 continue; 1989 } 1990 1991 f = fields[j]; 1992 f.value.offset += (j - (i + 1)) * sizeof(f); 1993 1994 while (j > i + 1) { 1995 fields[j] = fields[j - 1]; 1996 fields[j].name.offset -= sizeof(f); 1997 fields[j].value.offset -= sizeof(f); 1998 j--; 1999 } 2000 2001 fields[j] = f; 2002 2003 /* Assign the same name pointer for further grouping simplicity. */ 2004 nxt_unit_sptr_set(&fields[j].name, name); 2005 2006 i++; 2007 } 2008 } 2009 } 2010 2011 2012 int 2013 nxt_unit_response_init(nxt_unit_request_info_t *req, 2014 uint16_t status, uint32_t max_fields_count, uint32_t max_fields_size) 2015 { 2016 uint32_t buf_size; 2017 nxt_unit_buf_t *buf; 2018 nxt_unit_request_info_impl_t *req_impl; 2019 2020 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2021 2022 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2023 nxt_unit_req_warn(req, "init: response already sent"); 2024 2025 return NXT_UNIT_ERROR; 2026 } 2027 2028 nxt_unit_req_debug(req, "init: %d, max fields %d/%d", (int) status, 2029 (int) max_fields_count, (int) max_fields_size); 2030 2031 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT)) { 2032 nxt_unit_req_debug(req, "duplicate response init"); 2033 } 2034 2035 /* 2036 * Each field name and value 0-terminated by libunit, 2037 * this is the reason of '+ 2' below. 2038 */ 2039 buf_size = sizeof(nxt_unit_response_t) 2040 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 2041 + max_fields_size; 2042 2043 if (nxt_slow_path(req->response_buf != NULL)) { 2044 buf = req->response_buf; 2045 2046 if (nxt_fast_path(buf_size <= (uint32_t) (buf->end - buf->start))) { 2047 goto init_response; 2048 } 2049 2050 nxt_unit_buf_free(buf); 2051 2052 req->response_buf = NULL; 2053 req->response = NULL; 2054 req->response_max_fields = 0; 2055 2056 req_impl->state = NXT_UNIT_RS_START; 2057 } 2058 2059 buf = nxt_unit_response_buf_alloc(req, buf_size); 2060 if (nxt_slow_path(buf == NULL)) { 2061 return NXT_UNIT_ERROR; 2062 } 2063 2064 init_response: 2065 2066 memset(buf->start, 0, sizeof(nxt_unit_response_t)); 2067 2068 req->response_buf = buf; 2069 2070 req->response = (nxt_unit_response_t *) buf->start; 2071 req->response->status = status; 2072 2073 buf->free = buf->start + sizeof(nxt_unit_response_t) 2074 + max_fields_count * sizeof(nxt_unit_field_t); 2075 2076 req->response_max_fields = max_fields_count; 2077 req_impl->state = NXT_UNIT_RS_RESPONSE_INIT; 2078 2079 return NXT_UNIT_OK; 2080 } 2081 2082 2083 int 2084 nxt_unit_response_realloc(nxt_unit_request_info_t *req, 2085 uint32_t max_fields_count, uint32_t max_fields_size) 2086 { 2087 char *p; 2088 uint32_t i, buf_size; 2089 nxt_unit_buf_t *buf; 2090 nxt_unit_field_t *f, *src; 2091 nxt_unit_response_t *resp; 2092 nxt_unit_request_info_impl_t *req_impl; 2093 2094 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2095 2096 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2097 nxt_unit_req_warn(req, "realloc: response not init"); 2098 2099 return NXT_UNIT_ERROR; 2100 } 2101 2102 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2103 nxt_unit_req_warn(req, "realloc: response already sent"); 2104 2105 return NXT_UNIT_ERROR; 2106 } 2107 2108 if (nxt_slow_path(max_fields_count < req->response->fields_count)) { 2109 nxt_unit_req_warn(req, "realloc: new max_fields_count is too small"); 2110 2111 return NXT_UNIT_ERROR; 2112 } 2113 2114 /* 2115 * Each field name and value 0-terminated by libunit, 2116 * this is the reason of '+ 2' below. 2117 */ 2118 buf_size = sizeof(nxt_unit_response_t) 2119 + max_fields_count * (sizeof(nxt_unit_field_t) + 2) 2120 + max_fields_size; 2121 2122 nxt_unit_req_debug(req, "realloc %"PRIu32"", buf_size); 2123 2124 buf = nxt_unit_response_buf_alloc(req, buf_size); 2125 if (nxt_slow_path(buf == NULL)) { 2126 nxt_unit_req_warn(req, "realloc: new buf allocation failed"); 2127 return NXT_UNIT_ERROR; 2128 } 2129 2130 resp = (nxt_unit_response_t *) buf->start; 2131 2132 memset(resp, 0, sizeof(nxt_unit_response_t)); 2133 2134 resp->status = req->response->status; 2135 resp->content_length = req->response->content_length; 2136 2137 p = buf->start + max_fields_count * sizeof(nxt_unit_field_t); 2138 f = resp->fields; 2139 2140 for (i = 0; i < req->response->fields_count; i++) { 2141 src = req->response->fields + i; 2142 2143 if (nxt_slow_path(src->skip != 0)) { 2144 continue; 2145 } 2146 2147 if (nxt_slow_path(src->name_length + src->value_length + 2 2148 > (uint32_t) (buf->end - p))) 2149 { 2150 nxt_unit_req_warn(req, "realloc: not enough space for field" 2151 " #%"PRIu32" (%p), (%"PRIu32" + %"PRIu32") required", 2152 i, src, src->name_length, src->value_length); 2153 2154 goto fail; 2155 } 2156 2157 nxt_unit_sptr_set(&f->name, p); 2158 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->name), src->name_length); 2159 *p++ = '\0'; 2160 2161 nxt_unit_sptr_set(&f->value, p); 2162 p = nxt_cpymem(p, nxt_unit_sptr_get(&src->value), src->value_length); 2163 *p++ = '\0'; 2164 2165 f->hash = src->hash; 2166 f->skip = 0; 2167 f->name_length = src->name_length; 2168 f->value_length = src->value_length; 2169 2170 resp->fields_count++; 2171 f++; 2172 } 2173 2174 if (req->response->piggyback_content_length > 0) { 2175 if (nxt_slow_path(req->response->piggyback_content_length 2176 > (uint32_t) (buf->end - p))) 2177 { 2178 nxt_unit_req_warn(req, "realloc: not enought space for content" 2179 " #%"PRIu32", %"PRIu32" required", 2180 i, req->response->piggyback_content_length); 2181 2182 goto fail; 2183 } 2184 2185 resp->piggyback_content_length = 2186 req->response->piggyback_content_length; 2187 2188 nxt_unit_sptr_set(&resp->piggyback_content, p); 2189 p = nxt_cpymem(p, nxt_unit_sptr_get(&req->response->piggyback_content), 2190 req->response->piggyback_content_length); 2191 } 2192 2193 buf->free = p; 2194 2195 nxt_unit_buf_free(req->response_buf); 2196 2197 req->response = resp; 2198 req->response_buf = buf; 2199 req->response_max_fields = max_fields_count; 2200 2201 return NXT_UNIT_OK; 2202 2203 fail: 2204 2205 nxt_unit_buf_free(buf); 2206 2207 return NXT_UNIT_ERROR; 2208 } 2209 2210 2211 int 2212 nxt_unit_response_is_init(nxt_unit_request_info_t *req) 2213 { 2214 nxt_unit_request_info_impl_t *req_impl; 2215 2216 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2217 2218 return req_impl->state >= NXT_UNIT_RS_RESPONSE_INIT; 2219 } 2220 2221 2222 int 2223 nxt_unit_response_add_field(nxt_unit_request_info_t *req, 2224 const char *name, uint8_t name_length, 2225 const char *value, uint32_t value_length) 2226 { 2227 nxt_unit_buf_t *buf; 2228 nxt_unit_field_t *f; 2229 nxt_unit_response_t *resp; 2230 nxt_unit_request_info_impl_t *req_impl; 2231 2232 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2233 2234 if (nxt_slow_path(req_impl->state != NXT_UNIT_RS_RESPONSE_INIT)) { 2235 nxt_unit_req_warn(req, "add_field: response not initialized or " 2236 "already sent"); 2237 2238 return NXT_UNIT_ERROR; 2239 } 2240 2241 resp = req->response; 2242 2243 if (nxt_slow_path(resp->fields_count >= req->response_max_fields)) { 2244 nxt_unit_req_warn(req, "add_field: too many response fields (%d)", 2245 (int) resp->fields_count); 2246 2247 return NXT_UNIT_ERROR; 2248 } 2249 2250 buf = req->response_buf; 2251 2252 if (nxt_slow_path(name_length + value_length + 2 2253 > (uint32_t) (buf->end - buf->free))) 2254 { 2255 nxt_unit_req_warn(req, "add_field: response buffer overflow"); 2256 2257 return NXT_UNIT_ERROR; 2258 } 2259 2260 nxt_unit_req_debug(req, "add_field #%"PRIu32": %.*s: %.*s", 2261 resp->fields_count, 2262 (int) name_length, name, 2263 (int) value_length, value); 2264 2265 f = resp->fields + resp->fields_count; 2266 2267 nxt_unit_sptr_set(&f->name, buf->free); 2268 buf->free = nxt_cpymem(buf->free, name, name_length); 2269 *buf->free++ = '\0'; 2270 2271 nxt_unit_sptr_set(&f->value, buf->free); 2272 buf->free = nxt_cpymem(buf->free, value, value_length); 2273 *buf->free++ = '\0'; 2274 2275 f->hash = nxt_unit_field_hash(name, name_length); 2276 f->skip = 0; 2277 f->name_length = name_length; 2278 f->value_length = value_length; 2279 2280 resp->fields_count++; 2281 2282 return NXT_UNIT_OK; 2283 } 2284 2285 2286 int 2287 nxt_unit_response_add_content(nxt_unit_request_info_t *req, 2288 const void* src, uint32_t size) 2289 { 2290 nxt_unit_buf_t *buf; 2291 nxt_unit_response_t *resp; 2292 nxt_unit_request_info_impl_t *req_impl; 2293 2294 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2295 2296 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2297 nxt_unit_req_warn(req, "add_content: response not initialized yet"); 2298 2299 return NXT_UNIT_ERROR; 2300 } 2301 2302 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2303 nxt_unit_req_warn(req, "add_content: response already sent"); 2304 2305 return NXT_UNIT_ERROR; 2306 } 2307 2308 buf = req->response_buf; 2309 2310 if (nxt_slow_path(size > (uint32_t) (buf->end - buf->free))) { 2311 nxt_unit_req_warn(req, "add_content: buffer overflow"); 2312 2313 return NXT_UNIT_ERROR; 2314 } 2315 2316 resp = req->response; 2317 2318 if (resp->piggyback_content_length == 0) { 2319 nxt_unit_sptr_set(&resp->piggyback_content, buf->free); 2320 req_impl->state = NXT_UNIT_RS_RESPONSE_HAS_CONTENT; 2321 } 2322 2323 resp->piggyback_content_length += size; 2324 2325 buf->free = nxt_cpymem(buf->free, src, size); 2326 2327 return NXT_UNIT_OK; 2328 } 2329 2330 2331 int 2332 nxt_unit_response_send(nxt_unit_request_info_t *req) 2333 { 2334 int rc; 2335 nxt_unit_mmap_buf_t *mmap_buf; 2336 nxt_unit_request_info_impl_t *req_impl; 2337 2338 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2339 2340 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2341 nxt_unit_req_warn(req, "send: response is not initialized yet"); 2342 2343 return NXT_UNIT_ERROR; 2344 } 2345 2346 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2347 nxt_unit_req_warn(req, "send: response already sent"); 2348 2349 return NXT_UNIT_ERROR; 2350 } 2351 2352 if (req->request->websocket_handshake && req->response->status == 101) { 2353 nxt_unit_response_upgrade(req); 2354 } 2355 2356 nxt_unit_req_debug(req, "send: %"PRIu32" fields, %d bytes", 2357 req->response->fields_count, 2358 (int) (req->response_buf->free 2359 - req->response_buf->start)); 2360 2361 mmap_buf = nxt_container_of(req->response_buf, nxt_unit_mmap_buf_t, buf); 2362 2363 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2364 if (nxt_fast_path(rc == NXT_UNIT_OK)) { 2365 req->response = NULL; 2366 req->response_buf = NULL; 2367 req_impl->state = NXT_UNIT_RS_RESPONSE_SENT; 2368 2369 nxt_unit_mmap_buf_free(mmap_buf); 2370 } 2371 2372 return rc; 2373 } 2374 2375 2376 int 2377 nxt_unit_response_is_sent(nxt_unit_request_info_t *req) 2378 { 2379 nxt_unit_request_info_impl_t *req_impl; 2380 2381 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2382 2383 return req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT; 2384 } 2385 2386 2387 nxt_unit_buf_t * 2388 nxt_unit_response_buf_alloc(nxt_unit_request_info_t *req, uint32_t size) 2389 { 2390 int rc; 2391 nxt_unit_mmap_buf_t *mmap_buf; 2392 nxt_unit_request_info_impl_t *req_impl; 2393 2394 if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) { 2395 nxt_unit_req_warn(req, "response_buf_alloc: " 2396 "requested buffer (%"PRIu32") too big", size); 2397 2398 return NULL; 2399 } 2400 2401 nxt_unit_req_debug(req, "response_buf_alloc: %"PRIu32, size); 2402 2403 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2404 2405 mmap_buf = nxt_unit_mmap_buf_get(req->ctx); 2406 if (nxt_slow_path(mmap_buf == NULL)) { 2407 nxt_unit_req_alert(req, "response_buf_alloc: failed to allocate buf"); 2408 2409 return NULL; 2410 } 2411 2412 mmap_buf->req = req; 2413 2414 nxt_unit_mmap_buf_insert_tail(&req_impl->outgoing_buf, mmap_buf); 2415 2416 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 2417 size, size, mmap_buf, 2418 NULL); 2419 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2420 nxt_unit_mmap_buf_release(mmap_buf); 2421 2422 nxt_unit_req_alert(req, "response_buf_alloc: failed to get out buf"); 2423 2424 return NULL; 2425 } 2426 2427 return &mmap_buf->buf; 2428 } 2429 2430 2431 static nxt_unit_mmap_buf_t * 2432 nxt_unit_mmap_buf_get(nxt_unit_ctx_t *ctx) 2433 { 2434 nxt_unit_mmap_buf_t *mmap_buf; 2435 nxt_unit_ctx_impl_t *ctx_impl; 2436 2437 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2438 2439 pthread_mutex_lock(&ctx_impl->mutex); 2440 2441 if (ctx_impl->free_buf == NULL) { 2442 pthread_mutex_unlock(&ctx_impl->mutex); 2443 2444 mmap_buf = nxt_unit_malloc(ctx, sizeof(nxt_unit_mmap_buf_t)); 2445 if (nxt_slow_path(mmap_buf == NULL)) { 2446 return NULL; 2447 } 2448 2449 } else { 2450 mmap_buf = ctx_impl->free_buf; 2451 2452 nxt_unit_mmap_buf_unlink(mmap_buf); 2453 2454 pthread_mutex_unlock(&ctx_impl->mutex); 2455 } 2456 2457 mmap_buf->ctx_impl = ctx_impl; 2458 2459 mmap_buf->hdr = NULL; 2460 mmap_buf->free_ptr = NULL; 2461 2462 return mmap_buf; 2463 } 2464 2465 2466 static void 2467 nxt_unit_mmap_buf_release(nxt_unit_mmap_buf_t *mmap_buf) 2468 { 2469 nxt_unit_mmap_buf_unlink(mmap_buf); 2470 2471 pthread_mutex_lock(&mmap_buf->ctx_impl->mutex); 2472 2473 nxt_unit_mmap_buf_insert(&mmap_buf->ctx_impl->free_buf, mmap_buf); 2474 2475 pthread_mutex_unlock(&mmap_buf->ctx_impl->mutex); 2476 } 2477 2478 2479 int 2480 nxt_unit_request_is_websocket_handshake(nxt_unit_request_info_t *req) 2481 { 2482 return req->request->websocket_handshake; 2483 } 2484 2485 2486 int 2487 nxt_unit_response_upgrade(nxt_unit_request_info_t *req) 2488 { 2489 int rc; 2490 nxt_unit_request_info_impl_t *req_impl; 2491 2492 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2493 2494 if (nxt_slow_path(req_impl->websocket != 0)) { 2495 nxt_unit_req_debug(req, "upgrade: already upgraded"); 2496 2497 return NXT_UNIT_OK; 2498 } 2499 2500 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2501 nxt_unit_req_warn(req, "upgrade: response is not initialized yet"); 2502 2503 return NXT_UNIT_ERROR; 2504 } 2505 2506 if (nxt_slow_path(req_impl->state >= NXT_UNIT_RS_RESPONSE_SENT)) { 2507 nxt_unit_req_warn(req, "upgrade: response already sent"); 2508 2509 return NXT_UNIT_ERROR; 2510 } 2511 2512 rc = nxt_unit_request_hash_add(req->ctx, req); 2513 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2514 nxt_unit_req_warn(req, "upgrade: failed to add request to hash"); 2515 2516 return NXT_UNIT_ERROR; 2517 } 2518 2519 req_impl->websocket = 1; 2520 2521 req->response->status = 101; 2522 2523 return NXT_UNIT_OK; 2524 } 2525 2526 2527 int 2528 nxt_unit_response_is_websocket(nxt_unit_request_info_t *req) 2529 { 2530 nxt_unit_request_info_impl_t *req_impl; 2531 2532 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2533 2534 return req_impl->websocket; 2535 } 2536 2537 2538 nxt_unit_request_info_t * 2539 nxt_unit_get_request_info_from_data(void *data) 2540 { 2541 nxt_unit_request_info_impl_t *req_impl; 2542 2543 req_impl = nxt_container_of(data, nxt_unit_request_info_impl_t, extra_data); 2544 2545 return &req_impl->req; 2546 } 2547 2548 2549 int 2550 nxt_unit_buf_send(nxt_unit_buf_t *buf) 2551 { 2552 int rc; 2553 nxt_unit_mmap_buf_t *mmap_buf; 2554 nxt_unit_request_info_t *req; 2555 nxt_unit_request_info_impl_t *req_impl; 2556 2557 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2558 2559 req = mmap_buf->req; 2560 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2561 2562 nxt_unit_req_debug(req, "buf_send: %d bytes", 2563 (int) (buf->free - buf->start)); 2564 2565 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2566 nxt_unit_req_warn(req, "buf_send: response not initialized yet"); 2567 2568 return NXT_UNIT_ERROR; 2569 } 2570 2571 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_SENT)) { 2572 nxt_unit_req_warn(req, "buf_send: headers not sent yet"); 2573 2574 return NXT_UNIT_ERROR; 2575 } 2576 2577 if (nxt_fast_path(buf->free > buf->start)) { 2578 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 0); 2579 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2580 return rc; 2581 } 2582 } 2583 2584 nxt_unit_mmap_buf_free(mmap_buf); 2585 2586 return NXT_UNIT_OK; 2587 } 2588 2589 2590 static void 2591 nxt_unit_buf_send_done(nxt_unit_buf_t *buf) 2592 { 2593 int rc; 2594 nxt_unit_mmap_buf_t *mmap_buf; 2595 nxt_unit_request_info_t *req; 2596 2597 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2598 2599 req = mmap_buf->req; 2600 2601 rc = nxt_unit_mmap_buf_send(req, mmap_buf, 1); 2602 if (nxt_slow_path(rc == NXT_UNIT_OK)) { 2603 nxt_unit_mmap_buf_free(mmap_buf); 2604 2605 nxt_unit_request_info_release(req); 2606 2607 } else { 2608 nxt_unit_request_done(req, rc); 2609 } 2610 } 2611 2612 2613 static int 2614 nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req, 2615 nxt_unit_mmap_buf_t *mmap_buf, int last) 2616 { 2617 struct { 2618 nxt_port_msg_t msg; 2619 nxt_port_mmap_msg_t mmap_msg; 2620 } m; 2621 2622 int rc; 2623 u_char *last_used, *first_free; 2624 ssize_t res; 2625 nxt_chunk_id_t first_free_chunk; 2626 nxt_unit_buf_t *buf; 2627 nxt_unit_impl_t *lib; 2628 nxt_port_mmap_header_t *hdr; 2629 nxt_unit_request_info_impl_t *req_impl; 2630 2631 lib = nxt_container_of(req->ctx->unit, nxt_unit_impl_t, unit); 2632 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2633 2634 buf = &mmap_buf->buf; 2635 hdr = mmap_buf->hdr; 2636 2637 m.mmap_msg.size = buf->free - buf->start; 2638 2639 m.msg.stream = req_impl->stream; 2640 m.msg.pid = lib->pid; 2641 m.msg.reply_port = 0; 2642 m.msg.type = _NXT_PORT_MSG_DATA; 2643 m.msg.last = last != 0; 2644 m.msg.mmap = hdr != NULL && m.mmap_msg.size > 0; 2645 m.msg.nf = 0; 2646 m.msg.mf = 0; 2647 2648 rc = NXT_UNIT_ERROR; 2649 2650 if (m.msg.mmap) { 2651 m.mmap_msg.mmap_id = hdr->id; 2652 m.mmap_msg.chunk_id = nxt_port_mmap_chunk_id(hdr, 2653 (u_char *) buf->start); 2654 2655 nxt_unit_debug(req->ctx, "#%"PRIu32": send mmap: (%d,%d,%d)", 2656 req_impl->stream, 2657 (int) m.mmap_msg.mmap_id, 2658 (int) m.mmap_msg.chunk_id, 2659 (int) m.mmap_msg.size); 2660 2661 res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m), 2662 NULL); 2663 if (nxt_slow_path(res != sizeof(m))) { 2664 goto free_buf; 2665 } 2666 2667 last_used = (u_char *) buf->free - 1; 2668 first_free_chunk = nxt_port_mmap_chunk_id(hdr, last_used) + 1; 2669 2670 if (buf->end - buf->free >= PORT_MMAP_CHUNK_SIZE) { 2671 first_free = nxt_port_mmap_chunk_start(hdr, first_free_chunk); 2672 2673 buf->start = (char *) first_free; 2674 buf->free = buf->start; 2675 2676 if (buf->end < buf->start) { 2677 buf->end = buf->start; 2678 } 2679 2680 } else { 2681 buf->start = NULL; 2682 buf->free = NULL; 2683 buf->end = NULL; 2684 2685 mmap_buf->hdr = NULL; 2686 } 2687 2688 nxt_atomic_fetch_add(&lib->outgoing.allocated_chunks, 2689 (int) m.mmap_msg.chunk_id - (int) first_free_chunk); 2690 2691 nxt_unit_debug(req->ctx, "allocated_chunks %d", 2692 (int) lib->outgoing.allocated_chunks); 2693 2694 } else { 2695 if (nxt_slow_path(mmap_buf->plain_ptr == NULL 2696 || mmap_buf->plain_ptr > buf->start - sizeof(m.msg))) 2697 { 2698 nxt_unit_alert(req->ctx, 2699 "#%"PRIu32": failed to send plain memory buffer" 2700 ": no space reserved for message header", 2701 req_impl->stream); 2702 2703 goto free_buf; 2704 } 2705 2706 memcpy(buf->start - sizeof(m.msg), &m.msg, sizeof(m.msg)); 2707 2708 nxt_unit_debug(req->ctx, "#%"PRIu32": send plain: %d", 2709 req_impl->stream, 2710 (int) (sizeof(m.msg) + m.mmap_msg.size)); 2711 2712 res = nxt_unit_port_send(req->ctx, req->response_port, 2713 buf->start - sizeof(m.msg), 2714 m.mmap_msg.size + sizeof(m.msg), NULL); 2715 2716 if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) { 2717 goto free_buf; 2718 } 2719 } 2720 2721 rc = NXT_UNIT_OK; 2722 2723 free_buf: 2724 2725 nxt_unit_free_outgoing_buf(mmap_buf); 2726 2727 return rc; 2728 } 2729 2730 2731 void 2732 nxt_unit_buf_free(nxt_unit_buf_t *buf) 2733 { 2734 nxt_unit_mmap_buf_free(nxt_container_of(buf, nxt_unit_mmap_buf_t, buf)); 2735 } 2736 2737 2738 static void 2739 nxt_unit_mmap_buf_free(nxt_unit_mmap_buf_t *mmap_buf) 2740 { 2741 nxt_unit_free_outgoing_buf(mmap_buf); 2742 2743 nxt_unit_mmap_buf_release(mmap_buf); 2744 } 2745 2746 2747 static void 2748 nxt_unit_free_outgoing_buf(nxt_unit_mmap_buf_t *mmap_buf) 2749 { 2750 if (mmap_buf->hdr != NULL) { 2751 nxt_unit_mmap_release(&mmap_buf->ctx_impl->ctx, 2752 mmap_buf->hdr, mmap_buf->buf.start, 2753 mmap_buf->buf.end - mmap_buf->buf.start); 2754 2755 mmap_buf->hdr = NULL; 2756 2757 return; 2758 } 2759 2760 if (mmap_buf->free_ptr != NULL) { 2761 nxt_unit_free(&mmap_buf->ctx_impl->ctx, mmap_buf->free_ptr); 2762 2763 mmap_buf->free_ptr = NULL; 2764 } 2765 } 2766 2767 2768 static nxt_unit_read_buf_t * 2769 nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx) 2770 { 2771 nxt_unit_ctx_impl_t *ctx_impl; 2772 nxt_unit_read_buf_t *rbuf; 2773 2774 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2775 2776 pthread_mutex_lock(&ctx_impl->mutex); 2777 2778 rbuf = nxt_unit_read_buf_get_impl(ctx_impl); 2779 2780 pthread_mutex_unlock(&ctx_impl->mutex); 2781 2782 rbuf->oob.size = 0; 2783 2784 return rbuf; 2785 } 2786 2787 2788 static nxt_unit_read_buf_t * 2789 nxt_unit_read_buf_get_impl(nxt_unit_ctx_impl_t *ctx_impl) 2790 { 2791 nxt_queue_link_t *link; 2792 nxt_unit_read_buf_t *rbuf; 2793 2794 if (!nxt_queue_is_empty(&ctx_impl->free_rbuf)) { 2795 link = nxt_queue_first(&ctx_impl->free_rbuf); 2796 nxt_queue_remove(link); 2797 2798 rbuf = nxt_container_of(link, nxt_unit_read_buf_t, link); 2799 2800 return rbuf; 2801 } 2802 2803 rbuf = nxt_unit_malloc(&ctx_impl->ctx, sizeof(nxt_unit_read_buf_t)); 2804 2805 if (nxt_fast_path(rbuf != NULL)) { 2806 rbuf->ctx_impl = ctx_impl; 2807 } 2808 2809 return rbuf; 2810 } 2811 2812 2813 static void 2814 nxt_unit_read_buf_release(nxt_unit_ctx_t *ctx, 2815 nxt_unit_read_buf_t *rbuf) 2816 { 2817 nxt_unit_ctx_impl_t *ctx_impl; 2818 2819 ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx); 2820 2821 pthread_mutex_lock(&ctx_impl->mutex); 2822 2823 nxt_queue_insert_head(&ctx_impl->free_rbuf, &rbuf->link); 2824 2825 pthread_mutex_unlock(&ctx_impl->mutex); 2826 } 2827 2828 2829 nxt_unit_buf_t * 2830 nxt_unit_buf_next(nxt_unit_buf_t *buf) 2831 { 2832 nxt_unit_mmap_buf_t *mmap_buf; 2833 2834 mmap_buf = nxt_container_of(buf, nxt_unit_mmap_buf_t, buf); 2835 2836 if (mmap_buf->next == NULL) { 2837 return NULL; 2838 } 2839 2840 return &mmap_buf->next->buf; 2841 } 2842 2843 2844 uint32_t 2845 nxt_unit_buf_max(void) 2846 { 2847 return PORT_MMAP_DATA_SIZE; 2848 } 2849 2850 2851 uint32_t 2852 nxt_unit_buf_min(void) 2853 { 2854 return PORT_MMAP_CHUNK_SIZE; 2855 } 2856 2857 2858 int 2859 nxt_unit_response_write(nxt_unit_request_info_t *req, const void *start, 2860 size_t size) 2861 { 2862 ssize_t res; 2863 2864 res = nxt_unit_response_write_nb(req, start, size, size); 2865 2866 return res < 0 ? -res : NXT_UNIT_OK; 2867 } 2868 2869 2870 ssize_t 2871 nxt_unit_response_write_nb(nxt_unit_request_info_t *req, const void *start, 2872 size_t size, size_t min_size) 2873 { 2874 int rc; 2875 ssize_t sent; 2876 uint32_t part_size, min_part_size, buf_size; 2877 const char *part_start; 2878 nxt_unit_mmap_buf_t mmap_buf; 2879 nxt_unit_request_info_impl_t *req_impl; 2880 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2881 2882 nxt_unit_req_debug(req, "write: %d", (int) size); 2883 2884 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2885 2886 part_start = start; 2887 sent = 0; 2888 2889 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2890 nxt_unit_req_alert(req, "write: response not initialized yet"); 2891 2892 return -NXT_UNIT_ERROR; 2893 } 2894 2895 /* Check if response is not send yet. */ 2896 if (nxt_slow_path(req->response_buf != NULL)) { 2897 part_size = req->response_buf->end - req->response_buf->free; 2898 part_size = nxt_min(size, part_size); 2899 2900 rc = nxt_unit_response_add_content(req, part_start, part_size); 2901 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2902 return -rc; 2903 } 2904 2905 rc = nxt_unit_response_send(req); 2906 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2907 return -rc; 2908 } 2909 2910 size -= part_size; 2911 part_start += part_size; 2912 sent += part_size; 2913 2914 min_size -= nxt_min(min_size, part_size); 2915 } 2916 2917 while (size > 0) { 2918 part_size = nxt_min(size, PORT_MMAP_DATA_SIZE); 2919 min_part_size = nxt_min(min_size, part_size); 2920 min_part_size = nxt_min(min_part_size, PORT_MMAP_CHUNK_SIZE); 2921 2922 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, part_size, 2923 min_part_size, &mmap_buf, local_buf); 2924 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2925 return -rc; 2926 } 2927 2928 buf_size = mmap_buf.buf.end - mmap_buf.buf.free; 2929 if (nxt_slow_path(buf_size == 0)) { 2930 return sent; 2931 } 2932 part_size = nxt_min(buf_size, part_size); 2933 2934 mmap_buf.buf.free = nxt_cpymem(mmap_buf.buf.free, 2935 part_start, part_size); 2936 2937 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 2938 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2939 return -rc; 2940 } 2941 2942 size -= part_size; 2943 part_start += part_size; 2944 sent += part_size; 2945 2946 min_size -= nxt_min(min_size, part_size); 2947 } 2948 2949 return sent; 2950 } 2951 2952 2953 int 2954 nxt_unit_response_write_cb(nxt_unit_request_info_t *req, 2955 nxt_unit_read_info_t *read_info) 2956 { 2957 int rc; 2958 ssize_t n; 2959 uint32_t buf_size; 2960 nxt_unit_buf_t *buf; 2961 nxt_unit_mmap_buf_t mmap_buf; 2962 nxt_unit_request_info_impl_t *req_impl; 2963 char local_buf[NXT_UNIT_LOCAL_BUF_SIZE]; 2964 2965 req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req); 2966 2967 if (nxt_slow_path(req_impl->state < NXT_UNIT_RS_RESPONSE_INIT)) { 2968 nxt_unit_req_alert(req, "write: response not initialized yet"); 2969 2970 return NXT_UNIT_ERROR; 2971 } 2972 2973 /* Check if response is not send yet. */ 2974 if (nxt_slow_path(req->response_buf != NULL)) { 2975 2976 /* Enable content in headers buf. */ 2977 rc = nxt_unit_response_add_content(req, "", 0); 2978 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 2979 nxt_unit_req_error(req, "Failed to add piggyback content"); 2980 2981 return rc; 2982 } 2983 2984 buf = req->response_buf; 2985 2986 while (buf->end - buf->free > 0) { 2987 n = read_info->read(read_info, buf->free, buf->end - buf->free); 2988 if (nxt_slow_path(n < 0)) { 2989 nxt_unit_req_error(req, "Read error"); 2990 2991 return NXT_UNIT_ERROR; 2992 } 2993 2994 /* Manually increase sizes. */ 2995 buf->free += n; 2996 req->response->piggyback_content_length += n; 2997 2998 if (read_info->eof) { 2999 break; 3000 } 3001 } 3002 3003 rc = nxt_unit_response_send(req); 3004 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3005 nxt_unit_req_error(req, "Failed to send headers with content"); 3006 3007 return rc; 3008 } 3009 3010 if (read_info->eof) { 3011 return NXT_UNIT_OK; 3012 } 3013 } 3014 3015 while (!read_info->eof) { 3016 nxt_unit_req_debug(req, "write_cb, alloc %"PRIu32"", 3017 read_info->buf_size); 3018 3019 buf_size = nxt_min(read_info->buf_size, PORT_MMAP_DATA_SIZE); 3020 3021 rc = nxt_unit_get_outgoing_buf(req->ctx, req->response_port, 3022 buf_size, buf_size, 3023 &mmap_buf, local_buf); 3024 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3025 return rc; 3026 } 3027 3028 buf = &mmap_buf.buf; 3029 3030 while (!read_info->eof && buf->end > buf->free) { 3031 n = read_info->read(read_info, buf->free, buf->end - buf->free); 3032 if (nxt_slow_path(n < 0)) { 3033 nxt_unit_req_error(req, "Read error"); 3034 3035 nxt_unit_free_outgoing_buf(&mmap_buf); 3036 3037 return NXT_UNIT_ERROR; 3038 } 3039 3040 buf->free += n; 3041 } 3042 3043 rc = nxt_unit_mmap_buf_send(req, &mmap_buf, 0); 3044 if (nxt_slow_path(rc != NXT_UNIT_OK)) { 3045 nxt_unit_req_error(req, "Failed to send content"); 3046 3047 return rc; 3048 } 3049 } 3050 3051 return NXT_UNIT_OK; 3052 } 3053 3054 3055 ssize_t 3056 nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size) 3057 { 3058 ssize_t buf_res, res; 3059 3060 buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length, 3061 dst, size); 3062 3063 if (buf_res < (ssize_t) size && req->content_fd != -1) { 3064 res = read(req->content_fd, dst, size); 3065 if (nxt_slow_path(res < 0)) { 3066 nxt_unit_req_alert(req, "failed to read content: %s (%d)", 3067 strerror(errno), errno); 3068 3069 return res; 3070 } 3071 3072 if (res < (ssize_t) size) { 3073 nxt_unit_close(req->content_fd); 3074 3075 req->content_fd = -1; 3076 } 3077 3078 req->content_length -= res; 3079 3080 dst = nxt_pointer_to(dst, res); 3081 3082 } else { 3083 res = 0; 3084 } 3085 3086 return buf_res + res; 3087 } 3088 3089 3090 ssize_t 3091 nxt_unit_request_readline_size(nxt_unit_request_info_t *req, size_t max_size) 3092 { 3093 char *p; 3094 size_t l_size, b_size; 3095 nxt_unit_buf_t *b; 3096 nxt_unit_mmap_buf_t *mmap_buf, *preread_buf; 3097 3098 if (req->content_length == 0) { 3099 return 0; 3100 } 3101 3102 l_size = 0; 3103 3104 b = req->content_buf; 3105 3106 while (b != NULL) { 3107 b_size = b->end - b->free; 3108 p = memchr(b->free, '\n', b_size); 3109 3110 if (p != NULL) { 3111 p++; 3112 l_size += p - b->free; 3113 break; 3114 } 3115 3116 l_size += b_size; 3117 3118 if (max_size <= l_size) { 3119 break; 3120 } 3121 3122 mmap_buf = nxt_container_of(b, nxt_unit_mmap_buf_t, buf); 3123 if (mmap_buf->next == NULL 3124 && req->content_fd != -1 3125 && l_size < req->content_length) 3126 { 3127 preread_buf = nxt_unit_request_preread(req, 16384); 3128 if (nxt_slow_path(preread_buf == NULL)) { 3129 return -1; 3130 } 3131 3132 nxt_unit_mmap_buf_insert(&mmap_buf->next, preread_buf); 3133 } 3134 3135 b = nxt_unit_buf_next(b); 3136 } 3137 3138 return nxt_min(max_size, l_size); 3139 } 3140 3141